Make Map generic version of classifySessionsBy

Harendra Kumar 2022-03-31 13:13:22 +05:30
parent b30eb0e62d
commit 2ebed4d2fe
4 changed files with 101 additions and 64 deletions

View File

@@ -18,6 +18,8 @@ import Control.DeepSeq (NFData(..))
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Monoid (Sum(..))
import Data.Proxy (Proxy(..))
import Data.HashMap.Strict (HashMap)
import GHC.Generics (Generic)
import qualified Streamly.Internal.Data.Refold.Type as Refold
@@ -219,11 +221,25 @@ classifySessionsOf =
. Internal.timestamped
. S.concatMap (\x -> S.map (x,) (S.enumerateFromTo 1 (10 :: Int)))
{-# INLINE classifySessionsOfHash #-}
classifySessionsOfHash :: (S.MonadAsync m) => SerialT m Int -> m ()
classifySessionsOfHash =
S.drain
. Internal.classifySessionsByGeneric
(Proxy :: Proxy (HashMap k v))
1 False (const (return False)) 3 FL.drain
. Internal.timestamped
. S.concatMap (\x -> S.map (x,) (S.enumerateFromTo 1 (10 :: Int)))
o_n_space_grouping :: Int -> [Benchmark]
o_n_space_grouping value =
-- Buffering operations using heap proportional to group/window sizes.
[ bgroup "grouping"
[ benchIOSink (value `div` 10) "classifySessionsOf" classifySessionsOf
-- We use a 10-element stream per input, so div by 10 here
[ benchIOSink (value `div` 10) "classifySessionsOf"
classifySessionsOf
, benchIOSink (value `div` 10) "classifySessionsOfHash"
classifySessionsOfHash
]
]
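
The classifySessionsOfHash benchmark above selects the HashMap instance through the Proxy argument of classifySessionsByGeneric. For illustration only, the same pattern would select the IntMap instance; the sketch below is not part of this commit and assumes the benchmark module's existing imports (S, FL, Internal, Proxy) plus a qualified import of Data.IntMap.Strict as IntMap, with classifySessionsOfIntMap as a hypothetical name.

{-# INLINE classifySessionsOfIntMap #-}
classifySessionsOfIntMap :: S.MonadAsync m => SerialT m Int -> m ()
classifySessionsOfIntMap =
    S.drain
        -- The proxy only fixes the session map type; Key IntMap is Int,
        -- matching the Int session keys produced below.
        . Internal.classifySessionsByGeneric
            (Proxy :: Proxy (IntMap.IntMap v))
            1 False (const (return False)) 3 FL.drain
        . Internal.timestamped
        . S.concatMap (\x -> S.map (x,) (S.enumerateFromTo 1 (10 :: Int)))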

View File

@@ -68,7 +68,7 @@ bench_rts_options () {
Prelude.Serial/o-1-space/mixed/sum-product-fold) echo -n "-K64M" ;;
# XXX These should be moved to o-n-space?
Prelude.Serial/o-n-heap/grouping/classifySessionsOf) echo -n "-K1M -M32M" ;;
Prelude.Serial/o-n-heap/grouping/classifySessionsOf*) echo -n "-K1M -M32M" ;;
Prelude.Serial/o-n-heap/Functor/*) echo -n "-K4M -M32M" ;;
Prelude.Serial/o-n-heap/transformer/*) echo -n "-K8M -M64M" ;;

View File

@@ -29,6 +29,7 @@ class IsMap f where
mapInsert :: Key f -> a -> f a -> f a
mapDelete :: Key f -> f a -> f a
mapUnion :: f a -> f a -> f a
mapNull :: f a -> Bool
instance Ord k => IsMap (Map k) where
type Key (Map k) = k
@@ -39,6 +40,7 @@ instance Ord k => IsMap (Map k) where
mapInsert = Map.insert
mapDelete = Map.delete
mapUnion = Map.union
mapNull = Map.null
instance IsMap IntMap.IntMap where
type Key IntMap.IntMap = Int
@@ -49,6 +51,7 @@ instance IsMap IntMap.IntMap where
mapInsert = IntMap.insert
mapDelete = IntMap.delete
mapUnion = IntMap.union
mapNull = IntMap.null
instance (Hashable k, Eq k) => IsMap (HashMap.HashMap k) where
type Key (HashMap.HashMap k) = k
@@ -59,3 +62,4 @@ instance (Hashable k, Eq k) => IsMap (HashMap.HashMap k) where
mapInsert = HashMap.insert
mapDelete = HashMap.delete
mapUnion = HashMap.union
mapNull = HashMap.null
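
With mapNull added alongside mapInsert, mapDelete, and mapUnion, the class covers the container operations needed by the generic session classifier below (which also refers to mapEmpty and mapLookup). As a small illustration of what the abstraction buys, the following is a hedged sketch of a map-polymorphic helper; countKeys is a hypothetical name and not part of this commit.

import Streamly.Internal.Data.IsMap (IsMap(..))
import qualified Streamly.Internal.Data.IsMap as IsMap

-- Count key occurrences without committing to Map, IntMap, or HashMap; the
-- result type chosen at the call site picks the instance.
countKeys :: (IsMap f, Num a) => [Key f] -> f a
countKeys = foldr bump IsMap.mapEmpty
  where
    bump k m =
        case IsMap.mapLookup k m of
            Nothing -> IsMap.mapInsert k 1 m
            Just n -> IsMap.mapInsert k (n + 1) m

For instance, countKeys [1, 2, 2, 3 :: Int] can produce a Map Int Int, an IntMap Int, or a HashMap Int Int, depending on the type annotation at the use site.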

View File

@@ -89,6 +89,7 @@ module Streamly.Internal.Data.Stream.IsStream.Reduce
-- | A new window starts after the previous window is finished.
-- , classifyChunksOf
, classifySessionsByGeneric
, classifySessionsBy
, classifySessionsOf
@@ -153,10 +154,13 @@ import Control.Monad.Catch (MonadThrow)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Heap (Entry(..))
import Data.Kind (Type)
import Data.Map (Map)
import Data.Maybe (isNothing)
import Data.Proxy (Proxy(..))
import Foreign.Storable (Storable)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Fold.Type (Fold (..))
import Streamly.Internal.Data.IsMap (IsMap(..))
import Streamly.Internal.Data.Refold.Type (Refold (..))
import Streamly.Internal.Data.Parser (Parser (..))
import Streamly.Internal.Data.Array.Foreign.Type (Array)
@@ -178,6 +182,7 @@ import qualified Data.Heap as H
import qualified Data.Map.Strict as Map
import qualified Streamly.Internal.Data.Array.Foreign.Type as A
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.IsMap as IsMap
import qualified Streamly.Internal.Data.Parser.ParserD as PRD
import qualified Streamly.Internal.Data.Stream.IsStream.Type as IsStream
import qualified Streamly.Internal.Data.Stream.StreamD as D
@@ -1150,64 +1155,23 @@ data SessionState t m k a b = SessionState
data SessionEntry a b = LiveSession !a !b | ZombieSession
-- | @classifySessionsBy tick keepalive predicate timeout fold stream@
-- classifies an input event @stream@ consisting of @(timestamp, (key,
-- value))@ into sessions based on the @key@, folding all the values
-- corresponding to the same key into a session using the supplied @fold@.
--
-- When the fold terminates or a @timeout@ occurs, a tuple consisting of the
-- session key and the folded value is emitted in the output stream. The
-- timeout is measured from the first event in the session. If the @keepalive@
-- option is set to 'True', the timeout is reset to 0 whenever an event is
-- received.
--
-- The @timestamp@ in the input stream is an absolute time from some epoch,
-- characterizing the time when the input event was generated. The notion of
-- current time is maintained by a monotonic event time clock using the
-- timestamps seen in the input stream. The latest timestamp seen till now is
-- used as the base for the current time. When no new events are seen, a timer
-- is started with a clock resolution of @tick@ seconds. This timer is used to
-- detect session timeouts in the absence of new events.
--
-- To ensure an upper bound on the memory used, the number of sessions can be
-- limited. If the ejection @predicate@ returns 'True', the oldest session is
-- ejected before a new session is inserted.
--
-- When the stream ends, any buffered sessions are ejected immediately.
--
-- If a session key is received even after a session has finished, another
-- session is created for that key.
--
-- >>> :{
-- Stream.mapM_ print
-- $ Stream.classifySessionsBy 1 False (const (return False)) 3 (Fold.take 3 Fold.toList)
-- $ Stream.timestamped
-- $ Stream.delay 0.1
-- $ (,) <$> Stream.fromList [1,2,3] <*> Stream.fromList ['a','b','c']
-- :}
-- (1,"abc")
-- (2,"abc")
-- (3,"abc")
--
-- /Pre-release/
-- XXX Use a proxy type for the map type to use.
-- XXX Use mutable IORef in accumulator
--
-- XXX this fuses with INLINE but it may be too much work for the compiler.
-- Maybe we can NOINLINE the heap processing stuff and then INLINE it to reduce
-- the code bloat.
{-# INLINABLE classifySessionsBy #-}
classifySessionsBy
:: (IsStream t, MonadAsync m, Ord k)
=> Double -- ^ timer tick in seconds
{-# INLINABLE classifySessionsByGeneric #-}
classifySessionsByGeneric
:: (IsStream t, MonadAsync m, Ord (Key f))
=> Proxy (f s)
-> Double -- ^ timer tick in seconds
-> Bool -- ^ reset the timer when an event is received
-> (Int -> m Bool) -- ^ predicate to eject sessions based on session count
-> Double -- ^ session timeout in seconds
-> Fold m a b -- ^ Fold to be applied to session data
-> t m (AbsTime, (k, a)) -- ^ timestamp, (session key, session data)
-> t m (k, b) -- ^ session key, fold result
classifySessionsBy tick reset ejectPred tmout
-> t m (AbsTime, (Key f, a)) -- ^ timestamp, (session key, session data)
-> t m (Key f, b) -- ^ session key, fold result
classifySessionsByGeneric _ tick reset ejectPred tmout
(Fold step initial extract) input =
Expand.unfoldMany (Unfold.lmap sessionOutputStream Unfold.fromStream)
$ scanlMAfter' sstep (return szero) flush
@@ -1223,7 +1187,7 @@ classifySessionsBy tick reset ejectPred tmout
, sessionEventTime = toAbsTime (0 :: MilliSecond64)
, sessionCount = 0
, sessionTimerHeap = H.empty
, sessionKeyValueMap = Map.empty
, sessionKeyValueMap = IsMap.mapEmpty
, sessionOutputStream = IsStream.nil
}
@@ -1281,7 +1245,7 @@ classifySessionsBy tick reset ejectPred tmout
-- way to achieve mutual recursion.
--
let curTime = max sessionEventTime timestamp
mOld = Map.lookup key sessionKeyValueMap
mOld = IsMap.mapLookup key sessionKeyValueMap
let done fb = do
-- deleting a key from the heap is expensive, so we never
-- delete a key from heap, we just purge it from the Map and it
@@ -1291,7 +1255,8 @@ classifySessionsBy tick reset ejectPred tmout
--
let (mp, cnt) = case mOld of
Just (LiveSession _ _) ->
( Map.insert key ZombieSession sessionKeyValueMap
( IsMap.mapInsert
key ZombieSession sessionKeyValueMap
, sessionCount - 1
)
_ -> (sessionKeyValueMap, sessionCount)
@@ -1324,7 +1289,7 @@ classifySessionsBy tick reset ejectPred tmout
Just _ -> return vars
let acc = LiveSession expiry fs1
mp2 = Map.insert key acc mp1
mp2 = IsMap.mapInsert key acc mp1
return $ SessionState
{ sessionCurTime = curTime
, sessionEventTime = curTime
@@ -1371,17 +1336,17 @@ classifySessionsBy tick reset ejectPred tmout
ejectEntry hp mp out cnt acc key = do
sess <- extract acc
let out1 = (key, sess) `cons` out
let mp1 = Map.delete key mp
let mp1 = IsMap.mapDelete key mp
return (hp, mp1, out1, cnt - 1)
ejectAll (hp, mp, out, !cnt) = do
let hres = H.uncons hp
case hres of
Just (Entry _ key, hp1) -> do
r <- case Map.lookup key mp of
r <- case IsMap.mapLookup key mp of
Nothing -> return (hp1, mp, out, cnt)
Just ZombieSession ->
return (hp1, Map.delete key mp, out, cnt)
return (hp1, IsMap.mapDelete key mp, out, cnt)
Just (LiveSession _ acc) ->
ejectEntry hp1 mp out cnt acc key
ejectAll r
@@ -1393,10 +1358,10 @@ classifySessionsBy tick reset ejectPred tmout
let hres = H.uncons hp
case hres of
Just (Entry expiry key, hp1) ->
case Map.lookup key mp of
case IsMap.mapLookup key mp of
Nothing -> ejectOne (hp1, mp, out, cnt)
Just ZombieSession ->
ejectOne (hp1, Map.delete key mp, out, cnt)
ejectOne (hp1, IsMap.mapDelete key mp, out, cnt)
Just (LiveSession expiry1 acc) -> do
if not reset || expiry1 <= expiry
then ejectEntry hp1 mp out cnt acc key
@@ -1405,7 +1370,7 @@ classifySessionsBy tick reset ejectPred tmout
let hp2 = H.insert (Entry expiry1 key) hp1
in ejectOne (hp2, mp, out, cnt)
Nothing -> do
assert (Map.null mp) (return ())
assert (IsMap.mapNull mp) (return ())
return (hp, mp, out, cnt)
ejectExpired session@SessionState{..} curTime = do
@@ -1434,10 +1399,10 @@ classifySessionsBy tick reset ejectPred tmout
return (r, r)
if eject
then
case Map.lookup key mp of
case IsMap.mapLookup key mp of
Nothing -> ejectLoop hp1 mp out cnt
Just ZombieSession ->
ejectLoop hp1 (Map.delete key mp) out cnt
ejectLoop hp1 (IsMap.mapDelete key mp) out cnt
Just (LiveSession expiry1 acc) -> do
if expiry1 <= curTime || not reset || force
then do
@@ -1450,9 +1415,61 @@ classifySessionsBy tick reset ejectPred tmout
in ejectLoop hp2 mp out cnt
else return (hp, mp, out, cnt)
Nothing -> do
assert (Map.null mp) (return ())
assert (IsMap.mapNull mp) (return ())
return (hp, mp, out, cnt)
-- | @classifySessionsBy tick keepalive predicate timeout fold stream@
-- classifies an input event @stream@ consisting of @(timestamp, (key,
-- value))@ into sessions based on the @key@, folding all the values
-- corresponding to the same key into a session using the supplied @fold@.
--
-- When the fold terminates or a @timeout@ occurs, a tuple consisting of the
-- session key and the folded value is emitted in the output stream. The
-- timeout is measured from the first event in the session. If the @keepalive@
-- option is set to 'True', the timeout is reset to 0 whenever an event is
-- received.
--
-- The @timestamp@ in the input stream is an absolute time from some epoch,
-- characterizing the time when the input event was generated. The notion of
-- current time is maintained by a monotonic event time clock using the
-- timestamps seen in the input stream. The latest timestamp seen till now is
-- used as the base for the current time. When no new events are seen, a timer
-- is started with a clock resolution of @tick@ seconds. This timer is used to
-- detect session timeouts in the absence of new events.
--
-- To ensure an upper bound on the memory used, the number of sessions can be
-- limited. If the ejection @predicate@ returns 'True', the oldest session is
-- ejected before a new session is inserted.
--
-- When the stream ends, any buffered sessions are ejected immediately.
--
-- If a session key is received even after a session has finished, another
-- session is created for that key.
--
-- >>> :{
-- Stream.mapM_ print
-- $ Stream.classifySessionsBy 1 False (const (return False)) 3 (Fold.take 3 Fold.toList)
-- $ Stream.timestamped
-- $ Stream.delay 0.1
-- $ (,) <$> Stream.fromList [1,2,3] <*> Stream.fromList ['a','b','c']
-- :}
-- (1,"abc")
-- (2,"abc")
-- (3,"abc")
--
-- /Pre-release/
{-# INLINE classifySessionsBy #-}
classifySessionsBy
:: (IsStream t, MonadAsync m, Ord k)
=> Double -- ^ timer tick in seconds
-> Bool -- ^ reset the timer when an event is received
-> (Int -> m Bool) -- ^ predicate to eject sessions based on session count
-> Double -- ^ session timeout in seconds
-> Fold m a b -- ^ Fold to be applied to session data
-> t m (AbsTime, (k, a)) -- ^ timestamp, (session key, session data)
-> t m (k, b) -- ^ session key, fold result
classifySessionsBy = classifySessionsByGeneric (Proxy :: Proxy (Map k s))
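
classifySessionsBy is now just the Map-backed instantiation of the generic worker above, with the container chosen by the proxy. For comparison, a HashMap-backed sibling could be defined the same way, mirroring the proxy used in the benchmark; the following is a hedged sketch, not part of this commit, assuming HashMap (Data.HashMap.Strict) and Hashable (Data.Hashable) are in scope and using classifySessionsByHashMap as a hypothetical name.

{-# INLINE classifySessionsByHashMap #-}
classifySessionsByHashMap
    :: (IsStream t, MonadAsync m, Hashable k, Ord k)
    => Double -- ^ timer tick in seconds
    -> Bool -- ^ reset the timer when an event is received
    -> (Int -> m Bool) -- ^ predicate to eject sessions based on session count
    -> Double -- ^ session timeout in seconds
    -> Fold m a b -- ^ Fold to be applied to session data
    -> t m (AbsTime, (k, a)) -- ^ timestamp, (session key, session data)
    -> t m (k, b) -- ^ session key, fold result
classifySessionsByHashMap =
    classifySessionsByGeneric (Proxy :: Proxy (HashMap k s))

The Ord k constraint is carried over from the Ord (Key f) context on classifySessionsByGeneric, while Hashable k is what the HashMap instance of IsMap requires.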
-- | Same as 'classifySessionsBy' with a timer tick of 1 second and keepalive
-- option set to 'True'.
--