Move parConcatMapChanK to the Channel module

Update the doc of parConcatMapChanK
Update doc of toChannelK
This commit is contained in:
Harendra Kumar 2024-02-15 03:44:01 +05:30
parent 7f6deea1e6
commit a0f352938a
3 changed files with 148 additions and 140 deletions

View File

@ -23,17 +23,20 @@ module Streamly.Internal.Data.Stream.Channel
-- ** Evaluation
, withChannelK
, withChannel
, chanConcatMapK
-- quiesceChannel -- wait for running tasks but do not schedule any more.
)
where
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Streamly.Internal.Data.Stream (Stream)
import Streamly.Internal.Control.Concurrent (askRunInIO)
import Streamly.Internal.Data.SVar.Type (adaptState)
import qualified Streamly.Internal.Data.StreamK as K
import Streamly.Internal.Data.Channel.Types
import Streamly.Internal.Data.Stream.Channel.Type
import Streamly.Internal.Data.Stream.Channel.Operations
import Streamly.Internal.Data.Stream.Channel.Append
@ -82,3 +85,132 @@ withChannel :: MonadAsync m =>
withChannel modifier input evaluator =
let f chan stream = K.fromStream $ evaluator chan (K.toStream stream)
in K.toStream $ withChannelK modifier (K.fromStream input) f
-------------------------------------------------------------------------------
-- Evaluator
-------------------------------------------------------------------------------
-- | @concatMapHeadK consumeTail mapHead stream@, maps a stream generation
-- function on the head element and performs a side effect on the tail.
--
-- Used for concurrent evaluation of streams using a Channel. A worker
-- evaluating the stream would queue the tail and go on to evaluate the head.
-- The tail is picked up by another worker which does the same.
{-# INLINE concatMapHeadK #-}
concatMapHeadK :: Monad m =>
(K.StreamK m a -> m ()) -- ^ Queue the tail
-> (a -> K.StreamK m b) -- ^ Generate a stream from the head
-> K.StreamK m a
-> K.StreamK m b
concatMapHeadK consumeTail mapHead stream =
K.mkStream $ \st yld sng stp -> do
let foldShared = K.foldStreamShared st yld sng stp
single a = foldShared $ mapHead a
yieldk a r = consumeTail r >> single a
in K.foldStreamShared (adaptState st) yieldk single stp stream
-------------------------------------------------------------------------------
-- concat streams
-------------------------------------------------------------------------------
-- | 'mkEnqueue chan f returns a queuing function @enq@. @enq@ takes a
-- @stream@ and enqueues @f enq stream@ on the channel. One example of @f@ is
-- 'concatMapHeadK'. When the enqueued value with 'concatMapHeadK' as @f@ is
-- evaluated, it generates an output stream from the head and enqueues @f enq
-- tail@ on the channel. Thus whenever the enqueued stream is evaluated it
-- generates a stream from the head and queues the tail on the channel.
--
-- Note that @enq@ and runner are mutually recursive, mkEnqueue ties the
-- knot between the two.
--
{-# INLINE mkEnqueue #-}
mkEnqueue :: MonadAsync m =>
Channel m b
-- | @divider enq stream@
-> ((K.StreamK m a -> m ()) -> K.StreamK m a -> K.StreamK m b)
-- | Queuing function @enq@
-> m (K.StreamK m a -> m ())
mkEnqueue chan runner = do
runInIO <- askRunInIO
return
$ let f stream = do
-- When using parConcatMap with lazy dispatch we enqueue the
-- outer stream tail and then map a stream generator on the
-- head, which is also queued. If we pick both head and tail
-- with equal priority we may keep blowing up the tail into
-- more and more streams. To avoid that we give preference to
-- the inner streams when picking up for execution. This
-- requires two work queues, one for outer stream and one for
-- inner. Here we enqueue the outer loop stream.
liftIO $ enqueue chan False (runInIO, runner f stream)
-- XXX In case of eager dispatch we can just directly dispatch
-- a worker with the tail stream here rather than first queuing
-- and then dispatching a worker which dequeues the work. The
-- older implementation did a direct dispatch here and its perf
-- characterstics looked much better.
eagerDispatch chan
in f
{-# INLINE parConcatMapChanKAll #-}
parConcatMapChanKAll :: MonadAsync m =>
Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanKAll chan f stream =
let run q = concatMapHeadK q f
in K.concatMapEffect (`run` stream) (mkEnqueue chan run)
-- K.parConcatMap (_appendWithChanK chan) f stream
{-# INLINE parConcatMapChanKAny #-}
parConcatMapChanKAny :: MonadAsync m =>
Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanKAny chan f stream =
let done = K.nilM (shutdown chan)
run q = concatMapHeadK q (\x -> K.append (f x) done)
in K.concatMapEffect (`run` stream) (mkEnqueue chan run)
{-# INLINE parConcatMapChanKFirst #-}
parConcatMapChanKFirst :: MonadAsync m =>
Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanKFirst chan f stream =
let done = K.nilM (shutdown chan)
run q = concatMapHeadK q f
in K.concatEffect $ do
res <- K.uncons stream
case res of
Nothing -> return K.nil
Just (h, t) -> do
q <- mkEnqueue chan run
q t
return $ K.append (f h) done
-- | Make a concurrent stream evaluator from a stream, to be used in
-- 'withChannelK' or 'toChannelK'. Maps a stream generation function on each
-- element of the stream, the evaluation of the map on each element happens
-- concurrently. All the generated streams are merged together in the output of
-- the channel. The scheduling and termination behavior depends on the channel
-- settings.
--
-- Note that if you queue a stream on the channel using 'toChannelK', it will
-- be picked up by a worker and the worker would evaluate the entire stream
-- serially and emit the results on the channel. However, if you transform the
-- stream using 'parConcatMapChanK' and queue it on the channel, it
-- parallelizes the function map on each element of the stream. The simplest
-- example is @parConcatMapChanK id id@ which is equivalent to evaluating each
-- element of the stream concurrently.
--
-- A channel worker evaluating this function would enqueue the tail on the
-- channel's work queue and go on to evaluate the head generating an output
-- stream. The tail is picked up by another worker which does the same and so
-- on.
{-# INLINE chanConcatMapK #-}
chanConcatMapK :: MonadAsync m =>
(Config -> Config)
-> Channel m b
-> (a -> K.StreamK m b)
-> K.StreamK m a
-> K.StreamK m b
chanConcatMapK modifier chan f stream = do
let cfg = modifier defaultConfig
case getStopWhen cfg of
AllStop -> parConcatMapChanKAll chan f stream
FirstStops -> parConcatMapChanKFirst chan f stream
AnyStops -> parConcatMapChanKAny chan f stream

View File

@ -88,6 +88,8 @@ import Test.Inspection (inspect, hasNoTypeClassesExcept)
-- XXX Should be a Fold, singleton API could be called joinChannel, or the fold
-- can be called joinChannel.
-- XXX If we use toChannelK multiple times on a channel make sure the channel
-- does not go away before we use the subsequent ones.
-- | High level function to enqueue a work item on the channel. The fundamental
-- unit of work is a stream. Each stream enqueued on the channel is picked up
@ -100,13 +102,17 @@ import Test.Inspection (inspect, hasNoTypeClassesExcept)
-- be generated serially one after the other. Only two or more streams can be
-- run concurrently with each other.
--
-- See 'chanConcatMapK' for concurrent evaluation of each element of a stream.
-- Alternatively, you can wrap each element of the original stream into a
-- stream generating action and queue all those streams on the channel. Then
-- all of them would be evaluated concurrently. However, that would not be
-- streaming in nature, it would require buffering space for the entire
-- original stream. Prefer 'chanConcatMapK' for larger streams.
--
-- Items from each evaluated streams are queued to the same output queue of the
-- channel which can be read using 'fromChannelK'. 'toChannelK' can be called
-- multiple times to enqueue multiple streams on the channel.
--
-- The fundamental unit of work is a stream. If you want to run single actions
-- concurrently, wrap each action into a singleton stream and queue all those
-- streams on the channel.
{-# INLINE toChannelK #-}
toChannelK :: MonadRunInIO m => Channel m a -> K.StreamK m a -> m ()
toChannelK chan m = do

View File

@ -77,16 +77,15 @@ where
import Control.Concurrent (myThreadId, killThread)
import Control.Monad (void, when)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Streamly.Internal.Control.Concurrent (MonadAsync, askRunInIO)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Control.ForkLifted (forkManaged)
import Streamly.Internal.Data.Channel.Dispatcher (modifyThread)
import Streamly.Internal.Data.Channel.Worker (sendEvent)
import Streamly.Internal.Data.Stream (Stream, Step(..))
import Streamly.Internal.Data.Stream.Channel
( Channel(..), newChannel, fromChannel, toChannelK, withChannelK
, withChannel, shutdown
, withChannel, shutdown, chanConcatMapK
)
import Streamly.Internal.Data.SVar.Type (adaptState)
import qualified Streamly.Internal.Data.MutArray as Unboxed
import qualified Streamly.Internal.Data.Stream as Stream
@ -258,133 +257,6 @@ parTwo modifier stream1 stream2 =
$ appendWithK
modifier (Stream.toStreamK stream1) (Stream.toStreamK stream2)
-------------------------------------------------------------------------------
-- Evaluator
-------------------------------------------------------------------------------
-- | @concatMapHeadK consumeTail mapHead stream@, maps a stream generation
-- function on the head element and performs a side effect on the tail.
--
-- Used for concurrent evaluation of streams using a Channel. A worker
-- evaluating the stream would queue the tail and go on to evaluate the head.
-- The tail is picked up by another worker which does the same.
{-# INLINE concatMapHeadK #-}
concatMapHeadK :: Monad m =>
(K.StreamK m a -> m ()) -- ^ Queue the tail
-> (a -> K.StreamK m b) -- ^ Generate a stream from the head
-> K.StreamK m a
-> K.StreamK m b
concatMapHeadK consumeTail mapHead stream =
K.mkStream $ \st yld sng stp -> do
let foldShared = K.foldStreamShared st yld sng stp
single a = foldShared $ mapHead a
yieldk a r = consumeTail r >> single a
in K.foldStreamShared (adaptState st) yieldk single stp stream
-------------------------------------------------------------------------------
-- concat streams
-------------------------------------------------------------------------------
-- | 'mkEnqueue chan divider' returns a queuing function @enq@. @enq@ takes a
-- @stream@ and enqueues the stream returned by @divider enq stream@ on the
-- channel. Divider generates an output stream from the head and enqueues the
-- tail on the channel.
--
-- The returned function @enq@ basically queues two streams on the channel, the
-- first stream is a stream generated from the head element of the
-- input stream, the second stream is a lazy action which when evaluated would
-- recursively do the same thing again for the tail. If we keep on evaluating
-- the second stream, ultimately all the elements in the original stream
-- (@StreamK m a@) would be mapped to individual streams (@StreamK m b@) which
-- are individually queued on the channel.
--
-- Note that @enq@ and runner are mutually recursive, mkEnqueue ties the
-- knot between the two.
--
{-# INLINE mkEnqueue #-}
mkEnqueue :: MonadAsync m =>
Channel m b
-- | @divider enq stream@
-> ((K.StreamK m a -> m ()) -> K.StreamK m a -> K.StreamK m b)
-- | Queuing function @enq@
-> m (K.StreamK m a -> m ())
mkEnqueue chan runner = do
runInIO <- askRunInIO
return
$ let f stream = do
-- When using parConcatMap with lazy dispatch we enqueue the
-- outer stream tail and then map a stream generator on the
-- head, which is also queued. If we pick both head and tail
-- with equal priority we may keep blowing up the tail into
-- more and more streams. To avoid that we give preference to
-- the inner streams when picking up for execution. This
-- requires two work queues, one for outer stream and one for
-- inner. Here we enqueue the outer loop stream.
liftIO $ enqueue chan False (runInIO, runner f stream)
-- XXX In case of eager dispatch we can just directly dispatch
-- a worker with the tail stream here rather than first queuing
-- and then dispatching a worker which dequeues the work. The
-- older implementation did a direct dispatch here and its perf
-- characterstics looked much better.
eagerDispatch chan
in f
-- | Takes the head element of the input stream and queues the tail of the
-- stream to the channel, then maps the supplied function on the head and
-- evaluates the resulting stream.
--
-- This function is designed to be used by worker threads on a channel to
-- concurrently map and evaluate a stream.
{-# INLINE parConcatMapChanK #-}
parConcatMapChanK :: MonadAsync m =>
Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanK chan f stream =
let run q = concatMapHeadK q f
in K.concatMapEffect (`run` stream) (mkEnqueue chan run)
-- K.parConcatMap (_appendWithChanK chan) f stream
{-# INLINE parConcatMapChanKAny #-}
parConcatMapChanKAny :: MonadAsync m =>
Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanKAny chan f stream =
let done = K.nilM (shutdown chan)
run q = concatMapHeadK q (\x -> K.append (f x) done)
in K.concatMapEffect (`run` stream) (mkEnqueue chan run)
{-# INLINE parConcatMapChanKFirst #-}
parConcatMapChanKFirst :: MonadAsync m =>
Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanKFirst chan f stream =
let done = K.nilM (shutdown chan)
run q = concatMapHeadK q f
in K.concatEffect $ do
res <- K.uncons stream
case res of
Nothing -> return K.nil
Just (h, t) -> do
q <- mkEnqueue chan run
q t
return $ K.append (f h) done
-- XXX Move this to the Channel module as an evaluator. Rename to
-- parConcatMapChanK or just parConcatMapK.
-- XXX If we use toChannelK multiple times on a channel make sure the channel
-- does not go away before we use the subsequent ones.
{-# INLINE parConcatMapChanKGeneric #-}
parConcatMapChanKGeneric :: MonadAsync m =>
(Config -> Config)
-> Channel m b
-> (a -> K.StreamK m b)
-> K.StreamK m a
-> K.StreamK m b
parConcatMapChanKGeneric modifier chan f stream = do
let cfg = modifier defaultConfig
case getStopWhen cfg of
AllStop -> parConcatMapChanK chan f stream
FirstStops -> parConcatMapChanKFirst chan f stream
AnyStops -> parConcatMapChanKAny chan f stream
-- XXX Add a deep evaluation variant that evaluates individual elements in the
-- generated streams in parallel.
@ -395,7 +267,7 @@ parConcatMapChanKGeneric modifier chan f stream = do
parConcatMapK :: MonadAsync m =>
(Config -> Config) -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapK modifier f input =
let g = parConcatMapChanKGeneric modifier
let g = chanConcatMapK modifier
in withChannelK modifier input (`g` f)
-- | Map each element of the input to a stream and then concurrently evaluate
@ -673,12 +545,10 @@ parConcatIterate modifier f input =
where
iterateStream channel =
parConcatMapChanKGeneric modifier channel (generate channel)
iterateStream chan = chanConcatMapK modifier chan (generate chan)
generate channel x =
-- XXX The channel q should be FIFO for DFS, otherwise it is BFS
x `K.cons` iterateStream channel (Stream.toStreamK $ f x)
-- XXX The channel q should be FIFO for DFS, otherwise it is BFS
generate chan x = x `K.cons` iterateStream chan (Stream.toStreamK $ f x)
-------------------------------------------------------------------------------
-- Generate