mirror of
https://github.com/ilyakooo0/streamly.git
synced 2024-10-26 18:10:11 +03:00
Update Fold docs, move containers ops to streamly pkg
Move the container dependent code from streamly-core to streamly Rename some functions. Release some functions.
This commit is contained in:
parent
443a2c6c22
commit
703823b3d5
@ -28,6 +28,7 @@ import Streamly.Internal.Data.Fold (Fold(..))
|
||||
import Streamly.Internal.Data.IsMap.HashMap ()
|
||||
|
||||
import qualified Streamly.Internal.Data.Fold as FL
|
||||
import qualified Streamly.Internal.Data.Fold.Extra as FL
|
||||
import qualified Streamly.Internal.Data.Unfold as Unfold
|
||||
import qualified Streamly.Internal.Data.Pipe as Pipe
|
||||
import qualified Streamly.Internal.Data.Stream as Stream
|
||||
|
@ -45,6 +45,7 @@ import Streamly.Benchmark.Prelude
|
||||
, sourceFoldMapWithStream, concatFoldableWith, concatForFoldableWith)
|
||||
#else
|
||||
import qualified Streamly.Internal.Data.Stream as S
|
||||
import qualified Streamly.Internal.Data.Stream.Extra as S
|
||||
#endif
|
||||
import qualified Streamly.Internal.Data.Unfold as UF
|
||||
import qualified Streamly.Internal.Data.Fold as Fold
|
||||
|
@ -15,8 +15,8 @@
|
||||
-- >>> import qualified Streamly.Data.Stream as Stream
|
||||
--
|
||||
-- For example, a 'sum' Fold represents adding the input to the accumulated
|
||||
-- sum. A fold driver e.g. 'Streamly.Prelude.fold' pushes values from a stream
|
||||
-- to the 'Fold' one at a time, reducing the stream to a single value.
|
||||
-- sum. A fold driver pushes values from a stream to the 'Fold' one at a time,
|
||||
-- reducing the stream to a single value.
|
||||
--
|
||||
-- >>> Stream.fold Fold.sum $ Stream.fromList [1..100]
|
||||
-- 5050
|
||||
@ -93,7 +93,7 @@
|
||||
--
|
||||
-- We can often use streams or folds to achieve the same goal. However, streams
|
||||
-- are more efficient in composition of producers (e.g.
|
||||
-- 'Streamly.Prelude.serial' or 'Streamly.Prelude.mergeBy') whereas folds are
|
||||
-- 'Data.Stream.append' or 'Data.Stream.mergeBy') whereas folds are
|
||||
-- more efficient in composition of consumers (e.g. 'serialWith', 'partition'
|
||||
-- or 'teeWith').
|
||||
--
|
||||
@ -135,7 +135,8 @@ module Streamly.Data.Fold
|
||||
-- * Constructors
|
||||
, foldl'
|
||||
, foldlM'
|
||||
, foldr
|
||||
, foldl1'
|
||||
, foldr'
|
||||
|
||||
-- * Folds
|
||||
-- ** Accumulators
|
||||
@ -152,17 +153,10 @@ module Streamly.Data.Fold
|
||||
|
||||
-- Reducers
|
||||
, drain
|
||||
, drainBy
|
||||
, the
|
||||
, uniq
|
||||
, last
|
||||
, drainMapM
|
||||
, length
|
||||
, sum
|
||||
, product
|
||||
, maximumBy
|
||||
, maximum
|
||||
, minimumBy
|
||||
, minimum
|
||||
, mean
|
||||
, rollingHash
|
||||
, rollingHashWithSalt
|
||||
@ -171,16 +165,38 @@ module Streamly.Data.Fold
|
||||
, toList
|
||||
, toListRev
|
||||
|
||||
-- ** Non-Empty Accumulators
|
||||
-- | Accumulators that do not have a default value, therefore, return
|
||||
-- 'Nothing' on an empty stream.
|
||||
, latest
|
||||
, maximumBy
|
||||
, maximum
|
||||
, minimumBy
|
||||
, minimum
|
||||
|
||||
-- ** Filtering Scanners
|
||||
-- | Accumulators that are usually run as a scan using the 'scanMaybe'
|
||||
-- combinator.
|
||||
, uniq
|
||||
, uniqBy
|
||||
, deleteBy
|
||||
, findIndices
|
||||
, elemIndices
|
||||
|
||||
-- ** Terminating Folds
|
||||
-- | These are much like lazy right folds.
|
||||
|
||||
, index
|
||||
, one
|
||||
, null
|
||||
-- , satisfy
|
||||
-- , maybe
|
||||
|
||||
, index
|
||||
, the
|
||||
, find
|
||||
, lookup
|
||||
, findIndex
|
||||
, elemIndex
|
||||
, null
|
||||
, elem
|
||||
, notElem
|
||||
, all
|
||||
@ -188,24 +204,36 @@ module Streamly.Data.Fold
|
||||
, and
|
||||
, or
|
||||
|
||||
-- * Running A Fold
|
||||
, drive
|
||||
, driveBreak
|
||||
|
||||
-- * Building Incrmentally
|
||||
, extractM
|
||||
, reduce
|
||||
, snoc
|
||||
-- , snocl
|
||||
, snocM
|
||||
-- , snoclM
|
||||
, augment
|
||||
, duplicate
|
||||
-- , isClosed
|
||||
|
||||
-- * Combinators
|
||||
-- | Combinators are modifiers of folds. In the type @Fold m a b@, @a@ is
|
||||
-- the input type and @b@ is the output type. Transformations can be
|
||||
-- applied either on the input side or on the output side. Therefore,
|
||||
-- combinators are of one of the following general shapes:
|
||||
-- applied either on the input side (contravariant) or on the output side
|
||||
-- (covariant). Therefore, combinators are of one of the following general
|
||||
-- shapes:
|
||||
--
|
||||
-- * @... -> Fold m a b -> Fold m c b@ (input transformation)
|
||||
-- * @... -> Fold m a b -> Fold m a c@ (output transformation)
|
||||
--
|
||||
-- Output transformations are also known as covariant transformations, and
|
||||
-- input transformations are also known as contravariant transformations.
|
||||
-- The input side transformations are more interesting for folds. Most of
|
||||
-- the following sections describe the input transformation operations on a
|
||||
-- fold. The names and signatures of the operations are consistent with
|
||||
-- corresponding operations in "Streamly.Prelude". When an operation makes
|
||||
-- sense on both input and output side we use the prefix @l@ (for left) for
|
||||
-- input side operations and the prefix @r@ (for right) for output side
|
||||
-- operations.
|
||||
-- fold. When an operation makes sense on both input and output side we use
|
||||
-- the prefix @l@ (for left) for input side operations and the prefix @r@
|
||||
-- (for right) for output side operations.
|
||||
|
||||
-- ** Mapping on output
|
||||
-- | The 'Functor' instance of a fold maps on the output of the fold:
|
||||
@ -219,7 +247,10 @@ module Streamly.Data.Fold
|
||||
, lmap
|
||||
, lmapM
|
||||
|
||||
-- ** Filtering
|
||||
-- ** Scanning and Filtering
|
||||
, scan
|
||||
, postscan
|
||||
, scanMaybe
|
||||
, filter
|
||||
, filterM
|
||||
|
||||
@ -268,6 +299,9 @@ module Streamly.Data.Fold
|
||||
, concatMap
|
||||
|
||||
-- * Deprecated
|
||||
, foldr
|
||||
, drainBy
|
||||
, last
|
||||
, head
|
||||
, sequence
|
||||
, mapM
|
||||
@ -282,7 +316,7 @@ import Prelude
|
||||
notElem, maximum, minimum, head, last, tail, length, null,
|
||||
reverse, iterate, init, and, or, lookup, foldr1, (!!),
|
||||
scanl, scanl1, replicate, concatMap, mconcat, foldMap, unzip,
|
||||
span, splitAt, break, mapM)
|
||||
span, splitAt, break, mapM, maybe)
|
||||
|
||||
import Streamly.Internal.Data.Fold
|
||||
|
||||
|
@ -27,7 +27,7 @@
|
||||
-- instances of the output types:
|
||||
--
|
||||
-- >>> import Data.Monoid (Sum(..))
|
||||
-- >>> t = Tee Fold.one <> Tee Fold.last
|
||||
-- >>> t = Tee Fold.one <> Tee Fold.latest
|
||||
-- >>> Stream.fold (toFold t) (fmap Sum $ Stream.enumerateFromTo 1.0 100.0)
|
||||
-- Just (Sum {getSum = 101.0})
|
||||
--
|
||||
|
@ -18,7 +18,6 @@ module Streamly.Data.Parser
|
||||
-- * Parser Type
|
||||
Parser
|
||||
|
||||
-- XXX Should we use Fold.fromParser instead?
|
||||
-- -- * Downgrade to Fold
|
||||
-- , toFold
|
||||
|
||||
|
@ -242,7 +242,10 @@ module Streamly.Data.Stream
|
||||
, fold -- XXX rename to run? We can have a Stream.run and Fold.run.
|
||||
-- XXX fold1 can be achieved using Monoids or Refolds.
|
||||
, foldBreak
|
||||
, foldContinue
|
||||
|
||||
-- * Builders
|
||||
, build
|
||||
-- , buildl
|
||||
|
||||
-- ** Parsing
|
||||
, parse
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -322,18 +322,11 @@
|
||||
-- can be expressed using 'mconcat' and a suitable 'Monoid'. Instead of
|
||||
-- writing folds we can write Monoids and turn them into folds.
|
||||
--
|
||||
-- = Performance Notes
|
||||
--
|
||||
-- 'Streamly.Prelude' module provides fold functions to directly fold streams
|
||||
-- e.g. Streamly.Prelude/'Streamly.Prelude.sum' serves the same purpose as
|
||||
-- Fold/'sum'. However, the functions in Streamly.Prelude cannot be
|
||||
-- efficiently combined together e.g. we cannot drive the input stream through
|
||||
-- @sum@ and @length@ fold functions simultaneously. Using the 'Fold' type we
|
||||
-- can efficiently split the stream across multiple folds because it allows the
|
||||
-- compiler to perform stream fusion optimizations.
|
||||
--
|
||||
module Streamly.Internal.Data.Fold.Type
|
||||
(
|
||||
-- * Imports
|
||||
-- $setup
|
||||
|
||||
-- * Types
|
||||
Step (..)
|
||||
, Fold (..)
|
||||
@ -344,8 +337,8 @@ module Streamly.Internal.Data.Fold.Type
|
||||
, foldl1'
|
||||
, foldt'
|
||||
, foldtM'
|
||||
, foldr
|
||||
, foldrM
|
||||
, foldr'
|
||||
, foldrM'
|
||||
|
||||
-- * Folds
|
||||
, fromPure
|
||||
@ -395,6 +388,7 @@ module Streamly.Internal.Data.Fold.Type
|
||||
|
||||
-- ** Nested Application
|
||||
, concatMap
|
||||
, duplicate
|
||||
, refold
|
||||
|
||||
-- ** Parallel Distribution
|
||||
@ -407,11 +401,17 @@ module Streamly.Internal.Data.Fold.Type
|
||||
, longest
|
||||
|
||||
-- * Running A Fold
|
||||
, initialize
|
||||
, extractM
|
||||
, reduce
|
||||
, snoc
|
||||
, duplicate
|
||||
, finish
|
||||
, isDone
|
||||
, snocM
|
||||
, snocl
|
||||
, snoclM
|
||||
, close
|
||||
, isClosed
|
||||
|
||||
-- * Deprecated
|
||||
, foldr
|
||||
)
|
||||
where
|
||||
|
||||
@ -434,6 +434,7 @@ import Prelude hiding (concatMap, filter, foldr, map, take)
|
||||
-- >>> :m
|
||||
-- >>> :set -XFlexibleContexts
|
||||
-- >>> import Data.Maybe (fromJust, isJust)
|
||||
-- >>> import Data.Monoid (Endo(..))
|
||||
-- >>> import Streamly.Data.Fold (Fold)
|
||||
-- >>> import Streamly.Internal.Data.Stream.Type (Stream)
|
||||
-- >>> import qualified Data.Foldable as Foldable
|
||||
@ -504,8 +505,6 @@ rmapM f (Fold step initial extract) = Fold step1 initial1 (extract >=> f)
|
||||
-- mkfoldlx step initial extract = fmap extract (foldl' step initial)
|
||||
-- @
|
||||
--
|
||||
-- See also: @Streamly.Prelude.foldl'@
|
||||
--
|
||||
{-# INLINE foldl' #-}
|
||||
foldl' :: Monad m => (b -> a -> b) -> b -> Fold m a b
|
||||
foldl' step initial =
|
||||
@ -524,8 +523,6 @@ foldl' step initial =
|
||||
-- mkFoldlxM step initial extract = rmapM extract (foldlM' step initial)
|
||||
-- @
|
||||
--
|
||||
-- See also: @Streamly.Prelude.foldlM'@
|
||||
--
|
||||
{-# INLINE foldlM' #-}
|
||||
foldlM' :: Monad m => (b -> a -> m b) -> m b -> Fold m a b
|
||||
foldlM' step initial =
|
||||
@ -534,8 +531,6 @@ foldlM' step initial =
|
||||
-- | Make a strict left fold, for non-empty streams, using first element as the
|
||||
-- starting value. Returns Nothing if the stream is empty.
|
||||
--
|
||||
-- See also: @Streamly.Prelude.foldl1'@
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE foldl1' #-}
|
||||
foldl1' :: Monad m => (a -> a -> a) -> Fold m a (Maybe a)
|
||||
@ -552,34 +547,43 @@ foldl1' step = fmap toMaybe $ foldl' step1 Nothing'
|
||||
|
||||
-- | Make a fold using a right fold style step function and a terminal value.
|
||||
-- It performs a strict right fold via a left fold using function composition.
|
||||
-- Note that this is strict fold, it can only be useful for constructing strict
|
||||
-- Note that a strict right fold can only be useful for constructing strict
|
||||
-- structures in memory. For reductions this will be very inefficient.
|
||||
--
|
||||
-- For example,
|
||||
-- Definitions:
|
||||
--
|
||||
-- > toList = foldr (:) []
|
||||
-- >>> foldr' f z = fmap (flip appEndo z) $ Fold.foldMap (Endo . f)
|
||||
-- >>> foldr' f z = fmap ($ z) $ Fold.foldl' (\g x -> g . f x) id
|
||||
--
|
||||
-- See also: 'Streamly.Prelude.foldr'
|
||||
-- Example:
|
||||
--
|
||||
-- >>> Stream.fold (Fold.foldr' (:) []) $ Stream.enumerateFromTo 1 5
|
||||
-- [1,2,3,4,5]
|
||||
--
|
||||
{-# INLINE foldr' #-}
|
||||
foldr' :: Monad m => (a -> b -> b) -> b -> Fold m a b
|
||||
foldr' f z = fmap ($ z) $ foldl' (\g x -> g . f x) id
|
||||
|
||||
{-# DEPRECATED foldr "Please use foldr' instead." #-}
|
||||
{-# INLINE foldr #-}
|
||||
foldr :: Monad m => (a -> b -> b) -> b -> Fold m a b
|
||||
foldr g z = fmap ($ z) $ foldl' (\f x -> f . g x) id
|
||||
foldr = foldr'
|
||||
|
||||
-- XXX we have not seen any use of this yet, not releasing until we have a use
|
||||
-- case.
|
||||
|
||||
-- | Like foldr' but with a monadic step function.
|
||||
--
|
||||
-- | Like 'foldr' but with a monadic step function.
|
||||
-- Example:
|
||||
--
|
||||
-- For example,
|
||||
-- >>> toList = Fold.foldrM' (\a xs -> return $ a : xs) (return [])
|
||||
--
|
||||
-- > toList = foldrM (\a xs -> return $ a : xs) (return [])
|
||||
--
|
||||
-- See also: 'Streamly.Prelude.foldrM'
|
||||
-- See also: 'Streamly.Internal.Data.Stream.foldrM'
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE foldrM #-}
|
||||
foldrM :: Monad m => (a -> b -> m b) -> m b -> Fold m a b
|
||||
foldrM g z =
|
||||
{-# INLINE foldrM' #-}
|
||||
foldrM' :: Monad m => (a -> b -> m b) -> m b -> Fold m a b
|
||||
foldrM' g z =
|
||||
rmapM (z >>=) $ foldlM' (\f x -> return $ g x >=> f) (return return)
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
@ -644,7 +648,8 @@ fromRefold (Refold step inject extract) c =
|
||||
-- | A fold that drains all its input, running the effects and discarding the
|
||||
-- results.
|
||||
--
|
||||
-- > drain = drainBy (const (return ()))
|
||||
-- >>> drain = Fold.drainMapM (const (return ()))
|
||||
-- >>> drain = Fold.foldl' (\_ _ -> ()) ()
|
||||
--
|
||||
{-# INLINE drain #-}
|
||||
drain :: Monad m => Fold m a ()
|
||||
@ -656,11 +661,11 @@ drain = foldl' (\_ _ -> ()) ()
|
||||
-- very inefficient, consider using "Streamly.Data.Array.Unboxed"
|
||||
-- instead.
|
||||
--
|
||||
-- > toList = foldr (:) []
|
||||
-- >>> toList = Fold.foldr' (:) []
|
||||
--
|
||||
{-# INLINE toList #-}
|
||||
toList :: Monad m => Fold m a [a]
|
||||
toList = foldr (:) []
|
||||
toList = foldr' (:) []
|
||||
|
||||
-- | Buffers the input stream to a pure stream in the reverse order of the
|
||||
-- input.
|
||||
@ -702,9 +707,28 @@ instance Functor m => Functor (Fold m a) where
|
||||
step s b = fmap2 f (step1 s b)
|
||||
fmap2 g = fmap (fmap g)
|
||||
|
||||
-- This is the dual of stream "fromPure".
|
||||
-- XXX These are singleton folds that are closed for input. The correspondence
|
||||
-- to a nil stream would be a nil fold that returns "Done" in "initial" i.e. it
|
||||
-- does not produce any accumulator value. However, we do not have a
|
||||
-- representation of an empty value in folds, because the Done constructor
|
||||
-- always produces a value (Done b). We can potentially use "Partial s b" and
|
||||
-- "Done" to make the type correspond to the stream type. That may be possible
|
||||
-- if we introduce the "Skip" constructor as well because after the last
|
||||
-- "Partial s b" we have to emit a "Skip to Done" state to keep cranking the
|
||||
-- fold until it is done.
|
||||
--
|
||||
-- | A fold that always yields a pure value without consuming any input.
|
||||
-- There is also the asymmetry between folds and streams because folds have an
|
||||
-- "initial" to initialize the fold without any input. A similar concept is
|
||||
-- possible in streams as well to stop the stream. That would be a "closing"
|
||||
-- operation for the stream which can be called even without consuming any item
|
||||
-- from the stream or when we are done consuming.
|
||||
--
|
||||
-- However, the initial action in folds creates a discrepancy with the CPS
|
||||
-- folds, and the same may be the case if we have a stop/cleanup operation in
|
||||
-- streams.
|
||||
|
||||
-- | Make a fold that yields the supplied value without consuming any further
|
||||
-- input.
|
||||
--
|
||||
-- /Pre-release/
|
||||
--
|
||||
@ -712,10 +736,8 @@ instance Functor m => Functor (Fold m a) where
|
||||
fromPure :: Applicative m => b -> Fold m a b
|
||||
fromPure b = Fold undefined (pure $ Done b) pure
|
||||
|
||||
-- This is the dual of stream "fromEffect".
|
||||
--
|
||||
-- | A fold that always yields the result of an effectful action without
|
||||
-- consuming any input.
|
||||
-- | Make a fold that yields the result of the supplied effectful action
|
||||
-- without consuming any further input.
|
||||
--
|
||||
-- /Pre-release/
|
||||
--
|
||||
@ -732,17 +754,22 @@ data SeqFoldState sl f sr = SeqFoldL !sl | SeqFoldR !f !sr
|
||||
-- or if the input stream is over, the outputs of the two folds are combined
|
||||
-- using the supplied function.
|
||||
--
|
||||
-- >>> f = Fold.serialWith (,) (Fold.take 8 Fold.toList) (Fold.takeEndBy (== '\n') Fold.toList)
|
||||
-- Example:
|
||||
--
|
||||
-- >>> header = Fold.take 8 Fold.toList
|
||||
-- >>> line = Fold.takeEndBy (== '\n') Fold.toList
|
||||
-- >>> f = Fold.serialWith (,) header line
|
||||
-- >>> Stream.fold f $ Stream.fromList "header: hello\n"
|
||||
-- ("header: ","hello\n")
|
||||
--
|
||||
-- Note: This is dual to appending streams using 'Streamly.Prelude.serial'.
|
||||
-- Note: This is dual to appending streams using 'Data.Stream.append'.
|
||||
--
|
||||
-- Note: this implementation allows for stream fusion but has quadratic time
|
||||
-- complexity, because each composition adds a new branch that each subsequent
|
||||
-- fold's input element has to traverse, therefore, it cannot scale to a large
|
||||
-- number of compositions. After around 100 compositions the performance starts
|
||||
-- dipping rapidly compared to a CPS style implementation.
|
||||
-- dipping rapidly compared to a CPS style implementation. When you need
|
||||
-- scaling use parser monad instead.
|
||||
--
|
||||
-- /Time: O(n^2) where n is the number of compositions./
|
||||
--
|
||||
@ -828,16 +855,20 @@ data TeeState sL sR bL bR
|
||||
-- | @teeWith k f1 f2@ distributes its input to both @f1@ and @f2@ until both
|
||||
-- of them terminate and combines their output using @k@.
|
||||
--
|
||||
-- Definition:
|
||||
--
|
||||
-- >>> teeWith k f1 f2 = fmap (uncurry k) (Fold.tee f1 f2)
|
||||
--
|
||||
-- Example:
|
||||
--
|
||||
-- >>> avg = Fold.teeWith (/) Fold.sum (fmap fromIntegral Fold.length)
|
||||
-- >>> Stream.fold avg $ Stream.fromList [1.0..100.0]
|
||||
-- 50.5
|
||||
--
|
||||
-- > teeWith k f1 f2 = fmap (uncurry k) ((Fold.tee f1 f2)
|
||||
--
|
||||
-- For applicative composition using this combinator see
|
||||
-- "Streamly.Internal.Data.Fold.Tee".
|
||||
-- "Streamly.Data.Fold.Tee".
|
||||
--
|
||||
-- See also: "Streamly.Internal.Data.Fold.Tee"
|
||||
-- See also: "Streamly.Data.Fold.Tee"
|
||||
--
|
||||
{-# INLINE teeWith #-}
|
||||
teeWith :: Monad m => (a -> b -> c) -> Fold m x a -> Fold m x b -> Fold m x c
|
||||
@ -1092,10 +1123,15 @@ concatMap f (Fold stepa initiala extracta) = Fold stepc initialc extractc
|
||||
|
||||
-- | @lmap f fold@ maps the function @f@ on the input of the fold.
|
||||
--
|
||||
-- >>> Stream.fold (Fold.lmap (\x -> x * x) Fold.sum) (Stream.enumerateFromTo 1 100)
|
||||
-- 338350
|
||||
-- Definition:
|
||||
--
|
||||
-- > lmap = Fold.lmapM return
|
||||
-- >>> lmap = Fold.lmapM return
|
||||
--
|
||||
-- Example:
|
||||
--
|
||||
-- >>> sumSquared = Fold.lmap (\x -> x * x) Fold.sum
|
||||
-- >>> Stream.fold sumSquared (Stream.enumerateFromTo 1 100)
|
||||
-- 338350
|
||||
--
|
||||
{-# INLINE lmap #-}
|
||||
lmap :: (a -> b) -> Fold m b r -> Fold m a r
|
||||
@ -1247,6 +1283,10 @@ rights = filter isRight . lmap (fromRight undefined)
|
||||
-- | Remove the either wrapper and flatten both lefts and as well as rights in
|
||||
-- the output stream.
|
||||
--
|
||||
-- Definition:
|
||||
--
|
||||
-- >>> both = Fold.lmap (either id id)
|
||||
--
|
||||
-- /Pre-release/
|
||||
--
|
||||
{-# INLINE both #-}
|
||||
@ -1328,29 +1368,18 @@ take n (Fold fstep finitial fextract) = Fold step initial extract
|
||||
-- Nesting
|
||||
------------------------------------------------------------------------------
|
||||
|
||||
-- Similar to the comonad "duplicate" operation.
|
||||
|
||||
-- | 'duplicate' provides the ability to run a fold in parts. The duplicated
|
||||
-- fold consumes the input and returns the same fold as output instead of
|
||||
-- returning the final result, the returned fold can be run later to consume
|
||||
-- more input.
|
||||
--
|
||||
-- We can append a stream to a fold as follows:
|
||||
--
|
||||
-- >>> :{
|
||||
-- foldAppend :: Monad m => Fold m a b -> Stream m a -> m (Fold m a b)
|
||||
-- foldAppend f = Stream.fold (Fold.duplicate f)
|
||||
-- :}
|
||||
--
|
||||
-- >>> :{
|
||||
-- do
|
||||
-- sum1 <- foldAppend Fold.sum (Stream.enumerateFromTo 1 10)
|
||||
-- sum2 <- foldAppend sum1 (Stream.enumerateFromTo 11 20)
|
||||
-- Stream.fold sum2 (Stream.enumerateFromTo 21 30)
|
||||
-- :}
|
||||
-- 465
|
||||
--
|
||||
-- 'duplicate' essentially appends a stream to the fold without finishing the
|
||||
-- fold. Compare with 'snoc' which appends a singleton value to the fold.
|
||||
--
|
||||
-- See also 'Streamly.Internal.Data.Stream.build'.
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE duplicate #-}
|
||||
duplicate :: Monad m => Fold m a b -> Fold m a (Fold m a b)
|
||||
@ -1363,23 +1392,94 @@ duplicate (Fold step1 initial1 extract1) =
|
||||
|
||||
step s a = second fromPure <$> step1 s a
|
||||
|
||||
-- | Run the initialization effect of a fold. The returned fold would use the
|
||||
-- value returned by this effect as its initial value.
|
||||
-- If there were a finalize/flushing action in the stream type that would be
|
||||
-- equivalent to running initialize in Fold. But we do not have a flushing
|
||||
-- action in streams.
|
||||
|
||||
-- | Evaluate the initialization effect of a fold. If we are building the fold
|
||||
-- by chaining lazy actions in fold init this would reduce the actions to a
|
||||
-- strict accumulator value.
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE initialize #-}
|
||||
initialize :: Monad m => Fold m a b -> m (Fold m a b)
|
||||
initialize (Fold step initial extract) = do
|
||||
{-# INLINE reduce #-}
|
||||
reduce :: Monad m => Fold m a b -> m (Fold m a b)
|
||||
reduce (Fold step initial extract) = do
|
||||
i <- initial
|
||||
return $ Fold step (return i) extract
|
||||
|
||||
-- | Append a singleton value to the fold.
|
||||
-- This is the dual of Stream @cons@.
|
||||
|
||||
-- | Append an effect to the fold lazily, in other words run a single
|
||||
-- step of the fold.
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE snoclM #-}
|
||||
snoclM :: Monad m => Fold m a b -> m a -> Fold m a b
|
||||
snoclM (Fold fstep finitial fextract) action = Fold fstep initial fextract
|
||||
|
||||
where
|
||||
|
||||
initial = do
|
||||
res <- finitial
|
||||
case res of
|
||||
Partial fs -> action >>= fstep fs
|
||||
Done b -> return $ Done b
|
||||
|
||||
-- | Append a singleton value to the fold lazily, in other words run a single
|
||||
-- step of the fold.
|
||||
--
|
||||
-- Definition:
|
||||
--
|
||||
-- >>> snocl f = Fold.snoclM f . return
|
||||
--
|
||||
-- Example:
|
||||
--
|
||||
-- >>> import qualified Data.Foldable as Foldable
|
||||
-- >>> Foldable.foldlM Fold.snoc Fold.toList [1..3] >>= Fold.finish
|
||||
-- >>> Fold.extractM $ Foldable.foldl Fold.snocl Fold.toList [1..3]
|
||||
-- [1,2,3]
|
||||
--
|
||||
-- Compare with 'duplicate' which allows appending a stream to the fold.
|
||||
-- /Pre-release/
|
||||
{-# INLINE snocl #-}
|
||||
snocl :: Monad m => Fold m a b -> a -> Fold m a b
|
||||
-- snocl f = snoclM f . return
|
||||
snocl (Fold fstep finitial fextract) a = Fold fstep initial fextract
|
||||
|
||||
where
|
||||
|
||||
initial = do
|
||||
res <- finitial
|
||||
case res of
|
||||
Partial fs -> fstep fs a
|
||||
Done b -> return $ Done b
|
||||
|
||||
-- | Append a singleton value to the fold in other words run a single step of
|
||||
-- the fold.
|
||||
--
|
||||
-- Definition:
|
||||
--
|
||||
-- >>> snocM f = Fold.reduce . Fold.snoclM f
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE snocM #-}
|
||||
snocM :: Monad m => Fold m a b -> m a -> m (Fold m a b)
|
||||
snocM (Fold step initial extract) action = do
|
||||
res <- initial
|
||||
r <- case res of
|
||||
Partial fs -> action >>= step fs
|
||||
Done _ -> return res
|
||||
return $ Fold step (return r) extract
|
||||
|
||||
-- | Append a singleton value to the fold, in other words run a single step of
|
||||
-- the fold.
|
||||
--
|
||||
-- Definitions:
|
||||
--
|
||||
-- >>> snoc f = Fold.reduce . Fold.snocl f
|
||||
-- >>> snoc f = Fold.snocM f . return
|
||||
--
|
||||
-- >>> import qualified Data.Foldable as Foldable
|
||||
-- >>> Foldable.foldlM Fold.snoc Fold.toList [1..3] >>= Fold.extractM
|
||||
-- [1,2,3]
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE snoc #-}
|
||||
@ -1391,30 +1491,54 @@ snoc (Fold step initial extract) a = do
|
||||
Done _ -> return res
|
||||
return $ Fold step (return r) extract
|
||||
|
||||
-- | Finish the fold to extract the current value of the fold.
|
||||
-- Similar to the comonad "extract" operation.
|
||||
-- XXX rename to extract. We can use "extr" for the fold extract function.
|
||||
|
||||
-- | Extract the accumulated result of the fold.
|
||||
--
|
||||
-- >>> Fold.finish Fold.toList
|
||||
-- Definition:
|
||||
--
|
||||
-- >>> extractM = Fold.drive Stream.nil
|
||||
--
|
||||
-- Example:
|
||||
--
|
||||
-- >>> Fold.extractM Fold.toList
|
||||
-- []
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE finish #-}
|
||||
finish :: Monad m => Fold m a b -> m b
|
||||
finish (Fold _ initial extract) = do
|
||||
{-# INLINE extractM #-}
|
||||
extractM :: Monad m => Fold m a b -> m b
|
||||
extractM (Fold _ initial extract) = do
|
||||
res <- initial
|
||||
case res of
|
||||
Partial fs -> extract fs
|
||||
Done b -> return b
|
||||
|
||||
-- | Check if the fold is done and can take no more input.
|
||||
-- | Close a fold so that it does not accept any more input.
|
||||
{-# INLINE close #-}
|
||||
close :: Monad m => Fold m a b -> Fold m a b
|
||||
close (Fold _ initial1 extract1) = Fold undefined initial undefined
|
||||
|
||||
where
|
||||
|
||||
initial = do
|
||||
res <- initial1
|
||||
case res of
|
||||
Partial s -> Done <$> extract1 s
|
||||
Done b -> return $ Done b
|
||||
|
||||
-- Corresponds to the null check for streams.
|
||||
|
||||
-- | Check if the fold has terminated and can take no more input.
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE isDone #-}
|
||||
isDone :: Monad m => Fold m a b -> m (Maybe b)
|
||||
isDone (Fold _ initial _) = do
|
||||
{-# INLINE isClosed #-}
|
||||
isClosed :: Monad m => Fold m a b -> m Bool
|
||||
isClosed (Fold _ initial _) = do
|
||||
res <- initial
|
||||
return $ case res of
|
||||
Partial _ -> Nothing
|
||||
Done b -> Just b
|
||||
Partial _ -> False
|
||||
Done _ -> True
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- Parsing
|
||||
@ -1440,7 +1564,7 @@ data ManyState s1 s2
|
||||
--
|
||||
-- Stops when @collect@ stops.
|
||||
--
|
||||
-- See also: 'Streamly.Prelude.concatMap', 'Streamly.Prelude.foldMany'
|
||||
-- See also: 'Data.Stream.concatMap', 'Data.Stream.foldMany'
|
||||
--
|
||||
{-# INLINE many #-}
|
||||
many :: Monad m => Fold m a b -> Fold m b c -> Fold m a c
|
||||
@ -1499,7 +1623,7 @@ many (Fold sstep sinitial sextract) (Fold cstep cinitial cextract) =
|
||||
--
|
||||
-- /Internal/
|
||||
--
|
||||
-- /See also: 'Streamly.Prelude.concatMap', 'Streamly.Prelude.foldMany'/
|
||||
-- See also: 'Data.Stream.concatMap', 'Data.Stream.foldMany'
|
||||
--
|
||||
{-# INLINE manyPost #-}
|
||||
manyPost :: Monad m => Fold m a b -> Fold m b c -> Fold m a c
|
||||
@ -1544,12 +1668,16 @@ manyPost (Fold sstep sinitial sextract) (Fold cstep cinitial cextract) =
|
||||
-- of @n@ items in the input stream and supplies the result to the @collect@
|
||||
-- fold.
|
||||
--
|
||||
-- Definition:
|
||||
--
|
||||
-- >>> chunksOf n split = Fold.many (Fold.take n split)
|
||||
--
|
||||
-- Example:
|
||||
--
|
||||
-- >>> twos = Fold.chunksOf 2 Fold.toList Fold.toList
|
||||
-- >>> Stream.fold twos $ Stream.fromList [1..10]
|
||||
-- [[1,2],[3,4],[5,6],[7,8],[9,10]]
|
||||
--
|
||||
-- > chunksOf n split = many (take n split)
|
||||
--
|
||||
-- Stops when @collect@ stops.
|
||||
--
|
||||
{-# INLINE chunksOf #-}
|
||||
@ -1665,4 +1793,5 @@ refoldMany1 (Refold sstep sinject sextract) (Fold cstep cinitial cextract) =
|
||||
-- /Internal/
|
||||
{-# INLINE refold #-}
|
||||
refold :: Monad m => Refold m b a c -> Fold m a b -> Fold m a c
|
||||
refold (Refold step inject extract) f = Fold step (finish f >>= inject) extract
|
||||
refold (Refold step inject extract) f =
|
||||
Fold step (extractM f >>= inject) extract
|
||||
|
@ -54,9 +54,6 @@ module Streamly.Internal.Data.Fold.Window
|
||||
, maximum
|
||||
, range
|
||||
, mean
|
||||
|
||||
-- ** Distribution
|
||||
, frequency
|
||||
)
|
||||
where
|
||||
|
||||
@ -68,7 +65,6 @@ import Streamly.Internal.Data.Fold.Type (Fold(..), Step(..))
|
||||
import Streamly.Internal.Data.Tuple.Strict
|
||||
(Tuple'(..), Tuple3Fused' (Tuple3Fused'))
|
||||
|
||||
import qualified Data.Map as Map
|
||||
import qualified Streamly.Internal.Data.Fold.Type as Fold
|
||||
import qualified Streamly.Internal.Data.Ring.Unboxed as Ring
|
||||
|
||||
@ -353,36 +349,3 @@ maximum n = fmap (fmap snd) $ range n
|
||||
{-# INLINE mean #-}
|
||||
mean :: forall m a. (Monad m, Fractional a) => Fold m (a, Maybe a) a
|
||||
mean = Fold.teeWith (/) sum length
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
-- Distribution
|
||||
-------------------------------------------------------------------------------
|
||||
|
||||
-- XXX We can use a Windowed classifyWith operation, that will allow us to
|
||||
-- express windowed frequency, mode, histograms etc idiomatically.
|
||||
|
||||
-- | Count the frequency of elements in a sliding window.
|
||||
--
|
||||
-- >>> input = Stream.fromList [1,1,3,4,4::Int]
|
||||
-- >>> f = Ring.slidingWindow 4 FoldW.frequency
|
||||
-- >>> Stream.fold f input
|
||||
-- fromList [(1,1),(3,1),(4,2)]
|
||||
--
|
||||
-- /Pre-release/
|
||||
--
|
||||
{-# INLINE frequency #-}
|
||||
frequency :: (Monad m, Ord a) => Fold m (a, Maybe a) (Map.Map a Int)
|
||||
frequency = Fold.foldl' step Map.empty
|
||||
|
||||
where
|
||||
|
||||
decrement v =
|
||||
if v == 1
|
||||
then Nothing
|
||||
else Just (v - 1)
|
||||
|
||||
step refCountMap (new, mOld) =
|
||||
let m1 = Map.insertWith (+) new 1 refCountMap
|
||||
in case mOld of
|
||||
Just k -> Map.update decrement k m1
|
||||
Nothing -> m1
|
||||
|
@ -105,6 +105,7 @@ module Streamly.Internal.Data.Parser
|
||||
, listEq
|
||||
, listEqBy
|
||||
, eqBy
|
||||
, subsequenceBy
|
||||
|
||||
-- ** By predicate
|
||||
, takeWhileP
|
||||
@ -442,6 +443,8 @@ satisfy = D.toParserK . D.satisfy
|
||||
one :: Monad m => Parser m a a
|
||||
one = satisfy $ const True
|
||||
|
||||
-- Alternate names: "only", "onlyThis".
|
||||
|
||||
-- | Match a specific element.
|
||||
--
|
||||
-- >>> oneEq x = Parser.satisfy (== x)
|
||||
@ -450,6 +453,8 @@ one = satisfy $ const True
|
||||
oneEq :: (Monad m, Eq a) => a -> Parser m a a
|
||||
oneEq x = satisfy (== x)
|
||||
|
||||
-- Alternate names: "exclude", "notThis".
|
||||
|
||||
-- | Match anything other than the supplied element.
|
||||
--
|
||||
-- >>> oneNotEq x = Parser.satisfy (/= x)
|
||||
@ -995,27 +1000,40 @@ groupByRollingEither :: Monad m =>
|
||||
(a -> a -> Bool) -> Fold m a b -> Fold m a c -> Parser m a (Either b c)
|
||||
groupByRollingEither eq f1 = D.toParserK . D.groupByRollingEither eq f1
|
||||
|
||||
-- XXX eqBy is not a good name because we are not matching the entire stream
|
||||
-- unlike the eqBy in Stream module. matchStreamBy, matchStream, matchListBy,
|
||||
-- matchList.
|
||||
|
||||
-- | Like 'listEqBy' but uses a stream instead of a list and does not return
|
||||
-- the stream.
|
||||
--
|
||||
-- See also: "Streamly.Data.Stream.eqBy"
|
||||
--
|
||||
{-# INLINE eqBy #-}
|
||||
eqBy :: Monad m => (a -> a -> Bool) -> Stream m a -> Parser m a ()
|
||||
eqBy cmp = D.toParserK . D.eqBy cmp . Stream.toStreamD
|
||||
|
||||
-- | Match the given sequence of elements using the given comparison function.
|
||||
-- Returns the original sequence if successful.
|
||||
--
|
||||
-- Definition:
|
||||
--
|
||||
-- >>> listEqBy cmp xs = Parser.eqBy cmp (Stream.fromList xs) *> Parser.fromPure xs
|
||||
--
|
||||
-- Examples:
|
||||
--
|
||||
-- >>> Stream.parse (Parser.listEqBy (==) "string") $ Stream.fromList "string"
|
||||
-- "string"
|
||||
--
|
||||
-- >>> Stream.parse (Parser.listEqBy (==) "mismatch") $ Stream.fromList "match"
|
||||
-- *** Exception: ParseError "listEqBy: failed, yet to match 7 elements"
|
||||
-- *** Exception: ParseError "eqBy: mismtach occurred"
|
||||
--
|
||||
{-# INLINE listEqBy #-}
|
||||
listEqBy :: Monad m => (a -> a -> Bool) -> [a] -> Parser m a [a]
|
||||
listEqBy cmp xs = D.toParserK (D.listEqBy cmp xs)
|
||||
|
||||
-- | Like 'listEqBy' but uses a stream instead of a list and does not return
|
||||
-- the stream.
|
||||
--
|
||||
-- Note: A @Stream m a@ isn't returned as it isn't buffered.
|
||||
{-# INLINE eqBy #-}
|
||||
eqBy :: Monad m => (a -> a -> Bool) -> Stream m a -> Parser m a ()
|
||||
eqBy cmp = D.toParserK . D.eqBy cmp . Stream.toStreamD
|
||||
-- listEqBy cmp xs = D.toParserK (D.listEqBy cmp xs)
|
||||
listEqBy cmp xs = eqBy cmp (Stream.fromList xs) *> fromPure xs
|
||||
|
||||
-- Rename to "list".
|
||||
-- | Match the input sequence with the supplied list and return it if
|
||||
-- successful.
|
||||
--
|
||||
@ -1025,6 +1043,29 @@ eqBy cmp = D.toParserK . D.eqBy cmp . Stream.toStreamD
|
||||
listEq :: (Monad m, Eq a) => [a] -> Parser m a [a]
|
||||
listEq = listEqBy (==)
|
||||
|
||||
-- | Match if the input stream is a subsequence of the argument stream i.e. all
|
||||
-- the elements of the input stream occur, in order, in the argument stream.
|
||||
-- The elements do not have to occur consecutively. A sequence is considered a
|
||||
-- subsequence of itself.
|
||||
{-# INLINE subsequenceBy #-}
|
||||
subsequenceBy :: -- Monad m =>
|
||||
(a -> a -> Bool) -> Stream m a -> Parser m a ()
|
||||
subsequenceBy = undefined
|
||||
|
||||
{-
|
||||
-- Should go in Data.Parser.Regex in streamly package so that it can depend on
|
||||
-- regex backends.
|
||||
{-# INLINE regexPosix #-}
|
||||
regexPosix :: -- Monad m =>
|
||||
Regex -> Parser m a (Maybe (Array (MatchOffset, MatchLength)))
|
||||
regexPosix = undefined
|
||||
|
||||
{-# INLINE regexPCRE #-}
|
||||
regexPCRE :: -- Monad m =>
|
||||
Regex -> Parser m a (Maybe (Array (MatchOffset, MatchLength)))
|
||||
regexPCRE = undefined
|
||||
-}
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
-- nested parsers
|
||||
-------------------------------------------------------------------------------
|
||||
@ -1432,7 +1473,7 @@ sepBy1 :: Monad m =>
|
||||
Parser m a b -> Parser m a x -> Fold m b c -> Parser m a c
|
||||
sepBy1 p sep sink = do
|
||||
x <- p
|
||||
f <- fromEffect $ FL.initialize sink
|
||||
f <- fromEffect $ FL.reduce sink
|
||||
f1 <- fromEffect $ FL.snoc f x
|
||||
many (sep >> p) f1
|
||||
|
||||
|
@ -21,13 +21,16 @@ module Streamly.Internal.Data.Stream.Bottom
|
||||
|
||||
-- * Folds
|
||||
, fold
|
||||
, foldContinue
|
||||
, foldBreak
|
||||
, foldBreak2
|
||||
, foldEither
|
||||
, foldEither2
|
||||
, foldConcat
|
||||
|
||||
-- * Builders
|
||||
, build
|
||||
, buildl
|
||||
|
||||
-- * Scans
|
||||
, smapM
|
||||
-- $smapM_Notes
|
||||
@ -75,7 +78,6 @@ import Streamly.Internal.Data.SVar.Type (defState)
|
||||
|
||||
import qualified Streamly.Internal.Data.Array.Unboxed.Type as A
|
||||
import qualified Streamly.Internal.Data.Fold as Fold
|
||||
import qualified Streamly.Internal.Data.Stream.Common as Common
|
||||
import qualified Streamly.Internal.Data.Stream.StreamK as K
|
||||
import qualified Streamly.Internal.Data.Stream.StreamD as D
|
||||
|
||||
@ -86,7 +88,7 @@ import Streamly.Internal.Data.Stream.Type
|
||||
--
|
||||
-- $setup
|
||||
-- >>> :m
|
||||
-- >>> import Control.Monad (join)
|
||||
-- >>> import Control.Monad (join, (>=>), (<=<))
|
||||
-- >>> import Control.Monad.Trans.Class (lift)
|
||||
-- >>> import Data.Function (fix, (&))
|
||||
-- >>> import Data.Maybe (fromJust, isJust)
|
||||
@ -99,20 +101,6 @@ import Streamly.Internal.Data.Stream.Type
|
||||
-- >>> import qualified Streamly.Internal.Data.Parser as Parser
|
||||
-- >>> import qualified Streamly.Internal.Data.Unfold as Unfold
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- Generation
|
||||
------------------------------------------------------------------------------
|
||||
|
||||
-- |
|
||||
-- >>> fromList = Prelude.foldr Stream.cons Stream.nil
|
||||
--
|
||||
-- Construct a stream from a list of pure values. This is more efficient than
|
||||
-- 'fromFoldable'.
|
||||
--
|
||||
{-# INLINE fromList #-}
|
||||
fromList :: Monad m => [a] -> Stream m a
|
||||
fromList = fromStreamK . Common.fromList
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- Generation - Time related
|
||||
------------------------------------------------------------------------------
|
||||
@ -126,7 +114,7 @@ fromList = fromStreamK . Common.fromList
|
||||
-- terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.
|
||||
--
|
||||
-- >>> import Control.Concurrent (threadDelay)
|
||||
-- >>> f = Fold.drainBy (\x -> print x >> threadDelay 1000000)
|
||||
-- >>> f = Fold.drainMapM (\x -> print x >> threadDelay 1000000)
|
||||
-- >>> Stream.fold f $ Stream.take 3 $ Stream.timesWith 0.01
|
||||
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
|
||||
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
|
||||
@ -145,7 +133,7 @@ timesWith g = fromStreamD $ D.times g
|
||||
-- expensive in terms of CPU usage. Any granularity lower than 1 ms is treated
|
||||
-- as 1 ms.
|
||||
--
|
||||
-- >>> f = Fold.drainBy print
|
||||
-- >>> f = Fold.drainMapM print
|
||||
-- >>> Stream.fold f $ Stream.delayPre 1 $ Stream.take 3 $ Stream.absTimesWith 0.01
|
||||
-- AbsTime (TimeSpec {sec = ..., nsec = ...})
|
||||
-- AbsTime (TimeSpec {sec = ..., nsec = ...})
|
||||
@ -164,7 +152,7 @@ absTimesWith = fmap (uncurry addToAbsTime64) . timesWith
|
||||
-- clock is more expensive in terms of CPU usage. Any granularity lower than 1
|
||||
-- ms is treated as 1 ms.
|
||||
--
|
||||
-- >>> f = Fold.drainBy print
|
||||
-- >>> f = Fold.drainMapM print
|
||||
-- >>> Stream.fold f $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimesWith 0.01
|
||||
-- RelTime64 (NanoSecond64 ...)
|
||||
-- RelTime64 (NanoSecond64 ...)
|
||||
@ -182,40 +170,64 @@ relTimesWith = fmap snd . timesWith
|
||||
-- Elimination - Running a Fold
|
||||
------------------------------------------------------------------------------
|
||||
|
||||
-- | We can create higher order folds using 'foldContinue'. We can fold a
|
||||
-- number of streams to a given fold efficiently with full stream fusion. For
|
||||
-- example, to fold a list of streams on the same sum fold:
|
||||
-- | Append a stream to a fold lazily to build an accumulator incrementally.
|
||||
--
|
||||
-- >>> concatFold = Prelude.foldl Stream.foldContinue Fold.sum
|
||||
-- Example, to continue folding a list of streams on the same sum fold:
|
||||
--
|
||||
-- >>> fold f = Fold.finish . Stream.foldContinue f
|
||||
-- >>> streams = [Stream.fromList [1..5], Stream.fromList [6..10]]
|
||||
-- >>> f = Prelude.foldl Stream.buildl Fold.sum streams
|
||||
-- >>> Fold.extractM f
|
||||
-- 55
|
||||
--
|
||||
{-# INLINE foldContinue #-}
|
||||
foldContinue :: Monad m => Fold m a b -> Stream m a -> Fold m a b
|
||||
foldContinue f s = D.foldContinue f $ toStreamD s
|
||||
{-# INLINE buildl #-}
|
||||
buildl :: Monad m => Fold m a b -> Stream m a -> Fold m a b
|
||||
buildl f s = D.foldContinue f $ toStreamD s
|
||||
|
||||
-- | Append a stream to a fold strictly to build an accumulator incrementally.
|
||||
--
|
||||
-- Definitions:
|
||||
--
|
||||
-- >>> build f = Stream.fold (Fold.duplicate f)
|
||||
-- >>> build f = Stream.buildl f >=> Fold.reduce
|
||||
--
|
||||
-- Example:
|
||||
--
|
||||
-- >>> :{
|
||||
-- do
|
||||
-- sum1 <- Stream.build Fold.sum (Stream.enumerateFromTo 1 10)
|
||||
-- sum2 <- Stream.build sum1 (Stream.enumerateFromTo 11 20)
|
||||
-- Stream.fold sum2 (Stream.enumerateFromTo 21 30)
|
||||
-- :}
|
||||
-- 465
|
||||
--
|
||||
build :: Monad m => Fold m a b -> Stream m a -> m (Fold m a b)
|
||||
build f = fold (Fold.duplicate f)
|
||||
|
||||
-- | Fold a stream using the supplied left 'Fold' and reducing the resulting
|
||||
-- expression strictly at each step. The behavior is similar to 'foldl''. A
|
||||
-- 'Fold' can terminate early without consuming the full stream. See the
|
||||
-- documentation of individual 'Fold's for termination behavior.
|
||||
--
|
||||
-- Definitions:
|
||||
--
|
||||
-- >>> fold f = fmap fst . Stream.foldBreak f
|
||||
-- >>> fold f = Fold.extractM . Stream.buildl f
|
||||
-- >>> fold f = Fold.extractM <=< Stream.build f
|
||||
-- >>> fold f = Stream.parse (Parser.fromFold f)
|
||||
--
|
||||
-- Example:
|
||||
--
|
||||
-- >>> Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)
|
||||
-- 5050
|
||||
--
|
||||
-- Folds never fail, therefore, they produce a default value even when no input
|
||||
-- is provided. It means we can always fold an empty stream and get a valid
|
||||
-- result. For example:
|
||||
--
|
||||
-- >>> Stream.fold Fold.sum Stream.nil
|
||||
-- 0
|
||||
--
|
||||
-- >>> fold f = Stream.parse (Parser.fromFold f)
|
||||
--
|
||||
{-# INLINE fold #-}
|
||||
fold :: Monad m => Fold m a b -> Stream m a -> m b
|
||||
fold fl strm = D.fold fl $ D.fromStreamK $ toStreamK strm
|
||||
|
||||
-- | Like 'fold' but also returns the remaining stream.
|
||||
-- Alternative name foldSome, but may be confused vs foldMany.
|
||||
|
||||
-- | Like 'fold' but also returns the remaining stream. The resulting stream
|
||||
-- would be 'Stream.nil' if the stream finished before the fold.
|
||||
--
|
||||
-- /Not fused/
|
||||
--
|
||||
@ -246,9 +258,9 @@ foldBreak2 fl strm = fmap f $ D.foldBreak fl $ toStreamD strm
|
||||
|
||||
f (b, str) = (b, fromStreamD str)
|
||||
|
||||
-- | Fold resulting in either breaking the stream or continuation of the fold
|
||||
-- | Fold resulting in either breaking the stream or continuation of the fold.
|
||||
-- Instead of supplying the input stream in one go we can run the fold multiple
|
||||
-- times each time supplying the next segment of the input stream. If the fold
|
||||
-- times, each time supplying the next segment of the input stream. If the fold
|
||||
-- has not yet finished it returns a fold that can be run again otherwise it
|
||||
-- returns the fold result and the residual stream.
|
||||
--
|
||||
@ -372,7 +384,7 @@ map f = fromStreamD . D.map f . toStreamD
|
||||
-- Stream.fold Fold.toList
|
||||
-- $ fmap (fromJust . fst)
|
||||
-- $ Stream.takeWhile (\(_,x) -> x <= 10)
|
||||
-- $ Stream.postscan (Fold.tee Fold.last avg) s
|
||||
-- $ Stream.postscan (Fold.tee Fold.latest avg) s
|
||||
-- :}
|
||||
-- [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0]
|
||||
--
|
||||
|
@ -24,7 +24,14 @@ module Streamly.Internal.Data.Stream.Eliminate
|
||||
-- See "Streamly.Internal.Data.Fold".
|
||||
fold
|
||||
, foldBreak
|
||||
, foldContinue
|
||||
, foldBreak2
|
||||
, foldEither
|
||||
, foldEither2
|
||||
, foldConcat
|
||||
|
||||
-- * Builders
|
||||
, build
|
||||
, buildl
|
||||
|
||||
-- * Running a 'Parser'
|
||||
-- "Streamly.Internal.Data.Parser".
|
||||
@ -106,10 +113,15 @@ import Prelude hiding (foldr, init, reverse)
|
||||
-- 'Nothing'. If the stream is non-empty, returns @Just (a, ma)@, where @a@ is
|
||||
-- the head of the stream and @ma@ its tail.
|
||||
--
|
||||
-- This can be used to do pretty much anything in an imperative manner, as it
|
||||
-- just breaks down the stream into individual elements and we can loop over
|
||||
-- them as we deem fit. For example, this can be used to convert a streamly
|
||||
-- stream into other stream types.
|
||||
-- Properties:
|
||||
--
|
||||
-- >>> Nothing <- Stream.uncons Stream.nil
|
||||
-- >>> Just ("a", t) <- Stream.uncons (Stream.cons "a" Stream.nil)
|
||||
--
|
||||
-- This can be used to consume the stream in an imperative manner one element
|
||||
-- at a time, as it just breaks down the stream into individual elements and we
|
||||
-- can loop over them as we deem fit. For example, this can be used to convert
|
||||
-- a streamly stream into other stream types.
|
||||
--
|
||||
-- All the folds in this module can be expressed in terms of 'uncons', however,
|
||||
-- this is generally less efficient than specific folds because it takes apart
|
||||
|
@ -19,14 +19,12 @@ module Streamly.Internal.Data.Stream.Exception
|
||||
, finallyIO
|
||||
, ghandle
|
||||
, handle
|
||||
, retry
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Exception (Exception)
|
||||
import Control.Monad.Catch (MonadCatch)
|
||||
import Control.Monad.IO.Class (MonadIO)
|
||||
import Data.Map.Strict (Map)
|
||||
import Streamly.Internal.Data.Stream.Type (Stream, fromStreamD, toStreamD)
|
||||
|
||||
import qualified Streamly.Internal.Data.Stream.StreamD as D
|
||||
@ -220,46 +218,3 @@ handle :: (MonadCatch m, Exception e)
|
||||
=> (e -> Stream m a) -> Stream m a -> Stream m a
|
||||
handle handler xs =
|
||||
fromStreamD $ D.handle (toStreamD . handler) $ toStreamD xs
|
||||
|
||||
|
||||
-- | @retry@ takes 3 arguments
|
||||
--
|
||||
-- 1. A map @m@ whose keys are exceptions and values are the number of times to
|
||||
-- retry the action given that the exception occurs.
|
||||
--
|
||||
-- 2. A handler @han@ that decides how to handle an exception when the exception
|
||||
-- cannot be retried.
|
||||
--
|
||||
-- 3. The stream itself that we want to run this mechanism on.
|
||||
--
|
||||
-- When evaluating a stream if an exception occurs,
|
||||
--
|
||||
-- 1. The stream evaluation aborts
|
||||
--
|
||||
-- 2. The exception is looked up in @m@
|
||||
--
|
||||
-- a. If the exception exists and the mapped value is > 0 then,
|
||||
--
|
||||
-- i. The value is decreased by 1.
|
||||
--
|
||||
-- ii. The stream is resumed from where the exception was called, retrying
|
||||
-- the action.
|
||||
--
|
||||
-- b. If the exception exists and the mapped value is == 0 then the stream
|
||||
-- evaluation stops.
|
||||
--
|
||||
-- c. If the exception does not exist then we handle the exception using
|
||||
-- @han@.
|
||||
--
|
||||
-- /Internal/
|
||||
--
|
||||
{-# INLINE retry #-}
|
||||
retry :: (MonadCatch m, Exception e, Ord e)
|
||||
=> Map e Int
|
||||
-- ^ map from exception to retry count
|
||||
-> (e -> Stream m a)
|
||||
-- ^ default handler for those exceptions that are not in the map
|
||||
-> Stream m a
|
||||
-> Stream m a
|
||||
retry emap handler inp =
|
||||
fromStreamD $ D.retry emap (toStreamD . handler) $ toStreamD inp
|
||||
|
@ -225,7 +225,7 @@ replicateM n = Stream.sequence . replicate n
|
||||
-- (epoch) denoting the start of the stream and the second component is a time
|
||||
-- relative to the reference.
|
||||
--
|
||||
-- >>> f = Fold.drainBy (\x -> print x >> threadDelay 1000000)
|
||||
-- >>> f = Fold.drainMapM (\x -> print x >> threadDelay 1000000)
|
||||
-- >>> Stream.fold f $ Stream.take 3 $ Stream.times
|
||||
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
|
||||
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
|
||||
@ -242,7 +242,7 @@ times = timesWith 0.01
|
||||
-- | @absTimes@ returns a stream of absolute timestamps using a clock of 10 ms
|
||||
-- granularity.
|
||||
--
|
||||
-- >>> f = Fold.drainBy print
|
||||
-- >>> f = Fold.drainMapM print
|
||||
-- >>> Stream.fold f $ Stream.delayPre 1 $ Stream.take 3 $ Stream.absTimes
|
||||
-- AbsTime (TimeSpec {sec = ..., nsec = ...})
|
||||
-- AbsTime (TimeSpec {sec = ..., nsec = ...})
|
||||
@ -259,7 +259,7 @@ absTimes = fmap (uncurry addToAbsTime64) times
|
||||
-- | @relTimes@ returns a stream of relative time values starting from 0,
|
||||
-- using a clock of granularity 10 ms.
|
||||
--
|
||||
-- >>> f = Fold.drainBy print
|
||||
-- >>> f = Fold.drainMapM print
|
||||
-- >>> Stream.fold f $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimes
|
||||
-- RelTime64 (NanoSecond64 ...)
|
||||
-- RelTime64 (NanoSecond64 ...)
|
||||
@ -373,7 +373,7 @@ iterateM step = fromStreamD . D.iterateM step
|
||||
-- >>> import System.IO.Unsafe (unsafeInterleaveIO)
|
||||
--
|
||||
-- >>> :{
|
||||
-- main = Stream.fold (Fold.drainBy print) $ Stream.mfix f
|
||||
-- main = Stream.fold (Fold.drainMapM print) $ Stream.mfix f
|
||||
-- where
|
||||
-- f action = do
|
||||
-- let incr n act = fmap ((+n) . snd) $ unsafeInterleaveIO act
|
||||
|
@ -20,7 +20,6 @@ module Streamly.Internal.Data.Stream.StreamD.Exception
|
||||
, finally
|
||||
, ghandle
|
||||
, handle
|
||||
, retry
|
||||
)
|
||||
where
|
||||
|
||||
@ -29,13 +28,11 @@ where
|
||||
import Control.Monad.IO.Class (MonadIO(..))
|
||||
import Control.Exception (Exception, SomeException, mask_)
|
||||
import Control.Monad.Catch (MonadCatch)
|
||||
import Data.Map.Strict (Map)
|
||||
import GHC.Exts (inline)
|
||||
import Streamly.Internal.Data.IOFinalizer
|
||||
(newIOFinalizer, runIOFinalizer, clearingIOFinalizer)
|
||||
|
||||
import qualified Control.Monad.Catch as MC
|
||||
import qualified Data.Map.Strict as Map
|
||||
|
||||
import Streamly.Internal.Data.Stream.StreamD.Type
|
||||
|
||||
@ -369,55 +366,3 @@ _handle f (Stream step state) = Stream step' (Left state)
|
||||
Yield x s -> return $ Yield x (Right (Stream step1 s))
|
||||
Skip s -> return $ Skip (Right (Stream step1 s))
|
||||
Stop -> return Stop
|
||||
|
||||
data RetryState emap s1 s2
|
||||
= RetryWithMap emap s1
|
||||
| RetryDefault s2
|
||||
|
||||
-- | See 'Streamly.Internal.Data.Stream.retry'
|
||||
--
|
||||
{-# INLINE_NORMAL retry #-}
|
||||
retry
|
||||
:: forall e m a. (Exception e, Ord e, MonadCatch m)
|
||||
=> Map e Int
|
||||
-- ^ map from exception to retry count
|
||||
-> (e -> Stream m a)
|
||||
-- ^ default handler for those exceptions that are not in the map
|
||||
-> Stream m a
|
||||
-> Stream m a
|
||||
retry emap0 defaultHandler (Stream step0 state0) = Stream step state
|
||||
|
||||
where
|
||||
|
||||
state = RetryWithMap emap0 state0
|
||||
|
||||
{-# INLINE_LATE step #-}
|
||||
step gst (RetryWithMap emap st) = do
|
||||
eres <- MC.try $ step0 gst st
|
||||
case eres of
|
||||
Left e -> handler e emap st
|
||||
Right res ->
|
||||
return
|
||||
$ case res of
|
||||
Yield x st1 -> Yield x $ RetryWithMap emap st1
|
||||
Skip st1 -> Skip $ RetryWithMap emap st1
|
||||
Stop -> Stop
|
||||
step gst (RetryDefault (UnStream step1 state1)) = do
|
||||
res <- step1 gst state1
|
||||
return
|
||||
$ case res of
|
||||
Yield x st1 -> Yield x $ RetryDefault (Stream step1 st1)
|
||||
Skip st1 -> Skip $ RetryDefault (Stream step1 st1)
|
||||
Stop -> Stop
|
||||
|
||||
{-# INLINE handler #-}
|
||||
handler e emap st =
|
||||
return
|
||||
$ Skip
|
||||
$ case Map.lookup e emap of
|
||||
Just i
|
||||
| i > 0 ->
|
||||
let emap1 = Map.insert e (i - 1) emap
|
||||
in RetryWithMap emap1 st
|
||||
| otherwise -> RetryDefault $ defaultHandler e
|
||||
Nothing -> RetryDefault $ defaultHandler e
|
||||
|
@ -72,7 +72,6 @@ module Streamly.Internal.Data.Stream.StreamD.Transform
|
||||
, filterM
|
||||
, deleteBy
|
||||
, uniq
|
||||
, nub
|
||||
|
||||
-- * Trimming
|
||||
-- | Produce a subset of the stream trimmed at ends.
|
||||
@ -133,7 +132,6 @@ import Streamly.Internal.Data.Fold.Type (Fold(..))
|
||||
import Streamly.Internal.Data.Pipe.Type (Pipe(..), PipeState(..))
|
||||
import Streamly.Internal.Data.SVar.Type (defState, adaptState)
|
||||
|
||||
import qualified Data.Set as Set
|
||||
import qualified Streamly.Internal.Data.Fold.Type as FL
|
||||
import qualified Streamly.Internal.Data.Pipe.Type as Pipe
|
||||
|
||||
@ -703,26 +701,6 @@ uniq (Stream step state) = Stream step' (Nothing, state)
|
||||
Skip s -> return $ Skip (Just x, s)
|
||||
Stop -> return Stop
|
||||
|
||||
-- | The memory used is proportional to the number of unique elements in the
|
||||
-- stream. If we want to limit the memory we can just use "take" to limit the
|
||||
-- uniq elements in the stream.
|
||||
{-# INLINE_NORMAL nub #-}
|
||||
nub :: (Monad m, Ord a) => Stream m a -> Stream m a
|
||||
nub (Stream step1 state1) = Stream step (Set.empty, state1)
|
||||
|
||||
where
|
||||
|
||||
step gst (set, st) = do
|
||||
r <- step1 gst st
|
||||
return
|
||||
$ case r of
|
||||
Yield x s ->
|
||||
if Set.member x set
|
||||
then Skip (set, s)
|
||||
else Yield x (Set.insert x set, s)
|
||||
Skip s -> Skip (set, s)
|
||||
Stop -> Stop
|
||||
|
||||
{-# INLINE_NORMAL deleteBy #-}
|
||||
deleteBy :: Monad m => (a -> a -> Bool) -> a -> Stream m a -> Stream m a
|
||||
deleteBy eq x (Stream step state) = Stream step' (state, False)
|
||||
|
@ -323,9 +323,6 @@ foldBreak fld strm = do
|
||||
|
||||
nil = Stream (\_ _ -> return Stop) ()
|
||||
|
||||
-- | If the fold finishes before the stream, we can detect that the fold is
|
||||
-- done by checking if the initial action returns Done. But the remaining
|
||||
-- stream is discarded.
|
||||
{-# INLINE_NORMAL foldContinue #-}
|
||||
foldContinue :: Monad m => Fold m a b -> Stream m a -> Fold m a b
|
||||
foldContinue (Fold fstep finitial fextract) (Stream sstep state) =
|
||||
|
@ -32,14 +32,11 @@ module Streamly.Internal.Data.Stream.Top
|
||||
-- ** Join operations
|
||||
, crossJoin
|
||||
, joinInner
|
||||
, joinInnerMap
|
||||
, joinInnerMerge
|
||||
, joinLeft
|
||||
, mergeLeftJoin
|
||||
, joinLeftMap
|
||||
, joinOuter
|
||||
, mergeOuterJoin
|
||||
, joinOuterMap
|
||||
)
|
||||
where
|
||||
|
||||
@ -56,7 +53,6 @@ import Streamly.Internal.Data.Stream.Common ()
|
||||
import Streamly.Internal.Data.Stream.Type (Stream, fromStreamD, toStreamD)
|
||||
|
||||
import qualified Data.List as List
|
||||
import qualified Data.Map.Strict as Map
|
||||
import qualified Streamly.Internal.Data.Array as Array
|
||||
import qualified Streamly.Internal.Data.Array.Unboxed.Mut.Type as MA
|
||||
import qualified Streamly.Internal.Data.Fold as Fold
|
||||
@ -211,44 +207,6 @@ joinInner eq s1 s2 = do
|
||||
) s2
|
||||
) s1
|
||||
|
||||
-- XXX Generate error if a duplicate insertion is attempted?
|
||||
toMap :: (Monad m, Ord k) => Stream m (k, v) -> m (Map.Map k v)
|
||||
toMap =
|
||||
let f = Fold.foldl' (\kv (k, b) -> Map.insert k b kv) Map.empty
|
||||
in Stream.fold f
|
||||
|
||||
-- If the second stream is too big it can be partitioned based on hashes and
|
||||
-- then we can process one parition at a time.
|
||||
--
|
||||
-- XXX An IntMap may be faster when the keys are Int.
|
||||
-- XXX Use hashmap instead of map?
|
||||
--
|
||||
-- | Like 'joinInner' but uses a 'Map' for efficiency.
|
||||
--
|
||||
-- If the input streams have duplicate keys, the behavior is undefined.
|
||||
--
|
||||
-- For space efficiency use the smaller stream as the second stream.
|
||||
--
|
||||
-- Space: O(n)
|
||||
--
|
||||
-- Time: O(m + n)
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE joinInnerMap #-}
|
||||
joinInnerMap :: (Monad m, Ord k) =>
|
||||
Stream m (k, a) -> Stream m (k, b) -> Stream m (k, a, b)
|
||||
joinInnerMap s1 s2 =
|
||||
Stream.concatM $ do
|
||||
km <- toMap s2
|
||||
pure $ Stream.mapMaybe (joinAB km) s1
|
||||
|
||||
where
|
||||
|
||||
joinAB kvm (k, a) =
|
||||
case k `Map.lookup` kvm of
|
||||
Just b -> Just (k, a, b)
|
||||
Nothing -> Nothing
|
||||
|
||||
-- | Like 'joinInner' but works only on sorted streams.
|
||||
--
|
||||
-- Space: O(1)
|
||||
@ -309,28 +267,6 @@ joinLeft eq s1 s2 = Stream.evalStateT (return False) $ do
|
||||
else Stream.nil
|
||||
Nothing -> return (a, Nothing)
|
||||
|
||||
-- | Like 'joinLeft' but uses a hashmap for efficiency.
|
||||
--
|
||||
-- Space: O(n)
|
||||
--
|
||||
-- Time: O(m + n)
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE joinLeftMap #-}
|
||||
joinLeftMap :: (Ord k, Monad m) =>
|
||||
Stream m (k, a) -> Stream m (k, b) -> Stream m (k, a, Maybe b)
|
||||
joinLeftMap s1 s2 =
|
||||
Stream.concatM $ do
|
||||
km <- toMap s2
|
||||
return $ fmap (joinAB km) s1
|
||||
|
||||
where
|
||||
|
||||
joinAB km (k, a) =
|
||||
case k `Map.lookup` km of
|
||||
Just b -> (k, a, Just b)
|
||||
Nothing -> (k, a, Nothing)
|
||||
|
||||
-- | Like 'joinLeft' but works only on sorted streams.
|
||||
--
|
||||
-- Space: O(1)
|
||||
@ -412,57 +348,6 @@ joinOuter eq s1 s =
|
||||
else Stream.nil
|
||||
Nothing -> return (Just a, Nothing)
|
||||
|
||||
-- Put the b's that have been paired, in another hash or mutate the hash to set
|
||||
-- a flag. At the end go through @Stream m b@ and find those that are not in that
|
||||
-- hash to return (Nothing, b).
|
||||
--
|
||||
-- | Like 'joinOuter' but uses a 'Map' for efficiency.
|
||||
--
|
||||
-- Space: O(m + n)
|
||||
--
|
||||
-- Time: O(m + n)
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE joinOuterMap #-}
|
||||
joinOuterMap ::
|
||||
(Ord k, MonadIO m) =>
|
||||
Stream m (k, a) -> Stream m (k, b) -> Stream m (k, Maybe a, Maybe b)
|
||||
joinOuterMap s1 s2 =
|
||||
Stream.concatM $ do
|
||||
km1 <- kvFold s1
|
||||
km2 <- kvFold s2
|
||||
|
||||
-- XXX Not sure if toList/fromList would fuse optimally. We may have to
|
||||
-- create a fused Map.toStream function.
|
||||
let res1 = fmap (joinAB km2)
|
||||
$ Stream.fromList $ Map.toList km1
|
||||
where
|
||||
joinAB km (k, a) =
|
||||
case k `Map.lookup` km of
|
||||
Just b -> (k, Just a, Just b)
|
||||
Nothing -> (k, Just a, Nothing)
|
||||
|
||||
-- XXX We can take advantage of the lookups in the first pass above to
|
||||
-- reduce the number of lookups in this pass. If we keep mutable cells
|
||||
-- in the second Map, we can flag it in the first pass and not do any
|
||||
-- lookup in the second pass if it is flagged.
|
||||
let res2 = Stream.mapMaybe (joinAB km1)
|
||||
$ Stream.fromList $ Map.toList km2
|
||||
where
|
||||
joinAB km (k, b) =
|
||||
case k `Map.lookup` km of
|
||||
Just _ -> Nothing
|
||||
Nothing -> Just (k, Nothing, Just b)
|
||||
|
||||
return $ Stream.append res1 res2
|
||||
|
||||
where
|
||||
|
||||
-- XXX Generate error if a duplicate insertion is attempted?
|
||||
kvFold =
|
||||
let f = Fold.foldl' (\kv (k, b) -> Map.insert k b kv) Map.empty
|
||||
in Stream.fold f
|
||||
|
||||
-- | Like 'joinOuter' but works only on sorted streams.
|
||||
--
|
||||
-- Space: O(1)
|
||||
|
@ -313,7 +313,7 @@ sequence = mapM id
|
||||
-- @
|
||||
--
|
||||
-- >>> s = Stream.enumerateFromTo 1 2
|
||||
-- >>> Stream.fold Fold.drain $ Stream.tap (Fold.drainBy print) s
|
||||
-- >>> Stream.fold Fold.drain $ Stream.tap (Fold.drainMapM print) s
|
||||
-- 1
|
||||
-- 2
|
||||
--
|
||||
@ -770,7 +770,7 @@ interspersePrefix_ m = mapM (\x -> void m >> return x)
|
||||
-- | Introduce a delay of specified seconds between elements of the stream.
|
||||
--
|
||||
-- >>> input = Stream.enumerateFromTo 1 3
|
||||
-- >>> Stream.fold (Fold.drainBy print) $ Stream.delay 1 input
|
||||
-- >>> Stream.fold (Fold.drainMapM print) $ Stream.delay 1 input
|
||||
-- 1
|
||||
-- 2
|
||||
-- 3
|
||||
@ -786,7 +786,7 @@ delay n = intersperseM_ $ liftIO $ threadDelay $ round $ n * 1000000
|
||||
-- stream.
|
||||
--
|
||||
-- >>> input = Stream.enumerateFromTo 1 3
|
||||
-- >>> Stream.fold (Fold.drainBy print) $ Stream.delayPost 1 input
|
||||
-- >>> Stream.fold (Fold.drainMapM print) $ Stream.delayPost 1 input
|
||||
-- 1
|
||||
-- 2
|
||||
-- 3
|
||||
@ -801,7 +801,7 @@ delayPost n = intersperseSuffix_ $ liftIO $ threadDelay $ round $ n * 1000000
|
||||
-- stream.
|
||||
--
|
||||
-- >>> input = Stream.enumerateFromTo 1 3
|
||||
-- >>> Stream.fold (Fold.drainBy print) $ Stream.delayPre 1 input
|
||||
-- >>> Stream.fold (Fold.drainMapM print) $ Stream.delayPre 1 input
|
||||
-- 1
|
||||
-- 2
|
||||
-- 3
|
||||
|
@ -18,6 +18,7 @@ module Streamly.Internal.Data.Stream.Type
|
||||
, toStreamK
|
||||
, fromStreamD
|
||||
, toStreamD
|
||||
, Streamly.Internal.Data.Stream.Type.fromList
|
||||
|
||||
-- * Construction
|
||||
, cons
|
||||
@ -122,6 +123,20 @@ fromStreamD = fromStreamK . D.toStreamK
|
||||
toStreamD :: Applicative m => Stream m a -> D.Stream m a
|
||||
toStreamD = D.fromStreamK . toStreamK
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- Generation
|
||||
------------------------------------------------------------------------------
|
||||
|
||||
-- |
|
||||
-- >>> fromList = Prelude.foldr Stream.cons Stream.nil
|
||||
--
|
||||
-- Construct a stream from a list of pure values. This is more efficient than
|
||||
-- 'fromFoldable'.
|
||||
--
|
||||
{-# INLINE fromList #-}
|
||||
fromList :: Monad m => [a] -> Stream m a
|
||||
fromList = fromStreamK . P.fromList
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- Comparison
|
||||
------------------------------------------------------------------------------
|
||||
@ -320,7 +335,7 @@ instance Show a => Show (Stream Identity a) where
|
||||
instance Read a => Read (Stream Identity a) where
|
||||
readPrec = parens $ prec 10 $ do
|
||||
Ident "fromList" <- lexP
|
||||
fromList <$> readPrec
|
||||
Streamly.Internal.Data.Stream.Type.fromList <$> readPrec
|
||||
|
||||
readListPrec = readListPrecDefault
|
||||
|
||||
@ -450,7 +465,7 @@ infixr 5 `consM`
|
||||
consM :: Monad m => m a -> Stream m a -> Stream m a
|
||||
consM m = fromStreamK . K.consM m . toStreamK
|
||||
|
||||
-- | A pure empty stream with no result and no side-effect.
|
||||
-- | A stream that terminates without producing any output or side effect.
|
||||
--
|
||||
-- >>> Stream.fold Fold.toList Stream.nil
|
||||
-- []
|
||||
@ -459,7 +474,8 @@ consM m = fromStreamK . K.consM m . toStreamK
|
||||
nil :: Stream m a
|
||||
nil = fromStreamK K.nil
|
||||
|
||||
-- | An empty stream producing the supplied side effect.
|
||||
-- | A stream that terminates without producing any output, but produces a side
|
||||
-- effect.
|
||||
--
|
||||
-- >>> Stream.fold Fold.toList (Stream.nilM (print "nil"))
|
||||
-- "nil"
|
||||
|
@ -114,7 +114,7 @@ import Streamly.Internal.System.IO (defaultChunkSize)
|
||||
import qualified Streamly.Data.Array.Unboxed as A
|
||||
import qualified Streamly.Data.Unfold as UF
|
||||
import qualified Streamly.Internal.Data.Fold.Type as FL
|
||||
(Step(..), snoc, initialize)
|
||||
(Step(..), snoc, reduce)
|
||||
import qualified Streamly.Internal.Data.Unfold as UF (bracketIO)
|
||||
import qualified Streamly.Internal.FileSystem.Handle as FH
|
||||
import qualified Streamly.Internal.Data.Array.Unboxed.Stream as AS
|
||||
@ -432,7 +432,7 @@ writeChunks path = Fold step initial extract
|
||||
where
|
||||
initial = do
|
||||
h <- liftIO (openFile path WriteMode)
|
||||
fld <- FL.initialize (FH.writeChunks h)
|
||||
fld <- FL.reduce (FH.writeChunks h)
|
||||
`MC.onException` liftIO (hClose h)
|
||||
return $ FL.Partial (fld, h)
|
||||
step (fld, h) x = do
|
||||
|
@ -405,7 +405,7 @@ putChunk h arr = A.asPtrUnsafe arr $ \ptr ->
|
||||
--
|
||||
{-# INLINE putChunks #-}
|
||||
putChunks :: MonadIO m => Handle -> Stream m (Array a) -> m ()
|
||||
putChunks h = S.fold (FL.drainBy (putChunk h))
|
||||
putChunks h = S.fold (FL.drainMapM (putChunk h))
|
||||
|
||||
-- XXX AS.compact can be written idiomatically in terms of foldMany, just like
|
||||
-- AS.concat is written in terms of foldMany. Once that is done we can write
|
||||
@ -452,7 +452,7 @@ putBytes = putBytesWith defaultChunkSize
|
||||
--
|
||||
{-# INLINE writeChunks #-}
|
||||
writeChunks :: MonadIO m => Handle -> Fold m (Array a) ()
|
||||
writeChunks h = FL.drainBy (putChunk h)
|
||||
writeChunks h = FL.drainMapM (putChunk h)
|
||||
|
||||
-- | Like writeChunks but uses the experimental 'Refold' API.
|
||||
--
|
||||
|
@ -239,7 +239,6 @@ library
|
||||
, Streamly.Internal.Control.ForkIO
|
||||
, Streamly.Internal.Data.Cont
|
||||
, Streamly.Internal.System.IO
|
||||
, Streamly.Internal.Data.IsMap
|
||||
|
||||
-- streamly-strict-data
|
||||
, Streamly.Internal.Data.Tuple.Strict
|
||||
@ -397,6 +396,11 @@ library
|
||||
, Streamly.Internal.Data.Stream.StreamDK.Type
|
||||
|
||||
build-depends:
|
||||
-- streamly-base
|
||||
--
|
||||
-- These dependencies can be reversed if we want
|
||||
-- streamly-base to depend only on base.
|
||||
--
|
||||
-- Core libraries shipped with ghc, the min and max
|
||||
-- constraints of these libraries should match with
|
||||
-- the GHC versions we support. This is to make sure that
|
||||
@ -404,18 +408,23 @@ library
|
||||
-- depending on doctest is a common example) can
|
||||
-- depend on streamly.
|
||||
base >= 4.12 && < 4.18
|
||||
, containers >= 0.6.0 && < 0.7
|
||||
, deepseq >= 1.4.4 && < 1.5
|
||||
, directory >= 1.3.3 && < 1.4
|
||||
, exceptions >= 0.8.0 && < 0.11
|
||||
, ghc-prim >= 0.5.3 && < 0.10
|
||||
, mtl >= 2.2.2 && < 2.4
|
||||
, transformers >= 0.5.5 && < 0.7
|
||||
|
||||
-- streamly-unicode-core
|
||||
, template-haskell >= 2.14 && < 2.20
|
||||
|
||||
-- streamly-filesystem-core
|
||||
, directory >= 1.3.3 && < 1.4
|
||||
|
||||
-- fusion-plugin
|
||||
, fusion-plugin-types >= 0.1 && < 0.2
|
||||
|
||||
-- XXX to be removed
|
||||
, containers >= 0.6.0 && < 0.7
|
||||
, heaps >= 0.3 && < 0.5
|
||||
if !flag(use-unliftio)
|
||||
build-depends: monad-control >= 1.0 && < 1.1
|
||||
|
623
src/Streamly/Internal/Data/Fold/Extra.hs
Normal file
623
src/Streamly/Internal/Data/Fold/Extra.hs
Normal file
@ -0,0 +1,623 @@
|
||||
-- |
|
||||
-- Module : Streamly.Internal.Data.Fold.Extra
|
||||
-- Copyright : (c) 2019 Composewell Technologies
|
||||
-- License : BSD3
|
||||
-- Maintainer : streamly@composewell.com
|
||||
-- Stability : experimental
|
||||
-- Portability : GHC
|
||||
--
|
||||
|
||||
module Streamly.Internal.Data.Fold.Extra
|
||||
(
|
||||
-- * Imports
|
||||
-- $setup
|
||||
|
||||
countDistinct
|
||||
, countDistinctInt
|
||||
, frequency
|
||||
, toMap
|
||||
, nub
|
||||
, nubInt
|
||||
|
||||
-- ** Demultiplexing
|
||||
-- | Direct values in the input stream to different folds using an n-ary
|
||||
-- fold selector. 'demux' is a generalization of 'classify' (and
|
||||
-- 'partition') where each key of the classifier can use a different fold.
|
||||
, demux
|
||||
, demuxWith
|
||||
, demuxScanWith
|
||||
, demuxScanMutWith
|
||||
, demuxMutWith
|
||||
|
||||
-- TODO: These can be implemented using the above operations
|
||||
-- , demuxWithSel -- Stop when the fold for the specified key stops
|
||||
-- , demuxWithMin -- Stop when any of the folds stop
|
||||
-- , demuxWithAll -- Stop when all the folds stop (run once)
|
||||
|
||||
-- ** Classifying
|
||||
-- | In an input stream of key value pairs fold values for different keys
|
||||
-- in individual output buckets using the given fold. 'classify' is a
|
||||
-- special case of 'demux' where all the branches of the demultiplexer use
|
||||
-- the same fold.
|
||||
--
|
||||
-- Different types of maps can be used with these combinators via the IsMap
|
||||
-- type class. Hashmap performs better when there are more collisions, trie
|
||||
-- Map performs better otherwise. Trie has an advantage of sorting the keys
|
||||
-- at the same time. For example if we want to store a dictionary of words
|
||||
-- and their meanings then trie Map would be better if we also want to
|
||||
-- display them in sorted order.
|
||||
|
||||
, classify
|
||||
, classifyWith
|
||||
, classifyMutWith
|
||||
, classifyScanWith
|
||||
-- , classifyWithSel
|
||||
-- , classifyWithMin
|
||||
)
|
||||
where
|
||||
|
||||
#include "inline.hs"
|
||||
#include "ArrayMacros.h"
|
||||
|
||||
import Control.Monad.IO.Class (MonadIO(..))
|
||||
import Data.IORef (newIORef, readIORef, writeIORef)
|
||||
import Data.Map.Strict (Map)
|
||||
import Streamly.Internal.Data.IsMap (IsMap(..))
|
||||
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..), Tuple3'(..))
|
||||
|
||||
import qualified Data.IntSet as IntSet
|
||||
import qualified Data.Set as Set
|
||||
import qualified Prelude
|
||||
import qualified Streamly.Internal.Data.IsMap as IsMap
|
||||
|
||||
import Prelude hiding
|
||||
( filter, foldl1, drop, dropWhile, take, takeWhile, zipWith
|
||||
, foldl, foldr, map, mapM_, sequence, all, any, sum, product, elem
|
||||
, notElem, maximum, minimum, head, last, tail, length, null
|
||||
, reverse, iterate, init, and, or, lookup, (!!)
|
||||
, scanl, scanl1, replicate, concatMap, mconcat, foldMap, unzip
|
||||
, span, splitAt, break, mapM, zip, maybe)
|
||||
import Streamly.Internal.Data.Fold
|
||||
|
||||
-- $setup
|
||||
-- >>> :m
|
||||
-- >>> import qualified Streamly.Data.Fold as Fold
|
||||
-- >>> import qualified Streamly.Data.Stream as Stream
|
||||
-- >>> import qualified Streamly.Internal.Data.Fold.Extra as Fold
|
||||
|
||||
-- XXX Name as nubOrd? Or write a nubGeneric
|
||||
|
||||
-- | Used as a scan. Returns 'Just' for the first occurrence of an element,
|
||||
-- returns 'Nothing' for any other occurrences.
|
||||
--
|
||||
-- Example:
|
||||
--
|
||||
-- >>> stream = Stream.fromList [1::Int,1,2,3,4,4,5,1,5,7]
|
||||
-- >>> Stream.fold Fold.toList $ Stream.scanMaybe Fold.nub stream
|
||||
-- [1,2,3,4,5,7]
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE nub #-}
|
||||
nub :: (Monad m, Ord a) => Fold m a (Maybe a)
|
||||
nub = fmap (\(Tuple' _ x) -> x) $ foldl' step initial
|
||||
|
||||
where
|
||||
|
||||
initial = Tuple' Set.empty Nothing
|
||||
|
||||
step (Tuple' set _) x =
|
||||
if Set.member x set
|
||||
then Tuple' set Nothing
|
||||
else Tuple' (Set.insert x set) (Just x)
|
||||
|
||||
-- | Like 'nub' but specialized to a stream of 'Int', for better performance.
|
||||
--
|
||||
-- Definition:
|
||||
--
|
||||
-- >>> nubInt = Fold.nub
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE nubInt #-}
|
||||
nubInt :: Monad m => Fold m Int (Maybe Int)
|
||||
nubInt = fmap (\(Tuple' _ x) -> x) $ foldl' step initial
|
||||
|
||||
where
|
||||
|
||||
initial = Tuple' IntSet.empty Nothing
|
||||
|
||||
step (Tuple' set _) x =
|
||||
if IntSet.member x set
|
||||
then Tuple' set Nothing
|
||||
else Tuple' (IntSet.insert x set) (Just x)
|
||||
|
||||
-- XXX Try Hash set
|
||||
-- XXX Add a countDistinct window fold
|
||||
-- XXX Add a bloom filter fold
|
||||
|
||||
-- | Count non-duplicate elements in the stream.
|
||||
--
|
||||
-- Definition:
|
||||
--
|
||||
-- >>> countDistinct = Fold.postscan Fold.nub $ Fold.catMaybes $ Fold.length
|
||||
--
|
||||
-- The memory used is proportional to the number of distinct elements in the
|
||||
-- stream, to guard against using too much memory use it as a scan and
|
||||
-- terminate if the count reaches more than a threshold.
|
||||
--
|
||||
-- /Space/: \(\mathcal{O}(n)\)
|
||||
--
|
||||
-- /Pre-release/
|
||||
--
|
||||
{-# INLINE countDistinct #-}
|
||||
countDistinct :: (Monad m, Ord a) => Fold m a Int
|
||||
countDistinct = postscan nub $ catMaybes length
|
||||
{-
|
||||
countDistinct = fmap (\(Tuple' _ n) -> n) $ foldl' step initial
|
||||
|
||||
where
|
||||
|
||||
initial = Tuple' Set.empty 0
|
||||
|
||||
step (Tuple' set n) x = do
|
||||
if Set.member x set
|
||||
then
|
||||
Tuple' set n
|
||||
else
|
||||
let cnt = n + 1
|
||||
in Tuple' (Set.insert x set) cnt
|
||||
-}
|
||||
|
||||
-- | Like 'countDistinct' but specialized to a stream of 'Int', for better
|
||||
-- performance.
|
||||
--
|
||||
-- Definition:
|
||||
--
|
||||
-- >>> countDistinctInt = Fold.postscan Fold.nubInt $ Fold.catMaybes $ Fold.length
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE countDistinctInt #-}
|
||||
countDistinctInt :: Monad m => Fold m Int Int
|
||||
countDistinctInt = postscan nubInt $ catMaybes length
|
||||
{-
|
||||
countDistinctInt = fmap (\(Tuple' _ n) -> n) $ foldl' step initial
|
||||
|
||||
where
|
||||
|
||||
initial = Tuple' IntSet.empty 0
|
||||
|
||||
step (Tuple' set n) x = do
|
||||
if IntSet.member x set
|
||||
then
|
||||
Tuple' set n
|
||||
else
|
||||
let cnt = n + 1
|
||||
in Tuple' (IntSet.insert x set) cnt
|
||||
-}
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- demux: in a key value stream fold each key sub-stream with a different fold
|
||||
------------------------------------------------------------------------------
|
||||
|
||||
-- TODO Demultiplex an input element into a number of typed variants. We want
|
||||
-- to statically restrict the target values within a set of predefined types,
|
||||
-- an enumeration of a GADT.
|
||||
--
|
||||
-- This is the consumer side dual of the producer side 'mux' operation (XXX to
|
||||
-- be implemented).
|
||||
--
|
||||
-- XXX If we use Refold in it, it can perhaps fuse/be more efficient. For
|
||||
-- example we can store just the result rather than storing the whole fold in
|
||||
-- the Map.
|
||||
--
|
||||
-- Note: There are separate functions to determine Key and Fold from the input
|
||||
-- because key is to be determined on each input whereas fold is to be
|
||||
-- determined only once for a key.
|
||||
|
||||
-- | In a key value stream, fold values corresponding to each key with a key
|
||||
-- specific fold. The fold returns the fold result as the second component of
|
||||
-- the output tuple whenever a fold terminates. The first component of the
|
||||
-- tuple is a container of in-progress folds. If a fold terminates, another
|
||||
-- instance of the fold is started upon receiving an input with that key.
|
||||
--
|
||||
-- This can be used to scan a stream and collect the results from the scan
|
||||
-- output.
|
||||
--
|
||||
-- /Pre-release/
|
||||
--
|
||||
{-# INLINE demuxScanWith #-}
|
||||
demuxScanWith :: (Monad m, IsMap f, Traversable f) =>
|
||||
(a -> Key f)
|
||||
-> (a -> m (Fold m a b))
|
||||
-> Fold m a (m (f b), Maybe (Key f, b))
|
||||
demuxScanWith getKey getFold = fmap extract $ foldlM' step initial
|
||||
|
||||
where
|
||||
|
||||
initial = return $ Tuple' IsMap.mapEmpty Nothing
|
||||
|
||||
{-# INLINE runFold #-}
|
||||
runFold kv (Fold step1 initial1 extract1) (k, a) = do
|
||||
res <- initial1
|
||||
case res of
|
||||
Partial s -> do
|
||||
res1 <- step1 s a
|
||||
return
|
||||
$ case res1 of
|
||||
Partial _ ->
|
||||
let fld = Fold step1 (return res1) extract1
|
||||
in Tuple' (IsMap.mapInsert k fld kv) Nothing
|
||||
Done b -> Tuple' (IsMap.mapDelete k kv) (Just (k, b))
|
||||
Done b -> return $ Tuple' kv (Just (k, b))
|
||||
|
||||
step (Tuple' kv _) a = do
|
||||
let k = getKey a
|
||||
case IsMap.mapLookup k kv of
|
||||
Nothing -> do
|
||||
fld <- getFold a
|
||||
runFold kv fld (k, a)
|
||||
Just f -> runFold kv f (k, a)
|
||||
|
||||
extract (Tuple' kv x) = (Prelude.mapM f kv, x)
|
||||
|
||||
where
|
||||
|
||||
f (Fold _ i e) = do
|
||||
r <- i
|
||||
case r of
|
||||
Partial s -> e s
|
||||
Done b -> return b
|
||||
|
||||
-- | This is specialized version of 'demuxScanWith' that uses mutable cells as
|
||||
-- fold accumulators for better performance.
|
||||
--
|
||||
-- Definition:
|
||||
--
|
||||
-- >>> demuxScanMutWith = Fold.demuxScanWith
|
||||
--
|
||||
{-# INLINE demuxScanMutWith #-}
|
||||
demuxScanMutWith :: (MonadIO m, IsMap f, Traversable f) =>
|
||||
(a -> Key f)
|
||||
-> (a -> m (Fold m a b))
|
||||
-> Fold m a (m (f b), Maybe (Key f, b))
|
||||
demuxScanMutWith getKey getFold = fmap extract $ foldlM' step initial
|
||||
|
||||
where
|
||||
|
||||
initial = return $ Tuple' IsMap.mapEmpty Nothing
|
||||
|
||||
{-# INLINE initFold #-}
|
||||
initFold kv (Fold step1 initial1 extract1) (k, a) = do
|
||||
res <- initial1
|
||||
case res of
|
||||
Partial s -> do
|
||||
res1 <- step1 s a
|
||||
case res1 of
|
||||
Partial _ -> do
|
||||
let fld = Fold step1 (return res1) extract1
|
||||
ref <- liftIO $ newIORef fld
|
||||
return $ Tuple' (IsMap.mapInsert k ref kv) Nothing
|
||||
Done b -> return $ Tuple' kv (Just (k, b))
|
||||
Done b -> return $ Tuple' kv (Just (k, b))
|
||||
|
||||
{-# INLINE runFold #-}
|
||||
runFold kv ref (Fold step1 initial1 extract1) (k, a) = do
|
||||
res <- initial1
|
||||
case res of
|
||||
Partial s -> do
|
||||
res1 <- step1 s a
|
||||
case res1 of
|
||||
Partial _ -> do
|
||||
let fld = Fold step1 (return res1) extract1
|
||||
liftIO $ writeIORef ref fld
|
||||
return $ Tuple' kv Nothing
|
||||
Done b ->
|
||||
let kv1 = IsMap.mapDelete k kv
|
||||
in return $ Tuple' kv1 (Just (k, b))
|
||||
Done _ -> error "demuxScanMutWith: unreachable"
|
||||
|
||||
step (Tuple' kv _) a = do
|
||||
let k = getKey a
|
||||
case IsMap.mapLookup k kv of
|
||||
Nothing -> do
|
||||
f <- getFold a
|
||||
initFold kv f (k, a)
|
||||
Just ref -> do
|
||||
f <- liftIO $ readIORef ref
|
||||
runFold kv ref f (k, a)
|
||||
|
||||
extract (Tuple' kv x) = (Prelude.mapM f kv, x)
|
||||
|
||||
where
|
||||
|
||||
f ref = do
|
||||
(Fold _ i e) <- liftIO $ readIORef ref
|
||||
r <- i
|
||||
case r of
|
||||
Partial s -> e s
|
||||
Done b -> return b
|
||||
|
||||
-- | This collects all the results of 'demuxScanWith' in a container.
|
||||
--
|
||||
{-# INLINE demuxWith #-}
|
||||
demuxWith :: (Monad m, IsMap f, Traversable f) =>
|
||||
(a -> Key f) -> (a -> m (Fold m a b)) -> Fold m a (f b)
|
||||
demuxWith getKey getFold =
|
||||
let
|
||||
classifier = demuxScanWith getKey getFold
|
||||
getMap Nothing = pure IsMap.mapEmpty
|
||||
getMap (Just action) = action
|
||||
aggregator =
|
||||
teeWith IsMap.mapUnion
|
||||
(rmapM getMap $ lmap fst latest)
|
||||
(lmap snd $ catMaybes toMap)
|
||||
in postscan classifier aggregator
|
||||
|
||||
-- | Same as 'demuxWith' but uses 'demuxScanMutWith' for better performance.
|
||||
--
|
||||
-- Definition:
|
||||
--
|
||||
-- >>> demuxMutWith = Fold.demuxWith
|
||||
--
|
||||
{-# INLINE demuxMutWith #-}
|
||||
demuxMutWith :: (MonadIO m, IsMap f, Traversable f) =>
|
||||
(a -> Key f) -> (a -> m (Fold m a b)) -> Fold m a (f b)
|
||||
demuxMutWith getKey getFold =
|
||||
let
|
||||
classifier = demuxScanMutWith getKey getFold
|
||||
getMap Nothing = pure IsMap.mapEmpty
|
||||
getMap (Just action) = action
|
||||
aggregator =
|
||||
teeWith IsMap.mapUnion
|
||||
(rmapM getMap $ lmap fst latest)
|
||||
(lmap snd $ catMaybes toMap)
|
||||
in postscan classifier aggregator
|
||||
|
||||
-- | Fold a stream of key value pairs using a function that maps keys to folds.
|
||||
--
|
||||
-- Definition:
|
||||
--
|
||||
-- >>> demux f = Fold.demuxWith fst (Fold.lmap snd . f)
|
||||
--
|
||||
-- Example:
|
||||
--
|
||||
-- >>> import Data.Map (Map)
|
||||
-- >>> :{
|
||||
-- let f "SUM" = return Fold.sum
|
||||
-- f _ = return Fold.product
|
||||
-- input = Stream.fromList [("SUM",1),("PRODUCT",2),("SUM",3),("PRODUCT",4)]
|
||||
-- in Stream.fold (Fold.demux f) input :: IO (Map String Int)
|
||||
-- :}
|
||||
-- fromList [("PRODUCT",8),("SUM",4)]
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE demux #-}
|
||||
demux :: (Monad m, IsMap f, Traversable f) =>
|
||||
(Key f -> m (Fold m a b)) -> Fold m (Key f, a) (f b)
|
||||
demux f = demuxWith fst (\(k, _) -> fmap (lmap snd) (f k))
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- Classify: Like demux but uses the same fold for all keys.
|
||||
------------------------------------------------------------------------------
|
||||
|
||||
-- XXX Change these to make the behavior similar to demux* variants. We can
|
||||
-- implement this using classifyScanManyWith. Maintain a set of done folds in
|
||||
-- the underlying monad, and when initial is called look it up, if the fold is
|
||||
-- done then initial would set a flag in the state to ignore the input or
|
||||
-- return an error.
|
||||
|
||||
-- | Folds the values for each key using the supplied fold. When scanning, as
|
||||
-- soon as the fold is complete, its result is available in the second
|
||||
-- component of the tuple. The first component of the tuple is a snapshot of
|
||||
-- the in-progress folds.
|
||||
--
|
||||
-- Once the fold for a key is done, any future values of the key are ignored.
|
||||
--
|
||||
-- Definition:
|
||||
--
|
||||
-- >>> classifyScanWith f fld = Fold.demuxScanWith f (const fld)
|
||||
--
|
||||
{-# INLINE classifyScanWith #-}
|
||||
classifyScanWith :: (Monad m, IsMap f, Traversable f, Ord (Key f)) =>
|
||||
-- Note: we need to return the Map itself to display the in-progress values
|
||||
-- e.g. to implement top. We could possibly create a separate abstraction
|
||||
-- for that use case. We return an action because we want it to be lazy so
|
||||
-- that the downstream consumers can choose to process or discard it.
|
||||
(a -> Key f) -> Fold m a b -> Fold m a (m (f b), Maybe (Key f, b))
|
||||
classifyScanWith f (Fold step1 initial1 extract1) =
|
||||
fmap extract $ foldlM' step initial
|
||||
|
||||
where
|
||||
|
||||
initial = return $ Tuple3' IsMap.mapEmpty Set.empty Nothing
|
||||
|
||||
{-# INLINE initFold #-}
|
||||
initFold kv set k a = do
|
||||
x <- initial1
|
||||
case x of
|
||||
Partial s -> do
|
||||
r <- step1 s a
|
||||
return
|
||||
$ case r of
|
||||
Partial s1 ->
|
||||
Tuple3' (IsMap.mapInsert k s1 kv) set Nothing
|
||||
Done b ->
|
||||
Tuple3' kv set (Just (k, b))
|
||||
Done b -> return (Tuple3' kv (Set.insert k set) (Just (k, b)))
|
||||
|
||||
step (Tuple3' kv set _) a = do
|
||||
let k = f a
|
||||
case IsMap.mapLookup k kv of
|
||||
Nothing -> do
|
||||
if Set.member k set
|
||||
then return (Tuple3' kv set Nothing)
|
||||
else initFold kv set k a
|
||||
Just s -> do
|
||||
r <- step1 s a
|
||||
return
|
||||
$ case r of
|
||||
Partial s1 ->
|
||||
Tuple3' (IsMap.mapInsert k s1 kv) set Nothing
|
||||
Done b ->
|
||||
let kv1 = IsMap.mapDelete k kv
|
||||
in Tuple3' kv1 (Set.insert k set) (Just (k, b))
|
||||
|
||||
extract (Tuple3' kv _ x) = (Prelude.mapM extract1 kv, x)
|
||||
|
||||
-- XXX we can use a Prim IORef if we can constrain the state "s" to be Prim
|
||||
--
|
||||
-- The code is almost the same as classifyScanWith except the IORef operations.
|
||||
|
||||
-- | Same as classifyScanWith except that it uses mutable IORef cells in the
|
||||
-- Map providing better performance. Be aware that if this is used as a scan,
|
||||
-- the values in the intermediate Maps would be mutable.
|
||||
--
|
||||
-- Definitions:
|
||||
--
|
||||
-- >>> classifyScanMutWith = Fold.classifyScanWith
|
||||
-- >>> classifyScanMutWith f fld = Fold.demuxScanMutWith f (const fld)
|
||||
--
|
||||
{-# INLINE classifyScanMutWith #-}
|
||||
classifyScanMutWith :: (MonadIO m, IsMap f, Traversable f, Ord (Key f)) =>
|
||||
(a -> Key f) -> Fold m a b -> Fold m a (m (f b), Maybe (Key f, b))
|
||||
classifyScanMutWith f (Fold step1 initial1 extract1) =
|
||||
fmap extract $ foldlM' step initial
|
||||
|
||||
where
|
||||
|
||||
initial = return $ Tuple3' IsMap.mapEmpty Set.empty Nothing
|
||||
|
||||
{-# INLINE initFold #-}
|
||||
initFold kv set k a = do
|
||||
x <- initial1
|
||||
case x of
|
||||
Partial s -> do
|
||||
r <- step1 s a
|
||||
case r of
|
||||
Partial s1 -> do
|
||||
ref <- liftIO $ newIORef s1
|
||||
return $ Tuple3' (IsMap.mapInsert k ref kv) set Nothing
|
||||
Done b ->
|
||||
return $ Tuple3' kv set (Just (k, b))
|
||||
Done b -> return (Tuple3' kv (Set.insert k set) (Just (k, b)))
|
||||
|
||||
step (Tuple3' kv set _) a = do
|
||||
let k = f a
|
||||
case IsMap.mapLookup k kv of
|
||||
Nothing -> do
|
||||
if Set.member k set
|
||||
then return (Tuple3' kv set Nothing)
|
||||
else initFold kv set k a
|
||||
Just ref -> do
|
||||
s <- liftIO $ readIORef ref
|
||||
r <- step1 s a
|
||||
case r of
|
||||
Partial s1 -> do
|
||||
liftIO $ writeIORef ref s1
|
||||
return $ Tuple3' kv set Nothing
|
||||
Done b ->
|
||||
let kv1 = IsMap.mapDelete k kv
|
||||
in return
|
||||
$ Tuple3' kv1 (Set.insert k set) (Just (k, b))
|
||||
|
||||
extract (Tuple3' kv _ x) =
|
||||
(Prelude.mapM (\ref -> liftIO (readIORef ref) >>= extract1) kv, x)
|
||||
|
||||
-- | Fold a key value stream to a key-value container. If the same key appears
|
||||
-- multiple times, only the last value is retained.
|
||||
{-# INLINE toMap #-}
|
||||
toMap :: (Monad m, IsMap f) => Fold m (Key f, a) (f a)
|
||||
toMap = foldl' (\kv (k, v) -> IsMap.mapInsert k v kv) IsMap.mapEmpty
|
||||
|
||||
-- | Split the input stream based on a key field and fold each split using the
|
||||
-- given fold. Useful for map/reduce, bucketizing the input in different bins
|
||||
-- or for generating histograms.
|
||||
--
|
||||
-- Example:
|
||||
--
|
||||
-- >>> import Data.Map.Strict (Map)
|
||||
-- >>> :{
|
||||
-- let input = Stream.fromList [("ONE",1),("ONE",1.1),("TWO",2), ("TWO",2.2)]
|
||||
-- classify = Fold.classifyWith fst (Fold.lmap snd Fold.toList)
|
||||
-- in Stream.fold classify input :: IO (Map String [Double])
|
||||
-- :}
|
||||
-- fromList [("ONE",[1.0,1.1]),("TWO",[2.0,2.2])]
|
||||
--
|
||||
-- Once the classifier fold terminates for a particular key any further inputs
|
||||
-- in that bucket are ignored.
|
||||
--
|
||||
-- Space used is proportional to the number of keys seen till now and
|
||||
-- monotonically increases because it stores whether a key has been seen or
|
||||
-- not.
|
||||
--
|
||||
-- /Stops: never/
|
||||
--
|
||||
-- /Pre-release/
|
||||
--
|
||||
{-# INLINE classifyWith #-}
|
||||
classifyWith :: (Monad m, IsMap f, Traversable f, Ord (Key f)) =>
|
||||
(a -> Key f) -> Fold m a b -> Fold m a (f b)
|
||||
classifyWith f fld =
|
||||
let
|
||||
classifier = classifyScanWith f fld
|
||||
getMap Nothing = pure IsMap.mapEmpty
|
||||
getMap (Just action) = action
|
||||
aggregator =
|
||||
teeWith IsMap.mapUnion
|
||||
(rmapM getMap $ lmap fst latest)
|
||||
(lmap snd $ catMaybes toMap)
|
||||
in postscan classifier aggregator
|
||||
|
||||
-- | Same as 'classifyWith' but maybe faster because it uses mutable cells as
|
||||
-- fold accumulators in the Map.
|
||||
--
|
||||
-- Definition:
|
||||
--
|
||||
-- >>> classifyMutWith = Fold.classifyWith
|
||||
--
|
||||
{-# INLINE classifyMutWith #-}
|
||||
classifyMutWith :: (MonadIO m, IsMap f, Traversable f, Ord (Key f)) =>
|
||||
(a -> Key f) -> Fold m a b -> Fold m a (f b)
|
||||
classifyMutWith f fld =
|
||||
let
|
||||
classifier = classifyScanMutWith f fld
|
||||
getMap Nothing = pure IsMap.mapEmpty
|
||||
getMap (Just action) = action
|
||||
aggregator =
|
||||
teeWith IsMap.mapUnion
|
||||
(rmapM getMap $ lmap fst latest)
|
||||
(lmap snd $ catMaybes toMap)
|
||||
in postscan classifier aggregator
|
||||
|
||||
-- | Given an input stream of key value pairs and a fold for values, fold all
|
||||
-- the values belonging to each key. Useful for map/reduce, bucketizing the
|
||||
-- input in different bins or for generating histograms.
|
||||
--
|
||||
-- Definition:
|
||||
--
|
||||
-- >>> classify = Fold.classifyWith fst . Fold.lmap snd
|
||||
--
|
||||
-- Example:
|
||||
--
|
||||
-- >>> :{
|
||||
-- let input = Stream.fromList [("ONE",1),("ONE",1.1),("TWO",2), ("TWO",2.2)]
|
||||
-- in Stream.fold (Fold.classify Fold.toList) input
|
||||
-- :}
|
||||
-- fromList [("ONE",[1.0,1.1]),("TWO",[2.0,2.2])]
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE classify #-}
|
||||
classify :: (Monad m, Ord k) => Fold m a b -> Fold m (k, a) (Map k b)
|
||||
classify = classifyWith fst . lmap snd
|
||||
|
||||
-- | Determine the frequency of each element in the stream.
|
||||
--
|
||||
-- You can just collect the keys of the resulting map to get the unique
|
||||
-- elements in the stream.
|
||||
--
|
||||
-- Definition:
|
||||
--
|
||||
-- >>> frequency = Fold.classifyWith id Fold.length
|
||||
--
|
||||
{-# INLINE frequency #-}
|
||||
frequency :: (Monad m, Ord a) => Fold m a (Map a Int)
|
||||
frequency = classifyWith id length
|
@ -12,29 +12,32 @@ module Streamly.Internal.Data.Stream.Exception.Lifted
|
||||
, bracket
|
||||
, bracket3
|
||||
, finally
|
||||
, retry
|
||||
|
||||
-- For IsStream module
|
||||
, afterD
|
||||
, bracket3D
|
||||
, retryD
|
||||
)
|
||||
where
|
||||
|
||||
#include "inline.hs"
|
||||
|
||||
import Control.Exception (SomeException, mask_)
|
||||
import Control.Exception (Exception, SomeException, mask_)
|
||||
import Control.Monad.Catch (MonadCatch)
|
||||
import Data.Map.Strict (Map)
|
||||
import GHC.Exts (inline)
|
||||
import Streamly.Internal.Control.Concurrent
|
||||
(MonadRunInIO, MonadAsync, withRunInIO)
|
||||
import Streamly.Internal.Data.Stream.Type (Stream, fromStreamD, toStreamD)
|
||||
import Streamly.Internal.Data.IOFinalizer.Lifted
|
||||
(newIOFinalizer, runIOFinalizer, clearingIOFinalizer)
|
||||
import Streamly.Internal.Data.Stream.StreamD (Step(..))
|
||||
|
||||
import qualified Control.Monad.Catch as MC
|
||||
import qualified Data.Map.Strict as Map
|
||||
import qualified Streamly.Internal.Data.Stream.StreamD as D
|
||||
|
||||
import Streamly.Internal.Data.Stream.StreamD.Type hiding (Stream)
|
||||
|
||||
-- $setup
|
||||
-- >>> :m
|
||||
-- >>> import qualified Streamly.Internal.Data.Stream.Exception.Lifted as Stream
|
||||
@ -76,7 +79,7 @@ gbracket bef aft onExc onGC ftry action =
|
||||
return (r, ref)
|
||||
return $ Skip $ GBracketIONormal (action r) r ref
|
||||
|
||||
step gst (GBracketIONormal (UnStream step1 st) v ref) = do
|
||||
step gst (GBracketIONormal (D.UnStream step1 st) v ref) = do
|
||||
res <- ftry $ step1 gst st
|
||||
case res of
|
||||
Right r -> case r of
|
||||
@ -93,9 +96,9 @@ gbracket bef aft onExc onGC ftry action =
|
||||
-- the finalizer and have not run the exception handler then we
|
||||
-- may leak the resource.
|
||||
stream <-
|
||||
clearingIOFinalizer ref (onExc v e (UnStream step1 st))
|
||||
clearingIOFinalizer ref (onExc v e (D.UnStream step1 st))
|
||||
return $ Skip (GBracketIOException stream)
|
||||
step gst (GBracketIOException (UnStream step1 st)) = do
|
||||
step gst (GBracketIOException (D.UnStream step1 st)) = do
|
||||
res <- step1 gst st
|
||||
case res of
|
||||
Yield x s ->
|
||||
@ -115,7 +118,7 @@ bracket3D bef aft onExc onGC =
|
||||
gbracket
|
||||
bef
|
||||
aft
|
||||
(\a (e :: SomeException) _ -> onExc a >> return (nilM (MC.throwM e)))
|
||||
(\a (e :: SomeException) _ -> onExc a >> return (D.nilM (MC.throwM e)))
|
||||
onGC
|
||||
(inline MC.try)
|
||||
|
||||
@ -222,3 +225,97 @@ afterD action (D.Stream step state) = D.Stream step' Nothing
|
||||
{-# INLINE after #-}
|
||||
after :: MonadRunInIO m => m b -> Stream m a -> Stream m a
|
||||
after action xs = fromStreamD $ afterD action $ toStreamD xs
|
||||
|
||||
data RetryState emap s1 s2
|
||||
= RetryWithMap emap s1
|
||||
| RetryDefault s2
|
||||
|
||||
-- | See 'Streamly.Internal.Data.Stream.retry'
|
||||
--
|
||||
{-# INLINE_NORMAL retryD #-}
|
||||
retryD
|
||||
:: forall e m a. (Exception e, Ord e, MonadCatch m)
|
||||
=> Map e Int
|
||||
-- ^ map from exception to retry count
|
||||
-> (e -> D.Stream m a)
|
||||
-- ^ default handler for those exceptions that are not in the map
|
||||
-> D.Stream m a
|
||||
-> D.Stream m a
|
||||
retryD emap0 defaultHandler (D.Stream step0 state0) = D.Stream step state
|
||||
|
||||
where
|
||||
|
||||
state = RetryWithMap emap0 state0
|
||||
|
||||
{-# INLINE_LATE step #-}
|
||||
step gst (RetryWithMap emap st) = do
|
||||
eres <- MC.try $ step0 gst st
|
||||
case eres of
|
||||
Left e -> handler e emap st
|
||||
Right res ->
|
||||
return
|
||||
$ case res of
|
||||
Yield x st1 -> Yield x $ RetryWithMap emap st1
|
||||
Skip st1 -> Skip $ RetryWithMap emap st1
|
||||
Stop -> Stop
|
||||
step gst (RetryDefault (D.UnStream step1 state1)) = do
|
||||
res <- step1 gst state1
|
||||
return
|
||||
$ case res of
|
||||
Yield x st1 -> Yield x $ RetryDefault (D.Stream step1 st1)
|
||||
Skip st1 -> Skip $ RetryDefault (D.Stream step1 st1)
|
||||
Stop -> Stop
|
||||
|
||||
{-# INLINE handler #-}
|
||||
handler e emap st =
|
||||
return
|
||||
$ Skip
|
||||
$ case Map.lookup e emap of
|
||||
Just i
|
||||
| i > 0 ->
|
||||
let emap1 = Map.insert e (i - 1) emap
|
||||
in RetryWithMap emap1 st
|
||||
| otherwise -> RetryDefault $ defaultHandler e
|
||||
Nothing -> RetryDefault $ defaultHandler e
|
||||
|
||||
-- | @retry@ takes 3 arguments
|
||||
--
|
||||
-- 1. A map @m@ whose keys are exceptions and values are the number of times to
|
||||
-- retry the action given that the exception occurs.
|
||||
--
|
||||
-- 2. A handler @han@ that decides how to handle an exception when the exception
|
||||
-- cannot be retried.
|
||||
--
|
||||
-- 3. The stream itself that we want to run this mechanism on.
|
||||
--
|
||||
-- When evaluating a stream if an exception occurs,
|
||||
--
|
||||
-- 1. The stream evaluation aborts
|
||||
--
|
||||
-- 2. The exception is looked up in @m@
|
||||
--
|
||||
-- a. If the exception exists and the mapped value is > 0 then,
|
||||
--
|
||||
-- i. The value is decreased by 1.
|
||||
--
|
||||
-- ii. The stream is resumed from where the exception was called, retrying
|
||||
-- the action.
|
||||
--
|
||||
-- b. If the exception exists and the mapped value is == 0 then the stream
|
||||
-- evaluation stops.
|
||||
--
|
||||
-- c. If the exception does not exist then we handle the exception using
|
||||
-- @han@.
|
||||
--
|
||||
-- /Internal/
|
||||
--
|
||||
{-# INLINE retry #-}
|
||||
retry :: (MonadCatch m, Exception e, Ord e)
|
||||
=> Map e Int
|
||||
-- ^ map from exception to retry count
|
||||
-> (e -> Stream m a)
|
||||
-- ^ default handler for those exceptions that are not in the map
|
||||
-> Stream m a
|
||||
-> Stream m a
|
||||
retry emap handler inp =
|
||||
fromStreamD $ retryD emap (toStreamD . handler) $ toStreamD inp
|
||||
|
164
src/Streamly/Internal/Data/Stream/Extra.hs
Normal file
164
src/Streamly/Internal/Data/Stream/Extra.hs
Normal file
@ -0,0 +1,164 @@
|
||||
-- |
|
||||
-- Module : Streamly.Internal.Data.Stream.Extra
|
||||
-- Copyright : (c) 2019 Composewell Technologies
|
||||
-- License : BSD-3-Clause
|
||||
-- Maintainer : streamly@composewell.com
|
||||
-- Stability : experimental
|
||||
-- Portability : GHC
|
||||
|
||||
module Streamly.Internal.Data.Stream.Extra
|
||||
(
|
||||
nub
|
||||
, joinInnerMap
|
||||
, joinLeftMap
|
||||
, joinOuterMap
|
||||
)
|
||||
where
|
||||
|
||||
#include "inline.hs"
|
||||
|
||||
import Control.Monad.IO.Class (MonadIO)
|
||||
import Streamly.Internal.Data.Stream.StreamD.Step (Step(..))
|
||||
import Streamly.Internal.Data.Stream.Type (Stream)
|
||||
|
||||
import qualified Data.Map.Strict as Map
|
||||
import qualified Data.Set as Set
|
||||
import qualified Streamly.Data.Fold as Fold
|
||||
import qualified Streamly.Data.Stream as Stream
|
||||
import qualified Streamly.Internal.Data.Stream as Stream (concatM)
|
||||
import qualified Streamly.Internal.Data.Stream.StreamD as D
|
||||
|
||||
-- $setup
|
||||
-- >>> :m
|
||||
-- >>> import qualified Streamly.Internal.Data.Stream as Stream
|
||||
|
||||
-- | The memory used is proportional to the number of unique elements in the
|
||||
-- stream. If we want to limit the memory we can just use "take" to limit the
|
||||
-- uniq elements in the stream.
|
||||
{-# INLINE_NORMAL nub #-}
|
||||
nub :: (Monad m, Ord a) => D.Stream m a -> D.Stream m a
|
||||
nub (D.Stream step1 state1) = D.Stream step (Set.empty, state1)
|
||||
|
||||
where
|
||||
|
||||
step gst (set, st) = do
|
||||
r <- step1 gst st
|
||||
return
|
||||
$ case r of
|
||||
Yield x s ->
|
||||
if Set.member x set
|
||||
then Skip (set, s)
|
||||
else Yield x (Set.insert x set, s)
|
||||
Skip s -> Skip (set, s)
|
||||
Stop -> Stop
|
||||
|
||||
-- XXX Generate error if a duplicate insertion is attempted?
|
||||
toMap :: (Monad m, Ord k) => Stream m (k, v) -> m (Map.Map k v)
|
||||
toMap =
|
||||
let f = Fold.foldl' (\kv (k, b) -> Map.insert k b kv) Map.empty
|
||||
in Stream.fold f
|
||||
|
||||
-- If the second stream is too big it can be partitioned based on hashes and
|
||||
-- then we can process one parition at a time.
|
||||
--
|
||||
-- XXX An IntMap may be faster when the keys are Int.
|
||||
-- XXX Use hashmap instead of map?
|
||||
--
|
||||
-- | Like 'joinInner' but uses a 'Map' for efficiency.
|
||||
--
|
||||
-- If the input streams have duplicate keys, the behavior is undefined.
|
||||
--
|
||||
-- For space efficiency use the smaller stream as the second stream.
|
||||
--
|
||||
-- Space: O(n)
|
||||
--
|
||||
-- Time: O(m + n)
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE joinInnerMap #-}
|
||||
joinInnerMap :: (Monad m, Ord k) =>
|
||||
Stream m (k, a) -> Stream m (k, b) -> Stream m (k, a, b)
|
||||
joinInnerMap s1 s2 =
|
||||
Stream.concatM $ do
|
||||
km <- toMap s2
|
||||
pure $ Stream.mapMaybe (joinAB km) s1
|
||||
|
||||
where
|
||||
|
||||
joinAB kvm (k, a) =
|
||||
case k `Map.lookup` kvm of
|
||||
Just b -> Just (k, a, b)
|
||||
Nothing -> Nothing
|
||||
|
||||
-- | Like 'joinLeft' but uses a hashmap for efficiency.
|
||||
--
|
||||
-- Space: O(n)
|
||||
--
|
||||
-- Time: O(m + n)
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE joinLeftMap #-}
|
||||
joinLeftMap :: (Ord k, Monad m) =>
|
||||
Stream m (k, a) -> Stream m (k, b) -> Stream m (k, a, Maybe b)
|
||||
joinLeftMap s1 s2 =
|
||||
Stream.concatM $ do
|
||||
km <- toMap s2
|
||||
return $ fmap (joinAB km) s1
|
||||
|
||||
where
|
||||
|
||||
joinAB km (k, a) =
|
||||
case k `Map.lookup` km of
|
||||
Just b -> (k, a, Just b)
|
||||
Nothing -> (k, a, Nothing)
|
||||
|
||||
-- Put the b's that have been paired, in another hash or mutate the hash to set
|
||||
-- a flag. At the end go through @Stream m b@ and find those that are not in that
|
||||
-- hash to return (Nothing, b).
|
||||
--
|
||||
-- | Like 'joinOuter' but uses a 'Map' for efficiency.
|
||||
--
|
||||
-- Space: O(m + n)
|
||||
--
|
||||
-- Time: O(m + n)
|
||||
--
|
||||
-- /Pre-release/
|
||||
{-# INLINE joinOuterMap #-}
|
||||
joinOuterMap ::
|
||||
(Ord k, MonadIO m) =>
|
||||
Stream m (k, a) -> Stream m (k, b) -> Stream m (k, Maybe a, Maybe b)
|
||||
joinOuterMap s1 s2 =
|
||||
Stream.concatM $ do
|
||||
km1 <- kvFold s1
|
||||
km2 <- kvFold s2
|
||||
|
||||
-- XXX Not sure if toList/fromList would fuse optimally. We may have to
|
||||
-- create a fused Map.toStream function.
|
||||
let res1 = fmap (joinAB km2)
|
||||
$ Stream.fromList $ Map.toList km1
|
||||
where
|
||||
joinAB km (k, a) =
|
||||
case k `Map.lookup` km of
|
||||
Just b -> (k, Just a, Just b)
|
||||
Nothing -> (k, Just a, Nothing)
|
||||
|
||||
-- XXX We can take advantage of the lookups in the first pass above to
|
||||
-- reduce the number of lookups in this pass. If we keep mutable cells
|
||||
-- in the second Map, we can flag it in the first pass and not do any
|
||||
-- lookup in the second pass if it is flagged.
|
||||
let res2 = Stream.mapMaybe (joinAB km1)
|
||||
$ Stream.fromList $ Map.toList km2
|
||||
where
|
||||
joinAB km (k, b) =
|
||||
case k `Map.lookup` km of
|
||||
Just _ -> Nothing
|
||||
Nothing -> Just (k, Nothing, Just b)
|
||||
|
||||
return $ Stream.append res1 res2
|
||||
|
||||
where
|
||||
|
||||
-- XXX Generate error if a duplicate insertion is attempted?
|
||||
kvFold =
|
||||
let f = Fold.foldl' (\kv (k, b) -> Map.insert k b kv) Map.empty
|
||||
in Stream.fold f
|
@ -280,7 +280,7 @@ relTimesWith = fmap snd . timesWith
|
||||
--
|
||||
-- >>> concatFold = Prelude.foldl Stream.foldContinue Fold.sum
|
||||
--
|
||||
-- >>> fold f = Fold.finish . Stream.foldContinue f
|
||||
-- >>> fold f = Fold.extractM . Stream.foldContinue f
|
||||
--
|
||||
-- /Internal/
|
||||
{-# INLINE foldContinue #-}
|
||||
|
@ -40,12 +40,12 @@ import qualified Streamly.Internal.Data.Stream.StreamD.Exception as D
|
||||
, finally_
|
||||
, ghandle
|
||||
, handle
|
||||
, retry
|
||||
)
|
||||
|
||||
import qualified Streamly.Internal.Data.Stream.Exception.Lifted as D
|
||||
( afterD
|
||||
, bracket3D
|
||||
, retryD
|
||||
)
|
||||
|
||||
|
||||
@ -281,4 +281,4 @@ retry :: (IsStream t, MonadCatch m, Exception e, Ord e)
|
||||
-> t m a
|
||||
-> t m a
|
||||
retry emap handler inp =
|
||||
fromStreamD $ D.retry emap (toStreamD . handler) $ toStreamD inp
|
||||
fromStreamD $ D.retryD emap (toStreamD . handler) $ toStreamD inp
|
||||
|
@ -709,7 +709,7 @@ classifySessionsByGeneric _ tick reset ejectPred tmout
|
||||
-- session is created for that key.
|
||||
--
|
||||
-- >>> :{
|
||||
-- Stream.fold (Fold.drainBy print)
|
||||
-- Stream.fold (Fold.drainMapM print)
|
||||
-- $ Concur.classifySessionsBy 1 False (const (return False)) 3 (Fold.take 3 Fold.toList)
|
||||
-- $ Stream.timestamped
|
||||
-- $ Stream.delay 0.1
|
||||
@ -815,7 +815,7 @@ classifySessionsOf = classifySessionsBy 1 False
|
||||
--
|
||||
{-# INLINE sampleIntervalEnd #-}
|
||||
sampleIntervalEnd :: MonadAsync m => Double -> Stream m a -> Stream m a
|
||||
sampleIntervalEnd n = Stream.catMaybes . intervalsOf n Fold.last
|
||||
sampleIntervalEnd n = Stream.catMaybes . intervalsOf n Fold.latest
|
||||
|
||||
-- | Like 'sampleInterval' but samples at the beginning of the time window.
|
||||
--
|
||||
|
@ -713,7 +713,7 @@ addToWatch cfg@Config{..} watch0@(Watch handle wdMap) root0 path0 = do
|
||||
pathIsDir <- doesDirectoryExist $ utf8ToString absPath
|
||||
when (watchRec && pathIsDir) $ do
|
||||
let f = addToWatch cfg watch0 root . appendPaths path
|
||||
in S.fold (FL.drainBy f)
|
||||
in S.fold (FL.drainMapM f)
|
||||
$ S.mapM toUtf8
|
||||
$ Dir.readDirs $ utf8ToString absPath
|
||||
|
||||
|
@ -436,7 +436,7 @@ utf8ToStringList = NonEmpty.map utf8ToString
|
||||
closePathHandleStream :: Stream IO (HANDLE, FilePath, Config) -> IO ()
|
||||
closePathHandleStream =
|
||||
let f (h, _, _) = closeHandle h
|
||||
in S.fold (Fold.drainBy f)
|
||||
in S.fold (Fold.drainMapM f)
|
||||
|
||||
-- XXX
|
||||
-- Document the path treatment for Linux/Windows/macOS modules.
|
||||
|
@ -1,5 +1,3 @@
|
||||
#include "inline.hs"
|
||||
|
||||
-- |
|
||||
-- Module : Streamly.Internal.Network.Inet.TCP
|
||||
-- Copyright : (c) 2019 Composewell Technologies
|
||||
@ -94,6 +92,8 @@ module Streamly.Internal.Network.Inet.TCP
|
||||
)
|
||||
where
|
||||
|
||||
#include "inline.hs"
|
||||
|
||||
import Control.Exception (onException)
|
||||
import Control.Monad.Catch (MonadCatch, MonadMask, bracket)
|
||||
import Control.Monad.IO.Class (MonadIO(..))
|
||||
@ -122,8 +122,7 @@ import qualified Streamly.Data.Fold as FL
|
||||
import qualified Streamly.Data.Unfold as UF
|
||||
import qualified Streamly.Internal.Data.Unfold as UF (bracketIO, first)
|
||||
import qualified Streamly.Internal.Data.Array.Unboxed.Stream as AS
|
||||
import qualified Streamly.Internal.Data.Fold.Type as FL
|
||||
(initialize, snoc, Step(..))
|
||||
import qualified Streamly.Internal.Data.Fold.Type as FL (Step(..))
|
||||
import qualified Streamly.Internal.Data.Stream as S
|
||||
import qualified Streamly.Internal.Data.Stream.Exception.Lifted as S
|
||||
import qualified Streamly.Internal.Network.Socket as ISK
|
||||
@ -355,7 +354,7 @@ writeChunks addr port = Fold step initial extract
|
||||
where
|
||||
initial = do
|
||||
skt <- liftIO (connect addr port)
|
||||
fld <- FL.initialize (ISK.writeChunks skt)
|
||||
fld <- FL.reduce (ISK.writeChunks skt)
|
||||
`MC.onException` liftIO (Net.close skt)
|
||||
return $ FL.Partial (Tuple' fld skt)
|
||||
step (Tuple' fld skt) x = do
|
||||
|
@ -450,14 +450,14 @@ reader = UF.first defaultChunkSize readerWith
|
||||
{-# INLINE putChunks #-}
|
||||
putChunks :: (MonadIO m, Unbox a)
|
||||
=> Socket -> Stream m (Array a) -> m ()
|
||||
putChunks h = S.fold (FL.drainBy (liftIO . putChunk h))
|
||||
putChunks h = S.fold (FL.drainMapM (liftIO . putChunk h))
|
||||
|
||||
-- | Write a stream of arrays to a socket. Each array in the stream is written
|
||||
-- to the socket as a separate IO request.
|
||||
--
|
||||
{-# INLINE writeChunks #-}
|
||||
writeChunks :: (MonadIO m, Unbox a) => Socket -> Fold m (Array a) ()
|
||||
writeChunks h = FL.drainBy (liftIO . putChunk h)
|
||||
writeChunks h = FL.drainMapM (liftIO . putChunk h)
|
||||
|
||||
-- | @writeChunksWith bufsize socket@ writes a stream of arrays to
|
||||
-- @socket@ after coalescing the adjacent arrays in chunks of @bufsize@.
|
||||
|
@ -337,7 +337,8 @@ library
|
||||
-- dependency order To view dependency graph:
|
||||
-- graphmod | dot -Tps > deps.ps
|
||||
|
||||
Streamly.Internal.Data.IsMap.HashMap
|
||||
Streamly.Internal.Data.IsMap
|
||||
, Streamly.Internal.Data.IsMap.HashMap
|
||||
|
||||
-- streamly-concurrent
|
||||
, Streamly.Internal.Control.Concurrent
|
||||
@ -352,6 +353,7 @@ library
|
||||
, Streamly.Internal.Data.Fold.Concurrent.Channel.Type
|
||||
, Streamly.Internal.Data.Fold.Concurrent.Channel
|
||||
, Streamly.Internal.Data.Fold.Concurrent
|
||||
, Streamly.Internal.Data.Fold.Extra
|
||||
|
||||
, Streamly.Internal.Data.Stream.Concurrent.Channel.Type
|
||||
, Streamly.Internal.Data.Stream.Concurrent.Channel.Dispatcher
|
||||
@ -364,6 +366,7 @@ library
|
||||
, Streamly.Internal.Data.Stream.Zip.Concurrent
|
||||
, Streamly.Internal.Data.Stream.Time
|
||||
, Streamly.Internal.Data.Stream.Exception.Lifted
|
||||
, Streamly.Internal.Data.Stream.Extra
|
||||
|
||||
-- streamly-unicode (depends on unicode-data)
|
||||
, Streamly.Internal.Unicode.Utf8
|
||||
|
@ -25,6 +25,7 @@ import qualified Data.Map
|
||||
import qualified Prelude
|
||||
import qualified Streamly.Internal.Data.Array.Unboxed.Mut as MArray
|
||||
import qualified Streamly.Internal.Data.Fold as Fold
|
||||
import qualified Streamly.Internal.Data.Fold.Extra as Fold
|
||||
import qualified Streamly.Internal.Data.Stream as Stream
|
||||
|
||||
import Prelude hiding
|
||||
|
Loading…
Reference in New Issue
Block a user