Add Timed API for stream generations

This commit is contained in:
Ranjeet Kumar Ranjan 2022-06-27 17:18:30 +05:30 committed by Harendra Kumar
parent 4bb8b7c950
commit b528bae8ce
2 changed files with 134 additions and 2 deletions

View File

@ -14,6 +14,9 @@ module Streamly.Internal.Data.Stream.Bottom
-- * Generation
fromPure
, fromEffect
, timesWith
, absTimesWith
, relTimesWith
-- * Folds
, fold
@ -55,12 +58,12 @@ where
import Control.Monad.IO.Class (MonadIO(..))
import Streamly.Internal.Data.Fold.Type (Fold (..))
import Streamly.Internal.Data.Time.Units (AbsTime, RelTime64, addToAbsTime64)
import Streamly.Internal.Data.Unboxed (Unboxed)
import Streamly.Internal.System.IO (defaultChunkSize)
import qualified Streamly.Internal.Data.Array.Unboxed.Type as A
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
import qualified Streamly.Internal.Data.Stream.StreamK as K
import qualified Streamly.Internal.Data.Stream.StreamD as D
@ -84,6 +87,69 @@ import Streamly.Internal.Data.Stream.Type
-- >>> import qualified Streamly.Internal.Data.Parser as Parser
-- >>> import qualified Streamly.Internal.Data.Unfold as Unfold
------------------------------------------------------------------------------
-- Generation - Time related
------------------------------------------------------------------------------
-- | @timesWith g@ returns a stream of time value tuples. The first component
-- of the tuple is an absolute time reference (epoch) denoting the start of the
-- stream and the second component is a time relative to the reference.
--
-- The argument @g@ specifies the granularity of the relative time in seconds.
-- A lower granularity clock gives higher precision but is more expensive in
-- terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.
--
-- >>> import Control.Concurrent (threadDelay)
-- >>> import Streamly.Internal.Data.Stream.Generate as Stream (timesWith)
-- >>> Stream.mapM_ (\x -> print x >> threadDelay 1000000) $ Stream.take 3 $ Stream.timesWith 0.01
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
--
-- Note: This API is not safe on 32-bit machines.
--
-- /Pre-release/
--
{-# INLINE timesWith #-}
timesWith :: MonadIO m => Double -> Stream m (AbsTime, RelTime64)
timesWith g = fromStreamD $ D.times g
-- | @absTimesWith g@ returns a stream of absolute timestamps using a clock of
-- granularity @g@ specified in seconds. A low granularity clock is more
-- expensive in terms of CPU usage. Any granularity lower than 1 ms is treated
-- as 1 ms.
--
-- >>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.absTimesWith 0.01
-- AbsTime (TimeSpec {sec = ..., nsec = ...})
-- AbsTime (TimeSpec {sec = ..., nsec = ...})
-- AbsTime (TimeSpec {sec = ..., nsec = ...})
--
-- Note: This API is not safe on 32-bit machines.
--
-- /Pre-release/
--
{-# INLINE absTimesWith #-}
absTimesWith :: MonadIO m => Double -> Stream m AbsTime
absTimesWith = fmap (uncurry addToAbsTime64) . timesWith
-- | @relTimesWith g@ returns a stream of relative time values starting from 0,
-- using a clock of granularity @g@ specified in seconds. A low granularity
-- clock is more expensive in terms of CPU usage. Any granularity lower than 1
-- ms is treated as 1 ms.
--
-- >>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimesWith 0.01
-- RelTime64 (NanoSecond64 ...)
-- RelTime64 (NanoSecond64 ...)
-- RelTime64 (NanoSecond64 ...)
--
-- Note: This API is not safe on 32-bit machines.
--
-- /Pre-release/
--
{-# INLINE relTimesWith #-}
relTimesWith :: MonadIO m => Double -> Stream m RelTime64
relTimesWith = fmap snd . timesWith
------------------------------------------------------------------------------
-- Elimination - Running a Fold
------------------------------------------------------------------------------

View File

@ -21,6 +21,13 @@ module Streamly.Internal.Data.Stream.Generate
, Stream.fromPure
, Stream.fromEffect
-- * Time Enumeration
, times
, absTimes
, absTimesWith
, relTimes
, relTimesWith
-- * Cyclic Elements
, mfix
@ -40,7 +47,10 @@ import Control.Monad.IO.Class (MonadIO)
import Data.Word (Word8)
import Foreign.Storable (Storable)
import GHC.Exts (Addr#, Ptr (Ptr))
import Streamly.Internal.Data.Stream.Bottom
(absTimesWith, relTimesWith, timesWith)
import Streamly.Internal.Data.Stream.Type (Stream, fromStreamK, toStreamK)
import Streamly.Internal.Data.Time.Units (AbsTime, RelTime64, addToAbsTime64)
import Streamly.Internal.Data.Unfold.Type (Unfold)
import qualified Streamly.Internal.Data.Stream.StreamD as D
@ -49,12 +59,14 @@ import qualified Streamly.Internal.Data.Stream.Type as Stream
-- $setup
-- >>> :m
-- >>> import Control.Concurrent (threadDelay)
-- >>> import Data.Function (fix)
-- >>> import Prelude hiding (take)
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Data.Unfold as Unfold
-- >>> import qualified Streamly.Internal.Data.Stream as Stream
-- >>> import GHC.Exts (Ptr (Ptr))
------------------------------------------------------------------------------
-- From Unfold
------------------------------------------------------------------------------
@ -72,6 +84,60 @@ import qualified Streamly.Internal.Data.Stream.Type as Stream
unfold :: Monad m => Unfold m a b -> a -> Stream m b
unfold unf = Stream.fromStreamD . D.unfold unf
------------------------------------------------------------------------------
-- Time Enumeration
------------------------------------------------------------------------------
-- | @times@ returns a stream of time value tuples with clock of 10 ms
-- granularity. The first component of the tuple is an absolute time reference
-- (epoch) denoting the start of the stream and the second component is a time
-- relative to the reference.
--
-- >>> Stream.mapM_ (\x -> print x >> threadDelay 1000000) $ Stream.take 3 $ Stream.times
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
--
-- Note: This API is not safe on 32-bit machines.
--
-- /Pre-release/
--
{-# INLINE times #-}
times :: MonadIO m => Stream m (AbsTime, RelTime64)
times = timesWith 0.01
-- | @absTimes@ returns a stream of absolute timestamps using a clock of 10 ms
-- granularity.
--
-- >>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.absTimes
-- AbsTime (TimeSpec {sec = ..., nsec = ...})
-- AbsTime (TimeSpec {sec = ..., nsec = ...})
-- AbsTime (TimeSpec {sec = ..., nsec = ...})
--
-- Note: This API is not safe on 32-bit machines.
--
-- /Pre-release/
--
{-# INLINE absTimes #-}
absTimes :: MonadIO m => Stream m AbsTime
absTimes = fmap (uncurry addToAbsTime64) times
-- | @relTimes@ returns a stream of relative time values starting from 0,
-- using a clock of granularity 10 ms.
--
-- >>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimes
-- RelTime64 (NanoSecond64 ...)
-- RelTime64 (NanoSecond64 ...)
-- RelTime64 (NanoSecond64 ...)
--
-- Note: This API is not safe on 32-bit machines.
--
-- /Pre-release/
--
{-# INLINE relTimes #-}
relTimes :: MonadIO m => Stream m RelTime64
relTimes = fmap snd times
-- | We can define cyclic structures using @let@:
--
-- >>> let (a, b) = ([1, b], head a) in (a, b)