mirror of
https://github.com/ilyakooo0/streamly.git
synced 2024-10-07 05:38:42 +03:00
rename variable
This commit is contained in:
parent
a0d9419533
commit
3946e8f062
@ -97,7 +97,7 @@ data ChildEvent a =
|
||||
-- exceptions or using atomicModify
|
||||
data Context m a =
|
||||
Context { outputQueue :: IORef [ChildEvent a]
|
||||
, synchOutQ :: MVar Bool -- wakeup mechanism for outQ
|
||||
, doorBell :: MVar Bool -- wakeup mechanism for outQ
|
||||
, enqueue :: AsyncT m a -> IO ()
|
||||
, runqueue :: m ()
|
||||
, runningThreads :: IORef (Set ThreadId)
|
||||
@ -219,7 +219,7 @@ send ctx msg = liftIO $ do
|
||||
atomicModifyIORefCAS_ (outputQueue ctx) $ \es -> msg : es
|
||||
-- XXX need a memory barrier? The wake up must happen only after the
|
||||
-- store has finished otherwise we can have lost wakeup problems.
|
||||
void $ tryPutMVar (synchOutQ ctx) True
|
||||
void $ tryPutMVar (doorBell ctx) True
|
||||
|
||||
{-# INLINE sendStop #-}
|
||||
sendStop :: MonadIO m => Context m a -> m ()
|
||||
@ -319,13 +319,13 @@ sendWorkerWait ctx = do
|
||||
-- sending a worker in a loop running into a livelock
|
||||
done <- queueEmpty ctx
|
||||
when (not done) (pushWorker ctx)
|
||||
void $ liftIO $ takeMVar (synchOutQ ctx)
|
||||
void $ liftIO $ takeMVar (doorBell ctx)
|
||||
|
||||
-- Note: This is performance sensitive code.
|
||||
{-# NOINLINE pullWorker #-}
|
||||
pullWorker :: MonadAsync m => Context m a -> AsyncT m a
|
||||
pullWorker ctx = AsyncT $ \pctx stp yld -> do
|
||||
res <- liftIO $ tryTakeMVar (synchOutQ ctx)
|
||||
res <- liftIO $ tryTakeMVar (doorBell ctx)
|
||||
when (isNothing res) $ sendWorkerWait ctx
|
||||
list <- liftIO $ atomicModifyIORefCAS (outputQueue ctx) $ \x -> ([], x)
|
||||
(runAsyncT $ processEvents list) pctx stp yld
|
||||
@ -377,7 +377,7 @@ pullFork m1 m2 fifo = AsyncT $ \_ stp yld -> do
|
||||
pushL q m1 >> pushL q m2
|
||||
let ctx =
|
||||
Context { outputQueue = outQ
|
||||
, synchOutQ = outQMv
|
||||
, doorBell = outQMv
|
||||
, runningThreads = running
|
||||
, runqueue = runqueueFIFO ctx q
|
||||
, enqueue = pushL q
|
||||
@ -390,7 +390,7 @@ pullFork m1 m2 fifo = AsyncT $ \_ stp yld -> do
|
||||
let checkEmpty = liftIO (readIORef q) >>= return . null
|
||||
let ctx =
|
||||
Context { outputQueue = outQ
|
||||
, synchOutQ = outQMv
|
||||
, doorBell = outQMv
|
||||
, runningThreads = running
|
||||
, runqueue = runqueueLIFO ctx q
|
||||
, enqueue = enqueueLIFO q
|
||||
|
Loading…
Reference in New Issue
Block a user