From f7415e9b443d33bb185db8a688613885da1738e8 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Sun, 16 Oct 2022 16:18:27 +0530 Subject: [PATCH] Use list based concat functions in place of binary ops --- src/Streamly/Data/Stream/Concurrent.hs | 27 ++- .../Internal/Data/Stream/Concurrent.hs | 162 +++++++++++++----- test/Streamly/Test/Data/Stream/Concurrent.hs | 56 +++--- 3 files changed, 155 insertions(+), 90 deletions(-) diff --git a/src/Streamly/Data/Stream/Concurrent.hs b/src/Streamly/Data/Stream/Concurrent.hs index fd8745ed3..34d886933 100644 --- a/src/Streamly/Data/Stream/Concurrent.hs +++ b/src/Streamly/Data/Stream/Concurrent.hs @@ -41,38 +41,33 @@ module Streamly.Data.Stream.Concurrent -- | Stream combinators using a concurrent channel. -- ** Evaluate - -- | Evaluate a stream concurrently using a channel. + -- | Evaluates a stream concurrently using a channel. , eval , evalWith -- ** Map - -- | Use a single channel to evaluate mapped actions concurrently. + -- | Uses a single channel to evaluate mapped actions concurrently. , mapM , mapMWith , sequence , sequenceWith - -- ** Combine two - -- | Use one channel for each pair. When you have to chain more than two - -- actions concat family of operations are much more efficient because they - -- use a single channel for all streams. - -- - -- XXX Do not expose binary operations, instead use these to concatenate n - -- streams from a list in the given style? + -- ** List of streams + -- | Shares a single channel across many streams. , append , ahead , parallel - , combineWith + , concatListWith - -- ** Apply - -- | Apply arguments to a function concurrently. Uses a separate channel - -- for each application. + -- ** Stream of streams + -- *** Apply + -- | Apply argument streams to a function concurrently. Uses a separate + -- channel for each application. , apply , applyWith - -- ** Combine many - -- | Share a single channel across many streams. - , concatList + -- *** Concat + -- | Shares a single channel across many streams. , concat , concatWith , concatMap diff --git a/src/Streamly/Internal/Data/Stream/Concurrent.hs b/src/Streamly/Internal/Data/Stream/Concurrent.hs index 3253b4af1..254593262 100644 --- a/src/Streamly/Internal/Data/Stream/Concurrent.hs +++ b/src/Streamly/Internal/Data/Stream/Concurrent.hs @@ -46,13 +46,13 @@ module Streamly.Internal.Data.Stream.Concurrent -- | Stream combinators using a concurrent channel -- ** Evaluate - -- | Evaluate a stream concurrently using a channel. + -- | Evaluates a stream concurrently using a channel. , eval , evalWith -- Add unfoldrM/iterateM? -- ** Map - -- | Use a single channel to evaluate all actions. + -- | Uses a single channel to evaluate all actions. , mapM , mapMWith , sequence @@ -62,22 +62,32 @@ module Streamly.Internal.Data.Stream.Concurrent -- ** Combine two -- | Use a channel for each pair. -- combine/concur/conjoin + , append2 + , interleave2 + , ahead2 + , parallel2 + , parallelFst2 + , parallelMin2 + , combineWith + + -- ** List of streams + -- | Shares a single channel across many streams. , append , interleave , ahead , parallel , parallelFst , parallelMin - , combineWith + , concatListWith - -- ** Apply - -- | Use a separate channel for each application. + -- ** Stream of streams + -- *** Apply + -- | Uses a separate channel for each application. , apply , applyWith - -- ** Combine many - -- | Share a single channel across many streams. - , concatList + -- *** Concat + -- | Shares a single channel across many streams. , concat , concatWith , concatMap @@ -161,7 +171,7 @@ eval :: MonadAsync m => Stream m a -> Stream m a eval = evalWith id ------------------------------------------------------------------------------- --- appending two streams +-- combining two streams ------------------------------------------------------------------------------- {-# INLINE _appendGeneric #-} @@ -239,68 +249,68 @@ combineWith modifier stream1 stream2 = -- all streams. However, with this operation you can precisely control the -- scheduling by creating arbitrary shape expression trees. -- --- >>> append = Async.combineWith id +-- >>> append2 = Async.combineWith id -- -- The following code finishes in 4 seconds: -- -- >>> stream1 = Stream.fromEffect (delay 4) -- >>> stream2 = Stream.fromEffect (delay 2) --- >>> Stream.fold Fold.toList $ stream1 `Async.append` stream2 +-- >>> Stream.fold Fold.toList $ stream1 `Async.append2` stream2 -- 2 sec -- 4 sec -- [2,4] -- -{-# INLINE append #-} -append :: MonadAsync m => Stream m a -> Stream m a -> Stream m a -append = combineWith id +{-# INLINE append2 #-} +append2 :: MonadAsync m => Stream m a -> Stream m a -> Stream m a +append2 = combineWith id -- | Like 'append' but interleaves the streams fairly instead of prioritizing -- the left stream. This schedules all streams in a round robin fashion over -- limited number of threads. -- --- >>> interleave = Async.combineWith Async.interleaved +-- >>> interleave2 = Async.combineWith Async.interleaved -- -{-# INLINE interleave #-} -interleave :: MonadAsync m => Stream m a -> Stream m a -> Stream m a -interleave = combineWith interleaved +{-# INLINE interleave2 #-} +interleave2 :: MonadAsync m => Stream m a -> Stream m a -> Stream m a +interleave2 = combineWith interleaved -- | Like 'append' but with 'ordered' on. -- --- >>> ahead = Async.combineWith Async.ordered +-- >>> ahead2 = Async.combineWith Async.ordered -- -{-# INLINE ahead #-} -ahead :: MonadAsync m => Stream m a -> Stream m a -> Stream m a -ahead = combineWith ordered +{-# INLINE ahead2 #-} +ahead2 :: MonadAsync m => Stream m a -> Stream m a -> Stream m a +ahead2 = combineWith ordered --- | Like 'append' but with 'eager' on. +-- | Like 'append2' but with 'eager' on. -- --- >>> parallel = Async.combineWith Async.eager +-- >>> parallel2 = Async.combineWith Async.eager -- -{-# INLINE parallel #-} -parallel :: MonadAsync m => Stream m a -> Stream m a -> Stream m a -parallel = combineWith eager +{-# INLINE parallel2 #-} +parallel2 :: MonadAsync m => Stream m a -> Stream m a -> Stream m a +parallel2 = combineWith eager --- | Like 'parallel' but stops the output as soon as the first stream stops. +-- | Like 'parallel2' but stops the output as soon as the first stream stops. -- --- >>> parallelFst = Async.combineWith (Async.eager . Async.stopWhen Async.FirstStops) +-- >>> parallelFst2 = Async.combineWith (Async.eager . Async.stopWhen Async.FirstStops) -- -- /Pre-release/ -{-# INLINE parallelFst #-} -parallelFst :: MonadAsync m => Stream m a -> Stream m a -> Stream m a -parallelFst = combineWith (eager . stopWhen FirstStops) +{-# INLINE parallelFst2 #-} +parallelFst2 :: MonadAsync m => Stream m a -> Stream m a -> Stream m a +parallelFst2 = combineWith (eager . stopWhen FirstStops) --- | Like 'parallel' but stops the output as soon as any of the two streams +-- | Like 'parallel2' but stops the output as soon as any of the two streams -- stops. -- --- >>> parallelMin = Async.combineWith (Async.eager . Async.stopWhen Async.AnyStops) +-- >>> parallelMin2 = Async.combineWith (Async.eager . Async.stopWhen Async.AnyStops) -- -- /Pre-release/ -{-# INLINE parallelMin #-} -parallelMin :: MonadAsync m => Stream m a -> Stream m a -> Stream m a -parallelMin = combineWith (eager . stopWhen AnyStops) +{-# INLINE parallelMin2 #-} +parallelMin2 :: MonadAsync m => Stream m a -> Stream m a -> Stream m a +parallelMin2 = combineWith (eager . stopWhen AnyStops) ------------------------------------------------------------------------------- --- concat +-- concat streams ------------------------------------------------------------------------------- -- | A runner function takes a queuing function @q@ and a stream, it splits the @@ -327,9 +337,6 @@ mkEnqueue chan runner = do eagerDispatch chan in q --- XXX Can be renamed to concatMapWithK if we move concatMapWithK to higher --- level module. We can keep only Channel based ops in this module. - -- | Takes the head element of the input stream and queues the tail of the -- stream to the channel, then maps the supplied function on the head and -- evaluates the resulting stream. @@ -489,13 +496,74 @@ concatWith modifier = concatMapWith modifier id concat :: MonadAsync m => Stream m (Stream m a) -> Stream m a concat = concatWith id +------------------------------------------------------------------------------- +-- concat Lists +------------------------------------------------------------------------------- + +-- | Like 'concatWith' but works on a list of streams. +-- +-- >>> concatListWith modifier = Async.concatWith modifier . Stream.fromList +-- +{-# INLINE concatListWith #-} +concatListWith :: MonadAsync m => (Config -> Config) -> [Stream m a] -> Stream m a +concatListWith modifier = concatWith modifier . Stream.fromList + -- | Like 'concat' but works on a list of streams. -- --- >>> concatList = Async.concat . Stream.fromList +-- >>> append = Async.concatListWith id -- -{-# INLINE concatList #-} -concatList :: MonadAsync m => [Stream m a] -> Stream m a -concatList = concat . Stream.fromList +{-# INLINE append #-} +append :: MonadAsync m => [Stream m a] -> Stream m a +append = concatListWith id + +-- | Like 'append' but interleaves the streams fairly instead of prioritizing +-- the left stream. This schedules all streams in a round robin fashion over +-- limited number of threads. +-- +-- >>> interleave = Async.concatListWith Async.interleaved +-- +{-# INLINE interleave #-} +interleave :: MonadAsync m => [Stream m a] -> Stream m a +interleave = concatListWith interleaved + +-- | Like 'append' but with 'ordered' on. +-- +-- >>> ahead = Async.concatListWith Async.ordered +-- +{-# INLINE ahead #-} +ahead :: MonadAsync m => [Stream m a] -> Stream m a +ahead = concatListWith ordered + +-- | Like 'append' but with 'eager' on. +-- +-- >>> parallel = Async.concatListWith Async.eager +-- +{-# INLINE parallel #-} +parallel :: MonadAsync m => [Stream m a] -> Stream m a +parallel = concatListWith eager + +-- | Like 'parallel' but stops the output as soon as the first stream stops. +-- +-- >>> parallelFst = Async.concatListWith (Async.eager . Async.stopWhen Async.FirstStops) +-- +-- /Pre-release/ +{-# INLINE parallelFst #-} +parallelFst :: MonadAsync m => [Stream m a] -> Stream m a +parallelFst = concatListWith (eager . stopWhen FirstStops) + +-- | Like 'parallel' but stops the output as soon as any of the two streams +-- stops. +-- +-- >>> parallelMin = Async.concatListWith (Async.eager . Async.stopWhen Async.AnyStops) +-- +-- /Pre-release/ +{-# INLINE parallelMin #-} +parallelMin :: MonadAsync m => [Stream m a] -> Stream m a +parallelMin = concatListWith (eager . stopWhen AnyStops) + +------------------------------------------------------------------------------- +-- Applicative +------------------------------------------------------------------------------- {-# INLINE applyWith #-} {-# SPECIALIZE applyWith :: @@ -513,6 +581,10 @@ applyWith modifier stream1 stream2 = apply :: MonadAsync m => Stream m (a -> b) -> Stream m a -> Stream m b apply = applyWith id +------------------------------------------------------------------------------- +-- Map +------------------------------------------------------------------------------- + -- | -- >>> mapMWith modifier f = Async.concatMapWith modifier (Stream.fromEffect . f) -- diff --git a/test/Streamly/Test/Data/Stream/Concurrent.hs b/test/Streamly/Test/Data/Stream/Concurrent.hs index 47a81b355..bb6669427 100644 --- a/test/Streamly/Test/Data/Stream/Concurrent.hs +++ b/test/Streamly/Test/Data/Stream/Concurrent.hs @@ -68,8 +68,9 @@ transformCombineFromList :: transformCombineFromList constr eq listOp op a b c = withMaxSuccess maxTestCount $ monadicIO $ do - stream <- run (Stream.fold Fold.toList $ - constr a `Async.append` op (constr b `Async.append` constr c)) + let s1 = op (Async.append [constr b, constr c]) + let s2 = Async.append [constr a, s1] + stream <- run (Stream.fold Fold.toList s2) let list = a <> listOp (b <> c) listEquals eq stream list @@ -144,8 +145,8 @@ exceptionPropagation f = do (Left (ExampleException "E") :: Either ExampleException [Int]) it "concatMap throwM" $ do - let s1 = Async.concatList $ fmap Stream.fromPure [1..4] - s2 = Async.concatList $ fmap Stream.fromPure [5..8] + let s1 = Async.concatListWith id $ fmap Stream.fromPure [1..4] + s2 = Async.concatListWith id $ fmap Stream.fromPure [5..8] try $ tl ( let bind = flip Async.concatMap in bind s1 $ \x -> @@ -186,9 +187,8 @@ timeOrdering f = do takeCombined :: Int -> IO () takeCombined n = do let constr = Stream.fromFoldable - r <- Stream.fold Fold.toList $ - Stream.take n - (constr ([] :: [Int]) `Async.append` constr ([] :: [Int])) + let s = Async.append [constr ([] :: [Int]), constr ([] :: [Int])] + r <- Stream.fold Fold.toList $ Stream.take n s r `shouldBe` [] --------------------------------------------------------------------------- @@ -243,44 +243,42 @@ main = hspec -- XXX Need to use eq instead of sortEq for ahead oeprations -- Binary append prop1 "append [] []" - $ cmp (Stream.nil `Async.append` Stream.nil) sortEq [] + $ cmp (Async.append [Stream.nil, Stream.nil]) sortEq [] prop1 "append [] [1]" - $ cmp (Stream.nil `Async.append` Stream.fromPure 1) sortEq [1] + $ cmp (Async.append [Stream.nil, Stream.fromPure 1]) sortEq [1] prop1 "append [1] []" - $ cmp (Stream.fromPure 1 `Async.append` Stream.nil) sortEq [1] + $ cmp (Async.append [Stream.fromPure 1, Stream.nil]) sortEq [1] prop1 "append [0] [1]" - $ let stream = - Stream.fromPure 0 `Async.append` Stream.fromPure 1 + $ let stream = Async.append [Stream.fromPure 0, Stream.fromPure 1] in cmp stream sortEq [0, 1] prop1 "append [0] [] [1]" $ let stream = - Stream.fromPure 0 - `Async.append` Stream.nil - `Async.append` Stream.fromPure 1 + Async.append + [Stream.fromPure 0, Stream.nil, Stream.fromPure 1] in cmp stream sortEq [0, 1] - prop1 "append left associated" + prop1 "append2 left associated" $ let stream = Stream.fromPure 0 - `Async.append` Stream.fromPure 1 - `Async.append` Stream.fromPure 2 - `Async.append` Stream.fromPure 3 + `Async.append2` Stream.fromPure 1 + `Async.append2` Stream.fromPure 2 + `Async.append2` Stream.fromPure 3 in cmp stream sortEq [0, 1, 2, 3] prop1 "append right associated" $ let stream = Stream.fromPure 0 - `Async.append` (Stream.fromPure 1 - `Async.append` (Stream.fromPure 2 - `Async.append` Stream.fromPure 3)) + `Async.append2` (Stream.fromPure 1 + `Async.append2` (Stream.fromPure 2 + `Async.append2` Stream.fromPure 3)) in cmp stream sortEq [0, 1, 2, 3] prop1 "append balanced" - $ let leaf x y = Stream.fromPure x `Async.append` Stream.fromPure y - leaf11 = leaf 0 1 `Async.append` leaf 2 (3 :: Int) - leaf12 = leaf 4 5 `Async.append` leaf 6 7 - stream = leaf11 `Async.append` leaf12 + $ let leaf x y = Stream.fromPure x `Async.append2` Stream.fromPure y + leaf11 = leaf 0 1 `Async.append2` leaf 2 (3 :: Int) + leaf12 = leaf 4 5 `Async.append2` leaf 6 7 + stream = leaf11 `Async.append2` leaf12 in cmp stream sortEq [0, 1, 2, 3, 4, 5, 6,7] prop1 "combineWith (maxThreads 1)" @@ -291,14 +289,14 @@ main = hspec in cmp stream (==) [1,2,3,4,5,6,7,8,9,10] prop1 "apply (async arg1)" - $ let s1 = Async.apply (pure (,)) (pure 1 `Async.append` pure 2) + $ let s1 = Async.apply (pure (,)) (pure 1 `Async.append2` pure 2) s2 = Async.apply s1 (pure 3) :: Stream IO (Int, Int) xs = Stream.fold Fold.toList s2 in sort <$> xs `shouldReturn` [(1, 3), (2, 3)] prop1 "apply (async arg2)" $ let s1 = pure (1,) - s2 = Async.apply s1 (pure 2 `Async.append` pure 3) + s2 = Async.apply s1 (pure 2 `Async.append2` pure 3) xs = Stream.fold Fold.toList s2 :: IO [(Int, Int)] in sort <$> xs `shouldReturn` [(1, 2), (1, 3)] @@ -319,6 +317,6 @@ main = hspec #ifdef DEVBUILD describe "Time ordering" $ timeOrdering Async.append #endif - describe "Exception propagation" $ exceptionPropagation Async.append + describe "Exception propagation" $ exceptionPropagation Async.append2 -- Ad-hoc tests it "takes n from stream of streams" $ takeCombined 2