Fix time measurement issue

1) We were taking a max of the latest timestamp and the timer maintained time
   which is incorrect.

2) We were computing the timeout incorrectly making it twice the actual

Also, maintain sessionCount in the state. We will use this later to put a limit
of the number of sessions.
This commit is contained in:
Harendra Kumar 2020-02-06 21:51:27 +05:30
parent 036ca6a557
commit eea9aa1721

@ -1,5 +1,7 @@
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE FlexibleContexts #-}
#if __GLASGOW_HASKELL__ >= 800
@ -449,6 +451,9 @@ import Control.Monad.State.Strict (StateT)
import Control.Monad.Trans (MonadTrans(..))
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Functor.Identity (Identity (..))
#if __GLASGOW_HASKELL__ >= 800
import Data.Kind (Type)
import Data.Heap (Entry(..))
import Data.Maybe (isJust, fromJust, isNothing)
import Foreign.Storable (Storable)
@ -480,7 +485,7 @@ import Streamly.Internal.Data.Stream.Serial (SerialT, WSerialT)
import Streamly.Internal.Data.Stream.Zip (ZipSerialM)
import Streamly.Internal.Data.Pipe.Types (Pipe (..))
import Streamly.Internal.Data.Time.Units
(AbsTime, MilliSecond64(..), addToAbsTime, diffAbsTime, toRelTime,
(AbsTime, MilliSecond64(..), addToAbsTime, toRelTime,
toAbsTime, TimeUnit64)
import Streamly.Internal.Mutable.Prim.Var (Prim, Var)
@ -3967,49 +3972,83 @@ classifyKeepAliveChunks
classifyKeepAliveChunks spanout = classifyChunksBy spanout True
-- | @classifySessionsBy tick timeout reset f stream@ groups together all input
-- stream elements that belong to the same session. @timeout@ is the maximum
-- lifetime of a session in seconds. All elements belonging to a session are
-- purged after this duration. If "reset" is 'Ture' then the timeout is reset
-- after every event received in the session. Session duration is measured
-- using the timestamp of the first element seen for that session. To detect
-- session timeouts, a monotonic event time clock is maintained using the
-- timestamps seen in the inputs and a timer with a tick duration specified by
-- @tick@.
#if __GLASGOW_HASKELL__ < 800
#define Type *
data SessionState t m k a b = SessionState
{ sessionCurTime :: !AbsTime -- ^ time since last event
, sessionEventTime :: !AbsTime -- ^ time as per last event
, sessionCount :: !Int -- ^ total number sessions in progress
, sessionTimerHeap :: H.Heap (H.Entry AbsTime k) -- ^ heap for timeouts
, sessionKeyValueMap :: Map.Map k a -- ^ Stored sessions for keys
, sessionOutputStream :: t (m :: Type -> Type) (k, b) -- ^ Completed sessions
#undef Type
-- | @classifySessionsBy tick timeout reset f stream@ groups timed events in an
-- input event stream into sessions based on a session key. Each element in the
-- stream is an event consisting of a 4-tuple @(session key, sesssion data,
-- isClosingEvent, timestamp)@. @session key@ is a key that uniquely
-- identifies a session. All the events belonging to a session are folded using
-- the fold @f@ until an event with @isClosingEvent@ flag set arrives or a
-- timeout has occurred. The session key and the result of the fold are
-- emitted in the output stream when the session is purged.
-- @session key@ is a key that uniquely identifies the session for the given
-- element, @timestamp@ characterizes the time when the input element was
-- generated, this is an absolute time measured from some @Epoch@. @session
-- close@ is a boolean indicating whether this element marks the closing of the
-- session. When an input element with @session close@ set to @True@ is seen
-- the session is purged immediately.
-- When @reset@ is 'False', @timeout@ is the maximum lifetime of a session in
-- seconds, measured from the @timestamp@ of the first event in that session.
-- When "reset" is 'True' then the timeout is reset after every event received
-- in the session, in other words timeout is measured from the timestamp of the
-- last event in the session.
-- All the input elements belonging to a session are collected using the fold
-- @f@. The session key and the fold result are emitted in the output stream
-- when the session is purged either via the session close event or via the
-- session lifetime timeout.
-- @timestamp@ in an event characterizes the time when the input event was
-- generated, this is an absolute time measured from some @Epoch@. The notion
-- of current time is maintained by a monotonic event time clock using the
-- timestamps seen in the input stream. The latest timestamp seen till now is
-- used as the base for the current time. When no new events are seen, a timer
-- is started with a tick duration specified by @tick@. This timer is used to
-- detect session timeouts in the absence of new events.
-- /Internal/
-- @since 0.7.0
{-# INLINABLE classifySessionsBy #-}
:: (IsStream t, MonadAsync m, Ord k)
=> Double -- ^ timer tick in seconds
-> Double -- ^ session timeout
-> Double -- ^ session timeout in seconds
-> Bool -- ^ reset the timeout when an event is received
-> Fold m a b -- ^ Fold to be applied to session events
-> t m (k, a, Bool, AbsTime) -- ^ session key, timestamp, close event, data
-> t m (k, b)
-> t m (k, a, Bool, AbsTime) -- ^ session key, data, timestamp, close event
-> t m (k, b) -- ^ session key, fold result
classifySessionsBy tick timeout reset (Fold step initial extract) str =
concatMap (\(Tuple4' _ _ _ s) -> s) $ scanlM' sstep szero stream
concatMap (\session -> sessionOutputStream session)
$ scanlM' sstep szero stream
timeoutMs = toRelTime (round (timeout * 1000) :: MilliSecond64)
tickMs = toRelTime (round (tick * 1000) :: MilliSecond64)
szero = Tuple4' (toAbsTime (0 :: MilliSecond64)) H.empty Map.empty K.nil
szero = SessionState
{ sessionCurTime = toAbsTime (0 :: MilliSecond64)
, sessionEventTime = toAbsTime (0 :: MilliSecond64)
, sessionCount = 0
, sessionTimerHeap = H.empty
, sessionKeyValueMap = Map.empty
, sessionOutputStream = K.nil
-- XXX If there are too many sessions in progress we may want to put a
-- limit on the number of sessions because of memory consumption limits. We
-- can use two different strategies in this case:
-- 1) Eject old sessions or sessions beyond a certain/lower timeout
-- threshold even before timeout, effectively reduce the timeout.
-- 2) Drop creation of new sessions but keep accepting new events for the
-- old ones.
-- Got a new stream input element
sstep (Tuple4' evTime hp mp _) (Just (key, a, closing, ts)) =
sstep (session@SessionState{..}) (Just (key, a, closing, ts)) =
-- XXX we should use a heap in pinned memory to scale it to a large
-- size
@ -4027,66 +4066,90 @@ classifySessionsBy tick timeout reset (Fold step initial extract) str =
-- XXX if the key is an Int, we can also use an IntMap for slightly
-- better performance.
let accumulate v = do
let curTime = max sessionEventTime ts
accumulate v = do
Tuple' _ old <- maybe (initial >>= return . Tuple' ts) return v
new <- step old a
return $ Tuple' ts new
in if closing
then do
let (r, mp') = Map.updateLookupWithKey (\_ _ -> Nothing) key mp
let (r, mp') = Map.updateLookupWithKey (\_ _ -> Nothing) key
Tuple' _ acc <- accumulate r
res <- extract acc
return $ Tuple4' evTime hp mp' (yield (key, res))
return $ session
{ sessionCurTime = curTime
, sessionEventTime = curTime
, sessionCount = sessionCount - 1
, sessionKeyValueMap = mp'
, sessionOutputStream = yield (key, res)
else do
let r = Map.lookup key mp
let r = Map.lookup key sessionKeyValueMap
acc <- accumulate r
let mp' = Map.insert key acc mp
let hp' =
let mp' = Map.insert key acc sessionKeyValueMap
let (hp', count) =
case r of
Nothing ->
let expiry = addToAbsTime ts timeoutMs
in H.insert (Entry expiry key) hp
Just _ -> hp
in (H.insert (Entry expiry key)
sessionTimerHeap, sessionCount + 1)
Just _ -> (sessionTimerHeap, sessionCount)
-- Event time is maintained as monotonically increasing
-- time. If we have lagged behind any of the timestamps
-- seen then we increase it to match the latest time seen
-- in the timestamps. We also increase it on timer ticks.
return $ Tuple4' (max evTime ts) hp' mp' K.nil
return $ SessionState
{ sessionCurTime = curTime
, sessionEventTime = curTime
, sessionCount = count
, sessionTimerHeap = hp'
, sessionKeyValueMap = mp'
, sessionOutputStream = K.nil
-- Got a timer tick event
-- XXX can we yield the entries without accumulating them?
sstep (Tuple4' evTime heap sessions _) Nothing = do
(hp', mp', out) <- go heap sessions K.nil
return $ Tuple4' curTime hp' mp' out
sstep (session@SessionState{..}) Nothing = do
(hp', mp', out, count) <- go sessionTimerHeap sessionKeyValueMap K.nil 0
return $ session
{ sessionCurTime = curTime
, sessionCount = sessionCount - count
, sessionTimerHeap = hp'
, sessionKeyValueMap = mp'
, sessionOutputStream = out
curTime = addToAbsTime evTime tickMs
go hp mp out = do
curTime = addToAbsTime sessionCurTime tickMs
go hp mp out cnt = do
let hres = H.uncons hp
case hres of
Just (Entry ts key, hp') -> do
let duration = diffAbsTime curTime ts
if duration >= timeoutMs
Just (Entry expiry key, hp') -> do
if curTime >= expiry
then do
let (r, mp') = Map.updateLookupWithKey
(\_ _ -> Nothing) key mp
case r of
Nothing -> go hp' mp' out
Nothing -> go hp' mp' out cnt
Just (Tuple' latestTS acc) -> do
let dur = diffAbsTime curTime latestTS
if dur >= timeoutMs || not reset
let expiry' = addToAbsTime latestTS timeoutMs
if curTime >= expiry' || not reset
then do
sess <- extract acc
go hp' mp' ((key, sess) `K.cons` out)
(cnt + 1)
-- reset the session timeout
let expiry = addToAbsTime latestTS timeoutMs
hp'' = H.insert (Entry expiry key) hp'
mp'' = Map.insert key (Tuple' latestTS acc) mp'
in go hp'' mp'' out
else return (hp, mp, out)
Nothing -> return (hp, mp, out)
-- XXX purge anyway if the session size
-- goes beyond maximum session size.
let hp'' = H.insert (Entry expiry' key) hp'
mp'' = Map.insert key
(Tuple' latestTS acc) mp'
in go hp'' mp'' out cnt
else return (hp, mp, out, cnt)
Nothing -> return (hp, mp, out, cnt)
-- merge timer events in the stream
stream = Just str `Par.parallel` repeatM timer
@ -4098,7 +4161,12 @@ classifySessionsBy tick timeout reset (Fold step initial extract) str =
-- received within the session window. The session times out and gets closed
-- only if no event is received within the specified session window size.
-- @since 0.7.0
-- @
-- classifyKeepAliveSessions timeout = classifySessionsBy 1 timeout True
-- @
-- /Internal/
{-# INLINABLE classifyKeepAliveSessions #-}
:: (IsStream t, MonadAsync m, Ord k)
@ -4147,7 +4215,12 @@ classifyChunksOf wsize = classifyChunksBy wsize False
-- session window end, a monotonic event time clock is maintained synced with
-- the timestamps with a clock resolution of 1 second.
-- @since 0.7.0
-- @
-- classifySessionsOf interval = classifySessionsBy 1 interval False
-- @
-- /Internal/
{-# INLINABLE classifySessionsOf #-}
:: (IsStream t, MonadAsync m, Ord k)