Add write barriers for doorBell

This commit is contained in:
Harendra Kumar 2018-06-02 22:39:06 +05:30
parent 6a231858e3
commit 3324822d6a

View File

@ -75,7 +75,8 @@ import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Class (MonadTrans (lift))
import Control.Monad.Trans.Control (MonadBaseControl, control)
import Data.Atomics (casIORef, readForCAS, peekTicket
,atomicModifyIORefCAS_)
,atomicModifyIORefCAS_
,writeBarrier)
import Data.Concurrent.Queue.MichaelScott (LinkedQueue, newQ, pushL,
tryPopR, nullQ)
import Data.Functor (void)
@ -424,9 +425,9 @@ send sv msg = do
len <- atomicModifyIORefCAS (outputQueue sv) $ \(es, n) ->
((msg : es, n + 1), n)
when (len <= 0) $ do
-- XXX need a memory barrier? The wake up must happen only after the
-- store has finished otherwise we can have lost wakeup problems.
--
-- The wake up must happen only after the store has finished otherwise
-- we can have lost wakeup problems.
writeBarrier
-- Since multiple workers can try this at the same time, it is possible
-- that we may put a spurious MVar after the consumer has already seen
-- the output. But that's harmless, at worst it may cause the consumer
@ -463,6 +464,7 @@ enqueueLIFO sv q m = do
case ms of
[] -> (m : ms, True)
_ -> (m : ms, False)
writeBarrier
when v $ void $ tryPutMVar (doorBell sv) ()
atomicModifyIORefCAS_ (waitingForWork sv) (const False)
@ -507,7 +509,9 @@ enqueueFIFO sv q m = do
withDoorBell = do
emp <- nullQ q
pushL q m
writeBarrier
when emp $ void $ tryPutMVar (doorBell sv) ()
atomicModifyIORefCAS_ (waitingForWork sv) (const False)
runqueueFIFO :: MonadIO m => SVar m a -> LinkedQueue (Stream m a) -> m ()
runqueueFIFO sv q = run
@ -612,6 +616,7 @@ enqueueAhead sv q m = do
([], n) -> ([m], n + 1) -- increment sequence
_ -> error "not empty"
when w $ do
writeBarrier
void $ tryPutMVar (doorBell sv) ()
atomicModifyIORefCAS_ (waitingForWork sv) (const False)
@ -770,7 +775,9 @@ modifyThread sv tid = do
then let new = (S.delete tid old) in (new, new)
else let new = (S.insert tid old) in (new, old)
if null changed
then liftIO $ void $ tryPutMVar (doorBell sv) ()
then liftIO $ do
writeBarrier
void $ tryPutMVar (doorBell sv) ()
else return ()
-- | This is safe even if we are adding more threads concurrently because if
@ -878,6 +885,7 @@ sendWorkerWait sv = do
-- the enqueue to send us a doorbell.
liftIO $ atomicModifyIORefCAS_ (waitingForWork sv) $ const True
liftIO $ writeBarrier
-- check again, this time we have set the waitingForWork flag so we
-- are guaranteed to get a doorbell in case the status changed