Add currentTime and its benchmarks

This commit is contained in:
adithyaov 2020-01-01 15:01:00 +05:30 committed by Harendra Kumar
parent b19d96f174
commit 50548138d6
4 changed files with 56 additions and 0 deletions

View File

@ -161,6 +161,13 @@ main = do
-- These are essentially cons and consM
, benchIOSrc serially "fromFoldable" (Ops.sourceFromFoldable value)
, benchIOSrc serially "fromFoldableM" (Ops.sourceFromFoldableM value)
-- Compare this with takeAll
, bench "currentTime: 0.00001 sec granularity" $ nfIO $ Ops.currentTime value 0.00001
, bench "currentTime: 0.0001 sec granularity" $ nfIO $ Ops.currentTime value 0.0001
, bench "currentTime: 0.001 sec granularity" $ nfIO $ Ops.currentTime value 0.001
, bench "currentTime: 0.01 sec granularity" $ nfIO $ Ops.currentTime value 0.01
]
, bgroup "elimination"
[ bgroup "reduce"

View File

@ -382,6 +382,11 @@ filterAllOut,
mapMaybeM :: S.MonadAsync m => Int -> Stream m Int -> m ()
intersperse :: S.MonadAsync m => Int -> Int -> Stream m Int -> m ()
-- XXX Change granularity and compare with takeAll
{-# INLINE currentTime #-}
currentTime :: S.MonadAsync m => Int -> Double -> m ()
currentTime value g = S.drain $ S.take value $ Internal.currentTime g
{-# INLINE mapM #-}
{-# INLINE map' #-}
{-# INLINE fmap' #-}

View File

@ -311,6 +311,7 @@ module Streamly.Internal.Data.Stream.StreamD
-- * Time related
, takeByTime
, dropByTime
, currentTime
)
where
@ -345,6 +346,7 @@ import qualified Control.Monad.Reader as Reader
import qualified Control.Monad.State.Strict as State
import qualified Prelude
import Data.Int (Int64)
import Streamly.Internal.Mutable.Prim.Var
(Prim, Var, readVar, newVar, modifyVar')
import Streamly.Internal.Data.Time.Units
@ -355,6 +357,7 @@ import Streamly.Internal.Memory.Array.Types (Array(..))
import Streamly.Internal.Data.Fold.Types (Fold(..))
import Streamly.Internal.Data.Pipe.Types (Pipe(..), PipeState(..))
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units (MicroSecond64(..), fromAbsTime, toAbsTime, AbsTime)
import Streamly.Internal.Data.Unfold.Types (Unfold(..))
import Streamly.Internal.Data.Strict (Tuple3'(..))
@ -4207,3 +4210,26 @@ dropByTime duration (Stream step1 state1) = Stream step (DropByTimeInit state1)
Yield x s -> Yield x (DropByTimeYield s)
Skip s -> Skip (DropByTimeYield s)
Stop -> Stop
{-# INLINE_NORMAL currentTime #-}
currentTime :: MonadAsync m => Double -> Stream m AbsTime
currentTime g = Stream step Nothing
where
next timeVar = do
threadDelay $ delayTime
MicroSecond64 t <- fromAbsTime <$> getTime Monotonic
modifyVar' timeVar (const t)
next timeVar
g' = g * 10 ^ (6 :: Int)
{-# INLINE delayTime #-}
delayTime = if g' >= fromIntegral (maxBound :: Int)
then maxBound
else round g'
{-# INLINE_LATE step #-}
step _ Nothing = do
timeVar <- liftIO $ newVar (0 :: Int64)
tid <- forkManaged $ liftIO $ void $ next timeVar
return $ Skip $ Just (timeVar, tid)
step _ s@(Just (timeVar, _)) = do
a <- liftIO $ readVar timeVar
return $ Yield (toAbsTime (MicroSecond64 a)) s

View File

@ -421,6 +421,9 @@ module Streamly.Internal.Prelude
-- * Diagnostics
, inspectMode
-- * Time related
, currentTime
-- * Deprecated
, K.once
, each
@ -4297,3 +4300,18 @@ usingStateT s f xs = evalStateT s $ f $ liftInner xs
{-# INLINE runStateT #-}
runStateT :: Monad m => s -> SerialT (StateT s m) a -> SerialT m (s, a)
runStateT s xs = fromStreamD $ D.runStateT s (toStreamD xs)
------------------------------------------------------------------------------
-- Time related
------------------------------------------------------------------------------
-- | /currentTime g/ returns a stream of 'AbsTime'. The time is updated every
-- /g/ seconds. Between any 2 updates, the stream will contain the same
-- element. Getting the absolute time is a costly operation and hence the time
-- taken to generate this stream depends on the granularity /g/. If /g/ is very
-- low, the time taken to generate this the elements of this stream will be
-- very high. Conversely, if the granularity is high, the time taken to
-- generate the elements of this stream will be low.
{-# INLINE currentTime #-}
currentTime :: (IsStream t, MonadAsync m) => Double -> t m AbsTime
currentTime g = fromStreamD $ D.currentTime g