Implement timestamp* and timeIndex* operations (#1944)

This commit is contained in:
Ranjeet Ranjan 2022-10-13 15:46:40 +05:30 committed by GitHub
parent bc60c1ab88
commit 8d8e5a0c30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -70,6 +70,12 @@ module Streamly.Internal.Data.Stream.Transform
, indexed
, indexedR
-- * Time Indexing
, timestamped
, timestampWith
, timeIndexed
, timeIndexWith
-- * Searching
, findIndices -- XXX indicesBy
, elemIndices -- XXX indicesOf
@ -138,6 +144,7 @@ import Data.Maybe (isJust, fromJust)
import Streamly.Internal.Data.Fold.Type (Fold)
import Streamly.Internal.Data.Pipe (Pipe)
import Streamly.Internal.Data.Time.Units (AbsTime, RelTime64)
import qualified Streamly.Internal.Data.Fold as FL
-- import qualified Streamly.Internal.Data.Fold.Window as Window
@ -145,6 +152,7 @@ import qualified Streamly.Internal.Data.Stream.StreamD.Transform as D
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
import Streamly.Internal.Data.Stream.Bottom
import Streamly.Internal.Data.Stream.Type
@ -631,7 +639,7 @@ intersperseM_ m = fromStreamD . D.intersperseM_ m . toStreamD
-- /Unimplemented/
{-# INLINE intersperseBySpan #-}
intersperseBySpan :: -- Monad m =>
Int -> m a -> t m a -> t m a
Int -> m a -> Stream m a -> Stream m a
intersperseBySpan _n _f _xs = undefined
-- | Insert an effect and its output after consuming an element of a stream.
@ -753,8 +761,8 @@ reassembleBy
:: -- Monad m =>
Fold m a b
-> (a -> a -> Int)
-> t m a
-> t m b
-> Stream m a
-> Stream m b
reassembleBy = undefined
------------------------------------------------------------------------------
@ -796,6 +804,68 @@ indexedR :: Monad m => Int -> Stream m a -> Stream m (Int, a)
-- indexedR n = scanMaybe (FL.indexingRev n)
indexedR n = fromStreamD . D.indexedR n . toStreamD
-------------------------------------------------------------------------------
-- Time Indexing
-------------------------------------------------------------------------------
-- Note: The timestamp stream must be the second stream in the zip so that the
-- timestamp is generated after generating the stream element and not before.
-- If we do not do that then the following example will generate the same
-- timestamp for first two elements:
--
-- Stream.fold Fold.toList $ Stream.timestamped $ Stream.delay $ Stream.enumerateFromTo 1 3
--
-- | Pair each element in a stream with an absolute timestamp, using a clock of
-- specified granularity. The timestamp is generated just before the element
-- is consumed.
--
-- >>> Stream.fold Fold.toList $ Stream.timestampWith 0.01 $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
-- [(AbsTime (TimeSpec {sec = ..., nsec = ...}),1),(AbsTime (TimeSpec {sec = ..., nsec = ...}),2),(AbsTime (TimeSpec {sec = ..., nsec = ...}),3)]
--
-- /Pre-release/
--
{-# INLINE timestampWith #-}
timestampWith :: (MonadIO m)
=> Double -> Stream m a -> Stream m(AbsTime, a)
timestampWith g stream = zipWith (flip (,)) stream (absTimesWith g)
-- TBD: check performance vs a custom implementation without using zipWith.
--
-- /Pre-release/
--
{-# INLINE timestamped #-}
timestamped :: (MonadIO m)
=> Stream m a -> Stream m(AbsTime, a)
timestamped = timestampWith 0.01
-- | Pair each element in a stream with relative times starting from 0, using a
-- clock with the specified granularity. The time is measured just before the
-- element is consumed.
--
-- >>> Stream.fold Fold.toList $ Stream.timeIndexWith 0.01 $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
-- [(RelTime64 (NanoSecond64 ...),1),(RelTime64 (NanoSecond64 ...),2),(RelTime64 (NanoSecond64 ...),3)]
--
-- /Pre-release/Monad
--
{-# INLINE timeIndexWith #-}
timeIndexWith :: (MonadIO m)
=> Double -> Stream m a -> Stream m(RelTime64, a)
timeIndexWith g stream = zipWith (flip (,)) stream (relTimesWith g)
-- | Pair each element in a stream with relative times starting from 0, using a
-- 10 ms granularity clock. The time is measured just before the element is
-- consumed.
--
-- >>> Stream.fold Fold.toList $ Stream.timeIndexed $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
-- [(RelTime64 (NanoSecond64 ...),1),(RelTime64 (NanoSecond64 ...),2),(RelTime64 (NanoSecond64 ...),3)]
--
-- /Pre-release/
--
{-# INLINE timeIndexed #-}
timeIndexed :: (MonadIO m)
=> Stream m a -> Stream m(RelTime64, a)
timeIndexed = timeIndexWith 0.01
------------------------------------------------------------------------------
-- Searching
------------------------------------------------------------------------------