diff --git a/Changelog.md b/Changelog.md index bb38351d..6b785d6d 100644 --- a/Changelog.md +++ b/Changelog.md @@ -1,6 +1,15 @@ ## Unreleased ### Breaking changes +* Change the semantics of the Semigroup instance for `InterleavedT`, `AsyncT` + and `ParallelT`. Now the `<>` operation interleaves two streams for + `InterleavedT` (just like the now deprecated `<=>` operation which has been + renamed to `interleave`). For `AsyncT` `<>` now concurrently merges two + streams (just like the now deprecated `<|` operation which has been renamed + to `asyncmerge`). For `ParallelT` the `<>` operation now behaves like the + earlier `Alternative` operator `<|>`. +* Change the semantics of `Alternative` instance. The `<|>` operator now has a + different behavior for each type. See the documentation for more details. * Change the type of `foldrM` to make it consistent with `foldrM` in base ### Deprecations @@ -11,6 +20,8 @@ * `runZipStream` to `runZipSerial` * `Streaming` to `IsStream` * `runStreaming` to `runStream` + * `<=>` to `interleave` + * `<|` to `asyncmerge` * `each` to `fromFoldable` * `scan` to `scanx` * `foldl` to `foldx` diff --git a/src/Streamly.hs b/src/Streamly.hs index 7004d5b9..be3bfe40 100644 --- a/src/Streamly.hs +++ b/src/Streamly.hs @@ -18,22 +18,24 @@ module Streamly MonadAsync , IsStream - -- * Product Style Composition + -- * General Stream Styles -- $product , SerialT , InterleavedT , AsyncT , ParallelT - -- * Zip Style Composition + -- * Zip Style Streams -- $zipping , ZipSerial , ZipAsync - -- * Sum Style Composition + -- * Type Independent Sum Operations -- $sum - , (<=>) - , (<|) + , append + , interleave + , asyncmerge + , parmerge -- * Transformation , async @@ -78,6 +80,8 @@ module Streamly , runZipStream , StreamT , ZipStream + , (<=>) + , (<|) ) where @@ -146,18 +150,14 @@ import Control.Monad.Trans.Class (MonadTrans (..)) -- being zipped serially whereas 'ZipAsync' produces both the elements being -- zipped concurrently. -- --- Two streams of the same type can be combined using a sum style composition --- to generate a stream of the same type where the output stream would contain --- all elements of both the streams. However, the sequence in which the --- elements in the resulting stream are produced depends on the combining --- operator. Four distinct sum style operators, '<>', '<=>', '<|' and '<|>' --- combine two streams in different ways, each corresponding to the one of the --- four ways of combining monadically. See the respective section below for --- more details. +-- Two streams of the same type can be merged using a sum style composition to +-- generate a stream of the same type where the output stream would contain all +-- elements of both the streams. However, the sequence or the manner +-- (concurrent or serial) in which the elements in the resulting stream are +-- produced depends on the type of the stream. -- --- Concurrent composition types 'AsyncT', 'ParallelT', 'ZipAsync' and --- concurrent composition operators '<|' and '<|>' require the underlying monad --- of the streaming monad transformer to be 'MonadAsync'. +-- Concurrent composition types 'AsyncT', 'ParallelT', 'ZipAsync' require the +-- underlying monad of the streaming monad transformer to be 'MonadAsync'. -- -- For more details please see the "Streamly.Tutorial" and "Streamly.Examples" -- (the latter is available only when built with the 'examples' build flag). @@ -179,37 +179,11 @@ import Control.Monad.Trans.Class (MonadTrans (..)) -- not monads. -- $sum --- --- Just like product style composition there are four distinct ways to combine --- streams in sum style each directly corresponding to one of the product style --- composition. --- --- The standard semigroup append '<>' operator appends two streams serially, --- this style corresponds to the 'SerialT' style of monadic composition. --- --- @ --- main = ('toList' . 'serially' $ (return 1 <> return 2) <> (return 3 <> return 4)) >>= print --- @ --- @ --- [1,2,3,4] --- @ --- --- The standard 'Alternative' operator '<|>' fairly interleaves two streams in --- parallel, this operator corresponds to the 'ParallelT' style. --- --- @ --- main = ('toList' . 'serially' $ (return 1 <> return 2) \<|\> (return 3 <> return 4)) >>= print --- @ --- @ --- [1,3,2,4] --- @ --- --- Unlike '<|', this operator cannot be used to fold infinite containers since --- that might accumulate too many partially drained streams. To be clear, it --- can combine infinite streams but not infinite number of streams. --- --- Two additional sum style composition operators that streamly introduces are --- described below. +-- Each stream style provides its own way of merging streams. The 'Semigroup' +-- '<>' operation can be used to merge streams in the style of the current +-- type. In case you want to merge streams in a particular style you can either +-- use a type adapter to force that type of composition or alternatively use +-- the type independent merge operations described in this section. -- $adapters -- diff --git a/src/Streamly/Core.hs b/src/Streamly/Core.hs index 2f97d9ac..728b3ed0 100644 --- a/src/Streamly/Core.hs +++ b/src/Streamly/Core.hs @@ -24,12 +24,18 @@ module Streamly.Core , Stream (..) -- * Construction - , scons - , srepeat - , snil + , cons + , repeat + , nil - -- * Composition + -- * Semigroup Style Composition + , append , interleave + , asyncmerge + , parmerge + + -- * Alternative + , alt -- * Concurrent Stream Vars (SVars) , SVar @@ -42,10 +48,6 @@ module Streamly.Core , joinStreamVar2 , fromStreamVar , toStreamVar - - -- * Concurrent Streams - , parAlt - , parLeft ) where @@ -76,6 +78,7 @@ import Data.Maybe (isNothing) import Data.Semigroup (Semigroup(..)) import Data.Set (Set) import qualified Data.Set as S +import Prelude hiding (repeat) ------------------------------------------------------------------------------ -- Parent child thread communication type @@ -215,14 +218,14 @@ newtype Stream m a = -- 'MonadAsync'. type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m) -scons :: a -> Maybe (Stream m a) -> Stream m a -scons a r = Stream $ \_ _ yld -> yld a r +cons :: a -> Maybe (Stream m a) -> Stream m a +cons a r = Stream $ \_ _ yld -> yld a r -srepeat :: a -> Stream m a -srepeat a = let x = scons a (Just x) in x +repeat :: a -> Stream m a +repeat a = let x = cons a (Just x) in x -snil :: Stream m a -snil = Stream $ \_ stp _ -> stp +nil :: Stream m a +nil = Stream $ \_ stp _ -> stp ------------------------------------------------------------------------------ -- Composing streams @@ -237,30 +240,32 @@ snil = Stream $ \_ stp _ -> stp -- Semigroup ------------------------------------------------------------------------------ --- | '<>' concatenates two streams sequentially i.e. the first stream is +-- | Concatenates two streams sequentially i.e. the first stream is -- exhausted completely before yielding any element from the second stream. +append :: Stream m a -> Stream m a -> Stream m a +append m1 m2 = go m1 + where + go (Stream m) = Stream $ \_ stp yld -> + let stop = (runStream m2) Nothing stp yld + yield a Nothing = yld a (Just m2) + yield a (Just r) = yld a (Just (go r)) + in m Nothing stop yield + instance Semigroup (Stream m a) where - m1 <> m2 = go m1 - where - go (Stream m) = Stream $ \_ stp yld -> - let stop = (runStream m2) Nothing stp yld - yield a Nothing = yld a (Just m2) - yield a (Just r) = yld a (Just (go r)) - in m Nothing stop yield + (<>) = append ------------------------------------------------------------------------------ -- Monoid ------------------------------------------------------------------------------ instance Monoid (Stream m a) where - mempty = Stream $ \_ stp _ -> stp + mempty = nil mappend = (<>) ------------------------------------------------------------------------------ -- Interleave ------------------------------------------------------------------------------ --- | Same as '<=>'. interleave :: Stream m a -> Stream m a -> Stream m a interleave m1 m2 = Stream $ \_ stp yld -> do let stop = (runStream m2) Nothing stp yld @@ -603,32 +608,13 @@ joinStreamVar2 style m1 m2 = Stream $ \st stp yld -> -- Semigroup and Monoid style compositions for parallel actions ------------------------------------------------------------------------------ -{- --- | Same as '<>|'. -parAhead :: Stream m a -> Stream m a -> Stream m a -parAhead = undefined +{-# INLINE asyncmerge #-} +asyncmerge :: MonadAsync m => Stream m a -> Stream m a -> Stream m a +asyncmerge = joinStreamVar2 (SVarStyle Disjunction LIFO) --- | Sequential composition similar to '<>' except that it can execute the --- action on the right in parallel ahead of time. Returns the results in --- sequential order like '<>' from left to right. -(<>|) :: Stream m a -> Stream m a -> Stream m a -(<>|) = parAhead --} - --- | Same as '<|>'. Since this schedules all the composed streams fairly you --- cannot fold infinite number of streams using this operation. -{-# INLINE parAlt #-} -parAlt :: MonadAsync m => Stream m a -> Stream m a -> Stream m a -parAlt = joinStreamVar2 (SVarStyle Disjunction FIFO) - --- | Same as '<|'. Since this schedules the left side computation first you can --- right fold an infinite container using this operator. However a left fold --- will not work well as it first unpeels the whole structure before scheduling --- a computation requiring an amount of memory proportional to the size of the --- structure. -{-# INLINE parLeft #-} -parLeft :: MonadAsync m => Stream m a -> Stream m a -> Stream m a -parLeft = joinStreamVar2 (SVarStyle Disjunction LIFO) +{-# INLINE parmerge #-} +parmerge :: MonadAsync m => Stream m a -> Stream m a -> Stream m a +parmerge = joinStreamVar2 (SVarStyle Disjunction FIFO) ------------------------------------------------------------------------------- -- Instances (only used for deriving newtype instances) @@ -655,17 +641,18 @@ instance Monad m => Monad (Stream m) where -- Alternative & MonadPlus ------------------------------------------------------------------------------ --- | `empty` represents an action that takes non-zero time to complete. Since --- all actions take non-zero time, an `Alternative` composition ('<|>') is a --- monoidal composition executing all actions in parallel, it is similar to --- '<>' except that it runs all the actions in parallel and interleaves their --- results fairly. +alt :: Stream m a -> Stream m a -> Stream m a +alt m1 m2 = Stream $ \_ stp yld -> + let stop = runStream m2 Nothing stp yld + yield = yld + in runStream m1 Nothing stop yield + instance MonadAsync m => Alternative (Stream m) where - empty = mempty - (<|>) = parAlt + empty = nil + (<|>) = alt instance MonadAsync m => MonadPlus (Stream m) where - mzero = empty + mzero = nil mplus = (<|>) ------------------------------------------------------------------------------- diff --git a/src/Streamly/Prelude.hs b/src/Streamly/Prelude.hs index a453b8a5..59f0ada4 100644 --- a/src/Streamly/Prelude.hs +++ b/src/Streamly/Prelude.hs @@ -100,8 +100,9 @@ import Prelude hiding (filter, drop, dropWhile, take, import qualified Prelude import qualified System.IO as IO -import Streamly.Core -import Streamly.Streams hiding (runStream) +import qualified Streamly.Core as S +import Streamly.Core (Stream(Stream)) +import Streamly.Streams ------------------------------------------------------------------------------ -- Construction @@ -141,7 +142,7 @@ each = fromFoldable iterate :: IsStream t => (a -> a) -> a -> t m a iterate step = fromStream . go where - go s = scons s (Just (go (step s))) + go s = S.cons s (Just (go (step s))) -- | Iterate a monadic function from a seed value, streaming the results forever iterateM :: (IsStream t, Monad m) => (a -> m a) -> a -> t m a @@ -180,7 +181,7 @@ foldr step acc m = go (toStream m) let stop = return acc yield a Nothing = return (step a acc) yield a (Just x) = go x >>= \b -> return (step a b) - in (runStream m1) Nothing stop yield + in (S.runStream m1) Nothing stop yield -- | Lazy right fold with a monadic step function. For example, to fold a -- stream into a list: @@ -197,7 +198,7 @@ foldrM step acc m = go (toStream m) let stop = return acc yield a Nothing = step a acc yield a (Just x) = (go x) >>= (step a) - in (runStream m1) Nothing stop yield + in (S.runStream m1) Nothing stop yield -- | Strict left scan with an extraction function. Like 'scanl'', but applies a -- user supplied extraction function (the third argument) at each step. This is @@ -213,7 +214,7 @@ scanx step begin done m = cons (done begin) $ fromStream $ go (toStream m) begin yield a (Just x) = let s = step acc a in yld (done s) (Just (go x s)) - in runStream m1 Nothing stop yield + in S.runStream m1 Nothing stop yield {-# DEPRECATED scan "Please use scanx instead." #-} scan :: IsStream t => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b @@ -240,7 +241,7 @@ foldx step begin done m = get $ go (toStream m) begin get m1 = let yield a Nothing = return $ done a yield _ _ = undefined - in (runStream m1) Nothing undefined yield + in (S.runStream m1) Nothing undefined yield -- Note, this can be implemented by making a recursive call to "go", -- however that is more expensive because of unnecessary recursion @@ -252,8 +253,8 @@ foldx step begin done m = get $ go (toStream m) begin let s = step acc a in case r of Nothing -> yld s Nothing - Just x -> (runStream (go x s)) Nothing undefined yld - in (runStream m1) Nothing stop yield + Just x -> (S.runStream (go x s)) Nothing undefined yld + in (S.runStream m1) Nothing stop yield {-# DEPRECATED foldl "Please use foldx instead." #-} foldl :: (IsStream t, Monad m) @@ -275,7 +276,7 @@ foldxM step begin done m = go begin (toStream m) let stop = acc >>= done yield a Nothing = acc >>= \b -> step b a >>= done yield a (Just x) = acc >>= \b -> go (step b a) x - in (runStream m1) Nothing stop yield + in (S.runStream m1) Nothing stop yield {-# DEPRECATED foldlM "Please use foldxM instead." #-} foldlM :: (IsStream t, Monad m) @@ -294,7 +295,7 @@ uncons m = let stop = return Nothing yield a Nothing = return (Just (a, nil)) yield a (Just x) = return (Just (a, fromStream x)) - in (runStream (toStream m)) Nothing stop yield + in (S.runStream (toStream m)) Nothing stop yield -- | Write a stream of Strings to an IO Handle. toHandle :: (IsStream t, MonadIO m) => IO.Handle -> t m String -> m () @@ -304,7 +305,7 @@ toHandle h m = go (toStream m) let stop = return () yield a Nothing = liftIO (IO.hPutStrLn h a) yield a (Just x) = liftIO (IO.hPutStrLn h a) >> go x - in (runStream m1) Nothing stop yield + in (S.runStream m1) Nothing stop yield ------------------------------------------------------------------------------ -- Special folds @@ -323,7 +324,7 @@ take n m = fromStream $ go n (toStream m) go n1 m1 = Stream $ \ctx stp yld -> let yield a Nothing = yld a Nothing yield a (Just x) = yld a (Just (go (n1 - 1) x)) - in if n1 <= 0 then stp else (runStream m1) ctx stp yield + in if n1 <= 0 then stp else (S.runStream m1) ctx stp yield -- | Include only those elements that pass a predicate. {-# INLINE filter #-} @@ -334,8 +335,8 @@ filter p m = fromStream $ go (toStream m) let yield a Nothing | p a = yld a Nothing | otherwise = stp yield a (Just x) | p a = yld a (Just (go x)) - | otherwise = (runStream x) ctx stp yield - in (runStream m1) ctx stp yield + | otherwise = (S.runStream x) ctx stp yield + in (S.runStream m1) ctx stp yield -- | End the stream as soon as the predicate fails on an element. {-# INLINE takeWhile #-} @@ -347,7 +348,7 @@ takeWhile p m = fromStream $ go (toStream m) | otherwise = stp yield a (Just x) | p a = yld a (Just (go x)) | otherwise = stp - in (runStream m1) ctx stp yield + in (S.runStream m1) ctx stp yield -- | Discard first 'n' elements from the stream and take the rest. drop :: IsStream t => Int -> t m a -> t m a @@ -355,11 +356,11 @@ drop n m = fromStream $ go n (toStream m) where go n1 m1 = Stream $ \ctx stp yld -> let yield _ Nothing = stp - yield _ (Just x) = (runStream $ go (n1 - 1) x) ctx stp yld + yield _ (Just x) = (S.runStream $ go (n1 - 1) x) ctx stp yld -- Somehow "<=" check performs better than a ">" in if n1 <= 0 - then (runStream m1) ctx stp yld - else (runStream m1) ctx stp yield + then (S.runStream m1) ctx stp yld + else (S.runStream m1) ctx stp yield -- | Drop elements in the stream as long as the predicate succeeds and then -- take the rest of the stream. @@ -370,9 +371,9 @@ dropWhile p m = fromStream $ go (toStream m) go m1 = Stream $ \ctx stp yld -> let yield a Nothing | p a = stp | otherwise = yld a Nothing - yield a (Just x) | p a = (runStream x) ctx stp yield + yield a (Just x) | p a = (S.runStream x) ctx stp yield | otherwise = yld a (Just x) - in (runStream m1) ctx stp yield + in (S.runStream m1) ctx stp yield -- | Determine whether all elements of a stream satisfy a predicate. all :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool @@ -383,7 +384,7 @@ all p m = go (toStream m) | otherwise = return False yield a (Just x) | p a = go x | otherwise = return False - in (runStream m1) Nothing (return True) yield + in (S.runStream m1) Nothing (return True) yield -- | Determine whether any of the elements of a stream satisfy a predicate. any :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool @@ -394,7 +395,7 @@ any p m = go (toStream m) | otherwise = return False yield a (Just x) | p a = return True | otherwise = go x - in (runStream m1) Nothing (return False) yield + in (S.runStream m1) Nothing (return False) yield -- | Determine the sum of all elements of a stream of numbers sum :: (IsStream t, Monad m, Num a) => t m a -> m a @@ -409,7 +410,7 @@ head :: (IsStream t, Monad m) => t m a -> m (Maybe a) head m = let stop = return Nothing yield a _ = return (Just a) - in (runStream (toStream m)) Nothing stop yield + in (S.runStream (toStream m)) Nothing stop yield -- | Extract all but the first element of the stream, if any. tail :: (IsStream t, Monad m) => t m a -> m (Maybe (t m a)) @@ -417,7 +418,7 @@ tail m = let stop = return Nothing yield _ Nothing = return $ Just nil yield _ (Just t) = return $ Just $ fromStream t - in (runStream (toStream m)) Nothing stop yield + in (S.runStream (toStream m)) Nothing stop yield -- | Extract the last element of the stream, if any. {-# INLINE last #-} @@ -429,7 +430,7 @@ null :: (IsStream t, Monad m) => t m a -> m Bool null m = let stop = return True yield _ _ = return False - in (runStream (toStream m)) Nothing stop yield + in (S.runStream (toStream m)) Nothing stop yield -- | Determine whether an element is present in the stream. elem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool @@ -439,7 +440,7 @@ elem e m = go (toStream m) let stop = return False yield a Nothing = return (a == e) yield a (Just x) = if a == e then return True else go x - in (runStream m1) Nothing stop yield + in (S.runStream m1) Nothing stop yield -- | Determine whether an element is not present in the stream. notElem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool @@ -449,7 +450,7 @@ notElem e m = go (toStream m) let stop = return True yield a Nothing = return (a /= e) yield a (Just x) = if a == e then return False else go x - in (runStream m1) Nothing stop yield + in (S.runStream m1) Nothing stop yield -- | Determine the length of the stream. length :: (IsStream t, Monad m) => t m a -> m Int @@ -463,10 +464,10 @@ reverse m = fromStream $ go Nothing (toStream m) go rev rest = Stream $ \svr stp yld -> let stop = case rev of Nothing -> stp - Just str -> runStream str svr stp yld - yield a Nothing = runStream (a `scons` rev) svr stp yld - yield a (Just x) = runStream (go (Just $ a `scons` rev) x) svr stp yld - in runStream rest svr stop yield + Just str -> S.runStream str svr stp yld + yield a Nothing = S.runStream (a `S.cons` rev) svr stp yld + yield a (Just x) = S.runStream (go (Just $ a `S.cons` rev) x) svr stp yld + in S.runStream rest svr stop yield -- XXX replace the recursive "go" with continuation -- | Determine the minimum element in a stream. @@ -477,7 +478,7 @@ minimum m = go Nothing (toStream m) let stop = return r yield a Nothing = return $ min_ a r yield a (Just x) = go (min_ a r) x - in (runStream m1) Nothing stop yield + in (S.runStream m1) Nothing stop yield min_ a r = case r of Nothing -> Just a @@ -492,7 +493,7 @@ maximum m = go Nothing (toStream m) let stop = return r yield a Nothing = return $ max_ a r yield a (Just x) = go (max_ a r) x - in (runStream m1) Nothing stop yield + in (S.runStream m1) Nothing stop yield max_ a r = case r of Nothing -> Just a @@ -514,7 +515,7 @@ mapM f m = fromStream $ go (toStream m) let stop = stp yield a Nothing = f a >>= \b -> yld b Nothing yield a (Just x) = f a >>= \b -> yld b (Just (go x)) - in (runStream m1) Nothing stop yield + in (S.runStream m1) Nothing stop yield -- | Apply a monadic action to each element of the stream and discard the -- output of the action. @@ -525,7 +526,7 @@ mapM_ f m = go (toStream m) let stop = return () yield a Nothing = void (f a) yield a (Just x) = f a >> go x - in (runStream m1) Nothing stop yield + in (S.runStream m1) Nothing stop yield -- | Reduce a stream of monadic actions to a stream of the output of those -- actions. @@ -536,7 +537,7 @@ sequence m = fromStream $ go (toStream m) let stop = stp yield a Nothing = a >>= \b -> yld b Nothing yield a (Just x) = a >>= \b -> yld b (Just (go x)) - in (runStream m1) Nothing stop yield + in (S.runStream m1) Nothing stop yield -- | Generate a stream by performing an action @n@ times. replicateM :: (IsStream t, Monad m) => Int -> m a -> t m a @@ -557,13 +558,13 @@ zipWithM f m1 m2 = fromStream $ go (toStream m1) (toStream m2) where go mx my = Stream $ \_ stp yld -> do let merge a ra = - let yield2 b Nothing = (runStream (g a b)) Nothing stp yld + let yield2 b Nothing = (S.runStream (g a b)) Nothing stp yld yield2 b (Just rb) = - (runStream (g a b <> go ra rb)) Nothing stp yld - in (runStream my) Nothing stp yield2 - let yield1 a Nothing = merge a snil + (S.runStream (g a b <> go ra rb)) Nothing stp yld + in (S.runStream my) Nothing stp yield2 + let yield1 a Nothing = merge a S.nil yield1 a (Just ra) = merge a ra - (runStream mx) Nothing stp yield1 + (S.runStream mx) Nothing stp yield1 g a b = toStream $ f a b ------------------------------------------------------------------------------ @@ -577,4 +578,4 @@ zipAsyncWithM :: (IsStream t, MonadAsync m) zipAsyncWithM f m1 m2 = fromStream $ Stream $ \_ stp yld -> do ma <- async m1 mb <- async m2 - (runStream (toStream (zipWithM f ma mb))) Nothing stp yld + (S.runStream (toStream (zipWithM f ma mb))) Nothing stp yld diff --git a/src/Streamly/Streams.hs b/src/Streamly/Streams.hs index 8f699acc..d05d8b62 100644 --- a/src/Streamly/Streams.hs +++ b/src/Streamly/Streams.hs @@ -28,7 +28,7 @@ module Streamly.Streams , SVarTag (..) , SVarStyle (..) , SVar - , newEmptySVar + , S.newEmptySVar -- * Construction , nil @@ -47,6 +47,14 @@ module Streamly.Streams -- * Transformation , async + -- * Merging Streams + , append + , interleave + , asyncmerge + , parmerge + , (<=>) --deprecated + , (<|) --deprecated + -- * Stream Styles , SerialT , StreamT -- deprecated @@ -80,10 +88,6 @@ module Streamly.Streams , zipWith , zipAsyncWith - -- * Sum Style Composition - , (<=>) - , (<|) - -- * Fold Utilities -- $foldutils , foldWith @@ -103,8 +107,10 @@ import Control.Monad.State.Class (MonadState(..)) import Control.Monad.Trans.Class (MonadTrans) import Data.Semigroup (Semigroup(..)) import Prelude hiding (zipWith) -import Streamly.Core hiding (runStream) -import qualified Streamly.Core as SC +import Streamly.Core ( MonadAsync, Stream(Stream) + , SVar, SVarStyle(..) + , SVarTag(..), SVarSched(..)) +import qualified Streamly.Core as S ------------------------------------------------------------------------------ -- Types that can behave as a Stream @@ -126,7 +132,7 @@ type Streaming = IsStream -- | Represesnts an empty stream just like @[]@ represents an empty list. nil :: IsStream t => t m a -nil = fromStream snil +nil = fromStream S.nil infixr 5 `cons` @@ -139,7 +145,7 @@ infixr 5 `cons` -- [1,2,3] -- @ cons :: IsStream t => a -> t m a -> t m a -cons a r = fromStream $ scons a (Just (toStream r)) +cons a r = fromStream $ S.cons a (Just (toStream r)) infixr 5 .: @@ -194,7 +200,7 @@ fromCallback k = fromStream $ Stream $ \_ _ yld -> k (\a -> yld a Nothing) -- | Read an SVar to get a stream. fromSVar :: (MonadAsync m, IsStream t) => SVar m a -> t m a -fromSVar sv = fromStream $ fromStreamVar sv +fromSVar sv = fromStream $ S.fromStreamVar sv ------------------------------------------------------------------------------ -- Destroying a stream @@ -208,7 +214,7 @@ streamFold :: IsStream t streamFold sv step blank m = let yield a Nothing = step a Nothing yield a (Just x) = step a (Just (fromStream x)) - in (SC.runStream (toStream m)) sv blank yield + in (S.runStream (toStream m)) sv blank yield -- | Run a streaming composition, discard the results. runStream :: (Monad m, IsStream t) => t m a -> m () @@ -218,7 +224,7 @@ runStream m = go (toStream m) let stop = return () yield _ Nothing = stop yield _ (Just x) = go x - in (SC.runStream m1) Nothing stop yield + in (S.runStream m1) Nothing stop yield -- | Same as 'runStream' {-# Deprecated runStreaming "Please use runStream instead." #-} @@ -228,7 +234,7 @@ runStreaming = runStream -- | Write a stream to an 'SVar' in a non-blocking manner. The stream can then -- be read back from the SVar using 'fromSVar'. toSVar :: (IsStream t, MonadAsync m) => SVar m a -> t m a -> m () -toSVar sv m = toStreamVar sv (toStream m) +toSVar sv m = S.toStreamVar sv (toStream m) ------------------------------------------------------------------------------ -- Transformation @@ -243,14 +249,25 @@ toSVar sv m = toStreamVar sv (toStream m) async :: (IsStream t, MonadAsync m) => t m a -> m (t m a) async m = do - sv <- newStreamVar1 (SVarStyle Disjunction LIFO) (toStream m) + sv <- S.newStreamVar1 (SVarStyle Disjunction LIFO) (toStream m) return $ fromSVar sv ------------------------------------------------------------------------------ -- SerialT ------------------------------------------------------------------------------ --- | The 'Monad' instance of 'SerialT' runs the /monadic continuation/ for each +-- | The 'Semigroup' instance of 'SerialT' appends two streams sequentially, +-- yielding all elements from the first stream, and then all elements from the +-- second stream. +-- +-- @ +-- main = ('toList' . 'serially' $ (fromFoldable [1,2]) \<\> (fromFoldable [3,4])) >>= print +-- @ +-- @ +-- [1,2,3,4] +-- @ +-- +-- The 'Monad' instance runs the /monadic continuation/ for each -- element of the stream, serially. -- -- @ @@ -278,11 +295,14 @@ async m = do -- (2,4) -- @ -- --- This behavior is exactly like a list transformer. We call the monadic code --- being run for each element of the stream a monadic continuation. In --- imperative paradigm we can think of this composition as nested @for@ loops --- and the monadic continuation is the body of the loop. The loop iterates for --- all elements of the stream. +-- This behavior of 'SerialT' is exactly like a list transformer. We call the +-- monadic code being run for each element of the stream a monadic +-- continuation. In imperative paradigm we can think of this composition as +-- nested @for@ loops and the monadic continuation is the body of the loop. The +-- loop iterates for all elements of the stream. +-- +-- Note that serial composition can be used to combine an infinite number of +-- streams as it explores only one stream at a time. -- newtype SerialT m a = SerialT {getSerialT :: Stream m a} deriving (Semigroup, Monoid, MonadTrans, MonadIO, MonadThrow) @@ -307,6 +327,17 @@ type StreamT = SerialT -- from a single base type because they depend on the Monad instance which is -- different for each type. +------------------------------------------------------------------------------ +-- Semigroup +------------------------------------------------------------------------------ + +-- | Same as the 'Semigroup' instance of 'SerialT'. Appends two streams +-- sequentially, yielding all elements from the first stream, and then all +-- elements from the second stream. +{-# INLINE append #-} +append :: IsStream t => t m a -> t m a -> t m a +append m1 m2 = fromStream $ S.append (toStream m1) (toStream m2) + ------------------------------------------------------------------------------ -- Monad ------------------------------------------------------------------------------ @@ -314,10 +345,9 @@ type StreamT = SerialT instance Monad m => Monad (SerialT m) where return = pure (SerialT (Stream m)) >>= f = SerialT $ Stream $ \_ stp yld -> - let run x = (SC.runStream x) Nothing stp yld - yield a Nothing = run $ getSerialT (f a) - yield a (Just r) = run $ getSerialT (f a) - <> getSerialT (SerialT r >>= f) + let run x = (S.runStream x) Nothing stp yld + yield a Nothing = run $ toStream (f a) + yield a (Just r) = run $ toStream $ f a <> (fromStream r >>= f) in m Nothing stp yield ------------------------------------------------------------------------------ @@ -325,7 +355,7 @@ instance Monad m => Monad (SerialT m) where ------------------------------------------------------------------------------ instance Monad m => Applicative (SerialT m) where - pure a = SerialT $ scons a Nothing + pure a = SerialT $ S.cons a Nothing (<*>) = ap ------------------------------------------------------------------------------ @@ -387,9 +417,18 @@ instance (Monad m, Floating a) => Floating (SerialT m a) where -- InterleavedT ------------------------------------------------------------------------------ --- | Like 'SerialT' but different in nesting behavior. It fairly interleaves --- the iterations of the inner and the outer loop, nesting loops in a breadth --- first manner. +-- | The 'Semigroup' instance of 'InterleavedT' interleaves two streams, +-- yielding one element from each stream alternately. +-- +-- @ +-- main = ('toList' . 'interleaving $ (fromFoldable [1,2]) \<\> (fromFoldable [3,4])) >>= print +-- @ +-- @ +-- [1,3,2,4] +-- @ +-- +-- Similarly, the 'Monad' instance fairly interleaves the iterations of the +-- inner and the outer loop, nesting loops in a breadth first manner. -- -- -- @ @@ -405,8 +444,11 @@ instance (Monad m, Floating a) => Floating (SerialT m a) where -- (2,4) -- @ -- +-- Note that interleaving composition can only combine a finite number of +-- streams as it needs to retain state for each unfinished stream. +-- newtype InterleavedT m a = InterleavedT {getInterleavedT :: Stream m a} - deriving (Semigroup, Monoid, MonadTrans, MonadIO, MonadThrow) + deriving (Monoid, MonadTrans, MonadIO, MonadThrow) deriving instance MonadAsync m => Alternative (InterleavedT m) deriving instance MonadAsync m => MonadPlus (InterleavedT m) @@ -419,14 +461,37 @@ instance IsStream InterleavedT where toStream = getInterleavedT fromStream = InterleavedT +------------------------------------------------------------------------------ +-- Semigroup +------------------------------------------------------------------------------ + +-- | Same as the 'Semigroup' instance of 'InterleavedT'. Interleaves two +-- streams, yielding one element from each stream alternately. +{-# INLINE interleave #-} +interleave :: IsStream t => t m a -> t m a -> t m a +interleave m1 m2 = fromStream $ S.interleave (toStream m1) (toStream m2) + +instance Semigroup (InterleavedT m a) where + (<>) = interleave + +infixr 5 <=> + +-- | Same as 'interleave'. +{-# Deprecated (<=>) "Please use '<>' of InterleavedT or 'interleave' instead." #-} +{-# INLINE (<=>) #-} +(<=>) :: IsStream t => t m a -> t m a -> t m a +(<=>) = interleave + +------------------------------------------------------------------------------ +-- Monad +------------------------------------------------------------------------------ + instance Monad m => Monad (InterleavedT m) where return = pure (InterleavedT (Stream m)) >>= f = InterleavedT $ Stream $ \_ stp yld -> - let run x = (SC.runStream x) Nothing stp yld - yield a Nothing = run $ getInterleavedT (f a) - yield a (Just r) = run $ getInterleavedT (f a) - `interleave` - getInterleavedT (InterleavedT r >>= f) + let run x = (S.runStream x) Nothing stp yld + yield a Nothing = run $ toStream (f a) + yield a (Just r) = run $ toStream $ f a <> (fromStream r >>= f) in m Nothing stp yield ------------------------------------------------------------------------------ @@ -434,7 +499,7 @@ instance Monad m => Monad (InterleavedT m) where ------------------------------------------------------------------------------ instance Monad m => Applicative (InterleavedT m) where - pure a = InterleavedT $ scons a Nothing + pure a = InterleavedT $ S.cons a Nothing (<*>) = ap ------------------------------------------------------------------------------ @@ -496,9 +561,24 @@ instance (Monad m, Floating a) => Floating (InterleavedT m a) where -- AsyncT ------------------------------------------------------------------------------ --- | Like 'SerialT' but /may/ run each iteration concurrently using demand --- driven concurrency. More concurrent iterations are started only if the --- previous iterations are not able to produce enough output for the consumer. +-- | Left biased concurrent composition. +-- +-- The Semigroup instance of 'AsyncT' concurrently /merges/ streams in a left +-- biased manner. It keeps yielding elements from the left stream as long as +-- it can. If the left stream blocks or cannot keep up with the pace of the +-- consumer it can concurrently yield from the stream on the right as well. +-- +-- @ +-- main = ('toList' . 'asyncly' $ (fromFoldable [1,2]) \<> (fromFoldable [3,4])) >>= print +-- @ +-- @ +-- [1,2,3,4] +-- @ +-- +-- Similarly, the monad instance of 'AsyncT' /may/ run each iteration +-- concurrently using demand driven concurrency. More concurrent iterations +-- are started only if the previous iterations are not able to produce enough +-- output for the consumer. -- -- @ -- import "Streamly" @@ -517,8 +597,12 @@ instance (Monad m, Floating a) => Floating (InterleavedT m a) where -- @ -- -- All iterations may run in the same thread if they do not block. +-- +-- Note that this composition can be used to combine infinite number of streams +-- as it explores only a bounded number of streams at a time. +-- newtype AsyncT m a = AsyncT {getAsyncT :: Stream m a} - deriving (Semigroup, Monoid, MonadTrans) + deriving (Monoid, MonadTrans) deriving instance MonadAsync m => Alternative (AsyncT m) deriving instance MonadAsync m => MonadPlus (AsyncT m) @@ -533,6 +617,29 @@ instance IsStream AsyncT where toStream = getAsyncT fromStream = AsyncT +------------------------------------------------------------------------------ +-- Semigroup +------------------------------------------------------------------------------ + +-- | Same as the 'Semigroup' instance of 'AsyncT'. Merges two streams +-- concurrently, preferring the elements from the left one when available. +{-# INLINE asyncmerge #-} +asyncmerge :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a +asyncmerge m1 m2 = fromStream $ S.asyncmerge (toStream m1) (toStream m2) + +instance MonadAsync m => Semigroup (AsyncT m a) where + (<>) = asyncmerge + +-- | Same as 'asyncmerge'. +{-# DEPRECATED (<|) "Please use '<>' of AsyncT or 'asyncmerge' instead." #-} +{-# INLINE (<|) #-} +(<|) :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a +(<|) = asyncmerge + +------------------------------------------------------------------------------ +-- Monad +------------------------------------------------------------------------------ + {-# INLINE parbind #-} parbind :: (forall c. Stream m c -> Stream m c -> Stream m c) @@ -543,7 +650,7 @@ parbind par m f = go m where go (Stream g) = Stream $ \ctx stp yld -> - let run x = (SC.runStream x) ctx stp yld + let run x = (S.runStream x) ctx stp yld yield a Nothing = run $ f a yield a (Just r) = run $ f a `par` go r in g Nothing stp yield @@ -552,14 +659,14 @@ instance MonadAsync m => Monad (AsyncT m) where return = pure (AsyncT m) >>= f = AsyncT $ parbind par m g where g x = getAsyncT (f x) - par = joinStreamVar2 (SVarStyle Conjunction LIFO) + par = S.joinStreamVar2 (SVarStyle Conjunction LIFO) ------------------------------------------------------------------------------ -- Applicative ------------------------------------------------------------------------------ instance MonadAsync m => Applicative (AsyncT m) where - pure a = AsyncT $ scons a Nothing + pure a = AsyncT $ S.cons a Nothing (<*>) = ap ------------------------------------------------------------------------------ @@ -620,8 +727,20 @@ instance (MonadAsync m, Floating a) => Floating (AsyncT m a) where -- ParallelT ------------------------------------------------------------------------------ --- | Like 'SerialT' but runs /all/ iterations fairly concurrently using a round --- robin scheduling. +-- | Round robin concurrent composition. +-- +-- The Semigroup instance of 'ParallelT' concurrently /merges/ streams in a +-- round robin fashion, yielding elements from both streams alternately. +-- +-- @ +-- main = ('toList' . 'parallely' $ (fromFoldable [1,2]) \<> (fromFoldable [3,4])) >>= print +-- @ +-- @ +-- [1,3,2,4] +-- @ +-- +-- Similarly, the 'Monad' instance of 'ParallelT' runs /all/ iterations fairly +-- concurrently using a round robin scheduling. -- -- @ -- import "Streamly" @@ -641,8 +760,12 @@ instance (MonadAsync m, Floating a) => Floating (AsyncT m a) where -- -- Unlike 'AsyncT' all iterations are guaranteed to run fairly concurrently, -- unconditionally. +-- +-- Note that round robin composition can only combine a finite number of +-- streams as it needs to retain state for each unfinished stream. +-- newtype ParallelT m a = ParallelT {getParallelT :: Stream m a} - deriving (Semigroup, Monoid, MonadTrans) + deriving (Monoid, MonadTrans) deriving instance MonadAsync m => Alternative (ParallelT m) deriving instance MonadAsync m => MonadPlus (ParallelT m) @@ -657,18 +780,35 @@ instance IsStream ParallelT where toStream = getParallelT fromStream = ParallelT +------------------------------------------------------------------------------ +-- Semigroup +------------------------------------------------------------------------------ + +-- | Same as the 'Semigroup' instance of 'ParallelT'. Merges two streams +-- concurrently choosing elements from both fairly. +{-# INLINE parmerge #-} +parmerge :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a +parmerge m1 m2 = fromStream $ S.parmerge (toStream m1) (toStream m2) + +instance MonadAsync m => Semigroup (ParallelT m a) where + (<>) = parmerge + +------------------------------------------------------------------------------ +-- Monad +------------------------------------------------------------------------------ + instance MonadAsync m => Monad (ParallelT m) where return = pure (ParallelT m) >>= f = ParallelT $ parbind par m g where g x = getParallelT (f x) - par = joinStreamVar2 (SVarStyle Conjunction FIFO) + par = S.joinStreamVar2 (SVarStyle Conjunction FIFO) ------------------------------------------------------------------------------ -- Applicative ------------------------------------------------------------------------------ instance MonadAsync m => Applicative (ParallelT m) where - pure a = ParallelT $ scons a Nothing + pure a = ParallelT $ S.cons a Nothing (<*>) = ap ------------------------------------------------------------------------------ @@ -738,10 +878,10 @@ zipWith f m1 m2 = fromStream $ go (toStream m1) (toStream m2) let merge a ra = let yield2 b Nothing = yld (f a b) Nothing yield2 b (Just rb) = yld (f a b) (Just (go ra rb)) - in (SC.runStream my) Nothing stp yield2 - let yield1 a Nothing = merge a snil + in (S.runStream my) Nothing stp yield2 + let yield1 a Nothing = merge a S.nil yield1 a (Just ra) = merge a ra - (SC.runStream mx) Nothing stp yield1 + (S.runStream mx) Nothing stp yield1 -- | The applicative instance of 'ZipSerial' zips a number of streams serially -- i.e. it produces one element from each stream serially and then zips all @@ -758,8 +898,9 @@ zipWith f m1 m2 = fromStream $ go (toStream m1) (toStream m2) -- [(1,3,5),(2,4,6)] -- @ -- --- This applicative operation can be seen as the zipping equivalent of --- interleaving with '<=>'. +-- The 'Semigroup' instance of this type works the same way as that of +-- 'SerialT'. +-- newtype ZipSerial m a = ZipSerial {getZipSerial :: Stream m a} deriving (Semigroup, Monoid) @@ -777,7 +918,7 @@ instance Monad m => Functor (ZipSerial m) where in m Nothing stp yield instance Monad m => Applicative (ZipSerial m) where - pure = ZipSerial . srepeat + pure = ZipSerial . S.repeat (<*>) = zipWith id instance IsStream ZipSerial where @@ -835,7 +976,7 @@ zipAsyncWith :: (IsStream t, MonadAsync m) zipAsyncWith f m1 m2 = fromStream $ Stream $ \_ stp yld -> do ma <- async m1 mb <- async m2 - (SC.runStream (toStream (zipWith f ma mb))) Nothing stp yld + (S.runStream (toStream (zipWith f ma mb))) Nothing stp yld -- | Like 'ZipSerial' but zips in parallel, it generates all the elements to -- be zipped concurrently. @@ -850,8 +991,9 @@ zipAsyncWith f m1 m2 = fromStream $ Stream $ \_ stp yld -> do -- [(1,3,5),(2,4,6)] -- @ -- --- This applicative operation can be seen as the zipping equivalent of --- parallel composition with '<|>'. +-- The 'Semigroup' instance of this type works the same way as that of +-- 'SerialT'. +-- newtype ZipAsync m a = ZipAsync {getZipAsync :: Stream m a} deriving (Semigroup, Monoid) @@ -865,7 +1007,7 @@ instance Monad m => Functor (ZipAsync m) where in m Nothing stp yield instance MonadAsync m => Applicative (ZipAsync m) where - pure = ZipAsync . srepeat + pure = ZipAsync . S.repeat (<*>) = zipAsyncWith id instance IsStream ZipAsync where @@ -982,51 +1124,6 @@ runZipStream = runZipSerial runZipAsync :: Monad m => ZipAsync m a -> m () runZipAsync = runStream ------------------------------------------------------------------------------- --- Sum Style Composition ------------------------------------------------------------------------------- - -infixr 5 <=> - --- | Sequential interleaved composition, in contrast to '<>' this operator --- fairly interleaves two streams instead of appending them; yielding one --- element from each stream alternately. --- --- @ --- main = ('toList' . 'serially' $ (return 1 <> return 2) \<=\> (return 3 <> return 4)) >>= print --- @ --- @ --- [1,3,2,4] --- @ --- --- This operator corresponds to the 'InterleavedT' style. Unlike '<>', this --- operator cannot be used to fold infinite containers since that might --- accumulate too many partially drained streams. To be clear, it can combine --- infinite streams but not infinite number of streams. -{-# INLINE (<=>) #-} -(<=>) :: IsStream t => t m a -> t m a -> t m a -m1 <=> m2 = fromStream $ interleave (toStream m1) (toStream m2) - --- | Demand driven concurrent composition. In contrast to '<|>' this operator --- concurrently "merges" streams in a left biased manner rather than fairly --- interleaving them. It keeps yielding from the stream on the left as long as --- it can. If the left stream blocks or cannot keep up with the pace of the --- consumer it can concurrently yield from the stream on the right in parallel. --- --- @ --- main = ('toList' . 'serially' $ (return 1 <> return 2) \<| (return 3 <> return 4)) >>= print --- @ --- @ --- [1,2,3,4] --- @ --- --- Unlike '<|>' it can be used to fold infinite containers of streams. This --- operator corresponds to the 'AsyncT' type for product style composition. --- -{-# INLINE (<|) #-} -(<|) :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a -m1 <| m2 = fromStream $ parLeft (toStream m1) (toStream m2) - ------------------------------------------------------------------------------ -- Fold Utilities ------------------------------------------------------------------------------ diff --git a/test/Main.hs b/test/Main.hs index be130f18..cad50db5 100644 --- a/test/Main.hs +++ b/test/Main.hs @@ -9,8 +9,12 @@ import Data.List (sort) import Test.Hspec import Streamly +import Streamly.Prelude ((.:), nil) import qualified Streamly.Prelude as A +singleton :: IsStream t => a -> t m a +singleton a = a .: nil + toListSerial :: SerialT IO a -> IO [a] toListSerial = A.toList . serially @@ -20,8 +24,8 @@ toListInterleaved = A.toList . interleaving toListAsync :: AsyncT IO a -> IO [a] toListAsync = A.toList . asyncly -toListParallel :: Ord a => ParallelT IO a -> IO [a] -toListParallel = fmap sort . A.toList . parallely +toListParallel :: ParallelT IO a -> IO [a] +toListParallel = A.toList . parallely main :: IO () main = hspec $ do @@ -50,8 +54,9 @@ main = hspec $ do it "fmap on composed (<>)" $ (toListSerial $ fmap (+1) (return 1 <> return 2)) `shouldReturn` ([2,3] :: [Int]) - it "fmap on composed (<|>)" $ - (toListSerial $ fmap (+1) (return 1 <|> return 2)) + + it "fmap on composed (<>)" $ + ((toListParallel $ fmap (+1) (return 1 <> return 2)) >>= return . sort) `shouldReturn` ([2,3] :: [Int]) --------------------------------------------------------------------------- @@ -70,43 +75,43 @@ main = hspec $ do `shouldReturn` ([(1,2),(1,3)] :: [(Int, Int)]) it "Apply - parallel composed first argument" $ - (toListSerial $ (,) <$> (return 1 <|> return 2) <*> (return 3)) + (toListParallel ((,) <$> (return 1 <> return 2) <*> (return 3)) >>= return . sort) `shouldReturn` ([(1,3),(2,3)] :: [(Int, Int)]) it "Apply - parallel composed second argument" $ - (toListSerial $ (,) <$> (return 1) <*> (return 2 <|> return 3)) + (toListParallel ((,) <$> (return 1) <*> (return 2 <> return 3)) >>= return . sort) `shouldReturn` ([(1,2),(1,3)] :: [(Int, Int)]) --------------------------------------------------------------------------- -- Monoidal Compositions --------------------------------------------------------------------------- - describe "Serial Composition (<>)" $ compose (<>) id - describe "Serial Composition (mappend)" $ compose mappend id - describe "Interleaved Composition (<>)" $ compose (<=>) sort - describe "Left biased parallel Composition (<|)" $ compose (<|) sort - describe "Fair parallel Composition (<|>)" $ compose (<|>) sort - describe "Fair parallel Composition (mplus)" $ compose mplus sort - + describe "Serial Composition" $ compose serially mempty id + describe "Interleaved Composition" $ compose interleaving mempty sort + describe "Left biased parallel Composition" $ compose asyncly mempty sort + describe "Fair parallel Composition" $ compose parallely mempty sort + describe "Semigroup Composition for ZipSerial" $ compose zipping mempty id + describe "Semigroup Composition for ZipAsync" $ compose zippingAsync mempty id + -- XXX need to check alternative compositions as well --------------------------------------------------------------------------- -- Monoidal Composition ordering checks --------------------------------------------------------------------------- - describe "Serial interleaved ordering check (<=>)" $ interleaveCheck (<=>) - describe "Parallel interleaved ordering check (<|>)" $ interleaveCheck (<|>) - describe "Left biased parallel time order check" $ parallelCheck (<|) - describe "Fair parallel time order check" $ parallelCheck (<|>) + describe "Serial interleaved ordering check" $ interleaveCheck interleaving + describe "Parallel interleaved ordering check" $ interleaveCheck parallely + describe "Left biased parallel time order check" $ parallelCheck asyncly + describe "Fair parallel time order check" $ parallelCheck parallely --------------------------------------------------------------------------- -- TBD Monoidal composition combinations --------------------------------------------------------------------------- -- TBD need more such combinations to be tested. - describe "<> and <>" $ composeAndComposeSimple (<>) (<>) (cycle [[1 .. 9]]) + describe "<> and <>" $ composeAndComposeSimple serially serially (cycle [[1 .. 9]]) describe "<> and <=>" $ composeAndComposeSimple - (<>) - (<=>) + serially + interleaving ([ [1 .. 9] , [1 .. 9] , [1, 3, 2, 4, 6, 5, 7, 9, 8] @@ -114,8 +119,8 @@ main = hspec $ do ]) describe "<=> and <=>" $ composeAndComposeSimple - (<=>) - (<=>) + interleaving + interleaving ([ [1, 4, 2, 7, 3, 5, 8, 6, 9] , [1, 7, 4, 8, 2, 9, 5, 3, 6] , [1, 4, 3, 7, 2, 6, 9, 5, 8] @@ -123,8 +128,8 @@ main = hspec $ do ]) describe "<=> and <>" $ composeAndComposeSimple - (<=>) - (<>) + interleaving + serially ([ [1, 4, 2, 7, 3, 5, 8, 6, 9] , [1, 7, 4, 8, 2, 9, 5, 3, 6] , [1, 4, 2, 7, 3, 5, 8, 6, 9] @@ -132,6 +137,9 @@ main = hspec $ do ]) describe "Nested parallel and serial compositions" $ do + let t = timed + p = adapt . parallely + s = adapt . serially {- -- This is not correct, the result can also be [4,4,8,0,8,0,2,2] -- because of parallelism of [8,0] and [8,0]. @@ -143,10 +151,9 @@ main = hspec $ do `shouldReturn` ([4,4,8,8,0,0,2,2]) -} it "Nest <|>, <>, <|> (2)" $ - let t = timed - in toListSerial ( - ((t 4 <|> t 8) <> (t 1 <|> t 2)) - <|> ((t 4 <|> t 8) <> (t 1 <|> t 2))) + (A.toList . parallely) ( + s (p (t 4 <> t 8) <> p (t 1 <> t 2)) + <> s (p (t 4 <> t 8) <> p (t 1 <> t 2))) `shouldReturn` ([4,4,8,8,1,1,2,2]) -- FIXME: These two keep failing intermittently on Mac OS X -- Need to examine and fix the tests. @@ -165,55 +172,82 @@ main = hspec $ do `shouldReturn` ([4,4,1,1,8,2,9,2]) -} it "Nest <|>, <|>, <|>" $ - let t = timed - in toListSerial ( - ((t 4 <|> t 8) <|> (t 0 <|> t 2)) - <|> ((t 4 <|> t 8) <|> (t 0 <|> t 2))) + (A.toList . parallely) ( + ((t 4 <> t 8) <> (t 0 <> t 2)) + <> ((t 4 <> t 8) <> (t 0 <> t 2))) `shouldReturn` ([0,0,2,2,4,4,8,8]) --------------------------------------------------------------------------- -- Monoidal composition recursion loops --------------------------------------------------------------------------- - describe "Serial loops (<>)" $ loops (<>) id reverse - describe "Left biased parallel loops (<|)" $ loops (<|) sort sort - describe "Fair parallel loops (<|>)" $ loops (<|>) sort sort + describe "Serial loops (<>)" $ loops serially id reverse + describe "Left biased parallel loops (<|)" $ loops asyncly sort sort + describe "Fair parallel loops (<|>)" $ loops parallely sort sort --------------------------------------------------------------------------- -- Bind and monoidal composition combinations --------------------------------------------------------------------------- - forM_ [(<>), (<=>), (<|), (<|>)] $ \g -> - describe "Bind and compose" $ bindAndComposeSimple toListSerial g + describe "Bind and compose1" $ bindAndComposeSimple serially serially + describe "Bind and compose2" $ bindAndComposeSimple serially interleaving + describe "Bind and compose3" $ bindAndComposeSimple serially asyncly + describe "Bind and compose4" $ bindAndComposeSimple serially parallely - forM_ [(<>), (<=>), (<|), (<|>)] $ \g -> - describe "Bind and compose" $ bindAndComposeSimple toListInterleaved g + describe "Bind and compose1" $ bindAndComposeSimple interleaving serially + describe "Bind and compose2" $ bindAndComposeSimple interleaving interleaving + describe "Bind and compose3" $ bindAndComposeSimple interleaving asyncly + describe "Bind and compose4" $ bindAndComposeSimple interleaving parallely - forM_ [(<>), (<=>), (<|), (<|>)] $ \g -> - describe "Bind and compose" $ bindAndComposeSimple toListAsync g + describe "Bind and compose1" $ bindAndComposeSimple asyncly serially + describe "Bind and compose2" $ bindAndComposeSimple asyncly interleaving + describe "Bind and compose3" $ bindAndComposeSimple asyncly asyncly + describe "Bind and compose4" $ bindAndComposeSimple asyncly parallely - forM_ [(<>), (<=>), (<|), (<|>)] $ \g -> - describe "Bind and compose" $ bindAndComposeSimple toListParallel g + describe "Bind and compose1" $ bindAndComposeSimple parallely serially + describe "Bind and compose2" $ bindAndComposeSimple parallely interleaving + describe "Bind and compose3" $ bindAndComposeSimple parallely asyncly + describe "Bind and compose4" $ bindAndComposeSimple parallely parallely - let fldr f = foldr f empty - fldl f = foldl f empty - in do - forM_ [(<>), (<=>), (<|), (<|>)] $ \g -> - forM_ [fldr, fldl] $ \k -> - describe "Bind and compose" $ - bindAndComposeHierarchy toListSerial (k g) - forM_ [(<>), (<=>), (<|), (<|>)] $ \g -> - forM_ [fldr, fldl] $ \k -> - describe "Bind and compose" $ - bindAndComposeHierarchy toListInterleaved (k g) - forM_ [(<>), (<=>), (<|), (<|>)] $ \g -> - forM_ [fldr, fldl] $ \k -> - describe "Bind and compose" $ - bindAndComposeHierarchy toListAsync (k g) - forM_ [(<>), (<=>), (<|), (<|>)] $ \g -> - forM_ [fldr, fldl] $ \k -> - describe "Bind and compose" $ - bindAndComposeHierarchy toListParallel (k g) + let fldr, fldl :: (IsStream t, Semigroup (t IO Int)) => [t IO Int] -> t IO Int + fldr = foldr (<>) nil + fldl = foldl (<>) nil + + forM_ [fldr, fldl] $ \k -> + describe "Bind and compose" $ bindAndComposeHierarchy serially serially k + forM_ [fldr, fldl] $ \k -> + describe "Bind and compose" $ bindAndComposeHierarchy serially interleaving k + forM_ [fldr, fldl] $ \k -> + describe "Bind and compose" $ bindAndComposeHierarchy serially asyncly k + forM_ [fldr, fldl] $ \k -> + describe "Bind and compose" $ bindAndComposeHierarchy serially parallely k + + forM_ [fldr, fldl] $ \k -> + describe "Bind and compose" $ bindAndComposeHierarchy interleaving serially k + forM_ [fldr, fldl] $ \k -> + describe "Bind and compose" $ bindAndComposeHierarchy interleaving interleaving k + forM_ [fldr, fldl] $ \k -> + describe "Bind and compose" $ bindAndComposeHierarchy interleaving asyncly k + forM_ [fldr, fldl] $ \k -> + describe "Bind and compose" $ bindAndComposeHierarchy interleaving parallely k + + forM_ [fldr, fldl] $ \k -> + describe "Bind and compose" $ bindAndComposeHierarchy asyncly serially k + forM_ [fldr, fldl] $ \k -> + describe "Bind and compose" $ bindAndComposeHierarchy asyncly interleaving k + forM_ [fldr, fldl] $ \k -> + describe "Bind and compose" $ bindAndComposeHierarchy asyncly asyncly k + forM_ [fldr, fldl] $ \k -> + describe "Bind and compose" $ bindAndComposeHierarchy asyncly parallely k + + forM_ [fldr, fldl] $ \k -> + describe "Bind and compose" $ bindAndComposeHierarchy parallely serially k + forM_ [fldr, fldl] $ \k -> + describe "Bind and compose" $ bindAndComposeHierarchy parallely interleaving k + forM_ [fldr, fldl] $ \k -> + describe "Bind and compose" $ bindAndComposeHierarchy parallely asyncly k + forM_ [fldr, fldl] $ \k -> + describe "Bind and compose" $ bindAndComposeHierarchy parallely parallely k -- Nest two lists using different styles of product compositions it "Nests two streams using monadic serial composition" nestTwoSerial @@ -229,8 +263,7 @@ main = hspec $ do it "Nests two streams using Num serial composition" nestTwoSerialNum it "Nests two streams using Num interleaved composition" nestTwoInterleavedNum it "Nests two streams using Num async composition" nestTwoAsyncNum - -- This test fails intermittently, need to investigate - -- it "Nests two streams using Num parallel composition" nestTwoParallelNum + it "Nests two streams using Num parallel composition" nestTwoParallelNum --------------------------------------------------------------------------- -- TBD Bind and Bind combinations @@ -291,182 +324,183 @@ nestTwoAsync :: Expectation nestTwoAsync = let s1 = foldMapWith (<>) return [1..4] s2 = foldMapWith (<>) return [5..8] - in toListAsync (do + in (toListAsync (do x <- s1 y <- s2 return (x + y) - ) `shouldReturn` ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int]) + ) >>= return . sort) + `shouldReturn` sort ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int]) nestTwoAsyncApp :: Expectation nestTwoAsyncApp = let s1 = foldMapWith (<>) return [1..4] s2 = foldMapWith (<>) return [5..8] - in toListAsync ((+) <$> s1 <*> s2) - `shouldReturn` ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int]) + in (toListAsync ((+) <$> s1 <*> s2) >>= return . sort) + `shouldReturn` sort ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int]) nestTwoAsyncNum :: Expectation nestTwoAsyncNum = let s1 = foldMapWith (<>) return [1..4] s2 = foldMapWith (<>) return [5..8] - in toListAsync (s1 + s2) - `shouldReturn` ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int]) + in (toListAsync (s1 + s2) >>= return . sort) + `shouldReturn` sort ([6,7,8,9,7,8,9,10,8,9,10,11,9,10,11,12] :: [Int]) nestTwoParallel :: Expectation nestTwoParallel = let s1 = foldMapWith (<>) return [1..4] s2 = foldMapWith (<>) return [5..8] - in toListParallel (do + in (toListParallel (do x <- s1 y <- s2 return (x + y) - ) `shouldReturn` ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int]) + ) >>= return . sort) + `shouldReturn` sort ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int]) nestTwoParallelApp :: Expectation nestTwoParallelApp = let s1 = foldMapWith (<>) return [1..4] s2 = foldMapWith (<>) return [5..8] - in toListParallel ((+) <$> s1 <*> s2) - `shouldReturn` ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int]) + in (toListParallel ((+) <$> s1 <*> s2) >>= return . sort) + `shouldReturn` sort ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int]) -{- nestTwoParallelNum :: Expectation nestTwoParallelNum = let s1 = foldMapWith (<>) return [1..4] s2 = foldMapWith (<>) return [5..8] - in toListParallel (s1 + s2) - `shouldReturn` ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int]) --} + in (toListParallel (s1 + s2) >>= return . sort) + `shouldReturn` sort ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int]) -timed :: Int -> SerialT IO Int +timed :: MonadIO (t IO) => Int -> t IO Int timed x = liftIO (threadDelay (x * 100000)) >> return x -interleaveCheck - :: (SerialT IO Int -> SerialT IO Int -> SerialT IO Int) - -> Spec -interleaveCheck f = +interleaveCheck :: (IsStream t, Semigroup (t IO Int)) + => (t IO Int -> t IO Int) -> Spec +interleaveCheck t = it "Interleave four" $ - toListSerial ((return 0 <> return 1) `f` (return 100 <> return 101)) + (A.toList . t) ((singleton 0 <> singleton 1) <> (singleton 100 <> singleton 101)) `shouldReturn` ([0, 100, 1, 101]) -parallelCheck :: (SerialT IO Int -> SerialT IO Int -> SerialT IO Int) -> Spec -parallelCheck f = do +parallelCheck :: (IsStream t, Semigroup (t IO Int), MonadIO (t IO)) + => (t IO Int -> t IO Int) -> Spec +parallelCheck t = do it "Parallel ordering left associated" $ - toListSerial (((event 4 `f` event 3) `f` event 2) `f` event 1) + (A.toList . t) (((event 4 <> event 3) <> event 2) <> event 1) `shouldReturn` ([1..4]) it "Parallel ordering right associated" $ - toListSerial (event 4 `f` (event 3 `f` (event 2 `f` event 1))) + (A.toList . t) (event 4 <> (event 3 <> (event 2 <> event 1))) `shouldReturn` ([1..4]) where event n = (liftIO $ threadDelay (n * 100000)) >> (return n) -compose - :: (SerialT IO Int -> SerialT IO Int -> SerialT IO Int) - -> ([Int] -> [Int]) - -> Spec -compose f srt = do +compose :: (IsStream t, Semigroup (t IO Int)) + => (t IO Int -> t IO Int) -> t IO Int -> ([Int] -> [Int]) -> Spec +compose t z srt = do + -- XXX these should get covered by the property tests it "Compose mempty, mempty" $ - (tl (mempty `f` mempty)) `shouldReturn` [] - it "Compose empty, empty" $ - (tl (empty `f` empty)) `shouldReturn` [] + (tl (z <> z)) `shouldReturn` ([] :: [Int]) it "Compose empty at the beginning" $ - (tl $ (empty `f` return 1)) `shouldReturn` [1] + (tl $ (z <> singleton 1)) `shouldReturn` [1] it "Compose empty at the end" $ - (tl $ (return 1 `f` empty)) `shouldReturn` [1] + (tl $ (singleton 1 <> z)) `shouldReturn` [1] it "Compose two" $ - (tl (return 0 `f` return 1) >>= return . srt) + (tl (singleton 0 <> singleton 1) >>= return . srt) `shouldReturn` [0, 1] + it "Compose many" $ + ((tl $ forEachWith (<>) [1..100] singleton) >>= return . srt) + `shouldReturn` [1..100] + + -- These are not covered by the property tests it "Compose three - empty in the middle" $ - ((tl $ (return 0 `f` empty `f` return 1)) >>= return . srt) + ((tl $ (singleton 0 <> z <> singleton 1)) >>= return . srt) `shouldReturn` [0, 1] it "Compose left associated" $ - ((tl $ (((return 0 `f` return 1) `f` return 2) `f` return 3)) + ((tl $ (((singleton 0 <> singleton 1) <> singleton 2) <> singleton 3)) >>= return . srt) `shouldReturn` [0, 1, 2, 3] it "Compose right associated" $ - ((tl $ (return 0 `f` (return 1 `f` (return 2 `f` return 3)))) + ((tl $ (singleton 0 <> (singleton 1 <> (singleton 2 <> singleton 3)))) >>= return . srt) `shouldReturn` [0, 1, 2, 3] - it "Compose many" $ - ((tl $ forEachWith f [1..100] return) >>= return . srt) - `shouldReturn` [1..100] it "Compose hierarchical (multiple levels)" $ - ((tl $ (((return 0 `f` return 1) `f` (return 2 `f` return 3)) - `f` ((return 4 `f` return 5) `f` (return 6 `f` return 7))) + ((tl $ (((singleton 0 <> singleton 1) <> (singleton 2 <> singleton 3)) + <> ((singleton 4 <> singleton 5) <> (singleton 6 <> singleton 7))) ) >>= return . srt) `shouldReturn` [0..7] - where tl = toListSerial + where tl = A.toList . t composeAndComposeSimple - :: (SerialT IO Int -> SerialT IO Int -> SerialT IO Int) - -> (SerialT IO Int -> SerialT IO Int -> SerialT IO Int) - -> [[Int]] - -> Spec -composeAndComposeSimple f g answer = do + :: ( IsStream t1, Semigroup (t1 IO Int) + , IsStream t2, Semigroup (t2 IO Int), Monoid (t2 IO Int), Monad (t2 IO)) + => (t1 IO Int -> t1 IO Int) + -> (t2 IO Int -> t2 IO Int) + -> [[Int]] -> Spec +composeAndComposeSimple t1 t2 answer = do + let rfold = adapt . t2 . foldMapWith (<>) return it "Compose right associated outer expr, right folded inner" $ - let fold = foldMapWith g return - in (toListSerial (fold [1,2,3] `f` (fold [4,5,6] `f` fold [7,8,9]))) + ((A.toList. t1) (rfold [1,2,3] <> (rfold [4,5,6] <> rfold [7,8,9]))) `shouldReturn` (answer !! 0) it "Compose left associated outer expr, right folded inner" $ - let fold = foldMapWith g return - in (toListSerial ((fold [1,2,3] `f` fold [4,5,6]) `f` fold [7,8,9])) + ((A.toList . t1) ((rfold [1,2,3] <> rfold [4,5,6]) <> rfold [7,8,9])) `shouldReturn` (answer !! 1) + let lfold xs = adapt $ t2 $ foldl (<>) mempty $ map return xs it "Compose right associated outer expr, left folded inner" $ - let fold xs = foldl g empty $ map return xs - in (toListSerial (fold [1,2,3] `f` (fold [4,5,6] `f` fold [7,8,9]))) + ((A.toList . t1) (lfold [1,2,3] <> (lfold [4,5,6] <> lfold [7,8,9]))) `shouldReturn` (answer !! 2) it "Compose left associated outer expr, left folded inner" $ - let fold xs = foldl g empty $ map return xs - in (toListSerial ((fold [1,2,3] `f` fold [4,5,6]) `f` fold [7,8,9])) + ((A.toList . t1) ((lfold [1,2,3] <> lfold [4,5,6]) <> lfold [7,8,9])) `shouldReturn` (answer !! 3) - loops - :: (SerialT IO Int -> SerialT IO Int -> SerialT IO Int) + :: (IsStream t, Semigroup (t IO Int), MonadIO (t IO)) + => (t IO Int -> t IO Int) -> ([Int] -> [Int]) -> ([Int] -> [Int]) -> Spec -loops f tsrt hsrt = do - it "Tail recursive loop" $ (toListSerial (loopTail 0) >>= return . tsrt) +loops t tsrt hsrt = do + it "Tail recursive loop" $ (A.toList (loopTail 0) >>= return . tsrt) `shouldReturn` [0..3] - it "Head recursive loop" $ (toListSerial (loopHead 0) >>= return . hsrt) + it "Head recursive loop" $ (A.toList (loopHead 0) >>= return . hsrt) `shouldReturn` [0..3] where loopHead x = do -- this print line is important for the test (causes a bind) liftIO $ putStrLn "LoopHead..." - (if x < 3 then loopHead (x + 1) else empty) `f` return x + t $ (if x < 3 then loopHead (x + 1) else nil) <> return x loopTail x = do -- this print line is important for the test (causes a bind) liftIO $ putStrLn "LoopTail..." - return x `f` (if x < 3 then loopTail (x + 1) else empty) + t $ return x <> (if x < 3 then loopTail (x + 1) else nil) bindAndComposeSimple - :: (IsStream t, Alternative (t IO), Monad (t IO)) - => (forall a. Ord a => t IO a -> IO [a]) - -> (t IO Int -> t IO Int -> t IO Int) + :: ( IsStream t1, IsStream t2, Semigroup (t2 IO Int), Monad (t2 IO)) + => (t1 IO Int -> t1 IO Int) + -> (t2 IO Int -> t2 IO Int) -> Spec -bindAndComposeSimple tl g = do +bindAndComposeSimple t1 t2 = do + -- XXX need a bind in the body of forEachWith instead of a simple return it "Compose many (right fold) with bind" $ - (tl (forEachWith g [1..10 :: Int] $ \x -> return x `f` (return . id)) + ((A.toList . t1) (adapt . t2 $ forEachWith (<>) [1..10 :: Int] return) >>= return . sort) `shouldReturn` [1..10] it "Compose many (left fold) with bind" $ - let forL xs k = foldl g empty $ map k xs - in (tl (forL [1..10 :: Int] $ \x -> return x `f` (return . id)) + let forL xs k = foldl (<>) nil $ map k xs + in ((A.toList . t1) (adapt . t2 $ forL [1..10 :: Int] return) >>= return . sort) `shouldReturn` [1..10] - where f = (>>=) bindAndComposeHierarchy - :: Monad (s IO) => (forall a. Ord a => s IO a -> IO [a]) - -> ([s IO Int] -> s IO Int) + :: ( IsStream t1, Monad (t1 IO) + , IsStream t2, Monad (t2 IO)) + => (t1 IO Int -> t1 IO Int) + -> (t2 IO Int -> t2 IO Int) + -> ([t2 IO Int] -> t2 IO Int) -> Spec -bindAndComposeHierarchy tl g = do +bindAndComposeHierarchy t1 t2 g = do it "Bind and compose nested" $ - (tl bindComposeNested >>= return . sort) + ((A.toList . t1) bindComposeNested >>= return . sort) `shouldReturn` (sort ( [12, 18] ++ replicate 3 13 @@ -489,12 +523,11 @@ bindAndComposeHierarchy tl g = do -- in m in b - tripleCompose a b c = g [a, b, c] + tripleCompose a b c = adapt . t2 $ g [a, b, c] tripleBind mx my mz = - mx `f` \x -> my - `f` \y -> mz - `f` \z -> return (x + y + z) - f = (>>=) + mx >>= \x -> my + >>= \y -> mz + >>= \z -> return (x + y + z) mixedOps :: Spec mixedOps = do @@ -512,14 +545,14 @@ mixedOps = do x <- return 1 y <- return 2 z <- do - x1 <- return 1 <|> return 2 + x1 <- adapt . parallely $ return 1 <> return 2 liftIO $ return () liftIO $ putStr "" - y1 <- return 1 <| return 2 + y1 <- adapt . asyncly $ return 1 <> return 2 z1 <- do x11 <- return 1 <> return 2 - y11 <- return 1 <| return 2 - z11 <- return 1 <=> return 2 + y11 <- adapt . asyncly $ return 1 <> return 2 + z11 <- adapt . interleaving $ return 1 <> return 2 liftIO $ return () liftIO $ putStr "" return (x11 + y11 + z11) diff --git a/test/Prop.hs b/test/Prop.hs index 33cd80c3..fb17b0a6 100644 --- a/test/Prop.hs +++ b/test/Prop.hs @@ -15,8 +15,12 @@ import Test.QuickCheck.Monadic (run, monadicIO, monitor, assert, PropertyM) import Test.Hspec import Streamly +import Streamly.Prelude ((.:), nil) import qualified Streamly.Prelude as A +singleton :: IsStream t => a -> t m a +singleton a = a .: nil + sortEq :: Ord a => [a] -> [a] -> Bool sortEq a b = sort a == sort b @@ -203,20 +207,19 @@ transformOpsWord8 constr desc t = do -- XXX concatenate streams of multiple elements rather than single elements semigroupOps - :: (IsStream t, MonadPlus (t IO) + :: (IsStream t #if __GLASGOW_HASKELL__ < 804 , Semigroup (t IO Int) #endif , Monoid (t IO Int)) - => String -> (t IO Int -> t IO Int) -> Spec -semigroupOps desc t = do - prop (desc ++ " <>") $ foldFromList (foldMapWith (<>) return) t (==) - prop (desc ++ " mappend") $ foldFromList (foldMapWith mappend return) t (==) - prop (desc ++ " <=>") $ foldFromList (foldMapWith (<=>) return) t (==) - prop (desc ++ " <|>") $ foldFromList (foldMapWith (<|>) return) t sortEq - prop (desc ++ " mplus") $ foldFromList (foldMapWith mplus return) t sortEq - prop (desc ++ " <|") $ foldFromList (foldMapWith (<|) return) t sortEq + => String + -> (t IO Int -> t IO Int) + -> ([Int] -> [Int] -> Bool) + -> Spec +semigroupOps desc t eq = do + prop (desc ++ " <>") $ foldFromList (foldMapWith (<>) singleton) t eq + prop (desc ++ " mappend") $ foldFromList (foldMapWith mappend singleton) t eq applicativeOps :: (IsStream t, Applicative (t IO)) @@ -330,8 +333,12 @@ main = hspec $ do functorOps folded "zippingAsync folded" zippingAsync (==) describe "Semigroup operations" $ do - semigroupOps "serially" serially - semigroupOps "interleaving" interleaving + semigroupOps "serially" serially (==) + semigroupOps "interleaving" interleaving (==) + semigroupOps "asyncly" asyncly sortEq + semigroupOps "parallely" parallely sortEq + semigroupOps "zipping" zipping (==) + semigroupOps "zippingAsync" zippingAsync (==) describe "Applicative operations" $ do -- The tests using sorted equality are weaker tests