Add a Scan module as simplified pipe

This commit is contained in:
Harendra Kumar 2024-02-10 09:11:31 +05:30
parent e73aa914bb
commit fe2dba682c
7 changed files with 485 additions and 8 deletions

View File

@ -68,6 +68,9 @@ module Stream.Common
, transformComposeMapM , transformComposeMapM
, transformTeeMapM , transformTeeMapM
-- , transformZipMapM -- , transformZipMapM
, scanMapM
, scanComposeMapM
, scanTeeMapM
) )
where where
@ -81,6 +84,7 @@ import System.Random (randomRIO)
import qualified Streamly.Internal.Data.Fold as Fold import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Pipe as Pipe import qualified Streamly.Internal.Data.Pipe as Pipe
import qualified Streamly.Internal.Data.Scan as Scan
#ifdef USE_PRELUDE #ifdef USE_PRELUDE
import Streamly.Prelude (foldl', scanl') import Streamly.Prelude (foldl', scanl')
@ -479,6 +483,14 @@ transformMapM ::
-> m () -> m ()
transformMapM n = composeN n $ Stream.pipe (Pipe.mapM return) transformMapM n = composeN n $ Stream.pipe (Pipe.mapM return)
{-# INLINE scanMapM #-}
scanMapM ::
(Monad m)
=> Int
-> Stream m Int
-> m ()
scanMapM n = composeN n $ Stream.runScan (Scan.mapM return)
{-# INLINE transformComposeMapM #-} {-# INLINE transformComposeMapM #-}
transformComposeMapM :: transformComposeMapM ::
(Monad m) (Monad m)
@ -491,6 +503,18 @@ transformComposeMapM n =
(Pipe.mapM (\x -> return (x + 1)) `Pipe.compose` (Pipe.mapM (\x -> return (x + 1)) `Pipe.compose`
Pipe.mapM (\x -> return (x + 2))) Pipe.mapM (\x -> return (x + 2)))
{-# INLINE scanComposeMapM #-}
scanComposeMapM ::
(Monad m)
=> Int
-> Stream m Int
-> m ()
scanComposeMapM n =
composeN n $
Stream.runScan
(Scan.mapM (\x -> return (x + 1)) `Scan.compose`
Scan.mapM (\x -> return (x + 2)))
{-# INLINE transformTeeMapM #-} {-# INLINE transformTeeMapM #-}
transformTeeMapM :: transformTeeMapM ::
(Monad m) (Monad m)
@ -503,6 +527,18 @@ transformTeeMapM n =
(Pipe.mapM (\x -> return (x + 1)) `Pipe.teeMerge` (Pipe.mapM (\x -> return (x + 1)) `Pipe.teeMerge`
Pipe.mapM (\x -> return (x + 2))) Pipe.mapM (\x -> return (x + 2)))
{-# INLINE scanTeeMapM #-}
scanTeeMapM ::
(Monad m)
=> Int
-> Stream m Int
-> m ()
scanTeeMapM n =
composeN n $
Stream.runScan
(Scan.teeWith (+) (Scan.mapM (\x -> return (x + 1)))
(Scan.mapM (\x -> return (x + 2))))
{- {-
{-# INLINE transformZipMapM #-} {-# INLINE transformZipMapM #-}
transformZipMapM :: transformZipMapM ::

View File

@ -565,6 +565,28 @@ o_1_space_pipesX4 value =
] ]
] ]
-------------------------------------------------------------------------------
-- Scans
-------------------------------------------------------------------------------
o_1_space_scans :: Int -> [Benchmark]
o_1_space_scans value =
[ bgroup "scans"
[ benchIOSink value "mapM" (scanMapM 1)
, benchIOSink value "compose" (scanComposeMapM 1)
, benchIOSink value "tee" (scanTeeMapM 1)
]
]
o_1_space_scansX4 :: Int -> [Benchmark]
o_1_space_scansX4 value =
[ bgroup "scansX4"
[ benchIOSink value "mapM" (scanMapM 4)
, benchIOSink value "compose" (scanComposeMapM 4)
, benchIOSink value "tee" (scanTeeMapM 4)
]
]
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
-- Main -- Main
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
@ -582,6 +604,10 @@ benchmarks moduleName size =
-- pipes -- pipes
, o_1_space_pipes size , o_1_space_pipes size
, o_1_space_pipesX4 size , o_1_space_pipesX4 size
-- scans
, o_1_space_scans size
, o_1_space_scansX4 size
] ]
, bgroup (o_n_stack_prefix moduleName) (o_n_stack_iterated size) , bgroup (o_n_stack_prefix moduleName) (o_n_stack_iterated size)
, bgroup (o_n_heap_prefix moduleName) (o_n_heap_buffering size) , bgroup (o_n_heap_prefix moduleName) (o_n_heap_buffering size)

View File

@ -6,18 +6,27 @@
-- Stability : experimental -- Stability : experimental
-- Portability : GHC -- Portability : GHC
-- --
-- There are three fundamental types in streamly. They are streams -- There are three fundamental types that make up a stream pipeline:
-- ("Streamly.Data.Stream"), pipes ("Streamly.Internal.Data.Pipe") and folds ("Streamly.Data.Fold"). --
-- * Stream: sources
-- * Scan: transformations
-- * Fold: sinks
--
-- Streams are sources or producers of values, multiple sources can be merged -- Streams are sources or producers of values, multiple sources can be merged
-- into a single source but a source cannot be split into multiple stream -- into a single source but a source cannot be split into multiple stream
-- sources. Folds are sinks or consumers, a stream can be split and -- sources. Folds are sinks or consumers, a stream can be split and
-- distributed to multiple folds but the results cannot be merged back into a -- distributed to multiple folds but the results cannot be merged back into a
-- stream source again. Pipes are transformations, a stream source can be split -- stream source again. Scans are simple one-to-one transformations with
-- and distributed to multiple pipes each pipe can apply its own transform on -- filtering. One element cannot be transformed to multiple elements.
-- the stream and the results can be merged back into a single pipe. Pipes can --
-- be attached to a source to produce a source or they can be attached to a -- The Pipe type is a super type of all the above, it is the most complex type.
-- fold to produce a fold, or multiple pipes can be merged or zipped into a -- All of these can be represented by a pipe. A pipe can act as a source or a
-- single pipe. -- sink or a transformation, dynamically. A stream source can be split and
-- distributed to multiple pipes each pipe can apply its own transform on the
-- stream and the results can be merged back into a single pipe. Pipes can be
-- attached to a source to produce a source or they can be attached to a fold
-- to produce a fold, or multiple pipes can be merged or zipped into a single
-- pipe.
-- --
-- > import qualified Streamly.Internal.Data.Pipe as Pipe -- > import qualified Streamly.Internal.Data.Pipe as Pipe

View File

@ -13,6 +13,8 @@ module Streamly.Internal.Data.Pipe.Type
, Pipe (..) , Pipe (..)
-- * From folds -- * From folds
, fromStream
, fromScan
, fromFold , fromFold
-- * Primitive Pipes -- * Primitive Pipes
@ -35,10 +37,15 @@ import Control.Category (Category(..))
import Data.Functor ((<&>)) import Data.Functor ((<&>))
import Fusion.Plugin.Types (Fuse(..)) import Fusion.Plugin.Types (Fuse(..))
import Streamly.Internal.Data.Fold.Type (Fold(..)) import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Scan (Scan(..))
import Streamly.Internal.Data.Stream.Type (Stream(..))
-- import Streamly.Internal.Data.Tuple.Strict (Tuple'(..), Tuple3'(..)) -- import Streamly.Internal.Data.Tuple.Strict (Tuple'(..), Tuple3'(..))
import Streamly.Internal.Data.SVar.Type (defState)
import qualified Prelude import qualified Prelude
import qualified Streamly.Internal.Data.Fold.Type as Fold import qualified Streamly.Internal.Data.Fold.Type as Fold
import qualified Streamly.Internal.Data.Scan as Scan
import qualified Streamly.Internal.Data.Stream.Type as Stream
import Prelude hiding (filter, zipWith, map, mapM, id, unzip, null) import Prelude hiding (filter, zipWith, map, mapM, id, unzip, null)
@ -641,3 +648,34 @@ fromFold (Fold fstep finitial fextract _) = Pipe consume produce ToScanInit
produce (ToScanFirst st x) = consume (ToScanGo st) x produce (ToScanFirst st x) = consume (ToScanGo st) x
produce ToScanStop = return Stop produce ToScanStop = return Stop
-- | Produces the stream on consuming ().
--
{-# INLINE fromStream #-}
fromStream :: Monad m => Stream m a -> Pipe m () a
fromStream (Stream step state) = Pipe consume produce ()
where
-- XXX make the initial state Either type and start in produce mode
consume () () = return $ SkipP state
produce st = do
r <- step defState st
return $ case r of
Stream.Yield b s -> YieldP s b
Stream.Skip s -> SkipP s
Stream.Stop -> Stop
{-# INLINE fromScan #-}
fromScan :: Monad m => Scan m a b -> Pipe m a b
fromScan (Scan step initial) = Pipe consume undefined initial
where
consume st a = do
r <- step st a
return $ case r of
Scan.Yield s b -> YieldC s b
Scan.Skip s -> SkipC s
Scan.Stop -> Stop

View File

@ -0,0 +1,338 @@
-- |
-- Module : Streamly.Internal.Data.Scan
-- Copyright : (c) 2019 Composewell Technologies
-- License : BSD3
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
module Streamly.Internal.Data.Scan
(
-- * Type
Step (..)
, Scan (..)
-- * Primitive Scans
, identity
, map -- function?
, mapM -- functionM?
, filter
, filterM
-- * Combinators
, compose
, teeWithMay
, teeWith -- zipWith -- teeZip
)
where
#include "inline.hs"
import Control.Arrow (Arrow(..))
import Control.Category (Category(..))
import Data.Functor ((<&>))
import Data.Maybe (isJust, fromJust)
import Fusion.Plugin.Types (Fuse(..))
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..))
import qualified Prelude
import Prelude hiding (filter, zipWith, map, mapM, id, unzip, null)
-- $setup
-- >>> :m
-- >>> :set -XFlexibleContexts
-- >>> import Control.Category
--
-- >>> import qualified Streamly.Internal.Data.Fold as Fold
-- >>> import qualified Streamly.Internal.Data.Scan as Scan
-- >>> import qualified Streamly.Internal.Data.Stream as Stream
------------------------------------------------------------------------------
-- Scans
------------------------------------------------------------------------------
-- A Scan is half the pipe:
--
-- A scan is a simpler version of pipes. A scan always consumes and input and
-- may or may not produce an output. It can produce at most one output on one
-- input. Whereas a pipe may produce output even without consuming anything or
-- it can produce multiple outputs on a single input. Scans are simpler
-- abstractions to think about and easier for the compiler to optimize.
-- What kind of compositions are possible with scans?
--
-- Append: this is the easiest. The behavior is simple even in presence of
-- filtering (Skip) and termination (Stop). Skip translates to Skip, Stop
-- translates to Stop.
--
-- demux: we select one of n scans to run. Behaviour with Skip is straight
-- forward. Termination behavior has multiple options, stop when first one
-- stops, stop when the last one stops, or stop when a selected one stops.
--
-- zip: run all and zip the outputs. If one of them Skips we Skip the output.
-- If one of them stops we stop. It may be possible to collect the outputs as
-- Just/Nothing values.
--
-- Another option could be if a Scan terminates do we want to start it again or
-- not.
--
{-# ANN type Step Fuse #-}
data Step s b =
Yield s b -- ^ Yield and consume
| Skip s -- ^ Skip and consume
| Stop
instance Functor (Step s) where
{-# INLINE_NORMAL fmap #-}
fmap f (Yield s b) = Yield s (f b)
fmap _ (Skip s) = Skip s
fmap _ Stop = Stop
-- | Represents a stateful transformation over an input stream of values of
-- type @a@ to outputs of type @b@ in 'Monad' @m@.
--
-- The constructor is @Scan consume initial@.
data Scan m a b =
forall s. Scan
(s -> a -> m (Step s b))
s
------------------------------------------------------------------------------
-- Functor: Mapping on the output
------------------------------------------------------------------------------
-- | 'fmap' maps a pure function on a scan output.
--
-- >>> Stream.toList $ Stream.runScan (fmap (+1) Scan.identity) $ Stream.fromList [1..5::Int]
-- [2,3,4,5,6]
--
instance Functor m => Functor (Scan m a) where
{-# INLINE_NORMAL fmap #-}
fmap f (Scan consume initial) = Scan consume1 initial
where
{-# INLINE_LATE consume1 #-}
consume1 s b = fmap (fmap f) (consume s b)
-------------------------------------------------------------------------------
-- Category
-------------------------------------------------------------------------------
-- | Connect two scans in series. The second scan is the input end, and the
-- first scan is the output end.
--
-- >>> import Control.Category
-- >>> Stream.toList $ Stream.runScan (Scan.map (+1) >>> Scan.map (+1)) $ Stream.fromList [1..5::Int]
-- [3,4,5,6,7]
--
{-# INLINE_NORMAL compose #-}
compose :: Monad m => Scan m b c -> Scan m a b -> Scan m a c
compose
(Scan stepR initialR)
(Scan stepL initialL) = Scan step (initialL, initialR)
where
-- XXX Use strict tuple?
step (sL, sR) x = do
rL <- stepL sL x
case rL of
Yield sL1 bL -> do
rR <- stepR sR bL
return
$ case rR of
Yield sR1 br -> Yield (sL1, sR1) br
Skip sR1 -> Skip (sL1, sR1)
Stop -> Stop
Skip sL1 -> return $ Skip (sL1, sR)
Stop -> return Stop
-- | A scan representing mapping of a monadic action.
--
-- >>> Stream.toList $ Stream.runScan (Scan.mapM print) $ Stream.fromList [1..5::Int]
-- 1
-- 2
-- 3
-- 4
-- 5
-- [(),(),(),(),()]
--
{-# INLINE mapM #-}
mapM :: Monad m => (a -> m b) -> Scan m a b
mapM f = Scan (\() a -> f a <&> Yield ()) ()
-- | A scan representing mapping of a pure function.
--
-- >>> Stream.toList $ Stream.runScan (Scan.map (+1)) $ Stream.fromList [1..5::Int]
-- [2,3,4,5,6]
--
{-# INLINE map #-}
map :: Monad m => (a -> b) -> Scan m a b
map f = mapM (return Prelude.. f)
{- HLINT ignore "Redundant map" -}
-- | An identity scan producing the same output as input.
--
-- >>> identity = Scan.map Prelude.id
--
-- >>> Stream.toList $ Stream.runScan (Scan.identity) $ Stream.fromList [1..5::Int]
-- [1,2,3,4,5]
--
{-# INLINE identity #-}
identity :: Monad m => Scan m a a
identity = map Prelude.id
instance Monad m => Category (Scan m) where
{-# INLINE id #-}
id = identity
{-# INLINE (.) #-}
(.) = compose
{-# ANN type TeeWith Fuse #-}
data TeeWith sL sR = TeeWith !sL !sR
-- XXX zipWith?
-- | Connect two scans in parallel. Distribute the input across two scans and
-- merge their outputs as soon as they become available. Note that a scan may
-- not generate output on each input, it might filter it.
--
-- >>> Stream.toList $ Stream.runScan (Scan.teeWithMay (,) Scan.identity (Scan.map (\x -> x * x))) $ Stream.fromList [1..5::Int]
-- [(Just 1,Just 1),(Just 2,Just 4),(Just 3,Just 9),(Just 4,Just 16),(Just 5,Just 25)]
--
{-# INLINE_NORMAL teeWithMay #-}
teeWithMay :: Monad m =>
(Maybe b -> Maybe c -> d) -> Scan m a b -> Scan m a c -> Scan m a d
teeWithMay f (Scan stepL initialL) (Scan stepR initialR) =
Scan step (TeeWith initialL initialR)
where
-- XXX Use strict tuple?
step (TeeWith sL sR) a = do
resL <- stepL sL a
resR <- stepR sR a
return
$ case resL of
Yield sL1 bL ->
case resR of
Yield sR1 bR ->
Yield (TeeWith sL1 sR1) (f (Just bL) (Just bR))
Skip sR1 ->
Yield (TeeWith sL1 sR1) (f (Just bL) Nothing)
Stop -> Stop
Skip sL1 ->
case resR of
Yield sR1 bR ->
Yield (TeeWith sL1 sR1) (f Nothing (Just bR))
Skip sR1 ->
Yield (TeeWith sL1 sR1) (f Nothing Nothing)
Stop -> Stop
Stop -> Stop
-- | Produces an output only when both the scans produce an output.
--
-- >>> Stream.toList $ Stream.runScan (Scan.teeWith (,) Scan.identity (Scan.map (\x -> x * x))) $ Stream.fromList [1..5::Int]
-- [Just (1,1),Just (2,4),Just (3,9),Just (4,16),Just (5,25)]
--
{-# INLINE_NORMAL teeWith #-}
teeWith :: Monad m =>
(b -> c -> d) -> Scan m a b -> Scan m a c -> Scan m a d
teeWith f s1 s2 =
fmap fromJust
$ compose (filter isJust)
$ teeWithMay (\b c -> f <$> b <*> c) s1 s2
-------------------------------------------------------------------------------
-- Arrow
-------------------------------------------------------------------------------
{-# INLINE_NORMAL unzipMay #-}
unzipMay :: Monad m =>
Scan m a x -> Scan m b y -> Scan m (a, b) (Maybe x, Maybe y)
unzipMay (Scan stepL initialL) (Scan stepR initialR) =
Scan step (Tuple' initialL initialR)
where
step (Tuple' sL sR) (a, b) = do
resL <- stepL sL a
resR <- stepR sR b
return
$ case resL of
Yield sL1 bL ->
case resR of
Yield sR1 bR ->
Yield (Tuple' sL1 sR1) (Just bL, Just bR)
Skip sR1 ->
Yield (Tuple' sL1 sR1) (Just bL, Nothing)
Stop -> Stop
Skip sL1 ->
case resR of
Yield sR1 bR ->
Yield (Tuple' sL1 sR1) (Nothing, Just bR)
Skip sR1 ->
Yield (Tuple' sL1 sR1) (Nothing, Nothing)
Stop -> Stop
Stop -> Stop
-- | Produces an output only when both the scans produce an output.
{-# INLINE_NORMAL unzip #-}
unzip :: Monad m => Scan m a x -> Scan m b y -> Scan m (a, b) (x, y)
unzip s1 s2 = fmap (fromJust Prelude.. f) $ unzipMay s1 s2
where
f (mx, my) =
case mx of
Just x ->
case my of
Just y -> Just (x, y)
Nothing -> Nothing
Nothing -> Nothing
instance Monad m => Applicative (Scan m a) where
{-# INLINE pure #-}
pure b = Scan (\_ _ -> pure $ Yield () b) ()
(<*>) = teeWith id
instance Monad m => Arrow (Scan m) where
{-# INLINE arr #-}
arr = map
{-# INLINE (***) #-}
(***) = unzip
{-# INLINE (&&&) #-}
(&&&) = teeWith (,)
-------------------------------------------------------------------------------
-- Primitive scans
-------------------------------------------------------------------------------
-- | A filtering scan using a monadic predicate.
{-# INLINE filterM #-}
filterM :: Monad m => (a -> m Bool) -> Scan m a a
filterM f = Scan (\() a -> f a >>= g a) ()
where
{-# INLINE g #-}
g a b =
return
$ if b
then Yield () a
else Skip ()
-- | A filtering scan using a pure predicate.
--
-- >>> Stream.toList $ Stream.runScan (Scan.filter odd) $ Stream.fromList [1..5::Int]
-- [1,3,5]
--
{-# INLINE filter #-}
filter :: Monad m => (a -> Bool) -> Scan m a a
filter f = filterM (return Prelude.. f)

View File

@ -34,6 +34,7 @@ module Streamly.Internal.Data.Stream.Transform
, scan , scan
, scanMany , scanMany
, pipe , pipe
, runScan
-- * Splitting -- * Splitting
, splitOn , splitOn
@ -157,6 +158,7 @@ import Fusion.Plugin.Types (Fuse(..))
import Streamly.Internal.Data.Fold.Type (Fold(..)) import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Pipe.Type (Pipe(..)) import Streamly.Internal.Data.Pipe.Type (Pipe(..))
import Streamly.Internal.Data.Scan (Scan(..))
import Streamly.Internal.Data.SVar.Type (adaptState) import Streamly.Internal.Data.SVar.Type (adaptState)
import Streamly.Internal.Data.Time.Units (AbsTime, RelTime64) import Streamly.Internal.Data.Time.Units (AbsTime, RelTime64)
import Streamly.Internal.Data.Unbox (Unbox) import Streamly.Internal.Data.Unbox (Unbox)
@ -166,6 +168,7 @@ import Streamly.Internal.System.IO (defaultChunkSize)
import qualified Streamly.Internal.Data.Array.Type as A import qualified Streamly.Internal.Data.Array.Type as A
import qualified Streamly.Internal.Data.Fold as FL import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Pipe.Type as Pipe import qualified Streamly.Internal.Data.Pipe.Type as Pipe
import qualified Streamly.Internal.Data.Scan as Scan
import qualified Streamly.Internal.Data.StreamK.Type as K import qualified Streamly.Internal.Data.StreamK.Type as K
import Prelude hiding import Prelude hiding
@ -222,6 +225,32 @@ pipe (Pipe consume produce initial) (Stream stream_step state) =
Pipe.YieldP ps1 b -> Yield b (PipeProduce st ps1) Pipe.YieldP ps1 b -> Yield b (PipeProduce st ps1)
Pipe.SkipP ps1 -> Skip (PipeProduce st ps1) Pipe.SkipP ps1 -> Skip (PipeProduce st ps1)
{-# ANN type RunScanState Fuse #-}
data RunScanState st sc ps = ScanConsume st sc
-- | Use a 'Scan' to transform a stream.
--
{-# INLINE_NORMAL runScan #-}
runScan :: Monad m => Scan m a b -> Stream m a -> Stream m b
runScan (Scan consume initial) (Stream stream_step state) =
Stream step (ScanConsume state initial)
where
{-# INLINE_LATE step #-}
step gst (ScanConsume st cs) = do
r <- stream_step (adaptState gst) st
case r of
Yield x s -> do
res <- consume cs x
return
$ case res of
Scan.Yield cs1 b -> Yield b (ScanConsume s cs1)
Scan.Skip cs1 -> Skip (ScanConsume s cs1)
Scan.Stop -> Stop
Skip s -> return $ Skip (ScanConsume s cs)
Stop -> return Stop
------------------------------------------------------------------------------ ------------------------------------------------------------------------------
-- Transformation Folds -- Transformation Folds
------------------------------------------------------------------------------ ------------------------------------------------------------------------------

View File

@ -360,6 +360,7 @@ library
, Streamly.Internal.Data.Parser , Streamly.Internal.Data.Parser
, Streamly.Internal.Data.ParserK , Streamly.Internal.Data.ParserK
, Streamly.Internal.Data.Pipe , Streamly.Internal.Data.Pipe
, Streamly.Internal.Data.Scan
-- streamly-containers (non-base) -- streamly-containers (non-base)
, Streamly.Internal.Data.Fold , Streamly.Internal.Data.Fold