Add logging APIs

This commit is contained in:
Harendra Kumar 2017-06-28 21:40:09 +05:30
parent 3a988778f6
commit bf5a11889e
7 changed files with 168 additions and 49 deletions

View File

@ -10,11 +10,17 @@
module Strands
( wait
, waitLogged
, gather
, gatherLogged
, async
, sample
, threads
, logged
, suspend
, Log
)
where
import Strands.Context (Log, Loggable)
import Strands.Threads

View File

@ -81,12 +81,8 @@ instance Monad m => Alternative (AsyncT m) where
instance Monad m => Monad (AsyncT m) where
return = pure
-- Inner bind-operations in 'm' add their 'f' to fstack. If we migrate the
-- context to a new thread, somewhere in the middle, we unwind the fstack
-- and run these functions when we resume the context after migration.
m >>= f = AsyncT $ do
saveContext m f
runAsyncT m >>= restoreContext >>= runAsyncT
saveContext m f >> runAsyncT m >>= restoreContext >>= runAsyncT
instance Monad m => MonadPlus (AsyncT m) where
mzero = empty

View File

@ -1,9 +1,14 @@
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RecordWildCards #-}
module Strands.Context
( ChildEvent(..)
, Loggable
, Log (..)
, CtxLog (..)
, LogEntry (..)
, Context(..)
, initContext
, saveContext
@ -28,11 +33,40 @@ import Unsafe.Coerce (unsafeCoerce)
import GHC.Prim (Any)
------------------------------------------------------------------------------
-- State of a continuation
-- Log types
------------------------------------------------------------------------------
-- Logging is done using the 'logged' combinator. It remembers the last value
-- returned by a given computation and replays it the next time the computation
-- is resumed. However this could be problematic if we do not annotate all
-- impure computations. The program can take a different path due to a
-- non-logged computation returning a different value. In that case we may
-- replay a wrong value. To detect this we can use a unique id for each logging
-- site and abort if the id does not match on replay.
-- | Constraint type synonym for a value that can be logged.
type Loggable a = (Show a, Read a)
data LogEntry =
Executing -- we are inside this computation, not yet done
| Result (Maybe String) -- computation done, we have the result to replay
deriving (Read, Show)
data Log = Log [LogEntry] deriving Show
-- log entries and replay entries
data CtxLog = CtxLog [LogEntry] [LogEntry] deriving (Read, Show)
------------------------------------------------------------------------------
-- Parent child thread communication types
------------------------------------------------------------------------------
data ChildEvent a = ChildDone ThreadId (Maybe SomeException)
------------------------------------------------------------------------------
-- State threaded around the monad
------------------------------------------------------------------------------
-- | Describes the context of a computation.
data Context = Context
{
@ -53,9 +87,12 @@ data Context = Context
-- computation has nested binds we need to store the next 'f' for each
-- layer of nesting.
-- log only when there is a restore or if we are teleporting
-- We can use a HasLog constraint to statically enable/disable logging
-- , journal :: Maybe Log
-- Logging is done by the 'logged' primitive in this journal only if logsRef
-- exists.
, journal :: CtxLog
-- When we suspend we save the logs in this IORef and exit.
, logsRef :: Maybe (IORef [Log])
, location :: Location
---------------------------------------------------------------------------
@ -116,11 +153,14 @@ initContext
-> IORef [ThreadId]
-> IORef Int
-> (b -> m a)
-> Maybe (IORef [Log])
-> Context
initContext x childChan pending credit finalizer =
initContext x childChan pending credit finalizer lref =
Context { mailBox = Nothing
, currentm = unsafeCoerce x
, fstack = [unsafeCoerce finalizer]
, journal = CtxLog [] []
, logsRef = lref
, location = Worker
-- , mfData = mempty
, childChannel = unsafeCoerce childChan
@ -169,14 +209,10 @@ restoreContext :: (MonadPlus m1, Monad m) => Maybe a -> StateT Context m (m1 b)
restoreContext x = do
-- XXX fstack must be non-empty when this is called.
ctx@Context { fstack = f:fs } <- Lazy.get
let mnext = maybe mzero (unsafeCoerce f) x
Lazy.put ctx { currentm = unsafeCoerce mnext, fstack = fs }
return mnext
let mres = case x of
Nothing -> mzero
Just y -> (unsafeCoerce f) y
Lazy.put ctx { currentm = unsafeCoerce mres, fstack = fs }
return mres
-- | We can retrieve a context at any point and resume that context at some
-- later point, upon resumption we start executing again from the same point

View File

@ -1,25 +0,0 @@
{-# LANGUAGE ConstraintKinds #-}
module Strands.Log
( Loggable
, LogEntry (..)
, Replaying
, Log (..)
)
where
------------------------------------------------------------------------------
-- Log types
------------------------------------------------------------------------------
-- | Constraint type synonym for a value that can be logged.
type Loggable a = (Show a, Read a)
data LogEntry =
Executing -- we are inside this computation, not yet done
| Maybe String -- computation is done and we have the result to replay
deriving (Read, Show)
type Replaying = Bool
data Log = Log Replaying [LogEntry] deriving (Read, Show)

View File

@ -12,6 +12,11 @@ module Strands.Threads
, wait
, gather
, logged
, suspend
, waitLogged
, gatherLogged
)
where
@ -27,7 +32,7 @@ import qualified Control.Exception.Lifted as EL
import Control.Monad.Catch (MonadCatch, MonadThrow, throwM,
try)
import Control.Monad.IO.Class (MonadIO (..))
import Control.Monad.State (get, gets, modify, mzero,
import Control.Monad.State (get, gets, modify, mzero, put,
runStateT, when, StateT)
import Control.Monad.Trans.Class (MonadTrans (lift))
import Control.Monad.Trans.Control (MonadBaseControl, liftBaseWith)
@ -496,14 +501,14 @@ noTrans x = AsyncT $ x >>= return . Just
-- | Run an 'AsyncT m' computation and collect the results generated by each
-- thread of the computation in a list.
waitAsync :: forall m a. (MonadIO m, MonadCatch m)
=> (a -> AsyncT m a) -> AsyncT m a -> m ()
waitAsync finalizer m = do
=> (a -> AsyncT m a) -> Maybe (IORef [Log]) -> AsyncT m a -> m ()
waitAsync finalizer lref m = do
childChan <- liftIO $ atomically newTChan
pendingRef <- liftIO $ newIORef []
credit <- liftIO $ newIORef maxBound
let ctx = initContext (empty :: AsyncT m a) childChan pendingRef credit
finalizer
finalizer lref
r <- try $ runStateT (runAsyncT $ m >>= finalizer) ctx
@ -532,11 +537,101 @@ gather :: forall m a. (MonadIO m, MonadCatch m)
=> AsyncT m a -> m [a]
gather m = do
resultsRef <- liftIO $ newIORef []
waitAsync (gatherResult resultsRef) m
waitAsync (gatherResult resultsRef) Nothing m
liftIO $ readIORef resultsRef
-- | Run an 'AsyncT m' computation, wait for it to finish and discard the
-- results.
wait :: forall m a. (MonadIO m, MonadCatch m)
=> AsyncT m a -> m ()
wait m = waitAsync (const mzero) m
wait m = waitAsync (const mzero) Nothing m
------------------------------------------------------------------------------
-- Logging
------------------------------------------------------------------------------
logged :: (Loggable a, MonadIO m) => AsyncT m a -> AsyncT m a
logged m = AsyncT $ do
ctx <- get
case logsRef ctx of
Nothing -> runAsyncT m
Just _ -> withLog ctx m
where
withLog ctx action =
case journal ctx of
-- no replay
j@(CtxLog _ []) -> do
preLog ctx j
x <- runAsyncT action
postLog ctx j x
return x
-- replaying the log
CtxLog ls (r:rs) -> do
let j = CtxLog ls rs
preLog ctx j
case r of
Executing -> do
x <- runAsyncT action
postLog ctx (CtxLog ls []) x
return x
Result val -> do
let x = fmap read val
postLog ctx j x
return x
-- add an Executing entry at the head of the log
preLog = appendLog Executing
postLog ctx j x = appendLog (Result (fmap show x)) ctx j
appendLog l ctx (CtxLog ls rs) = put $ ctx {journal = CtxLog (l : ls) rs}
-- XXX enable or disable the suspension points by sending an exception to the
-- computation.
--
-- | Suspend works as an exit point with current thread state (partially done)
-- saved. The computation exits only if all the threads in the computation hit
-- the suspend point or exit normally. The threads which hit the suspension and
-- were suspended can be re-started by replaying the log. If there are threads
-- which did not suspend or did not exit, the computation will not exit. For
-- completed threads output is returned and for suspended threads the logs to
-- resume them are returned.
suspend :: MonadIO m => AsyncT m ()
suspend = logged $ AsyncT $ do
ctx <- get
case logsRef ctx of
Nothing -> return $ Just ()
Just ref ->
case journal ctx of
CtxLog ls [] -> do
-- replace the "Executing" entry at the head of the log
-- with a "()" so that we do not enter suspend on replay
liftIO $ atomicModifyIORef ref $ \logs ->
(Log (logResult (Just ()) : tail ls) : logs, ())
return Nothing
_ -> error "Bug: replay inside suspend"
where logResult x = Result (fmap show x)
-- | Run an 'AsyncT m' computation with logging enabled and collect the results
-- or logs generated by each thread of the computation.
gatherLogged :: forall m a. (MonadIO m, MonadCatch m)
=> AsyncT m a -> m ([a], [Log])
gatherLogged m = do
resultsRef <- liftIO $ newIORef []
lref <- liftIO $ newIORef []
waitAsync (gatherResult resultsRef) (Just lref) m
res <- liftIO $ readIORef resultsRef
logs <- liftIO $ readIORef lref
return (res, logs)
-- | Run an 'AsyncT m' computation with logging enabled, wait for it to finish
-- and discard the results. If the computation suspends collect the logs to
-- replay later.
waitLogged :: forall m a. (MonadIO m, MonadCatch m)
=> AsyncT m a -> m [Log]
waitLogged m = do
lref <- liftIO $ newIORef []
waitAsync (const mzero) (Just lref) m
logs <- liftIO $ readIORef lref
return logs

View File

@ -21,7 +21,6 @@ library
hs-source-dirs: src
exposed-modules: Strands
, Strands.Context
, Strands.Log
, Strands.AsyncT
, Strands.Threads

12
test/logged.hs Normal file
View File

@ -0,0 +1,12 @@
import Strands
import Control.Monad.IO.Class (liftIO)
main = do
xs <- waitLogged $ logged $ threads 0 $ do
r <- logged $ return (5::Int)
logged $ liftIO $ print ("A",r)
suspend
logged $ liftIO $ print ("B",r)
suspend
liftIO $ print ("C",r)
putStrLn $ show xs