Refactor StreamD/IsStream for terminating folds

* IsStream:
  * Add foldMany
  * Update haddock docs, with fold equivalents. Now with terminating folds the
    fold behvaior is equivalent to these folds.
  * Add skeletons for foldSequence/foldIterate/parseSequence
  * Implement combinators directly in terms of foldMany/foldMany1

StreamD:
  * export foldMany/foldMany1
  * Remove groupsOf/splitBy/splitSuffixBy/splitSuffixWith, we can use
    foldMany/foldMany1 directly instead.
  * Make some stylistic changes to code

StreamD/Types:
  * Remove groupsOf
  * Remove GroupConsume state from foldMany/foldMany1

* Add benchmarks for foldMany
This commit is contained in:
Harendra Kumar 2020-12-10 19:11:45 +00:00
parent c191f9c488
commit 82efd3a5bc
6 changed files with 388 additions and 257 deletions

View File

@ -32,18 +32,17 @@ import GHC.Magic (noinline)
#endif
import System.IO (Handle)
import qualified Streamly.Data.Fold as FL
import qualified Streamly.Unicode.Stream as SS
import qualified Streamly.FileSystem.Handle as FH
import qualified Streamly.Internal.Data.Parser as PR
import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Unicode.Stream as IUS
import qualified Streamly.Internal.FileSystem.Handle as IFH
import qualified Streamly.Internal.Data.Array.Storable.Foreign as A
import qualified Streamly.Internal.Data.Array.Storable.Foreign.Types as AT
import qualified Streamly.Internal.Memory.ArrayStream as AS
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Parser as PR
import qualified Streamly.Internal.Data.Stream.IsStream as IP
import qualified Streamly.Internal.FileSystem.Handle as IFH
import qualified Streamly.Internal.Memory.ArrayStream as AS
import qualified Streamly.Internal.Unicode.Stream as IUS
import qualified Streamly.Prelude as S
import qualified Streamly.Unicode.Stream as SS
import Gauge hiding (env)
import Prelude hiding (last, length)
@ -52,6 +51,7 @@ import Streamly.Benchmark.Common.Handle
#ifdef INSPECTION
import Streamly.Internal.Data.Stream.StreamD.Type (Step(..), GroupState)
import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Data.Unfold as IUF
import Test.Inspection
@ -309,9 +309,14 @@ o_1_space_reduce_toBytes env =
chunksOfSum :: Int -> Handle -> IO Int
chunksOfSum n inh = S.length $ S.chunksOf n FL.sum (S.unfold FH.read inh)
foldManyChunksOfSum :: Int -> Handle -> IO Int
foldManyChunksOfSum n inh =
S.length $ IP.foldMany (FL.ltake n FL.sum) (S.unfold FH.read inh)
parseManyChunksOfSum :: Int -> Handle -> IO Int
parseManyChunksOfSum n inh =
S.length $ IP.parseMany (PR.take n FL.sum) (S.unfold FH.read inh)
S.length
$ IP.parseMany (PR.fromFold $ FL.ltake n FL.sum) (S.unfold FH.read inh)
-- XXX investigate why we need an INLINE in this case (GHC)
-- Even though allocations remain the same in both cases inlining improves time
@ -333,25 +338,6 @@ inspect $ 'chunksOf `hasNoType` ''IUF.ConcatState -- FH.read/UF.concat
inspect $ 'chunksOf `hasNoType` ''A.ReadUState -- FH.read/A.read
#endif
-- This is to make sure that the concatMap in FH.read, groupsOf and foldlM'
-- together can fuse.
--
-- | Slice in chunks of size n and get the count of chunks.
_chunksOfD :: Int -> Handle -> IO Int
_chunksOfD n inh =
D.foldlM' (\i _ -> return $ i + 1) (return 0)
$ D.groupsOf n (AT.writeNUnsafe n)
$ D.fromStreamK (S.unfold FH.read inh)
#ifdef INSPECTION
inspect $ hasNoTypeClasses '_chunksOfD
inspect $ '_chunksOfD `hasNoType` ''Step
inspect $ '_chunksOfD `hasNoType` ''GroupState
inspect $ '_chunksOfD `hasNoType` ''AT.ArrayUnsafe -- AT.writeNUnsafe
inspect $ '_chunksOfD `hasNoType` ''IUF.ConcatState -- FH.read/UF.concat
inspect $ '_chunksOfD `hasNoType` ''A.ReadUState -- FH.read/A.read
#endif
o_1_space_reduce_read_grouped :: BenchEnv -> [Benchmark]
o_1_space_reduce_read_grouped env =
[ bgroup "reduce/read/chunks"
@ -363,11 +349,22 @@ o_1_space_reduce_read_grouped env =
-- XXX investigate why we need inline/noinline in these cases (GHC)
-- Chunk using parsers
, mkBenchSmall ("S.parseMany (PR.take " ++ show (bigSize env) ++ " FL.sum)")
env $ \inh _ ->
noinline parseManyChunksOfSum (bigSize env) inh
, mkBench "S.parseMany (PR.take 1 FL.sum)" env $ \inh _ ->
inline parseManyChunksOfSum 1 inh
, mkBenchSmall
("S.foldMany (FL.take " ++ show (bigSize env) ++ " FL.sum)")
env
$ \inh _ -> noinline foldManyChunksOfSum (bigSize env) inh
, mkBench
"S.foldMany (FL.take 1 FL.sum)"
env
$ \inh _ -> inline foldManyChunksOfSum 1 inh
, mkBenchSmall
("S.parseMany (FL.take " ++ show (bigSize env) ++ " FL.sum)")
env
$ \inh _ -> noinline parseManyChunksOfSum (bigSize env) inh
, mkBench
"S.parseMany (FL.take 1 FL.sum)"
env
$ \inh _ -> inline parseManyChunksOfSum 1 inh
-- folding chunks to arrays
, mkBenchSmall "S.arraysOf 1" env $ \inh _ ->

View File

@ -24,13 +24,13 @@ import Data.Char (ord)
import Data.Word (Word8)
import System.IO (Handle)
import qualified Streamly.Data.Fold as FL
import qualified Streamly.FileSystem.Handle as FH
import qualified Streamly.Internal.Data.Parser as PR
import qualified Streamly.Internal.Unicode.Stream as IUS
import qualified Streamly.Internal.FileSystem.Handle as IFH
import qualified Streamly.Internal.Data.Array.Storable.Foreign as A
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Parser as PR
import qualified Streamly.Internal.Data.Stream.IsStream as IP
import qualified Streamly.Internal.FileSystem.Handle as IFH
import qualified Streamly.Internal.Unicode.Stream as IUS
import qualified Streamly.Prelude as S
import Gauge hiding (env)
@ -95,11 +95,23 @@ inspect $ 'splitWithSuffix `hasNoType` ''IUF.ConcatState -- FH.read/UF.concat
inspect $ 'splitWithSuffix `hasNoType` ''A.ReadUState -- FH.read/A.read
#endif
-- | Split on line feed.
foldManySepBy :: Handle -> IO Int
foldManySepBy inh =
(S.length
$ IP.foldMany
(FL.sliceSepBy (== lf) FL.drain)
(S.unfold FH.read inh)
) -- >>= print
-- | Split on line feed.
parseManySepBy :: Handle -> IO Int
parseManySepBy inh =
(S.length $ IP.parseMany (PR.sliceSepBy (== lf) FL.drain)
(S.unfold FH.read inh)) -- >>= print
(S.length
$ IP.parseMany
(PR.fromFold $ FL.sliceSepBy (== lf) FL.drain)
(S.unfold FH.read inh)
) -- >>= print
-- | Words by space
wordsBy :: Handle -> IO Int
@ -146,7 +158,9 @@ splitOnSuffixSeq str inh =
o_1_space_reduce_read_split :: BenchEnv -> [Benchmark]
o_1_space_reduce_read_split env =
[ bgroup "split"
[ mkBench "S.parseMany (PR.sliceSepBy (== lf) FL.drain)" env
[ mkBench "S.foldMany (FL.sliceSepBy (== lf) FL.drain)" env
$ \inh _ -> foldManySepBy inh
, mkBench "S.parseMany (FL.sliceSepBy (== lf) FL.drain)" env
$ \inh _ -> parseManySepBy inh
, mkBench "S.wordsBy isSpace FL.drain" env $ \inh _ ->
wordsBy inh

View File

@ -17,11 +17,12 @@ module Serial.Transformation2 (benchmarks) where
import Control.DeepSeq (NFData(..))
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Monoid (Sum(..))
import GHC.Generics (Generic)
import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Data.Stream.IsStream as Internal
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Stream.IsStream as Internal
import qualified Streamly.Prelude as S
import Gauge
import Streamly.Prelude (SerialT, serially)
@ -131,6 +132,22 @@ groupsByRollingEq :: MonadIO m => SerialT m Int -> m ()
groupsByRollingEq =
S.drain . S.groupsByRolling (==) FL.drain
{-# INLINE foldMany #-}
foldMany :: Monad m => SerialT m Int -> m ()
foldMany =
S.drain
. S.map getSum
. Internal.foldMany (FL.ltake 2 FL.mconcat)
. S.map Sum
{-# INLINE _foldIterate #-}
_foldIterate :: Monad m => SerialT m Int -> m ()
_foldIterate =
S.drain
. S.map getSum
. Internal.foldIterate (FL.ltake 2 . FL.sconcat) (Sum 0)
. S.map Sum
o_1_space_grouping :: Int -> [Benchmark]
o_1_space_grouping value =
-- Buffering operations using heap proportional to group/window sizes.
@ -140,6 +157,8 @@ o_1_space_grouping value =
, benchIOSink value "groupsByEq" groupsByEq
, benchIOSink value "groupsByRollingLT" groupsByRollingLT
, benchIOSink value "groupsByRollingEq" groupsByRollingEq
, benchIOSink value "foldMany" foldMany
-- , benchIOSink value "foldIterate" foldIterate
]
]

View File

@ -136,10 +136,22 @@ module Streamly.Internal.Data.Stream.IsStream
, foldlM'
-- ** Composable Left Folds
-- | See "Streamly.Internal.Data.Fold"
, fold
, foldMany
, foldSequence
, foldIterate
-- ** Parsers
-- | See "Streamly.Internal.Data.Parser"
, parse
, parseK
, parseD
, parseMany
, parseManyD
, parseManyTill
, parseSequence
, parseIterate
-- ** Concurrent Folds
, foldAsync
@ -319,12 +331,6 @@ module Streamly.Internal.Data.Stream.IsStream
, reverse
, reverse'
-- ** Parsing
, parseMany
, parseManyD
, parseManyTill
, parseIterate
-- ** Trimming
, take
-- , takeGE
@ -643,8 +649,7 @@ import Streamly.Internal.Data.IORef.Prim (Prim, IORef)
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..))
import qualified Streamly.Internal.Data.Array.Storable.Foreign as A
import qualified Streamly.Data.Fold as FL
import qualified Streamly.Internal.Data.Fold.Types as FL
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Stream.Prelude as P
import qualified Streamly.Internal.Data.Stream.StreamK as K
import qualified Streamly.Internal.Data.Stream.StreamD as D
@ -1410,7 +1415,10 @@ foldlM' step begin m = S.foldlM' step begin $ toStreamS m
-- Running a Fold
------------------------------------------------------------------------------
-- | Fold a stream using the supplied left fold.
-- | Fold a stream using the supplied left 'Fold' and reducing the resulting
-- expression strictly at each step. The behavior is similar to 'foldl''. A
-- 'Fold' can terminate early without consuming the full stream. See the
-- documentation of individual 'Fold's for termination behavior.
--
-- >>> S.fold FL.sum (S.enumerateFromTo 1 100)
-- 5050
@ -1461,6 +1469,7 @@ parse = parseD . PRK.fromParserK
-- |
-- > drain = mapM_ (\_ -> return ())
-- > drain = fold Fold.drain
--
-- Run a stream, discarding the results. By default it interprets the stream
-- as 'SerialT', to run other types of streams use the type adapting
@ -1483,6 +1492,7 @@ runStream = drain
-- |
-- > drainN n = drain . take n
-- > drainN n = fold (Fold.ltake n Fold.drain)
--
-- Run maximum up to @n@ iterations of a stream.
--
@ -1504,6 +1514,7 @@ runN = drainN
-- |
-- > drainWhile p = drain . takeWhile p
-- > drainWhile p = fold (Fold.sliceSepBy (not . p) Fold.drain)
--
-- Run a stream as long as the predicate holds true.
--
@ -1525,6 +1536,8 @@ runWhile = drainWhile
-- | Determine whether the stream is empty.
--
-- > null = fold Fold.null
--
-- @since 0.1.1
{-# INLINE null #-}
null :: Monad m => SerialT m a -> m Bool
@ -1533,6 +1546,7 @@ null = S.null . toStreamS
-- | Extract the first element of the stream, if any.
--
-- > head = (!! 0)
-- > head = fold Fold.head
--
-- @since 0.1.0
{-# INLINE head #-}
@ -1568,6 +1582,7 @@ init m = K.init (K.adapt m)
-- | Extract the last element of the stream, if any.
--
-- > last xs = xs !! (length xs - 1)
-- > last = fold Fold.last
--
-- @since 0.1.1
{-# INLINE last #-}
@ -1576,6 +1591,8 @@ last m = S.last $ toStreamS m
-- | Determine whether an element is present in the stream.
--
-- > elem = fold Fold.elem
--
-- @since 0.1.0
{-# INLINE elem #-}
elem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool
@ -1583,6 +1600,8 @@ elem e m = S.elem e (toStreamS m)
-- | Determine whether an element is not present in the stream.
--
-- > notElem = fold Fold.notElem
--
-- @since 0.1.0
{-# INLINE notElem #-}
notElem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool
@ -1590,6 +1609,8 @@ notElem e m = S.notElem e (toStreamS m)
-- | Determine the length of the stream.
--
-- > notElem = fold Fold.length
--
-- @since 0.1.0
{-# INLINE length #-}
length :: Monad m => SerialT m a -> m Int
@ -1597,6 +1618,8 @@ length = foldl' (\n _ -> n + 1) 0
-- | Determine whether all elements of a stream satisfy a predicate.
--
-- > all = fold Fold.all
--
-- @since 0.1.0
{-# INLINE all #-}
all :: Monad m => (a -> Bool) -> SerialT m a -> m Bool
@ -1604,6 +1627,8 @@ all p m = S.all p (toStreamS m)
-- | Determine whether any of the elements of a stream satisfy a predicate.
--
-- > any = fold Fold.any
--
-- @since 0.1.0
{-# INLINE any #-}
any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool
@ -1611,6 +1636,8 @@ any p m = S.any p (toStreamS m)
-- | Determines if all elements of a boolean stream are True.
--
-- > and = fold Fold.and
--
-- @since 0.5.0
{-# INLINE and #-}
and :: Monad m => SerialT m Bool -> m Bool
@ -1618,6 +1645,8 @@ and = all (==True)
-- | Determines whether at least one element of a boolean stream is True.
--
-- > or = fold Fold.or
--
-- @since 0.5.0
{-# INLINE or #-}
or :: Monad m => SerialT m Bool -> m Bool
@ -1627,6 +1656,8 @@ or = any (==True)
-- the stream is empty. Note that this is not numerically stable for floating
-- point numbers.
--
-- > sum = fold Fold.sum
--
-- @since 0.1.0
{-# INLINE sum #-}
sum :: (Monad m, Num a) => SerialT m a -> m a
@ -1635,6 +1666,8 @@ sum = foldl' (+) 0
-- | Determine the product of all elements of a stream of numbers. Returns @1@
-- when the stream is empty.
--
-- > product = fold Fold.product
--
-- @since 0.1.1
{-# INLINE product #-}
product :: (Monad m, Num a) => SerialT m a -> m a
@ -1642,6 +1675,8 @@ product = foldl' (*) 1
-- | Fold a stream of monoid elements by appending them.
--
-- > mconcat = fold Fold.mconcat
--
-- /Internal/
{-# INLINE mconcat #-}
mconcat :: (Monad m, Monoid a) => SerialT m a -> m a
@ -1650,6 +1685,7 @@ mconcat = foldr mappend mempty
-- |
-- @
-- minimum = 'minimumBy' compare
-- minimum = fold Fold.minimum
-- @
--
-- Determine the minimum element in a stream.
@ -1662,6 +1698,8 @@ minimum m = S.minimum (toStreamS m)
-- | Determine the minimum element in a stream using the supplied comparison
-- function.
--
-- > minimumBy = fold Fold.minimumBy
--
-- @since 0.6.0
{-# INLINE minimumBy #-}
minimumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a)
@ -1670,6 +1708,7 @@ minimumBy cmp m = S.minimumBy cmp (toStreamS m)
-- |
-- @
-- maximum = 'maximumBy' compare
-- maximum = fold Fold.maximum
-- @
--
-- Determine the maximum element in a stream.
@ -1682,6 +1721,8 @@ maximum = P.maximum
-- | Determine the maximum element in a stream using the supplied comparison
-- function.
--
-- > maximumBy = fold Fold.maximumBy
--
-- @since 0.6.0
{-# INLINE maximumBy #-}
maximumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a)
@ -1698,6 +1739,7 @@ m !! i = toStreamS m S.!! i
-- first pair where the key equals the given value @a@.
--
-- > lookup = snd <$> find ((==) . fst)
-- > lookup = fold Fold.lookup
--
-- @since 0.5.0
{-# INLINE lookup #-}
@ -1707,6 +1749,7 @@ lookup a m = S.lookup a (toStreamS m)
-- | Like 'findM' but with a non-monadic predicate.
--
-- > find p = findM (return . p)
-- > find = fold Fold.find
--
-- @since 0.5.0
{-# INLINE find #-}
@ -1715,6 +1758,8 @@ find p m = S.find p (toStreamS m)
-- | Returns the first element that satisfies the given predicate.
--
-- > findM = fold Fold.findM
--
-- @since 0.6.0
{-# INLINE findM #-}
findM :: Monad m => (a -> m Bool) -> SerialT m a -> m (Maybe a)
@ -1723,6 +1768,8 @@ findM p m = S.findM p (toStreamS m)
-- | Find all the indices where the element in the stream satisfies the given
-- predicate.
--
-- > findIndices = fold Fold.findIndices
--
-- @since 0.5.0
{-# INLINE findIndices #-}
findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int
@ -1730,6 +1777,8 @@ findIndices p m = fromStreamS $ S.findIndices p (toStreamS m)
-- | Returns the first index that satisfies the given predicate.
--
-- > findIndex = fold Fold.findIndex
--
-- @since 0.5.0
{-# INLINE findIndex #-}
findIndex :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe Int)
@ -1738,10 +1787,12 @@ findIndex p = head . findIndices p
-- | Find all the indices where the value of the element in the stream is equal
-- to the given value.
--
-- > elemIndices a = findIndices (== a)
--
-- @since 0.5.0
{-# INLINE elemIndices #-}
elemIndices :: (IsStream t, Eq a, Monad m) => a -> t m a -> t m Int
elemIndices a = findIndices (==a)
elemIndices a = findIndices (== a)
-- | Returns the first index where a given value is found in the stream.
--
@ -3636,7 +3687,7 @@ iterateMapLeftsWith
iterateMapLeftsWith combine f = iterateMapWith combine (either f (const K.nil))
------------------------------------------------------------------------------
-- Parsing
-- Folding/Parsing chunks in a stream
------------------------------------------------------------------------------
-- Splitting operations that take a predicate and a Fold can be
@ -3645,6 +3696,67 @@ iterateMapLeftsWith combine f = iterateMapWith combine (either f (const K.nil))
--
-- XXX We need takeGE/takeBetween to implement "some" using "many".
-- | Apply a 'Fold' repeatedly on a stream and emit the parsed values in the
-- output stream.
--
-- This is the streaming dual of the 'Streamly.Internal.Data.Fold.many'
-- parse combinator.
--
-- >>> f = Fold.ltake 2 Fold.sum
-- >>> Stream.toList $ Stream.foldMany f $ Stream.fromList [1..10]
-- > [3,7,11,15,19]
--
-- >>> f = Fold.sliceEndWith Fold.toList
-- >>> Stream.toList $ Stream.foldMany f $ Stream.fromList "hello\nworld"
-- > ["hello\n","world"]
--
-- /Internal/
--
{-# INLINE foldMany #-}
foldMany
:: (IsStream t, Monad m)
=> Fold m a b
-> t m a
-> t m b
foldMany f m = D.fromStreamD $ D.foldMany f (D.toStreamD m)
-- | Apply a stream of folds to an input stream and emit the results in the
-- output stream.
--
-- /Internal/
--
{-# INLINE foldSequence #-}
foldSequence
:: -- (IsStream t, Monad m) =>
t m (Fold m a b)
-> t m a
-> t m b
foldSequence _f _m = undefined
-- | Iterate a fold generator on a stream. The initial value @b@ is used to
-- generate the first fold, the fold is applied on the stream and the result of
-- the fold is used to generate the next fold and so on.
--
-- >>> f x = Fold.ltake 2 (Fold.mconcatTo x)
-- >>> s = Stream.map Sum $ Stream.fromList [1..10]
-- >>> Stream.toList $ Stream.map getSum $ Stream.foldIterate f 0 s
-- > [3,10,21,36,55,55]
--
-- This is the streaming equivalent of monad like sequenced application of
-- folds where next fold is dependent on the previous fold.
--
-- /Internal/
--
{-# INLINE foldIterate #-}
foldIterate
:: -- (IsStream t, Monad m) =>
(b -> Fold m a b)
-> b
-> t m a
-> t m b
foldIterate _f _i _m = undefined
-- D.fromStreamD $ D.foldIterate f i (D.toStreamD m)
-- | Apply a 'Parser' repeatedly on a stream and emit the parsed values in the
-- output stream.
--
@ -3657,7 +3769,7 @@ iterateMapLeftsWith combine f = iterateMapWith combine (either f (const K.nil))
-- >>> S.toList $ S.parseMany (PR.line FL.toList) $ S.fromList "hello\nworld"
-- > ["hello\n","world"]
--
-- /Internal
-- /Internal/
--
{-# INLINE parseMany #-}
parseMany
@ -3677,6 +3789,19 @@ parseManyD
parseManyD p m =
D.fromStreamD $ D.parseMany p (D.toStreamD m)
-- | Apply a stream of parsers to an input stream and emit the results in the
-- output stream.
--
-- /Internal/
--
{-# INLINE parseSequence #-}
parseSequence
:: -- (IsStream t, Monad m) =>
t m (Parser m a b)
-> t m a
-> t m b
parseSequence _f _m = undefined
-- | @parseManyTill collect test stream@ tries the parser @test@ on the input,
-- if @test@ fails it backtracks and tries @collect@, after @collect@ succeeds
-- @test@ is tried again and so on. The parser stops when @test@ succeeds. The
@ -3786,7 +3911,7 @@ groupScan split fold m = undefined
chunksOf
:: (IsStream t, Monad m)
=> Int -> Fold m a b -> t m a -> t m b
chunksOf n f m = D.fromStreamD $ D.groupsOf n f (D.toStreamD m)
chunksOf n f = foldMany (FL.ltake n f)
-- |
--
@ -3831,8 +3956,6 @@ intervalsOf n f xs =
-- N-ary APIs
------------------------------------------------------------------------------
-- XXX We should probably change the order of the comparision and update the
-- docs accordingly.
-- | @groupsBy cmp f $ S.fromList [a,b,c,...]@ assigns the element @a@ to the
-- first group, if @b \`cmp` a@ is 'True' then @b@ is also assigned to the same
-- group. If @c \`cmp` a@ is 'True' then @c@ is also assigned to the same
@ -3945,8 +4068,7 @@ groups = groupsBy (==)
splitOn
:: (IsStream t, Monad m)
=> (a -> Bool) -> Fold m a b -> t m a -> t m b
splitOn predicate f m =
D.fromStreamD $ D.splitBy predicate f (D.toStreamD m)
splitOn predicate f = foldMany (FL.sliceSepBy predicate f)
-- | Like 'splitOn' but the separator is considered as suffixed to the segments
-- in the stream. A missing suffix at the end is allowed. A separator at the
@ -3998,7 +4120,7 @@ splitOnSuffix
:: (IsStream t, Monad m)
=> (a -> Bool) -> Fold m a b -> t m a -> t m b
splitOnSuffix predicate f m =
D.fromStreamD $ D.splitSuffixBy predicate f (D.toStreamD m)
D.fromStreamD $ D.foldMany1 (FL.sliceSepBy predicate f) (D.toStreamD m)
-- | Like 'splitOn' after stripping leading, trailing, and repeated separators.
-- Therefore, @".a..b."@ with '.' as the separator would be parsed as
@ -4068,7 +4190,7 @@ splitWithSuffix
:: (IsStream t, Monad m)
=> (a -> Bool) -> Fold m a b -> t m a -> t m b
splitWithSuffix predicate f m =
D.fromStreamD $ D.splitSuffixWith predicate f (D.toStreamD m)
D.fromStreamD $ D.foldMany1 (FL.sliceEndWith predicate f) (D.toStreamD m)
------------------------------------------------------------------------------
-- Split on a delimiter sequence
@ -4758,13 +4880,12 @@ classifySessionsBy tick tmout reset ejectPred
-- better performance.
--
let curTime = max sessionEventTime timestamp
extractOld v =
case v of
Nothing -> initial
Just (Tuple' _ acc) -> return acc
mOld = Map.lookup key sessionKeyValueMap
fs <- extractOld mOld
fs <-
case mOld of
Nothing -> initial
Just (Tuple' _ acc) -> return acc
res <- step fs value
case res of
FL.Done fb -> do

View File

@ -103,6 +103,8 @@ module Streamly.Internal.Data.Stream.StreamD
, foldlx'
, foldlMx'
, foldOnce
, foldMany
, foldMany1
, parselMx'
, parseMany
@ -157,16 +159,12 @@ module Streamly.Internal.Data.Stream.StreamD
, interpose
-- ** Grouping
, groupsOf
, groupsOf2
, groupsBy
, groupsRollingBy
-- ** Splitting
, splitBy
, splitSuffixBy
, wordsBy
, splitSuffixWith
, splitOnSeq
, splitOnSuffixSeq
@ -1519,11 +1517,6 @@ reverse' m =
-- Grouping/Splitting
------------------------------------------------------------------------------
{-# INLINE_NORMAL splitSuffixWith #-}
splitSuffixWith :: Monad m
=> (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
splitSuffixWith predicate f = foldMany1 (FL.sliceEndWith predicate f)
{-# INLINE_NORMAL groupsBy #-}
groupsBy :: Monad m
=> (a -> a -> Bool)
@ -1540,11 +1533,11 @@ groupsBy cmp f (Stream step state) = Stream (stepOuter f) (Just state, Nothing)
case res of
Yield x s -> do
fs <- initial
sfs <- fstep fs x
case sfs of
r <- fstep fs x
case r of
FL.Partial fs1 -> go SPEC x s fs1
FL.Done fb -> return $ Yield fb (Just s, Just x)
Skip s -> return $ Skip $ (Just s, Nothing)
FL.Done b -> return $ Yield b (Just s, Just x)
Skip s -> return $ Skip (Just s, Nothing)
Stop -> return Stop
where
@ -1555,19 +1548,19 @@ groupsBy cmp f (Stream step state) = Stream (stepOuter f) (Just state, Nothing)
Yield x s -> do
if cmp x prev
then do
sfs <- fstep acc x
case sfs of
r <- fstep acc x
case r of
FL.Partial fs1 -> go SPEC prev s fs1
FL.Done fb -> return $ Yield fb (Just s, Just x)
FL.Done b -> return $ Yield b (Just s, Just x)
else done acc >>= \r -> return $ Yield r (Just s, Just x)
Skip s -> go SPEC prev s acc
Stop -> done acc >>= \r -> return $ Yield r (Nothing, Nothing)
stepOuter (Fold fstep initial done) gst (Just st, Just prev) = do
fs <- initial
sfs <- fstep fs prev
case sfs of
r <- fstep fs prev
case r of
FL.Partial fs1 -> go SPEC st fs1
FL.Done fb -> return $ Yield fb (Just st, Nothing)
FL.Done b -> return $ Yield b (Just st, Nothing)
where
@ -1578,10 +1571,10 @@ groupsBy cmp f (Stream step state) = Stream (stepOuter f) (Just state, Nothing)
Yield x s -> do
if cmp x prev
then do
sfs <- fstep acc x
case sfs of
r <- fstep acc x
case r of
FL.Partial fs1 -> go SPEC s fs1
FL.Done fb -> return $ Yield fb (Just s, Just x)
FL.Done b -> return $ Yield b (Just s, Just x)
else done acc >>= \r -> return $ Yield r (Just s, Just x)
Skip s -> go SPEC s acc
Stop -> done acc >>= \r -> return $ Yield r (Nothing, Nothing)
@ -1604,10 +1597,10 @@ groupsRollingBy cmp f (Stream step state) =
case res of
Yield x s -> do
fs <- initial
sfs <- fstep fs x
case sfs of
r <- fstep fs x
case r of
FL.Partial fs1 -> go SPEC x s fs1
FL.Done fb -> return $ Yield fb (Just s, Just x)
FL.Done b -> return $ Yield b (Just s, Just x)
Skip s -> return $ Skip $ (Just s, Nothing)
Stop -> return Stop
@ -1619,19 +1612,19 @@ groupsRollingBy cmp f (Stream step state) =
Yield x s -> do
if cmp prev x
then do
sfs <- fstep acc x
case sfs of
r <- fstep acc x
case r of
FL.Partial fs1 -> go SPEC x s fs1
FL.Done fb -> return $ Yield fb (Just s, Just x)
FL.Done b -> return $ Yield b (Just s, Just x)
else done acc >>= \r -> return $ Yield r (Just s, Just x)
Skip s -> go SPEC prev s acc
Stop -> done acc >>= \r -> return $ Yield r (Nothing, Nothing)
stepOuter (Fold fstep initial done) gst (Just st, Just prev') = do
fs <- initial
sfs <- fstep fs prev'
case sfs of
r <- fstep fs prev'
case r of
FL.Partial fs1 -> go SPEC prev' st fs1
FL.Done fb -> return $ Yield fb (Just st, Nothing)
FL.Done b -> return $ Yield b (Just st, Nothing)
where
@ -1641,24 +1634,15 @@ groupsRollingBy cmp f (Stream step state) =
Yield x s -> do
if cmp prevv x
then do
sfs <- fstep acc x
case sfs of
r <- fstep acc x
case r of
FL.Partial fs1 -> go SPEC x s fs1
FL.Done fb -> return $ Yield fb (Just s, Just x)
FL.Done b -> return $ Yield b (Just s, Just x)
else done acc >>= \r -> return $ Yield r (Just s, Just x)
Skip s -> go SPEC prevv s acc
Stop -> done acc >>= \r -> return $ Yield r (Nothing, Nothing)
stepOuter _ _ (Nothing, _) = return Stop
{-# INLINE_NORMAL splitBy #-}
splitBy :: Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
splitBy predicate f = foldMany (FL.sliceSepBy predicate f)
{-# INLINE_NORMAL splitSuffixBy #-}
splitSuffixBy :: Monad m
=> (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
splitSuffixBy predicate f = foldMany1 (FL.sliceSepBy predicate f)
data WordsByState s = WordsByJust s | WordsByNothing
{-# INLINE_NORMAL wordsBy #-}
@ -1677,10 +1661,10 @@ wordsBy predicate f (Stream step state) =
then return $ Skip (WordsByJust s)
else do
fs <- initial
sfs <- fstep fs x
case sfs of
r <- fstep fs x
case r of
FL.Partial fs1 -> go SPEC s fs1
FL.Done fb -> return $ Yield fb (WordsByJust s)
FL.Done b -> return $ Yield b (WordsByJust s)
Skip s -> return $ Skip $ WordsByJust s
Stop -> return Stop
@ -1694,10 +1678,10 @@ wordsBy predicate f (Stream step state) =
if predicate x
then done acc >>= \r -> return $ Yield r (WordsByJust s)
else do
sfs <- fstep acc x
case sfs of
r <- fstep acc x
case r of
FL.Partial fs1 -> go SPEC s fs1
FL.Done fb -> return $ Yield fb (WordsByJust s)
FL.Done b -> return $ Yield b (WordsByJust s)
Skip s -> go SPEC s acc
Stop -> done acc >>= \r -> return $ Yield r WordsByNothing
@ -1808,13 +1792,13 @@ splitOnSeq patArr (Fold fstep initial done) (Stream step state) =
res <- step (adaptState gst) st
case res of
Yield x s -> do
acc <- initial
sfs <- fstep acc x
case sfs of
FL.Partial acc1 -> do
r <- done acc1
skip $ SplitOnSeqYield r (SplitOnSeqEmpty s)
FL.Done fb -> skip $ SplitOnSeqYield fb (SplitOnSeqEmpty s)
fs <- initial
r <- fstep fs x
case r of
FL.Partial fs1 -> do
b <- done fs1
skip $ SplitOnSeqYield b (SplitOnSeqEmpty s)
FL.Done b -> skip $ SplitOnSeqYield b (SplitOnSeqEmpty s)
Skip s -> return $ Skip (SplitOnSeqEmpty s)
Stop -> return Stop
@ -1839,12 +1823,12 @@ splitOnSeq patArr (Fold fstep initial done) (Stream step state) =
fs1 <- initial
return $ Skip $ SplitOnSeqYield r (SplitOnSeqSingle fs1 s pat)
else do
sfs <- fstep fs x
case sfs of
FL.Partial fs1 -> skip $ (SplitOnSeqSingle fs1 s pat)
FL.Done fb -> do
r <- fstep fs x
case r of
FL.Partial fs1 -> skip $ SplitOnSeqSingle fs1 s pat
FL.Done b -> do
fs1 <- initial
skip $ SplitOnSeqYield fb (SplitOnSeqSingle fs1 s pat)
skip $ SplitOnSeqYield b (SplitOnSeqSingle fs1 s pat)
Skip s -> return $ Skip $ SplitOnSeqSingle fs s pat
Stop -> do
r <- done fs
@ -1859,13 +1843,13 @@ splitOnSeq patArr (Fold fstep initial done) (Stream step state) =
skip $ SplitOnSeqYield r SplitOnSeqDone
stepOuter _ (SplitOnSeqWordDone n fs wrd) = do
let old = elemMask .&. (wrd `shiftR` (elemBits * (n - 1)))
sfs <- fstep fs (toEnum $ fromIntegral old)
case sfs of
r <- fstep fs (toEnum $ fromIntegral old)
case r of
FL.Partial fs1 -> skip $ SplitOnSeqWordDone (n - 1) fs1 wrd
FL.Done r -> do
FL.Done b -> do
fs1 <- initial
let next = SplitOnSeqWordDone (n - 1) fs1 wrd
skip $ SplitOnSeqYield r next
skip $ SplitOnSeqYield b next
stepOuter gst (SplitOnSeqWordInit st0) =
go SPEC 0 0 st0
@ -1910,18 +1894,18 @@ splitOnSeq patArr (Fold fstep initial done) (Stream step state) =
let wrd1 = addToWord wrd x
old = (wordMask .&. wrd)
`shiftR` (elemBits * (patLen - 1))
sfs <- fstep fs (toEnum $ fromIntegral old)
case sfs of
r <- fstep fs (toEnum $ fromIntegral old)
case r of
FL.Partial fs1 -> do
if wrd1 .&. wordMask == wordPat
then do
r <- done fs1
b <- done fs1
let next = SplitOnSeqWordInit s
skip $ SplitOnSeqYield r next
skip $ SplitOnSeqYield b next
else go SPEC wrd1 s fs1
FL.Done r ->
FL.Done b ->
let next = SplitOnSeqWordInit s
in skip $ SplitOnSeqYield r next
in skip $ SplitOnSeqYield b next
Skip s -> go SPEC wrd s fs
Stop -> skip $ SplitOnSeqWordDone patLen fs wrd
@ -1963,16 +1947,16 @@ splitOnSeq patArr (Fold fstep initial done) (Stream step state) =
Yield x s -> do
old <- liftIO $ peek rh
let cksum1 = deltaCksum cksum old x
sfs <- fstep fs old
case sfs of
r <- fstep fs old
case r of
FL.Partial fs1 -> do
rh1 <- liftIO (RB.unsafeInsert rb rh x)
if (cksum1 == patHash)
if cksum1 == patHash
then skip $ SplitOnSeqKRCheck fs1 s rb rh1
else go SPEC fs1 s rh1 cksum1
FL.Done r ->
FL.Done b ->
let next = SplitOnSeqKRInit 0 s rb (RB.startOf rb)
in skip $ SplitOnSeqYield r next
in skip $ SplitOnSeqYield b next
Skip s -> go SPEC fs s rh cksum
Stop -> skip $ SplitOnSeqKRDone patLen fs rb rh
@ -2015,13 +1999,13 @@ splitOnSeq patArr (Fold fstep initial done) (Stream step state) =
stepOuter _ (SplitOnSeqKRDone n fs rb rh) = do
old <- liftIO $ peek rh
let rh1 = RB.advance rb rh
sfs <- fstep fs old
case sfs of
r <- fstep fs old
case r of
FL.Partial fs1 -> skip $ SplitOnSeqKRDone (n - 1) fs1 rb rh1
FL.Done r -> do
FL.Done b -> do
fs1 <- initial
let next = SplitOnSeqKRDone (n - 1) fs1 rb rh1
skip $ SplitOnSeqYield r next
skip $ SplitOnSeqYield b next
{-# ANN type SplitOnSuffixSeqState Fuse #-}
data SplitOnSuffixSeqState rb rh ck w fs s b x =
@ -2078,22 +2062,22 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) =
processYieldSingle pat x s fs =
if pat == x
then do
sfs <- if withSep then fstep fs x else return $ FL.Partial fs
case sfs of
r <- if withSep then fstep fs x else return $ FL.Partial fs
case r of
FL.Partial fs1 -> do
r <- done fs1
b <- done fs1
let next = SplitOnSuffixSeqSingleInit s pat
skip $ SplitOnSuffixSeqYield r next
FL.Done r ->
skip $ SplitOnSuffixSeqYield b next
FL.Done b ->
let next = SplitOnSuffixSeqSingleInit s pat
in skip $ SplitOnSuffixSeqYield r next
in skip $ SplitOnSuffixSeqYield b next
else do
sfs <- fstep fs x
case sfs of
r <- fstep fs x
case r of
FL.Partial fs1 -> skip $ SplitOnSuffixSeqSingle fs1 s pat
FL.Done r ->
FL.Done b ->
let next = SplitOnSuffixSeqSingleInit s pat
in skip $ SplitOnSuffixSeqYield r next
in skip $ SplitOnSuffixSeqYield b next
-- For Rabin-Karp search
k = 2891336453 :: Word32
@ -2135,13 +2119,13 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) =
case res of
Yield x s -> do
acc <- initial
sfs <- fstep acc x
case sfs of
FL.Partial acc1 -> do
r <- done acc1
skip $ SplitOnSuffixSeqYield r (SplitOnSuffixSeqEmpty s)
FL.Done r ->
skip $ SplitOnSuffixSeqYield r (SplitOnSuffixSeqEmpty s)
r <- fstep acc x
case r of
FL.Partial fs -> do
b <- done fs
skip $ SplitOnSuffixSeqYield b (SplitOnSuffixSeqEmpty s)
FL.Done b ->
skip $ SplitOnSuffixSeqYield b (SplitOnSuffixSeqEmpty s)
Skip s -> skip (SplitOnSuffixSeqEmpty s)
Stop -> return Stop
@ -2180,13 +2164,13 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) =
skip $ SplitOnSuffixSeqYield r SplitOnSuffixSeqDone
stepOuter _ (SplitOnSuffixSeqWordDone n fs wrd) = do
let old = elemMask .&. (wrd `shiftR` (elemBits * (n - 1)))
sfs <- fstep fs (toEnum $ fromIntegral old)
case sfs of
r <- fstep fs (toEnum $ fromIntegral old)
case r of
FL.Partial fs1 -> skip $ SplitOnSuffixSeqWordDone (n - 1) fs1 wrd
FL.Done r -> do
FL.Done b -> do
fs1 <- initial
let next = SplitOnSuffixSeqWordDone (n - 1) fs1 wrd
skip $ SplitOnSuffixSeqYield r next
skip $ SplitOnSuffixSeqYield b next
stepOuter gst (SplitOnSuffixSeqWordInit st0) = do
res <- step (adaptState gst) st0
@ -2194,12 +2178,12 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) =
Yield x s -> do
fs <- initial
let wrd = addToWord 0 x
sfs <- if withSep then fstep fs x else return $ FL.Partial fs
case sfs of
r <- if withSep then fstep fs x else return $ FL.Partial fs
case r of
FL.Partial fs1 -> go SPEC 1 wrd s fs1
FL.Done r -> do
FL.Done b -> do
let next = SplitOnSuffixSeqWordInit s
in skip $ SplitOnSuffixSeqYield r next
in skip $ SplitOnSuffixSeqYield b next
Skip s -> skip (SplitOnSuffixSeqWordInit s)
Stop -> return Stop
@ -2211,20 +2195,20 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) =
case res of
Yield x s -> do
let wrd1 = addToWord wrd x
sfs <- if withSep then fstep fs x else return $ FL.Partial fs
case sfs of
r <- if withSep then fstep fs x else return $ FL.Partial fs
case r of
FL.Partial fs1 ->
if idx /= maxIndex
then go SPEC (idx + 1) wrd1 s fs1
else if wrd1 .&. wordMask /= wordPat
then skip $ SplitOnSuffixSeqWordLoop wrd1 s fs1
else do
r <- done fs
b <- done fs
let next = SplitOnSuffixSeqWordInit s
skip $ SplitOnSuffixSeqYield r next
FL.Done r ->
skip $ SplitOnSuffixSeqYield b next
FL.Done b ->
let next = SplitOnSuffixSeqWordInit s
in skip $ SplitOnSuffixSeqYield r next
in skip $ SplitOnSuffixSeqYield b next
Skip s -> go SPEC idx wrd s fs
Stop -> skip $ SplitOnSuffixSeqWordDone idx fs wrd
@ -2241,21 +2225,21 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) =
let wrd1 = addToWord wrd x
old = (wordMask .&. wrd)
`shiftR` (elemBits * (patLen - 1))
sfs <-
r <-
if withSep
then fstep fs x
else fstep fs (toEnum $ fromIntegral old)
case sfs of
case r of
FL.Partial fs1 ->
if wrd1 .&. wordMask == wordPat
then do
r <- done fs1
b <- done fs1
let next = SplitOnSuffixSeqWordInit s
skip $ SplitOnSuffixSeqYield r next
skip $ SplitOnSuffixSeqYield b next
else go SPEC wrd1 s fs1
FL.Done r ->
FL.Done b ->
let next = SplitOnSuffixSeqWordInit s
in skip $ SplitOnSuffixSeqYield r next
in skip $ SplitOnSuffixSeqYield b next
Skip s -> go SPEC wrd s fs
Stop ->
if wrd .&. wordMask == wordPat
@ -2276,12 +2260,13 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) =
Yield x s -> do
rh1 <- liftIO $ RB.unsafeInsert rb rh0 x
fs <- initial
sfs <- if withSep then fstep fs x else return $ FL.Partial fs
case sfs of
FL.Partial fs1 -> skip $ SplitOnSuffixSeqKRInit1 fs1 s rb rh1
FL.Done r ->
r <- if withSep then fstep fs x else return $ FL.Partial fs
case r of
FL.Partial fs1 ->
skip $ SplitOnSuffixSeqKRInit1 fs1 s rb rh1
FL.Done b ->
let next = SplitOnSuffixSeqKRInit 0 s rb (RB.startOf rb)
in skip $ SplitOnSuffixSeqYield r next
in skip $ SplitOnSuffixSeqYield b next
Skip s -> skip $ SplitOnSuffixSeqKRInit idx0 s rb rh0
Stop -> return Stop
@ -2295,8 +2280,8 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) =
case res of
Yield x s -> do
rh1 <- liftIO (RB.unsafeInsert rb rh x)
sfs <- if withSep then fstep fs x else return $ FL.Partial fs
case sfs of
r <- if withSep then fstep fs x else return $ FL.Partial fs
case r of
FL.Partial fs1 ->
if idx /= maxIndex
then go SPEC (idx + 1) rh1 s fs1
@ -2305,10 +2290,12 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) =
!ringHash = fold addCksum 0 rb
in if ringHash == patHash
then SplitOnSuffixSeqKRCheck fs1 s rb rh1
else SplitOnSuffixSeqKRLoop fs1 s rb rh1 ringHash
FL.Done r ->
let next = SplitOnSuffixSeqKRInit 0 st rb (RB.startOf rb)
in skip $ SplitOnSuffixSeqYield r next
else SplitOnSuffixSeqKRLoop
fs1 s rb rh1 ringHash
FL.Done b ->
let next = SplitOnSuffixSeqKRInit
0 st rb (RB.startOf rb)
in skip $ SplitOnSuffixSeqYield b next
Skip s -> go SPEC idx rh s fs
Stop -> do
-- do not issue a blank segment when we end at pattern
@ -2332,15 +2319,16 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) =
old <- liftIO $ peek rh
rh1 <- liftIO (RB.unsafeInsert rb rh x)
let cksum1 = deltaCksum cksum old x
sfs <- if withSep then fstep fs x else fstep fs old
case sfs of
r <- if withSep then fstep fs x else fstep fs old
case r of
FL.Partial fs1 ->
if (cksum1 /= patHash)
then go SPEC fs1 s rh1 cksum1
else skip $ SplitOnSuffixSeqKRCheck fs1 s rb rh1
FL.Done r ->
let next = SplitOnSuffixSeqKRInit 0 st rb (RB.startOf rb)
in skip $ SplitOnSuffixSeqYield r next
FL.Done b ->
let next = SplitOnSuffixSeqKRInit
0 st rb (RB.startOf rb)
in skip $ SplitOnSuffixSeqYield b next
Skip s -> go SPEC fs s rh cksum
Stop ->
if RB.unsafeEqArray rb rh patArr
@ -2365,13 +2353,13 @@ splitOnSuffixSeq withSep patArr (Fold fstep initial done) (Stream step state) =
stepOuter _ (SplitOnSuffixSeqKRDone n fs rb rh) = do
old <- liftIO $ peek rh
let rh1 = RB.advance rb rh
sfs <- fstep fs old
case sfs of
r <- fstep fs old
case r of
FL.Partial fs1 -> skip $ SplitOnSuffixSeqKRDone (n - 1) fs1 rb rh1
FL.Done r -> do
FL.Done b -> do
fs1 <- initial
let next = SplitOnSuffixSeqKRDone (n - 1) fs1 rb rh1
skip $ SplitOnSuffixSeqYield r next
skip $ SplitOnSuffixSeqYield b next
{-# ANN type SplitState Fuse #-}
data SplitState s arr
@ -3765,12 +3753,10 @@ scanlMx' fstep begin done s =
(begin >>= \x -> x `seq` done x) `consM` postscanlMx' fstep begin done s
data PostScanState fsM s a = PostScan s !fsM
data PostScanState s f = PostScan s !f
-- XXX PostScanWith is can be eliminated if we abstract Yield
{-# INLINE_NORMAL postscanOnce #-}
postscanOnce :: Monad m
=> FL.Fold m a b -> Stream m a -> Stream m b
postscanOnce :: Monad m => FL.Fold m a b -> Stream m a -> Stream m b
postscanOnce (FL.Fold fstep begin done) (Stream step state) =
Stream step' (PostScan state begin)
@ -3780,15 +3766,14 @@ postscanOnce (FL.Fold fstep begin done) (Stream step state) =
step' gst (PostScan st acc) = do
r <- step (adaptState gst) st
case r of
-- XXX Move Yield to a common function
Yield x s -> do
old <- acc
y <- fstep old x
case y of
FL.Partial sres -> do
!v <- done sres
return $ Yield v $ PostScan s (return sres)
FL.Done _ -> return $ Stop
res <- fstep old x
case res of
FL.Partial fs -> do
!v <- done fs
return $ Yield v $ PostScan s (return fs)
FL.Done _ -> return Stop
Skip s -> return $ Skip $ PostScan s acc
Stop -> return Stop
@ -4027,17 +4012,17 @@ tap (Fold fstep initial extract) (Stream step state) = Stream step' TapInit
step' gst (Tapping acc st) = do
r <- step gst st
case r of
-- XXX Abstract Yield?
Yield x s -> do
acc1 <- fstep acc x
res <- fstep acc x
return
$ case acc1 of
FL.Partial sres -> Yield x (Tapping sres s)
FL.Done _ -> Yield x (TapDone s)
$ Yield x
$ case res of
FL.Partial fs -> Tapping fs s
FL.Done _ -> TapDone s
Skip s -> return $ Skip (Tapping acc s)
Stop -> do
void $ extract acc
return $ Stop
return Stop
step' gst (TapDone st) = do
r <- step gst st
return
@ -4067,21 +4052,22 @@ tapOffsetEvery offset n (Fold fstep initial extract) (Stream step state) =
step' gst (TapOffTapping acc st count) = do
r <- step gst st
case r of
Yield x s ->
if count <= 0
-- XXX Abstract the then branch?
then do
acc1 <- fstep acc x
return
$ case acc1 of
FL.Partial sres ->
Yield x $ TapOffTapping sres s (n - 1)
FL.Done _ -> Yield x (TapOffDone s)
else return $ Yield x $ TapOffTapping acc s (count - 1)
Yield x s -> do
next <-
if count <= 0
then do
res <- fstep acc x
return
$ case res of
FL.Partial sres ->
TapOffTapping sres s (n - 1)
FL.Done _ -> TapOffDone s
else return $ TapOffTapping acc s (count - 1)
return $ Yield x next
Skip s -> return $ Skip (TapOffTapping acc s count)
Stop -> do
void $ extract acc
return $ Stop
return Stop
step' gst (TapOffDone st) = do
r <- step gst st
return

View File

@ -48,7 +48,6 @@ module Streamly.Internal.Data.Stream.StreamD.Type
, GroupState (..) -- for inspection testing
, foldMany
, foldMany1
, groupsOf
, groupsOf2
)
where
@ -66,8 +65,8 @@ import Fusion.Plugin.Types (Fuse(..))
import Streamly.Internal.Data.SVar (State(..), adaptState, defState)
import Streamly.Internal.Data.Fold.Types (Fold(..), Fold2(..))
import qualified Streamly.Internal.Data.Stream.StreamK as K
import qualified Streamly.Internal.Data.Fold.Types as FL
import qualified Streamly.Internal.Data.Stream.StreamK as K
------------------------------------------------------------------------------
-- The direct style stream type
@ -589,12 +588,10 @@ take n (Stream step state) = n `seq` Stream step' (state, 0)
{-# ANN type GroupState Fuse #-}
data GroupState s fs b a
= GroupStart s
| GroupConsume s fs a
| GroupBuffer s fs
| GroupYield b (GroupState s fs b a)
| GroupFinish
-- XXX Remove GroupConsume
{-# INLINE_NORMAL foldMany #-}
foldMany :: Monad m => Fold m a b -> Stream m a -> Stream m b
foldMany (Fold fstep initial extract) (Stream step state) =
@ -607,15 +604,16 @@ foldMany (Fold fstep initial extract) (Stream step state) =
-- fs = fold state
fs <- initial
return $ Skip (GroupBuffer st fs)
step' _ (GroupConsume st fs x) = do
fs' <- fstep fs x
case fs' of
FL.Done b -> return $ Skip (GroupYield b (GroupStart st))
FL.Partial ps -> return $ Skip (GroupBuffer st ps)
step' gst (GroupBuffer st fs) = do
r <- step (adaptState gst) st
case r of
Yield x s -> return $ Skip $ GroupConsume s fs x
Yield x s -> do
res <- fstep fs x
return
$ Skip
$ case res of
FL.Done b -> GroupYield b (GroupStart s)
FL.Partial ps -> GroupBuffer s ps
Skip s -> return $ Skip (GroupBuffer s fs)
Stop -> do
b <- extract fs
@ -630,25 +628,26 @@ foldMany1 (Fold fstep initial extract) (Stream step state) =
where
{-# INLINE consume #-}
consume x s fs = do
res <- fstep fs x
return
$ Skip
$ case res of
FL.Done b -> GroupYield b (GroupStart s)
FL.Partial ps -> GroupBuffer s ps
{-# INLINE_LATE step' #-}
step' gst (GroupStart st) = do
-- fs = fold state
r <- step (adaptState gst) st
case r of
Yield x s -> do
fi <- initial
return $ Skip $ GroupConsume s fi x
Yield x s -> initial >>= consume x s
Skip s -> return $ Skip (GroupStart s)
Stop -> return $ Stop
step' _ (GroupConsume st fs x) = do
fs' <- fstep fs x
case fs' of
FL.Done b -> return $ Skip (GroupYield b (GroupStart st))
FL.Partial ps -> return $ Skip (GroupBuffer st ps)
Stop -> return Stop
step' gst (GroupBuffer st fs) = do
r <- step (adaptState gst) st
case r of
Yield x s -> return $ Skip $ GroupConsume s fs x
Yield x s -> consume x s fs
Skip s -> return $ Skip (GroupBuffer s fs)
Stop -> do
b <- extract fs
@ -656,11 +655,6 @@ foldMany1 (Fold fstep initial extract) (Stream step state) =
step' _ (GroupYield b next) = return $ Yield b next
step' _ GroupFinish = return Stop
-- XXX Investigate performance
{-# INLINE groupsOf #-}
groupsOf :: Monad m => Int -> Fold m a b -> Stream m a -> Stream m b
groupsOf n fld = foldMany (FL.ltake n fld)
data GroupState2 s fs
= GroupStart2 s
| GroupBuffer2 s fs Int