mirror of
https://github.com/barrucadu/dejafu.git
synced 2025-01-03 11:13:06 +03:00
Mutating a CVar only wakes up threads blocked on that CVar.
This commit is contained in:
parent
df5756c812
commit
86a2617b41
@ -85,14 +85,10 @@ instance C.ConcCVar (CVar t) (Conc t) where
|
||||
-- monad. Internally, these are implemented as 'IORef's, but they are
|
||||
-- structured to behave fairly similarly to 'MVar's. One notable
|
||||
-- difference is that 'MVar's are single-wakeup, and wake up in a FIFO
|
||||
-- order. @CVar@s wake up all blocked threads, and it is up to the
|
||||
-- scheduler which one runs next.
|
||||
--
|
||||
-- In fact, due to implementation failings (which will be fixed!) at
|
||||
-- the moment, mutating a @CVar@ wakes up /all/ threads blocking in an
|
||||
-- appropriate way (i.e., on read or write, depending on the
|
||||
-- mutation), not just the ones blocked on this particular @CVar@.
|
||||
newtype CVar t a = V (IORef (Maybe a)) deriving Eq
|
||||
-- order. Writing to a @CVar@ wakes up all threads blocked on reading
|
||||
-- it, and it is up to the scheduler which one runs next. Taking from
|
||||
-- a @CVar@ behaves analogously.
|
||||
newtype CVar t a = V (IORef (Maybe a, [Block])) deriving Eq
|
||||
|
||||
-- | Lift an 'IO' action into the 'Conc' monad.
|
||||
--
|
||||
@ -125,7 +121,7 @@ fork (C ma) = C $ cont $ \c -> Fork (runCont ma $ const Stop) $ c ()
|
||||
-- | Create a new empty 'CVar'.
|
||||
new :: Conc t (CVar t a)
|
||||
new = liftIO $ do
|
||||
ioref <- newIORef Nothing
|
||||
ioref <- newIORef (Nothing, [])
|
||||
return $ V ioref
|
||||
|
||||
-- | Block on a 'CVar' until it is empty, then write to it.
|
||||
@ -177,7 +173,7 @@ runConc' :: Scheduler s -> s -> (forall t. Conc t a) -> IO (Maybe a, s)
|
||||
runConc' sched s ma = do
|
||||
mvar <- newEmptyMVar
|
||||
let (C c) = ma >>= liftIO . putMVar mvar . Just
|
||||
s' <- runThreads (negate 1) sched s (M.fromList [(0, (runCont c $ const Stop, Nothing))]) mvar
|
||||
s' <- runThreads (negate 1) sched s (M.fromList [(0, (runCont c $ const Stop, False))]) mvar
|
||||
out <- takeMVar mvar
|
||||
return (out, s')
|
||||
|
||||
@ -217,10 +213,12 @@ makeNP sched = newsched where
|
||||
|
||||
-- | A @Block@ is used to determine what sort of block a thread is
|
||||
-- experiencing.
|
||||
data Block = WaitFull | WaitEmpty
|
||||
data Block = WaitFull ThreadId | WaitEmpty ThreadId deriving Eq
|
||||
|
||||
-- | Run a collection of threads, until there are no threads left.
|
||||
runThreads :: ThreadId -> Scheduler s -> s -> Map ThreadId (Action, Maybe Block) -> MVar (Maybe a) -> IO s
|
||||
--
|
||||
-- A thread is represented as a tuple of (next action, is blocked).
|
||||
runThreads :: ThreadId -> Scheduler s -> s -> Map ThreadId (Action, Bool) -> MVar (Maybe a) -> IO s
|
||||
runThreads last sched s threads mvar
|
||||
| M.null threads = return s
|
||||
| M.null runnable = do
|
||||
@ -238,43 +236,43 @@ runThreads last sched s threads mvar
|
||||
|
||||
where
|
||||
(chosen, s') = if last == -1 then (0, s) else sched s last $ M.keys runnable
|
||||
runnable = M.filter (isNothing . snd) threads
|
||||
runnable = M.filter (not . snd) threads
|
||||
thread = M.lookup chosen threads
|
||||
isBlocked = isJust . snd . fromJust $ M.lookup chosen threads
|
||||
isBlocked = snd . fromJust $ M.lookup chosen threads
|
||||
|
||||
-- | Run a single thread one step, by dispatching on the type of
|
||||
-- 'Action'.
|
||||
runThread :: (Action, ThreadId) -> Map ThreadId (Action, Maybe Block) -> IO (Map ThreadId (Action, Maybe Block))
|
||||
runThread :: (Action, ThreadId) -> Map ThreadId (Action, Bool) -> IO (Map ThreadId (Action, Bool))
|
||||
runThread (Fork a b, i) threads = return . goto b i $ launch a threads
|
||||
|
||||
runThread (Put v a c, i) threads = do
|
||||
let (V ref) = v
|
||||
val <- readIORef ref
|
||||
(val, blocks) <- readIORef ref
|
||||
case val of
|
||||
Just _ -> return $ block WaitEmpty i threads
|
||||
Just _ -> block v WaitEmpty i threads
|
||||
Nothing -> do
|
||||
writeIORef ref $ Just a
|
||||
return . goto c i $ wakeGetters threads
|
||||
writeIORef ref (Just a, blocks)
|
||||
goto c i <$> wake v WaitFull threads
|
||||
|
||||
runThread (Get v c, i) threads = do
|
||||
let (V ref) = v
|
||||
val <- readIORef ref
|
||||
(val, _) <- readIORef ref
|
||||
case val of
|
||||
Just val' -> return $ goto (c val') i threads
|
||||
Nothing -> return $ block WaitFull i threads
|
||||
Nothing -> block v WaitFull i threads
|
||||
|
||||
runThread (Take v c, i) threads = do
|
||||
let (V ref) = v
|
||||
val <- readIORef ref
|
||||
(val, blocks) <- readIORef ref
|
||||
case val of
|
||||
Just val' -> do
|
||||
writeIORef ref Nothing
|
||||
return . goto (c val') i $ wakePutters threads
|
||||
Nothing -> return $ block WaitFull i threads
|
||||
writeIORef ref (Nothing, blocks)
|
||||
goto (c val') i <$> wake v WaitEmpty threads
|
||||
Nothing -> block v WaitFull i threads
|
||||
|
||||
runThread (TryTake v c, i) threads = do
|
||||
let (V ref) = v
|
||||
val <- readIORef ref
|
||||
(val, _) <- readIORef ref
|
||||
return $ goto (c val) i threads
|
||||
|
||||
runThread (Lift io, i) threads = do
|
||||
@ -287,26 +285,30 @@ runThread (Stop, i) threads = return $ kill i threads
|
||||
goto :: Ord k => a -> k -> Map k (a, b) -> Map k (a, b)
|
||||
goto a = M.alter $ \(Just (_, b)) -> Just (a, b)
|
||||
|
||||
-- | Set the 'Block' of a thread.
|
||||
block :: Ord k => b -> k -> Map k (a, Maybe b) -> Map k (a, Maybe b)
|
||||
block b = M.alter $ \(Just (a, _)) -> Just (a, Just b)
|
||||
-- | Block a thread on a 'CVar'.
|
||||
block :: Ord k => CVar t v -> (k -> Block) -> k -> Map k (a, Bool) -> IO (Map k (a, Bool))
|
||||
block (V ref) typ tid threads = do
|
||||
(val, blocks) <- readIORef ref
|
||||
writeIORef ref (val, typ tid : blocks)
|
||||
return $ M.alter (\(Just (a, _)) -> Just (a, True)) tid threads
|
||||
|
||||
-- | Start a thread with the next free ID.
|
||||
launch :: (Ord k, Enum k) => a -> Map k (a, Maybe b) -> Map k (a, Maybe b)
|
||||
launch a m = M.insert (succ . maximum $ M.keys m) (a, Nothing) m
|
||||
launch :: (Ord k, Enum k) => a -> Map k (a, Bool) -> Map k (a, Bool)
|
||||
launch a m = M.insert (succ . maximum $ M.keys m) (a, False) m
|
||||
|
||||
-- | Kill a thread.
|
||||
kill :: Ord k => k -> Map k (a, b) -> Map k (a, b)
|
||||
kill = M.delete
|
||||
|
||||
-- | Wake every thread blocked on a 'CVar' read.
|
||||
wakeGetters :: Map k (a, Maybe Block) -> Map k (a, Maybe Block)
|
||||
wakeGetters = M.map wake where
|
||||
wake (a, Just WaitFull) = (a, Nothing)
|
||||
wake a = a
|
||||
wake :: Ord k => CVar t v -> (k -> Block) -> Map k (a, Bool) -> IO (Map k (a, Bool))
|
||||
wake (V ref) typ = fmap M.fromList . mapM wake . M.toList where
|
||||
wake a@(tid, (act, True)) = do
|
||||
let block = typ tid
|
||||
(val, blocks) <- readIORef ref
|
||||
|
||||
-- | Wake every thread blocked on a 'CVar' write.
|
||||
wakePutters :: Map k (a, Maybe Block) -> Map k (a, Maybe Block)
|
||||
wakePutters = M.map wake where
|
||||
wake (a, Just WaitEmpty) = (a, Nothing)
|
||||
wake a = a
|
||||
if block `elem` blocks
|
||||
then writeIORef ref (val, filter (/= block) blocks) >> return (tid, (act, False))
|
||||
else return a
|
||||
|
||||
wake a = return a
|
||||
|
Loading…
Reference in New Issue
Block a user