Merge branch 'primops-simp'. Closes #20.

This only implements the atomic-primops operations which deal with CRefs,
as handling relaxed memory in the presence of MutVar#s is really hard.
This commit is contained in:
Michael Walker 2015-11-15 15:45:12 +00:00
commit 64057590db
8 changed files with 320 additions and 107 deletions

View File

@ -12,6 +12,7 @@ module Control.Monad.Conc.Class
, spawn
, forkFinally
, killThread
, cas
-- * Bound Threads
@ -42,6 +43,7 @@ import qualified Control.Monad.State.Lazy as SL
import qualified Control.Monad.State.Strict as SS
import qualified Control.Monad.Writer.Lazy as WL
import qualified Control.Monad.Writer.Strict as WS
import qualified Data.Atomics as A
#if __GLASGOW_HASKELL__ < 710
import Control.Applicative (Applicative)
@ -73,6 +75,11 @@ class ( Applicative m, Monad m
-- @readCRef@, @modifyCRef@, and @atomicWriteCRef@ are used.
type CRef m :: * -> *
-- | When performing compare-and-swap operations on @CRef@s, a
-- @Ticket@ is a proof that a thread observed a specific previous
-- value.
type Ticket m :: * -> *
-- | An abstract handle to a thread.
type ThreadId m :: *
@ -144,9 +151,13 @@ class ( Applicative m, Monad m
newCRef :: a -> m (CRef m a)
-- | Read the current value stored in a reference.
--
-- > readCRef cref = readForCAS cref >>= peekTicket
readCRef :: CRef m a -> m a
readCRef cref = readForCAS cref >>= peekTicket
-- | Atomically modify the value stored in a reference.
-- | Atomically modify the value stored in a reference. This imposes
-- a full memory barrier.
modifyCRef :: CRef m a -> (a -> (a, b)) -> m b
-- | Write a new value into an @CRef@, without imposing a memory
@ -160,6 +171,35 @@ class ( Applicative m, Monad m
atomicWriteCRef :: CRef m a -> a -> m ()
atomicWriteCRef r a = modifyCRef r $ const (a, ())
-- | Read the current value stored in a reference, returning a
-- @Ticket@, for use in future compare-and-swap operations.
readForCAS :: CRef m a -> m (Ticket m a)
-- | Extract the actual Haskell value from a @Ticket@.
--
-- This shouldn't need to do any monadic computation, the @m@
-- appears in the result type because of the need for injectivity in
-- the @Ticket@ type family, which can't be expressed currently.
peekTicket :: Ticket m a -> m a
-- | Perform a machine-level compare-and-swap (CAS) operation on a
-- @CRef@. Returns an indication of success and a @Ticket@ for the
-- most current value in the @CRef@.
--
-- This is strict in the \"new\" value argument.
casCRef :: CRef m a -> Ticket m a -> a -> m (Bool, Ticket m a)
-- | A replacement for 'modifyCRef' using a compare-and-swap.
--
-- This is strict in the \"new\" value argument.
modifyCRefCAS :: CRef m a -> (a -> (a, b)) -> m b
-- | A variant of 'modifyCRefCAS' which doesn't return a result.
--
-- > modifyCRefCAS_ cref f = modifyCRefCAS cref (\a -> (f a, ()))
modifyCRefCAS_ :: CRef m a -> (a -> a) -> m ()
modifyCRefCAS_ cref f = modifyCRefCAS cref (\a -> (f a, ()))
-- | Perform an STM transaction atomically.
atomically :: STMLike m a -> m a
@ -266,6 +306,7 @@ instance MonadConc IO where
type STMLike IO = STM
type CVar IO = MVar
type CRef IO = IORef
type Ticket IO = A.Ticket
type ThreadId IO = C.ThreadId
readCVar = readMVar
@ -287,6 +328,10 @@ instance MonadConc IO where
modifyCRef = atomicModifyIORef
writeCRef = writeIORef
atomicWriteCRef = atomicWriteIORef
readForCAS = A.readForCAS
peekTicket = return . A.peekTicket
casCRef = A.casIORef
modifyCRefCAS = A.atomicModifyIORefCAS
atomically = S.atomically
-- | Create a concurrent computation for the provided action, and
@ -322,6 +367,16 @@ rtsSupportsBoundThreads = False
isCurrentThreadBound :: MonadConc m => m Bool
isCurrentThreadBound = return False
-- | Compare-and-swap a value in a @CRef@, returning an indication of
-- success and the new value.
cas :: MonadConc m => CRef m a -> a -> m (Bool, a)
cas cref a = do
tick <- readForCAS cref
(suc, tick') <- casCRef cref tick a
a' <- peekTicket tick'
return (suc, a')
-------------------------------------------------------------------------------
-- Transformer instances
@ -329,6 +384,7 @@ instance MonadConc m => MonadConc (ReaderT r m) where
type STMLike (ReaderT r m) = STMLike m
type CVar (ReaderT r m) = CVar m
type CRef (ReaderT r m) = CRef m
type Ticket (ReaderT r m) = Ticket m
type ThreadId (ReaderT r m) = ThreadId m
fork = reader fork
@ -351,6 +407,10 @@ instance MonadConc m => MonadConc (ReaderT r m) where
modifyCRef r = lift . modifyCRef r
writeCRef r = lift . writeCRef r
atomicWriteCRef r = lift . atomicWriteCRef r
readForCAS = lift . readForCAS
peekTicket = lift . peekTicket
casCRef r t = lift . casCRef r t
modifyCRefCAS r = lift . modifyCRefCAS r
atomically = lift . atomically
_concKnowsAbout = lift . _concKnowsAbout
_concForgets = lift . _concForgets
@ -363,6 +423,7 @@ instance (MonadConc m, Monoid w) => MonadConc (WL.WriterT w m) where
type STMLike (WL.WriterT w m) = STMLike m
type CVar (WL.WriterT w m) = CVar m
type CRef (WL.WriterT w m) = CRef m
type Ticket (WL.WriterT w m) = Ticket m
type ThreadId (WL.WriterT w m) = ThreadId m
fork = writerlazy fork
@ -385,6 +446,10 @@ instance (MonadConc m, Monoid w) => MonadConc (WL.WriterT w m) where
modifyCRef r = lift . modifyCRef r
writeCRef r = lift . writeCRef r
atomicWriteCRef r = lift . atomicWriteCRef r
readForCAS = lift . readForCAS
peekTicket = lift . peekTicket
casCRef r t = lift . casCRef r t
modifyCRefCAS r = lift . modifyCRefCAS r
atomically = lift . atomically
_concKnowsAbout = lift . _concKnowsAbout
_concForgets = lift . _concForgets
@ -397,6 +462,7 @@ instance (MonadConc m, Monoid w) => MonadConc (WS.WriterT w m) where
type STMLike (WS.WriterT w m) = STMLike m
type CVar (WS.WriterT w m) = CVar m
type CRef (WS.WriterT w m) = CRef m
type Ticket (WS.WriterT w m) = Ticket m
type ThreadId (WS.WriterT w m) = ThreadId m
fork = writerstrict fork
@ -419,6 +485,10 @@ instance (MonadConc m, Monoid w) => MonadConc (WS.WriterT w m) where
modifyCRef r = lift . modifyCRef r
writeCRef r = lift . writeCRef r
atomicWriteCRef r = lift . atomicWriteCRef r
readForCAS = lift . readForCAS
peekTicket = lift . peekTicket
casCRef r t = lift . casCRef r t
modifyCRefCAS r = lift . modifyCRefCAS r
atomically = lift . atomically
_concKnowsAbout = lift . _concKnowsAbout
_concForgets = lift . _concForgets
@ -431,6 +501,7 @@ instance MonadConc m => MonadConc (SL.StateT s m) where
type STMLike (SL.StateT s m) = STMLike m
type CVar (SL.StateT s m) = CVar m
type CRef (SL.StateT s m) = CRef m
type Ticket (SL.StateT s m) = Ticket m
type ThreadId (SL.StateT s m) = ThreadId m
fork = statelazy fork
@ -453,6 +524,10 @@ instance MonadConc m => MonadConc (SL.StateT s m) where
modifyCRef r = lift . modifyCRef r
writeCRef r = lift . writeCRef r
atomicWriteCRef r = lift . atomicWriteCRef r
readForCAS = lift . readForCAS
peekTicket = lift . peekTicket
casCRef r t = lift . casCRef r t
modifyCRefCAS r = lift . modifyCRefCAS r
atomically = lift . atomically
_concKnowsAbout = lift . _concKnowsAbout
_concForgets = lift . _concForgets
@ -465,6 +540,7 @@ instance MonadConc m => MonadConc (SS.StateT s m) where
type STMLike (SS.StateT s m) = STMLike m
type CVar (SS.StateT s m) = CVar m
type CRef (SS.StateT s m) = CRef m
type Ticket (SS.StateT s m) = Ticket m
type ThreadId (SS.StateT s m) = ThreadId m
fork = statestrict fork
@ -487,6 +563,10 @@ instance MonadConc m => MonadConc (SS.StateT s m) where
modifyCRef r = lift . modifyCRef r
writeCRef r = lift . writeCRef r
atomicWriteCRef r = lift . atomicWriteCRef r
readForCAS = lift . readForCAS
peekTicket = lift . peekTicket
casCRef r t = lift . casCRef r t
modifyCRefCAS r = lift . modifyCRefCAS r
atomically = lift . atomically
_concKnowsAbout = lift . _concKnowsAbout
_concForgets = lift . _concForgets
@ -499,6 +579,7 @@ instance (MonadConc m, Monoid w) => MonadConc (RL.RWST r w s m) where
type STMLike (RL.RWST r w s m) = STMLike m
type CVar (RL.RWST r w s m) = CVar m
type CRef (RL.RWST r w s m) = CRef m
type Ticket (RL.RWST r w s m) = Ticket m
type ThreadId (RL.RWST r w s m) = ThreadId m
fork = rwslazy fork
@ -521,6 +602,10 @@ instance (MonadConc m, Monoid w) => MonadConc (RL.RWST r w s m) where
modifyCRef r = lift . modifyCRef r
writeCRef r = lift . writeCRef r
atomicWriteCRef r = lift . atomicWriteCRef r
readForCAS = lift . readForCAS
peekTicket = lift . peekTicket
casCRef r t = lift . casCRef r t
modifyCRefCAS r = lift . modifyCRefCAS r
atomically = lift . atomically
_concKnowsAbout = lift . _concKnowsAbout
_concForgets = lift . _concForgets
@ -533,6 +618,7 @@ instance (MonadConc m, Monoid w) => MonadConc (RS.RWST r w s m) where
type STMLike (RS.RWST r w s m) = STMLike m
type CVar (RS.RWST r w s m) = CVar m
type CRef (RS.RWST r w s m) = CRef m
type Ticket (RS.RWST r w s m) = Ticket m
type ThreadId (RS.RWST r w s m) = ThreadId m
fork = rwsstrict fork
@ -555,6 +641,10 @@ instance (MonadConc m, Monoid w) => MonadConc (RS.RWST r w s m) where
modifyCRef r = lift . modifyCRef r
writeCRef r = lift . writeCRef r
atomicWriteCRef r = lift . atomicWriteCRef r
readForCAS = lift . readForCAS
peekTicket = lift . peekTicket
casCRef r t = lift . casCRef r t
modifyCRefCAS r = lift . modifyCRefCAS r
atomically = lift . atomically
_concKnowsAbout = lift . _concKnowsAbout
_concForgets = lift . _concForgets

View File

@ -93,6 +93,7 @@ instance Ca.MonadMask (Conc n r s) where
instance Monad n => C.MonadConc (Conc n r (STMLike n r)) where
type CVar (Conc n r (STMLike n r)) = CVar r
type CRef (Conc n r (STMLike n r)) = CRef r
type Ticket (Conc n r (STMLike n r)) = Ticket
type STMLike (Conc n r (STMLike n r)) = STMLike n r
type ThreadId (Conc n r (STMLike n r)) = Int
@ -115,9 +116,16 @@ instance Monad n => C.MonadConc (Conc n r (STMLike n r)) where
newCRef a = toConc (\c -> ANewRef a c)
readCRef ref = toConc (AReadRef ref)
writeCRef ref a = toConc (\c -> AWriteRef ref a (c ()))
modifyCRef ref f = toConc (AModRef ref f)
readCRef ref = toConc (AReadRef ref)
readForCAS ref = toConc (AReadRefCas ref)
peekTicket tick = toConc (APeekTicket tick)
writeCRef ref a = toConc (\c -> AWriteRef ref a (c ()))
casCRef ref tick a = toConc (ACasRef ref tick a)
modifyCRef ref f = toConc (AModRef ref f)
modifyCRefCAS ref f = toConc (AModRefCas ref f)
-- ----------

View File

@ -13,6 +13,7 @@ module Test.DejaFu.Deterministic.Internal
, M(..)
, CVar(..)
, CRef(..)
, Ticket(..)
, Fixed
, cont
, runCont
@ -42,7 +43,7 @@ module Test.DejaFu.Deterministic.Internal
-- * Synchronised and Unsynchronised Actions
, ActionType(..)
, isBarrier
, isSynchronised
, synchronises
, crefOf
, cvarOf
, simplify
@ -64,7 +65,6 @@ import Test.DejaFu.Deterministic.Internal.Memory
import Test.DejaFu.Deterministic.Internal.Threading
import qualified Data.IntMap.Strict as I
import qualified Data.Map as M
#if __GLASGOW_HASKELL__ < 710
import Control.Applicative ((<$>), (<*>))
@ -92,7 +92,7 @@ runFixed' fixed runstm sched memtype s idSource ma = do
ref <- newRef fixed Nothing
let c = ma >>= liftN fixed . writeRef fixed ref . Just . Right
let threads = launch' Unmasked 0 ((\a _ -> a) $ runCont c $ const AStop) M.empty
let threads = launch' Unmasked 0 ((\a _ -> a) $ runCont c $ const AStop) I.empty
(s', idSource', trace) <- runThreads fixed runstm sched memtype s threads idSource ref
out <- readRef fixed ref
@ -127,19 +127,19 @@ runThreads fixed runstm sched memtype origg origthreads idsrc ref = go idsrc []
where
(chosen, g') = sched g ((\p (_,_,a) -> (p,a)) <$> prior <*> listToMaybe sofar) $ unsafeToNonEmpty runnable'
runnable' = [(t, nextActions t) | t <- sort $ M.keys runnable]
runnable = M.filter (isNothing . _blocking) threadsc
thread = M.lookup chosen threadsc
runnable' = [(t, nextActions t) | t <- sort $ I.keys runnable]
runnable = I.filter (isNothing . _blocking) threadsc
thread = I.lookup chosen threadsc
threadsc = addCommitThreads wb threads
isBlocked = isJust . _blocking $ fromJust thread
isNonexistant = isNothing thread
isTerminated = 0 `notElem` M.keys threads
isDeadlocked = isLocked 0 threads && (((~= OnCVarFull undefined) <$> M.lookup 0 threads) == Just True ||
((~= OnCVarEmpty undefined) <$> M.lookup 0 threads) == Just True ||
((~= OnMask undefined) <$> M.lookup 0 threads) == Just True)
isSTMLocked = isLocked 0 threads && ((~= OnCTVar []) <$> M.lookup 0 threads) == Just True
isTerminated = 0 `notElem` I.keys threads
isDeadlocked = isLocked 0 threads && (((~= OnCVarFull undefined) <$> I.lookup 0 threads) == Just True ||
((~= OnCVarEmpty undefined) <$> I.lookup 0 threads) == Just True ||
((~= OnMask undefined) <$> I.lookup 0 threads) == Just True)
isSTMLocked = isLocked 0 threads && ((~= OnCTVar []) <$> I.lookup 0 threads) == Just True
unblockWaitingOn tid = M.map unblock where
unblockWaitingOn tid = fmap unblock where
unblock thrd = case _blocking thrd of
Just (OnMask t) | t == tid -> thrd { _blocking = Nothing }
_ -> thrd
@ -154,47 +154,16 @@ runThreads fixed runstm sched memtype origg origthreads idsrc ref = go idsrc []
| prior `notElem` map (Just . fst) runnable' = [(Start t, na) | (t, na) <- runnable', t /= chosen]
| otherwise = [(if Just t == prior then Continue else SwitchTo t, na) | (t, na) <- runnable', t /= chosen]
nextActions t = lookahead . _continuation . fromJust $ M.lookup t threadsc
nextActions t = lookahead . _continuation . fromJust $ I.lookup t threadsc
stop = return (g, idSource, sofar)
die reason = writeRef fixed ref (Just $ Left reason) >> stop
loop threads' idSource' act wb' =
let sofar' = ((decision, alternatives, act) : sofar)
threads'' = if (interruptible <$> M.lookup chosen threads') /= Just False then unblockWaitingOn chosen threads' else threads'
threads'' = if (interruptible <$> I.lookup chosen threads') /= Just False then unblockWaitingOn chosen threads' else threads'
in go idSource' sofar' (Just chosen) g' (delCommitThreads threads'') wb'
-- | Look as far ahead in the given continuation as possible.
lookahead :: Action n r s -> NonEmpty Lookahead
lookahead = unsafeToNonEmpty . lookahead' where
lookahead' (AFork _ _) = [WillFork]
lookahead' (AMyTId _) = [WillMyThreadId]
lookahead' (ANewVar _) = [WillNewVar]
lookahead' (APutVar (CVar (c, _)) _ k) = WillPutVar c : lookahead' k
lookahead' (ATryPutVar (CVar (c, _)) _ _) = [WillTryPutVar c]
lookahead' (AReadVar (CVar (c, _)) _) = [WillReadVar c]
lookahead' (ATakeVar (CVar (c, _)) _) = [WillTakeVar c]
lookahead' (ATryTakeVar (CVar (c, _)) _) = [WillTryTakeVar c]
lookahead' (ANewRef _ _) = [WillNewRef]
lookahead' (AReadRef (CRef (r, _)) _) = [WillReadRef r]
lookahead' (AModRef (CRef (r, _)) _ _) = [WillModRef r]
lookahead' (AWriteRef (CRef (r, _)) _ k) = WillWriteRef r : lookahead' k
lookahead' (ACommit t c) = [WillCommitRef t c]
lookahead' (AAtom _ _) = [WillSTM]
lookahead' (AThrow _) = [WillThrow]
lookahead' (AThrowTo tid _ k) = WillThrowTo tid : lookahead' k
lookahead' (ACatching _ _ _) = [WillCatching]
lookahead' (APopCatching k) = WillPopCatching : lookahead' k
lookahead' (AMasking ms _ _) = [WillSetMasking False ms]
lookahead' (AResetMask b1 b2 ms k) = (if b1 then WillSetMasking else WillResetMasking) b2 ms : lookahead' k
lookahead' (ALift _) = [WillLift]
lookahead' (AKnowsAbout _ k) = WillKnowsAbout : lookahead' k
lookahead' (AForgets _ k) = WillForgets : lookahead' k
lookahead' (AAllKnown k) = WillAllKnown : lookahead' k
lookahead' (AYield k) = WillYield : lookahead' k
lookahead' (AReturn k) = WillReturn : lookahead' k
lookahead' AStop = [WillStop]
--------------------------------------------------------------------------------
-- * Single-step execution
@ -228,8 +197,12 @@ stepThread fixed runstm memtype action idSource tid threads wb = case action of
ATryTakeVar var c -> stepTryTakeVar var c
ANewRef a c -> stepNewRef a c
AReadRef ref c -> stepReadRef ref c
AReadRefCas ref c -> stepReadRefCas ref c
APeekTicket tick c -> stepPeekTicket tick c
AModRef ref f c -> stepModRef ref f c
AModRefCas ref f c -> stepModRefCas ref f c
AWriteRef ref a c -> stepWriteRef ref a c
ACasRef ref tick a c -> stepCasRef ref tick a c
ACommit t c -> stepCommit t c
AAtom stm c -> stepAtom stm c
ALift na -> stepLift na
@ -290,12 +263,28 @@ stepThread fixed runstm memtype action idSource tid threads wb = case action of
val <- readCRef fixed cref tid
simple (goto (c val) tid threads) $ ReadRef crid
-- | Read from a @CRef@ for future compare-and-swap operations.
stepReadRefCas cref@(CRef (crid, _)) c = do
tick <- readForTicket fixed cref tid
simple (goto (c tick) tid threads) $ ReadRefCas crid
-- | Extract the value from a @Ticket@.
stepPeekTicket (Ticket (crid, _, a)) c = simple (goto (c a) tid threads) $ PeekTicket crid
-- | Modify a @CRef@.
stepModRef cref@(CRef (crid, _)) f c = synchronised $ do
(new, val) <- f <$> readCRef fixed cref tid
writeImmediate fixed cref new
simple (goto (c val) tid threads) $ ModRef crid
-- | Modify a @CRef@ using a compare-and-swap.
--
-- Not actually implemented with a CAS here because the observable
-- behaviour is the same, it's just the speed that may differ.
stepModRefCas cref f c = do
Right (threads', idSource', ModRef crid, wb') <- stepModRef cref f c
return $ Right (threads', idSource', ModRefCas crid, wb')
-- | Write to a @CRef@ without synchronising
stepWriteRef cref@(CRef (crid, _)) a c = case memtype of
-- Write immediately.
@ -313,6 +302,11 @@ stepThread fixed runstm memtype action idSource tid threads wb = case action of
wb' <- bufferWrite fixed wb crid cref a tid
return $ Right (goto c tid threads, idSource, WriteRef crid, wb')
-- | Perform a compare-and-swap on a @CRef@.
stepCasRef cref@(CRef (crid, _)) tick a c = synchronised $ do
(suc, tick') <- casCRef fixed cref tid tick a
simple (goto (c (suc, tick')) tid threads) $ CasRef crid suc
-- | Commit a @CRef@ write
stepCommit c t = do
wb' <- case memtype of
@ -367,7 +361,7 @@ stepThread fixed runstm memtype action idSource tid threads wb = case action of
stepThrowTo t e c = synchronised $
let threads' = goto c tid threads
blocked = block (OnMask t) tid threads
in case M.lookup t threads of
in case I.lookup t threads of
Just thread
| interruptible thread -> case propagate (wrap e) t threads' of
Just threads'' -> simple threads'' $ ThrowTo t
@ -390,7 +384,7 @@ stepThread fixed runstm memtype action idSource tid threads wb = case action of
stepMasking m ma c = simple threads' $ SetMasking False m where
a = runCont (ma umask) (AResetMask False False m' . c)
m' = _masking . fromJust $ M.lookup tid threads
m' = _masking . fromJust $ I.lookup tid threads
umask mb = resetMask True m' >> mb >>= \b -> resetMask False m >> return b
resetMask typ ms = cont $ \k -> AResetMask typ True ms $ k ()
@ -411,7 +405,7 @@ stepThread fixed runstm memtype action idSource tid threads wb = case action of
-- | Create a new @CRef@, using the next 'CRefId'.
stepNewRef a c = do
let (idSource', newcrid) = nextCRId idSource
ref <- newRef fixed (I.empty, a)
ref <- newRef fixed (I.empty, 0, a)
let cref = CRef (newcrid, ref)
return $ Right (goto (c cref) tid threads, idSource', NewRef newcrid, wb)

View File

@ -47,11 +47,21 @@ newtype CVar r a = CVar (CVarId, r (Maybe a))
-- | The mutable non-blocking reference type. These are like 'IORef's.
--
-- @CRef@s are represented as a unique numeric identifier, and a
-- @CRef@s are represented as a unique numeric identifier and a
-- reference containing (a) any thread-local non-synchronised writes
-- (so each thread sees its latest write) and the current value
-- visible to all threads.
newtype CRef r a = CRef (CRefId, r (IntMap a, a))
-- (so each thread sees its latest write), (b) a commit count (used in
-- compare-and-swaps), and (c) the current value visible to all
-- threads.
newtype CRef r a = CRef (CRefId, r (IntMap a, Integer, a))
-- | The compare-and-swap proof type.
--
-- @Ticket@s are represented as just a wrapper around the identifier
-- of the 'CRef' it came from, the commit count at the time it was
-- produced, and an @a@ value. This doesn't work in the source package
-- (atomic-primops) because of the need to use pointer equality. Here
-- we can just pack extra information into 'CRef' to avoid that need.
newtype Ticket a = Ticket (CRefId, Integer, a)
-- | Dict of methods for implementations to override.
type Fixed n r s = Ref n r (M n r s)
@ -82,10 +92,14 @@ data Action n r s =
| forall a. ATakeVar (CVar r a) (a -> Action n r s)
| forall a. ATryTakeVar (CVar r a) (Maybe a -> Action n r s)
| forall a. ANewRef a (CRef r a -> Action n r s)
| forall a. AReadRef (CRef r a) (a -> Action n r s)
| forall a b. AModRef (CRef r a) (a -> (a, b)) (b -> Action n r s)
| forall a. AWriteRef (CRef r a) a (Action n r s)
| forall a. ANewRef a (CRef r a -> Action n r s)
| forall a. AReadRef (CRef r a) (a -> Action n r s)
| forall a. AReadRefCas (CRef r a) (Ticket a -> Action n r s)
| forall a. APeekTicket (Ticket a) (a -> Action n r s)
| forall a b. AModRef (CRef r a) (a -> (a, b)) (b -> Action n r s)
| forall a b. AModRefCas (CRef r a) (a -> (a, b)) (b -> Action n r s)
| forall a. AWriteRef (CRef r a) a (Action n r s)
| forall a. ACasRef (CRef r a) (Ticket a) a ((Bool, Ticket a) -> Action n r s)
| forall e. Exception e => AThrow e
| forall e. Exception e => AThrowTo ThreadId e (Action n r s)
@ -239,10 +253,19 @@ data ThreadAction =
-- ^ Create a new 'CRef'.
| ReadRef CRefId
-- ^ Read from a 'CRef'.
| ReadRefCas CRefId
-- ^ Read from a 'CRef' for a future compare-and-swap.
| PeekTicket CRefId
-- ^ Extract the value from a 'Ticket'.
| ModRef CRefId
-- ^ Modify a 'CRef'.
| ModRefCas CRefId
-- ^ Modify a 'CRef' using a compare-and-swap.
| WriteRef CRefId
-- ^ Write to a 'CRef' without synchronising.
| CasRef CRefId Bool
-- ^ Attempt to to a 'CRef' using a compare-and-swap, synchronising
-- it.
| CommitRef ThreadId CRefId
-- ^ Commit the last write to the given 'CRef' by the given thread,
-- so that all threads can see the updated value.
@ -303,8 +326,12 @@ instance NFData ThreadAction where
rnf (TryTakeVar c b ts) = rnf (c, b, ts)
rnf (NewRef c) = rnf c
rnf (ReadRef c) = rnf c
rnf (ReadRefCas c) = rnf c
rnf (PeekTicket c) = rnf c
rnf (ModRef c) = rnf c
rnf (ModRefCas c) = rnf c
rnf (WriteRef c) = rnf c
rnf (CasRef c b) = rnf (c, b)
rnf (CommitRef t c) = rnf (t, c)
rnf (STM ts) = rnf ts
rnf (ThrowTo t) = rnf t
@ -337,10 +364,19 @@ data Lookahead =
-- ^ Will create a new 'CRef'.
| WillReadRef CRefId
-- ^ Will read from a 'CRef'.
| WillPeekTicket CRefId
-- ^ Will extract the value from a 'Ticket'.
| WillReadRefCas CRefId
-- ^ Will read from a 'CRef' for a future compare-and-swap.
| WillModRef CRefId
-- ^ Will modify a 'CRef'.
| WillModRefCas CRefId
-- ^ Will nodify a 'CRef' using a compare-and-swap.
| WillWriteRef CRefId
-- ^ Will write to a 'CRef' without synchronising.
| WillCasRef CRefId
-- ^ Will attempt to to a 'CRef' using a compare-and-swap,
-- synchronising it.
| WillCommitRef ThreadId CRefId
-- ^ Will commit the last write by the given thread to the 'CRef'.
| WillSTM
@ -385,27 +421,70 @@ instance NFData Lookahead where
rnf (WillTakeVar c) = rnf c
rnf (WillTryTakeVar c) = rnf c
rnf (WillReadRef c) = rnf c
rnf (WillReadRefCas c) = rnf c
rnf (WillPeekTicket c) = rnf c
rnf (WillModRef c) = rnf c
rnf (WillModRefCas c) = rnf c
rnf (WillWriteRef c) = rnf c
rnf (WillCasRef c) = rnf c
rnf (WillCommitRef t c) = rnf (t, c)
rnf (WillThrowTo t) = rnf t
rnf (WillSetMasking b m) = b `seq` m `seq` ()
rnf (WillResetMasking b m) = b `seq` m `seq` ()
rnf l = l `seq` ()
-- | Look as far ahead in the given continuation as possible.
lookahead :: Action n r s -> NonEmpty Lookahead
lookahead = unsafeToNonEmpty . lookahead' where
lookahead' (AFork _ _) = [WillFork]
lookahead' (AMyTId _) = [WillMyThreadId]
lookahead' (ANewVar _) = [WillNewVar]
lookahead' (APutVar (CVar (c, _)) _ k) = WillPutVar c : lookahead' k
lookahead' (ATryPutVar (CVar (c, _)) _ _) = [WillTryPutVar c]
lookahead' (AReadVar (CVar (c, _)) _) = [WillReadVar c]
lookahead' (ATakeVar (CVar (c, _)) _) = [WillTakeVar c]
lookahead' (ATryTakeVar (CVar (c, _)) _) = [WillTryTakeVar c]
lookahead' (ANewRef _ _) = [WillNewRef]
lookahead' (AReadRef (CRef (r, _)) _) = [WillReadRef r]
lookahead' (AReadRefCas (CRef (r, _)) _) = [WillReadRefCas r]
lookahead' (APeekTicket (Ticket (r, _, _)) _) = [WillPeekTicket r]
lookahead' (AModRef (CRef (r, _)) _ _) = [WillModRef r]
lookahead' (AModRefCas (CRef (r, _)) _ _) = [WillModRefCas r]
lookahead' (AWriteRef (CRef (r, _)) _ k) = WillWriteRef r : lookahead' k
lookahead' (ACasRef (CRef (r, _)) _ _ _) = [WillCasRef r]
lookahead' (ACommit t c) = [WillCommitRef t c]
lookahead' (AAtom _ _) = [WillSTM]
lookahead' (AThrow _) = [WillThrow]
lookahead' (AThrowTo tid _ k) = WillThrowTo tid : lookahead' k
lookahead' (ACatching _ _ _) = [WillCatching]
lookahead' (APopCatching k) = WillPopCatching : lookahead' k
lookahead' (AMasking ms _ _) = [WillSetMasking False ms]
lookahead' (AResetMask b1 b2 ms k) = (if b1 then WillSetMasking else WillResetMasking) b2 ms : lookahead' k
lookahead' (ALift _) = [WillLift]
lookahead' (AKnowsAbout _ k) = WillKnowsAbout : lookahead' k
lookahead' (AForgets _ k) = WillForgets : lookahead' k
lookahead' (AAllKnown k) = WillAllKnown : lookahead' k
lookahead' (AYield k) = WillYield : lookahead' k
lookahead' (AReturn k) = WillReturn : lookahead' k
lookahead' AStop = [WillStop]
-- | A simplified view of the possible actions a thread can perform.
data ActionType =
UnsynchronisedRead CRefId
-- ^ A 'readCRef'.
-- ^ A 'readCRef' or a 'readForCAS'.
| UnsynchronisedWrite CRefId
-- ^ A 'writeCRef'.
| UnsynchronisedOther
-- ^ Some other action which doesn't require cross-thread
-- communication.
| PartiallySynchronisedCommit CRefId
-- ^ A commit.
| PartiallySynchronisedWrite CRefId
-- ^ A 'casCRef'
| PartiallySynchronisedModify CRefId
-- ^ A 'modifyCRefCAS'
| SynchronisedModify CRefId
-- ^ An 'atomicModifyCRef'.
| SynchronisedCommit CRefId
-- ^ A commit.
| SynchronisedRead CVarId
-- ^ A 'readCVar' or 'takeCVar' (or @try@/@blocked@ variants).
| SynchronisedWrite CVarId
@ -418,8 +497,10 @@ data ActionType =
instance NFData ActionType where
rnf (UnsynchronisedRead r) = rnf r
rnf (UnsynchronisedWrite r) = rnf r
rnf (PartiallySynchronisedCommit r) = rnf r
rnf (PartiallySynchronisedWrite r) = rnf r
rnf (PartiallySynchronisedModify r) = rnf r
rnf (SynchronisedModify r) = rnf r
rnf (SynchronisedCommit r) = rnf r
rnf (SynchronisedRead c) = rnf c
rnf (SynchronisedWrite c) = rnf c
rnf a = a `seq` ()
@ -432,17 +513,21 @@ isBarrier (SynchronisedWrite _) = True
isBarrier SynchronisedOther = True
isBarrier _ = False
-- | Check if an action is synchronised.
isSynchronised :: ActionType -> Bool
isSynchronised (SynchronisedCommit _) = True
isSynchronised a = isBarrier a
-- | Check if an action is synchronises a given 'CRef'.
synchronises :: ActionType -> CRefId -> Bool
synchronises (PartiallySynchronisedCommit c) r = c == r
synchronises (PartiallySynchronisedWrite c) r = c == r
synchronises (PartiallySynchronisedModify c) r = c == r
synchronises a _ = isBarrier a
-- | Get the 'CRef' affected.
crefOf :: ActionType -> Maybe CRefId
crefOf (UnsynchronisedRead r) = Just r
crefOf (UnsynchronisedWrite r) = Just r
crefOf (SynchronisedModify r) = Just r
crefOf (SynchronisedCommit r) = Just r
crefOf (PartiallySynchronisedCommit r) = Just r
crefOf (PartiallySynchronisedWrite r) = Just r
crefOf (PartiallySynchronisedModify r) = Just r
crefOf _ = Nothing
-- | Get the 'CVar' affected.
@ -466,9 +551,12 @@ simplify (TakeVar c _) = SynchronisedRead c
simplify (BlockedTakeVar _) = SynchronisedOther
simplify (TryTakeVar c _ _) = SynchronisedRead c
simplify (ReadRef r) = UnsynchronisedRead r
simplify (ReadRefCas r) = UnsynchronisedRead r
simplify (ModRef r) = SynchronisedModify r
simplify (ModRefCas r) = PartiallySynchronisedModify r
simplify (WriteRef r) = UnsynchronisedWrite r
simplify (CommitRef _ r) = SynchronisedCommit r
simplify (CasRef r _) = PartiallySynchronisedWrite r
simplify (CommitRef _ r) = PartiallySynchronisedCommit r
simplify (STM _) = SynchronisedOther
simplify BlockedSTM = SynchronisedOther
simplify (ThrowTo _) = SynchronisedOther
@ -483,9 +571,12 @@ simplify' (WillReadVar c) = SynchronisedRead c
simplify' (WillTakeVar c) = SynchronisedRead c
simplify' (WillTryTakeVar c) = SynchronisedRead c
simplify' (WillReadRef r) = UnsynchronisedRead r
simplify' (WillReadRefCas r) = UnsynchronisedRead r
simplify' (WillModRef r) = SynchronisedModify r
simplify' (WillModRefCas r) = PartiallySynchronisedModify r
simplify' (WillWriteRef r) = UnsynchronisedWrite r
simplify' (WillCommitRef _ r) = SynchronisedCommit r
simplify' (WillCasRef r) = PartiallySynchronisedWrite r
simplify' (WillCommitRef _ r) = PartiallySynchronisedCommit r
simplify' WillSTM = SynchronisedOther
simplify' (WillThrowTo _) = SynchronisedOther
simplify' _ = UnsynchronisedOther

View File

@ -1,5 +1,6 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE GADTs #-}
-- | Operations over @CRef@s and @CVar@s
module Test.DejaFu.Deterministic.Internal.Memory where
@ -14,7 +15,6 @@ import Test.DejaFu.Deterministic.Internal.Threading
import Test.DejaFu.Internal
import qualified Data.IntMap.Strict as I
import qualified Data.Map as M
#if __GLASGOW_HASKELL__ < 710
import Data.Foldable (mapM_)
@ -48,8 +48,8 @@ bufferWrite fixed (WriteBuffer wb) i cref@(CRef (_, ref)) new tid = do
let buffer' = I.insertWith (><) i write wb
-- Write the thread-local value to the @CRef@'s update map.
(map, def) <- readRef fixed ref
writeRef fixed ref (I.insert tid new map, def)
(map, count, def) <- readRef fixed ref
writeRef fixed ref (I.insert tid new map, count, def)
return $ WriteBuffer buffer'
@ -65,14 +65,43 @@ commitWrite fixed w@(WriteBuffer wb) i = case maybe EmptyL viewl $ I.lookup i wb
-- | Read from a @CRef@, returning a newer thread-local non-committed
-- write if there is one.
readCRef :: Monad n => Fixed n r s -> CRef r a -> ThreadId -> n a
readCRef fixed (CRef (_, ref)) tid = do
(map, def) <- readRef fixed ref
return $ I.findWithDefault def tid map
readCRef fixed cref tid = do
(val, _) <- readCRefPrim fixed cref tid
return val
-- | Write and commit to a @CRef@ immediately, clearing the update
-- map.
-- | Read from a @CRef@, returning a @Ticket@ representing the current
-- view of the thread.
readForTicket :: Monad n => Fixed n r s -> CRef r a -> ThreadId -> n (Ticket a)
readForTicket fixed cref@(CRef (crid, _)) tid = do
(val, count) <- readCRefPrim fixed cref tid
return $ Ticket (crid, count, val)
-- | Perform a compare-and-swap on a @CRef@ if the ticket is still
-- valid. This is strict in the \"new\" value argument.
casCRef :: Monad n => Fixed n r s -> CRef r a -> ThreadId -> Ticket a -> a -> n (Bool, Ticket a)
casCRef fixed cref tid (Ticket (_, cc, _)) !new = do
tick'@(Ticket (_, cc', _)) <- readForTicket fixed cref tid
if cc == cc'
then do
writeImmediate fixed cref new
tick'' <- readForTicket fixed cref tid
return (True, tick'')
else return (False, tick')
-- | Read the local state of a @CRef@.
readCRefPrim :: Monad n => Fixed n r s -> CRef r a -> ThreadId -> n (a, Integer)
readCRefPrim fixed (CRef (_, ref)) tid = do
(vals, count, def) <- readRef fixed ref
return (I.findWithDefault def tid vals, count)
-- | Write and commit to a @CRef@ immediately, clearing the update map
-- and incrementing the write count.
writeImmediate :: Monad n => Fixed n r s -> CRef r a -> a -> n ()
writeImmediate fixed (CRef (_, ref)) a = writeRef fixed ref (I.empty, a)
writeImmediate fixed (CRef (_, ref)) a = do
(_, count, _) <- readRef fixed ref
writeRef fixed ref (I.empty, count + 1, a)
-- | Flush all writes in the buffer.
writeBarrier :: Monad n => Fixed n r s -> WriteBuffer r -> n ()
@ -81,14 +110,14 @@ writeBarrier fixed (WriteBuffer wb) = mapM_ flush $ I.elems wb where
-- | Add phantom threads to the thread list to commit pending writes.
addCommitThreads :: WriteBuffer r -> Threads n r s -> Threads n r s
addCommitThreads (WriteBuffer wb) ts = ts <> M.fromList phantoms where
addCommitThreads (WriteBuffer wb) ts = ts <> I.fromList phantoms where
phantoms = [(negate k - 1, mkthread $ fromJust c) | (k, b) <- I.toList wb, let c = go $ viewl b, isJust c]
go (BufferedWrite tid (CRef (crid, _)) _ :< _) = Just $ ACommit tid crid
go EmptyL = Nothing
-- | Remove phantom threads.
delCommitThreads :: Threads n r s -> Threads n r s
delCommitThreads = M.filterWithKey $ \k _ -> k >= 0
delCommitThreads = I.filterWithKey $ \k _ -> k >= 0
--------------------------------------------------------------------------------
-- * Manipulating @CVar@s

View File

@ -7,12 +7,12 @@ module Test.DejaFu.Deterministic.Internal.Threading where
import Control.Exception (Exception, MaskingState(..), SomeException, fromException)
import Data.List (intersect, nub)
import Data.Map (Map)
import Data.IntMap.Strict (IntMap)
import Data.Maybe (fromMaybe, isJust, isNothing)
import Test.DejaFu.STM (CTVarId)
import Test.DejaFu.Deterministic.Internal.Common
import qualified Data.Map as M
import qualified Data.IntMap as I
#if __GLASGOW_HASKELL__ < 710
import Control.Applicative ((<$>))
@ -22,7 +22,7 @@ import Control.Applicative ((<$>))
-- * Threads
-- | Threads are stored in a map index by 'ThreadId'.
type Threads n r s = Map ThreadId (Thread n r s)
type Threads n r s = IntMap (Thread n r s)
-- | All the state of a thread.
data Thread n r s = Thread
@ -67,14 +67,14 @@ thread ~= theblock = case (_blocking thread, theblock) of
-- deadlock.
isLocked :: ThreadId -> Threads n r a -> Bool
isLocked tid ts
| allKnown = case M.lookup tid ts of
| allKnown = case I.lookup tid ts of
Just thread -> noRefs $ _blocking thread
Nothing -> False
| otherwise = M.null $ M.filter (isNothing . _blocking) ts
| otherwise = I.null $ I.filter (isNothing . _blocking) ts
where
-- | Check if all threads are in a fully-known state.
allKnown = all _fullknown $ M.elems ts
allKnown = all _fullknown $ I.elems ts
-- | Check if no other runnable thread has a reference to anything
-- the block references.
@ -85,11 +85,11 @@ isLocked tid ts
-- | Get IDs of all threads (other than the one under
-- consideration) which reference a 'CVar'.
findCVar cvarid = M.keys $ M.filterWithKey (check [Left cvarid]) ts
findCVar cvarid = I.keys $ I.filterWithKey (check [Left cvarid]) ts
-- | Get IDs of all runnable threads (other than the one under
-- consideration) which reference some 'CTVar's.
findCTVars ctvids = M.keys $ M.filterWithKey (check (map Right ctvids)) ts
findCTVars ctvids = I.keys $ I.filterWithKey (check (map Right ctvids)) ts
-- | Check if a thread references a variable, and if it's not the
-- thread under consideration.
@ -106,7 +106,7 @@ data Handler n r s = forall e. Exception e => Handler (e -> Action n r s)
-- | Propagate an exception upwards, finding the closest handler
-- which can deal with it.
propagate :: SomeException -> ThreadId -> Threads n r s -> Maybe (Threads n r s)
propagate e tid threads = case M.lookup tid threads >>= go . _handlers of
propagate e tid threads = case I.lookup tid threads >>= go . _handlers of
Just (act, hs) -> Just $ except act hs tid threads
Nothing -> Nothing
@ -120,36 +120,36 @@ interruptible thread = _masking thread == Unmasked || (_masking thread == Masked
-- | Register a new exception handler.
catching :: Exception e => (e -> Action n r s) -> ThreadId -> Threads n r s -> Threads n r s
catching h = M.alter $ \(Just thread) -> Just $ thread { _handlers = Handler h : _handlers thread }
catching h = I.alter $ \(Just thread) -> Just $ thread { _handlers = Handler h : _handlers thread }
-- | Remove the most recent exception handler.
uncatching :: ThreadId -> Threads n r s -> Threads n r s
uncatching = M.alter $ \(Just thread) -> Just $ thread { _handlers = tail $ _handlers thread }
uncatching = I.alter $ \(Just thread) -> Just $ thread { _handlers = tail $ _handlers thread }
-- | Raise an exception in a thread.
except :: Action n r s -> [Handler n r s] -> ThreadId -> Threads n r s -> Threads n r s
except act hs = M.alter $ \(Just thread) -> Just $ thread { _continuation = act, _handlers = hs, _blocking = Nothing }
except act hs = I.alter $ \(Just thread) -> Just $ thread { _continuation = act, _handlers = hs, _blocking = Nothing }
-- | Set the masking state of a thread.
mask :: MaskingState -> ThreadId -> Threads n r s -> Threads n r s
mask ms = M.alter $ \(Just thread) -> Just $ thread { _masking = ms }
mask ms = I.alter $ \(Just thread) -> Just $ thread { _masking = ms }
--------------------------------------------------------------------------------
-- * Manipulating threads
-- | Replace the @Action@ of a thread.
goto :: Action n r s -> ThreadId -> Threads n r s -> Threads n r s
goto a = M.alter $ \(Just thread) -> Just (thread { _continuation = a })
goto a = I.alter $ \(Just thread) -> Just (thread { _continuation = a })
-- | Start a thread with the given ID, inheriting the masking state
-- from the parent thread. This ID must not already be in use!
launch :: ThreadId -> ThreadId -> ((forall b. M n r s b -> M n r s b) -> Action n r s) -> Threads n r s -> Threads n r s
launch parent tid a threads = launch' mask tid a threads where
mask = fromMaybe Unmasked $ _masking <$> M.lookup parent threads
mask = fromMaybe Unmasked $ _masking <$> I.lookup parent threads
-- | Start a thread with the given ID and masking state. This must not already be in use!
launch' :: MaskingState -> ThreadId -> ((forall b. M n r s b -> M n r s b) -> Action n r s) -> Threads n r s -> Threads n r s
launch' mask tid a = M.insert tid thread where
launch' mask tid a = I.insert tid thread where
thread = Thread { _continuation = a umask, _blocking = Nothing, _handlers = [], _masking = mask, _known = [], _fullknown = False }
umask mb = resetMask True Unmasked >> mb >>= \b -> resetMask False mask >> return b
@ -157,11 +157,11 @@ launch' mask tid a = M.insert tid thread where
-- | Kill a thread.
kill :: ThreadId -> Threads n r s -> Threads n r s
kill = M.delete
kill = I.delete
-- | Block a thread.
block :: BlockedOn -> ThreadId -> Threads n r s -> Threads n r s
block blockedOn = M.alter doBlock where
block blockedOn = I.alter doBlock where
doBlock (Just thread) = Just $ thread { _blocking = Just blockedOn }
doBlock _ = error "Invariant failure in 'block': thread does NOT exist!"
@ -169,7 +169,7 @@ block blockedOn = M.alter doBlock where
-- blocks, this will wake all threads waiting on at least one of the
-- given 'CTVar's.
wake :: BlockedOn -> Threads n r s -> (Threads n r s, [ThreadId])
wake blockedOn threads = (M.map unblock threads, M.keys $ M.filter isBlocked threads) where
wake blockedOn threads = (I.map unblock threads, I.keys $ I.filter isBlocked threads) where
unblock thread
| isBlocked thread = thread { _blocking = Nothing }
| otherwise = thread
@ -180,18 +180,18 @@ wake blockedOn threads = (M.map unblock threads, M.keys $ M.filter isBlocked thr
-- | Record that a thread knows about a shared variable.
knows :: [Either CVarId CTVarId] -> ThreadId -> Threads n r s -> Threads n r s
knows theids = M.alter go where
knows theids = I.alter go where
go (Just thread) = Just $ thread { _known = nub $ theids ++ _known thread }
go _ = error "Invariant failure in 'knows': thread does NOT exist!"
-- | Forget about a shared variable.
forgets :: [Either CVarId CTVarId] -> ThreadId -> Threads n r s -> Threads n r s
forgets theids = M.alter go where
forgets theids = I.alter go where
go (Just thread) = Just $ thread { _known = filter (`notElem` theids) $ _known thread }
go _ = error "Invariant failure in 'forgets': thread does NOT exist!"
-- | Record that a thread's shared variable state is fully known.
fullknown :: ThreadId -> Threads n r s -> Threads n r s
fullknown = M.alter go where
fullknown = I.alter go where
go (Just thread) = Just $ thread { _fullknown = True }
go _ = error "Invariant failure in 'fullknown': thread does NOT exist!"

View File

@ -298,7 +298,7 @@ dependentActions memtype buf a1 a2 = case (a1, a2) of
(_, _)
-- Two actions on the same CRef where at least one is synchronised
| same crefOf && (isSynchronised a1 || isSynchronised a2) -> True
| same crefOf && (synchronises a1 (fromJust $ crefOf a1) || synchronises a2 (fromJust $ crefOf a2)) -> True
-- Two actions on the same CVar
| same cvarOf -> True

View File

@ -104,6 +104,7 @@ library
-- other-modules:
-- other-extensions:
build-depends: base >=4.5 && <5
, atomic-primops
, containers
, deepseq
, exceptions >=0.7