Merge pull request #61 from composewell/type-combinators

Fix the default stream type to SerialT
This commit is contained in:
Harendra Kumar 2018-04-18 16:54:37 +05:30 committed by GitHub
commit 0631b9d2a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 170 additions and 207 deletions

View File

@ -16,6 +16,16 @@
different behavior for each type. See the documentation for more details. To
adapt to this change replace any usage of `<|>` with `parallel` and
`empty` with `nil`.
* Stream type now defaults to the `SerialT` type unless explicitly specified
using a type combinator or a monomorphic type. This change reduces puzzling
type errors for beginners. It includes the following two changes:
* Change the type of all stream elimination functions to `SerialT`. This
makes sure that the stream type is always fixed at all exits.
* Change the type combinators to only fix the argument stream type and
the resulting stream type remains polymorphic.
Stream types may have to be changed or type combinators may have to added or
removed to adapt to this change.
* Change the type of `foldrM` to make it consistent with `foldrM` in base.
### Deprecations
@ -42,7 +52,7 @@
* `foldl'` strict left fold
* `foldlM'` strict left fold with a monadic fold function
* `append` run two streams serially one after the other
* `parallel` run two streams in parallel
* `parallel` run two streams in parallel (replaces `<|>`)
## 0.1.2

View File

@ -65,7 +65,7 @@ source n = S.fromFoldable [n..n+value]
{-# INLINE runStream #-}
runStream :: Monad m => Stream m a -> m ()
runStream = S.runSerialT
runStream = S.runStream
-------------------------------------------------------------------------------
-- Elimination

View File

@ -5,32 +5,32 @@ main = do
liftIO $ hSetBuffering stdout LineBuffering
putStrLn $ "\nloopTail:\n"
runSerialT $ do
runStream $ do
x <- loopTail 0
liftIO $ print (x :: Int)
putStrLn $ "\nloopHead:\n"
runSerialT $ do
runStream $ do
x <- loopHead 0
liftIO $ print (x :: Int)
putStrLn $ "\nloopTailA:\n"
runSerialT $ do
runStream $ do
x <- loopTailA 0
liftIO $ print (x :: Int)
putStrLn $ "\nloopHeadA:\n"
runSerialT $ do
runStream $ do
x <- loopHeadA 0
liftIO $ print (x :: Int)
putStrLn $ "\ninterleave:\n"
runSerialT $ do
runStream $ do
x <- (return 0 <> return 1) `interleave` (return 100 <> return 101)
liftIO $ print (x :: Int)
putStrLn $ "\nParallel interleave:\n"
runSerialT $ do
runStream $ do
x <- (return 0 <> return 1) `parallel` (return 100 <> return 101)
liftIO $ print (x :: Int)

View File

@ -5,7 +5,7 @@ import System.Random (randomIO)
import Streamly
import Streamly.Prelude (nil)
main = runSerialT $ do
main = runStream $ do
liftIO $ hSetBuffering stdout LineBuffering
x <- loop "A " 2
y <- loop "B " 2

View File

@ -4,7 +4,7 @@ import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering))
import System.Random (randomIO)
import Streamly
main = runSerialT $ do
main = runStream $ do
liftIO $ hSetBuffering stdout LineBuffering
x <- loop "A" `parallel` loop "B"
liftIO $ myThreadId >>= putStr . show

View File

@ -30,7 +30,7 @@ module Streamly
, ZipSerial
, ZipAsync
-- * Type Independent Sum Operations
-- * Polymorphic Sum Operations
-- $sum
, append
, interleave
@ -52,12 +52,6 @@ module Streamly
-- * Running Streams
, runStream
, runSerialT
, runInterleavedT
, runAParallelT
, runParallelT
, runZipSerial
, runZipAsync
-- * Fold Utilities
-- $foldutils
@ -77,6 +71,9 @@ module Streamly
, Streaming
, runStreaming
, runStreamT
, runInterleavedT
, runParallelT
, runZipAsync
, runAsyncT
, runZipStream
, StreamT
@ -182,50 +179,25 @@ import Control.Monad.Trans.Class (MonadTrans (..))
-- not monads.
-- $sum
-- Each stream style provides its own way of merging streams. The 'Semigroup'
-- '<>' operation can be used to merge streams in the style of the current
-- type. In case you want to merge streams in a particular style you can either
-- use a type adapter to force that type of composition or alternatively use
-- the type independent merge operations described in this section.
-- The 'Semigroup' operation '<>' of each stream type combines two streams in a
-- type specific manner. This section provides polymorphic versions of '<>'
-- which can be used to combine two streams in a predetermined way irrespective
-- of the type.
-- $adapters
--
-- Code using streamly is usually written such that it is agnostic of any
-- specific streaming type. We use a type variable (polymorphic type) with the
-- 'IsStream' class constraint. Finally, when running the monad we can specify
-- the actual type that we want to use to interpret the code. However, in
-- certain cases we may want to use a specific type to force a certain type of
-- composition. These combinators can be used to convert the stream types from
-- one to another at no cost as all the types have the same underlying
-- representation.
-- You may want to use different stream types at different points in your
-- program. Stream types can be converted or adapted from one type to another
-- to make them interwork with each other.
--
-- If you see an @ambiguous type variable@ error then most likely it is because
-- you have not specified the stream type. You either need a type annotation or
-- one of the following combinators to specify what type of stream you mean.
--
-- This code:
--
-- @
-- main = ('toList' $ (return 1 <> return 2)) >>= print
-- @
--
-- will result in a type error like this:
--
-- @
-- Ambiguous type variable t0 arising from a use of ...
-- @
--
-- To fix the error just tell 'toList' what kind of stream are we feeding it:
--
-- @
-- main = ('toList' $ 'serially' $ (return 1 <> return 2)) >>= print
-- @
-- @
-- main = ('toList' $ (return 1 <> return 2 :: SerialT IO Int)) >>= print
-- @
--
-- Note that using the combinators is easier as you do not have to think about
-- the specific types, they are just inferred.
-- To adapt from one monomorphic type (e.g. 'ParallelT') to another monomorphic
-- type (e.g. 'SerialT') use the 'adapt' combinator. To give a polymorphic code
-- a specific interpretation or to adapt a specific type to a polymorphic type
-- use the type specific combinators e.g. 'parallely' or 'interleaving'. You
-- cannot adapt polymorphic code to polymorphic code, as it would not know
-- which specific type you are converting from or to. If you see a an
-- @ambiguous type variable@ error then most likely you are using 'adapt'
-- unnecessarily on polymorphic code.
--
-- $foldutils

View File

@ -42,5 +42,5 @@ acidRainGame :: IO ()
acidRainGame = do
putStrLn "Your health is deteriorating due to acid rain,\
\ type \"potion\" or \"quit\""
_ <- runStateT (runSerialT game) 60
_ <- runStateT (runStream game) 60
return ()

View File

@ -87,4 +87,4 @@ circlingSquare :: IO ()
circlingSquare = do
sdlInit
cref <- newIORef (0,0)
runSerialT $ liftIO (updateController cref) `parallel` liftIO (updateDisplay cref)
runStream $ liftIO (updateController cref) `parallel` liftIO (updateDisplay cref)

View File

@ -3,15 +3,15 @@ module Streamly.Examples.ListDirRecursive where
import Control.Monad.IO.Class (liftIO)
import Path.IO (listDir, getCurrentDir)
import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering))
import Streamly (runAParallelT)
import Streamly (runStream, aparallely)
-- | This example demonstrates that there is little difference between regular
-- IO code and concurrent streamly code. You can just remove 'runAParallelT'
-- and this is your regular IO code.
-- IO code and concurrent streamly code. You can just remove
-- 'runStream . aparallely' and this becomes your regular IO code.
listDirRecursive :: IO ()
listDirRecursive = do
hSetBuffering stdout LineBuffering
runAParallelT $ getCurrentDir >>= readdir
runStream . aparallely $ getCurrentDir >>= readdir
where readdir d = do
(ds, fs) <- liftIO $ listDir d
liftIO $ mapM_ putStrLn $ map show fs ++ map show ds

View File

@ -7,15 +7,15 @@ import Network.HTTP.Simple
searchEngineQuery :: IO ()
searchEngineQuery = do
putStrLn "Using parallel alternative"
runParallelT $ google <> bing <> duckduckgo
runStream . parallely $ google <> bing <> duckduckgo
putStrLn "\nUsing parallel applicative zip"
runZipAsync $ (,,) <$> google <*> bing <*> duckduckgo
runStream . zippingAsync $ (,,) <$> google <*> bing <*> duckduckgo
where
get :: IsStream t => String -> t IO ()
google, bing, duckduckgo :: IsStream t => t IO ()
get s = adapt . serially $ liftIO (httpNoBody (parseRequest_ s) >> putStrLn (show s))
get s = serially $ liftIO (httpNoBody (parseRequest_ s) >> putStrLn (show s))
google = get "https://www.google.com/search?q=haskell"
bing = get "https://www.bing.com/search?q=haskell"
duckduckgo = get "https://www.duckduckgo.com/?q=haskell"

View File

@ -174,7 +174,7 @@ fromHandle h = fromStream go
-- >> runIdentity $ foldr (:) [] (serially $ fromFoldable [1,2,3])
-- [1,2,3]
-- @
foldr :: (IsStream t, Monad m) => (a -> b -> b) -> b -> t m a -> m b
foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b
foldr step acc m = go (toStream m)
where
go m1 =
@ -191,7 +191,7 @@ foldr step acc m = go (toStream m)
-- [1,2,3]
-- @
{-# INLINE foldrM #-}
foldrM :: (IsStream t, Monad m) => (a -> b -> m b) -> b -> t m a -> m b
foldrM :: Monad m => (a -> b -> m b) -> b -> SerialT m a -> m b
foldrM step acc m = go (toStream m)
where
go m1 =
@ -233,8 +233,7 @@ scanl' step begin m = scanx step begin id m
-- argument) to the folded value at the end. This is designed to work with the
-- @foldl@ library. The suffix @x@ is a mnemonic for extraction.
{-# INLINE foldx #-}
foldx :: (IsStream t, Monad m)
=> (x -> a -> x) -> x -> (x -> b) -> t m a -> m b
foldx :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b
foldx step begin done m = get $ go (toStream m) begin
where
{-# NOINLINE get #-}
@ -257,19 +256,17 @@ foldx step begin done m = get $ go (toStream m) begin
in (S.runStream m1) Nothing stop yield
{-# DEPRECATED foldl "Please use foldx instead." #-}
foldl :: (IsStream t, Monad m)
=> (x -> a -> x) -> x -> (x -> b) -> t m a -> m b
foldl :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b
foldl = foldx
-- | Strict left associative fold.
{-# INLINE foldl' #-}
foldl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> m b
foldl' :: Monad m => (b -> a -> b) -> b -> SerialT m a -> m b
foldl' step begin m = foldx step begin id m
-- XXX replace the recursive "go" with explicit continuations.
-- | Like 'foldx', but with a monadic step function.
foldxM :: (IsStream t, Monad m)
=> (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b
foldxM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b
foldxM step begin done m = go begin (toStream m)
where
go !acc m1 =
@ -279,18 +276,17 @@ foldxM step begin done m = go begin (toStream m)
in (S.runStream m1) Nothing stop yield
{-# DEPRECATED foldlM "Please use foldxM instead." #-}
foldlM :: (IsStream t, Monad m)
=> (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b
foldlM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b
foldlM = foldxM
-- | Like 'foldl'' but with a monadic step function.
foldlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> m b
foldlM' :: Monad m => (b -> a -> m b) -> b -> SerialT m a -> m b
foldlM' step begin m = foldxM step (return begin) return m
-- | Decompose a stream into its head and tail. If the stream is empty, returns
-- 'Nothing'. If the stream is non-empty, returns 'Just (a, ma)', where 'a' is
-- the head of the stream and 'ma' its tail.
uncons :: (IsStream t, Monad m) => t m a -> m (Maybe (a, t m a))
uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a))
uncons m =
let stop = return Nothing
yield a Nothing = return (Just (a, nil))
@ -298,7 +294,7 @@ uncons m =
in (S.runStream (toStream m)) Nothing stop yield
-- | Write a stream of Strings to an IO Handle.
toHandle :: (IsStream t, MonadIO m) => IO.Handle -> t m String -> m ()
toHandle :: MonadIO m => IO.Handle -> SerialT m String -> m ()
toHandle h m = go (toStream m)
where
go m1 =
@ -313,7 +309,7 @@ toHandle h m = go (toStream m)
-- | Convert a stream into a list in the underlying monad.
{-# INLINABLE toList #-}
toList :: (IsStream t, Monad m) => t m a -> m [a]
toList :: Monad m => SerialT m a -> m [a]
toList = foldrM (\a xs -> return (a : xs)) []
-- | Take first 'n' elements from the stream and discard the rest.
@ -376,7 +372,7 @@ dropWhile p m = fromStream $ go (toStream m)
in (S.runStream m1) ctx stp yield
-- | Determine whether all elements of a stream satisfy a predicate.
all :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool
all :: Monad m => (a -> Bool) -> SerialT m a -> m Bool
all p m = go (toStream m)
where
go m1 =
@ -387,7 +383,7 @@ all p m = go (toStream m)
in (S.runStream m1) Nothing (return True) yield
-- | Determine whether any of the elements of a stream satisfy a predicate.
any :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool
any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool
any p m = go (toStream m)
where
go m1 =
@ -398,22 +394,22 @@ any p m = go (toStream m)
in (S.runStream m1) Nothing (return False) yield
-- | Determine the sum of all elements of a stream of numbers
sum :: (IsStream t, Monad m, Num a) => t m a -> m a
sum :: (Monad m, Num a) => SerialT m a -> m a
sum = foldl (+) 0 id
-- | Determine the product of all elements of a stream of numbers
product :: (IsStream t, Monad m, Num a) => t m a -> m a
product :: (Monad m, Num a) => SerialT m a -> m a
product = foldl (*) 1 id
-- | Extract the first element of the stream, if any.
head :: (IsStream t, Monad m) => t m a -> m (Maybe a)
head :: Monad m => SerialT m a -> m (Maybe a)
head m =
let stop = return Nothing
yield a _ = return (Just a)
in (S.runStream (toStream m)) Nothing stop yield
-- | Extract all but the first element of the stream, if any.
tail :: (IsStream t, Monad m) => t m a -> m (Maybe (t m a))
tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a))
tail m =
let stop = return Nothing
yield _ Nothing = return $ Just nil
@ -422,18 +418,18 @@ tail m =
-- | Extract the last element of the stream, if any.
{-# INLINE last #-}
last :: (IsStream t, Monad m) => t m a -> m (Maybe a)
last :: Monad m => SerialT m a -> m (Maybe a)
last = foldl (\_ y -> Just y) Nothing id
-- | Determine whether the stream is empty.
null :: (IsStream t, Monad m) => t m a -> m Bool
null :: Monad m => SerialT m a -> m Bool
null m =
let stop = return True
yield _ _ = return False
in (S.runStream (toStream m)) Nothing stop yield
-- | Determine whether an element is present in the stream.
elem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool
elem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool
elem e m = go (toStream m)
where
go m1 =
@ -443,7 +439,7 @@ elem e m = go (toStream m)
in (S.runStream m1) Nothing stop yield
-- | Determine whether an element is not present in the stream.
notElem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool
notElem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool
notElem e m = go (toStream m)
where
go m1 =
@ -453,7 +449,7 @@ notElem e m = go (toStream m)
in (S.runStream m1) Nothing stop yield
-- | Determine the length of the stream.
length :: (IsStream t, Monad m) => t m a -> m Int
length :: Monad m => SerialT m a -> m Int
length = foldl (\n _ -> n + 1) 0 id
-- | Returns the elements of the stream in reverse order.
@ -471,7 +467,7 @@ reverse m = fromStream $ go Nothing (toStream m)
-- XXX replace the recursive "go" with continuation
-- | Determine the minimum element in a stream.
minimum :: (IsStream t, Monad m, Ord a) => t m a -> m (Maybe a)
minimum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a)
minimum m = go Nothing (toStream m)
where
go r m1 =
@ -486,7 +482,7 @@ minimum m = go Nothing (toStream m)
-- XXX replace the recursive "go" with continuation
-- | Determine the maximum element in a stream.
maximum :: (IsStream t, Monad m, Ord a) => t m a -> m (Maybe a)
maximum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a)
maximum m = go Nothing (toStream m)
where
go r m1 =
@ -519,7 +515,7 @@ mapM f m = fromStream $ go (toStream m)
-- | Apply a monadic action to each element of the stream and discard the
-- output of the action.
mapM_ :: (IsStream t, Monad m) => (a -> m b) -> t m a -> m ()
mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m ()
mapM_ f m = go (toStream m)
where
go m1 =

View File

@ -77,15 +77,12 @@ module Streamly.Streams
, adapt
-- * Running Streams
, runSerialT
, runStreamT -- deprecated
, runInterleavedT
, runAParallelT
, runInterleavedT -- deprecated
, runAsyncT -- deprecated
, runParallelT
, runZipSerial
, runParallelT -- deprecated
, runZipStream -- deprecated
, runZipAsync
, runZipAsync -- deprecated
-- * Zipping
, zipWith
@ -220,7 +217,7 @@ streamFold sv step blank m =
in (S.runStream (toStream m)) sv blank yield
-- | Run a streaming composition, discard the results.
runStream :: (Monad m, IsStream t) => t m a -> m ()
runStream :: Monad m => SerialT m a -> m ()
runStream m = go (toStream m)
where
go m1 =
@ -232,7 +229,7 @@ runStream m = go (toStream m)
-- | Same as 'runStream'
{-# Deprecated runStreaming "Please use runStream instead." #-}
runStreaming :: (Monad m, IsStream t) => t m a -> m ()
runStreaming = runStream
runStreaming = runStream . adapt
-- | Write a stream to an 'SVar' in a non-blocking manner. The stream can then
-- be read back from the SVar using 'fromSVar'.
@ -274,7 +271,7 @@ async m = do
-- element of the stream, serially.
--
-- @
-- main = 'runSerialT' $ do
-- main = 'runStream' . 'serially' $ do
-- x <- return 1 \<\> return 2
-- liftIO $ print x
-- @
@ -286,7 +283,7 @@ async m = do
-- 'SerialT' nests streams serially in a depth first manner.
--
-- @
-- main = 'runSerialT' $ do
-- main = 'runStream' . 'serially' $ do
-- x <- return 1 \<\> return 2
-- y <- return 3 \<\> return 4
-- liftIO $ print (x, y)
@ -304,6 +301,8 @@ async m = do
-- nested @for@ loops and the monadic continuation is the body of the loop. The
-- loop iterates for all elements of the stream.
--
-- The 'serially' combinator can be omitted as the default stream type is
-- 'SerialT'.
-- Note that serial composition can be used to combine an infinite number of
-- streams as it explores only one stream at a time.
--
@ -334,9 +333,9 @@ type StreamT = SerialT
-- Semigroup
------------------------------------------------------------------------------
-- | Same as the 'Semigroup' instance of 'SerialT'. Appends two streams
-- sequentially, yielding all elements from the first stream, and then all
-- elements from the second stream.
-- | Polymorphic version of the 'Semigroup' operation '<>' of 'SerialT'.
-- Appends two streams sequentially, yielding all elements from the first
-- stream, and then all elements from the second stream.
{-# INLINE append #-}
append :: IsStream t => t m a -> t m a -> t m a
append m1 m2 = fromStream $ S.append (toStream m1) (toStream m2)
@ -435,7 +434,7 @@ instance (Monad m, Floating a) => Floating (SerialT m a) where
--
--
-- @
-- main = 'runInterleavedT' $ do
-- main = 'runStream' . 'interleaving' $ do
-- x <- return 1 \<\> return 2
-- y <- return 3 \<\> return 4
-- liftIO $ print (x, y)
@ -468,8 +467,8 @@ instance IsStream InterleavedT where
-- Semigroup
------------------------------------------------------------------------------
-- | Same as the 'Semigroup' instance of 'InterleavedT'. Interleaves two
-- streams, yielding one element from each stream alternately.
-- | Polymorphic version of the 'Semigroup' operation '<>' of 'InterleavedT'.
-- Interleaves two streams, yielding one element from each stream alternately.
{-# INLINE interleave #-}
interleave :: IsStream t => t m a -> t m a -> t m a
interleave m1 m2 = fromStream $ S.interleave (toStream m1) (toStream m2)
@ -586,7 +585,7 @@ instance (Monad m, Floating a) => Floating (InterleavedT m a) where
-- import "Streamly"
-- import Control.Concurrent
--
-- main = 'runAParallelT' $ do
-- main = 'runStream' . 'aparallely' $ do
-- n <- return 3 \<\> return 2 \<\> return 1
-- liftIO $ do
-- threadDelay (n * 1000000)
@ -627,9 +626,9 @@ instance IsStream AParallelT where
-- Semigroup
------------------------------------------------------------------------------
-- | Same as the 'Semigroup' operation of 'AParallelT', but polymorphic.
-- Merges two streams possibly concurrently, preferring the elements from the
-- left one when available.
-- | Polymorphic version of the 'Semigroup' operation '<>' of 'AParallelT', but
-- polymorphic. Merges two streams possibly concurrently, preferring the
-- elements from the left one when available.
{-# INLINE aparallel #-}
aparallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
aparallel m1 m2 = fromStream $ S.aparallel (toStream m1) (toStream m2)
@ -753,7 +752,7 @@ instance (MonadAsync m, Floating a) => Floating (AParallelT m a) where
-- import "Streamly"
-- import Control.Concurrent
--
-- main = 'runParallelT' $ do
-- main = 'runStream' . 'parallely' $ do
-- n <- return 3 \<\> return 2 \<\> return 1
-- liftIO $ do
-- threadDelay (n * 1000000)
@ -792,8 +791,8 @@ instance IsStream ParallelT where
-- Semigroup
------------------------------------------------------------------------------
-- | Same as the 'Semigroup' instance of 'ParallelT'. Merges two streams
-- concurrently choosing elements from both fairly.
-- | Polymorphic version of the 'Semigroup' operation '<>' of 'ParallelT'.
-- Merges two streams concurrently choosing elements from both fairly.
{-# INLINE parallel #-}
parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
parallel m1 m2 = fromStream $ S.parallel (toStream m1) (toStream m2)
@ -1066,81 +1065,72 @@ instance (MonadAsync m, Floating a) => Floating (ZipAsync m a) where
-- Type adapting combinators
-------------------------------------------------------------------------------
-- | Adapt one streaming type to another.
-- | Adapt any specific stream type to any other specific stream type.
adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a
adapt = fromStream . toStream
-- | Interpret an ambiguously typed stream as 'SerialT'.
serially :: SerialT m a -> SerialT m a
serially x = x
-- | Fix the type of a polymorphic stream as 'SerialT'.
serially :: IsStream t => SerialT m a -> t m a
serially = adapt
-- | Interpret an ambiguously typed stream as 'InterleavedT'.
interleaving :: InterleavedT m a -> InterleavedT m a
interleaving x = x
-- | Fix the type of a polymorphic stream as 'InterleavedT'.
interleaving :: IsStream t => InterleavedT m a -> t m a
interleaving = adapt
-- | Interpret an ambiguously typed stream as 'AParallelT'.
aparallely :: AParallelT m a -> AParallelT m a
aparallely x = x
-- | Fix the type of a polymorphic stream as 'AParallelT'.
aparallely :: IsStream t => AParallelT m a -> t m a
aparallely = adapt
-- | Same as 'aparallely'.
{-# DEPRECATED asyncly "Please use aparallely instead." #-}
asyncly :: AParallelT m a -> AParallelT m a
asyncly :: IsStream t => AParallelT m a -> t m a
asyncly = aparallely
-- | Interpret an ambiguously typed stream as 'ParallelT'.
parallely :: ParallelT m a -> ParallelT m a
parallely x = x
-- | Fix the type of a polymorphic stream as 'ParallelT'.
parallely :: IsStream t => ParallelT m a -> t m a
parallely = adapt
-- | Interpret an ambiguously typed stream as 'ZipSerial'.
zipping :: ZipSerial m a -> ZipSerial m a
zipping x = x
-- | Fix the type of a polymorphic stream as 'ZipSerial'.
zipping :: IsStream t => ZipSerial m a -> t m a
zipping = adapt
-- | Interpret an ambiguously typed stream as 'ZipAsync'.
zippingAsync :: ZipAsync m a -> ZipAsync m a
zippingAsync x = x
-- | Fix the type of a polymorphic stream as 'ZipAsync'.
zippingAsync :: IsStream t => ZipAsync m a -> t m a
zippingAsync = adapt
-------------------------------------------------------------------------------
-- Running Streams, convenience functions specialized to types
-------------------------------------------------------------------------------
-- | Same as @runStream . serially@.
runSerialT :: Monad m => SerialT m a -> m ()
runSerialT = runStream
-- | Same as @runSerialT@.
{-# Deprecated runStreamT "Please use runSerialT instead." #-}
-- | Same as @runStream@.
{-# DEPRECATED runStreamT "Please use runStream instead." #-}
runStreamT :: Monad m => SerialT m a -> m ()
runStreamT = runSerialT
runStreamT = runStream
-- | Same as @runStream . interleaving@.
{-# DEPRECATED runInterleavedT "Please use 'runStream . interleaving' instead." #-}
runInterleavedT :: Monad m => InterleavedT m a -> m ()
runInterleavedT = runStream
runInterleavedT = runStream . interleaving
-- | Same as @runStream . aparallely@.
runAParallelT :: Monad m => AParallelT m a -> m ()
runAParallelT = runStream
-- | Same as @runAParallelT@.
{-# Deprecated runAsyncT "Please use runAParallelT instead." #-}
{-# DEPRECATED runAsyncT "Please use 'runStream . aparallely' instead." #-}
runAsyncT :: Monad m => AParallelT m a -> m ()
runAsyncT = runAParallelT
runAsyncT = runStream . aparallely
-- | Same as @runStream . parallely@.
{-# DEPRECATED runParallelT "Please use 'runStream . parallely' instead." #-}
runParallelT :: Monad m => ParallelT m a -> m ()
runParallelT = runStream
runParallelT = runStream . parallely
-- | Same as @runStream . zipping@.
runZipSerial :: Monad m => ZipSerial m a -> m ()
runZipSerial = runStream
{-# Deprecated runZipStream "Please use runZipSerial instead." #-}
-- | Same as ZipSerial.
{-# DEPRECATED runZipStream "Please use 'runStream . zipping instead." #-}
runZipStream :: Monad m => ZipSerial m a -> m ()
runZipStream = runZipSerial
runZipStream = runStream . zipping
-- | Same as @runStream . zippingAsync@.
{-# DEPRECATED runZipAsync "Please use 'runStream . zippingAsync instead." #-}
runZipAsync :: Monad m => ZipAsync m a -> m ()
runZipAsync = runStream
runZipAsync = runStream . zippingAsync
------------------------------------------------------------------------------
-- Fold Utilities

View File

@ -139,8 +139,8 @@ main = hspec $ do
describe "Nested parallel and serial compositions" $ do
let t = timed
p = adapt . parallely
s = adapt . serially
p = parallely
s = serially
{-
-- This is not correct, the result can also be [4,4,8,0,8,0,2,2]
-- because of parallelism of [8,0] and [8,0].
@ -375,14 +375,14 @@ timed :: MonadIO (t IO) => Int -> t IO Int
timed x = liftIO (threadDelay (x * 100000)) >> return x
interleaveCheck :: (IsStream t, Semigroup (t IO Int))
=> (t IO Int -> t IO Int) -> Spec
=> (t IO Int -> SerialT IO Int) -> Spec
interleaveCheck t =
it "Interleave four" $
(A.toList . t) ((singleton 0 <> singleton 1) <> (singleton 100 <> singleton 101))
`shouldReturn` ([0, 100, 1, 101])
parallelCheck :: (IsStream t, Semigroup (t IO Int), MonadIO (t IO))
=> (t IO Int -> t IO Int) -> Spec
parallelCheck :: (Semigroup (t IO Int), MonadIO (t IO))
=> (t IO Int -> SerialT IO Int) -> Spec
parallelCheck t = do
it "Parallel ordering left associated" $
(A.toList . t) (((event 4 <> event 3) <> event 2) <> event 1)
@ -395,7 +395,7 @@ parallelCheck t = do
where event n = (liftIO $ threadDelay (n * 100000)) >> (return n)
compose :: (IsStream t, Semigroup (t IO Int))
=> (t IO Int -> t IO Int) -> t IO Int -> ([Int] -> [Int]) -> Spec
=> (t IO Int -> SerialT IO Int) -> t IO Int -> ([Int] -> [Int]) -> Spec
compose t z srt = do
-- XXX these should get covered by the property tests
it "Compose mempty, mempty" $
@ -434,13 +434,13 @@ composeAndComposeSimple
, Semigroup (t2 IO Int)
#endif
)
=> (t1 IO Int -> t1 IO Int)
=> (t1 IO Int -> SerialT IO Int)
-> (t2 IO Int -> t2 IO Int)
-> [[Int]] -> Spec
composeAndComposeSimple t1 t2 answer = do
let rfold = adapt . t2 . foldMapWith (<>) return
it "Compose right associated outer expr, right folded inner" $
((A.toList. t1) (rfold [1,2,3] <> (rfold [4,5,6] <> rfold [7,8,9])))
((A.toList . t1) (rfold [1,2,3] <> (rfold [4,5,6] <> rfold [7,8,9])))
`shouldReturn` (answer !! 0)
it "Compose left associated outer expr, right folded inner" $
@ -463,10 +463,10 @@ loops
-> ([Int] -> [Int])
-> Spec
loops t tsrt hsrt = do
it "Tail recursive loop" $ (A.toList (loopTail 0) >>= return . tsrt)
it "Tail recursive loop" $ ((A.toList . adapt) (loopTail 0) >>= return . tsrt)
`shouldReturn` [0..3]
it "Head recursive loop" $ (A.toList (loopHead 0) >>= return . hsrt)
it "Head recursive loop" $ ((A.toList . adapt) (loopHead 0) >>= return . hsrt)
`shouldReturn` [0..3]
where
@ -482,7 +482,7 @@ loops t tsrt hsrt = do
bindAndComposeSimple
:: ( IsStream t1, IsStream t2, Semigroup (t2 IO Int), Monad (t2 IO))
=> (t1 IO Int -> t1 IO Int)
=> (t1 IO Int -> SerialT IO Int)
-> (t2 IO Int -> t2 IO Int)
-> Spec
bindAndComposeSimple t1 t2 = do
@ -499,7 +499,7 @@ bindAndComposeSimple t1 t2 = do
bindAndComposeHierarchy
:: ( IsStream t1, Monad (t1 IO)
, IsStream t2, Monad (t2 IO))
=> (t1 IO Int -> t1 IO Int)
=> (t1 IO Int -> SerialT IO Int)
-> (t2 IO Int -> t2 IO Int)
-> ([t2 IO Int] -> t2 IO Int)
-> Spec
@ -550,14 +550,14 @@ mixedOps = do
x <- return 1
y <- return 2
z <- do
x1 <- adapt . parallely $ return 1 <> return 2
x1 <- parallely $ return 1 <> return 2
liftIO $ return ()
liftIO $ putStr ""
y1 <- adapt . aparallely $ return 1 <> return 2
y1 <- aparallely $ return 1 <> return 2
z1 <- do
x11 <- return 1 <> return 2
y11 <- adapt . aparallely $ return 1 <> return 2
z11 <- adapt . interleaving $ return 1 <> return 2
y11 <- aparallely $ return 1 <> return 2
z11 <- interleaving $ return 1 <> return 2
liftIO $ return ()
liftIO $ putStr ""
return (x11 + y11 + z11)

View File

@ -36,7 +36,7 @@ equals eq stream list = do
constructWithReplicateM
:: IsStream t
=> (t IO Int -> t IO Int)
=> (t IO Int -> SerialT IO Int)
-> Word8
-> Property
constructWithReplicateM op len =
@ -47,11 +47,10 @@ constructWithReplicateM op len =
equals (==) stream list
transformFromList
:: IsStream t
=> ([Int] -> t IO Int)
:: ([Int] -> t IO Int)
-> ([Int] -> [Int] -> Bool)
-> ([Int] -> [Int])
-> (t IO Int -> t IO Int)
-> (t IO Int -> SerialT IO Int)
-> [Int]
-> Property
transformFromList constr eq listOp op a =
@ -61,9 +60,8 @@ transformFromList constr eq listOp op a =
equals eq stream list
foldFromList
:: IsStream t
=> ([Int] -> t IO Int)
-> (t IO Int -> t IO Int)
:: ([Int] -> t IO Int)
-> (t IO Int -> SerialT IO Int)
-> ([Int] -> [Int] -> Bool)
-> [Int]
-> Property
@ -84,8 +82,8 @@ eliminateOp constr listOp op a =
elemOp
:: ([Word8] -> t IO Word8)
-> (t IO Word8 -> t IO Word8)
-> (Word8 -> t IO Word8 -> IO Bool)
-> (t IO Word8 -> SerialT IO Word8)
-> (Word8 -> SerialT IO Word8 -> IO Bool)
-> (Word8 -> [Word8] -> Bool)
-> (Word8, [Word8])
-> Property
@ -96,10 +94,10 @@ elemOp constr op streamOp listOp (x, xs) =
equals (==) stream list
functorOps
:: (IsStream t, Functor (t IO))
:: Functor (t IO)
=> ([Int] -> t IO Int)
-> String
-> (t IO Int -> t IO Int)
-> (t IO Int -> SerialT IO Int)
-> ([Int] -> [Int] -> Bool)
-> Spec
functorOps constr desc t eq = do
@ -110,7 +108,7 @@ transformOps
:: IsStream t
=> ([Int] -> t IO Int)
-> String
-> (t IO Int -> t IO Int)
-> (t IO Int -> SerialT IO Int)
-> ([Int] -> [Int] -> Bool)
-> Spec
transformOps constr desc t eq = do
@ -159,10 +157,9 @@ wrapMaybe f =
else Just (f x)
eliminationOps
:: IsStream t
=> ([Int] -> t IO Int)
:: ([Int] -> t IO Int)
-> String
-> (t IO Int -> t IO Int)
-> (t IO Int -> SerialT IO Int)
-> Spec
eliminationOps constr desc t = do
-- Elimination
@ -181,10 +178,9 @@ eliminationOps constr desc t = do
-- head/tail/last may depend on the order in case of parallel streams
-- so we test these only for serial streams.
serialEliminationOps
:: IsStream t
=> ([Int] -> t IO Int)
:: ([Int] -> t IO Int)
-> String
-> (t IO Int -> t IO Int)
-> (t IO Int -> SerialT IO Int)
-> Spec
serialEliminationOps constr desc t = do
prop (desc ++ " head") $ eliminateOp constr (wrapMaybe head) $ A.head . t
@ -196,10 +192,9 @@ serialEliminationOps constr desc t = do
prop (desc ++ " last") $ eliminateOp constr (wrapMaybe last) $ A.last . t
transformOpsWord8
:: IsStream t
=> ([Word8] -> t IO Word8)
:: ([Word8] -> t IO Word8)
-> String
-> (t IO Word8 -> t IO Word8)
-> (t IO Word8 -> SerialT IO Word8)
-> Spec
transformOpsWord8 constr desc t = do
prop (desc ++ " elem") $ elemOp constr t A.elem elem
@ -214,7 +209,7 @@ semigroupOps
#endif
, Monoid (t IO Int))
=> String
-> (t IO Int -> t IO Int)
-> (t IO Int -> SerialT IO Int)
-> ([Int] -> [Int] -> Bool)
-> Spec
semigroupOps desc t eq = do
@ -222,9 +217,9 @@ semigroupOps desc t eq = do
prop (desc ++ " mappend") $ foldFromList (foldMapWith mappend singleton) t eq
applicativeOps
:: (IsStream t, Applicative (t IO))
:: Applicative (t IO)
=> ([Int] -> t IO Int)
-> (t IO (Int, Int) -> t IO (Int, Int))
-> (t IO (Int, Int) -> SerialT IO (Int, Int))
-> ([(Int, Int)] -> [(Int, Int)] -> Bool)
-> ([Int], [Int])
-> Property
@ -236,7 +231,7 @@ applicativeOps constr t eq (a, b) = monadicIO $ do
zipApplicative
:: (IsStream t, Applicative (t IO))
=> ([Int] -> t IO Int)
-> (t IO (Int, Int) -> t IO (Int, Int))
-> (t IO (Int, Int) -> SerialT IO (Int, Int))
-> ([(Int, Int)] -> [(Int, Int)] -> Bool)
-> ([Int], [Int])
-> Property
@ -252,7 +247,7 @@ zipApplicative constr t eq (a, b) = monadicIO $ do
zipMonadic
:: (IsStream t, Monad (t IO))
=> ([Int] -> t IO Int)
-> (t IO (Int, Int) -> t IO (Int, Int))
-> (t IO (Int, Int) -> SerialT IO (Int, Int))
-> ([(Int, Int)] -> [(Int, Int)] -> Bool)
-> ([Int], [Int])
-> Property
@ -271,9 +266,9 @@ zipMonadic constr t eq (a, b) =
equals eq stream2 list
monadThen
:: (IsStream t, Monad (t IO))
:: Monad (t IO)
=> ([Int] -> t IO Int)
-> (t IO Int -> t IO Int)
-> (t IO Int -> SerialT IO Int)
-> ([Int] -> [Int] -> Bool)
-> ([Int], [Int])
-> Property
@ -283,9 +278,9 @@ monadThen constr t eq (a, b) = monadicIO $ do
equals eq stream list
monadBind
:: (IsStream t, Monad (t IO))
:: Monad (t IO)
=> ([Int] -> t IO Int)
-> (t IO Int -> t IO Int)
-> (t IO Int -> SerialT IO Int)
-> ([Int] -> [Int] -> Bool)
-> ([Int], [Int])
-> Property
@ -313,7 +308,7 @@ main = hspec $ do
`shouldReturn` (take 100 $ iterate (+ 1) 0)
let folded :: IsStream t => [a] -> t IO a
folded = adapt . serially . (\xs ->
folded = serially . (\xs ->
case xs of
[x] -> return x -- singleton stream case
_ -> foldMapWith (<>) return xs