Add fromScan for folds and fromFold for pipes

And some small refactoring.
This commit is contained in:
Harendra Kumar 2024-02-11 00:32:12 +05:30
parent db1cee542a
commit 87acd611b0
6 changed files with 192 additions and 86 deletions

View File

@ -233,18 +233,18 @@ import Streamly.Internal.Data.MutArray.Type (MutArray(..))
import Streamly.Internal.Data.Maybe.Strict (Maybe'(..), toMaybe)
import Streamly.Internal.Data.Pipe.Type (Pipe (..))
import Streamly.Internal.Data.Scan (Scan (..))
import Streamly.Internal.Data.Stream.Type (Stream)
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..), Tuple3'(..))
import Streamly.Internal.Data.Unbox (Unbox, sizeOf)
import Streamly.Internal.Data.Unfold.Type (Unfold(..))
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..), Tuple3'(..))
import Streamly.Internal.Data.Stream.Type (Stream)
import qualified Prelude
import qualified Streamly.Internal.Data.MutArray.Type as MA
import qualified Streamly.Internal.Data.Array.Type as Array
import qualified Streamly.Internal.Data.Fold.Window as Fold
import qualified Streamly.Internal.Data.Pipe.Type as Pipe
import qualified Streamly.Internal.Data.Scan as Scan
import qualified Streamly.Internal.Data.Ring as Ring
import qualified Streamly.Internal.Data.Scan as Scan
import qualified Streamly.Internal.Data.Stream.Type as StreamD
import Prelude hiding
@ -474,6 +474,7 @@ pipe (Pipe consume produce pinitial) (Fold fstep finitial fextract ffinal) =
go acc (Pipe.SkipP ps1) = do
r <- produce ps1
go acc r
-- XXX a Stop in consumer means we dropped the input.
go acc Pipe.Stop = Done <$> fextract acc
extract (Tuple' _ fs) = fextract fs

View File

@ -28,6 +28,18 @@ import Fusion.Plugin.Types (Fuse(..))
-- terminate early whereas we use data constructors. It allows stream fusion in
-- contrast to the foldr/build fusion when composing with functions.
-- XXX Change the semantics of Done such that when we return Done, the input is
-- always unused. Then we can include the takeWhile fold as well under folds.
-- This will be a breaking change, so rename "Done" to "Stop" so that users are
-- forced to look at all places where it is used.
--
-- Perhaps we do not need to return the Step type in initial. Instead of
-- returning "Done" in initial we can wait for the next input or invocation of
-- "final". This should simplify the composition of initial considerably.
--
-- Also, rename Partial to Skip, to keep it consistent with Scans/Pipes/Streams.
-- Make Partial a pattern synonym to keep backward compatibility.
-- | Represents the result of the @step@ of a 'Fold'. 'Partial' returns an
-- intermediate state of the fold, the fold step can be called again with the
-- state or the driver can use @extract@ on the state to get the result out.

View File

@ -363,6 +363,7 @@ module Streamly.Internal.Data.Fold.Type
, fromPure
, fromEffect
, fromRefold
, fromScan
, drain
, toList
, toStreamK
@ -455,9 +456,11 @@ import Data.Either (fromLeft, fromRight, isLeft, isRight)
import Data.Functor.Identity (Identity(..))
import Fusion.Plugin.Types (Fuse(..))
import Streamly.Internal.Data.Maybe.Strict (Maybe'(..), toMaybe)
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..))
import Streamly.Internal.Data.Refold.Type (Refold(..))
import Streamly.Internal.Data.Scan (Scan(..))
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..))
import qualified Streamly.Internal.Data.Scan as Scan
import qualified Streamly.Internal.Data.StreamK.Type as K
import Prelude hiding (Foldable(..), concatMap, filter, map, take)
@ -616,6 +619,36 @@ foldlM1' step = fmap toMaybe $ foldlM' step1 (return Nothing')
step1 Nothing' a = return $ Just' a
step1 (Just' x) a = Just' <$> step x a
data FromScan s b = FromScanInit !s | FromScanGo !s !b
-- | This does not work correctly yet. We lose the last input.
--
{-# INLINE fromScan #-}
fromScan :: Monad m => Scan m a b -> Fold m a (Maybe b)
fromScan (Scan consume initial) =
Fold fstep (return $ Partial (FromScanInit initial)) fextract fextract
where
fstep (FromScanInit ss) a = do
r <- consume ss a
return $ case r of
Scan.Yield s b -> Partial (FromScanGo s b)
Scan.Skip s -> Partial (FromScanInit s)
-- XXX We have lost the input here.
-- XXX Need to change folds to always return Done on the next input
Scan.Stop -> Done Nothing
fstep (FromScanGo ss acc) a = do
r <- consume ss a
return $ case r of
Scan.Yield s b -> Partial (FromScanGo s b)
Scan.Skip s -> Partial (FromScanGo s acc)
-- XXX We have lost the input here.
Scan.Stop -> Done (Just acc)
fextract (FromScanInit _) = return Nothing
fextract (FromScanGo _ acc) = return (Just acc)
------------------------------------------------------------------------------
-- Right fold constructors
------------------------------------------------------------------------------

View File

@ -16,6 +16,7 @@ module Streamly.Internal.Data.Pipe.Type
, fromStream
, fromScan
, fromFold
, scanFold
-- * Primitive Pipes
, identity
@ -62,24 +63,13 @@ import Prelude hiding (filter, zipWith, map, mapM, id, unzip, null)
-- Pipes
------------------------------------------------------------------------------
-- A scan is a much simpler version of pipes. A scan always produces an output
-- on an input whereas a pipe does not necessarily produce an output on an
-- input, it might consume multiple inputs before producing an output. That way
-- it can implement filtering. Similarly, it can produce more than one output
-- on an single input.
--
-- Therefore when two pipes are composed in parallel formation, one may run
-- slower or faster than the other. If all of them are being fed from the same
-- source, we may have to buffer the input to match the speeds, if we want to
-- zip the outputs. In case of scans we do not have that problem.
-- XXX If we do not want to change Streams, we should use "Yield b s" instead
-- of "Yield s b". Though "Yield s b" is sometimes better when using curried
-- "Yield s". "Yield b" sounds better because the verb applies to "b".
--
-- XXX We could reduce the number of constructors by using Consume | Produce
-- Note: We could reduce the number of constructors by using Consume | Produce
-- wrapper around the state. But when fusion does not occur, it may be better
-- yo use a flat structure rather than nested to avoid more allocations. In a
-- to use a flat structure rather than nested to avoid more allocations. In a
-- flat structure the pointer tag from the Step constructor itself can identiy
-- any of the 5 constructors.
--
@ -87,7 +77,7 @@ import Prelude hiding (filter, zipWith, map, mapM, id, unzip, null)
data Step cs ps b =
YieldC cs b -- ^ Yield and consume
| SkipC cs -- ^ Skip and consume
| Stop
| Stop -- ^ when consuming, Stop means input remains unused
| YieldP ps b -- ^ Yield and produce
| SkipP ps -- ^ Skip and produce
@ -99,16 +89,12 @@ instance Functor (Step cs ps) where
fmap _ (SkipP s) = SkipP s
fmap _ Stop = Stop
-- A pipe uses a consume function and a produce function. It can switch from
-- consume/fold mode to a produce/source mode. The first step function is a
-- fold function while the second one is a stream generator function.
-- A pipe uses a consume function and a produce function. It can dynamically
-- switch from consume/fold mode to a produce/source mode.
--
-- We can upgrade a stream or a fold into a pipe. However, streams are more
-- efficient in generation and folds are more efficient in consumption.
--
-- For pure transformation we can have a 'Scan' type. A Scan would be more
-- efficient in zipping whereas pipes are useful for merging and zipping where
-- we know buffering can occur. A Scan type can be upgraded to a pipe.
-- We can upgrade a stream, fold or scan into a pipe. However, the simpler
-- types should be preferred because they can be more efficient and fuse
-- better.
--
-- XXX In general the starting state could either be for generation or for
-- consumption. Currently we are only starting with a consumption state.
@ -277,6 +263,7 @@ map f = mapM (return Prelude.. f)
identity :: Monad m => Pipe m a a
identity = map Prelude.id
-- | "." composes the pipes in series.
instance Monad m => Category (Pipe m) where
{-# INLINE id #-}
id = identity
@ -397,6 +384,7 @@ teeMerge (Pipe consumeL produceL initialL) (Pipe consumeR produceR initialR) =
YieldP ps b -> YieldP (TeeMergeProduceOnlyR ps) b
SkipP ps -> SkipP (TeeMergeProduceOnlyR ps)
-- | '<>' composes the pipes in parallel.
instance Monad m => Semigroup (Pipe m a b) where
{-# INLINE (<>) #-}
(<>) = teeMerge
@ -431,6 +419,8 @@ uncons (Deque snocList consList) =
h : t -> Just (h, Deque [] t)
_ -> Nothing
-- XXX This is old code retained for reference until rewritten.
-- | The composed pipe distributes the input to both the constituent pipes and
-- zips the output of the two using a supplied zipping function.
--
@ -612,42 +602,77 @@ filter f = filterM (return Prelude.. f)
-- With "Continue s" and "Partial s b" instead of using "extract" we can do
-- that.
{-# ANN type ToScanConsume Fuse #-}
data ToScanConsume s x = ToScanInit | ToScanGo s
{-# ANN type FromFoldConsume Fuse #-}
data FromFoldConsume s x = FoldConsumeInit | FoldConsumeGo s
{-# ANN type ToScanProduce Fuse #-}
data ToScanProduce s x = ToScanFirst s x | ToScanStop
{-# ANN type FromFoldProduce Fuse #-}
data FromFoldProduce s x = FoldProduceInit s x | FoldProduceStop
-- XXX This should be removed once we remove "extract" from folds.
-- | Pipes do not support finalization yet. This does not finalize the fold
-- when the stream stops before the fold terminates. So cannot be used on folds
-- that require finalization.
--
-- >>> Stream.toList $ Stream.pipe (Pipe.fromFold Fold.sum) $ Stream.fromList [1..5::Int]
-- >>> Stream.toList $ Stream.pipe (Pipe.scanFold Fold.sum) $ Stream.fromList [1..5::Int]
-- [1,3,6,10,15]
--
{-# INLINE fromFold #-}
fromFold :: Monad m => Fold m a b -> Pipe m a b
fromFold (Fold fstep finitial fextract _) = Pipe consume produce ToScanInit
{-# INLINE scanFold #-}
scanFold :: Monad m => Fold m a b -> Pipe m a b
scanFold (Fold fstep finitial fextract _) =
Pipe consume produce FoldConsumeInit
where
-- XXX make the initial state Either type and start in produce mode
consume ToScanInit x = do
consume FoldConsumeInit x = do
r <- finitial
return $ case r of
Fold.Partial s -> SkipP (ToScanFirst s x)
Fold.Done b -> YieldP ToScanStop b
Fold.Partial s -> SkipP (FoldProduceInit s x)
Fold.Done b -> YieldP FoldProduceStop b
consume (ToScanGo st) a = do
consume (FoldConsumeGo st) a = do
r <- fstep st a
case r of
Fold.Partial s -> do
b <- fextract s
return $ YieldC (ToScanGo s) b
Fold.Done b -> return $ YieldP ToScanStop b
return $ YieldC (FoldConsumeGo s) b
Fold.Done b -> return $ YieldP FoldProduceStop b
produce (ToScanFirst st x) = consume (ToScanGo st) x
produce ToScanStop = return Stop
produce (FoldProduceInit st x) = consume (FoldConsumeGo st) x
produce FoldProduceStop = return Stop
-- | Create a singleton pipe from a fold.
--
-- Pipes do not support finalization yet. This does not finalize the fold
-- when the stream stops before the fold terminates. So cannot be used on folds
-- that require such finalization.
--
-- >>> Stream.toList $ Stream.pipe (Pipe.fromFold Fold.sum) $ Stream.fromList [1..5::Int]
-- [15]
--
{-# INLINE fromFold #-}
fromFold :: Monad m => Fold m a b -> Pipe m a b
fromFold (Fold fstep finitial _ _) =
Pipe consume produce FoldConsumeInit
where
-- XXX make the initial state Either type and start in produce mode
consume FoldConsumeInit x = do
r <- finitial
return $ case r of
Fold.Partial s -> SkipP (FoldProduceInit s x)
Fold.Done b -> YieldP FoldProduceStop b
consume (FoldConsumeGo st) a = do
r <- fstep st a
return $ case r of
Fold.Partial s -> SkipC (FoldConsumeGo s)
Fold.Done b -> YieldP FoldProduceStop b
produce (FoldProduceInit st x) = consume (FoldConsumeGo st) x
produce FoldProduceStop = return Stop
-- | Produces the stream on consuming ().
--

View File

@ -5,6 +5,48 @@
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
--
-- A Scan is half the pipe:
--
-- A scan is a simpler version of pipes. A scan always consumes an input and
-- may or may not produce an output. It can produce at most one output on one
-- input. Whereas a pipe may produce output even without consuming anything or
-- it can produce multiple outputs on a single input. Scans are simpler
-- abstractions to think about and easier for the compiler to optimize.
--
-- Scans vs folds:
--
-- Folds and scans both are consumers with very little difference. Folds return
-- a singleton value whereas scans produce a stream. A fold can be implemented
-- by extracting the last value from the scan stream.
--
-- Since folds do not care about intermediate values, we do not need the Yield
-- constructor for folds, we can do with Skip and Stop. Because folds do not
-- have a requirement for intermediate values, they can be used for
-- implementing combinators like splitWith where intermediate values are not
-- meaningful and are expensive to compute. Folds provide an applicative and
-- monad behavior to consume the stream in parts and compose the folded
-- results. Scans provide Category like composition and stream zip applicative
-- behavior. The finalization function of a fold would return a single value
-- whereas for scan it may be a stream draining the scan buffer. For these
-- reasons, scans and folds are required as independent abstractions.
--
-- What kind of compositions are possible with scans?
--
-- Append: this is the easiest. The behavior is simple even in presence of
-- filtering (Skip) and termination (Stop). Skip translates to Skip, Stop
-- translates to Stop.
--
-- demux: we select one of n scans to run. Behaviour with Skip is straight
-- forward. Termination behavior has multiple options, stop when first one
-- stops, stop when the last one stops, or stop when a selected one stops.
--
-- zip: run all and zip the outputs. If one of them Skips we Skip the output.
-- If one of them stops we stop. It may be possible to collect the outputs as
-- Just/Nothing values.
--
-- Another option could be if a Scan terminates do we want to start it again or
-- not.
module Streamly.Internal.Data.Scan
(
@ -51,43 +93,24 @@ import Prelude hiding (filter, zipWith, map, mapM, id, unzip, null)
-- Scans
------------------------------------------------------------------------------
-- A Scan is half the pipe:
--
-- A scan is a simpler version of pipes. A scan always consumes and input and
-- may or may not produce an output. It can produce at most one output on one
-- input. Whereas a pipe may produce output even without consuming anything or
-- it can produce multiple outputs on a single input. Scans are simpler
-- abstractions to think about and easier for the compiler to optimize.
-- What kind of compositions are possible with scans?
--
-- Append: this is the easiest. The behavior is simple even in presence of
-- filtering (Skip) and termination (Stop). Skip translates to Skip, Stop
-- translates to Stop.
--
-- demux: we select one of n scans to run. Behaviour with Skip is straight
-- forward. Termination behavior has multiple options, stop when first one
-- stops, stop when the last one stops, or stop when a selected one stops.
--
-- zip: run all and zip the outputs. If one of them Skips we Skip the output.
-- If one of them stops we stop. It may be possible to collect the outputs as
-- Just/Nothing values.
--
-- Another option could be if a Scan terminates do we want to start it again or
-- not.
--
-- | The result of a scan step.
{-# ANN type Step Fuse #-}
data Step s b =
Yield s b -- ^ Yield and consume
| Skip s -- ^ Skip and consume
| Stop
Yield s b -- ^ Yield output and keep consuming
| Skip s -- ^ No output, keep consuming
| Stop -- ^ Stop consuming, last input is unsed
-- | 'fmap' maps a function on the step output.
instance Functor (Step s) where
{-# INLINE_NORMAL fmap #-}
fmap f (Yield s b) = Yield s (f b)
fmap _ (Skip s) = Skip s
fmap _ Stop = Stop
-- XXX A scan produces an output only on an input. The scan may have buffered
-- data which may have to be drained if the driver has no more input to supply.
-- So we need a finalizer which produces a (possibly empty) stream.
-- | Represents a stateful transformation over an input stream of values of
-- type @a@ to outputs of type @b@ in 'Monad' @m@.
--
@ -119,8 +142,8 @@ instance Functor m => Functor (Scan m a) where
-- Category
-------------------------------------------------------------------------------
-- | Connect two scans in series. The second scan is the input end, and the
-- first scan is the output end.
-- | Connect two scans in series. Attach the first scan on the output of the
-- second scan.
--
-- >>> import Control.Category
-- >>> Stream.toList $ Stream.runScan (Scan.map (+1) >>> Scan.map (+1)) $ Stream.fromList [1..5::Int]
@ -191,14 +214,18 @@ instance Monad m => Category (Scan m) where
{-# INLINE (.) #-}
(.) = compose
-------------------------------------------------------------------------------
-- Applicative Zip
-------------------------------------------------------------------------------
{-# ANN type TeeWith Fuse #-}
data TeeWith sL sR = TeeWith !sL !sR
-- XXX zipWith?
-- | Connect two scans in parallel. Distribute the input across two scans and
-- merge their outputs as soon as they become available. Note that a scan may
-- not generate output on each input, it might filter it.
-- zip their outputs. If the scan filters the output, 'Nothing' is emitted
-- otherwise 'Just' is emitted. The scan stops if any of the scans stop.
--
-- >>> Stream.toList $ Stream.runScan (Scan.teeWithMay (,) Scan.identity (Scan.map (\x -> x * x))) $ Stream.fromList [1..5::Int]
-- [(Just 1,Just 1),(Just 2,Just 4),(Just 3,Just 9),(Just 4,Just 16),(Just 5,Just 25)]
@ -211,7 +238,6 @@ teeWithMay f (Scan stepL initialL) (Scan stepR initialR) =
where
-- XXX Use strict tuple?
step (TeeWith sL sR) a = do
resL <- stepL sL a
resR <- stepR sR a
@ -233,10 +259,12 @@ teeWithMay f (Scan stepL initialL) (Scan stepR initialR) =
Stop -> Stop
Stop -> Stop
-- | Produces an output only when both the scans produce an output.
-- | Produces an output only when both the scans produce an output. If any of
-- the scans skips the output then the composed scan also skips. Stops when any
-- of the scans stop.
--
-- >>> Stream.toList $ Stream.runScan (Scan.teeWith (,) Scan.identity (Scan.map (\x -> x * x))) $ Stream.fromList [1..5::Int]
-- [Just (1,1),Just (2,4),Just (3,9),Just (4,16),Just (5,25)]
-- [(1,1),(2,4),(3,9),(4,16),(5,25)]
--
{-# INLINE_NORMAL teeWith #-}
teeWith :: Monad m =>
@ -246,10 +274,22 @@ teeWith f s1 s2 =
$ compose (filter isJust)
$ teeWithMay (\b c -> f <$> b <*> c) s1 s2
-- | Zips the outputs only when both scans produce outputs, discards otherwise.
instance Monad m => Applicative (Scan m a) where
{-# INLINE pure #-}
pure b = Scan (\_ _ -> pure $ Yield () b) ()
(<*>) = teeWith id
-------------------------------------------------------------------------------
-- Arrow
-------------------------------------------------------------------------------
-- | Use the first scan for the first element of the tuple and second scan for
-- the second. Zip the outputs. Emits 'Nothing' if no output is generated by
-- the scan, otherwise emits 'Just'. Stops as soon as any one of the scans
-- stop.
--
{-# INLINE_NORMAL unzipMay #-}
unzipMay :: Monad m =>
Scan m a x -> Scan m b y -> Scan m (a, b) (Maybe x, Maybe y)
@ -279,7 +319,8 @@ unzipMay (Scan stepL initialL) (Scan stepR initialR) =
Stop -> Stop
Stop -> Stop
-- | Produces an output only when both the scans produce an output.
-- | Like 'unzipMay' but produces an output only when both the scans produce an
-- output. Other outputs are filtered out.
{-# INLINE_NORMAL unzip #-}
unzip :: Monad m => Scan m a x -> Scan m b y -> Scan m (a, b) (x, y)
unzip s1 s2 = fmap (fromJust Prelude.. f) $ unzipMay s1 s2
@ -294,12 +335,6 @@ unzip s1 s2 = fmap (fromJust Prelude.. f) $ unzipMay s1 s2
Nothing -> Nothing
Nothing -> Nothing
instance Monad m => Applicative (Scan m a) where
{-# INLINE pure #-}
pure b = Scan (\_ _ -> pure $ Yield () b) ()
(<*>) = teeWith id
instance Monad m => Arrow (Scan m) where
{-# INLINE arr #-}
arr = map

View File

@ -33,8 +33,8 @@ module Streamly.Internal.Data.Stream.Transform
, postscan
, scan
, scanMany
, pipe
, runScan
, pipe
-- * Splitting
, splitOn