Update docs for Stream channels and refactor (minor)

Harendra Kumar 2024-02-12 22:58:07 +05:30
parent 00e667d2a7
commit fdf2684cc2
11 changed files with 204 additions and 121 deletions

View File

@ -5,6 +5,9 @@
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
--
-- This module contains operations that are common to Stream and Fold
-- channels.
module Streamly.Internal.Data.Channel
(

View File

@ -413,6 +413,8 @@ sendStop workerCount rateInfo q bell workerInfo = do
myThreadId >>= \tid ->
void $ sendEvent q bell (ChildStop tid Nothing)
-- XXX Shouldn't we perform a workerStopUpdate even in this case?
-- | Add a 'ChildStop' event with exception to the channel's output queue.
{-# NOINLINE sendException #-}
sendException ::

View File

@ -5,15 +5,9 @@
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
--
-- Non-parallelizable stream combinators like unfoldrM, iterateM etc. can be
-- evaluated concurrently with the stream consumer by using `eval`.
-- Parallelizable combinators like repeatM, replicateM can generate the stream
-- concurrently using 'concatMap'.
-- Single effects related functionality can be moved to
-- Data.Async/Control.Async.
-- Common Channel functionality to Data.Channel.
-- Stream channel to Data.Stream.Channel.
module Streamly.Internal.Data.Stream.Concurrent
@ -21,10 +15,8 @@ module Streamly.Internal.Data.Stream.Concurrent
-- * Imports
-- $setup
module Streamly.Internal.Data.Stream.Concurrent.Channel
-- * Types
, MonadAsync
MonadAsync -- XXX Move to the Channel module?
-- * Combinators
-- | Stream combinators using a concurrent channel
@ -89,7 +81,6 @@ import Control.Monad.IO.Class (MonadIO(liftIO))
import Streamly.Internal.Control.Concurrent (MonadAsync, askRunInIO)
import Streamly.Internal.Control.ForkLifted (forkManaged)
import Streamly.Internal.Data.Channel.Dispatcher (modifyThread)
import Streamly.Internal.Data.Channel.Types (ChildEvent(..))
import Streamly.Internal.Data.Channel.Worker (sendEvent)
import Streamly.Internal.Data.Stream (Stream, Step(..))
import Streamly.Internal.Data.SVar.Type (adaptState)
@ -100,6 +91,7 @@ import qualified Streamly.Internal.Data.Stream as D
import qualified Streamly.Internal.Data.StreamK as K
import Prelude hiding (mapM, sequence, concat, concatMap, zipWith)
import Streamly.Internal.Data.Channel.Types
import Streamly.Internal.Data.Stream.Concurrent.Channel
-- $setup
@ -113,8 +105,7 @@ import Streamly.Internal.Data.Stream.Concurrent.Channel
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Data.Parser as Parser
-- >>> import qualified Streamly.Data.StreamK as StreamK
-- >>> import qualified Streamly.Internal.Data.Stream as Stream hiding (append2)
-- >>> import qualified Streamly.Internal.Data.Stream.Concurrent as Stream
-- >>> import qualified Streamly.Internal.Data.Stream.Prelude as Stream
-- >>> import Prelude hiding (concatMap, concat, zipWith)
-- >>> :{
-- delay n = do
@ -192,7 +183,7 @@ _appendGeneric newChan modifier stream1 stream2 = K.concatEffect action
action = do
chan <- newChan modifier
let cfg = modifier defaultConfig
done = K.nilM (stopChannel chan)
done = K.nilM (shutdown chan)
case getStopWhen cfg of
AllStop -> do
toChannelK chan stream2
@ -333,7 +324,7 @@ parConcatMapChanK chan f stream =
parConcatMapChanKAny :: MonadAsync m =>
Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanKAny chan f stream =
let done = K.nilM (stopChannel chan)
let done = K.nilM (shutdown chan)
run q = concatMapDivK q (\x -> K.append (f x) done)
in K.concatMapEffect (`run` stream) (mkEnqueue chan run)
@ -341,7 +332,7 @@ parConcatMapChanKAny chan f stream =
parConcatMapChanKFirst :: MonadAsync m =>
Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanKFirst chan f stream =
let done = K.nilM (stopChannel chan)
let done = K.nilM (shutdown chan)
run q = concatMapDivK q f
in K.concatEffect $ do
res <- K.uncons stream

View File

@ -8,29 +8,30 @@
module Streamly.Internal.Data.Stream.Concurrent.Channel
(
-- ** Primitives
module Streamly.Internal.Data.Stream.Concurrent.Channel.Type
, module Streamly.Internal.Data.Stream.Concurrent.Channel.Operations
, module Streamly.Internal.Data.Stream.Concurrent.Channel.Append
, module Streamly.Internal.Data.Stream.Concurrent.Channel.Interleave
, module Streamly.Internal.Data.Stream.Concurrent.Channel.Dispatcher
, module Streamly.Internal.Data.Stream.Concurrent.Channel.Consumer
-- * Channel
, Channel (..)
-- ** Allocating and Using
, newChannel
, module Streamly.Internal.Data.Stream.Concurrent.Channel.Append
, module Streamly.Internal.Data.Stream.Concurrent.Channel.Interleave
, withChannel
, withChannelK
-- quiesceChannel -- wait for running tasks but do not schedule any more.
-- ** Conversions
, module Streamly.Internal.Data.Stream.Concurrent.Channel.Operations
-- * Configuration
-- ** Configuration
, Config
, defaultConfig
-- ** Limits
-- *** Limits
, maxThreads
, maxBuffer
-- ** Rate Control
-- *** Rate Control
, Rate(..)
, rate
, avgRate
@ -38,17 +39,17 @@ module Streamly.Internal.Data.Stream.Concurrent.Channel
, maxRate
, constRate
-- ** Stop behavior
-- *** Stop behavior
, StopWhen (..)
, stopWhen
, getStopWhen
-- ** Scheduling behavior
-- *** Scheduling behavior
, eager
, ordered
, interleaved
-- ** Diagnostics
-- *** Diagnostics
, inspect
)
where
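The configuration combinators exported above are plain Config -> Config
functions and compose by ordinary function composition; channel constructors
apply the composed modifier to defaultConfig (as _appendGeneric does above
with "modifier defaultConfig"). A minimal sketch with hypothetical values:

-- A hypothetical configuration pipeline composing the exported modifiers.
myConfig :: Config -> Config
myConfig = maxThreads 4 . maxBuffer 64 . eager True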

View File

@ -95,7 +95,7 @@ workLoopLIFO qref sv winfo = run
work <- dequeue qref
case work of
QEmpty ->
liftIO $ stop sv winfo
liftIO $ stopWith winfo sv
QInner (RunInIO runin, m) ->
process runin m True
QOuter (RunInIO runin, m) ->
@ -119,16 +119,16 @@ workLoopLIFO qref sv winfo = run
res <- restoreM r
case res of
Continue -> run
Suspend -> liftIO $ stop sv winfo
Suspend -> liftIO $ stopWith winfo sv
where
single a = do
res <- liftIO $ yield sv winfo a
res <- liftIO $ yieldWith winfo sv a
return $ if res then Continue else Suspend
yieldk a r = do
res <- liftIO $ yield sv winfo a
res <- liftIO $ yieldWith winfo sv a
if res
then K.foldStreamShared undefined yieldk single (return Continue) r
else do
@ -159,7 +159,7 @@ workLoopLIFOLimited qref sv winfo = run
work <- dequeue qref
case work of
QEmpty ->
liftIO $ stop sv winfo
liftIO $ stopWith winfo sv
QInner item ->
process item True
QOuter item ->
@ -185,24 +185,24 @@ workLoopLIFOLimited qref sv winfo = run
res <- restoreM r
case res of
Continue -> run
Suspend -> liftIO $ stop sv winfo
Suspend -> liftIO $ stopWith winfo sv
-- Avoid any side effects; undo the yield limit decrement if we
-- never yielded anything.
else liftIO $ do
enqueueLIFO sv qref inner item
incrementYieldLimit (remainingWork sv)
stop sv winfo
stopWith winfo sv
where
single a = do
res <- liftIO $ yield sv winfo a
res <- liftIO $ yieldWith winfo sv a
return $ if res then Continue else Suspend
-- XXX Can we pass on the yield limit downstream to limit the
-- concurrency of constituent streams?
yieldk a r = do
res <- liftIO $ yield sv winfo a
res <- liftIO $ yieldWith winfo sv a
yieldLimitOk <- liftIO $ decrementYieldLimit (remainingWork sv)
if res && yieldLimitOk
then K.foldStreamShared undefined yieldk single incrContinue r
@ -494,7 +494,7 @@ preStopCheck sv heap =
abortExecution :: Channel m a -> Maybe WorkerInfo -> IO ()
abortExecution sv winfo = do
incrementYieldLimit (remainingWork sv)
stop sv winfo
stopWith winfo sv
-- XXX In absence of a "noyield" primitive (i.e. do not pre-empt inside a
-- critical section) from GHC RTS, we have a difficult problem. Assume we have
@ -536,7 +536,7 @@ processHeap q heap sv winfo entry sno stopping = loopHeap sno entry
then liftIO $ do
-- put the entry back in the heap and stop
requeueOnHeapTop heap (Entry seqNo ent) seqNo
stop sv winfo
stopWith winfo sv
else runStreamWithYieldLimit True seqNo r
loopHeap seqNo ent =
@ -563,13 +563,13 @@ processHeap q heap sv winfo entry sno stopping = loopHeap sno entry
res <- liftIO $ dequeueFromHeapSeq heap (prevSeqNo + 1)
case res of
Ready (Entry seqNo hent) -> loopHeap seqNo hent
Clearing -> liftIO $ stop sv winfo
Clearing -> liftIO $ stopWith winfo sv
Waiting _ ->
if stopping
then do
r <- liftIO $ preStopCheck sv heap
if r
then liftIO $ stop sv winfo
then liftIO $ stopWith winfo sv
else processWorkQueue prevSeqNo
else inline processWorkQueue prevSeqNo
@ -579,7 +579,7 @@ processHeap q heap sv winfo entry sno stopping = loopHeap sno entry
then do
work <- dequeueAhead q
case work of
Nothing -> liftIO $ stop sv winfo
Nothing -> liftIO $ stopWith winfo sv
Just (m, seqNo) -> do
if seqNo == prevSeqNo + 1
then processWithToken q heap sv winfo m seqNo
@ -591,7 +591,7 @@ processHeap q heap sv winfo entry sno stopping = loopHeap sno entry
-- only in yield continuation where we may have a remaining stream to be
-- pushed on the heap.
singleStreamFromHeap seqNo a = do
void $ liftIO $ yield sv winfo a
void $ liftIO $ yieldWith winfo sv a
nextHeap seqNo
-- XXX when we have an unfinished stream on the heap we cannot account all
@ -620,10 +620,10 @@ processHeap q heap sv winfo entry sno stopping = loopHeap sno entry
liftIO $ do
requeueOnHeapTop heap ent seqNo
incrementYieldLimit (remainingWork sv)
stop sv winfo
stopWith winfo sv
yieldStreamFromHeap seqNo a r = do
continue <- liftIO $ yield sv winfo a
continue <- liftIO $ yieldWith winfo sv a
runStreamWithYieldLimit continue seqNo r
{-# NOINLINE drainHeap #-}
@ -639,7 +639,7 @@ drainHeap q heap sv winfo = do
case r of
Ready (Entry seqNo hent) ->
processHeap q heap sv winfo hent seqNo True
_ -> liftIO $ stop sv winfo
_ -> liftIO $ stopWith winfo sv
data HeapStatus = HContinue | HStop
@ -754,7 +754,7 @@ processWithToken q heap sv winfo action sno = do
where
singleOutput seqNo a = do
continue <- liftIO $ yield sv winfo a
continue <- liftIO $ yieldWith winfo sv a
if continue
then return $ TokenContinue (seqNo + 1)
else do
@ -765,7 +765,7 @@ processWithToken q heap sv winfo action sno = do
-- incrementing the yield in a stop continuation. Essentially all
-- "unstream" calls in this function must increment yield limit on stop.
yieldOutput seqNo a r = do
continue <- liftIO $ yield sv winfo a
continue <- liftIO $ yieldWith winfo sv a
yieldLimitOk <- liftIO $ decrementYieldLimit (remainingWork sv)
if continue && yieldLimitOk
then do
@ -844,7 +844,7 @@ workLoopAhead q heap sv winfo = do
case r of
Ready (Entry seqNo hent) ->
processHeap q heap sv winfo hent seqNo False
Clearing -> liftIO $ stop sv winfo
Clearing -> liftIO $ stopWith winfo sv
Waiting _ -> do
-- Before we execute the next item from the work queue we check
-- if we are beyond the yield limit. It is better to check the
@ -867,7 +867,7 @@ workLoopAhead q heap sv winfo = do
then do
work <- dequeueAhead q
case work of
Nothing -> liftIO $ stop sv winfo
Nothing -> liftIO $ stopWith winfo sv
Just (m, seqNo) -> do
if seqNo == 0
then processWithToken q heap sv winfo m seqNo

View File

@ -374,7 +374,7 @@ sendWorkerWait eagerEval delay dispatch sv = go
liftIO
$ withDiagMVar
(svarInspectMode sv)
(dumpSVar sv)
(dumpChannel sv)
"sendWorkerWait: nothing to do"
$ takeMVar (outputDoorBell sv)
(_, len) <- liftIO $ readIORef (outputQueue sv)

View File

@ -65,7 +65,7 @@ workLoopFIFO q sv winfo = run
run = do
work <- liftIO $ tryPopR q
case work of
Nothing -> liftIO $ stop sv winfo
Nothing -> liftIO $ stopWith winfo sv
Just (RunInIO runin, m) -> do
r <- liftIO
$ runin
@ -74,10 +74,10 @@ workLoopFIFO q sv winfo = run
res <- restoreM r
case res of
Continue -> run
Suspend -> liftIO $ stop sv winfo
Suspend -> liftIO $ stopWith winfo sv
single a = do
res <- liftIO $ yield sv winfo a
res <- liftIO $ yieldWith winfo sv a
return $ if res then Continue else Suspend
-- XXX in general we would like to yield "n" elements from a single stream
@ -85,7 +85,7 @@ workLoopFIFO q sv winfo = run
-- expensive in certain cases. Similarly, we can use time limit for
-- yielding.
yieldk a r = do
res <- liftIO $ yield sv winfo a
res <- liftIO $ yieldWith winfo sv a
runInIO <- askRunInIO
-- XXX If the queue is empty we do not need to enqueue. We can just
-- continue evaluating the stream.
@ -109,7 +109,7 @@ workLoopFIFOLimited q sv winfo = run
run = do
work <- liftIO $ tryPopR q
case work of
Nothing -> liftIO $ stop sv winfo
Nothing -> liftIO $ stopWith winfo sv
Just (RunInIO runin, m) -> do
yieldLimitOk <- liftIO $ decrementYieldLimit (remainingWork sv)
if yieldLimitOk
@ -121,18 +121,18 @@ workLoopFIFOLimited q sv winfo = run
res <- restoreM r
case res of
Continue -> run
Suspend -> liftIO $ stop sv winfo
Suspend -> liftIO $ stopWith winfo sv
else liftIO $ do
enqueueFIFO sv q (RunInIO runin, m)
incrementYieldLimit (remainingWork sv)
stop sv winfo
stopWith winfo sv
single a = do
res <- liftIO $ yield sv winfo a
res <- liftIO $ yieldWith winfo sv a
return $ if res then Continue else Suspend
yieldk a r = do
res <- liftIO $ yield sv winfo a
res <- liftIO $ yieldWith winfo sv a
runInIO <- askRunInIO
liftIO $ enqueueFIFO sv q (runInIO, r)
yieldLimitOk <- liftIO $ decrementYieldLimit (remainingWork sv)

View File

@ -128,7 +128,7 @@ fromChannelRaw sv = K.MkStream $ \st yld sng stp -> do
when (svarInspectMode sv) $ liftIO $ do
t <- getTime Monotonic
writeIORef (svarStopTime (svarStats sv)) (Just t)
printSVar (dumpSVar sv) "SVar Done"
printSVar (dumpChannel sv) "SVar Done"
{-# INLINE processEvents #-}
processEvents [] = K.MkStream $ \st yld sng stp -> do
@ -203,7 +203,7 @@ fromChannelK sv =
when (svarInspectMode sv) $ do
r <- liftIO $ readIORef (svarStopTime (svarStats sv))
when (isNothing r) $
printSVar (dumpSVar sv) "SVar Garbage Collected"
printSVar (dumpChannel sv) "SVar Garbage Collected"
cleanupSVar (workerThreads sv)
-- If there are any SVars referenced by this SVar a GC will prompt
-- them to be cleaned up quickly.
@ -249,7 +249,7 @@ _fromChannelD svar = D.Stream step FromSVarInit
when (svarInspectMode svar) $ do
r <- liftIO $ readIORef (svarStopTime (svarStats svar))
when (isNothing r) $
printSVar (dumpSVar svar) "SVar Garbage Collected"
printSVar (dumpChannel svar) "SVar Garbage Collected"
cleanupSVar (workerThreads svar)
-- If there are any SVars referenced by this SVar a GC will prompt
-- them to be cleaned up quickly.
@ -290,5 +290,5 @@ _fromChannelD svar = D.Stream step FromSVarInit
when (svarInspectMode sv) $ do
t <- liftIO $ getTime Monotonic
liftIO $ writeIORef (svarStopTime (svarStats sv)) (Just t)
liftIO $ printSVar (dumpSVar sv) "SVar Done"
liftIO $ printSVar (dumpChannel sv) "SVar Done"
return D.Stop

View File

@ -9,15 +9,17 @@
module Streamly.Internal.Data.Stream.Concurrent.Channel.Type
(
Channel(..)
, yield
, stop
, stopChannel
, dumpSVar
, yieldWith
, stopWith
, exceptionWith
, shutdown
, dumpChannel
)
where
import Control.Concurrent (ThreadId)
import Control.Concurrent.MVar (MVar)
import Control.Exception (SomeException(..))
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(..))
import Data.IORef (IORef)
@ -27,7 +29,7 @@ import Streamly.Internal.Control.Concurrent (RunInIO)
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS_)
import Streamly.Internal.Data.Channel.Dispatcher (dumpSVarStats)
import Streamly.Internal.Data.Channel.Worker
(sendYield, sendStop, sendEvent)
(sendYield, sendStop, sendEvent, sendException)
import Streamly.Internal.Data.StreamK (StreamK)
import Streamly.Internal.Data.Channel.Types
@ -44,105 +46,179 @@ import Streamly.Internal.Data.Channel.Types
-- the combined results as output stream.
data Channel m a = Channel
{
svarMrun :: RunInIO m
-- FORWARD FLOW: Flow of data from the workers to the consumer
-- | Runner for the monadic actions in the stream. Captures the monad
-- state at the point where the channel was created and uses the same
-- state to run all actions.
svarMrun :: RunInIO m
---------------------------------------------------------------------------
-- FORWARD FLOW: Flow of data from the workers to the consumer
---------------------------------------------------------------------------
-- Shared output queue (events, length)
-- XXX For better efficiency we can try a preallocated array type (perhaps
-- something like a vector) that allows an O(1) append. That way we will
-- avoid constructing and reversing the list. Possibly we can also avoid
-- the GC copying overhead. When the size increases we should be able to
-- allocate the array in chunks.
--
-- [LOCKING] Frequent locked access. This is updated by workers on each
-- yield and once in a while read by the consumer thread. This could have
-- big locking overhead if the number of workers is high.
--
-- XXX We can use a per-CPU data structure to reduce the locking overhead.
-- However, a per-CPU structure cannot guarantee the exact sequence in
-- which the elements were added, though that may not be important.
--
-- XXX We can send a bundle of events of one type coalesced together in an
-- unboxed structure.
, outputQueue :: IORef ([ChildEvent a], Int)
-- [LOCKING] Infrequent MVar. Used when the outputQ transitions from empty
-- to non-empty, or a work item is queued by a worker to the work queue and
-- doorBellOnWorkQ is set by the consumer.
, outputDoorBell :: MVar () -- signal the consumer about output
-- | (events, count): output queue of the channel. This is where the
-- workers queue the results.
--
-- [LOCKING] Frequently locked. This is locked and updated by workers
-- on each yield, and locked and read by the consumer thread once in a
-- while. Locking contention may be high if there is a large number of
-- workers.
, outputQueue :: IORef ([ChildEvent a], Int)
-- | Door bell for workers to wake up the consumer.
--
-- [LOCKING] Infrequently locked. Used only when the 'outputQueue'
-- transitions from empty to non-empty, or a work item is queued by a
-- worker to the work queue and 'doorBellOnWorkQ' is set by the consumer.
, outputDoorBell :: MVar ()
-- XXX Can we use IO instead of m here?
, readOutputQ :: m [ChildEvent a]
, postProcess :: m Bool
, readOutputQ :: m [ChildEvent a] -- XXX remove
, postProcess :: m Bool -- XXX remove
---------------------------------------------------------------------------
-- Scheduling --
---------------------------------------------------------------------------
-- Combined/aggregate parameters
-- This is capped to maxBufferLimit if set to more than that. Otherwise
-- | This is capped to 'maxBufferLimit' if set to more than that. Otherwise,
-- in the worst case, each worker may yield one value to the buffer,
-- exceeding the requested buffer size.
, maxWorkerLimit :: Limit
-- | Maximum size of the 'outputQueue'. The actual worst case buffer could
-- be double this, as the consumer may read the queue and the workers may
-- fill it up even before the consumer has started consuming.
, maxBufferLimit :: Limit
-- | Tracks how many yields are remaining before the channel stops; used
-- when the 'maxYields' option is enabled.
--
-- [LOCKING] Read-only access by the consumer when dispatching a worker.
-- Decremented by workers when picking up work; the decrement is undone if
-- the worker does not yield a value.
, remainingWork :: Maybe (IORef Count)
, yieldRateInfo :: Maybe YieldRateInfo
, remainingWork :: Maybe (IORef Count)
, enqueue :: Bool -> (RunInIO m, StreamK m a) -> IO ()
-- | Rate control information for the channel, used when 'rate' control is
-- enabled.
, yieldRateInfo :: Maybe YieldRateInfo
, enqueue :: Bool -> (RunInIO m, StreamK m a) -> IO () -- XXX remove
, eagerDispatch :: m ()
, isWorkDone :: IO Bool
, isQueueDone :: IO Bool
, doorBellOnWorkQ :: IORef Bool -- ring outputDoorBell on enqueue so that a
-- sleeping consumer thread can wake up and send more workers.
, workLoop :: Maybe WorkerInfo -> m ()
-- Shared, thread tracking
-- | When set to True, ring 'outputDoorBell' when a work item is queued on
-- the work queue.
, doorBellOnWorkQ :: IORef Bool
, workLoop :: Maybe WorkerInfo -> m () -- XXX remove
-- | Tracks all active worker threads.
--
-- [LOCKING] Updated unlocked, only by consumer thread.
, workerThreads :: IORef (Set ThreadId)
, workerThreads :: IORef (Set ThreadId)
-- | Total number of active worker threads.
--
-- [LOCKING] Updated under lock, by the consumer thread when dispatching a worker
-- and by the worker threads when the thread stops. This is read unsafely
-- and by worker threads when a thread stops. This is read without locking
-- at several places where we want to rely on an approximate value.
, workerCount :: IORef Int
, workerCount :: IORef Int
-- XXX Can we use IO instead of m here?
, accountThread :: ThreadId -> m ()
, workerStopMVar :: MVar () -- Used only in ordered streams
-- cleanup: to track garbage collection of SVar --
, svarRef :: Maybe (IORef ())
-- | Used when 'ordered' is enabled.
, workerStopMVar :: MVar ()
---------------------------------------------------------------------------
-- cleanup --
---------------------------------------------------------------------------
-- | An IORef used to trigger a cleanup function when the channel is
-- garbage collected.
, svarRef :: Maybe (IORef ())
---------------------------------------------------------------------------
-- Stats --
, svarStats :: SVarStats
---------------------------------------------------------------------------
-- | Stats collection.
, svarStats :: SVarStats
---------------------------------------------------------------------------
-- Diagnostics --
---------------------------------------------------------------------------
-- | When 'inspect' mode is enabled we report diagnostic data about the
-- channel at certain points.
, svarInspectMode :: Bool
, svarCreator :: ThreadId
-- | 'ThreadId' of the thread that created the channel.
, svarCreator :: ThreadId
}
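To make the queue-and-doorbell protocol described in the field docs concrete,
here is a minimal, self-contained sketch of the worker/consumer handshake. It
uses a simplified queue type and is illustrative only, not the library's
implementation (which uses CAS-based IORef updates):

import Control.Concurrent.MVar (MVar, takeMVar, tryPutMVar)
import Control.Monad (void, when)
import Data.IORef (IORef, atomicModifyIORef')

-- Simplified stand-in for 'outputQueue': (events, count).
type Queue a = IORef ([a], Int)

-- Worker side: append an event; ring the doorbell only on the
-- empty-to-non-empty transition, as documented for 'outputDoorBell'.
pushEvent :: Queue a -> MVar () -> a -> IO ()
pushEvent q bell ev = do
    oldLen <- atomicModifyIORef' q (\(evs, n) -> ((ev : evs, n + 1), n))
    when (oldLen == 0) $ void $ tryPutMVar bell ()

-- Consumer side: drain the queue in one shot, blocking on the doorbell
-- when it is empty; reverse to restore arrival order.
drainEvents :: Queue a -> MVar () -> IO [a]
drainEvents q bell = do
    evs <- atomicModifyIORef' q (\(evs', _) -> (([], 0), evs'))
    if null evs
        then takeMVar bell >> drainEvents q bell
        else return (reverse evs)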
{-# INLINE yield #-}
yield :: Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield sv =
-- | Yield a value to the channel. Worker latencies are collected in the
-- supplied 'WorkerInfo' record and periodically pushed to the channel's
-- 'workerPendingLatency' stat. Worker latencies are updated in the channel
-- only if the channel has a 'YieldRateInfo' attached and the
-- 'workerPollingInterval' is non-zero.
--
-- Even unregistered workers can use this API.
--
{-# INLINE yieldWith #-}
yieldWith ::
Maybe WorkerInfo -- ^ Rate control info for the worker
-> Channel m a
-> a
-> IO Bool -- ^ True means the worker can continue; otherwise it must stop.
yieldWith winfo chan =
sendYield
(maxBufferLimit sv)
(maxWorkerLimit sv)
(workerCount sv)
(yieldRateInfo sv)
(outputQueue sv)
(outputDoorBell sv)
(maxBufferLimit chan)
(maxWorkerLimit chan)
(workerCount chan)
(yieldRateInfo chan)
(outputQueue chan)
(outputDoorBell chan)
winfo
{-# INLINE stop #-}
stop :: Channel m a -> Maybe WorkerInfo -> IO ()
stop sv =
-- | The worker stops yielding and exits. The final update of the collected
-- latency stats in 'WorkerInfo' is pushed to the channel. Upon receiving the
-- 'ChildStop' event, the channel removes the worker from its set of
-- registered workers.
--
-- A worker must have been registered on the channel before invoking this
-- API; this is usually done by the dispatcher when the worker is
-- dispatched.
{-# INLINE stopWith #-}
stopWith :: Maybe WorkerInfo -> Channel m a -> IO ()
stopWith winfo chan =
sendStop
(workerCount sv)
(yieldRateInfo sv)
(outputQueue sv)
(outputDoorBell sv)
(workerCount chan)
(yieldRateInfo chan)
(outputQueue chan)
(outputDoorBell chan)
winfo
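A usage sketch combining yieldWith and stopWith (hypothetical worker body,
not part of this commit):

-- Yield values until the input is exhausted or the channel asks the worker
-- to stop; deregister via stopWith in both cases.
workerBody :: Maybe WorkerInfo -> Channel m a -> [a] -> IO ()
workerBody winfo chan = go
  where
    go [] = stopWith winfo chan
    go (x:xs) = do
        continue <- yieldWith winfo chan x
        if continue then go xs else stopWith winfo chan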
-- | Stop the channel. Kill all running worker threads.
{-# INLINABLE stopChannel #-}
stopChannel :: MonadIO m => Channel m a -> m ()
stopChannel chan = liftIO $ do
-- | Like 'stopWith' but stops with the specified exception.
{-# INLINE exceptionWith #-}
exceptionWith :: Maybe WorkerInfo -> Channel m a -> SomeException -> IO ()
exceptionWith _winfo chan =
sendException (outputQueue chan) (outputDoorBell chan)
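A hedged sketch of how a worker might report a failure (hypothetical wrapper,
assuming Control.Exception.try; the real worker wiring lives in the dispatch
code):

-- Run a worker action; report an exception to the channel on failure,
-- stop normally otherwise.
runWorkerSafely :: Maybe WorkerInfo -> Channel m a -> IO () -> IO ()
runWorkerSafely winfo chan action = do
    r <- try action :: IO (Either SomeException ())
    case r of
        Left e   -> exceptionWith winfo chan e
        Right () -> stopWith winfo chan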
-- | Shut down the channel. Kill all the registered worker threads.
{-# INLINABLE shutdown #-}
shutdown :: MonadIO m => Channel m a -> m ()
shutdown chan = liftIO $ do
atomicModifyIORefCAS_ (workerCount chan) $ \n -> n - 1
void
$ sendEvent
@ -150,9 +226,11 @@ stopChannel chan = liftIO $ do
(outputDoorBell chan)
ChildStopChannel
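The call sites updated in this commit show the intended usage: append a nil
stream whose only effect is to shut the channel down (see the
"done = K.nilM (shutdown chan)" bindings in parConcatMapChanKAny and friends
above). As a sketch, with K being the qualified StreamK import used there:

-- A nil stream that shuts the channel down when evaluated.
doneStream :: MonadIO m => Channel m a -> K.StreamK m b
doneStream chan = K.nilM (shutdown chan)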
{-# NOINLINE dumpSVar #-}
dumpSVar :: Channel m a -> IO String
dumpSVar sv = do
-- | Dump the channel stats for diagnostics. Used when the 'inspect' option
-- is enabled.
{-# NOINLINE dumpChannel #-}
dumpChannel :: Channel m a -> IO String
dumpChannel sv = do
xs <- sequence $ intersperse (return "\n")
[ return (dumpCreator (svarCreator sv))
, return "---------CURRENT STATE-----------"

View File

@ -8,12 +8,19 @@
--
module Streamly.Internal.Data.Stream.Prelude
(
module Streamly.Internal.Data.Stream.Concurrent
-- XXX Move to Stream.Channel
-- * Concurrency Channels
module Streamly.Internal.Data.Stream.Concurrent.Channel
-- * Concurrent Streams
, module Streamly.Internal.Data.Stream.Concurrent
-- * Time
, module Streamly.Internal.Data.Stream.Time
-- * Lifted
, module Streamly.Internal.Data.Stream.Lifted
)
where
import Streamly.Internal.Data.Stream.Concurrent
import Streamly.Internal.Data.Stream.Concurrent.Channel
import Streamly.Internal.Data.Stream.Time
import Streamly.Internal.Data.Stream.Lifted
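A usage sketch (hypothetical): downstream code can now pull everything from
this aggregated module, as the updated doctest setup in this commit does:

-- One import brings in channels, concurrent combinators, time and lifted
-- operations.
import qualified Streamly.Internal.Data.Stream.Prelude as Stream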

View File

@ -62,6 +62,7 @@ import Data.Proxy (Proxy(..))
import Streamly.Data.Fold (Fold)
import Streamly.Internal.Data.Fold (Fold (..))
import Streamly.Internal.Data.IsMap (IsMap(..))
import Streamly.Internal.Data.Channel.Types (Rate, rate)
import Streamly.Internal.Data.Stream (Stream)
import Streamly.Internal.Data.Time.Units
( AbsTime