Tidy Fixed.Internal a bit.

This commit is contained in:
Michael Walker 2014-12-27 22:22:49 +00:00
parent 220589f68c
commit 34ac178ca7

View File

@ -1,20 +1,28 @@
{-# LANGUAGE ExistentialQuantification #-} {-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE RankNTypes #-} {-# LANGUAGE RankNTypes #-}
-- | Concurrent monads with a fixed scheduler. -- | Concurrent monads with a fixed scheduler: internal types and
-- functions.
module Control.Monad.Conc.Fixed.Internal where module Control.Monad.Conc.Fixed.Internal where
import Control.Applicative ((<$>)) import Control.Monad (liftM, mapAndUnzipM)
import Control.Monad.Cont (Cont, runCont) import Control.Monad.Cont (Cont, runCont)
import Data.Map (Map) import Data.Map (Map)
import Data.Maybe (catMaybes, fromJust, isNothing) import Data.Maybe (catMaybes, fromJust, isNothing)
import qualified Data.Map as M import qualified Data.Map as M
-- * Types
-- | The underlying monad is based on continuations over Actions.
type M n r a = Cont (Action n r) a
-- | CVars are represented as a reference containing a maybe value,
-- and a list of things blocked on it.
type R r a = r (Maybe a, [Block])
-- | Doing this with a typeclass proved to be really hard, so here's a -- | Doing this with a typeclass proved to be really hard, so here's a
-- dict of methods for different implementations to override! -- dict of methods for different implementations to override!
--
-- Constraints: Functor (c t), Functor n, Monad (c t), Monad n.
data Fixed c n r t = F data Fixed c n r t = F
{ newRef :: forall a. a -> n (r a) { newRef :: forall a. a -> n (r a)
-- ^ Create a new reference -- ^ Create a new reference
@ -29,6 +37,8 @@ data Fixed c n r t = F
-- type. -- type.
} }
-- * Running @Conc@ monads
-- | Scheduling is done in terms of a trace of 'Action's. Blocking can -- | Scheduling is done in terms of a trace of 'Action's. Blocking can
-- only occur as a result of an action, and they cover (most of) the -- only occur as a result of an action, and they cover (most of) the
-- primitives of the concurrency. `spawn` is absent as it can be -- primitives of the concurrency. `spawn` is absent as it can be
@ -47,7 +57,7 @@ data Action n r =
-- integers, but you shouldn't assume they are necessarily contiguous. -- integers, but you shouldn't assume they are necessarily contiguous.
type ThreadId = Int type ThreadId = Int
-- | A @Scheduler@ maintains some internal state, `s`, takes the -- | A @Scheduler@ maintains some internal state, @s@, takes the
-- 'ThreadId' of the last thread scheduled, and the list of runnable -- 'ThreadId' of the last thread scheduled, and the list of runnable
-- threads (which will never be empty). It produces a 'ThreadId' to -- threads (which will never be empty). It produces a 'ThreadId' to
-- schedule, and a new state. -- schedule, and a new state.
@ -81,7 +91,7 @@ data ThreadAction =
| BlockedTake | BlockedTake
-- ^ Get blocked on a take. -- ^ Get blocked on a take.
| TryTake Bool [ThreadId] | TryTake Bool [ThreadId]
-- ^ try to take from a 'CVar', possibly waking up some threads. -- ^ Try to take from a 'CVar', possibly waking up some threads.
| Lift | Lift
-- ^ Lift an action from the underlying monad. -- ^ Lift an action from the underlying monad.
deriving (Eq, Show) deriving (Eq, Show)
@ -89,13 +99,13 @@ data ThreadAction =
-- | Run a concurrent computation with a given 'Scheduler' and initial -- | Run a concurrent computation with a given 'Scheduler' and initial
-- state, returning `Just result` if it terminates, and `Nothing` if a -- state, returning `Just result` if it terminates, and `Nothing` if a
-- deadlock is detected. -- deadlock is detected.
runFixed :: (Functor (c t), Functor n, Monad (c t), Monad n) => Fixed c n r t runFixed :: (Monad (c t), Monad n) => Fixed c n r t
-> Scheduler s -> s -> c t a -> n (Maybe a) -> Scheduler s -> s -> c t a -> n (Maybe a)
runFixed fixed sched s ma = (\(a,_,_) -> a) <$> runFixed' fixed sched s ma runFixed fixed sched s ma = liftM (\(a,_,_) -> a) $ runFixed' fixed sched s ma
-- | Variant of 'runConc' which returns the final state of the -- | Variant of 'runConc' which returns the final state of the
-- scheduler and an execution trace. -- scheduler and an execution trace.
runFixed' :: (Functor (c t), Functor n, Monad (c t), Monad n) => Fixed c n r t runFixed' :: (Monad (c t), Monad n) => Fixed c n r t
-> Scheduler s -> s -> c t a -> n (Maybe a, s, Trace) -> Scheduler s -> s -> c t a -> n (Maybe a, s, Trace)
runFixed' fixed sched s ma = do runFixed' fixed sched s ma = do
ref <- newRef fixed Nothing ref <- newRef fixed Nothing
@ -108,6 +118,8 @@ runFixed' fixed sched s ma = do
return (out, s', reverse trace) return (out, s', reverse trace)
-- * Running threads
-- | A @Block@ is used to determine what sort of block a thread is -- | A @Block@ is used to determine what sort of block a thread is
-- experiencing. -- experiencing.
data Block = WaitFull ThreadId | WaitEmpty ThreadId deriving Eq data Block = WaitFull ThreadId | WaitEmpty ThreadId deriving Eq
@ -115,13 +127,6 @@ data Block = WaitFull ThreadId | WaitEmpty ThreadId deriving Eq
-- | Threads are represented as a tuple of (next action, is blocked). -- | Threads are represented as a tuple of (next action, is blocked).
type Threads n r = Map ThreadId (Action n r, Bool) type Threads n r = Map ThreadId (Action n r, Bool)
-- | The underlying monad is based on continuations over Actions.
type M n r a = Cont (Action n r) a
-- | CVars are represented as a reference containing a maybe value,
-- and a list of things blocked on it.
type R r a = r (Maybe a, [Block])
-- | Run a collection of threads, until there are no threads left. -- | Run a collection of threads, until there are no threads left.
-- --
-- A thread is represented as a tuple of (next action, is blocked). -- A thread is represented as a tuple of (next action, is blocked).
@ -130,7 +135,7 @@ type R r a = r (Maybe a, [Block])
-- efficient to prepend to a list than append. As this function isn't -- efficient to prepend to a list than append. As this function isn't
-- exposed to users of the library, this is just an internal gotcha to -- exposed to users of the library, this is just an internal gotcha to
-- watch out for. -- watch out for.
runThreads :: (Functor (c t), Functor n, Monad (c t), Monad n) => Fixed c n r t runThreads :: (Monad (c t), Monad n) => Fixed c n r t
-> Trace -> ThreadId -> Scheduler s -> s -> Threads n r -> r (Maybe a) -> n (s, Trace) -> Trace -> ThreadId -> Scheduler s -> s -> Threads n r -> r (Maybe a) -> n (s, Trace)
runThreads fixed sofar prior sched s threads ref runThreads fixed sofar prior sched s threads ref
| isTerminated = return (s, sofar) | isTerminated = return (s, sofar)
@ -153,7 +158,7 @@ runThreads fixed sofar prior sched s threads ref
-- | Run a single thread one step, by dispatching on the type of -- | Run a single thread one step, by dispatching on the type of
-- 'Action'. -- 'Action'.
stepThread :: (Functor (c t), Functor n, Monad (c t), Monad n) stepThread :: (Monad (c t), Monad n)
=> Action n r => Action n r
-> Fixed c n r t -> ThreadId -> Threads n r -> n (Threads n r, Maybe ThreadAction) -> Fixed c n r t -> ThreadId -> Threads n r -> n (Threads n r, Maybe ThreadAction)
stepThread (AFork a b) = stepFork a b stepThread (AFork a b) = stepFork a b
@ -166,7 +171,7 @@ stepThread (ALift na) = stepLift na
stepThread AStop = stepStop stepThread AStop = stepStop
-- | Start a new thread, assigning it a unique 'ThreadId' -- | Start a new thread, assigning it a unique 'ThreadId'
stepFork :: (Functor (c t), Functor n, Monad (c t), Monad n) stepFork :: (Monad (c t), Monad n)
=> Action n r -> Action n r => Action n r -> Action n r
-> Fixed c n r t -> ThreadId -> Threads n r -> n (Threads n r, Maybe ThreadAction) -> Fixed c n r t -> ThreadId -> Threads n r -> n (Threads n r, Maybe ThreadAction)
stepFork a b _ i threads = stepFork a b _ i threads =
@ -174,7 +179,7 @@ stepFork a b _ i threads =
in return (goto b i threads', Just $ Fork newid) in return (goto b i threads', Just $ Fork newid)
-- | Put a value into a @CVar@, blocking the thread until it's empty. -- | Put a value into a @CVar@, blocking the thread until it's empty.
stepPut :: (Functor (c t), Functor n, Monad (c t), Monad n) stepPut :: (Monad (c t), Monad n)
=> R r a -> a -> Action n r => R r a -> a -> Action n r
-> Fixed c n r t -> ThreadId -> Threads n r -> n (Threads n r, Maybe ThreadAction) -> Fixed c n r t -> ThreadId -> Threads n r -> n (Threads n r, Maybe ThreadAction)
stepPut ref a c fixed i threads = do stepPut ref a c fixed i threads = do
@ -189,7 +194,7 @@ stepPut ref a c fixed i threads = do
return (goto c i threads', Just $ Put woken) return (goto c i threads', Just $ Put woken)
-- | Try to put a value into a @CVar@, without blocking. -- | Try to put a value into a @CVar@, without blocking.
stepTryPut :: (Functor (c t), Functor n, Monad (c t), Monad n) stepTryPut :: (Monad (c t), Monad n)
=> R r a -> a -> (Bool -> Action n r) => R r a -> a -> (Bool -> Action n r)
-> Fixed c n r t -> ThreadId -> Threads n r -> n (Threads n r, Maybe ThreadAction) -> Fixed c n r t -> ThreadId -> Threads n r -> n (Threads n r, Maybe ThreadAction)
stepTryPut ref a c fixed i threads = do stepTryPut ref a c fixed i threads = do
@ -203,7 +208,7 @@ stepTryPut ref a c fixed i threads = do
-- | Get the value from a @CVar@, without emptying, blocking the -- | Get the value from a @CVar@, without emptying, blocking the
-- thread until it's full. -- thread until it's full.
stepGet :: (Functor (c t), Functor n, Monad (c t), Monad n) stepGet :: (Monad (c t), Monad n)
=> R r a -> (a -> Action n r) => R r a -> (a -> Action n r)
-> Fixed c n r t -> ThreadId -> Threads n r -> n (Threads n r, Maybe ThreadAction) -> Fixed c n r t -> ThreadId -> Threads n r -> n (Threads n r, Maybe ThreadAction)
stepGet ref c fixed i threads = do stepGet ref c fixed i threads = do
@ -216,7 +221,7 @@ stepGet ref c fixed i threads = do
-- | Take the value from a @CVar@, blocking the thread until it's -- | Take the value from a @CVar@, blocking the thread until it's
-- full. -- full.
stepTake :: (Functor (c t), Functor n, Monad (c t), Monad n) stepTake :: (Monad (c t), Monad n)
=> R r a -> (a -> Action n r) => R r a -> (a -> Action n r)
-> Fixed c n r t -> ThreadId -> Threads n r -> n (Threads n r, Maybe ThreadAction) -> Fixed c n r t -> ThreadId -> Threads n r -> n (Threads n r, Maybe ThreadAction)
stepTake ref c fixed i threads = do stepTake ref c fixed i threads = do
@ -231,7 +236,7 @@ stepTake ref c fixed i threads = do
return (threads', Just BlockedTake) return (threads', Just BlockedTake)
-- | Try to take the value from a @CVar@, without blocking. -- | Try to take the value from a @CVar@, without blocking.
stepTryTake :: (Functor (c t), Functor n, Monad (c t), Monad n) stepTryTake :: (Monad (c t), Monad n)
=> R r a -> (Maybe a -> Action n r) => R r a -> (Maybe a -> Action n r)
-> Fixed c n r t -> ThreadId -> Threads n r -> n (Threads n r, Maybe ThreadAction) -> Fixed c n r t -> ThreadId -> Threads n r -> n (Threads n r, Maybe ThreadAction)
stepTryTake ref c fixed i threads = do stepTryTake ref c fixed i threads = do
@ -245,7 +250,7 @@ stepTryTake ref c fixed i threads = do
-- | Lift an action from the underlying monad into the @Conc@ -- | Lift an action from the underlying monad into the @Conc@
-- computation. -- computation.
stepLift :: (Functor (c t), Functor n, Monad (c t), Monad n) stepLift :: (Monad (c t), Monad n)
=> n (Action n r) => n (Action n r)
-> Fixed c n r t -> ThreadId -> Threads n r -> n (Threads n r, Maybe ThreadAction) -> Fixed c n r t -> ThreadId -> Threads n r -> n (Threads n r, Maybe ThreadAction)
stepLift na _ i threads = do stepLift na _ i threads = do
@ -253,16 +258,18 @@ stepLift na _ i threads = do
return (goto a i threads, Just Lift) return (goto a i threads, Just Lift)
-- | Kill the current thread. -- | Kill the current thread.
stepStop :: (Functor (c t), Functor n, Monad (c t), Monad n) stepStop :: (Monad (c t), Monad n)
=> Fixed c n r t -> ThreadId -> Threads n r -> n (Threads n r, Maybe ThreadAction) => Fixed c n r t -> ThreadId -> Threads n r -> n (Threads n r, Maybe ThreadAction)
stepStop _ i threads = return (kill i threads, Nothing) stepStop _ i threads = return (kill i threads, Nothing)
-- * Manipulating threads
-- | Replace the @Action@ of a thread. -- | Replace the @Action@ of a thread.
goto :: Action n r -> ThreadId -> Threads n r -> Threads n r goto :: Action n r -> ThreadId -> Threads n r -> Threads n r
goto a = M.alter $ \(Just (_, b)) -> Just (a, b) goto a = M.alter $ \(Just (_, b)) -> Just (a, b)
-- | Block a thread on a @CVar@. -- | Block a thread on a @CVar@.
block :: (Functor (c t), Functor n, Monad (c t), Monad n) => Fixed c n r t block :: (Monad (c t), Monad n) => Fixed c n r t
-> R r a -> (ThreadId -> Block) -> ThreadId -> Threads n r -> n (Threads n r) -> R r a -> (ThreadId -> Block) -> ThreadId -> Threads n r -> n (Threads n r)
block fixed ref typ tid threads = do block fixed ref typ tid threads = do
(val, blocks) <- readRef fixed ref (val, blocks) <- readRef fixed ref
@ -279,10 +286,10 @@ kill :: ThreadId -> Threads n r -> Threads n r
kill = M.delete kill = M.delete
-- | Wake every thread blocked on a @CVar@ read/write. -- | Wake every thread blocked on a @CVar@ read/write.
wake :: (Functor (c t), Functor n, Monad (c t), Monad n) => Fixed c n r t wake :: (Monad (c t), Monad n) => Fixed c n r t
-> R r a -> (ThreadId -> Block) -> Threads n r -> n (Threads n r, [ThreadId]) -> R r a -> (ThreadId -> Block) -> Threads n r -> n (Threads n r, [ThreadId])
wake fixed ref typ m = do wake fixed ref typ m = do
(m', woken) <- unzip <$> mapM wake' (M.toList m) (m', woken) <- mapAndUnzipM wake' (M.toList m)
return (M.fromList m', catMaybes woken) return (M.fromList m', catMaybes woken)