Refactor and cleanup IsStream and StreamD

This commit has no functionality changes whatsoever. It is a pure
reorganization of existing code. Not a single line in any function
should have changed.

* Push down all functions from IsStream.hs to a common module below
* Each submodule is now complete in itself i.e. exports all functions
  that belong to that module.
* A few functions had to be moved around to classify them in the right
  modules. Each submodule has a crisp definition.
* Further break down "Transform" module to create Nesting/Lift modules
  under IsStream
* Create "Lift" module under StreamD as well, both IsStream and StreamD
  have the same structure now.
* Rearrange exports to properly organize them in clearly defined
  categories.
* Rearrange functions to reflect the export order or to bring them close
  to other similar functions.
This commit is contained in:
Harendra Kumar 2021-01-07 01:52:14 +05:30
parent d216358dc1
commit e7ebd59080
18 changed files with 6976 additions and 6392 deletions

View File

@ -15,6 +15,7 @@ src/Streamly/Internal/Data/Stream/StreamD/Generate.hs
src/Streamly/Internal/Data/Stream/StreamD/Eliminate.hs
src/Streamly/Internal/Data/Stream/StreamD/Transform.hs
src/Streamly/Internal/Data/Stream/StreamD/Exception.hs
src/Streamly/Internal/Data/Stream/StreamD/Lift.hs
src/Streamly/Internal/Data/Stream/StreamD/Nesting.hs
src/Streamly/Internal/Data/Pipe/Types.hs
src/Streamly/Internal/Data/SmallArray/Types.hs

View File

@ -598,17 +598,6 @@ module Streamly.Internal.Data.Stream.IsStream
)
where
import Control.Concurrent (threadDelay)
import Control.Exception (assert)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Kind (Type)
import Data.Heap (Entry(..))
import Data.Maybe (isNothing)
import Foreign.Storable (Storable)
import Streamly.Internal.Data.Stream.Enumeration
(Enumerable(..), enumerate, enumerateTo)
import Streamly.Internal.Data.Fold.Types (Fold (..))
import Streamly.Internal.Data.Array.Storable.Foreign.Types (Array)
import Streamly.Internal.Data.SVar (MonadAsync, Rate (..))
import Streamly.Internal.Data.Stream.Ahead (AheadT, Ahead, aheadly)
import Streamly.Internal.Data.Stream.Async
@ -619,38 +608,26 @@ import Streamly.Internal.Data.Stream.Combinators
, maxRate, constRate)
import Streamly.Internal.Data.Stream.Parallel
( ParallelT, Parallel, parallely)
import Streamly.Internal.Data.Stream.Prelude (toStreamS)
import Streamly.Internal.Data.Stream.StreamK (IsStream((|:), consM), adapt)
import Streamly.Internal.Data.Stream.StreamK (IsStream(), adapt)
import Streamly.Internal.Data.Stream.Serial
( SerialT, WSerialT, Serial, WSerial, serially
, wSerially)
import Streamly.Internal.Data.Stream.Zip
( ZipSerialM, ZipSerial, ZipAsyncM, ZipAsync, zipSerially, zipAsyncly)
import Streamly.Internal.Data.Time.Units
( AbsTime, MilliSecond64(..), addToAbsTime, toRelTime
, toAbsTime, RelTime64)
import Streamly.Internal.Data.Tuple.Strict (Tuple'(..))
import Streamly.Internal.Data.Stream.IsStream.Generate
import Streamly.Internal.Data.Stream.IsStream.Eliminate
import Streamly.Internal.Data.Stream.IsStream.Transform
import Streamly.Internal.Data.Stream.IsStream.Exception
import qualified Streamly.Internal.Data.Array.Storable.Foreign as A
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Stream.Prelude as P
import qualified Streamly.Internal.Data.Stream.StreamK as K
import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import qualified Streamly.Internal.Data.Stream.Parallel as Par
import qualified Streamly.Internal.Data.Stream.Zip as Z
import qualified Data.Heap as H
import qualified Data.Map.Strict as Map
#ifdef USE_STREAMK_ONLY
import qualified Streamly.Internal.Data.Stream.StreamK as S
#else
import qualified Streamly.Internal.Data.Stream.StreamD as S
#endif
import Streamly.Internal.Data.Stream.IsStream.Generate
import Streamly.Internal.Data.Stream.IsStream.Eliminate
import Streamly.Internal.Data.Stream.IsStream.Transform
import Streamly.Internal.Data.Stream.IsStream.Nesting
import Streamly.Internal.Data.Stream.IsStream.Exception
import Streamly.Internal.Data.Stream.IsStream.Lift
import Prelude hiding
( filter, drop, dropWhile, take, takeWhile, zipWith, foldr
@ -659,931 +636,3 @@ import Prelude hiding
, reverse, iterate, init, and, or, lookup, foldr1, (!!)
, scanl, scanl1, replicate, concatMap, span, splitAt, break
, repeat, concat, mconcat)
------------------------------------------------------------------------------
-- Conversions
------------------------------------------------------------------------------
-- | Takes a callback setter function and provides it with a callback. The
-- callback when invoked adds a value at the tail of the stream. Returns a
-- stream of values generated by the callback.
--
-- /Internal/
--
{-# INLINE fromCallback #-}
fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> SerialT m a
fromCallback setCallback = concatM $ do
(callback, stream) <- D.newCallbackStream
setCallback callback
return stream
------------------------------------------------------------------------------
-- Specialized folds
------------------------------------------------------------------------------
-- XXX this can utilize parallel mapping if we implement it as drain . mapM
-- |
-- > mapM_ = drain . mapM
--
-- Apply a monadic action to each element of the stream and discard the output
-- of the action. This is not really a pure transformation operation but a
-- transformation followed by fold.
--
-- @since 0.1.0
{-# INLINE mapM_ #-}
mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m ()
mapM_ f m = S.mapM_ f $ toStreamS m
-- |
-- > drainN n = drain . take n
-- > drainN n = fold (Fold.ltake n Fold.drain)
--
-- Run maximum up to @n@ iterations of a stream.
--
-- @since 0.7.0
{-# INLINE drainN #-}
drainN :: Monad m => Int -> SerialT m a -> m ()
drainN n = drain . take n
-- |
-- > runN n = runStream . take n
--
-- Run maximum up to @n@ iterations of a stream.
--
-- @since 0.6.0
{-# DEPRECATED runN "Please use \"drainN\" instead" #-}
{-# INLINE runN #-}
runN :: Monad m => Int -> SerialT m a -> m ()
runN = drainN
-- |
-- > drainWhile p = drain . takeWhile p
-- > drainWhile p = fold (Fold.sliceSepBy (not . p) Fold.drain)
--
-- Run a stream as long as the predicate holds true.
--
-- @since 0.7.0
{-# INLINE drainWhile #-}
drainWhile :: Monad m => (a -> Bool) -> SerialT m a -> m ()
drainWhile p = drain . takeWhile p
-- |
-- > runWhile p = runStream . takeWhile p
--
-- Run a stream as long as the predicate holds true.
--
-- @since 0.6.0
{-# DEPRECATED runWhile "Please use \"drainWhile\" instead" #-}
{-# INLINE runWhile #-}
runWhile :: Monad m => (a -> Bool) -> SerialT m a -> m ()
runWhile = drainWhile
------------------------------------------------------------------------------
-- Transformation by Inserting
------------------------------------------------------------------------------
-- | Intersperse a monadic action into the input stream after every @n@
-- seconds.
--
-- @
-- > S.drain $ S.interjectSuffix 1 (putChar ',') $ S.mapM (\\x -> threadDelay 1000000 >> putChar x) $ S.fromList "hello"
-- "h,e,l,l,o"
-- @
--
-- /Internal/
{-# INLINE interjectSuffix #-}
interjectSuffix
:: (IsStream t, MonadAsync m)
=> Double -> m a -> t m a -> t m a
interjectSuffix n f xs = xs `Par.parallelFst` repeatM timed
where timed = liftIO (threadDelay (round $ n * 1000000)) >> f
-------------------------------------------------------------------------------
-- Indexing
-------------------------------------------------------------------------------
-- Note: The timestamp stream must be the second stream in the zip so that the
-- timestamp is generated after generating the stream element and not before.
-- If we do not do that then the following example will generate the same
-- timestamp for first two elements:
--
-- S.mapM_ print $ S.timestamped $ S.delay $ S.enumerateFromTo 1 3
--
-- | Pair each element in a stream with an absolute timestamp, using a clock of
-- specified granularity. The timestamp is generated just before the element
-- is consumed.
--
-- @
-- >>> S.mapM_ print $ S.timestampWith 0.01 $ S.delay 1 $ S.enumerateFromTo 1 3
-- @
--
-- /Internal/
--
{-# INLINE timestampWith #-}
timestampWith :: (IsStream t, MonadAsync m, Functor (t m))
=> Double -> t m a -> t m (AbsTime, a)
timestampWith g stream = Z.zipWith (flip (,)) stream (absTimesWith g)
-- TBD: check performance vs a custom implementation without using zipWith.
--
-- /Internal/
--
{-# INLINE timestamped #-}
timestamped :: (IsStream t, MonadAsync m, Functor (t m))
=> t m a -> t m (AbsTime, a)
timestamped = timestampWith 0.01
-- | Pair each element in a stream with relative times starting from 0, using a
-- clock with the specified granularity. The time is measured just before the
-- element is consumed.
--
-- @
-- >>> S.mapM_ print $ S.timeIndexWith 0.01 $ S.delay 1 $ S.enumerateFromTo 1 3
-- @
--
-- /Internal/
--
{-# INLINE timeIndexWith #-}
timeIndexWith :: (IsStream t, MonadAsync m, Functor (t m))
=> Double -> t m a -> t m (RelTime64, a)
timeIndexWith g stream = Z.zipWith (flip (,)) stream (relTimesWith g)
-- | Pair each element in a stream with relative times starting from 0, using a
-- 10 ms granularity clock. The time is measured just before the element is
-- consumed.
--
-- @
-- >>> S.mapM_ print $ S.timeIndexed $ S.delay 1 $ S.enumerateFromTo 1 3
-- (RelTime64 (NanoSecond64 0),1)
-- (RelTime64 (NanoSecond64 996239000),2)
-- (RelTime64 (NanoSecond64 1996712000),3)
-- @
--
-- /Internal/
--
{-# INLINE timeIndexed #-}
timeIndexed :: (IsStream t, MonadAsync m, Functor (t m))
=> t m a -> t m (RelTime64, a)
timeIndexed = timeIndexWith 0.01
------------------------------------------------------------------------------
-- Searching
------------------------------------------------------------------------------
-- | Returns the first index that satisfies the given predicate.
--
-- > findIndex = fold Fold.findIndex
--
-- @since 0.5.0
{-# INLINE findIndex #-}
findIndex :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe Int)
findIndex p = head . findIndices p
-- | Returns the first index where a given value is found in the stream.
--
-- > elemIndex a = findIndex (== a)
--
-- @since 0.5.0
{-# INLINE elemIndex #-}
elemIndex :: (Monad m, Eq a) => a -> SerialT m a -> m (Maybe Int)
elemIndex a = findIndex (== a)
------------------------------------------------------------------------------
-- Substreams
------------------------------------------------------------------------------
-- | Returns 'True' if the first stream is a suffix of the second. A stream is
-- considered a suffix of itself.
--
-- @
-- > S.isSuffixOf (S.fromList "hello") (S.fromList "hello" :: SerialT IO Char)
-- True
-- @
--
-- Space: @O(n)@, buffers entire input stream and the suffix.
--
-- /Internal/
--
-- /Suboptimal/ - Help wanted.
--
{-# INLINE isSuffixOf #-}
isSuffixOf :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m Bool
isSuffixOf suffix stream = reverse suffix `isPrefixOf` reverse stream
-- | Returns 'True' if the first stream is an infix of the second. A stream is
-- considered an infix of itself.
--
-- @
-- > S.isInfixOf (S.fromList "hello") (S.fromList "hello" :: SerialT IO Char)
-- True
-- @
--
-- Space: @O(n)@ worst case where @n@ is the length of the infix.
--
-- /Internal/
--
-- /Requires 'Storable' constraint/ - Help wanted.
--
{-# INLINE isInfixOf #-}
isInfixOf :: (MonadIO m, Eq a, Enum a, Storable a)
=> SerialT m a -> SerialT m a -> m Bool
isInfixOf infx stream = do
arr <- fold A.write infx
-- XXX can use breakOnSeq instead (when available)
r <- null $ drop 1 $ splitOnSeq arr FL.drain stream
return (not r)
-- Note: If we want to return a Maybe value to know whether the
-- suffix/infix was present or not along with the stripped stream then
-- we need to buffer the whole input stream.
--
-- | Drops the given suffix from a stream. Returns 'Nothing' if the stream does
-- not end with the given suffix. Returns @Just nil@ when the suffix is the
-- same as the stream.
--
-- It may be more efficient to convert the stream to an Array and use
-- stripSuffix on that especially if the elements have a Storable or Prim
-- instance.
--
-- Space: @O(n)@, buffers the entire input stream as well as the suffix
--
-- /Internal/
{-# INLINE stripSuffix #-}
stripSuffix
:: (Monad m, Eq a)
=> SerialT m a -> SerialT m a -> m (Maybe (SerialT m a))
stripSuffix m1 m2 = fmap reverse <$> stripPrefix (reverse m1) (reverse m2)
------------------------------------------------------------------------------
-- Split on a delimiter sequence
------------------------------------------------------------------------------
-- XXX use a non-monadic intersperse to remove the MonadAsync constraint.
--
-- | Like 'splitOnSeq' but splits the separator as well, as an infix token.
--
-- > splitOn'_ pat xs = S.toList $ S.splitOn' (A.fromList pat) (FL.toList) (S.fromList xs)
--
-- >>> splitOn'_ "" "hello"
-- > ["h","","e","","l","","l","","o"]
--
-- >>> splitOn'_ "hello" ""
-- > [""]
--
-- >>> splitOn'_ "hello" "hello"
-- > ["","hello",""]
--
-- >>> splitOn'_ "x" "hello"
-- > ["hello"]
--
-- >>> splitOn'_ "h" "hello"
-- > ["","h","ello"]
--
-- >>> splitOn'_ "o" "hello"
-- > ["hell","o",""]
--
-- >>> splitOn'_ "e" "hello"
-- > ["h","e","llo"]
--
-- >>> splitOn'_ "l" "hello"
-- > ["he","l","","l","o"]
--
-- >>> splitOn'_ "ll" "hello"
-- > ["he","ll","o"]
--
-- /Internal/
{-# INLINE splitBySeq #-}
splitBySeq
:: (IsStream t, MonadAsync m, Storable a, Enum a, Eq a)
=> Array a -> Fold m a b -> t m a -> t m b
splitBySeq patt f m =
intersperseM (fold f (A.toStream patt)) $ splitOnSeq patt f m
------------------------------------------------------------------------------
-- Flattening Trees
------------------------------------------------------------------------------
-- | Like 'iterateM' but iterates after mapping a stream generator on the
-- output.
--
-- Yield an input element in the output stream, map a stream generator on it
-- and then do the same on the resulting stream. This can be used for a depth
-- first traversal of a tree like structure.
--
-- Note that 'iterateM' is a special case of 'iterateMapWith':
--
-- @
-- iterateM f = iterateMapWith serial (yieldM . f) . yieldM
-- @
--
-- It can be used to traverse a tree structure. For example, to list a
-- directory tree:
--
-- @
-- Stream.iterateMapWith Stream.serial
-- (either Dir.toEither (const nil))
-- (yield (Left "tmp"))
-- @
--
-- /Internal/
--
{-# INLINE iterateMapWith #-}
iterateMapWith
:: IsStream t
=> (t m a -> t m a -> t m a)
-> (a -> t m a)
-> t m a
-> t m a
iterateMapWith combine f = concatMapWith combine go
where
go x = yield x `combine` concatMapWith combine go (f x)
{-
{-# INLINE iterateUnfold #-}
iterateUnfold :: (IsStream t, MonadAsync m)
=> Unfold m a a -> t m a -> t m a
iterateUnfold unf xs = undefined
-}
------------------------------------------------------------------------------
-- Flattening Graphs
------------------------------------------------------------------------------
-- To traverse graphs we need a state to be carried around in the traversal.
-- For example, we can use a hashmap to store the visited status of nodes.
-- | Like 'iterateMap' but carries a state in the stream generation function.
-- This can be used to traverse graph like structures, we can remember the
-- visited nodes in the state to avoid cycles.
--
-- Note that a combination of 'iterateMap' and 'usingState' can also be used to
-- traverse graphs. However, this function provides a more localized state
-- instead of using a global state.
--
-- See also: 'mfix'
--
-- /Internal/
--
{-# INLINE iterateSmapMWith #-}
iterateSmapMWith
:: (IsStream t, Monad m)
=> (t m a -> t m a -> t m a)
-> (b -> a -> m (b, t m a))
-> m b
-> t m a
-> t m a
iterateSmapMWith combine f initial stream =
concatMap (\b -> concatMapWith combine (go b) stream) (yieldM initial)
where
go b a = yield a `combine` feedback b a
feedback b a =
concatMap
(\(b1, s) -> concatMapWith combine (go b1) s)
(yieldM $ f b a)
------------------------------------------------------------------------------
-- Flattening Lists
------------------------------------------------------------------------------
-- | Given a stream value in the underlying monad, lift and join the underlying
-- monad with the stream monad.
--
-- @
-- concatM = concat . yieldM
-- concatM = concat . lift -- requires @(MonadTrans t)@
-- concatM = join . lift -- requires @(MonadTrans t@, @Monad (t m))@
-- @
--
-- See also: 'concat', 'sequence'
--
-- /Internal/
--
{-# INLINE concatM #-}
concatM :: (IsStream t, Monad m) => m (t m a) -> t m a
concatM generator = concatMapM (\() -> generator) (yield ())
------------------------------------------------------------------------------
-- Either streams
------------------------------------------------------------------------------
-- | In an 'Either' stream iterate on 'Left's. This is a special case of
-- 'iterateMapWith':
--
-- @
-- iterateMapLeftsWith combine f = iterateMapWith combine (either f (const nil))
-- @
--
-- To traverse a directory tree:
--
-- @
-- iterateMapLeftsWith serial Dir.toEither (yield (Left "tmp"))
-- @
--
-- /Internal/
--
{-# INLINE iterateMapLeftsWith #-}
iterateMapLeftsWith
:: IsStream t
=> (t m (Either a b) -> t m (Either a b) -> t m (Either a b))
-> (a -> t m (Either a b))
-> t m (Either a b)
-> t m (Either a b)
iterateMapLeftsWith combine f = iterateMapWith combine (either f (const K.nil))
-------------------------------------------------------------------------------
-- Breaking
-------------------------------------------------------------------------------
-- XXX we can implement this by repeatedly applying the 'lrunFor' fold.
-- XXX add this example after fixing the serial stream rate control
-- >>> S.toList $ S.take 5 $ intervalsOf 1 FL.sum $ constRate 2 $ S.enumerateFrom 1
-- > [3,7,11,15,19]
--
-- | Group the input stream into windows of @n@ second each and then fold each
-- group using the provided fold function.
--
-- @since 0.7.0
{-# INLINE intervalsOf #-}
intervalsOf
:: (IsStream t, MonadAsync m)
=> Double -> Fold m a b -> t m a -> t m b
intervalsOf n f xs =
splitWithSuffix isNothing (FL.lcatMaybes f)
(interjectSuffix n (return Nothing) (Serial.map Just xs))
------------------------------------------------------------------------------
-- Windowed classification
------------------------------------------------------------------------------
-- We divide the stream into windows or chunks in space or time and each window
-- can be associated with a key, all events associated with a particular key in
-- the window can be folded to a single result. The stream can be split into
-- windows by size or by using a split predicate on the elements in the stream.
-- For example, when we receive a closing flag, we can close the window.
--
-- A "chunk" is a space window and a "session" is a time window. Are there any
-- other better short words to describe them. An alternative is to use
-- "swindow" and "twindow". Another word for "session" could be "spell".
--
-- TODO: To mark the position in space or time we can have Indexed or
-- TimeStamped types. That can make it easy to deal with the position indices
-- or timestamps.
------------------------------------------------------------------------------
-- Keyed Sliding Windows
------------------------------------------------------------------------------
{-
{-# INLINABLE classifySlidingChunks #-}
classifySlidingChunks
:: (IsStream t, MonadAsync m, Ord k)
=> Int -- ^ window size
-> Int -- ^ window slide
-> Fold m a b -- ^ Fold to be applied to window events
-> t m (k, a, Bool) -- ^ window key, data, close event
-> t m (k, b)
classifySlidingChunks wsize wslide (Fold step initial extract) str
= undefined
-- XXX Another variant could be to slide the window on an event, e.g. in TCP we
-- slide the send window when an ack is received and we slide the receive
-- window when a sequence is complete. Sliding is stateful in case of TCP,
-- sliding releases the send buffer or makes data available to the user from
-- the receive buffer.
{-# INLINABLE classifySlidingSessions #-}
classifySlidingSessions
:: (IsStream t, MonadAsync m, Ord k)
=> Double -- ^ timer tick in seconds
-> Double -- ^ time window size
-> Double -- ^ window slide
-> Fold m a b -- ^ Fold to be applied to window events
-> t m (k, a, Bool, AbsTime) -- ^ window key, data, close flag, timestamp
-> t m (k, b)
classifySlidingSessions tick interval slide (Fold step initial extract) str
= undefined
-}
------------------------------------------------------------------------------
-- Sliding Window Buffers
------------------------------------------------------------------------------
-- These buffered versions could be faster than concurrent incremental folds of
-- all overlapping windows as in many cases we may not need all the values to
-- compute the fold, we can just compute the result using the old value and new
-- value. However, we may need the buffer once in a while, for example for
-- string search we usually compute the hash incrementally but when the hash
-- matches the hash of the pattern we need to compare the whole string.
--
-- XXX we should be able to implement sequence based splitting combinators
-- using this combinator.
{-
-- | Buffer n elements of the input in a ring buffer. When t new elements are
-- collected, slide the window to remove the same number of oldest elements,
-- insert the new elements, and apply an incremental fold on the sliding
-- window, supplying the outgoing elements, the new ring buffer as arguments.
slidingChunkBuffer
:: (IsStream t, Monad m, Ord a, Storable a)
=> Int -- window size
-> Int -- window slide
-> Fold m (Ring a, Array a) b
-> t m a
-> t m b
slidingChunkBuffer = undefined
-- Buffer n seconds worth of stream elements of the input in a radix tree.
-- Every t seconds, remove the items that are older than n seconds, and apply
-- an incremental fold on the sliding window, supplying the outgoing elements,
-- and the new radix tree buffer as arguments.
slidingSessionBuffer
:: (IsStream t, Monad m, Ord a, Storable a)
=> Int -- window size
-> Int -- tick size
-> Fold m (RTree a, Array a) b
-> t m a
-> t m b
slidingSessionBuffer = undefined
-}
------------------------------------------------------------------------------
-- Keyed Session Windows
------------------------------------------------------------------------------
{-
-- | Keyed variable size space windows. Close the window if we do not receive a
-- window event in the next "spaceout" elements.
{-# INLINABLE classifyChunksBy #-}
classifyChunksBy
:: (IsStream t, MonadAsync m, Ord k)
=> Int -- ^ window spaceout (spread)
-> Bool -- ^ reset the spaceout when a chunk window element is received
-> Fold m a b -- ^ Fold to be applied to chunk window elements
-> t m (k, a, Bool) -- ^ chunk key, data, last element
-> t m (k, b)
classifyChunksBy spanout reset (Fold step initial extract) str = undefined
-- | Like 'classifyChunksOf' but the chunk size is reset if an element is
-- received within the chunk size window. The chunk gets closed only if no
-- element is received within the chunk window.
--
{-# INLINABLE classifyKeepAliveChunks #-}
classifyKeepAliveChunks
:: (IsStream t, MonadAsync m, Ord k)
=> Int -- ^ window spaceout (spread)
-> Fold m a b -- ^ Fold to be applied to chunk window elements
-> t m (k, a, Bool) -- ^ chunk key, data, last element
-> t m (k, b)
classifyKeepAliveChunks spanout = classifyChunksBy spanout True
-}
data SessionState t m k a b = SessionState
{ sessionCurTime :: !AbsTime -- ^ time since last event
, sessionEventTime :: !AbsTime -- ^ time as per last event
, sessionCount :: !Int -- ^ total number sessions in progress
, sessionTimerHeap :: H.Heap (H.Entry AbsTime k) -- ^ heap for timeouts
, sessionKeyValueMap :: Map.Map k a -- ^ Stored sessions for keys
, sessionOutputStream :: t (m :: Type -> Type) (k, b) -- ^ Completed sessions
}
#undef Type
-- XXX Perhaps we should use an "Event a" type to represent timestamped data.
-- | @classifySessionsBy tick timeout idle pred f stream@ groups timestamped
-- events in an input event stream into sessions based on a session key. Each
-- element in the input stream is an event consisting of a triple @(session key,
-- sesssion data, timestamp)@. @session key@ is a key that uniquely identifies
-- the session. All the events belonging to a session are folded using the fold
-- @f@ until the fold terminates or a timeout has occurred. The session key and
-- the result of the fold are emitted in the output stream when the session is
-- purged.
--
-- When @idle@ is 'False', @timeout@ is the maximum lifetime of a session in
-- seconds, measured from the @timestamp@ of the first event in that session.
-- When @idle@ is 'True' then the timeout is an idle timeout, it is reset after
-- every event received in the session.
--
-- @timestamp@ in an event characterizes the time when the input event was
-- generated, this is an absolute time measured from some @Epoch@. The notion
-- of current time is maintained by a monotonic event time clock using the
-- timestamps seen in the input stream. The latest timestamp seen till now is
-- used as the base for the current time. When no new events are seen, a timer
-- is started with a tick duration specified by @tick@. This timer is used to
-- detect session timeouts in the absence of new events.
--
-- The predicate @pred@ is invoked with the current session count, if it
-- returns 'True' a session is ejected from the session cache before inserting
-- a new session. This could be useful to alert or eject sessions when the
-- number of sessions becomes too high.
--
-- /Internal/
--
{-# INLINABLE classifySessionsBy #-}
classifySessionsBy
:: (IsStream t, MonadAsync m, Ord k)
=> Double -- ^ timer tick in seconds
-> Double -- ^ session timeout in seconds
-> Bool -- ^ reset the timeout when an event is received
-> (Int -> m Bool) -- ^ predicate to eject sessions based on session count
-> Fold m a b -- ^ Fold to be applied to session events
-> t m (k, a, AbsTime) -- ^ session key, data, timestamp
-> t m (k, b) -- ^ session key, fold result
classifySessionsBy tick tmout reset ejectPred
(Fold step initial extract) str =
concatMap sessionOutputStream $
scanlMAfter' sstep (return szero) flush stream
where
timeoutMs = toRelTime (round (tmout * 1000) :: MilliSecond64)
tickMs = toRelTime (round (tick * 1000) :: MilliSecond64)
szero = SessionState
{ sessionCurTime = toAbsTime (0 :: MilliSecond64)
, sessionEventTime = toAbsTime (0 :: MilliSecond64)
, sessionCount = 0
, sessionTimerHeap = H.empty
, sessionKeyValueMap = Map.empty
, sessionOutputStream = K.nil
}
-- We can eject sessions based on the current session count to limit
-- memory consumption. There are two possible strategies:
--
-- 1) Eject old sessions or sessions beyond a certain/lower timeout
-- threshold even before timeout, effectively reduce the timeout.
-- 2) Drop creation of new sessions but keep accepting new events for the
-- old ones.
--
-- We use the first strategy as of now.
-- Got a new stream input element
sstep session@SessionState{..} (Just (key, value, timestamp)) = do
-- XXX we should use a heap in pinned memory to scale it to a large
-- size
--
-- To detect session inactivity we keep a timestamp of the latest event
-- in the Map along with the fold result. When we purge the session
-- from the heap we match the timestamp in the heap with the timestamp
-- in the Map, if the latest timestamp is newer and has not expired we
-- reinsert the key in the heap.
--
-- XXX if the key is an Int, we can also use an IntMap for slightly
-- better performance.
--
let curTime = max sessionEventTime timestamp
mOld = Map.lookup key sessionKeyValueMap
fs <-
case mOld of
Nothing -> initial
Just (Tuple' _ acc) -> return acc
res <- step fs value
case res of
FL.Done fb -> do
-- deleting a key from the heap is expensive, so we never
-- delete a key from heap, we just purge it from the Map and it
-- gets purged from the heap on timeout. We just need an extra
-- lookup in the Map when the key is purged from the heap, that
-- should not be expensive.
--
let (mp, cnt) = case mOld of
Nothing -> (sessionKeyValueMap, sessionCount)
Just _ -> (Map.delete key sessionKeyValueMap
, sessionCount - 1)
return $ session
{ sessionCurTime = curTime
, sessionEventTime = curTime
, sessionCount = cnt
, sessionKeyValueMap = mp
, sessionOutputStream = yield (key, fb)
}
FL.Partial fs1 -> do
let acc = Tuple' timestamp fs1
(hp1, mp1, out1, cnt1) <- do
let vars = (sessionTimerHeap, sessionKeyValueMap,
K.nil, sessionCount)
case mOld of
-- inserting new entry
Nothing -> do
-- Eject a session from heap and map is needed
eject <- ejectPred sessionCount
(hp, mp, out, cnt) <-
if eject
then ejectOne vars
else return vars
-- Insert the new session in heap
let expiry = addToAbsTime timestamp timeoutMs
hp' = H.insert (Entry expiry key) hp
in return (hp', mp, out, cnt + 1)
-- updating old entry
Just _ -> return vars
let mp2 = Map.insert key acc mp1
return $ SessionState
{ sessionCurTime = curTime
, sessionEventTime = curTime
, sessionCount = cnt1
, sessionTimerHeap = hp1
, sessionKeyValueMap = mp2
, sessionOutputStream = out1
}
-- Got a timer tick event
sstep sessionState@SessionState{..} Nothing =
let curTime = addToAbsTime sessionCurTime tickMs
in ejectExpired sessionState curTime
flush session@SessionState{..} = do
(hp', mp', out, count) <-
ejectAll
( sessionTimerHeap
, sessionKeyValueMap
, K.nil
, sessionCount
)
return $ session
{ sessionCount = count
, sessionTimerHeap = hp'
, sessionKeyValueMap = mp'
, sessionOutputStream = out
}
-- delete from map and output the fold accumulator
ejectEntry hp mp out cnt acc key = do
sess <- extract acc
let out1 = (key, sess) `K.cons` out
let mp1 = Map.delete key mp
return (hp, mp1, out1, cnt - 1)
ejectAll (hp, mp, out, !cnt) = do
let hres = H.uncons hp
case hres of
Just (Entry _ key, hp1) -> do
r <- case Map.lookup key mp of
Nothing -> return (hp1, mp, out, cnt)
Just (Tuple' _ acc) -> ejectEntry hp1 mp out cnt acc key
ejectAll r
Nothing -> do
assert (Map.null mp) (return ())
return (hp, mp, out, cnt)
ejectOne (hp, mp, out, !cnt) = do
let hres = H.uncons hp
case hres of
Just (Entry expiry key, hp1) ->
case Map.lookup key mp of
Nothing -> ejectOne (hp1, mp, out, cnt)
Just (Tuple' latestTS acc) -> do
let expiry1 = addToAbsTime latestTS timeoutMs
if not reset || expiry1 <= expiry
then ejectEntry hp1 mp out cnt acc key
else
-- reset the session timeout and continue
let hp2 = H.insert (Entry expiry1 key) hp1
in ejectOne (hp2, mp, out, cnt)
Nothing -> do
assert (Map.null mp) (return ())
return (hp, mp, out, cnt)
ejectExpired session@SessionState{..} curTime = do
(hp', mp', out, count) <-
ejectLoop sessionTimerHeap sessionKeyValueMap K.nil sessionCount
return $ session
{ sessionCurTime = curTime
, sessionCount = count
, sessionTimerHeap = hp'
, sessionKeyValueMap = mp'
, sessionOutputStream = out
}
where
ejectLoop hp mp out !cnt = do
let hres = H.uncons hp
case hres of
Just (Entry expiry key, hp1) -> do
(eject, force) <-
if curTime >= expiry
then return (True, False)
else do
r <- ejectPred cnt
return (r, r)
if eject
then
case Map.lookup key mp of
Nothing -> ejectLoop hp1 mp out cnt
Just (Tuple' latestTS acc) -> do
let expiry1 = addToAbsTime latestTS timeoutMs
if expiry1 <= curTime || not reset || force
then do
(hp2,mp1,out1,cnt1) <-
ejectEntry hp1 mp out cnt acc key
ejectLoop hp2 mp1 out1 cnt1
else
-- reset the session timeout and continue
let hp2 = H.insert (Entry expiry1 key) hp1
in ejectLoop hp2 mp out cnt
else return (hp, mp, out, cnt)
Nothing -> do
assert (Map.null mp) (return ())
return (hp, mp, out, cnt)
-- merge timer events in the stream
stream = Serial.map Just str `Par.parallelFst` repeatM timer
timer = do
liftIO $ threadDelay (round $ tick * 1000000)
return Nothing
-- | Like 'classifySessionsOf' but the session is kept alive if an event is
-- received within the session window. The session times out and gets closed
-- only if no event is received within the specified session window size.
--
-- If the ejection predicate returns 'True', the session that was idle for
-- the longest time is ejected before inserting a new session.
--
-- @
-- classifyKeepAliveSessions timeout pred = classifySessionsBy 1 timeout True pred
-- @
--
-- /Internal/
--
{-# INLINABLE classifyKeepAliveSessions #-}
classifyKeepAliveSessions ::
(IsStream t, MonadAsync m, Ord k)
=> Double -- ^ session inactive timeout
-> (Int -> m Bool) -- ^ predicate to eject sessions on session count
-> Fold m a b -- ^ Fold to be applied to session payload data
-> t m (k, a, AbsTime) -- ^ session key, data, timestamp
-> t m (k, b)
classifyKeepAliveSessions tmout =
classifySessionsBy 1 tmout True
------------------------------------------------------------------------------
-- Keyed tumbling windows
------------------------------------------------------------------------------
-- Tumbling windows is a special case of sliding windows where the window slide
-- is the same as the window size. Or it can be a special case of session
-- windows where the reset flag is set to False.
-- XXX instead of using the early termination flag in the stream, we can use an
-- early terminating fold instead.
{-
-- | Split the stream into fixed size chunks of specified size. Within each
-- such chunk fold the elements in buckets identified by the keys. A particular
-- bucket fold can be terminated early if a closing flag is encountered in an
-- element for that key.
--
-- @since 0.7.0
{-# INLINABLE classifyChunksOf #-}
classifyChunksOf
:: (IsStream t, MonadAsync m, Ord k)
=> Int -- ^ window size
-> Fold m a b -- ^ Fold to be applied to window events
-> t m (k, a, Bool) -- ^ window key, data, close event
-> t m (k, b)
classifyChunksOf wsize = classifyChunksBy wsize False
-}
-- | Split the stream into fixed size time windows of specified interval in
-- seconds. Within each such window, fold the elements in sessions identified
-- by the session keys. The fold result is emitted in the output stream if the
-- fold returns a 'Left' result or if the time window ends.
--
-- Session @timestamp@ in the input stream is an absolute time from some epoch,
-- characterizing the time when the input element was generated. To detect
-- session window end, a monotonic event time clock is maintained synced with
-- the timestamps with a clock resolution of 1 second.
--
-- If the ejection predicate returns 'True', the session with the longest
-- lifetime is ejected before inserting a new session.
--
-- @
-- classifySessionsOf interval pred = classifySessionsBy 1 interval False pred
-- @
--
-- @
-- >>> S.mapM_ print
-- $ S.classifySessionsOf 3 (const (return False)) (fmap Right FL.toList)
-- $ S.map (\(ts,(k,a)) -> (k, a, ts))
-- $ S.timestamped
-- $ S.delay 1
-- $ (,) <$> S.fromList [1,2,3] <*> S.fromList [1,2,3]
-- @
--
-- /Internal/
--
{-# INLINABLE classifySessionsOf #-}
classifySessionsOf ::
(IsStream t, MonadAsync m, Ord k)
=> Double -- ^ time window size
-> (Int -> m Bool) -- ^ predicate to eject sessions on session count
-> Fold m a b -- ^ Fold to be applied to session events
-> t m (k, a, AbsTime) -- ^ session key, data, timestamp
-> t m (k, b)
classifySessionsOf interval =
classifySessionsBy 1 interval False

View File

@ -0,0 +1,502 @@
{-# OPTIONS_GHC -Wno-orphans #-}
-- |
-- Module : Streamly.Internal.Data.Stream.IsStream.Common
-- Copyright : (c) 2017 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
--
module Streamly.Internal.Data.Stream.IsStream.Common
(
-- * Generation
yield
, yieldM
, repeatM
, timesWith
, absTimesWith
, relTimesWith
-- * Elimination
, fold
-- * Transformation
, scanlMAfter'
, postscanlM'
, smapM
-- $smapM_Notes
, take
, takeWhile
, drop
, findIndices
, intersperseM
, interjectSuffix
, reverse
, reverse'
-- * Nesting
, concatM
, concatMapM
, splitOnSeq
)
where
#include "inline.hs"
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (MonadIO(..))
import Foreign.Storable (Storable)
import Streamly.Internal.Data.Array.Storable.Foreign.Types (Array)
import Streamly.Internal.Data.Fold.Types (Fold (..))
import Streamly.Internal.Data.Stream.Combinators (maxYields)
import Streamly.Internal.Data.Stream.Prelude (fromStreamS, toStreamS)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Stream.StreamD (fromStreamD, toStreamD)
import Streamly.Internal.Data.Stream.StreamK (IsStream())
import Streamly.Internal.Data.SVar (MonadAsync)
import Streamly.Internal.Data.Time.Units (AbsTime, RelTime64, addToAbsTime64)
import qualified Streamly.Internal.Data.Stream.Parallel as Par
import qualified Streamly.Internal.Data.Stream.Prelude as P
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import qualified Streamly.Internal.Data.Stream.StreamK as K
import qualified Streamly.Internal.Data.Stream.StreamD as D
#ifdef USE_STREAMK_ONLY
import qualified Streamly.Internal.Data.Stream.StreamK as S
#else
import qualified Streamly.Internal.Data.Stream.StreamD as S
#endif
import Prelude hiding (take, takeWhile, drop, reverse)
------------------------------------------------------------------------------
-- Generation
------------------------------------------------------------------------------
-- Faster than yieldM because there is no bind.
--
-- |
-- @
-- yield a = a \`cons` nil
-- @
--
-- Create a singleton stream from a pure value.
--
-- The following holds in monadic streams, but not in Zip streams:
--
-- @
-- yield = pure
-- yield = yieldM . pure
-- @
--
-- In Zip applicative streams 'yield' is not the same as 'pure' because in that
-- case 'pure' is equivalent to 'repeat' instead. 'yield' and 'pure' are
-- equally efficient, in other cases 'yield' may be slightly more efficient
-- than the other equivalent definitions.
--
-- @since 0.4.0
{-# INLINE yield #-}
yield :: IsStream t => a -> t m a
yield = K.yield
-- |
-- @
-- yieldM m = m \`consM` nil
-- @
--
-- Create a singleton stream from a monadic action.
--
-- @
-- > toList $ yieldM getLine
-- hello
-- ["hello"]
-- @
--
-- @since 0.4.0
{-# INLINE yieldM #-}
yieldM :: (Monad m, IsStream t) => m a -> t m a
yieldM = K.yieldM
-- |
-- @
-- repeatM = fix . consM
-- repeatM = cycle1 . yieldM
-- @
--
-- Generate a stream by repeatedly executing a monadic action forever.
--
-- @
-- drain $ serially $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1)
-- drain $ asyncly $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1)
-- @
--
-- /Concurrent, infinite (do not use with 'parallely')/
--
-- @since 0.2.0
{-# INLINE_EARLY repeatM #-}
repeatM :: (IsStream t, MonadAsync m) => m a -> t m a
repeatM = K.repeatM
{-# RULES "repeatM serial" repeatM = repeatMSerial #-}
{-# INLINE repeatMSerial #-}
repeatMSerial :: MonadAsync m => m a -> SerialT m a
repeatMSerial = fromStreamS . S.repeatM
------------------------------------------------------------------------------
-- Generation - Time related
------------------------------------------------------------------------------
-- | @timesWith g@ returns a stream of time value tuples. The first component
-- of the tuple is an absolute time reference (epoch) denoting the start of the
-- stream and the second component is a time relative to the reference.
--
-- The argument @g@ specifies the granularity of the relative time in seconds.
-- A lower granularity clock gives higher precision but is more expensive in
-- terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.
--
-- @
-- >>> S.mapM_ (\x -> print x >> threadDelay 1000000) $ S.timesWith 0.01
-- > (AbsTime (TimeSpec {sec = 2496295, nsec = 536223000}),RelTime64 (NanoSecond64 0))
-- > (AbsTime (TimeSpec {sec = 2496295, nsec = 536223000}),RelTime64 (NanoSecond64 1002028000))
-- > (AbsTime (TimeSpec {sec = 2496295, nsec = 536223000}),RelTime64 (NanoSecond64 1996656000))
-- @
--
-- Note: This API is not safe on 32-bit machines.
--
-- /Internal/
--
{-# INLINE timesWith #-}
timesWith :: (IsStream t, MonadAsync m) => Double -> t m (AbsTime, RelTime64)
timesWith g = fromStreamD $ D.times g
-- | @absTimesWith g@ returns a stream of absolute timestamps using a clock of
-- granularity @g@ specified in seconds. A low granularity clock is more
-- expensive in terms of CPU usage. Any granularity lower than 1 ms is treated
-- as 1 ms.
--
-- @
-- >>> S.mapM_ print $ S.delayPre 1 $ S.absTimesWith 0.01
-- @
--
-- Note: This API is not safe on 32-bit machines.
--
-- /Internal/
--
{-# INLINE absTimesWith #-}
absTimesWith :: (IsStream t, MonadAsync m, Functor (t m))
=> Double -> t m AbsTime
absTimesWith = fmap (uncurry addToAbsTime64) . timesWith
-- | @relTimesWith g@ returns a stream of relative time values starting from 0,
-- using a clock of granularity @g@ specified in seconds. A low granularity
-- clock is more expensive in terms of CPU usage. Any granularity lower than 1
-- ms is treated as 1 ms.
--
-- @
-- >>> S.mapM_ print $ S.delayPre 1 $ S.relTimesWith 0.01
-- > RelTime64 (NanoSecond64 0)
-- > RelTime64 (NanoSecond64 91139000)
-- > RelTime64 (NanoSecond64 204052000)
-- @
--
-- Note: This API is not safe on 32-bit machines.
--
-- /Internal/
--
{-# INLINE relTimesWith #-}
relTimesWith :: (IsStream t, MonadAsync m, Functor (t m))
=> Double -> t m RelTime64
relTimesWith = fmap snd . timesWith
------------------------------------------------------------------------------
-- Elimination - Running a Fold
------------------------------------------------------------------------------
-- | Fold a stream using the supplied left 'Fold' and reducing the resulting
-- expression strictly at each step. The behavior is similar to 'foldl''. A
-- 'Fold' can terminate early without consuming the full stream. See the
-- documentation of individual 'Fold's for termination behavior.
--
-- >>> S.fold FL.sum (S.enumerateFromTo 1 100)
-- 5050
--
-- @since 0.7.0
{-# INLINE fold #-}
fold :: Monad m => Fold m a b -> SerialT m a -> m b
fold = P.foldOnce
------------------------------------------------------------------------------
-- Transformation
------------------------------------------------------------------------------
-- | @scanlMAfter' accumulate initial done stream@ is like 'scanlM'' except
-- that it provides an additional @done@ function to be applied on the
-- accumulator when the stream stops. The result of @done@ is also emitted in
-- the stream.
--
-- This function can be used to allocate a resource in the beginning of the
-- scan and release it when the stream ends or to flush the internal state of
-- the scan at the end.
--
-- /Internal/
--
{-# INLINE scanlMAfter' #-}
scanlMAfter' :: (IsStream t, Monad m)
=> (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b
scanlMAfter' step initial done stream =
fromStreamD $ D.scanlMAfter' step initial done $ toStreamD stream
-- XXX this needs to be concurrent
-- | Like 'postscanl'' but with a monadic step function and a monadic seed.
--
-- /Since: 0.7.0/
--
-- /Since: 0.8.0 (signature change)/
{-# INLINE postscanlM' #-}
postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b
postscanlM' step z m = fromStreamD $ D.postscanlM' step z $ toStreamD m
-- $smapM_Notes
--
-- The stateful step function can be simplified to @(s -> a -> m b)@ to provide
-- a read-only environment. However, that would just be 'mapM'.
--
-- The initial action could be @m (s, Maybe b)@, and we can also add a final
-- action @s -> m (Maybe b)@. This can be used to get pre/post scan like
-- functionality and also to flush the state in the end like scanlMAfter'.
-- We can also use it along with a fusible version of bracket to get
-- scanlMAfter' like functionality. See issue #677.
--
-- This can be further generalized to a type similar to Fold/Parser, giving it
-- filtering and parsing capability as well (this is in fact equivalent to
-- parseMany):
--
-- smapM :: (s -> a -> m (Step s b)) -> m s -> t m a -> t m b
--
-- | A stateful 'mapM', equivalent to a left scan, more like mapAccumL.
-- Hopefully, this is a better alternative to @scan@. Separation of state from
-- the output makes it easier to think in terms of a shared state, and also
-- makes it easier to keep the state fully strict and the output lazy.
--
-- See also: 'scanlM''
--
-- /Internal/
--
{-# INLINE smapM #-}
smapM :: (IsStream t, Monad m) =>
(s -> a -> m (s, b))
-> m s
-> t m a
-> t m b
smapM step initial stream =
-- XXX implement this directly instead of using scanlM'
-- Once we have postscanlM' with monadic initial we can use this code
-- let r = postscanlM'
-- (\(s, _) a -> step s a)
-- (fmap (,undefined) initial)
-- stream
let r = postscanlM'
(\(s, _) a -> step s a)
(fmap (,undefined) initial)
stream
in Serial.map snd r
------------------------------------------------------------------------------
-- Transformation - Trimming
------------------------------------------------------------------------------
-- | Take first 'n' elements from the stream and discard the rest.
--
-- @since 0.1.0
{-# INLINE take #-}
take :: (IsStream t, Monad m) => Int -> t m a -> t m a
take n m = fromStreamS $ S.take n $ toStreamS
(maxYields (Just (fromIntegral n)) m)
-- | End the stream as soon as the predicate fails on an element.
--
-- @since 0.1.0
{-# INLINE takeWhile #-}
takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
takeWhile p m = fromStreamS $ S.takeWhile p $ toStreamS m
-- | Discard first 'n' elements from the stream and take the rest.
--
-- @since 0.1.0
{-# INLINE drop #-}
drop :: (IsStream t, Monad m) => Int -> t m a -> t m a
drop n m = fromStreamS $ S.drop n $ toStreamS m
------------------------------------------------------------------------------
-- Searching
------------------------------------------------------------------------------
-- | Find all the indices where the element in the stream satisfies the given
-- predicate.
--
-- > findIndices = fold Fold.findIndices
--
-- @since 0.5.0
{-# INLINE findIndices #-}
findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int
findIndices p m = fromStreamS $ S.findIndices p (toStreamS m)
------------------------------------------------------------------------------
-- Transformation by Inserting
------------------------------------------------------------------------------
-- intersperseM = intersperseBySpan 1
-- | Insert an effect and its output before consuming an element of a stream
-- except the first one.
--
-- @
-- >>> S.toList $ S.trace putChar $ S.intersperseM (putChar '.' >> return ',') $ S.fromList "hello"
-- > h.,e.,l.,l.,o"h,e,l,l,o"
-- @
--
-- @since 0.5.0
{-# INLINE intersperseM #-}
intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
intersperseM m = fromStreamS . S.intersperseM m . toStreamS
-- | Intersperse a monadic action into the input stream after every @n@
-- seconds.
--
-- @
-- > S.drain $ S.interjectSuffix 1 (putChar ',') $ S.mapM (\\x -> threadDelay 1000000 >> putChar x) $ S.fromList "hello"
-- "h,e,l,l,o"
-- @
--
-- /Internal/
{-# INLINE interjectSuffix #-}
interjectSuffix
:: (IsStream t, MonadAsync m)
=> Double -> m a -> t m a -> t m a
interjectSuffix n f xs = xs `Par.parallelFst` repeatM timed
where timed = liftIO (threadDelay (round $ n * 1000000)) >> f
------------------------------------------------------------------------------
-- Transformation by Reordering
------------------------------------------------------------------------------
-- XXX Use a compact region list to temporarily store the list, in both reverse
-- as well as in reverse'.
--
-- /Note:/ 'reverse'' is much faster than this, use that when performance
-- matters.
--
-- > reverse = S.foldlT (flip S.cons) S.nil
--
-- | Returns the elements of the stream in reverse order. The stream must be
-- finite. Note that this necessarily buffers the entire stream in memory.
--
-- /Since 0.7.0 (Monad m constraint)/
--
-- /Since: 0.1.1/
{-# INLINE reverse #-}
reverse :: (IsStream t, Monad m) => t m a -> t m a
reverse s = fromStreamS $ S.reverse $ toStreamS s
-- | Like 'reverse' but several times faster, requires a 'Storable' instance.
--
-- /Internal/
{-# INLINE reverse' #-}
reverse' :: (IsStream t, MonadIO m, Storable a) => t m a -> t m a
reverse' s = fromStreamD $ D.reverse' $ toStreamD s
------------------------------------------------------------------------------
-- Combine streams and flatten
------------------------------------------------------------------------------
-- | Map a stream producing monadic function on each element of the stream
-- and then flatten the results into a single stream. Since the stream
-- generation function is monadic, unlike 'concatMap', it can produce an
-- effect at the beginning of each iteration of the inner loop.
--
-- @since 0.6.0
{-# INLINE concatMapM #-}
concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b
concatMapM f m = fromStreamD $ D.concatMapM (fmap toStreamD . f) (toStreamD m)
-- | Given a stream value in the underlying monad, lift and join the underlying
-- monad with the stream monad.
--
-- @
-- concatM = concat . yieldM
-- concatM = concat . lift -- requires @(MonadTrans t)@
-- concatM = join . lift -- requires @(MonadTrans t@, @Monad (t m))@
-- @
--
-- See also: 'concat', 'sequence'
--
-- /Internal/
--
{-# INLINE concatM #-}
concatM :: (IsStream t, Monad m) => m (t m a) -> t m a
concatM generator = concatMapM (\() -> generator) (yield ())
------------------------------------------------------------------------------
-- Split stream and fold
------------------------------------------------------------------------------
-- | Like 'splitOn' but the separator is a sequence of elements instead of a
-- single element.
--
-- For illustration, let's define a function that operates on pure lists:
--
-- @
-- splitOnSeq' pat xs = S.toList $ S.splitOnSeq (A.fromList pat) (FL.toList) (S.fromList xs)
-- @
--
-- >>> splitOnSeq' "" "hello"
-- > ["h","e","l","l","o"]
--
-- >>> splitOnSeq' "hello" ""
-- > [""]
--
-- >>> splitOnSeq' "hello" "hello"
-- > ["",""]
--
-- >>> splitOnSeq' "x" "hello"
-- > ["hello"]
--
-- >>> splitOnSeq' "h" "hello"
-- > ["","ello"]
--
-- >>> splitOnSeq' "o" "hello"
-- > ["hell",""]
--
-- >>> splitOnSeq' "e" "hello"
-- > ["h","llo"]
--
-- >>> splitOnSeq' "l" "hello"
-- > ["he","","o"]
--
-- >>> splitOnSeq' "ll" "hello"
-- > ["he","o"]
--
-- 'splitOnSeq' is an inverse of 'intercalate'. The following law always holds:
--
-- > intercalate . splitOn == id
--
-- The following law holds when the separator is non-empty and contains none of
-- the elements present in the input lists:
--
-- > splitOn . intercalate == id
--
-- /Internal/
-- XXX We can use a polymorphic vector implemented by Array# to represent the
-- sequence, that way we can avoid the Storable constraint. If we still need
-- Storable Array for performance, we can use a separate splitOnArray API for
-- that. We can also have an API where the sequence itself is a lazy stream, so
-- that we can search files in files for example.
{-# INLINE splitOnSeq #-}
splitOnSeq
:: (IsStream t, MonadIO m, Storable a, Enum a, Eq a)
=> Array a -> Fold m a b -> t m a -> t m b
splitOnSeq patt f m = D.fromStreamD $ D.splitOnSeq patt f (D.toStreamD m)

View File

@ -5,44 +5,70 @@
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
--
-- This module contains functions ending in the shape:
--
-- @
-- t m a -> m b
-- @
--
-- We call them stream folding functions, they reduce a stream @t m a@ to a
-- monadic value @m b@.
module Streamly.Internal.Data.Stream.IsStream.Eliminate
(
-- * Elimination
-- ** Deconstruction
uncons
, tail
, init
-- * Running a 'Fold'
-- See "Streamly.Internal.Data.Fold".
fold
-- ** Folding
-- ** Right Folds
, foldrM
, foldr
-- ** Left Folds
, foldl'
, foldl1'
, foldlM'
-- ** Composable Left Folds
-- | See "Streamly.Internal.Data.Fold"
, fold
-- ** Parsers
-- | See "Streamly.Internal.Data.Parser"
-- * Running a 'Parser'
-- "Streamly.Internal.Data.Parser".
, parse
, parseK
, parseD
-- ** Concurrent Folds
, foldAsync
, (|$.)
, (|&.)
-- * Stream Deconstruction
-- | foldr and foldl do not provide the remaining stream. 'uncons' is more
-- general, as it can be used to implement those as well. It allows to use
-- the stream one element at a time, and we have the remaining stream all
-- the time.
, uncons
-- * Right Folds
, foldrM
, foldr
-- * Left Folds
, foldl'
, foldl1'
, foldlM'
-- * Specific 'Fold's
-- XXX Move to Data.Fold?
, toStream -- XXX rename to write? buffer?
, toStreamRev -- XXX rename to writeRev? bufferRev?
-- * Specific Fold Functions
-- | Folds as functions of the shape @t m a -> m b@.
--
-- These functions are good to run individually but they do not compose
-- well. Prefer writing folds as the 'Fold' data type. Use folds from
-- "Streamly.Internal.Data.Fold" instead of using the functions in this
-- section.
--
-- This section can possibly be removed in future. Are these better in
-- some case compared to 'Fold'? When the input stream is in CPS style
-- (StreamK) we may want to rewrite the function call to CPS implementation
-- of the fold through these definitions. Will that be more efficient for
-- StreamK?
-- ** Full Folds
-- -- ** To Summary (Full Folds)
, mapM_
, drain
, drainN
, drainWhile
, last
, length
, sum
@ -56,18 +82,6 @@ module Streamly.Internal.Data.Stream.IsStream.Eliminate
, minimum
, the
-- ** Lazy Folds
-- -- ** To Containers (Full Folds)
, toList
, toListRev
, toPure
, toPureRev
-- ** Composable Left Folds
, toStream -- XXX rename to write?
, toStreamRev -- XXX rename to writeRev?
-- ** Partial Folds
-- -- ** To Elements (Partial Folds)
@ -76,8 +90,12 @@ module Streamly.Internal.Data.Stream.IsStream.Eliminate
, (!!)
, head
, headElse
, tail
, init
, findM
, find
, findIndex
, elemIndex
, lookup
-- -- ** To Boolean (Partial Folds)
@ -89,24 +107,41 @@ module Streamly.Internal.Data.Stream.IsStream.Eliminate
, and
, or
-- ** Multi-Stream folds
-- -- ** Lazy Folds
-- ** To Containers
, toList
, toListRev
, toPure -- XXX move toStream to Data.Fold and rename this to toStream?
, toPureRev
-- * Concurrent Folds
, foldAsync
, (|$.)
, (|&.)
-- * Multi-Stream folds
-- Full equivalence
, eqBy
, cmpBy
-- finding subsequences
, isPrefixOf
, isInfixOf
, isSuffixOf
, isSubsequenceOf
-- trimming sequences
, stripPrefix
-- , stripInfix
, stripSuffix
-- * Deprecated
, foldx
, foldxM
, foldr1
, runStream
, runN
, runWhile
, toHandle
)
where
@ -116,20 +151,24 @@ where
import Control.Monad.Catch (MonadThrow)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Functor.Identity (Identity (..))
import Foreign.Storable (Storable)
import Streamly.Internal.Data.Fold.Types (Fold (..))
import Streamly.Internal.Data.Parser (Parser (..))
import Streamly.Internal.Data.SVar (MonadAsync, defState)
import Streamly.Internal.Data.Stream.IsStream.Common
(fold, drop, findIndices, reverse, splitOnSeq, take, takeWhile)
import Streamly.Internal.Data.Stream.Prelude (toStreamS)
import Streamly.Internal.Data.Stream.StreamD (fromStreamD, toStreamD)
import Streamly.Internal.Data.Stream.StreamK (IsStream)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import qualified Streamly.Internal.Data.Array.Storable.Foreign as A
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Stream.Prelude as P
import qualified Streamly.Internal.Data.Stream.StreamK as K
import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Data.Parser.ParserK.Types as PRK
import qualified Streamly.Internal.Data.Stream.StreamK as K
import qualified Streamly.Internal.Data.Parser.ParserD as PRD
import qualified Streamly.Internal.Data.Parser.ParserK.Types as PRK
import qualified System.IO as IO
#ifdef USE_STREAMK_ONLY
import qualified Streamly.Internal.Data.Stream.StreamK as S
@ -138,12 +177,10 @@ import qualified Streamly.Internal.Data.Stream.StreamD as S
#endif
import Prelude hiding
( filter, drop, dropWhile, take, takeWhile, zipWith, foldr
, foldl, map, mapM, mapM_, sequence, all, any, sum, product, elem
, notElem, maximum, minimum, head, last, tail, length, null
, reverse, iterate, init, and, or, lookup, foldr1, (!!)
, scanl, scanl1, replicate, concatMap, span, splitAt, break
, repeat, concat, mconcat)
( drop, take, takeWhile, foldr , foldl, mapM_, sequence, all, any, sum
, product, elem, notElem, maximum, minimum, head, last, tail, length
, null , reverse, init, and, or, lookup, foldr1, (!!) , splitAt, break
, mconcat)
------------------------------------------------------------------------------
-- Deconstruction
@ -165,7 +202,7 @@ uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a))
uncons m = K.uncons (K.adapt m)
------------------------------------------------------------------------------
-- Elimination by Folding
-- Right Folds
------------------------------------------------------------------------------
-- | Right associative/lazy pull fold. @foldrM build final stream@ constructs
@ -219,6 +256,10 @@ foldr = P.foldr
foldr1 :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a)
foldr1 f m = S.foldr1 f (toStreamS m)
------------------------------------------------------------------------------
-- Left Folds
------------------------------------------------------------------------------
-- | Strict left fold with an extraction function. Like the standard strict
-- left fold, but applies a user supplied extraction function (the third
-- argument) to the folded value at the end. This is designed to work with the
@ -274,23 +315,6 @@ foldxM = P.foldlMx'
foldlM' :: Monad m => (b -> a -> m b) -> m b -> SerialT m a -> m b
foldlM' step begin m = S.foldlM' step begin $ toStreamS m
------------------------------------------------------------------------------
-- Running a Fold
------------------------------------------------------------------------------
-- | Fold a stream using the supplied left 'Fold' and reducing the resulting
-- expression strictly at each step. The behavior is similar to 'foldl''. A
-- 'Fold' can terminate early without consuming the full stream. See the
-- documentation of individual 'Fold's for termination behavior.
--
-- >>> S.fold FL.sum (S.enumerateFromTo 1 100)
-- 5050
--
-- @since 0.7.0
{-# INLINE fold #-}
fold :: Monad m => Fold m a b -> SerialT m a -> m b
fold = P.foldOnce
------------------------------------------------------------------------------
-- Running a sink
------------------------------------------------------------------------------
@ -327,9 +351,22 @@ parse :: MonadThrow m => Parser m a b -> SerialT m a -> m b
parse = parseD . PRK.fromParserK
------------------------------------------------------------------------------
-- Specialized folds
-- Specific Fold Functions
------------------------------------------------------------------------------
-- XXX this can utilize parallel mapping if we implement it as drain . mapM
-- |
-- > mapM_ = drain . mapM
--
-- Apply a monadic action to each element of the stream and discard the output
-- of the action. This is not really a pure transformation operation but a
-- transformation followed by fold.
--
-- @since 0.1.0
{-# INLINE mapM_ #-}
mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m ()
mapM_ f m = S.mapM_ f $ toStreamS m
-- |
-- > drain = mapM_ (\_ -> return ())
-- > drain = fold Fold.drain
@ -343,6 +380,50 @@ parse = parseD . PRK.fromParserK
drain :: Monad m => SerialT m a -> m ()
drain = P.drain
-- |
-- > drainN n = drain . take n
-- > drainN n = fold (Fold.ltake n Fold.drain)
--
-- Run maximum up to @n@ iterations of a stream.
--
-- @since 0.7.0
{-# INLINE drainN #-}
drainN :: Monad m => Int -> SerialT m a -> m ()
drainN n = drain . take n
-- |
-- > runN n = runStream . take n
--
-- Run maximum up to @n@ iterations of a stream.
--
-- @since 0.6.0
{-# DEPRECATED runN "Please use \"drainN\" instead" #-}
{-# INLINE runN #-}
runN :: Monad m => Int -> SerialT m a -> m ()
runN = drainN
-- |
-- > drainWhile p = drain . takeWhile p
-- > drainWhile p = fold (Fold.sliceSepBy (not . p) Fold.drain)
--
-- Run a stream as long as the predicate holds true.
--
-- @since 0.7.0
{-# INLINE drainWhile #-}
drainWhile :: Monad m => (a -> Bool) -> SerialT m a -> m ()
drainWhile p = drain . takeWhile p
-- |
-- > runWhile p = runStream . takeWhile p
--
-- Run a stream as long as the predicate holds true.
--
-- @since 0.6.0
{-# DEPRECATED runWhile "Please use \"drainWhile\" instead" #-}
{-# INLINE runWhile #-}
runWhile :: Monad m => (a -> Bool) -> SerialT m a -> m ()
runWhile = drainWhile
-- | Run a stream, discarding the results. By default it interprets the stream
-- as 'SerialT', to run other types of streams use the type adapting
-- combinators for example @runStream . 'asyncly'@.
@ -553,6 +634,10 @@ maximumBy cmp m = S.maximumBy cmp (toStreamS m)
the :: (Eq a, Monad m) => SerialT m a -> m (Maybe a)
the m = S.the (toStreamS m)
------------------------------------------------------------------------------
-- Searching
------------------------------------------------------------------------------
-- | Lookup the element at the given index.
--
-- @since 0.6.0
@ -590,130 +675,26 @@ find p m = S.find p (toStreamS m)
findM :: Monad m => (a -> m Bool) -> SerialT m a -> m (Maybe a)
findM p m = S.findM p (toStreamS m)
------------------------------------------------------------------------------
-- Substreams
------------------------------------------------------------------------------
-- | Returns the first index that satisfies the given predicate.
--
-- > findIndex = fold Fold.findIndex
--
-- @since 0.5.0
{-# INLINE findIndex #-}
findIndex :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe Int)
findIndex p = head . findIndices p
-- | Returns 'True' if the first stream is the same as or a prefix of the
-- second. A stream is a prefix of itself.
-- | Returns the first index where a given value is found in the stream.
--
-- @
-- > S.isPrefixOf (S.fromList "hello") (S.fromList "hello" :: SerialT IO Char)
-- True
-- @
-- > elemIndex a = findIndex (== a)
--
-- @since 0.6.0
{-# INLINE isPrefixOf #-}
isPrefixOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool
isPrefixOf m1 m2 = D.isPrefixOf (toStreamD m1) (toStreamD m2)
-- Note: isPrefixOf uses the prefix stream only once. In contrast, isSuffixOf
-- may use the suffix stream many times. To run in optimal memory we do not
-- want to buffer the suffix stream in memory therefore we need an ability to
-- clone (or consume it multiple times) the suffix stream without any side
-- effects so that multiple potential suffix matches can proceed in parallel
-- without buffering the suffix stream. For example, we may create the suffix
-- stream from a file handle, however, if we evaluate the stream multiple
-- times, once for each match, we will need a different file handle each time
-- which may exhaust the file descriptors. Instead, we want to share the same
-- underlying file descriptor, use pread on it to generate the stream and clone
-- the stream for each match. Therefore the suffix stream should be built in
-- such a way that it can be consumed multiple times without any problems.
-- XXX Can be implemented with better space/time complexity.
-- Space: @O(n)@ worst case where @n@ is the length of the suffix.
-- | Returns 'True' if all the elements of the first stream occur, in order, in
-- the second stream. The elements do not have to occur consecutively. A stream
-- is a subsequence of itself.
--
-- @
-- > S.isSubsequenceOf (S.fromList "hlo") (S.fromList "hello" :: SerialT IO Char)
-- True
-- @
--
-- @since 0.6.0
{-# INLINE isSubsequenceOf #-}
isSubsequenceOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool
isSubsequenceOf m1 m2 = D.isSubsequenceOf (toStreamD m1) (toStreamD m2)
-- | @stripPrefix prefix stream@ strips @prefix@ from @stream@ if it is a
-- prefix of stream. Returns 'Nothing' if the stream does not start with the
-- given prefix, stripped stream otherwise. Returns @Just nil@ when the prefix
-- is the same as the stream.
--
-- Space: @O(1)@
--
-- @since 0.6.0
{-# INLINE stripPrefix #-}
stripPrefix
:: (Eq a, IsStream t, Monad m)
=> t m a -> t m a -> m (Maybe (t m a))
stripPrefix m1 m2 = fmap fromStreamD <$>
D.stripPrefix (toStreamD m1) (toStreamD m2)
-- @since 0.5.0
{-# INLINE elemIndex #-}
elemIndex :: (Monad m, Eq a) => a -> SerialT m a -> m (Maybe Int)
elemIndex a = findIndex (== a)
------------------------------------------------------------------------------
-- Concurrent Application
------------------------------------------------------------------------------
infixr 0 |$.
infixl 1 |&.
-- | Parallel fold application operator; applies a fold function @t m a -> m b@
-- to a stream @t m a@ concurrently; The the input stream is evaluated
-- asynchronously in an independent thread yielding elements to a buffer and
-- the folding action runs in another thread consuming the input from the
-- buffer.
--
-- If you read the signature as @(t m a -> m b) -> (t m a -> m b)@ you can look
-- at it as a transformation that converts a fold function to a buffered
-- concurrent fold function.
--
-- The @.@ at the end of the operator is a mnemonic for termination of the
-- stream.
--
-- @
-- S.foldlM' (\\_ a -> threadDelay 1000000 >> print a) ()
-- |$. S.repeatM (threadDelay 1000000 >> return 1)
-- @
--
-- /Concurrent/
--
-- /Since: 0.3.0 ("Streamly")/
--
-- @since 0.8.0
{-# INLINE (|$.) #-}
(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> (t m a -> m b)
-- (|$.) f = f . Async.mkAsync
(|$.) f = f . D.mkParallel
-- | Same as '|$.'.
--
-- /Internal/
--
{-# INLINE foldAsync #-}
foldAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> (t m a -> m b)
foldAsync = (|$.)
-- | Parallel reverse function application operator for applying a run or fold
-- functions to a stream. Just like '|$.' except that the operands are reversed.
--
-- @
-- S.repeatM (threadDelay 1000000 >> return 1)
-- |&. S.foldlM' (\\_ a -> threadDelay 1000000 >> print a) ()
-- @
--
-- /Concurrent/
--
-- /Since: 0.3.0 ("Streamly")/
--
-- @since 0.8.0
{-# INLINE (|&.) #-}
(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b
x |&. f = f |$. x
------------------------------------------------------------------------------
-- Conversions
-- To containers
------------------------------------------------------------------------------
-- |
@ -822,6 +803,191 @@ toPure = foldr K.cons K.nil
toPureRev :: Monad m => SerialT m a -> m (SerialT Identity a)
toPureRev = foldl' (flip K.cons) K.nil
------------------------------------------------------------------------------
-- Concurrent Application
------------------------------------------------------------------------------
-- | Parallel fold application operator; applies a fold function @t m a -> m b@
-- to a stream @t m a@ concurrently; The the input stream is evaluated
-- asynchronously in an independent thread yielding elements to a buffer and
-- the folding action runs in another thread consuming the input from the
-- buffer.
--
-- If you read the signature as @(t m a -> m b) -> (t m a -> m b)@ you can look
-- at it as a transformation that converts a fold function to a buffered
-- concurrent fold function.
--
-- The @.@ at the end of the operator is a mnemonic for termination of the
-- stream.
--
-- @
-- S.foldlM' (\\_ a -> threadDelay 1000000 >> print a) ()
-- |$. S.repeatM (threadDelay 1000000 >> return 1)
-- @
--
-- /Concurrent/
--
-- /Since: 0.3.0 ("Streamly")/
--
-- @since 0.8.0
{-# INLINE (|$.) #-}
(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> (t m a -> m b)
-- (|$.) f = f . Async.mkAsync
(|$.) f = f . D.mkParallel
infixr 0 |$.
-- | Same as '|$.'.
--
-- /Internal/
--
{-# INLINE foldAsync #-}
foldAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> (t m a -> m b)
foldAsync = (|$.)
-- | Parallel reverse function application operator for applying a run or fold
-- functions to a stream. Just like '|$.' except that the operands are reversed.
--
-- @
-- S.repeatM (threadDelay 1000000 >> return 1)
-- |&. S.foldlM' (\\_ a -> threadDelay 1000000 >> print a) ()
-- @
--
-- /Concurrent/
--
-- /Since: 0.3.0 ("Streamly")/
--
-- @since 0.8.0
{-# INLINE (|&.) #-}
(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b
x |&. f = f |$. x
infixl 1 |&.
------------------------------------------------------------------------------
-- Multi-stream folds
------------------------------------------------------------------------------
-- | Returns 'True' if the first stream is the same as or a prefix of the
-- second. A stream is a prefix of itself.
--
-- @
-- > S.isPrefixOf (S.fromList "hello") (S.fromList "hello" :: SerialT IO Char)
-- True
-- @
--
-- @since 0.6.0
{-# INLINE isPrefixOf #-}
isPrefixOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool
isPrefixOf m1 m2 = D.isPrefixOf (toStreamD m1) (toStreamD m2)
-- | Returns 'True' if the first stream is an infix of the second. A stream is
-- considered an infix of itself.
--
-- @
-- > S.isInfixOf (S.fromList "hello") (S.fromList "hello" :: SerialT IO Char)
-- True
-- @
--
-- Space: @O(n)@ worst case where @n@ is the length of the infix.
--
-- /Internal/
--
-- /Requires 'Storable' constraint/ - Help wanted.
--
{-# INLINE isInfixOf #-}
isInfixOf :: (MonadIO m, Eq a, Enum a, Storable a)
=> SerialT m a -> SerialT m a -> m Bool
isInfixOf infx stream = do
arr <- fold A.write infx
-- XXX can use breakOnSeq instead (when available)
r <- null $ drop 1 $ splitOnSeq arr FL.drain stream
return (not r)
-- Note: isPrefixOf uses the prefix stream only once. In contrast, isSuffixOf
-- may use the suffix stream many times. To run in optimal memory we do not
-- want to buffer the suffix stream in memory therefore we need an ability to
-- clone (or consume it multiple times) the suffix stream without any side
-- effects so that multiple potential suffix matches can proceed in parallel
-- without buffering the suffix stream. For example, we may create the suffix
-- stream from a file handle, however, if we evaluate the stream multiple
-- times, once for each match, we will need a different file handle each time
-- which may exhaust the file descriptors. Instead, we want to share the same
-- underlying file descriptor, use pread on it to generate the stream and clone
-- the stream for each match. Therefore the suffix stream should be built in
-- such a way that it can be consumed multiple times without any problems.
-- XXX Can be implemented with better space/time complexity.
-- Space: @O(n)@ worst case where @n@ is the length of the suffix.
-- | Returns 'True' if the first stream is a suffix of the second. A stream is
-- considered a suffix of itself.
--
-- @
-- > S.isSuffixOf (S.fromList "hello") (S.fromList "hello" :: SerialT IO Char)
-- True
-- @
--
-- Space: @O(n)@, buffers entire input stream and the suffix.
--
-- /Internal/
--
-- /Suboptimal/ - Help wanted.
--
{-# INLINE isSuffixOf #-}
isSuffixOf :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m Bool
isSuffixOf suffix stream = reverse suffix `isPrefixOf` reverse stream
-- | Returns 'True' if all the elements of the first stream occur, in order, in
-- the second stream. The elements do not have to occur consecutively. A stream
-- is a subsequence of itself.
--
-- @
-- > S.isSubsequenceOf (S.fromList "hlo") (S.fromList "hello" :: SerialT IO Char)
-- True
-- @
--
-- @since 0.6.0
{-# INLINE isSubsequenceOf #-}
isSubsequenceOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool
isSubsequenceOf m1 m2 = D.isSubsequenceOf (toStreamD m1) (toStreamD m2)
-- Note: If we want to return a Maybe value to know whether the
-- suffix/infix was present or not along with the stripped stream then
-- we need to buffer the whole input stream.
--
-- | @stripPrefix prefix stream@ strips @prefix@ from @stream@ if it is a
-- prefix of stream. Returns 'Nothing' if the stream does not start with the
-- given prefix, stripped stream otherwise. Returns @Just nil@ when the prefix
-- is the same as the stream.
--
-- Space: @O(1)@
--
-- @since 0.6.0
{-# INLINE stripPrefix #-}
stripPrefix
:: (Eq a, IsStream t, Monad m)
=> t m a -> t m a -> m (Maybe (t m a))
stripPrefix m1 m2 = fmap fromStreamD <$>
D.stripPrefix (toStreamD m1) (toStreamD m2)
-- | Drops the given suffix from a stream. Returns 'Nothing' if the stream does
-- not end with the given suffix. Returns @Just nil@ when the suffix is the
-- same as the stream.
--
-- It may be more efficient to convert the stream to an Array and use
-- stripSuffix on that especially if the elements have a Storable or Prim
-- instance.
--
-- Space: @O(n)@, buffers the entire input stream as well as the suffix
--
-- /Internal/
{-# INLINE stripSuffix #-}
stripSuffix
:: (Monad m, Eq a)
=> SerialT m a -> SerialT m a -> m (Maybe (SerialT m a))
stripSuffix m1 m2 = fmap reverse <$> stripPrefix (reverse m1) (reverse m2)
------------------------------------------------------------------------------
-- Comparison
------------------------------------------------------------------------------

View File

@ -8,7 +8,6 @@
module Streamly.Internal.Data.Stream.IsStream.Exception
(
-- * Exceptions
before
, after_
, after
@ -26,20 +25,12 @@ import Control.Exception (Exception)
import Control.Monad.Catch (MonadCatch)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Control (MonadBaseControl)
import Streamly.Internal.Data.SVar (MonadAsync)
import Streamly.Internal.Data.Stream.StreamD (toStreamD)
import Streamly.Internal.Data.Stream.StreamK (IsStream)
import Streamly.Internal.Data.SVar (MonadAsync)
import qualified Streamly.Internal.Data.Stream.StreamD as D
import Prelude hiding
( filter, drop, dropWhile, take, takeWhile, zipWith, foldr
, foldl, map, mapM, mapM_, sequence, all, any, sum, product, elem
, notElem, maximum, minimum, head, last, tail, length, null
, reverse, iterate, init, and, or, lookup, foldr1, (!!)
, scanl, scanl1, replicate, concatMap, span, splitAt, break
, repeat, concat, mconcat)
------------------------------------------------------------------------------
-- Exceptions
------------------------------------------------------------------------------
@ -209,20 +200,3 @@ handle :: (IsStream t, MonadCatch m, Exception e)
=> (e -> t m a) -> t m a -> t m a
handle handler xs =
D.fromStreamD $ D.handle (D.toStreamD . handler) $ D.toStreamD xs
-- Keep concating either streams as long as rights are generated, stop as soon
-- as a left is generated and concat the left stream.
--
-- See also: 'handle'
--
-- /Unimplemented/
--
{-
concatMapEitherWith
:: -- (IsStream t, MonadAsync m) =>
(forall x. t m x -> t m x -> t m x)
-> (a -> t m (Either (t m b) b))
-> t m a
-> t m b
concatMapEitherWith = undefined
-}

View File

@ -7,23 +7,47 @@
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
--
-- Most of the combinators in this module can be implemented as unfolds. Some
-- of them however can only be expressed in terms StreamK e.g. cons/consM,
-- fromFoldable, mfix. We can possibly remove those from this module which can
-- be expressed as unfolds. Unless we want to use rewrite rules to rewrite them
-- as StreamK when StreamK is used, avoiding conversion to StreamD. Will that
-- help? Are there any other reasons to keep these and not use unfolds?
module Streamly.Internal.Data.Stream.IsStream.Generate
(
-- ** From Values
yield
-- * Primitives
K.nil
, K.nilM
, K.cons
, (K..:)
, consM
, (|:)
-- * From 'Unfold'
, unfold
, unfold0
-- * Unfolding
, unfoldr
, unfoldrM
-- * From Values
, yield
, yieldM
, repeat
, repeatM
, replicate
, replicateM
-- ** Enumeration
-- , Enumerable (..)
-- , enumerate
-- , enumerateTo
-- * Enumeration
, Enumerable (..)
, enumerate
, enumerateTo
-- ** Time Enumeration
-- * Time Enumeration
, times
, absTimes
, absTimesWith
@ -32,33 +56,32 @@ module Streamly.Internal.Data.Stream.IsStream.Generate
, durations
, ticks
, timeout
, currentTime
-- ** From Generators
, unfoldr
, unfoldrM
, unfold
, unfold0
-- * From Generators
, fromIndices
, fromIndicesM
-- , generate
-- , generateM
-- ** Iteration
-- * Iteration
, iterate
, iterateM
-- ** Cyclic Elements
-- * Cyclic Elements
, K.mfix
-- ** From Containers
-- * From Containers
, P.fromList
, fromListM
, K.fromFoldable
, fromFoldableM
, fromPrimIORef
, fromCallback
-- * Deprecated
, each
, fromHandle
, currentTime
)
where
@ -68,37 +91,50 @@ import Control.Monad.IO.Class (MonadIO(..))
import Data.Void (Void)
import Streamly.Internal.Data.Unfold.Types (Unfold)
import Streamly.Internal.Data.SVar (MonadAsync, Rate (..))
import Streamly.Internal.Data.Stream.Enumeration
(Enumerable(..), enumerate, enumerateTo)
import Streamly.Internal.Data.Stream.IsStream.Common
(absTimesWith, concatM, relTimesWith, timesWith, yield, yieldM, repeatM)
import Streamly.Internal.Data.Stream.Prelude (fromStreamS)
import Streamly.Internal.Data.Stream.StreamD (fromStreamD)
import Streamly.Internal.Data.Stream.StreamK (IsStream(consM))
import Streamly.Internal.Data.Stream.StreamK (IsStream((|:), consM))
import Streamly.Internal.Data.Stream.Serial (SerialT, WSerialT)
import Streamly.Internal.Data.Stream.Zip (ZipSerialM)
import Streamly.Internal.Data.Time.Units
( AbsTime
, RelTime64, addToAbsTime64)
import Streamly.Internal.Data.Time.Units (AbsTime , RelTime64, addToAbsTime64)
import Streamly.Internal.Data.IORef.Prim (Prim, IORef)
import qualified Streamly.Internal.Data.Stream.Prelude as P
import qualified Streamly.Internal.Data.Stream.StreamK as K
import qualified Streamly.Internal.Data.Stream.StreamD as D (toStreamK)
import qualified Streamly.Internal.Data.Stream.StreamD.Generate as D
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import qualified Prelude
import qualified System.IO as IO
import qualified Streamly.Internal.Data.Stream.StreamD.Generate as D
import qualified Streamly.Internal.Data.Stream.StreamK as K
#ifdef USE_STREAMK_ONLY
import qualified Streamly.Internal.Data.Stream.StreamK as S
#else
import qualified Streamly.Internal.Data.Stream.StreamD as S (toStreamK)
import qualified Streamly.Internal.Data.Stream.StreamD.Generate as S
#endif
import qualified System.IO as IO
import Prelude hiding
( filter, drop, dropWhile, take, takeWhile, zipWith, foldr
, foldl, map, mapM, mapM_, sequence, all, any, sum, product, elem
, notElem, maximum, minimum, head, last, tail, length, null
, reverse, iterate, init, and, or, lookup, foldr1, (!!)
, scanl, scanl1, replicate, concatMap, span, splitAt, break
, repeat, concat, mconcat)
import Prelude hiding (iterate, replicate, repeat)
------------------------------------------------------------------------------
-- From Unfold
------------------------------------------------------------------------------
-- | Convert an 'Unfold' into a stream by supplying it an input seed.
--
-- >>> unfold (UF.replicateM 10) (putStrLn "hello")
--
-- /Since: 0.7.0/
{-# INLINE unfold #-}
unfold :: (IsStream t, Monad m) => Unfold m a b -> a -> t m b
unfold unf x = fromStreamD $ D.unfold unf x
-- | Convert an 'Unfold' with a closed input end into a stream.
--
-- /Internal/
{-# INLINE unfold0 #-}
unfold0 :: (IsStream t, Monad m) => Unfold m Void b -> t m b
unfold0 unf = unfold unf (error "unfold0: unexpected void evaluation")
------------------------------------------------------------------------------
-- Generation by Unfolding
@ -185,70 +221,159 @@ unfoldrMWSerial = Serial.unfoldrM
unfoldrMZipSerial :: MonadAsync m => (b -> m (Maybe (a, b))) -> b -> ZipSerialM m a
unfoldrMZipSerial = Serial.unfoldrM
-- | Convert an 'Unfold' into a stream by supplying it an input seed.
--
-- >>> unfold (UF.replicateM 10) (putStrLn "hello")
--
-- /Since: 0.7.0/
{-# INLINE unfold #-}
unfold :: (IsStream t, Monad m) => Unfold m a b -> a -> t m b
unfold unf x = fromStreamD $ D.unfold unf x
------------------------------------------------------------------------------
-- From Values
------------------------------------------------------------------------------
-- | Convert an 'Unfold' with a closed input end into a stream.
-- |
-- Generate an infinite stream by repeating a pure value.
--
-- @since 0.4.0
{-# INLINE_NORMAL repeat #-}
repeat :: (IsStream t, Monad m) => a -> t m a
repeat = fromStreamS . S.repeat
-- |
-- @
-- replicate = take n . repeat
-- @
--
-- Generate a stream of length @n@ by repeating a value @n@ times.
--
-- @since 0.6.0
{-# INLINE_NORMAL replicate #-}
replicate :: (IsStream t, Monad m) => Int -> a -> t m a
replicate n = fromStreamS . S.replicate n
-- |
-- @
-- replicateM = take n . repeatM
-- @
--
-- Generate a stream by performing a monadic action @n@ times. Same as:
--
-- @
-- drain $ serially $ S.replicateM 10 $ (threadDelay 1000000 >> print 1)
-- drain $ asyncly $ S.replicateM 10 $ (threadDelay 1000000 >> print 1)
-- @
--
-- /Concurrent/
--
-- @since 0.1.1
{-# INLINE_EARLY replicateM #-}
replicateM :: (IsStream t, MonadAsync m) => Int -> m a -> t m a
replicateM = K.replicateM
{-# RULES "replicateM serial" replicateM = replicateMSerial #-}
{-# INLINE replicateMSerial #-}
replicateMSerial :: MonadAsync m => Int -> m a -> SerialT m a
replicateMSerial n = fromStreamS . S.replicateM n
------------------------------------------------------------------------------
-- Time Enumeration
------------------------------------------------------------------------------
-- | @times@ returns a stream of time value tuples with clock of 10 ms
-- granularity. The first component of the tuple is an absolute time reference
-- (epoch) denoting the start of the stream and the second component is a time
-- relative to the reference.
--
-- @
-- >>> S.mapM_ (\x -> print x >> threadDelay 1000000) $ S.times
-- > (AbsTime (TimeSpec {sec = 2496295, nsec = 536223000}),RelTime64 (NanoSecond64 0))
-- > (AbsTime (TimeSpec {sec = 2496295, nsec = 536223000}),RelTime64 (NanoSecond64 1002028000))
-- > (AbsTime (TimeSpec {sec = 2496295, nsec = 536223000}),RelTime64 (NanoSecond64 1996656000))
-- @
--
-- Note: This API is not safe on 32-bit machines.
--
-- /Internal/
{-# INLINE unfold0 #-}
unfold0 :: (IsStream t, Monad m) => Unfold m Void b -> t m b
unfold0 unf = unfold unf (error "unfold0: unexpected void evaluation")
--
{-# INLINE times #-}
times :: (IsStream t, MonadAsync m) => t m (AbsTime, RelTime64)
times = timesWith 0.01
-- | @absTimes@ returns a stream of absolute timestamps using a clock of 10 ms
-- granularity.
--
-- @
-- >>> S.mapM_ print $ S.delayPre 1 $ S.absTimes
-- @
--
-- Note: This API is not safe on 32-bit machines.
--
-- /Internal/
--
{-# INLINE absTimes #-}
absTimes :: (IsStream t, MonadAsync m, Functor (t m)) => t m AbsTime
absTimes = fmap (uncurry addToAbsTime64) times
{-# DEPRECATED currentTime "Please use absTimes instead" #-}
{-# INLINE currentTime #-}
currentTime :: (IsStream t, MonadAsync m, Functor (t m))
=> Double -> t m AbsTime
currentTime = absTimesWith
-- | @relTimes@ returns a stream of relative time values starting from 0,
-- using a clock of granularity 10 ms.
--
-- @
-- >>> S.mapM_ print $ S.delayPre 1 $ S.relTimes
-- @
--
-- Note: This API is not safe on 32-bit machines.
--
-- /Internal/
--
{-# INLINE relTimes #-}
relTimes :: (IsStream t, MonadAsync m, Functor (t m)) => t m RelTime64
relTimes = fmap snd times
-- | @durations g@ returns a stream of relative time values measuring the time
-- elapsed since the immediate predecessor element of the stream was generated.
-- The first element of the stream is always 0. @durations@ uses a clock of
-- granularity @g@ specified in seconds. A low granularity clock is more
-- expensive in terms of CPU usage. The minimum granularity is 1 millisecond.
-- Durations lower than 1 ms will be 0.
--
-- Note: This API is not safe on 32-bit machines.
--
-- /Unimplemented/
--
{-# INLINE durations #-}
durations :: -- (IsStream t, MonadAsync m) =>
Double -> t m RelTime64
durations = undefined
-- | Generate ticks at the specified rate. The rate is adaptive, the tick
-- generation speed can be increased or decreased at different times to achieve
-- the specified rate. The specific behavior for different styles of 'Rate'
-- specifications is documented under 'Rate'. The effective maximum rate
-- achieved by a stream is governed by the processor speed.
--
-- /Unimplemented/
--
{-# INLINE ticks #-}
ticks :: -- (IsStream t, MonadAsync m) =>
Rate -> t m ()
ticks = undefined
-- | Generate a singleton event at or after the specified absolute time. Note
-- that this is different from a threadDelay, a threadDelay starts from the
-- time when the action is evaluated, whereas if we use AbsTime based timeout
-- it will immediately expire if the action is evaluated too late.
--
-- /Unimplemented/
--
{-# INLINE timeout #-}
timeout :: -- (IsStream t, MonadAsync m) =>
AbsTime -> t m ()
timeout = undefined
------------------------------------------------------------------------------
-- Specialized Generation
-- From Generator functions
------------------------------------------------------------------------------
-- Faster than yieldM because there is no bind.
--
-- |
-- @
-- yield a = a \`cons` nil
-- @
--
-- Create a singleton stream from a pure value.
--
-- The following holds in monadic streams, but not in Zip streams:
--
-- @
-- yield = pure
-- yield = yieldM . pure
-- @
--
-- In Zip applicative streams 'yield' is not the same as 'pure' because in that
-- case 'pure' is equivalent to 'repeat' instead. 'yield' and 'pure' are
-- equally efficient, in other cases 'yield' may be slightly more efficient
-- than the other equivalent definitions.
--
-- @since 0.4.0
{-# INLINE yield #-}
yield :: IsStream t => a -> t m a
yield = K.yield
-- |
-- @
-- yieldM m = m \`consM` nil
-- @
--
-- Create a singleton stream from a monadic action.
--
-- @
-- > toList $ yieldM getLine
-- hello
-- ["hello"]
-- @
--
-- @since 0.4.0
{-# INLINE yieldM #-}
yieldM :: (Monad m, IsStream t) => m a -> t m a
yieldM = K.yieldM
-- |
-- @
-- fromIndices f = let g i = f i \`cons` g (i + 1) in g 0
@ -288,74 +413,9 @@ fromIndicesM = K.fromIndicesM
fromIndicesMSerial :: MonadAsync m => (Int -> m a) -> SerialT m a
fromIndicesMSerial = fromStreamS . S.fromIndicesM
-- |
-- @
-- replicateM = take n . repeatM
-- @
--
-- Generate a stream by performing a monadic action @n@ times. Same as:
--
-- @
-- drain $ serially $ S.replicateM 10 $ (threadDelay 1000000 >> print 1)
-- drain $ asyncly $ S.replicateM 10 $ (threadDelay 1000000 >> print 1)
-- @
--
-- /Concurrent/
--
-- @since 0.1.1
{-# INLINE_EARLY replicateM #-}
replicateM :: (IsStream t, MonadAsync m) => Int -> m a -> t m a
replicateM = K.replicateM
{-# RULES "replicateM serial" replicateM = replicateMSerial #-}
{-# INLINE replicateMSerial #-}
replicateMSerial :: MonadAsync m => Int -> m a -> SerialT m a
replicateMSerial n = fromStreamS . S.replicateM n
-- |
-- @
-- replicate = take n . repeat
-- @
--
-- Generate a stream of length @n@ by repeating a value @n@ times.
--
-- @since 0.6.0
{-# INLINE_NORMAL replicate #-}
replicate :: (IsStream t, Monad m) => Int -> a -> t m a
replicate n = fromStreamS . S.replicate n
-- |
-- Generate an infinite stream by repeating a pure value.
--
-- @since 0.4.0
{-# INLINE_NORMAL repeat #-}
repeat :: (IsStream t, Monad m) => a -> t m a
repeat = fromStreamS . S.repeat
-- |
-- @
-- repeatM = fix . consM
-- repeatM = cycle1 . yieldM
-- @
--
-- Generate a stream by repeatedly executing a monadic action forever.
--
-- @
-- drain $ serially $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1)
-- drain $ asyncly $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1)
-- @
--
-- /Concurrent, infinite (do not use with 'parallely')/
--
-- @since 0.2.0
{-# INLINE_EARLY repeatM #-}
repeatM :: (IsStream t, MonadAsync m) => m a -> t m a
repeatM = K.repeatM
{-# RULES "repeatM serial" repeatM = repeatMSerial #-}
{-# INLINE repeatMSerial #-}
repeatMSerial :: MonadAsync m => m a -> SerialT m a
repeatMSerial = fromStreamS . S.repeatM
------------------------------------------------------------------------------
-- Iterating functions
------------------------------------------------------------------------------
-- |
-- @
@ -416,21 +476,6 @@ iterateMSerial step = fromStreamS . S.iterateM step
-- Conversions
------------------------------------------------------------------------------
-- |
-- @
-- fromListM = 'Prelude.foldr' 'K.consM' 'K.nil'
-- @
--
-- Construct a stream from a list of monadic actions. This is more efficient
-- than 'fromFoldableM' for serial streams.
--
-- @since 0.4.0
{-# INLINE_EARLY fromListM #-}
fromListM :: (MonadAsync m, IsStream t) => [m a] -> t m a
fromListM = fromStreamD . D.fromListM
{-# RULES "fromListM fallback to StreamK" [1]
forall a. D.toStreamK (D.fromListM a) = fromFoldableM a #-}
-- |
-- @
-- fromFoldableM = 'Prelude.foldr' 'consM' 'K.nil'
@ -450,6 +495,21 @@ fromListM = fromStreamD . D.fromListM
fromFoldableM :: (IsStream t, MonadAsync m, Foldable f) => f (m a) -> t m a
fromFoldableM = Prelude.foldr consM K.nil
-- |
-- @
-- fromListM = 'Prelude.foldr' 'K.consM' 'K.nil'
-- @
--
-- Construct a stream from a list of monadic actions. This is more efficient
-- than 'fromFoldableM' for serial streams.
--
-- @since 0.4.0
{-# INLINE_EARLY fromListM #-}
fromListM :: (MonadAsync m, IsStream t) => [m a] -> t m a
fromListM = fromStreamD . D.fromListM
{-# RULES "fromListM fallback to StreamK" [1]
forall a. D.toStreamK (D.fromListM a) = fromFoldableM a #-}
-- | Same as 'fromFoldable'.
--
-- @since 0.1.0
@ -482,165 +542,15 @@ fromHandle h = go
fromPrimIORef :: (IsStream t, MonadIO m, Prim a) => IORef a -> t m a
fromPrimIORef = fromStreamD . D.fromPrimIORef
------------------------------------------------------------------------------
-- Time related
------------------------------------------------------------------------------
-- | @timesWith g@ returns a stream of time value tuples. The first component
-- of the tuple is an absolute time reference (epoch) denoting the start of the
-- stream and the second component is a time relative to the reference.
--
-- The argument @g@ specifies the granularity of the relative time in seconds.
-- A lower granularity clock gives higher precision but is more expensive in
-- terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.
--
-- @
-- >>> S.mapM_ (\x -> print x >> threadDelay 1000000) $ S.timesWith 0.01
-- > (AbsTime (TimeSpec {sec = 2496295, nsec = 536223000}),RelTime64 (NanoSecond64 0))
-- > (AbsTime (TimeSpec {sec = 2496295, nsec = 536223000}),RelTime64 (NanoSecond64 1002028000))
-- > (AbsTime (TimeSpec {sec = 2496295, nsec = 536223000}),RelTime64 (NanoSecond64 1996656000))
-- @
--
-- Note: This API is not safe on 32-bit machines.
-- | Takes a callback setter function and provides it with a callback. The
-- callback when invoked adds a value at the tail of the stream. Returns a
-- stream of values generated by the callback.
--
-- /Internal/
--
{-# INLINE timesWith #-}
timesWith :: (IsStream t, MonadAsync m) => Double -> t m (AbsTime, RelTime64)
timesWith g = fromStreamD $ D.times g
-- | @times@ returns a stream of time value tuples with clock of 10 ms
-- granularity. The first component of the tuple is an absolute time reference
-- (epoch) denoting the start of the stream and the second component is a time
-- relative to the reference.
--
-- @
-- >>> S.mapM_ (\x -> print x >> threadDelay 1000000) $ S.times
-- > (AbsTime (TimeSpec {sec = 2496295, nsec = 536223000}),RelTime64 (NanoSecond64 0))
-- > (AbsTime (TimeSpec {sec = 2496295, nsec = 536223000}),RelTime64 (NanoSecond64 1002028000))
-- > (AbsTime (TimeSpec {sec = 2496295, nsec = 536223000}),RelTime64 (NanoSecond64 1996656000))
-- @
--
-- Note: This API is not safe on 32-bit machines.
--
-- /Internal/
--
{-# INLINE times #-}
times :: (IsStream t, MonadAsync m) => t m (AbsTime, RelTime64)
times = timesWith 0.01
-- | @absTimesWith g@ returns a stream of absolute timestamps using a clock of
-- granularity @g@ specified in seconds. A low granularity clock is more
-- expensive in terms of CPU usage. Any granularity lower than 1 ms is treated
-- as 1 ms.
--
-- @
-- >>> S.mapM_ print $ S.delayPre 1 $ S.absTimesWith 0.01
-- @
--
-- Note: This API is not safe on 32-bit machines.
--
-- /Internal/
--
{-# INLINE absTimesWith #-}
absTimesWith :: (IsStream t, MonadAsync m, Functor (t m))
=> Double -> t m AbsTime
absTimesWith = fmap (uncurry addToAbsTime64) . timesWith
-- | @absTimes@ returns a stream of absolute timestamps using a clock of 10 ms
-- granularity.
--
-- @
-- >>> S.mapM_ print $ S.delayPre 1 $ S.absTimes
-- @
--
-- Note: This API is not safe on 32-bit machines.
--
-- /Internal/
--
{-# INLINE absTimes #-}
absTimes :: (IsStream t, MonadAsync m, Functor (t m)) => t m AbsTime
absTimes = fmap (uncurry addToAbsTime64) times
{-# DEPRECATED currentTime "Please use absTimes instead" #-}
{-# INLINE currentTime #-}
currentTime :: (IsStream t, MonadAsync m, Functor (t m))
=> Double -> t m AbsTime
currentTime = absTimesWith
-- | @relTimesWith g@ returns a stream of relative time values starting from 0,
-- using a clock of granularity @g@ specified in seconds. A low granularity
-- clock is more expensive in terms of CPU usage. Any granularity lower than 1
-- ms is treated as 1 ms.
--
-- @
-- >>> S.mapM_ print $ S.delayPre 1 $ S.relTimesWith 0.01
-- > RelTime64 (NanoSecond64 0)
-- > RelTime64 (NanoSecond64 91139000)
-- > RelTime64 (NanoSecond64 204052000)
-- @
--
-- Note: This API is not safe on 32-bit machines.
--
-- /Internal/
--
{-# INLINE relTimesWith #-}
relTimesWith :: (IsStream t, MonadAsync m, Functor (t m))
=> Double -> t m RelTime64
relTimesWith = fmap snd . timesWith
-- | @relTimes@ returns a stream of relative time values starting from 0,
-- using a clock of granularity 10 ms.
--
-- @
-- >>> S.mapM_ print $ S.delayPre 1 $ S.relTimes
-- @
--
-- Note: This API is not safe on 32-bit machines.
--
-- /Internal/
--
{-# INLINE relTimes #-}
relTimes :: (IsStream t, MonadAsync m, Functor (t m)) => t m RelTime64
relTimes = fmap snd times
-- | @durations g@ returns a stream of relative time values measuring the time
-- elapsed since the immediate predecessor element of the stream was generated.
-- The first element of the stream is always 0. @durations@ uses a clock of
-- granularity @g@ specified in seconds. A low granularity clock is more
-- expensive in terms of CPU usage. The minimum granularity is 1 millisecond.
-- Durations lower than 1 ms will be 0.
--
-- Note: This API is not safe on 32-bit machines.
--
-- /Unimplemented/
--
{-# INLINE durations #-}
durations :: -- (IsStream t, MonadAsync m) =>
Double -> t m RelTime64
durations = undefined
-- | Generate ticks at the specified rate. The rate is adaptive, the tick
-- generation speed can be increased or decreased at different times to achieve
-- the specified rate. The specific behavior for different styles of 'Rate'
-- specifications is documented under 'Rate'. The effective maximum rate
-- achieved by a stream is governed by the processor speed.
--
-- /Unimplemented/
--
{-# INLINE ticks #-}
ticks :: -- (IsStream t, MonadAsync m) =>
Rate -> t m ()
ticks = undefined
-- | Generate a singleton event at or after the specified absolute time. Note
-- that this is different from a threadDelay, a threadDelay starts from the
-- time when the action is evaluated, whereas if we use AbsTime based timeout
-- it will immediately expire if the action is evaluated too late.
--
-- /Unimplemented/
--
{-# INLINE timeout #-}
timeout :: -- (IsStream t, MonadAsync m) =>
AbsTime -> t m ()
timeout = undefined
{-# INLINE fromCallback #-}
fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> SerialT m a
fromCallback setCallback = concatM $ do
(callback, stream) <- D.newCallbackStream
setCallback callback
return stream

View File

@ -0,0 +1,148 @@
-- |
-- Module : Streamly.Internal.Data.Stream.IsStream.Lift
-- Copyright : (c) 2020 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
module Streamly.Internal.Data.Stream.IsStream.Lift
(
-- * Generalize Inner Monad
hoist
, generally
-- * Transform Inner Monad
, liftInner
, usingReaderT
, runReaderT
, evalStateT
, usingStateT
, runStateT
)
where
#include "inline.hs"
import Control.Monad.Trans.Reader (ReaderT)
import Control.Monad.Trans.State.Strict (StateT)
import Control.Monad.Trans.Class (MonadTrans(..))
import Data.Functor.Identity (Identity (..))
import Streamly.Internal.Data.Stream.Prelude (fromStreamS, toStreamS)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Stream.StreamD (fromStreamD, toStreamD)
import Streamly.Internal.Data.Stream.StreamK (IsStream)
import qualified Streamly.Internal.Data.Stream.StreamD as D
#ifdef USE_STREAMK_ONLY
import qualified Streamly.Internal.Data.Stream.StreamK as S
#else
import qualified Streamly.Internal.Data.Stream.StreamD as S
#endif
------------------------------------------------------------------------------
-- Generalize the underlying monad
------------------------------------------------------------------------------
-- | Transform the inner monad of a stream using a natural transformation.
--
-- / Internal/
--
{-# INLINE hoist #-}
hoist :: (Monad m, Monad n)
=> (forall x. m x -> n x) -> SerialT m a -> SerialT n a
hoist f xs = fromStreamS $ S.hoist f (toStreamS xs)
-- | Generalize the inner monad of the stream from 'Identity' to any monad.
--
-- / Internal/
--
{-# INLINE generally #-}
generally :: (IsStream t, Monad m) => t Identity a -> t m a
generally xs = fromStreamS $ S.hoist (return . runIdentity) (toStreamS xs)
------------------------------------------------------------------------------
-- Add and remove a monad transformer
------------------------------------------------------------------------------
-- | Lift the inner monad @m@ of a stream @t m a@ to @tr m@ using the monad
-- transformer @tr@.
--
-- / Internal/
--
{-# INLINE liftInner #-}
liftInner :: (Monad m, IsStream t, MonadTrans tr, Monad (tr m))
=> t m a -> t (tr m) a
liftInner xs = fromStreamD $ D.liftInner (toStreamD xs)
------------------------------------------------------------------------------
-- Sharing read only state in a stream
------------------------------------------------------------------------------
-- | Evaluate the inner monad of a stream as 'ReaderT'.
--
-- / Internal/
--
{-# INLINE runReaderT #-}
runReaderT :: (IsStream t, Monad m) => m s -> t (ReaderT s m) a -> t m a
runReaderT s xs = fromStreamD $ D.runReaderT s (toStreamD xs)
-- | Run a stream transformation using a given environment.
--
-- See also: 'Serial.map'
--
-- / Internal/
--
{-# INLINE usingReaderT #-}
usingReaderT
:: (Monad m, IsStream t)
=> m r
-> (t (ReaderT r m) a -> t (ReaderT r m) a)
-> t m a
-> t m a
usingReaderT r f xs = runReaderT r $ f $ liftInner xs
------------------------------------------------------------------------------
-- Sharing read write state in a stream
------------------------------------------------------------------------------
-- | Evaluate the inner monad of a stream as 'StateT'.
--
-- This is supported only for 'SerialT' as concurrent state updation may not be
-- safe.
--
-- / Internal/
--
{-# INLINE evalStateT #-}
evalStateT :: Monad m => m s -> SerialT (StateT s m) a -> SerialT m a
evalStateT s xs = fromStreamD $ D.evalStateT s (toStreamD xs)
-- | Run a stateful (StateT) stream transformation using a given state.
--
-- This is supported only for 'SerialT' as concurrent state updation may not be
-- safe.
--
-- See also: 'scanl''
--
-- / Internal/
--
{-# INLINE usingStateT #-}
usingStateT
:: Monad m
=> m s
-> (SerialT (StateT s m) a -> SerialT (StateT s m) a)
-> SerialT m a
-> SerialT m a
usingStateT s f xs = evalStateT s $ f $ liftInner xs
-- | Evaluate the inner monad of a stream as 'StateT' and emit the resulting
-- state and value pair after each step.
--
-- This is supported only for 'SerialT' as concurrent state updation may not be
-- safe.
--
-- / Internal/
--
{-# INLINE runStateT #-}
runStateT :: Monad m => m s -> SerialT (StateT s m) a -> SerialT m (s, a)
runStateT s xs = fromStreamD $ D.runStateT s (toStreamD xs)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -3,9 +3,7 @@
-- |
-- Module : Streamly.Internal.Data.Stream.StreamD
-- Copyright : (c) 2018 Composewell Technologies
-- (c) Roman Leshchinskiy 2008-2010
-- (c) The University of Glasgow, 2009
-- License : BSD3
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
@ -307,10 +305,12 @@ import Prelude hiding
, notElem, null, head, tail, zipWith, lookup, foldr1, sequence
, (!!), scanl, scanl1, concatMap, replicate, enumFromTo, concat
, reverse, iterate, splitAt)
-- XXX We could just export these modules directly
import Streamly.Internal.Data.Stream.StreamD.Type
import Streamly.Internal.Data.Stream.StreamD.Generate
import Streamly.Internal.Data.Stream.StreamD.Eliminate
import Streamly.Internal.Data.Stream.StreamD.Exception
import Streamly.Internal.Data.Stream.StreamD.Lift
import Streamly.Internal.Data.Stream.StreamD.Nesting
import Streamly.Internal.Data.Stream.StreamD.Transform

View File

@ -1,37 +1,42 @@
#include "inline.hs"
-- |
-- Module : Streamly.Internal.Data.Stream.StreamD.Eliminate
-- Copyright : (c) 2018 Composewell Technologies
-- (c) Roman Leshchinskiy 2008-2010
-- (c) The University of Glasgow, 2009
-- License : BSD3
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
-- A few functions in this module have been adapted from the vector package
-- (c) Roman Leshchinskiy.
--
module Streamly.Internal.Data.Stream.StreamD.Eliminate
(
-- * Elimination
-- ** General Folds
foldrM
, foldrMx
-- * Running a 'Fold'
foldOnce -- XXX rename to "fold"
-- -- * Running a 'Parser'
-- , parse
-- * Stream Deconstruction
, uncons
-- * Right Folds
, foldrM
, foldr
, foldrMx
, foldr1
, foldl'
-- * Left Folds
, foldlM'
, toList
, toListRev
, foldlx'
, foldl'
, foldlMx'
, foldOnce
, foldlx'
, parselMx'
-- ** Specialized Folds
-- * Specific Fold Functions
, drain
, mapM_ -- Map and Fold
, null
, head
, headElse
@ -45,30 +50,33 @@ module Streamly.Internal.Data.Stream.StreamD.Eliminate
, maximumBy
, minimum
, minimumBy
, findIndices
, lookup
, findM
, find
, (!!)
, the
-- * To containers
, toList
, toListRev
, toSVarParallel
-- * Multi-Stream Folds
-- ** Comparisons
-- | These should probably be expressed using zipping operations.
, eqBy
, cmpBy
-- ** Transformation comprehensions
, the
-- ** Substreams
-- | These should probably be expressed using parsers.
, isPrefixOf
, isSubsequenceOf
, stripPrefix
-- ** Map and Fold
, mapM_
)
where
#include "inline.hs"
import Control.Exception (assert)
import Control.Monad.Catch (MonadThrow, throwM)
import GHC.Exts (SpecConstrAnnotation(..))
@ -100,7 +108,7 @@ foldr1 f m = do
Just (h, t) -> fmap Just (foldr f h t)
------------------------------------------------------------------------------
-- Parses
-- Parsers
------------------------------------------------------------------------------
-- Inlined definition. Without the inline "serially/parser/take" benchmark
@ -381,18 +389,6 @@ findM p m = foldrM (\x xs -> p x >>= \r -> if r then return (Just x) else xs)
find :: Monad m => (a -> Bool) -> Stream m a -> m (Maybe a)
find p = findM (return . p)
{-# INLINE_NORMAL findIndices #-}
findIndices :: Monad m => (a -> Bool) -> Stream m a -> Stream m Int
findIndices p (Stream step state) = Stream step' (state, 0)
where
{-# INLINE_LATE step' #-}
step' gst (st, i) = i `seq` do
r <- step (adaptState gst) st
return $ case r of
Yield x s -> if p x then Yield i (s, i+1) else Skip (s, i+1)
Skip s -> Skip (s, i)
Stop -> Stop
{-# INLINE toListRev #-}
toListRev :: Monad m => Stream m a -> m [a]
toListRev = foldl' (flip (:)) []
@ -420,7 +416,16 @@ the (Stream step state) = go state
Stop -> return (Just n)
------------------------------------------------------------------------------
-- Substreams
-- Map and Fold
------------------------------------------------------------------------------
-- | Execute a monadic action for each element of the 'Stream'
{-# INLINE_NORMAL mapM_ #-}
mapM_ :: Monad m => (a -> m b) -> Stream m a -> m ()
mapM_ m = drain . mapM m
------------------------------------------------------------------------------
-- Multi-stream folds
------------------------------------------------------------------------------
{-# INLINE_NORMAL isPrefixOf #-}
@ -487,12 +492,3 @@ stripPrefix (Stream stepa ta) (Stream stepb tb) = go (ta, tb, Nothing)
else return Nothing
Skip sb' -> go (sa, sb', Just x)
Stop -> return Nothing
------------------------------------------------------------------------------
-- Map and Fold
------------------------------------------------------------------------------
-- | Execute a monadic action for each element of the 'Stream'
{-# INLINE_NORMAL mapM_ #-}
mapM_ :: Monad m => (a -> m b) -> Stream m a -> m ()
mapM_ m = drain . mapM m

View File

@ -1,8 +1,6 @@
#include "inline.hs"
-- |
-- Module : Streamly.Internal.Data.Stream.StreamD.Exception
-- Copyright : (c) 2020 Composewell Technologies
-- Copyright : (c) 2020 Composewell Technologies and Contributors
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
@ -10,10 +8,12 @@
module Streamly.Internal.Data.Stream.StreamD.Exception
(
-- * Exceptions
-- * Finalizers
newFinalizedIORef
, runIORefFinalizer
, withIORefFinalizer
-- * Combinators
, gbracket_
, gbracket
, before
@ -29,9 +29,9 @@ module Streamly.Internal.Data.Stream.StreamD.Exception
)
where
#include "inline.hs"
import Control.Exception
(Exception, SomeException, mask_)
import Control.Exception (Exception, SomeException, mask_)
import Control.Monad (void)
import Control.Monad.Catch (MonadCatch)
import Control.Monad.IO.Class (MonadIO(..))
@ -40,72 +40,9 @@ import Data.IORef (newIORef, readIORef, mkWeakIORef, writeIORef, IORef)
import qualified Control.Monad.Catch as MC
import Prelude hiding
( map, mapM, mapM_, repeat, foldr, last, take, filter
, takeWhile, drop, dropWhile, all, any, maximum, minimum, elem
, notElem, null, head, tail, zipWith, lookup, foldr1, sequence
, (!!), scanl, scanl1, concatMap, replicate, enumFromTo, concat
, reverse, iterate, splitAt)
import Streamly.Internal.Data.Stream.StreamD.Type
import Streamly.Internal.Data.SVar
------------------------------------------------------------------------------
-- Exceptions
------------------------------------------------------------------------------
data GbracketState s1 s2 v
= GBracketInit
| GBracketNormal s1 v
| GBracketException s2
-- | Like 'gbracket' but with following differences:
--
-- * alloc action @m c@ runs with async exceptions enabled
-- * cleanup action @c -> m d@ won't run if the stream is garbage collected
-- after partial evaluation.
-- * does not require a 'MonadAsync' constraint.
--
-- /Inhibits stream fusion/
--
-- /Internal/
--
{-# INLINE_NORMAL gbracket_ #-}
gbracket_
:: Monad m
=> m c -- ^ before
-> (forall s. m s -> m (Either e s)) -- ^ try (exception handling)
-> (c -> m d) -- ^ after, on normal stop
-> (c -> e -> Stream m b -> Stream m b) -- ^ on exception
-> (c -> Stream m b) -- ^ stream generator
-> Stream m b
gbracket_ bef exc aft fexc fnormal =
Stream step GBracketInit
where
{-# INLINE_LATE step #-}
step _ GBracketInit = do
r <- bef
return $ Skip $ GBracketNormal (fnormal r) r
step gst (GBracketNormal (UnStream step1 st) v) = do
res <- exc $ step1 gst st
case res of
Right r -> case r of
Yield x s ->
return $ Yield x (GBracketNormal (Stream step1 s) v)
Skip s -> return $ Skip (GBracketNormal (Stream step1 s) v)
Stop -> aft v >> return Stop
-- XXX Do not handle async exceptions, just rethrow them.
Left e ->
return $ Skip (GBracketException (fexc v e (UnStream step1 st)))
step gst (GBracketException (UnStream step1 st)) = do
res <- step1 gst st
case res of
Yield x s -> return $ Yield x (GBracketException (Stream step1 s))
Skip s -> return $ Skip (GBracketException (Stream step1 s))
Stop -> return Stop
------------------------------------------------------------------------------
-- Finalizers
------------------------------------------------------------------------------
@ -172,6 +109,61 @@ withIORefFinalizer ref action = do
runinio action
------------------------------------------------------------------------------
-- Exception handling combinators.
------------------------------------------------------------------------------
data GbracketState s1 s2 v
= GBracketInit
| GBracketNormal s1 v
| GBracketException s2
-- | Like 'gbracket' but with following differences:
--
-- * alloc action @m c@ runs with async exceptions enabled
-- * cleanup action @c -> m d@ won't run if the stream is garbage collected
-- after partial evaluation.
-- * does not require a 'MonadAsync' constraint.
--
-- /Inhibits stream fusion/
--
-- /Internal/
--
{-# INLINE_NORMAL gbracket_ #-}
gbracket_
:: Monad m
=> m c -- ^ before
-> (forall s. m s -> m (Either e s)) -- ^ try (exception handling)
-> (c -> m d) -- ^ after, on normal stop
-> (c -> e -> Stream m b -> Stream m b) -- ^ on exception
-> (c -> Stream m b) -- ^ stream generator
-> Stream m b
gbracket_ bef exc aft fexc fnormal =
Stream step GBracketInit
where
{-# INLINE_LATE step #-}
step _ GBracketInit = do
r <- bef
return $ Skip $ GBracketNormal (fnormal r) r
step gst (GBracketNormal (UnStream step1 st) v) = do
res <- exc $ step1 gst st
case res of
Right r -> case r of
Yield x s ->
return $ Yield x (GBracketNormal (Stream step1 s) v)
Skip s -> return $ Skip (GBracketNormal (Stream step1 s) v)
Stop -> aft v >> return Stop
-- XXX Do not handle async exceptions, just rethrow them.
Left e ->
return $ Skip (GBracketException (fexc v e (UnStream step1 st)))
step gst (GBracketException (UnStream step1 st)) = do
res <- step1 gst st
case res of
Yield x s -> return $ Yield x (GBracketException (Stream step1 s))
Skip s -> return $ Skip (GBracketException (Stream step1 s))
Stop -> return Stop
data GbracketIOState s1 s2 v wref
= GBracketIOInit
@ -250,7 +242,7 @@ gbracket bef exc aft fexc fnormal =
Skip s -> return $ Skip (GBracketIOException (Stream step1 s))
Stop -> return Stop
-- | See 'Streamly.Internal.Data.Stream.IsStream.after_'.
-- | See 'Streamly.Internal.Data.Stream.IsStream.before'.
--
{-# INLINE_NORMAL before #-}
before :: Monad m => m b -> Stream m a -> Stream m a

View File

@ -1,41 +1,51 @@
#include "inline.hs"
-- |
-- Module : Streamly.Internal.Data.Stream.StreamD.Generate
-- Copyright : (c) 2020 Composewell Technologies
-- Copyright : (c) 2020 Composewell Technologies and Contributors
-- (c) Roman Leshchinskiy 2008-2010
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
--
-- Prefer unfolds ("Streamly.Internal.Data.Unfold") over the combinators in
-- this module. They are more powerful and efficient as they can be transformed
-- and composed on the input side efficiently and they can fuse in nested
-- operations (e.g. concatUnfold). All the combinators in this module can be
-- expressed using unfolds with the same efficiency.
--
-- Operations in this module that are not in "Streamly.Internal.Data.Unfold":
-- generate, times, fromPrimIORef.
--
-- We should plan to replace this module with "Streamly.Internal.Data.Unfold"
-- in future.
-- A few combinators in this module have been adapted from the vector package
-- (c) Roman Leshchinskiy. See the notes in specific combinators.
--
module Streamly.Internal.Data.Stream.StreamD.Generate
(
-- * Construction
-- * Primitives
nil
, nilM
, cons
, consM
-- * Generation
-- ** Unfolds
, unfoldr
, unfoldrM
-- * From 'Unfold'
, unfold
-- ** Specialized Generation
-- | Generate a monadic stream from a seed.
-- * Unfolding
, unfoldr
, unfoldrM
-- * From Values
, yield
, yieldM
, repeat
, repeatM
, replicate
, replicateM
, fromIndices
, fromIndicesM
, generate
, generateM
, iterate
, iterateM
-- ** Enumerations
-- * Enumeration
, enumerateFromStepIntegral
, enumerateFromIntegral
, enumerateFromThenIntegral
@ -48,33 +58,48 @@ module Streamly.Internal.Data.Stream.StreamD.Generate
, enumerateFromToFractional
, enumerateFromThenToFractional
-- ** Time
-- * Time Enumeration
, times
-- ** Conversions
-- * From Generators
-- | Generate a monadic stream from a seed.
, fromIndices
, fromIndicesM
, generate
, generateM
-- * Iteration
, iterate
, iterateM
-- * From Containers
-- | Transform an input structure into a stream.
-- | Direct style stream does not support @fromFoldable@.
, yield
, yieldM
-- Note: Direct style stream does not support @fromFoldable@.
, fromList
, fromListM
, fromStreamK
, fromStreamD
, fromPrimIORef
, fromProducer
, fromSVar
, toStreamD
-- * Callbacks
, newCallbackStream
-- * Conversions
, fromStreamK
, toStreamK
, fromStreamD
, toStreamD
)
where
#include "inline.hs"
import Control.Concurrent (myThreadId, threadDelay)
import Control.Monad (void, forever)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Int (Int64)
import Streamly.Internal.Data.Time.Units
(toRelTime64, RelTime64)
import Streamly.Internal.Data.Time.Units (toRelTime64, RelTime64)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units
(MicroSecond64(..), fromAbsTime, toAbsTime, AbsTime)
@ -88,7 +113,7 @@ import Streamly.Internal.Data.Stream.StreamD.Type
import Streamly.Internal.Data.SVar
------------------------------------------------------------------------------
-- Construction
-- Primitives
------------------------------------------------------------------------------
-- | An empty 'Stream'.
@ -115,24 +140,9 @@ cons x (Stream step state) = Stream step1 Nothing
Stop -> Stop
------------------------------------------------------------------------------
-- Generation by unfold
-- From 'Unfold'
------------------------------------------------------------------------------
{-# INLINE_NORMAL unfoldrM #-}
unfoldrM :: Monad m => (s -> m (Maybe (a, s))) -> s -> Stream m a
unfoldrM next state = Stream step state
where
{-# INLINE_LATE step #-}
step _ st = do
r <- next st
return $ case r of
Just (x, s) -> Yield x s
Nothing -> Stop
{-# INLINE_LATE unfoldr #-}
unfoldr :: Monad m => (s -> Maybe (a, s)) -> s -> Stream m a
unfoldr f = unfoldrM (return . f)
data UnfoldState s = UnfoldNothing | UnfoldJust s
-- | Convert an 'Unfold' into a 'Stream' by supplying it a seed.
@ -151,7 +161,27 @@ unfold (Unfold ustep inject) seed = Stream step UnfoldNothing
Stop -> Stop
------------------------------------------------------------------------------
-- Specialized Generation
-- Unfolding
------------------------------------------------------------------------------
-- Adapted from vector package
{-# INLINE_NORMAL unfoldrM #-}
unfoldrM :: Monad m => (s -> m (Maybe (a, s))) -> s -> Stream m a
unfoldrM next state = Stream step state
where
{-# INLINE_LATE step #-}
step _ st = do
r <- next st
return $ case r of
Just (x, s) -> Yield x s
Nothing -> Stop
{-# INLINE_LATE unfoldr #-}
unfoldr :: Monad m => (s -> Maybe (a, s)) -> s -> Stream m a
unfoldr f = unfoldrM (return . f)
------------------------------------------------------------------------------
-- From values
------------------------------------------------------------------------------
{-# INLINE_NORMAL repeatM #-}
@ -162,14 +192,7 @@ repeatM x = Stream (\_ _ -> x >>= \r -> return $ Yield r ()) ()
repeat :: Monad m => a -> Stream m a
repeat x = Stream (\_ _ -> return $ Yield x ()) ()
{-# INLINE_NORMAL iterateM #-}
iterateM :: Monad m => (a -> m a) -> m a -> Stream m a
iterateM step = Stream (\_ st -> st >>= \x -> return $ Yield x (step x))
{-# INLINE_NORMAL iterate #-}
iterate :: Monad m => (a -> a) -> a -> Stream m a
iterate step st = iterateM (return . step) (return st)
-- Adapted from the vector package
{-# INLINE_NORMAL replicateM #-}
replicateM :: forall m a. Monad m => Int -> m a -> Stream m a
replicateM n p = Stream step n
@ -185,6 +208,10 @@ replicateM n p = Stream step n
replicate :: Monad m => Int -> a -> Stream m a
replicate n x = replicateM n (return x)
------------------------------------------------------------------------------
-- Enumeration
------------------------------------------------------------------------------
-- This would not work properly for floats, therefore we put an Integral
-- constraint.
-- | Can be used to enumerate unbounded integrals. This does not check for
@ -323,6 +350,10 @@ numFrom from = enumerateFromStepNum from 1
numFromThen :: (Monad m, Num a) => a -> a -> Stream m a
numFromThen from next = enumerateFromStepNum from (next - from)
------------------------------------------------------------------------------
-- Time Enumeration
------------------------------------------------------------------------------
{-# INLINE updateTimeVar #-}
updateTimeVar :: Prim.IORef Int64 -> IO ()
updateTimeVar timeVar = do
@ -348,8 +379,6 @@ updateWithDelay precision timeVar = do
g' = g * 10 ^ (6 :: Int)
-- The take/drop combinators above should be moved to filtering section.
{-# INLINE_NORMAL times #-}
times :: MonadAsync m => Double -> Stream m (AbsTime, RelTime64)
times g = Stream step Nothing
@ -376,7 +405,7 @@ times g = Stream step Nothing
(toRelTime64 (MicroSecond64 (a - t0)))) s
-------------------------------------------------------------------------------
-- Generation by Conversion
-- From Generators
-------------------------------------------------------------------------------
{-# INLINE_NORMAL fromIndicesM #-}
@ -392,6 +421,7 @@ fromIndicesM gen = Stream step 0
fromIndices :: Monad m => (Int -> a) -> Stream m a
fromIndices gen = fromIndicesM (return . gen)
-- Adapted from the vector package
{-# INLINE_NORMAL generateM #-}
generateM :: Monad m => Int -> (Int -> m a) -> Stream m a
generateM n gen = n `seq` Stream step 0
@ -406,6 +436,22 @@ generateM n gen = n `seq` Stream step 0
generate :: Monad m => Int -> (Int -> a) -> Stream m a
generate n gen = generateM n (return . gen)
-------------------------------------------------------------------------------
-- Iteration
-------------------------------------------------------------------------------
{-# INLINE_NORMAL iterateM #-}
iterateM :: Monad m => (a -> m a) -> m a -> Stream m a
iterateM step = Stream (\_ st -> st >>= \x -> return $ Yield x (step x))
{-# INLINE_NORMAL iterate #-}
iterate :: Monad m => (a -> a) -> a -> Stream m a
iterate step st = iterateM (return . step) (return st)
-------------------------------------------------------------------------------
-- From containers
-------------------------------------------------------------------------------
-- XXX we need the MonadAsync constraint because of a rewrite rule.
-- | Convert a list of monadic actions to a 'Stream'
{-# INLINE_LATE fromListM #-}
@ -416,6 +462,10 @@ fromListM = Stream step
step _ (m:ms) = m >>= \x -> return $ Yield x ms
step _ [] = return Stop
-------------------------------------------------------------------------------
-- From callback
-------------------------------------------------------------------------------
-- Note: we can use another API with two callbacks stop and yield if we want
-- the callback to be able to indicate end of stream.
--

View File

@ -0,0 +1,112 @@
-- |
-- Module : Streamly.Internal.Data.Stream.StreamD.Lift
-- Copyright : (c) 2018 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
--
-- Transform the underlying monad of a stream.
module Streamly.Internal.Data.Stream.StreamD.Lift
(
-- * Generalize Inner Monad
hoist
, generally -- XXX generalize
-- * Transform Inner Monad
, liftInner
, runReaderT
, evalStateT
, runStateT
)
where
#include "inline.hs"
import Control.Monad.Trans.Class (MonadTrans(lift))
import Control.Monad.Trans.Reader (ReaderT)
import Control.Monad.Trans.State.Strict (StateT)
import Data.Functor.Identity (Identity(..))
import Streamly.Internal.Data.SVar (adaptState)
import qualified Control.Monad.Trans.Reader as Reader
import qualified Control.Monad.Trans.State.Strict as State
import Streamly.Internal.Data.Stream.StreamD.Type
-------------------------------------------------------------------------------
-- Generalize Inner Monad
-------------------------------------------------------------------------------
{-# INLINE_NORMAL hoist #-}
hoist :: Monad n => (forall x. m x -> n x) -> Stream m a -> Stream n a
hoist f (Stream step state) = (Stream step' state)
where
{-# INLINE_LATE step' #-}
step' gst st = do
r <- f $ step (adaptState gst) st
return $ case r of
Yield x s -> Yield x s
Skip s -> Skip s
Stop -> Stop
{-# INLINE generally #-}
generally :: Monad m => Stream Identity a -> Stream m a
generally = hoist (return . runIdentity)
-------------------------------------------------------------------------------
-- Transform Inner Monad
-------------------------------------------------------------------------------
{-# INLINE_NORMAL liftInner #-}
liftInner :: (Monad m, MonadTrans t, Monad (t m))
=> Stream m a -> Stream (t m) a
liftInner (Stream step state) = Stream step' state
where
{-# INLINE_LATE step' #-}
step' gst st = do
r <- lift $ step (adaptState gst) st
return $ case r of
Yield x s -> Yield x s
Skip s -> Skip s
Stop -> Stop
{-# INLINE_NORMAL runReaderT #-}
runReaderT :: Monad m => m s -> Stream (ReaderT s m) a -> Stream m a
runReaderT env (Stream step state) = Stream step' (state, env)
where
{-# INLINE_LATE step' #-}
step' gst (st, action) = do
sv <- action
r <- Reader.runReaderT (step (adaptState gst) st) sv
return $ case r of
Yield x s -> Yield x (s, return sv)
Skip s -> Skip (s, return sv)
Stop -> Stop
{-# INLINE_NORMAL evalStateT #-}
evalStateT :: Monad m => m s -> Stream (StateT s m) a -> Stream m a
evalStateT initial (Stream step state) = Stream step' (state, initial)
where
{-# INLINE_LATE step' #-}
step' gst (st, action) = do
sv <- action
(r, sv') <- State.runStateT (step (adaptState gst) st) sv
return $ case r of
Yield x s -> Yield x (s, return sv')
Skip s -> Skip (s, return sv')
Stop -> Stop
{-# INLINE_NORMAL runStateT #-}
runStateT :: Monad m => m s -> Stream (StateT s m) a -> Stream m (s, a)
runStateT initial (Stream step state) = Stream step' (state, initial)
where
{-# INLINE_LATE step' #-}
step' gst (st, action) = do
sv <- action
(r, sv') <- State.runStateT (step (adaptState gst) st) sv
return $ case r of
Yield x s -> Yield (sv', x) (s, return sv')
Skip s -> Skip (s, return sv')
Stop -> Stop

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -451,23 +451,19 @@ library
, Streamly.Internal.Data.Stream.StreamK.Type
, Streamly.Internal.Data.Stream.StreamK
, Streamly.Internal.Data.Stream.StreamD.Type
, Streamly.Internal.Data.Stream.StreamD
, Streamly.Internal.Data.Stream.StreamD.Generate
, Streamly.Internal.Data.Stream.StreamD.Eliminate
, Streamly.Internal.Data.Stream.StreamD.Nesting
, Streamly.Internal.Data.Stream.StreamD.Transform
, Streamly.Internal.Data.Stream.StreamD.Exception
, Streamly.Internal.Data.Stream.StreamD.Lift
, Streamly.Internal.Data.Stream.StreamD
, Streamly.Internal.Data.Stream.StreamDK.Type
, Streamly.Internal.Data.Stream.StreamDK
, Streamly.Internal.Data.Stream.Enumeration
, Streamly.Internal.Data.Stream.Prelude
-- Higher level streams
, Streamly.Internal.Data.Stream.IsStream
, Streamly.Internal.Data.Stream.IsStream.Generate
, Streamly.Internal.Data.Stream.IsStream.Eliminate
, Streamly.Internal.Data.Stream.IsStream.Transform
, Streamly.Internal.Data.Stream.IsStream.Exception
, Streamly.Internal.Data.Stream.SVar
, Streamly.Internal.Data.Stream.Serial
, Streamly.Internal.Data.Stream.Async
@ -475,6 +471,14 @@ library
, Streamly.Internal.Data.Stream.Ahead
, Streamly.Internal.Data.Stream.Zip
, Streamly.Internal.Data.Stream.Combinators
, Streamly.Internal.Data.Stream.IsStream.Common
, Streamly.Internal.Data.Stream.IsStream.Generate
, Streamly.Internal.Data.Stream.IsStream.Eliminate
, Streamly.Internal.Data.Stream.IsStream.Transform
, Streamly.Internal.Data.Stream.IsStream.Nesting
, Streamly.Internal.Data.Stream.IsStream.Exception
, Streamly.Internal.Data.Stream.IsStream.Lift
, Streamly.Internal.Data.Stream.IsStream
, Streamly.Internal.Data.List
-- Unfolds