mirror of
https://github.com/composewell/streamly.git
synced 2024-11-10 12:47:22 +03:00
Update doc and rename APIs (SVars).
This commit is contained in:
parent
a1ab9be4d0
commit
f5807938f7
@ -32,10 +32,10 @@ module Asyncly.Core
|
||||
, EndOfStream (..)
|
||||
, newSVar1
|
||||
, newSVar2
|
||||
, joinSVar2
|
||||
, streamSVar
|
||||
|
||||
-- * Concurrent Streams
|
||||
, parallel
|
||||
, parAlt
|
||||
, parLeft
|
||||
)
|
||||
@ -92,19 +92,24 @@ data SVarTag = Conjunction | Disjunction deriving Eq
|
||||
data SVarSched = LIFO | FIFO deriving Eq
|
||||
|
||||
-- | Identify the type of the SVar. Two computations using the same style can
|
||||
-- be bunched on the same SVar.
|
||||
-- be scheduled on the same SVar.
|
||||
data SVarStyle = SVarStyle SVarTag SVarSched deriving Eq
|
||||
|
||||
-- | An SVar is a conduit to multiple streams running concurrently. It has an
|
||||
-- associated runqueue that holds the streams to be picked by a pool of worker
|
||||
-- threads. It has an associated output queue where the output stream elements
|
||||
-- are placed by the worker threads. A doorBell is used by the worker threads
|
||||
-- to intimate the consumer thread about availability of new results in the
|
||||
-- output queue.
|
||||
-- | An SVar (A Stream Var or an Sched Var) is a conduit to multiple streams
|
||||
-- running concurrently. It has an associated runqueue that holds the streams
|
||||
-- to be picked and run by a pool of worker threads. It has an associated
|
||||
-- output queue where the output stream elements are placed by the worker
|
||||
-- threads. A doorBell is used by the worker threads to intimate the consumer
|
||||
-- thread about availability of new results in the output queue. More workers
|
||||
-- are added to the SVar by 'streamSVar' on demand if the output produced is
|
||||
-- not keeping pace with the consumer. On bounded SVars, workers block on the
|
||||
-- output queue to provide throttling when the consumer is not pulling fast
|
||||
-- enough. The number of workers may even get reduced depending on the
|
||||
-- consuming pace.
|
||||
--
|
||||
-- New work is enqueued either at the time of creation of the SVar or as a
|
||||
-- result of executing the parallel combinators i.e. '<|' and '<|>' by already
|
||||
-- enqueued work.
|
||||
-- result of executing the parallel combinators i.e. '<|' and '<|>' when the
|
||||
-- already enqueued computations get evaluated. See 'joinSVar2'.
|
||||
data SVar m a =
|
||||
SVar { outputQueue :: IORef [ChildEvent a]
|
||||
, doorBell :: MVar Bool -- wakeup mechanism for outQ
|
||||
@ -122,9 +127,9 @@ data SVar m a =
|
||||
-- TBD use a functor instead of the bare type a?
|
||||
|
||||
-- | Represents a monadic stream of values of type 'a' constructed using
|
||||
-- actions in monad 'm'. Streams can be composed sequentially or in parallel in
|
||||
-- product style compositions (monadic bind multiplies streams in a ListT
|
||||
-- fashion) and using sum style compositions like 'Semigroup', 'Monoid',
|
||||
-- actions in monad 'm'. Streams can be composed sequentially or in parallel;
|
||||
-- in product style compositions (monadic bind multiplies streams in a ListT
|
||||
-- fashion) or in sum style compositions like 'Semigroup', 'Monoid',
|
||||
-- 'Alternative' or variants of these.
|
||||
newtype Stream m a =
|
||||
Stream {
|
||||
@ -138,7 +143,7 @@ newtype Stream m a =
|
||||
-- | A monad that can perform asynchronous/concurrent IO operations.
|
||||
type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
|
||||
|
||||
-- | Yield a singleton value in a stream.
|
||||
-- | Yield a singleton value as a stream.
|
||||
yields :: a -> Stream m a
|
||||
yields a = Stream $ \_ _ yld -> yld a Nothing
|
||||
|
||||
@ -147,7 +152,7 @@ yields a = Stream $ \_ _ yld -> yld a Nothing
|
||||
------------------------------------------------------------------------------
|
||||
|
||||
-- | '<>' concatenates two streams sequentially i.e. the first stream is
|
||||
-- exhausted completely before yields any element from the second stream.
|
||||
-- exhausted completely before yielding any element from the second stream.
|
||||
instance Semigroup (Stream m a) where
|
||||
m1 <> m2 = go m1
|
||||
where
|
||||
@ -305,7 +310,7 @@ sendWorkerWait sv = do
|
||||
then (pushWorker sv) >> sendWorkerWait sv
|
||||
else void (liftIO $ takeMVar (doorBell sv))
|
||||
|
||||
-- | An 'async' stream has finished but is still being used.
|
||||
-- | A stream being pulled from 'SVar' has ended.
|
||||
data EndOfStream = EndOfStream deriving Show
|
||||
instance Exception EndOfStream
|
||||
|
||||
@ -457,27 +462,49 @@ newSVar2 style m1 m2 = do
|
||||
-- TBD for pure work (when we are not in the IO monad) we can divide it into
|
||||
-- just the number of CPUs.
|
||||
|
||||
{-# NOINLINE makeAsync #-}
|
||||
makeAsync :: MonadAsync m
|
||||
{-# NOINLINE withNewSVar2 #-}
|
||||
withNewSVar2 :: MonadAsync m
|
||||
=> SVarStyle -> Stream m a -> Stream m a -> Stream m a
|
||||
makeAsync style m1 m2 = Stream $ \_ stp yld -> do
|
||||
withNewSVar2 style m1 m2 = Stream $ \_ stp yld -> do
|
||||
sv <- newSVar2 style m1 m2
|
||||
(runStream (streamSVar sv)) Nothing stp yld
|
||||
|
||||
-- | Compose two streams in parallel using a scheduling policy specified by
|
||||
-- 'SVarStyle'. Note: This is designed to scale for right associated
|
||||
-- compositions, therefore always use a right fold for folding large or
|
||||
-- infinite structures. For left associated structures it will first
|
||||
-- destructure the whole structure and then start executing, consuming memory
|
||||
-- proportional to the size of the structure, just like a left fold.
|
||||
{-# INLINE parallel #-}
|
||||
parallel :: MonadAsync m => SVarStyle -> Stream m a -> Stream m a -> Stream m a
|
||||
parallel style m1 m2 = Stream $ \st stp yld -> do
|
||||
-- | Join two computations on the currently running 'SVar' queue for concurrent
|
||||
-- execution. The 'SVarStyle' required by the current composition context is
|
||||
-- passed as one of the parameters. If the style does not match with the style
|
||||
-- of the current 'SVar' we create a new 'SVar' and schedule the computations
|
||||
-- on that. The newly created SVar joins as one of the computations on the
|
||||
-- current SVar queue.
|
||||
--
|
||||
-- When we are using parallel composition, an SVar is passed around as a state
|
||||
-- variable. We try to schedule a new parallel computation on the SVar passed
|
||||
-- to us. The first time, when no SVar exists, a new SVar is created.
|
||||
-- Subsequently, 'joinSVar2' may get called when a computation already
|
||||
-- scheduled on the SVar is further evaluated. For example, when (a \<|> b) is
|
||||
-- evaluated it calls a 'joinSVar2' to put 'a' and 'b' on the current scheduler
|
||||
-- queue. However, if the scheduling and composition style of the new
|
||||
-- computation being scheduled is different than the style of the current SVar,
|
||||
-- then we create a new SVar and schedule it on that.
|
||||
--
|
||||
-- For example:
|
||||
--
|
||||
-- * (x \<|> y) \<|> (t \<|> u) -- all of them get scheduled on the same SVar
|
||||
-- * (x \<|> y) \<|> (t \<| u) -- @t@ and @u@ get scheduled on a new child SVar
|
||||
-- because of the scheduling policy change.
|
||||
-- * if we 'adapt' a stream of type 'AsyncT' to a stream of type
|
||||
-- 'ParallelT', we create a new SVar at the transitioning bind.
|
||||
-- * When the stream is switching from disjunctive composition to conjunctive
|
||||
-- composition and vice-versa we create a new SVar to isolate the scheduling
|
||||
-- of the two.
|
||||
--
|
||||
{-# INLINE joinSVar2 #-}
|
||||
joinSVar2 :: MonadAsync m
|
||||
=> SVarStyle -> Stream m a -> Stream m a -> Stream m a
|
||||
joinSVar2 style m1 m2 = Stream $ \st stp yld -> do
|
||||
case st of
|
||||
Nothing -> (runStream (makeAsync style m1 m2)) Nothing stp yld
|
||||
Just sv | svarStyle sv /= style ->
|
||||
(runStream (makeAsync style m1 m2)) Nothing stp yld
|
||||
Just sv -> liftIO ((enqueue sv) m2) >> (runStream m1) st stp yld
|
||||
Just sv | svarStyle sv == style ->
|
||||
liftIO ((enqueue sv) m2) >> (runStream m1) st stp yld
|
||||
_ -> (runStream (withNewSVar2 style m1 m2)) Nothing stp yld
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- Semigroup and Monoid style compositions for parallel actions
|
||||
@ -495,15 +522,20 @@ parAhead = undefined
|
||||
(<>|) = parAhead
|
||||
-}
|
||||
|
||||
-- | Same as '<|>'.
|
||||
-- | Same as '<|>'. Since this schedules all the composed streams fairly you
|
||||
-- cannot fold infinite number of streams using this operation.
|
||||
{-# INLINE parAlt #-}
|
||||
parAlt :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
|
||||
parAlt = parallel (SVarStyle Disjunction FIFO)
|
||||
parAlt = joinSVar2 (SVarStyle Disjunction FIFO)
|
||||
|
||||
-- | Same as '<|'.
|
||||
-- | Same as '<|'. Since this schedules the left side computation first you can
|
||||
-- right fold an infinite container using this operator. However a left fold
|
||||
-- will not work well as it first unpeels the whole structure before scheduling
|
||||
-- a computation requiring an amount of memory proportional to the size of the
|
||||
-- structure.
|
||||
{-# INLINE parLeft #-}
|
||||
parLeft :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
|
||||
parLeft = parallel (SVarStyle Disjunction LIFO)
|
||||
parLeft = joinSVar2 (SVarStyle Disjunction LIFO)
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
-- Instances (only used for deriving newtype instances)
|
||||
|
@ -372,13 +372,13 @@ parbind
|
||||
-> Stream m a
|
||||
-> (a -> Stream m b)
|
||||
-> Stream m b
|
||||
parbind k m f = go m
|
||||
parbind par m f = go m
|
||||
where
|
||||
go (Stream g) =
|
||||
Stream $ \ctx stp yld ->
|
||||
let run x = (runStream x) ctx stp yld
|
||||
yield a Nothing = run $ f a
|
||||
yield a (Just r) = run $ f a `k` (go r)
|
||||
yield a (Just r) = run $ f a `par` (go r)
|
||||
in g Nothing stp yield
|
||||
|
||||
-- | Execute a monadic action for each element in the stream, running
|
||||
@ -388,7 +388,7 @@ instance MonadAsync m => Monad (AsyncT m) where
|
||||
return = pure
|
||||
(AsyncT m) >>= f = AsyncT $ parbind par m g
|
||||
where g x = getAsyncT (f x)
|
||||
par = parallel (SVarStyle Conjunction LIFO)
|
||||
par = joinSVar2 (SVarStyle Conjunction LIFO)
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- Applicative
|
||||
@ -484,7 +484,7 @@ instance MonadAsync m => Monad (ParallelT m) where
|
||||
return = pure
|
||||
(ParallelT m) >>= f = ParallelT $ parbind par m g
|
||||
where g x = getParallelT (f x)
|
||||
par = parallel (SVarStyle Conjunction FIFO)
|
||||
par = joinSVar2 (SVarStyle Conjunction FIFO)
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- Applicative
|
||||
|
Loading…
Reference in New Issue
Block a user