Use separate consume/produce functions in Scan

This commit is contained in:
Harendra Kumar 2024-02-09 00:19:10 +05:30
parent 9951b00120
commit 4f725b41b1
3 changed files with 249 additions and 123 deletions

View File

@ -139,7 +139,7 @@ module Streamly.Internal.Data.Fold.Combinators
-- ** Scanning Input
, scan
, scanMany
, runScan
-- , runScan
, toScan
, indexed
@ -538,6 +538,7 @@ scan = scanWith False
scanMany :: Monad m => Fold m a b -> Fold m b c -> Fold m a c
scanMany = scanWith True
{-
-- | Does not work correctly for scans which can emit YieldRep or SkipRep
-- constructors. This will get fixed when we add SkipRep to folds as well.
{-# INLINE runScan #-}
@ -576,6 +577,7 @@ runScan (Scan stepL initialL) (Fold stepR initialR extractR finalR) =
extract = extractR . snd
final = finalR . snd
-}
-- Note when we have a separate Scan type then we can remove extract from
-- Folds. Then folds can only be used for foldMany or many and not for
@ -588,8 +590,11 @@ runScan (Scan stepL initialL) (Fold stepR initialR extractR finalR) =
-- With "Continue s" and "Partial s b" instead of using "extract" we can do
-- that.
{-# ANN type ToScanState Fuse #-}
data ToScanState s = ToScanInit | ToScanGo s | ToScanStop
-- Consume-side state of the scan built by 'toScan'. 'ToScanInit' is the
-- starting state, in which the fold's initial action has not been run yet;
-- 'ToScanGo' carries the fold's accumulator. The @x@ parameter does not occur
-- in any constructor.
{-# ANN type ToScanConsume Fuse #-}
data ToScanConsume s x = ToScanInit | ToScanGo s
-- Produce-side state of the scan built by 'toScan'. 'ToScanFirst' holds the
-- fold accumulator together with the first input element, which the produce
-- function then feeds to the fold; 'ToScanStop' makes the produce function
-- return Stop.
{-# ANN type ToScanProduce Fuse #-}
data ToScanProduce s x = ToScanFirst s x | ToScanStop
-- | ScanR does not support finalization yet. This does not finalize the fold
-- when the stream stops before the fold terminates. So cannot be used on folds
@ -600,24 +605,27 @@ data ToScanState s = ToScanInit | ToScanGo s | ToScanStop
--
{-# INLINE toScan #-}
toScan :: Monad m => Fold m a b -> Scan m a b
toScan (Fold fstep finitial fextract _) = Scan step ToScanInit
toScan (Fold fstep finitial fextract _) = Scan consume produce ToScanInit
where
step ToScanInit _ = do
-- XXX make the initial state Either type and start in produce mode
consume ToScanInit x = do
r <- finitial
return $ case r of
Partial s -> Scan.SkipRep (ToScanGo s)
Done b -> Scan.YieldRep ToScanStop b
Partial s -> Scan.SkipP (ToScanFirst s x)
Done b -> Scan.YieldP ToScanStop b
step (ToScanGo st) a = do
consume (ToScanGo st) a = do
r <- fstep st a
case r of
Partial s -> do
b <- fextract s
return $ Scan.Yield (ToScanGo s) b
Done b -> return $ Scan.Yield ToScanStop b
step ToScanStop _ = return Scan.Stop
return $ Scan.YieldC (ToScanGo s) b
Done b -> return $ Scan.YieldP ToScanStop b
produce (ToScanFirst st x) = consume (ToScanGo st) x
produce ToScanStop = return Scan.Stop
------------------------------------------------------------------------------
-- Filters

View File

@ -231,24 +231,33 @@ import Prelude hiding (map, mapM, filter)
-- of "Yield s b". Though "Yield s b" is sometimes better when using curried
-- "Yield s". "Yield b" sounds better because the verb applies to "b".
--
-- XXX We could reduce the number of constructors by using Consume | Produce
-- wrapper around the state. But when fusion does not occur, it may be better
-- to use a flat structure rather than nested to avoid more allocations. In a
-- flat structure the pointer tag from the Step constructor itself can identify
-- any of the 5 constructors.
--
{-# ANN type Step Fuse #-}
data Step s b =
Yield s b
| Skip s
data Step cs ps b =
YieldC cs b -- Partial
| SkipC cs -- Continue
| Stop
| YieldRep s b
| SkipRep s
| YieldP ps b -- Yield
| SkipP ps -- Skip
instance Functor (Step s) where
instance Functor (Step cs ps) where
{-# INLINE fmap #-}
fmap f (Yield s b) = Yield s (f b)
fmap f (YieldRep s b) = YieldRep s (f b)
fmap _ (Skip s) = Skip s
fmap _ (SkipRep s) = SkipRep s
fmap f (YieldC s b) = YieldC s (f b)
fmap f (YieldP s b) = YieldP s (f b)
fmap _ (SkipC s) = SkipC s
fmap _ (SkipP s) = SkipP s
fmap _ Stop = Stop
data Scan m a b =
forall s. Scan (s -> a -> m (Step s b)) s
forall cs ps. Scan
(cs -> a -> m (Step cs ps b))
(ps -> m (Step cs ps b))
cs
------------------------------------------------------------------------------
-- Functor: Mapping on the output
@ -261,21 +270,27 @@ data Scan m a b =
--
instance Functor m => Functor (Scan m a) where
{-# INLINE fmap #-}
fmap f (Scan step1 initial) = Scan step initial
fmap f (Scan consume produce cinitial) =
Scan consume1 produce1 cinitial
where
step s b = fmap (fmap f) (step1 s b)
consume1 s b = fmap (fmap f) (consume s b)
produce1 s = fmap (fmap f) (produce s)
-------------------------------------------------------------------------------
-- Category
-------------------------------------------------------------------------------
{-# ANN type ComposeState Fuse #-}
data ComposeState sL sR x bL =
ComposeInit sL sR
| ComposeGoRight sL sR bL
| ComposeGoRightRepLeft sL sR bL
-- State of 'compose' when both scans are in consume mode. In 'compose' the
-- "L" scan is the input end and the "R" scan is the output end. The @psL@
-- parameter does not occur in the constructor.
{-# ANN type ComposeConsume Fuse #-}
data ComposeConsume csL psL csR =
ComposeConsume csL csR
-- State of 'compose' when at least one of the two scans is in produce mode:
--
-- * 'ComposeProduceR': L is in consume mode, R is producing.
-- * 'ComposeProduceL': L is producing, R is in consume mode.
-- * 'ComposeProduceLR': both are producing; 'compose' steps R's produce
--   function in this state and moves to 'ComposeProduceL' once R returns to
--   consume mode.
{-# ANN type ComposeProduce Fuse #-}
data ComposeProduce csL psL csR psR =
ComposeProduceR csL psR
| ComposeProduceL psL csR
| ComposeProduceLR psL psR
-- | Connect two scans in series. The second scan is the input end, and the
-- first scan is the output end.
@ -287,43 +302,79 @@ data ComposeState sL sR x bL =
{-# INLINE compose #-}
compose :: Monad m => Scan m b c -> Scan m a b -> Scan m a c
compose
(Scan stepR initialR)
(Scan stepL initialL) = Scan step (ComposeInit initialL initialR)
(Scan consumeR produceR initialR)
(Scan consumeL produceL initialL) =
Scan consume produce (ComposeConsume initialL initialR)
where
{-# INLINE goRight #-}
goRight sL1 sR bL = do
rR <- stepR sR bL
{-# INLINE consumeLFeedR #-}
consumeLFeedR csL csR bL = do
rR <- consumeR csR bL
return
$ case rR of
Yield sR1 br -> Yield (ComposeInit sL1 sR1) br
Skip sR1 -> Skip (ComposeInit sL1 sR1)
YieldC csR1 br -> YieldC (ComposeConsume csL csR1) br
SkipC csR1 -> SkipC (ComposeConsume csL csR1)
Stop -> Stop
YieldRep sR1 br -> YieldRep (ComposeGoRight sL1 sR1 bL) br
SkipRep sR1 -> SkipRep (ComposeGoRight sL1 sR1 bL)
YieldP psR br -> YieldP (ComposeProduceR csL psR) br
SkipP psR -> SkipP (ComposeProduceR csL psR)
{-# INLINE goRightRepLeft #-}
goRightRepLeft sL1 sR bL = do
rR <- stepR sR bL
{-# INLINE produceLFeedR #-}
produceLFeedR psL csR bL = do
rR <- consumeR csR bL
return
$ case rR of
Yield sR1 br -> YieldRep (ComposeInit sL1 sR1) br
Skip sR1 -> Skip (ComposeInit sL1 sR1)
YieldC csR1 br -> YieldP (ComposeProduceL psL csR1) br
SkipC csR1 -> SkipP (ComposeProduceL psL csR1)
Stop -> Stop
YieldRep sR1 br -> YieldRep (ComposeGoRightRepLeft sL1 sR1 bL) br
SkipRep sR1 -> SkipRep (ComposeGoRightRepLeft sL1 sR1 bL)
YieldP psR br -> YieldP (ComposeProduceLR psL psR) br
SkipP psR -> SkipP (ComposeProduceLR psL psR)
step (ComposeInit sL sR) x = do
rL <- stepL sL x
consume (ComposeConsume csL csR) x = do
rL <- consumeL csL x
case rL of
Yield sL1 bL -> goRight sL1 sR bL
Skip sL1 -> return $ Skip (ComposeInit sL1 sR)
YieldC csL1 bL ->
-- XXX Use SkipC instead? Flat may be better for fusion.
consumeLFeedR csL1 csR bL
SkipC csL1 -> return $ SkipC (ComposeConsume csL1 csR)
Stop -> return Stop
YieldRep sL1 bL -> goRightRepLeft sL1 sR bL
SkipRep sL1 -> return $ SkipRep (ComposeInit sL1 sR)
step (ComposeGoRight sL sR bL) _ = goRight sL sR bL
step (ComposeGoRightRepLeft sL sR bL) _ = goRightRepLeft sL sR bL
YieldP psL bL ->
-- XXX Use SkipC instead?
produceLFeedR psL csR bL
SkipP psL -> return $ SkipP (ComposeProduceL psL csR)
produce (ComposeProduceL psL csR) = do
rL <- produceL psL
case rL of
YieldC csL bL ->
-- XXX Use SkipC instead?
consumeLFeedR csL csR bL
SkipC csL -> return $ SkipC (ComposeConsume csL csR)
Stop -> return Stop
YieldP psL1 bL ->
-- XXX Use SkipC instead?
produceLFeedR psL1 csR bL
SkipP psL1 -> return $ SkipP (ComposeProduceL psL1 csR)
produce (ComposeProduceR csL psR) = do
rR <- produceR psR
return
$ case rR of
YieldC csR1 br -> YieldC (ComposeConsume csL csR1) br
SkipC csR1 -> SkipC (ComposeConsume csL csR1)
Stop -> Stop
YieldP psR1 br -> YieldP (ComposeProduceR csL psR1) br
SkipP psR1 -> SkipP (ComposeProduceR csL psR1)
produce (ComposeProduceLR psL psR) = do
rR <- produceR psR
return
$ case rR of
YieldC csR1 br -> YieldP (ComposeProduceL psL csR1) br
SkipC csR1 -> SkipP (ComposeProduceL psL csR1)
Stop -> Stop
YieldP psR1 br -> YieldP (ComposeProduceLR psL psR1) br
SkipP psR1 -> SkipP (ComposeProduceLR psL psR1)
-- | A scan representing mapping of a monadic action.
--
@ -337,7 +388,7 @@ compose
--
{-# INLINE mapM #-}
mapM :: Monad m => (a -> m b) -> Scan m a b
mapM f = Scan (\() a -> f a <&> Yield ()) ()
-- The produce function is 'undefined': the consume function only ever returns
-- YieldC, so this scan never hands out a produce state. The drivers visible
-- in this change ('runScan', 'compose', 'teeMerge') call produce only with a
-- state obtained from YieldP/SkipP, so it is not expected to be evaluated.
mapM f = Scan (\() a -> f a <&> YieldC ()) undefined ()
-- | A scan representing mapping of a pure function.
--
@ -371,7 +422,7 @@ instance Monad m => Category (Scan m) where
-- | A filtering scan.
{-# INLINE filterM #-}
filterM :: Monad m => (a -> m Bool) -> Scan m a a
filterM f = Scan (\() a -> f a >>= g a) ()
-- The produce function is 'undefined': 'g' below returns only YieldC or SkipC,
-- so this scan never hands out a produce state for it to be called with.
filterM f = Scan (\() a -> f a >>= g a) undefined ()
where
@ -379,8 +430,8 @@ filterM f = Scan (\() a -> f a >>= g a) ()
-- Emit the input when the predicate result is True, otherwise skip it; the
-- scan stays in consume mode either way.
g a b =
return
$ if b
then Yield () a
else Skip ()
then YieldC () a
else SkipC ()
-- | A filtering scan.
--
@ -391,12 +442,28 @@ filterM f = Scan (\() a -> f a >>= g a) ()
filter :: Monad m => (a -> Bool) -> Scan m a a
filter f = filterM (return Prelude.. f)
{-
{-# ANN type TeeMergeState Fuse #-}
data TeeMergeState sL sR
= TeeMergeLeft !sL !sR
| TeeMergeRight !sL !sR
| TeeMergeLeftOnly !sL
| TeeMergeRightOnly !sR
-}
-- Consume-side state of 'teeMerge':
--
-- * 'TeeMergeConsume': both scans are running and waiting for input.
-- * 'TeeMergeConsumeOnlyL': the right scan has stopped, only the left runs.
-- * 'TeeMergeConsumeOnlyR': the left scan has stopped, only the right runs.
{-# ANN type TeeMergeConsume Fuse #-}
data TeeMergeConsume csL csR
= TeeMergeConsume !csL !csR
| TeeMergeConsumeOnlyL !csL
| TeeMergeConsumeOnlyR !csR
-- Produce-side state of 'teeMerge'. The @x@ field is the current input
-- element, kept until it has been fed to the right scan; it is the only
-- field without a strictness annotation.
--
-- * 'TeeMergeProduce': the left scan has consumed @x@ and is back in consume
--   mode; @x@ is fed to the right scan next.
-- * 'TeeMergeProduceL': the left scan is producing; @x@ is still pending for
--   the right scan.
-- * 'TeeMergeProduceR': the right scan is producing.
-- * 'TeeMergeProduceOnlyL' / 'TeeMergeProduceOnlyR': the other scan has
--   stopped and the remaining one is producing.
{-# ANN type TeeMergeProduce Fuse #-}
data TeeMergeProduce csL csR psL psR x
= TeeMergeProduce !csL !csR x
| TeeMergeProduceL !psL !csR x
| TeeMergeProduceR !csL !psR
| TeeMergeProduceOnlyL !psL
| TeeMergeProduceOnlyR !psR
-- | Connect two scans in parallel. Distribute the input across two scans and
-- merge their outputs as soon as they become available. Note that a scan may
@ -407,46 +474,88 @@ data TeeMergeState sL sR
--
{-# INLINE teeMerge #-}
teeMerge :: Monad m => Scan m a b -> Scan m a b -> Scan m a b
teeMerge (Scan stepL initialL) (Scan stepR initialR) =
Scan step (TeeMergeLeft initialL initialR)
teeMerge (Scan consumeL produceL initialL) (Scan consumeR produceR initialR) =
Scan consume produce (TeeMergeConsume initialL initialR)
where
step (TeeMergeLeft sL sR) a = do
resL <- stepL sL a
return
$ case resL of
Yield s b -> YieldRep (TeeMergeRight s sR) b
Skip s -> SkipRep (TeeMergeRight s sR)
Stop -> SkipRep (TeeMergeRightOnly sR)
YieldRep s b -> YieldRep (TeeMergeLeft s sR) b
SkipRep s -> SkipRep (TeeMergeLeft s sR)
step (TeeMergeRight sL sR) a = do
res <- stepR sR a
return
$ case res of
Yield s b -> Yield (TeeMergeLeft sL s) b
Skip s -> Skip (TeeMergeLeft sL s)
Stop -> Skip (TeeMergeLeftOnly sL)
YieldRep s b -> YieldRep (TeeMergeRight sL s) b
SkipRep s -> SkipRep (TeeMergeRight sL s)
step (TeeMergeLeftOnly sL) a = do
resL <- stepL sL a
return
$ case resL of
Yield s b -> Yield (TeeMergeLeftOnly s) b
Skip s -> Skip (TeeMergeLeftOnly s)
Stop -> Stop
YieldRep s b -> YieldRep (TeeMergeLeftOnly s) b
SkipRep s -> SkipRep (TeeMergeLeftOnly s)
step (TeeMergeRightOnly sR) a = do
resR <- stepR sR a
{-# INLINE feedRightOnly #-}
feedRightOnly csR a = do
resR <- consumeR csR a
return
$ case resR of
Yield s b -> Yield (TeeMergeRightOnly s) b
Skip s -> Skip (TeeMergeRightOnly s)
YieldC cs b -> YieldC (TeeMergeConsumeOnlyR cs) b
SkipC cs -> SkipC (TeeMergeConsumeOnlyR cs)
Stop -> Stop
YieldRep s b -> YieldRep (TeeMergeRightOnly s) b
SkipRep s -> SkipRep (TeeMergeRightOnly s)
YieldP ps b -> YieldP (TeeMergeProduceOnlyR ps) b
SkipP ps -> SkipP (TeeMergeProduceOnlyR ps)
consume (TeeMergeConsume csL csR) a = do
resL <- consumeL csL a
case resL of
YieldC cs b -> return $ YieldP (TeeMergeProduce cs csR a) b
SkipC cs -> return $ SkipP (TeeMergeProduce cs csR a)
Stop ->
-- XXX Skip to a state instead?
feedRightOnly csR a
YieldP ps b -> return $ YieldP (TeeMergeProduceL ps csR a) b
SkipP ps -> return $ SkipP (TeeMergeProduceL ps csR a)
consume (TeeMergeConsumeOnlyL csL) a = do
resL <- consumeL csL a
return
$ case resL of
YieldC cs b -> YieldC (TeeMergeConsumeOnlyL cs) b
SkipC cs -> SkipC (TeeMergeConsumeOnlyL cs)
Stop -> Stop
YieldP ps b -> YieldP (TeeMergeProduceOnlyL ps) b
SkipP ps -> SkipP (TeeMergeProduceOnlyL ps)
consume (TeeMergeConsumeOnlyR csR) a = feedRightOnly csR a
produce (TeeMergeProduce csL csR a) = do
res <- consumeR csR a
return
$ case res of
YieldC cs b -> YieldC (TeeMergeConsume csL cs) b
SkipC cs -> SkipC (TeeMergeConsume csL cs)
Stop -> SkipC (TeeMergeConsumeOnlyL csL)
YieldP ps b -> YieldP (TeeMergeProduceR csL ps) b
SkipP ps -> SkipP (TeeMergeProduceR csL ps)
produce (TeeMergeProduceL psL csR a) = do
res <- produceL psL
case res of
YieldC cs b -> return $ YieldP (TeeMergeProduce cs csR a) b
SkipC cs -> return $ SkipP (TeeMergeProduce cs csR a)
Stop -> feedRightOnly csR a
YieldP ps b -> return $ YieldP (TeeMergeProduceL ps csR a) b
SkipP ps -> return $ SkipP (TeeMergeProduceL ps csR a)
produce (TeeMergeProduceR csL psR) = do
res <- produceR psR
return $ case res of
YieldC cs b -> YieldC (TeeMergeConsume csL cs) b
SkipC cs -> SkipC (TeeMergeConsume csL cs)
Stop -> SkipC (TeeMergeConsumeOnlyL csL)
YieldP ps b -> YieldP (TeeMergeProduceR csL ps) b
SkipP ps -> SkipP (TeeMergeProduceR csL ps)
produce (TeeMergeProduceOnlyL psL) = do
resL <- produceL psL
return
$ case resL of
YieldC cs b -> YieldC (TeeMergeConsumeOnlyL cs) b
SkipC cs -> SkipC (TeeMergeConsumeOnlyL cs)
Stop -> Stop
YieldP ps b -> YieldP (TeeMergeProduceOnlyL ps) b
SkipP ps -> SkipP (TeeMergeProduceOnlyL ps)
produce (TeeMergeProduceOnlyR psR) = do
resL <- produceR psR
return
$ case resL of
YieldC cs b -> YieldC (TeeMergeConsumeOnlyR cs) b
SkipC cs -> SkipC (TeeMergeConsumeOnlyR cs)
Stop -> Stop
YieldP ps b -> YieldP (TeeMergeProduceOnlyR ps) b
SkipP ps -> SkipP (TeeMergeProduceOnlyR ps)

View File

@ -218,6 +218,44 @@ transform (Pipe pstep1 pstep2 pstate) (Stream step state) =
Pipe.Yield b pst' -> return $ Yield b (pst', st)
Pipe.Continue pst' -> return $ Skip (pst', st)
-- State of the stream built by 'runScan': the source stream state paired with
-- either the scan's consume state ('ScanConsume') or its produce state
-- ('ScanProduce').
{-# ANN type RunScanState Fuse #-}
data RunScanState st sc ps = ScanConsume st sc | ScanProduce st ps
-- Run a 'Scan' over a stream, producing the stream of the scan's outputs.
--
-- While the scan is in consume mode, one element is pulled from the source
-- per scan step. While it is in produce mode, the source is not pulled and
-- only the scan's produce function is stepped. The output stream stops when
-- either the source stream or the scan stops.
{-# INLINE runScan #-}
runScan :: Monad m => Scan m a b -> Stream m a -> Stream m b
runScan (Scan consume produce initial) (Stream stream_step state) =
Stream step (ScanConsume state initial)
where
-- Feed one input element to the scan and translate the scan's step result
-- into a stream step; the C constructors keep the scan in consume mode, the
-- P constructors switch it to produce mode.
{-# INLINE goScan #-}
goScan st sc x = do
res <- consume sc x
return
$ case res of
Scan.YieldC s b -> Yield b (ScanConsume st s)
Scan.SkipC s -> Skip (ScanConsume st s)
Scan.Stop -> Stop
Scan.YieldP ps b -> Yield b (ScanProduce st ps)
Scan.SkipP ps -> Skip (ScanProduce st ps)
{-# INLINE_LATE step #-}
-- Consume mode: pull from the source and hand any yielded element to the
-- scan.
step gst (ScanConsume st sc) = do
r <- stream_step (adaptState gst) st
case r of
Yield x s -> goScan s sc x
Skip s -> return $ Skip (ScanConsume s sc)
Stop -> return Stop
-- Produce mode: step the scan's produce function; the source stream state
-- is carried along unchanged.
step _ (ScanProduce st ps) = do
r <- produce ps
return
$ case r of
Scan.YieldC s b -> Yield b (ScanConsume st s)
Scan.SkipC s -> Skip (ScanConsume st s)
Scan.Stop -> Stop
Scan.YieldP ps1 b -> Yield b (ScanProduce st ps1)
Scan.SkipP ps1 -> Skip (ScanProduce st ps1)
------------------------------------------------------------------------------
-- Transformation Folds
------------------------------------------------------------------------------
@ -576,35 +614,6 @@ scanMany :: Monad m
=> FL.Fold m a b -> Stream m a -> Stream m b
scanMany = scanWith True
{-# ANN type RunScanState Fuse #-}
data RunScanState st sc x = RunScan st sc | RunScanRep st sc x
{-# INLINE runScan #-}
runScan :: Monad m => Scan m a b -> Stream m a -> Stream m b
runScan (Scan scan_step initial) (Stream stream_step state) =
Stream step (RunScan state initial)
where
{-# INLINE goScan #-}
goScan st sc x = do
res <- scan_step sc x
return
$ case res of
Scan.Yield s b -> Yield b (RunScan st s)
Scan.Skip s -> Skip (RunScan st s)
Scan.Stop -> Stop
Scan.YieldRep s b -> Yield b (RunScanRep st s x)
Scan.SkipRep s -> Skip (RunScanRep st s x)
step gst (RunScan st sc) = do
r <- stream_step (adaptState gst) st
case r of
Yield x s -> goScan s sc x
Skip s -> return $ Skip (RunScan s sc)
Stop -> return Stop
step _ (RunScanRep st sc x) = goScan st sc x
------------------------------------------------------------------------------
-- Scanning - Prescans
------------------------------------------------------------------------------