Update doc, add some unimplemented APIs

Add groupScan and sliding window buffer based APIs, commented and unimplemented.
This commit is contained in:
Harendra Kumar 2019-05-22 03:47:22 +05:30
parent c07561673f
commit 9178cf04e4
3 changed files with 122 additions and 27 deletions

View File

@ -28,6 +28,12 @@
-- Similarly, a partitioning applicative can partition the input among the
-- constituent folds. There are many other ways to combine folds.
--
-- All combinators in this module are of true streaming nature. For example,
-- the splitting combinators accept a fold which consumes the elements in the
-- split segments as soon as they are generated by the producer, segments are
-- not buffered in memory, resulting in constant memory usage. The folds used
-- in these combinators can be considered as continuations.
--
-- > import qualified Streamly.Fold as FL
--
--
@ -192,6 +198,7 @@ module Streamly.Fold
, break -- breakPre
-- , breakPost
-- , breakOn
-- , breakAround
-- , spanBy
-- , spanRollingBy
@ -199,19 +206,16 @@ module Streamly.Fold
-- breakOnSeq
-- ** Splitting
-- | Streams can be split into segments in space or in time. We use the
-- term @chunk@ to refer to a spatial length of the stream (spatial window)
-- and the term @session@ to refer to a length in time (time window).
-- In imperative terms grouped folding can be considered as a nested loop
-- In imperative terms, grouped folding can be considered as a nested loop
-- where we loop over the stream to group elements and then loop over
-- individual groups to fold them to a single value that is yielded in the
-- output stream.
--
-- Note that these grouping folds are true streaming folds that never
-- accumulate the group in memory before folding, i.e. the group elements
-- are consumed by the folds as they are yielded by the stream. Therefore,
-- the whole computation runs in constant space.
-- In contrast, we can simply use a scan on the stream to buffer the whole
-- groups in memory and then map a fold on it to fold the groups. This kind
-- of grouping and folding would not work well when the group size is big.
-- , groupScan
-- *** By Chunks
, chunksOf
@ -335,7 +339,9 @@ module Streamly.Fold
, classifySlidingChunks
, classifySlidingSessions
-}
-- ** Sliding Window Buffers
-- , slidingChunkBuffer
-- , slidingSessionBuffer
)
where
@ -362,9 +368,9 @@ import qualified Data.Map.Strict as Map
import Streamly (MonadAsync, parallel)
import Streamly.Fold.Types (Fold(..))
import Streamly.Mem.Array (Array)
-- import Streamly.Mem.Ring (Ring)
import Streamly.Streams.Serial (SerialT)
import Streamly.Streams.StreamK (IsStream())
-- import Streamly.Time.Clock (Clock(..), getTime)
import Streamly.Time.Units
(AbsTime, MilliSecond64(..), addToAbsTime, diffAbsTime, toRelTime,
toAbsTime)
@ -1044,6 +1050,45 @@ splitAt n (Fold stepL initialL extractL) (Fold stepR initialR extractR) =
-- N-ary APIs
------------------------------------------------------------------------------
------------------------------------------------------------------------------
-- Generalized grouping
------------------------------------------------------------------------------
-- This combinator is the most general grouping combinator and can be used to
-- implement all other grouping combinators.
--
-- XXX check if this can implement the splitOn combinator i.e. we can slide in
-- new elements, slide out old elements and incrementally compute the hash.
-- Also, can we implement the windowed classification combinators using this?
--
-- In fact this is a parse. Instead of using a special return value in the fold
-- we are using a mapping function.
--
-- Note that 'scanl'' (usually followed by a map to extract the desired value
-- from the accumulator) can be used to realize many implementations e.g. a
-- sliding window implementation. A scan followed by a mapMaybe is also a good
-- pattern to express many problems where we want to emit a filtered output and
-- not emit an output on every input.
--
-- Passing on of the initial accumulator value to the next fold is equivalent
-- to returning the leftover concept.
{-
-- | @groupScan splitter fold stream@ folds the input stream using @fold@.
-- @splitter@ is applied on the accumulator of the fold every time an item is
-- consumed by the fold. The fold continues until @splitter@ returns a 'Just'
-- value. A 'Just' result from the @splitter@ specifies a result to be emitted
-- in the output stream and the initial value of the accumulator for the next
-- group's fold. This allows us to control whether to start fresh for the next
-- fold or to continue from the previous fold's output.
--
{-# INLINE groupScan #-}
groupScan
:: (IsStream t, Monad m)
=> (x -> m (Maybe (b, x))) -> Fold m a x -> t m a -> t m b
groupScan split fold m = undefined
-}
-- | Group the input stream into groups of @n@ elements each and then fold each
-- group using the provided fold function.
--
@ -1688,21 +1733,21 @@ splitSuffixOnAny subseq f m = undefined
-}
------------------------------------------------------------------------------
-- Grouped by order
-- Reorder in sequence
------------------------------------------------------------------------------
{-
-- Buffer until the next element in sequence arrives. The function argument
-- determines the difference in sequence numbers. This could be useful in
-- implementing sequenced streams, for example, TCP reassembly.
{-# INLINE foldOrderedBy #-}
foldOrderedBy
{-# INLINE reassembleBy #-}
reassembleBy
:: (IsStream t, Monad m)
=> (forall n. Monad n => Fold n a b)
=> Fold m a b
-> (a -> a -> Int)
-> t m a
-> t m b
foldOrderedBy = undefined
reassembleBy = undefined
-}
------------------------------------------------------------------------------
@ -2187,6 +2232,48 @@ classifySlidingSessions tick interval slide (Fold step initial extract) str
= undefined
-}
------------------------------------------------------------------------------
-- Sliding Window Buffers
------------------------------------------------------------------------------
-- These buffered versions could be faster than concurrent incremental folds of
-- all overlapping windows as in many cases we may not need all the values to
-- compute the fold, we can just compute the result using the old value and new
-- value. However, we may need the buffer once in a while, for example for
-- string search we usually compute the hash incrementally but when the hash
-- matches the hash of the pattern we need to compare the whole string.
--
-- XXX we should be able to implement sequence based splitting combinators
-- using this combinator.
{-
-- | Buffer n elements of the input in a ring buffer. When t new elements are
-- collected, slide the window to remove the same number of oldest elements,
-- insert the new elements, and apply an incremental fold on the sliding
-- window, supplying the outgoing elements, the new ring buffer as arguments.
slidingChunkBuffer
:: (IsStream t, Monad m, Ord a, Storable a)
=> Int -- window size
-> Int -- window slide
-> Fold m (Ring a, Array a) b
-> t m a
-> t m b
slidingChunkBuffer = undefined
-- Buffer n seconds worth of stream elements of the input in a radix tree.
-- Every t seconds, remove the items that are older than n seconds, and apply
-- an incremental fold on the sliding window, supplying the outgoing elements,
-- and the new radix tree buffer as arguments.
slidingSessionBuffer
:: (IsStream t, Monad m, Ord a, Storable a)
=> Int -- window size
-> Int -- tick size
-> Fold m (RTree a, Array a) b
-> t m a
-> t m b
slidingSessionBuffer = undefined
-}
------------------------------------------------------------------------------
-- Keyed Session Windows
------------------------------------------------------------------------------

View File

@ -1864,7 +1864,7 @@ mapMaybeMSerial f m = fromStreamD $ D.mapMaybeM f $ toStreamD m
-- > reverse = S.foldlT (flip S.cons) S.nil
--
-- Returns the elements of the stream in reverse order. The stream must be
-- finite.
-- finite. Note that this necessarily buffers the entire stream in memory.
--
-- /Note:/ 'reverse'' is much faster than this, use that when performance
-- matters.

View File

@ -30,19 +30,27 @@ description:
similar in concept to <https://hackage.haskell.org/package/Yampa Yampa> or
<https://hackage.haskell.org/package/reflex reflex>.
.
Streamly focuses on practical engineering with high performance. One can
expect performance competitive to C from well written streamly programs.
High performance streaming eliminates the need for string and text
libraries like
The grouping, splitting and windowing combinators in streamly can substitute
the window operators in <https://flink.apache.org/ Apache Flink>. However,
compared to Flink streamly has a pure functional, succinct and expressive API
that is applicable to full spectrum of applications, from a "hello world"
application to large scale concurrent applications, with equal efficiency.
For those familiar with <http://reactivex.io/ Reactive Extensions>, streamly
has similarities with the RxJs API. For most RxJs combinators you
can find corresponding combinators in streamly.
.
Streamly focuses on practical engineering with high performance. From well
written streamly programs one can expect performance competitive to C. High
performance streaming eliminates the need for string and text libraries like
<https://hackage.haskell.org/package/bytestring bytestring>,
<https://hackage.haskell.org/package/text text> and their lazy and strict
flavors, and all the confusion arising from those. The only basic types are
arrays for storage and streams for processing. Strings and text are simply
streams of 'Char' as they should be. Even Haskell lists can be replaced by
pure streams. Arrays provided by streamly have performance at par with or
better than the vector library.
flavors. All the confusion and cognitive overhead arising from different
string types is eliminated. The only basic types are arrays for storage and
streams for processing. Strings and text are simply streams of 'Char' as they
should be. Even Haskell lists can be replaced by pure streams. Arrays
provided by streamly have performance at par with the vector library.
.
Streamly interoperates with popular streaming libraries, see
Streamly interoperates with popular Haskell streaming libraries, see
the interoperation section in "Streamly.Tutorial".
.
Why use streamly?