Add context save, restore and continue functions

Modularize the context functionality
This commit is contained in:
Harendra Kumar 2017-06-11 02:56:22 +05:30
parent a39705bb0d
commit 4adbddf74c
5 changed files with 105 additions and 80 deletions

View File

@ -20,7 +20,7 @@ cabal-version: >= 1.10
library
hs-source-dirs: src
exposed-modules: Duct
, Duct.Event
, Duct.Context
, Duct.Log
, Duct.AsyncT
, Duct.Threads
@ -33,6 +33,7 @@ library
, containers >= 0.5.6
, time >= 1.5
, transformers >= 0.4.2
, ghc-prim
-- libraries not bundled with GHC
, lifted-base

View File

@ -14,7 +14,6 @@ module Duct.AsyncT
, waitAsync
, (<**)
, onNothing
, RemoteStatus (..)
, waitForChildren
, dbg
)
@ -39,21 +38,13 @@ import Data.IORef (IORef, newIORef, readIORef,
writeIORef)
import Data.List (delete)
import Data.Maybe (isJust, isNothing)
import Unsafe.Coerce (unsafeCoerce)
import Duct.Event
import Duct.Context
-- import Debug.Trace
newtype AsyncT m a = AsyncT { runAsyncT :: StateT Context m (Maybe a) }
------------------------------------------------------------------------------
-- Remote data types
------------------------------------------------------------------------------
data RemoteStatus = WasRemote | WasParallel | NoRemote
deriving (Typeable, Eq, Show)
------------------------------------------------------------------------------
-- Utilities
------------------------------------------------------------------------------
@ -96,12 +87,10 @@ instance Monad m => Alternative (AsyncT m) where
empty = AsyncT $ return Nothing
(<|>) x y = AsyncT $ do
mx <- runAsyncT x
was <- getData `onNothing` return NoRemote
if was == WasRemote
then return Nothing
else case mx of
Nothing -> runAsyncT y
justx -> return justx
loc <- getLocation
case loc of
RemoteNode -> return Nothing
_ -> maybe (runAsyncT y) (return . Just) mx
-- | A synonym of 'empty' that can be used in a monadic expression. It stops
-- the computation, which allows the next computation in an 'Alternative'
@ -113,38 +102,15 @@ stop = empty
-- Monad
------------------------------------------------------------------------------
-- | Total variant of `tail` that returns an empty list when given an empty list.
tailsafe :: [a] -> [a]
tailsafe [] = []
tailsafe (_:xs) = xs
instance Monad m => Monad (AsyncT m) where
return = pure
m >>= f = AsyncT $ do
-- Save the 'm' and 'f', in case we migrate to a new thread before this
-- bind operation completes, we run the operation manually in the new
-- thread.
modify $ \Context { fstack = fs, .. } ->
Context { currentm = unsafeCoerce m
, fstack = unsafeCoerce f : fs
, .. }
return = pure
-- Inner bind operations in 'm' add their 'f' to fstack. If we migrate to
-- a new thread somewhere in the middle we unwind the fstack and run these
-- functions manually after migration.
mres <- runAsyncT m
let res = case mres of
Nothing -> empty
Just x -> f x
modify $ \Context { fstack = fs, .. } ->
Context { currentm = unsafeCoerce res
, fstack = tailsafe fs
, .. }
runAsyncT res
m >>= f = AsyncT $ do
saveContext m f
runAsyncT m >>= restoreContext >>= runAsyncT
instance Monad m => MonadPlus (AsyncT m) where
mzero = empty

View File

@ -1,9 +1,16 @@
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RecordWildCards #-}
module Duct.Event
module Duct.Context
( Context(..)
, initContext
, saveContext
, restoreContext
, continueContext
, Location(..)
, getLocation
, setLocation
, getData
, setData
, modifyData
@ -11,33 +18,39 @@ module Duct.Event
)
where
import Control.Applicative (Alternative(..))
import Control.Concurrent (ThreadId)
import Control.Concurrent.STM (TChan)
import Control.Monad.State (MonadState, gets, modify)
import Control.Monad.State (MonadState, gets, modify, StateT, MonadPlus)
import qualified Control.Monad.Trans.State.Lazy as Lazy (get, gets, modify, put)
import Data.Dynamic (TypeRep, Typeable, typeOf)
import Data.IORef (IORef)
import qualified Data.Map as M
import Unsafe.Coerce (unsafeCoerce)
import GHC.Prim (Any)
------------------------------------------------------------------------------
-- State of a continuation
------------------------------------------------------------------------------
type SData = ()
-- | Describes the context of a computation.
data Context = forall m a b. Context
{ event :: Maybe SData -- untyped, XXX rename to mailbox - can we do
data Context = Context
{ event :: Maybe () -- untyped, XXX rename to mailbox - can we do
-- away with this?
-- ^ event data to use in a continuation in a new thread
-- the 'm' and 'f' in an 'm >>= f' operation of the monad
-- In nested binds we store the current m only, but the whole stack of fs
, currentm :: m a
, fstack :: [a -> m b]
, currentm :: Any -- untyped, the real type is: m a
, fstack :: [Any] -- untyped, the real type is: [a -> m b]
-- ^ List of continuations
, mfData :: M.Map TypeRep SData
-- log only when there is a restore or if we are teleporting
-- We can use a HasLog constraint to statically enable/disable logging
-- , journal :: Maybe Log
, location :: Location
, mfData :: M.Map TypeRep Any -- untyped, type coerced
-- ^ State data accessed with get or put operations
-- XXX All of the following can be removed
@ -92,15 +105,68 @@ initContext
-> IORef Int
-> Context
initContext x childChan pending credit =
Context { event = mempty
, currentm = x
Context { event = mempty
, currentm = unsafeCoerce x
, fstack = []
, location = Local
, mfData = mempty
, parentChannel = Nothing
, childChannel = childChan
, pendingThreads = pending
, threadCredit = credit }
------------------------------------------------------------------------------
-- Where is the computation running?
------------------------------------------------------------------------------
data Location = Local | ForkedThread | RemoteNode
deriving (Typeable, Eq, Show)
getLocation :: Monad m => StateT Context m Location
getLocation = Lazy.gets location
setLocation :: Monad m => Location -> StateT Context m ()
setLocation loc = Lazy.modify $ \Context { .. } -> Context { location = loc, .. }
------------------------------------------------------------------------------
-- Saving and restoring the context of a computation
------------------------------------------------------------------------------
-- Save the 'm' and 'f', in case we migrate to a new thread before the current
-- bind operation completes, we run the operation manually in the new thread
-- using this context.
saveContext :: Monad m => f a -> (a -> f b) -> StateT Context m ()
saveContext m f =
Lazy.modify $ \Context { fstack = fs, .. } ->
Context { currentm = unsafeCoerce m
, fstack = unsafeCoerce f : fs
, .. }
-- pop the top function from the continuation stack, create the next closure,
-- set it as the current closure and return it.
restoreContext :: (Alternative f, Monad m) => Maybe a -> StateT Context m (f b)
restoreContext x = do
-- XXX fstack must be non-empty when this is called.
ctx@Context { fstack = f:fs } <- Lazy.get
let mres = case x of
Nothing -> empty
Just y -> (unsafeCoerce f) y
Lazy.put ctx { currentm = unsafeCoerce mres, fstack = fs }
return mres
continueContext :: MonadPlus m => Context -> m a
continueContext Context { currentm = m, fstack = fs } =
unsafeCoerce m >>= runfStack (unsafeCoerce fs)
where
-- runfStack :: Monad m => [a -> AsyncT m a] -> a -> AsyncT m b
runfStack [] _ = empty
runfStack (f:ff) x = f x >>= runfStack ff
------------------------------------------------------------------------------
-- * Extensible State: Session Data Management
------------------------------------------------------------------------------

View File

@ -12,7 +12,7 @@ module Duct.Threads
)
where
import Control.Applicative ((<|>), empty)
import Control.Applicative ((<|>))
import Control.Concurrent (ThreadId, forkIO, killThread,
myThreadId, threadDelay)
import Control.Concurrent.STM (TChan, atomically, newTChan,
@ -35,7 +35,7 @@ import Data.Maybe (fromJust)
import Unsafe.Coerce (unsafeCoerce)
import Duct.AsyncT
import Duct.Event
import Duct.Context
------------------------------------------------------------------------------
-- Pick up from where we left in the previous thread
@ -45,15 +45,8 @@ import Duct.Event
-- to a new thread. Run the stack of pending functions in fstack. The types
-- don't match. We just coerce the types here, we know that they actually match.
continue :: (MonadIO m, Monad (AsyncT m)) => Context -> StateT Context m (Maybe a)
continue Context { currentm = x, fstack = fs } =
runAsyncT $ unsafeCoerce x >>= runfStack (unsafeCoerce fs)
where
runfStack :: Monad m => [a -> AsyncT m a] -> a -> AsyncT m b
runfStack [] _ = empty
runfStack (f:fs) x = f x >>= runfStack fs
continue :: Monad m => Context -> StateT Context m (Maybe a)
continue = runAsyncT . continueContext
------------------------------------------------------------------------------
-- Thread Management (creation, reaping and killing)
@ -244,8 +237,8 @@ parallel ioaction = AsyncT $ do
-- in this thread or a new thread.
Nothing -> do
lift $ genAsyncEvents cont ioaction
was <- getData `onNothing` return NoRemote
when (was /= WasRemote) $ setData WasParallel
loc <- getLocation
when (loc /= RemoteNode) $ setLocation ForkedThread
return Nothing
-- | An task stream generator that produces an infinite stream of tasks by
@ -273,10 +266,10 @@ async io = do
-- computation. Note that in Applicatives it might result in an undesired
-- serialization.
sync :: MonadIO m => AsyncT m a -> AsyncT m a
sync x = do
setData WasRemote
r <- x
delData WasRemote
sync x = AsyncT $ do
setLocation RemoteNode
r <- runAsyncT x
setLocation Local
return r
-- | An task stream generator that produces an infinite stream of tasks by
@ -320,8 +313,8 @@ react setHandler iob = AsyncT $ do
lift $ setHandler $ \dat ->do
runStateT (continue cont) cont{event= Just $ unsafeCoerce dat}
liftIO iob
was <- getData `onNothing` return NoRemote
when (was /= WasRemote) $ setData WasParallel
loc <- getLocation
when (loc /= RemoteNode) $ setLocation ForkedThread
return Nothing
j@(Just _) -> do

View File

@ -6,23 +6,22 @@ import Control.Concurrent.STM
import Control.Monad.State
import Data.IORef
import Duct.Event
import Duct.Context
f :: StateT EventF IO String
f :: StateT Context IO String
f = do
setData "x"
Just x <- getData
return x
runEvent :: forall m a. (Alternative m, MonadIO m) => StateT EventF m a -> m a
runEvent :: forall m a. (Alternative m, MonadIO m) => StateT Context m a -> m a
runEvent t = do
zombieChan <- liftIO $ atomically newTChan
pendingRef <- liftIO $ newIORef []
credit <- liftIO $ newIORef maxBound
th <- liftIO $ myThreadId
(r, _) <- runStateT t $ initEventF
(empty :: m a) th zombieChan pendingRef credit
(r, _) <- runStateT t $ initContext
(empty :: m a) zombieChan pendingRef credit
return r
main = do