rename "once" to "yieldM"

and some other related refactoring changes.
This commit is contained in:
Harendra Kumar 2018-06-23 06:50:59 +05:30
parent 5ee02ffbf6
commit 9c85811905
20 changed files with 343 additions and 304 deletions

View File

@ -1,3 +1,9 @@
## Unreleased
### Deprecations
* `once` has been deprecated and renamed to `yieldM`
## 0.3.0
### Breaking changes

View File

@ -89,7 +89,7 @@ sourceFoldMapWith n = S.foldMapWith (S.<>) return [n..n+value]
{-# INLINE sourceFoldMapWithM #-}
sourceFoldMapWithM :: (S.IsStream t, Monad m, S.Semigroup (t m Int))
=> Int -> t m Int
sourceFoldMapWithM n = S.foldMapWith (S.<>) (S.once . return) [n..n+value]
sourceFoldMapWithM n = S.foldMapWith (S.<>) (S.yieldM . return) [n..n+value]
{-# INLINE sourceUnfoldr #-}
sourceUnfoldr :: S.IsStream t => Int -> t m Int

View File

@ -9,7 +9,7 @@
import Data.IORef
import Graphics.UI.SDL as SDL
import Streamly
import Streamly.Prelude (once)
import Streamly.Prelude (yieldM)
import Streamly.Time
------------------------------------------------------------------------------
@ -87,5 +87,5 @@ main :: IO ()
main = do
sdlInit
cref <- newIORef (0,0)
runStream $ once (updateController cref)
`parallel` once (updateDisplay cref)
runStream $ yieldM (updateController cref)
`parallel` yieldM (updateDisplay cref)

View File

@ -4,29 +4,29 @@ import Data.Word
import System.Random (getStdGen, randoms)
import Data.List (sort)
import Streamly
import Streamly.Prelude (once)
import Streamly.Prelude (yieldM)
import qualified Streamly.Prelude as A
getSorted :: Serial Word16
getSorted = do
g <- once getStdGen
g <- yieldM getStdGen
let ls = take 100000 (randoms g) :: [Word16]
foldMap return (sort ls)
-- | merge two streams generating the elements from each in parallel
mergeAsync :: Ord a => Serial a -> Serial a -> Serial a
mergeAsync a b = do
x <- once $ mkAsync a
y <- once $ mkAsync b
x <- yieldM $ mkAsync a
y <- yieldM $ mkAsync b
merge x y
merge :: Ord a => Serial a -> Serial a -> Serial a
merge a b = do
a1 <- once $ A.uncons a
a1 <- yieldM $ A.uncons a
case a1 of
Nothing -> b
Just (x, ma) -> do
b1 <- once $ A.uncons b
b1 <- yieldM $ A.uncons b
case b1 of
Nothing -> return x <> ma
Just (y, mb) ->

View File

@ -1,5 +1,5 @@
import Streamly
import Streamly.Prelude (nil, once, (|:))
import Streamly.Prelude (nil, yieldM, (|:))
import Network.HTTP.Simple
-- | Runs three search engine queries in parallel and prints the search engine
@ -13,10 +13,11 @@ main = do
runStream . parallely $ google |: bing |: duckduckgo |: nil
putStrLn "\nUsing parallel semigroup composition"
runStream . parallely $ once google <> once bing <> once duckduckgo
runStream . parallely $ yieldM google <> yieldM bing <> yieldM duckduckgo
putStrLn "\nUsing parallel applicative zip"
runStream . zipAsyncly $ (,,) <$> once google <*> once bing <*> once duckduckgo
runStream . zipAsyncly $
(,,) <$> yieldM google <*> yieldM bing <*> yieldM duckduckgo
where
get :: String -> IO ()

View File

@ -53,7 +53,7 @@ module Streamly.Prelude
-- * Special Generation
-- | Generate a monadic stream from an input structure, a seed or a
-- generation function.
, once
, K.yieldM
, replicateM
, repeatM
, iterate
@ -121,6 +121,7 @@ module Streamly.Prelude
, toHandle
-- * Deprecated
, once
, each
, scan
, foldl
@ -217,7 +218,7 @@ unfoldrM step = go
case mayb of
Nothing -> stp
Just (a, b) ->
K.runStream (toStream (return a |: go b)) svr stp sng yld
K.unStream (toStream (return a |: go b)) svr stp sng yld
-- | Construct a stream from a 'Foldable' containing pure values.
--
@ -310,7 +311,7 @@ iterateM step = go
where
go s = fromStream $ K.Stream $ \svr stp sng yld -> do
next <- step s
K.runStream (toStream (return s |: go next)) svr stp sng yld
K.unStream (toStream (return s |: go next)) svr stp sng yld
-- | Read lines from an IO Handle into a stream of Strings.
--
@ -344,8 +345,8 @@ foldr step acc m = go (toStream m)
go m1 =
let stop = return acc
single a = return (step a acc)
yield a r = go r >>= \b -> return (step a b)
in (K.runStream m1) Nothing stop single yield
yieldk a r = go r >>= \b -> return (step a b)
in (K.unStream m1) Nothing stop single yieldk
-- | Lazy right fold with a monadic step function. For example, to fold a
-- stream into a list:
@ -363,8 +364,8 @@ foldrM step acc m = go (toStream m)
go m1 =
let stop = return acc
single a = step a acc
yield a r = go r >>= step a
in (K.runStream m1) Nothing stop single yield
yieldk a r = go r >>= step a
in (K.unStream m1) Nothing stop single yieldk
-- | Strict left scan with an extraction function. Like 'scanl'', but applies a
-- user supplied extraction function (the third argument) at each step. This is
@ -379,10 +380,10 @@ scanx step begin done m =
where
go m1 !acc = K.Stream $ \_ stp sng yld ->
let single a = sng (done $ step acc a)
yield a r =
yieldk a r =
let s = step acc a
in yld (done s) (go r s)
in K.runStream m1 Nothing stp single yield
in K.unStream m1 Nothing stp single yieldk
-- |
-- @since 0.1.1
@ -413,7 +414,7 @@ foldx step begin done m = get $ go (toStream m) begin
{-# NOINLINE get #-}
get m1 =
let single = return . done
in (K.runStream m1) Nothing undefined single undefined
in (K.unStream m1) Nothing undefined single undefined
-- Note, this can be implemented by making a recursive call to "go",
-- however that is more expensive because of unnecessary recursion
@ -422,10 +423,10 @@ foldx step begin done m = get $ go (toStream m) begin
go m1 !acc = K.Stream $ \_ _ sng yld ->
let stop = sng acc
single a = sng $ step acc a
yield a r =
yieldk a r =
let stream = go r (step acc a)
in (K.runStream stream) Nothing undefined sng yld
in (K.runStream m1) Nothing stop single yield
in (K.unStream stream) Nothing undefined sng yld
in (K.unStream m1) Nothing stop single yieldk
-- |
-- @since 0.1.0
@ -450,8 +451,8 @@ foldxM step begin done m = go begin (toStream m)
go !acc m1 =
let stop = acc >>= done
single a = acc >>= \b -> step b a >>= done
yield a r = acc >>= \b -> go (step b a) r
in (K.runStream m1) Nothing stop single yield
yieldk a r = acc >>= \b -> go (step b a) r
in (K.unStream m1) Nothing stop single yieldk
-- |
-- @since 0.1.0
@ -474,8 +475,8 @@ uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a))
uncons m =
let stop = return Nothing
single a = return (Just (a, nil))
yield a r = return (Just (a, fromStream r))
in (K.runStream (toStream m)) Nothing stop single yield
yieldk a r = return (Just (a, fromStream r))
in (K.unStream (toStream m)) Nothing stop single yieldk
-- | Write a stream of Strings to an IO Handle.
--
@ -486,8 +487,8 @@ toHandle h m = go (toStream m)
go m1 =
let stop = return ()
single a = liftIO (IO.hPutStrLn h a)
yield a r = liftIO (IO.hPutStrLn h a) >> go r
in (K.runStream m1) Nothing stop single yield
yieldk a r = liftIO (IO.hPutStrLn h a) >> go r
in (K.unStream m1) Nothing stop single yieldk
------------------------------------------------------------------------------
-- Special folds
@ -508,8 +509,8 @@ take :: IsStream t => Int -> t m a -> t m a
take n m = fromStream $ go n (toStream m)
where
go n1 m1 = K.Stream $ \_ stp sng yld ->
let yield a r = yld a (go (n1 - 1) r)
in if n1 <= 0 then stp else (K.runStream m1) Nothing stp sng yield
let yieldk a r = yld a (go (n1 - 1) r)
in if n1 <= 0 then stp else (K.unStream m1) Nothing stp sng yieldk
-- | Include only those elements that pass a predicate.
--
@ -521,9 +522,9 @@ filter p m = fromStream $ go (toStream m)
go m1 = K.Stream $ \_ stp sng yld ->
let single a | p a = sng a
| otherwise = stp
yield a r | p a = yld a (go r)
| otherwise = (K.runStream r) Nothing stp single yield
in (K.runStream m1) Nothing stp single yield
yieldk a r | p a = yld a (go r)
| otherwise = (K.unStream r) Nothing stp single yieldk
in (K.unStream m1) Nothing stp single yieldk
-- | End the stream as soon as the predicate fails on an element.
--
@ -535,9 +536,9 @@ takeWhile p m = fromStream $ go (toStream m)
go m1 = K.Stream $ \_ stp sng yld ->
let single a | p a = sng a
| otherwise = stp
yield a r | p a = yld a (go r)
yieldk a r | p a = yld a (go r)
| otherwise = stp
in (K.runStream m1) Nothing stp single yield
in (K.unStream m1) Nothing stp single yieldk
-- | Discard first 'n' elements from the stream and take the rest.
--
@ -547,11 +548,11 @@ drop n m = fromStream $ go n (toStream m)
where
go n1 m1 = K.Stream $ \_ stp sng yld ->
let single _ = stp
yield _ r = (K.runStream $ go (n1 - 1) r) Nothing stp sng yld
yieldk _ r = (K.unStream $ go (n1 - 1) r) Nothing stp sng yld
-- Somehow "<=" check performs better than a ">"
in if n1 <= 0
then (K.runStream m1) Nothing stp sng yld
else (K.runStream m1) Nothing stp single yield
then (K.unStream m1) Nothing stp sng yld
else (K.unStream m1) Nothing stp single yieldk
-- | Drop elements in the stream as long as the predicate succeeds and then
-- take the rest of the stream.
@ -564,9 +565,9 @@ dropWhile p m = fromStream $ go (toStream m)
go m1 = K.Stream $ \_ stp sng yld ->
let single a | p a = stp
| otherwise = sng a
yield a r | p a = (K.runStream r) Nothing stp single yield
yieldk a r | p a = (K.unStream r) Nothing stp single yieldk
| otherwise = yld a r
in (K.runStream m1) Nothing stp single yield
in (K.unStream m1) Nothing stp single yieldk
-- | Determine whether all elements of a stream satisfy a predicate.
--
@ -577,9 +578,9 @@ all p m = go (toStream m)
go m1 =
let single a | p a = return True
| otherwise = return False
yield a r | p a = go r
yieldk a r | p a = go r
| otherwise = return False
in (K.runStream m1) Nothing (return True) single yield
in (K.unStream m1) Nothing (return True) single yieldk
-- | Determine whether any of the elements of a stream satisfy a predicate.
--
@ -590,9 +591,9 @@ any p m = go (toStream m)
go m1 =
let single a | p a = return True
| otherwise = return False
yield a r | p a = return True
yieldk a r | p a = return True
| otherwise = go r
in (K.runStream m1) Nothing (return False) single yield
in (K.unStream m1) Nothing (return False) single yieldk
-- | Determine the sum of all elements of a stream of numbers
--
@ -613,8 +614,8 @@ head :: Monad m => SerialT m a -> m (Maybe a)
head m =
let stop = return Nothing
single a = return (Just a)
yield a _ = return (Just a)
in (K.runStream (toStream m)) Nothing stop single yield
yieldk a _ = return (Just a)
in (K.unStream (toStream m)) Nothing stop single yieldk
-- | Extract all but the first element of the stream, if any.
--
@ -623,8 +624,8 @@ tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a))
tail m =
let stop = return Nothing
single _ = return $ Just nil
yield _ r = return $ Just $ fromStream r
in (K.runStream (toStream m)) Nothing stop single yield
yieldk _ r = return $ Just $ fromStream r
in (K.unStream (toStream m)) Nothing stop single yieldk
-- | Extract the last element of the stream, if any.
--
@ -640,8 +641,8 @@ null :: Monad m => SerialT m a -> m Bool
null m =
let stop = return True
single _ = return False
yield _ _ = return False
in (K.runStream (toStream m)) Nothing stop single yield
yieldk _ _ = return False
in (K.unStream (toStream m)) Nothing stop single yieldk
-- | Determine whether an element is present in the stream.
--
@ -652,8 +653,8 @@ elem e m = go (toStream m)
go m1 =
let stop = return False
single a = return (a == e)
yield a r = if a == e then return True else go r
in (K.runStream m1) Nothing stop single yield
yieldk a r = if a == e then return True else go r
in (K.unStream m1) Nothing stop single yieldk
-- | Determine whether an element is not present in the stream.
--
@ -664,8 +665,8 @@ notElem e m = go (toStream m)
go m1 =
let stop = return True
single a = return (a /= e)
yield a r = if a == e then return False else go r
in (K.runStream m1) Nothing stop single yield
yieldk a r = if a == e then return False else go r
in (K.unStream m1) Nothing stop single yieldk
-- | Determine the length of the stream.
--
@ -681,11 +682,11 @@ reverse :: (IsStream t) => t m a -> t m a
reverse m = fromStream $ go K.nil (toStream m)
where
go rev rest = K.Stream $ \_ stp sng yld ->
let runIt x = K.runStream x Nothing stp sng yld
let runIt x = K.unStream x Nothing stp sng yld
stop = runIt rev
single a = runIt $ a `K.cons` rev
yield a r = runIt $ go (a `K.cons` rev) r
in K.runStream rest Nothing stop single yield
yieldk a r = runIt $ go (a `K.cons` rev) r
in K.unStream rest Nothing stop single yieldk
-- XXX replace the recursive "go" with continuation
-- | Determine the minimum element in a stream.
@ -697,8 +698,8 @@ minimum m = go Nothing (toStream m)
go res m1 =
let stop = return res
single a = return $ min_ a res
yield a r = go (min_ a res) r
in (K.runStream m1) Nothing stop single yield
yieldk a r = go (min_ a res) r
in (K.unStream m1) Nothing stop single yieldk
min_ a res = case res of
Nothing -> Just a
@ -714,8 +715,8 @@ maximum m = go Nothing (toStream m)
go res m1 =
let stop = return res
single a = return $ max_ a res
yield a r = go (max_ a res) r
in (K.runStream m1) Nothing stop single yield
yieldk a r = go (max_ a res) r
in (K.unStream m1) Nothing stop single yieldk
max_ a res = case res of
Nothing -> Just a
@ -745,8 +746,8 @@ mapM f m = go (toStream m)
where
go m1 = fromStream $ K.Stream $ \svr stp sng yld ->
let single a = f a >>= sng
yield a r = K.runStream (toStream (f a |: (go r))) svr stp sng yld
in (K.runStream m1) Nothing stp single yield
yieldk a r = K.unStream (toStream (f a |: (go r))) svr stp sng yld
in (K.unStream m1) Nothing stp single yieldk
-- | Map a 'Maybe' returning function to a stream, filter out the 'Nothing'
-- elements, and return a stream of values extracted from 'Just'.
@ -760,10 +761,10 @@ mapMaybe f m = go (toStream m)
let single a = case f a of
Just b -> sng b
Nothing -> stp
yield a r = case f a of
yieldk a r = case f a of
Just b -> yld b (toStream $ go r)
Nothing -> (K.runStream r) Nothing stp single yield
in (K.runStream m1) Nothing stp single yield
Nothing -> (K.unStream r) Nothing stp single yieldk
in (K.unStream m1) Nothing stp single yieldk
-- | Like 'mapMaybe' but maps a monadic function.
--
@ -786,8 +787,8 @@ mapM_ f m = go (toStream m)
go m1 =
let stop = return ()
single a = void (f a)
yield a r = f a >> go r
in (K.runStream m1) Nothing stop single yield
yieldk a r = f a >> go r
in (K.unStream m1) Nothing stop single yieldk
-- | Reduce a stream of monadic actions to a stream of the output of those
-- actions.
@ -808,5 +809,5 @@ sequence m = go (toStream m)
where
go m1 = fromStream $ K.Stream $ \svr stp sng yld ->
let single ma = ma >>= sng
yield ma r = K.runStream (toStream $ ma |: go r) svr stp sng yld
in (K.runStream m1) Nothing stp single yield
yieldk ma r = K.unStream (toStream $ ma |: go r) svr stp sng yld
in (K.unStream m1) Nothing stp single yieldk

View File

@ -141,9 +141,9 @@ workLoopAhead sv q heap = runHeap
yieldOutput seqNo a r = do
continue <- liftIO $ send sv (ChildYield a)
if continue
then (runStream r) (Just sv) (runQueueToken seqNo)
(singleOutput seqNo)
(yieldOutput seqNo)
then unStream r (Just sv) (runQueueToken seqNo)
(singleOutput seqNo)
(yieldOutput seqNo)
else liftIO $ do
atomicModifyIORefCAS_ heap $ \(h, _) ->
(H.insert (Entry seqNo (AheadEntryStream r)) h, seqNo)
@ -160,15 +160,15 @@ workLoopAhead sv q heap = runHeap
Just (m, seqNo) -> do
if seqNo == prevSeqNo + 1
then
(runStream m) (Just sv) (runQueueToken seqNo)
(singleOutput seqNo)
(yieldOutput seqNo)
unStream m (Just sv) (runQueueToken seqNo)
(singleOutput seqNo)
(yieldOutput seqNo)
else do
liftIO $ atomicModifyIORefCAS_ heap $ \(h, _) ->
(h, prevSeqNo + 1)
(runStream m) (Just sv) runHeap
(singleToHeap seqNo)
(yieldToHeap seqNo)
unStream m (Just sv) runHeap
(singleToHeap seqNo)
(yieldToHeap seqNo)
runQueueNoToken = do
work <- dequeueAhead q
case work of
@ -176,13 +176,13 @@ workLoopAhead sv q heap = runHeap
Just (m, seqNo) -> do
if seqNo == 0
then
(runStream m) (Just sv) (runQueueToken seqNo)
(singleOutput seqNo)
(yieldOutput seqNo)
unStream m (Just sv) (runQueueToken seqNo)
(singleOutput seqNo)
(yieldOutput seqNo)
else
(runStream m) (Just sv) runHeap
(singleToHeap seqNo)
(yieldToHeap seqNo)
unStream m (Just sv) runHeap
(singleToHeap seqNo)
(yieldToHeap seqNo)
{-# NOINLINE runHeap #-}
runHeap = do
@ -203,9 +203,9 @@ workLoopAhead sv q heap = runHeap
case hent of
AheadEntryPure a -> singleOutput seqNo a
AheadEntryStream r ->
(runStream r) (Just sv) (runQueueToken seqNo)
(singleOutput seqNo)
(yieldOutput seqNo)
unStream r (Just sv) (runQueueToken seqNo)
(singleOutput seqNo)
(yieldOutput seqNo)
-------------------------------------------------------------------------------
-- WAhead
@ -220,11 +220,11 @@ workLoopAhead sv q heap = runHeap
forkSVarAhead :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
forkSVarAhead m1 m2 = Stream $ \_ stp sng yld -> do
sv <- newAheadVar (concurrently m1 m2) workLoopAhead
(runStream (fromSVar sv)) Nothing stp sng yld
unStream (fromSVar sv) Nothing stp sng yld
where
concurrently ma mb = Stream $ \svr stp sng yld -> do
liftIO $ enqueue (fromJust svr) mb
(runStream ma) Nothing stp sng yld
unStream ma Nothing stp sng yld
{-# INLINE aheadS #-}
aheadS :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
@ -235,14 +235,14 @@ aheadS m1 m2 = Stream $ \svr stp sng yld -> do
-- Always run the left side on a new SVar to avoid complexity in
-- sequencing results. This means the left side cannot further
-- split into more ahead computations on the same SVar.
(runStream m1) Nothing stp sng yld
_ -> runStream (forkSVarAhead m1 m2) Nothing stp sng yld
unStream m1 Nothing stp sng yld
_ -> unStream (forkSVarAhead m1 m2) Nothing stp sng yld
-- | XXX we can implement it more efficienty by directly implementing instead
-- of combining streams using ahead.
{-# INLINE consMAhead #-}
consMAhead :: MonadAsync m => m a -> Stream m a -> Stream m a
consMAhead m r = once m `aheadS` r
consMAhead m r = yieldM m `aheadS` r
------------------------------------------------------------------------------
-- AheadT
@ -356,10 +356,10 @@ aheadbind m f = go m
where
go (Stream g) =
Stream $ \ctx stp sng yld ->
let runIt x = (runStream x) ctx stp sng yld
single a = runIt $ f a
yield a r = runIt $ f a `aheadS` go r
in g Nothing stp single yield
let run x = unStream x ctx stp sng yld
single a = run $ f a
yieldk a r = run $ f a `aheadS` go r
in g Nothing stp single yieldk
instance MonadAsync m => Monad (AheadT m) where
return = pure

View File

@ -64,15 +64,15 @@ import Streamly.SVar
{-# INLINE runStreamLIFO #-}
runStreamLIFO :: MonadIO m
=> SVar Stream m a -> IORef [Stream m a] -> Stream m a -> m () -> m ()
runStreamLIFO sv q m stop = runStream m (Just sv) stop single yield
runStreamLIFO sv q m stop = unStream m (Just sv) stop single yieldk
where
single a = do
res <- liftIO $ send sv (ChildYield a)
if res then stop else liftIO $ sendStop sv
yield a r = do
yieldk a r = do
res <- liftIO $ send sv (ChildYield a)
if res
then (runStream r) (Just sv) stop single yield
then (unStream r) (Just sv) stop single yieldk
else liftIO $ enqueueLIFO sv q r >> sendStop sv
-------------------------------------------------------------------------------
@ -82,12 +82,12 @@ runStreamLIFO sv q m stop = runStream m (Just sv) stop single yield
{-# INLINE runStreamFIFO #-}
runStreamFIFO :: MonadIO m
=> SVar Stream m a -> LinkedQueue (Stream m a) -> Stream m a -> m () -> m ()
runStreamFIFO sv q m stop = runStream m (Just sv) stop single yield
runStreamFIFO sv q m stop = unStream m (Just sv) stop single yieldk
where
single a = do
res <- liftIO $ send sv (ChildYield a)
if res then stop else liftIO $ sendStop sv
yield a r = do
yieldk a r = do
res <- liftIO $ send sv (ChildYield a)
liftIO (enqueueFIFO sv q r)
if res then stop else liftIO $ sendStop sv
@ -278,11 +278,11 @@ forkSVarAsync style m1 m2 = Stream $ \_ stp sng yld -> do
AsyncVar -> newAsyncVar (concurrently m1 m2)
WAsyncVar -> newWAsyncVar (concurrently m1 m2)
_ -> error "illegal svar type"
runStream (fromSVar sv) Nothing stp sng yld
unStream (fromSVar sv) Nothing stp sng yld
where
concurrently ma mb = Stream $ \svr stp sng yld -> do
liftIO $ enqueue (fromJust svr) mb
(runStream ma) svr stp sng yld
unStream ma svr stp sng yld
{-# INLINE joinStreamVarAsync #-}
joinStreamVarAsync :: MonadAsync m
@ -290,8 +290,8 @@ joinStreamVarAsync :: MonadAsync m
joinStreamVarAsync style m1 m2 = Stream $ \svr stp sng yld ->
case svr of
Just sv | svarStyle sv == style ->
liftIO (enqueue sv m2) >> (runStream m1) svr stp sng yld
_ -> runStream (forkSVarAsync style m1 m2) Nothing stp sng yld
liftIO (enqueue sv m2) >> unStream m1 svr stp sng yld
_ -> unStream (forkSVarAsync style m1 m2) Nothing stp sng yld
------------------------------------------------------------------------------
-- Semigroup and Monoid style compositions for parallel actions
@ -323,7 +323,7 @@ async m1 m2 = fromStream $
-- of combining streams using async.
{-# INLINE consMAsync #-}
consMAsync :: MonadAsync m => m a -> Stream m a -> Stream m a
consMAsync m r = once m `asyncS` r
consMAsync m r = yieldM m `asyncS` r
------------------------------------------------------------------------------
-- AsyncT
@ -447,7 +447,7 @@ wAsyncS = joinStreamVarAsync WAsyncVar
-- of combining streams using wAsync.
{-# INLINE consMWAsync #-}
consMWAsync :: MonadAsync m => m a -> Stream m a -> Stream m a
consMWAsync m r = once m `wAsyncS` r
consMWAsync m r = yieldM m `wAsyncS` r
-- | Polymorphic version of the 'Semigroup' operation '<>' of 'WAsyncT'.
-- Merges two streams concurrently choosing elements from both fairly.
@ -567,4 +567,4 @@ MONAD_COMMON_INSTANCES(WAsyncT, MONADPARALLEL)
-- @since 0.1.0
{-# DEPRECATED runAsyncT "Please use 'runStream . asyncly' instead." #-}
runAsyncT :: Monad m => AsyncT m a -> m ()
runAsyncT = run
runAsyncT = runStream

View File

@ -9,7 +9,7 @@
#define MONAD_APPLICATIVE_INSTANCE(STREAM,CONSTRAINT) \
instance (Monad m CONSTRAINT) => Applicative (STREAM m) where { \
pure = STREAM . singleton; \
pure = STREAM . yield; \
(<*>) = ap }
#define MONAD_COMMON_INSTANCES(STREAM,CONSTRAINT) \

View File

@ -58,7 +58,7 @@ import Streamly.SVar
{-# NOINLINE runOne #-}
runOne :: MonadIO m => SVar Stream m a -> Stream m a -> m ()
runOne sv m = (runStream m) (Just sv) stop single yield
runOne sv m = (unStream m) (Just sv) stop single yieldk
where
@ -69,7 +69,7 @@ runOne sv m = (runStream m) (Just sv) stop single yield
-- queue and queue it back on that and exit the thread when the outputQueue
-- overflows. Parallel is dangerous because it can accumulate unbounded
-- output in the buffer.
yield a r = void (sendit a) >> runOne sv r
yieldk a r = void (sendit a) >> runOne sv r
{-# NOINLINE forkSVarPar #-}
forkSVarPar :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
@ -77,7 +77,7 @@ forkSVarPar m r = Stream $ \_ stp sng yld -> do
sv <- newParallelVar
pushWorkerPar sv (runOne sv m)
pushWorkerPar sv (runOne sv r)
(runStream (fromSVar sv)) Nothing stp sng yld
(unStream (fromSVar sv)) Nothing stp sng yld
{-# INLINE joinStreamVarPar #-}
joinStreamVarPar :: MonadAsync m
@ -85,8 +85,8 @@ joinStreamVarPar :: MonadAsync m
joinStreamVarPar style m1 m2 = Stream $ \svr stp sng yld ->
case svr of
Just sv | svarStyle sv == style -> do
pushWorkerPar sv (runOne sv m1) >> (runStream m2) svr stp sng yld
_ -> runStream (forkSVarPar m1 m2) Nothing stp sng yld
pushWorkerPar sv (runOne sv m1) >> (unStream m2) svr stp sng yld
_ -> unStream (forkSVarPar m1 m2) Nothing stp sng yld
{-# INLINE parallelStream #-}
parallelStream :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
@ -96,7 +96,7 @@ parallelStream = joinStreamVarPar ParallelVar
-- of combining streams using parallel.
{-# INLINE consMParallel #-}
consMParallel :: MonadAsync m => m a -> Stream m a -> Stream m a
consMParallel m r = once m `parallelStream` r
consMParallel m r = yieldM m `parallelStream` r
-- | Polymorphic version of the 'Semigroup' operation '<>' of 'ParallelT'
-- Merges two streams concurrently.
@ -125,7 +125,7 @@ applyWith :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b
applyWith f m = fromStream $ Stream $ \svr stp sng yld -> do
sv <- newParallelVar
pushWorkerPar sv (runOne sv (toStream m))
runStream (toStream $ f $ fromSVar sv) svr stp sng yld
unStream (toStream $ f $ fromSVar sv) svr stp sng yld
------------------------------------------------------------------------------
-- Stream runner concurrent function application
@ -369,4 +369,4 @@ MONAD_COMMON_INSTANCES(ParallelT, MONADPARALLEL)
-- @since 0.1.0
{-# DEPRECATED runParallelT "Please use 'runStream . parallely' instead." #-}
runParallelT :: Monad m => ParallelT m a -> m ()
runParallelT = run
runParallelT = runStream

View File

@ -21,13 +21,8 @@
--
module Streamly.Streams.Prelude
(
-- * Construction
streamBuild
, fromCallback
-- * Elimination
, streamFold
, runStream
runStream
, runStreaming -- deprecated
-- * Fold Utilities
@ -38,11 +33,28 @@ module Streamly.Streams.Prelude
where
import Streamly.Streams.Serial (SerialT)
import Streamly.SVar (SVar)
import Streamly.Streams.StreamK hiding (runStream)
import qualified Streamly.Streams.StreamK as K
------------------------------------------------------------------------------
-- Eliminating a stream
------------------------------------------------------------------------------
-- | Run a streaming composition, discard the results. By default it interprets
-- the stream as 'SerialT', to run other types of streams use the type adapting
-- combinators for example @runStream . 'asyncly'@.
--
-- @since 0.2.0
runStream :: Monad m => SerialT m a -> m ()
runStream = K.runStream
-- | Same as 'runStream'
--
-- @since 0.1.0
{-# DEPRECATED runStreaming "Please use runStream instead." #-}
runStreaming :: (Monad m, IsStream t) => t m a -> m ()
runStreaming = runStream . adapt
------------------------------------------------------------------------------
-- Fold Utilities
------------------------------------------------------------------------------
@ -79,30 +91,6 @@ forEachWith :: (IsStream t, Foldable f)
=> (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b
forEachWith f xs g = foldr (f . g) nil xs
------------------------------------------------------------------------------
-- Generation
------------------------------------------------------------------------------
-- | Build a stream from its church encoding. The function passed maps
-- directly to the underlying representation of the stream type. The second
-- parameter to the function is the "yield" function yielding a value and the
-- remaining stream if any otherwise 'Nothing'. The third parameter is to
-- represent an "empty" stream.
streamBuild :: IsStream t
=> (forall r. Maybe (SVar Stream m a)
-> (a -> t m a -> m r)
-> (a -> m r)
-> m r
-> m r)
-> t m a
streamBuild k = fromStream $ Stream $ \svr stp sng yld ->
let yield a r = yld a (toStream r)
in k svr yield sng stp
-- | Build a singleton stream from a callback function.
fromCallback :: IsStream t => (forall r. (a -> m r) -> m r) -> t m a
fromCallback k = fromStream $ Stream $ \_ _ sng _ -> k sng
-------------------------------------------------------------------------------
-- Generation by unfold
-------------------------------------------------------------------------------
@ -128,37 +116,3 @@ unfoldrM step = go
Just (a, b) ->
K.runStream (a `cons` go b) svr stp sng yld
-}
------------------------------------------------------------------------------
-- Destroying a stream
------------------------------------------------------------------------------
-- | Fold a stream using its church encoding. The second argument is the "step"
-- function consuming an element and the remaining stream, if any. The third
-- argument is for consuming an "empty" stream that yields nothing.
streamFold
:: IsStream t
=> Maybe (SVar Stream m a)
-> (a -> t m a -> m r)
-> (a -> m r)
-> m r
-> t m a
-> m r
streamFold svr step single blank m =
let yield a x = step a (fromStream x)
in (K.runStream (toStream m)) svr blank single yield
-- | Run a streaming composition, discard the results. By default it interprets
-- the stream as 'SerialT', to run other types of streams use the type adapting
-- combinators for example @runStream . 'asyncly'@.
--
-- @since 0.2.0
runStream :: Monad m => SerialT m a -> m ()
runStream = run
-- | Same as 'runStream'
--
-- @since 0.1.0
{-# DEPRECATED runStreaming "Please use runStream instead." #-}
runStreaming :: (Monad m, IsStream t) => t m a -> m ()
runStreaming = runStream . adapt

View File

@ -44,7 +44,7 @@ fromStreamVar sv = Stream $ \_ stp sng yld -> do
-- Reversing the output is important to guarantee that we process the
-- outputs in the same order as they were generated by the constituent
-- streams.
runStream (processEvents $ reverse list) Nothing stp sng yld
unStream (processEvents $ reverse list) Nothing stp sng yld
where
@ -62,7 +62,7 @@ fromStreamVar sv = Stream $ \_ stp sng yld -> do
done <- postProcess sv
if done
then allDone stp
else runStream (fromStreamVar sv) Nothing stp sng yld
else unStream (fromStreamVar sv) Nothing stp sng yld
processEvents (ev : es) = Stream $ \_ stp sng yld -> do
let rest = processEvents es
@ -71,7 +71,7 @@ fromStreamVar sv = Stream $ \_ stp sng yld -> do
ChildStop tid e -> do
accountThread sv tid
case e of
Nothing -> runStream rest Nothing stp sng yld
Nothing -> unStream rest Nothing stp sng yld
Just ex -> throwM ex
{-# INLINE fromSVar #-}

View File

@ -171,10 +171,10 @@ serial m1 m2 = fromStream $ C.serial (toStream m1) (toStream m2)
instance Monad m => Monad (SerialT m) where
return = pure
(SerialT (Stream m)) >>= f = SerialT $ Stream $ \_ stp sng yld ->
let runIt x = (runStream x) Nothing stp sng yld
single a = runIt $ toStream (f a)
yield a r = runIt $ toStream $ f a <> (fromStream r >>= f)
in m Nothing stp single yield
let run x = (unStream x) Nothing stp sng yld
single a = run $ toStream (f a)
yieldk a r = run $ toStream $ f a <> (fromStream r >>= f)
in m Nothing stp single yieldk
------------------------------------------------------------------------------
-- Other instances
@ -272,10 +272,10 @@ instance IsStream WSerialT where
{-# INLINE interleave #-}
interleave :: Stream m a -> Stream m a -> Stream m a
interleave m1 m2 = Stream $ \_ stp sng yld -> do
let stop = (runStream m2) Nothing stp sng yld
single a = yld a m2
yield a r = yld a (interleave m2 r)
(runStream m1) Nothing stop single yield
let stop = (unStream m2) Nothing stp sng yld
single a = yld a m2
yieldk a r = yld a (interleave m2 r)
(unStream m1) Nothing stop single yieldk
-- | Polymorphic version of the 'Semigroup' operation '<>' of 'WSerialT'.
-- Interleaves two streams, yielding one element from each stream alternately.
@ -313,10 +313,10 @@ instance Monoid (WSerialT m a) where
instance Monad m => Monad (WSerialT m) where
return = pure
(WSerialT (Stream m)) >>= f = WSerialT $ Stream $ \_ stp sng yld ->
let runIt x = (runStream x) Nothing stp sng yld
single a = runIt $ toStream (f a)
yield a r = runIt $ toStream $ f a <> (fromStream r >>= f)
in m Nothing stp single yield
let run x = (unStream x) Nothing stp sng yld
single a = run $ toStream (f a)
yieldk a r = run $ toStream $ f a <> (fromStream r >>= f)
in m Nothing stp single yieldk
------------------------------------------------------------------------------
-- Other instances
@ -334,11 +334,11 @@ MONAD_COMMON_INSTANCES(WSerialT,)
-- @since 0.1.0
{-# DEPRECATED runStreamT "Please use runStream instead." #-}
runStreamT :: Monad m => SerialT m a -> m ()
runStreamT = run
runStreamT = runStream
-- | Same as @runStream . wSerially@.
--
-- @since 0.1.0
{-# DEPRECATED runInterleavedT "Please use 'runStream . interleaving' instead." #-}
runInterleavedT :: Monad m => InterleavedT m a -> m ()
runInterleavedT = run
runInterleavedT = runStream

View File

@ -20,32 +20,47 @@
-- Portability : GHC
--
--
-- Continuation passing style stream implementation.
--
-- import qualified Streamly.Streams.StreamK as K
--
module Streamly.Streams.StreamK
(
-- * Streams
IsStream (..)
, adapt
, Streaming -- deprecated
, Stream (..)
, run
, mkStream
, foldStream
, runStream
-- * Construction
, nil
, yield
, yieldM
, cons
, (.:)
, consMSerial
-- * Asynchronous construction
, nilK
, yieldK
, consK
-- * Generation
, singleton
, once
, repeat
-- * Semigroup Style Composition
, serial
-- * Utilities
, consMSerial
, bindWith
, withLocal
-- * Deprecated
, Streaming -- deprecated
, once -- deprecated
)
where
@ -77,7 +92,7 @@ import Streamly.SVar
--
newtype Stream m a =
Stream {
runStream :: forall r.
unStream :: forall r.
Maybe (SVar Stream m a) -- local state
-> m r -- stop
-> (a -> m r) -- singleton
@ -152,6 +167,23 @@ type Streaming = IsStream
adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a
adapt = fromStream . toStream
------------------------------------------------------------------------------
-- Building a stream
------------------------------------------------------------------------------
-- | Build a stream from an 'SVar', a stop continuation, a singleton stream
-- continuation and a yield continuation.
mkStream:: IsStream t
=> (forall r. Maybe (SVar Stream m a)
-> m r
-> (a -> m r)
-> (a -> t m a -> m r)
-> m r)
-> t m a
mkStream k = fromStream $ Stream $ \svr stp sng yld ->
let yieldk a r = yld a (toStream r)
in k svr stp sng yieldk
------------------------------------------------------------------------------
-- Construction
------------------------------------------------------------------------------
@ -167,11 +199,41 @@ adapt = fromStream . toStream
nil :: IsStream t => t m a
nil = fromStream $ Stream $ \_ stp _ _ -> stp
-- faster than yieldM because there is no bind.
-- | Create a singleton stream from a pure value. Same as @yieldM (return a)@
-- but more efficient.
yield :: IsStream t => a -> t m a
yield a = fromStream $ Stream $ \_ _ single _ -> single a
-- | Create a singleton stream from a monadic action. Same as @m \`consM` nil@
-- but more efficient.
--
-- @
-- > toList $ yieldM getLine
-- hello
-- ["hello"]
-- @
--
-- @since 0.4.0
{-# INLINE yieldM #-}
yieldM :: (Monad m, IsStream t) => m a -> t m a
yieldM m = fromStream $ Stream $ \_ _ single _ -> m >>= single
-- | Same as yieldM
--
-- @since 0.2.0
{-# DEPRECATED once "Please use yieldM instead." #-}
{-# INLINE once #-}
once :: (Monad m, IsStream t) => m a -> t m a
once = yieldM
infixr 5 `cons`
-- faster than consM because there is no bind.
-- | Construct a stream by adding a pure value at the head of an existing
-- stream. For pure values it can be faster than 'consM'. For example:
-- stream. For serial streams this is the same as @(return a) \`consM` r@ but
-- more efficient. For concurrent streams this is not concurrent whereas
-- 'consM' is concurrent. For example:
--
-- @
-- > toList $ 1 \`cons` 2 \`cons` 3 \`cons` nil
@ -210,6 +272,25 @@ infixr 5 .:
consMSerial :: (Monad m) => m a -> Stream m a -> Stream m a
consMSerial m r = Stream $ \_ _ _ yld -> m >>= \a -> yld a r
------------------------------------------------------------------------------
-- Asynchronous construction
------------------------------------------------------------------------------
-- | Make an empty stream from a callback function.
nilK :: IsStream t => (forall r. m r -> m r) -> t m a
nilK k = fromStream $ Stream $ \_ stp _ _ -> k stp
-- | Make a singleton stream from a one shot callback function.
yieldK :: IsStream t => (forall r. (a -> m r) -> m r) -> t m a
yieldK k = fromStream $ Stream $ \_ _ sng _ -> k sng
-- | Construct a stream from a callback function.
consK :: IsStream t => (forall r. (a -> m r) -> m r) -> t m a -> t m a
consK k r = fromStream $ Stream $ \_ _ _ yld -> k (\x -> yld x (toStream r))
-- XXX consK with concurrent callbacks
-- XXX Build a stream from a repeating callback function.
-------------------------------------------------------------------------------
-- IsStream Stream
-------------------------------------------------------------------------------
@ -228,37 +309,33 @@ instance IsStream Stream where
(|:) :: Monad m => m a -> Stream m a -> Stream m a
(|:) = consMSerial
run :: (Monad m, IsStream t) => t m a -> m ()
run m = go (toStream m)
-- | Fold a stream by providing an SVar, a stop continuation, a singleton
-- continuation and a yield continuation.
foldStream
:: IsStream t
=> Maybe (SVar Stream m a)
-> m r
-> (a -> m r)
-> (a -> t m a -> m r)
-> t m a
-> m r
foldStream svr blank single step m =
let yieldk a x = step a (fromStream x)
in (unStream (toStream m)) svr blank single yieldk
runStream :: (Monad m, IsStream t) => t m a -> m ()
runStream m = go (toStream m)
where
go m1 =
let stop = return ()
single _ = return ()
yield _ r = go (toStream r)
in (runStream m1) Nothing stop single yield
yieldk _ r = go (toStream r)
in (unStream m1) Nothing stop single yieldk
-------------------------------------------------------------------------------
-- Special generation
-------------------------------------------------------------------------------
-- | Same as @once . return@ but may be faster because there is no bind
singleton :: IsStream t => a -> t m a
singleton a = fromStream $ Stream $ \_ _ single _ -> single a
-- | Create a singleton stream by executing a monadic action once. Same as
-- @m \`consM` nil@ but more efficient.
--
-- @
-- > toList $ once getLine
-- hello
-- ["hello"]
-- @
--
-- @since 0.2.0
{-# INLINE once #-}
once :: (Monad m, IsStream t) => m a -> t m a
once m = fromStream $ Stream $ \_ _ single _ -> m >>= single
repeat :: IsStream t => a -> t m a
repeat a = let x = cons a x in x
@ -273,10 +350,10 @@ serial :: Stream m a -> Stream m a -> Stream m a
serial m1 m2 = go m1
where
go (Stream m) = Stream $ \_ stp sng yld ->
let stop = (runStream m2) Nothing stp sng yld
single a = yld a m2
yield a r = yld a (go r)
in m Nothing stop single yield
let stop = (unStream m2) Nothing stp sng yld
single a = yld a m2
yieldk a r = yld a (go r)
in m Nothing stop single yieldk
instance Semigroup (Stream m a) where
(<>) = serial
@ -295,9 +372,9 @@ instance Monoid (Stream m a) where
instance Monad m => Functor (Stream m) where
fmap f m = Stream $ \_ stp sng yld ->
let single = sng . f
yield a r = yld (f a) (fmap f r)
in (runStream m) Nothing stp single yield
let single = sng . f
yieldk a r = yld (f a) (fmap f r)
in (unStream m) Nothing stp single yieldk
-------------------------------------------------------------------------------
-- Bind utility
@ -313,10 +390,10 @@ bindWith par m f = go m
where
go (Stream g) =
Stream $ \ctx stp sng yld ->
let runIt x = (runStream x) ctx stp sng yld
single a = runIt $ f a
yield a r = runIt $ f a `par` go r
in g Nothing stp single yield
let run x = (unStream x) ctx stp sng yld
single a = run $ f a
yieldk a r = run $ f a `par` go r
in g Nothing stp single yieldk
------------------------------------------------------------------------------
-- Alternative & MonadPlus
@ -324,8 +401,8 @@ bindWith par m f = go m
_alt :: Stream m a -> Stream m a -> Stream m a
_alt m1 m2 = Stream $ \_ stp sng yld ->
let stop = runStream m2 Nothing stp sng yld
in runStream m1 Nothing stop sng yld
let stop = unStream m2 Nothing stp sng yld
in unStream m1 Nothing stop sng yld
------------------------------------------------------------------------------
-- MonadReader
@ -335,8 +412,8 @@ withLocal :: MonadReader r m => (r -> r) -> Stream m a -> Stream m a
withLocal f m =
Stream $ \_ stp sng yld ->
let single = local f . sng
yield a r = local f $ yld a (withLocal f r)
in (runStream m) Nothing (local f stp) single yield
yieldk a r = local f $ yld a (withLocal f r)
in (unStream m) Nothing (local f stp) single yieldk
------------------------------------------------------------------------------
-- MonadError
@ -349,9 +426,9 @@ withCatchError
=> Stream m a -> (e -> Stream m a) -> Stream m a
withCatchError m h =
Stream $ \_ stp sng yld ->
let run x = runStream x Nothing stp sng yield
let run x = unStream x Nothing stp sng yieldk
handle r = r `catchError` \e -> run $ h e
yield a r = yld a (withCatchError r h)
yieldk a r = yld a (withCatchError r h)
in handle $ run m
-}
@ -360,4 +437,4 @@ withCatchError m h =
-------------------------------------------------------------------------------
instance MonadTrans Stream where
lift = once
lift = yieldM

View File

@ -61,10 +61,10 @@ zipWithS f m1 m2 = go m1 m2
let merge a ra =
let single2 b = sng (f a b)
yield2 b rb = yld (f a b) (go ra rb)
in (runStream my) Nothing stp single2 yield2
in unStream my Nothing stp single2 yield2
let single1 a = merge a nil
yield1 a ra = merge a ra
(runStream mx) Nothing stp single1 yield1
unStream mx Nothing stp single1 yield1
-- | Zip two streams serially using a pure zipping function.
--
@ -81,13 +81,13 @@ zipWithM f m1 m2 = fromStream $ go (toStream m1) (toStream m2)
where
go mx my = Stream $ \_ stp sng yld -> do
let merge a ra =
let runIt x = runStream x Nothing stp sng yld
let runIt x = unStream x Nothing stp sng yld
single2 b = runIt $ toStream (f a b)
yield2 b rb = runIt $ toStream (f a b) <> go ra rb
in (runStream my) Nothing stp single2 yield2
in unStream my Nothing stp single2 yield2
let single1 a = merge a nil
yield1 a ra = merge a ra
(runStream mx) Nothing stp single1 yield1
unStream mx Nothing stp single1 yield1
------------------------------------------------------------------------------
-- Serially Zipping Streams
@ -168,7 +168,7 @@ zipAsyncWith :: (IsStream t, MonadAsync m)
zipAsyncWith f m1 m2 = fromStream $ Stream $ \_ stp sng yld -> do
ma <- mkAsync m1
mb <- mkAsync m2
(runStream (toStream (zipWith f ma mb))) Nothing stp sng yld
unStream (toStream (zipWith f ma mb)) Nothing stp sng yld
-- | Zip two streams asyncly (i.e. both the elements being zipped are generated
-- concurrently) using a monadic zipping function.
@ -179,7 +179,7 @@ zipAsyncWithM :: (IsStream t, MonadAsync m)
zipAsyncWithM f m1 m2 = fromStream $ Stream $ \_ stp sng yld -> do
ma <- mkAsync m1
mb <- mkAsync m2
(runStream (toStream (zipWithM f ma mb))) Nothing stp sng yld
unStream (toStream (zipWithM f ma mb)) Nothing stp sng yld
------------------------------------------------------------------------------
-- Parallely Zipping Streams
@ -245,11 +245,11 @@ instance MonadAsync m => Applicative (ZipAsyncM m) where
-- @since 0.1.0
{-# DEPRECATED runZipStream "Please use 'runStream . zipSerially instead." #-}
runZipStream :: Monad m => ZipSerialM m a -> m ()
runZipStream = run
runZipStream = runStream
-- | Same as @runStream . zippingAsync@.
--
-- @since 0.1.0
{-# DEPRECATED runZipAsync "Please use 'runStream . zipAsyncly instead." #-}
runZipAsync :: Monad m => ZipAsyncM m a -> m ()
runZipAsync = run
runZipAsync = runStream

View File

@ -40,7 +40,7 @@ main = hspec $ do
it "simple serially" $
(runStream . serially) (return (0 :: Int)) `shouldReturn` ()
it "simple serially with IO" $
(runStream . serially) (A.once $ putStrLn "hello") `shouldReturn` ()
(runStream . serially) (A.yieldM $ putStrLn "hello") `shouldReturn` ()
describe "Empty" $ do
it "Monoid - mempty" $
@ -381,12 +381,12 @@ main = hspec $ do
it "asyncly crosses thread limit (2000 threads)" $
runStream (asyncly $ fold $
replicate 2000 $ A.once $ threadDelay 1000000)
replicate 2000 $ A.yieldM $ threadDelay 1000000)
`shouldReturn` ()
it "aheadly crosses thread limit (4000 threads)" $
runStream (aheadly $ fold $
replicate 4000 $ A.once $ threadDelay 1000000)
replicate 4000 $ A.yieldM $ threadDelay 1000000)
`shouldReturn` ()
-- XXX need to test that we have promptly cleaned up everything after the error
@ -578,7 +578,7 @@ nestTwoParallelApp =
`shouldReturn` sort ([6,7,7,8,8,8,9,9,9,9,10,10,10,11,11,12] :: [Int])
timed :: (IsStream t, Monad (t IO)) => Int -> t IO Int
timed x = A.once (threadDelay (x * 100000)) >> return x
timed x = A.yieldM (threadDelay (x * 100000)) >> return x
interleaveCheck :: IsStream t
=> (t IO Int -> SerialT IO Int)
@ -602,7 +602,7 @@ parallelCheck t f = do
(A.toList . t) (event 4 `f` (event 3 `f` (event 2 `f` event 1)))
`shouldReturn` ([1..4])
where event n = (A.once $ threadDelay (n * 100000)) >> (return n)
where event n = (A.yieldM $ threadDelay (n * 100000)) >> (return n)
compose :: (IsStream t, Semigroup (t IO Int))
=> (t IO Int -> SerialT IO Int) -> t IO Int -> ([Int] -> [Int]) -> Spec
@ -682,12 +682,12 @@ loops t tsrt hsrt = do
where
loopHead x = do
-- this print line is important for the test (causes a bind)
A.once $ putStrLn "LoopHead..."
A.yieldM $ putStrLn "LoopHead..."
t $ (if x < 3 then loopHead (x + 1) else nil) <> return x
loopTail x = do
-- this print line is important for the test (causes a bind)
A.once $ putStrLn "LoopTail..."
A.yieldM $ putStrLn "LoopTail..."
t $ return x <> (if x < 3 then loopTail (x + 1) else nil)
bindAndComposeSimple
@ -755,21 +755,21 @@ mixedOps = do
composeMixed :: SerialT IO Int
composeMixed = do
A.once $ return ()
A.once $ putStr ""
A.yieldM $ return ()
A.yieldM $ putStr ""
x <- return 1
y <- return 2
z <- do
x1 <- wAsyncly $ return 1 <> return 2
A.once $ return ()
A.once $ putStr ""
A.yieldM $ return ()
A.yieldM $ putStr ""
y1 <- asyncly $ return 1 <> return 2
z1 <- do
x11 <- return 1 <> return 2
y11 <- asyncly $ return 1 <> return 2
z11 <- wSerially $ return 1 <> return 2
A.once $ return ()
A.once $ putStr ""
A.yieldM $ return ()
A.yieldM $ putStr ""
return (x11 + y11 + z11)
return (x1 + y1 + z1)
return (x + y + z)
@ -785,21 +785,21 @@ mixedOpsAheadly = do
composeMixed :: SerialT IO Int
composeMixed = do
A.once $ return ()
A.once $ putStr ""
A.yieldM $ return ()
A.yieldM $ putStr ""
x <- return 1
y <- return 2
z <- do
x1 <- wAsyncly $ return 1 <> return 2
A.once $ return ()
A.once $ putStr ""
A.yieldM $ return ()
A.yieldM $ putStr ""
y1 <- aheadly $ return 1 <> return 2
z1 <- do
x11 <- return 1 <> return 2
y11 <- aheadly $ return 1 <> return 2
z11 <- parallely $ return 1 <> return 2
A.once $ return ()
A.once $ putStr ""
A.yieldM $ return ()
A.yieldM $ putStr ""
return (x11 + y11 + z11)
return (x1 + y1 + z1)
return (x + y + z)

View File

@ -159,10 +159,10 @@ concurrentUnfoldrM eq op n =
-- results may not be yielded in order, in case of
-- Async/WAsync/Parallel. So we use an increasing count
-- instead.
i <- A.once $ readIORef cnt
A.once $ modifyIORef cnt (+1)
i <- A.yieldM $ readIORef cnt
A.yieldM $ modifyIORef cnt (+1)
let msg = show i ++ "/" ++ show n
A.once $ do
A.yieldM $ do
if even i
then do
dbgMVar ("first take concurrentUnfoldrM " ++ msg)

View File

@ -1,6 +1,6 @@
import Streamly
import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering))
import Streamly.Prelude (nil, once)
import Streamly.Prelude (nil, yieldM)
main = do
hSetBuffering stdout LineBuffering
@ -8,32 +8,32 @@ main = do
putStrLn $ "\nloopTail:\n"
runStream $ do
x <- loopTail 0
once $ print (x :: Int)
yieldM $ print (x :: Int)
putStrLn $ "\nloopHead:\n"
runStream $ do
x <- loopHead 0
once $ print (x :: Int)
yieldM $ print (x :: Int)
putStrLn $ "\nloopTailA:\n"
runStream $ do
x <- loopTailA 0
once $ print (x :: Int)
yieldM $ print (x :: Int)
putStrLn $ "\nloopHeadA:\n"
runStream $ do
x <- loopHeadA 0
once $ print (x :: Int)
yieldM $ print (x :: Int)
putStrLn $ "\nwSerial:\n"
runStream $ do
x <- (return 0 <> return 1) `wSerial` (return 100 <> return 101)
once $ print (x :: Int)
yieldM $ print (x :: Int)
putStrLn $ "\nParallel interleave:\n"
runStream $ do
x <- (return 0 <> return 1) `wAsync` (return 100 <> return 101)
once $ print (x :: Int)
yieldM $ print (x :: Int)
where
@ -45,7 +45,7 @@ main = do
-- stream. Interleaves the generator and the consumer.
loopTail :: Int -> Serial Int
loopTail x = do
once $ putStrLn "LoopTail..."
yieldM $ putStrLn "LoopTail..."
return x <> (if x < 3 then loopTail (x + 1) else nil)
-- Loops and then generates a value. The consumer can run only after the
@ -53,7 +53,7 @@ main = do
-- at all.
loopHead :: Int -> Serial Int
loopHead x = do
once $ putStrLn "LoopHead..."
yieldM $ putStrLn "LoopHead..."
(if x < 3 then loopHead (x + 1) else nil) <> return x
-------------------------------------------------------------------------------
@ -62,12 +62,12 @@ main = do
loopTailA :: Int -> Serial Int
loopTailA x = do
once $ putStrLn "LoopTailA..."
yieldM $ putStrLn "LoopTailA..."
return x `async` (if x < 3 then loopTailA (x + 1) else nil)
loopHeadA :: Int -> Serial Int
loopHeadA x = do
once $ putStrLn "LoopHeadA..."
yieldM $ putStrLn "LoopHeadA..."
(if x < 3 then loopHeadA (x + 1) else nil) `async` return x
-------------------------------------------------------------------------------

View File

@ -2,23 +2,23 @@ import Control.Concurrent (myThreadId)
import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering))
import System.Random (randomIO)
import Streamly
import Streamly.Prelude (nil, once)
import Streamly.Prelude (nil, yieldM)
main = runStream $ do
once $ hSetBuffering stdout LineBuffering
yieldM $ hSetBuffering stdout LineBuffering
x <- loop "A " 2
y <- loop "B " 2
once $ myThreadId >>= putStr . show
yieldM $ myThreadId >>= putStr . show
>> putStr " "
>> print (x, y)
where
-- we can just use
-- parallely $ mconcat $ replicate n $ once (...)
-- parallely $ mconcat $ replicate n $ yieldM (...)
loop :: String -> Int -> SerialT IO String
loop name n = do
rnd <- once (randomIO :: IO Int)
rnd <- yieldM (randomIO :: IO Int)
let result = (name ++ show rnd)
repeat = if n > 1 then loop name (n - 1) else nil
in (return result) `wAsync` repeat

View File

@ -8,19 +8,19 @@ main = do
hSetBuffering stdout LineBuffering
runStream $ do
x <- S.take 10 $ loop "A" `parallel` loop "B"
S.once $ myThreadId >>= putStr . show
S.yieldM $ myThreadId >>= putStr . show
>> putStr " got "
>> print x
where
-- we can just use
-- parallely $ cycle1 $ once (...)
-- parallely $ cycle1 $ yieldM (...)
loop :: String -> Serial (String, Int)
loop name = do
S.once $ threadDelay 1000000
rnd <- S.once (randomIO :: IO Int)
S.once $ myThreadId >>= putStr . show
S.yieldM $ threadDelay 1000000
rnd <- S.yieldM (randomIO :: IO Int)
S.yieldM $ myThreadId >>= putStr . show
>> putStr " yielding "
>> print rnd
return (name, rnd) `parallel` loop name