Add some concurrent combinators for folds

This commit is contained in:
Harendra Kumar 2024-08-10 10:37:49 +05:30
parent d5e3468b66
commit 006f9bb61d
5 changed files with 156 additions and 31 deletions

View File

@ -1,3 +1,4 @@
{-# LANGUAGE CPP #-}
-- |
-- Module : Streamly.Data.Fold.Prelude
-- Copyright : (c) 2021 Composewell Technologies
@ -11,6 +12,12 @@
--
module Streamly.Data.Fold.Prelude
(
-- * Setup
-- | To execute the code examples provided in this module in ghci, please
-- run the following commands first.
--
-- $setup
-- * "Streamly.Data.Fold"
-- | All "Streamly.Data.Fold" combinators are re-exported via this
-- module. For more pre-release combinators also see
@ -39,16 +46,12 @@ import Streamly.Internal.Data.Fold (toContainerIO)
import Streamly.Internal.Data.Fold.Prelude
import Streamly.Internal.Data.IsMap.HashMap ()
#include "DocTestDataFold.hs"
-- | Split the input stream based on a hashable component of the key field and
-- fold each split using the given fold. Useful for map/reduce, bucketizing
-- the input in different bins or for generating histograms.
--
-- >>> import Data.List (sortOn)
-- >>> import Data.HashMap.Strict (HashMap)
-- >>> import qualified Data.HashMap.Strict as HM
-- >>> import qualified Streamly.Data.Fold.Prelude as Fold
-- >>> import qualified Streamly.Data.Stream as Stream
--
-- Consider a stream of key value pairs:
--
-- >>> input = Stream.fromList [("k1",1),("k1",1.1),("k2",2), ("k2",2.2)]

View File

@ -12,11 +12,6 @@
-- concurrently with the driver. The driver just pushes an element to the
-- fold's buffer and waits for async evaluation to finish.
--
-- Avoid scanning a stream using a concurrent fold. When scanning a stream
-- using a concurrent fold we need to keep in mind that the result of the scan
-- may be delayed because of the asynchronous execution. The results may not be
-- same as in the case of a synchronous fold.
--
-- Stages in a fold pipeline can be made concurrent using 'parEval'.
--
-- = Concurrent Fold Combinators
@ -46,10 +41,6 @@
-- element is a streamable chunk and each fold consumes one chunk at a time.
-- This is like parConcatMap in streams.
--
-- We also need to have a lconcatMap to expand the chunks in the input to
-- streams before folding. This will require an input Skip constructor. In
-- fact, parLmapM would be implemented in terms of this like in streams.
--
-- Concurrent append: if one fold's buffer becomes full then use the next one
-- Concurrent interleave/partition: Round robin to n folds.
-- Concurrent distribute to multiple folds.
@ -57,6 +48,11 @@
module Streamly.Internal.Data.Fold.Concurrent
(
parEval
, parLmapM
, parTeeWith
, parDistribute
, parPartition
, parUnzipWithM
)
where
@ -69,6 +65,8 @@ import Streamly.Internal.Data.Fold (Fold(..), Step (..))
import Streamly.Internal.Data.Channel.Worker (sendEvent)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import qualified Streamly.Internal.Data.Fold as Fold
import Streamly.Internal.Data.Fold.Channel.Type
import Streamly.Internal.Data.Channel.Types
@ -78,10 +76,25 @@ import Streamly.Internal.Data.Channel.Types
-- XXX Cleanup the fold if the stream is interrupted. Add a GC hook.
-- | Evaluate a fold asynchronously using a concurrent channel. The driver just
-- queues the input stream values to the fold channel buffer and returns. The
-- fold evaluates the queued values asynchronously. On finalization, 'parEval'
-- waits for the asynchronous fold to complete before it returns.
-- | 'parEval' introduces a concurrent stage at the input of the fold. The
-- inputs are asynchronously queued in a buffer and evaluated concurrently with
-- the evaluation of the source stream. On finalization, 'parEval' waits for
-- the asynchronous fold to complete before it returns.
--
-- In the following example both the stream and the fold have a 1 second delay,
-- but the delay is not compounded because both run concurrently.
--
-- >>> delay x = threadDelay 1000000 >> print x >> return x
--
-- >>> src = Stream.delay 1 (Stream.enumerateFromTo 1 3)
-- >>> dst = Fold.parEval id (Fold.lmapM delay Fold.sum)
-- >>> Stream.fold dst src
-- ...
--
-- Another example:
--
-- >>> Stream.toList $ Stream.groupsOf 4 dst src
-- ...
--
{-# INLINABLE parEval #-}
parEval :: MonadAsync m => (Config -> Config) -> Fold m a b -> Fold m a b
@ -128,11 +141,6 @@ parEval modifier f =
Nothing -> Partial chan
Just b -> Done b
-- XXX We can use a separate type for non-scanning folds that will
-- introduce a lot of complexity. Are there combinators that rely on the
-- "extract" function even in non-scanning use cases?
-- Instead of making such folds partial we can also make them return a
-- Maybe type.
extract _ = error "Concurrent folds do not support scanning"
-- XXX depending on the use case we may want to either wait for the result
@ -161,3 +169,101 @@ parEval modifier f =
writeIORef (svarStopTime (svarStats chan)) (Just t)
printSVar (dumpChannel chan) "SVar Done"
return b
-- XXX We can have a lconcatMap (unfoldMany) to expand the chunks in the input
-- to streams before folding. This will require an input Skip constructor. In
-- fact, parLmapM can be implemented in terms of this like in streams.
-- | Evaluate the mapped actions concurrently with respect to each other. The
-- results may be unordered or ordered depending on the configuration.
--
-- /Unimplemented/
{-# INLINABLE parLmapM #-}
parLmapM :: -- MonadAsync m =>
(Config -> Config) -> (a -> m b) -> Fold m b r -> Fold m a r
parLmapM = undefined
-- | Execute both the folds in a tee concurrently.
--
-- Definition:
--
-- >>> parTeeWith cfg f c1 c2 = Fold.teeWith f (Fold.parEval cfg c1) (Fold.parEval cfg c2)
--
-- Example:
--
-- >>> delay x = threadDelay 1000000 >> print x >> return x
-- >>> c1 = Fold.lmapM delay Fold.sum
-- >>> c2 = Fold.lmapM delay Fold.length
-- >>> dst = Fold.parTeeWith id (,) c1 c2
-- >>> Stream.fold dst src
-- ...
--
{-# INLINABLE parTeeWith #-}
parTeeWith :: MonadAsync m =>
(Config -> Config)
-> (a -> b -> c)
-> Fold m x a
-> Fold m x b
-> Fold m x c
parTeeWith cfg f c1 c2 = Fold.teeWith f (parEval cfg c1) (parEval cfg c2)
-- | Distribute the input to all the folds in the supplied list concurrently.
--
-- Definition:
--
-- >>> parDistribute cfg = Fold.distribute . fmap (Fold.parEval cfg)
--
-- Example:
--
-- >>> delay x = threadDelay 1000000 >> print x >> return x
-- >>> c = Fold.lmapM delay Fold.sum
-- >>> dst = Fold.parDistribute id [c,c,c]
-- >>> Stream.fold dst src
-- ...
--
{-# INLINABLE parDistribute #-}
parDistribute :: MonadAsync m =>
(Config -> Config) -> [Fold m a b] -> Fold m a [b]
parDistribute cfg = Fold.distribute . fmap (parEval cfg)
-- | Select first fold for Left input and second for Right input. Both folds
-- run concurrently.
--
-- Definition
--
-- >>> parPartition cfg c1 c2 = Fold.partition (Fold.parEval cfg c1) (Fold.parEval cfg c2)
--
-- Example:
--
-- >>> delay x = threadDelay 1000000 >> print x >> return x
-- >>> c1 = Fold.lmapM delay Fold.sum
-- >>> c2 = Fold.lmapM delay Fold.sum
-- >>> dst = Fold.parPartition id c1 c2
-- >>> Stream.fold dst $ (fmap (\x -> if even x then Left x else Right x)) src
-- ...
--
{-# INLINABLE parPartition #-}
parPartition :: MonadAsync m =>
(Config -> Config) -> Fold m b x -> Fold m c y -> Fold m (Either b c) (x, y)
parPartition cfg c1 c2 = Fold.partition (parEval cfg c1) (parEval cfg c2)
-- | Split and distribute the output to two different folds and then zip the
-- results. Both the consumer folds run concurrently.
--
-- Definition
--
-- >>> parUnzipWithM cfg f c1 c2 = Fold.unzipWithM f (Fold.parEval cfg c1) (Fold.parEval cfg c2)
--
-- Example:
--
-- >>> delay x = threadDelay 1000000 >> print x >> return x
-- >>> c1 = Fold.lmapM delay Fold.sum
-- >>> c2 = Fold.lmapM delay Fold.sum
-- >>> dst = Fold.parUnzipWithM id (pure . id) c1 c2
-- >>> Stream.fold dst $ (fmap (\x -> (x, x* x))) src
-- ...
--
{-# INLINABLE parUnzipWithM #-}
parUnzipWithM :: MonadAsync m
=> (Config -> Config) -> (a -> m (b,c)) -> Fold m b x -> Fold m c y -> Fold m a (x,y)
parUnzipWithM cfg f c1 c2 = Fold.unzipWithM f (parEval cfg c1) (parEval cfg c2)

View File

@ -1,3 +1,4 @@
{-# LANGUAGE CPP #-}
-- |
-- Module : Streamly.Internal.Data.Fold.Time
-- Copyright : (c) 2019 Composewell Technologies
@ -23,13 +24,7 @@ import Streamly.Internal.Data.Fold (Fold(..), Step (..))
import Streamly.Internal.Control.Concurrent (MonadAsync, withRunInIO)
import Streamly.Internal.Data.Tuple.Strict (Tuple3'(..))
-- $setup
-- >>> :m
-- >>> :set -fno-warn-deprecations
-- >>> :set -XFlexibleContexts
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Internal.Data.Fold.Async as Fold
#include "DocTestDataFold.hs"
-- XXX We can use asyncClock here. A parser can be used to return an input that
-- arrives after the timeout.

View File

@ -0,0 +1,19 @@
{- $setup
>>> :m
>>> :set -XFlexibleContexts
>>> import Control.Concurrent (threadDelay)
>>> import Data.List (sortOn)
>>> import Data.HashMap.Strict (HashMap)
>>> import Streamly.Data.Fold (Fold)
>>> import Streamly.Data.Stream (Stream)
>>> import qualified Data.HashMap.Strict as HM
>>> import qualified Streamly.Data.Fold.Prelude as Fold
>>> import qualified Streamly.Data.Stream.Prelude as Stream
For APIs that have not been released yet.
>>> import qualified Streamly.Internal.Data.Fold as Fold
>>> import qualified Streamly.Internal.Data.Fold.Prelude as Fold
-}

View File

@ -95,6 +95,7 @@ extra-source-files:
bin/mk-tags.sh
configure
configure.ac
src/doctest/*.hs
targets/streamly-targets.cabal
targets/Targets.hs
test/test-runner/Main.hs
@ -362,6 +363,7 @@ library
include-dirs:
src
, src/doctest
, src/Streamly/Internal/Data/Stream
hs-source-dirs: src