Add correct impl of takeInterval & dropInterval to Stream.Parallel

- Remove the incorrect implementation of from IsStream
- Remove takeByTime and dropByTime from StreamD
This commit is contained in:
Adithya Kumar 2022-09-16 17:04:04 +05:30
parent 99846a8f07
commit ba4d23b489
4 changed files with 131 additions and 165 deletions

View File

@ -35,6 +35,7 @@ import qualified Streamly.Internal.Data.Unfold as Unfold
import qualified Streamly.Internal.Data.Stream.IsStream as Stream
import Streamly.Internal.Data.Time.Units
#else
import qualified Streamly.Internal.Data.Stream.Parallel as Stream
import qualified Streamly.Internal.Data.Stream as Stream
#endif
@ -301,17 +302,17 @@ takeWhileTrue value n = composeN n $ Stream.takeWhile (<= (value + 1))
takeWhileMTrue :: MonadIO m => Int -> Int -> Stream m Int -> m ()
takeWhileMTrue value n = composeN n $ Stream.takeWhileM (return . (<= (value + 1)))
#ifdef USE_PRELUDE
{-# INLINE takeInterval #-}
takeInterval :: NanoSecond64 -> Int -> Stream IO Int -> IO ()
takeInterval :: Double -> Int -> Stream IO Int -> IO ()
takeInterval i n = composeN n (Stream.takeInterval i)
-- Inspection testing is disabled for takeInterval
-- Enable it when looking at it throughly
#ifdef INSPECTION
-- inspect $ hasNoType 'takeInterval ''SPEC
inspect $ hasNoTypeClasses 'takeInterval
-- inspect $ hasNoTypeClasses 'takeInterval
-- inspect $ 'takeInterval `hasNoType` ''D.Step
#endif
#endif
{-# INLINE dropOne #-}
dropOne :: MonadIO m => Int -> Stream m Int -> m ()
@ -338,16 +339,18 @@ dropWhileFalse value n = composeN n $ Stream.dropWhile (> (value + 1))
{-# INLINE _intervalsOfSum #-}
_intervalsOfSum :: MonadAsync m => Double -> Int -> Stream m Int -> m ()
_intervalsOfSum i n = composeN n (Stream.intervalsOf i FL.sum)
#endif
{-# INLINE dropInterval #-}
dropInterval :: NanoSecond64 -> Int -> Stream IO Int -> IO ()
dropInterval :: Double -> Int -> Stream IO Int -> IO ()
dropInterval i n = composeN n (Stream.dropInterval i)
-- Inspection testing is disabled for dropInterval
-- Enable it when looking at it throughly
#ifdef INSPECTION
inspect $ hasNoTypeClasses 'dropInterval
-- inspect $ hasNoTypeClasses 'dropInterval
-- inspect $ 'dropInterval `hasNoType` ''D.Step
#endif
#endif
{-# INLINE findIndices #-}
findIndices :: MonadIO m => Int -> Int -> Stream m Int -> m ()
@ -403,16 +406,8 @@ o_1_space_filtering value =
-- , benchIOSink value "takeWhileM-true" (_takeWhileMTrue value 1)
, benchIOSink value "drop-one" (dropOne 1)
, benchIOSink value "drop-all" (dropAll value 1)
#ifdef USE_PRELUDE
, benchIOSink
value
"takeInterval-all"
(takeInterval (NanoSecond64 maxBound) 1)
, benchIOSink
value
"dropInterval-all"
(dropInterval (NanoSecond64 maxBound) 1)
#endif
, benchIOSink value "takeInterval-all" (takeInterval 10000 1)
, benchIOSink value "dropInterval-all" (dropInterval 10000 1)
, benchIOSink value "dropWhile-true" (dropWhileTrue value 1)
-- , benchIOSink value "dropWhileM-true" (_dropWhileMTrue value 1)
, benchIOSink

View File

@ -78,11 +78,9 @@ module Streamly.Internal.Data.Stream.StreamD.Transform
-- * Trimming
-- | Produce a subset of the stream trimmed at ends.
, take
, takeByTime
, takeWhile
, takeWhileM
, drop
, dropByTime
, dropWhile
, dropWhileM
@ -133,17 +131,14 @@ import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Maybe (fromJust, isJust)
import Fusion.Plugin.Types (Fuse(..))
import GHC.Types (SPEC(..))
import qualified Data.Set as Set
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Control.ForkLifted (forkManaged)
import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Pipe.Type (Pipe(..), PipeState(..))
import Streamly.Internal.Data.SVar.Type (defState, adaptState)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units
(TimeUnit64, toRelTime64, diffAbsTime64)
import qualified Data.Set as Set
import qualified Streamly.Internal.Data.Fold.Type as FL
import qualified Streamly.Internal.Data.IORef.Unboxed as Unboxed
import qualified Streamly.Internal.Data.Pipe.Type as Pipe
@ -791,84 +786,6 @@ deleteBy eq x (Stream step state) = Stream step' (state, False)
-- Trimming
------------------------------------------------------------------------------
-- XXX using getTime in the loop can be pretty expensive especially for
-- computations where iterations are lightweight. We have the following
-- options:
--
-- 1) Run a timeout thread updating a flag asynchronously and check that
-- flag here, that way we can have a cheap termination check.
--
-- 2) Use COARSE clock to get time with lower resolution but more efficiently.
--
-- 3) Use rdtscp/rdtsc to get time directly from the processor, compute the
-- termination value of rdtsc in the beginning and then in each iteration just
-- get rdtsc and check if we should terminate.
--
data TakeByTime st s
= TakeByTimeInit st
| TakeByTimeCheck st s
| TakeByTimeYield st s
{-# INLINE_NORMAL takeByTime #-}
takeByTime :: (MonadIO m, TimeUnit64 t) => t -> Stream m a -> Stream m a
takeByTime duration (Stream step1 state1) = Stream step (TakeByTimeInit state1)
where
lim = toRelTime64 duration
{-# INLINE_LATE step #-}
step _ (TakeByTimeInit _) | lim == 0 = return Stop
step _ (TakeByTimeInit st) = do
t0 <- liftIO $ getTime Monotonic
return $ Skip (TakeByTimeYield st t0)
step _ (TakeByTimeCheck st t0) = do
t <- liftIO $ getTime Monotonic
return $
if diffAbsTime64 t t0 > lim
then Stop
else Skip (TakeByTimeYield st t0)
step gst (TakeByTimeYield st t0) = do
r <- step1 gst st
return $ case r of
Yield x s -> Yield x (TakeByTimeCheck s t0)
Skip s -> Skip (TakeByTimeCheck s t0)
Stop -> Stop
data DropByTime st s x
= DropByTimeInit st
| DropByTimeGen st s
| DropByTimeCheck st s x
| DropByTimeYield st
{-# INLINE_NORMAL dropByTime #-}
dropByTime :: (MonadIO m, TimeUnit64 t) => t -> Stream m a -> Stream m a
dropByTime duration (Stream step1 state1) = Stream step (DropByTimeInit state1)
where
lim = toRelTime64 duration
{-# INLINE_LATE step #-}
step _ (DropByTimeInit st) = do
t0 <- liftIO $ getTime Monotonic
return $ Skip (DropByTimeGen st t0)
step gst (DropByTimeGen st t0) = do
r <- step1 gst st
return $ case r of
Yield x s -> Skip (DropByTimeCheck s t0 x)
Skip s -> Skip (DropByTimeGen s t0)
Stop -> Stop
step _ (DropByTimeCheck st t0 x) = do
t <- liftIO $ getTime Monotonic
if diffAbsTime64 t t0 <= lim
then return $ Skip $ DropByTimeGen st t0
else return $ Yield x $ DropByTimeYield st
step gst (DropByTimeYield st) = do
r <- step1 gst st
return $ case r of
Yield x s -> Yield x (DropByTimeYield s)
Skip s -> Skip (DropByTimeYield s)
Stop -> Stop
-- Adapted from the vector package
{-# INLINE_NORMAL drop #-}
drop :: Monad m => Int -> Stream m a -> Stream m a

View File

@ -85,7 +85,6 @@ module Streamly.Internal.Data.Stream.IsStream.Transform
-- | Produce a subset of the stream trimmed at ends.
, take
, takeInterval
, takeLast
, takeLastInterval
, takeWhile
@ -93,7 +92,6 @@ module Streamly.Internal.Data.Stream.IsStream.Transform
, takeWhileLast
, takeWhileAround
, drop
, dropInterval
, dropLast
, dropLastInterval
, dropWhile
@ -250,7 +248,7 @@ import Streamly.Internal.Data.Stream.IsStream.Type
(IsStream(..), fromStreamD, toStreamD, toConsK)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.SVar (Rate(..))
import Streamly.Internal.Data.Time.Units (TimeUnit64, AbsTime, RelTime64)
import Streamly.Internal.Data.Time.Units (AbsTime, RelTime64)
import qualified Streamly.Data.Fold as FL
import qualified Streamly.Internal.Data.Stream.Parallel as Par
@ -259,7 +257,7 @@ import qualified Streamly.Internal.Data.Stream.StreamD as D
(transform, foldrT, tap, tapOffsetEvery, pollCounts, mapM, scanOnce
, scanMany, postscanOnce, scanlx', scanlM', scanl', postscanl', prescanl'
, prescanlM', scanl1M', scanl1', filter, filterM, uniq, deleteBy, takeWhileM
, takeByTime, dropWhile, dropWhileM, dropByTime, insertBy, intersperse
, dropWhile, dropWhileM, insertBy, intersperse
, intersperseM_, intersperseSuffix, intersperseSuffix_
, intersperseSuffixBySpan, indexed, indexedR, rollingMap, rollingMapM
, rollingMap2, mapMaybe, mapMaybeM)
@ -1059,26 +1057,6 @@ takeWhileAround :: -- (IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
takeWhileAround = undefined -- fromStreamD $ D.takeWhileAround n $ toStreamD m
-- | @takeInterval duration@ yields stream elements upto specified time
-- @duration@. The duration starts when the stream is evaluated for the first
-- time, before the first element is yielded. The time duration is checked
-- before generating each element, if the duration has expired the stream
-- stops.
--
-- The total time taken in executing the stream is guaranteed to be /at least/
-- @duration@, however, because the duration is checked before generating an
-- element, the upper bound is indeterminate and depends on the time taken in
-- generating and processing the last element.
--
-- No element is yielded if the duration is zero. At least one element is
-- yielded if the duration is non-zero.
--
-- /Pre-release/
--
{-# INLINE takeInterval #-}
takeInterval ::(MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a
takeInterval d = fromStreamD . D.takeByTime d . toStreamD
-- | Drop elements in the stream as long as the predicate succeeds and then
-- take the rest of the stream.
--
@ -1094,24 +1072,6 @@ dropWhile p m = fromStreamD $ D.dropWhile p $ toStreamD m
dropWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
dropWhileM p m = fromStreamD $ D.dropWhileM p $ toStreamD m
-- | @dropInterval duration@ drops stream elements until specified @duration@ has
-- passed. The duration begins when the stream is evaluated for the first
-- time. The time duration is checked /after/ generating a stream element, the
-- element is yielded if the duration has expired otherwise it is dropped.
--
-- The time elapsed before starting to generate the first element is /at most/
-- @duration@, however, because the duration expiry is checked after the
-- element is generated, the lower bound is indeterminate and depends on the
-- time taken in generating an element.
--
-- All elements are yielded if the duration is zero.
--
-- /Pre-release/
--
{-# INLINE dropInterval #-}
dropInterval ::(MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a
dropInterval d = fromStreamD . D.dropByTime d . toStreamD
-- | Drop @n@ elements at the end of the stream.
--
-- O(n) space, where n is the number elements dropped.

View File

@ -11,7 +11,7 @@
--
-- To run examples in this module:
--
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import qualified Streamly.Data.Stream as Stream
-- >>> import Control.Concurrent (threadDelay)
-- >>> :{
-- delay n = do
@ -42,10 +42,15 @@ module Streamly.Internal.Data.Stream.Parallel
-- * Callbacks
, newCallbackStream
-- * Combinators
, interjectSuffix
, takeInterval
, dropInterval
)
where
import Control.Concurrent (myThreadId, takeMVar)
import Control.Concurrent (myThreadId, takeMVar, threadDelay)
import Control.Monad (when)
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
@ -56,37 +61,36 @@ import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Functor (void)
import Data.IORef (readIORef, writeIORef)
import Data.Maybe (fromJust)
import Data.Maybe (fromJust, isNothing)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import Prelude hiding (map)
import qualified Data.Set as Set
import Streamly.Data.Fold (Fold)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Stream.StreamD.Type (Step(..))
import Streamly.Internal.Data.Stream.StreamK.Type (Stream)
import Streamly.Internal.Data.Stream.Type (Stream)
import qualified Data.Set as Set
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
(foldStreamShared, mkStream, foldStream, fromEffect
(Stream, foldStreamShared, mkStream, foldStream, fromEffect
, nil, concatMapWith, fromPure, bindWith, withLocal)
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
(Stream(..), mapM, toStreamK, fromStreamK)
import qualified Streamly.Internal.Data.Stream.SVar.Generate as SVar
import qualified Streamly.Internal.Data.Stream.SVar.Eliminate as SVar
import qualified Streamly.Internal.Data.Stream as Stream
(toStreamK, fromStreamK)
(catMaybes, dropWhile, fromStreamK, repeat, sequence, takeWhile, toStreamK)
import Streamly.Internal.Data.SVar
import Prelude hiding (map)
#include "inline.hs"
#include "Instances.hs"
--
-- $setup
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import qualified Streamly.Data.Stream as Stream
-- >>> import Control.Concurrent (threadDelay)
-- >>> :{
-- delay n = do
@ -106,7 +110,7 @@ import Streamly.Internal.Data.SVar
{-# NOINLINE runOne #-}
runOne
:: MonadIO m
=> State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
=> State K.Stream m a -> K.Stream m a -> Maybe WorkerInfo -> m ()
runOne st m0 winfo =
case getYieldLimit st of
Nothing -> go m0
@ -129,7 +133,7 @@ runOne st m0 winfo =
runOneLimited
:: MonadIO m
=> State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
=> State K.Stream m a -> K.Stream m a -> Maybe WorkerInfo -> m ()
runOneLimited st m0 winfo = go m0
where
@ -166,7 +170,7 @@ runOneLimited st m0 winfo = go m0
{-# NOINLINE forkSVarPar #-}
forkSVarPar :: MonadAsync m
=> SVarStopStyle -> Stream m a -> Stream m a -> Stream m a
=> SVarStopStyle -> K.Stream m a -> K.Stream m a -> K.Stream m a
forkSVarPar ss m r = K.mkStream $ \st yld sng stp -> do
sv <- newParallelVar ss st
pushWorkerPar sv (runOne st{streamVar = Just sv} m)
@ -179,8 +183,13 @@ forkSVarPar ss m r = K.mkStream $ \st yld sng stp -> do
K.foldStream st yld sng stp $ Stream.toStreamK (SVar.fromSVar sv)
{-# INLINE joinStreamVarPar #-}
joinStreamVarPar :: MonadAsync m
=> SVarStyle -> SVarStopStyle -> Stream m a -> Stream m a -> Stream m a
joinStreamVarPar ::
MonadAsync m
=> SVarStyle
-> SVarStopStyle
-> K.Stream m a
-> K.Stream m a
-> K.Stream m a
joinStreamVarPar style ss m1 m2 = K.mkStream $ \st yld sng stp ->
case streamVar st of
Just sv | svarStyle sv == style && svarStopStyle sv == ss -> do
@ -230,7 +239,7 @@ joinStreamVarPar style ss m1 m2 = K.mkStream $ \st yld sng stp ->
-------------------------------------------------------------------------------
{-# INLINE parallelK #-}
parallelK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
parallelK :: MonadAsync m => K.Stream m a -> K.Stream m a -> K.Stream m a
parallelK = joinStreamVarPar ParallelVar StopNone
-- | XXX we can implement it more efficienty by directly implementing instead
@ -248,7 +257,7 @@ consM m (ParallelT r) = ParallelT $ parallelK (K.fromEffect m) r
--
-- /Pre-release/
{-# INLINE parallelFstK #-}
parallelFstK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
parallelFstK :: MonadAsync m => K.Stream m a -> K.Stream m a -> K.Stream m a
parallelFstK = joinStreamVarPar ParallelVar StopBy
-- This is a race like combinator for streams.
@ -258,7 +267,7 @@ parallelFstK = joinStreamVarPar ParallelVar StopBy
--
-- /Pre-release/
{-# INLINE parallelMinK #-}
parallelMinK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
parallelMinK :: MonadAsync m => K.Stream m a -> K.Stream m a -> K.Stream m a
parallelMinK = joinStreamVarPar ParallelVar StopAny
------------------------------------------------------------------------------
@ -269,7 +278,7 @@ parallelMinK = joinStreamVarPar ParallelVar StopAny
--
-- /Pre-release/
--
mkParallelK :: MonadAsync m => Stream m a -> Stream m a
mkParallelK :: MonadAsync m => K.Stream m a -> K.Stream m a
mkParallelK m = K.mkStream $ \st yld sng stp -> do
sv <- newParallelVar StopNone (adaptState st)
-- pushWorkerPar sv (runOne st{streamVar = Just sv} $ toStream m)
@ -342,7 +351,8 @@ mkParallelD m = D.Stream step Nothing
--
-- /Pre-release/
{-# INLINE tapAsyncK #-}
tapAsyncK :: MonadAsync m => (Stream m a -> m b) -> Stream m a -> Stream m a
tapAsyncK ::
MonadAsync m => (K.Stream m a -> m b) -> K.Stream m a -> K.Stream m a
tapAsyncK f m = K.mkStream $ \st yld sng stp -> do
sv <- SVar.newFoldSVar st (f . Stream.toStreamK)
K.foldStreamShared st yld sng stp
@ -420,7 +430,7 @@ tapAsyncF f (D.Stream step1 state1) = D.Stream step TapInit
-- /Since: 0.7.0 (maxBuffer applies to ParallelT streams)/
--
-- @since 0.8.0
newtype ParallelT m a = ParallelT {getParallelT :: Stream m a}
newtype ParallelT m a = ParallelT {getParallelT :: K.Stream m a}
deriving (MonadTrans)
-- | A parallely composing IO stream of elements of type @a@.
@ -507,7 +517,7 @@ MONAD_COMMON_INSTANCES(ParallelT, MONADPARALLEL)
-- /Pre-release/
--
{-# INLINE_NORMAL newCallbackStream #-}
newCallbackStream :: MonadAsync m => m (a -> m (), Stream m a)
newCallbackStream :: MonadAsync m => m (a -> m (), K.Stream m a)
newCallbackStream = do
sv <- newParallelVar StopNone defState
@ -520,3 +530,87 @@ newCallbackStream = do
-- XXX we can return an SVar and then the consumer can unfold from the
-- SVar?
return (callback, D.toStreamK (SVar.fromSVarD sv))
------------------------------------------------------------------------------
-- Combinators
------------------------------------------------------------------------------
{-# INLINE parallelFst #-}
parallelFst :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
parallelFst m1 m2 =
Stream.fromStreamK
$ parallelFstK (Stream.toStreamK m1) (Stream.toStreamK m2)
-- | Intersperse a monadic action into the input stream after every @n@
-- seconds.
--
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Internal.Data.Stream.Parallel as Parallel
-- >>> Stream.fold Fold.drain $ Parallel.interjectSuffix 1.05 (putChar ',') $ Stream.mapM (\x -> threadDelay 1000000 >> putChar x) $ Stream.fromList "hello"
-- h,e,l,l,o
--
-- /Pre-release/
{-# INLINE interjectSuffix #-}
interjectSuffix :: MonadAsync m => Double -> m a -> Stream m a -> Stream m a
interjectSuffix n f xs = xs `parallelFst` repeatM timed
where timed = liftIO (threadDelay (round $ n * 1000000)) >> f
repeatM = Stream.sequence . Stream.repeat
-- XXX Notes from D.takeByTime (which was removed)
-- XXX using getTime in the loop can be pretty expensive especially for
-- computations where iterations are lightweight. We have the following
-- options:
--
-- 1) Run a timeout thread updating a flag asynchronously and check that
-- flag here, that way we can have a cheap termination check.
--
-- 2) Use COARSE clock to get time with lower resolution but more efficiently.
--
-- 3) Use rdtscp/rdtsc to get time directly from the processor, compute the
-- termination value of rdtsc in the beginning and then in each iteration just
-- get rdtsc and check if we should terminate.
-- | @takeInterval duration@ yields stream elements upto specified time
-- @duration@ in seconds. The duration starts when the stream is evaluated for
-- the first time, before the first element is yielded. The time duration is
-- checked before generating each element, if the duration has expired the
-- stream stops.
--
-- The total time taken in executing the stream is guaranteed to be /at least/
-- @duration@, however, because the duration is checked before generating an
-- element, the upper bound is indeterminate and depends on the time taken in
-- generating and processing the last element.
--
-- No element is yielded if the duration is zero. At least one element is
-- yielded if the duration is non-zero.
--
-- /Pre-release/
--
{-# INLINE takeInterval #-}
takeInterval :: MonadAsync m => Double -> Stream m a -> Stream m a
takeInterval d =
Stream.catMaybes
. Stream.takeWhile isNothing
. interjectSuffix d (return Nothing) . fmap Just
-- | @dropInterval duration@ drops stream elements until specified @duration@ in
-- seconds has passed. The duration begins when the stream is evaluated for the
-- first time. The time duration is checked /after/ generating a stream element,
-- the element is yielded if the duration has expired otherwise it is dropped.
--
-- The time elapsed before starting to generate the first element is /at most/
-- @duration@, however, because the duration expiry is checked after the
-- element is generated, the lower bound is indeterminate and depends on the
-- time taken in generating an element.
--
-- All elements are yielded if the duration is zero.
--
-- /Pre-release/
--
{-# INLINE dropInterval #-}
dropInterval :: MonadAsync m => Double -> Stream m a -> Stream m a
dropInterval d =
Stream.catMaybes
. Stream.dropWhile isNothing
. interjectSuffix d (return Nothing) . fmap Just