Rename some dispatcher functions

This commit is contained in:
Harendra Kumar 2024-02-15 06:09:24 +05:30
parent 9992b820f2
commit 79e743667b
3 changed files with 20 additions and 20 deletions

View File

@ -19,11 +19,11 @@ module Streamly.Internal.Data.Stream.Channel
, module Streamly.Internal.Data.Stream.Channel.Dispatcher , module Streamly.Internal.Data.Stream.Channel.Dispatcher
, module Streamly.Internal.Data.Stream.Channel.Consumer , module Streamly.Internal.Data.Stream.Channel.Consumer
, module Streamly.Internal.Data.Stream.Channel.Operations , module Streamly.Internal.Data.Stream.Channel.Operations
, chanConcatMapK
-- ** Evaluation -- ** Evaluation
, withChannelK , withChannelK
, withChannel , withChannel
, chanConcatMapK
-- quiesceChannel -- wait for running tasks but do not schedule any more. -- quiesceChannel -- wait for running tasks but do not schedule any more.
) )
where where

View File

@ -63,11 +63,11 @@ readOutputQBounded eagerEval sv = do
cnt <- liftIO $ readIORef $ workerCount sv cnt <- liftIO $ readIORef $ workerCount sv
when (cnt <= 0) $ do when (cnt <= 0) $ do
done <- liftIO $ isWorkDone sv done <- liftIO $ isWorkDone sv
when (not done) (pushWorker 0 sv) when (not done) (forkWorker 0 sv)
{-# INLINE blockingRead #-} {-# INLINE blockingRead #-}
blockingRead = do blockingRead = do
sendWorkerWait eagerEval sendWorkerDelay (dispatchWorker 0) sv dispatchAllWait eagerEval sendWorkerDelay (dispatchWorker 0) sv
liftIO (fst `fmap` readOutputQChan sv) liftIO (fst `fmap` readOutputQChan sv)
-- | Same as 'readOutputQBounded' but uses 'dispatchWorkerPaced' to -- | Same as 'readOutputQBounded' but uses 'dispatchWorkerPaced' to
@ -90,7 +90,7 @@ readOutputQPaced sv = do
{-# INLINE blockingRead #-} {-# INLINE blockingRead #-}
blockingRead = do blockingRead = do
sendWorkerWait False sendWorkerDelayPaced dispatchWorkerPaced sv dispatchAllWait False sendWorkerDelayPaced dispatchWorkerPaced sv
liftIO (fst `fmap` readOutputQChan sv) liftIO (fst `fmap` readOutputQChan sv)
-- | If there is work to do dispatch as many workers as the target rate -- | If there is work to do dispatch as many workers as the target rate
@ -113,7 +113,7 @@ postProcessPaced sv = do
-- finished, therefore we cannot just rely on dispatchWorkerPaced -- finished, therefore we cannot just rely on dispatchWorkerPaced
-- which may or may not send a worker. -- which may or may not send a worker.
noWorker <- allThreadsDone (workerThreads sv) noWorker <- allThreadsDone (workerThreads sv)
when noWorker $ pushWorker 0 sv when noWorker $ forkWorker 0 sv
return r return r
else return False else return False
@ -137,7 +137,7 @@ postProcessBounded sv = do
r <- liftIO $ isWorkDone sv r <- liftIO $ isWorkDone sv
-- Note that we need to guarantee a worker, therefore we cannot just -- Note that we need to guarantee a worker, therefore we cannot just
-- use dispatchWorker which may or may not send a worker. -- use dispatchWorker which may or may not send a worker.
when (not r) (pushWorker 0 sv) when (not r) (forkWorker 0 sv)
-- XXX do we need to dispatch many here? -- XXX do we need to dispatch many here?
-- void $ dispatchWorker sv -- void $ dispatchWorker sv
return r return r

View File

@ -12,10 +12,10 @@ module Streamly.Internal.Data.Stream.Channel.Dispatcher
-- *** Worker Dispatching -- *** Worker Dispatching
-- | Low level functions used to build readOutputQ and postProcess -- | Low level functions used to build readOutputQ and postProcess
-- functions. -- functions.
pushWorker forkWorker
, dispatchWorker , dispatchWorker
, dispatchWorkerPaced , dispatchWorkerPaced
, sendWorkerWait , dispatchAllWait
, sendWorkerDelay , sendWorkerDelay
, sendWorkerDelayPaced , sendWorkerDelayPaced
, startChannel -- XXX bootstrap? , startChannel -- XXX bootstrap?
@ -46,12 +46,12 @@ import Streamly.Internal.Data.Stream.Channel.Type
-- | Low level API to create a worker. Forks a thread which executes the -- | Low level API to create a worker. Forks a thread which executes the
-- 'workLoop' of the channel. -- 'workLoop' of the channel.
{-# NOINLINE pushWorker #-} {-# NOINLINE forkWorker #-}
pushWorker :: MonadRunInIO m => forkWorker :: MonadRunInIO m =>
Count -- ^ max yield limit for the worker Count -- ^ max yield limit for the worker
-> Channel m a -> Channel m a
-> m () -> m ()
pushWorker yieldMax sv = do forkWorker yieldMax sv = do
liftIO $ atomicModifyIORefCAS_ (workerCount sv) $ \n -> n + 1 liftIO $ atomicModifyIORefCAS_ (workerCount sv) $ \n -> n + 1
when (svarInspectMode sv) when (svarInspectMode sv)
$ recordMaxWorkers (workerCount sv) (svarStats sv) $ recordMaxWorkers (workerCount sv) (svarStats sv)
@ -140,7 +140,7 @@ checkMaxBuffer active sv = do
(_, n) <- liftIO $ readIORef (outputQueue sv) (_, n) <- liftIO $ readIORef (outputQueue sv)
return $ fromIntegral lim > n + active return $ fromIntegral lim > n + active
-- | Higher level API to dispatch a worker, it uses 'pushWorker' to create a -- | Higher level API to dispatch a worker, it uses 'forkWorker' to create a
-- worker. Dispatches a worker only if: -- worker. Dispatches a worker only if:
-- --
-- * the channel has work to do -- * the channel has work to do
@ -176,13 +176,13 @@ dispatchWorker yieldCount sv = do
then do then do
r1 <- checkMaxBuffer active sv r1 <- checkMaxBuffer active sv
if r1 if r1
then pushWorker yieldCount sv >> return True then forkWorker yieldCount sv >> return True
else return False else return False
else return False else return False
else do else do
when (active <= 0) $ do when (active <= 0) $ do
r <- liftIO $ isWorkDone sv r <- liftIO $ isWorkDone sv
when (not r) $ pushWorker 0 sv when (not r) $ forkWorker 0 sv
return False return False
else return False else return False
@ -314,15 +314,15 @@ dispatchWorkerPaced sv = do
-- --
-- When this function returns we are sure that there is some output available. -- When this function returns we are sure that there is some output available.
-- --
{-# NOINLINE sendWorkerWait #-} {-# NOINLINE dispatchAllWait #-}
sendWorkerWait dispatchAllWait
:: MonadIO m :: MonadIO m
=> Bool -- ^ 'eager' option is on => Bool -- ^ 'eager' option is on
-> (Channel m a -> IO ()) -- ^ delay function -> (Channel m a -> IO ()) -- ^ delay function
-> (Channel m a -> m Bool) -- ^ dispatcher function -> (Channel m a -> m Bool) -- ^ dispatcher function
-> Channel m a -> Channel m a
-> m () -> m ()
sendWorkerWait eagerEval delay dispatch sv = go dispatchAllWait eagerEval delay dispatch sv = go
where where
@ -400,7 +400,7 @@ sendWorkerWait eagerEval delay dispatch sv = go
$ withDiagMVar $ withDiagMVar
(svarInspectMode sv) (svarInspectMode sv)
(dumpChannel sv) (dumpChannel sv)
"sendWorkerWait: nothing to do" "dispatchAllWait: nothing to do"
$ takeMVar (outputDoorBell sv) $ takeMVar (outputDoorBell sv)
(_, len) <- liftIO $ readIORef (outputQueue sv) (_, len) <- liftIO $ readIORef (outputQueue sv)
if len <= 0 if len <= 0
@ -417,11 +417,11 @@ startChannel :: MonadRunInIO m =>
Channel m a -> m () Channel m a -> m ()
startChannel chan = do startChannel chan = do
case yieldRateInfo chan of case yieldRateInfo chan of
Nothing -> pushWorker 0 chan Nothing -> forkWorker 0 chan
Just yinfo -> Just yinfo ->
if svarLatencyTarget yinfo == maxBound if svarLatencyTarget yinfo == maxBound
then liftIO $ threadDelay maxBound then liftIO $ threadDelay maxBound
else pushWorker 1 chan else forkWorker 1 chan
-- | Noop as of now. -- | Noop as of now.
sendWorkerDelayPaced :: Channel m a -> IO () sendWorkerDelayPaced :: Channel m a -> IO ()