From 50548138d6b0f41a17008ab60a695338d7cac0a7 Mon Sep 17 00:00:00 2001 From: adithyaov Date: Wed, 1 Jan 2020 15:01:00 +0530 Subject: [PATCH] Add currentTime and its benchmarks --- benchmark/Linear.hs | 7 ++++++ benchmark/Streamly/Benchmark/Prelude.hs | 5 ++++ src/Streamly/Internal/Data/Stream/StreamD.hs | 26 ++++++++++++++++++++ src/Streamly/Internal/Prelude.hs | 18 ++++++++++++++ 4 files changed, 56 insertions(+) diff --git a/benchmark/Linear.hs b/benchmark/Linear.hs index 61ad56759..1fd2a037d 100644 --- a/benchmark/Linear.hs +++ b/benchmark/Linear.hs @@ -161,6 +161,13 @@ main = do -- These are essentially cons and consM , benchIOSrc serially "fromFoldable" (Ops.sourceFromFoldable value) , benchIOSrc serially "fromFoldableM" (Ops.sourceFromFoldableM value) + + -- Compare this with takeAll + , bench "currentTime: 0.00001 sec granularity" $ nfIO $ Ops.currentTime value 0.00001 + , bench "currentTime: 0.0001 sec granularity" $ nfIO $ Ops.currentTime value 0.0001 + , bench "currentTime: 0.001 sec granularity" $ nfIO $ Ops.currentTime value 0.001 + , bench "currentTime: 0.01 sec granularity" $ nfIO $ Ops.currentTime value 0.01 + ] , bgroup "elimination" [ bgroup "reduce" diff --git a/benchmark/Streamly/Benchmark/Prelude.hs b/benchmark/Streamly/Benchmark/Prelude.hs index 0f41b06fe..7f4aace14 100644 --- a/benchmark/Streamly/Benchmark/Prelude.hs +++ b/benchmark/Streamly/Benchmark/Prelude.hs @@ -382,6 +382,11 @@ filterAllOut, mapMaybeM :: S.MonadAsync m => Int -> Stream m Int -> m () intersperse :: S.MonadAsync m => Int -> Int -> Stream m Int -> m () +-- XXX Change granularity and compare with takeAll +{-# INLINE currentTime #-} +currentTime :: S.MonadAsync m => Int -> Double -> m () +currentTime value g = S.drain $ S.take value $ Internal.currentTime g + {-# INLINE mapM #-} {-# INLINE map' #-} {-# INLINE fmap' #-} diff --git a/src/Streamly/Internal/Data/Stream/StreamD.hs b/src/Streamly/Internal/Data/Stream/StreamD.hs index 6cc848849..882b40ea6 100644 --- a/src/Streamly/Internal/Data/Stream/StreamD.hs +++ b/src/Streamly/Internal/Data/Stream/StreamD.hs @@ -311,6 +311,7 @@ module Streamly.Internal.Data.Stream.StreamD -- * Time related , takeByTime , dropByTime + , currentTime ) where @@ -345,6 +346,7 @@ import qualified Control.Monad.Reader as Reader import qualified Control.Monad.State.Strict as State import qualified Prelude +import Data.Int (Int64) import Streamly.Internal.Mutable.Prim.Var (Prim, Var, readVar, newVar, modifyVar') import Streamly.Internal.Data.Time.Units @@ -355,6 +357,7 @@ import Streamly.Internal.Memory.Array.Types (Array(..)) import Streamly.Internal.Data.Fold.Types (Fold(..)) import Streamly.Internal.Data.Pipe.Types (Pipe(..), PipeState(..)) import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime) +import Streamly.Internal.Data.Time.Units (MicroSecond64(..), fromAbsTime, toAbsTime, AbsTime) import Streamly.Internal.Data.Unfold.Types (Unfold(..)) import Streamly.Internal.Data.Strict (Tuple3'(..)) @@ -4207,3 +4210,26 @@ dropByTime duration (Stream step1 state1) = Stream step (DropByTimeInit state1) Yield x s -> Yield x (DropByTimeYield s) Skip s -> Skip (DropByTimeYield s) Stop -> Stop + +{-# INLINE_NORMAL currentTime #-} +currentTime :: MonadAsync m => Double -> Stream m AbsTime +currentTime g = Stream step Nothing + where + next timeVar = do + threadDelay $ delayTime + MicroSecond64 t <- fromAbsTime <$> getTime Monotonic + modifyVar' timeVar (const t) + next timeVar + g' = g * 10 ^ (6 :: Int) + {-# INLINE delayTime #-} + delayTime = if g' >= fromIntegral (maxBound :: Int) + then maxBound + else round g' + {-# INLINE_LATE step #-} + step _ Nothing = do + timeVar <- liftIO $ newVar (0 :: Int64) + tid <- forkManaged $ liftIO $ void $ next timeVar + return $ Skip $ Just (timeVar, tid) + step _ s@(Just (timeVar, _)) = do + a <- liftIO $ readVar timeVar + return $ Yield (toAbsTime (MicroSecond64 a)) s diff --git a/src/Streamly/Internal/Prelude.hs b/src/Streamly/Internal/Prelude.hs index d455bef94..fcc357f4e 100644 --- a/src/Streamly/Internal/Prelude.hs +++ b/src/Streamly/Internal/Prelude.hs @@ -421,6 +421,9 @@ module Streamly.Internal.Prelude -- * Diagnostics , inspectMode + -- * Time related + , currentTime + -- * Deprecated , K.once , each @@ -4297,3 +4300,18 @@ usingStateT s f xs = evalStateT s $ f $ liftInner xs {-# INLINE runStateT #-} runStateT :: Monad m => s -> SerialT (StateT s m) a -> SerialT m (s, a) runStateT s xs = fromStreamD $ D.runStateT s (toStreamD xs) + +------------------------------------------------------------------------------ +-- Time related +------------------------------------------------------------------------------ + +-- | /currentTime g/ returns a stream of 'AbsTime'. The time is updated every +-- /g/ seconds. Between any 2 updates, the stream will contain the same +-- element. Getting the absolute time is a costly operation and hence the time +-- taken to generate this stream depends on the granularity /g/. If /g/ is very +-- low, the time taken to generate this the elements of this stream will be +-- very high. Conversely, if the granularity is high, the time taken to +-- generate the elements of this stream will be low. +{-# INLINE currentTime #-} +currentTime :: (IsStream t, MonadAsync m) => Double -> t m AbsTime +currentTime g = fromStreamD $ D.currentTime g