diff --git a/src/Streamly/Core.hs b/src/Streamly/Core.hs index e57adcac4..ba752cd0f 100644 --- a/src/Streamly/Core.hs +++ b/src/Streamly/Core.hs @@ -75,7 +75,8 @@ import Control.Monad.IO.Class (MonadIO(..)) import Control.Monad.Trans.Class (MonadTrans (lift)) import Control.Monad.Trans.Control (MonadBaseControl, control) import Data.Atomics (casIORef, readForCAS, peekTicket - ,atomicModifyIORefCAS_) + ,atomicModifyIORefCAS_ + ,writeBarrier) import Data.Concurrent.Queue.MichaelScott (LinkedQueue, newQ, pushL, tryPopR, nullQ) import Data.Functor (void) @@ -424,9 +425,9 @@ send sv msg = do len <- atomicModifyIORefCAS (outputQueue sv) $ \(es, n) -> ((msg : es, n + 1), n) when (len <= 0) $ do - -- XXX need a memory barrier? The wake up must happen only after the - -- store has finished otherwise we can have lost wakeup problems. - -- + -- The wake up must happen only after the store has finished otherwise + -- we can have lost wakeup problems. + writeBarrier -- Since multiple workers can try this at the same time, it is possible -- that we may put a spurious MVar after the consumer has already seen -- the output. But that's harmless, at worst it may cause the consumer @@ -463,6 +464,7 @@ enqueueLIFO sv q m = do case ms of [] -> (m : ms, True) _ -> (m : ms, False) + writeBarrier when v $ void $ tryPutMVar (doorBell sv) () atomicModifyIORefCAS_ (waitingForWork sv) (const False) @@ -507,7 +509,9 @@ enqueueFIFO sv q m = do withDoorBell = do emp <- nullQ q pushL q m + writeBarrier when emp $ void $ tryPutMVar (doorBell sv) () + atomicModifyIORefCAS_ (waitingForWork sv) (const False) runqueueFIFO :: MonadIO m => SVar m a -> LinkedQueue (Stream m a) -> m () runqueueFIFO sv q = run @@ -612,6 +616,7 @@ enqueueAhead sv q m = do ([], n) -> ([m], n + 1) -- increment sequence _ -> error "not empty" when w $ do + writeBarrier void $ tryPutMVar (doorBell sv) () atomicModifyIORefCAS_ (waitingForWork sv) (const False) @@ -770,7 +775,9 @@ modifyThread sv tid = do then let new = (S.delete tid old) in (new, new) else let new = (S.insert tid old) in (new, old) if null changed - then liftIO $ void $ tryPutMVar (doorBell sv) () + then liftIO $ do + writeBarrier + void $ tryPutMVar (doorBell sv) () else return () -- | This is safe even if we are adding more threads concurrently because if @@ -878,6 +885,7 @@ sendWorkerWait sv = do -- the enqueue to send us a doorbell. liftIO $ atomicModifyIORefCAS_ (waitingForWork sv) $ const True + liftIO $ writeBarrier -- check again, this time we have set the waitingForWork flag so we -- are guaranteed to get a doorbell in case the status changed