mirror of
https://github.com/composewell/streamly.git
synced 2024-10-27 12:12:05 +03:00
781 lines
30 KiB
Haskell
781 lines
30 KiB
Haskell
{-# OPTIONS_GHC -fno-warn-unused-imports #-}
|
|
-- |
|
|
-- Module : ConcurrentStreams
|
|
-- Copyright : (c) 2017 Composewell Technologies
|
|
--
|
|
-- License : BSD3
|
|
-- Maintainer : streamly@composewell.com
|
|
--
|
|
-- In this tutorial we will show how streamly can be used for idiomatic and
|
|
-- declarative concurrent programming. Before you go through this tutorial we
|
|
-- recommend that you take a look at the Streamly serial streams tutorial.
|
|
|
|
module ConcurrentStreams
|
|
(
|
|
-- * Concurrent Streams
|
|
-- $concurrentStreams
|
|
|
|
-- * Combining Streams
|
|
-- $flavors
|
|
|
|
-- * Imports and Supporting Code
|
|
-- $imports
|
|
|
|
-- * Generating Streams Concurrently
|
|
-- $generatingConcurrently
|
|
|
|
-- * Concurrent Pipeline Stages
|
|
-- $concurrentApplication
|
|
|
|
-- * Mapping Concurrently
|
|
-- $concurrentTransformation
|
|
|
|
-- * Merging Streams
|
|
|
|
-- ** Semigroup Style
|
|
|
|
-- *** Deep Speculative Composition ('Ahead')
|
|
-- $ahead
|
|
|
|
-- *** Deep Asynchronous Composition ('Async')
|
|
-- $async
|
|
|
|
-- *** Wide Asynchronous Composition ('WAsync')
|
|
-- $wasync
|
|
|
|
-- *** Parallel Asynchronous Composition ('Parallel')
|
|
-- $parallel
|
|
|
|
-- XXX we should deprecate and remove the mkAsync API
|
|
-- Custom composition
|
|
-- custom
|
|
|
|
-- ** Monoid Style
|
|
-- $monoid
|
|
|
|
-- * Nesting Streams
|
|
|
|
-- ** Monad
|
|
-- *** Deep Speculative Nesting ('Ahead')
|
|
-- $aheadNesting
|
|
|
|
-- *** Deep Asynchronous Nesting ('Async')
|
|
-- $concurrentNesting
|
|
|
|
-- *** Wide Asynchronous Nesting ('WAsync')
|
|
-- $wasyncNesting
|
|
|
|
-- *** Parallel Asynchronous Nesting ('Parallel')
|
|
-- $parallelNesting
|
|
|
|
-- ** Applicative
|
|
-- $applicative
|
|
|
|
-- * Zipping Streams
|
|
|
|
-- ** Parallel Zipping
|
|
-- $parallelzip
|
|
|
|
-- * Concurrent Programming
|
|
-- $concurrent
|
|
|
|
-- * Writing Concurrent Programs
|
|
-- $programs
|
|
|
|
-- * Where to go next?
|
|
-- $furtherReading
|
|
)
|
|
where
|
|
|
|
import Streamly.Prelude
|
|
import Data.Semigroup
|
|
import Control.Applicative
|
|
import Control.Monad
|
|
import Control.Monad.IO.Class (MonadIO(..))
|
|
import Control.Monad.Trans.Class (MonadTrans (lift))
|
|
|
|
-- CAUTION: please keep setup and imports sections in sync
|
|
|
|
-- $setup
|
|
-- >>> :m
|
|
-- >>> import Data.Function ((&))
|
|
-- >>> import Streamly.Prelude ((|:), (|&))
|
|
-- >>> import qualified Streamly.Prelude as Stream
|
|
-- >>> import qualified Streamly.Data.Fold as Fold
|
|
--
|
|
-- >>> import Control.Concurrent (threadDelay, myThreadId)
|
|
-- >>> :{
|
|
-- delay n = Stream.fromEffect $ do
|
|
-- threadDelay (n * 1000000)
|
|
-- tid <- myThreadId
|
|
-- putStrLn (show tid ++ ": Delay " ++ show n)
|
|
-- :}
|
|
--
|
|
-- >>> import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering))
|
|
-- >>> hSetBuffering stdout LineBuffering
|
|
--
|
|
|
|
-- $imports
|
|
--
|
|
-- In most of the example snippets we do not repeat the imports. Where imports are
|
|
-- not explicitly specified use the imports shown below.
|
|
--
|
|
-- >>> :m
|
|
-- >>> import Data.Function ((&))
|
|
-- >>> import Streamly.Prelude ((|:), (|&))
|
|
-- >>> import qualified Streamly.Prelude as Stream
|
|
-- >>> import qualified Streamly.Data.Fold as Fold
|
|
--
|
|
-- To illustrate concurrent vs serial composition aspects, we will use the
|
|
-- following @delay@ function to introduce a sleep or delay specified in
|
|
-- seconds. After the delay it prints the number of seconds it slept.
|
|
--
|
|
-- >>> import Control.Concurrent (threadDelay, myThreadId)
|
|
-- >>> :{
|
|
-- delay n = Stream.fromEffect $ do
|
|
-- threadDelay (n * 1000000)
|
|
-- tid <- myThreadId
|
|
-- putStrLn (show tid ++ ": Delay " ++ show n)
|
|
-- :}
|
|
--
|
|
-- For concurrent examples, use line buffering, otherwise output from different
|
|
-- threads may get mixed:
|
|
--
|
|
-- >>> import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering))
|
|
-- >>> hSetBuffering stdout LineBuffering
|
|
--
|
|
|
|
-- $concurrentStreams
|
|
--
|
|
-- Many stream operations can be done concurrently:
|
|
--
|
|
-- * Streams can be generated concurrently.
|
|
--
|
|
-- * Streams can be merged concurrently.
|
|
--
|
|
-- * Multiple stages in a streaming pipeline can run concurrently.
|
|
--
|
|
-- * Streams can be mapped and zipped concurrently.
|
|
--
|
|
-- * In monadic composition they combine like a list transformer,
|
|
-- providing concurrent non-determinism.
|
|
--
|
|
-- There are three basic concurrent stream styles, 'Ahead', 'Async', and
|
|
-- 'Parallel'. The 'Ahead' style streams are similar to 'Serial' except that
|
|
-- they can speculatively execute multiple stream actions concurrently in
|
|
-- advance. 'Ahead' would return exactly the same stream as 'Serial' except
|
|
-- that it may execute the actions concurrently. The 'Async' style streams,
|
|
-- like 'Ahead', speculatively execute multiple stream actions in advance but
|
|
-- return the results in their finishing order rather than in the stream
|
|
-- traversal order. 'Parallel' is like 'Async' except that it provides
|
|
-- unbounded parallelism instead of controlled parallelism.
|
|
--
|
|
-- For easy reference, we can classify the stream types based on /execution order/,
|
|
-- /consumption order/, and /bounded or unbounded/ concurrency.
|
|
-- Execution could be serial (i.e. synchronous) or asynchronous. In serial
|
|
-- execution we execute the next action in the stream only after the previous
|
|
-- one has finished executing. In asynchronous execution multiple actions in
|
|
-- the stream can be executed asynchronously i.e. the next action can start
|
|
-- executing even before the first one has finished. Consumption order
|
|
-- determines the order in which the outputs generated by the composition are
|
|
-- consumed. Consumption could be serial or asynchronous. In serial
|
|
-- consumption, the outputs are consumed in the traversal order; in
|
|
-- asynchronous consumption the outputs are consumed as they arrive i.e. first
|
|
-- come first serve order.
|
|
--
|
|
-- +------------+--------------+--------------+--------------+
|
|
-- | Type | Execution | Consumption | Concurrency |
|
|
-- +============+==============+==============+==============+
|
|
-- | 'Serial' | Serial | Serial | None |
|
|
-- +------------+--------------+--------------+--------------+
|
|
-- | 'Ahead' | Asynchronous | Serial | bounded |
|
|
-- +------------+--------------+--------------+--------------+
|
|
-- | 'Async' | Asynchronous | Asynchronous | bounded |
|
|
-- +------------+--------------+--------------+--------------+
|
|
-- | 'Parallel' | Asynchronous | Asynchronous | unbounded |
|
|
-- +------------+--------------+--------------+--------------+
|
|
--
|
|
-- All these types can be freely inter-converted using type conversion
|
|
-- combinators or type annotations, without any cost, to achieve the desired
|
|
-- composition style. To force a particular type of composition, we coerce the
|
|
-- stream type using the corresponding type adapting combinator from
|
|
-- 'fromSerial', 'fromAhead', 'fromAsync', or 'fromParallel'. The default stream type
|
|
-- is inferred as 'Serial' unless you change it by using one of the combinators
|
|
-- or by using a type annotation.
|
|
|
|
-- $flavors
|
|
--
|
|
-- Streams can be combined using '<>' or 'mappend' to form a
|
|
-- composite. Composite streams can be interpreted in a depth first or
|
|
-- breadth first manner using an appropriate type conversion before
|
|
-- consumption. Deep (e.g. 'Serial') stream type variants traverse a
|
|
-- composite stream in a depth first manner, such that each stream is
|
|
-- traversed fully before traversing the next stream. Wide
|
|
-- (e.g. 'WSerial') stream types traverse it in a breadth first
|
|
-- manner, such that one element from each stream is traversed before
|
|
-- coming back to the first stream again.
|
|
--
|
|
-- Each stream type has a wide traversal variant prefixed by 'W'. The wide
|
|
-- variant differs only in the Semigroup\/Monoid, Applicative\/Monad
|
|
-- compositions of the streams.
|
|
-- The following table summarizes the basic types and the corresponding wide
|
|
-- variants:
|
|
--
|
|
-- @
|
|
-- +------------+-----------+
|
|
-- | Deep | Wide |
|
|
-- +============+===========+
|
|
-- | 'Serial' | 'WSerial' |
|
|
-- +------------+-----------+
|
|
-- | 'Ahead' | 'WAhead' |
|
|
-- +------------+-----------+
|
|
-- | 'Async' | 'WAsync' |
|
|
-- +------------+-----------+
|
|
-- @
|
|
--
|
|
-- Other than these types there are also 'ZipSerial' and 'ZipAsync' types that
|
|
-- zip streams serially or concurrently using 'Applicative' operation. These
|
|
-- types are not monads; they are only applicatives, and they do not differ in
|
|
-- 'Semigroup' composition.
|
|
--
|
|
|
|
-- $programs
|
|
--
|
|
-- When writing concurrent programs it is advised to not use the concurrent
|
|
-- style stream combinators blindly at the top level. That might create too
|
|
-- much concurrency where it is not even required, and can even degrade
|
|
-- performance in some cases. In some cases it can also lead to surprising
|
|
-- behavior because of some code that is supposed to be serial becoming
|
|
-- concurrent. Please be aware that all concurrency capable APIs that you may
|
|
-- have used under the scope of a concurrent stream combinator will become
|
|
-- concurrent. For example if you have a 'repeatM' somewhere in your program
|
|
-- and you use 'fromParallel' on top, the 'repeatM' becomes fully parallel,
|
|
-- resulting in an infinite parallel execution. Instead, use the
|
|
-- /Keep It Serial and Stupid/ principle, start with the default serial
|
|
-- composition and enable concurrent combinators only when and where necessary.
|
|
-- When you use a concurrent combinator you can use an explicit 'fromSerial'
|
|
-- combinator to suppress any unnecessary concurrency under the scope of that
|
|
-- combinator.
|
|
|
|
-- $generatingConcurrently
|
|
--
|
|
-- Monadic construction and generation functions like 'consM', 'unfoldrM',
|
|
-- 'replicateM', 'repeatM', 'iterateM' and 'fromFoldableM' work concurrently
|
|
-- when used with an appropriate stream type combinator. The pure versions of
|
|
-- these APIs are not concurrent, however you can use the monadic versions even
|
|
-- for pure computations by wrapping the pure value in a monad to get the
|
|
-- concurrent generation capability where required.
|
|
--
|
|
-- The following code finishes in 3 seconds (6 seconds when serial):
|
|
--
|
|
-- >>> let p n = threadDelay (n * 1000000) >> return n
|
|
-- >>> Stream.toList $ Stream.fromParallel $ p 3 |: p 2 |: p 1 |: Stream.nil
|
|
-- [1,2,3]
|
|
--
|
|
-- >>> Stream.toList $ Stream.fromAhead $ p 3 |: p 2 |: p 1 |: Stream.nil
|
|
-- [3,2,1]
|
|
--
|
|
-- The following finishes in 10 seconds (100 seconds when serial):
|
|
--
|
|
-- >>> Stream.drain $ Stream.fromAsync $ Stream.replicateM 10 $ p 10
|
|
--
|
|
|
|
-- $concurrentTransformation
|
|
--
|
|
-- Monadic transformation functions 'mapM' and 'sequence' work concurrently
|
|
-- when used with appropriate stream type combinators. The pure versions do not
|
|
-- work concurrently, however you can use the monadic versions even for pure
|
|
-- computations to get the concurrent transformation capability where required.
|
|
--
|
|
-- This would print a value every second (2 seconds when serial):
|
|
--
|
|
-- >>> let p n = threadDelay (n * 1000000) >> return n
|
|
-- >>> :{
|
|
-- parMap =
|
|
-- Stream.repeatM (p 1)
|
|
-- & Stream.fromSerial -- repeatM is serial
|
|
-- & Stream.mapM (\x -> p 1 >> print x)
|
|
-- & Stream.fromAhead -- mapM is concurrent using Ahead style
|
|
-- & Stream.drain
|
|
-- :}
|
|
--
|
|
|
|
-- $concurrentApplication
|
|
--
|
|
-- The concurrent function application operators '|$' and '|&' apply a stream
|
|
-- argument to a stream function concurrently to compose a concurrent pipeline
|
|
-- of stream processing functions.
|
|
--
|
|
-- Because both the stages run concurrently, we would see a delay of only 1
|
|
-- second instead of 2 seconds in the following:
|
|
--
|
|
-- >>> let p n = threadDelay (n * 1000000) >> return n
|
|
-- >>> :{
|
|
-- parApp =
|
|
-- Stream.repeatM (p 1)
|
|
-- |& Stream.mapM (\x -> p 1 >> print x)
|
|
-- & Stream.drain
|
|
-- :}
|
|
|
|
-- $ahead
|
|
--
|
|
-- The 'Semigroup' operation '<>' of the 'Ahead' type combines two streams in a
|
|
-- /serial depth first/ manner with concurrent lookahead. We use the 'fromAhead'
|
|
-- type combinator to effect 'Ahead' style of composition. We can also use an
|
|
-- explicit 'Ahead' type annotation for the stream to achieve the same effect.
|
|
--
|
|
-- When two streams are combined in this manner, the streams are traversed in
|
|
-- depth first manner just like 'Serial', however it can execute the next
|
|
-- stream concurrently and keep the results ready when its turn arrives.
|
|
-- Concurrent execution of the next stream(s) is performed if the first stream
|
|
-- blocks or if it cannot produce output at the rate that is enough to meet the
|
|
-- consumer demand. Multiple streams can be executed concurrently to meet the
|
|
-- demand. The following example would print the result in a second even
|
|
-- though each action in each stream takes one second:
|
|
--
|
|
-- >>> p n = threadDelay 1000000 >> return n
|
|
-- >>> stream1 = p 1 |: p 2 |: Stream.nil
|
|
-- >>> stream2 = p 3 |: p 4 |: Stream.nil
|
|
-- >>> Stream.toList $ Stream.fromAhead $ stream1 <> stream2
|
|
-- [1,2,3,4]
|
|
--
|
|
-- Each stream is constructed 'fromAhead' and then both the streams are merged
|
|
-- 'fromAhead', therefore, all the actions can run concurrently but the result is
|
|
-- presented in serial order.
|
|
--
|
|
-- You can also use the polymorphic combinator 'ahead' in place of '<>' to
|
|
-- compose any type of streams in this manner.
|
|
|
|
-- $async
|
|
--
|
|
-- The 'Semigroup' operation '<>' of the 'Async' type combines the two
|
|
-- streams in a depth first manner with parallel look ahead. We use the
|
|
-- 'fromAsync' type combinator to effect 'Async' style of composition. We
|
|
-- can also use the 'Async' type annotation for the stream type to achieve
|
|
-- the same effect.
|
|
--
|
|
-- When two streams with multiple elements are combined in this manner, the
|
|
-- streams are traversed in depth first manner just like 'Serial', however it
|
|
-- can execute the next stream concurrently and return the results from it
|
|
-- as they arrive i.e. the results from the next stream may be yielded even
|
|
-- before the results from the first stream. Concurrent execution of the next
|
|
-- stream(s) is performed if the first stream blocks or if it cannot produce
|
|
-- output at the rate that is enough to meet the consumer demand. Multiple
|
|
-- streams can be executed concurrently to meet the demand.
|
|
-- In the example below each element in the stream introduces a constant delay
|
|
-- of 1 second, however, it takes just one second to produce all the results.
|
|
-- The results are not guaranteed to be in any particular order:
|
|
--
|
|
-- >>> p n = threadDelay 1000000 >> return n
|
|
-- >>> stream1 = p 1 |: p 2 |: Stream.nil
|
|
-- >>> stream2 = p 3 |: p 4 |: Stream.nil
|
|
-- >>> Stream.toList $ Stream.fromAsync $ stream1 <> stream2
|
|
-- ...
|
|
--
|
|
-- Here both the constituent streams and their composition are of the 'Async'
|
|
-- type. We can instead compose the constituent streams to run serially; in that
|
|
-- case it would take 2 seconds to produce all the results.
|
|
-- The elements in the serial streams would be in serial order in the results:
|
|
--
|
|
-- >>> p n = threadDelay 1000000 >> return n
|
|
-- >>> stream = (Stream.fromSerial stream1) <> (Stream.fromSerial stream2)
|
|
-- >>> Stream.toList $ Stream.fromAsync stream
|
|
-- ...
|
|
--
|
|
-- In the following example we can see that new threads are started when a
|
|
-- computation blocks. Notice that the output from the stream with the
|
|
-- shortest delay is printed first. The whole computation takes @maximum of
|
|
-- (3, 2, 1) = 3@ seconds:
|
|
--
|
|
-- >>> Stream.drain $ Stream.fromAsync $ delay 3 <> delay 2 <> delay 1
|
|
-- ThreadId ...: Delay 1
|
|
-- ThreadId ...: Delay 2
|
|
-- ThreadId ...: Delay 3
|
|
--
|
|
-- When we have a tree of computations composed using this style, the tree is
|
|
-- traversed in DFS style just like the 'Serial' style, the only difference is
|
|
-- that here we can move on to executing the next stream if a stream blocks.
|
|
-- However, we will not start new threads if we have sufficient output to
|
|
-- saturate the consumer. This is why we call it left-biased demand driven or
|
|
-- adaptive concurrency style, the concurrency tends to stay on the left side
|
|
-- of the composition as long as possible. More threads are started based on
|
|
-- the pull rate of the consumer. The following example prints an output every
|
|
-- second as all of the actions are concurrent.
|
|
--
|
|
-- >>> Stream.drain $ Stream.fromAsync $ (delay 1 <> delay 2) <> (delay 3 <> delay 4)
|
|
-- ThreadId ...: Delay 1
|
|
-- ThreadId ...: Delay 2
|
|
-- ThreadId ...: Delay 3
|
|
-- ThreadId ...: Delay 4
|
|
--
|
|
-- All the computations may even run in a single thread when more threads are
|
|
-- not needed. For example, when @main@ below is run, the computations are
|
|
-- run in a single thread one after another, because none of them blocks.
|
|
-- However, if the thread consuming the stream were faster than the producer
|
|
-- then it would have started parallel threads for each computation to keep up
|
|
-- even if none of them blocks:
|
|
--
|
|
-- >>> :{
|
|
-- traced m = Stream.fromEffect (myThreadId >>= print) >> return m
|
|
-- stream = traced (sqrt 9) <> traced (sqrt 16) <> traced (sqrt 25)
|
|
-- main = Stream.drain $ Stream.fromAsync stream
|
|
-- :}
|
|
--
|
|
-- Note that the order of printing in the above examples may change due to
|
|
-- variations in scheduling latencies for concurrent threads.
|
|
--
|
|
-- The polymorphic version of the 'Async' binary operation '<>' is called
|
|
-- 'async'. We can use 'async' to join streams in a left biased
|
|
-- adaptively concurrent manner irrespective of the type, notice that we have
|
|
-- not used the 'fromAsync' combinator in the following example:
|
|
--
|
|
-- >>> Stream.drain $ delay 3 `Stream.async` delay 2 `Stream.async` delay 1
|
|
-- ThreadId ...: Delay 1
|
|
-- ThreadId ...: Delay 2
|
|
-- ThreadId ...: Delay 3
|
|
--
|
|
-- Since the concurrency provided by this operator is demand driven it cannot
|
|
-- be used when the composed computations start timers that are relative to
|
|
-- each other because all computations may not be started at the same time and
|
|
-- therefore timers in all of them may not start at the same time. When
|
|
-- relative timing among all computations is important or when we need to start
|
|
-- all computations at once for any reason 'Parallel' style must be used
|
|
-- instead.
|
|
--
|
|
-- 'Async' style utilizes resources optimally and should be preferred over
|
|
-- 'Parallel' or 'WAsync' unless you really need those. 'Async' should be used
|
|
-- when we know that the computations can run in parallel but we do not care if
|
|
-- they actually run in parallel or not, that decision can be left to the
|
|
-- scheduler based on demand. Also, note that the 'async' operator can be used to fold
|
|
-- an infinite number of streams, in contrast to the 'Parallel' or 'WAsync' styles,
|
|
-- because it does not require us to run all of them at the same time in a fair
|
|
-- manner.
|
|
|
|
-- $wasync
|
|
--
|
|
-- The 'Semigroup' operation '<>' of the 'WAsync' type combines two streams in
|
|
-- a concurrent manner using /breadth first traversal/. We use the 'fromWAsync'
|
|
-- type combinator to effect 'WAsync' style of composition. We can also use the
|
|
-- 'WAsync' type annotation for the stream to achieve the same effect.
|
|
--
|
|
-- When streams with multiple elements are combined in this manner, we traverse
|
|
-- all the streams concurrently in a breadth first manner i.e. one action from
|
|
-- each stream is performed and yielded to the resulting stream before we come
|
|
-- back to the first stream again and so on. Even though we execute the actions
|
|
-- in a breadth first order the outputs are consumed on a first come first
|
|
-- serve basis.
|
|
--
|
|
-- In the following example we can see that outputs are produced in the breadth
|
|
-- first traversal order but this is not guaranteed.
|
|
--
|
|
-- >>> stream1 = print 1 |: print 2 |: Stream.nil
|
|
-- >>> stream2 = print 3 |: print 4 |: Stream.nil
|
|
-- >>> Stream.drain $ Stream.fromWAsync $ stream1 <> stream2
|
|
-- 1
|
|
-- 3
|
|
-- 2
|
|
-- 4
|
|
--
|
|
-- The polymorphic version of the binary operation '<>' of the 'WAsync' type is
|
|
-- 'wAsync'. We can use 'wAsync' to join streams using a breadth first
|
|
-- concurrent traversal irrespective of the type, notice that we have not used
|
|
-- the 'fromWAsync' combinator in the following example:
|
|
--
|
|
-- >>> Stream.drain $ delay 3 `Stream.wAsync` delay 2 `Stream.wAsync` delay 1
|
|
-- ThreadId ...: Delay 1
|
|
-- ThreadId ...: Delay 2
|
|
-- ThreadId ...: Delay 3
|
|
--
|
|
-- Since the concurrency provided by this style is demand driven it may not
|
|
-- be used when the composed computations start timers that are relative to
|
|
-- each other because all computations may not be started at the same time and
|
|
-- therefore timers in all of them may not start at the same time. When
|
|
-- relative timing among all computations is important or when we need to start
|
|
-- all computations at once for any reason 'Parallel' style must be used
|
|
-- instead.
|
|
--
|
|
|
|
-- $parallel
|
|
--
|
|
-- The 'Semigroup' operation '<>' of the 'Parallel' type combines the two
|
|
-- streams in a fairly concurrent manner with round robin scheduling. We use
|
|
-- the 'fromParallel' type combinator to effect 'Parallel' style of composition.
|
|
-- We can also use the 'Parallel' type annotation for the stream type to
|
|
-- achieve the same effect.
|
|
--
|
|
-- When two streams with multiple elements are combined in this manner, the
|
|
-- monadic actions in both the streams are performed concurrently with a fair
|
|
-- round robin scheduling. The outputs are yielded in the order in which the
|
|
-- actions complete. This is pretty similar to the 'WAsync' type, the
|
|
-- difference is that 'WAsync' is adaptive to the consumer demand and may or
|
|
-- may not execute all actions in parallel depending on the demand, whereas
|
|
-- 'Parallel' runs all the streams in parallel irrespective of the demand.
|
|
--
|
|
-- The polymorphic version of the binary operation '<>' of the 'Parallel' type
|
|
-- is 'parallel'. We can use 'parallel' to join streams in a fairly concurrent
|
|
-- manner irrespective of the type, notice that we have not used the
|
|
-- 'fromParallel' combinator in the following example:
|
|
--
|
|
-- >>> Stream.drain $ delay 3 `Stream.parallel` delay 2 `Stream.parallel` delay 1
|
|
-- ThreadId ...: Delay 1
|
|
-- ThreadId ...: Delay 2
|
|
-- ThreadId ...: Delay 3
|
|
--
|
|
-- Note that this style of composition cannot be used to combine an infinite
|
|
-- number of streams, as it will lead to an infinite sized scheduling queue.
|
|
--
|
|
|
|
-- XXX to be removed
|
|
-- $custom
|
|
--
|
|
-- The 'mkAsync' API can be used to create references to asynchronously running
|
|
-- stream computations. We can then use 'uncons' to explore the streams
|
|
-- arbitrarily and then recompose individual elements to create a new stream.
|
|
-- This way we can dynamically decide which stream to explore at any given
|
|
-- time. Take an example of a merge sort of two sorted streams. We need to
|
|
-- keep consuming items from the stream which has the lowest item in the sort
|
|
-- order. This can be achieved using async references to streams. See
|
|
-- "MergeSort.hs" in <https://github.com/composewell/streamly-examples Streamly Examples>.
|
|
|
|
-- $monoid
|
|
--
|
|
-- All of the following are equivalent and start ten concurrent tasks each with
|
|
-- a delay from 1 to 10 seconds, resulting in the printing of each number every
|
|
-- second:
|
|
--
|
|
-- >>> :{
|
|
-- main = do
|
|
-- Stream.drain $ Stream.fromAsync $ foldMap delay [1..10]
|
|
-- Stream.drain $ Stream.concatFoldableWith Stream.async (map delay [1..10])
|
|
-- Stream.drain $ Stream.concatMapFoldableWith Stream.async delay [1..10]
|
|
-- Stream.drain $ Stream.concatForFoldableWith Stream.async [1..10] delay
|
|
-- :}
|
|
--
|
|
|
|
-- $aheadNesting
|
|
--
|
|
-- The 'Monad' composition of 'Ahead' type behaves just like 'Serial' except
|
|
-- that it can speculatively perform a bounded number of next iterations of a
|
|
-- loop concurrently.
|
|
--
|
|
-- >>> :{
|
|
-- Stream.toList $ Stream.fromAhead $ do
|
|
-- x <- Stream.fromFoldable [3,2,1]
|
|
-- delay x
|
|
-- return x
|
|
-- :}
|
|
-- ThreadId ...: Delay 1
|
|
-- ThreadId ...: Delay 2
|
|
-- ThreadId ...: Delay 3
|
|
-- [3,2,1]
|
|
--
|
|
-- This code finishes in 3 seconds, 'Serial' would take 6 seconds. As we can
|
|
-- see all the three iterations are concurrent and run in different threads,
|
|
-- however, the results are returned in the serial order.
|
|
--
|
|
-- Concurrency is demand driven, when multiple streams are composed using this
|
|
-- style, the iterations are executed in a depth first manner just like
|
|
-- 'Serial' i.e. nested iterations are executed before we proceed to the next
|
|
-- outer iteration. The only difference is that we may execute multiple future
|
|
-- iterations concurrently and keep the results ready.
|
|
--
|
|
-- The 'fromAhead' type combinator can be used to switch to this style of
|
|
-- composition. Alternatively, a type annotation can be used to specify the
|
|
-- type of the stream as 'Ahead'.
|
|
--
|
|
|
|
-- $concurrentNesting
|
|
--
|
|
-- The 'Monad' composition of 'Async' type can perform the iterations of a
|
|
-- loop concurrently.
|
|
--
|
|
-- >>> :{
|
|
-- Stream.drain $ Stream.fromAsync $ do
|
|
-- x <- Stream.fromFoldable [3,2,1]
|
|
-- delay x
|
|
-- :}
|
|
-- ThreadId ...: Delay 1
|
|
-- ThreadId ...: Delay 2
|
|
-- ThreadId ...: Delay 3
|
|
--
|
|
-- As we can see the code after the @fromFoldable@ statement is run three
|
|
-- times, once for each value of @x@. All the three iterations are concurrent
|
|
-- and run in different threads. The iteration with least delay finishes first.
|
|
-- When compared to imperative programming, this can be viewed as a @for@ loop
|
|
-- with three concurrent iterations.
|
|
--
|
|
-- Concurrency is demand driven i.e. more concurrent iterations are started
|
|
-- only if the previous iterations are not able to saturate the consumer of the
|
|
-- output stream. This works exactly the same way as the merging of two
|
|
-- streams using 'async' works.
|
|
--
|
|
-- The 'fromAsync' type combinator can be used to switch to this style of
|
|
-- composition. Alternatively, a type annotation can be used to specify the
|
|
-- type of the stream as 'Async'.
|
|
--
|
|
-- When multiple streams are nested using this style, the iterations are
|
|
-- concurrently evaluated in a depth first manner:
|
|
--
|
|
--
|
|
-- >>> :{
|
|
-- Stream.drain $ Stream.fromAsync $ do
|
|
-- x <- Stream.fromFoldable [1,2]
|
|
-- y <- Stream.fromFoldable [3,4]
|
|
-- Stream.fromEffect $ putStrLn $ show (x, y)
|
|
-- :}
|
|
-- (1,3)
|
|
-- (1,4)
|
|
-- (2,3)
|
|
-- (2,4)
|
|
--
|
|
-- Nested iterations are given preference for concurrent evaluation i.e.
|
|
-- (1,4) will be scheduled in preference to (2,3).
|
|
|
|
-- $wasyncNesting
|
|
--
|
|
-- Like 'Async', the 'Monad' composition of 'WAsync' runs the iterations of a
|
|
-- loop concurrently. It differs from 'Async' in the nested loop behavior. Like
|
|
-- 'WSerial', the nested loops in this type are traversed and executed in a
|
|
-- breadth first manner rather than the depth first manner of 'Async' style.
|
|
--
|
|
-- >>> :{
|
|
-- Stream.drain $ Stream.fromWAsync $ do
|
|
-- x <- Stream.fromSerial $ Stream.fromFoldable [1,2]
|
|
-- y <- Stream.fromSerial $ Stream.fromFoldable [3,4]
|
|
-- Stream.fromEffect $ putStrLn $ show (x, y)
|
|
-- :}
|
|
-- (1,3)
|
|
-- (2,3)
|
|
-- (1,4)
|
|
-- (2,4)
|
|
--
|
|
-- Note that (2,3) is preferred to (1,4) when evaluating the iterations
|
|
-- concurrently. This works exactly the same way as the merging of two streams
|
|
-- using 'wAsync' works.
|
|
--
|
|
-- The 'fromWAsync' type combinator can be used to switch to this style of
|
|
-- composition. Alternatively, a type annotation can be used to specify the
|
|
-- type of the stream as 'WAsync'.
|
|
--
|
|
|
|
-- $parallelNesting
|
|
--
|
|
-- Just like 'Async' or 'WAsync' the 'Monad' composition of 'Parallel' runs the
|
|
-- iterations of a loop concurrently.
|
|
--
|
|
-- >>> :{
|
|
-- Stream.drain $ Stream.fromParallel $ do
|
|
-- x <- Stream.fromFoldable [3,2,1]
|
|
-- delay x
|
|
-- :}
|
|
-- ThreadId ...: Delay 1
|
|
-- ThreadId ...: Delay 2
|
|
-- ThreadId ...: Delay 3
|
|
--
|
|
-- It differs from 'Async' and 'WAsync' in the nested loop behavior. All
|
|
-- iterations of the loop are run fully concurrently irrespective of the
|
|
-- demand. This works exactly the same way as the merging of streams using
|
|
-- 'parallel' works.
|
|
--
|
|
-- The 'fromParallel' type combinator can be used to switch to this style of
|
|
-- composition. Alternatively, a type annotation can be used to specify the
|
|
-- type of the stream as 'Parallel'.
|
|
--
|
|
|
|
-- $applicative
|
|
-- The streams @s1@ and @s2@ used here are defined in the parallel zipping example.
|
|
-- 'Async' can run the iterations concurrently, therefore, it takes a total
|
|
-- of 6 seconds which is max (1, 2) + max (3, 4):
|
|
--
|
|
-- >>> (Stream.toList $ Stream.fromAsync $ (,) <$> s1 <*> s2) >>= print
|
|
-- ...
|
|
--
|
|
-- @
|
|
-- ThreadId 34: Delay 1
|
|
-- ThreadId 36: Delay 2
|
|
-- ThreadId 35: Delay 3
|
|
-- ThreadId 36: Delay 3
|
|
-- ThreadId 35: Delay 4
|
|
-- ThreadId 36: Delay 4
|
|
-- [(1,3),(2,3),(1,4),(2,4)]
|
|
-- @
|
|
--
|
|
-- Similarly, 'WAsync' as well can run the iterations concurrently, but with a
|
|
-- different style of scheduling than 'Async' as explained in the Monad
|
|
-- section, therefore, it too takes a total of 6 seconds (2 + 4):
|
|
--
|
|
-- >>> (Stream.toList $ Stream.fromWAsync $ (,) <$> s1 <*> s2) >>= print
|
|
-- ...
|
|
--
|
|
-- @
|
|
-- ThreadId 34: Delay 1
|
|
-- ThreadId 36: Delay 2
|
|
-- ThreadId 35: Delay 3
|
|
-- ThreadId 36: Delay 3
|
|
-- ThreadId 35: Delay 4
|
|
-- ThreadId 36: Delay 4
|
|
-- [(1,3),(2,3),(1,4),(2,4)]
|
|
-- @
|
|
|
|
-- $parallelzip
|
|
--
|
|
-- The applicative instance of 'ZipAsync' type zips streams concurrently.
|
|
-- 'fromZipAsync' type combinator can be used to switch to parallel applicative
|
|
-- zip composition:
|
|
--
|
|
-- This takes 7 seconds to zip, which is max (1,3) + max (2,4) because 1 and 3
|
|
-- are produced concurrently, and 2 and 4 are produced concurrently:
|
|
--
|
|
-- >>> d n = delay n >> return n
|
|
-- >>> s1 = Stream.fromSerial $ d 1 <> d 2
|
|
-- >>> s2 = Stream.fromSerial $ d 3 <> d 4
|
|
-- >>> (Stream.toList $ Stream.fromZipAsync $ (,) <$> s1 <*> s2) >>= print
|
|
-- ThreadId ...: Delay 1
|
|
-- ThreadId ...: Delay 2
|
|
-- ThreadId ...: Delay 3
|
|
-- ThreadId ...: Delay 4
|
|
-- [(1,3),(2,4)]
|
|
--
|
|
|
|
-- $concurrent
|
|
--
|
|
-- When writing concurrent programs there are two distinct places where the
|
|
-- programmer can control the concurrency. First, when /composing/ a stream by
|
|
-- merging multiple streams we can choose appropriate sum style operators to
|
|
-- combine them concurrently or serially. Second, when /processing/ a stream in
|
|
-- a monadic composition we can choose one of the monad composition types to
|
|
-- get the desired type of concurrency.
|
|
--
|
|
-- In the following example the squares of @x@ and @y@ are computed
|
|
-- concurrently using the 'async' operation and the square roots of their
|
|
-- sum are computed serially because of the 'fromSerial' combinator. We can
|
|
-- choose different combinators for the monadic processing and the stream
|
|
-- generation, to control the concurrency. We can also use the 'fromAsync'
|
|
-- combinator instead of explicitly folding with 'async'.
|
|
--
|
|
-- >>> import Data.List (sum)
|
|
-- >>> :{
|
|
-- main = do
|
|
-- z <- Stream.toList
|
|
-- $ Stream.fromSerial -- Serial monadic processing (sqrt below)
|
|
-- $ do
|
|
-- x2 <- Stream.concatForFoldableWith Stream.async [1..100] $ -- Concurrent @"for"@ loop
|
|
-- \x -> return $ x * x -- body of the loop
|
|
-- y2 <- Stream.concatForFoldableWith Stream.async [1..100] $
|
|
-- \y -> return $ y * y
|
|
-- return $ sqrt (x2 + y2)
|
|
-- print $ sum z
|
|
-- :}
|
|
--
|
|
-- We can see how this directly maps to the imperative style
|
|
-- <https://en.wikipedia.org/wiki/OpenMP OpenMP> model; we use combinators
|
|
-- and operators instead of the ugly pragmas.
|
|
--
|
|
-- For more concurrent programming examples see,
|
|
-- <https://github.com/composewell/streamly-examples>.
|
|
|
|
-- $furtherReading
|
|
--
|
|
-- * Read the reactive programming tutorial
|
|
-- * See the examples in <https://github.com/composewell/streamly-examples streamly-examples> repo.
|