Change the semantics of <> and <|>.

See the changelog for more details.
This commit is contained in:
Harendra Kumar 2018-04-16 23:16:26 +05:30
parent fbb187d27e
commit 69a9ae1f17
7 changed files with 519 additions and 409 deletions

View File

@ -1,6 +1,15 @@
## Unreleased ## Unreleased
### Breaking changes ### Breaking changes
* Change the semantics of the Semigroup instance for `InterleavedT`, `AsyncT`
and `ParallelT`. Now the `<>` operation interleaves two streams for
`InterleavedT` (just like the now deprecated `<=>` operation which has been
renamed to `interleave`). For `AsyncT` `<>` now concurrently merges two
streams (just like the now deprecated `<|` operation which has been renamed
to `asyncmerge`). For `ParallelT` the `<>` operation now behaves like the
earlier `Alternative` operator `<|>`.
* Change the semantics of `Alternative` instance. The `<|>` operator now has a
different behavior for each type. See the documentation for more details.
* Change the type of `foldrM` to make it consistent with `foldrM` in base * Change the type of `foldrM` to make it consistent with `foldrM` in base
### Deprecations ### Deprecations
@ -11,6 +20,8 @@
* `runZipStream` to `runZipSerial` * `runZipStream` to `runZipSerial`
* `Streaming` to `IsStream` * `Streaming` to `IsStream`
* `runStreaming` to `runStream` * `runStreaming` to `runStream`
* `<=>` to `interleave`
* `<|` to `asyncmerge`
* `each` to `fromFoldable` * `each` to `fromFoldable`
* `scan` to `scanx` * `scan` to `scanx`
* `foldl` to `foldx` * `foldl` to `foldx`

View File

@ -18,22 +18,24 @@ module Streamly
MonadAsync MonadAsync
, IsStream , IsStream
-- * Product Style Composition -- * General Stream Styles
-- $product -- $product
, SerialT , SerialT
, InterleavedT , InterleavedT
, AsyncT , AsyncT
, ParallelT , ParallelT
-- * Zip Style Composition -- * Zip Style Streams
-- $zipping -- $zipping
, ZipSerial , ZipSerial
, ZipAsync , ZipAsync
-- * Sum Style Composition -- * Type Independent Sum Operations
-- $sum -- $sum
, (<=>) , append
, (<|) , interleave
, asyncmerge
, parmerge
-- * Transformation -- * Transformation
, async , async
@ -78,6 +80,8 @@ module Streamly
, runZipStream , runZipStream
, StreamT , StreamT
, ZipStream , ZipStream
, (<=>)
, (<|)
) )
where where
@ -146,18 +150,14 @@ import Control.Monad.Trans.Class (MonadTrans (..))
-- being zipped serially whereas 'ZipAsync' produces both the elements being -- being zipped serially whereas 'ZipAsync' produces both the elements being
-- zipped concurrently. -- zipped concurrently.
-- --
-- Two streams of the same type can be combined using a sum style composition -- Two streams of the same type can be merged using a sum style composition to
-- to generate a stream of the same type where the output stream would contain -- generate a stream of the same type where the output stream would contain all
-- all elements of both the streams. However, the sequence in which the -- elements of both the streams. However, the sequence or the manner
-- elements in the resulting stream are produced depends on the combining -- (concurrent or serial) in which the elements in the resulting stream are
-- operator. Four distinct sum style operators, '<>', '<=>', '<|' and '<|>' -- produced depends on the type of the stream.
-- combine two streams in different ways, each corresponding to the one of the
-- four ways of combining monadically. See the respective section below for
-- more details.
-- --
-- Concurrent composition types 'AsyncT', 'ParallelT', 'ZipAsync' and -- Concurrent composition types 'AsyncT', 'ParallelT', 'ZipAsync' require the
-- concurrent composition operators '<|' and '<|>' require the underlying monad -- underlying monad of the streaming monad transformer to be 'MonadAsync'.
-- of the streaming monad transformer to be 'MonadAsync'.
-- --
-- For more details please see the "Streamly.Tutorial" and "Streamly.Examples" -- For more details please see the "Streamly.Tutorial" and "Streamly.Examples"
-- (the latter is available only when built with the 'examples' build flag). -- (the latter is available only when built with the 'examples' build flag).
@ -179,37 +179,11 @@ import Control.Monad.Trans.Class (MonadTrans (..))
-- not monads. -- not monads.
-- $sum -- $sum
-- -- Each stream style provides its own way of merging streams. The 'Semigroup'
-- Just like product style composition there are four distinct ways to combine -- '<>' operation can be used to merge streams in the style of the current
-- streams in sum style each directly corresponding to one of the product style -- type. In case you want to merge streams in a particular style you can either
-- composition. -- use a type adapter to force that type of composition or alternatively use
-- -- the type independent merge operations described in this section.
-- The standard semigroup append '<>' operator appends two streams serially,
-- this style corresponds to the 'SerialT' style of monadic composition.
--
-- @
-- main = ('toList' . 'serially' $ (return 1 <> return 2) <> (return 3 <> return 4)) >>= print
-- @
-- @
-- [1,2,3,4]
-- @
--
-- The standard 'Alternative' operator '<|>' fairly interleaves two streams in
-- parallel, this operator corresponds to the 'ParallelT' style.
--
-- @
-- main = ('toList' . 'serially' $ (return 1 <> return 2) \<|\> (return 3 <> return 4)) >>= print
-- @
-- @
-- [1,3,2,4]
-- @
--
-- Unlike '<|', this operator cannot be used to fold infinite containers since
-- that might accumulate too many partially drained streams. To be clear, it
-- can combine infinite streams but not infinite number of streams.
--
-- Two additional sum style composition operators that streamly introduces are
-- described below.
-- $adapters -- $adapters
-- --

View File

@ -24,12 +24,18 @@ module Streamly.Core
, Stream (..) , Stream (..)
-- * Construction -- * Construction
, scons , cons
, srepeat , repeat
, snil , nil
-- * Composition -- * Semigroup Style Composition
, append
, interleave , interleave
, asyncmerge
, parmerge
-- * Alternative
, alt
-- * Concurrent Stream Vars (SVars) -- * Concurrent Stream Vars (SVars)
, SVar , SVar
@ -42,10 +48,6 @@ module Streamly.Core
, joinStreamVar2 , joinStreamVar2
, fromStreamVar , fromStreamVar
, toStreamVar , toStreamVar
-- * Concurrent Streams
, parAlt
, parLeft
) )
where where
@ -76,6 +78,7 @@ import Data.Maybe (isNothing)
import Data.Semigroup (Semigroup(..)) import Data.Semigroup (Semigroup(..))
import Data.Set (Set) import Data.Set (Set)
import qualified Data.Set as S import qualified Data.Set as S
import Prelude hiding (repeat)
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
-- Parent child thread communication type -- Parent child thread communication type
@ -215,14 +218,14 @@ newtype Stream m a =
-- 'MonadAsync'. -- 'MonadAsync'.
type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m) type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
scons :: a -> Maybe (Stream m a) -> Stream m a cons :: a -> Maybe (Stream m a) -> Stream m a
scons a r = Stream $ \_ _ yld -> yld a r cons a r = Stream $ \_ _ yld -> yld a r
srepeat :: a -> Stream m a repeat :: a -> Stream m a
srepeat a = let x = scons a (Just x) in x repeat a = let x = cons a (Just x) in x
snil :: Stream m a nil :: Stream m a
snil = Stream $ \_ stp _ -> stp nil = Stream $ \_ stp _ -> stp
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
-- Composing streams -- Composing streams
@ -237,10 +240,10 @@ snil = Stream $ \_ stp _ -> stp
-- Semigroup -- Semigroup
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
-- | '<>' concatenates two streams sequentially i.e. the first stream is -- | Concatenates two streams sequentially i.e. the first stream is
-- exhausted completely before yielding any element from the second stream. -- exhausted completely before yielding any element from the second stream.
instance Semigroup (Stream m a) where append :: Stream m a -> Stream m a -> Stream m a
m1 <> m2 = go m1 append m1 m2 = go m1
where where
go (Stream m) = Stream $ \_ stp yld -> go (Stream m) = Stream $ \_ stp yld ->
let stop = (runStream m2) Nothing stp yld let stop = (runStream m2) Nothing stp yld
@ -248,19 +251,21 @@ instance Semigroup (Stream m a) where
yield a (Just r) = yld a (Just (go r)) yield a (Just r) = yld a (Just (go r))
in m Nothing stop yield in m Nothing stop yield
instance Semigroup (Stream m a) where
(<>) = append
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
-- Monoid -- Monoid
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
instance Monoid (Stream m a) where instance Monoid (Stream m a) where
mempty = Stream $ \_ stp _ -> stp mempty = nil
mappend = (<>) mappend = (<>)
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
-- Interleave -- Interleave
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
-- | Same as '<=>'.
interleave :: Stream m a -> Stream m a -> Stream m a interleave :: Stream m a -> Stream m a -> Stream m a
interleave m1 m2 = Stream $ \_ stp yld -> do interleave m1 m2 = Stream $ \_ stp yld -> do
let stop = (runStream m2) Nothing stp yld let stop = (runStream m2) Nothing stp yld
@ -603,32 +608,13 @@ joinStreamVar2 style m1 m2 = Stream $ \st stp yld ->
-- Semigroup and Monoid style compositions for parallel actions -- Semigroup and Monoid style compositions for parallel actions
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
{- {-# INLINE asyncmerge #-}
-- | Same as '<>|'. asyncmerge :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
parAhead :: Stream m a -> Stream m a -> Stream m a asyncmerge = joinStreamVar2 (SVarStyle Disjunction LIFO)
parAhead = undefined
-- | Sequential composition similar to '<>' except that it can execute the {-# INLINE parmerge #-}
-- action on the right in parallel ahead of time. Returns the results in parmerge :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
-- sequential order like '<>' from left to right. parmerge = joinStreamVar2 (SVarStyle Disjunction FIFO)
(<>|) :: Stream m a -> Stream m a -> Stream m a
(<>|) = parAhead
-}
-- | Same as '<|>'. Since this schedules all the composed streams fairly you
-- cannot fold infinite number of streams using this operation.
{-# INLINE parAlt #-}
parAlt :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
parAlt = joinStreamVar2 (SVarStyle Disjunction FIFO)
-- | Same as '<|'. Since this schedules the left side computation first you can
-- right fold an infinite container using this operator. However a left fold
-- will not work well as it first unpeels the whole structure before scheduling
-- a computation requiring an amount of memory proportional to the size of the
-- structure.
{-# INLINE parLeft #-}
parLeft :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
parLeft = joinStreamVar2 (SVarStyle Disjunction LIFO)
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
-- Instances (only used for deriving newtype instances) -- Instances (only used for deriving newtype instances)
@ -655,17 +641,18 @@ instance Monad m => Monad (Stream m) where
-- Alternative & MonadPlus -- Alternative & MonadPlus
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
-- | `empty` represents an action that takes non-zero time to complete. Since alt :: Stream m a -> Stream m a -> Stream m a
-- all actions take non-zero time, an `Alternative` composition ('<|>') is a alt m1 m2 = Stream $ \_ stp yld ->
-- monoidal composition executing all actions in parallel, it is similar to let stop = runStream m2 Nothing stp yld
-- '<>' except that it runs all the actions in parallel and interleaves their yield = yld
-- results fairly. in runStream m1 Nothing stop yield
instance MonadAsync m => Alternative (Stream m) where instance MonadAsync m => Alternative (Stream m) where
empty = mempty empty = nil
(<|>) = parAlt (<|>) = alt
instance MonadAsync m => MonadPlus (Stream m) where instance MonadAsync m => MonadPlus (Stream m) where
mzero = empty mzero = nil
mplus = (<|>) mplus = (<|>)
------------------------------------------------------------------------------- -------------------------------------------------------------------------------

View File

@ -100,8 +100,9 @@ import Prelude hiding (filter, drop, dropWhile, take,
import qualified Prelude import qualified Prelude
import qualified System.IO as IO import qualified System.IO as IO
import Streamly.Core import qualified Streamly.Core as S
import Streamly.Streams hiding (runStream) import Streamly.Core (Stream(Stream))
import Streamly.Streams
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
-- Construction -- Construction
@ -141,7 +142,7 @@ each = fromFoldable
iterate :: IsStream t => (a -> a) -> a -> t m a iterate :: IsStream t => (a -> a) -> a -> t m a
iterate step = fromStream . go iterate step = fromStream . go
where where
go s = scons s (Just (go (step s))) go s = S.cons s (Just (go (step s)))
-- | Iterate a monadic function from a seed value, streaming the results forever -- | Iterate a monadic function from a seed value, streaming the results forever
iterateM :: (IsStream t, Monad m) => (a -> m a) -> a -> t m a iterateM :: (IsStream t, Monad m) => (a -> m a) -> a -> t m a
@ -180,7 +181,7 @@ foldr step acc m = go (toStream m)
let stop = return acc let stop = return acc
yield a Nothing = return (step a acc) yield a Nothing = return (step a acc)
yield a (Just x) = go x >>= \b -> return (step a b) yield a (Just x) = go x >>= \b -> return (step a b)
in (runStream m1) Nothing stop yield in (S.runStream m1) Nothing stop yield
-- | Lazy right fold with a monadic step function. For example, to fold a -- | Lazy right fold with a monadic step function. For example, to fold a
-- stream into a list: -- stream into a list:
@ -197,7 +198,7 @@ foldrM step acc m = go (toStream m)
let stop = return acc let stop = return acc
yield a Nothing = step a acc yield a Nothing = step a acc
yield a (Just x) = (go x) >>= (step a) yield a (Just x) = (go x) >>= (step a)
in (runStream m1) Nothing stop yield in (S.runStream m1) Nothing stop yield
-- | Strict left scan with an extraction function. Like 'scanl'', but applies a -- | Strict left scan with an extraction function. Like 'scanl'', but applies a
-- user supplied extraction function (the third argument) at each step. This is -- user supplied extraction function (the third argument) at each step. This is
@ -213,7 +214,7 @@ scanx step begin done m = cons (done begin) $ fromStream $ go (toStream m) begin
yield a (Just x) = yield a (Just x) =
let s = step acc a let s = step acc a
in yld (done s) (Just (go x s)) in yld (done s) (Just (go x s))
in runStream m1 Nothing stop yield in S.runStream m1 Nothing stop yield
{-# DEPRECATED scan "Please use scanx instead." #-} {-# DEPRECATED scan "Please use scanx instead." #-}
scan :: IsStream t => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b scan :: IsStream t => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b
@ -240,7 +241,7 @@ foldx step begin done m = get $ go (toStream m) begin
get m1 = get m1 =
let yield a Nothing = return $ done a let yield a Nothing = return $ done a
yield _ _ = undefined yield _ _ = undefined
in (runStream m1) Nothing undefined yield in (S.runStream m1) Nothing undefined yield
-- Note, this can be implemented by making a recursive call to "go", -- Note, this can be implemented by making a recursive call to "go",
-- however that is more expensive because of unnecessary recursion -- however that is more expensive because of unnecessary recursion
@ -252,8 +253,8 @@ foldx step begin done m = get $ go (toStream m) begin
let s = step acc a let s = step acc a
in case r of in case r of
Nothing -> yld s Nothing Nothing -> yld s Nothing
Just x -> (runStream (go x s)) Nothing undefined yld Just x -> (S.runStream (go x s)) Nothing undefined yld
in (runStream m1) Nothing stop yield in (S.runStream m1) Nothing stop yield
{-# DEPRECATED foldl "Please use foldx instead." #-} {-# DEPRECATED foldl "Please use foldx instead." #-}
foldl :: (IsStream t, Monad m) foldl :: (IsStream t, Monad m)
@ -275,7 +276,7 @@ foldxM step begin done m = go begin (toStream m)
let stop = acc >>= done let stop = acc >>= done
yield a Nothing = acc >>= \b -> step b a >>= done yield a Nothing = acc >>= \b -> step b a >>= done
yield a (Just x) = acc >>= \b -> go (step b a) x yield a (Just x) = acc >>= \b -> go (step b a) x
in (runStream m1) Nothing stop yield in (S.runStream m1) Nothing stop yield
{-# DEPRECATED foldlM "Please use foldxM instead." #-} {-# DEPRECATED foldlM "Please use foldxM instead." #-}
foldlM :: (IsStream t, Monad m) foldlM :: (IsStream t, Monad m)
@ -294,7 +295,7 @@ uncons m =
let stop = return Nothing let stop = return Nothing
yield a Nothing = return (Just (a, nil)) yield a Nothing = return (Just (a, nil))
yield a (Just x) = return (Just (a, fromStream x)) yield a (Just x) = return (Just (a, fromStream x))
in (runStream (toStream m)) Nothing stop yield in (S.runStream (toStream m)) Nothing stop yield
-- | Write a stream of Strings to an IO Handle. -- | Write a stream of Strings to an IO Handle.
toHandle :: (IsStream t, MonadIO m) => IO.Handle -> t m String -> m () toHandle :: (IsStream t, MonadIO m) => IO.Handle -> t m String -> m ()
@ -304,7 +305,7 @@ toHandle h m = go (toStream m)
let stop = return () let stop = return ()
yield a Nothing = liftIO (IO.hPutStrLn h a) yield a Nothing = liftIO (IO.hPutStrLn h a)
yield a (Just x) = liftIO (IO.hPutStrLn h a) >> go x yield a (Just x) = liftIO (IO.hPutStrLn h a) >> go x
in (runStream m1) Nothing stop yield in (S.runStream m1) Nothing stop yield
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
-- Special folds -- Special folds
@ -323,7 +324,7 @@ take n m = fromStream $ go n (toStream m)
go n1 m1 = Stream $ \ctx stp yld -> go n1 m1 = Stream $ \ctx stp yld ->
let yield a Nothing = yld a Nothing let yield a Nothing = yld a Nothing
yield a (Just x) = yld a (Just (go (n1 - 1) x)) yield a (Just x) = yld a (Just (go (n1 - 1) x))
in if n1 <= 0 then stp else (runStream m1) ctx stp yield in if n1 <= 0 then stp else (S.runStream m1) ctx stp yield
-- | Include only those elements that pass a predicate. -- | Include only those elements that pass a predicate.
{-# INLINE filter #-} {-# INLINE filter #-}
@ -334,8 +335,8 @@ filter p m = fromStream $ go (toStream m)
let yield a Nothing | p a = yld a Nothing let yield a Nothing | p a = yld a Nothing
| otherwise = stp | otherwise = stp
yield a (Just x) | p a = yld a (Just (go x)) yield a (Just x) | p a = yld a (Just (go x))
| otherwise = (runStream x) ctx stp yield | otherwise = (S.runStream x) ctx stp yield
in (runStream m1) ctx stp yield in (S.runStream m1) ctx stp yield
-- | End the stream as soon as the predicate fails on an element. -- | End the stream as soon as the predicate fails on an element.
{-# INLINE takeWhile #-} {-# INLINE takeWhile #-}
@ -347,7 +348,7 @@ takeWhile p m = fromStream $ go (toStream m)
| otherwise = stp | otherwise = stp
yield a (Just x) | p a = yld a (Just (go x)) yield a (Just x) | p a = yld a (Just (go x))
| otherwise = stp | otherwise = stp
in (runStream m1) ctx stp yield in (S.runStream m1) ctx stp yield
-- | Discard first 'n' elements from the stream and take the rest. -- | Discard first 'n' elements from the stream and take the rest.
drop :: IsStream t => Int -> t m a -> t m a drop :: IsStream t => Int -> t m a -> t m a
@ -355,11 +356,11 @@ drop n m = fromStream $ go n (toStream m)
where where
go n1 m1 = Stream $ \ctx stp yld -> go n1 m1 = Stream $ \ctx stp yld ->
let yield _ Nothing = stp let yield _ Nothing = stp
yield _ (Just x) = (runStream $ go (n1 - 1) x) ctx stp yld yield _ (Just x) = (S.runStream $ go (n1 - 1) x) ctx stp yld
-- Somehow "<=" check performs better than a ">" -- Somehow "<=" check performs better than a ">"
in if n1 <= 0 in if n1 <= 0
then (runStream m1) ctx stp yld then (S.runStream m1) ctx stp yld
else (runStream m1) ctx stp yield else (S.runStream m1) ctx stp yield
-- | Drop elements in the stream as long as the predicate succeeds and then -- | Drop elements in the stream as long as the predicate succeeds and then
-- take the rest of the stream. -- take the rest of the stream.
@ -370,9 +371,9 @@ dropWhile p m = fromStream $ go (toStream m)
go m1 = Stream $ \ctx stp yld -> go m1 = Stream $ \ctx stp yld ->
let yield a Nothing | p a = stp let yield a Nothing | p a = stp
| otherwise = yld a Nothing | otherwise = yld a Nothing
yield a (Just x) | p a = (runStream x) ctx stp yield yield a (Just x) | p a = (S.runStream x) ctx stp yield
| otherwise = yld a (Just x) | otherwise = yld a (Just x)
in (runStream m1) ctx stp yield in (S.runStream m1) ctx stp yield
-- | Determine whether all elements of a stream satisfy a predicate. -- | Determine whether all elements of a stream satisfy a predicate.
all :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool all :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool
@ -383,7 +384,7 @@ all p m = go (toStream m)
| otherwise = return False | otherwise = return False
yield a (Just x) | p a = go x yield a (Just x) | p a = go x
| otherwise = return False | otherwise = return False
in (runStream m1) Nothing (return True) yield in (S.runStream m1) Nothing (return True) yield
-- | Determine whether any of the elements of a stream satisfy a predicate. -- | Determine whether any of the elements of a stream satisfy a predicate.
any :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool any :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool
@ -394,7 +395,7 @@ any p m = go (toStream m)
| otherwise = return False | otherwise = return False
yield a (Just x) | p a = return True yield a (Just x) | p a = return True
| otherwise = go x | otherwise = go x
in (runStream m1) Nothing (return False) yield in (S.runStream m1) Nothing (return False) yield
-- | Determine the sum of all elements of a stream of numbers -- | Determine the sum of all elements of a stream of numbers
sum :: (IsStream t, Monad m, Num a) => t m a -> m a sum :: (IsStream t, Monad m, Num a) => t m a -> m a
@ -409,7 +410,7 @@ head :: (IsStream t, Monad m) => t m a -> m (Maybe a)
head m = head m =
let stop = return Nothing let stop = return Nothing
yield a _ = return (Just a) yield a _ = return (Just a)
in (runStream (toStream m)) Nothing stop yield in (S.runStream (toStream m)) Nothing stop yield
-- | Extract all but the first element of the stream, if any. -- | Extract all but the first element of the stream, if any.
tail :: (IsStream t, Monad m) => t m a -> m (Maybe (t m a)) tail :: (IsStream t, Monad m) => t m a -> m (Maybe (t m a))
@ -417,7 +418,7 @@ tail m =
let stop = return Nothing let stop = return Nothing
yield _ Nothing = return $ Just nil yield _ Nothing = return $ Just nil
yield _ (Just t) = return $ Just $ fromStream t yield _ (Just t) = return $ Just $ fromStream t
in (runStream (toStream m)) Nothing stop yield in (S.runStream (toStream m)) Nothing stop yield
-- | Extract the last element of the stream, if any. -- | Extract the last element of the stream, if any.
{-# INLINE last #-} {-# INLINE last #-}
@ -429,7 +430,7 @@ null :: (IsStream t, Monad m) => t m a -> m Bool
null m = null m =
let stop = return True let stop = return True
yield _ _ = return False yield _ _ = return False
in (runStream (toStream m)) Nothing stop yield in (S.runStream (toStream m)) Nothing stop yield
-- | Determine whether an element is present in the stream. -- | Determine whether an element is present in the stream.
elem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool elem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool
@ -439,7 +440,7 @@ elem e m = go (toStream m)
let stop = return False let stop = return False
yield a Nothing = return (a == e) yield a Nothing = return (a == e)
yield a (Just x) = if a == e then return True else go x yield a (Just x) = if a == e then return True else go x
in (runStream m1) Nothing stop yield in (S.runStream m1) Nothing stop yield
-- | Determine whether an element is not present in the stream. -- | Determine whether an element is not present in the stream.
notElem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool notElem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool
@ -449,7 +450,7 @@ notElem e m = go (toStream m)
let stop = return True let stop = return True
yield a Nothing = return (a /= e) yield a Nothing = return (a /= e)
yield a (Just x) = if a == e then return False else go x yield a (Just x) = if a == e then return False else go x
in (runStream m1) Nothing stop yield in (S.runStream m1) Nothing stop yield
-- | Determine the length of the stream. -- | Determine the length of the stream.
length :: (IsStream t, Monad m) => t m a -> m Int length :: (IsStream t, Monad m) => t m a -> m Int
@ -463,10 +464,10 @@ reverse m = fromStream $ go Nothing (toStream m)
go rev rest = Stream $ \svr stp yld -> go rev rest = Stream $ \svr stp yld ->
let stop = case rev of let stop = case rev of
Nothing -> stp Nothing -> stp
Just str -> runStream str svr stp yld Just str -> S.runStream str svr stp yld
yield a Nothing = runStream (a `scons` rev) svr stp yld yield a Nothing = S.runStream (a `S.cons` rev) svr stp yld
yield a (Just x) = runStream (go (Just $ a `scons` rev) x) svr stp yld yield a (Just x) = S.runStream (go (Just $ a `S.cons` rev) x) svr stp yld
in runStream rest svr stop yield in S.runStream rest svr stop yield
-- XXX replace the recursive "go" with continuation -- XXX replace the recursive "go" with continuation
-- | Determine the minimum element in a stream. -- | Determine the minimum element in a stream.
@ -477,7 +478,7 @@ minimum m = go Nothing (toStream m)
let stop = return r let stop = return r
yield a Nothing = return $ min_ a r yield a Nothing = return $ min_ a r
yield a (Just x) = go (min_ a r) x yield a (Just x) = go (min_ a r) x
in (runStream m1) Nothing stop yield in (S.runStream m1) Nothing stop yield
min_ a r = case r of min_ a r = case r of
Nothing -> Just a Nothing -> Just a
@ -492,7 +493,7 @@ maximum m = go Nothing (toStream m)
let stop = return r let stop = return r
yield a Nothing = return $ max_ a r yield a Nothing = return $ max_ a r
yield a (Just x) = go (max_ a r) x yield a (Just x) = go (max_ a r) x
in (runStream m1) Nothing stop yield in (S.runStream m1) Nothing stop yield
max_ a r = case r of max_ a r = case r of
Nothing -> Just a Nothing -> Just a
@ -514,7 +515,7 @@ mapM f m = fromStream $ go (toStream m)
let stop = stp let stop = stp
yield a Nothing = f a >>= \b -> yld b Nothing yield a Nothing = f a >>= \b -> yld b Nothing
yield a (Just x) = f a >>= \b -> yld b (Just (go x)) yield a (Just x) = f a >>= \b -> yld b (Just (go x))
in (runStream m1) Nothing stop yield in (S.runStream m1) Nothing stop yield
-- | Apply a monadic action to each element of the stream and discard the -- | Apply a monadic action to each element of the stream and discard the
-- output of the action. -- output of the action.
@ -525,7 +526,7 @@ mapM_ f m = go (toStream m)
let stop = return () let stop = return ()
yield a Nothing = void (f a) yield a Nothing = void (f a)
yield a (Just x) = f a >> go x yield a (Just x) = f a >> go x
in (runStream m1) Nothing stop yield in (S.runStream m1) Nothing stop yield
-- | Reduce a stream of monadic actions to a stream of the output of those -- | Reduce a stream of monadic actions to a stream of the output of those
-- actions. -- actions.
@ -536,7 +537,7 @@ sequence m = fromStream $ go (toStream m)
let stop = stp let stop = stp
yield a Nothing = a >>= \b -> yld b Nothing yield a Nothing = a >>= \b -> yld b Nothing
yield a (Just x) = a >>= \b -> yld b (Just (go x)) yield a (Just x) = a >>= \b -> yld b (Just (go x))
in (runStream m1) Nothing stop yield in (S.runStream m1) Nothing stop yield
-- | Generate a stream by performing an action @n@ times. -- | Generate a stream by performing an action @n@ times.
replicateM :: (IsStream t, Monad m) => Int -> m a -> t m a replicateM :: (IsStream t, Monad m) => Int -> m a -> t m a
@ -557,13 +558,13 @@ zipWithM f m1 m2 = fromStream $ go (toStream m1) (toStream m2)
where where
go mx my = Stream $ \_ stp yld -> do go mx my = Stream $ \_ stp yld -> do
let merge a ra = let merge a ra =
let yield2 b Nothing = (runStream (g a b)) Nothing stp yld let yield2 b Nothing = (S.runStream (g a b)) Nothing stp yld
yield2 b (Just rb) = yield2 b (Just rb) =
(runStream (g a b <> go ra rb)) Nothing stp yld (S.runStream (g a b <> go ra rb)) Nothing stp yld
in (runStream my) Nothing stp yield2 in (S.runStream my) Nothing stp yield2
let yield1 a Nothing = merge a snil let yield1 a Nothing = merge a S.nil
yield1 a (Just ra) = merge a ra yield1 a (Just ra) = merge a ra
(runStream mx) Nothing stp yield1 (S.runStream mx) Nothing stp yield1
g a b = toStream $ f a b g a b = toStream $ f a b
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
@ -577,4 +578,4 @@ zipAsyncWithM :: (IsStream t, MonadAsync m)
zipAsyncWithM f m1 m2 = fromStream $ Stream $ \_ stp yld -> do zipAsyncWithM f m1 m2 = fromStream $ Stream $ \_ stp yld -> do
ma <- async m1 ma <- async m1
mb <- async m2 mb <- async m2
(runStream (toStream (zipWithM f ma mb))) Nothing stp yld (S.runStream (toStream (zipWithM f ma mb))) Nothing stp yld

View File

@ -28,7 +28,7 @@ module Streamly.Streams
, SVarTag (..) , SVarTag (..)
, SVarStyle (..) , SVarStyle (..)
, SVar , SVar
, newEmptySVar , S.newEmptySVar
-- * Construction -- * Construction
, nil , nil
@ -47,6 +47,14 @@ module Streamly.Streams
-- * Transformation -- * Transformation
, async , async
-- * Merging Streams
, append
, interleave
, asyncmerge
, parmerge
, (<=>) --deprecated
, (<|) --deprecated
-- * Stream Styles -- * Stream Styles
, SerialT , SerialT
, StreamT -- deprecated , StreamT -- deprecated
@ -80,10 +88,6 @@ module Streamly.Streams
, zipWith , zipWith
, zipAsyncWith , zipAsyncWith
-- * Sum Style Composition
, (<=>)
, (<|)
-- * Fold Utilities -- * Fold Utilities
-- $foldutils -- $foldutils
, foldWith , foldWith
@ -103,8 +107,10 @@ import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans) import Control.Monad.Trans.Class (MonadTrans)
import Data.Semigroup (Semigroup(..)) import Data.Semigroup (Semigroup(..))
import Prelude hiding (zipWith) import Prelude hiding (zipWith)
import Streamly.Core hiding (runStream) import Streamly.Core ( MonadAsync, Stream(Stream)
import qualified Streamly.Core as SC , SVar, SVarStyle(..)
, SVarTag(..), SVarSched(..))
import qualified Streamly.Core as S
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
-- Types that can behave as a Stream -- Types that can behave as a Stream
@ -126,7 +132,7 @@ type Streaming = IsStream
-- | Represesnts an empty stream just like @[]@ represents an empty list. -- | Represesnts an empty stream just like @[]@ represents an empty list.
nil :: IsStream t => t m a nil :: IsStream t => t m a
nil = fromStream snil nil = fromStream S.nil
infixr 5 `cons` infixr 5 `cons`
@ -139,7 +145,7 @@ infixr 5 `cons`
-- [1,2,3] -- [1,2,3]
-- @ -- @
cons :: IsStream t => a -> t m a -> t m a cons :: IsStream t => a -> t m a -> t m a
cons a r = fromStream $ scons a (Just (toStream r)) cons a r = fromStream $ S.cons a (Just (toStream r))
infixr 5 .: infixr 5 .:
@ -194,7 +200,7 @@ fromCallback k = fromStream $ Stream $ \_ _ yld -> k (\a -> yld a Nothing)
-- | Read an SVar to get a stream. -- | Read an SVar to get a stream.
fromSVar :: (MonadAsync m, IsStream t) => SVar m a -> t m a fromSVar :: (MonadAsync m, IsStream t) => SVar m a -> t m a
fromSVar sv = fromStream $ fromStreamVar sv fromSVar sv = fromStream $ S.fromStreamVar sv
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
-- Destroying a stream -- Destroying a stream
@ -208,7 +214,7 @@ streamFold :: IsStream t
streamFold sv step blank m = streamFold sv step blank m =
let yield a Nothing = step a Nothing let yield a Nothing = step a Nothing
yield a (Just x) = step a (Just (fromStream x)) yield a (Just x) = step a (Just (fromStream x))
in (SC.runStream (toStream m)) sv blank yield in (S.runStream (toStream m)) sv blank yield
-- | Run a streaming composition, discard the results. -- | Run a streaming composition, discard the results.
runStream :: (Monad m, IsStream t) => t m a -> m () runStream :: (Monad m, IsStream t) => t m a -> m ()
@ -218,7 +224,7 @@ runStream m = go (toStream m)
let stop = return () let stop = return ()
yield _ Nothing = stop yield _ Nothing = stop
yield _ (Just x) = go x yield _ (Just x) = go x
in (SC.runStream m1) Nothing stop yield in (S.runStream m1) Nothing stop yield
-- | Same as 'runStream' -- | Same as 'runStream'
{-# Deprecated runStreaming "Please use runStream instead." #-} {-# Deprecated runStreaming "Please use runStream instead." #-}
@ -228,7 +234,7 @@ runStreaming = runStream
-- | Write a stream to an 'SVar' in a non-blocking manner. The stream can then -- | Write a stream to an 'SVar' in a non-blocking manner. The stream can then
-- be read back from the SVar using 'fromSVar'. -- be read back from the SVar using 'fromSVar'.
toSVar :: (IsStream t, MonadAsync m) => SVar m a -> t m a -> m () toSVar :: (IsStream t, MonadAsync m) => SVar m a -> t m a -> m ()
toSVar sv m = toStreamVar sv (toStream m) toSVar sv m = S.toStreamVar sv (toStream m)
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
-- Transformation -- Transformation
@ -243,14 +249,25 @@ toSVar sv m = toStreamVar sv (toStream m)
async :: (IsStream t, MonadAsync m) => t m a -> m (t m a) async :: (IsStream t, MonadAsync m) => t m a -> m (t m a)
async m = do async m = do
sv <- newStreamVar1 (SVarStyle Disjunction LIFO) (toStream m) sv <- S.newStreamVar1 (SVarStyle Disjunction LIFO) (toStream m)
return $ fromSVar sv return $ fromSVar sv
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
-- SerialT -- SerialT
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
-- | The 'Monad' instance of 'SerialT' runs the /monadic continuation/ for each -- | The 'Semigroup' instance of 'SerialT' appends two streams sequentially,
-- yielding all elements from the first stream, and then all elements from the
-- second stream.
--
-- @
-- main = ('toList' . 'serially' $ (fromFoldable [1,2]) \<\> (fromFoldable [3,4])) >>= print
-- @
-- @
-- [1,2,3,4]
-- @
--
-- The 'Monad' instance runs the /monadic continuation/ for each
-- element of the stream, serially. -- element of the stream, serially.
-- --
-- @ -- @
@ -278,11 +295,14 @@ async m = do
-- (2,4) -- (2,4)
-- @ -- @
-- --
-- This behavior is exactly like a list transformer. We call the monadic code -- This behavior of 'SerialT' is exactly like a list transformer. We call the
-- being run for each element of the stream a monadic continuation. In -- monadic code being run for each element of the stream a monadic
-- imperative paradigm we can think of this composition as nested @for@ loops -- continuation. In imperative paradigm we can think of this composition as
-- and the monadic continuation is the body of the loop. The loop iterates for -- nested @for@ loops and the monadic continuation is the body of the loop. The
-- all elements of the stream. -- loop iterates for all elements of the stream.
--
-- Note that serial composition can be used to combine an infinite number of
-- streams as it explores only one stream at a time.
-- --
newtype SerialT m a = SerialT {getSerialT :: Stream m a} newtype SerialT m a = SerialT {getSerialT :: Stream m a}
deriving (Semigroup, Monoid, MonadTrans, MonadIO, MonadThrow) deriving (Semigroup, Monoid, MonadTrans, MonadIO, MonadThrow)
@ -307,6 +327,17 @@ type StreamT = SerialT
-- from a single base type because they depend on the Monad instance which is -- from a single base type because they depend on the Monad instance which is
-- different for each type. -- different for each type.
------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------
-- | Same as the 'Semigroup' instance of 'SerialT'. Appends two streams
-- sequentially, yielding all elements from the first stream, and then all
-- elements from the second stream.
{-# INLINE append #-}
append :: IsStream t => t m a -> t m a -> t m a
append m1 m2 = fromStream $ S.append (toStream m1) (toStream m2)
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
-- Monad -- Monad
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
@ -314,10 +345,9 @@ type StreamT = SerialT
instance Monad m => Monad (SerialT m) where instance Monad m => Monad (SerialT m) where
return = pure return = pure
(SerialT (Stream m)) >>= f = SerialT $ Stream $ \_ stp yld -> (SerialT (Stream m)) >>= f = SerialT $ Stream $ \_ stp yld ->
let run x = (SC.runStream x) Nothing stp yld let run x = (S.runStream x) Nothing stp yld
yield a Nothing = run $ getSerialT (f a) yield a Nothing = run $ toStream (f a)
yield a (Just r) = run $ getSerialT (f a) yield a (Just r) = run $ toStream $ f a <> (fromStream r >>= f)
<> getSerialT (SerialT r >>= f)
in m Nothing stp yield in m Nothing stp yield
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
@ -325,7 +355,7 @@ instance Monad m => Monad (SerialT m) where
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
instance Monad m => Applicative (SerialT m) where instance Monad m => Applicative (SerialT m) where
pure a = SerialT $ scons a Nothing pure a = SerialT $ S.cons a Nothing
(<*>) = ap (<*>) = ap
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
@ -387,9 +417,18 @@ instance (Monad m, Floating a) => Floating (SerialT m a) where
-- InterleavedT -- InterleavedT
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
-- | Like 'SerialT' but different in nesting behavior. It fairly interleaves -- | The 'Semigroup' instance of 'InterleavedT' interleaves two streams,
-- the iterations of the inner and the outer loop, nesting loops in a breadth -- yielding one element from each stream alternately.
-- first manner. --
-- @
-- main = ('toList' . 'interleaving $ (fromFoldable [1,2]) \<\> (fromFoldable [3,4])) >>= print
-- @
-- @
-- [1,3,2,4]
-- @
--
-- Similarly, the 'Monad' instance fairly interleaves the iterations of the
-- inner and the outer loop, nesting loops in a breadth first manner.
-- --
-- --
-- @ -- @
@ -405,8 +444,11 @@ instance (Monad m, Floating a) => Floating (SerialT m a) where
-- (2,4) -- (2,4)
-- @ -- @
-- --
-- Note that interleaving composition can only combine a finite number of
-- streams as it needs to retain state for each unfinished stream.
--
newtype InterleavedT m a = InterleavedT {getInterleavedT :: Stream m a} newtype InterleavedT m a = InterleavedT {getInterleavedT :: Stream m a}
deriving (Semigroup, Monoid, MonadTrans, MonadIO, MonadThrow) deriving (Monoid, MonadTrans, MonadIO, MonadThrow)
deriving instance MonadAsync m => Alternative (InterleavedT m) deriving instance MonadAsync m => Alternative (InterleavedT m)
deriving instance MonadAsync m => MonadPlus (InterleavedT m) deriving instance MonadAsync m => MonadPlus (InterleavedT m)
@ -419,14 +461,37 @@ instance IsStream InterleavedT where
toStream = getInterleavedT toStream = getInterleavedT
fromStream = InterleavedT fromStream = InterleavedT
------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------
-- | Same as the 'Semigroup' instance of 'InterleavedT'. Interleaves two
-- streams, yielding one element from each stream alternately.
{-# INLINE interleave #-}
interleave :: IsStream t => t m a -> t m a -> t m a
interleave m1 m2 = fromStream $ S.interleave (toStream m1) (toStream m2)
instance Semigroup (InterleavedT m a) where
(<>) = interleave
infixr 5 <=>
-- | Same as 'interleave'.
{-# Deprecated (<=>) "Please use '<>' of InterleavedT or 'interleave' instead." #-}
{-# INLINE (<=>) #-}
(<=>) :: IsStream t => t m a -> t m a -> t m a
(<=>) = interleave
------------------------------------------------------------------------------
-- Monad
------------------------------------------------------------------------------
instance Monad m => Monad (InterleavedT m) where instance Monad m => Monad (InterleavedT m) where
return = pure return = pure
(InterleavedT (Stream m)) >>= f = InterleavedT $ Stream $ \_ stp yld -> (InterleavedT (Stream m)) >>= f = InterleavedT $ Stream $ \_ stp yld ->
let run x = (SC.runStream x) Nothing stp yld let run x = (S.runStream x) Nothing stp yld
yield a Nothing = run $ getInterleavedT (f a) yield a Nothing = run $ toStream (f a)
yield a (Just r) = run $ getInterleavedT (f a) yield a (Just r) = run $ toStream $ f a <> (fromStream r >>= f)
`interleave`
getInterleavedT (InterleavedT r >>= f)
in m Nothing stp yield in m Nothing stp yield
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
@ -434,7 +499,7 @@ instance Monad m => Monad (InterleavedT m) where
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
instance Monad m => Applicative (InterleavedT m) where instance Monad m => Applicative (InterleavedT m) where
pure a = InterleavedT $ scons a Nothing pure a = InterleavedT $ S.cons a Nothing
(<*>) = ap (<*>) = ap
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
@ -496,9 +561,24 @@ instance (Monad m, Floating a) => Floating (InterleavedT m a) where
-- AsyncT -- AsyncT
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
-- | Like 'SerialT' but /may/ run each iteration concurrently using demand -- | Left biased concurrent composition.
-- driven concurrency. More concurrent iterations are started only if the --
-- previous iterations are not able to produce enough output for the consumer. -- The Semigroup instance of 'AsyncT' concurrently /merges/ streams in a left
-- biased manner. It keeps yielding elements from the left stream as long as
-- it can. If the left stream blocks or cannot keep up with the pace of the
-- consumer it can concurrently yield from the stream on the right as well.
--
-- @
-- main = ('toList' . 'asyncly' $ (fromFoldable [1,2]) \<> (fromFoldable [3,4])) >>= print
-- @
-- @
-- [1,2,3,4]
-- @
--
-- Similarly, the monad instance of 'AsyncT' /may/ run each iteration
-- concurrently using demand driven concurrency. More concurrent iterations
-- are started only if the previous iterations are not able to produce enough
-- output for the consumer.
-- --
-- @ -- @
-- import "Streamly" -- import "Streamly"
@ -517,8 +597,12 @@ instance (Monad m, Floating a) => Floating (InterleavedT m a) where
-- @ -- @
-- --
-- All iterations may run in the same thread if they do not block. -- All iterations may run in the same thread if they do not block.
--
-- Note that this composition can be used to combine infinite number of streams
-- as it explores only a bounded number of streams at a time.
--
newtype AsyncT m a = AsyncT {getAsyncT :: Stream m a} newtype AsyncT m a = AsyncT {getAsyncT :: Stream m a}
deriving (Semigroup, Monoid, MonadTrans) deriving (Monoid, MonadTrans)
deriving instance MonadAsync m => Alternative (AsyncT m) deriving instance MonadAsync m => Alternative (AsyncT m)
deriving instance MonadAsync m => MonadPlus (AsyncT m) deriving instance MonadAsync m => MonadPlus (AsyncT m)
@ -533,6 +617,29 @@ instance IsStream AsyncT where
toStream = getAsyncT toStream = getAsyncT
fromStream = AsyncT fromStream = AsyncT
------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------
-- | Same as the 'Semigroup' instance of 'AsyncT'. Merges two streams
-- concurrently, preferring the elements from the left one when available.
{-# INLINE asyncmerge #-}
asyncmerge :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
asyncmerge m1 m2 = fromStream $ S.asyncmerge (toStream m1) (toStream m2)
instance MonadAsync m => Semigroup (AsyncT m a) where
(<>) = asyncmerge
-- | Same as 'asyncmerge'.
{-# DEPRECATED (<|) "Please use '<>' of AsyncT or 'asyncmerge' instead." #-}
{-# INLINE (<|) #-}
(<|) :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
(<|) = asyncmerge
------------------------------------------------------------------------------
-- Monad
------------------------------------------------------------------------------
{-# INLINE parbind #-} {-# INLINE parbind #-}
parbind parbind
:: (forall c. Stream m c -> Stream m c -> Stream m c) :: (forall c. Stream m c -> Stream m c -> Stream m c)
@ -543,7 +650,7 @@ parbind par m f = go m
where where
go (Stream g) = go (Stream g) =
Stream $ \ctx stp yld -> Stream $ \ctx stp yld ->
let run x = (SC.runStream x) ctx stp yld let run x = (S.runStream x) ctx stp yld
yield a Nothing = run $ f a yield a Nothing = run $ f a
yield a (Just r) = run $ f a `par` go r yield a (Just r) = run $ f a `par` go r
in g Nothing stp yield in g Nothing stp yield
@ -552,14 +659,14 @@ instance MonadAsync m => Monad (AsyncT m) where
return = pure return = pure
(AsyncT m) >>= f = AsyncT $ parbind par m g (AsyncT m) >>= f = AsyncT $ parbind par m g
where g x = getAsyncT (f x) where g x = getAsyncT (f x)
par = joinStreamVar2 (SVarStyle Conjunction LIFO) par = S.joinStreamVar2 (SVarStyle Conjunction LIFO)
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
-- Applicative -- Applicative
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
instance MonadAsync m => Applicative (AsyncT m) where instance MonadAsync m => Applicative (AsyncT m) where
pure a = AsyncT $ scons a Nothing pure a = AsyncT $ S.cons a Nothing
(<*>) = ap (<*>) = ap
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
@ -620,8 +727,20 @@ instance (MonadAsync m, Floating a) => Floating (AsyncT m a) where
-- ParallelT -- ParallelT
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
-- | Like 'SerialT' but runs /all/ iterations fairly concurrently using a round -- | Round robin concurrent composition.
-- robin scheduling. --
-- The Semigroup instance of 'ParallelT' concurrently /merges/ streams in a
-- round robin fashion, yielding elements from both streams alternately.
--
-- @
-- main = ('toList' . 'parallely' $ (fromFoldable [1,2]) \<> (fromFoldable [3,4])) >>= print
-- @
-- @
-- [1,3,2,4]
-- @
--
-- Similarly, the 'Monad' instance of 'ParallelT' runs /all/ iterations fairly
-- concurrently using a round robin scheduling.
-- --
-- @ -- @
-- import "Streamly" -- import "Streamly"
@ -641,8 +760,12 @@ instance (MonadAsync m, Floating a) => Floating (AsyncT m a) where
-- --
-- Unlike 'AsyncT' all iterations are guaranteed to run fairly concurrently, -- Unlike 'AsyncT' all iterations are guaranteed to run fairly concurrently,
-- unconditionally. -- unconditionally.
--
-- Note that round robin composition can only combine a finite number of
-- streams as it needs to retain state for each unfinished stream.
--
newtype ParallelT m a = ParallelT {getParallelT :: Stream m a} newtype ParallelT m a = ParallelT {getParallelT :: Stream m a}
deriving (Semigroup, Monoid, MonadTrans) deriving (Monoid, MonadTrans)
deriving instance MonadAsync m => Alternative (ParallelT m) deriving instance MonadAsync m => Alternative (ParallelT m)
deriving instance MonadAsync m => MonadPlus (ParallelT m) deriving instance MonadAsync m => MonadPlus (ParallelT m)
@ -657,18 +780,35 @@ instance IsStream ParallelT where
toStream = getParallelT toStream = getParallelT
fromStream = ParallelT fromStream = ParallelT
------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------
-- | Same as the 'Semigroup' instance of 'ParallelT'. Merges two streams
-- concurrently choosing elements from both fairly.
{-# INLINE parmerge #-}
parmerge :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
parmerge m1 m2 = fromStream $ S.parmerge (toStream m1) (toStream m2)
instance MonadAsync m => Semigroup (ParallelT m a) where
(<>) = parmerge
------------------------------------------------------------------------------
-- Monad
------------------------------------------------------------------------------
instance MonadAsync m => Monad (ParallelT m) where instance MonadAsync m => Monad (ParallelT m) where
return = pure return = pure
(ParallelT m) >>= f = ParallelT $ parbind par m g (ParallelT m) >>= f = ParallelT $ parbind par m g
where g x = getParallelT (f x) where g x = getParallelT (f x)
par = joinStreamVar2 (SVarStyle Conjunction FIFO) par = S.joinStreamVar2 (SVarStyle Conjunction FIFO)
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
-- Applicative -- Applicative
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
instance MonadAsync m => Applicative (ParallelT m) where instance MonadAsync m => Applicative (ParallelT m) where
pure a = ParallelT $ scons a Nothing pure a = ParallelT $ S.cons a Nothing
(<*>) = ap (<*>) = ap
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
@ -738,10 +878,10 @@ zipWith f m1 m2 = fromStream $ go (toStream m1) (toStream m2)
let merge a ra = let merge a ra =
let yield2 b Nothing = yld (f a b) Nothing let yield2 b Nothing = yld (f a b) Nothing
yield2 b (Just rb) = yld (f a b) (Just (go ra rb)) yield2 b (Just rb) = yld (f a b) (Just (go ra rb))
in (SC.runStream my) Nothing stp yield2 in (S.runStream my) Nothing stp yield2
let yield1 a Nothing = merge a snil let yield1 a Nothing = merge a S.nil
yield1 a (Just ra) = merge a ra yield1 a (Just ra) = merge a ra
(SC.runStream mx) Nothing stp yield1 (S.runStream mx) Nothing stp yield1
-- | The applicative instance of 'ZipSerial' zips a number of streams serially -- | The applicative instance of 'ZipSerial' zips a number of streams serially
-- i.e. it produces one element from each stream serially and then zips all -- i.e. it produces one element from each stream serially and then zips all
@ -758,8 +898,9 @@ zipWith f m1 m2 = fromStream $ go (toStream m1) (toStream m2)
-- [(1,3,5),(2,4,6)] -- [(1,3,5),(2,4,6)]
-- @ -- @
-- --
-- This applicative operation can be seen as the zipping equivalent of -- The 'Semigroup' instance of this type works the same way as that of
-- interleaving with '<=>'. -- 'SerialT'.
--
newtype ZipSerial m a = ZipSerial {getZipSerial :: Stream m a} newtype ZipSerial m a = ZipSerial {getZipSerial :: Stream m a}
deriving (Semigroup, Monoid) deriving (Semigroup, Monoid)
@ -777,7 +918,7 @@ instance Monad m => Functor (ZipSerial m) where
in m Nothing stp yield in m Nothing stp yield
instance Monad m => Applicative (ZipSerial m) where instance Monad m => Applicative (ZipSerial m) where
pure = ZipSerial . srepeat pure = ZipSerial . S.repeat
(<*>) = zipWith id (<*>) = zipWith id
instance IsStream ZipSerial where instance IsStream ZipSerial where
@ -835,7 +976,7 @@ zipAsyncWith :: (IsStream t, MonadAsync m)
zipAsyncWith f m1 m2 = fromStream $ Stream $ \_ stp yld -> do zipAsyncWith f m1 m2 = fromStream $ Stream $ \_ stp yld -> do
ma <- async m1 ma <- async m1
mb <- async m2 mb <- async m2
(SC.runStream (toStream (zipWith f ma mb))) Nothing stp yld (S.runStream (toStream (zipWith f ma mb))) Nothing stp yld
-- | Like 'ZipSerial' but zips in parallel, it generates all the elements to -- | Like 'ZipSerial' but zips in parallel, it generates all the elements to
-- be zipped concurrently. -- be zipped concurrently.
@ -850,8 +991,9 @@ zipAsyncWith f m1 m2 = fromStream $ Stream $ \_ stp yld -> do
-- [(1,3,5),(2,4,6)] -- [(1,3,5),(2,4,6)]
-- @ -- @
-- --
-- This applicative operation can be seen as the zipping equivalent of -- The 'Semigroup' instance of this type works the same way as that of
-- parallel composition with '<|>'. -- 'SerialT'.
--
newtype ZipAsync m a = ZipAsync {getZipAsync :: Stream m a} newtype ZipAsync m a = ZipAsync {getZipAsync :: Stream m a}
deriving (Semigroup, Monoid) deriving (Semigroup, Monoid)
@ -865,7 +1007,7 @@ instance Monad m => Functor (ZipAsync m) where
in m Nothing stp yield in m Nothing stp yield
instance MonadAsync m => Applicative (ZipAsync m) where instance MonadAsync m => Applicative (ZipAsync m) where
pure = ZipAsync . srepeat pure = ZipAsync . S.repeat
(<*>) = zipAsyncWith id (<*>) = zipAsyncWith id
instance IsStream ZipAsync where instance IsStream ZipAsync where
@ -982,51 +1124,6 @@ runZipStream = runZipSerial
runZipAsync :: Monad m => ZipAsync m a -> m () runZipAsync :: Monad m => ZipAsync m a -> m ()
runZipAsync = runStream runZipAsync = runStream
------------------------------------------------------------------------------
-- Sum Style Composition
------------------------------------------------------------------------------
infixr 5 <=>
-- | Sequential interleaved composition, in contrast to '<>' this operator
-- fairly interleaves two streams instead of appending them; yielding one
-- element from each stream alternately.
--
-- @
-- main = ('toList' . 'serially' $ (return 1 <> return 2) \<=\> (return 3 <> return 4)) >>= print
-- @
-- @
-- [1,3,2,4]
-- @
--
-- This operator corresponds to the 'InterleavedT' style. Unlike '<>', this
-- operator cannot be used to fold infinite containers since that might
-- accumulate too many partially drained streams. To be clear, it can combine
-- infinite streams but not infinite number of streams.
{-# INLINE (<=>) #-}
(<=>) :: IsStream t => t m a -> t m a -> t m a
m1 <=> m2 = fromStream $ interleave (toStream m1) (toStream m2)
-- | Demand driven concurrent composition. In contrast to '<|>' this operator
-- concurrently "merges" streams in a left biased manner rather than fairly
-- interleaving them. It keeps yielding from the stream on the left as long as
-- it can. If the left stream blocks or cannot keep up with the pace of the
-- consumer it can concurrently yield from the stream on the right in parallel.
--
-- @
-- main = ('toList' . 'serially' $ (return 1 <> return 2) \<| (return 3 <> return 4)) >>= print
-- @
-- @
-- [1,2,3,4]
-- @
--
-- Unlike '<|>' it can be used to fold infinite containers of streams. This
-- operator corresponds to the 'AsyncT' type for product style composition.
--
{-# INLINE (<|) #-}
(<|) :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
m1 <| m2 = fromStream $ parLeft (toStream m1) (toStream m2)
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
-- Fold Utilities -- Fold Utilities
------------------------------------------------------------------------------ ------------------------------------------------------------------------------

View File

@ -9,8 +9,12 @@ import Data.List (sort)
import Test.Hspec import Test.Hspec
import Streamly import Streamly
import Streamly.Prelude ((.:), nil)
import qualified Streamly.Prelude as A import qualified Streamly.Prelude as A
singleton :: IsStream t => a -> t m a
singleton a = a .: nil
toListSerial :: SerialT IO a -> IO [a] toListSerial :: SerialT IO a -> IO [a]
toListSerial = A.toList . serially toListSerial = A.toList . serially
@ -20,8 +24,8 @@ toListInterleaved = A.toList . interleaving
toListAsync :: AsyncT IO a -> IO [a] toListAsync :: AsyncT IO a -> IO [a]
toListAsync = A.toList . asyncly toListAsync = A.toList . asyncly
toListParallel :: Ord a => ParallelT IO a -> IO [a] toListParallel :: ParallelT IO a -> IO [a]
toListParallel = fmap sort . A.toList . parallely toListParallel = A.toList . parallely
main :: IO () main :: IO ()
main = hspec $ do main = hspec $ do
@ -50,8 +54,9 @@ main = hspec $ do
it "fmap on composed (<>)" $ it "fmap on composed (<>)" $
(toListSerial $ fmap (+1) (return 1 <> return 2)) (toListSerial $ fmap (+1) (return 1 <> return 2))
`shouldReturn` ([2,3] :: [Int]) `shouldReturn` ([2,3] :: [Int])
it "fmap on composed (<|>)" $
(toListSerial $ fmap (+1) (return 1 <|> return 2)) it "fmap on composed (<>)" $
((toListParallel $ fmap (+1) (return 1 <> return 2)) >>= return . sort)
`shouldReturn` ([2,3] :: [Int]) `shouldReturn` ([2,3] :: [Int])
--------------------------------------------------------------------------- ---------------------------------------------------------------------------
@ -70,43 +75,43 @@ main = hspec $ do
`shouldReturn` ([(1,2),(1,3)] :: [(Int, Int)]) `shouldReturn` ([(1,2),(1,3)] :: [(Int, Int)])
it "Apply - parallel composed first argument" $ it "Apply - parallel composed first argument" $
(toListSerial $ (,) <$> (return 1 <|> return 2) <*> (return 3)) (toListParallel ((,) <$> (return 1 <> return 2) <*> (return 3)) >>= return . sort)
`shouldReturn` ([(1,3),(2,3)] :: [(Int, Int)]) `shouldReturn` ([(1,3),(2,3)] :: [(Int, Int)])
it "Apply - parallel composed second argument" $ it "Apply - parallel composed second argument" $
(toListSerial $ (,) <$> (return 1) <*> (return 2 <|> return 3)) (toListParallel ((,) <$> (return 1) <*> (return 2 <> return 3)) >>= return . sort)
`shouldReturn` ([(1,2),(1,3)] :: [(Int, Int)]) `shouldReturn` ([(1,2),(1,3)] :: [(Int, Int)])
--------------------------------------------------------------------------- ---------------------------------------------------------------------------
-- Monoidal Compositions -- Monoidal Compositions
--------------------------------------------------------------------------- ---------------------------------------------------------------------------
describe "Serial Composition (<>)" $ compose (<>) id describe "Serial Composition" $ compose serially mempty id
describe "Serial Composition (mappend)" $ compose mappend id describe "Interleaved Composition" $ compose interleaving mempty sort
describe "Interleaved Composition (<>)" $ compose (<=>) sort describe "Left biased parallel Composition" $ compose asyncly mempty sort
describe "Left biased parallel Composition (<|)" $ compose (<|) sort describe "Fair parallel Composition" $ compose parallely mempty sort
describe "Fair parallel Composition (<|>)" $ compose (<|>) sort describe "Semigroup Composition for ZipSerial" $ compose zipping mempty id
describe "Fair parallel Composition (mplus)" $ compose mplus sort describe "Semigroup Composition for ZipAsync" $ compose zippingAsync mempty id
-- XXX need to check alternative compositions as well
--------------------------------------------------------------------------- ---------------------------------------------------------------------------
-- Monoidal Composition ordering checks -- Monoidal Composition ordering checks
--------------------------------------------------------------------------- ---------------------------------------------------------------------------
describe "Serial interleaved ordering check (<=>)" $ interleaveCheck (<=>) describe "Serial interleaved ordering check" $ interleaveCheck interleaving
describe "Parallel interleaved ordering check (<|>)" $ interleaveCheck (<|>) describe "Parallel interleaved ordering check" $ interleaveCheck parallely
describe "Left biased parallel time order check" $ parallelCheck (<|) describe "Left biased parallel time order check" $ parallelCheck asyncly
describe "Fair parallel time order check" $ parallelCheck (<|>) describe "Fair parallel time order check" $ parallelCheck parallely
--------------------------------------------------------------------------- ---------------------------------------------------------------------------
-- TBD Monoidal composition combinations -- TBD Monoidal composition combinations
--------------------------------------------------------------------------- ---------------------------------------------------------------------------
-- TBD need more such combinations to be tested. -- TBD need more such combinations to be tested.
describe "<> and <>" $ composeAndComposeSimple (<>) (<>) (cycle [[1 .. 9]]) describe "<> and <>" $ composeAndComposeSimple serially serially (cycle [[1 .. 9]])
describe "<> and <=>" $ composeAndComposeSimple describe "<> and <=>" $ composeAndComposeSimple
(<>) serially
(<=>) interleaving
([ [1 .. 9] ([ [1 .. 9]
, [1 .. 9] , [1 .. 9]
, [1, 3, 2, 4, 6, 5, 7, 9, 8] , [1, 3, 2, 4, 6, 5, 7, 9, 8]
@ -114,8 +119,8 @@ main = hspec $ do
]) ])
describe "<=> and <=>" $ composeAndComposeSimple describe "<=> and <=>" $ composeAndComposeSimple
(<=>) interleaving
(<=>) interleaving
([ [1, 4, 2, 7, 3, 5, 8, 6, 9] ([ [1, 4, 2, 7, 3, 5, 8, 6, 9]
, [1, 7, 4, 8, 2, 9, 5, 3, 6] , [1, 7, 4, 8, 2, 9, 5, 3, 6]
, [1, 4, 3, 7, 2, 6, 9, 5, 8] , [1, 4, 3, 7, 2, 6, 9, 5, 8]
@ -123,8 +128,8 @@ main = hspec $ do
]) ])
describe "<=> and <>" $ composeAndComposeSimple describe "<=> and <>" $ composeAndComposeSimple
(<=>) interleaving
(<>) serially
([ [1, 4, 2, 7, 3, 5, 8, 6, 9] ([ [1, 4, 2, 7, 3, 5, 8, 6, 9]
, [1, 7, 4, 8, 2, 9, 5, 3, 6] , [1, 7, 4, 8, 2, 9, 5, 3, 6]
, [1, 4, 2, 7, 3, 5, 8, 6, 9] , [1, 4, 2, 7, 3, 5, 8, 6, 9]
@ -132,6 +137,9 @@ main = hspec $ do
]) ])
describe "Nested parallel and serial compositions" $ do describe "Nested parallel and serial compositions" $ do
let t = timed
p = adapt . parallely
s = adapt . serially
{- {-
-- This is not correct, the result can also be [4,4,8,0,8,0,2,2] -- This is not correct, the result can also be [4,4,8,0,8,0,2,2]
-- because of parallelism of [8,0] and [8,0]. -- because of parallelism of [8,0] and [8,0].
@ -143,10 +151,9 @@ main = hspec $ do
`shouldReturn` ([4,4,8,8,0,0,2,2]) `shouldReturn` ([4,4,8,8,0,0,2,2])
-} -}
it "Nest <|>, <>, <|> (2)" $ it "Nest <|>, <>, <|> (2)" $
let t = timed (A.toList . parallely) (
in toListSerial ( s (p (t 4 <> t 8) <> p (t 1 <> t 2))
((t 4 <|> t 8) <> (t 1 <|> t 2)) <> s (p (t 4 <> t 8) <> p (t 1 <> t 2)))
<|> ((t 4 <|> t 8) <> (t 1 <|> t 2)))
`shouldReturn` ([4,4,8,8,1,1,2,2]) `shouldReturn` ([4,4,8,8,1,1,2,2])
-- FIXME: These two keep failing intermittently on Mac OS X -- FIXME: These two keep failing intermittently on Mac OS X
-- Need to examine and fix the tests. -- Need to examine and fix the tests.
@ -165,55 +172,82 @@ main = hspec $ do
`shouldReturn` ([4,4,1,1,8,2,9,2]) `shouldReturn` ([4,4,1,1,8,2,9,2])
-} -}
it "Nest <|>, <|>, <|>" $ it "Nest <|>, <|>, <|>" $
let t = timed (A.toList . parallely) (
in toListSerial ( ((t 4 <> t 8) <> (t 0 <> t 2))
((t 4 <|> t 8) <|> (t 0 <|> t 2)) <> ((t 4 <> t 8) <> (t 0 <> t 2)))
<|> ((t 4 <|> t 8) <|> (t 0 <|> t 2)))
`shouldReturn` ([0,0,2,2,4,4,8,8]) `shouldReturn` ([0,0,2,2,4,4,8,8])
--------------------------------------------------------------------------- ---------------------------------------------------------------------------
-- Monoidal composition recursion loops -- Monoidal composition recursion loops
--------------------------------------------------------------------------- ---------------------------------------------------------------------------
describe "Serial loops (<>)" $ loops (<>) id reverse describe "Serial loops (<>)" $ loops serially id reverse
describe "Left biased parallel loops (<|)" $ loops (<|) sort sort describe "Left biased parallel loops (<|)" $ loops asyncly sort sort
describe "Fair parallel loops (<|>)" $ loops (<|>) sort sort describe "Fair parallel loops (<|>)" $ loops parallely sort sort
--------------------------------------------------------------------------- ---------------------------------------------------------------------------
-- Bind and monoidal composition combinations -- Bind and monoidal composition combinations
--------------------------------------------------------------------------- ---------------------------------------------------------------------------
forM_ [(<>), (<=>), (<|), (<|>)] $ \g -> describe "Bind and compose1" $ bindAndComposeSimple serially serially
describe "Bind and compose" $ bindAndComposeSimple toListSerial g describe "Bind and compose2" $ bindAndComposeSimple serially interleaving
describe "Bind and compose3" $ bindAndComposeSimple serially asyncly
describe "Bind and compose4" $ bindAndComposeSimple serially parallely
forM_ [(<>), (<=>), (<|), (<|>)] $ \g -> describe "Bind and compose1" $ bindAndComposeSimple interleaving serially
describe "Bind and compose" $ bindAndComposeSimple toListInterleaved g describe "Bind and compose2" $ bindAndComposeSimple interleaving interleaving
describe "Bind and compose3" $ bindAndComposeSimple interleaving asyncly
describe "Bind and compose4" $ bindAndComposeSimple interleaving parallely
forM_ [(<>), (<=>), (<|), (<|>)] $ \g -> describe "Bind and compose1" $ bindAndComposeSimple asyncly serially
describe "Bind and compose" $ bindAndComposeSimple toListAsync g describe "Bind and compose2" $ bindAndComposeSimple asyncly interleaving
describe "Bind and compose3" $ bindAndComposeSimple asyncly asyncly
describe "Bind and compose4" $ bindAndComposeSimple asyncly parallely
forM_ [(<>), (<=>), (<|), (<|>)] $ \g -> describe "Bind and compose1" $ bindAndComposeSimple parallely serially
describe "Bind and compose" $ bindAndComposeSimple toListParallel g describe "Bind and compose2" $ bindAndComposeSimple parallely interleaving
describe "Bind and compose3" $ bindAndComposeSimple parallely asyncly
describe "Bind and compose4" $ bindAndComposeSimple parallely parallely
let fldr, fldl :: (IsStream t, Semigroup (t IO Int)) => [t IO Int] -> t IO Int
fldr = foldr (<>) nil
fldl = foldl (<>) nil
let fldr f = foldr f empty
fldl f = foldl f empty
in do
forM_ [(<>), (<=>), (<|), (<|>)] $ \g ->
forM_ [fldr, fldl] $ \k -> forM_ [fldr, fldl] $ \k ->
describe "Bind and compose" $ describe "Bind and compose" $ bindAndComposeHierarchy serially serially k
bindAndComposeHierarchy toListSerial (k g)
forM_ [(<>), (<=>), (<|), (<|>)] $ \g ->
forM_ [fldr, fldl] $ \k -> forM_ [fldr, fldl] $ \k ->
describe "Bind and compose" $ describe "Bind and compose" $ bindAndComposeHierarchy serially interleaving k
bindAndComposeHierarchy toListInterleaved (k g)
forM_ [(<>), (<=>), (<|), (<|>)] $ \g ->
forM_ [fldr, fldl] $ \k -> forM_ [fldr, fldl] $ \k ->
describe "Bind and compose" $ describe "Bind and compose" $ bindAndComposeHierarchy serially asyncly k
bindAndComposeHierarchy toListAsync (k g)
forM_ [(<>), (<=>), (<|), (<|>)] $ \g ->
forM_ [fldr, fldl] $ \k -> forM_ [fldr, fldl] $ \k ->
describe "Bind and compose" $ describe "Bind and compose" $ bindAndComposeHierarchy serially parallely k
bindAndComposeHierarchy toListParallel (k g)
forM_ [fldr, fldl] $ \k ->
describe "Bind and compose" $ bindAndComposeHierarchy interleaving serially k
forM_ [fldr, fldl] $ \k ->
describe "Bind and compose" $ bindAndComposeHierarchy interleaving interleaving k
forM_ [fldr, fldl] $ \k ->
describe "Bind and compose" $ bindAndComposeHierarchy interleaving asyncly k
forM_ [fldr, fldl] $ \k ->
describe "Bind and compose" $ bindAndComposeHierarchy interleaving parallely k
forM_ [fldr, fldl] $ \k ->
describe "Bind and compose" $ bindAndComposeHierarchy asyncly serially k
forM_ [fldr, fldl] $ \k ->
describe "Bind and compose" $ bindAndComposeHierarchy asyncly interleaving k
forM_ [fldr, fldl] $ \k ->
describe "Bind and compose" $ bindAndComposeHierarchy asyncly asyncly k
forM_ [fldr, fldl] $ \k ->
describe "Bind and compose" $ bindAndComposeHierarchy asyncly parallely k
forM_ [fldr, fldl] $ \k ->
describe "Bind and compose" $ bindAndComposeHierarchy parallely serially k
forM_ [fldr, fldl] $ \k ->
describe "Bind and compose" $ bindAndComposeHierarchy parallely interleaving k
forM_ [fldr, fldl] $ \k ->
describe "Bind and compose" $ bindAndComposeHierarchy parallely asyncly k
forM_ [fldr, fldl] $ \k ->
describe "Bind and compose" $ bindAndComposeHierarchy parallely parallely k
-- Nest two lists using different styles of product compositions -- Nest two lists using different styles of product compositions
it "Nests two streams using monadic serial composition" nestTwoSerial it "Nests two streams using monadic serial composition" nestTwoSerial
@ -229,8 +263,7 @@ main = hspec $ do
it "Nests two streams using Num serial composition" nestTwoSerialNum it "Nests two streams using Num serial composition" nestTwoSerialNum
it "Nests two streams using Num interleaved composition" nestTwoInterleavedNum it "Nests two streams using Num interleaved composition" nestTwoInterleavedNum
it "Nests two streams using Num async composition" nestTwoAsyncNum it "Nests two streams using Num async composition" nestTwoAsyncNum
-- This test fails intermittently, need to investigate it "Nests two streams using Num parallel composition" nestTwoParallelNum
-- it "Nests two streams using Num parallel composition" nestTwoParallelNum
--------------------------------------------------------------------------- ---------------------------------------------------------------------------
-- TBD Bind and Bind combinations -- TBD Bind and Bind combinations
@ -291,182 +324,183 @@ nestTwoAsync :: Expectation
nestTwoAsync = nestTwoAsync =
let s1 = foldMapWith (<>) return [1..4] let s1 = foldMapWith (<>) return [1..4]
s2 = foldMapWith (<>) return [5..8] s2 = foldMapWith (<>) return [5..8]
in toListAsync (do in (toListAsync (do
x <- s1 x <- s1
y <- s2 y <- s2
return (x + y) return (x + y)
) `shouldReturn` ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int]) ) >>= return . sort)
`shouldReturn` sort ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int])
nestTwoAsyncApp :: Expectation nestTwoAsyncApp :: Expectation
nestTwoAsyncApp = nestTwoAsyncApp =
let s1 = foldMapWith (<>) return [1..4] let s1 = foldMapWith (<>) return [1..4]
s2 = foldMapWith (<>) return [5..8] s2 = foldMapWith (<>) return [5..8]
in toListAsync ((+) <$> s1 <*> s2) in (toListAsync ((+) <$> s1 <*> s2) >>= return . sort)
`shouldReturn` ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int]) `shouldReturn` sort ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int])
nestTwoAsyncNum :: Expectation nestTwoAsyncNum :: Expectation
nestTwoAsyncNum = nestTwoAsyncNum =
let s1 = foldMapWith (<>) return [1..4] let s1 = foldMapWith (<>) return [1..4]
s2 = foldMapWith (<>) return [5..8] s2 = foldMapWith (<>) return [5..8]
in toListAsync (s1 + s2) in (toListAsync (s1 + s2) >>= return . sort)
`shouldReturn` ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int]) `shouldReturn` sort ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int])
nestTwoParallel :: Expectation nestTwoParallel :: Expectation
nestTwoParallel = nestTwoParallel =
let s1 = foldMapWith (<>) return [1..4] let s1 = foldMapWith (<>) return [1..4]
s2 = foldMapWith (<>) return [5..8] s2 = foldMapWith (<>) return [5..8]
in toListParallel (do in (toListParallel (do
x <- s1 x <- s1
y <- s2 y <- s2
return (x + y) return (x + y)
) `shouldReturn` ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int]) ) >>= return . sort)
`shouldReturn` sort ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int])
nestTwoParallelApp :: Expectation nestTwoParallelApp :: Expectation
nestTwoParallelApp = nestTwoParallelApp =
let s1 = foldMapWith (<>) return [1..4] let s1 = foldMapWith (<>) return [1..4]
s2 = foldMapWith (<>) return [5..8] s2 = foldMapWith (<>) return [5..8]
in toListParallel ((+) <$> s1 <*> s2) in (toListParallel ((+) <$> s1 <*> s2) >>= return . sort)
`shouldReturn` ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int]) `shouldReturn` sort ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int])
{-
nestTwoParallelNum :: Expectation nestTwoParallelNum :: Expectation
nestTwoParallelNum = nestTwoParallelNum =
let s1 = foldMapWith (<>) return [1..4] let s1 = foldMapWith (<>) return [1..4]
s2 = foldMapWith (<>) return [5..8] s2 = foldMapWith (<>) return [5..8]
in toListParallel (s1 + s2) in (toListParallel (s1 + s2) >>= return . sort)
`shouldReturn` ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int]) `shouldReturn` sort ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int])
-}
timed :: Int -> SerialT IO Int timed :: MonadIO (t IO) => Int -> t IO Int
timed x = liftIO (threadDelay (x * 100000)) >> return x timed x = liftIO (threadDelay (x * 100000)) >> return x
interleaveCheck interleaveCheck :: (IsStream t, Semigroup (t IO Int))
:: (SerialT IO Int -> SerialT IO Int -> SerialT IO Int) => (t IO Int -> t IO Int) -> Spec
-> Spec interleaveCheck t =
interleaveCheck f =
it "Interleave four" $ it "Interleave four" $
toListSerial ((return 0 <> return 1) `f` (return 100 <> return 101)) (A.toList . t) ((singleton 0 <> singleton 1) <> (singleton 100 <> singleton 101))
`shouldReturn` ([0, 100, 1, 101]) `shouldReturn` ([0, 100, 1, 101])
parallelCheck :: (SerialT IO Int -> SerialT IO Int -> SerialT IO Int) -> Spec parallelCheck :: (IsStream t, Semigroup (t IO Int), MonadIO (t IO))
parallelCheck f = do => (t IO Int -> t IO Int) -> Spec
parallelCheck t = do
it "Parallel ordering left associated" $ it "Parallel ordering left associated" $
toListSerial (((event 4 `f` event 3) `f` event 2) `f` event 1) (A.toList . t) (((event 4 <> event 3) <> event 2) <> event 1)
`shouldReturn` ([1..4]) `shouldReturn` ([1..4])
it "Parallel ordering right associated" $ it "Parallel ordering right associated" $
toListSerial (event 4 `f` (event 3 `f` (event 2 `f` event 1))) (A.toList . t) (event 4 <> (event 3 <> (event 2 <> event 1)))
`shouldReturn` ([1..4]) `shouldReturn` ([1..4])
where event n = (liftIO $ threadDelay (n * 100000)) >> (return n) where event n = (liftIO $ threadDelay (n * 100000)) >> (return n)
compose compose :: (IsStream t, Semigroup (t IO Int))
:: (SerialT IO Int -> SerialT IO Int -> SerialT IO Int) => (t IO Int -> t IO Int) -> t IO Int -> ([Int] -> [Int]) -> Spec
-> ([Int] -> [Int]) compose t z srt = do
-> Spec -- XXX these should get covered by the property tests
compose f srt = do
it "Compose mempty, mempty" $ it "Compose mempty, mempty" $
(tl (mempty `f` mempty)) `shouldReturn` [] (tl (z <> z)) `shouldReturn` ([] :: [Int])
it "Compose empty, empty" $
(tl (empty `f` empty)) `shouldReturn` []
it "Compose empty at the beginning" $ it "Compose empty at the beginning" $
(tl $ (empty `f` return 1)) `shouldReturn` [1] (tl $ (z <> singleton 1)) `shouldReturn` [1]
it "Compose empty at the end" $ it "Compose empty at the end" $
(tl $ (return 1 `f` empty)) `shouldReturn` [1] (tl $ (singleton 1 <> z)) `shouldReturn` [1]
it "Compose two" $ it "Compose two" $
(tl (return 0 `f` return 1) >>= return . srt) (tl (singleton 0 <> singleton 1) >>= return . srt)
`shouldReturn` [0, 1] `shouldReturn` [0, 1]
it "Compose many" $
((tl $ forEachWith (<>) [1..100] singleton) >>= return . srt)
`shouldReturn` [1..100]
-- These are not covered by the property tests
it "Compose three - empty in the middle" $ it "Compose three - empty in the middle" $
((tl $ (return 0 `f` empty `f` return 1)) >>= return . srt) ((tl $ (singleton 0 <> z <> singleton 1)) >>= return . srt)
`shouldReturn` [0, 1] `shouldReturn` [0, 1]
it "Compose left associated" $ it "Compose left associated" $
((tl $ (((return 0 `f` return 1) `f` return 2) `f` return 3)) ((tl $ (((singleton 0 <> singleton 1) <> singleton 2) <> singleton 3))
>>= return . srt) `shouldReturn` [0, 1, 2, 3] >>= return . srt) `shouldReturn` [0, 1, 2, 3]
it "Compose right associated" $ it "Compose right associated" $
((tl $ (return 0 `f` (return 1 `f` (return 2 `f` return 3)))) ((tl $ (singleton 0 <> (singleton 1 <> (singleton 2 <> singleton 3))))
>>= return . srt) `shouldReturn` [0, 1, 2, 3] >>= return . srt) `shouldReturn` [0, 1, 2, 3]
it "Compose many" $
((tl $ forEachWith f [1..100] return) >>= return . srt)
`shouldReturn` [1..100]
it "Compose hierarchical (multiple levels)" $ it "Compose hierarchical (multiple levels)" $
((tl $ (((return 0 `f` return 1) `f` (return 2 `f` return 3)) ((tl $ (((singleton 0 <> singleton 1) <> (singleton 2 <> singleton 3))
`f` ((return 4 `f` return 5) `f` (return 6 `f` return 7))) <> ((singleton 4 <> singleton 5) <> (singleton 6 <> singleton 7)))
) >>= return . srt) `shouldReturn` [0..7] ) >>= return . srt) `shouldReturn` [0..7]
where tl = toListSerial where tl = A.toList . t
composeAndComposeSimple composeAndComposeSimple
:: (SerialT IO Int -> SerialT IO Int -> SerialT IO Int) :: ( IsStream t1, Semigroup (t1 IO Int)
-> (SerialT IO Int -> SerialT IO Int -> SerialT IO Int) , IsStream t2, Semigroup (t2 IO Int), Monoid (t2 IO Int), Monad (t2 IO))
-> [[Int]] => (t1 IO Int -> t1 IO Int)
-> Spec -> (t2 IO Int -> t2 IO Int)
composeAndComposeSimple f g answer = do -> [[Int]] -> Spec
composeAndComposeSimple t1 t2 answer = do
let rfold = adapt . t2 . foldMapWith (<>) return
it "Compose right associated outer expr, right folded inner" $ it "Compose right associated outer expr, right folded inner" $
let fold = foldMapWith g return ((A.toList. t1) (rfold [1,2,3] <> (rfold [4,5,6] <> rfold [7,8,9])))
in (toListSerial (fold [1,2,3] `f` (fold [4,5,6] `f` fold [7,8,9])))
`shouldReturn` (answer !! 0) `shouldReturn` (answer !! 0)
it "Compose left associated outer expr, right folded inner" $ it "Compose left associated outer expr, right folded inner" $
let fold = foldMapWith g return ((A.toList . t1) ((rfold [1,2,3] <> rfold [4,5,6]) <> rfold [7,8,9]))
in (toListSerial ((fold [1,2,3] `f` fold [4,5,6]) `f` fold [7,8,9]))
`shouldReturn` (answer !! 1) `shouldReturn` (answer !! 1)
let lfold xs = adapt $ t2 $ foldl (<>) mempty $ map return xs
it "Compose right associated outer expr, left folded inner" $ it "Compose right associated outer expr, left folded inner" $
let fold xs = foldl g empty $ map return xs ((A.toList . t1) (lfold [1,2,3] <> (lfold [4,5,6] <> lfold [7,8,9])))
in (toListSerial (fold [1,2,3] `f` (fold [4,5,6] `f` fold [7,8,9])))
`shouldReturn` (answer !! 2) `shouldReturn` (answer !! 2)
it "Compose left associated outer expr, left folded inner" $ it "Compose left associated outer expr, left folded inner" $
let fold xs = foldl g empty $ map return xs ((A.toList . t1) ((lfold [1,2,3] <> lfold [4,5,6]) <> lfold [7,8,9]))
in (toListSerial ((fold [1,2,3] `f` fold [4,5,6]) `f` fold [7,8,9]))
`shouldReturn` (answer !! 3) `shouldReturn` (answer !! 3)
loops loops
:: (SerialT IO Int -> SerialT IO Int -> SerialT IO Int) :: (IsStream t, Semigroup (t IO Int), MonadIO (t IO))
=> (t IO Int -> t IO Int)
-> ([Int] -> [Int]) -> ([Int] -> [Int])
-> ([Int] -> [Int]) -> ([Int] -> [Int])
-> Spec -> Spec
loops f tsrt hsrt = do loops t tsrt hsrt = do
it "Tail recursive loop" $ (toListSerial (loopTail 0) >>= return . tsrt) it "Tail recursive loop" $ (A.toList (loopTail 0) >>= return . tsrt)
`shouldReturn` [0..3] `shouldReturn` [0..3]
it "Head recursive loop" $ (toListSerial (loopHead 0) >>= return . hsrt) it "Head recursive loop" $ (A.toList (loopHead 0) >>= return . hsrt)
`shouldReturn` [0..3] `shouldReturn` [0..3]
where where
loopHead x = do loopHead x = do
-- this print line is important for the test (causes a bind) -- this print line is important for the test (causes a bind)
liftIO $ putStrLn "LoopHead..." liftIO $ putStrLn "LoopHead..."
(if x < 3 then loopHead (x + 1) else empty) `f` return x t $ (if x < 3 then loopHead (x + 1) else nil) <> return x
loopTail x = do loopTail x = do
-- this print line is important for the test (causes a bind) -- this print line is important for the test (causes a bind)
liftIO $ putStrLn "LoopTail..." liftIO $ putStrLn "LoopTail..."
return x `f` (if x < 3 then loopTail (x + 1) else empty) t $ return x <> (if x < 3 then loopTail (x + 1) else nil)
bindAndComposeSimple bindAndComposeSimple
:: (IsStream t, Alternative (t IO), Monad (t IO)) :: ( IsStream t1, IsStream t2, Semigroup (t2 IO Int), Monad (t2 IO))
=> (forall a. Ord a => t IO a -> IO [a]) => (t1 IO Int -> t1 IO Int)
-> (t IO Int -> t IO Int -> t IO Int) -> (t2 IO Int -> t2 IO Int)
-> Spec -> Spec
bindAndComposeSimple tl g = do bindAndComposeSimple t1 t2 = do
-- XXX need a bind in the body of forEachWith instead of a simple return
it "Compose many (right fold) with bind" $ it "Compose many (right fold) with bind" $
(tl (forEachWith g [1..10 :: Int] $ \x -> return x `f` (return . id)) ((A.toList . t1) (adapt . t2 $ forEachWith (<>) [1..10 :: Int] return)
>>= return . sort) `shouldReturn` [1..10] >>= return . sort) `shouldReturn` [1..10]
it "Compose many (left fold) with bind" $ it "Compose many (left fold) with bind" $
let forL xs k = foldl g empty $ map k xs let forL xs k = foldl (<>) nil $ map k xs
in (tl (forL [1..10 :: Int] $ \x -> return x `f` (return . id)) in ((A.toList . t1) (adapt . t2 $ forL [1..10 :: Int] return)
>>= return . sort) `shouldReturn` [1..10] >>= return . sort) `shouldReturn` [1..10]
where f = (>>=)
bindAndComposeHierarchy bindAndComposeHierarchy
:: Monad (s IO) => (forall a. Ord a => s IO a -> IO [a]) :: ( IsStream t1, Monad (t1 IO)
-> ([s IO Int] -> s IO Int) , IsStream t2, Monad (t2 IO))
=> (t1 IO Int -> t1 IO Int)
-> (t2 IO Int -> t2 IO Int)
-> ([t2 IO Int] -> t2 IO Int)
-> Spec -> Spec
bindAndComposeHierarchy tl g = do bindAndComposeHierarchy t1 t2 g = do
it "Bind and compose nested" $ it "Bind and compose nested" $
(tl bindComposeNested >>= return . sort) ((A.toList . t1) bindComposeNested >>= return . sort)
`shouldReturn` (sort ( `shouldReturn` (sort (
[12, 18] [12, 18]
++ replicate 3 13 ++ replicate 3 13
@ -489,12 +523,11 @@ bindAndComposeHierarchy tl g = do
-- in m -- in m
in b in b
tripleCompose a b c = g [a, b, c] tripleCompose a b c = adapt . t2 $ g [a, b, c]
tripleBind mx my mz = tripleBind mx my mz =
mx `f` \x -> my mx >>= \x -> my
`f` \y -> mz >>= \y -> mz
`f` \z -> return (x + y + z) >>= \z -> return (x + y + z)
f = (>>=)
mixedOps :: Spec mixedOps :: Spec
mixedOps = do mixedOps = do
@ -512,14 +545,14 @@ mixedOps = do
x <- return 1 x <- return 1
y <- return 2 y <- return 2
z <- do z <- do
x1 <- return 1 <|> return 2 x1 <- adapt . parallely $ return 1 <> return 2
liftIO $ return () liftIO $ return ()
liftIO $ putStr "" liftIO $ putStr ""
y1 <- return 1 <| return 2 y1 <- adapt . asyncly $ return 1 <> return 2
z1 <- do z1 <- do
x11 <- return 1 <> return 2 x11 <- return 1 <> return 2
y11 <- return 1 <| return 2 y11 <- adapt . asyncly $ return 1 <> return 2
z11 <- return 1 <=> return 2 z11 <- adapt . interleaving $ return 1 <> return 2
liftIO $ return () liftIO $ return ()
liftIO $ putStr "" liftIO $ putStr ""
return (x11 + y11 + z11) return (x11 + y11 + z11)

View File

@ -15,8 +15,12 @@ import Test.QuickCheck.Monadic (run, monadicIO, monitor, assert, PropertyM)
import Test.Hspec import Test.Hspec
import Streamly import Streamly
import Streamly.Prelude ((.:), nil)
import qualified Streamly.Prelude as A import qualified Streamly.Prelude as A
singleton :: IsStream t => a -> t m a
singleton a = a .: nil
sortEq :: Ord a => [a] -> [a] -> Bool sortEq :: Ord a => [a] -> [a] -> Bool
sortEq a b = sort a == sort b sortEq a b = sort a == sort b
@ -203,20 +207,19 @@ transformOpsWord8 constr desc t = do
-- XXX concatenate streams of multiple elements rather than single elements -- XXX concatenate streams of multiple elements rather than single elements
semigroupOps semigroupOps
:: (IsStream t, MonadPlus (t IO) :: (IsStream t
#if __GLASGOW_HASKELL__ < 804 #if __GLASGOW_HASKELL__ < 804
, Semigroup (t IO Int) , Semigroup (t IO Int)
#endif #endif
, Monoid (t IO Int)) , Monoid (t IO Int))
=> String -> (t IO Int -> t IO Int) -> Spec => String
semigroupOps desc t = do -> (t IO Int -> t IO Int)
prop (desc ++ " <>") $ foldFromList (foldMapWith (<>) return) t (==) -> ([Int] -> [Int] -> Bool)
prop (desc ++ " mappend") $ foldFromList (foldMapWith mappend return) t (==) -> Spec
prop (desc ++ " <=>") $ foldFromList (foldMapWith (<=>) return) t (==) semigroupOps desc t eq = do
prop (desc ++ " <|>") $ foldFromList (foldMapWith (<|>) return) t sortEq prop (desc ++ " <>") $ foldFromList (foldMapWith (<>) singleton) t eq
prop (desc ++ " mplus") $ foldFromList (foldMapWith mplus return) t sortEq prop (desc ++ " mappend") $ foldFromList (foldMapWith mappend singleton) t eq
prop (desc ++ " <|") $ foldFromList (foldMapWith (<|) return) t sortEq
applicativeOps applicativeOps
:: (IsStream t, Applicative (t IO)) :: (IsStream t, Applicative (t IO))
@ -330,8 +333,12 @@ main = hspec $ do
functorOps folded "zippingAsync folded" zippingAsync (==) functorOps folded "zippingAsync folded" zippingAsync (==)
describe "Semigroup operations" $ do describe "Semigroup operations" $ do
semigroupOps "serially" serially semigroupOps "serially" serially (==)
semigroupOps "interleaving" interleaving semigroupOps "interleaving" interleaving (==)
semigroupOps "asyncly" asyncly sortEq
semigroupOps "parallely" parallely sortEq
semigroupOps "zipping" zipping (==)
semigroupOps "zippingAsync" zippingAsync (==)
describe "Applicative operations" $ do describe "Applicative operations" $ do
-- The tests using sorted equality are weaker tests -- The tests using sorted equality are weaker tests