Add MonadSTM instances for Conc and ConcIO

This commit is contained in:
Michael Walker 2015-02-09 22:31:05 +00:00
parent 9b5e010d90
commit 36e83e72f8
3 changed files with 68 additions and 46 deletions

View File

@ -15,6 +15,7 @@ module Test.DejaFu.Deterministic
, runConc
, fork
, spawn
, atomically
-- * Communication: CVars
, CVar
@ -46,6 +47,7 @@ import Control.State (Wrapper(..), refST)
import Data.STRef (STRef, newSTRef)
import Test.DejaFu.Deterministic.Internal
import Test.DejaFu.Deterministic.Schedule
import Test.DejaFu.STM (STMLike, runTransactionST)
import qualified Control.Monad.Conc.Class as C
@ -53,10 +55,11 @@ import qualified Control.Monad.Conc.Class as C
-- universally-quantified indexing state trick as used by 'ST' and
-- 'STRef's to prevent mutable references from leaking out of the
-- monad.
newtype Conc t a = C { unC :: M (ST t) (STRef t) a } deriving (Functor, Applicative, Monad)
newtype Conc t a = C { unC :: M (ST t) (STRef t) (STMLike t) a } deriving (Functor, Applicative, Monad)
instance C.MonadConc (Conc t) where
type CVar (Conc t) = CVar t
type CVar (Conc t) = CVar t
type STMLike (Conc t) = STMLike t (ST t) (STRef t)
fork = fork
newEmptyCVar = newEmptyCVar
@ -65,9 +68,10 @@ instance C.MonadConc (Conc t) where
readCVar = readCVar
takeCVar = takeCVar
tryTakeCVar = tryTakeCVar
_concNoTest = _concNoTest
atomically = atomically
_concNoTest = _concNoTest
fixed :: Fixed (ST t) (STRef t)
fixed :: Fixed (ST t) (STRef t) (STMLike t)
fixed = Wrapper refST $ \ma -> cont (\c -> ALift $ c <$> ma)
-- | The concurrent variable type used with the 'Conc' monad. One
@ -91,6 +95,12 @@ readCVar cvar = C $ cont $ AGet $ unV cvar
fork :: Conc t () -> Conc t ()
fork (C ma) = C $ cont $ \c -> AFork (runCont ma $ const AStop) $ c ()
-- | Run the provided 'MonadSTM' transaction atomically. If 'retry' is
-- called, it will be blocked until any of the touched 'CTVar's have
-- been written to.
atomically :: STMLike t (ST t) (STRef t) a -> Conc t a
atomically stm = C $ cont $ AAtom stm
-- | Create a new empty 'CVar'.
newEmptyCVar :: Conc t (CVar t a)
newEmptyCVar = C $ cont lifted where
@ -132,4 +142,4 @@ _concNoTest ma = C $ cont $ \c -> ANoTest (unC ma) c
-- making your head hurt, check out the \"How @runST@ works\" section
-- of <https://ocharles.org.uk/blog/guest-posts/2014-12-18-rank-n-types.html>
runConc :: Scheduler s -> s -> (forall t. Conc t a) -> (Either Failure a, s, Trace)
runConc sched s ma = runST $ runFixed fixed sched s $ unC ma
runConc sched s ma = runST $ runFixed fixed runTransactionST sched s $ unC ma

View File

@ -19,6 +19,7 @@ module Test.DejaFu.Deterministic.IO
, liftIO
, fork
, spawn
, atomically
-- * Communication: CVars
, CVar
@ -49,18 +50,20 @@ import Control.State (Wrapper(..), refIO)
import Data.IORef (IORef, newIORef)
import Test.DejaFu.Deterministic.Internal
import Test.DejaFu.Deterministic.Schedule
import Test.DejaFu.STM (STMLike, runTransactionIO)
import qualified Control.Monad.Conc.Class as C
import qualified Control.Monad.IO.Class as IO
-- | The 'IO' variant of Test.DejaFu.Deterministic's @Conc@ monad.
newtype ConcIO t a = C { unC :: M IO IORef a } deriving (Functor, Applicative, Monad)
newtype ConcIO t a = C { unC :: M IO IORef (STMLike t) a } deriving (Functor, Applicative, Monad)
instance IO.MonadIO (ConcIO t) where
liftIO = liftIO
instance C.MonadConc (ConcIO t) where
type CVar (ConcIO t) = CVar t
type CVar (ConcIO t) = CVar t
type STMLike (ConcIO t) = STMLike t IO IORef
fork = fork
newEmptyCVar = newEmptyCVar
@ -69,9 +72,10 @@ instance C.MonadConc (ConcIO t) where
readCVar = readCVar
takeCVar = takeCVar
tryTakeCVar = tryTakeCVar
atomically = atomically
_concNoTest = _concNoTest
fixed :: Fixed IO IORef
fixed :: Fixed IO IORef (STMLike t)
fixed = Wrapper refIO $ unC . liftIO
-- | The concurrent variable type used with the 'ConcIO' monad. These
@ -96,6 +100,12 @@ readCVar cvar = C $ cont $ AGet $ unV cvar
fork :: ConcIO t () -> ConcIO t ()
fork (C ma) = C $ cont $ \c -> AFork (runCont ma $ const AStop) $ c ()
-- | Run the provided 'MonadSTM' transaction atomically. If 'retry' is
-- called, it will be blocked until any of the touched 'CTVar's have
-- been written to.
atomically :: STMLike t IO IORef a -> ConcIO t a
atomically stm = C $ cont $ AAtom stm
-- | Create a new empty 'CVar'.
newEmptyCVar :: ConcIO t (CVar t a)
newEmptyCVar = C $ cont lifted where
@ -128,4 +138,4 @@ _concNoTest ma = C $ cont $ \c -> ANoTest (unC ma) c
-- state, returning an failure reason on error. Also returned is the
-- final state of the scheduler, and an execution trace.
runConcIO :: Scheduler s -> s -> (forall t. ConcIO t a) -> IO (Either Failure a, s, Trace)
runConcIO sched s ma = runFixed fixed sched s $ unC ma
runConcIO sched s ma = runFixed fixed runTransactionIO sched s $ unC ma

View File

@ -12,20 +12,21 @@ import Control.State
import Data.List.Extra
import Data.Map (Map)
import Data.Maybe (catMaybes, fromJust, isNothing)
import Test.DejaFu.STM (CTVarId, Result(..))
import qualified Data.Map as M
-- * The @Conc@ Monad
-- | The underlying monad is based on continuations over Actions.
type M n r a = Cont (Action n r) a
type M n r s a = Cont (Action n r s) a
-- | CVars are represented as a reference containing a Maybe value, a
-- list of things blocked on it, and a unique numeric identifier.
type R r a = r (CVarId, Maybe a, [Block])
-- | Dict of methods for implementations to override.
type Fixed n r = Wrapper n r (Cont (Action n r))
type Fixed n r s = Wrapper n r (Cont (Action n r s))
-- * Running @Conc@ Computations
@ -33,16 +34,17 @@ type Fixed n r = Wrapper n r (Cont (Action n r))
-- only occur as a result of an action, and they cover (most of) the
-- primitives of the concurrency. 'spawn' is absent as it is
-- implemented in terms of 'newEmptyCVar', 'fork', and 'putCVar'.
data Action n r =
AFork (Action n r) (Action n r)
| forall a. APut (R r a) a (Action n r)
| forall a. ATryPut (R r a) a (Bool -> Action n r)
| forall a. AGet (R r a) (a -> Action n r)
| forall a. ATake (R r a) (a -> Action n r)
| forall a. ATryTake (R r a) (Maybe a -> Action n r)
| forall a. ANoTest (M n r a) (a -> Action n r)
| ANew (CVarId -> n (Action n r))
| ALift (n (Action n r))
data Action n r s =
AFork (Action n r s) (Action n r s)
| forall a. APut (R r a) a (Action n r s)
| forall a. ATryPut (R r a) a (Bool -> Action n r s)
| forall a. AGet (R r a) (a -> Action n r s)
| forall a. ATake (R r a) (a -> Action n r s)
| forall a. ATryTake (R r a) (Maybe a -> Action n r s)
| forall a. ANoTest (M n r s a) (a -> Action n r s)
| forall a. AAtom (s n r a) (a -> Action n r s)
| ANew (CVarId -> n (Action n r s))
| ALift (n (Action n r s))
| AStop
-- | Every live thread has a unique identitifer.
@ -161,15 +163,15 @@ instance NFData Failure where
-- state, returning a 'Just' if it terminates, and 'Nothing' if a
-- deadlock is detected. Also returned is the final state of the
-- scheduler, and an execution trace.
runFixed :: Monad n => Fixed n r
-> Scheduler s -> s -> M n r a -> n (Either Failure a, s, Trace)
runFixed fixed sched s ma = do
runFixed :: Monad n => Fixed n r s -> (forall x. s n r x -> CTVarId -> n (Result x, CTVarId))
-> Scheduler g -> g -> M n r s a -> n (Either Failure a, g, Trace)
runFixed fixed runstm sched s ma = do
ref <- newRef (wref fixed) Nothing
let c = ma >>= liftN fixed . writeRef (wref fixed) ref . Just . Right
let threads = M.fromList [(0, (runCont c $ const AStop, False))]
(s', trace) <- runThreads fixed (-1, 0) [] (negate 1) sched s threads ref
(s', trace) <- runThreads fixed runstm (-1, 0) [] (negate 1) sched s threads ref
out <- readRef (wref fixed) ref
return (fromJust out, s', reverse trace)
@ -181,7 +183,7 @@ runFixed fixed sched s ma = do
data Block = WaitFull ThreadId | WaitEmpty ThreadId deriving Eq
-- | Threads are represented as a tuple of (next action, is blocked).
type Threads n r = Map ThreadId (Action n r, Bool)
type Threads n r s = Map ThreadId (Action n r s, Bool)
-- | Run a collection of threads, until there are no threads left.
--
@ -191,15 +193,15 @@ type Threads n r = Map ThreadId (Action n r, Bool)
-- efficient to prepend to a list than append. As this function isn't
-- exposed to users of the library, this is just an internal gotcha to
-- watch out for.
runThreads :: Monad n => Fixed n r
-> (CVarId, ThreadId) -> Trace -> ThreadId -> Scheduler s -> s -> Threads n r -> r (Maybe (Either Failure a)) -> n (s, Trace)
runThreads fixed (lastcvid, lasttid) sofar prior sched s threads ref
runThreads :: Monad n => Fixed n r s -> (forall x. s n r x -> CTVarId -> n (Result x, CTVarId))
-> (CVarId, ThreadId) -> Trace -> ThreadId -> Scheduler g -> g -> Threads n r s -> r (Maybe (Either Failure a)) -> n (g, Trace)
runThreads fixed runstm (lastcvid, lasttid) sofar prior sched s threads ref
| isTerminated = return (s, sofar)
| isDeadlocked = writeRef (wref fixed) ref (Just $ Left Deadlock) >> return (s, sofar)
| isNonexistant = writeRef (wref fixed) ref (Just $ Left InternalError) >> return (s, sofar)
| isBlocked = writeRef (wref fixed) ref (Just $ Left InternalError) >> return (s, sofar)
| otherwise = do
stepped <- stepThread fixed (fst $ fromJust thread) (sched, s) (lastcvid, lasttid) chosen threads
stepped <- stepThread fixed runstm (fst $ fromJust thread) (sched, s) (lastcvid, lasttid) chosen threads
case stepped of
Right (threads', act) -> do
let sofar' = (decision, alternatives, act) : sofar
@ -207,7 +209,7 @@ runThreads fixed (lastcvid, lasttid) sofar prior sched s threads ref
let lastcvid' = case act of { New c -> c; _ -> lastcvid }
let lasttid' = case act of { Fork t -> t; _ -> lasttid }
runThreads fixed (lastcvid', lasttid') sofar' chosen sched s' threads' ref
runThreads fixed runstm (lastcvid', lasttid') sofar' chosen sched s' threads' ref
Left failure -> writeRef (wref fixed) ref (Just $ Left failure) >> return (s, sofar)
@ -233,10 +235,10 @@ runThreads fixed (lastcvid, lasttid) sofar prior sched s threads ref
-- | Run a single thread one step, by dispatching on the type of
-- 'Action'.
stepThread :: Monad n => Fixed n r
-> Action n r
-> (Scheduler s, s) -> (CVarId, ThreadId) -> ThreadId -> Threads n r -> n (Either Failure (Threads n r, ThreadAction))
stepThread fixed action (scheduler, schedstate) (lastcvid, lasttid) tid threads = case action of
stepThread :: Monad n => Fixed n r s -> (forall x. s n r x -> CTVarId -> n (Result x, CTVarId))
-> Action n r s
-> (Scheduler g, g) -> (CVarId, ThreadId) -> ThreadId -> Threads n r s -> n (Either Failure (Threads n r s, ThreadAction))
stepThread fixed runstm action (scheduler, schedstate) (lastcvid, lasttid) tid threads = case action of
AFork a b -> stepFork a b
APut ref a c -> stepPut ref a c
ATryPut ref a c -> stepTryPut ref a c
@ -304,7 +306,7 @@ stepThread fixed action (scheduler, schedstate) (lastcvid, lasttid) tid threads
-- | Run a computation atomically. If this fails, the entire thing fails.
stepNoTest ma c = do
(a, _, _) <- runFixed fixed scheduler schedstate ma
(a, _, _) <- runFixed fixed runstm scheduler schedstate ma
return $
case a of
Right a' -> Right (goto (c a') tid threads, NoTest)
@ -316,14 +318,14 @@ stepThread fixed action (scheduler, schedstate) (lastcvid, lasttid) tid threads
-- * Manipulating @CVar@s
-- | Get the ID of a CVar
getCVarId :: Monad n => Fixed n r -> R r a -> n CVarId
getCVarId :: Monad n => Fixed n r s -> R r a -> n CVarId
getCVarId fixed ref = (\(cvid,_,_) -> cvid) `liftM` readRef (wref fixed) ref
-- | Put a value into a @CVar@, in either a blocking or nonblocking
-- way.
putIntoCVar :: Monad n
=> Bool -> R r a -> a -> (Bool -> Action n r)
-> Fixed n r -> ThreadId -> Threads n r -> n (Bool, Threads n r, [ThreadId])
=> Bool -> R r a -> a -> (Bool -> Action n r s)
-> Fixed n r s -> ThreadId -> Threads n r s -> n (Bool, Threads n r s, [ThreadId])
putIntoCVar blocking ref a c fixed threadid threads = do
(cvid, val, blocks) <- readRef (wref fixed) ref
@ -344,8 +346,8 @@ putIntoCVar blocking ref a c fixed threadid threads = do
-- | Take a value from a @CVar@, in either a blocking or nonblocking
-- way.
takeFromCVar :: Monad n
=> Bool -> R r a -> (Maybe a -> Action n r)
-> Fixed n r -> ThreadId -> Threads n r -> n (Bool, Threads n r, [ThreadId])
=> Bool -> R r a -> (Maybe a -> Action n r s)
-> Fixed n r s -> ThreadId -> Threads n r s -> n (Bool, Threads n r s, [ThreadId])
takeFromCVar blocking ref c fixed threadid threads = do
(cvid, val, blocks) <- readRef (wref fixed) ref
@ -366,28 +368,28 @@ takeFromCVar blocking ref c fixed threadid threads = do
-- * Manipulating threads
-- | Replace the @Action@ of a thread.
goto :: Action n r -> ThreadId -> Threads n r -> Threads n r
goto :: Action n r s -> ThreadId -> Threads n r s -> Threads n r s
goto a = M.alter $ \(Just (_, b)) -> Just (a, b)
-- | Block a thread on a @CVar@.
block :: Monad n
=> Fixed n r -> R r a -> (ThreadId -> Block) -> ThreadId -> Threads n r -> n (Threads n r)
=> Fixed n r s -> R r a -> (ThreadId -> Block) -> ThreadId -> Threads n r s -> n (Threads n r s)
block fixed ref typ tid threads = do
(cvid, val, blocks) <- readRef (wref fixed) ref
writeRef (wref fixed) ref (cvid, val, typ tid : blocks)
return $ M.alter (\(Just (a, _)) -> Just (a, True)) tid threads
-- | Start a thread with the given ID. This must not already be in use!
launch :: ThreadId -> Action n r -> Threads n r -> Threads n r
launch :: ThreadId -> Action n r s -> Threads n r s -> Threads n r s
launch tid a = M.insert tid (a, False)
-- | Kill a thread.
kill :: ThreadId -> Threads n r -> Threads n r
kill :: ThreadId -> Threads n r s -> Threads n r s
kill = M.delete
-- | Wake every thread blocked on a @CVar@ read/write.
wake :: Monad n
=> Fixed n r -> R r a -> (ThreadId -> Block) -> Threads n r -> n (Threads n r, [ThreadId])
=> Fixed n r s -> R r a -> (ThreadId -> Block) -> Threads n r s -> n (Threads n r s, [ThreadId])
wake fixed ref typ m = do
(m', woken) <- mapAndUnzipM wake' (M.toList m)