Copy combinators from IsStream, absent in the new Stream modules

This commit is contained in:
Adithya Kumar 2022-10-21 00:06:53 +05:30 committed by GitHub
parent 0fb79706c3
commit 326b05570e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 348 additions and 6 deletions

View File

@ -40,6 +40,7 @@ module Streamly.Internal.Data.Stream.Eliminate
-- the stream one element at a time, and we have the remaining stream all
-- the time.
, uncons
, init
-- * Right Folds
, foldrM
@ -87,7 +88,7 @@ import qualified Streamly.Internal.Data.Stream.StreamK as K
import Streamly.Internal.Data.Stream.Bottom
import Streamly.Internal.Data.Stream.Type
import Prelude hiding (foldr, reverse)
import Prelude hiding (foldr, init, reverse)
-- $setup
-- >>> :m
@ -132,6 +133,15 @@ import Prelude hiding (foldr, reverse)
uncons :: Monad m => Stream m a -> m (Maybe (a, Stream m a))
-- Lift over the monad, the Maybe, and the pair's second component.
uncons = fmap (fmap (fmap fromStreamK)) . K.uncons . toStreamK
-- | Everything except the final element of the stream, or 'Nothing' if the
-- stream is empty.
--
-- Note: the whole stream is buffered in order to find its end.
--
-- /Pre-release/
{-# INLINE init #-}
init :: Monad m => Stream m a -> m (Maybe (Stream m a))
init = fmap (fmap fromStreamK) . K.init . toStreamK
------------------------------------------------------------------------------
-- Right Folds
------------------------------------------------------------------------------

View File

@ -41,6 +41,11 @@ module Streamly.Internal.Data.Stream.Expand
, interleave
, interleaveFst
, interleaveMin
, interleaveFstSuffix2
, interleaveFst2
-- ** Round Robin
, roundrobin
-- ** Merge
, mergeBy
@ -140,6 +145,7 @@ import Prelude hiding (concat, concatMap, zipWith)
-- >>> :m
-- >>> import Data.Either (either)
-- >>> import Data.IORef
-- >>> import Streamly.Internal.Data.Stream (Stream)
-- >>> import Prelude hiding (zipWith, concatMap, concat)
-- >>> import qualified Streamly.Data.Array.Unboxed as Array
-- >>> import qualified Streamly.Internal.Data.Fold as Fold
@ -227,6 +233,74 @@ interleaveMin :: Stream m a -> Stream m a -> Stream m a
interleaveMin s1 s2 =
fromStreamK $ K.interleaveMin (toStreamK s1) (toStreamK s2)
-- | Interleave the two streams element by element, starting with the first
-- stream. The output ends when the first stream ends; whatever remains of the
-- second stream at that point is discarded, so the final element of the
-- result comes from the second stream. If the second stream runs out first,
-- the rest of the first stream is still yielded.
--
-- >>> :set -XOverloadedStrings
-- >>> import Data.Functor.Identity (Identity)
-- >>> Stream.interleaveFstSuffix2 "abc" ",,,," :: Stream Identity Char
-- fromList "a,b,c,"
-- >>> Stream.interleaveFstSuffix2 "abc" "," :: Stream Identity Char
-- fromList "a,bc"
--
-- 'interleaveFstSuffix2' is a dual of 'interleaveFst2'.
--
-- Do not use at scale in concatMapWith.
--
-- /Pre-release/
{-# INLINE interleaveFstSuffix2 #-}
interleaveFstSuffix2 :: Monad m => Stream m b -> Stream m b -> Stream m b
interleaveFstSuffix2 s1 s2 =
    fromStreamD (D.interleaveSuffix (toStreamD s1) (toStreamD s2))
-- | Interleave the two streams element by element, both starting and ending
-- with the first stream. Elements of the second stream appear only in between
-- elements of the first, so any excess in the second stream is dropped. If
-- the first stream is the longer one, it keeps yielding after the second
-- stream is exhausted.
--
-- >>> :set -XOverloadedStrings
-- >>> import Data.Functor.Identity (Identity)
-- >>> Stream.interleaveFst2 "abc" ",,,," :: Stream Identity Char
-- fromList "a,b,c"
-- >>> Stream.interleaveFst2 "abc" "," :: Stream Identity Char
-- fromList "a,bc"
--
-- 'interleaveFst2' is a dual of 'interleaveFstSuffix2'.
--
-- Do not use at scale in concatMapWith.
--
-- /Pre-release/
{-# INLINE interleaveFst2 #-}
interleaveFst2 :: Monad m => Stream m b -> Stream m b -> Stream m b
interleaveFst2 s1 s2 =
    fromStreamD (D.interleaveInfix (toStreamD s1) (toStreamD s2))
------------------------------------------------------------------------------
-- Scheduling
------------------------------------------------------------------------------

-- | Run two streams in a fair round-robin fashion, giving each one execution
-- turn alternately. An execution turn need not produce an output: a stream
-- may choose to @Skip@ and yield an element only later, letting the other
-- stream proceed in the meantime. Fairness is therefore in the interleaving
-- of execution steps rather than of outputs. Useful for co-operative
-- multitasking without explicit threads, as an alternative to `async`.
--
-- Do not use at scale in concatMapWith.
--
-- /Pre-release/
{-# INLINE roundrobin #-}
roundrobin :: Monad m => Stream m b -> Stream m b -> Stream m b
roundrobin s1 s2 = fromStreamD (D.roundRobin (toStreamD s1) (toStreamD s2))
------------------------------------------------------------------------------
-- Merging (sorted streams)
------------------------------------------------------------------------------
@ -361,7 +435,7 @@ unfoldInterleave u m =
{-# INLINE unfoldRoundRobin #-}
unfoldRoundRobin ::Monad m => Unfold m a b -> Stream m a -> Stream m b
unfoldRoundRobin u m =
fromStreamD $ D.unfoldManyInterleave u (toStreamD m)
fromStreamD $ D.unfoldManyRoundRobin u (toStreamD m)
------------------------------------------------------------------------------
-- Combine N Streams - interpose
@ -407,7 +481,7 @@ interposeSuffix x unf str =
-- > unfoldMany unf str =
-- > gintercalate unf str (UF.nilM (\_ -> return ())) (repeat ())
-- | 'interleaveInfix' followed by unfold and concat.
-- | 'interleaveFst' followed by unfold and concat.
--
-- /Pre-release/
{-# INLINE gintercalate #-}
@ -438,7 +512,7 @@ intercalate :: Monad m
intercalate unf seed str = fromStreamD $
D.unfoldMany unf $ D.intersperse seed (toStreamD str)
-- | 'interleaveSuffix' followed by unfold and concat.
-- | 'interleaveFstSuffix2' followed by unfold and concat.
--
-- /Pre-release/
{-# INLINE gintercalateSuffix #-}

View File

@ -38,6 +38,12 @@ module Streamly.Internal.Data.Stream.Reduce
-- | Element unaware grouping.
, arraysOf
-- ** Splitting
-- XXX Implement these as folds or parsers instead.
, splitOnSuffixSeqAny
, splitOnPrefix
, splitOnAny
-- * Reduce By Parsers
-- ** Generic Parsing
-- | Apply parsers on a stream.
@ -204,6 +210,113 @@ refoldIterateM :: Monad m =>
Refold m b a b -> m b -> Stream m a -> Stream m b
refoldIterateM c i m = fromStreamD $ D.refoldIterateM c i (toStreamD m)
------------------------------------------------------------------------------
-- Splitting
------------------------------------------------------------------------------
-- XXX Implement this as a fold or a parser instead.
-- This can be implemented easily using the Rabin-Karp string matching
-- algorithm.
-- | Split the stream after (suffix style) a match of any one of the given
-- pattern sequences.
--
-- /Unimplemented/
{-# INLINE splitOnSuffixSeqAny #-}
splitOnSuffixSeqAny :: -- (Monad m, Unboxed a, Integral a) =>
[Array a] -> Fold m a b -> Stream m a -> Stream m b
splitOnSuffixSeqAny _subseq _f _m = undefined
-- D.fromStreamD $ D.splitPostAny f subseq (D.toStreamD m)
-- | Split on a prefixed separator element, dropping the separator. The
-- supplied 'Fold' is applied on the split segments.
--
-- @
-- > splitOnPrefix' p xs = Stream.toList $ Stream.splitOnPrefix p (Fold.toList) (Stream.fromList xs)
-- > splitOnPrefix' (== '.') ".a.b"
-- ["a","b"]
-- @
--
-- An empty stream results in an empty output stream:
--
-- @
-- > splitOnPrefix' (== '.') ""
-- []
-- @
--
-- An empty segment consisting of only a prefix is folded to the default output
-- of the fold:
--
-- @
-- > splitOnPrefix' (== '.') "."
-- [""]
--
-- > splitOnPrefix' (== '.') ".a.b."
-- ["a","b",""]
--
-- > splitOnPrefix' (== '.') ".a..b"
-- ["a","","b"]
--
-- @
--
-- A prefix is optional at the beginning of the stream:
--
-- @
-- > splitOnPrefix' (== '.') "a"
-- ["a"]
--
-- > splitOnPrefix' (== '.') "a.b"
-- ["a","b"]
-- @
--
-- 'splitOnPrefix' is an inverse of 'intercalatePrefix' with a single element:
--
-- > Stream.intercalatePrefix (Stream.fromPure '.') Unfold.fromList . Stream.splitOnPrefix (== '.') Fold.toList === id
--
-- Assuming the input stream does not contain the separator:
--
-- > Stream.splitOnPrefix (== '.') Fold.toList . Stream.intercalatePrefix (Stream.fromPure '.') Unfold.fromList === id
--
-- /Unimplemented/
{-# INLINE splitOnPrefix #-}
splitOnPrefix :: -- (MonadCatch m) =>
(a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
splitOnPrefix _predicate _f = undefined
-- parseMany (Parser.sliceBeginBy predicate f)
-- Int list examples for splitOn:
--
-- >>> splitList [] [1,2,3,3,4]
-- > [[1],[2],[3],[3],[4]]
--
-- >>> splitList [5] [1,2,3,3,4]
-- > [[1,2,3,3,4]]
--
-- >>> splitList [1] [1,2,3,3,4]
-- > [[],[2,3,3,4]]
--
-- >>> splitList [4] [1,2,3,3,4]
-- > [[1,2,3,3],[]]
--
-- >>> splitList [2] [1,2,3,3,4]
-- > [[1],[3,3,4]]
--
-- >>> splitList [3] [1,2,3,3,4]
-- > [[1,2],[],[4]]
--
-- >>> splitList [3,3] [1,2,3,3,4]
-- > [[1,2],[4]]
--
-- >>> splitList [1,2,3,3,4] [1,2,3,3,4]
-- > [[],[]]
-- This can be implemented easily using the Rabin-Karp string matching
-- algorithm.
-- | Split on a match of any one of the given pattern sequences.
--
-- /Unimplemented/
--
{-# INLINE splitOnAny #-}
splitOnAny :: -- (Monad m, Unboxed a, Integral a) =>
[Array a] -> Fold m a b -> Stream m a -> Stream m b
splitOnAny _subseq _f _m =
undefined -- D.fromStreamD $ D.splitOnAny f subseq (D.toStreamD m)
------------------------------------------------------------------------------
-- Parsing
------------------------------------------------------------------------------

View File

@ -63,9 +63,14 @@ module Streamly.Internal.Data.Stream.Transform
, take
, takeWhile
, takeWhileM
, takeWhileLast
, takeWhileAround
, drop
, dropLast
, dropWhile
, dropWhileM
, dropWhileLast
, dropWhileAround
-- * Position Indexing
, indexed
@ -512,8 +517,6 @@ filterM p m = fromStreamD $ D.filterM p $ toStreamD m
--
-- Space: @O(1)@
--
-- See also: 'nubBy'.
--
-- /Pre-release/
--
{-# INLINE uniqBy #-}
@ -598,6 +601,27 @@ takeWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
-- takeWhileM p = scanMaybe (FL.takingEndByM_ (\x -> not <$> p x))
takeWhileM p m = fromStreamD $ D.takeWhileM p $ toStreamD m
-- | Take all consecutive elements at the end of the stream for which the
-- predicate is true.
--
-- O(n) space, where n is the number of elements taken.
--
-- /Unimplemented/
{-# INLINE takeWhileLast #-}
takeWhileLast :: -- Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
takeWhileLast = undefined -- fromStreamD $ D.takeWhileLast p $ toStreamD m
-- | Like 'takeWhile' and 'takeWhileLast' combined: take matching elements
-- from both the beginning and the end of the stream.
--
-- O(n) space, where n is the number of elements taken from the end.
--
-- /Unimplemented/
{-# INLINE takeWhileAround #-}
takeWhileAround :: -- Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
takeWhileAround = undefined -- fromStreamD $ D.takeWhileAround p $ toStreamD m
-- | Drop elements in the stream as long as the predicate succeeds and then
-- take the rest of the stream.
--
@ -615,6 +639,37 @@ dropWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
-- dropWhileM p = scanMaybe (FL.droppingWhileM p)
dropWhileM p m = fromStreamD $ D.dropWhileM p $ toStreamD m
-- | Drop @n@ elements at the end of the stream.
--
-- O(n) space, where n is the number of elements dropped.
--
-- /Unimplemented/
{-# INLINE dropLast #-}
dropLast :: -- Monad m =>
Int -> Stream m a -> Stream m a
dropLast = undefined -- fromStreamD $ D.dropLast n $ toStreamD m
-- | Drop all consecutive elements at the end of the stream for which the
-- predicate is true.
--
-- O(n) space, where n is the number of elements dropped.
--
-- /Unimplemented/
{-# INLINE dropWhileLast #-}
dropWhileLast :: -- Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
dropWhileLast = undefined -- fromStreamD $ D.dropWhileLast p $ toStreamD m
-- | Like 'dropWhile' and 'dropWhileLast' combined: drop matching elements
-- from both the beginning and the end of the stream.
--
-- O(n) space, where n is the number of elements dropped from the end.
--
-- /Unimplemented/
{-# INLINE dropWhileAround #-}
dropWhileAround :: -- Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
dropWhileAround = undefined -- fromStreamD $ D.dropWhileAround p $ toStreamD m
------------------------------------------------------------------------------
-- Inserting Elements
------------------------------------------------------------------------------

View File

@ -11,8 +11,11 @@ module Streamly.Internal.Data.Stream.Time
(
-- Primitives
interjectSuffix
, ticks
, takeInterval
, takeLastInterval
, dropInterval
, dropLastInterval
, intervalsOf
, chunksOfTimeout
@ -22,6 +25,15 @@ module Streamly.Internal.Data.Stream.Time
, classifySessionsOf
, classifyKeepAliveSessions
-- ** Buffering and Sampling
-- | Evaluate strictly using a buffer of results. When the buffer becomes
-- full we can block, drop the new elements, drop the oldest element and
-- insert the new at the end or keep dropping elements uniformly to match
-- the rate of the consumer.
, sampleOld
, sampleNew
, sampleRate
-- Sampling
, sampleIntervalEnd
, sampleIntervalStart
@ -108,6 +120,19 @@ interjectSuffix n f xs = parallelFst [xs, repeatM timed]
timed = liftIO (threadDelay (round $ n * 1000000)) >> f
repeatM = Stream.sequence . Stream.repeat
-- | Generate ticks at the specified rate. The rate is adaptive, the tick
-- generation speed can be increased or decreased at different times to achieve
-- the specified rate. The specific behavior for different styles of 'Rate'
-- specifications is documented under 'Rate'. The effective maximum rate
-- achieved by a stream is governed by the processor speed.
--
-- Each tick is a unit @()@ element, as the @Stream m ()@ result type shows;
-- consumers time their actions by pulling from this stream.
--
-- /Unimplemented/
--
{-# INLINE ticks #-}
ticks :: -- (MonadAsync m) =>
Rate -> Stream m ()
ticks = undefined
-- XXX Notes from D.takeByTime (which was removed)
-- XXX using getTime in the loop can be pretty expensive especially for
-- computations where iterations are lightweight. We have the following
@ -146,6 +171,16 @@ takeInterval d =
. Stream.takeWhile isNothing
. interjectSuffix d (return Nothing) . fmap Just
-- | Take time interval @i@ seconds at the end of the stream.
--
-- O(n) space, where n is the number of elements taken.
--
-- /Unimplemented/
{-# INLINE takeLastInterval #-}
takeLastInterval :: -- MonadAsync m =>
Double -> Stream m a -> Stream m a
takeLastInterval = undefined
-- | @dropInterval duration@ drops stream elements until specified @duration@ in
-- seconds has passed. The duration begins when the stream is evaluated for the
-- first time. The time duration is checked /after/ generating a stream element,
@ -167,6 +202,16 @@ dropInterval d =
. Stream.dropWhile isNothing
. interjectSuffix d (return Nothing) . fmap Just
-- | Drop time interval @i@ seconds at the end of the stream.
--
-- O(n) space, where n is the number of elements dropped.
--
-- NOTE(review): the sibling 'takeLastInterval' takes the interval as a
-- 'Double' (seconds) while this takes an 'Int'; looks inconsistent --
-- confirm the intended argument type before implementing.
--
-- /Unimplemented/
{-# INLINE dropLastInterval #-}
dropLastInterval :: -- MonadAsync m =>
Int -> Stream m a -> Stream m a
dropLastInterval = undefined
-- XXX we can implement this by repeatedly applying the 'lrunFor' fold.
-- XXX add this example after fixing the serial stream rate control
--
@ -860,3 +905,48 @@ sampleBurstEnd = sampleBurst True
-- NOTE(review): dual of 'sampleBurstEnd' above -- the same 'sampleBurst'
-- driver invoked with the flag set to 'False' instead of 'True'.
{-# INLINE sampleBurstStart #-}
sampleBurstStart :: MonadAsync m => Double -> Stream m a -> Stream m a
sampleBurstStart = sampleBurst False
------------------------------------------------------------------------------
-- Lossy Buffering
------------------------------------------------------------------------------
-- XXX We could use 'maxBuffer Block/Drop/Rotate/Sample' instead. However we
-- may want to have the evaluation rate independent of the sampling rate. To
-- support that we can decouple evaluation and sampling in independent stages.
-- The sampling stage would strictly evaluate and sample, the evaluation stage
-- would control the evaluation rate.
-- | Evaluate the input stream continuously and keep only the oldest @n@
-- elements in the buffer, discard the new ones when the buffer is full. When
-- the output stream is evaluated it consumes the values from the buffer in a
-- FIFO manner.
--
-- Compare 'sampleNew', which keeps the latest @n@ elements instead.
--
-- /Unimplemented/
--
{-# INLINE sampleOld #-}
sampleOld :: -- MonadAsync m =>
Int -> Stream m a -> Stream m a
sampleOld = undefined
-- | Evaluate the input stream continuously and keep only the latest @n@
-- elements in a ring buffer, keep discarding the older ones to make space for
-- the new ones. When the output stream is evaluated it consumes the values
-- from the buffer in a FIFO manner.
--
-- Compare 'sampleOld', which keeps the oldest @n@ elements instead.
--
-- /Unimplemented/
--
{-# INLINE sampleNew #-}
sampleNew :: -- MonadAsync m =>
Int -> Stream m a -> Stream m a
sampleNew = undefined
-- | Like 'sampleNew' but samples at uniform intervals to match the consumer
-- rate. Note that 'sampleNew' leads to non-uniform sampling depending on the
-- consumer pattern.
--
-- NOTE(review): the 'Double' argument presumably specifies the sampling
-- rate -- confirm when implementing.
--
-- /Unimplemented/
--
{-# INLINE sampleRate #-}
sampleRate :: -- MonadAsync m =>
Double -> Stream m a -> Stream m a
sampleRate = undefined