Change the order of foldStream arguments

To make it consistent with regular fold function conventions.
This commit is contained in:
Harendra Kumar 2018-12-26 16:31:38 +05:30
parent 90844f990b
commit 0a2755a48c
9 changed files with 183 additions and 183 deletions

View File

@ -896,7 +896,7 @@ each = K.fromFoldable
fromHandle :: (IsStream t, MonadIO m) => IO.Handle -> t m String
fromHandle h = go
where
go = K.mkStream $ \_ stp _ yld -> do
go = K.mkStream $ \_ yld _ stp -> do
eof <- liftIO $ IO.hIsEOF h
if eof
then stp
@ -1449,7 +1449,7 @@ toHandle h m = go m
let stop = return ()
single a = liftIO (IO.hPutStrLn h a)
yieldk a r = liftIO (IO.hPutStrLn h a) >> go r
in K.foldStream defState stop single yieldk m1
in K.foldStream defState yieldk single stop m1
------------------------------------------------------------------------------
-- Transformation by Folding (Scans)
@ -1770,12 +1770,12 @@ mapMaybeMSerial f m = fromStreamD $ D.mapMaybeM f $ toStreamD m
reverse :: (IsStream t) => t m a -> t m a
reverse m = go K.nil m
where
go rev rest = K.mkStream $ \st stp sng yld ->
let runIt x = K.foldStream st stp sng yld x
go rev rest = K.mkStream $ \st yld sng stp ->
let runIt x = K.foldStream st yld sng stp x
stop = runIt rev
single a = runIt $ a `K.cons` rev
yieldk a r = runIt $ go (a `K.cons` rev) r
in K.foldStream st stop single yieldk rest
in K.foldStream st yieldk single stop rest
------------------------------------------------------------------------------
-- Transformation by Inserting

View File

@ -298,9 +298,10 @@ processHeap q heap st sv winfo entry sno stopping = loopHeap sno entry
let stop = do
liftIO (incrementYieldLimit sv)
nextHeap seqNo
foldStreamSVar sv st stop
(singleStreamFromHeap seqNo)
foldStreamSVar sv st
(yieldStreamFromHeap seqNo)
(singleStreamFromHeap seqNo)
stop
r
else liftIO $ do
let ent = Entry seqNo (AheadEntryStream r)
@ -349,9 +350,10 @@ processWithoutToken q heap st sv winfo m seqNo = do
-- we stop.
toHeap AheadEntryNull
foldStreamSVar sv st stop
(toHeap . AheadEntryPure)
foldStreamSVar sv st
(\a r -> toHeap $ AheadEntryStream $ K.cons a r)
(toHeap . AheadEntryPure)
stop
m
where
@ -410,7 +412,7 @@ processWithToken q heap st sv winfo action sno = do
liftIO (incrementYieldLimit sv)
loopWithToken (sno + 1)
foldStreamSVar sv st stop (singleOutput sno) (yieldOutput sno) action
foldStreamSVar sv st (yieldOutput sno) (singleOutput sno) stop action
where
@ -433,9 +435,10 @@ processWithToken q heap st sv winfo action sno = do
let stop = do
liftIO (incrementYieldLimit sv)
loopWithToken (seqNo + 1)
foldStreamSVar sv st stop
(singleOutput seqNo)
foldStreamSVar sv st
(yieldOutput seqNo)
(singleOutput seqNo)
stop
r
else do
let ent = Entry seqNo (AheadEntryStream r)
@ -463,9 +466,10 @@ processWithToken q heap st sv winfo action sno = do
let stop = do
liftIO (incrementYieldLimit sv)
loopWithToken (seqNo + 1)
foldStreamSVar sv st stop
(singleOutput seqNo)
foldStreamSVar sv st
(yieldOutput seqNo)
(singleOutput seqNo)
stop
m
else
-- To avoid a race when another thread puts something
@ -684,11 +688,11 @@ aheadbind
aheadbind m f = go m
where
go g =
mkStream $ \st stp sng yld ->
let foldShared = foldStreamShared st stp sng yld
mkStream $ \st yld sng stp ->
let foldShared = foldStreamShared st yld sng stp
single a = foldShared $ unShare (f a)
yieldk a r = foldShared $ unShare (f a) `ahead` go r
in foldStream (adaptState st) stp single yieldk g
in foldStream (adaptState st) yieldk single stp g
instance MonadAsync m => Monad (AheadT m) where
return = pure

View File

@ -84,7 +84,7 @@ workLoopLIFO q st sv winfo = run
work <- dequeue
case work of
Nothing -> liftIO $ sendStop sv winfo
Just m -> foldStreamSVar sv st run single yieldk m
Just m -> foldStreamSVar sv st yieldk single run m
single a = do
res <- liftIO $ sendYield sv winfo (ChildYield a)
@ -93,7 +93,7 @@ workLoopLIFO q st sv winfo = run
yieldk a r = do
res <- liftIO $ sendYield sv winfo (ChildYield a)
if res
then foldStreamSVar sv st run single yieldk r
then foldStreamSVar sv st yieldk single run r
else liftIO $ do
enqueueLIFO sv q r
sendStop sv winfo
@ -134,7 +134,7 @@ workLoopLIFOLimited q st sv winfo = run
if yieldLimitOk
then do
let stop = liftIO (incrementYieldLimit sv) >> run
foldStreamSVar sv st stop single yieldk m
foldStreamSVar sv st yieldk single stop m
-- Avoid any side effects, undo the yield limit decrement if we
-- never yielded anything.
else liftIO $ do
@ -153,7 +153,7 @@ workLoopLIFOLimited q st sv winfo = run
yieldLimitOk <- liftIO $ decrementYieldLimit sv
let stop = liftIO (incrementYieldLimit sv) >> run
if res && yieldLimitOk
then foldStreamSVar sv st stop single yieldk r
then foldStreamSVar sv st yieldk single stop r
else liftIO $ do
incrementYieldLimit sv
enqueueLIFO sv q r
@ -185,7 +185,7 @@ workLoopFIFO q st sv winfo = run
work <- liftIO $ tryPopR q
case work of
Nothing -> liftIO $ sendStop sv winfo
Just m -> foldStreamSVar sv st run single yieldk m
Just m -> foldStreamSVar sv st yieldk single run m
single a = do
res <- liftIO $ sendYield sv winfo (ChildYield a)
@ -194,7 +194,7 @@ workLoopFIFO q st sv winfo = run
yieldk a r = do
res <- liftIO $ sendYield sv winfo (ChildYield a)
if res
then foldStreamSVar sv st run single yieldk r
then foldStreamSVar sv st yieldk single run r
else liftIO $ do
enqueueFIFO sv q r
sendStop sv winfo
@ -220,7 +220,7 @@ workLoopFIFOLimited q st sv winfo = run
if yieldLimitOk
then do
let stop = liftIO (incrementYieldLimit sv) >> run
foldStreamSVar sv st stop single yieldk m
foldStreamSVar sv st yieldk single stop m
else liftIO $ do
enqueueFIFO sv q m
incrementYieldLimit sv
@ -235,7 +235,7 @@ workLoopFIFOLimited q st sv winfo = run
yieldLimitOk <- liftIO $ decrementYieldLimit sv
let stop = liftIO (incrementYieldLimit sv) >> run
if res && yieldLimitOk
then foldStreamSVar sv st stop single yieldk r
then foldStreamSVar sv st yieldk single stop r
else liftIO $ do
incrementYieldLimit sv
enqueueFIFO sv q r

View File

@ -63,7 +63,7 @@ import qualified Streamly.Streams.StreamK as K
runOne
:: MonadIO m
=> State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOne st m winfo = foldStreamShared st stop single yieldk m
runOne st m winfo = foldStreamShared st yieldk single stop m
where
@ -89,21 +89,21 @@ runOne st m winfo = foldStreamShared st stop single yieldk m
{-# NOINLINE forkSVarPar #-}
forkSVarPar :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
forkSVarPar m r = mkStream $ \st stp sng yld -> do
forkSVarPar m r = mkStream $ \st yld sng stp -> do
sv <- newParallelVar st
pushWorkerPar sv (runOne st{streamVar = Just sv} $ toStream m)
pushWorkerPar sv (runOne st{streamVar = Just sv} $ toStream r)
foldStream st stp sng yld (fromSVar sv)
foldStream st yld sng stp (fromSVar sv)
{-# INLINE joinStreamVarPar #-}
joinStreamVarPar :: (IsStream t, MonadAsync m)
=> SVarStyle -> t m a -> t m a -> t m a
joinStreamVarPar style m1 m2 = mkStream $ \st stp sng yld ->
joinStreamVarPar style m1 m2 = mkStream $ \st yld sng stp ->
case streamVar st of
Just sv | svarStyle sv == style -> do
pushWorkerPar sv (runOne st $ toStream m1)
foldStreamShared st stp sng yld m2
_ -> foldStreamShared st stp sng yld (forkSVarPar m1 m2)
foldStreamShared st yld sng stp m2
_ -> foldStreamShared st yld sng stp (forkSVarPar m1 m2)
-- | XXX we can implement it more efficienty by directly implementing instead
-- of combining streams using parallel.
@ -135,10 +135,10 @@ mkParallel m = do
{-# INLINE applyWith #-}
applyWith :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b
applyWith f m = mkStream $ \st stp sng yld -> do
applyWith f m = mkStream $ \st yld sng stp -> do
sv <- newParallelVar (adaptState st)
pushWorkerPar sv (runOne st{streamVar = Just sv} (toStream m))
foldStream st stp sng yld $ f $ fromSVar sv
foldStream st yld sng stp $ f $ fromSVar sv
------------------------------------------------------------------------------
-- Stream runner concurrent function application

View File

@ -38,12 +38,12 @@ printSVar sv how = do
-- | Pull a stream from an SVar.
{-# NOINLINE fromStreamVar #-}
fromStreamVar :: MonadAsync m => SVar Stream m a -> Stream m a
fromStreamVar sv = mkStream $ \st stp sng yld -> do
fromStreamVar sv = mkStream $ \st yld sng stp -> do
list <- readOutputQ sv
-- Reversing the output is important to guarantee that we process the
-- outputs in the same order as they were generated by the constituent
-- streams.
foldStream st stp sng yld $ processEvents $ reverse list
foldStream st yld sng stp $ processEvents $ reverse list
where
@ -55,36 +55,36 @@ fromStreamVar sv = mkStream $ \st stp sng yld -> do
stp
{-# INLINE processEvents #-}
processEvents [] = mkStream $ \st stp sng yld -> do
processEvents [] = mkStream $ \st yld sng stp -> do
done <- postProcess sv
if done
then allDone stp
else foldStream st stp sng yld $ fromStreamVar sv
else foldStream st yld sng stp $ fromStreamVar sv
processEvents (ev : es) = mkStream $ \st stp sng yld -> do
processEvents (ev : es) = mkStream $ \st yld sng stp -> do
let rest = processEvents es
case ev of
ChildYield a -> yld a rest
ChildStop tid e -> do
accountThread sv tid
case e of
Nothing -> foldStream st stp sng yld rest
Nothing -> foldStream st yld sng stp rest
Just ex ->
case fromException ex of
Just ThreadAbort ->
foldStream st stp sng yld rest
foldStream st yld sng stp rest
Nothing -> liftIO (cleanupSVar sv) >> throwM ex
{-# INLINE fromSVar #-}
fromSVar :: (MonadAsync m, IsStream t) => SVar Stream m a -> t m a
fromSVar sv =
mkStream $ \st stp sng yld -> do
mkStream $ \st yld sng stp -> do
ref <- liftIO $ newIORef ()
_ <- liftIO $ mkWeakIORef ref hook
-- We pass a copy of sv to fromStreamVar, so that we know that it has
-- no other references, when that copy gets garbage collected "ref"
-- will get garbage collected and our hook will be called.
foldStreamShared st stp sng yld $
foldStreamShared st yld sng stp $
fromStream $ fromStreamVar sv{svarRef = Just ref}
where

View File

@ -174,11 +174,11 @@ instance IsStream SerialT where
--
instance Monad m => Monad (SerialT m) where
return = pure
m >>= f = mkStream $ \st stp sng yld ->
let run x = foldStream st stp sng yld x
m >>= f = mkStream $ \st yld sng stp ->
let run x = foldStream st yld sng stp x
single a = run $ f a
yieldk a r = run $ f a <> (r >>= f)
in foldStream (adaptState st) stp single yieldk m
in foldStream (adaptState st) yieldk single stp m
------------------------------------------------------------------------------
-- Other instances
@ -300,11 +300,11 @@ instance IsStream WSerialT where
-- @since 0.2.0
{-# INLINE wSerial #-}
wSerial :: IsStream t => t m a -> t m a -> t m a
wSerial m1 m2 = mkStream $ \st stp sng yld -> do
let stop = foldStream st stp sng yld m2
wSerial m1 m2 = mkStream $ \st yld sng stp -> do
let stop = foldStream st yld sng stp m2
single a = yld a m2
yieldk a r = yld a (wSerial m2 r)
foldStream st stop single yieldk m1
foldStream st yieldk single stop m1
instance Semigroup (WSerialT m a) where
(<>) = wSerial
@ -337,11 +337,11 @@ instance Monoid (WSerialT m a) where
--
instance Monad m => Monad (WSerialT m) where
return = pure
m >>= f = mkStream $ \st stp sng yld ->
let run x = foldStream st stp sng yld x
m >>= f = mkStream $ \st yld sng stp ->
let run x = foldStream st yld sng stp x
single a = run $ f a
yieldk a r = run $ f a <> (r >>= f)
in foldStream (adaptState st) stp single yieldk m
in foldStream (adaptState st) yieldk single stp m
------------------------------------------------------------------------------
-- Other instances

View File

@ -483,7 +483,7 @@ fromStreamK = Stream step
let stop = return Stop
single a = return $ Yield a K.nil
yieldk a r = return $ Yield a r
in K.foldStreamShared gst stop single yieldk m1
in K.foldStreamShared gst yieldk single stop m1
{-# INLINE toStreamD #-}
toStreamD :: (K.IsStream t, Monad m) => t m a -> Stream m a
@ -882,11 +882,11 @@ toList = foldr (:) []
toStreamK :: Monad m => Stream m a -> K.Stream m a
toStreamK (Stream step state) = go state
where
go st = K.mkStream $ \gst stp sng yld -> do
go st = K.mkStream $ \gst yld sng stp -> do
r <- step gst st
case r of
Yield x s -> yld x (go s)
Skip s -> K.foldStreamShared gst stp sng yld $ go s
Skip s -> K.foldStreamShared gst yld sng stp $ go s
Stop -> stp
#ifndef DISABLE_FUSION

View File

@ -174,8 +174,8 @@ import Streamly.Streams.StreamKType
-- | Detach a stream from an SVar
{-# INLINE unShare #-}
unShare :: IsStream t => t m a -> t m a
unShare x = mkStream $ \st stp sng yld ->
foldStream st stp sng yld x
unShare x = mkStream $ \st yld sng stp ->
foldStream st yld sng stp x
------------------------------------------------------------------------------
-- Construction
@ -196,7 +196,7 @@ infixr 5 `cons`
--
-- @since 0.1.0
cons :: IsStream t => a -> t m a -> t m a
cons a r = mkStream $ \_ _ _ yld -> yld a r
cons a r = mkStream $ \_ yld _ _ -> yld a r
infixr 5 .:
@ -221,7 +221,7 @@ uncons m =
let stop = return Nothing
single a = return (Just (a, nil))
yieldk a r = return (Just (a, r))
in foldStream defState stop single yieldk m
in foldStream defState yieldk single stop m
-------------------------------------------------------------------------------
-- Generation
@ -231,7 +231,7 @@ uncons m =
unfoldr :: IsStream t => (b -> Maybe (a, b)) -> b -> t m a
unfoldr step = go
where
go s = mkStream $ \_ stp _ yld ->
go s = mkStream $ \_ yld _ stp ->
case step s of
Nothing -> stp
Just (a, b) -> yld a (go b)
@ -240,12 +240,12 @@ unfoldr step = go
unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a
unfoldrM step = go
where
go s = mkStream $ \st stp sng yld -> do
go s = mkStream $ \st yld sng stp -> do
mayb <- step s
case mayb of
Nothing -> stp
Just (a, b) ->
foldStreamShared st stp sng yld $ return a |: go b
foldStreamShared st yld sng stp $ return a |: go b
-------------------------------------------------------------------------------
-- Special generation
@ -320,7 +320,7 @@ foldr step acc m = go m
let stop = return acc
single a = return (step a acc)
yieldk a r = go r >>= \b -> return (step a b)
in foldStream defState stop single yieldk m1
in foldStream defState yieldk single stop m1
-- | Lazy right fold with a monadic step function.
{-# INLINE foldrM #-}
@ -331,7 +331,7 @@ foldrM step acc m = go m
let stop = return acc
single a = step a acc
yieldk a r = go r >>= step a
in foldStream defState stop single yieldk m1
in foldStream defState yieldk single stop m1
{-# INLINE foldr1 #-}
foldr1 :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> m (Maybe a)
@ -345,7 +345,7 @@ foldr1 step m = do
let stp = return p
single a = return $ step a p
yieldk a r = fmap (step p) (go a r)
in foldStream defState stp single yieldk m1
in foldStream defState yieldk single stp m1
-- | Strict left fold with an extraction function. Like the standard strict
-- left fold, but applies a user supplied extraction function (the third
@ -372,13 +372,13 @@ foldx step begin done m = get $ go m begin
-- that cannot be tail call optimized. Unfolding recursion explicitly via
-- continuations is much more efficient.
go :: t m a -> x -> t m x
go m1 !acc = mkStream $ \_ _ sng yld ->
go m1 !acc = mkStream $ \_ yld sng _ ->
let stop = sng acc
single a = sng $ step acc a
-- XXX this is foldNonEmptyStream
yieldk a r = foldStream defState undefined sng yld $
yieldk a r = foldStream defState yld sng undefined $
go r (step acc a)
in foldStream defState stop single yieldk m1
in foldStream defState yieldk single stop m1
-- | Strict left associative fold.
{-# INLINE foldl' #-}
@ -395,7 +395,7 @@ foldxM step begin done m = go begin m
let stop = acc >>= done
single a = acc >>= \b -> step b a >>= done
yieldk a r = acc >>= \b -> step b a >>= \x -> go (return x) r
in foldStream defState stop single yieldk m1
in foldStream defState yieldk single stop m1
-- | Like 'foldl'' but with a monadic step function.
foldlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> m b
@ -413,7 +413,7 @@ runStream = go
let stop = return ()
single _ = return ()
yieldk _ r = go r
in foldStream defState stop single yieldk m1
in foldStream defState yieldk single stop m1
{-# INLINE null #-}
null :: (IsStream t, Monad m) => t m a -> m Bool
@ -421,7 +421,7 @@ null m =
let stop = return True
single _ = return False
yieldk _ _ = return False
in foldStream defState stop single yieldk m
in foldStream defState yieldk single stop m
{-# INLINE head #-}
head :: (IsStream t, Monad m) => t m a -> m (Maybe a)
@ -429,7 +429,7 @@ head m =
let stop = return Nothing
single a = return (Just a)
yieldk a _ = return (Just a)
in foldStream defState stop single yieldk m
in foldStream defState yieldk single stop m
{-# INLINE tail #-}
tail :: (IsStream t, Monad m) => t m a -> m (Maybe (t m a))
@ -437,7 +437,7 @@ tail m =
let stop = return Nothing
single _ = return $ Just nil
yieldk _ r = return $ Just r
in foldStream defState stop single yieldk m
in foldStream defState yieldk single stop m
{-# INLINE init #-}
init :: (IsStream t, Monad m) => t m a -> m (Maybe (t m a))
@ -448,10 +448,10 @@ init m = go1 m
case r of
Nothing -> return Nothing
Just (h, t) -> return . Just $ go h t
go p m1 = mkStream $ \_ stp sng yld ->
go p m1 = mkStream $ \_ yld sng stp ->
let single _ = sng p
yieldk a x = yld p $ go a x
in foldStream defState stp single yieldk m1
in foldStream defState yieldk single stp m1
{-# INLINE elem #-}
elem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool
@ -461,7 +461,7 @@ elem e m = go m
let stop = return False
single a = return (a == e)
yieldk a r = if a == e then return True else go r
in foldStream defState stop single yieldk m1
in foldStream defState yieldk single stop m1
{-# INLINE notElem #-}
notElem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool
@ -471,7 +471,7 @@ notElem e m = go m
let stop = return True
single a = return (a /= e)
yieldk a r = if a == e then return False else go r
in foldStream defState stop single yieldk m1
in foldStream defState yieldk single stop m1
all :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool
all p m = go m
@ -481,7 +481,7 @@ all p m = go m
| otherwise = return False
yieldk a r | p a = go r
| otherwise = return False
in foldStream defState (return True) single yieldk m1
in foldStream defState yieldk single (return True) m1
any :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool
any p m = go m
@ -491,7 +491,7 @@ any p m = go m
| otherwise = return False
yieldk a r | p a = return True
| otherwise = go r
in foldStream defState (return False) single yieldk m1
in foldStream defState yieldk single (return False) m1
-- | Extract the last element of the stream, if any.
{-# INLINE last #-}
@ -506,7 +506,7 @@ minimum m = go Nothing m
let stop = return Nothing
single a = return (Just a)
yieldk a r = go (Just a) r
in foldStream defState stop single yieldk m1
in foldStream defState yieldk single stop m1
go (Just res) m1 =
let stop = return (Just res)
@ -518,7 +518,7 @@ minimum m = go Nothing m
if res <= a
then go (Just res) r
else go (Just a) r
in foldStream defState stop single yieldk m1
in foldStream defState yieldk single stop m1
{-# INLINE minimumBy #-}
minimumBy
@ -530,7 +530,7 @@ minimumBy cmp m = go Nothing m
let stop = return Nothing
single a = return (Just a)
yieldk a r = go (Just a) r
in foldStream defState stop single yieldk m1
in foldStream defState yieldk single stop m1
go (Just res) m1 =
let stop = return (Just res)
@ -540,7 +540,7 @@ minimumBy cmp m = go Nothing m
yieldk a r = case cmp res a of
GT -> go (Just a) r
_ -> go (Just res) r
in foldStream defState stop single yieldk m1
in foldStream defState yieldk single stop m1
{-# INLINE maximum #-}
maximum :: (IsStream t, Monad m, Ord a) => t m a -> m (Maybe a)
@ -550,7 +550,7 @@ maximum m = go Nothing m
let stop = return Nothing
single a = return (Just a)
yieldk a r = go (Just a) r
in foldStream defState stop single yieldk m1
in foldStream defState yieldk single stop m1
go (Just res) m1 =
let stop = return (Just res)
@ -562,7 +562,7 @@ maximum m = go Nothing m
if res <= a
then go (Just a) r
else go (Just res) r
in foldStream defState stop single yieldk m1
in foldStream defState yieldk single stop m1
{-# INLINE maximumBy #-}
maximumBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> m (Maybe a)
@ -572,7 +572,7 @@ maximumBy cmp m = go Nothing m
let stop = return Nothing
single a = return (Just a)
yieldk a r = go (Just a) r
in foldStream defState stop single yieldk m1
in foldStream defState yieldk single stop m1
go (Just res) m1 =
let stop = return (Just res)
@ -582,7 +582,7 @@ maximumBy cmp m = go Nothing m
yieldk a r = case cmp res a of
GT -> go (Just res) r
_ -> go (Just a) r
in foldStream defState stop single yieldk m1
in foldStream defState yieldk single stop m1
{-# INLINE (!!) #-}
(!!) :: (IsStream t, Monad m) => t m a -> Int -> m (Maybe a)
@ -594,7 +594,7 @@ m !! i = go i m
yieldk a x | n < 0 = return Nothing
| n == 0 = return $ Just a
| otherwise = go (n - 1) x
in foldStream defState (return Nothing) single yieldk m1
in foldStream defState yieldk single (return Nothing) m1
{-# INLINE lookup #-}
lookup :: (IsStream t, Monad m, Eq a) => a -> t m (a, b) -> m (Maybe b)
@ -605,7 +605,7 @@ lookup e m = go m
| otherwise = return Nothing
yieldk (a, b) x | a == e = return $ Just b
| otherwise = go x
in foldStream defState (return Nothing) single yieldk m1
in foldStream defState yieldk single (return Nothing) m1
{-# INLINE findM #-}
findM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> m (Maybe a)
@ -618,7 +618,7 @@ findM p m = go m
yieldk a x = do
b <- p a
if b then return $ Just a else go x
in foldStream defState (return Nothing) single yieldk m1
in foldStream defState yieldk single (return Nothing) m1
{-# INLINE find #-}
find :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m (Maybe a)
@ -628,13 +628,13 @@ find p = findM (return . p)
findIndices :: IsStream t => (a -> Bool) -> t m a -> t m Int
findIndices p = go 0
where
go offset m1 = mkStream $ \st stp sng yld ->
go offset m1 = mkStream $ \st yld sng stp ->
let single a | p a = sng offset
| otherwise = stp
yieldk a x | p a = yld offset $ go (offset + 1) x
| otherwise = foldStream (adaptState st) stp sng yld $
| otherwise = foldStream (adaptState st) yld sng stp $
go (offset + 1) x
in foldStream (adaptState st) stp single yieldk m1
in foldStream (adaptState st) yieldk single stp m1
------------------------------------------------------------------------------
-- Map and Fold
@ -649,7 +649,7 @@ mapM_ f m = go m
let stop = return ()
single a = void (f a)
yieldk a r = f a >> go r
in foldStream defState stop single yieldk m1
in foldStream defState yieldk single stop m1
------------------------------------------------------------------------------
-- Converting folds
@ -672,12 +672,12 @@ scanx :: IsStream t => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b
scanx step begin done m =
cons (done begin) $ go m begin
where
go m1 !acc = mkStream $ \st stp sng yld ->
go m1 !acc = mkStream $ \st yld sng stp ->
let single a = sng (done $ step acc a)
yieldk a r =
let s = step acc a
in yld (done s) (go r s)
in foldStream (adaptState st) stp single yieldk m1
in foldStream (adaptState st) yieldk single stp m1
{-# INLINE scanl' #-}
scanl' :: IsStream t => (b -> a -> b) -> b -> t m a -> t m b
@ -691,56 +691,55 @@ scanl' step begin = scanx step begin id
filter :: IsStream t => (a -> Bool) -> t m a -> t m a
filter p m = go m
where
go m1 = mkStream $ \st stp sng yld ->
go m1 = mkStream $ \st yld sng stp ->
let single a | p a = sng a
| otherwise = stp
yieldk a r | p a = yld a (go r)
| otherwise = foldStream st stp single yieldk r
in foldStream st stp single yieldk m1
| otherwise = foldStream st yieldk single stp r
in foldStream st yieldk single stp m1
{-# INLINE take #-}
take :: IsStream t => Int -> t m a -> t m a
take n m = go n m
where
go n1 m1 = mkStream $ \st stp sng yld ->
go n1 m1 = mkStream $ \st yld sng stp ->
let yieldk a r = yld a (go (n1 - 1) r)
in if n1 <= 0
then stp
else foldStream st stp sng yieldk m1
else foldStream st yieldk sng stp m1
{-# INLINE takeWhile #-}
takeWhile :: IsStream t => (a -> Bool) -> t m a -> t m a
takeWhile p m = go m
where
go m1 = mkStream $ \st stp sng yld ->
go m1 = mkStream $ \st yld sng stp ->
let single a | p a = sng a
| otherwise = stp
yieldk a r | p a = yld a (go r)
| otherwise = stp
in foldStream st stp single yieldk m1
in foldStream st yieldk single stp m1
drop :: IsStream t => Int -> t m a -> t m a
drop n m = mkStream $ \st stp sng yld ->
foldStream st stp sng yld $ go n m
drop n m = go n m
where
go n1 m1 = mkStream $ \st stp sng yld ->
go n1 m1 = mkStream $ \st yld sng stp ->
let single _ = stp
yieldk _ r = foldStream st stp sng yld $ go (n1 - 1) r
yieldk _ r = foldStream st yld sng stp $ go (n1 - 1) r
-- Somehow "<=" check performs better than a ">"
in if n1 <= 0
then foldStream st stp sng yld m1
else foldStream st stp single yieldk m1
then foldStream st yld sng stp m1
else foldStream st yieldk single stp m1
{-# INLINE dropWhile #-}
dropWhile :: IsStream t => (a -> Bool) -> t m a -> t m a
dropWhile p m = go m
where
go m1 = mkStream $ \st stp sng yld ->
go m1 = mkStream $ \st yld sng stp ->
let single a | p a = stp
| otherwise = sng a
yieldk a r | p a = foldStream st stp single yieldk r
yieldk a r | p a = foldStream st yieldk single stp r
| otherwise = yld a r
in foldStream st stp single yieldk m1
in foldStream st yieldk single stp m1
-------------------------------------------------------------------------------
-- Mapping
@ -752,10 +751,10 @@ dropWhile p m = go m
mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b
mapM f m = go m
where
go m1 = mkStream $ \st stp sng yld ->
go m1 = mkStream $ \st yld sng stp ->
let single a = f a >>= sng
yieldk a r = foldStreamShared st stp sng yld $ f a |: go r
in foldStream (adaptState st) stp single yieldk m1
yieldk a r = foldStreamShared st yld sng stp $ f a |: go r
in foldStream (adaptState st) yieldk single stp m1
-- Be careful when modifying this, this uses a consM (|:) deliberately to allow
-- other stream types to overload it.
@ -763,10 +762,10 @@ mapM f m = go m
sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a
sequence m = go m
where
go m1 = mkStream $ \st stp sng yld ->
go m1 = mkStream $ \st yld sng stp ->
let single ma = ma >>= sng
yieldk ma r = foldStreamShared st stp sng yld $ ma |: go r
in foldStream (adaptState st) stp single yieldk m1
yieldk ma r = foldStreamShared st yld sng stp $ ma |: go r
in foldStream (adaptState st) yieldk single stp m1
-------------------------------------------------------------------------------
-- Inserting
@ -776,19 +775,19 @@ sequence m = go m
intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
intersperseM a m = prependingStart m
where
prependingStart m1 = mkStream $ \st stp sng yld ->
let yieldk i x = foldStreamShared st stp sng yld $ return i |: go x
in foldStream st stp sng yieldk m1
go m2 = mkStream $ \st stp sng yld ->
let single i = foldStreamShared st stp sng yld $ a |: yield i
yieldk i x = foldStreamShared st stp sng yld $ a |: return i |: go x
in foldStream st stp single yieldk m2
prependingStart m1 = mkStream $ \st yld sng stp ->
let yieldk i x = foldStreamShared st yld sng stp $ return i |: go x
in foldStream st yieldk sng stp m1
go m2 = mkStream $ \st yld sng stp ->
let single i = foldStreamShared st yld sng stp $ a |: yield i
yieldk i x = foldStreamShared st yld sng stp $ a |: return i |: go x
in foldStream st yieldk single stp m2
{-# INLINE insertBy #-}
insertBy :: IsStream t => (a -> a -> Ordering) -> a -> t m a -> t m a
insertBy cmp x m = go m
where
go m1 = mkStream $ \st _ _ yld ->
go m1 = mkStream $ \st yld _ _ ->
let single a = case cmp x a of
GT -> yld a (yield x)
_ -> yld x (yield a)
@ -796,7 +795,7 @@ insertBy cmp x m = go m
yieldk a r = case cmp x a of
GT -> yld a (go r)
_ -> yld x (a `cons` r)
in foldStream st stop single yieldk m1
in foldStream st yieldk single stop m1
------------------------------------------------------------------------------
-- Deleting
@ -806,12 +805,12 @@ insertBy cmp x m = go m
deleteBy :: IsStream t => (a -> a -> Bool) -> a -> t m a -> t m a
deleteBy eq x m = go m
where
go m1 = mkStream $ \st stp sng yld ->
go m1 = mkStream $ \st yld sng stp ->
let single a = if eq x a then stp else sng a
yieldk a r = if eq x a
then foldStream st stp sng yld r
then foldStream st yld sng stp r
else yld a (go r)
in foldStream st stp single yieldk m1
in foldStream st yieldk single stp m1
-------------------------------------------------------------------------------
-- Map and Filter
@ -821,14 +820,14 @@ deleteBy eq x m = go m
mapMaybe :: IsStream t => (a -> Maybe b) -> t m a -> t m b
mapMaybe f m = go m
where
go m1 = mkStream $ \st stp sng yld ->
go m1 = mkStream $ \st yld sng stp ->
let single a = case f a of
Just b -> sng b
Nothing -> stp
yieldk a r = case f a of
Just b -> yld b $ go r
Nothing -> foldStream (adaptState st) stp single yieldk r
in foldStream (adaptState st) stp single yieldk m1
Nothing -> foldStream (adaptState st) yieldk single stp r
in foldStream (adaptState st) yieldk single stp m1
------------------------------------------------------------------------------
-- Serial Zipping
@ -841,14 +840,14 @@ mapMaybe f m = go m
zipWith :: IsStream t => (a -> b -> c) -> t m a -> t m b -> t m c
zipWith f = go
where
go mx my = mkStream $ \st stp sng yld -> do
go mx my = mkStream $ \st yld sng stp -> do
let merge a ra =
let single2 b = sng (f a b)
yield2 b rb = yld (f a b) (go ra rb)
in foldStream (adaptState st) stp single2 yield2 my
in foldStream (adaptState st) yield2 single2 stp my
let single1 a = merge a nil
yield1 = merge
foldStream (adaptState st) stp single1 yield1 mx
foldStream (adaptState st) yield1 single1 stp mx
-- | Zip two streams serially using a monadic zipping function.
--
@ -856,15 +855,15 @@ zipWith f = go
zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c
zipWithM f m1 m2 = go m1 m2
where
go mx my = mkStream $ \st stp sng yld -> do
go mx my = mkStream $ \st yld sng stp -> do
let merge a ra =
let runIt x = foldStream st stp sng yld x
let runIt x = foldStream st yld sng stp x
single2 b = f a b >>= sng
yield2 b rb = f a b >>= \x -> runIt (x `cons` go ra rb)
in foldStream (adaptState st) stp single2 yield2 my
in foldStream (adaptState st) yield2 single2 stp my
let single1 a = merge a nil
yield1 = merge
foldStream (adaptState st) stp single1 yield1 mx
foldStream (adaptState st) yield1 single1 stp mx
------------------------------------------------------------------------------
-- Merging
@ -876,9 +875,9 @@ mergeByM
=> (a -> a -> m Ordering) -> t m a -> t m a -> t m a
mergeByM cmp = go
where
go mx my = mkStream $ \st stp sng yld -> do
go mx my = mkStream $ \st yld sng stp -> do
let mergeWithY a ra =
let stop2 = foldStream st stp sng yld mx
let stop2 = foldStream st yld sng stp mx
single2 b = do
r <- cmp a b
case r of
@ -889,11 +888,11 @@ mergeByM cmp = go
case r of
GT -> yld b (go (a `cons` ra) rb)
_ -> yld a (go ra (b `cons` rb))
in foldStream st stop2 single2 yield2 my
let stopX = foldStream st stp sng yld my
in foldStream st yield2 single2 stop2 my
let stopX = foldStream st yld sng stp my
singleX a = mergeWithY a nil
yieldX = mergeWithY
foldStream st stopX singleX yieldX mx
foldStream st yieldX singleX stopX mx
{-# INLINABLE mergeBy #-}
mergeBy
@ -918,7 +917,7 @@ the m = do
| otherwise = return Nothing
yieldk a r | h == a = go h r
| otherwise = return Nothing
in foldStream defState (return $ Just h) single yieldk m1
in foldStream defState yieldk single (return $ Just h) m1
-------------------------------------------------------------------------------
-- Bind utility
@ -934,20 +933,20 @@ bindWith
bindWith par m1 f = go m1
where
go m =
mkStream $ \st stp sng yld ->
let foldShared = foldStreamShared st stp sng yld
mkStream $ \st yld sng stp ->
let foldShared = foldStreamShared st yld sng stp
single a = foldShared $ unShare (f a)
yieldk a r = foldShared $ unShare (f a) `par` go r
in foldStream (adaptState st) stp single yieldk m
in foldStream (adaptState st) yieldk single stp m
------------------------------------------------------------------------------
-- Alternative & MonadPlus
------------------------------------------------------------------------------
_alt :: Stream m a -> Stream m a -> Stream m a
_alt m1 m2 = mkStream $ \st stp sng yld ->
let stop = foldStream st stp sng yld m2
in foldStream st stop sng yld m1
_alt m1 m2 = mkStream $ \st yld sng stp ->
let stop = foldStream st yld sng stp m2
in foldStream st yld sng stop m1
------------------------------------------------------------------------------
-- MonadReader
@ -955,10 +954,10 @@ _alt m1 m2 = mkStream $ \st stp sng yld ->
withLocal :: MonadReader r m => (r -> r) -> Stream m a -> Stream m a
withLocal f m =
mkStream $ \st stp sng yld ->
mkStream $ \st yld sng stp ->
let single = local f . sng
yieldk a r = local f $ yld a (withLocal f r)
in foldStream st (local f stp) single yieldk m
in foldStream st yieldk single (local f stp) m
------------------------------------------------------------------------------
-- MonadError

View File

@ -90,9 +90,9 @@ import Streamly.SVar
newtype Stream m a =
MkStream (forall r.
State Stream m a -- state
-> m r -- stop
-> (a -> m r) -- singleton
-> (a -> Stream m a -> m r) -- yield
-> (a -> m r) -- singleton
-> m r -- stop
-> m r
)
@ -182,21 +182,18 @@ adapt = fromStream . toStream
-- Currently we always use "SVar Stream" and therefore a different State type
-- parameterized by that stream.
--
-- XXX change the order of arguments to match the standard fold function
-- convention i.e. st yield single stop.
--
-- | Build a stream from an 'SVar', a stop continuation, a singleton stream
-- continuation and a yield continuation.
mkStream:: IsStream t
=> (forall r. State Stream m a
-> m r
-> (a -> m r)
-> (a -> t m a -> m r)
-> (a -> m r)
-> m r
-> m r)
-> t m a
mkStream k = fromStream $ MkStream $ \st stp sng yld ->
mkStream k = fromStream $ MkStream $ \st yld sng stp ->
let yieldk a r = yld a (toStream r)
in k st stp sng yieldk
in k st yieldk sng stp
-- | A terminal function that has no continuation to follow.
type StopK m = forall r. m r -> m r
@ -212,7 +209,7 @@ _wrapM m = \k -> m >>= k
-- | Make an empty stream from a stop function.
fromStopK :: IsStream t => StopK m -> t m a
fromStopK k = mkStream $ \_ stp _ _ -> k stp
fromStopK k = mkStream $ \_ _ _ stp -> k stp
-- | Make a singleton stream from a yield function.
fromYieldK :: IsStream t => YieldK m a -> t m a
@ -220,7 +217,7 @@ fromYieldK k = mkStream $ \_ _ sng _ -> k sng
-- | Add a yield function at the head of the stream.
consK :: IsStream t => YieldK m a -> t m a -> t m a
consK k r = mkStream $ \_ _ _ yld -> k (\x -> yld x r)
consK k r = mkStream $ \_ yld _ _ -> k (\x -> yld x r)
-- XXX Build a stream from a repeating callback function.
@ -235,15 +232,15 @@ consK k r = mkStream $ \_ _ _ yld -> k (\x -> yld x r)
foldStreamShared
:: IsStream t
=> State Stream m a
-> m r
-> (a -> m r)
-> (a -> t m a -> m r)
-> (a -> m r)
-> m r
-> t m a
-> m r
foldStreamShared st stp sng yld m =
foldStreamShared st yld sng stp m =
let yieldk a x = yld a (fromStream x)
MkStream k = toStream m
in k st stp sng yieldk
in k st yieldk sng stp
-- | Fold a stream by providing a State, stop continuation, a singleton
-- continuation and a yield continuation. The stream will not use the SVar
@ -252,15 +249,15 @@ foldStreamShared st stp sng yld m =
foldStream
:: IsStream t
=> State Stream m a
-> m r
-> (a -> m r)
-> (a -> t m a -> m r)
-> (a -> m r)
-> m r
-> t m a
-> m r
foldStream st stp sng yld m =
foldStream st yld sng stp m =
let yieldk a x = yld a (fromStream x)
MkStream k = toStream m
in k (adaptState st) stp sng yieldk
in k (adaptState st) yieldk sng stp
-- Run the stream using a run function associated with the SVar that runs the
-- streams with a captured snapshot of the monadic state.
@ -269,14 +266,14 @@ foldStreamSVar
:: (IsStream t, MonadIO m)
=> SVar Stream m a
-> State Stream m a -- state
-> m r -- stop
-> (a -> m r) -- singleton
-> (a -> t m a -> m r) -- yield
-> (a -> m r) -- singleton
-> m r -- stop
-> t m a
-> m ()
foldStreamSVar sv st stp sng yld m =
foldStreamSVar sv st yld sng stp m =
let mrun = runInIO $ svarMrun sv
in void $ liftIO $ mrun $ foldStreamShared st stp sng yld m
in void $ liftIO $ mrun $ foldStreamShared st yld sng stp m
-------------------------------------------------------------------------------
-- Instances
@ -284,7 +281,7 @@ foldStreamSVar sv st stp sng yld m =
{-# INLINE consMSerial #-}
consMSerial :: (IsStream t, Monad m) => m a -> t m a -> t m a
consMSerial m r = mkStream $ \_ _ _ yld -> m >>= \a -> yld a r
consMSerial m r = mkStream $ \_ yld _ _ -> m >>= \a -> yld a r
-------------------------------------------------------------------------------
-- IsStream Stream
@ -317,11 +314,11 @@ instance IsStream Stream where
serial :: IsStream t => t m a -> t m a -> t m a
serial m1 m2 = go m1
where
go m = mkStream $ \st stp sng yld ->
let stop = foldStream st stp sng yld m2
go m = mkStream $ \st yld sng stp ->
let stop = foldStream st yld sng stp m2
single a = yld a m2
yieldk a r = yld a (go r)
in foldStream st stop single yieldk m
in foldStream st yieldk single stop m
instance Semigroup (Stream m a) where
(<>) = serial
@ -339,7 +336,7 @@ instance Semigroup (Stream m a) where
--
-- @since 0.1.0
nil :: IsStream t => t m a
nil = mkStream $ \_ stp _ _ -> stp
nil = mkStream $ \_ _ _ stp -> stp
instance Monoid (Stream m a) where
mempty = nil
@ -352,10 +349,10 @@ instance Monoid (Stream m a) where
{-# INLINE map #-}
map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b
map f m = mkStream $ \st stp sng yld ->
map f m = mkStream $ \st yld sng stp ->
let single = sng . f
yieldk a r = yld (f a) (map f r)
in foldStream (adaptState st) stp single yieldk m
in foldStream (adaptState st) yieldk single stp m
instance Monad m => Functor (Stream m) where
fmap = map