diff --git a/benchmark/Streamly/Benchmark/Data/Stream/Common.hs b/benchmark/Streamly/Benchmark/Data/Stream/Common.hs index f078ef3cd..1f23981f5 100644 --- a/benchmark/Streamly/Benchmark/Data/Stream/Common.hs +++ b/benchmark/Streamly/Benchmark/Data/Stream/Common.hs @@ -68,6 +68,9 @@ module Stream.Common , transformComposeMapM , transformTeeMapM -- , transformZipMapM + , scanMapM + , scanComposeMapM + , scanTeeMapM ) where @@ -81,6 +84,7 @@ import System.Random (randomRIO) import qualified Streamly.Internal.Data.Fold as Fold import qualified Streamly.Internal.Data.Pipe as Pipe +import qualified Streamly.Internal.Data.Scan as Scan #ifdef USE_PRELUDE import Streamly.Prelude (foldl', scanl') @@ -479,6 +483,14 @@ transformMapM :: -> m () transformMapM n = composeN n $ Stream.pipe (Pipe.mapM return) +{-# INLINE scanMapM #-} +scanMapM :: + (Monad m) + => Int + -> Stream m Int + -> m () +scanMapM n = composeN n $ Stream.runScan (Scan.mapM return) + {-# INLINE transformComposeMapM #-} transformComposeMapM :: (Monad m) @@ -491,6 +503,18 @@ transformComposeMapM n = (Pipe.mapM (\x -> return (x + 1)) `Pipe.compose` Pipe.mapM (\x -> return (x + 2))) +{-# INLINE scanComposeMapM #-} +scanComposeMapM :: + (Monad m) + => Int + -> Stream m Int + -> m () +scanComposeMapM n = + composeN n $ + Stream.runScan + (Scan.mapM (\x -> return (x + 1)) `Scan.compose` + Scan.mapM (\x -> return (x + 2))) + {-# INLINE transformTeeMapM #-} transformTeeMapM :: (Monad m) @@ -503,6 +527,18 @@ transformTeeMapM n = (Pipe.mapM (\x -> return (x + 1)) `Pipe.teeMerge` Pipe.mapM (\x -> return (x + 2))) +{-# INLINE scanTeeMapM #-} +scanTeeMapM :: + (Monad m) + => Int + -> Stream m Int + -> m () +scanTeeMapM n = + composeN n $ + Stream.runScan + (Scan.teeWith (+) (Scan.mapM (\x -> return (x + 1))) + (Scan.mapM (\x -> return (x + 2)))) + {- {-# INLINE transformZipMapM #-} transformZipMapM :: diff --git a/benchmark/Streamly/Benchmark/Data/Stream/Reduce.hs b/benchmark/Streamly/Benchmark/Data/Stream/Reduce.hs index 1027d6c18..646636cba 100644 --- a/benchmark/Streamly/Benchmark/Data/Stream/Reduce.hs +++ b/benchmark/Streamly/Benchmark/Data/Stream/Reduce.hs @@ -565,6 +565,28 @@ o_1_space_pipesX4 value = ] ] +------------------------------------------------------------------------------- +-- Scans +------------------------------------------------------------------------------- + +o_1_space_scans :: Int -> [Benchmark] +o_1_space_scans value = + [ bgroup "scans" + [ benchIOSink value "mapM" (scanMapM 1) + , benchIOSink value "compose" (scanComposeMapM 1) + , benchIOSink value "tee" (scanTeeMapM 1) + ] + ] + +o_1_space_scansX4 :: Int -> [Benchmark] +o_1_space_scansX4 value = + [ bgroup "scansX4" + [ benchIOSink value "mapM" (scanMapM 4) + , benchIOSink value "compose" (scanComposeMapM 4) + , benchIOSink value "tee" (scanTeeMapM 4) + ] + ] + ------------------------------------------------------------------------------- -- Main ------------------------------------------------------------------------------- @@ -582,6 +604,10 @@ benchmarks moduleName size = -- pipes , o_1_space_pipes size , o_1_space_pipesX4 size + + -- scans + , o_1_space_scans size + , o_1_space_scansX4 size ] , bgroup (o_n_stack_prefix moduleName) (o_n_stack_iterated size) , bgroup (o_n_heap_prefix moduleName) (o_n_heap_buffering size) diff --git a/core/src/Streamly/Internal/Data/Pipe.hs b/core/src/Streamly/Internal/Data/Pipe.hs index 94609ed8e..3651d0ce0 100644 --- a/core/src/Streamly/Internal/Data/Pipe.hs +++ b/core/src/Streamly/Internal/Data/Pipe.hs @@ -6,18 +6,27 @@ -- Stability : experimental -- Portability : GHC -- --- There are three fundamental types in streamly. They are streams --- ("Streamly.Data.Stream"), pipes ("Streamly.Internal.Data.Pipe") and folds ("Streamly.Data.Fold"). +-- There are three fundamental types that make up a stream pipeline: +-- +-- * Stream: sources +-- * Scan: transformations +-- * Fold: sinks +-- -- Streams are sources or producers of values, multiple sources can be merged -- into a single source but a source cannot be split into multiple stream -- sources. Folds are sinks or consumers, a stream can be split and -- distributed to multiple folds but the results cannot be merged back into a --- stream source again. Pipes are transformations, a stream source can be split --- and distributed to multiple pipes each pipe can apply its own transform on --- the stream and the results can be merged back into a single pipe. Pipes can --- be attached to a source to produce a source or they can be attached to a --- fold to produce a fold, or multiple pipes can be merged or zipped into a --- single pipe. +-- stream source again. Scans are simple one-to-one transformations with +-- filtering. One element cannot be transformed to multiple elements. +-- +-- The Pipe type is a super type of all the above, it is the most complex type. +-- All of these can be represented by a pipe. A pipe can act as a source or a +-- sink or a transformation, dynamically. A stream source can be split and +-- distributed to multiple pipes each pipe can apply its own transform on the +-- stream and the results can be merged back into a single pipe. Pipes can be +-- attached to a source to produce a source or they can be attached to a fold +-- to produce a fold, or multiple pipes can be merged or zipped into a single +-- pipe. -- -- > import qualified Streamly.Internal.Data.Pipe as Pipe diff --git a/core/src/Streamly/Internal/Data/Pipe/Type.hs b/core/src/Streamly/Internal/Data/Pipe/Type.hs index 2c0f5166f..e44850449 100644 --- a/core/src/Streamly/Internal/Data/Pipe/Type.hs +++ b/core/src/Streamly/Internal/Data/Pipe/Type.hs @@ -13,6 +13,8 @@ module Streamly.Internal.Data.Pipe.Type , Pipe (..) -- * From folds + , fromStream + , fromScan , fromFold -- * Primitive Pipes @@ -35,10 +37,15 @@ import Control.Category (Category(..)) import Data.Functor ((<&>)) import Fusion.Plugin.Types (Fuse(..)) import Streamly.Internal.Data.Fold.Type (Fold(..)) +import Streamly.Internal.Data.Scan (Scan(..)) +import Streamly.Internal.Data.Stream.Type (Stream(..)) -- import Streamly.Internal.Data.Tuple.Strict (Tuple'(..), Tuple3'(..)) +import Streamly.Internal.Data.SVar.Type (defState) import qualified Prelude import qualified Streamly.Internal.Data.Fold.Type as Fold +import qualified Streamly.Internal.Data.Scan as Scan +import qualified Streamly.Internal.Data.Stream.Type as Stream import Prelude hiding (filter, zipWith, map, mapM, id, unzip, null) @@ -641,3 +648,34 @@ fromFold (Fold fstep finitial fextract _) = Pipe consume produce ToScanInit produce (ToScanFirst st x) = consume (ToScanGo st) x produce ToScanStop = return Stop + +-- | Produces the stream on consuming (). +-- +{-# INLINE fromStream #-} +fromStream :: Monad m => Stream m a -> Pipe m () a +fromStream (Stream step state) = Pipe consume produce () + + where + + -- XXX make the initial state Either type and start in produce mode + consume () () = return $ SkipP state + + produce st = do + r <- step defState st + return $ case r of + Stream.Yield b s -> YieldP s b + Stream.Skip s -> SkipP s + Stream.Stop -> Stop + +{-# INLINE fromScan #-} +fromScan :: Monad m => Scan m a b -> Pipe m a b +fromScan (Scan step initial) = Pipe consume undefined initial + + where + + consume st a = do + r <- step st a + return $ case r of + Scan.Yield s b -> YieldC s b + Scan.Skip s -> SkipC s + Scan.Stop -> Stop diff --git a/core/src/Streamly/Internal/Data/Scan.hs b/core/src/Streamly/Internal/Data/Scan.hs new file mode 100644 index 000000000..a6c81326f --- /dev/null +++ b/core/src/Streamly/Internal/Data/Scan.hs @@ -0,0 +1,338 @@ +-- | +-- Module : Streamly.Internal.Data.Scan +-- Copyright : (c) 2019 Composewell Technologies +-- License : BSD3 +-- Maintainer : streamly@composewell.com +-- Stability : experimental +-- Portability : GHC + +module Streamly.Internal.Data.Scan + ( + -- * Type + Step (..) + , Scan (..) + + -- * Primitive Scans + , identity + , map -- function? + , mapM -- functionM? + , filter + , filterM + + -- * Combinators + , compose + , teeWithMay + , teeWith -- zipWith -- teeZip + ) +where + +#include "inline.hs" +import Control.Arrow (Arrow(..)) +import Control.Category (Category(..)) +import Data.Functor ((<&>)) +import Data.Maybe (isJust, fromJust) +import Fusion.Plugin.Types (Fuse(..)) +import Streamly.Internal.Data.Tuple.Strict (Tuple'(..)) + +import qualified Prelude + +import Prelude hiding (filter, zipWith, map, mapM, id, unzip, null) + +-- $setup +-- >>> :m +-- >>> :set -XFlexibleContexts +-- >>> import Control.Category +-- +-- >>> import qualified Streamly.Internal.Data.Fold as Fold +-- >>> import qualified Streamly.Internal.Data.Scan as Scan +-- >>> import qualified Streamly.Internal.Data.Stream as Stream + +------------------------------------------------------------------------------ +-- Scans +------------------------------------------------------------------------------ + +-- A Scan is half the pipe: +-- +-- A scan is a simpler version of pipes. A scan always consumes and input and +-- may or may not produce an output. It can produce at most one output on one +-- input. Whereas a pipe may produce output even without consuming anything or +-- it can produce multiple outputs on a single input. Scans are simpler +-- abstractions to think about and easier for the compiler to optimize. + +-- What kind of compositions are possible with scans? +-- +-- Append: this is the easiest. The behavior is simple even in presence of +-- filtering (Skip) and termination (Stop). Skip translates to Skip, Stop +-- translates to Stop. +-- +-- demux: we select one of n scans to run. Behaviour with Skip is straight +-- forward. Termination behavior has multiple options, stop when first one +-- stops, stop when the last one stops, or stop when a selected one stops. +-- +-- zip: run all and zip the outputs. If one of them Skips we Skip the output. +-- If one of them stops we stop. It may be possible to collect the outputs as +-- Just/Nothing values. +-- +-- Another option could be if a Scan terminates do we want to start it again or +-- not. +-- +{-# ANN type Step Fuse #-} +data Step s b = + Yield s b -- ^ Yield and consume + | Skip s -- ^ Skip and consume + | Stop + +instance Functor (Step s) where + {-# INLINE_NORMAL fmap #-} + fmap f (Yield s b) = Yield s (f b) + fmap _ (Skip s) = Skip s + fmap _ Stop = Stop + +-- | Represents a stateful transformation over an input stream of values of +-- type @a@ to outputs of type @b@ in 'Monad' @m@. +-- +-- The constructor is @Scan consume initial@. +data Scan m a b = + forall s. Scan + (s -> a -> m (Step s b)) + s + +------------------------------------------------------------------------------ +-- Functor: Mapping on the output +------------------------------------------------------------------------------ + +-- | 'fmap' maps a pure function on a scan output. +-- +-- >>> Stream.toList $ Stream.runScan (fmap (+1) Scan.identity) $ Stream.fromList [1..5::Int] +-- [2,3,4,5,6] +-- +instance Functor m => Functor (Scan m a) where + {-# INLINE_NORMAL fmap #-} + fmap f (Scan consume initial) = Scan consume1 initial + + where + + {-# INLINE_LATE consume1 #-} + consume1 s b = fmap (fmap f) (consume s b) + +------------------------------------------------------------------------------- +-- Category +------------------------------------------------------------------------------- + +-- | Connect two scans in series. The second scan is the input end, and the +-- first scan is the output end. +-- +-- >>> import Control.Category +-- >>> Stream.toList $ Stream.runScan (Scan.map (+1) >>> Scan.map (+1)) $ Stream.fromList [1..5::Int] +-- [3,4,5,6,7] +-- +{-# INLINE_NORMAL compose #-} +compose :: Monad m => Scan m b c -> Scan m a b -> Scan m a c +compose + (Scan stepR initialR) + (Scan stepL initialL) = Scan step (initialL, initialR) + + where + + -- XXX Use strict tuple? + step (sL, sR) x = do + rL <- stepL sL x + case rL of + Yield sL1 bL -> do + rR <- stepR sR bL + return + $ case rR of + Yield sR1 br -> Yield (sL1, sR1) br + Skip sR1 -> Skip (sL1, sR1) + Stop -> Stop + Skip sL1 -> return $ Skip (sL1, sR) + Stop -> return Stop + +-- | A scan representing mapping of a monadic action. +-- +-- >>> Stream.toList $ Stream.runScan (Scan.mapM print) $ Stream.fromList [1..5::Int] +-- 1 +-- 2 +-- 3 +-- 4 +-- 5 +-- [(),(),(),(),()] +-- +{-# INLINE mapM #-} +mapM :: Monad m => (a -> m b) -> Scan m a b +mapM f = Scan (\() a -> f a <&> Yield ()) () + +-- | A scan representing mapping of a pure function. +-- +-- >>> Stream.toList $ Stream.runScan (Scan.map (+1)) $ Stream.fromList [1..5::Int] +-- [2,3,4,5,6] +-- +{-# INLINE map #-} +map :: Monad m => (a -> b) -> Scan m a b +map f = mapM (return Prelude.. f) + +{- HLINT ignore "Redundant map" -} + +-- | An identity scan producing the same output as input. +-- +-- >>> identity = Scan.map Prelude.id +-- +-- >>> Stream.toList $ Stream.runScan (Scan.identity) $ Stream.fromList [1..5::Int] +-- [1,2,3,4,5] +-- +{-# INLINE identity #-} +identity :: Monad m => Scan m a a +identity = map Prelude.id + +instance Monad m => Category (Scan m) where + {-# INLINE id #-} + id = identity + + {-# INLINE (.) #-} + (.) = compose + +{-# ANN type TeeWith Fuse #-} +data TeeWith sL sR = TeeWith !sL !sR + +-- XXX zipWith? + +-- | Connect two scans in parallel. Distribute the input across two scans and +-- merge their outputs as soon as they become available. Note that a scan may +-- not generate output on each input, it might filter it. +-- +-- >>> Stream.toList $ Stream.runScan (Scan.teeWithMay (,) Scan.identity (Scan.map (\x -> x * x))) $ Stream.fromList [1..5::Int] +-- [(Just 1,Just 1),(Just 2,Just 4),(Just 3,Just 9),(Just 4,Just 16),(Just 5,Just 25)] +-- +{-# INLINE_NORMAL teeWithMay #-} +teeWithMay :: Monad m => + (Maybe b -> Maybe c -> d) -> Scan m a b -> Scan m a c -> Scan m a d +teeWithMay f (Scan stepL initialL) (Scan stepR initialR) = + Scan step (TeeWith initialL initialR) + + where + + -- XXX Use strict tuple? + step (TeeWith sL sR) a = do + resL <- stepL sL a + resR <- stepR sR a + return + $ case resL of + Yield sL1 bL -> + case resR of + Yield sR1 bR -> + Yield (TeeWith sL1 sR1) (f (Just bL) (Just bR)) + Skip sR1 -> + Yield (TeeWith sL1 sR1) (f (Just bL) Nothing) + Stop -> Stop + Skip sL1 -> + case resR of + Yield sR1 bR -> + Yield (TeeWith sL1 sR1) (f Nothing (Just bR)) + Skip sR1 -> + Yield (TeeWith sL1 sR1) (f Nothing Nothing) + Stop -> Stop + Stop -> Stop + +-- | Produces an output only when both the scans produce an output. +-- +-- >>> Stream.toList $ Stream.runScan (Scan.teeWith (,) Scan.identity (Scan.map (\x -> x * x))) $ Stream.fromList [1..5::Int] +-- [Just (1,1),Just (2,4),Just (3,9),Just (4,16),Just (5,25)] +-- +{-# INLINE_NORMAL teeWith #-} +teeWith :: Monad m => + (b -> c -> d) -> Scan m a b -> Scan m a c -> Scan m a d +teeWith f s1 s2 = + fmap fromJust + $ compose (filter isJust) + $ teeWithMay (\b c -> f <$> b <*> c) s1 s2 + +------------------------------------------------------------------------------- +-- Arrow +------------------------------------------------------------------------------- + +{-# INLINE_NORMAL unzipMay #-} +unzipMay :: Monad m => + Scan m a x -> Scan m b y -> Scan m (a, b) (Maybe x, Maybe y) +unzipMay (Scan stepL initialL) (Scan stepR initialR) = + Scan step (Tuple' initialL initialR) + + where + + step (Tuple' sL sR) (a, b) = do + resL <- stepL sL a + resR <- stepR sR b + return + $ case resL of + Yield sL1 bL -> + case resR of + Yield sR1 bR -> + Yield (Tuple' sL1 sR1) (Just bL, Just bR) + Skip sR1 -> + Yield (Tuple' sL1 sR1) (Just bL, Nothing) + Stop -> Stop + Skip sL1 -> + case resR of + Yield sR1 bR -> + Yield (Tuple' sL1 sR1) (Nothing, Just bR) + Skip sR1 -> + Yield (Tuple' sL1 sR1) (Nothing, Nothing) + Stop -> Stop + Stop -> Stop + +-- | Produces an output only when both the scans produce an output. +{-# INLINE_NORMAL unzip #-} +unzip :: Monad m => Scan m a x -> Scan m b y -> Scan m (a, b) (x, y) +unzip s1 s2 = fmap (fromJust Prelude.. f) $ unzipMay s1 s2 + + where + + f (mx, my) = + case mx of + Just x -> + case my of + Just y -> Just (x, y) + Nothing -> Nothing + Nothing -> Nothing + +instance Monad m => Applicative (Scan m a) where + {-# INLINE pure #-} + pure b = Scan (\_ _ -> pure $ Yield () b) () + + (<*>) = teeWith id + +instance Monad m => Arrow (Scan m) where + {-# INLINE arr #-} + arr = map + + {-# INLINE (***) #-} + (***) = unzip + + {-# INLINE (&&&) #-} + (&&&) = teeWith (,) + +------------------------------------------------------------------------------- +-- Primitive scans +------------------------------------------------------------------------------- + +-- | A filtering scan using a monadic predicate. +{-# INLINE filterM #-} +filterM :: Monad m => (a -> m Bool) -> Scan m a a +filterM f = Scan (\() a -> f a >>= g a) () + + where + + {-# INLINE g #-} + g a b = + return + $ if b + then Yield () a + else Skip () + +-- | A filtering scan using a pure predicate. +-- +-- >>> Stream.toList $ Stream.runScan (Scan.filter odd) $ Stream.fromList [1..5::Int] +-- [1,3,5] +-- +{-# INLINE filter #-} +filter :: Monad m => (a -> Bool) -> Scan m a a +filter f = filterM (return Prelude.. f) diff --git a/core/src/Streamly/Internal/Data/Stream/Transform.hs b/core/src/Streamly/Internal/Data/Stream/Transform.hs index 5db72b58d..914834379 100644 --- a/core/src/Streamly/Internal/Data/Stream/Transform.hs +++ b/core/src/Streamly/Internal/Data/Stream/Transform.hs @@ -34,6 +34,7 @@ module Streamly.Internal.Data.Stream.Transform , scan , scanMany , pipe + , runScan -- * Splitting , splitOn @@ -157,6 +158,7 @@ import Fusion.Plugin.Types (Fuse(..)) import Streamly.Internal.Data.Fold.Type (Fold(..)) import Streamly.Internal.Data.Pipe.Type (Pipe(..)) +import Streamly.Internal.Data.Scan (Scan(..)) import Streamly.Internal.Data.SVar.Type (adaptState) import Streamly.Internal.Data.Time.Units (AbsTime, RelTime64) import Streamly.Internal.Data.Unbox (Unbox) @@ -166,6 +168,7 @@ import Streamly.Internal.System.IO (defaultChunkSize) import qualified Streamly.Internal.Data.Array.Type as A import qualified Streamly.Internal.Data.Fold as FL import qualified Streamly.Internal.Data.Pipe.Type as Pipe +import qualified Streamly.Internal.Data.Scan as Scan import qualified Streamly.Internal.Data.StreamK.Type as K import Prelude hiding @@ -222,6 +225,32 @@ pipe (Pipe consume produce initial) (Stream stream_step state) = Pipe.YieldP ps1 b -> Yield b (PipeProduce st ps1) Pipe.SkipP ps1 -> Skip (PipeProduce st ps1) +{-# ANN type RunScanState Fuse #-} +data RunScanState st sc ps = ScanConsume st sc + +-- | Use a 'Scan' to transform a stream. +-- +{-# INLINE_NORMAL runScan #-} +runScan :: Monad m => Scan m a b -> Stream m a -> Stream m b +runScan (Scan consume initial) (Stream stream_step state) = + Stream step (ScanConsume state initial) + + where + + {-# INLINE_LATE step #-} + step gst (ScanConsume st cs) = do + r <- stream_step (adaptState gst) st + case r of + Yield x s -> do + res <- consume cs x + return + $ case res of + Scan.Yield cs1 b -> Yield b (ScanConsume s cs1) + Scan.Skip cs1 -> Skip (ScanConsume s cs1) + Scan.Stop -> Stop + Skip s -> return $ Skip (ScanConsume s cs) + Stop -> return Stop + ------------------------------------------------------------------------------ -- Transformation Folds ------------------------------------------------------------------------------ diff --git a/core/streamly-core.cabal b/core/streamly-core.cabal index bf63bc42e..4af7f6abe 100644 --- a/core/streamly-core.cabal +++ b/core/streamly-core.cabal @@ -360,6 +360,7 @@ library , Streamly.Internal.Data.Parser , Streamly.Internal.Data.ParserK , Streamly.Internal.Data.Pipe + , Streamly.Internal.Data.Scan -- streamly-containers (non-base) , Streamly.Internal.Data.Fold