Add modular scanning functions for transformations (#1809)

Usable across folds and streams. Implemented common transformations
using scans. Performance of single operation benchmark seems to be
unaffected but in some cases combinations of multiple benchmarks or the
same operation multiple times is degraded.

The perf impact could perhaps be improved if we have a separate Scan
type where we do not need a pre-initialization of state.
This commit is contained in:
Harendra Kumar 2022-09-05 18:17:05 +05:30 committed by GitHub
parent f4de841441
commit 935f69e46e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 626 additions and 195 deletions

View File

@ -93,16 +93,16 @@ sequence_ value =
filter :: Monad m => Int -> Stream m Int -> m ()
filter _ = Stream.fold (FL.filter even FL.drain)
{-# INLINE foldFilter #-}
foldFilter :: Monad m => Int -> Stream m Int -> m ()
foldFilter _ = Stream.fold (FL.foldFilter (FL.satisfy even) FL.drain)
{-# INLINE scanMaybe #-}
scanMaybe :: Monad m => Int -> Stream m Int -> m ()
scanMaybe _ = Stream.fold (FL.scanMaybe (FL.filtering even) FL.drain)
{-# INLINE foldFilter2 #-}
foldFilter2 :: Monad m => Int -> Stream m Int -> m ()
foldFilter2 _ =
{-# INLINE scanMaybe2 #-}
scanMaybe2 :: Monad m => Int -> Stream m Int -> m ()
scanMaybe2 _ =
Stream.fold
$ FL.foldFilter (FL.satisfy even)
$ FL.foldFilter (FL.satisfy odd) FL.drain
$ FL.scanMaybe (FL.filtering even)
$ FL.scanMaybe (FL.filtering odd) FL.drain
-------------------------------------------------------------------------------
-- Splitting by serial application
@ -395,8 +395,8 @@ o_1_space_serial_composition value =
[ bgroup
"composition"
[ benchIOSink value "filter even" $ filter value
, benchIOSink value "foldFilter even" $ foldFilter value
, benchIOSink value "foldFilter even, odd" $ foldFilter2 value
, benchIOSink value "scanMaybe even" $ scanMaybe value
, benchIOSink value "scanMaybe even, odd" $ scanMaybe2 value
, benchIOSink value "foldBreak (recursive)" foldBreak
, benchIOSink value "serialWith (all, any)" $ splitAllAny value
, benchIOSink value "serial_ (all, any)" $ serial_ value

View File

@ -101,7 +101,8 @@ module Streamly.Data.Stream
-- , consM2 -- fused version
-- ** Unfolding
-- | Generalized way of generating a stream efficiently.
-- | 'unfoldrM' is the most general way of generating a stream efficiently.
-- All other generation operations can be expressed using it.
, unfoldr
, unfoldrM
@ -226,6 +227,8 @@ module Streamly.Data.Stream
, uncons
-- ** Folding
-- XXX Need to have a general parse operation here which can be used to
-- express all others.
, fold -- XXX rename to run? We can have a Stream.run and Fold.run.
-- XXX fold1 can be achieved using Monoids or Refolds.
, foldBreak
@ -292,6 +295,8 @@ module Streamly.Data.Stream
-- | Remove elements from the stream.
-- ** Stateless Filters
-- | 'mapMaybeM' is the most general stateless filtering operation. All
-- other filtering operations can be expressed using it.
-- EXPLANATION:
-- In imperative terms a filter over a stream corresponds to a loop with a
-- @continue@ clause for the cases when the predicate fails.
@ -302,6 +307,11 @@ module Streamly.Data.Stream
, filterM
-- ** Stateful Filters
-- | 'scanMaybe' is the most general stateful filtering operation. The
-- filtering folds (folds returning a 'Maybe' type) in
-- "Streamly.Internal.Data.Fold" can be used along with 'scanMaybe' to
-- perform stateful filtering operations in general.
, scanMaybe
, take
, takeWhile
, takeWhileM
@ -389,6 +399,7 @@ module Streamly.Data.Stream
-- * Buffered Operations
-- | Operations that require buffering of the stream.
-- Reverse is essentially a left fold followed by an unfold.
, reverse
-- * Multi-Stream folds
@ -414,6 +425,8 @@ module Streamly.Data.Stream
, handle
-- * Resource Management
-- | 'bracket' is the most general resource management operation, all other
-- operations can be expressed using it.
, before
, after
, finally

View File

@ -36,6 +36,11 @@ module Streamly.Internal.Data.Fold
, foldr
, foldrM
-- * Mappers
-- | Monadic functions useful with mapM/lmapM on folds or streams.
, tracing
, trace
-- * Folds
-- ** Identity
, fromPure
@ -51,7 +56,6 @@ module Streamly.Internal.Data.Fold
-- *** Reducers
, drain
, drainBy
, last
, the
, length
, genericLength
@ -63,7 +67,6 @@ module Streamly.Internal.Data.Fold
, rollingHashWithSalt
, rollingHashFirstN
-- , rollingHashLastN
, rollingMapM
-- *** Saturating Reducers
-- | 'product' terminates if it becomes 0. Other folds can theoretically
@ -86,15 +89,41 @@ module Streamly.Internal.Data.Fold
, toStream
, toStreamRev
, toMap
, topBy
, top
, bottom
-- *** Scanners
-- | Stateful transformation of the elements. Useful in combination with
-- the 'scanMaybe' combinator. For scanners the result of the fold is
-- usually a transformation of the current element rather than an
-- aggregation of all elements till now.
, last -- XXX prev
-- , nthLast -- using Ring array
, indexingWith
, indexing
, indexingRev
, rollingMapM
-- *** Filters
-- | Useful in combination with the 'scanMaybe' combinator.
, filtering
, deleteBy
, nub
, nubInt
, uniqBy
, uniq
, repeated
, findIndices
, elemIndices
-- ** Terminating Folds
-- Element folds. Terminate after inspecting one element. All these can be
-- implemented in terms of the 'maybe' fold.
, head
, one
, null
, satisfy
, maybe
-- Sequence folds. Terminate after inspecting a sequence of elements.
@ -115,6 +144,18 @@ module Streamly.Internal.Data.Fold
, and
, or
-- ** Trimmers
-- | Useful in combination with the 'scanMaybe' combinator.
, taking
, dropping
, takingEndByM
, takingEndBy
, takingEndByM_
, takingEndBy_
, droppingWhileM
, droppingWhile
, prune
-- * Combinators
-- ** Utilities
, with
@ -131,42 +172,35 @@ module Streamly.Internal.Data.Fold
, lmap
--, lsequence
, lmapM
-- ** Sliding Window
, slide2
-- ** Scanning Input
, scan
, scanMany
, postscan
, indexed
, trace
-- ** Zipping Input
, zipWithM
, zip
-- ** Filtering
-- ** Filtering Input
, catMaybes
, mapMaybeM
, mapMaybe
, scanMaybe
, filter
, filterM
, foldFilter
, satisfy
, sampleFromthen
-- , ldeleteBy
-- , luniq
, nub
, nubInt
-- ** Mapping Filters
, catMaybes
, mapMaybe
-- , mapMaybeM
-- Either streams
, lefts
, rights
, both
-- ** Scanning Filters
, findIndices
{-
, elemIndices
-- ** Insertion
-- | Insertion adds more elements to the stream.
@ -304,10 +338,10 @@ import Data.Functor.Identity (Identity(..))
import Data.Int (Int64)
import Data.IORef (newIORef, readIORef, writeIORef)
import Data.Map.Strict (Map)
import Data.Maybe (isJust, fromJust)
import Data.Word (Word32)
import Foreign.Storable (peek, sizeOf)
import Streamly.Internal.Data.IsMap (IsMap(..))
import Streamly.Internal.Data.Maybe.Strict (Maybe'(..), toMaybe)
import Streamly.Internal.Data.Pipe.Type (Pipe (..), PipeState(..))
import Streamly.Internal.Data.Unboxed (Unboxed)
import Streamly.Internal.Data.Unfold.Type (Unfold(..))
@ -339,17 +373,18 @@ import Streamly.Internal.Data.Fold.Type
-- >>> :m
-- >>> :set -package streamly
-- >>> import Prelude hiding (break, map, span, splitAt)
-- >>> import qualified Streamly.Data.Stream as Stream
-- >>> import qualified Streamly.Data.Array.Unboxed as Array
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Data.Stream as Stream
-- >>> import qualified Streamly.Internal.Data.Array.Unboxed.Mut.Type as MA
-- >>> import qualified Streamly.Internal.Data.Fold as Fold
-- >>> import qualified Streamly.Internal.Data.Fold.Type as Fold
-- >>> import qualified Streamly.Internal.Data.Fold.Window as FoldW
-- >>> import qualified Streamly.Data.Array.Unboxed as Array
-- >>> import qualified Streamly.Internal.Data.Parser as Parser
-- >>> import qualified Streamly.Internal.Data.Stream.Type as Stream
-- >>> import qualified Streamly.Internal.Data.Unfold as Unfold
-- >>> import Streamly.Internal.Data.Stream.Type (Stream)
-- >>> import Data.IORef (newIORef, readIORef, writeIORef)
-- >>> import qualified Streamly.Internal.Data.Array.Unboxed.Mut.Type as MA
------------------------------------------------------------------------------
-- hoist
@ -390,10 +425,20 @@ sequence = rmapM id
mapM :: Monad m => (b -> m c) -> Fold m a b -> Fold m a c
mapM = rmapM
-- |
-- >>> mapMaybeM f = Fold.lmapM f . Fold.catMaybes
--
{-# INLINE mapMaybeM #-}
mapMaybeM :: Monad m => (a -> m (Maybe b)) -> Fold m b r -> Fold m a r
mapMaybeM f = lmapM f . catMaybes
-- | @mapMaybe f fold@ maps a 'Maybe' returning function @f@ on the input of
-- the fold, filters out 'Nothing' elements, and return the values extracted
-- from 'Just'.
--
-- >>> mapMaybe f = Fold.lmap f . Fold.catMaybes
-- >>> mapMaybe f = Fold.mapMaybeM (return . f)
--
-- >>> f x = if even x then Just x else Nothing
-- >>> fld = Fold.mapMaybe f Fold.toList
-- >>> Stream.fold fld (Stream.enumerateFromTo 1 10)
@ -401,13 +446,25 @@ mapM = rmapM
--
-- @since 0.8.0
{-# INLINE mapMaybe #-}
mapMaybe :: (Monad m) => (a -> Maybe b) -> Fold m b r -> Fold m a r
mapMaybe f = lmap f . filter isJust . lmap fromJust
mapMaybe :: Monad m => (a -> Maybe b) -> Fold m b r -> Fold m a r
mapMaybe f = lmap f . catMaybes
------------------------------------------------------------------------------
-- Transformations on fold inputs
------------------------------------------------------------------------------
-- | Apply a monadic function on the input and return the input.
--
-- >>> Stream.fold (Fold.lmapM (Fold.tracing print) Fold.drain) $ (Stream.enumerateFromTo (1 :: Int) 2)
-- 1
-- 2
--
-- /Pre-release/
--
{-# INLINE tracing #-}
tracing :: Monad m => (a -> m b) -> (a -> m a)
tracing f x = void (f x) >> return x
-- | Apply a monadic function to each element flowing through and discard the
-- results.
--
@ -415,10 +472,12 @@ mapMaybe f = lmap f . filter isJust . lmap fromJust
-- 1
-- 2
--
-- >>> trace f = Fold.lmapM (Fold.tracing f)
--
-- /Pre-release/
{-# INLINE trace #-}
trace :: Monad m => (a -> m b) -> Fold m a r -> Fold m a r
trace f = lmapM (\x -> void (f x) >> return x)
trace f = lmapM (tracing f)
-- rename to lpipe?
--
@ -516,82 +575,110 @@ scan = scanWith False
scanMany :: Monad m => Fold m a b -> Fold m b c -> Fold m a c
scanMany = scanWith True
-- | Postscan the input of a 'Fold' to change it in a stateful manner using
-- another 'Fold'.
-- /Pre-release/
{-# INLINE postscan #-}
postscan :: Monad m => Fold m a b -> Fold m b c -> Fold m a c
postscan (Fold stepL initialL extractL) (Fold stepR initialR extractR) =
Fold step initial extract
where
{-# INLINE runStep #-}
runStep actionL sR = do
rL <- actionL
case rL of
Done bL -> do
rR <- stepR sR bL
case rR of
Partial sR1 -> Done <$> extractR sR1
Done bR -> return $ Done bR
Partial sL -> do
!b <- extractL sL
rR <- stepR sR b
return
$ case rR of
Partial sR1 -> Partial (sL, sR1)
Done bR -> Done bR
initial = do
r <- initialR
rL <- initialL
case r of
Partial sR ->
case rL of
Done _ -> Done <$> extractR sR
Partial sL -> return $ Partial (sL, sR)
Done b -> return $ Done b
step (sL, sR) x = runStep (stepL sL x) sR
extract = extractR . snd
------------------------------------------------------------------------------
-- Filters
------------------------------------------------------------------------------
-- | Convert a predicate into a filtering fold.
-- | A filtering scan that deletes the first occurrence of the element in the
-- stream that satisfies the given equality predicate.
--
-- >>> f = Fold.foldFilter (Fold.satisfy (> 5)) Fold.sum
-- >>> Stream.fold f $ Stream.fromList [1..10]
-- 40
-- >>> input = Stream.unfold Unfold.fromList [1,3,3,5]
-- >>> Stream.fold Fold.toList $ Stream.scanMaybe (Fold.deleteBy (==) 3) input
-- [1,3,5]
--
-- /Pre-release/
{-# INLINE satisfy #-}
satisfy :: Monad m => (a -> Bool) -> Fold m a (Maybe a)
satisfy f = Fold step (return $ Partial ()) (const (return Nothing))
{-# INLINE_NORMAL deleteBy #-}
deleteBy :: Monad m => (a -> a -> Bool) -> a -> Fold m a (Maybe a)
deleteBy eq x0 = fmap extract $ foldl' step (Tuple' False Nothing)
where
step () a = return $ Done $ if f a then Just a else Nothing
step (Tuple' False _) x =
if eq x x0
then Tuple' True Nothing
else Tuple' False (Just x)
step (Tuple' True _) x = Tuple' True (Just x)
-- | Use a 'Maybe' returning fold as a filtering scan.
extract (Tuple' _ x) = x
-- | Provide a sliding window of length 2 elements.
--
-- >>> f = Fold.foldFilter (Fold.satisfy (> 5)) Fold.sum
-- >>> Stream.fold f $ Stream.fromList [1..10]
-- 40
-- See "Streamly.Internal.Data.Fold.Window".
--
-- The above snippet is equivalent to:
{-# INLINE slide2 #-}
slide2 :: Monad m => Fold m (a, Maybe a) b -> Fold m a b
slide2 (Fold step1 initial1 extract1) = Fold step initial extract
where
initial =
first (Tuple' Nothing) <$> initial1
step (Tuple' prev s) cur =
first (Tuple' (Just cur)) <$> step1 s (cur, prev)
extract (Tuple' _ s) = extract1 s
-- | Drop repeated elements that are adjacent to each other using the supplied
-- comparison function.
--
-- >>> f = Fold.filter (> 5) Fold.sum
-- >>> Stream.fold f $ Stream.fromList [1..10]
-- 40
-- To strip duplicate path separators:
--
-- >>> input = Stream.unfold Unfold.fromList "//a//b"
-- >>> f x y = x == '/' && y == '/'
-- >>> Stream.fold Fold.toList $ Stream.scanMaybe (Fold.uniqBy f) input
-- "/a/b"
--
-- Space: @O(1)@
--
-- See also: 'nubBy'.
--
-- /Pre-release/
{-# INLINE foldFilter #-}
foldFilter :: Monad m => Fold m a (Maybe b) -> Fold m b c -> Fold m a c
foldFilter f1 f2 = many f1 (catMaybes f2)
--
{-# INLINE uniqBy #-}
uniqBy :: Monad m => (a -> a -> Bool) -> Fold m a (Maybe a)
uniqBy eq = rollingMap f
where
f pre curr =
case pre of
Nothing -> Just curr
Just x -> if x `eq` curr then Nothing else Just curr
-- | Drop repeated elements that are adjacent to each other.
--
-- >>> uniq = Fold.uniqBy (==)
--
{-# INLINE uniq #-}
uniq :: (Monad m, Eq a) => Fold m a (Maybe a)
uniq = uniqBy (==)
-- | Strip all leading and trailing occurrences of an element passing a
-- predicate and make all other consecutive occurrences uniq.
--
-- >> prune p = Stream.dropWhileAround p $ Stream.uniqBy (x y -> p x && p y)
--
-- @
-- > Stream.prune isSpace (Stream.fromList " hello world! ")
-- "hello world!"
--
-- @
--
-- Space: @O(1)@
--
-- /Unimplemented/
{-# INLINE prune #-}
prune ::
-- (Monad m, Eq a) =>
(a -> Bool) -> Fold m a (Maybe a)
prune = error "Not implemented yet!"
-- | Emit only repeated elements, once.
--
-- /Unimplemented/
repeated :: -- (Monad m, Eq a) =>
Fold m a (Maybe a)
repeated = error "Not implemented yet!"
-- | Used as a scan. Returns 'Just' for the first occurrence of an element,
-- returns 'Nothing' for any other occurrences.
@ -960,6 +1047,9 @@ rollingHash = rollingHashWithSalt defaultSalt
rollingHashFirstN :: (Monad m, Enum a) => Int -> Fold m a Int64
rollingHashFirstN n = take n rollingHash
-- XXX Compare this with the implementation in Fold.Window, preferrably use the
-- latter if performance is good.
-- | Apply a function on every two successive elements of a stream. The first
-- argument of the map function is the previous element and the second argument
-- is the current element. When processing the very first element in the
@ -983,6 +1073,10 @@ rollingMapM f = Fold step initial extract
extract = return . snd
{-# INLINE rollingMap #-}
rollingMap :: Monad m => (Maybe a -> a -> b) -> Fold m a b
rollingMap f = rollingMapM (\x y -> return $ f x y)
------------------------------------------------------------------------------
-- Monoidal left folds
------------------------------------------------------------------------------
@ -1122,6 +1216,17 @@ index = genericIndex
maybe :: Monad m => (a -> Maybe b) -> Fold m a (Maybe b)
maybe f = foldt' (const (Done . f)) (Partial Nothing) id
-- | Test if the next element satisfies the supplied predicate.
--
-- /Pre-release/
{-# INLINE satisfy #-}
satisfy :: Monad m => (a -> Bool) -> Fold m a (Maybe a)
satisfy f = Fold step (return $ Partial ()) (const (return Nothing))
where
step () a = return $ Done $ if f a then Just a else Nothing
-- Naming notes:
--
-- "head" and "next" are two alternative names for the same API. head sounds
@ -1227,6 +1332,7 @@ findIndex predicate = foldt' step (Partial 0) (const Nothing)
{-# INLINE findIndices #-}
findIndices :: Monad m => (a -> Bool) -> Fold m a (Maybe Int)
findIndices predicate =
-- XXX implement by combining indexing and filtering scans
fmap (either (const Nothing) Just) $ foldl' step (Left (-1))
where
@ -1236,6 +1342,15 @@ findIndices predicate =
then Right (either id id i + 1)
else Left (either id id i + 1)
-- | Find all the indices where the value of the element in the stream is equal
-- to the given value.
--
-- >>> elemIndices a = Fold.findIndices (== a)
--
{-# INLINE elemIndices #-}
elemIndices :: (Monad m, Eq a) => a -> Fold m a (Maybe Int)
elemIndices a = findIndices (== a)
-- | Returns the first index where a given value is found in the stream.
--
-- > elemIndex a = Fold.findIndex (== a)
@ -1393,6 +1508,73 @@ splitAt n fld = serialWith (,) (take n fld)
-- Binary APIs
------------------------------------------------------------------------------
{-# INLINE takingEndByM #-}
takingEndByM :: Monad m => (a -> m Bool) -> Fold m a (Maybe a)
takingEndByM p = Fold step initial (return . toMaybe)
where
initial = return $ Partial Nothing'
step _ a = do
r <- p a
return
$ if r
then Done $ Just a
else Partial $ Just' a
-- |
--
-- >>> takingEndBy p = Fold.takingEndByM (return . p)
--
{-# INLINE takingEndBy #-}
takingEndBy :: Monad m => (a -> Bool) -> Fold m a (Maybe a)
takingEndBy p = takingEndByM (return . p)
{-# INLINE takingEndByM_ #-}
takingEndByM_ :: Monad m => (a -> m Bool) -> Fold m a (Maybe a)
takingEndByM_ p = Fold step initial (return . toMaybe)
where
initial = return $ Partial Nothing'
step _ a = do
r <- p a
return
$ if r
then Done Nothing
else Partial $ Just' a
-- |
--
-- >>> takingEndBy_ p = Fold.takingEndByM_ (return . p)
--
{-# INLINE takingEndBy_ #-}
takingEndBy_ :: Monad m => (a -> Bool) -> Fold m a (Maybe a)
takingEndBy_ p = takingEndByM_ (return . p)
{-# INLINE droppingWhileM #-}
droppingWhileM :: Monad m => (a -> m Bool) -> Fold m a (Maybe a)
droppingWhileM p = Fold step initial (return . toMaybe)
where
initial = return $ Partial Nothing'
step Nothing' a = do
r <- p a
return
$ Partial
$ if r
then Nothing'
else Just' a
step _ a = return $ Partial $ Just' a
{-# INLINE droppingWhile #-}
droppingWhile :: Monad m => (a -> Bool) -> Fold m a (Maybe a)
droppingWhile p = droppingWhileM (return . p)
-- Note: Keep this consistent with S.splitOn. In fact we should eliminate
-- S.splitOn in favor of the fold.
--
@ -1414,6 +1596,7 @@ splitAt n fld = serialWith (,) (take n fld)
-- @since 0.8.0
{-# INLINE takeEndBy_ #-}
takeEndBy_ :: Monad m => (a -> Bool) -> Fold m a b -> Fold m a b
-- takeEndBy_ predicate = scanMaybe (takingEndBy_ predicate)
takeEndBy_ predicate (Fold fstep finitial fextract) =
Fold step finitial fextract
@ -1441,6 +1624,7 @@ takeEndBy_ predicate (Fold fstep finitial fextract) =
-- @since 0.8.0
{-# INLINE takeEndBy #-}
takeEndBy :: Monad m => (a -> Bool) -> Fold m a b -> Fold m a b
-- takeEndBy predicate = scanMaybe (takingEndBy predicate)
takeEndBy predicate (Fold fstep finitial fextract) =
Fold step finitial fextract
@ -2469,11 +2653,30 @@ zip = zipWithM (curry return)
-- | Pair each element of a fold input with its index, starting from index 0.
--
-- /Unimplemented/
{-# INLINE indexingWith #-}
indexingWith :: Monad m => Int -> (Int -> Int) -> Fold m a (Maybe (Int, a))
indexingWith i f = fmap toMaybe $ foldl' step initial
where
initial = Nothing'
step Nothing' a = Just' (i, a)
step (Just' (n, _)) a = Just' (f n, a)
{-# INLINE indexing #-}
indexing :: Monad m => Fold m a (Maybe (Int, a))
indexing = indexingWith 0 (+ 1)
{-# INLINE indexingRev #-}
indexingRev :: Monad m => Int -> Fold m a (Maybe (Int, a))
indexingRev n = indexingWith n (subtract 1)
-- | Pair each element of a fold input with its index, starting from index 0.
--
{-# INLINE indexed #-}
indexed :: -- forall m a b. Monad m =>
Fold m (Int, a) b -> Fold m a b
indexed = undefined -- zip (Stream.enumerateFrom 0 :: Stream m Int)
indexed :: Monad m => Fold m (Int, a) b -> Fold m a b
indexed = scanMaybe indexing
-- | Change the predicate function of a Fold from @a -> b@ to accept an
-- additional state input @(s, a) -> b@. Convenient to filter with an
@ -2493,10 +2696,12 @@ with ::
-> (((s, a) -> c) -> Fold m a b -> Fold m a b)
with f comb g = f . comb g . lmap snd
-- XXX Implement as a filter
-- sampleFromthen :: Monad m => Int -> Int -> Fold m a (Maybe a)
-- | @sampleFromthen offset stride@ samples the element at @offset@ index and
-- then every element at strides of @stride@.
--
-- /Unimplemented/
{-# INLINE sampleFromthen #-}
sampleFromthen :: Monad m => Int -> Int -> Fold m a b -> Fold m a b
sampleFromthen offset size =

View File

@ -12,9 +12,22 @@
-- We can classify stream consumers in the following categories in order of
-- increasing complexity and power:
--
-- == Folds that never terminate
-- * Accumulators: Tee/Zip is simple, cannot be appended, good for scanning.
-- * Terminating folds: Tee/Zip varies based on termination, can be appended,
-- good for scanning, nesting (many) is easy.
-- * Non-failing (backtracking only) parsers: cannot be used as scans because
-- of backtracking, nesting is complicated because of backtracking, appending
-- is efficient because of no Alternative, Alternative does not make sense
-- because it cannot fail.
-- * Parsers: Alternative on failure, appending is not as efficient because of
-- buffering for Alternative.
--
-- An Accumulator is the simplest type of fold, it never fails and never
-- First two are represented by the 'Fold' type and the last two by the
-- 'Parser' type.
--
-- == Folds that never terminate (Accumulators)
--
-- An @Accumulator@ is the simplest type of fold, it never fails and never
-- terminates. It can always accept more inputs (never terminates) and the
-- accumulator is always valid. For example 'Streamly.Internal.Data.Fold.sum'.
-- Traditional Haskell left folds like 'foldl' are accumulators.
@ -40,7 +53,7 @@
--
-- == Folds that terminate after one or more input
--
-- Terminating folds are accumulators that can terminate, like accumulators
-- @Terminating folds@ are accumulators that can terminate, like accumulators
-- they do not fail. Once a fold terminates it no longer accepts any more
-- inputs. Terminating folds can be appended, the next fold can be
-- applied after the first one terminates. Because they cannot fail, they do
@ -130,7 +143,9 @@
-- wordBy
-- @
--
-- However, it creates several complications.
-- However, it creates several complications. The most important one is that we
-- cannot use such folds for scanning. We cannot backtrack after producing an
-- output in a scan.
--
-- === Nested backtracking
--
@ -349,17 +364,22 @@ module Streamly.Internal.Data.Fold.Type
-- ** Mapping Input
, lmap
, lmapM
, postscan
-- ** Filtering
, filter
, filterM
, catMaybes
, scanMaybe
, filter
, filtering
, filterM
, lefts
, rights
, both
-- ** Trimming
, take
, taking
, dropping
-- ** Sequential application
, serialWith -- rename to "append"
@ -395,10 +415,10 @@ module Streamly.Internal.Data.Fold.Type
where
#include "inline.hs"
import Control.Monad ((>=>))
import Data.Bifunctor (Bifunctor(..))
import Data.Either (fromLeft, fromRight, isLeft, isRight)
import Data.Maybe (isJust, fromJust)
import Fusion.Plugin.Types (Fuse(..))
import Streamly.Internal.Data.Fold.Step (Step(..), mapMStep, chainStepM)
import Streamly.Internal.Data.Maybe.Strict (Maybe'(..), toMaybe)
@ -413,6 +433,7 @@ import Prelude hiding (concatMap, filter, foldr, map, take)
-- >>> :m
-- >>> :set -XFlexibleContexts
-- >>> :set -package streamly
-- >>> import Data.Maybe (fromJust, isJust)
-- >>> import Streamly.Data.Fold (Fold)
-- >>> import Streamly.Internal.Data.Stream.Type (Stream)
-- >>> import qualified Data.Foldable as Foldable
@ -602,7 +623,7 @@ foldt' step initial extract =
-- | Make a terminating fold with an effectful step function and initial state,
-- and a state extraction function.
--
-- > mkFoldM = Fold
-- >>> foldtM' = Fold.Fold
--
-- We can just use 'Fold' but it is provided for completeness.
--
@ -1110,26 +1131,113 @@ lmapM f (Fold step begin done) = Fold step' begin done
where
step' x a = f a >>= step x
-- | Postscan the input of a 'Fold' to change it in a stateful manner using
-- another 'Fold'.
--
-- @postscan scanner collector@
--
-- /Pre-release/
{-# INLINE postscan #-}
postscan :: Monad m => Fold m a b -> Fold m b c -> Fold m a c
postscan (Fold stepL initialL extractL) (Fold stepR initialR extractR) =
Fold step initial extract
where
{-# INLINE runStep #-}
runStep actionL sR = do
rL <- actionL
case rL of
Done bL -> do
rR <- stepR sR bL
case rR of
Partial sR1 -> Done <$> extractR sR1
Done bR -> return $ Done bR
Partial sL -> do
!b <- extractL sL
rR <- stepR sR b
return
$ case rR of
Partial sR1 -> Partial (sL, sR1)
Done bR -> Done bR
initial = do
r <- initialR
rL <- initialL
case r of
Partial sR ->
case rL of
Done _ -> Done <$> extractR sR
Partial sL -> return $ Partial (sL, sR)
Done b -> return $ Done b
step (sL, sR) x = runStep (stepL sL x) sR
extract = extractR . snd
------------------------------------------------------------------------------
-- Filtering
------------------------------------------------------------------------------
-- | Modify a fold to receive a 'Maybe' input, the 'Just' values are unwrapped
-- and sent to the original fold, 'Nothing' values are discarded.
--
-- >>> catMaybes = Fold.mapMaybe id
-- >>> catMaybes = Fold.filter isJust . Fold.lmap fromJust
--
-- @since 0.8.0
{-# INLINE_NORMAL catMaybes #-}
catMaybes :: Monad m => Fold m a b -> Fold m (Maybe a) b
catMaybes (Fold step initial extract) = Fold step1 initial extract
where
step1 s a =
case a of
Nothing -> return $ Partial s
Just x -> step s x
-- | Use a 'Maybe' returning fold as a filtering scan.
--
-- >>> scanMaybe p f = Fold.postscan p (Fold.catMaybes f)
--
-- /Pre-release/
{-# INLINE scanMaybe #-}
scanMaybe :: Monad m => Fold m a (Maybe b) -> Fold m b c -> Fold m a c
scanMaybe f1 f2 = postscan f1 (catMaybes f2)
-- | A scanning fold for filtering elements based on a predicate.
--
{-# INLINE filtering #-}
filtering :: Monad m => (a -> Bool) -> Fold m a (Maybe a)
filtering f = foldl' step Nothing
where
step _ a = if f a then Just a else Nothing
-- | Include only those elements that pass a predicate.
--
-- >>> Stream.fold (Fold.filter (> 5) Fold.sum) $ Stream.fromList [1..10]
-- 40
--
-- > filter f = Fold.filterM (return . f)
-- >>> filter p = Fold.scanMaybe (Fold.filtering p)
-- >>> filter p = Fold.filterM (return . p)
-- >>> filter p = Fold.mapMaybe (\x -> if p x then Just x else Nothing)
--
-- @since 0.8.0
{-# INLINE filter #-}
filter :: Monad m => (a -> Bool) -> Fold m a r -> Fold m a r
-- filter p = scanMaybe (filtering p)
filter f (Fold step begin done) = Fold step' begin done
where
step' x a = if f a then step x a else return $ Partial x
-- | Like 'filter' but with a monadic predicate.
--
-- >>> f p x = p x >>= \r -> return $ if r then Just x else Nothing
-- >>> filterM p = Fold.mapMaybeM (f p)
--
-- @since 0.8.0
{-# INLINE filterM #-}
filterM :: Monad m => (a -> m Bool) -> Fold m a r -> Fold m a r
@ -1139,14 +1247,6 @@ filterM f (Fold step begin done) = Fold step' begin done
use <- f a
if use then step x a else return $ Partial x
-- | Modify a fold to receive a 'Maybe' input, the 'Just' values are unwrapped
-- and sent to the original fold, 'Nothing' values are discarded.
--
-- @since 0.8.0
{-# INLINE_NORMAL catMaybes #-}
catMaybes :: Monad m => Fold m a b -> Fold m (Maybe a) b
catMaybes = filter isJust . lmap fromJust
------------------------------------------------------------------------------
-- Either streams
------------------------------------------------------------------------------
@ -1184,6 +1284,39 @@ both = lmap (either id id)
{-# ANN type Tuple'Fused Fuse #-}
data Tuple'Fused a b = Tuple'Fused !a !b deriving Show
{-# INLINE taking #-}
taking :: Monad m => Int -> Fold m a (Maybe a)
taking n = foldt' step initial extract
where
initial =
if n <= 0
then Done Nothing
else Partial (Tuple'Fused n Nothing)
step (Tuple'Fused i _) a =
if i > 1
then Partial (Tuple'Fused (i - 1) (Just a))
else Done (Just a)
extract (Tuple'Fused _ r) = r
{-# INLINE dropping #-}
dropping :: Monad m => Int -> Fold m a (Maybe a)
dropping n = foldt' step initial extract
where
initial = Partial (Tuple'Fused n Nothing)
step (Tuple'Fused i _) a =
if i > 0
then Partial (Tuple'Fused (i - 1) Nothing)
else Partial (Tuple'Fused i (Just a))
extract (Tuple'Fused _ r) = r
-- | Take at most @n@ input elements and fold them using the supplied fold. A
-- negative count is treated as 0.
--
@ -1193,6 +1326,7 @@ data Tuple'Fused a b = Tuple'Fused !a !b deriving Show
-- @since 0.8.0
{-# INLINE take #-}
take :: Monad m => Int -> Fold m a b -> Fold m a b
-- take n = scanMaybe (taking n)
take n (Fold fstep finitial fextract) = Fold step initial extract
where

View File

@ -18,6 +18,9 @@
-- For more advanced statistical measures see the @streamly-statistics@
-- package.
-- XXX A window fold can be driven either using the Ring.slidingWindow
-- combinator or by zipping nthLast fold and last fold.
module Streamly.Internal.Data.Fold.Window
(
-- * Incremental Folds
@ -36,6 +39,9 @@ module Streamly.Internal.Data.Fold.Window
lmap
, cumulative
, rollingMap
, rollingMapM
-- ** Sums
, length
, sum
@ -98,6 +104,37 @@ lmap f = Fold.lmap (bimap f (f <$>))
cumulative :: Fold m (a, Maybe a) b -> Fold m a b
cumulative = Fold.lmap (, Nothing)
-- XXX Exchange the first two arguments of rollingMap or exchange the order in
-- the fold input tuple.
-- | Apply an effectful function on the latest and the oldest element of the
-- window.
{-# INLINE rollingMapM #-}
rollingMapM :: Monad m =>
(Maybe a -> a -> m (Maybe b)) -> Fold m (a, Maybe a) (Maybe b)
rollingMapM f = Fold.foldlM' f1 initial
where
initial = return Nothing
f1 _ (a, ma) = f ma a
-- | Apply a pure function on the latest and the oldest element of the window.
--
-- >>> rollingMap f = FoldW.rollingMapM (\x y -> return $ f x y)
--
{-# INLINE rollingMap #-}
rollingMap :: Monad m =>
(Maybe a -> a -> Maybe b) -> Fold m (a, Maybe a) (Maybe b)
rollingMap f = Fold.foldl' f1 initial
where
initial = Nothing
f1 _ (a, ma) = f ma a
-------------------------------------------------------------------------------
-- Sum
-------------------------------------------------------------------------------

View File

@ -28,6 +28,9 @@ module Streamly.Internal.Data.Stream.Bottom
, smapM
-- $smapM_Notes
, postscan
, catMaybes
, scanMaybe
, take
, takeWhile
, takeEndBy
@ -79,6 +82,7 @@ import Streamly.Internal.Data.Stream.Type
-- >>> import Control.Monad (join)
-- >>> import Control.Monad.Trans.Class (lift)
-- >>> import Data.Function (fix, (&))
-- >>> import Data.Maybe (fromJust, isJust)
-- >>> import Data.Semigroup (cycle1)
-- >>> import Prelude hiding (take, takeWhile, drop, reverse)
-- >>> import System.IO.Unsafe (unsafePerformIO)
@ -305,6 +309,28 @@ smapM step initial stream =
(fmap (,undefined) initial)
in fmap snd $ postscan f stream
-- | In a stream of 'Maybe's, discard 'Nothing's and unwrap 'Just's.
--
-- >>> catMaybes = Stream.mapMaybe id
-- >>> catMaybes = fmap fromJust . Stream.filter isJust
--
-- /Pre-release/
--
{-# INLINE catMaybes #-}
catMaybes :: Monad m => Stream m (Maybe a) -> Stream m a
-- catMaybes = fmap fromJust . filter isJust
catMaybes = fromStreamD . D.catMaybes . toStreamD
-- | Use a filtering fold on a stream.
--
-- >>> scanMaybe f = Stream.catMaybes . Stream.postscan f
--
-- /Pre-release/
--
{-# INLINE scanMaybe #-}
scanMaybe :: Monad m => Fold m a (Maybe b) -> Stream m a -> Stream m b
scanMaybe p = catMaybes . postscan p
------------------------------------------------------------------------------
-- Transformation - Trimming
------------------------------------------------------------------------------
@ -313,22 +339,26 @@ smapM step initial stream =
--
{-# INLINE take #-}
take :: Monad m => Int -> Stream m a -> Stream m a
-- take n = scanMaybe (Fold.taking n)
take n m = fromStreamD $ D.take n $ toStreamD m
-- | End the stream as soon as the predicate fails on an element.
--
{-# INLINE takeWhile #-}
takeWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
-- takeWhile p = scanMaybe (Fold.takingEndBy_ (not . p))
takeWhile p m = fromStreamD $ D.takeWhile p $ toStreamD m
{-# INLINE takeEndBy #-}
takeEndBy :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
-- takeEndBy p = scanMaybe (Fold.takingEndBy p)
takeEndBy p m = fromStreamD $ D.takeEndBy p $ toStreamD m
-- | Discard first 'n' elements from the stream and take the rest.
--
{-# INLINE drop #-}
drop :: Monad m => Int -> Stream m a -> Stream m a
-- drop n = scanMaybe (Fold.dropping n)
drop n m = fromStreamD $ D.drop n $ toStreamD m
------------------------------------------------------------------------------
@ -342,6 +372,7 @@ drop n m = fromStreamD $ D.drop n $ toStreamD m
--
{-# INLINE findIndices #-}
findIndices :: Monad m => (a -> Bool) -> Stream m a -> Stream m Int
-- findIndices p = scanMaybe (Fold.findIndices p)
findIndices p m = fromStreamD $ D.findIndices p (toStreamD m)
------------------------------------------------------------------------------

View File

@ -68,9 +68,9 @@ module Streamly.Internal.Data.Stream.StreamD.Transform
-- * Filtering
-- | Produce a subset of the stream.
, scanMaybe
, filter
, filterM
, foldFilter
, deleteBy
, uniq
, nub
@ -120,6 +120,7 @@ module Streamly.Internal.Data.Stream.StreamD.Transform
-- * Maybe Streams
, mapMaybe
, mapMaybeM
, catMaybes
)
where
@ -705,19 +706,6 @@ scanl1' f = scanl1M' (\x y -> return (f x y))
-- Filtering
-------------------------------------------------------------------------------
-- XXX nested foldMany does not fuse, therefore, this may not be very useful as
-- a filter unless that is fixed.
--
-- | Use a filtering fold on a stream.
--
-- > Stream.sum $ Stream.foldFilter (Fold.satisfy (> 5)) $ Stream.fromList [1..10]
-- 40
--
-- /Pre-release/
{-# INLINE foldFilter #-}
foldFilter :: Monad m => Fold m a (Maybe b) -> Stream m a -> Stream m b
foldFilter f = catMaybes . foldMany f
-- Adapted from the vector package
{-# INLINE_NORMAL filterM #-}
filterM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
@ -1259,5 +1247,27 @@ mapMaybeM :: Monad m => (a -> m (Maybe b)) -> Stream m a -> Stream m b
mapMaybeM f = fmap fromJust . filter isJust . mapM f
{-# INLINE catMaybes #-}
catMaybes :: (Monad m) => Stream m (Maybe a) -> Stream m a
catMaybes = fmap fromJust . filter isJust
catMaybes :: Monad m => Stream m (Maybe a) -> Stream m a
-- catMaybes = fmap fromJust . filter isJust
catMaybes (Stream step state) = Stream step1 state
where
{-# INLINE_LATE step1 #-}
step1 gst st = do
r <- step (adaptState gst) st
case r of
Yield x s -> do
return
$ case x of
Just a -> Yield a s
Nothing -> Skip s
Skip s -> return $ Skip s
Stop -> return Stop
-- | Use a filtering fold on a stream.
--
-- /Pre-release/
{-# INLINE scanMaybe #-}
scanMaybe :: Monad m => Fold m a (Maybe b) -> Stream m a -> Stream m b
scanMaybe f = catMaybes . postscanOnce f

View File

@ -38,14 +38,21 @@ module Streamly.Internal.Data.Stream.Transform
-- the elements. We can use a concatMap and scan for filtering but these
-- combinators are more efficient and convenient.
-- mapMaybeM is a general filtering combinator as we can map the stream to
-- Just/Nothing using any stateful fold and then use this to filter out.
, mapMaybeM
, mapMaybe
, catMaybes
, scanMaybe
, with
, deleteBy
, filter
, filterM
, foldFilter
-- Stateful/scanning filters
, uniq
, uniqBy
, nubBy
, prune
, repeated
@ -59,6 +66,22 @@ module Streamly.Internal.Data.Stream.Transform
, dropWhile
, dropWhileM
-- * Position Indexing
, indexed
, indexedR
-- * Searching
, findIndices -- XXX indicesBy
, elemIndices -- XXX indicesOf
-- * Rolling map
-- | Map using the previous element.
, rollingMapM
, rollingMap
, rollingMap2
-- Merge
-- * Inserting Elements
-- | Produce a superset of the stream. This is the opposite of
-- filtering/sampling. We can always use concatMap and scan for inserting
@ -89,31 +112,13 @@ module Streamly.Internal.Data.Stream.Transform
-- , intersperseByBefore
-- , intersperseByAfter
-- Fold and Unfold, Buffering
-- * Reordering
, reverse
, reverse'
, reassembleBy
-- * Position Indexing
, indexed
, indexedR
-- * Searching
, findIndices -- XXX indicesBy
, elemIndices -- XXX indicesOf
-- * Rolling map
-- | Map using the previous element.
, rollingMapM
, rollingMap
, rollingMap2
-- * Maybe Streams
-- Move these to Streamly.Data.Maybe.Stream?
, catMaybes -- XXX justs (like lefts/rights)
, mapMaybe
, mapMaybeM
-- * Either Streams
-- Move these to Streamly.Data.Either.Stream?
, lefts
@ -135,6 +140,7 @@ import Streamly.Internal.Data.Fold.Type (Fold)
import Streamly.Internal.Data.Pipe (Pipe)
import qualified Streamly.Internal.Data.Fold as FL
-- import qualified Streamly.Internal.Data.Fold.Window as Window
import qualified Streamly.Internal.Data.Stream.StreamD.Transform as D
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
@ -153,12 +159,14 @@ import Prelude hiding
-- >>> import Control.Monad.IO.Class (MonadIO (liftIO))
-- >>> import Control.Monad.Trans (lift)
-- >>> import Control.Monad.Trans.Identity (runIdentityT)
-- >>> import Data.Either (fromLeft, fromRight, isLeft, isRight, either)
-- >>> import Data.Function ((&))
-- >>> import Data.Maybe (fromJust, isJust)
-- >>> import Prelude hiding (filter, drop, dropWhile, take, takeWhile, foldr, map, mapM, sequence, reverse, foldr1 , scanl, scanl1)
-- >>> import Streamly.Internal.Data.Stream as Stream
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Internal.Data.Fold as Fold
-- >>> import qualified Streamly.Internal.Data.Fold.Window as Window
-- >>> import qualified Streamly.Internal.Data.Unfold as Unfold
-- >>> import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering))
--
@ -432,28 +440,24 @@ with f comb g = fmap snd . comb g . f
-- | Include only those elements that pass a predicate.
--
-- >>> filter p = Stream.filterM (return . p)
-- >>> filter p = Stream.mapMaybe (\x -> if p x then Just x else Nothing)
-- >>> filter p = Stream.scanMaybe (Fold.filtering p)
--
{-# INLINE filter #-}
filter :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
-- filter p = scanMaybe (FL.filtering p)
filter p m = fromStreamD $ D.filter p $ toStreamD m
-- | Same as 'filter' but with a monadic predicate.
--
-- >>> f p x = p x >>= \r -> return $ if r then Just x else Nothing
-- >>> filterM p = Stream.mapMaybeM (f p)
--
{-# INLINE filterM #-}
filterM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
filterM p m = fromStreamD $ D.filterM p $ toStreamD m
-- | Use a filtering fold on a stream.
--
-- >>> input = Stream.fromList [1..10]
-- >>> Stream.fold Fold.sum $ Stream.foldFilter (Fold.satisfy (> 5)) input
-- 40
--
-- /Pre-release/
--
{-# INLINE foldFilter #-}
foldFilter :: Monad m => Fold m a (Maybe b) -> Stream m a -> Stream m b
foldFilter p = fromStreamD . D.foldFilter p . toStreamD
-- | Drop repeated elements that are adjacent to each other using the supplied
-- comparison function.
--
@ -475,6 +479,7 @@ foldFilter p = fromStreamD . D.foldFilter p . toStreamD
{-# INLINE uniqBy #-}
uniqBy :: Monad m =>
(a -> a -> Bool) -> Stream m a -> Stream m a
-- uniqBy eq = scanMaybe (FL.uniqBy eq)
uniqBy eq = catMaybes . rollingMap f
where
@ -486,8 +491,11 @@ uniqBy eq = catMaybes . rollingMap f
-- | Drop repeated elements that are adjacent to each other.
--
-- >>> uniq = Stream.uniqBy (==)
--
{-# INLINE uniq #-}
uniq :: (Eq a, Monad m) => Stream m a -> Stream m a
-- uniq = scanMaybe FL.uniq
uniq = fromStreamD . D.uniq . toStreamD
-- | Strip all leading and trailing occurrences of an element passing a
@ -524,21 +532,6 @@ repeated :: -- (Monad m, Eq a) =>
Stream m a -> Stream m a
repeated = undefined
-- We can have more efficient implementations for nubOrd and nubInt by using
-- Set and IntSet to find/remove duplication. For Hashable we can use a
-- hashmap. Use rewrite rules to specialize to more efficient impls.
-- | Drop repeated elements anywhere in the stream.
--
-- /Caution: not scalable for infinite streams/
--
-- /Unimplemented/
--
{-# INLINE nubBy #-}
nubBy :: -- Monad m =>
(a -> a -> Bool) -> Stream m a -> Stream m a
nubBy = undefined -- fromStreamD . D.nubBy . toStreamD
-- | Deletes the first occurrence of the element in the stream that satisfies
-- the given equality predicate.
--
@ -548,6 +541,7 @@ nubBy = undefined -- fromStreamD . D.nubBy . toStreamD
--
{-# INLINE deleteBy #-}
deleteBy :: Monad m => (a -> a -> Bool) -> a -> Stream m a -> Stream m a
-- deleteBy cmp x = scanMaybe (FL.deleteBy cmp x)
deleteBy cmp x m = fromStreamD $ D.deleteBy cmp x (toStreamD m)
------------------------------------------------------------------------------
@ -558,6 +552,7 @@ deleteBy cmp x m = fromStreamD $ D.deleteBy cmp x (toStreamD m)
--
{-# INLINE takeWhileM #-}
takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
-- takeWhileM p = scanMaybe (FL.takingEndByM_ (\x -> not <$> p x))
takeWhileM p m = fromStreamD $ D.takeWhileM p $ toStreamD m
-- | Drop elements in the stream as long as the predicate succeeds and then
@ -565,12 +560,14 @@ takeWhileM p m = fromStreamD $ D.takeWhileM p $ toStreamD m
--
{-# INLINE dropWhile #-}
dropWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
-- dropWhile p = scanMaybe (FL.droppingWhile p)
dropWhile p m = fromStreamD $ D.dropWhile p $ toStreamD m
-- | Same as 'dropWhile' but with a monadic predicate.
--
{-# INLINE dropWhileM #-}
dropWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
-- dropWhileM p = scanMaybe (FL.droppingWhileM p)
dropWhileM p m = fromStreamD $ D.dropWhileM p $ toStreamD m
------------------------------------------------------------------------------
@ -763,6 +760,7 @@ reassembleBy = undefined
--
{-# INLINE indexed #-}
indexed :: Monad m => Stream m a -> Stream m (Int, a)
-- indexed = scanMaybe FL.indexing
indexed = fromStreamD . D.indexed . toStreamD
-- |
@ -780,6 +778,7 @@ indexed = fromStreamD . D.indexed . toStreamD
--
{-# INLINE indexedR #-}
indexedR :: Monad m => Int -> Stream m a -> Stream m (Int, a)
-- indexedR n = scanMaybe (FL.indexingRev n)
indexedR n = fromStreamD . D.indexedR n . toStreamD
------------------------------------------------------------------------------
@ -811,6 +810,7 @@ elemIndices a = findIndices (== a)
--
{-# INLINE rollingMap #-}
rollingMap :: Monad m => (Maybe a -> a -> b) -> Stream m a -> Stream m b
-- rollingMap f = scanMaybe (FL.slide2 $ Window.rollingMap f)
rollingMap f m = fromStreamD $ D.rollingMap f $ toStreamD m
-- | Like 'rollingMap' but with an effectful map function.
@ -819,6 +819,7 @@ rollingMap f m = fromStreamD $ D.rollingMap f $ toStreamD m
--
{-# INLINE rollingMapM #-}
rollingMapM :: Monad m => (Maybe a -> a -> m b) -> Stream m a -> Stream m b
-- rollingMapM f = scanMaybe (FL.slide2 $ Window.rollingMapM f)
rollingMapM f m = fromStreamD $ D.rollingMapM f $ toStreamD m
-- | Like 'rollingMap' but requires at least two elements in the stream,
@ -841,7 +842,7 @@ rollingMap2 f m = fromStreamD $ D.rollingMap2 f $ toStreamD m
--
-- Equivalent to:
--
-- >>> mapMaybe f = fmap fromJust . Stream.filter isJust . fmap f
-- >>> mapMaybe f = Stream.catMaybes . fmap f
--
{-# INLINE mapMaybe #-}
mapMaybe :: Monad m => (a -> Maybe b) -> Stream m a -> Stream m b
@ -851,27 +852,23 @@ mapMaybe f m = fromStreamD $ D.mapMaybe f $ toStreamD m
--
-- Equivalent to:
--
-- >>> mapMaybeM f = fmap fromJust . Stream.filter isJust . Stream.mapM f
-- >>> mapMaybeM f = Stream.catMaybes . Stream.mapM f
--
-- >>> mapM f = Stream.mapMaybeM (\x -> Just <$> f x)
--
{-# INLINE_EARLY mapMaybeM #-}
mapMaybeM :: Monad m
=> (a -> m (Maybe b)) -> Stream m a -> Stream m b
mapMaybeM f = fmap fromJust . filter isJust . mapM f
-- | In a stream of 'Maybe's, discard 'Nothing's and unwrap 'Just's.
--
-- /Pre-release/
--
{-# INLINE catMaybes #-}
catMaybes :: Monad m => Stream m (Maybe a) -> Stream m a
catMaybes = fmap fromJust . filter isJust
------------------------------------------------------------------------------
-- Either streams
------------------------------------------------------------------------------
-- | Discard 'Right's and unwrap 'Left's in an 'Either' stream.
--
-- >>> lefts = fmap (fromLeft undefined) . Stream.filter isLeft
--
-- /Pre-release/
--
{-# INLINE lefts #-}
@ -880,6 +877,8 @@ lefts = fmap (fromLeft undefined) . filter isLeft
-- | Discard 'Left's and unwrap 'Right's in an 'Either' stream.
--
-- >>> rights = fmap (fromRight undefined) . Stream.filter isRight
--
-- /Pre-release/
--
{-# INLINE rights #-}
@ -889,6 +888,8 @@ rights = fmap (fromRight undefined) . filter isRight
-- | Remove the either wrapper and flatten both lefts and as well as rights in
-- the output stream.
--
-- >>> both = fmap (either id id)
--
-- /Pre-release/
--
{-# INLINE both #-}