mirror of
https://github.com/composewell/streamly.git
synced 2024-09-20 07:58:27 +03:00
rename queue variables
This commit is contained in:
parent
e2b2acd6b5
commit
f9f6c96d7a
@ -86,8 +86,8 @@ data ChildEvent a =
|
||||
------------------------------------------------------------------------------
|
||||
|
||||
data Context m a =
|
||||
Context { childChannel :: TBQueue (ChildEvent a)
|
||||
, pendingWork :: TBQueue (AsyncT m a)
|
||||
Context { outputQueue :: TBQueue (ChildEvent a)
|
||||
, workQueue :: TBQueue (AsyncT m a)
|
||||
, runningThreads :: IORef (Set ThreadId)
|
||||
, doneThreads :: IORef (Set ThreadId)
|
||||
}
|
||||
@ -185,9 +185,9 @@ push ctx = run (Just ctx) (dequeueLoop ctx)
|
||||
|
||||
where
|
||||
|
||||
send msg = atomically $ writeTBQueue (childChannel ctx) msg
|
||||
send msg = atomically $ writeTBQueue (outputQueue ctx) msg
|
||||
stop = do
|
||||
workQEmpty <- liftIO $ atomically $ isEmptyTBQueue (pendingWork ctx)
|
||||
workQEmpty <- liftIO $ atomically $ isEmptyTBQueue (workQueue ctx)
|
||||
if (not workQEmpty) then push ctx
|
||||
else liftIO $ myThreadId >>= \tid -> send (ChildStop tid Nothing)
|
||||
yield a _ Nothing = liftIO $ myThreadId >>= \tid -> send (ChildDone tid a)
|
||||
@ -250,16 +250,16 @@ pullWorker ctx = AsyncT $ \pctx stp yld -> do
|
||||
done <- f ctx tid
|
||||
if done then finish else cont
|
||||
|
||||
res <- liftIO $ atomically $ tryReadTBQueue (childChannel ctx)
|
||||
res <- liftIO $ atomically $ tryReadTBQueue (outputQueue ctx)
|
||||
case res of
|
||||
Nothing -> do
|
||||
liftIO $ threadDelay 4
|
||||
let workQ = pendingWork ctx
|
||||
outQ = childChannel ctx
|
||||
let workQ = workQueue ctx
|
||||
outQ = outputQueue ctx
|
||||
workQEmpty <- liftIO $ atomically $ isEmptyTBQueue workQ
|
||||
outQEmpty <- liftIO $ atomically $ isEmptyTBQueue outQ
|
||||
when (not workQEmpty && outQEmpty) $ pushWorker ctx
|
||||
void $ liftIO $ atomically $ peekTBQueue (childChannel ctx)
|
||||
void $ liftIO $ atomically $ peekTBQueue (outputQueue ctx)
|
||||
continue
|
||||
Just ev ->
|
||||
case ev of
|
||||
@ -289,8 +289,7 @@ handleChildException pchan e = do
|
||||
{-# INLINE pushWorker #-}
|
||||
pushWorker :: MonadAsync m => Context m a -> m ()
|
||||
pushWorker ctx = do
|
||||
let chan = childChannel ctx
|
||||
tid <- doFork (push ctx) (handleChildException chan)
|
||||
tid <- doFork (push ctx) (handleChildException (outputQueue ctx))
|
||||
liftIO $ modifyIORef (runningThreads ctx) $ (\s -> S.insert tid s)
|
||||
|
||||
-- | Split the original computation in a pull-push pair. The original
|
||||
@ -305,14 +304,14 @@ pullFork m1 m2 = AsyncT $ \_ stp yld -> do
|
||||
where
|
||||
|
||||
newContext = do
|
||||
channel <- atomically $ newTBQueue 32
|
||||
work <- atomically $ newTBQueue 32
|
||||
outQ <- atomically $ newTBQueue 32
|
||||
workQ <- atomically $ newTBQueue 32
|
||||
running <- newIORef S.empty
|
||||
done <- newIORef S.empty
|
||||
return $ Context { childChannel = channel
|
||||
return $ Context { outputQueue = outQ
|
||||
, runningThreads = running
|
||||
, doneThreads = done
|
||||
, pendingWork = work
|
||||
, workQueue = workQ
|
||||
}
|
||||
|
||||
-- Concurrency rate control. Our objective is to create more threads on
|
||||
@ -349,9 +348,9 @@ pullFork m1 m2 = AsyncT $ \_ stp yld -> do
|
||||
{-# INLINE forkWorker #-}
|
||||
forkWorker :: MonadAsync m => Context m a -> m ()
|
||||
forkWorker ctx = do
|
||||
let chan = childChannel ctx
|
||||
tid <- doFork (push ctx) (handleChildException chan)
|
||||
liftIO $ atomically $ writeTBQueue chan (ChildCreate tid)
|
||||
let q = outputQueue ctx
|
||||
tid <- doFork (push ctx) (handleChildException q)
|
||||
liftIO $ atomically $ writeTBQueue q (ChildCreate tid)
|
||||
|
||||
{-# INLINE queueWork #-}
|
||||
queueWork :: MonadAsync m => Context m a -> AsyncT m a -> m ()
|
||||
@ -363,14 +362,14 @@ queueWork ctx m = do
|
||||
-- TBD If we run out of threads we can also evaluate the action completely
|
||||
-- right here, disallowing any further child workers and turning the
|
||||
-- parallel composition into interleaved serial composition.
|
||||
workQFull <- liftIO $ atomically $ isFullTBQueue (pendingWork ctx)
|
||||
workQFull <- liftIO $ atomically $ isFullTBQueue (workQueue ctx)
|
||||
when (workQFull) $ forkWorker ctx
|
||||
liftIO $ atomically $ writeTBQueue (pendingWork ctx) m
|
||||
liftIO $ atomically $ writeTBQueue (workQueue ctx) m
|
||||
|
||||
{-# INLINE dequeueLoop #-}
|
||||
dequeueLoop :: MonadAsync m => Context m a -> AsyncT m a
|
||||
dequeueLoop ctx = AsyncT $ \_ stp yld -> do
|
||||
work <- liftIO $ atomically $ tryReadTBQueue (pendingWork ctx)
|
||||
work <- liftIO $ atomically $ tryReadTBQueue (workQueue ctx)
|
||||
case work of
|
||||
Nothing -> stp
|
||||
Just m -> do
|
||||
|
Loading…
Reference in New Issue
Block a user