diff --git a/benchmark/Streamly/Benchmark/FileSystem/Handle/Read.hs b/benchmark/Streamly/Benchmark/FileSystem/Handle/Read.hs index dea0b7d33..2a8bd45d0 100644 --- a/benchmark/Streamly/Benchmark/FileSystem/Handle/Read.hs +++ b/benchmark/Streamly/Benchmark/FileSystem/Handle/Read.hs @@ -32,18 +32,17 @@ import GHC.Magic (noinline) #endif import System.IO (Handle) -import qualified Streamly.Data.Fold as FL -import qualified Streamly.Unicode.Stream as SS import qualified Streamly.FileSystem.Handle as FH -import qualified Streamly.Internal.Data.Parser as PR -import qualified Streamly.Internal.Data.Stream.StreamD as D -import qualified Streamly.Internal.Unicode.Stream as IUS -import qualified Streamly.Internal.FileSystem.Handle as IFH import qualified Streamly.Internal.Data.Array.Storable.Foreign as A import qualified Streamly.Internal.Data.Array.Storable.Foreign.Types as AT -import qualified Streamly.Internal.Memory.ArrayStream as AS +import qualified Streamly.Internal.Data.Fold as FL +import qualified Streamly.Internal.Data.Parser as PR import qualified Streamly.Internal.Data.Stream.IsStream as IP +import qualified Streamly.Internal.FileSystem.Handle as IFH +import qualified Streamly.Internal.Memory.ArrayStream as AS +import qualified Streamly.Internal.Unicode.Stream as IUS import qualified Streamly.Prelude as S +import qualified Streamly.Unicode.Stream as SS import Gauge hiding (env) import Prelude hiding (last, length) @@ -52,6 +51,7 @@ import Streamly.Benchmark.Common.Handle #ifdef INSPECTION import Streamly.Internal.Data.Stream.StreamD.Type (Step(..), GroupState) +import qualified Streamly.Internal.Data.Stream.StreamD as D import qualified Streamly.Internal.Data.Unfold as IUF import Test.Inspection @@ -309,9 +309,14 @@ o_1_space_reduce_toBytes env = chunksOfSum :: Int -> Handle -> IO Int chunksOfSum n inh = S.length $ S.chunksOf n FL.sum (S.unfold FH.read inh) +foldManyChunksOfSum :: Int -> Handle -> IO Int +foldManyChunksOfSum n inh = + S.length $ IP.foldMany 
(FL.ltake n FL.sum) (S.unfold FH.read inh) + parseManyChunksOfSum :: Int -> Handle -> IO Int parseManyChunksOfSum n inh = - S.length $ IP.parseMany (PR.take n FL.sum) (S.unfold FH.read inh) + S.length + $ IP.parseMany (PR.fromFold $ FL.ltake n FL.sum) (S.unfold FH.read inh) -- XXX investigate why we need an INLINE in this case (GHC) -- Even though allocations remain the same in both cases inlining improves time @@ -333,25 +338,6 @@ inspect $ 'chunksOf `hasNoType` ''IUF.ConcatState -- FH.read/UF.concat inspect $ 'chunksOf `hasNoType` ''A.ReadUState -- FH.read/A.read #endif --- This is to make sure that the concatMap in FH.read, groupsOf and foldlM' --- together can fuse. --- --- | Slice in chunks of size n and get the count of chunks. -_chunksOfD :: Int -> Handle -> IO Int -_chunksOfD n inh = - D.foldlM' (\i _ -> return $ i + 1) (return 0) - $ D.groupsOf n (AT.writeNUnsafe n) - $ D.fromStreamK (S.unfold FH.read inh) - -#ifdef INSPECTION -inspect $ hasNoTypeClasses '_chunksOfD -inspect $ '_chunksOfD `hasNoType` ''Step -inspect $ '_chunksOfD `hasNoType` ''GroupState -inspect $ '_chunksOfD `hasNoType` ''AT.ArrayUnsafe -- AT.writeNUnsafe -inspect $ '_chunksOfD `hasNoType` ''IUF.ConcatState -- FH.read/UF.concat -inspect $ '_chunksOfD `hasNoType` ''A.ReadUState -- FH.read/A.read -#endif - o_1_space_reduce_read_grouped :: BenchEnv -> [Benchmark] o_1_space_reduce_read_grouped env = [ bgroup "reduce/read/chunks" @@ -363,11 +349,22 @@ o_1_space_reduce_read_grouped env = -- XXX investigate why we need inline/noinline in these cases (GHC) -- Chunk using parsers - , mkBenchSmall ("S.parseMany (PR.take " ++ show (bigSize env) ++ " FL.sum)") - env $ \inh _ -> - noinline parseManyChunksOfSum (bigSize env) inh - , mkBench "S.parseMany (PR.take 1 FL.sum)" env $ \inh _ -> - inline parseManyChunksOfSum 1 inh + , mkBenchSmall + ("S.foldMany (FL.take " ++ show (bigSize env) ++ " FL.sum)") + env + $ \inh _ -> noinline foldManyChunksOfSum (bigSize env) inh + , mkBench + "S.foldMany 
(FL.take 1 FL.sum)" + env + $ \inh _ -> inline foldManyChunksOfSum 1 inh + , mkBenchSmall + ("S.parseMany (FL.take " ++ show (bigSize env) ++ " FL.sum)") + env + $ \inh _ -> noinline parseManyChunksOfSum (bigSize env) inh + , mkBench + "S.parseMany (FL.take 1 FL.sum)" + env + $ \inh _ -> inline parseManyChunksOfSum 1 inh -- folding chunks to arrays , mkBenchSmall "S.arraysOf 1" env $ \inh _ -> diff --git a/benchmark/Streamly/Benchmark/Prelude/Serial/Split.hs b/benchmark/Streamly/Benchmark/Prelude/Serial/Split.hs index 6038d2bd8..3944c0492 100644 --- a/benchmark/Streamly/Benchmark/Prelude/Serial/Split.hs +++ b/benchmark/Streamly/Benchmark/Prelude/Serial/Split.hs @@ -24,13 +24,13 @@ import Data.Char (ord) import Data.Word (Word8) import System.IO (Handle) -import qualified Streamly.Data.Fold as FL import qualified Streamly.FileSystem.Handle as FH -import qualified Streamly.Internal.Data.Parser as PR -import qualified Streamly.Internal.Unicode.Stream as IUS -import qualified Streamly.Internal.FileSystem.Handle as IFH import qualified Streamly.Internal.Data.Array.Storable.Foreign as A +import qualified Streamly.Internal.Data.Fold as FL +import qualified Streamly.Internal.Data.Parser as PR import qualified Streamly.Internal.Data.Stream.IsStream as IP +import qualified Streamly.Internal.FileSystem.Handle as IFH +import qualified Streamly.Internal.Unicode.Stream as IUS import qualified Streamly.Prelude as S import Gauge hiding (env) @@ -95,11 +95,23 @@ inspect $ 'splitWithSuffix `hasNoType` ''IUF.ConcatState -- FH.read/UF.concat inspect $ 'splitWithSuffix `hasNoType` ''A.ReadUState -- FH.read/A.read #endif +-- | Split on line feed. +foldManySepBy :: Handle -> IO Int +foldManySepBy inh = + (S.length + $ IP.foldMany + (FL.sliceSepBy (== lf) FL.drain) + (S.unfold FH.read inh) + ) -- >>= print + -- | Split on line feed. 
parseManySepBy :: Handle -> IO Int parseManySepBy inh = - (S.length $ IP.parseMany (PR.sliceSepBy (== lf) FL.drain) - (S.unfold FH.read inh)) -- >>= print + (S.length + $ IP.parseMany + (PR.fromFold $ FL.sliceSepBy (== lf) FL.drain) + (S.unfold FH.read inh) + ) -- >>= print -- | Words by space wordsBy :: Handle -> IO Int @@ -146,7 +158,9 @@ splitOnSuffixSeq str inh = o_1_space_reduce_read_split :: BenchEnv -> [Benchmark] o_1_space_reduce_read_split env = [ bgroup "split" - [ mkBench "S.parseMany (PR.sliceSepBy (== lf) FL.drain)" env + [ mkBench "S.foldMany (FL.sliceSepBy (== lf) FL.drain)" env + $ \inh _ -> foldManySepBy inh + , mkBench "S.parseMany (FL.sliceSepBy (== lf) FL.drain)" env $ \inh _ -> parseManySepBy inh , mkBench "S.wordsBy isSpace FL.drain" env $ \inh _ -> wordsBy inh diff --git a/benchmark/Streamly/Benchmark/Prelude/Serial/Transformation2.hs b/benchmark/Streamly/Benchmark/Prelude/Serial/Transformation2.hs index 1a379b256..aa1c1a570 100644 --- a/benchmark/Streamly/Benchmark/Prelude/Serial/Transformation2.hs +++ b/benchmark/Streamly/Benchmark/Prelude/Serial/Transformation2.hs @@ -17,11 +17,12 @@ module Serial.Transformation2 (benchmarks) where import Control.DeepSeq (NFData(..)) import Control.Monad (when) import Control.Monad.IO.Class (MonadIO(..)) +import Data.Monoid (Sum(..)) import GHC.Generics (Generic) -import qualified Streamly.Prelude as S -import qualified Streamly.Internal.Data.Stream.IsStream as Internal import qualified Streamly.Internal.Data.Fold as FL +import qualified Streamly.Internal.Data.Stream.IsStream as Internal +import qualified Streamly.Prelude as S import Gauge import Streamly.Prelude (SerialT, serially) @@ -131,6 +132,22 @@ groupsByRollingEq :: MonadIO m => SerialT m Int -> m () groupsByRollingEq = S.drain . S.groupsByRolling (==) FL.drain +{-# INLINE foldMany #-} +foldMany :: Monad m => SerialT m Int -> m () +foldMany = + S.drain + . S.map getSum + . Internal.foldMany (FL.ltake 2 FL.mconcat) + . 
S.map Sum + +{-# INLINE _foldIterate #-} +_foldIterate :: Monad m => SerialT m Int -> m () +_foldIterate = + S.drain + . S.map getSum + . Internal.foldIterate (FL.ltake 2 . FL.sconcat) (Sum 0) + . S.map Sum + o_1_space_grouping :: Int -> [Benchmark] o_1_space_grouping value = -- Buffering operations using heap proportional to group/window sizes. @@ -140,6 +157,8 @@ o_1_space_grouping value = , benchIOSink value "groupsByEq" groupsByEq , benchIOSink value "groupsByRollingLT" groupsByRollingLT , benchIOSink value "groupsByRollingEq" groupsByRollingEq + , benchIOSink value "foldMany" foldMany + -- , benchIOSink value "foldIterate" foldIterate ] ] diff --git a/src/Streamly/Internal/Data/Stream/IsStream.hs b/src/Streamly/Internal/Data/Stream/IsStream.hs index bdcf1dee3..50eb36157 100644 --- a/src/Streamly/Internal/Data/Stream/IsStream.hs +++ b/src/Streamly/Internal/Data/Stream/IsStream.hs @@ -136,10 +136,22 @@ module Streamly.Internal.Data.Stream.IsStream , foldlM' -- ** Composable Left Folds + -- | See "Streamly.Internal.Data.Fold" , fold + , foldMany + , foldSequence + , foldIterate + + -- ** Parsers + -- | See "Streamly.Internal.Data.Parser" , parse , parseK , parseD + , parseMany + , parseManyD + , parseManyTill + , parseSequence + , parseIterate -- ** Concurrent Folds , foldAsync @@ -319,12 +331,6 @@ module Streamly.Internal.Data.Stream.IsStream , reverse , reverse' - -- ** Parsing - , parseMany - , parseManyD - , parseManyTill - , parseIterate - -- ** Trimming , take -- , takeGE @@ -643,8 +649,7 @@ import Streamly.Internal.Data.IORef.Prim (Prim, IORef) import Streamly.Internal.Data.Tuple.Strict (Tuple'(..)) import qualified Streamly.Internal.Data.Array.Storable.Foreign as A -import qualified Streamly.Data.Fold as FL -import qualified Streamly.Internal.Data.Fold.Types as FL +import qualified Streamly.Internal.Data.Fold as FL import qualified Streamly.Internal.Data.Stream.Prelude as P import qualified Streamly.Internal.Data.Stream.StreamK as K import qualified 
Streamly.Internal.Data.Stream.StreamD as D @@ -1410,7 +1415,10 @@ foldlM' step begin m = S.foldlM' step begin $ toStreamS m -- Running a Fold ------------------------------------------------------------------------------ --- | Fold a stream using the supplied left fold. +-- | Fold a stream using the supplied left 'Fold' and reducing the resulting +-- expression strictly at each step. The behavior is similar to 'foldl''. A +-- 'Fold' can terminate early without consuming the full stream. See the +-- documentation of individual 'Fold's for termination behavior. -- -- >>> S.fold FL.sum (S.enumerateFromTo 1 100) -- 5050 @@ -1461,6 +1469,7 @@ parse = parseD . PRK.fromParserK -- | -- > drain = mapM_ (\_ -> return ()) +-- > drain = fold Fold.drain -- -- Run a stream, discarding the results. By default it interprets the stream -- as 'SerialT', to run other types of streams use the type adapting @@ -1483,6 +1492,7 @@ runStream = drain -- | -- > drainN n = drain . take n +-- > drainN n = fold (Fold.ltake n Fold.drain) -- -- Run maximum up to @n@ iterations of a stream. -- @@ -1504,6 +1514,7 @@ runN = drainN -- | -- > drainWhile p = drain . takeWhile p +-- > drainWhile p = fold (Fold.sliceSepBy (not . p) Fold.drain) -- -- Run a stream as long as the predicate holds true. -- @@ -1525,6 +1536,8 @@ runWhile = drainWhile -- | Determine whether the stream is empty. -- +-- > null = fold Fold.null +-- -- @since 0.1.1 {-# INLINE null #-} null :: Monad m => SerialT m a -> m Bool @@ -1533,6 +1546,7 @@ null = S.null . toStreamS -- | Extract the first element of the stream, if any. -- -- > head = (!! 0) +-- > head = fold Fold.head -- -- @since 0.1.0 {-# INLINE head #-} @@ -1568,6 +1582,7 @@ init m = K.init (K.adapt m) -- | Extract the last element of the stream, if any. -- -- > last xs = xs !! (length xs - 1) +-- > last = fold Fold.last -- -- @since 0.1.1 {-# INLINE last #-} @@ -1576,6 +1591,8 @@ last m = S.last $ toStreamS m -- | Determine whether an element is present in the stream. 
-- +-- > elem = fold Fold.elem +-- -- @since 0.1.0 {-# INLINE elem #-} elem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool @@ -1583,6 +1600,8 @@ elem e m = S.elem e (toStreamS m) -- | Determine whether an element is not present in the stream. -- +-- > notElem = fold Fold.notElem +-- -- @since 0.1.0 {-# INLINE notElem #-} notElem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool @@ -1590,6 +1609,8 @@ notElem e m = S.notElem e (toStreamS m) -- | Determine the length of the stream. -- +-- > length = fold Fold.length +-- -- @since 0.1.0 {-# INLINE length #-} length :: Monad m => SerialT m a -> m Int @@ -1597,6 +1618,8 @@ length = foldl' (\n _ -> n + 1) 0 -- | Determine whether all elements of a stream satisfy a predicate. -- +-- > all = fold Fold.all +-- -- @since 0.1.0 {-# INLINE all #-} all :: Monad m => (a -> Bool) -> SerialT m a -> m Bool @@ -1604,6 +1627,8 @@ all p m = S.all p (toStreamS m) -- | Determine whether any of the elements of a stream satisfy a predicate. -- +-- > any = fold Fold.any +-- -- @since 0.1.0 {-# INLINE any #-} any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool @@ -1611,6 +1636,8 @@ any p m = S.any p (toStreamS m) -- | Determines if all elements of a boolean stream are True. -- +-- > and = fold Fold.and +-- -- @since 0.5.0 {-# INLINE and #-} and :: Monad m => SerialT m Bool -> m Bool @@ -1618,6 +1645,8 @@ and = all (==True) -- | Determines whether at least one element of a boolean stream is True. -- +-- > or = fold Fold.or +-- -- @since 0.5.0 {-# INLINE or #-} or :: Monad m => SerialT m Bool -> m Bool @@ -1627,6 +1656,8 @@ or = any (==True) -- the stream is empty. Note that this is not numerically stable for floating -- point numbers. -- +-- > sum = fold Fold.sum +-- -- @since 0.1.0 {-# INLINE sum #-} sum :: (Monad m, Num a) => SerialT m a -> m a @@ -1635,6 +1666,8 @@ sum = foldl' (+) 0 -- | Determine the product of all elements of a stream of numbers. Returns @1@ -- when the stream is empty. 
-- +-- > product = fold Fold.product +-- -- @since 0.1.1 {-# INLINE product #-} product :: (Monad m, Num a) => SerialT m a -> m a @@ -1642,6 +1675,8 @@ product = foldl' (*) 1 -- | Fold a stream of monoid elements by appending them. -- +-- > mconcat = fold Fold.mconcat +-- -- /Internal/ {-# INLINE mconcat #-} mconcat :: (Monad m, Monoid a) => SerialT m a -> m a @@ -1650,6 +1685,7 @@ mconcat = foldr mappend mempty -- | -- @ -- minimum = 'minimumBy' compare +-- minimum = fold Fold.minimum -- @ -- -- Determine the minimum element in a stream. @@ -1662,6 +1698,8 @@ minimum m = S.minimum (toStreamS m) -- | Determine the minimum element in a stream using the supplied comparison -- function. -- +-- > minimumBy = fold Fold.minimumBy +-- -- @since 0.6.0 {-# INLINE minimumBy #-} minimumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) @@ -1670,6 +1708,7 @@ minimumBy cmp m = S.minimumBy cmp (toStreamS m) -- | -- @ -- maximum = 'maximumBy' compare +-- maximum = fold Fold.maximum -- @ -- -- Determine the maximum element in a stream. @@ -1682,6 +1721,8 @@ maximum = P.maximum -- | Determine the maximum element in a stream using the supplied comparison -- function. -- +-- > maximumBy = fold Fold.maximumBy +-- -- @since 0.6.0 {-# INLINE maximumBy #-} maximumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) @@ -1698,6 +1739,7 @@ m !! i = toStreamS m S.!! i -- first pair where the key equals the given value @a@. -- -- > lookup = snd <$> find ((==) . fst) +-- > lookup = fold Fold.lookup -- -- @since 0.5.0 {-# INLINE lookup #-} @@ -1707,6 +1749,7 @@ lookup a m = S.lookup a (toStreamS m) -- | Like 'findM' but with a non-monadic predicate. -- -- > find p = findM (return . p) +-- > find = fold Fold.find -- -- @since 0.5.0 {-# INLINE find #-} @@ -1715,6 +1758,8 @@ find p m = S.find p (toStreamS m) -- | Returns the first element that satisfies the given predicate. 
-- +-- > findM = fold Fold.findM +-- -- @since 0.6.0 {-# INLINE findM #-} findM :: Monad m => (a -> m Bool) -> SerialT m a -> m (Maybe a) @@ -1723,6 +1768,8 @@ findM p m = S.findM p (toStreamS m) -- | Find all the indices where the element in the stream satisfies the given -- predicate. -- +-- > findIndices = fold Fold.findIndices +-- -- @since 0.5.0 {-# INLINE findIndices #-} findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int @@ -1730,6 +1777,8 @@ findIndices p m = fromStreamS $ S.findIndices p (toStreamS m) -- | Returns the first index that satisfies the given predicate. -- +-- > findIndex = fold Fold.findIndex +-- -- @since 0.5.0 {-# INLINE findIndex #-} findIndex :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe Int) @@ -1738,10 +1787,12 @@ findIndex p = head . findIndices p -- | Find all the indices where the value of the element in the stream is equal -- to the given value. -- +-- > elemIndices a = findIndices (== a) +-- -- @since 0.5.0 {-# INLINE elemIndices #-} elemIndices :: (IsStream t, Eq a, Monad m) => a -> t m a -> t m Int -elemIndices a = findIndices (==a) +elemIndices a = findIndices (== a) -- | Returns the first index where a given value is found in the stream. -- @@ -3636,7 +3687,7 @@ iterateMapLeftsWith iterateMapLeftsWith combine f = iterateMapWith combine (either f (const K.nil)) ------------------------------------------------------------------------------ --- Parsing +-- Folding/Parsing chunks in a stream ------------------------------------------------------------------------------ -- Splitting operations that take a predicate and a Fold can be @@ -3645,6 +3696,67 @@ iterateMapLeftsWith combine f = iterateMapWith combine (either f (const K.nil)) -- -- XXX We need takeGE/takeBetween to implement "some" using "many". +-- | Apply a 'Fold' repeatedly on a stream and emit the fold results in the +-- output stream. +-- +-- This is the streaming dual of the 'Streamly.Internal.Data.Fold.many' +-- parse combinator. 
+-- +-- >>> f = Fold.ltake 2 Fold.sum +-- >>> Stream.toList $ Stream.foldMany f $ Stream.fromList [1..10] +-- > [3,7,11,15,19] +-- +-- >>> f = Fold.sliceEndWith (== '\n') Fold.toList +-- >>> Stream.toList $ Stream.foldMany f $ Stream.fromList "hello\nworld" +-- > ["hello\n","world"] +-- +-- /Internal/ +-- +{-# INLINE foldMany #-} +foldMany + :: (IsStream t, Monad m) + => Fold m a b + -> t m a + -> t m b +foldMany f m = D.fromStreamD $ D.foldMany f (D.toStreamD m) + +-- | Apply a stream of folds to an input stream and emit the results in the +-- output stream. +-- +-- /Internal/ +-- +{-# INLINE foldSequence #-} +foldSequence + :: -- (IsStream t, Monad m) => + t m (Fold m a b) + -> t m a + -> t m b +foldSequence _f _m = undefined + +-- | Iterate a fold generator on a stream. The initial value @b@ is used to +-- generate the first fold, the fold is applied on the stream and the result of +-- the fold is used to generate the next fold and so on. +-- +-- >>> f x = Fold.ltake 2 (Fold.mconcatTo x) +-- >>> s = Stream.map Sum $ Stream.fromList [1..10] +-- >>> Stream.toList $ Stream.map getSum $ Stream.foldIterate f 0 s +-- > [3,10,21,36,55,55] +-- +-- This is the streaming equivalent of monad-like sequenced application of +-- folds where next fold is dependent on the previous fold. +-- +-- /Internal/ +-- +{-# INLINE foldIterate #-} +foldIterate + :: -- (IsStream t, Monad m) => + (b -> Fold m a b) + -> b + -> t m a + -> t m b +foldIterate _f _i _m = undefined +-- D.fromStreamD $ D.foldIterate f i (D.toStreamD m) + -- | Apply a 'Parser' repeatedly on a stream and emit the parsed values in the -- output stream. 
-- @@ -3657,7 +3769,7 @@ iterateMapLeftsWith combine f = iterateMapWith combine (either f (const K.nil)) -- >>> S.toList $ S.parseMany (PR.line FL.toList) $ S.fromList "hello\nworld" -- > ["hello\n","world"] -- --- /Internal +-- /Internal/ -- {-# INLINE parseMany #-} parseMany @@ -3677,6 +3789,19 @@ parseManyD parseManyD p m = D.fromStreamD $ D.parseMany p (D.toStreamD m) +-- | Apply a stream of parsers to an input stream and emit the results in the +-- output stream. +-- +-- /Internal/ +-- +{-# INLINE parseSequence #-} +parseSequence + :: -- (IsStream t, Monad m) => + t m (Parser m a b) + -> t m a + -> t m b +parseSequence _f _m = undefined + -- | @parseManyTill collect test stream@ tries the parser @test@ on the input, -- if @test@ fails it backtracks and tries @collect@, after @collect@ succeeds -- @test@ is tried again and so on. The parser stops when @test@ succeeds. The @@ -3786,7 +3911,7 @@ groupScan split fold m = undefined chunksOf :: (IsStream t, Monad m) => Int -> Fold m a b -> t m a -> t m b -chunksOf n f m = D.fromStreamD $ D.groupsOf n f (D.toStreamD m) +chunksOf n f = foldMany (FL.ltake n f) -- | -- @@ -3831,8 +3956,6 @@ intervalsOf n f xs = -- N-ary APIs ------------------------------------------------------------------------------ --- XXX We should probably change the order of the comparision and update the --- docs accordingly. -- | @groupsBy cmp f $ S.fromList [a,b,c,...]@ assigns the element @a@ to the -- first group, if @b \`cmp` a@ is 'True' then @b@ is also assigned to the same -- group. If @c \`cmp` a@ is 'True' then @c@ is also assigned to the same @@ -3945,8 +4068,7 @@ groups = groupsBy (==) splitOn :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b -splitOn predicate f m = - D.fromStreamD $ D.splitBy predicate f (D.toStreamD m) +splitOn predicate f = foldMany (FL.sliceSepBy predicate f) -- | Like 'splitOn' but the separator is considered as suffixed to the segments -- in the stream. 
A missing suffix at the end is allowed. A separator at the @@ -3998,7 +4120,7 @@ splitOnSuffix :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b splitOnSuffix predicate f m = - D.fromStreamD $ D.splitSuffixBy predicate f (D.toStreamD m) + D.fromStreamD $ D.foldMany1 (FL.sliceSepBy predicate f) (D.toStreamD m) -- | Like 'splitOn' after stripping leading, trailing, and repeated separators. -- Therefore, @".a..b."@ with '.' as the separator would be parsed as @@ -4068,7 +4190,7 @@ splitWithSuffix :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b splitWithSuffix predicate f m = - D.fromStreamD $ D.splitSuffixWith predicate f (D.toStreamD m) + D.fromStreamD $ D.foldMany1 (FL.sliceEndWith predicate f) (D.toStreamD m) ------------------------------------------------------------------------------ -- Split on a delimiter sequence @@ -4758,13 +4880,12 @@ classifySessionsBy tick tmout reset ejectPred -- better performance. -- let curTime = max sessionEventTime timestamp - extractOld v = - case v of - Nothing -> initial - Just (Tuple' _ acc) -> return acc mOld = Map.lookup key sessionKeyValueMap - fs <- extractOld mOld + fs <- + case mOld of + Nothing -> initial + Just (Tuple' _ acc) -> return acc res <- step fs value case res of FL.Done fb -> do diff --git a/src/Streamly/Internal/Data/Stream/StreamD.hs b/src/Streamly/Internal/Data/Stream/StreamD.hs index 8cbfd4057..aa0a64d9a 100644 --- a/src/Streamly/Internal/Data/Stream/StreamD.hs +++ b/src/Streamly/Internal/Data/Stream/StreamD.hs @@ -103,6 +103,8 @@ module Streamly.Internal.Data.Stream.StreamD , foldlx' , foldlMx' , foldOnce + , foldMany + , foldMany1 , parselMx' , parseMany @@ -157,16 +159,12 @@ module Streamly.Internal.Data.Stream.StreamD , interpose -- ** Grouping - , groupsOf , groupsOf2 , groupsBy , groupsRollingBy -- ** Splitting - , splitBy - , splitSuffixBy , wordsBy - , splitSuffixWith , splitOnSeq , splitOnSuffixSeq @@ -1519,11 +1517,6 @@ reverse' m = -- 
Grouping/Splitting ------------------------------------------------------------------------------ -{-# INLINE_NORMAL splitSuffixWith #-} -splitSuffixWith :: Monad m - => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b -splitSuffixWith predicate f = foldMany1 (FL.sliceEndWith predicate f) - {-# INLINE_NORMAL groupsBy #-} groupsBy :: Monad m => (a -> a -> Bool) @@ -1540,11 +1533,11 @@ groupsBy cmp f (Stream step state) = Stream (stepOuter f) (Just state, Nothing) case res of Yield x s -> do fs <- initial - sfs <- fstep fs x - case sfs of + r <- fstep fs x + case r of FL.Partial fs1 -> go SPEC x s fs1 - FL.Done fb -> return $ Yield fb (Just s, Just x) - Skip s -> return $ Skip $ (Just s, Nothing) + FL.Done b -> return $ Yield b (Just s, Just x) + Skip s -> return $ Skip (Just s, Nothing) Stop -> return Stop where @@ -1555,19 +1548,19 @@ groupsBy cmp f (Stream step state) = Stream (stepOuter f) (Just state, Nothing) Yield x s -> do if cmp x prev then do - sfs <- fstep acc x - case sfs of + r <- fstep acc x + case r of FL.Partial fs1 -> go SPEC prev s fs1 - FL.Done fb -> return $ Yield fb (Just s, Just x) + FL.Done b -> return $ Yield b (Just s, Just x) else done acc >>= \r -> return $ Yield r (Just s, Just x) Skip s -> go SPEC prev s acc Stop -> done acc >>= \r -> return $ Yield r (Nothing, Nothing) stepOuter (Fold fstep initial done) gst (Just st, Just prev) = do fs <- initial - sfs <- fstep fs prev - case sfs of + r <- fstep fs prev + case r of FL.Partial fs1 -> go SPEC st fs1 - FL.Done fb -> return $ Yield fb (Just st, Nothing) + FL.Done b -> return $ Yield b (Just st, Nothing) where @@ -1578,10 +1571,10 @@ groupsBy cmp f (Stream step state) = Stream (stepOuter f) (Just state, Nothing) Yield x s -> do if cmp x prev then do - sfs <- fstep acc x - case sfs of + r <- fstep acc x + case r of FL.Partial fs1 -> go SPEC s fs1 - FL.Done fb -> return $ Yield fb (Just s, Just x) + FL.Done b -> return $ Yield b (Just s, Just x) else done acc >>= \r -> return $ Yield r 
(Just s, Just x) Skip s -> go SPEC s acc Stop -> done acc >>= \r -> return $ Yield r (Nothing, Nothing) @@ -1604,10 +1597,10 @@ groupsRollingBy cmp f (Stream step state) = case res of Yield x s -> do fs <- initial - sfs <- fstep fs x - case sfs of + r <- fstep fs x + case r of FL.Partial fs1 -> go SPEC x s fs1 - FL.Done fb -> return $ Yield fb (Just s, Just x) + FL.Done b -> return $ Yield b (Just s, Just x) Skip s -> return $ Skip $ (Just s, Nothing) Stop -> return Stop @@ -1619,19 +1612,19 @@ groupsRollingBy cmp f (Stream step state) = Yield x s -> do if cmp prev x then do - sfs <- fstep acc x - case sfs of + r <- fstep acc x + case r of FL.Partial fs1 -> go SPEC x s fs1 - FL.Done fb -> return $ Yield fb (Just s, Just x) + FL.Done b -> return $ Yield b (Just s, Just x) else done acc >>= \r -> return $ Yield r (Just s, Just x) Skip s -> go SPEC prev s acc Stop -> done acc >>= \r -> return $ Yield r (Nothing, Nothing) stepOuter (Fold fstep initial done) gst (Just st, Just prev') = do fs <- initial - sfs <- fstep fs prev' - case sfs of + r <- fstep fs prev' + case r of FL.Partial fs1 -> go SPEC prev' st fs1 - FL.Done fb -> return $ Yield fb (Just st, Nothing) + FL.Done b -> return $ Yield b (Just st, Nothing) where @@ -1641,24 +1634,15 @@ groupsRollingBy cmp f (Stream step state) = Yield x s -> do if cmp prevv x then do - sfs <- fstep acc x - case sfs of + r <- fstep acc x + case r of FL.Partial fs1 -> go SPEC x s fs1 - FL.Done fb -> return $ Yield fb (Just s, Just x) + FL.Done b -> return $ Yield b (Just s, Just x) else done acc >>= \r -> return $ Yield r (Just s, Just x) Skip s -> go SPEC prevv s acc Stop -> done acc >>= \r -> return $ Yield r (Nothing, Nothing) stepOuter _ _ (Nothing, _) = return Stop -{-# INLINE_NORMAL splitBy #-} -splitBy :: Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b -splitBy predicate f = foldMany (FL.sliceSepBy predicate f) - -{-# INLINE_NORMAL splitSuffixBy #-} -splitSuffixBy :: Monad m - => (a -> Bool) -> Fold m a b -> 
Stream m a -> Stream m b -splitSuffixBy predicate f = foldMany1 (FL.sliceSepBy predicate f) - data WordsByState s = WordsByJust s | WordsByNothing {-# INLINE_NORMAL wordsBy #-} @@ -1677,10 +1661,10 @@ wordsBy predicate f (Stream step state) = then return $ Skip (WordsByJust s) else do fs <- initial - sfs <- fstep fs x - case sfs of + r <- fstep fs x + case r of FL.Partial fs1 -> go SPEC s fs1 - FL.Done fb -> return $ Yield fb (WordsByJust s) + FL.Done b -> return $ Yield b (WordsByJust s) Skip s -> return $ Skip $ WordsByJust s Stop -> return Stop @@ -1694,10 +1678,10 @@ wordsBy predicate f (Stream step state) = if predicate x then done acc >>= \r -> return $ Yield r (WordsByJust s) else do - sfs <- fstep acc x - case sfs of + r <- fstep acc x + case r of FL.Partial fs1 -> go SPEC s fs1 - FL.Done fb -> return $ Yield fb (WordsByJust s) + FL.Done b -> return $ Yield b (WordsByJust s) Skip s -> go SPEC s acc Stop -> done acc >>= \r -> return $ Yield r WordsByNothing @@ -1808,13 +1792,13 @@ splitOnSeq patArr (Fold fstep initial done) (Stream step state) = res <- step (adaptState gst) st case res of Yield x s -> do - acc <- initial - sfs <- fstep acc x - case sfs of - FL.Partial acc1 -> do - r <- done acc1 - skip $ SplitOnSeqYield r (SplitOnSeqEmpty s) - FL.Done fb -> skip $ SplitOnSeqYield fb (SplitOnSeqEmpty s) + fs <- initial + r <- fstep fs x + case r of + FL.Partial fs1 -> do + b <- done fs1 + skip $ SplitOnSeqYield b (SplitOnSeqEmpty s) + FL.Done b -> skip $ SplitOnSeqYield b (SplitOnSeqEmpty s) Skip s -> return $ Skip (SplitOnSeqEmpty s) Stop -> return Stop @@ -1839,12 +1823,12 @@ splitOnSeq patArr (Fold fstep initial done) (Stream step state) = fs1 <- initial return $ Skip $ SplitOnSeqYield r (SplitOnSeqSingle fs1 s pat) else do - sfs <- fstep fs x - case sfs of - FL.Partial fs1 -> skip $ (SplitOnSeqSingle fs1 s pat) - FL.Done fb -> do + r <- fstep fs x + case r of + FL.Partial fs1 -> skip $ SplitOnSeqSingle fs1 s pat + FL.Done b -> do fs1 <- initial - skip $ 
SplitOnSeqYield fb (SplitOnSeqSingle fs1 s pat) + skip $ SplitOnSeqYield b (SplitOnSeqSingle fs1 s pat) Skip s -> return $ Skip $ SplitOnSeqSingle fs s pat Stop -> do r <- done fs @@ -1859,13 +1843,13 @@ splitOnSeq patArr (Fold fstep initial done) (Stream step state) = skip $ SplitOnSeqYield r SplitOnSeqDone stepOuter _ (SplitOnSeqWordDone n fs wrd) = do let old = elemMask .&. (wrd `shiftR` (elemBits * (n - 1))) - sfs <- fstep fs (toEnum $ fromIntegral old) - case sfs of + r <- fstep fs (toEnum $ fromIntegral old) + case r of FL.Partial fs1 -> skip $ SplitOnSeqWordDone (n - 1) fs1 wrd - FL.Done r -> do + FL.Done b -> do fs1 <- initial let next = SplitOnSeqWordDone (n - 1) fs1 wrd - skip $ SplitOnSeqYield r next + skip $ SplitOnSeqYield b next stepOuter gst (SplitOnSeqWordInit st0) = go SPEC 0 0 st0 @@ -1910,18 +1894,18 @@ splitOnSeq patArr (Fold fstep initial done) (Stream step state) = let wrd1 = addToWord wrd x old = (wordMask .&. wrd) `shiftR` (elemBits * (patLen - 1)) - sfs <- fstep fs (toEnum $ fromIntegral old) - case sfs of + r <- fstep fs (toEnum $ fromIntegral old) + case r of FL.Partial fs1 -> do if wrd1 .&. 
wordMask == wordPat then do - r <- done fs1 + b <- done fs1 let next = SplitOnSeqWordInit s - skip $ SplitOnSeqYield r next + skip $ SplitOnSeqYield b next else go SPEC wrd1 s fs1 - FL.Done r -> + FL.Done b -> let next = SplitOnSeqWordInit s - in skip $ SplitOnSeqYield r next + in skip $ SplitOnSeqYield b next Skip s -> go SPEC wrd s fs Stop -> skip $ SplitOnSeqWordDone patLen fs wrd @@ -1963,16 +1947,16 @@ splitOnSeq patArr (Fold fstep initial done) (Stream step state) = Yield x s -> do old <- liftIO $ peek rh let cksum1 = deltaCksum cksum old x - sfs <- fstep fs old - case sfs of + r <- fstep fs old + case r of FL.Partial fs1 -> do rh1 <- liftIO (RB.unsafeInsert rb rh x) - if (cksum1 == patHash) + if cksum1 == patHash then skip $ SplitOnSeqKRCheck fs1 s rb rh1 else go SPEC fs1 s rh1 cksum1 - FL.Done r -> + FL.Done b -> let next = SplitOnSeqKRInit 0 s rb (RB.startOf rb) - in skip $ SplitOnSeqYield r next + in skip $ SplitOnSeqYield b next Skip s -> go SPEC fs s rh cksum Stop -> skip $ SplitOnSeqKRDone patLen fs rb rh @@ -2015,13 +1999,13 @@ splitOnSeq patArr (Fold fstep initial done) (Stream step state) = stepOuter _ (SplitOnSeqKRDone n fs rb rh) = do old <- liftIO $ peek rh let rh1 = RB.advance rb rh - sfs <- fstep fs old - case sfs of + r <- fstep fs old + case r of FL.Partial fs1 -> skip $ SplitOnSeqKRDone (n - 1) fs1 rb rh1 - FL.Done r -> do + FL.Done b -> do fs1 <- initial let next = SplitOnSeqKRDone (n - 1) fs1 rb rh1 - skip $ SplitOnSeqYield r next + skip $ SplitOnSeqYield b next {-# ANN type SplitOnSuffixSeqState Fuse #-} data SplitOnSuffixSeqState rb rh ck w fs s b x = @@ -2078,22 +2062,22 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) = processYieldSingle pat x s fs = if pat == x then do - sfs <- if withSep then fstep fs x else return $ FL.Partial fs - case sfs of + r <- if withSep then fstep fs x else return $ FL.Partial fs + case r of FL.Partial fs1 -> do - r <- done fs1 + b <- done fs1 let next = 
SplitOnSuffixSeqSingleInit s pat - skip $ SplitOnSuffixSeqYield r next - FL.Done r -> + skip $ SplitOnSuffixSeqYield b next + FL.Done b -> let next = SplitOnSuffixSeqSingleInit s pat - in skip $ SplitOnSuffixSeqYield r next + in skip $ SplitOnSuffixSeqYield b next else do - sfs <- fstep fs x - case sfs of + r <- fstep fs x + case r of FL.Partial fs1 -> skip $ SplitOnSuffixSeqSingle fs1 s pat - FL.Done r -> + FL.Done b -> let next = SplitOnSuffixSeqSingleInit s pat - in skip $ SplitOnSuffixSeqYield r next + in skip $ SplitOnSuffixSeqYield b next -- For Rabin-Karp search k = 2891336453 :: Word32 @@ -2135,13 +2119,13 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) = case res of Yield x s -> do acc <- initial - sfs <- fstep acc x - case sfs of - FL.Partial acc1 -> do - r <- done acc1 - skip $ SplitOnSuffixSeqYield r (SplitOnSuffixSeqEmpty s) - FL.Done r -> - skip $ SplitOnSuffixSeqYield r (SplitOnSuffixSeqEmpty s) + r <- fstep acc x + case r of + FL.Partial fs -> do + b <- done fs + skip $ SplitOnSuffixSeqYield b (SplitOnSuffixSeqEmpty s) + FL.Done b -> + skip $ SplitOnSuffixSeqYield b (SplitOnSuffixSeqEmpty s) Skip s -> skip (SplitOnSuffixSeqEmpty s) Stop -> return Stop @@ -2180,13 +2164,13 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) = skip $ SplitOnSuffixSeqYield r SplitOnSuffixSeqDone stepOuter _ (SplitOnSuffixSeqWordDone n fs wrd) = do let old = elemMask .&. 
(wrd `shiftR` (elemBits * (n - 1))) - sfs <- fstep fs (toEnum $ fromIntegral old) - case sfs of + r <- fstep fs (toEnum $ fromIntegral old) + case r of FL.Partial fs1 -> skip $ SplitOnSuffixSeqWordDone (n - 1) fs1 wrd - FL.Done r -> do + FL.Done b -> do fs1 <- initial let next = SplitOnSuffixSeqWordDone (n - 1) fs1 wrd - skip $ SplitOnSuffixSeqYield r next + skip $ SplitOnSuffixSeqYield b next stepOuter gst (SplitOnSuffixSeqWordInit st0) = do res <- step (adaptState gst) st0 @@ -2194,12 +2178,12 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) = Yield x s -> do fs <- initial let wrd = addToWord 0 x - sfs <- if withSep then fstep fs x else return $ FL.Partial fs - case sfs of + r <- if withSep then fstep fs x else return $ FL.Partial fs + case r of FL.Partial fs1 -> go SPEC 1 wrd s fs1 - FL.Done r -> do + FL.Done b -> do let next = SplitOnSuffixSeqWordInit s - in skip $ SplitOnSuffixSeqYield r next + in skip $ SplitOnSuffixSeqYield b next Skip s -> skip (SplitOnSuffixSeqWordInit s) Stop -> return Stop @@ -2211,20 +2195,20 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) = case res of Yield x s -> do let wrd1 = addToWord wrd x - sfs <- if withSep then fstep fs x else return $ FL.Partial fs - case sfs of + r <- if withSep then fstep fs x else return $ FL.Partial fs + case r of FL.Partial fs1 -> if idx /= maxIndex then go SPEC (idx + 1) wrd1 s fs1 else if wrd1 .&. 
wordMask /= wordPat then skip $ SplitOnSuffixSeqWordLoop wrd1 s fs1 else do - r <- done fs + b <- done fs let next = SplitOnSuffixSeqWordInit s - skip $ SplitOnSuffixSeqYield r next - FL.Done r -> + skip $ SplitOnSuffixSeqYield b next + FL.Done b -> let next = SplitOnSuffixSeqWordInit s - in skip $ SplitOnSuffixSeqYield r next + in skip $ SplitOnSuffixSeqYield b next Skip s -> go SPEC idx wrd s fs Stop -> skip $ SplitOnSuffixSeqWordDone idx fs wrd @@ -2241,21 +2225,21 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) = let wrd1 = addToWord wrd x old = (wordMask .&. wrd) `shiftR` (elemBits * (patLen - 1)) - sfs <- + r <- if withSep then fstep fs x else fstep fs (toEnum $ fromIntegral old) - case sfs of + case r of FL.Partial fs1 -> if wrd1 .&. wordMask == wordPat then do - r <- done fs1 + b <- done fs1 let next = SplitOnSuffixSeqWordInit s - skip $ SplitOnSuffixSeqYield r next + skip $ SplitOnSuffixSeqYield b next else go SPEC wrd1 s fs1 - FL.Done r -> + FL.Done b -> let next = SplitOnSuffixSeqWordInit s - in skip $ SplitOnSuffixSeqYield r next + in skip $ SplitOnSuffixSeqYield b next Skip s -> go SPEC wrd s fs Stop -> if wrd .&. 
wordMask == wordPat @@ -2276,12 +2260,13 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) = Yield x s -> do rh1 <- liftIO $ RB.unsafeInsert rb rh0 x fs <- initial - sfs <- if withSep then fstep fs x else return $ FL.Partial fs - case sfs of - FL.Partial fs1 -> skip $ SplitOnSuffixSeqKRInit1 fs1 s rb rh1 - FL.Done r -> + r <- if withSep then fstep fs x else return $ FL.Partial fs + case r of + FL.Partial fs1 -> + skip $ SplitOnSuffixSeqKRInit1 fs1 s rb rh1 + FL.Done b -> let next = SplitOnSuffixSeqKRInit 0 s rb (RB.startOf rb) - in skip $ SplitOnSuffixSeqYield r next + in skip $ SplitOnSuffixSeqYield b next Skip s -> skip $ SplitOnSuffixSeqKRInit idx0 s rb rh0 Stop -> return Stop @@ -2295,8 +2280,8 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) = case res of Yield x s -> do rh1 <- liftIO (RB.unsafeInsert rb rh x) - sfs <- if withSep then fstep fs x else return $ FL.Partial fs - case sfs of + r <- if withSep then fstep fs x else return $ FL.Partial fs + case r of FL.Partial fs1 -> if idx /= maxIndex then go SPEC (idx + 1) rh1 s fs1 @@ -2305,10 +2290,12 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) = !ringHash = fold addCksum 0 rb in if ringHash == patHash then SplitOnSuffixSeqKRCheck fs1 s rb rh1 - else SplitOnSuffixSeqKRLoop fs1 s rb rh1 ringHash - FL.Done r -> - let next = SplitOnSuffixSeqKRInit 0 st rb (RB.startOf rb) - in skip $ SplitOnSuffixSeqYield r next + else SplitOnSuffixSeqKRLoop + fs1 s rb rh1 ringHash + FL.Done b -> + let next = SplitOnSuffixSeqKRInit + 0 st rb (RB.startOf rb) + in skip $ SplitOnSuffixSeqYield b next Skip s -> go SPEC idx rh s fs Stop -> do -- do not issue a blank segment when we end at pattern @@ -2332,15 +2319,16 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) = old <- liftIO $ peek rh rh1 <- liftIO (RB.unsafeInsert rb rh x) let cksum1 = deltaCksum cksum old x - sfs <- if withSep then fstep fs x else 
fstep fs old - case sfs of + r <- if withSep then fstep fs x else fstep fs old + case r of FL.Partial fs1 -> if (cksum1 /= patHash) then go SPEC fs1 s rh1 cksum1 else skip $ SplitOnSuffixSeqKRCheck fs1 s rb rh1 - FL.Done r -> - let next = SplitOnSuffixSeqKRInit 0 st rb (RB.startOf rb) - in skip $ SplitOnSuffixSeqYield r next + FL.Done b -> + let next = SplitOnSuffixSeqKRInit + 0 st rb (RB.startOf rb) + in skip $ SplitOnSuffixSeqYield b next Skip s -> go SPEC fs s rh cksum Stop -> if RB.unsafeEqArray rb rh patArr @@ -2365,13 +2353,13 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) = stepOuter _ (SplitOnSuffixSeqKRDone n fs rb rh) = do old <- liftIO $ peek rh let rh1 = RB.advance rb rh - sfs <- fstep fs old - case sfs of + r <- fstep fs old + case r of FL.Partial fs1 -> skip $ SplitOnSuffixSeqKRDone (n - 1) fs1 rb rh1 - FL.Done r -> do + FL.Done b -> do fs1 <- initial let next = SplitOnSuffixSeqKRDone (n - 1) fs1 rb rh1 - skip $ SplitOnSuffixSeqYield r next + skip $ SplitOnSuffixSeqYield b next {-# ANN type SplitState Fuse #-} data SplitState s arr @@ -3765,12 +3753,10 @@ scanlMx' fstep begin done s = (begin >>= \x -> x `seq` done x) `consM` postscanlMx' fstep begin done s -data PostScanState fsM s a = PostScan s !fsM +data PostScanState s f = PostScan s !f --- XXX PostScanWith is can be eliminated if we abstract Yield {-# INLINE_NORMAL postscanOnce #-} -postscanOnce :: Monad m - => FL.Fold m a b -> Stream m a -> Stream m b +postscanOnce :: Monad m => FL.Fold m a b -> Stream m a -> Stream m b postscanOnce (FL.Fold fstep begin done) (Stream step state) = Stream step' (PostScan state begin) @@ -3780,15 +3766,14 @@ postscanOnce (FL.Fold fstep begin done) (Stream step state) = step' gst (PostScan st acc) = do r <- step (adaptState gst) st case r of - -- XXX Move Yield to a common function Yield x s -> do old <- acc - y <- fstep old x - case y of - FL.Partial sres -> do - !v <- done sres - return $ Yield v $ PostScan s (return sres) - 
FL.Done _ -> return $ Stop + res <- fstep old x + case res of + FL.Partial fs -> do + !v <- done fs + return $ Yield v $ PostScan s (return fs) + FL.Done _ -> return Stop Skip s -> return $ Skip $ PostScan s acc Stop -> return Stop @@ -4027,17 +4012,17 @@ tap (Fold fstep initial extract) (Stream step state) = Stream step' TapInit step' gst (Tapping acc st) = do r <- step gst st case r of - -- XXX Abstract Yield? Yield x s -> do - acc1 <- fstep acc x + res <- fstep acc x return - $ case acc1 of - FL.Partial sres -> Yield x (Tapping sres s) - FL.Done _ -> Yield x (TapDone s) + $ Yield x + $ case res of + FL.Partial fs -> Tapping fs s + FL.Done _ -> TapDone s Skip s -> return $ Skip (Tapping acc s) Stop -> do void $ extract acc - return $ Stop + return Stop step' gst (TapDone st) = do r <- step gst st return @@ -4067,21 +4052,22 @@ tapOffsetEvery offset n (Fold fstep initial extract) (Stream step state) = step' gst (TapOffTapping acc st count) = do r <- step gst st case r of - Yield x s -> - if count <= 0 - -- XXX Abstract the then branch? 
- then do - acc1 <- fstep acc x - return - $ case acc1 of - FL.Partial sres -> - Yield x $ TapOffTapping sres s (n - 1) - FL.Done _ -> Yield x (TapOffDone s) - else return $ Yield x $ TapOffTapping acc s (count - 1) + Yield x s -> do + next <- + if count <= 0 + then do + res <- fstep acc x + return + $ case res of + FL.Partial sres -> + TapOffTapping sres s (n - 1) + FL.Done _ -> TapOffDone s + else return $ TapOffTapping acc s (count - 1) + return $ Yield x next Skip s -> return $ Skip (TapOffTapping acc s count) Stop -> do void $ extract acc - return $ Stop + return Stop step' gst (TapOffDone st) = do r <- step gst st return diff --git a/src/Streamly/Internal/Data/Stream/StreamD/Type.hs b/src/Streamly/Internal/Data/Stream/StreamD/Type.hs index c662b3dbd..fcd8f9702 100644 --- a/src/Streamly/Internal/Data/Stream/StreamD/Type.hs +++ b/src/Streamly/Internal/Data/Stream/StreamD/Type.hs @@ -48,7 +48,6 @@ module Streamly.Internal.Data.Stream.StreamD.Type , GroupState (..) -- for inspection testing , foldMany , foldMany1 - , groupsOf , groupsOf2 ) where @@ -66,8 +65,8 @@ import Fusion.Plugin.Types (Fuse(..)) import Streamly.Internal.Data.SVar (State(..), adaptState, defState) import Streamly.Internal.Data.Fold.Types (Fold(..), Fold2(..)) -import qualified Streamly.Internal.Data.Stream.StreamK as K import qualified Streamly.Internal.Data.Fold.Types as FL +import qualified Streamly.Internal.Data.Stream.StreamK as K ------------------------------------------------------------------------------ -- The direct style stream type @@ -589,12 +588,10 @@ take n (Stream step state) = n `seq` Stream step' (state, 0) {-# ANN type GroupState Fuse #-} data GroupState s fs b a = GroupStart s - | GroupConsume s fs a | GroupBuffer s fs | GroupYield b (GroupState s fs b a) | GroupFinish --- XXX Remove GroupConsume {-# INLINE_NORMAL foldMany #-} foldMany :: Monad m => Fold m a b -> Stream m a -> Stream m b foldMany (Fold fstep initial extract) (Stream step state) = @@ -607,15 +604,16 @@ 
foldMany (Fold fstep initial extract) (Stream step state) = -- fs = fold state fs <- initial return $ Skip (GroupBuffer st fs) - step' _ (GroupConsume st fs x) = do - fs' <- fstep fs x - case fs' of - FL.Done b -> return $ Skip (GroupYield b (GroupStart st)) - FL.Partial ps -> return $ Skip (GroupBuffer st ps) step' gst (GroupBuffer st fs) = do r <- step (adaptState gst) st case r of - Yield x s -> return $ Skip $ GroupConsume s fs x + Yield x s -> do + res <- fstep fs x + return + $ Skip + $ case res of + FL.Done b -> GroupYield b (GroupStart s) + FL.Partial ps -> GroupBuffer s ps Skip s -> return $ Skip (GroupBuffer s fs) Stop -> do b <- extract fs @@ -630,25 +628,26 @@ foldMany1 (Fold fstep initial extract) (Stream step state) = where + {-# INLINE consume #-} + consume x s fs = do + res <- fstep fs x + return + $ Skip + $ case res of + FL.Done b -> GroupYield b (GroupStart s) + FL.Partial ps -> GroupBuffer s ps + {-# INLINE_LATE step' #-} step' gst (GroupStart st) = do - -- fs = fold state r <- step (adaptState gst) st case r of - Yield x s -> do - fi <- initial - return $ Skip $ GroupConsume s fi x + Yield x s -> initial >>= consume x s Skip s -> return $ Skip (GroupStart s) - Stop -> return $ Stop - step' _ (GroupConsume st fs x) = do - fs' <- fstep fs x - case fs' of - FL.Done b -> return $ Skip (GroupYield b (GroupStart st)) - FL.Partial ps -> return $ Skip (GroupBuffer st ps) + Stop -> return Stop step' gst (GroupBuffer st fs) = do r <- step (adaptState gst) st case r of - Yield x s -> return $ Skip $ GroupConsume s fs x + Yield x s -> consume x s fs Skip s -> return $ Skip (GroupBuffer s fs) Stop -> do b <- extract fs @@ -656,11 +655,6 @@ foldMany1 (Fold fstep initial extract) (Stream step state) = step' _ (GroupYield b next) = return $ Yield b next step' _ GroupFinish = return Stop --- XXX Investigate performance -{-# INLINE groupsOf #-} -groupsOf :: Monad m => Int -> Fold m a b -> Stream m a -> Stream m b -groupsOf n fld = foldMany (FL.ltake n fld) - 
data GroupState2 s fs = GroupStart2 s | GroupBuffer2 s fs Int