diff --git a/src/Streamly/Internal/Prelude.hs b/src/Streamly/Internal/Prelude.hs
index 4a6b8cfb2..947dbabdc 100644
--- a/src/Streamly/Internal/Prelude.hs
+++ b/src/Streamly/Internal/Prelude.hs
@@ -1,5 +1,7 @@
 {-# LANGUAGE CPP #-}
 {-# LANGUAGE RankNTypes #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE KindSignatures #-}
 {-# LANGUAGE FlexibleContexts #-}
 
 #if __GLASGOW_HASKELL__ >= 800
@@ -449,6 +451,9 @@ import Control.Monad.State.Strict (StateT)
 import Control.Monad.Trans (MonadTrans(..))
 import Control.Monad.Trans.Control (MonadBaseControl)
 import Data.Functor.Identity (Identity (..))
+#if __GLASGOW_HASKELL__ >= 800
+import Data.Kind (Type)
+#endif
 import Data.Heap (Entry(..))
 import Data.Maybe (isJust, fromJust, isNothing)
 import Foreign.Storable (Storable)
@@ -480,7 +485,7 @@ import Streamly.Internal.Data.Stream.Serial (SerialT, WSerialT)
 import Streamly.Internal.Data.Stream.Zip (ZipSerialM)
 import Streamly.Internal.Data.Pipe.Types (Pipe (..))
 import Streamly.Internal.Data.Time.Units
-    (AbsTime, MilliSecond64(..), addToAbsTime, diffAbsTime, toRelTime,
+    (AbsTime, MilliSecond64(..), addToAbsTime, toRelTime,
     toAbsTime, TimeUnit64)
 
 import Streamly.Internal.Mutable.Prim.Var (Prim, Var)
@@ -3967,49 +3972,83 @@ classifyKeepAliveChunks
 classifyKeepAliveChunks spanout = classifyChunksBy spanout True
 -}
 
--- | @classifySessionsBy tick timeout reset f stream@ groups together all input
--- stream elements that belong to the same session. @timeout@ is the maximum
--- lifetime of a session in seconds. All elements belonging to a session are
--- purged after this duration. If "reset" is 'Ture' then the timeout is reset
--- after every event received in the session. Session duration is measured
--- using the timestamp of the first element seen for that session. To detect
--- session timeouts, a monotonic event time clock is maintained using the
--- timestamps seen in the inputs and a timer with a tick duration specified by
--- @tick@.
+#if __GLASGOW_HASKELL__ < 800
+#define Type *
+#endif
+
+data SessionState t m k a b = SessionState
+    { sessionCurTime :: !AbsTime -- ^ current time, advanced by timer ticks
+    , sessionEventTime :: !AbsTime -- ^ time as per last event
+    , sessionCount :: !Int -- ^ total number of sessions in progress
+    , sessionTimerHeap :: H.Heap (H.Entry AbsTime k) -- ^ heap for timeouts
+    , sessionKeyValueMap :: Map.Map k a -- ^ Stored sessions for keys
+    , sessionOutputStream :: t (m :: Type -> Type) (k, b) -- ^ Completed sessions
+    }
+
+#undef Type
+
+-- | @classifySessionsBy tick timeout reset f stream@ groups timed events in an
+-- input event stream into sessions based on a session key. Each element in the
+-- stream is an event consisting of a 4-tuple @(session key, session data,
+-- isClosingEvent, timestamp)@.  @session key@ is a key that uniquely
+-- identifies a session.  All the events belonging to a session are folded using
+-- the fold @f@ until an event with @isClosingEvent@ flag set arrives or a
+-- timeout has occurred.  The session key and the result of the fold are
+-- emitted in the output stream when the session is purged.
 --
--- @session key@ is a key that uniquely identifies the session for the given
--- element, @timestamp@ characterizes the time when the input element was
--- generated, this is an absolute time measured from some @Epoch@. @session
--- close@ is a boolean indicating whether this element marks the closing of the
--- session. When an input element with @session close@ set to @True@ is seen
--- the session is purged immediately.
+-- When @reset@ is 'False', @timeout@ is the maximum lifetime of a session in
+-- seconds, measured from the @timestamp@ of the first event in that session.
+-- When @reset@ is 'True' then the timeout is reset after every event received
+-- in the session, in other words timeout is measured from the timestamp of the
+-- last event in the session.
 --
--- All the input elements belonging to a session are collected using the fold
--- @f@.  The session key and the fold result are emitted in the output stream
--- when the session is purged either via the session close event or via the
--- session lifetime timeout.
+-- @timestamp@ in an event characterizes the time when the input event was
+-- generated, this is an absolute time measured from some @Epoch@.  The notion
+-- of current time is maintained by a monotonic event time clock using the
+-- timestamps seen in the input stream. The latest timestamp seen till now is
+-- used as the base for the current time.  When no new events are seen, a timer
+-- is started with a tick duration specified by @tick@. This timer is used to
+-- detect session timeouts in the absence of new events.
+--
+-- /Internal/
 --
--- @since 0.7.0
 {-# INLINABLE classifySessionsBy #-}
 classifySessionsBy
     :: (IsStream t, MonadAsync m, Ord k)
     => Double -- ^ timer tick in seconds
-    -> Double -- ^ session timeout
+    -> Double -- ^ session timeout in seconds
     -> Bool   -- ^ reset the timeout when an event is received
     -> Fold m a b -- ^ Fold to be applied to session events
-    -> t m (k, a, Bool, AbsTime) -- ^ session key, timestamp, close event, data
-    -> t m (k, b)
+    -> t m (k, a, Bool, AbsTime) -- ^ session key, data, close event, timestamp
+    -> t m (k, b) -- ^ session key, fold result
 classifySessionsBy tick timeout reset (Fold step initial extract) str =
-    concatMap (\(Tuple4' _ _ _ s) -> s) $ scanlM' sstep szero stream
+    concatMap (\session -> sessionOutputStream session)
+        $ scanlM' sstep szero stream
 
     where
 
     timeoutMs = toRelTime (round (timeout * 1000) :: MilliSecond64)
     tickMs = toRelTime (round (tick * 1000) :: MilliSecond64)
-    szero = Tuple4' (toAbsTime (0 :: MilliSecond64)) H.empty Map.empty K.nil
+    szero = SessionState
+        { sessionCurTime = toAbsTime (0 :: MilliSecond64)
+        , sessionEventTime = toAbsTime (0 :: MilliSecond64)
+        , sessionCount = 0
+        , sessionTimerHeap = H.empty
+        , sessionKeyValueMap = Map.empty
+        , sessionOutputStream = K.nil
+        }
 
+    -- XXX If there are too many sessions in progress we may want to put a
+    -- limit on the number of sessions because of memory consumption limits. We
+    -- can use two different strategies in this case:
+    --
+    -- 1) Eject old sessions or sessions beyond a certain/lower timeout
+    -- threshold even before timeout, effectively reduce the timeout.
+    -- 2) Drop creation of new sessions but keep accepting new events for the
+    -- old ones.
+    --
     -- Got a new stream input element
-    sstep (Tuple4' evTime hp mp _) (Just (key, a, closing, ts)) =
+    sstep (session@SessionState{..}) (Just (key, a, closing, ts)) =
         -- XXX we should use a heap in pinned memory to scale it to a large
         -- size
         --
@@ -4027,66 +4066,94 @@ classifySessionsBy tick timeout reset (Fold step initial extract) str =
         -- XXX if the key is an Int, we can also use an IntMap for slightly
         -- better performance.
         --
-        let accumulate v = do
+        let curTime = max sessionEventTime ts
+            accumulate v = do
                 Tuple' _ old <- maybe (initial >>= return . Tuple' ts) return v
                 new <- step old a
                 return $ Tuple' ts new
         in if closing
            then do
-                let (r, mp') = Map.updateLookupWithKey (\_ _ -> Nothing) key mp
+                let (r, mp') = Map.updateLookupWithKey (\_ _ -> Nothing) key
+                                    sessionKeyValueMap
+                    -- A closing event may be the only event of its session.
+                    -- Such a session was never counted, so decrement only
+                    -- when a stored session was actually removed.
+                    count = maybe sessionCount (const (sessionCount - 1)) r
                 Tuple' _ acc <- accumulate r
                 res <- extract acc
-                return $ Tuple4' evTime hp mp' (yield (key, res))
+                return $ session
+                    { sessionCurTime = curTime
+                    , sessionEventTime = curTime
+                    , sessionCount = count
+                    , sessionKeyValueMap = mp'
+                    , sessionOutputStream = yield (key, res)
+                    }
            else do
-                let r = Map.lookup key mp
+                let r = Map.lookup key sessionKeyValueMap
                 acc <- accumulate r
-                let mp' = Map.insert key acc mp
-                let hp' =
+                let mp' = Map.insert key acc sessionKeyValueMap
+                let (hp', count) =
                         case r of
                             Nothing ->
                                 let expiry = addToAbsTime ts timeoutMs
-                                in H.insert (Entry expiry key) hp
-                            Just _ -> hp
+                                in (H.insert (Entry expiry key)
+                                    sessionTimerHeap, sessionCount + 1)
+                            Just _ -> (sessionTimerHeap, sessionCount)
                 -- Event time is maintained as monotonically increasing
                 -- time. If we have lagged behind any of the timestamps
                 -- seen then we increase it to match the latest time seen
                 -- in the timestamps. We also increase it on timer ticks.
-                return $ Tuple4' (max evTime ts) hp' mp' K.nil
+                return $ SessionState
+                    { sessionCurTime = curTime
+                    , sessionEventTime = curTime
+                    , sessionCount = count
+                    , sessionTimerHeap = hp'
+                    , sessionKeyValueMap = mp'
+                    , sessionOutputStream = K.nil
+                    }
 
     -- Got a timer tick event
     -- XXX can we yield the entries without accumulating them?
-    sstep (Tuple4' evTime heap sessions _) Nothing = do
-        (hp', mp', out) <- go heap sessions K.nil
-        return $ Tuple4' curTime hp' mp' out
+    sstep (session@SessionState{..}) Nothing = do
+        (hp', mp', out, count) <- go sessionTimerHeap sessionKeyValueMap K.nil 0
+        return $ session
+            { sessionCurTime = curTime
+            , sessionCount = sessionCount - count
+            , sessionTimerHeap = hp'
+            , sessionKeyValueMap = mp'
+            , sessionOutputStream = out
+            }
 
         where
 
-        curTime = addToAbsTime evTime tickMs
-        go hp mp out = do
+        curTime = addToAbsTime sessionCurTime tickMs
+        go hp mp out cnt = do
             let hres = H.uncons hp
             case hres of
-                Just (Entry ts key, hp') -> do
-                    let duration = diffAbsTime curTime ts
-                    if duration >= timeoutMs
+                Just (Entry expiry key, hp') -> do
+                    if curTime >= expiry
                     then do
                         let (r, mp') = Map.updateLookupWithKey
                                             (\_ _ -> Nothing) key mp
                         case r of
-                            Nothing -> go hp' mp' out
+                            Nothing -> go hp' mp' out cnt
                             Just (Tuple' latestTS acc) -> do
-                                let dur = diffAbsTime curTime latestTS
-                                if dur >= timeoutMs || not reset
+                                let expiry' = addToAbsTime latestTS timeoutMs
+                                if curTime >= expiry' || not reset
                                 then do
                                     sess <- extract acc
                                     go hp' mp' ((key, sess) `K.cons` out)
+                                       (cnt + 1)
                                 else
                                     -- reset the session timeout
-                                    let expiry = addToAbsTime latestTS timeoutMs
-                                        hp'' = H.insert (Entry expiry key) hp'
-                                        mp'' = Map.insert key (Tuple' latestTS acc) mp'
-                                    in go hp'' mp'' out
-                    else return (hp, mp, out)
-                Nothing -> return (hp, mp, out)
+                                    -- XXX purge anyway if the session size
+                                    -- goes beyond maximum session size.
+                                    let hp'' = H.insert (Entry expiry' key) hp'
+                                        mp'' = Map.insert key
+                                                (Tuple' latestTS acc) mp'
+                                    in go hp'' mp'' out cnt
+                    else return (hp, mp, out, cnt)
+                Nothing -> return (hp, mp, out, cnt)
 
     -- merge timer events in the stream
     stream = Serial.map Just str `Par.parallel` repeatM timer
@@ -4098,7 +4165,12 @@ classifySessionsBy tick timeout reset (Fold step initial extract) str =
 -- received within the session window. The session times out and gets closed
 -- only if no event is received within the specified session window size.
 --
--- @since 0.7.0
+-- @
+-- classifyKeepAliveSessions timeout = classifySessionsBy 1 timeout True
+-- @
+--
+-- /Internal/
+--
 {-# INLINABLE classifyKeepAliveSessions #-}
 classifyKeepAliveSessions
     :: (IsStream t, MonadAsync m, Ord k)
@@ -4147,7 +4219,12 @@ classifyChunksOf wsize = classifyChunksBy wsize False
 -- session window end, a monotonic event time clock is maintained synced with
 -- the timestamps with a clock resolution of 1 second.
 --
--- @since 0.7.0
+-- @
+-- classifySessionsOf interval = classifySessionsBy 1 interval False
+-- @
+--
+-- /Internal/
+--
 {-# INLINABLE classifySessionsOf #-}
 classifySessionsOf
     :: (IsStream t, MonadAsync m, Ord k)