diff --git a/src/Streamly/Internal/Data/Stream/Channel.hs b/src/Streamly/Internal/Data/Stream/Channel.hs index e08cbf68c..14897c0db 100644 --- a/src/Streamly/Internal/Data/Stream/Channel.hs +++ b/src/Streamly/Internal/Data/Stream/Channel.hs @@ -19,11 +19,11 @@ module Streamly.Internal.Data.Stream.Channel , module Streamly.Internal.Data.Stream.Channel.Dispatcher , module Streamly.Internal.Data.Stream.Channel.Consumer , module Streamly.Internal.Data.Stream.Channel.Operations + , chanConcatMapK -- ** Evaluation , withChannelK , withChannel - , chanConcatMapK -- quiesceChannel -- wait for running tasks but do not schedule any more. ) where diff --git a/src/Streamly/Internal/Data/Stream/Channel/Consumer.hs b/src/Streamly/Internal/Data/Stream/Channel/Consumer.hs index bf173649e..b299a90cb 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Consumer.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Consumer.hs @@ -63,11 +63,11 @@ readOutputQBounded eagerEval sv = do cnt <- liftIO $ readIORef $ workerCount sv when (cnt <= 0) $ do done <- liftIO $ isWorkDone sv - when (not done) (pushWorker 0 sv) + when (not done) (forkWorker 0 sv) {-# INLINE blockingRead #-} blockingRead = do - sendWorkerWait eagerEval sendWorkerDelay (dispatchWorker 0) sv + dispatchAllWait eagerEval sendWorkerDelay (dispatchWorker 0) sv liftIO (fst `fmap` readOutputQChan sv) -- | Same as 'readOutputQBounded' but uses 'dispatchWorkerPaced' to @@ -90,7 +90,7 @@ readOutputQPaced sv = do {-# INLINE blockingRead #-} blockingRead = do - sendWorkerWait False sendWorkerDelayPaced dispatchWorkerPaced sv + dispatchAllWait False sendWorkerDelayPaced dispatchWorkerPaced sv liftIO (fst `fmap` readOutputQChan sv) -- | If there is work to do dispatch as many workers as the target rate @@ -113,7 +113,7 @@ postProcessPaced sv = do -- finished, therefore we cannot just rely on dispatchWorkerPaced -- which may or may not send a worker. noWorker <- allThreadsDone (workerThreads sv) - when noWorker $ pushWorker 0 sv + when noWorker $ forkWorker 0 sv return r else return False @@ -137,7 +137,7 @@ postProcessBounded sv = do r <- liftIO $ isWorkDone sv -- Note that we need to guarantee a worker, therefore we cannot just -- use dispatchWorker which may or may not send a worker. - when (not r) (pushWorker 0 sv) + when (not r) (forkWorker 0 sv) -- XXX do we need to dispatch many here? -- void $ dispatchWorker sv return r diff --git a/src/Streamly/Internal/Data/Stream/Channel/Dispatcher.hs b/src/Streamly/Internal/Data/Stream/Channel/Dispatcher.hs index 0b968d5f1..f339abe74 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Dispatcher.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Dispatcher.hs @@ -12,10 +12,10 @@ module Streamly.Internal.Data.Stream.Channel.Dispatcher -- *** Worker Dispatching -- | Low level functions used to build readOutputQ and postProcess -- functions. - pushWorker + forkWorker , dispatchWorker , dispatchWorkerPaced - , sendWorkerWait + , dispatchAllWait , sendWorkerDelay , sendWorkerDelayPaced , startChannel -- XXX bootstrap? @@ -46,12 +46,12 @@ import Streamly.Internal.Data.Stream.Channel.Type -- | Low level API to create a worker. Forks a thread which executes the -- 'workLoop' of the channel. -{-# NOINLINE pushWorker #-} -pushWorker :: MonadRunInIO m => +{-# NOINLINE forkWorker #-} +forkWorker :: MonadRunInIO m => Count -- ^ max yield limit for the worker -> Channel m a -> m () -pushWorker yieldMax sv = do +forkWorker yieldMax sv = do liftIO $ atomicModifyIORefCAS_ (workerCount sv) $ \n -> n + 1 when (svarInspectMode sv) $ recordMaxWorkers (workerCount sv) (svarStats sv) @@ -140,7 +140,7 @@ checkMaxBuffer active sv = do (_, n) <- liftIO $ readIORef (outputQueue sv) return $ fromIntegral lim > n + active --- | Higher level API to dispatch a worker, it uses 'pushWorker' to create a +-- | Higher level API to dispatch a worker, it uses 'forkWorker' to create a -- worker. Dispatches a worker only if: -- -- * the channel has work to do @@ -176,13 +176,13 @@ dispatchWorker yieldCount sv = do then do r1 <- checkMaxBuffer active sv if r1 - then pushWorker yieldCount sv >> return True + then forkWorker yieldCount sv >> return True else return False else return False else do when (active <= 0) $ do r <- liftIO $ isWorkDone sv - when (not r) $ pushWorker 0 sv + when (not r) $ forkWorker 0 sv return False else return False @@ -314,15 +314,15 @@ dispatchWorkerPaced sv = do -- -- When this function returns we are sure that there is some output available. -- -{-# NOINLINE sendWorkerWait #-} -sendWorkerWait +{-# NOINLINE dispatchAllWait #-} +dispatchAllWait :: MonadIO m => Bool -- ^ 'eager' option is on -> (Channel m a -> IO ()) -- ^ delay function -> (Channel m a -> m Bool) -- ^ dispatcher function -> Channel m a -> m () -sendWorkerWait eagerEval delay dispatch sv = go +dispatchAllWait eagerEval delay dispatch sv = go where @@ -400,7 +400,7 @@ sendWorkerWait eagerEval delay dispatch sv = go $ withDiagMVar (svarInspectMode sv) (dumpChannel sv) - "sendWorkerWait: nothing to do" + "dispatchAllWait: nothing to do" $ takeMVar (outputDoorBell sv) (_, len) <- liftIO $ readIORef (outputQueue sv) if len <= 0 @@ -417,11 +417,11 @@ startChannel :: MonadRunInIO m => Channel m a -> m () startChannel chan = do case yieldRateInfo chan of - Nothing -> pushWorker 0 chan + Nothing -> forkWorker 0 chan Just yinfo -> if svarLatencyTarget yinfo == maxBound then liftIO $ threadDelay maxBound - else pushWorker 1 chan + else forkWorker 1 chan -- | Noop as of now. sendWorkerDelayPaced :: Channel m a -> IO ()