Reimplement CVar blocking in the same way as CTVar blocking

This commit is contained in:
Michael Walker 2015-02-09 23:43:18 +00:00
parent 7f26aa9654
commit 1cc5ade782
4 changed files with 53 additions and 79 deletions

View File

@ -348,5 +348,6 @@ findFailures2 p xs = findFailures xs 0 [] where
-- | Pretty-print a failure
showfail :: Failure -> String
showfail Deadlock = "[deadlock]"
showfail STMDeadlock = "[stm-deadlock]"
showfail InternalError = "[internal-error]"
showfail FailureInNoTest = "[_concNoTest]"

View File

@ -105,7 +105,7 @@ atomically stm = C $ cont $ AAtom stm
newEmptyCVar :: Conc t (CVar t a)
newEmptyCVar = C $ cont lifted where
lifted c = ANew $ \cvid -> c <$> newEmptyCVar' cvid
newEmptyCVar' cvid = V <$> newSTRef (cvid, Nothing, [])
newEmptyCVar' cvid = V <$> newSTRef (cvid, Nothing)
-- | Block on a 'CVar' until it is empty, then write to it.
putCVar :: CVar t a -> a -> Conc t ()

View File

@ -110,7 +110,7 @@ atomically stm = C $ cont $ AAtom stm
newEmptyCVar :: ConcIO t (CVar t a)
newEmptyCVar = C $ cont lifted where
lifted c = ANew $ \cvid -> c <$> newEmptyCVar' cvid
newEmptyCVar' cvid = V <$> newIORef (cvid, Nothing, [])
newEmptyCVar' cvid = V <$> newIORef (cvid, Nothing)
-- | Block on a 'CVar' until it is empty, then write to it.
putCVar :: CVar t a -> a -> ConcIO t ()

View File

@ -6,13 +6,13 @@
module Test.DejaFu.Deterministic.Internal where
import Control.DeepSeq (NFData(..))
import Control.Monad (liftM, mapAndUnzipM)
import Control.Monad (liftM)
import Control.Monad.Cont (Cont, runCont)
import Control.State
import Data.List (intersect)
import Data.List.Extra
import Data.Map (Map)
import Data.Maybe (catMaybes, fromJust, isJust, isNothing)
import Data.Maybe (fromJust, isJust, isNothing)
import Test.DejaFu.STM (CTVarId, Result(..), initialCTVarId)
import qualified Data.Map as M
@ -22,9 +22,9 @@ import qualified Data.Map as M
-- | The underlying monad is based on continuations over Actions.
type M n r s a = Cont (Action n r s) a
-- | CVars are represented as a reference containing a Maybe value, a
-- list of things blocked on it, and a unique numeric identifier.
type R r a = r (CVarId, Maybe a, [Block])
-- | CVars are represented as a unique numeric identifier, and a
-- reference containing a Maybe value.
type R r a = r (CVarId, Maybe a)
-- | Dict of methods for implementations to override.
type Fixed n r s = Wrapper n r (Cont (Action n r s))
@ -187,18 +187,15 @@ runFixed fixed runstm sched s ma = do
-- * Running threads
-- | A @Block@ is used to determine what sort of @CVar@-related block
-- a thread is experiencing.
data Block = WaitFull ThreadId | WaitEmpty ThreadId deriving Eq
-- | A @BlockedOn@ is used to determine what sort of variable a thread
-- is blocked on.
data BlockedOn = OnCVar | OnCTVar [CTVarId] deriving Eq
data BlockedOn = OnCVarFull CVarId | OnCVarEmpty CVarId | OnCTVar [CTVarId] deriving Eq
-- | Determine if a thread is blocked in a certain way.
-- | Determine if a thread is blocked in a certainw ay.
(~=) :: (a, Maybe BlockedOn) -> BlockedOn -> Bool
(_, Just OnCVar) ~= OnCVar = True
(_, Just (OnCTVar _)) ~= (OnCTVar _) = True
(_, Just (OnCVarFull _)) ~= (OnCVarFull _) = True
(_, Just (OnCVarEmpty _)) ~= (OnCVarEmpty _) = True
(_, Just (OnCTVar _)) ~= (OnCTVar _) = True
_ ~= _ = False
-- | Threads are represented as a tuple of (next action, is blocked).
@ -238,7 +235,8 @@ runThreads fixed runstm sched origg origthreads ref = go (-1, initialCTVarId, 0)
isBlocked = isJust . snd $ fromJust thread
isNonexistant = isNothing thread
isTerminated = 0 `notElem` M.keys threads
isDeadlocked = M.null runnable && ((~= OnCVar) `fmap` M.lookup 0 threads) == Just True
isDeadlocked = M.null runnable && (((~= OnCVarFull undefined) `fmap` M.lookup 0 threads) == Just True ||
((~= OnCVarEmpty undefined) `fmap` M.lookup 0 threads) == Just True)
isSTMLocked = M.null runnable && ((~= OnCTVar []) `fmap` M.lookup 0 threads) == Just True
decision
@ -291,12 +289,12 @@ stepThread fixed runstm action (scheduler, schedstate) (lastcvid, lastctvid, las
-- | Get the value from a @CVar@, without emptying, blocking the
-- thread until it's full.
stepGet ref c = do
(cvid, val, _) <- readRef (wref fixed) ref
(cvid, val) <- readRef (wref fixed) ref
case val of
Just val' -> return $ Right (goto (c val') tid threads, lastcvid, lastctvid, lasttid, Read cvid)
Nothing -> do
threads' <- block fixed ref WaitFull tid threads
return $ Right (threads', lastcvid, lastctvid, lasttid, BlockedRead cvid)
Nothing -> return $
let threads' = block (OnCVarFull cvid) tid threads
in Right (threads', lastcvid, lastctvid, lasttid, BlockedRead cvid)
-- | Take the value from a @CVar@, blocking the thread until it's
-- full.
@ -314,13 +312,14 @@ stepThread fixed runstm action (scheduler, schedstate) (lastcvid, lastctvid, las
-- | Run a STM transaction atomically.
stepAtom stm c = do
(res, newctvid) <- runstm stm lastctvid
case res of
Success touched val ->
let (threads', woken) = wakeSTM touched threads
in return $ Right (goto (c val) tid threads', lastcvid, newctvid, lasttid, STM woken)
Retry touched ->
let threads' = blockSTM touched tid threads
in return $ Right (threads', lastcvid, newctvid, lasttid, BlockedSTM)
return . Right $
case res of
Success touched val ->
let (threads', woken) = wake (OnCTVar touched) threads
in (goto (c val) tid threads', lastcvid, newctvid, lasttid, STM woken)
Retry touched ->
let threads' = block (OnCTVar touched) tid threads
in (threads', lastcvid, newctvid, lasttid, BlockedSTM)
-- | Create a new @CVar@, using the next 'CVarId'.
stepNew na = do
@ -349,7 +348,7 @@ stepThread fixed runstm action (scheduler, schedstate) (lastcvid, lastctvid, las
-- | Get the ID of a CVar
getCVarId :: Monad n => Fixed n r s -> R r a -> n CVarId
getCVarId fixed ref = (\(cvid,_,_) -> cvid) `liftM` readRef (wref fixed) ref
getCVarId fixed ref = fst `liftM` readRef (wref fixed) ref
-- | Put a value into a @CVar@, in either a blocking or nonblocking
-- way.
@ -357,20 +356,20 @@ putIntoCVar :: Monad n
=> Bool -> R r a -> a -> (Bool -> Action n r s)
-> Fixed n r s -> ThreadId -> Threads n r s -> n (Bool, Threads n r s, [ThreadId])
putIntoCVar blocking ref a c fixed threadid threads = do
(cvid, val, blocks) <- readRef (wref fixed) ref
(cvid, val) <- readRef (wref fixed) ref
case val of
Just _
| blocking -> do
threads' <- block fixed ref WaitEmpty threadid threads
return (False, threads', [])
| blocking ->
let threads' = block (OnCVarEmpty cvid) threadid threads
in return (False, threads', [])
| otherwise ->
return (False, goto (c False) threadid threads, [])
Nothing -> do
writeRef (wref fixed) ref (cvid, Just a, blocks)
(threads', woken) <- wake fixed ref WaitFull threads
writeRef (wref fixed) ref (cvid, Just a)
let (threads', woken) = wake (OnCVarFull cvid) threads
return (True, goto (c True) threadid threads', woken)
-- | Take a value from a @CVar@, in either a blocking or nonblocking
@ -379,18 +378,18 @@ takeFromCVar :: Monad n
=> Bool -> R r a -> (Maybe a -> Action n r s)
-> Fixed n r s -> ThreadId -> Threads n r s -> n (Bool, Threads n r s, [ThreadId])
takeFromCVar blocking ref c fixed threadid threads = do
(cvid, val, blocks) <- readRef (wref fixed) ref
(cvid, val) <- readRef (wref fixed) ref
case val of
Just _ -> do
writeRef (wref fixed) ref (cvid, Nothing, blocks)
(threads', woken) <- wake fixed ref WaitEmpty threads
writeRef (wref fixed) ref (cvid, Nothing)
let (threads', woken) = wake (OnCVarEmpty cvid) threads
return (True, goto (c val) threadid threads', woken)
Nothing
| blocking -> do
threads' <- block fixed ref WaitFull threadid threads
return (False, threads', [])
| blocking ->
let threads' = block (OnCVarFull cvid) threadid threads
in return (False, threads', [])
| otherwise ->
return (False, goto (c Nothing) threadid threads, [])
@ -401,14 +400,6 @@ takeFromCVar blocking ref c fixed threadid threads = do
goto :: Action n r s -> ThreadId -> Threads n r s -> Threads n r s
goto a = M.alter $ \(Just (_, b)) -> Just (a, b)
-- | Block a thread on a @CVar@.
block :: Monad n
=> Fixed n r s -> R r a -> (ThreadId -> Block) -> ThreadId -> Threads n r s -> n (Threads n r s)
block fixed ref typ tid threads = do
(cvid, val, blocks) <- readRef (wref fixed) ref
writeRef (wref fixed) ref (cvid, val, typ tid : blocks)
return $ M.alter (\(Just (a, _)) -> Just (a, Just OnCVar)) tid threads
-- | Start a thread with the given ID. This must not already be in use!
launch :: ThreadId -> Action n r s -> Threads n r s -> Threads n r s
launch tid a = M.insert tid (a, Nothing)
@ -417,38 +408,20 @@ launch tid a = M.insert tid (a, Nothing)
kill :: ThreadId -> Threads n r s -> Threads n r s
kill = M.delete
-- | Wake every thread blocked on a @CVar@ read/write.
wake :: Monad n
=> Fixed n r s -> R r a -> (ThreadId -> Block) -> Threads n r s -> n (Threads n r s, [ThreadId])
wake fixed ref typ m = do
(m', woken) <- mapAndUnzipM wake' (M.toList m)
-- | Block a thread.
block :: BlockedOn -> ThreadId -> Threads n r s -> Threads n r s
block blockedOn = M.alter doBlock where
doBlock (Just (a, _)) = Just (a, Just blockedOn)
return (M.fromList m', catMaybes woken)
where
wake' a@(tid, (act, Just OnCVar)) = do
let blck = typ tid
(cvid, val, blocks) <- readRef (wref fixed) ref
if blck `elem` blocks
then writeRef (wref fixed) ref (cvid, val, filter (/= blck) blocks) >> return ((tid, (act, Nothing)), Just tid)
else return (a, Nothing)
wake' a = return (a, Nothing)
-- | Block a thread on some 'CTVar's.
blockSTM :: [CTVarId] -> ThreadId -> Threads n r s -> Threads n r s
blockSTM blockedOn = M.alter block where
block (Just (a, _)) = Just (a, Just $ OnCTVar blockedOn)
-- | Unblock all threads waiting on at least one of the given
-- 'CTVar's, returning the list of unblocked threads.
wakeSTM :: [CTVarId] -> Threads n r s -> (Threads n r s, [ThreadId])
wakeSTM blockedOn threads = (M.map unblock threads, M.keys $ M.filter isBlocked threads) where
unblock thread@(a, Just (OnCTVar _))
-- | Unblock all threads waiting on the appropriate block. For 'CTVar'
-- blocks, this will wake all threads waiting on at least one of the
-- given 'CTVar's.
wake :: BlockedOn -> Threads n r s -> (Threads n r s, [ThreadId])
wake blockedOn threads = (M.map unblock threads, M.keys $ M.filter isBlocked threads) where
unblock thread@(a, _)
| isBlocked thread = (a, Nothing)
| otherwise = thread
unblock thread = thread
isBlocked (_, Just (OnCTVar ctvids)) = ctvids `intersect` blockedOn /= []
isBlocked _ = False
isBlocked (_, theblock) = case (theblock, blockedOn) of
(Just (OnCTVar ctvids), OnCTVar blockedOn') -> ctvids `intersect` blockedOn' /= []
_ -> theblock == Just blockedOn