diff --git a/Changelog.md b/Changelog.md index a0dab471..98c5f85f 100644 --- a/Changelog.md +++ b/Changelog.md @@ -16,6 +16,16 @@ different behavior for each type. See the documentation for more details. To adapt to this change replace any usage of `<|>` with `parallel` and `empty` with `nil`. +* Stream type now defaults to the `SerialT` type unless explicitly specified + using a type combinator or a monomorphic type. This change reduces puzzling + type errors for beginners. It includes the following two changes: + * Change the type of all stream elimination functions to `SerialT`. This + makes sure that the stream type is always fixed at all exits. + * Change the type combinators to only fix the argument stream type and + the resulting stream type remains polymorphic. + + Stream types may have to be changed or type combinators may have to added or + removed to adapt to this change. * Change the type of `foldrM` to make it consistent with `foldrM` in base. ### Deprecations @@ -42,7 +52,7 @@ * `foldl'` strict left fold * `foldlM'` strict left fold with a monadic fold function * `append` run two streams serially one after the other - * `parallel` run two streams in parallel + * `parallel` run two streams in parallel (replaces `<|>`) ## 0.1.2 diff --git a/benchmark/BenchmarkOps.hs b/benchmark/BenchmarkOps.hs index 37251933..ac8bd7e4 100644 --- a/benchmark/BenchmarkOps.hs +++ b/benchmark/BenchmarkOps.hs @@ -65,7 +65,7 @@ source n = S.fromFoldable [n..n+value] {-# INLINE runStream #-} runStream :: Monad m => Stream m a -> m () -runStream = S.runSerialT +runStream = S.runStream ------------------------------------------------------------------------------- -- Elimination diff --git a/examples/loops.hs b/examples/loops.hs index 1701aad4..2866de1e 100644 --- a/examples/loops.hs +++ b/examples/loops.hs @@ -5,32 +5,32 @@ main = do liftIO $ hSetBuffering stdout LineBuffering putStrLn $ "\nloopTail:\n" - runSerialT $ do + runStream $ do x <- loopTail 0 liftIO $ print (x :: Int) putStrLn $ "\nloopHead:\n" - runSerialT $ do + runStream $ do x <- loopHead 0 liftIO $ print (x :: Int) putStrLn $ "\nloopTailA:\n" - runSerialT $ do + runStream $ do x <- loopTailA 0 liftIO $ print (x :: Int) putStrLn $ "\nloopHeadA:\n" - runSerialT $ do + runStream $ do x <- loopHeadA 0 liftIO $ print (x :: Int) putStrLn $ "\ninterleave:\n" - runSerialT $ do + runStream $ do x <- (return 0 <> return 1) `interleave` (return 100 <> return 101) liftIO $ print (x :: Int) putStrLn $ "\nParallel interleave:\n" - runSerialT $ do + runStream $ do x <- (return 0 <> return 1) `parallel` (return 100 <> return 101) liftIO $ print (x :: Int) diff --git a/examples/nested-loops.hs b/examples/nested-loops.hs index ba86a048..53510f7e 100644 --- a/examples/nested-loops.hs +++ b/examples/nested-loops.hs @@ -5,7 +5,7 @@ import System.Random (randomIO) import Streamly import Streamly.Prelude (nil) -main = runSerialT $ do +main = runStream $ do liftIO $ hSetBuffering stdout LineBuffering x <- loop "A " 2 y <- loop "B " 2 diff --git a/examples/parallel-loops.hs b/examples/parallel-loops.hs index 05f9103c..b1b47947 100644 --- a/examples/parallel-loops.hs +++ b/examples/parallel-loops.hs @@ -4,7 +4,7 @@ import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering)) import System.Random (randomIO) import Streamly -main = runSerialT $ do +main = runStream $ do liftIO $ hSetBuffering stdout LineBuffering x <- loop "A" `parallel` loop "B" liftIO $ myThreadId >>= putStr . show diff --git a/src/Streamly.hs b/src/Streamly.hs index 6deab1a3..ca3ec3ea 100644 --- a/src/Streamly.hs +++ b/src/Streamly.hs @@ -30,7 +30,7 @@ module Streamly , ZipSerial , ZipAsync - -- * Type Independent Sum Operations + -- * Polymorphic Sum Operations -- $sum , append , interleave @@ -52,12 +52,6 @@ module Streamly -- * Running Streams , runStream - , runSerialT - , runInterleavedT - , runAParallelT - , runParallelT - , runZipSerial - , runZipAsync -- * Fold Utilities -- $foldutils @@ -77,6 +71,9 @@ module Streamly , Streaming , runStreaming , runStreamT + , runInterleavedT + , runParallelT + , runZipAsync , runAsyncT , runZipStream , StreamT @@ -182,50 +179,25 @@ import Control.Monad.Trans.Class (MonadTrans (..)) -- not monads. -- $sum --- Each stream style provides its own way of merging streams. The 'Semigroup' --- '<>' operation can be used to merge streams in the style of the current --- type. In case you want to merge streams in a particular style you can either --- use a type adapter to force that type of composition or alternatively use --- the type independent merge operations described in this section. +-- The 'Semigroup' operation '<>' of each stream type combines two streams in a +-- type specific manner. This section provides polymorphic versions of '<>' +-- which can be used to combine two streams in a predetermined way irrespective +-- of the type. -- $adapters -- --- Code using streamly is usually written such that it is agnostic of any --- specific streaming type. We use a type variable (polymorphic type) with the --- 'IsStream' class constraint. Finally, when running the monad we can specify --- the actual type that we want to use to interpret the code. However, in --- certain cases we may want to use a specific type to force a certain type of --- composition. These combinators can be used to convert the stream types from --- one to another at no cost as all the types have the same underlying --- representation. +-- You may want to use different stream types at different points in your +-- program. Stream types can be converted or adapted from one type to another +-- to make them interwork with each other. -- --- If you see an @ambiguous type variable@ error then most likely it is because --- you have not specified the stream type. You either need a type annotation or --- one of the following combinators to specify what type of stream you mean. --- --- This code: --- --- @ --- main = ('toList' $ (return 1 <> return 2)) >>= print --- @ --- --- will result in a type error like this: --- --- @ --- Ambiguous type variable ‘t0’ arising from a use of ... --- @ --- --- To fix the error just tell 'toList' what kind of stream are we feeding it: --- --- @ --- main = ('toList' $ 'serially' $ (return 1 <> return 2)) >>= print --- @ --- @ --- main = ('toList' $ (return 1 <> return 2 :: SerialT IO Int)) >>= print --- @ --- --- Note that using the combinators is easier as you do not have to think about --- the specific types, they are just inferred. +-- To adapt from one monomorphic type (e.g. 'ParallelT') to another monomorphic +-- type (e.g. 'SerialT') use the 'adapt' combinator. To give a polymorphic code +-- a specific interpretation or to adapt a specific type to a polymorphic type +-- use the type specific combinators e.g. 'parallely' or 'interleaving'. You +-- cannot adapt polymorphic code to polymorphic code, as it would not know +-- which specific type you are converting from or to. If you see a an +-- @ambiguous type variable@ error then most likely you are using 'adapt' +-- unnecessarily on polymorphic code. -- -- $foldutils diff --git a/src/Streamly/Examples/AcidRainGame.hs b/src/Streamly/Examples/AcidRainGame.hs index d3c72dac..ca52c3f9 100644 --- a/src/Streamly/Examples/AcidRainGame.hs +++ b/src/Streamly/Examples/AcidRainGame.hs @@ -42,5 +42,5 @@ acidRainGame :: IO () acidRainGame = do putStrLn "Your health is deteriorating due to acid rain,\ \ type \"potion\" or \"quit\"" - _ <- runStateT (runSerialT game) 60 + _ <- runStateT (runStream game) 60 return () diff --git a/src/Streamly/Examples/CirclingSquare.hs b/src/Streamly/Examples/CirclingSquare.hs index a574ffb0..2834d2a7 100644 --- a/src/Streamly/Examples/CirclingSquare.hs +++ b/src/Streamly/Examples/CirclingSquare.hs @@ -87,4 +87,4 @@ circlingSquare :: IO () circlingSquare = do sdlInit cref <- newIORef (0,0) - runSerialT $ liftIO (updateController cref) `parallel` liftIO (updateDisplay cref) + runStream $ liftIO (updateController cref) `parallel` liftIO (updateDisplay cref) diff --git a/src/Streamly/Examples/ListDirRecursive.hs b/src/Streamly/Examples/ListDirRecursive.hs index 51404713..118b1f63 100644 --- a/src/Streamly/Examples/ListDirRecursive.hs +++ b/src/Streamly/Examples/ListDirRecursive.hs @@ -3,15 +3,15 @@ module Streamly.Examples.ListDirRecursive where import Control.Monad.IO.Class (liftIO) import Path.IO (listDir, getCurrentDir) import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering)) -import Streamly (runAParallelT) +import Streamly (runStream, aparallely) -- | This example demonstrates that there is little difference between regular --- IO code and concurrent streamly code. You can just remove 'runAParallelT' --- and this is your regular IO code. +-- IO code and concurrent streamly code. You can just remove +-- 'runStream . aparallely' and this becomes your regular IO code. listDirRecursive :: IO () listDirRecursive = do hSetBuffering stdout LineBuffering - runAParallelT $ getCurrentDir >>= readdir + runStream . aparallely $ getCurrentDir >>= readdir where readdir d = do (ds, fs) <- liftIO $ listDir d liftIO $ mapM_ putStrLn $ map show fs ++ map show ds diff --git a/src/Streamly/Examples/SearchEngineQuery.hs b/src/Streamly/Examples/SearchEngineQuery.hs index 2c76e350..7e111648 100644 --- a/src/Streamly/Examples/SearchEngineQuery.hs +++ b/src/Streamly/Examples/SearchEngineQuery.hs @@ -7,15 +7,15 @@ import Network.HTTP.Simple searchEngineQuery :: IO () searchEngineQuery = do putStrLn "Using parallel alternative" - runParallelT $ google <> bing <> duckduckgo + runStream . parallely $ google <> bing <> duckduckgo putStrLn "\nUsing parallel applicative zip" - runZipAsync $ (,,) <$> google <*> bing <*> duckduckgo + runStream . zippingAsync $ (,,) <$> google <*> bing <*> duckduckgo where get :: IsStream t => String -> t IO () google, bing, duckduckgo :: IsStream t => t IO () - get s = adapt . serially $ liftIO (httpNoBody (parseRequest_ s) >> putStrLn (show s)) + get s = serially $ liftIO (httpNoBody (parseRequest_ s) >> putStrLn (show s)) google = get "https://www.google.com/search?q=haskell" bing = get "https://www.bing.com/search?q=haskell" duckduckgo = get "https://www.duckduckgo.com/?q=haskell" diff --git a/src/Streamly/Prelude.hs b/src/Streamly/Prelude.hs index 59f0ada4..9e972c3c 100644 --- a/src/Streamly/Prelude.hs +++ b/src/Streamly/Prelude.hs @@ -174,7 +174,7 @@ fromHandle h = fromStream go -- >> runIdentity $ foldr (:) [] (serially $ fromFoldable [1,2,3]) -- [1,2,3] -- @ -foldr :: (IsStream t, Monad m) => (a -> b -> b) -> b -> t m a -> m b +foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b foldr step acc m = go (toStream m) where go m1 = @@ -191,7 +191,7 @@ foldr step acc m = go (toStream m) -- [1,2,3] -- @ {-# INLINE foldrM #-} -foldrM :: (IsStream t, Monad m) => (a -> b -> m b) -> b -> t m a -> m b +foldrM :: Monad m => (a -> b -> m b) -> b -> SerialT m a -> m b foldrM step acc m = go (toStream m) where go m1 = @@ -233,8 +233,7 @@ scanl' step begin m = scanx step begin id m -- argument) to the folded value at the end. This is designed to work with the -- @foldl@ library. The suffix @x@ is a mnemonic for extraction. {-# INLINE foldx #-} -foldx :: (IsStream t, Monad m) - => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b +foldx :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b foldx step begin done m = get $ go (toStream m) begin where {-# NOINLINE get #-} @@ -257,19 +256,17 @@ foldx step begin done m = get $ go (toStream m) begin in (S.runStream m1) Nothing stop yield {-# DEPRECATED foldl "Please use foldx instead." #-} -foldl :: (IsStream t, Monad m) - => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b +foldl :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b foldl = foldx -- | Strict left associative fold. {-# INLINE foldl' #-} -foldl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> m b +foldl' :: Monad m => (b -> a -> b) -> b -> SerialT m a -> m b foldl' step begin m = foldx step begin id m -- XXX replace the recursive "go" with explicit continuations. -- | Like 'foldx', but with a monadic step function. -foldxM :: (IsStream t, Monad m) - => (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b +foldxM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b foldxM step begin done m = go begin (toStream m) where go !acc m1 = @@ -279,18 +276,17 @@ foldxM step begin done m = go begin (toStream m) in (S.runStream m1) Nothing stop yield {-# DEPRECATED foldlM "Please use foldxM instead." #-} -foldlM :: (IsStream t, Monad m) - => (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b +foldlM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b foldlM = foldxM -- | Like 'foldl'' but with a monadic step function. -foldlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> m b +foldlM' :: Monad m => (b -> a -> m b) -> b -> SerialT m a -> m b foldlM' step begin m = foldxM step (return begin) return m -- | Decompose a stream into its head and tail. If the stream is empty, returns -- 'Nothing'. If the stream is non-empty, returns 'Just (a, ma)', where 'a' is -- the head of the stream and 'ma' its tail. -uncons :: (IsStream t, Monad m) => t m a -> m (Maybe (a, t m a)) +uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a)) uncons m = let stop = return Nothing yield a Nothing = return (Just (a, nil)) @@ -298,7 +294,7 @@ uncons m = in (S.runStream (toStream m)) Nothing stop yield -- | Write a stream of Strings to an IO Handle. -toHandle :: (IsStream t, MonadIO m) => IO.Handle -> t m String -> m () +toHandle :: MonadIO m => IO.Handle -> SerialT m String -> m () toHandle h m = go (toStream m) where go m1 = @@ -313,7 +309,7 @@ toHandle h m = go (toStream m) -- | Convert a stream into a list in the underlying monad. {-# INLINABLE toList #-} -toList :: (IsStream t, Monad m) => t m a -> m [a] +toList :: Monad m => SerialT m a -> m [a] toList = foldrM (\a xs -> return (a : xs)) [] -- | Take first 'n' elements from the stream and discard the rest. @@ -376,7 +372,7 @@ dropWhile p m = fromStream $ go (toStream m) in (S.runStream m1) ctx stp yield -- | Determine whether all elements of a stream satisfy a predicate. -all :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool +all :: Monad m => (a -> Bool) -> SerialT m a -> m Bool all p m = go (toStream m) where go m1 = @@ -387,7 +383,7 @@ all p m = go (toStream m) in (S.runStream m1) Nothing (return True) yield -- | Determine whether any of the elements of a stream satisfy a predicate. -any :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool +any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool any p m = go (toStream m) where go m1 = @@ -398,22 +394,22 @@ any p m = go (toStream m) in (S.runStream m1) Nothing (return False) yield -- | Determine the sum of all elements of a stream of numbers -sum :: (IsStream t, Monad m, Num a) => t m a -> m a +sum :: (Monad m, Num a) => SerialT m a -> m a sum = foldl (+) 0 id -- | Determine the product of all elements of a stream of numbers -product :: (IsStream t, Monad m, Num a) => t m a -> m a +product :: (Monad m, Num a) => SerialT m a -> m a product = foldl (*) 1 id -- | Extract the first element of the stream, if any. -head :: (IsStream t, Monad m) => t m a -> m (Maybe a) +head :: Monad m => SerialT m a -> m (Maybe a) head m = let stop = return Nothing yield a _ = return (Just a) in (S.runStream (toStream m)) Nothing stop yield -- | Extract all but the first element of the stream, if any. -tail :: (IsStream t, Monad m) => t m a -> m (Maybe (t m a)) +tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) tail m = let stop = return Nothing yield _ Nothing = return $ Just nil @@ -422,18 +418,18 @@ tail m = -- | Extract the last element of the stream, if any. {-# INLINE last #-} -last :: (IsStream t, Monad m) => t m a -> m (Maybe a) +last :: Monad m => SerialT m a -> m (Maybe a) last = foldl (\_ y -> Just y) Nothing id -- | Determine whether the stream is empty. -null :: (IsStream t, Monad m) => t m a -> m Bool +null :: Monad m => SerialT m a -> m Bool null m = let stop = return True yield _ _ = return False in (S.runStream (toStream m)) Nothing stop yield -- | Determine whether an element is present in the stream. -elem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool +elem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool elem e m = go (toStream m) where go m1 = @@ -443,7 +439,7 @@ elem e m = go (toStream m) in (S.runStream m1) Nothing stop yield -- | Determine whether an element is not present in the stream. -notElem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool +notElem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool notElem e m = go (toStream m) where go m1 = @@ -453,7 +449,7 @@ notElem e m = go (toStream m) in (S.runStream m1) Nothing stop yield -- | Determine the length of the stream. -length :: (IsStream t, Monad m) => t m a -> m Int +length :: Monad m => SerialT m a -> m Int length = foldl (\n _ -> n + 1) 0 id -- | Returns the elements of the stream in reverse order. @@ -471,7 +467,7 @@ reverse m = fromStream $ go Nothing (toStream m) -- XXX replace the recursive "go" with continuation -- | Determine the minimum element in a stream. -minimum :: (IsStream t, Monad m, Ord a) => t m a -> m (Maybe a) +minimum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) minimum m = go Nothing (toStream m) where go r m1 = @@ -486,7 +482,7 @@ minimum m = go Nothing (toStream m) -- XXX replace the recursive "go" with continuation -- | Determine the maximum element in a stream. -maximum :: (IsStream t, Monad m, Ord a) => t m a -> m (Maybe a) +maximum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a) maximum m = go Nothing (toStream m) where go r m1 = @@ -519,7 +515,7 @@ mapM f m = fromStream $ go (toStream m) -- | Apply a monadic action to each element of the stream and discard the -- output of the action. -mapM_ :: (IsStream t, Monad m) => (a -> m b) -> t m a -> m () +mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m () mapM_ f m = go (toStream m) where go m1 = diff --git a/src/Streamly/Streams.hs b/src/Streamly/Streams.hs index bea4e872..63ad7e17 100644 --- a/src/Streamly/Streams.hs +++ b/src/Streamly/Streams.hs @@ -77,15 +77,12 @@ module Streamly.Streams , adapt -- * Running Streams - , runSerialT , runStreamT -- deprecated - , runInterleavedT - , runAParallelT + , runInterleavedT -- deprecated , runAsyncT -- deprecated - , runParallelT - , runZipSerial + , runParallelT -- deprecated , runZipStream -- deprecated - , runZipAsync + , runZipAsync -- deprecated -- * Zipping , zipWith @@ -220,7 +217,7 @@ streamFold sv step blank m = in (S.runStream (toStream m)) sv blank yield -- | Run a streaming composition, discard the results. -runStream :: (Monad m, IsStream t) => t m a -> m () +runStream :: Monad m => SerialT m a -> m () runStream m = go (toStream m) where go m1 = @@ -232,7 +229,7 @@ runStream m = go (toStream m) -- | Same as 'runStream' {-# Deprecated runStreaming "Please use runStream instead." #-} runStreaming :: (Monad m, IsStream t) => t m a -> m () -runStreaming = runStream +runStreaming = runStream . adapt -- | Write a stream to an 'SVar' in a non-blocking manner. The stream can then -- be read back from the SVar using 'fromSVar'. @@ -274,7 +271,7 @@ async m = do -- element of the stream, serially. -- -- @ --- main = 'runSerialT' $ do +-- main = 'runStream' . 'serially' $ do -- x <- return 1 \<\> return 2 -- liftIO $ print x -- @ @@ -286,7 +283,7 @@ async m = do -- 'SerialT' nests streams serially in a depth first manner. -- -- @ --- main = 'runSerialT' $ do +-- main = 'runStream' . 'serially' $ do -- x <- return 1 \<\> return 2 -- y <- return 3 \<\> return 4 -- liftIO $ print (x, y) @@ -304,6 +301,8 @@ async m = do -- nested @for@ loops and the monadic continuation is the body of the loop. The -- loop iterates for all elements of the stream. -- +-- The 'serially' combinator can be omitted as the default stream type is +-- 'SerialT'. -- Note that serial composition can be used to combine an infinite number of -- streams as it explores only one stream at a time. -- @@ -334,9 +333,9 @@ type StreamT = SerialT -- Semigroup ------------------------------------------------------------------------------ --- | Same as the 'Semigroup' instance of 'SerialT'. Appends two streams --- sequentially, yielding all elements from the first stream, and then all --- elements from the second stream. +-- | Polymorphic version of the 'Semigroup' operation '<>' of 'SerialT'. +-- Appends two streams sequentially, yielding all elements from the first +-- stream, and then all elements from the second stream. {-# INLINE append #-} append :: IsStream t => t m a -> t m a -> t m a append m1 m2 = fromStream $ S.append (toStream m1) (toStream m2) @@ -435,7 +434,7 @@ instance (Monad m, Floating a) => Floating (SerialT m a) where -- -- -- @ --- main = 'runInterleavedT' $ do +-- main = 'runStream' . 'interleaving' $ do -- x <- return 1 \<\> return 2 -- y <- return 3 \<\> return 4 -- liftIO $ print (x, y) @@ -468,8 +467,8 @@ instance IsStream InterleavedT where -- Semigroup ------------------------------------------------------------------------------ --- | Same as the 'Semigroup' instance of 'InterleavedT'. Interleaves two --- streams, yielding one element from each stream alternately. +-- | Polymorphic version of the 'Semigroup' operation '<>' of 'InterleavedT'. +-- Interleaves two streams, yielding one element from each stream alternately. {-# INLINE interleave #-} interleave :: IsStream t => t m a -> t m a -> t m a interleave m1 m2 = fromStream $ S.interleave (toStream m1) (toStream m2) @@ -586,7 +585,7 @@ instance (Monad m, Floating a) => Floating (InterleavedT m a) where -- import "Streamly" -- import Control.Concurrent -- --- main = 'runAParallelT' $ do +-- main = 'runStream' . 'aparallely' $ do -- n <- return 3 \<\> return 2 \<\> return 1 -- liftIO $ do -- threadDelay (n * 1000000) @@ -627,9 +626,9 @@ instance IsStream AParallelT where -- Semigroup ------------------------------------------------------------------------------ --- | Same as the 'Semigroup' operation of 'AParallelT', but polymorphic. --- Merges two streams possibly concurrently, preferring the elements from the --- left one when available. +-- | Polymorphic version of the 'Semigroup' operation '<>' of 'AParallelT', but +-- polymorphic. Merges two streams possibly concurrently, preferring the +-- elements from the left one when available. {-# INLINE aparallel #-} aparallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a aparallel m1 m2 = fromStream $ S.aparallel (toStream m1) (toStream m2) @@ -753,7 +752,7 @@ instance (MonadAsync m, Floating a) => Floating (AParallelT m a) where -- import "Streamly" -- import Control.Concurrent -- --- main = 'runParallelT' $ do +-- main = 'runStream' . 'parallely' $ do -- n <- return 3 \<\> return 2 \<\> return 1 -- liftIO $ do -- threadDelay (n * 1000000) @@ -792,8 +791,8 @@ instance IsStream ParallelT where -- Semigroup ------------------------------------------------------------------------------ --- | Same as the 'Semigroup' instance of 'ParallelT'. Merges two streams --- concurrently choosing elements from both fairly. +-- | Polymorphic version of the 'Semigroup' operation '<>' of 'ParallelT'. +-- Merges two streams concurrently choosing elements from both fairly. {-# INLINE parallel #-} parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a parallel m1 m2 = fromStream $ S.parallel (toStream m1) (toStream m2) @@ -1066,81 +1065,72 @@ instance (MonadAsync m, Floating a) => Floating (ZipAsync m a) where -- Type adapting combinators ------------------------------------------------------------------------------- --- | Adapt one streaming type to another. +-- | Adapt any specific stream type to any other specific stream type. adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a adapt = fromStream . toStream --- | Interpret an ambiguously typed stream as 'SerialT'. -serially :: SerialT m a -> SerialT m a -serially x = x +-- | Fix the type of a polymorphic stream as 'SerialT'. +serially :: IsStream t => SerialT m a -> t m a +serially = adapt --- | Interpret an ambiguously typed stream as 'InterleavedT'. -interleaving :: InterleavedT m a -> InterleavedT m a -interleaving x = x +-- | Fix the type of a polymorphic stream as 'InterleavedT'. +interleaving :: IsStream t => InterleavedT m a -> t m a +interleaving = adapt --- | Interpret an ambiguously typed stream as 'AParallelT'. -aparallely :: AParallelT m a -> AParallelT m a -aparallely x = x +-- | Fix the type of a polymorphic stream as 'AParallelT'. +aparallely :: IsStream t => AParallelT m a -> t m a +aparallely = adapt -- | Same as 'aparallely'. {-# DEPRECATED asyncly "Please use aparallely instead." #-} -asyncly :: AParallelT m a -> AParallelT m a +asyncly :: IsStream t => AParallelT m a -> t m a asyncly = aparallely --- | Interpret an ambiguously typed stream as 'ParallelT'. -parallely :: ParallelT m a -> ParallelT m a -parallely x = x +-- | Fix the type of a polymorphic stream as 'ParallelT'. +parallely :: IsStream t => ParallelT m a -> t m a +parallely = adapt --- | Interpret an ambiguously typed stream as 'ZipSerial'. -zipping :: ZipSerial m a -> ZipSerial m a -zipping x = x +-- | Fix the type of a polymorphic stream as 'ZipSerial'. +zipping :: IsStream t => ZipSerial m a -> t m a +zipping = adapt --- | Interpret an ambiguously typed stream as 'ZipAsync'. -zippingAsync :: ZipAsync m a -> ZipAsync m a -zippingAsync x = x +-- | Fix the type of a polymorphic stream as 'ZipAsync'. +zippingAsync :: IsStream t => ZipAsync m a -> t m a +zippingAsync = adapt ------------------------------------------------------------------------------- -- Running Streams, convenience functions specialized to types ------------------------------------------------------------------------------- --- | Same as @runStream . serially@. -runSerialT :: Monad m => SerialT m a -> m () -runSerialT = runStream - --- | Same as @runSerialT@. -{-# Deprecated runStreamT "Please use runSerialT instead." #-} +-- | Same as @runStream@. +{-# DEPRECATED runStreamT "Please use runStream instead." #-} runStreamT :: Monad m => SerialT m a -> m () -runStreamT = runSerialT +runStreamT = runStream -- | Same as @runStream . interleaving@. +{-# DEPRECATED runInterleavedT "Please use 'runStream . interleaving' instead." #-} runInterleavedT :: Monad m => InterleavedT m a -> m () -runInterleavedT = runStream +runInterleavedT = runStream . interleaving -- | Same as @runStream . aparallely@. -runAParallelT :: Monad m => AParallelT m a -> m () -runAParallelT = runStream - --- | Same as @runAParallelT@. -{-# Deprecated runAsyncT "Please use runAParallelT instead." #-} +{-# DEPRECATED runAsyncT "Please use 'runStream . aparallely' instead." #-} runAsyncT :: Monad m => AParallelT m a -> m () -runAsyncT = runAParallelT +runAsyncT = runStream . aparallely -- | Same as @runStream . parallely@. +{-# DEPRECATED runParallelT "Please use 'runStream . parallely' instead." #-} runParallelT :: Monad m => ParallelT m a -> m () -runParallelT = runStream +runParallelT = runStream . parallely -- | Same as @runStream . zipping@. -runZipSerial :: Monad m => ZipSerial m a -> m () -runZipSerial = runStream - -{-# Deprecated runZipStream "Please use runZipSerial instead." #-} --- | Same as ZipSerial. +{-# DEPRECATED runZipStream "Please use 'runStream . zipping instead." #-} runZipStream :: Monad m => ZipSerial m a -> m () -runZipStream = runZipSerial +runZipStream = runStream . zipping -- | Same as @runStream . zippingAsync@. +{-# DEPRECATED runZipAsync "Please use 'runStream . zippingAsync instead." #-} runZipAsync :: Monad m => ZipAsync m a -> m () -runZipAsync = runStream +runZipAsync = runStream . zippingAsync ------------------------------------------------------------------------------ -- Fold Utilities diff --git a/test/Main.hs b/test/Main.hs index 2ad8a223..76919574 100644 --- a/test/Main.hs +++ b/test/Main.hs @@ -139,8 +139,8 @@ main = hspec $ do describe "Nested parallel and serial compositions" $ do let t = timed - p = adapt . parallely - s = adapt . serially + p = parallely + s = serially {- -- This is not correct, the result can also be [4,4,8,0,8,0,2,2] -- because of parallelism of [8,0] and [8,0]. @@ -375,14 +375,14 @@ timed :: MonadIO (t IO) => Int -> t IO Int timed x = liftIO (threadDelay (x * 100000)) >> return x interleaveCheck :: (IsStream t, Semigroup (t IO Int)) - => (t IO Int -> t IO Int) -> Spec + => (t IO Int -> SerialT IO Int) -> Spec interleaveCheck t = it "Interleave four" $ (A.toList . t) ((singleton 0 <> singleton 1) <> (singleton 100 <> singleton 101)) `shouldReturn` ([0, 100, 1, 101]) -parallelCheck :: (IsStream t, Semigroup (t IO Int), MonadIO (t IO)) - => (t IO Int -> t IO Int) -> Spec +parallelCheck :: (Semigroup (t IO Int), MonadIO (t IO)) + => (t IO Int -> SerialT IO Int) -> Spec parallelCheck t = do it "Parallel ordering left associated" $ (A.toList . t) (((event 4 <> event 3) <> event 2) <> event 1) @@ -395,7 +395,7 @@ parallelCheck t = do where event n = (liftIO $ threadDelay (n * 100000)) >> (return n) compose :: (IsStream t, Semigroup (t IO Int)) - => (t IO Int -> t IO Int) -> t IO Int -> ([Int] -> [Int]) -> Spec + => (t IO Int -> SerialT IO Int) -> t IO Int -> ([Int] -> [Int]) -> Spec compose t z srt = do -- XXX these should get covered by the property tests it "Compose mempty, mempty" $ @@ -434,13 +434,13 @@ composeAndComposeSimple , Semigroup (t2 IO Int) #endif ) - => (t1 IO Int -> t1 IO Int) + => (t1 IO Int -> SerialT IO Int) -> (t2 IO Int -> t2 IO Int) -> [[Int]] -> Spec composeAndComposeSimple t1 t2 answer = do let rfold = adapt . t2 . foldMapWith (<>) return it "Compose right associated outer expr, right folded inner" $ - ((A.toList. t1) (rfold [1,2,3] <> (rfold [4,5,6] <> rfold [7,8,9]))) + ((A.toList . t1) (rfold [1,2,3] <> (rfold [4,5,6] <> rfold [7,8,9]))) `shouldReturn` (answer !! 0) it "Compose left associated outer expr, right folded inner" $ @@ -463,10 +463,10 @@ loops -> ([Int] -> [Int]) -> Spec loops t tsrt hsrt = do - it "Tail recursive loop" $ (A.toList (loopTail 0) >>= return . tsrt) + it "Tail recursive loop" $ ((A.toList . adapt) (loopTail 0) >>= return . tsrt) `shouldReturn` [0..3] - it "Head recursive loop" $ (A.toList (loopHead 0) >>= return . hsrt) + it "Head recursive loop" $ ((A.toList . adapt) (loopHead 0) >>= return . hsrt) `shouldReturn` [0..3] where @@ -482,7 +482,7 @@ loops t tsrt hsrt = do bindAndComposeSimple :: ( IsStream t1, IsStream t2, Semigroup (t2 IO Int), Monad (t2 IO)) - => (t1 IO Int -> t1 IO Int) + => (t1 IO Int -> SerialT IO Int) -> (t2 IO Int -> t2 IO Int) -> Spec bindAndComposeSimple t1 t2 = do @@ -499,7 +499,7 @@ bindAndComposeSimple t1 t2 = do bindAndComposeHierarchy :: ( IsStream t1, Monad (t1 IO) , IsStream t2, Monad (t2 IO)) - => (t1 IO Int -> t1 IO Int) + => (t1 IO Int -> SerialT IO Int) -> (t2 IO Int -> t2 IO Int) -> ([t2 IO Int] -> t2 IO Int) -> Spec @@ -550,14 +550,14 @@ mixedOps = do x <- return 1 y <- return 2 z <- do - x1 <- adapt . parallely $ return 1 <> return 2 + x1 <- parallely $ return 1 <> return 2 liftIO $ return () liftIO $ putStr "" - y1 <- adapt . aparallely $ return 1 <> return 2 + y1 <- aparallely $ return 1 <> return 2 z1 <- do x11 <- return 1 <> return 2 - y11 <- adapt . aparallely $ return 1 <> return 2 - z11 <- adapt . interleaving $ return 1 <> return 2 + y11 <- aparallely $ return 1 <> return 2 + z11 <- interleaving $ return 1 <> return 2 liftIO $ return () liftIO $ putStr "" return (x11 + y11 + z11) diff --git a/test/Prop.hs b/test/Prop.hs index 3a88e7b2..37fee332 100644 --- a/test/Prop.hs +++ b/test/Prop.hs @@ -36,7 +36,7 @@ equals eq stream list = do constructWithReplicateM :: IsStream t - => (t IO Int -> t IO Int) + => (t IO Int -> SerialT IO Int) -> Word8 -> Property constructWithReplicateM op len = @@ -47,11 +47,10 @@ constructWithReplicateM op len = equals (==) stream list transformFromList - :: IsStream t - => ([Int] -> t IO Int) + :: ([Int] -> t IO Int) -> ([Int] -> [Int] -> Bool) -> ([Int] -> [Int]) - -> (t IO Int -> t IO Int) + -> (t IO Int -> SerialT IO Int) -> [Int] -> Property transformFromList constr eq listOp op a = @@ -61,9 +60,8 @@ transformFromList constr eq listOp op a = equals eq stream list foldFromList - :: IsStream t - => ([Int] -> t IO Int) - -> (t IO Int -> t IO Int) + :: ([Int] -> t IO Int) + -> (t IO Int -> SerialT IO Int) -> ([Int] -> [Int] -> Bool) -> [Int] -> Property @@ -84,8 +82,8 @@ eliminateOp constr listOp op a = elemOp :: ([Word8] -> t IO Word8) - -> (t IO Word8 -> t IO Word8) - -> (Word8 -> t IO Word8 -> IO Bool) + -> (t IO Word8 -> SerialT IO Word8) + -> (Word8 -> SerialT IO Word8 -> IO Bool) -> (Word8 -> [Word8] -> Bool) -> (Word8, [Word8]) -> Property @@ -96,10 +94,10 @@ elemOp constr op streamOp listOp (x, xs) = equals (==) stream list functorOps - :: (IsStream t, Functor (t IO)) + :: Functor (t IO) => ([Int] -> t IO Int) -> String - -> (t IO Int -> t IO Int) + -> (t IO Int -> SerialT IO Int) -> ([Int] -> [Int] -> Bool) -> Spec functorOps constr desc t eq = do @@ -110,7 +108,7 @@ transformOps :: IsStream t => ([Int] -> t IO Int) -> String - -> (t IO Int -> t IO Int) + -> (t IO Int -> SerialT IO Int) -> ([Int] -> [Int] -> Bool) -> Spec transformOps constr desc t eq = do @@ -159,10 +157,9 @@ wrapMaybe f = else Just (f x) eliminationOps - :: IsStream t - => ([Int] -> t IO Int) + :: ([Int] -> t IO Int) -> String - -> (t IO Int -> t IO Int) + -> (t IO Int -> SerialT IO Int) -> Spec eliminationOps constr desc t = do -- Elimination @@ -181,10 +178,9 @@ eliminationOps constr desc t = do -- head/tail/last may depend on the order in case of parallel streams -- so we test these only for serial streams. serialEliminationOps - :: IsStream t - => ([Int] -> t IO Int) + :: ([Int] -> t IO Int) -> String - -> (t IO Int -> t IO Int) + -> (t IO Int -> SerialT IO Int) -> Spec serialEliminationOps constr desc t = do prop (desc ++ " head") $ eliminateOp constr (wrapMaybe head) $ A.head . t @@ -196,10 +192,9 @@ serialEliminationOps constr desc t = do prop (desc ++ " last") $ eliminateOp constr (wrapMaybe last) $ A.last . t transformOpsWord8 - :: IsStream t - => ([Word8] -> t IO Word8) + :: ([Word8] -> t IO Word8) -> String - -> (t IO Word8 -> t IO Word8) + -> (t IO Word8 -> SerialT IO Word8) -> Spec transformOpsWord8 constr desc t = do prop (desc ++ " elem") $ elemOp constr t A.elem elem @@ -214,7 +209,7 @@ semigroupOps #endif , Monoid (t IO Int)) => String - -> (t IO Int -> t IO Int) + -> (t IO Int -> SerialT IO Int) -> ([Int] -> [Int] -> Bool) -> Spec semigroupOps desc t eq = do @@ -222,9 +217,9 @@ semigroupOps desc t eq = do prop (desc ++ " mappend") $ foldFromList (foldMapWith mappend singleton) t eq applicativeOps - :: (IsStream t, Applicative (t IO)) + :: Applicative (t IO) => ([Int] -> t IO Int) - -> (t IO (Int, Int) -> t IO (Int, Int)) + -> (t IO (Int, Int) -> SerialT IO (Int, Int)) -> ([(Int, Int)] -> [(Int, Int)] -> Bool) -> ([Int], [Int]) -> Property @@ -236,7 +231,7 @@ applicativeOps constr t eq (a, b) = monadicIO $ do zipApplicative :: (IsStream t, Applicative (t IO)) => ([Int] -> t IO Int) - -> (t IO (Int, Int) -> t IO (Int, Int)) + -> (t IO (Int, Int) -> SerialT IO (Int, Int)) -> ([(Int, Int)] -> [(Int, Int)] -> Bool) -> ([Int], [Int]) -> Property @@ -252,7 +247,7 @@ zipApplicative constr t eq (a, b) = monadicIO $ do zipMonadic :: (IsStream t, Monad (t IO)) => ([Int] -> t IO Int) - -> (t IO (Int, Int) -> t IO (Int, Int)) + -> (t IO (Int, Int) -> SerialT IO (Int, Int)) -> ([(Int, Int)] -> [(Int, Int)] -> Bool) -> ([Int], [Int]) -> Property @@ -271,9 +266,9 @@ zipMonadic constr t eq (a, b) = equals eq stream2 list monadThen - :: (IsStream t, Monad (t IO)) + :: Monad (t IO) => ([Int] -> t IO Int) - -> (t IO Int -> t IO Int) + -> (t IO Int -> SerialT IO Int) -> ([Int] -> [Int] -> Bool) -> ([Int], [Int]) -> Property @@ -283,9 +278,9 @@ monadThen constr t eq (a, b) = monadicIO $ do equals eq stream list monadBind - :: (IsStream t, Monad (t IO)) + :: Monad (t IO) => ([Int] -> t IO Int) - -> (t IO Int -> t IO Int) + -> (t IO Int -> SerialT IO Int) -> ([Int] -> [Int] -> Bool) -> ([Int], [Int]) -> Property @@ -313,7 +308,7 @@ main = hspec $ do `shouldReturn` (take 100 $ iterate (+ 1) 0) let folded :: IsStream t => [a] -> t IO a - folded = adapt . serially . (\xs -> + folded = serially . (\xs -> case xs of [x] -> return x -- singleton stream case _ -> foldMapWith (<>) return xs