Change the input stream type of classifySessionsBy

For more convenient operation.
This commit is contained in:
Harendra Kumar 2021-03-05 01:31:26 +05:30
parent f7681dc897
commit fca88a49e8
2 changed files with 17 additions and 22 deletions

View File

@ -202,7 +202,6 @@ classifySessionsOf :: (S.MonadAsync m) => SerialT m Int -> m ()
classifySessionsOf =
S.drain
. Internal.classifySessionsOf (const (return False)) 3 FL.drain
. S.map (\(ts,(k,a)) -> (k, a, ts))
. Internal.timestamped
. S.concatMap (\x -> S.map (x,) (S.enumerateFromTo 1 (10 :: Int)))

View File

@ -202,14 +202,13 @@ module Streamly.Internal.Data.Stream.IsStream.Nesting
-- ** Keyed Window Classification
-- | Split the stream into chunks or windows in space or time. Each window
-- can be associated with a key, all events associated with a particular
-- key in the window can be folded to a single result. The stream is split
-- into windows of specified size, the window termination can be
-- dynamically controlled by the fold.
-- | Split the stream into chunks or windows by position or time. Each
-- window can be associated with a key, all events associated with a
-- particular key in the window can be folded to a single result. The
-- window termination can be dynamically controlled by the fold.
--
-- The term "chunk" is used for a space window and the term "session" is
-- used for a time window.
-- The term "chunk" is used for a window defined by position of elements
-- and the term "session" is used for a time window.
-- *** Tumbling Windows
-- | A new window starts after the previous window is finished.
@ -1901,12 +1900,10 @@ data SessionState t m k a b = SessionState
, sessionOutputStream :: t (m :: Type -> Type) (k, b) -- ^ Completed sessions
}
-- XXX Perhaps we should use an "Event a" type to represent timestamped data.
-- | @classifySessionsBy tick keepalive predicate timeout fold stream@
-- classifies an input event @stream@ consisting of @(key, value, timestamp)@
-- into sessions based on the @key@, folding all the values corresponding to
-- the same key into a session using the supplied @fold@.
-- classifies an input event @stream@ consisting of @(timestamp, (key,
-- value))@ into sessions based on the @key@, folding all the values
-- corresponding to the same key into a session using the supplied @fold@.
--
-- When the fold terminates or a @timeout@ occurs, a tuple consisting of the
-- session key and the folded value is emitted in the output stream. The
@ -1928,11 +1925,10 @@ data SessionState t m k a b = SessionState
--
-- >>> :{
-- Stream.mapM_ print
-- $ Stream.classifySessionsBy 1 False (const (return False)) 3 (Fold.takeLE 3 Fold.toList)
-- $ Stream.map (\\(ts,(k,a)) -> (k, a, ts))
-- $ Stream.classifySessionsBy 1 False (const (return False)) 3 (Fold.take 3 Fold.toList)
-- $ Stream.timestamped
-- $ Stream.delay 1
-- $ (,) \<$> Stream.fromList [1,2,3] \<*> Stream.fromList [\'a',\'b',\'c']
-- $ (,) <$> Stream.fromList [1,2,3] <*> Stream.fromList ['a','b','c']
-- :}
-- (1,"abc")
-- (2,"abc")
@ -1947,8 +1943,8 @@ classifySessionsBy
-> Bool -- ^ reset the timer when an event is received
-> (Int -> m Bool) -- ^ predicate to eject sessions based on session count
-> Double -- ^ session timeout in seconds
-> Fold m a b -- ^ Fold to be applied to session events
-> t m (k, a, AbsTime) -- ^ session key, data, timestamp
-> Fold m a b -- ^ Fold to be applied to session data
-> t m (AbsTime, (k, a)) -- ^ timestamp, (session key, session data)
-> t m (k, b) -- ^ session key, fold result
classifySessionsBy tick reset ejectPred tmout
(Fold step initial extract) str =
@ -1979,7 +1975,7 @@ classifySessionsBy tick reset ejectPred tmout
-- We use the first strategy as of now.
-- Got a new stream input element
sstep session@SessionState{..} (Just (key, value, timestamp)) = do
sstep session@SessionState{..} (Just (timestamp, (key, value))) = do
-- XXX we should use a heap in pinned memory to scale it to a large
-- size
--
@ -2176,7 +2172,7 @@ classifyKeepAliveSessions ::
=> (Int -> m Bool) -- ^ predicate to eject sessions on session count
-> Double -- ^ session inactive timeout
-> Fold m a b -- ^ Fold to be applied to session payload data
-> t m (k, a, AbsTime) -- ^ session key, data, timestamp
-> t m (AbsTime, (k, a)) -- ^ timestamp, (session key, session data)
-> t m (k, b)
classifyKeepAliveSessions = classifySessionsBy 1 True
@ -2222,8 +2218,8 @@ classifySessionsOf ::
(IsStream t, MonadAsync m, Ord k)
=> (Int -> m Bool) -- ^ predicate to eject sessions on session count
-> Double -- ^ time window size
-> Fold m a b -- ^ Fold to be applied to session events
-> t m (k, a, AbsTime) -- ^ session key, data, timestamp
-> Fold m a b -- ^ Fold to be applied to session data
-> t m (AbsTime, (k, a)) -- ^ timestamp, (session key, session data)
-> t m (k, b)
classifySessionsOf = classifySessionsBy 1 False