From 0a2755a48c69a9022aa12a528b9693ad7a530933 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Wed, 26 Dec 2018 16:31:38 +0530 Subject: [PATCH] Change the order of foldStream arguments To make it consistent with regular fold function conventions. --- src/Streamly/Prelude.hs | 10 +- src/Streamly/Streams/Ahead.hs | 28 ++-- src/Streamly/Streams/Async.hs | 16 +-- src/Streamly/Streams/Parallel.hs | 16 +-- src/Streamly/Streams/SVar.hs | 18 +-- src/Streamly/Streams/Serial.hs | 18 +-- src/Streamly/Streams/StreamD.hs | 6 +- src/Streamly/Streams/StreamK.hs | 197 ++++++++++++++-------------- src/Streamly/Streams/StreamKType.hs | 57 ++++---- 9 files changed, 183 insertions(+), 183 deletions(-) diff --git a/src/Streamly/Prelude.hs b/src/Streamly/Prelude.hs index 2594df96c..1d9706bc4 100644 --- a/src/Streamly/Prelude.hs +++ b/src/Streamly/Prelude.hs @@ -896,7 +896,7 @@ each = K.fromFoldable fromHandle :: (IsStream t, MonadIO m) => IO.Handle -> t m String fromHandle h = go where - go = K.mkStream $ \_ stp _ yld -> do + go = K.mkStream $ \_ yld _ stp -> do eof <- liftIO $ IO.hIsEOF h if eof then stp @@ -1449,7 +1449,7 @@ toHandle h m = go m let stop = return () single a = liftIO (IO.hPutStrLn h a) yieldk a r = liftIO (IO.hPutStrLn h a) >> go r - in K.foldStream defState stop single yieldk m1 + in K.foldStream defState yieldk single stop m1 ------------------------------------------------------------------------------ -- Transformation by Folding (Scans) @@ -1770,12 +1770,12 @@ mapMaybeMSerial f m = fromStreamD $ D.mapMaybeM f $ toStreamD m reverse :: (IsStream t) => t m a -> t m a reverse m = go K.nil m where - go rev rest = K.mkStream $ \st stp sng yld -> - let runIt x = K.foldStream st stp sng yld x + go rev rest = K.mkStream $ \st yld sng stp -> + let runIt x = K.foldStream st yld sng stp x stop = runIt rev single a = runIt $ a `K.cons` rev yieldk a r = runIt $ go (a `K.cons` rev) r - in K.foldStream st stop single yieldk rest + in K.foldStream st yieldk single stop rest ------------------------------------------------------------------------------ -- Transformation by Inserting diff --git a/src/Streamly/Streams/Ahead.hs b/src/Streamly/Streams/Ahead.hs index 2f4c6575c..8e76a5380 100644 --- a/src/Streamly/Streams/Ahead.hs +++ b/src/Streamly/Streams/Ahead.hs @@ -298,9 +298,10 @@ processHeap q heap st sv winfo entry sno stopping = loopHeap sno entry let stop = do liftIO (incrementYieldLimit sv) nextHeap seqNo - foldStreamSVar sv st stop - (singleStreamFromHeap seqNo) + foldStreamSVar sv st (yieldStreamFromHeap seqNo) + (singleStreamFromHeap seqNo) + stop r else liftIO $ do let ent = Entry seqNo (AheadEntryStream r) @@ -349,9 +350,10 @@ processWithoutToken q heap st sv winfo m seqNo = do -- we stop. toHeap AheadEntryNull - foldStreamSVar sv st stop - (toHeap . AheadEntryPure) + foldStreamSVar sv st (\a r -> toHeap $ AheadEntryStream $ K.cons a r) + (toHeap . AheadEntryPure) + stop m where @@ -410,7 +412,7 @@ processWithToken q heap st sv winfo action sno = do liftIO (incrementYieldLimit sv) loopWithToken (sno + 1) - foldStreamSVar sv st stop (singleOutput sno) (yieldOutput sno) action + foldStreamSVar sv st (yieldOutput sno) (singleOutput sno) stop action where @@ -433,9 +435,10 @@ processWithToken q heap st sv winfo action sno = do let stop = do liftIO (incrementYieldLimit sv) loopWithToken (seqNo + 1) - foldStreamSVar sv st stop - (singleOutput seqNo) + foldStreamSVar sv st (yieldOutput seqNo) + (singleOutput seqNo) + stop r else do let ent = Entry seqNo (AheadEntryStream r) @@ -463,9 +466,10 @@ processWithToken q heap st sv winfo action sno = do let stop = do liftIO (incrementYieldLimit sv) loopWithToken (seqNo + 1) - foldStreamSVar sv st stop - (singleOutput seqNo) + foldStreamSVar sv st (yieldOutput seqNo) + (singleOutput seqNo) + stop m else -- To avoid a race when another thread puts something @@ -684,11 +688,11 @@ aheadbind aheadbind m f = go m where go g = - mkStream $ \st stp sng yld -> - let foldShared = foldStreamShared st stp sng yld + mkStream $ \st yld sng stp -> + let foldShared = foldStreamShared st yld sng stp single a = foldShared $ unShare (f a) yieldk a r = foldShared $ unShare (f a) `ahead` go r - in foldStream (adaptState st) stp single yieldk g + in foldStream (adaptState st) yieldk single stp g instance MonadAsync m => Monad (AheadT m) where return = pure diff --git a/src/Streamly/Streams/Async.hs b/src/Streamly/Streams/Async.hs index 762ebd487..001b9e7ba 100644 --- a/src/Streamly/Streams/Async.hs +++ b/src/Streamly/Streams/Async.hs @@ -84,7 +84,7 @@ workLoopLIFO q st sv winfo = run work <- dequeue case work of Nothing -> liftIO $ sendStop sv winfo - Just m -> foldStreamSVar sv st run single yieldk m + Just m -> foldStreamSVar sv st yieldk single run m single a = do res <- liftIO $ sendYield sv winfo (ChildYield a) @@ -93,7 +93,7 @@ workLoopLIFO q st sv winfo = run yieldk a r = do res <- liftIO $ sendYield sv winfo (ChildYield a) if res - then foldStreamSVar sv st run single yieldk r + then foldStreamSVar sv st yieldk single run r else liftIO $ do enqueueLIFO sv q r sendStop sv winfo @@ -134,7 +134,7 @@ workLoopLIFOLimited q st sv winfo = run if yieldLimitOk then do let stop = liftIO (incrementYieldLimit sv) >> run - foldStreamSVar sv st stop single yieldk m + foldStreamSVar sv st yieldk single stop m -- Avoid any side effects, undo the yield limit decrement if we -- never yielded anything. else liftIO $ do @@ -153,7 +153,7 @@ workLoopLIFOLimited q st sv winfo = run yieldLimitOk <- liftIO $ decrementYieldLimit sv let stop = liftIO (incrementYieldLimit sv) >> run if res && yieldLimitOk - then foldStreamSVar sv st stop single yieldk r + then foldStreamSVar sv st yieldk single stop r else liftIO $ do incrementYieldLimit sv enqueueLIFO sv q r @@ -185,7 +185,7 @@ workLoopFIFO q st sv winfo = run work <- liftIO $ tryPopR q case work of Nothing -> liftIO $ sendStop sv winfo - Just m -> foldStreamSVar sv st run single yieldk m + Just m -> foldStreamSVar sv st yieldk single run m single a = do res <- liftIO $ sendYield sv winfo (ChildYield a) @@ -194,7 +194,7 @@ workLoopFIFO q st sv winfo = run yieldk a r = do res <- liftIO $ sendYield sv winfo (ChildYield a) if res - then foldStreamSVar sv st run single yieldk r + then foldStreamSVar sv st yieldk single run r else liftIO $ do enqueueFIFO sv q r sendStop sv winfo @@ -220,7 +220,7 @@ workLoopFIFOLimited q st sv winfo = run if yieldLimitOk then do let stop = liftIO (incrementYieldLimit sv) >> run - foldStreamSVar sv st stop single yieldk m + foldStreamSVar sv st yieldk single stop m else liftIO $ do enqueueFIFO sv q m incrementYieldLimit sv @@ -235,7 +235,7 @@ workLoopFIFOLimited q st sv winfo = run yieldLimitOk <- liftIO $ decrementYieldLimit sv let stop = liftIO (incrementYieldLimit sv) >> run if res && yieldLimitOk - then foldStreamSVar sv st stop single yieldk r + then foldStreamSVar sv st yieldk single stop r else liftIO $ do incrementYieldLimit sv enqueueFIFO sv q r diff --git a/src/Streamly/Streams/Parallel.hs b/src/Streamly/Streams/Parallel.hs index e538d8daf..5dff674c0 100644 --- a/src/Streamly/Streams/Parallel.hs +++ b/src/Streamly/Streams/Parallel.hs @@ -63,7 +63,7 @@ import qualified Streamly.Streams.StreamK as K runOne :: MonadIO m => State Stream m a -> Stream m a -> Maybe WorkerInfo -> m () -runOne st m winfo = foldStreamShared st stop single yieldk m +runOne st m winfo = foldStreamShared st yieldk single stop m where @@ -89,21 +89,21 @@ runOne st m winfo = foldStreamShared st stop single yieldk m {-# NOINLINE forkSVarPar #-} forkSVarPar :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a -forkSVarPar m r = mkStream $ \st stp sng yld -> do +forkSVarPar m r = mkStream $ \st yld sng stp -> do sv <- newParallelVar st pushWorkerPar sv (runOne st{streamVar = Just sv} $ toStream m) pushWorkerPar sv (runOne st{streamVar = Just sv} $ toStream r) - foldStream st stp sng yld (fromSVar sv) + foldStream st yld sng stp (fromSVar sv) {-# INLINE joinStreamVarPar #-} joinStreamVarPar :: (IsStream t, MonadAsync m) => SVarStyle -> t m a -> t m a -> t m a -joinStreamVarPar style m1 m2 = mkStream $ \st stp sng yld -> +joinStreamVarPar style m1 m2 = mkStream $ \st yld sng stp -> case streamVar st of Just sv | svarStyle sv == style -> do pushWorkerPar sv (runOne st $ toStream m1) - foldStreamShared st stp sng yld m2 - _ -> foldStreamShared st stp sng yld (forkSVarPar m1 m2) + foldStreamShared st yld sng stp m2 + _ -> foldStreamShared st yld sng stp (forkSVarPar m1 m2) -- | XXX we can implement it more efficienty by directly implementing instead -- of combining streams using parallel. @@ -135,10 +135,10 @@ mkParallel m = do {-# INLINE applyWith #-} applyWith :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b -applyWith f m = mkStream $ \st stp sng yld -> do +applyWith f m = mkStream $ \st yld sng stp -> do sv <- newParallelVar (adaptState st) pushWorkerPar sv (runOne st{streamVar = Just sv} (toStream m)) - foldStream st stp sng yld $ f $ fromSVar sv + foldStream st yld sng stp $ f $ fromSVar sv ------------------------------------------------------------------------------ -- Stream runner concurrent function application diff --git a/src/Streamly/Streams/SVar.hs b/src/Streamly/Streams/SVar.hs index b2c683514..7d30e091e 100644 --- a/src/Streamly/Streams/SVar.hs +++ b/src/Streamly/Streams/SVar.hs @@ -38,12 +38,12 @@ printSVar sv how = do -- | Pull a stream from an SVar. {-# NOINLINE fromStreamVar #-} fromStreamVar :: MonadAsync m => SVar Stream m a -> Stream m a -fromStreamVar sv = mkStream $ \st stp sng yld -> do +fromStreamVar sv = mkStream $ \st yld sng stp -> do list <- readOutputQ sv -- Reversing the output is important to guarantee that we process the -- outputs in the same order as they were generated by the constituent -- streams. - foldStream st stp sng yld $ processEvents $ reverse list + foldStream st yld sng stp $ processEvents $ reverse list where @@ -55,36 +55,36 @@ fromStreamVar sv = mkStream $ \st stp sng yld -> do stp {-# INLINE processEvents #-} - processEvents [] = mkStream $ \st stp sng yld -> do + processEvents [] = mkStream $ \st yld sng stp -> do done <- postProcess sv if done then allDone stp - else foldStream st stp sng yld $ fromStreamVar sv + else foldStream st yld sng stp $ fromStreamVar sv - processEvents (ev : es) = mkStream $ \st stp sng yld -> do + processEvents (ev : es) = mkStream $ \st yld sng stp -> do let rest = processEvents es case ev of ChildYield a -> yld a rest ChildStop tid e -> do accountThread sv tid case e of - Nothing -> foldStream st stp sng yld rest + Nothing -> foldStream st yld sng stp rest Just ex -> case fromException ex of Just ThreadAbort -> - foldStream st stp sng yld rest + foldStream st yld sng stp rest Nothing -> liftIO (cleanupSVar sv) >> throwM ex {-# INLINE fromSVar #-} fromSVar :: (MonadAsync m, IsStream t) => SVar Stream m a -> t m a fromSVar sv = - mkStream $ \st stp sng yld -> do + mkStream $ \st yld sng stp -> do ref <- liftIO $ newIORef () _ <- liftIO $ mkWeakIORef ref hook -- We pass a copy of sv to fromStreamVar, so that we know that it has -- no other references, when that copy gets garbage collected "ref" -- will get garbage collected and our hook will be called. - foldStreamShared st stp sng yld $ + foldStreamShared st yld sng stp $ fromStream $ fromStreamVar sv{svarRef = Just ref} where diff --git a/src/Streamly/Streams/Serial.hs b/src/Streamly/Streams/Serial.hs index 1a3043c6b..e50f33ac2 100644 --- a/src/Streamly/Streams/Serial.hs +++ b/src/Streamly/Streams/Serial.hs @@ -174,11 +174,11 @@ instance IsStream SerialT where -- instance Monad m => Monad (SerialT m) where return = pure - m >>= f = mkStream $ \st stp sng yld -> - let run x = foldStream st stp sng yld x + m >>= f = mkStream $ \st yld sng stp -> + let run x = foldStream st yld sng stp x single a = run $ f a yieldk a r = run $ f a <> (r >>= f) - in foldStream (adaptState st) stp single yieldk m + in foldStream (adaptState st) yieldk single stp m ------------------------------------------------------------------------------ -- Other instances @@ -300,11 +300,11 @@ instance IsStream WSerialT where -- @since 0.2.0 {-# INLINE wSerial #-} wSerial :: IsStream t => t m a -> t m a -> t m a -wSerial m1 m2 = mkStream $ \st stp sng yld -> do - let stop = foldStream st stp sng yld m2 +wSerial m1 m2 = mkStream $ \st yld sng stp -> do + let stop = foldStream st yld sng stp m2 single a = yld a m2 yieldk a r = yld a (wSerial m2 r) - foldStream st stop single yieldk m1 + foldStream st yieldk single stop m1 instance Semigroup (WSerialT m a) where (<>) = wSerial @@ -337,11 +337,11 @@ instance Monoid (WSerialT m a) where -- instance Monad m => Monad (WSerialT m) where return = pure - m >>= f = mkStream $ \st stp sng yld -> - let run x = foldStream st stp sng yld x + m >>= f = mkStream $ \st yld sng stp -> + let run x = foldStream st yld sng stp x single a = run $ f a yieldk a r = run $ f a <> (r >>= f) - in foldStream (adaptState st) stp single yieldk m + in foldStream (adaptState st) yieldk single stp m ------------------------------------------------------------------------------ -- Other instances diff --git a/src/Streamly/Streams/StreamD.hs b/src/Streamly/Streams/StreamD.hs index 581b7f3de..f5f0d1187 100644 --- a/src/Streamly/Streams/StreamD.hs +++ b/src/Streamly/Streams/StreamD.hs @@ -483,7 +483,7 @@ fromStreamK = Stream step let stop = return Stop single a = return $ Yield a K.nil yieldk a r = return $ Yield a r - in K.foldStreamShared gst stop single yieldk m1 + in K.foldStreamShared gst yieldk single stop m1 {-# INLINE toStreamD #-} toStreamD :: (K.IsStream t, Monad m) => t m a -> Stream m a @@ -882,11 +882,11 @@ toList = foldr (:) [] toStreamK :: Monad m => Stream m a -> K.Stream m a toStreamK (Stream step state) = go state where - go st = K.mkStream $ \gst stp sng yld -> do + go st = K.mkStream $ \gst yld sng stp -> do r <- step gst st case r of Yield x s -> yld x (go s) - Skip s -> K.foldStreamShared gst stp sng yld $ go s + Skip s -> K.foldStreamShared gst yld sng stp $ go s Stop -> stp #ifndef DISABLE_FUSION diff --git a/src/Streamly/Streams/StreamK.hs b/src/Streamly/Streams/StreamK.hs index 96da5afe4..40a08c60e 100644 --- a/src/Streamly/Streams/StreamK.hs +++ b/src/Streamly/Streams/StreamK.hs @@ -174,8 +174,8 @@ import Streamly.Streams.StreamKType -- | Detach a stream from an SVar {-# INLINE unShare #-} unShare :: IsStream t => t m a -> t m a -unShare x = mkStream $ \st stp sng yld -> - foldStream st stp sng yld x +unShare x = mkStream $ \st yld sng stp -> + foldStream st yld sng stp x ------------------------------------------------------------------------------ -- Construction @@ -196,7 +196,7 @@ infixr 5 `cons` -- -- @since 0.1.0 cons :: IsStream t => a -> t m a -> t m a -cons a r = mkStream $ \_ _ _ yld -> yld a r +cons a r = mkStream $ \_ yld _ _ -> yld a r infixr 5 .: @@ -221,7 +221,7 @@ uncons m = let stop = return Nothing single a = return (Just (a, nil)) yieldk a r = return (Just (a, r)) - in foldStream defState stop single yieldk m + in foldStream defState yieldk single stop m ------------------------------------------------------------------------------- -- Generation @@ -231,7 +231,7 @@ uncons m = unfoldr :: IsStream t => (b -> Maybe (a, b)) -> b -> t m a unfoldr step = go where - go s = mkStream $ \_ stp _ yld -> + go s = mkStream $ \_ yld _ stp -> case step s of Nothing -> stp Just (a, b) -> yld a (go b) @@ -240,12 +240,12 @@ unfoldr step = go unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a unfoldrM step = go where - go s = mkStream $ \st stp sng yld -> do + go s = mkStream $ \st yld sng stp -> do mayb <- step s case mayb of Nothing -> stp Just (a, b) -> - foldStreamShared st stp sng yld $ return a |: go b + foldStreamShared st yld sng stp $ return a |: go b ------------------------------------------------------------------------------- -- Special generation @@ -320,7 +320,7 @@ foldr step acc m = go m let stop = return acc single a = return (step a acc) yieldk a r = go r >>= \b -> return (step a b) - in foldStream defState stop single yieldk m1 + in foldStream defState yieldk single stop m1 -- | Lazy right fold with a monadic step function. {-# INLINE foldrM #-} @@ -331,7 +331,7 @@ foldrM step acc m = go m let stop = return acc single a = step a acc yieldk a r = go r >>= step a - in foldStream defState stop single yieldk m1 + in foldStream defState yieldk single stop m1 {-# INLINE foldr1 #-} foldr1 :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> m (Maybe a) @@ -345,7 +345,7 @@ foldr1 step m = do let stp = return p single a = return $ step a p yieldk a r = fmap (step p) (go a r) - in foldStream defState stp single yieldk m1 + in foldStream defState yieldk single stp m1 -- | Strict left fold with an extraction function. Like the standard strict -- left fold, but applies a user supplied extraction function (the third @@ -372,13 +372,13 @@ foldx step begin done m = get $ go m begin -- that cannot be tail call optimized. Unfolding recursion explicitly via -- continuations is much more efficient. go :: t m a -> x -> t m x - go m1 !acc = mkStream $ \_ _ sng yld -> + go m1 !acc = mkStream $ \_ yld sng _ -> let stop = sng acc single a = sng $ step acc a -- XXX this is foldNonEmptyStream - yieldk a r = foldStream defState undefined sng yld $ + yieldk a r = foldStream defState yld sng undefined $ go r (step acc a) - in foldStream defState stop single yieldk m1 + in foldStream defState yieldk single stop m1 -- | Strict left associative fold. {-# INLINE foldl' #-} @@ -395,7 +395,7 @@ foldxM step begin done m = go begin m let stop = acc >>= done single a = acc >>= \b -> step b a >>= done yieldk a r = acc >>= \b -> step b a >>= \x -> go (return x) r - in foldStream defState stop single yieldk m1 + in foldStream defState yieldk single stop m1 -- | Like 'foldl'' but with a monadic step function. foldlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> m b @@ -413,7 +413,7 @@ runStream = go let stop = return () single _ = return () yieldk _ r = go r - in foldStream defState stop single yieldk m1 + in foldStream defState yieldk single stop m1 {-# INLINE null #-} null :: (IsStream t, Monad m) => t m a -> m Bool @@ -421,7 +421,7 @@ null m = let stop = return True single _ = return False yieldk _ _ = return False - in foldStream defState stop single yieldk m + in foldStream defState yieldk single stop m {-# INLINE head #-} head :: (IsStream t, Monad m) => t m a -> m (Maybe a) @@ -429,7 +429,7 @@ head m = let stop = return Nothing single a = return (Just a) yieldk a _ = return (Just a) - in foldStream defState stop single yieldk m + in foldStream defState yieldk single stop m {-# INLINE tail #-} tail :: (IsStream t, Monad m) => t m a -> m (Maybe (t m a)) @@ -437,7 +437,7 @@ tail m = let stop = return Nothing single _ = return $ Just nil yieldk _ r = return $ Just r - in foldStream defState stop single yieldk m + in foldStream defState yieldk single stop m {-# INLINE init #-} init :: (IsStream t, Monad m) => t m a -> m (Maybe (t m a)) @@ -448,10 +448,10 @@ init m = go1 m case r of Nothing -> return Nothing Just (h, t) -> return . Just $ go h t - go p m1 = mkStream $ \_ stp sng yld -> + go p m1 = mkStream $ \_ yld sng stp -> let single _ = sng p yieldk a x = yld p $ go a x - in foldStream defState stp single yieldk m1 + in foldStream defState yieldk single stp m1 {-# INLINE elem #-} elem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool @@ -461,7 +461,7 @@ elem e m = go m let stop = return False single a = return (a == e) yieldk a r = if a == e then return True else go r - in foldStream defState stop single yieldk m1 + in foldStream defState yieldk single stop m1 {-# INLINE notElem #-} notElem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool @@ -471,7 +471,7 @@ notElem e m = go m let stop = return True single a = return (a /= e) yieldk a r = if a == e then return False else go r - in foldStream defState stop single yieldk m1 + in foldStream defState yieldk single stop m1 all :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool all p m = go m @@ -481,7 +481,7 @@ all p m = go m | otherwise = return False yieldk a r | p a = go r | otherwise = return False - in foldStream defState (return True) single yieldk m1 + in foldStream defState yieldk single (return True) m1 any :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool any p m = go m @@ -491,7 +491,7 @@ any p m = go m | otherwise = return False yieldk a r | p a = return True | otherwise = go r - in foldStream defState (return False) single yieldk m1 + in foldStream defState yieldk single (return False) m1 -- | Extract the last element of the stream, if any. {-# INLINE last #-} @@ -506,7 +506,7 @@ minimum m = go Nothing m let stop = return Nothing single a = return (Just a) yieldk a r = go (Just a) r - in foldStream defState stop single yieldk m1 + in foldStream defState yieldk single stop m1 go (Just res) m1 = let stop = return (Just res) @@ -518,7 +518,7 @@ minimum m = go Nothing m if res <= a then go (Just res) r else go (Just a) r - in foldStream defState stop single yieldk m1 + in foldStream defState yieldk single stop m1 {-# INLINE minimumBy #-} minimumBy @@ -530,7 +530,7 @@ minimumBy cmp m = go Nothing m let stop = return Nothing single a = return (Just a) yieldk a r = go (Just a) r - in foldStream defState stop single yieldk m1 + in foldStream defState yieldk single stop m1 go (Just res) m1 = let stop = return (Just res) @@ -540,7 +540,7 @@ minimumBy cmp m = go Nothing m yieldk a r = case cmp res a of GT -> go (Just a) r _ -> go (Just res) r - in foldStream defState stop single yieldk m1 + in foldStream defState yieldk single stop m1 {-# INLINE maximum #-} maximum :: (IsStream t, Monad m, Ord a) => t m a -> m (Maybe a) @@ -550,7 +550,7 @@ maximum m = go Nothing m let stop = return Nothing single a = return (Just a) yieldk a r = go (Just a) r - in foldStream defState stop single yieldk m1 + in foldStream defState yieldk single stop m1 go (Just res) m1 = let stop = return (Just res) @@ -562,7 +562,7 @@ maximum m = go Nothing m if res <= a then go (Just a) r else go (Just res) r - in foldStream defState stop single yieldk m1 + in foldStream defState yieldk single stop m1 {-# INLINE maximumBy #-} maximumBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> m (Maybe a) @@ -572,7 +572,7 @@ maximumBy cmp m = go Nothing m let stop = return Nothing single a = return (Just a) yieldk a r = go (Just a) r - in foldStream defState stop single yieldk m1 + in foldStream defState yieldk single stop m1 go (Just res) m1 = let stop = return (Just res) @@ -582,7 +582,7 @@ maximumBy cmp m = go Nothing m yieldk a r = case cmp res a of GT -> go (Just res) r _ -> go (Just a) r - in foldStream defState stop single yieldk m1 + in foldStream defState yieldk single stop m1 {-# INLINE (!!) #-} (!!) :: (IsStream t, Monad m) => t m a -> Int -> m (Maybe a) @@ -594,7 +594,7 @@ m !! i = go i m yieldk a x | n < 0 = return Nothing | n == 0 = return $ Just a | otherwise = go (n - 1) x - in foldStream defState (return Nothing) single yieldk m1 + in foldStream defState yieldk single (return Nothing) m1 {-# INLINE lookup #-} lookup :: (IsStream t, Monad m, Eq a) => a -> t m (a, b) -> m (Maybe b) @@ -605,7 +605,7 @@ lookup e m = go m | otherwise = return Nothing yieldk (a, b) x | a == e = return $ Just b | otherwise = go x - in foldStream defState (return Nothing) single yieldk m1 + in foldStream defState yieldk single (return Nothing) m1 {-# INLINE findM #-} findM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> m (Maybe a) @@ -618,7 +618,7 @@ findM p m = go m yieldk a x = do b <- p a if b then return $ Just a else go x - in foldStream defState (return Nothing) single yieldk m1 + in foldStream defState yieldk single (return Nothing) m1 {-# INLINE find #-} find :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m (Maybe a) @@ -628,13 +628,13 @@ find p = findM (return . p) findIndices :: IsStream t => (a -> Bool) -> t m a -> t m Int findIndices p = go 0 where - go offset m1 = mkStream $ \st stp sng yld -> + go offset m1 = mkStream $ \st yld sng stp -> let single a | p a = sng offset | otherwise = stp yieldk a x | p a = yld offset $ go (offset + 1) x - | otherwise = foldStream (adaptState st) stp sng yld $ + | otherwise = foldStream (adaptState st) yld sng stp $ go (offset + 1) x - in foldStream (adaptState st) stp single yieldk m1 + in foldStream (adaptState st) yieldk single stp m1 ------------------------------------------------------------------------------ -- Map and Fold @@ -649,7 +649,7 @@ mapM_ f m = go m let stop = return () single a = void (f a) yieldk a r = f a >> go r - in foldStream defState stop single yieldk m1 + in foldStream defState yieldk single stop m1 ------------------------------------------------------------------------------ -- Converting folds @@ -672,12 +672,12 @@ scanx :: IsStream t => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b scanx step begin done m = cons (done begin) $ go m begin where - go m1 !acc = mkStream $ \st stp sng yld -> + go m1 !acc = mkStream $ \st yld sng stp -> let single a = sng (done $ step acc a) yieldk a r = let s = step acc a in yld (done s) (go r s) - in foldStream (adaptState st) stp single yieldk m1 + in foldStream (adaptState st) yieldk single stp m1 {-# INLINE scanl' #-} scanl' :: IsStream t => (b -> a -> b) -> b -> t m a -> t m b @@ -691,56 +691,55 @@ scanl' step begin = scanx step begin id filter :: IsStream t => (a -> Bool) -> t m a -> t m a filter p m = go m where - go m1 = mkStream $ \st stp sng yld -> + go m1 = mkStream $ \st yld sng stp -> let single a | p a = sng a | otherwise = stp yieldk a r | p a = yld a (go r) - | otherwise = foldStream st stp single yieldk r - in foldStream st stp single yieldk m1 + | otherwise = foldStream st yieldk single stp r + in foldStream st yieldk single stp m1 {-# INLINE take #-} take :: IsStream t => Int -> t m a -> t m a take n m = go n m where - go n1 m1 = mkStream $ \st stp sng yld -> + go n1 m1 = mkStream $ \st yld sng stp -> let yieldk a r = yld a (go (n1 - 1) r) in if n1 <= 0 then stp - else foldStream st stp sng yieldk m1 + else foldStream st yieldk sng stp m1 {-# INLINE takeWhile #-} takeWhile :: IsStream t => (a -> Bool) -> t m a -> t m a takeWhile p m = go m where - go m1 = mkStream $ \st stp sng yld -> + go m1 = mkStream $ \st yld sng stp -> let single a | p a = sng a | otherwise = stp yieldk a r | p a = yld a (go r) | otherwise = stp - in foldStream st stp single yieldk m1 + in foldStream st yieldk single stp m1 drop :: IsStream t => Int -> t m a -> t m a -drop n m = mkStream $ \st stp sng yld -> - foldStream st stp sng yld $ go n m +drop n m = go n m where - go n1 m1 = mkStream $ \st stp sng yld -> + go n1 m1 = mkStream $ \st yld sng stp -> let single _ = stp - yieldk _ r = foldStream st stp sng yld $ go (n1 - 1) r + yieldk _ r = foldStream st yld sng stp $ go (n1 - 1) r -- Somehow "<=" check performs better than a ">" in if n1 <= 0 - then foldStream st stp sng yld m1 - else foldStream st stp single yieldk m1 + then foldStream st yld sng stp m1 + else foldStream st yieldk single stp m1 {-# INLINE dropWhile #-} dropWhile :: IsStream t => (a -> Bool) -> t m a -> t m a dropWhile p m = go m where - go m1 = mkStream $ \st stp sng yld -> + go m1 = mkStream $ \st yld sng stp -> let single a | p a = stp | otherwise = sng a - yieldk a r | p a = foldStream st stp single yieldk r + yieldk a r | p a = foldStream st yieldk single stp r | otherwise = yld a r - in foldStream st stp single yieldk m1 + in foldStream st yieldk single stp m1 ------------------------------------------------------------------------------- -- Mapping @@ -752,10 +751,10 @@ dropWhile p m = go m mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b mapM f m = go m where - go m1 = mkStream $ \st stp sng yld -> + go m1 = mkStream $ \st yld sng stp -> let single a = f a >>= sng - yieldk a r = foldStreamShared st stp sng yld $ f a |: go r - in foldStream (adaptState st) stp single yieldk m1 + yieldk a r = foldStreamShared st yld sng stp $ f a |: go r + in foldStream (adaptState st) yieldk single stp m1 -- Be careful when modifying this, this uses a consM (|:) deliberately to allow -- other stream types to overload it. @@ -763,10 +762,10 @@ mapM f m = go m sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a sequence m = go m where - go m1 = mkStream $ \st stp sng yld -> + go m1 = mkStream $ \st yld sng stp -> let single ma = ma >>= sng - yieldk ma r = foldStreamShared st stp sng yld $ ma |: go r - in foldStream (adaptState st) stp single yieldk m1 + yieldk ma r = foldStreamShared st yld sng stp $ ma |: go r + in foldStream (adaptState st) yieldk single stp m1 ------------------------------------------------------------------------------- -- Inserting @@ -776,19 +775,19 @@ sequence m = go m intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a intersperseM a m = prependingStart m where - prependingStart m1 = mkStream $ \st stp sng yld -> - let yieldk i x = foldStreamShared st stp sng yld $ return i |: go x - in foldStream st stp sng yieldk m1 - go m2 = mkStream $ \st stp sng yld -> - let single i = foldStreamShared st stp sng yld $ a |: yield i - yieldk i x = foldStreamShared st stp sng yld $ a |: return i |: go x - in foldStream st stp single yieldk m2 + prependingStart m1 = mkStream $ \st yld sng stp -> + let yieldk i x = foldStreamShared st yld sng stp $ return i |: go x + in foldStream st yieldk sng stp m1 + go m2 = mkStream $ \st yld sng stp -> + let single i = foldStreamShared st yld sng stp $ a |: yield i + yieldk i x = foldStreamShared st yld sng stp $ a |: return i |: go x + in foldStream st yieldk single stp m2 {-# INLINE insertBy #-} insertBy :: IsStream t => (a -> a -> Ordering) -> a -> t m a -> t m a insertBy cmp x m = go m where - go m1 = mkStream $ \st _ _ yld -> + go m1 = mkStream $ \st yld _ _ -> let single a = case cmp x a of GT -> yld a (yield x) _ -> yld x (yield a) @@ -796,7 +795,7 @@ insertBy cmp x m = go m yieldk a r = case cmp x a of GT -> yld a (go r) _ -> yld x (a `cons` r) - in foldStream st stop single yieldk m1 + in foldStream st yieldk single stop m1 ------------------------------------------------------------------------------ -- Deleting @@ -806,12 +805,12 @@ insertBy cmp x m = go m deleteBy :: IsStream t => (a -> a -> Bool) -> a -> t m a -> t m a deleteBy eq x m = go m where - go m1 = mkStream $ \st stp sng yld -> + go m1 = mkStream $ \st yld sng stp -> let single a = if eq x a then stp else sng a yieldk a r = if eq x a - then foldStream st stp sng yld r + then foldStream st yld sng stp r else yld a (go r) - in foldStream st stp single yieldk m1 + in foldStream st yieldk single stp m1 ------------------------------------------------------------------------------- -- Map and Filter @@ -821,14 +820,14 @@ deleteBy eq x m = go m mapMaybe :: IsStream t => (a -> Maybe b) -> t m a -> t m b mapMaybe f m = go m where - go m1 = mkStream $ \st stp sng yld -> + go m1 = mkStream $ \st yld sng stp -> let single a = case f a of Just b -> sng b Nothing -> stp yieldk a r = case f a of Just b -> yld b $ go r - Nothing -> foldStream (adaptState st) stp single yieldk r - in foldStream (adaptState st) stp single yieldk m1 + Nothing -> foldStream (adaptState st) yieldk single stp r + in foldStream (adaptState st) yieldk single stp m1 ------------------------------------------------------------------------------ -- Serial Zipping @@ -841,14 +840,14 @@ mapMaybe f m = go m zipWith :: IsStream t => (a -> b -> c) -> t m a -> t m b -> t m c zipWith f = go where - go mx my = mkStream $ \st stp sng yld -> do + go mx my = mkStream $ \st yld sng stp -> do let merge a ra = let single2 b = sng (f a b) yield2 b rb = yld (f a b) (go ra rb) - in foldStream (adaptState st) stp single2 yield2 my + in foldStream (adaptState st) yield2 single2 stp my let single1 a = merge a nil yield1 = merge - foldStream (adaptState st) stp single1 yield1 mx + foldStream (adaptState st) yield1 single1 stp mx -- | Zip two streams serially using a monadic zipping function. -- @@ -856,15 +855,15 @@ zipWith f = go zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c zipWithM f m1 m2 = go m1 m2 where - go mx my = mkStream $ \st stp sng yld -> do + go mx my = mkStream $ \st yld sng stp -> do let merge a ra = - let runIt x = foldStream st stp sng yld x + let runIt x = foldStream st yld sng stp x single2 b = f a b >>= sng yield2 b rb = f a b >>= \x -> runIt (x `cons` go ra rb) - in foldStream (adaptState st) stp single2 yield2 my + in foldStream (adaptState st) yield2 single2 stp my let single1 a = merge a nil yield1 = merge - foldStream (adaptState st) stp single1 yield1 mx + foldStream (adaptState st) yield1 single1 stp mx ------------------------------------------------------------------------------ -- Merging @@ -876,9 +875,9 @@ mergeByM => (a -> a -> m Ordering) -> t m a -> t m a -> t m a mergeByM cmp = go where - go mx my = mkStream $ \st stp sng yld -> do + go mx my = mkStream $ \st yld sng stp -> do let mergeWithY a ra = - let stop2 = foldStream st stp sng yld mx + let stop2 = foldStream st yld sng stp mx single2 b = do r <- cmp a b case r of @@ -889,11 +888,11 @@ mergeByM cmp = go case r of GT -> yld b (go (a `cons` ra) rb) _ -> yld a (go ra (b `cons` rb)) - in foldStream st stop2 single2 yield2 my - let stopX = foldStream st stp sng yld my + in foldStream st yield2 single2 stop2 my + let stopX = foldStream st yld sng stp my singleX a = mergeWithY a nil yieldX = mergeWithY - foldStream st stopX singleX yieldX mx + foldStream st yieldX singleX stopX mx {-# INLINABLE mergeBy #-} mergeBy @@ -918,7 +917,7 @@ the m = do | otherwise = return Nothing yieldk a r | h == a = go h r | otherwise = return Nothing - in foldStream defState (return $ Just h) single yieldk m1 + in foldStream defState yieldk single (return $ Just h) m1 ------------------------------------------------------------------------------- -- Bind utility @@ -934,20 +933,20 @@ bindWith bindWith par m1 f = go m1 where go m = - mkStream $ \st stp sng yld -> - let foldShared = foldStreamShared st stp sng yld + mkStream $ \st yld sng stp -> + let foldShared = foldStreamShared st yld sng stp single a = foldShared $ unShare (f a) yieldk a r = foldShared $ unShare (f a) `par` go r - in foldStream (adaptState st) stp single yieldk m + in foldStream (adaptState st) yieldk single stp m ------------------------------------------------------------------------------ -- Alternative & MonadPlus ------------------------------------------------------------------------------ _alt :: Stream m a -> Stream m a -> Stream m a -_alt m1 m2 = mkStream $ \st stp sng yld -> - let stop = foldStream st stp sng yld m2 - in foldStream st stop sng yld m1 +_alt m1 m2 = mkStream $ \st yld sng stp -> + let stop = foldStream st yld sng stp m2 + in foldStream st yld sng stop m1 ------------------------------------------------------------------------------ -- MonadReader @@ -955,10 +954,10 @@ _alt m1 m2 = mkStream $ \st stp sng yld -> withLocal :: MonadReader r m => (r -> r) -> Stream m a -> Stream m a withLocal f m = - mkStream $ \st stp sng yld -> + mkStream $ \st yld sng stp -> let single = local f . sng yieldk a r = local f $ yld a (withLocal f r) - in foldStream st (local f stp) single yieldk m + in foldStream st yieldk single (local f stp) m ------------------------------------------------------------------------------ -- MonadError diff --git a/src/Streamly/Streams/StreamKType.hs b/src/Streamly/Streams/StreamKType.hs index 8246da163..83af583ee 100644 --- a/src/Streamly/Streams/StreamKType.hs +++ b/src/Streamly/Streams/StreamKType.hs @@ -90,9 +90,9 @@ import Streamly.SVar newtype Stream m a = MkStream (forall r. State Stream m a -- state - -> m r -- stop - -> (a -> m r) -- singleton -> (a -> Stream m a -> m r) -- yield + -> (a -> m r) -- singleton + -> m r -- stop -> m r ) @@ -182,21 +182,18 @@ adapt = fromStream . toStream -- Currently we always use "SVar Stream" and therefore a different State type -- parameterized by that stream. -- --- XXX change the order of arguments to match the standard fold function --- convention i.e. st yield single stop. --- -- | Build a stream from an 'SVar', a stop continuation, a singleton stream -- continuation and a yield continuation. mkStream:: IsStream t => (forall r. State Stream m a - -> m r - -> (a -> m r) -> (a -> t m a -> m r) + -> (a -> m r) + -> m r -> m r) -> t m a -mkStream k = fromStream $ MkStream $ \st stp sng yld -> +mkStream k = fromStream $ MkStream $ \st yld sng stp -> let yieldk a r = yld a (toStream r) - in k st stp sng yieldk + in k st yieldk sng stp -- | A terminal function that has no continuation to follow. type StopK m = forall r. m r -> m r @@ -212,7 +209,7 @@ _wrapM m = \k -> m >>= k -- | Make an empty stream from a stop function. fromStopK :: IsStream t => StopK m -> t m a -fromStopK k = mkStream $ \_ stp _ _ -> k stp +fromStopK k = mkStream $ \_ _ _ stp -> k stp -- | Make a singleton stream from a yield function. fromYieldK :: IsStream t => YieldK m a -> t m a @@ -220,7 +217,7 @@ fromYieldK k = mkStream $ \_ _ sng _ -> k sng -- | Add a yield function at the head of the stream. consK :: IsStream t => YieldK m a -> t m a -> t m a -consK k r = mkStream $ \_ _ _ yld -> k (\x -> yld x r) +consK k r = mkStream $ \_ yld _ _ -> k (\x -> yld x r) -- XXX Build a stream from a repeating callback function. @@ -235,15 +232,15 @@ consK k r = mkStream $ \_ _ _ yld -> k (\x -> yld x r) foldStreamShared :: IsStream t => State Stream m a - -> m r - -> (a -> m r) -> (a -> t m a -> m r) + -> (a -> m r) + -> m r -> t m a -> m r -foldStreamShared st stp sng yld m = +foldStreamShared st yld sng stp m = let yieldk a x = yld a (fromStream x) MkStream k = toStream m - in k st stp sng yieldk + in k st yieldk sng stp -- | Fold a stream by providing a State, stop continuation, a singleton -- continuation and a yield continuation. The stream will not use the SVar @@ -252,15 +249,15 @@ foldStreamShared st stp sng yld m = foldStream :: IsStream t => State Stream m a - -> m r - -> (a -> m r) -> (a -> t m a -> m r) + -> (a -> m r) + -> m r -> t m a -> m r -foldStream st stp sng yld m = +foldStream st yld sng stp m = let yieldk a x = yld a (fromStream x) MkStream k = toStream m - in k (adaptState st) stp sng yieldk + in k (adaptState st) yieldk sng stp -- Run the stream using a run function associated with the SVar that runs the -- streams with a captured snapshot of the monadic state. @@ -269,14 +266,14 @@ foldStreamSVar :: (IsStream t, MonadIO m) => SVar Stream m a -> State Stream m a -- state - -> m r -- stop - -> (a -> m r) -- singleton -> (a -> t m a -> m r) -- yield + -> (a -> m r) -- singleton + -> m r -- stop -> t m a -> m () -foldStreamSVar sv st stp sng yld m = +foldStreamSVar sv st yld sng stp m = let mrun = runInIO $ svarMrun sv - in void $ liftIO $ mrun $ foldStreamShared st stp sng yld m + in void $ liftIO $ mrun $ foldStreamShared st yld sng stp m ------------------------------------------------------------------------------- -- Instances @@ -284,7 +281,7 @@ foldStreamSVar sv st stp sng yld m = {-# INLINE consMSerial #-} consMSerial :: (IsStream t, Monad m) => m a -> t m a -> t m a -consMSerial m r = mkStream $ \_ _ _ yld -> m >>= \a -> yld a r +consMSerial m r = mkStream $ \_ yld _ _ -> m >>= \a -> yld a r ------------------------------------------------------------------------------- -- IsStream Stream @@ -317,11 +314,11 @@ instance IsStream Stream where serial :: IsStream t => t m a -> t m a -> t m a serial m1 m2 = go m1 where - go m = mkStream $ \st stp sng yld -> - let stop = foldStream st stp sng yld m2 + go m = mkStream $ \st yld sng stp -> + let stop = foldStream st yld sng stp m2 single a = yld a m2 yieldk a r = yld a (go r) - in foldStream st stop single yieldk m + in foldStream st yieldk single stop m instance Semigroup (Stream m a) where (<>) = serial @@ -339,7 +336,7 @@ instance Semigroup (Stream m a) where -- -- @since 0.1.0 nil :: IsStream t => t m a -nil = mkStream $ \_ stp _ _ -> stp +nil = mkStream $ \_ _ _ stp -> stp instance Monoid (Stream m a) where mempty = nil @@ -352,10 +349,10 @@ instance Monoid (Stream m a) where {-# INLINE map #-} map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b -map f m = mkStream $ \st stp sng yld -> +map f m = mkStream $ \st yld sng stp -> let single = sng . f yieldk a r = yld (f a) (map f r) - in foldStream (adaptState st) stp single yieldk m + in foldStream (adaptState st) yieldk single stp m instance Monad m => Functor (Stream m) where fmap = map