Update Fold module docs and expose some APIs

This commit is contained in:
Harendra Kumar 2021-03-07 12:35:20 +05:30
parent badc62d2b7
commit 3b1a26cee5
13 changed files with 662 additions and 509 deletions

View File

@ -2,9 +2,11 @@
### Behavioral changes
* `Streamly.Prelude.fold` can now terminate early without consuming the entire
* `Streamly.Prelude.fold` may now terminate early without consuming the entire
stream. For example, `fold Fold.head stream` would now terminate immediately
after consuming the head element from `stream`.
after consuming the head element from `stream`. This may result in change of
behavior in existing programs if the program relies on the evaluation of the
full stream.
* Change the associativity of combinators `serial`, `wSerial`,
`ahead`, `async`, `wAsync`, `parallel` to be the same as `<>`.
* `encodeUtf8`, `decodeUtf8` now replace any invalid character encountered
@ -26,14 +28,14 @@
require an additional `MonadAsync` constraint. Several other
functions that used these functions also now require the additional
constraint.
* Remove `Applicative` instance of folds. Please use `teeWith` or the `Tee` type
as an alternative to Fold applicative.
* Remove `Applicative` instance of folds. Please use `teeWith` or the `Tee`
type as an alternative to Fold applicative.
### Enhancements
* New encoding/decoding routines, `encodeUtf8'`, `encodeLatin1'`, `decodeUtf8'`,
are added, these routines fail when they encounter any invalid characters.
* `teeWith` is released as a part of `Streamly.Data.Fold`
* Several new functions added to `Streamly.Data.Fold`
### Bug Fixes

View File

@ -19,7 +19,6 @@ import Streamly.Internal.Data.Fold (Fold(..))
import qualified Data.Map.Strict as Map
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Fold.Type as FL
import qualified Streamly.Internal.Data.Pipe as Pipe
import qualified Streamly.Internal.Data.Sink as Sink
import qualified Streamly.Internal.Data.Stream.IsStream as IP

View File

@ -8,9 +8,12 @@
import Control.Concurrent
import Control.Monad (when, replicateM)
import Streamly.Prelude
( IsStream, SerialT, serial, async, asyncly, ahead, aheadly, wAsync
, wAsyncly, parallel, parallely
)
import Gauge
import Streamly.Prelude hiding (mapM_, replicateM)
import qualified Streamly.Prelude as S
-------------------------------------------------------------------------------
@ -28,8 +31,8 @@ append buflen tcount d t =
let work = (\i -> when (d /= 0) (threadDelay d) >> return i)
in S.drain
$ t
$ maxBuffer buflen
$ maxThreads (-1)
$ S.maxBuffer buflen
$ S.maxThreads (-1)
$ S.fromFoldableM $ fmap work [1..tcount]
-- | Run @threads@ concurrently, each producing streams of @elems@ elements
@ -48,28 +51,28 @@ concated
concated buflen threads d elems t =
let work = \i -> S.replicateM i (when (d /= 0) (threadDelay d) >> return i)
in S.drain
$ adapt
$ maxThreads (-1)
$ maxBuffer buflen
$ S.adapt
$ S.maxThreads (-1)
$ S.maxBuffer buflen
$ S.concatMapWith t work
$ S.replicate threads elems
appendGroup :: Int -> Int -> Int -> [Benchmark]
appendGroup buflen threads delay =
appendGroup buflen threads usec =
[ -- bench "serial" $ nfIO $ append buflen threads delay serially
bench "ahead" $ nfIO $ append buflen threads delay aheadly
, bench "async" $ nfIO $ append buflen threads delay asyncly
, bench "wAsync" $ nfIO $ append buflen threads delay wAsyncly
, bench "parallel" $ nfIO $ append buflen threads delay parallely
bench "ahead" $ nfIO $ append buflen threads usec aheadly
, bench "async" $ nfIO $ append buflen threads usec asyncly
, bench "wAsync" $ nfIO $ append buflen threads usec wAsyncly
, bench "parallel" $ nfIO $ append buflen threads usec parallely
]
concatGroup :: Int -> Int -> Int -> Int -> [Benchmark]
concatGroup buflen threads delay n =
[ bench "serial" $ nfIO $ concated buflen threads delay n serial
, bench "ahead" $ nfIO $ concated buflen threads delay n ahead
, bench "async" $ nfIO $ concated buflen threads delay n async
, bench "wAsync" $ nfIO $ concated buflen threads delay n wAsync
, bench "parallel" $ nfIO $ concated buflen threads delay n parallel
concatGroup buflen threads usec n =
[ bench "serial" $ nfIO $ concated buflen threads usec n serial
, bench "ahead" $ nfIO $ concated buflen threads usec n ahead
, bench "async" $ nfIO $ concated buflen threads usec n async
, bench "wAsync" $ nfIO $ concated buflen threads usec n wAsync
, bench "parallel" $ nfIO $ concated buflen threads usec n parallel
]
main :: IO ()

View File

@ -1,79 +1,142 @@
-- |
-- Module : Streamly.Data.Fold
-- Copyright : (c) 2019 Composewell Technologies
-- License : BSD3
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
--
-- The 'Fold' type represents an effectful action that absorbs a value into an
-- accumulator. For example, a 'sum' 'Fold' represents adding the input to the
-- accumulated sum. A fold driver e.g. 'Streamly.Prelude.fold' pushes values
-- from a stream to the 'Fold' one at a time, reducing the stream to a single
-- value. Therefore, a fold can also be thought of as a sink or a stream
-- consumer.
-- A 'Fold' is a sink or a consumer of a stream of values. The 'Fold' type
-- consists of an accumulator and an effectful action that absorbs a value into
-- the accumulator.
--
-- A 'Fold' is in fact a data representation of a standard left fold ('foldl').
-- Unlike the standard left folds, 'Fold's can terminate at any point e.g. the
-- 'head' fold would terminate immediately after returning the head element.
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Prelude as Stream
--
-- 'Fold's can be combined efficiently using combinators provided in this
-- module; a stream supplied to the combined fold is provided to the
-- constituent folds according to the behavior of the combinator. For example,
-- the 'tee' combinator distributes the input stream to two folds and then
-- combines the resulting fold outputs. Similarly, a 'partition' combinator
-- divides the input among constituent folds.
-- For example, a 'sum' Fold represents adding the input to the accumulated
-- sum. A fold driver e.g. 'Streamly.Prelude.fold' pushes values from a stream
-- to the 'Fold' one at a time, reducing the stream to a single value.
--
-- = Accumulators and Terminating Folds
-- >>> Stream.fold Fold.sum $ Stream.fromList [1..100]
-- 5050
--
-- Folds in this module can be classified in two categories viz. accumulators
-- and terminating folds. Accumulators do not have a terminating condition,
-- they run forever and consume the entire stream, for example the 'length'
-- fold. Terminating folds have a terminating condition and can terminate
-- without consuming the entire stream, for example, the 'head' fold.
-- Conceptually, a 'Fold' is a data type that can mimic a strict left fold
-- ('Data.List.foldl') as well as lazy right fold ('Prelude.foldr'). The above
-- example is similar to a left fold using @(+)@ as the step and @0@ as the
-- initial value of the accumulator:
--
-- = Monoids
-- >>> Data.List.foldl' (+) 0 [1..100]
-- 5050
--
-- Monoids allow generalized, modular folding. The accumulators in this module
-- can be expressed using 'mconcat' and a suitable 'Monoid'. Instead of
-- writing folds we can write Monoids and turn them into folds.
-- 'Fold's have an early termination capability e.g. the 'head' fold would
-- terminate on an infinite stream:
--
-- = Performance Notes
-- >>> Stream.fold Fold.head $ Stream.fromList [1..]
-- Just 1
--
-- 'Streamly.Prelude' module provides fold functions to directly fold streams
-- e.g. Streamly.Prelude/'Streamly.Prelude.sum' serves the same purpose as
-- Fold/'sum'. However, the functions in Streamly.Prelude cannot be
-- efficiently combined together e.g. we cannot drive the input stream through
-- @sum@ and @length@ fold functions simultaneously. Using the 'Fold' type we
-- can efficiently split the stream across mutliple folds because it allows the
-- compiler to perform stream fusion optimizations.
-- The above example is similar to the following right fold:
--
-- = Programmer Notes
-- >>> Prelude.foldr (\x _ -> Just x) Nothing [1..]
-- Just 1
--
-- > import qualified Streamly.Data.Fold as Fold
-- 'Fold's can be combined together using combinators. For example, to create a
-- fold that sums first two elements in a stream:
--
-- More, not yet exposed, fold combinators can be found in
-- "Streamly.Internal.Data.Fold".
-- >>> sumTwo = Fold.take 2 Fold.sum
-- >>> Stream.fold sumTwo $ Stream.fromList [1..100]
-- 3
--
-- Folds can be combined to run in parallel on the same input. For example, to
-- compute the average of numbers in a stream without going through the stream
-- twice:
--
-- >>> avg = Fold.teeWith (/) Fold.sum (fmap fromIntegral Fold.length)
-- >>> Stream.fold avg $ Stream.fromList [1.0..100.0]
-- 50.5
--
-- Folds can be combined so as to partition the input stream over multiple
-- folds. For example, to count even and odd numbers in a stream:
--
-- >>> split n = if even n then Left n else Right n
-- >>> stream = Stream.map split $ Stream.fromList [1..100]
-- >>> countEven = fmap (("Even " ++) . show) Fold.length
-- >>> countOdd = fmap (("Odd " ++) . show) Fold.length
-- >>> f = Fold.partition countEven countOdd
-- >>> Stream.fold f stream
-- ("Even 50","Odd 50")
--
-- Terminating folds can be combined to parse the stream serially such that the
-- first fold consumes the input until it terminates and the second fold
-- consumes the rest of the input until it terminates:
--
-- >>> f = Fold.serialWith (,) (Fold.take 8 Fold.toList) (Fold.takeEndBy (== '\n') Fold.toList)
-- >>> Stream.fold f $ Stream.fromList "header: hello\n"
-- ("header: ","hello\n")
--
-- A 'Fold' can be applied repeatedly on a stream to transform it to a stream
-- of fold results. To split a stream on newlines:
--
-- >>> f = Fold.takeEndBy (== '\n') Fold.toList
-- >>> Stream.toList $ Stream.foldMany f $ Stream.fromList "Hello there!\nHow are you\n"
-- ["Hello there!\n","How are you\n"]
--
-- Similarly, we can split the input of a fold too:
--
-- >>> Stream.fold (Fold.many f Fold.toList) $ Stream.fromList "Hello there!\nHow are you\n"
-- ["Hello there!\n","How are you\n"]
--
-- Please see "Streamly.Internal.Data.Fold" for additional @Pre-release@
-- functions.
--
-- = Folds vs. Streams
--
-- We can often use streams or folds to achieve the same goal. However, streams
-- allow efficient composition of producers (e.g. 'Streamly.Prelude.serial' or
-- 'Streamly.Prelude.mergeBy') whereas folds allow efficient composition of
-- consumers (e.g. 'serialWith', 'partition' or 'teeWith').
--
-- Streams are producers, transformations on streams happen on the output side:
--
-- >>> f = Stream.sum . Stream.map (+1) . Stream.filter odd
-- >>> f $ Stream.fromList [1..100]
-- 2550
--
-- Folds are stream consumers with an input stream and an output value, stream
-- transformations on folds happen on the input side:
--
-- >>> f = Fold.filter odd $ Fold.lmap (+1) $ Fold.sum
-- >>> Stream.fold f $ Stream.fromList [1..100]
-- 2550
--
-- Notice the composition by @.@ vs @$@ and the order of operations in the
-- above examples, the difference is due to output vs input side
-- transformations.
module Streamly.Data.Fold
(
-- * Fold Type
-- |
-- A 'Fold' can be run over a stream using the 'Streamly.Prelude.fold'
-- combinator:
--
-- >>> Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)
-- 5050
Fold -- (..)
-- * Accumulators
-- ** Monoids
-- * Constructors
, mkFoldl
, mkFoldlM
, mkFoldr
-- * Folds
-- ** Accumulators
-- | Folds that never terminate, these folds are much like strict left
-- folds. 'mconcat' is the fundamental accumulator. All other accumulators
-- can be expressed in terms of 'mconcat' using a suitable Monoid. Instead
-- of writing folds we could write Monoids and turn them into folds.
-- Monoids
, sconcat
, mconcat
, foldMap
, foldMapM
-- ** Reducers
-- Reducers
, drain
, drainBy
, last
@ -88,22 +151,16 @@ module Streamly.Data.Fold
, variance
, stdDev
-- ** Collectors
-- | Avoid using these folds in scalable or performance critical
-- applications, they buffer all the input in GC memory which can be
-- detrimental to performance if the input is large.
-- Collectors
, toList
, toListRev
-- ** Terminating Folds
-- | These are much like lazy right folds.
-- * Terminating Folds
-- , drainN
-- , drainWhile
-- , lastN
-- , (!!)
-- , genericIndex
, index
, head
-- , findM
, find
, lookup
, findIndex
@ -116,143 +173,82 @@ module Streamly.Data.Fold
, and
, or
-- * Output Transformations
-- | Unlike stream producer types (e.g. @SerialT m a@) which have only
-- output side, folds have an input side as well as an output side. In the
-- type @Fold m a b@, the input type is @a@ and the output type is @b@.
-- Transformations can be applied either on the input side or on the output
-- side. The 'Functor' instance of a fold maps on the output of the fold:
-- * Combinators
-- | Combinators are modifiers of folds. In the type @Fold m a b@, @a@ is
-- the input type and @b@ is the output type. Transformations can be
-- applied either on the input side or on the output side. Therefore,
-- combinators are of one of the following general shapes:
--
-- * @... -> Fold m a b -> Fold m c b@ (input transformation)
-- * @... -> Fold m a b -> Fold m a c@ (output transformation)
--
-- Output transformations are also known as covariant transformations, and
-- input transformations are also known as contravariant transformations.
-- The input side transformations are more interesting for folds. Most of
-- the following sections describe the input transformation operations on a
-- fold. The names and signatures of the operations are consistent with
-- corresponding operations in "Streamly.Prelude". When an operation makes
-- sense on both input and output side we use the prefix @l@ (for left) for
-- input side operations and the prefix @r@ (for right) for output side
-- operations.
-- ** Mapping on output
-- | The 'Functor' instance of a fold maps on the output of the fold:
--
-- >>> Stream.fold (fmap show Fold.sum) (Stream.enumerateFromTo 1 100)
-- "5050"
--
-- Note: Output transformations are also known as covariant
-- transformations.
, sequence
, rmapM
, mapM
{-
-- * Input Transformations
-- The input side transformations are more interesting for folds. The
-- following sections describe the input transformation operations on a
-- fold. The names and signatures of the operations are consistent with
-- corresponding operations in "Streamly.Prelude".
--
-- Note: Input transformations are also known as contravariant
-- transformations.
-- ** Mapping on Input
, lmap
, lmapM
-- ** Mapping
--, transform
-- , lmap
--, lsequence
-- , lmapM
-- ** Filtering
, filter
, filterM
-- -- ** Filtering
-- , filter
-- , filterM
-- , ldeleteBy
-- , luniq
-- -- ** Mapping Filters
, catMaybes
, mapMaybe
-- ** Mapping Filters
, lmapMaybe
, lmapMaybeM
-- ** Scanning Filters
, lfindIndices
, lelemIndices
-- ** Insertion
-- | Insertion adds more elements to the stream.
, linsertBy
, lintersperseM
-- ** Reordering
, lreverse
-}
{-
-- ** Trimming
, take
-- takeByTime
, ldrop
, ldropWhile
, ldropWhileM
-}
-- , takeInterval
, takeEndBy_
, takeEndBy
-- * Distributing
-- |
-- The 'Applicative' instance of a distributing 'Fold' distributes one copy
-- of the stream to each fold and combines the results using a function.
--
-- @
--
-- |-------Fold m a b--------|
-- ---stream m a---| |---m (b,c,...)
-- |-------Fold m a c--------|
-- | |
-- ...
-- @
--
-- To compute the average of numbers in a stream without going through the
-- stream twice:
--
-- >>> let avg = Fold.teeWith (/) Fold.sum (fmap fromIntegral Fold.length)
-- >>> Stream.fold avg (Stream.enumerateFromTo 1.0 100.0)
-- 50.5
--
-- The 'Semigroup' and 'Monoid' instances of a distributing fold distribute
-- the input to both the folds and combines the outputs using Monoid or
-- Semigroup instances of the output types:
--
-- >>> import Data.Monoid (Sum(..))
-- >>> Stream.fold (Fold.teeWith (<>) Fold.head Fold.last) (fmap Sum $ Stream.enumerateFromTo 1.0 100.0)
-- Just (Sum {getSum = 101.0})
--
-- The 'Num', 'Floating', and 'Fractional' instances work in the same way.
-- ** Serial Append
, serialWith
-- ** Parallel Distribution
-- | For applicative composition using distribution see
-- "Streamly.Internal.Data.Fold.Tee".
, tee
, teeWith
, tee
, distribute
-- * Partitioning
-- |
-- Direct items in the input stream to different folds using a binary
-- ** Partitioning
-- | Direct items in the input stream to different folds using a binary
-- fold selector.
-- , partitionByM
-- , partitionBy
, partition
{-
-- * Demultiplexing
-- | Direct values in the input stream to different folds using an n-ary
-- fold selector.
, demux
-- , demuxWith
, demux_
-- , demuxWith_
-- * Classifying
-- | In an input stream of key value pairs fold values for different keys
-- in individual output buckets using the given fold.
, classify
-- , classifyWith
-}
-- * Unzipping
-- ** Unzipping
, unzip
-- These can be expressed using lmap/lmapM and unzip
-- , unzipWith
-- , unzipWithM
-- -- * Nesting
-- , concatMap
-- , chunksOf
-- , duplicate -- experimental
-- ** Splitting
, many
, chunksOf
-- , intervalsOf
-- ** Nesting
, concatMap
-- * Deprecated
, sequence
, mapM
)
where
@ -265,10 +261,3 @@ import Prelude
span, splitAt, break, mapM)
import Streamly.Internal.Data.Fold
--
-- $setup
-- >>> :m
-- >>> import Prelude hiding (head, sum, last, length)
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import qualified Streamly.Data.Fold as Fold

View File

@ -7,7 +7,8 @@
-- Stability : experimental
-- Portability : GHC
--
-- See "Streamly.Data.Fold" for an overview.
-- See "Streamly.Data.Fold" for an overview and
-- "Streamly.Internal.Data.Fold.Types" for design notes.
--
-- IMPORTANT: keep the signatures consistent with the folds in Streamly.Prelude
@ -17,9 +18,10 @@ module Streamly.Internal.Data.Fold
Step (..)
, Fold (..)
-- * Creating
-- * Constructors
, mkFoldl
, mkFoldlM
, mkFoldl1
, mkFoldr
, mkFoldrM
, mkFold
@ -27,21 +29,21 @@ module Streamly.Internal.Data.Fold
, mkFoldM
, mkFoldM_
-- * Generators
-- * Folds
-- ** Identity
, yield
, yieldM
-- * Accumulators
-- ** Semigroups and Monoids
-- ** Accumulators
-- *** Semigroups and Monoids
, sconcat
, mconcat
, foldMap
, foldMapM
-- ** Reducers
-- *** Reducers
, drain
, drainBy
, drainBy2
, last
, length
, mean
@ -52,7 +54,7 @@ module Streamly.Internal.Data.Fold
, rollingHashFirstN
-- , rollingHashLastN
-- ** Saturating Reducers
-- *** Saturating Reducers
-- | 'product' terminates if it becomes 0. Other folds can theoretically
-- saturate on bounded types, and therefore terminate, however, they will
-- run forever on unbounded types like Integer/Double.
@ -63,14 +65,17 @@ module Streamly.Internal.Data.Fold
, minimumBy
, minimum
-- ** Collectors
-- *** Collectors
-- | Avoid using these folds in scalable or performance critical
-- applications, they buffer all the input in GC memory which can be
-- detrimental to performance if the input is large.
, toList
, toListRev
-- $toListRev
, toStream
, toStreamRev
-- * Terminating Folds
-- ** Terminating Folds
, drainN
-- , lastN
-- , (!!)
@ -91,26 +96,21 @@ module Streamly.Internal.Data.Fold
, or
-- , the
-- * Adapting
-- * Combinators
-- ** Utilities
, with
-- ** Transforming the Monad
, hoist
, generally
-- * Running Incrementally
, initialize
, runStep
-- * Output Transformations
, sequence
-- ** Mapping on output
, rmapM
, mapM
-- * Input Transformations
-- ** Mapping
-- ** Mapping on Input
, transform
, map
, lmap
--, lsequence
, lmapM
, indexed
@ -125,63 +125,67 @@ module Streamly.Internal.Data.Fold
-- ** Mapping Filters
, catMaybes
, mapMaybe
-- , mapMaybeM
{-
-- ** Scanning Filters
, lfindIndices
, lelemIndices
, findIndices
, elemIndices
-- ** Insertion
-- | Insertion adds more elements to the stream.
, linsertBy
, lintersperseM
, insertBy
, intersperseM
-- ** Reordering
, lreverse
, reverse
-}
-- ** Trimming
, take
, takeInterval
-- By elements
, takeEndBy_
, takeEndBy
, takeEndBy_
-- , takeEndBySeq
{-
, ldrop
, ldropWhile
, ldropWhileM
, drop
, dropWhile
, dropWhileM
-}
-- * Splitting
-- Binary
-- ** Serial Append
, serialWith
-- , tail
-- , init
, splitAt -- spanN
-- , splitIn -- sessionN
-- * Distributing
, tee
-- ** Parallel Distribution
, teeWith
, tee
, teeWithFst
, teeWithMin
, distribute
-- , distributeFst
-- , distributeMin
-- * Partitioning
-- ** Parallel Alternative
, shortest
, longest
-- ** Partitioning
, partitionByM
, partitionByFstM
, partitionByMinM
, partitionBy
, partition
-- * Demultiplexing
-- ** Demultiplexing
-- | Direct values in the input stream to different folds using an n-ary
-- fold selector.
, demux -- XXX rename this to demux_
, demuxWith
, demuxDefault -- XXX rename this to demux
@ -189,31 +193,47 @@ module Streamly.Internal.Data.Fold
-- , demuxWithSel
-- , demuxWithMin
-- * Classifying
-- ** Classifying
-- | In an input stream of key value pairs fold values for different keys
-- in individual output buckets using the given fold.
, classify
, classifyWith
-- , classifyWithSel
-- , classifyWithMin
-- * Unzipping
-- ** Unzipping
, unzip
-- These two can be expressed using lmap/lmapM and unzip
, unzipWith
, unzipWithM
, unzipWithFstM
, unzipWithMinM
-- * Nesting
-- ** Zipping
, zipWithM
, zip
-- ** Splitting
, many
, intervalsOf
, chunksOf
, chunksBetween
, zipWithM
, zip
-- ** Nesting
, concatSequence
, concatMap
-- * Running Partially
, initialize
, runStep
, duplicate
-- * Fold2
, drainBy2
-- * Deprecated
, sequence
, mapM
)
where
@ -228,7 +248,6 @@ import Data.Semigroup (Semigroup((<>)))
#endif
import Streamly.Internal.Data.Either.Strict
(Either'(..), fromLeft', fromRight', isLeft', isRight')
import Streamly.Internal.Data.Maybe.Strict (Maybe'(..), toMaybe)
import Streamly.Internal.Data.Pipe.Type (Pipe (..), PipeState(..))
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..), Tuple3'(..))
import Streamly.Internal.Data.Stream.Serial (SerialT)
@ -270,7 +289,7 @@ hoist f (Fold step initial extract) =
-- | Adapt a pure fold to any monad
--
-- > generally = hoist (return . runIdentity)
-- > generally = Fold.hoist (return . runIdentity)
--
-- /Pre-release/
generally :: Monad m => Fold Identity a b -> Fold m a b
@ -305,7 +324,7 @@ mapM = rmapM
-- >>> Stream.fold fld (Stream.enumerateFromTo 1 10)
-- [2,4,6,8,10]
--
-- /Pre-release/
-- @since 0.8.0
{-# INLINE mapMaybe #-}
mapMaybe :: (Monad m) => (a -> Maybe b) -> Fold m b r -> Fold m a r
mapMaybe f = map f . filter isJust . map fromJust
@ -318,7 +337,7 @@ mapMaybe f = map f . filter isJust . map fromJust
--
-- | Apply a transformation on a 'Fold' using a 'Pipe'.
--
-- @since 0.7.0
-- /Pre-release/
{-# INLINE transform #-}
transform :: Monad m => Pipe m a b -> Fold m b c -> Fold m a c
transform (Pipe pstep1 pstep2 pinitial) (Fold fstep finitial fextract) =
@ -355,23 +374,6 @@ transform (Pipe pstep1 pstep2 pinitial) (Fold fstep finitial fextract) =
extract (Tuple' _ fs) = fextract fs
------------------------------------------------------------------------------
-- Utilities
------------------------------------------------------------------------------
-- | @_Fold1 step@ returns a new 'Fold' using just a step function that has the
-- same type for the accumulator and the element. The result type is the
-- accumulator type wrapped in 'Maybe'. The initial accumulator is retrieved
-- from the 'Foldable', the result is 'None' for empty containers.
{-# INLINABLE _Fold1 #-}
_Fold1 :: Monad m => (a -> a -> a) -> Fold m a (Maybe a)
_Fold1 step = fmap toMaybe $ mkFoldl step_ Nothing'
where
step_ Nothing' a = Just' a
step_ (Just' x) a = Just' $ step x a
------------------------------------------------------------------------------
-- Left folds
------------------------------------------------------------------------------
@ -382,11 +384,13 @@ _Fold1 step = fmap toMaybe $ mkFoldl step_ Nothing'
-- |
-- > drainBy f = lmapM f drain
-- > drainBy = FL.foldMapM (void . f)
-- > drainBy = Fold.foldMapM (void . f)
--
-- Drain all input after passing it through a monadic function. This is the
-- dual of mapM_ on stream producers.
--
-- See also: "Streamly.Prelude.mapM_"
--
-- @since 0.7.0
{-# INLINABLE drainBy #-}
drainBy :: Monad m => (a -> m b) -> Fold m a ()
@ -401,12 +405,12 @@ drainBy2 f = Fold2 (const (void . f)) (\_ -> return ()) return
-- | Extract the last element of the input stream, if any.
--
-- > last = fmap getLast $ FL.foldMap (Last . Just)
-- > last = fmap getLast $ Fold.foldMap (Last . Just)
--
-- @since 0.7.0
{-# INLINABLE last #-}
last :: Monad m => Fold m a (Maybe a)
last = _Fold1 (\_ x -> x)
last = mkFoldl1 (\_ x -> x)
------------------------------------------------------------------------------
-- To Summary
@ -416,14 +420,14 @@ last = _Fold1 (\_ x -> x)
--
-- > genericLength = fmap getSum $ foldMap (Sum . const 1)
--
-- @since 0.7.0
-- /Pre-release/
{-# INLINE genericLength #-}
genericLength :: (Monad m, Num b) => Fold m a b
genericLength = mkFoldl (\n _ -> n + 1) 0
-- | Determine the length of the input stream.
--
-- > length = fmap getSum $ foldMap (Sum . const 1)
-- > length = fmap getSum $ Fold.foldMap (Sum . const 1)
--
-- @since 0.7.0
{-# INLINE length #-}
@ -434,7 +438,7 @@ length = genericLength
-- identity (@0@) when the stream is empty. Note that this is not numerically
-- stable for floating point numbers.
--
-- > sum = fmap getSum $ FL.foldMap Sum
-- > sum = fmap getSum $ Fold.foldMap Sum
--
-- @since 0.7.0
{-# INLINE sum #-}
@ -445,7 +449,7 @@ sum = mkFoldl (+) 0
-- multiplicative identity (@1@) when the stream is empty. The fold terminates
-- when it encounters (@0@) in its input.
--
-- > product = fmap getProduct $ FL.foldMap Product
-- Compare with @Fold.foldMap Product@.
--
-- @since 0.7.0
-- /Since 0.8.0 (Added 'Eq' constraint)/
@ -470,7 +474,7 @@ product = mkFold_ step (Partial 1)
-- @since 0.7.0
{-# INLINE maximumBy #-}
maximumBy :: Monad m => (a -> a -> Ordering) -> Fold m a (Maybe a)
maximumBy cmp = _Fold1 max'
maximumBy cmp = mkFoldl1 max'
where
@ -481,24 +485,24 @@ maximumBy cmp = _Fold1 max'
-- |
-- @
-- maximum = 'maximumBy' compare
-- maximum = Fold.maximumBy compare
-- @
--
-- Determine the maximum element in a stream.
--
-- Compare with @FL.foldMap Max@.
-- Compare with @Fold.foldMap Max@.
--
-- @since 0.7.0
{-# INLINE maximum #-}
maximum :: (Monad m, Ord a) => Fold m a (Maybe a)
maximum = _Fold1 max
maximum = mkFoldl1 max
-- | Computes the minimum element with respect to the given comparison function
--
-- @since 0.7.0
{-# INLINE minimumBy #-}
minimumBy :: Monad m => (a -> a -> Ordering) -> Fold m a (Maybe a)
minimumBy cmp = _Fold1 min'
minimumBy cmp = mkFoldl1 min'
where
@ -514,12 +518,12 @@ minimumBy cmp = _Fold1 min'
-- minimum = 'minimumBy' compare
-- @
--
-- Compare with @FL.foldMap Min@.
-- Compare with @Fold.foldMap Min@.
--
-- @since 0.7.0
{-# INLINE minimum #-}
minimum :: (Monad m, Ord a) => Fold m a (Maybe a)
minimum = _Fold1 min
minimum = mkFoldl1 min
------------------------------------------------------------------------------
-- To Summary (Statistical)
@ -603,7 +607,7 @@ defaultSalt = -2578643520546668380
-- | Compute an 'Int' sized polynomial rolling hash of a stream.
--
-- > rollingHash = rollingHashWithSalt defaultSalt
-- > rollingHash = Fold.rollingHashWithSalt defaultSalt
--
-- /Pre-release/
{-# INLINABLE rollingHash #-}
@ -613,7 +617,7 @@ rollingHash = rollingHashWithSalt defaultSalt
-- | Compute an 'Int' sized polynomial rolling hash of the first n elements of
-- a stream.
--
-- > rollingHashFirstN = take n rollingHash
-- > rollingHashFirstN = Fold.take n Fold.rollingHash
--
-- /Pre-release/
{-# INLINABLE rollingHashFirstN #-}
@ -626,10 +630,14 @@ rollingHashFirstN n = take n rollingHash
-- | Append the elements of an input stream to a provided starting value.
--
-- > S.fold (FL.sconcat 10) (S.map Sum $ S.enumerateFromTo 1 10)
-- >>> Stream.fold (Fold.sconcat 10) (Stream.map Data.Monoid.Sum $ Stream.enumerateFromTo 1 10)
-- Sum {getSum = 65}
--
-- /Pre-release/
-- @
-- sconcat = Fold.mkFoldl (<>)
-- @
--
-- @since 0.8.0
{-# INLINE sconcat #-}
sconcat :: (Monad m, Semigroup a) => a -> Fold m a a
sconcat = mkFoldl (<>)
@ -637,7 +645,10 @@ sconcat = mkFoldl (<>)
-- | Fold an input stream consisting of monoidal elements using 'mappend'
-- and 'mempty'.
--
-- > S.fold FL.mconcat (S.map Sum $ S.enumerateFromTo 1 10)
-- >>> Stream.fold Fold.mconcat (Stream.map Data.Monoid.Sum $ Stream.enumerateFromTo 1 10)
-- Sum {getSum = 55}
--
-- > mconcat = Fold.sconcat mempty
--
-- @since 0.7.0
{-# INLINE mconcat #-}
@ -650,12 +661,13 @@ mconcat ::
mconcat = sconcat mempty
-- |
-- > foldMap f = map f mconcat
-- > foldMap f = Fold.lmap f Fold.mconcat
--
-- Make a fold from a pure function that folds the output of the function
-- using 'mappend' and 'mempty'.
--
-- > S.fold (FL.foldMap Sum) $ S.enumerateFromTo 1 10
-- >>> Stream.fold (Fold.foldMap Data.Monoid.Sum) $ Stream.enumerateFromTo 1 10
-- Sum {getSum = 55}
--
-- @since 0.7.0
{-# INLINABLE foldMap #-}
@ -664,15 +676,16 @@ foldMap :: (Monad m, Monoid b
, Semigroup b
#endif
) => (a -> b) -> Fold m a b
foldMap f = map f mconcat
foldMap f = lmap f mconcat
-- |
-- > foldMapM f = lmapM f mconcat
-- > foldMapM f = Fold.lmapM f Fold.mconcat
--
-- Make a fold from a monadic function that folds the output of the function
-- using 'mappend' and 'mempty'.
--
-- > S.fold (FL.foldMapM (return . Sum)) $ S.enumerateFromTo 1 10
-- >>> Stream.fold (Fold.foldMapM (return . Data.Monoid.Sum)) $ Stream.enumerateFromTo 1 10
-- Sum {getSum = 55}
--
-- @since 0.7.0
{-# INLINABLE foldMapM #-}
@ -695,10 +708,12 @@ foldMapM act = mkFoldlM step (pure mempty)
-- | Buffers the input stream to a list in the reverse order of the input.
--
-- > toListRev = Fold.mkFoldl (flip (:)) []
--
-- /Warning!/ working on large lists accumulated as buffers in memory could be
-- very inefficient, consider using "Streamly.Array" instead.
--
-- @since 0.7.0
-- @since 0.8.0
-- xn : ... : x2 : x1 : []
{-# INLINABLE toListRev #-}
@ -711,6 +726,10 @@ toListRev = mkFoldl (flip (:)) []
-- | A fold that drains the first n elements of its input, running the effects
-- and discarding the results.
--
-- > drainN n = Fold.take n Fold.drain
--
-- /Pre-release/
{-# INLINABLE drainN #-}
drainN :: Monad m => Int -> Fold m a ()
drainN n = take n drain
@ -735,6 +754,8 @@ genericIndex i = mkFold step (Partial 0) (const Nothing)
-- | Lookup the element at the given index.
--
-- See also: "Streamly.Prelude.!!"
--
-- @since 0.7.0
{-# INLINABLE index #-}
index :: Monad m => Int -> Fold m a (Maybe a)
@ -742,8 +763,6 @@ index = genericIndex
-- | Extract the first element of the stream, if any.
--
-- > head = fmap getFirst $ FL.foldMap (First . Just)
--
-- @since 0.7.0
{-# INLINABLE head #-}
head :: Monad m => Fold m a (Maybe a)
@ -751,11 +770,6 @@ head = mkFold_ (const (Done . Just)) (Partial Nothing)
-- | Returns the first element that satisfies the given predicate.
--
-- @
-- find p = fmap getFirst $
-- FL.foldMap (\x -> First (if p x then Just x else Nothing))
-- @
--
-- @since 0.7.0
{-# INLINABLE find #-}
find :: Monad m => (a -> Bool) -> Fold m a (Maybe a)
@ -771,7 +785,7 @@ find predicate = mkFold step (Partial ()) (const Nothing)
-- | In a stream of (key-value) pairs @(a, b)@, return the value @b@ of the
-- first pair where the key equals the given value @a@.
--
-- > lookup = snd <$> find ((==) . fst)
-- > lookup = snd <$> Fold.find ((==) . fst)
--
-- @since 0.7.0
{-# INLINABLE lookup #-}
@ -801,7 +815,7 @@ findIndex predicate = mkFold step (Partial 0) (const Nothing)
-- | Returns the first index where a given value is found in the stream.
--
-- > elemIndex a = findIndex (== a)
-- > elemIndex a = Fold.findIndex (== a)
--
-- @since 0.7.0
{-# INLINABLE elemIndex #-}
@ -814,22 +828,20 @@ elemIndex a = findIndex (a ==)
-- | Return 'True' if the input stream is empty.
--
-- > null = fmap isJust head
-- > null = fmap isJust Fold.head
--
-- @since 0.7.0
{-# INLINABLE null #-}
null :: Monad m => Fold m a Bool
null = mkFold (\() _ -> Done False) (Partial ()) (const True)
--
-- > any p = map p or
-- > any p = fmap getAny . FL.foldMap (Any . p)
--
-- | Returns 'True' if any of the elements of a stream satisfies a predicate.
--
-- >>> Stream.fold (Fold.any (== 0)) $ Stream.fromList [1,0,1]
-- True
--
-- > any p = Fold.lmap p Fold.or
--
-- @since 0.7.0
{-# INLINE any #-}
any :: Monad m => (a -> Bool) -> Fold m a Bool
@ -846,22 +858,20 @@ any predicate = mkFold_ step initial
-- | Return 'True' if the given element is present in the stream.
--
-- > elem a = any (== a)
-- > elem a = Fold.any (== a)
--
-- @since 0.7.0
{-# INLINABLE elem #-}
elem :: (Eq a, Monad m) => a -> Fold m a Bool
elem a = any (a ==)
--
-- > all p = map p and
-- > all p = fmap getAll . FL.foldMap (All . p)
--
-- | Returns 'True' if all elements of a stream satisfy a predicate.
--
-- >>> Stream.fold (Fold.all (== 0)) $ Stream.fromList [1,0,1]
-- False
--
-- > all p = Fold.lmap p Fold.and
--
-- @since 0.7.0
{-# INLINABLE all #-}
all :: Monad m => (a -> Bool) -> Fold m a Bool
@ -878,7 +888,7 @@ all predicate = mkFold_ step initial
-- | Returns 'True' if the given element is not present in the stream.
--
-- > notElem a = all (/= a)
-- > notElem a = Fold.all (/= a)
--
-- @since 0.7.0
{-# INLINABLE notElem #-}
@ -887,8 +897,7 @@ notElem a = all (a /=)
-- | Returns 'True' if all elements are 'True', 'False' otherwise
--
-- > and = all (== True)
-- > and = fmap getAll . FL.foldMap All
-- > and = Fold.all (== True)
--
-- @since 0.7.0
{-# INLINE and #-}
@ -897,8 +906,7 @@ and = all (== True)
-- | Returns 'True' if any element is 'True', 'False' otherwise
--
-- > or = any (== True)
-- > or = fmap getAny . FL.foldMap Any
-- > or = Fold.any (== True)
--
-- @since 0.7.0
{-# INLINE or #-}
@ -917,8 +925,6 @@ or = any (== True)
-- Binary APIs
------------------------------------------------------------------------------
-- XXX These would just be applicative compositions of terminating folds.
-- | @splitAt n f1 f2@ composes folds @f1@ and @f2@ such that first @n@
-- elements of its input are consumed by fold @f1@ and the rest of the stream
-- is consumed by fold @f2@.
@ -943,7 +949,7 @@ or = any (== True)
-- >>> splitAt_ 4 [1,2,3]
-- ([1,2,3],[])
--
-- > splitAt n f1 f2 = serialWith (,) (take n f1) f2
-- > splitAt n f1 f2 = Fold.serialWith (,) (Fold.take n f1) f2
--
-- /Internal/
@ -967,53 +973,22 @@ splitAt n fld = serialWith (,) (take n fld)
-- Note: Keep this consistent with S.splitOn. In fact we should eliminate
-- S.splitOn in favor of the fold.
--
-- | Consume the input until it encounters an infixed separator element (i.e.
-- when the supplied predicate succeeds), dropping the separator.
-- XXX Use Fold.many instead once it is fixed.
--
-- Repeated applications of 'takeEndBy_' splits the stream on separator
-- elements determined by the supplied predicate, separator is considered as
-- infixed between two segments, if one side of the separator is missing then
-- it is parsed as an empty stream. The supplied 'Fold' is applied on the
-- split segments. With '-' representing non-separator elements and '.' as
-- separator, repeated 'takeEndBy_' splits the stream as follows:
-- | Like 'takeEndBy' but drops the element on which the predicate succeeds.
--
-- @
-- "--.--" => "--" "--"
-- "--." => "--" ""
-- ".--" => "" "--"
-- @
-- >>> Stream.fold (Fold.takeEndBy_ (== '\n') Fold.toList) $ Stream.fromList "hello\nthere\n"
-- "hello"
--
-- Repeated applications of @Fold.takeEndBy_ (== x)@ on the input stream gives us
-- an inverse of @Stream.intercalate (Stream.yield x)@
-- >>> Stream.toList $ Stream.foldMany (Fold.takeEndBy_ (== '\n') Fold.toList) $ Stream.fromList "hello\nthere\n"
-- ["hello","there"]
--
-- > Stream.splitOn pred f = Stream.foldMany (Fold.takeEndBy_ pred f)
-- > Stream.splitOnSuffix p f = Stream.foldMany (Fold.takeEndBy_ p f)
--
-- Let's use the following definition for illustration:
-- See 'Streamly.Prelude.splitOnSuffix' for more details on splitting a
-- stream using 'takeEndBy_'.
--
-- >>> splitOn p = Stream.foldMany (Fold.takeEndBy_ pred Fold.toList)
-- >>> splitOn' p = Stream.toList . splitOn p . Stream.fromList
-- >>> splitOn' (== '.') ""
-- [""]
--
-- >>> splitOn' (== '.') "."
-- ["",""]
--
-- >>> splitOn' (== '.') ".a"
-- ["","a"]
--
-- >>> splitOn' (== '.') "a."
-- ["a",""]
--
-- >>> splitOn' (== '.') "a.b"
-- ["a","b"]
--
-- >>> splitOn' (== '.') "a..b"
-- ["a","","b"]
--
-- Stops - when the predicate succeeds.
--
-- /Pre-release/
-- @since 0.8.0
{-# INLINE takeEndBy_ #-}
takeEndBy_ :: Monad m => (a -> Bool) -> Fold m a b -> Fold m a b
takeEndBy_ predicate (Fold fstep finitial fextract) =
@ -1026,16 +1001,21 @@ takeEndBy_ predicate (Fold fstep finitial fextract) =
then fstep s a
else Done <$> fextract s
-- | Collect stream elements until an element succeeds the predicate. Also take
-- the element on which the predicate succeeded. The succeeding element is
-- treated as a suffix separator which is kept in the output segement.
-- | Take the input, stop when the predicate succeeds taking the succeeding
-- element as well.
--
-- * Stops - when the predicate succeeds.
-- >>> Stream.fold (Fold.takeEndBy (== '\n') Fold.toList) $ Stream.fromList "hello\nthere\n"
-- "hello\n"
--
-- > Stream.splitWithSuffix pred f = Stream.foldMany (Fold.takeEndBy pred f)
-- >>> Stream.toList $ Stream.foldMany (Fold.takeEndBy (== '\n') Fold.toList) $ Stream.fromList "hello\nthere\n"
-- ["hello\n","there\n"]
--
-- /Pre-release/
-- > Stream.splitWithSuffix p f = Stream.foldMany (Fold.takeEndBy p f)
--
-- See 'Streamly.Prelude.splitWithSuffix' for more details on splitting a
-- stream using 'takeEndBy'.
--
-- @since 0.8.0
{-# INLINE takeEndBy #-}
takeEndBy :: Monad m => (a -> Bool) -> Fold m a b -> Fold m a b
takeEndBy predicate (Fold fstep finitial fextract) =
@ -1086,6 +1066,8 @@ breakOn pat f m = undefined
-- >>> Stream.fold (Fold.tee Fold.sum Fold.length) (Stream.enumerateFromTo 1.0 100.0)
-- (5050.0,100)
--
-- > tee = teeWith (,)
--
-- @since 0.7.0
{-# INLINE tee #-}
tee :: Monad m => Fold m a b -> Fold m a c -> Fold m a (b,c)
@ -1110,6 +1092,8 @@ tee = teeWith (,)
-- >>> Stream.fold (Fold.distribute [Fold.sum, Fold.length]) (Stream.enumerateFromTo 1 5)
-- [15,5]
--
-- > distribute = Prelude.foldr (Fold.teeWith (:)) (Fold.yield [])
--
-- This is the consumer side dual of the producer side 'sequence' operation.
--
-- Stops when all the folds stop.
@ -1171,7 +1155,7 @@ distribute = foldr (teeWith (:)) (yield [])
--
-- /See also: 'partitionByFstM' and 'partitionByMinM'./
--
-- @since 0.7.0
-- /Pre-release/
{-# INLINE partitionByM #-}
partitionByM :: Monad m
=> (a -> m (Either b c)) -> Fold m b x -> Fold m c y -> Fold m a (x, y)
@ -1270,7 +1254,7 @@ partitionByMinM = undefined
-- :}
-- ("Even 50","Odd 50")
--
-- @since 0.7.0
-- /Pre-release/
{-# INLINE partitionBy #-}
partitionBy :: Monad m
=> (a -> Either b c) -> Fold m b x -> Fold m c y -> Fold m a (x, y)
@ -1346,7 +1330,9 @@ demuxWith f kv = fmap fst $ demuxDefaultWith f kv drain
-- :}
-- fromList [("PRODUCT",8),("SUM",4)]
--
-- @since 0.7.0
-- > demux = demuxWith id
--
-- /Pre-release/
{-# INLINE demux #-}
demux :: (Monad m, Ord k)
=> Map k (Fold m a b) -> Fold m (k, a) (Map k b)
@ -1476,6 +1462,10 @@ demuxDefaultWith f kv (Fold dstep dinitial dextract) =
b <- dextract dacc
return (doneMap, b)
-- |
-- > demuxDefault = demuxDefaultWith id
--
-- /Pre-release/
{-# INLINE demuxDefault #-}
demuxDefault :: (Monad m, Ord k)
=> Map k (Fold m a b) -> Fold m (k, a) b -> Fold m (k, a) (Map k b, b)
@ -1555,12 +1545,11 @@ classifyWith f (Fold step1 initial1 extract1) =
-- :}
-- fromList [("ONE",[1.0,1.1]),("TWO",[2.0,2.2])]
--
-- @since 0.7.0
--
-- Same as:
--
-- > classify fld = classifyWith fst (map snd fld)
-- > classify fld = Fold.classifyWith fst (map snd fld)
--
-- /Pre-release/
{-# INLINE classify #-}
classify :: (Monad m, Ord k) => Fold m a b -> Fold m (k, a) (Map k b)
classify fld = classifyWith fst (map snd fld)
@ -1571,9 +1560,9 @@ classify fld = classifyWith fst (map snd fld)
-- | Like 'unzipWith' but with a monadic splitter function.
--
-- -- @unzipWithM k f1 f2 = lmapM k (unzip f1 f2)@
-- @unzipWithM k f1 f2 = lmapM k (unzip f1 f2)@
--
-- @since 0.7.0
-- /Pre-release/
{-# INLINE unzipWithM #-}
unzipWithM :: Monad m
=> (a -> m (b,c)) -> Fold m b x -> Fold m c y -> Fold m a (x,y)
@ -1653,11 +1642,11 @@ unzipWithMinM = undefined
-- | Split elements in the input stream into two parts using a pure splitter
-- function, direct each part to a different fold and zip the results.
--
-- @unzipWith f fld1 fld2 = map f (unzip fld1 fld2)@
-- @unzipWith f fld1 fld2 = Fold.lmap f (Fold.unzip fld1 fld2)@
--
-- This fold terminates when both the input folds terminate.
--
-- @since 0.7.0
-- /Pre-release/
{-# INLINE unzipWith #-}
unzipWith :: Monad m
=> (a -> (b,c)) -> Fold m b x -> Fold m c y -> Fold m a (x,y)
@ -1674,6 +1663,8 @@ unzipWith f = unzipWithM (return . f)
--
-- @
--
-- > unzip = Fold.unzipWith id
--
-- This is the consumer side dual of the producer side 'zip' operation.
--
-- @since 0.7.0
@ -1773,6 +1764,8 @@ chunksBetween _low _high _f1 _f2 = undefined
-- /Warning!/ working on large streams accumulated as buffers in memory could
-- be very inefficient, consider using "Streamly.Data.Array" instead.
--
-- > toStream = mkFoldr K.cons K.nil
--
-- /Pre-release/
{-# INLINE toStream #-}
toStream :: Monad m => Fold m a (SerialT Identity a)
@ -1784,6 +1777,8 @@ toStream = mkFoldr K.cons K.nil
-- | Buffers the input stream to a pure stream in the reverse order of the
-- input.
--
-- > toStreamRev = mkFoldl (flip K.cons) K.nil
--
-- /Warning!/ working on large streams accumulated as buffers in memory could
-- be very inefficient, consider using "Streamly.Data.Array" instead.
--

View File

@ -6,10 +6,35 @@
-- Stability : experimental
-- Portability : GHC
--
-- The @Tee@ type is a newtype wrapper over the 'Fold' type providing
-- distributing 'Applicative', 'Semigroup', 'Monoid', 'Num', 'Floating' and
-- 'Fractional' instances. The input received by the composed 'Tee' is
-- replicated and distributed to both the constituent Tee's.
--
-- For example, to compute the average of numbers in a stream without going
-- through the stream twice:
--
-- >>> import Streamly.Internal.Data.Fold.Tee (mkT, toFold)
-- >>> import Streamly.Internal.Data.Fold as Fold
--
-- >>> avg = (/) <$> (mkT Fold.sum) <*> (mkT $ fmap fromIntegral Fold.length)
-- >>> Stream.fold (toFold avg) $ Stream.fromList [1.0..100.0]
-- 50.5
--
-- Similarly, the 'Semigroup' and 'Monoid' instances of 'Tee' distribute the
-- input to both the folds and combines the outputs using Monoid or Semigroup
-- instances of the output types:
--
-- >>> import Data.Monoid (Sum(..))
-- >>> t = mkT Fold.head <> mkT Fold.last
-- >>> Stream.fold (toFold t) (fmap Sum $ Stream.enumerateFromTo 1.0 100.0)
-- Just (Sum {getSum = 101.0})
--
-- The 'Num', 'Floating', and 'Fractional' instances work in the same way.
--
module Streamly.Internal.Data.Fold.Tee
( Tee(..)
, fromFold
, toFold
, mkT
)
where
@ -22,44 +47,44 @@ import Streamly.Internal.Data.Fold.Type (Fold)
import qualified Streamly.Internal.Data.Fold.Type as Fold
-- | The type @Tee m a b@ represents a left fold over an input stream of values
-- of type @a@ to a single value of type @b@ in 'Monad' @m@.
--
-- @Tee@ is a wrapper over 'Fold' that uses 'teeWith' to define the applicative
-- instance.
-- | @Tee@ is a newtype wrapper over the 'Fold' type providing distributing
-- 'Applicative', 'Semigroup', 'Monoid', 'Num', 'Floating' and 'Fractional'
-- instances.
--
-- /Pre-release/
newtype Tee m a b =
Tee { runTee :: Fold m a b }
Tee { toFold :: Fold m a b }
deriving (Functor)
-- | Convert a 'Tee' to 'Fold'.
{-# INLINE toFold #-}
toFold :: Tee m a b -> Fold m a b
toFold = coerce
-- | Make a 'Tee' from a 'Fold'.
--
-- /Pre-release/
{-# INLINE mkT #-}
mkT :: Fold m a b -> Tee m a b
mkT = coerce
-- | Convert a 'Fold' to 'Tee'.
{-# INLINE fromFold #-}
fromFold :: Fold m a b -> Tee m a b
fromFold = coerce
-- | The 'Tee' resulting from '<*>' distributes its input to both the argument
-- 'Tee's and combines their output using function application.
-- | '<*>' distributes the input to both the argument 'Tee's and combines their
-- outputs using function application.
--
instance Monad m => Applicative (Tee m a) where
{-# INLINE pure #-}
pure a = fromFold (Fold.yield a)
pure a = mkT (Fold.yield a)
{-# INLINE (<*>) #-}
(<*>) a b = fromFold (Fold.teeWith ($) (toFold a) (toFold b))
(<*>) a b = mkT (Fold.teeWith ($) (toFold a) (toFold b))
-- | Combines the outputs (the type @b@) using their 'Semigroup' instances.
-- | '<>' distributes the input to both the argument 'Tee's and combines their
-- outputs using the 'Semigroup' instance of the output type.
--
instance (Semigroup b, Monad m) => Semigroup (Tee m a b) where
{-# INLINE (<>) #-}
(<>) = liftA2 (<>)
-- | Combines the outputs (the type @b@) using their 'Monoid' instances.
-- | '<>' distributes the input to both the argument 'Tee's and combines their
-- outputs using the 'Monoid' instance of the output type.
--
instance (Semigroup b, Monoid b, Monad m) => Monoid (Tee m a b) where
{-# INLINE mempty #-}
mempty = pure mempty
@ -67,7 +92,9 @@ instance (Semigroup b, Monoid b, Monad m) => Monoid (Tee m a b) where
{-# INLINE mappend #-}
mappend = (<>)
-- | Combines the outputs (type @b@) using their 'Num' instances.
-- | Binary 'Num' operations distribute the input to both the argument 'Tee's
-- and combine their outputs using the 'Num' instance of the output type.
--
instance (Monad m, Num b) => Num (Tee m a b) where
{-# INLINE fromInteger #-}
fromInteger = pure . fromInteger
@ -90,7 +117,10 @@ instance (Monad m, Num b) => Num (Tee m a b) where
{-# INLINE (-) #-}
(-) = liftA2 (-)
-- | Combines the outputs (type @b@) using their 'Fractional' instances.
-- | Binary 'Fractional' operations distribute the input to both the argument
-- 'Tee's and combine their outputs using the 'Fractional' instance of the
-- output type.
--
instance (Monad m, Fractional b) => Fractional (Tee m a b) where
{-# INLINE fromRational #-}
fromRational = pure . fromRational
@ -101,7 +131,9 @@ instance (Monad m, Fractional b) => Fractional (Tee m a b) where
{-# INLINE (/) #-}
(/) = liftA2 (/)
-- | Combines the outputs using their 'Floating' instances.
-- | Binary 'Floating' operations distribute the input to both the argument
-- 'Tee's and combine their outputs using the 'Floating' instance of the output
-- type.
instance (Monad m, Floating b) => Floating (Tee m a b) where
{-# INLINE pi #-}
pi = pure pi

View File

@ -164,16 +164,41 @@
-- can be used for the scan use case, instead of using extract. Extract would
-- then be used only for the case when the stream stops before the fold
-- completes.
--
-- = Accumulators and Terminating Folds
--
-- Folds in this module can be classified in two categories viz. accumulators
-- and terminating folds. Accumulators do not have a terminating condition,
-- they run forever and consume the entire stream, for example the 'length'
-- fold. Terminating folds have a terminating condition and can terminate
-- without consuming the entire stream, for example, the 'head' fold.
--
-- = Monoids
--
-- Monoids allow generalized, modular folding. The accumulators in this module
-- can be expressed using 'mconcat' and a suitable 'Monoid'. Instead of
-- writing folds we can write Monoids and turn them into folds.
--
-- = Performance Notes
--
-- 'Streamly.Prelude' module provides fold functions to directly fold streams
-- e.g. Streamly.Prelude/'Streamly.Prelude.sum' serves the same purpose as
-- Fold/'sum'. However, the functions in Streamly.Prelude cannot be
-- efficiently combined together e.g. we cannot drive the input stream through
-- @sum@ and @length@ fold functions simultaneously. Using the 'Fold' type we
-- can efficiently split the stream across multiple folds because it allows the
-- compiler to perform stream fusion optimizations.
--
module Streamly.Internal.Data.Fold.Type
(
-- * Types
Step (..)
, Fold (..)
-- * Fold constructors
-- * Constructors
, mkFoldl
, mkFoldlM
, mkFoldl1
, mkFoldr
, mkFoldrM
, mkFold
@ -181,55 +206,64 @@ module Streamly.Internal.Data.Fold.Type
, mkFoldM
, mkFoldM_
-- * Fold2
, Fold2 (..)
, simplify
-- * Basic Folds
-- * Folds
, yield
, yieldM
, drain
, toList
-- * Generators
, yield
, yieldM
-- * Combinators
-- * Transformations
-- ** Mapping output
, rmapM
-- ** Mapping Input
, map
, lmap
, lmapM
-- ** Filtering
, filter
, filterM
, catMaybes
-- ** Trimming
, take
, takeInterval
-- * Distributing
, teeWith
, teeWithFst
, teeWithMin
, shortest
, longest
-- * Serial Application
-- ** Serial Append
, serialWith
, serial_
-- * Nested Application
, concatMap
-- ** Parallel Distribution
, GenericRunner(..)
, teeWith
, teeWithFst
, teeWithMin
-- ** Parallel Alternative
, shortest
, longest
-- ** Splitting
, ManyState
, many
, manyPost
, intervalsOf
, chunksOf
, chunksOf2
, intervalsOf
-- ** Nesting
, concatMap
-- * Running Partially
, duplicate
, initialize
, runStep
-- * Misc
, GenericRunner(..) -- Is used in multiple step functions
-- * Fold2
, Fold2 (..)
, simplify
, chunksOf2
)
where
@ -242,6 +276,7 @@ import Control.Monad.Trans.Control (control)
import Data.Bifunctor (Bifunctor(..))
import Data.Maybe (isJust, fromJust)
import Fusion.Plugin.Types (Fuse(..))
import Streamly.Internal.Data.Maybe.Strict (Maybe'(..), toMaybe)
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..), Tuple3'(..))
import Streamly.Internal.Data.SVar (MonadAsync)
@ -255,9 +290,14 @@ import Prelude hiding (concatMap, filter, map, take)
-- >>> import qualified Streamly.Internal.Data.Fold as Fold
------------------------------------------------------------------------------
-- Monadic left folds
-- Step of a fold
------------------------------------------------------------------------------
-- The Step functor around b allows expressing early termination like a right
-- fold. Traditional list right folds use function composition and laziness to
-- terminate early whereas we use data constructors. It allows stream fusion in
-- contrast to the foldr/build fusion when composing with functions.
-- | Represents the result of the @step@ of a 'Fold'. 'Partial' returns an
-- intermediate state of the fold, the fold step can be called again with the
-- state or the driver can use @extract@ on the state to get the result out.
@ -270,10 +310,7 @@ data Step s b
= Partial !s
| Done !b
-- | A bifunctor instance on 'Step'. @first@ maps on the value held by 'Partial'
-- and @second@ maps on the result held by 'Done'.
--
-- /Pre-release/
-- | 'first' maps over 'Partial' and 'second' maps over 'Done'.
--
instance Bifunctor Step where
{-# INLINE bimap #-}
@ -288,17 +325,19 @@ instance Bifunctor Step where
second _ (Partial x) = Partial x
second f (Done a) = Done (f a)
-- | Maps the function over the result held by 'Done'.
-- | 'fmap' maps over 'Done'.
--
-- @
-- fmap = 'second'
-- @
--
-- /Pre-release/
--
instance Functor (Step s) where
{-# INLINE fmap #-}
fmap = second
-- | Map a monadic function over the result @b@ in @Step s b@.
--
-- /Internal/
{-# INLINE mapMStep #-}
mapMStep :: Applicative m => (a -> m b) -> Step s a -> m (Step s b)
mapMStep f res =
@ -306,25 +345,27 @@ mapMStep f res =
Partial s -> pure $ Partial s
Done b -> Done <$> f b
-- The Step functor around b allows expressing early termination like a right
-- fold. Traditional list right folds use function composition and laziness to
-- terminate early whereas we use data constructors. It allows stream fusion in
-- contrast to the foldr/build fusion when composing with functions.
------------------------------------------------------------------------------
-- The Fold type
------------------------------------------------------------------------------
-- | The type @Fold m a b@ having constructor @Fold step initial extract@
-- represents a left fold over an input stream of values of type @a@ to a
-- single value of type @b@ in 'Monad' @m@. The constructor is not exposed via
-- exposed modules, smart constructors are provided to create folds.
-- represents a fold over an input stream of values of type @a@ to a final
-- value of type @b@ in 'Monad' @m@.
--
-- The fold uses an intermediate state @s@ as accumulator, the type @s@ is
-- specific to the fold. The initial value of the fold state @s@ is returned by
-- @initial@. The @step@ function consumes an input and either returns the
-- final result @b@ if the fold is done or the next intermediate state (see
-- 'Step'). At any point the fold driver can extract the result from the
-- intermediate state using the @extract@ function.
-- internal to the specific fold definition. The initial value of the fold
-- state @s@ is returned by @initial@. The @step@ function consumes an input
-- and either returns the final result @b@ if the fold is done or the next
-- intermediate state (see 'Step'). At any point the fold driver can extract
-- the result from the intermediate state using the @extract@ function.
--
-- NOTE: If you think you need the constructor of this type please consider
-- using the smart constructors in "Streamly.Internal.Data.Fold' instead.
-- NOTE: The constructor is not yet exposed via exposed modules, smart
-- constructors are provided to create folds. If you think you need the
-- constructor of this type please consider using the smart constructors in
-- "Streamly.Internal.Data.Fold' instead.
--
-- /since 0.8.0 (type changed)/
--
-- @since 0.7.0
@ -332,6 +373,10 @@ data Fold m a b =
-- | @Fold @ @ step @ @ initial @ @ extract@
forall s. Fold (s -> a -> m (Step s b)) (m (Step s b)) (s -> m b)
------------------------------------------------------------------------------
-- Mapping on the output
------------------------------------------------------------------------------
-- | Map a monadic function on the output of a fold.
--
-- @since 0.8.0
@ -361,7 +406,9 @@ rmapM f (Fold step initial extract) = Fold step1 initial1 (extract >=> f)
-- mkfoldlx step initial extract = fmap extract (mkFoldl step initial)
-- @
--
-- /Pre-release/
-- See also: "Streamly.Prelude.foldl'"
--
-- @since 0.8.0
--
{-# INLINE mkFoldl #-}
mkFoldl :: Monad m => (b -> a -> b) -> b -> Fold m a b
@ -377,42 +424,65 @@ mkFoldl step initial =
-- A fold with an extract function can be expressed using rmapM:
--
-- @
-- mkAccumM :: Functor m => (s -> a -> m s) -> m s -> (s -> m b) -> Fold m a b
-- mkAccumM step initial extract = rmapM extract (mkFoldlM step initial)
-- mkFoldlxM :: Functor m => (s -> a -> m s) -> m s -> (s -> m b) -> Fold m a b
-- mkFoldlxM step initial extract = rmapM extract (mkFoldlM step initial)
-- @
--
-- /Pre-release/
-- See also: "Streamly.Prelude.foldlM'"
--
-- @since 0.8.0
--
{-# INLINE mkFoldlM #-}
mkFoldlM :: Monad m => (b -> a -> m b) -> m b -> Fold m a b
mkFoldlM step initial =
Fold (\s a -> Partial <$> step s a) (Partial <$> initial) return
-- | Make a strict left fold, for non-empty streams, using first element as the
-- starting value. Returns Nothing if the stream is empty.
--
-- See also: "Streamly.Prelude.foldl1'"
--
-- /Pre-release/
{-# INLINE mkFoldl1 #-}
mkFoldl1 :: Monad m => (a -> a -> a) -> Fold m a (Maybe a)
mkFoldl1 step = fmap toMaybe $ mkFoldl step1 Nothing'
where
step1 Nothing' a = Just' a
step1 (Just' x) a = Just' $ step x a
------------------------------------------------------------------------------
-- Right fold constructors
------------------------------------------------------------------------------
-- | Make a fold using a right fold style step function and a terminal value.
-- It performs a right fold via a left fold using function composition.
-- This can be useful for constructing structures. For reductions this may be
-- very inefficient compared to using a direct fold implementation using
-- 'mkFold'.
-- It performs a strict right fold via a left fold using function composition.
-- Note that this is strict fold, it can only be useful for constructing strict
-- structures in memory. For reductions this will be very inefficient.
--
-- For example,
--
-- > toList = mkFoldr (:) []
--
-- /Pre-release/
-- See also: "Streamly.Prelude.foldr"
--
-- @since 0.8.0
{-# INLINE mkFoldr #-}
mkFoldr :: Monad m => (a -> b -> b) -> b -> Fold m a b
mkFoldr g z = fmap ($ z) $ mkFoldl (\f x -> f . g x) id
-- XXX we have not seen any use of this yet, not releasing until we have a use
-- case.
--
-- | Like 'mkFoldr' but with a monadic step function.
--
-- For example,
--
-- > toList = mkFoldrM (\a xs -> return $ a : xs) (return [])
--
-- See also: "Streamly.Prelude.foldrM"
--
-- /Pre-release/
{-# INLINE mkFoldrM #-}
mkFoldrM :: Monad m => (a -> b -> m b) -> m b -> Fold m a b
@ -423,6 +493,17 @@ mkFoldrM g z =
-- General fold constructors
------------------------------------------------------------------------------
-- XXX If the Step yield gives the result each time along with the state then
-- we can make the type of this as
--
-- mkFold :: Monad m => (s -> a -> Step s b) -> Step s b -> Fold m a b
--
-- Then similar to mkFoldl and mkFoldr we can just fmap extract on it to extend
-- it to the version where an 'extract' function is required. Or do we even
-- need that?
--
-- Until we investigate this we are not releasing these.
-- | Make a terminating fold using a pure step function, a pure initial state
-- and a pure state extraction function.
--
@ -513,6 +594,8 @@ drain = mkFoldl (\_ _ -> ()) ()
-- very inefficient, consider using "Streamly.Data.Array.Foreign"
-- instead.
--
-- > toList = mkFoldr (:) []
--
-- @since 0.7.0
{-# INLINABLE toList #-}
toList :: Monad m => Fold m a [a]
@ -558,16 +641,26 @@ yieldM b = Fold undefined (Done <$> b) pure
data SeqFoldState sl f sr = SeqFoldL !sl | SeqFoldR !f !sr
-- | Sequential fold application. Apply two folds sequentially to an input
-- stream. The input is provided to the first fold, when it is done the
-- stream. The input is provided to the first fold, when it is done - the
-- remaining input is provided to the second fold. When the second fold is done
-- or if the input stream is over, the outputs of the two folds are combined
-- using the supplied function.
--
-- Note: This is a folding dual of appending streams using
-- 'Streamly.Prelude.serial', it splits the streams using two folds and zips
-- the results. This has the same caveats as ParseD's @serialWith@
-- >>> f = Fold.serialWith (,) (Fold.take 8 Fold.toList) (Fold.takeEndBy (== '\n') Fold.toList)
-- >>> Stream.fold f $ Stream.fromList "header: hello\n"
-- ("header: ","hello\n")
--
-- /Pre-release/
-- Note: This is dual to appending streams using 'Streamly.Prelude.serial'.
--
-- Note: this implementation allows for stream fusion but has quadratic time
-- complexity, because each composition adds a new branch that each subsequent
-- fold's input element has to traverse, therefore, it cannot scale to a large
-- number of compositions. After around 100 compositions the performance starts
-- dipping rapidly compared to a CPS style implementation.
--
-- /Time: O(n^2) where n is the number of compositions./
--
-- @since 0.8.0
--
{-# INLINE serialWith #-}
serialWith :: Monad m => (a -> b -> c) -> Fold m x a -> Fold m x b -> Fold m x c
@ -627,6 +720,17 @@ data GenericRunner sL sR bL bR
-- | @teeWith k f1 f2@ distributes its input to both @f1@ and @f2@ until both
-- of them terminate and combines their output using @k@.
--
-- >>> avg = Fold.teeWith (/) Fold.sum (fmap fromIntegral Fold.length)
-- >>> Stream.fold avg $ Stream.fromList [1.0..100.0]
-- 50.5
--
-- > teeWith k f1 f2 = fmap (uncurry k) ((Fold.tee f1 f2)
--
-- For applicative composition using this combinator see
-- "Streamly.Internal.Data.Fold.Tee".
--
-- See also: "Streamly.Internal.Data.Fold.Tee"
--
-- @since 0.8.0
--
{-# INLINE teeWith #-}
@ -687,7 +791,7 @@ teeWith f (Fold stepL beginL doneL) (Fold stepR beginR doneR) =
bR <- doneR sR
return $ f bL bR
-- | Like 'teeWith' but terminates when the first fold terminates.
-- | Like 'teeWith' but terminates as soon as the first fold terminates.
--
-- /Unimplemented/
--
@ -695,7 +799,8 @@ teeWith f (Fold stepL beginL doneL) (Fold stepR beginR doneR) =
teeWithFst :: (b -> c -> d) -> Fold m a b -> Fold m a c -> Fold m a d
teeWithFst = undefined
-- | Like 'teeWith' but terminates when any fold terminates.
-- | Like 'teeWith' but terminates as soon as any one of the two folds
-- terminates.
--
-- /Unimplemented/
--
@ -729,14 +834,26 @@ data ConcatMapState m sa a c
= B !sa
| forall s. C (s -> a -> m (Step s c)) !s (s -> m c)
-- Compare with foldIterate.
--
-- | Map a 'Fold' returning function on the result of a 'Fold' and run the
-- returned fold.
-- returned fold. This operation can be used to express data dependencies
-- between fold operations.
--
-- Let's say the first element in the stream is a count of the following
-- elements that we have to add, then:
--
-- >>> import Data.Maybe (fromJust)
-- >>> Stream.fold (Fold.concatMap (flip Fold.take Fold.sum) (Fold.rmapM (return . fromJust) Fold.head)) $ Stream.fromList [10,9..1]
-- >>> count = fmap fromJust Fold.head
-- >>> total n = Fold.take n Fold.sum
-- >>> Stream.fold (Fold.concatMap total count) $ Stream.fromList [10,9..1]
-- 45
--
-- /Pre-release/
-- /Time: O(n^2) where @n@ is the number of compositions./
--
-- See also: "Streamly.Internal.Data.Stream.IsStream.foldIterateM"
--
-- @since 0.8.0
--
{-# INLINE concatMap #-}
concatMap :: Monad m => (b -> Fold m a c) -> Fold m a b -> Fold m a c
@ -781,12 +898,14 @@ concatMap f (Fold stepa initiala extracta) = Fold stepc initialc extractc
-- Mapping on input
------------------------------------------------------------------------------
-- | @(lmap f fold)@ maps the function @f@ on the input of the fold.
-- | @lmap f fold@ maps the function @f@ on the input of the fold.
--
-- >>> Stream.fold (Fold.lmap (\x -> x * x) Fold.sum) (Stream.enumerateFromTo 1 100)
-- 338350
--
-- /Pre-release/
-- > lmap = Fold.lmapM return
--
-- @since 0.8.0
{-# INLINABLE lmap #-}
lmap :: (a -> b) -> Fold m b r -> Fold m a r
lmap f (Fold step begin done) = Fold step' begin done
@ -800,9 +919,9 @@ lmap f (Fold step begin done) = Fold step' begin done
map :: (a -> b) -> Fold m b r -> Fold m a r
map = lmap
-- | @(lmapM f fold)@ maps the monadic function @f@ on the input of the fold.
-- | @lmapM f fold@ maps the monadic function @f@ on the input of the fold.
--
-- /Pre-release/
-- @since 0.8.0
{-# INLINABLE lmapM #-}
lmapM :: Monad m => (a -> m b) -> Fold m b r -> Fold m a r
lmapM f (Fold step begin done) = Fold step' begin done
@ -818,10 +937,9 @@ lmapM f (Fold step begin done) = Fold step' begin done
-- >>> Stream.fold (Fold.filter (> 5) Fold.sum) $ Stream.fromList [1..10]
-- 40
--
-- >>> Stream.fold (Fold.filter (< 5) Fold.sum) $ Stream.fromList [1..10]
-- 10
-- > filter f = Fold.filterM (return . f)
--
-- @since 0.7.0
-- @since 0.8.0
{-# INLINABLE filter #-}
filter :: Monad m => (a -> Bool) -> Fold m a r -> Fold m a r
filter f (Fold step begin done) = Fold step' begin done
@ -830,7 +948,7 @@ filter f (Fold step begin done) = Fold step' begin done
-- | Like 'filter' but with a monadic predicate.
--
-- @since 0.7.0
-- @since 0.8.0
{-# INLINABLE filterM #-}
filterM :: Monad m => (a -> m Bool) -> Fold m a r -> Fold m a r
filterM f (Fold step begin done) = Fold step' begin done
@ -842,7 +960,7 @@ filterM f (Fold step begin done) = Fold step' begin done
-- | Modify a fold to receive a 'Maybe' input, the 'Just' values are unwrapped
-- and sent to the original fold, 'Nothing' values are discarded.
--
-- /Pre-release/
-- @since 0.8.0
{-# INLINE catMaybes #-}
catMaybes :: Monad m => Fold m a b -> Fold m (Maybe a) b
catMaybes = filter isJust . map fromJust
@ -851,15 +969,13 @@ catMaybes = filter isJust . map fromJust
-- Parsing
------------------------------------------------------------------------------
-- | Take at most @n@ input elements and fold them using the supplied fold.
-- | Take at most @n@ input elements and fold them using the supplied fold. A
-- negative count is treated as 0.
--
-- >>> Stream.fold (Fold.take 1 Fold.toList) $ Stream.fromList [1]
-- [1]
-- >>> Stream.fold (Fold.take 2 Fold.toList) $ Stream.fromList [1..10]
-- [1,2]
--
-- >>> Stream.fold (Fold.take (-1) Fold.toList) $ Stream.fromList [1]
-- []
--
-- /Pre-release/
-- @since 0.8.0
{-# INLINE take #-}
take :: Monad m => Int -> Fold m a b -> Fold m a b
take n (Fold fstep finitial fextract) = Fold step initial extract
@ -898,11 +1014,13 @@ take n (Fold fstep finitial fextract) = Fold step initial extract
-- i.e. it uses the last accumulator value as the initial value of the
-- accumulator. Thus we can resume the fold later and feed it more input.
--
-- >> do
-- > more <- S.fold (FL.duplicate FL.sum) (S.enumerateFromTo 1 10)
-- > evenMore <- S.fold (FL.duplicate more) (S.enumerateFromTo 11 20)
-- > S.fold evenMore (S.enumerateFromTo 21 30)
-- > 465
-- >>> :{
-- do
-- more <- Stream.fold (Fold.duplicate Fold.sum) (Stream.enumerateFromTo 1 10)
-- evenMore <- Stream.fold (Fold.duplicate more) (Stream.enumerateFromTo 11 20)
-- Stream.fold evenMore (Stream.enumerateFromTo 21 30)
-- :}
-- 465
--
-- /Pre-release/
{-# INLINABLE duplicate #-}
@ -959,11 +1077,16 @@ data ManyState s1 s2
-- the @split@ fold repeatedly on the input stream and accumulates zero or more
-- fold results using @collect@.
--
-- >>> two = Fold.take 2 Fold.toList
-- >>> twos = Fold.many two Fold.toList
-- >>> Stream.fold twos $ Stream.fromList [1..10]
-- [[1,2],[3,4],[5,6],[7,8],[9,10]]
--
-- Stops when @collect@ stops.
--
-- /Pre-release/
-- See also: "Streamly.Prelude.concatMap", "Streamly.Prelude.foldMany"
--
-- /See also: Streamly.Prelude.concatMap, Streamly.Prelude.foldMany/
-- @since 0.8.0
--
{-# INLINE many #-}
many :: Monad m => Fold m a b -> Fold m b c -> Fold m a c
@ -1077,11 +1200,15 @@ manyPost (Fold sstep sinitial sextract) (Fold cstep cinitial cextract) =
-- of @n@ items in the input stream and supplies the result to the @collect@
-- fold.
--
-- >>> twos = Fold.chunksOf 2 Fold.toList Fold.toList
-- >>> Stream.fold twos $ Stream.fromList [1..10]
-- [[1,2],[3,4],[5,6],[7,8],[9,10]]
--
-- > chunksOf n split = many (take n split)
--
-- Stops when @collect@ stops.
--
-- /Pre-release/
-- @since 0.8.0
--
{-# INLINE chunksOf #-}
chunksOf :: Monad m => Int -> Fold m a b -> Fold m b c -> Fold m a c
@ -1119,10 +1246,15 @@ chunksOf2 n (Fold step1 initial1 extract1) (Fold2 step2 inject2 extract2) =
-- XXX We can use asyncClock here. A parser can be used to return an input that
-- arrives after the timeout.
-- XXX If n is 0 return immediately in initial.
-- XXX we should probably discard the input received after the timeout like
-- takeEndBy_.
--
-- | @takeInterval n fold@ uses @fold@ to fold the input items arriving within
-- a window of first @n@ seconds.
--
-- >>> Stream.fold (Fold.takeInterval 1.0 Fold.toList) $ Stream.delay 0.1 $ Stream.fromList [1..]
-- [1,2,3,4,5,6,7,8,9,10,11]
--
-- Stops when @fold@ stops or when the timeout occurs. Note that the fold needs
-- an input after the timeout to stop. For example, if no input is pushed to
-- the fold until one hour after the timeout had occurred, then the fold will
@ -1177,18 +1309,16 @@ takeInterval n (Fold step initial done) = Fold step' initial' done'
handleChildException :: MVar Bool -> SomeException -> IO ()
handleChildException mv _ = void $ swapMVar mv True
-- | Group the input stream into windows of n second each and then fold each
-- group using the provided fold function.
--
-- For example, we can copy and distribute a stream to multiple folds where
-- each fold can group the input differently e.g. by one second, one minute and
-- one hour windows respectively and fold each resulting stream of folds.
-- | Group the input stream into windows of n second each using the first fold
-- and then fold the resulting groups using the second fold.
--
-- @
--
-- -----Fold m a b----|-Fold n a c-|-Fold n a c-|-...-|----Fold m a c
--
-- @
-- >>> intervals = Fold.intervalsOf 0.5 Fold.toList Fold.toList
-- >>> Stream.fold intervals $ Stream.delay 0.2 $ Stream.fromList [1..10]
-- [[1,2,3,4],[5,6,7],[8,9,10]]
--
-- > intervalsOf n split = many (takeInterval n split)
--

View File

@ -332,14 +332,17 @@ runSink = fold . toFold
-- Running a Parser
------------------------------------------------------------------------------
-- | Parse a stream using the supplied 'Parser'.
-- | Parse a stream using the supplied ParserD 'PRD.Parser'.
--
-- /Pre-release/
-- /Internal/
--
{-# INLINE_NORMAL parseD #-}
parseD :: MonadThrow m => PRD.Parser m a b -> SerialT m a -> m b
parseD p = D.parse p . toStreamD
-- | Parse a stream using the supplied ParserK 'PRK.Parser'.
--
-- /Internal/
{-# INLINE parseK #-}
parseK :: MonadThrow m => PRK.Parser m a b -> SerialT m a -> m b
parseK = parse
@ -877,7 +880,7 @@ isPrefixOf m1 m2 = D.isPrefixOf (toStreamD m1) (toStreamD m2)
--
-- /Pre-release/
--
-- /Requires 'Storable' constraint/ - Help wanted.
-- /Requires 'Storable' constraint/
--
{-# INLINE isInfixOf #-}
isInfixOf :: (MonadIO m, Eq a, Enum a, Storable a)

View File

@ -980,7 +980,7 @@ foldManyPost f m = D.fromStreamD $ D.foldManyPost f (D.toStreamD m)
-- Note @foldMany (take 0)@ would result in an infinite loop in a non-empty
-- stream.
--
-- /Pre-release/
-- @since 0.8.0
--
{-# INLINE foldMany #-}
foldMany

View File

@ -1192,7 +1192,7 @@ interspersePrefix_ m = mapM (\x -> void m >> return x)
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),2)
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),3)
--
-- /Pre-release/
-- @since 0.8.0
--
{-# INLINE delay #-}
delay :: (IsStream t, MonadIO m) => Double -> t m a -> t m a

View File

@ -98,7 +98,6 @@ import Streamly.Data.Fold (Fold)
-- import Streamly.String (encodeUtf8, decodeUtf8, foldLines)
import qualified Streamly.Data.Fold as FL
import qualified Streamly.Internal.Data.Fold.Type as FL
import qualified Streamly.Internal.Data.Unfold as UF
import qualified Streamly.Internal.Data.Array.Foreign as IA
import qualified Streamly.Data.Array.Foreign as A

View File

@ -247,6 +247,7 @@ module Streamly.Prelude
-- $runningfolds
, fold
, foldMany
-- ** Full Folds
-- | Folds that are guaranteed to evaluate the whole stream.
@ -343,6 +344,7 @@ module Streamly.Prelude
, mapM_
, trace
, tap
, delay
-- ** Scanning
--

View File

@ -134,7 +134,6 @@ import Streamly.Prelude (avgRate, rate, maxBuffer, maxThreads)
import qualified Streamly.Prelude as S
import qualified Streamly.Data.Fold as FL
import qualified Streamly.Internal.Data.Stream.IsStream as S
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Unfold as UF
import Streamly.Test.Common