Use list based concat functions in place of binary ops

This commit is contained in:
Harendra Kumar 2022-10-16 16:18:27 +05:30 committed by Harendra Kumar
parent 0cd6d6248a
commit f7415e9b44
3 changed files with 155 additions and 90 deletions

View File

@ -41,38 +41,33 @@ module Streamly.Data.Stream.Concurrent
-- | Stream combinators using a concurrent channel.
-- ** Evaluate
-- | Evaluate a stream concurrently using a channel.
-- | Evaluates a stream concurrently using a channel.
, eval
, evalWith
-- ** Map
-- | Use a single channel to evaluate mapped actions concurrently.
-- | Uses a single channel to evaluate mapped actions concurrently.
, mapM
, mapMWith
, sequence
, sequenceWith
-- ** Combine two
-- | Use one channel for each pair. When you have to chain more than two
-- actions concat family of operations are much more efficient because they
-- use a single channel for all streams.
--
-- XXX Do not expose binary operations, instead use these to concatenate n
-- streams from a list in the given style?
-- ** List of streams
-- | Shares a single channel across many streams.
, append
, ahead
, parallel
, combineWith
, concatListWith
-- ** Apply
-- | Apply arguments to a function concurrently. Uses a separate channel
-- for each application.
-- ** Stream of streams
-- *** Apply
-- | Apply argument streams to a function concurrently. Uses a separate
-- channel for each application.
, apply
, applyWith
-- ** Combine many
-- | Share a single channel across many streams.
, concatList
-- *** Concat
-- | Shares a single channel across many streams.
, concat
, concatWith
, concatMap

View File

@ -46,13 +46,13 @@ module Streamly.Internal.Data.Stream.Concurrent
-- | Stream combinators using a concurrent channel
-- ** Evaluate
-- | Evaluate a stream concurrently using a channel.
-- | Evaluates a stream concurrently using a channel.
, eval
, evalWith
-- Add unfoldrM/iterateM?
-- ** Map
-- | Use a single channel to evaluate all actions.
-- | Uses a single channel to evaluate all actions.
, mapM
, mapMWith
, sequence
@ -62,22 +62,32 @@ module Streamly.Internal.Data.Stream.Concurrent
-- ** Combine two
-- | Use a channel for each pair.
-- combine/concur/conjoin
, append2
, interleave2
, ahead2
, parallel2
, parallelFst2
, parallelMin2
, combineWith
-- ** List of streams
-- | Shares a single channel across many streams.
, append
, interleave
, ahead
, parallel
, parallelFst
, parallelMin
, combineWith
, concatListWith
-- ** Apply
-- | Use a separate channel for each application.
-- ** Stream of streams
-- *** Apply
-- | Uses a separate channel for each application.
, apply
, applyWith
-- ** Combine many
-- | Share a single channel across many streams.
, concatList
-- *** Concat
-- | Shares a single channel across many streams.
, concat
, concatWith
, concatMap
@ -161,7 +171,7 @@ eval :: MonadAsync m => Stream m a -> Stream m a
eval = evalWith id
-------------------------------------------------------------------------------
-- appending two streams
-- combining two streams
-------------------------------------------------------------------------------
{-# INLINE _appendGeneric #-}
@ -239,68 +249,68 @@ combineWith modifier stream1 stream2 =
-- all streams. However, with this operation you can precisely control the
-- scheduling by creating arbitrary shape expression trees.
--
-- >>> append = Async.combineWith id
-- >>> append2 = Async.combineWith id
--
-- The following code finishes in 4 seconds:
--
-- >>> stream1 = Stream.fromEffect (delay 4)
-- >>> stream2 = Stream.fromEffect (delay 2)
-- >>> Stream.fold Fold.toList $ stream1 `Async.append` stream2
-- >>> Stream.fold Fold.toList $ stream1 `Async.append2` stream2
-- 2 sec
-- 4 sec
-- [2,4]
--
{-# INLINE append #-}
append :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
append = combineWith id
{-# INLINE append2 #-}
append2 :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
append2 = combineWith id
-- | Like 'append' but interleaves the streams fairly instead of prioritizing
-- the left stream. This schedules all streams in a round robin fashion over
-- limited number of threads.
--
-- >>> interleave = Async.combineWith Async.interleaved
-- >>> interleave2 = Async.combineWith Async.interleaved
--
{-# INLINE interleave #-}
interleave :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
interleave = combineWith interleaved
{-# INLINE interleave2 #-}
interleave2 :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
interleave2 = combineWith interleaved
-- | Like 'append' but with 'ordered' on.
--
-- >>> ahead = Async.combineWith Async.ordered
-- >>> ahead2 = Async.combineWith Async.ordered
--
{-# INLINE ahead #-}
ahead :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
ahead = combineWith ordered
{-# INLINE ahead2 #-}
ahead2 :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
ahead2 = combineWith ordered
-- | Like 'append' but with 'eager' on.
-- | Like 'append2' but with 'eager' on.
--
-- >>> parallel = Async.combineWith Async.eager
-- >>> parallel2 = Async.combineWith Async.eager
--
{-# INLINE parallel #-}
parallel :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
parallel = combineWith eager
{-# INLINE parallel2 #-}
parallel2 :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
parallel2 = combineWith eager
-- | Like 'parallel' but stops the output as soon as the first stream stops.
-- | Like 'parallel2' but stops the output as soon as the first stream stops.
--
-- >>> parallelFst = Async.combineWith (Async.eager . Async.stopWhen Async.FirstStops)
-- >>> parallelFst2 = Async.combineWith (Async.eager . Async.stopWhen Async.FirstStops)
--
-- /Pre-release/
{-# INLINE parallelFst #-}
parallelFst :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
parallelFst = combineWith (eager . stopWhen FirstStops)
{-# INLINE parallelFst2 #-}
parallelFst2 :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
parallelFst2 = combineWith (eager . stopWhen FirstStops)
-- | Like 'parallel' but stops the output as soon as any of the two streams
-- | Like 'parallel2' but stops the output as soon as any of the two streams
-- stops.
--
-- >>> parallelMin = Async.combineWith (Async.eager . Async.stopWhen Async.AnyStops)
-- >>> parallelMin2 = Async.combineWith (Async.eager . Async.stopWhen Async.AnyStops)
--
-- /Pre-release/
{-# INLINE parallelMin #-}
parallelMin :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
parallelMin = combineWith (eager . stopWhen AnyStops)
{-# INLINE parallelMin2 #-}
parallelMin2 :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
parallelMin2 = combineWith (eager . stopWhen AnyStops)
-------------------------------------------------------------------------------
-- concat
-- concat streams
-------------------------------------------------------------------------------
-- | A runner function takes a queuing function @q@ and a stream, it splits the
@ -327,9 +337,6 @@ mkEnqueue chan runner = do
eagerDispatch chan
in q
-- XXX Can be renamed to concatMapWithK if we move concatMapWithK to higher
-- level module. We can keep only Channel based ops in this module.
-- | Takes the head element of the input stream and queues the tail of the
-- stream to the channel, then maps the supplied function on the head and
-- evaluates the resulting stream.
@ -489,13 +496,74 @@ concatWith modifier = concatMapWith modifier id
concat :: MonadAsync m => Stream m (Stream m a) -> Stream m a
concat = concatWith id
-------------------------------------------------------------------------------
-- concat Lists
-------------------------------------------------------------------------------
-- | Like 'concatWith' but works on a list of streams.
--
-- >>> concatListWith modifier = Async.concatWith modifier . Stream.fromList
--
{-# INLINE concatListWith #-}
concatListWith :: MonadAsync m => (Config -> Config) -> [Stream m a] -> Stream m a
concatListWith modifier = concatWith modifier . Stream.fromList
-- | Like 'concat' but works on a list of streams.
--
-- >>> concatList = Async.concat . Stream.fromList
-- >>> append = Async.concatListWith id
--
{-# INLINE concatList #-}
concatList :: MonadAsync m => [Stream m a] -> Stream m a
concatList = concat . Stream.fromList
{-# INLINE append #-}
append :: MonadAsync m => [Stream m a] -> Stream m a
append = concatListWith id
-- | Like 'append' but interleaves the streams fairly instead of prioritizing
-- the left stream. This schedules all streams in a round robin fashion over
-- limited number of threads.
--
-- >>> interleave = Async.concatListWith Async.interleaved
--
{-# INLINE interleave #-}
interleave :: MonadAsync m => [Stream m a] -> Stream m a
interleave = concatListWith interleaved
-- | Like 'append' but with 'ordered' on.
--
-- >>> ahead = Async.concatListWith Async.ordered
--
{-# INLINE ahead #-}
ahead :: MonadAsync m => [Stream m a] -> Stream m a
ahead = concatListWith ordered
-- | Like 'append' but with 'eager' on.
--
-- >>> parallel = Async.concatListWith Async.eager
--
{-# INLINE parallel #-}
parallel :: MonadAsync m => [Stream m a] -> Stream m a
parallel = concatListWith eager
-- | Like 'parallel' but stops the output as soon as the first stream stops.
--
-- >>> parallelFst = Async.concatListWith (Async.eager . Async.stopWhen Async.FirstStops)
--
-- /Pre-release/
{-# INLINE parallelFst #-}
parallelFst :: MonadAsync m => [Stream m a] -> Stream m a
parallelFst = concatListWith (eager . stopWhen FirstStops)
-- | Like 'parallel' but stops the output as soon as any of the two streams
-- stops.
--
-- >>> parallelMin = Async.concatListWith (Async.eager . Async.stopWhen Async.AnyStops)
--
-- /Pre-release/
{-# INLINE parallelMin #-}
parallelMin :: MonadAsync m => [Stream m a] -> Stream m a
parallelMin = concatListWith (eager . stopWhen AnyStops)
-------------------------------------------------------------------------------
-- Applicative
-------------------------------------------------------------------------------
{-# INLINE applyWith #-}
{-# SPECIALIZE applyWith ::
@ -513,6 +581,10 @@ applyWith modifier stream1 stream2 =
apply :: MonadAsync m => Stream m (a -> b) -> Stream m a -> Stream m b
apply = applyWith id
-------------------------------------------------------------------------------
-- Map
-------------------------------------------------------------------------------
-- |
-- >>> mapMWith modifier f = Async.concatMapWith modifier (Stream.fromEffect . f)
--

View File

@ -68,8 +68,9 @@ transformCombineFromList ::
transformCombineFromList constr eq listOp op a b c =
withMaxSuccess maxTestCount $
monadicIO $ do
stream <- run (Stream.fold Fold.toList $
constr a `Async.append` op (constr b `Async.append` constr c))
let s1 = op (Async.append [constr b, constr c])
let s2 = Async.append [constr a, s1]
stream <- run (Stream.fold Fold.toList s2)
let list = a <> listOp (b <> c)
listEquals eq stream list
@ -144,8 +145,8 @@ exceptionPropagation f = do
(Left (ExampleException "E") :: Either ExampleException [Int])
it "concatMap throwM" $ do
let s1 = Async.concatList $ fmap Stream.fromPure [1..4]
s2 = Async.concatList $ fmap Stream.fromPure [5..8]
let s1 = Async.concatListWith id $ fmap Stream.fromPure [1..4]
s2 = Async.concatListWith id $ fmap Stream.fromPure [5..8]
try $ tl (
let bind = flip Async.concatMap
in bind s1 $ \x ->
@ -186,9 +187,8 @@ timeOrdering f = do
takeCombined :: Int -> IO ()
takeCombined n = do
let constr = Stream.fromFoldable
r <- Stream.fold Fold.toList $
Stream.take n
(constr ([] :: [Int]) `Async.append` constr ([] :: [Int]))
let s = Async.append [constr ([] :: [Int]), constr ([] :: [Int])]
r <- Stream.fold Fold.toList $ Stream.take n s
r `shouldBe` []
---------------------------------------------------------------------------
@ -243,44 +243,42 @@ main = hspec
-- XXX Need to use eq instead of sortEq for ahead oeprations
-- Binary append
prop1 "append [] []"
$ cmp (Stream.nil `Async.append` Stream.nil) sortEq []
$ cmp (Async.append [Stream.nil, Stream.nil]) sortEq []
prop1 "append [] [1]"
$ cmp (Stream.nil `Async.append` Stream.fromPure 1) sortEq [1]
$ cmp (Async.append [Stream.nil, Stream.fromPure 1]) sortEq [1]
prop1 "append [1] []"
$ cmp (Stream.fromPure 1 `Async.append` Stream.nil) sortEq [1]
$ cmp (Async.append [Stream.fromPure 1, Stream.nil]) sortEq [1]
prop1 "append [0] [1]"
$ let stream =
Stream.fromPure 0 `Async.append` Stream.fromPure 1
$ let stream = Async.append [Stream.fromPure 0, Stream.fromPure 1]
in cmp stream sortEq [0, 1]
prop1 "append [0] [] [1]"
$ let stream =
Stream.fromPure 0
`Async.append` Stream.nil
`Async.append` Stream.fromPure 1
Async.append
[Stream.fromPure 0, Stream.nil, Stream.fromPure 1]
in cmp stream sortEq [0, 1]
prop1 "append left associated"
prop1 "append2 left associated"
$ let stream =
Stream.fromPure 0
`Async.append` Stream.fromPure 1
`Async.append` Stream.fromPure 2
`Async.append` Stream.fromPure 3
`Async.append2` Stream.fromPure 1
`Async.append2` Stream.fromPure 2
`Async.append2` Stream.fromPure 3
in cmp stream sortEq [0, 1, 2, 3]
prop1 "append right associated"
$ let stream =
Stream.fromPure 0
`Async.append` (Stream.fromPure 1
`Async.append` (Stream.fromPure 2
`Async.append` Stream.fromPure 3))
`Async.append2` (Stream.fromPure 1
`Async.append2` (Stream.fromPure 2
`Async.append2` Stream.fromPure 3))
in cmp stream sortEq [0, 1, 2, 3]
prop1 "append balanced"
$ let leaf x y = Stream.fromPure x `Async.append` Stream.fromPure y
leaf11 = leaf 0 1 `Async.append` leaf 2 (3 :: Int)
leaf12 = leaf 4 5 `Async.append` leaf 6 7
stream = leaf11 `Async.append` leaf12
$ let leaf x y = Stream.fromPure x `Async.append2` Stream.fromPure y
leaf11 = leaf 0 1 `Async.append2` leaf 2 (3 :: Int)
leaf12 = leaf 4 5 `Async.append2` leaf 6 7
stream = leaf11 `Async.append2` leaf12
in cmp stream sortEq [0, 1, 2, 3, 4, 5, 6,7]
prop1 "combineWith (maxThreads 1)"
@ -291,14 +289,14 @@ main = hspec
in cmp stream (==) [1,2,3,4,5,6,7,8,9,10]
prop1 "apply (async arg1)"
$ let s1 = Async.apply (pure (,)) (pure 1 `Async.append` pure 2)
$ let s1 = Async.apply (pure (,)) (pure 1 `Async.append2` pure 2)
s2 = Async.apply s1 (pure 3) :: Stream IO (Int, Int)
xs = Stream.fold Fold.toList s2
in sort <$> xs `shouldReturn` [(1, 3), (2, 3)]
prop1 "apply (async arg2)"
$ let s1 = pure (1,)
s2 = Async.apply s1 (pure 2 `Async.append` pure 3)
s2 = Async.apply s1 (pure 2 `Async.append2` pure 3)
xs = Stream.fold Fold.toList s2 :: IO [(Int, Int)]
in sort <$> xs `shouldReturn` [(1, 2), (1, 3)]
@ -319,6 +317,6 @@ main = hspec
#ifdef DEVBUILD
describe "Time ordering" $ timeOrdering Async.append
#endif
describe "Exception propagation" $ exceptionPropagation Async.append
describe "Exception propagation" $ exceptionPropagation Async.append2
-- Ad-hoc tests
it "takes n from stream of streams" $ takeCombined 2