Add modular stream stats counting

* pollCounts to poll the element count in another thread
* delayPost to introduce a delay in polling
* rollingMap to compute diff of successive elements

These combinators can be used to compute and report the element
processing rate in a stream.
This commit is contained in:
Harendra Kumar 2019-12-28 23:01:36 +05:30
parent 135063fb0c
commit 5073377ba9
5 changed files with 168 additions and 12 deletions

View File

@ -323,6 +323,7 @@ main =
, benchIOSink "foldrTMap" (Ops.foldrTMap 1)
, benchIOSink "tap" (Ops.tap 1)
, benchIOSink "tapRate 1 second" (Ops.tapRate 1)
, benchIOSink "pollCounts 1 second" (Ops.pollCounts 1)
, benchIOSink "tapAsync" (Ops.tapAsync 1)
, benchIOSink "tapAsyncS" (Ops.tapAsyncS 1)
]

View File

@ -25,7 +25,7 @@ module Streamly.Benchmark.Prelude where
import Control.DeepSeq (NFData)
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.State.Strict (StateT, get, put)
import Data.Functor.Identity (Identity, runIdentity)
import Data.IORef (newIORef, modifyIORef')
@ -411,6 +411,12 @@ tapRate n str = do
cref <- newIORef 0
composeN n (Internal.tapRate 1 (\c -> modifyIORef' cref (c +))) str
{-# INLINE pollCounts #-}
pollCounts :: Int -> Stream IO Int -> IO ()
pollCounts n str = do
composeN n (Internal.pollCounts f FL.drain) str
where f = Internal.rollingMap (P.-) . Internal.delayPost 1
{-# INLINE tapAsyncS #-}
tapAsyncS :: S.MonadAsync m => Int -> Stream m Int -> m ()
tapAsyncS n = composeN n $ Par.tapAsync S.sum

View File

@ -108,6 +108,7 @@ module Streamly.Internal.Data.SVar
, modifyThread
, doFork
, fork
, forkManaged
, toStreamVar
, SVarStats (..)
@ -118,7 +119,7 @@ module Streamly.Internal.Data.SVar
where
import Control.Concurrent
(ThreadId, myThreadId, threadDelay, throwTo, forkIO)
(ThreadId, myThreadId, threadDelay, throwTo, forkIO, killThread)
import Control.Concurrent.MVar
(MVar, newEmptyMVar, tryPutMVar, takeMVar, tryTakeMVar, newMVar,
tryReadMVar)
@ -151,11 +152,13 @@ import Data.Set (Set)
import GHC.Conc (ThreadId(..))
import GHC.Exts
import GHC.IO (IO(..))
import System.IO (hPutStrLn, stderr)
import System.Mem.Weak (addFinalizer)
import Streamly.Internal.Data.Time.Clock (Clock(..), getTime)
import Streamly.Internal.Data.Time.Units
(AbsTime, NanoSecond64(..), MicroSecond64(..), diffAbsTime64,
fromRelTime64, toRelTime64, showNanoSecond64, showRelTime64)
import System.IO (hPutStrLn, stderr)
import qualified Data.Heap as H
import qualified Data.Set as S
@ -958,6 +961,16 @@ doFork action (RunInIO mrun) exHandler =
fork :: MonadBaseControl IO m => m () -> m ThreadId
fork = liftBaseDiscard forkIO
-- | Fork a thread that is automatically killed as soon as the reference to the
-- returned threadId is garbage collected.
--
{-# INLINABLE forkManaged #-}
forkManaged :: (MonadIO m, MonadBaseControl IO m) => m () -> m ThreadId
forkManaged action = do
tid <- fork action
liftIO $ addFinalizer tid (killThread tid)
return tid
------------------------------------------------------------------------------
-- Collecting results from child workers in a streamed fashion
------------------------------------------------------------------------------

View File

@ -130,6 +130,7 @@ module Streamly.Internal.Data.Stream.StreamD
, tap
, tapAsync
, tapRate
, pollCounts
, drain
, null
, head
@ -254,6 +255,8 @@ module Streamly.Internal.Data.Stream.StreamD
, map
, mapM
, sequence
, rollingMap
, rollingMapM
-- * Inserting
, intersperseM
@ -335,7 +338,7 @@ import qualified Control.Monad.State.Strict as State
import qualified Prelude
import Streamly.Internal.Mutable.Prim.Var
(MonadMut, Prim, Var, readVar, newVar, modifyVar')
(Prim, Var, readVar, newVar, modifyVar')
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS_)
import Streamly.Internal.Memory.Array.Types (Array(..))
import Streamly.Internal.Data.Fold.Types (Fold(..))
@ -668,11 +671,11 @@ toStreamD :: (K.IsStream t, Monad m) => t m a -> Stream m a
toStreamD = fromStreamK . K.toStream
{-# INLINE_NORMAL fromPrimVar #-}
fromPrimVar :: (MonadMut m, Prim a) => Var m a -> Stream m a
fromPrimVar :: (MonadIO m, Prim a) => Var IO a -> Stream m a
fromPrimVar var = Stream step ()
where
{-# INLINE_LATE step #-}
step _ () = readVar var >>= \x -> return $ Yield x ()
step _ () = liftIO (readVar var) >>= \x -> return $ Yield x ()
-------------------------------------------------------------------------------
-- Generation from SVar
@ -3221,6 +3224,40 @@ scanl1M' fstep (Stream step state) = Stream step' (state, Nothing)
scanl1' :: Monad m => (a -> a -> a) -> Stream m a -> Stream m a
scanl1' f = scanl1M' (\x y -> return (f x y))
------------------------------------------------------------------------------
-- Stateful map/scan
------------------------------------------------------------------------------
data RollingMapState s a = RollingMapInit s | RollingMapGo s a
{-# INLINE rollingMapM #-}
rollingMapM :: Monad m => (a -> a -> m b) -> Stream m a -> Stream m b
rollingMapM f (Stream step1 state1) = Stream step (RollingMapInit state1)
where
step gst (RollingMapInit st) = do
r <- step1 (adaptState gst) st
return $ case r of
Yield x s -> Skip $ RollingMapGo s x
Skip s -> Skip $ RollingMapInit s
Stop -> Stop
step gst (RollingMapGo s1 x1) = do
r <- step1 (adaptState gst) s1
case r of
Yield x s -> do
!res <- f x x1
return $ Yield res $ RollingMapGo s x
Skip s -> return $ Skip $ RollingMapGo s x1
Stop -> return $ Stop
{-# INLINE rollingMap #-}
rollingMap :: Monad m => (a -> a -> b) -> Stream m a -> Stream m b
rollingMap f = rollingMapM (\x y -> return $ f x y)
------------------------------------------------------------------------------
-- Tapping/Distributing
------------------------------------------------------------------------------
{-# INLINE tap #-}
tap :: Monad m => Fold m a b -> Stream m a -> Stream m a
tap (Fold fstep initial extract) (Stream step state) = Stream step' Nothing
@ -3242,6 +3279,35 @@ tap (Fold fstep initial extract) (Stream step state) = Stream step' Nothing
void $ extract acc
return $ Stop
{-# INLINE_NORMAL pollCounts #-}
pollCounts
:: MonadAsync m
=> (Stream m Int -> Stream m Int)
-> Fold m Int b
-> Stream m a
-> Stream m a
pollCounts transf fld (Stream step state) = Stream step' Nothing
where
{-# INLINE_LATE step' #-}
step' _ Nothing = do
countVar <- liftIO $ newVar (0 :: Int)
tid <- forkManaged
$ void $ runFold fld
$ transf $ fromPrimVar countVar
return $ Skip (Just (countVar, tid, state))
step' gst (Just (countVar, tid, st)) = do
r <- step gst st
case r of
Yield x s -> do
liftIO $ modifyVar' countVar (+ 1)
return $ Yield x (Just (countVar, tid, s))
Skip s -> return $ Skip (Just (countVar, tid, s))
Stop -> do
liftIO $ killThread tid
return Stop
{-# INLINE_NORMAL tapRate #-}
tapRate ::
(MonadAsync m, MonadCatch m)

View File

@ -205,6 +205,8 @@ module Streamly.Internal.Prelude
-- ** Mapping Filters
, mapMaybe
, mapMaybeM
, rollingMapM
, rollingMap
-- ** Scanning Filters
, findIndices
@ -219,6 +221,7 @@ module Streamly.Internal.Prelude
, intersperseSuffixBySpan
-- , intersperseBySpan
, interjectSuffix
, delayPost
-- ** Reordering
, reverse
@ -344,6 +347,7 @@ module Streamly.Internal.Prelude
, tap
, tapAsync
, tapRate
, pollCounts
-- * Windowed Classification
@ -467,7 +471,7 @@ import Streamly.Internal.Data.Pipe.Types (Pipe (..))
import Streamly.Internal.Data.Time.Units
(AbsTime, MilliSecond64(..), addToAbsTime, diffAbsTime, toRelTime,
toAbsTime)
import Streamly.Internal.Mutable.Prim.Var (MonadMut, Prim, Var)
import Streamly.Internal.Mutable.Prim.Var (Prim, Var)
import Streamly.Internal.Data.Strict
@ -879,7 +883,7 @@ fromHandle h = go
-- /Internal/
--
{-# INLINE fromPrimVar #-}
fromPrimVar :: (IsStream t, MonadMut m, Prim a) => Var m a -> t m a
fromPrimVar :: (IsStream t, MonadIO m, Prim a) => Var IO a -> t m a
fromPrimVar = fromStreamD . D.fromPrimVar
------------------------------------------------------------------------------
@ -1786,6 +1790,27 @@ scan (Fold step begin done) = P.scanlMx' step begin done
postscan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b
postscan (Fold step begin done) = P.postscanlMx' step begin done
------------------------------------------------------------------------------
-- Stateful Transformations
------------------------------------------------------------------------------
-- | Apply a function on every two successive elements of a stream. If the
-- stream consists of a single element the output is an empty stream.
--
-- /Internal/
--
{-# INLINE rollingMap #-}
rollingMap :: (IsStream t, Monad m) => (a -> a -> b) -> t m a -> t m b
rollingMap f m = fromStreamD $ D.rollingMap f $ toStreamD m
-- | Like 'rollingMap' but with an effectful map function.
--
-- /Internal/
--
{-# INLINE rollingMapM #-}
rollingMapM :: (IsStream t, Monad m) => (a -> a -> m b) -> t m a -> t m b
rollingMapM f m = fromStreamD $ D.rollingMapM f $ toStreamD m
------------------------------------------------------------------------------
-- Transformation by Filtering
------------------------------------------------------------------------------
@ -2004,10 +2029,9 @@ reverse' s = fromStreamD $ D.reverse' $ toStreamD s
-- intersperseM = intersperseBySpan 1
-- | Generate a stream by performing a monadic action between consecutive
-- elements of the given stream.
--
-- /Concurrent (do not use with 'parallely' on infinite streams)/
-- | Generate a stream by inserting the result of a monadic action between
-- consecutive elements of the given stream. Note that the monadic action is
-- performed after the stream action before which its result is inserted.
--
-- @
-- > S.toList $ S.intersperseM (return ',') $ S.fromList "hello"
@ -2039,6 +2063,28 @@ intersperse a = fromStreamS . S.intersperse a . toStreamS
intersperseSuffix :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
intersperseSuffix m = fromStreamD . D.intersperseSuffix m . toStreamD
-- | Perform a side effect after each element of a stream. The output of the
-- effectful action is discarded, therefore, the input stream remains
-- unchanged.
--
-- @
-- > S.mapM_ putChar $ S.intersperseSuffix_ (threadDelay 1000000) $ S.fromList "hello"
-- @
--
-- /Internal/
--
{-# INLINE intersperseSuffix_ #-}
intersperseSuffix_ :: (IsStream t, Monad m) => m b -> t m a -> t m a
intersperseSuffix_ m = Serial.mapM (\x -> void m >> return x)
-- | Introduces a delay of specified seconds after each element of a stream.
--
-- /Internal/
--
{-# INLINE delayPost #-}
delayPost :: (IsStream t, MonadIO m) => Double -> t m a -> t m a
delayPost n = intersperseSuffix_ $ liftIO $ threadDelay $ round $ n * 1000000
-- | Like 'intersperseSuffix' but intersperses a monadic action into the input
-- stream after every @n@ elements and after the last element.
--
@ -3614,6 +3660,30 @@ tap f xs = D.fromStreamD $ D.tap f (D.toStreamD xs)
tapAsync :: (IsStream t, MonadAsync m) => FL.Fold m a b -> t m a -> t m a
tapAsync f xs = D.fromStreamD $ D.tapAsync f (D.toStreamD xs)
-- | Maintain the count of elements flowing in the stream and poll the count
-- asynchronously from another thread. The count stream is transformed using
-- the supplied transform and then folded using the supplied fold. The thread
-- is automatically cleaned up if the stream stops or aborts due to exception.
--
-- For example, to print the count of elements processed every second:
--
-- @
-- > S.drain $ S.pollCounts (rollingMap (-) . delayPost 1) (FL.drainBy print)
-- $ S.enumerateFrom 0
-- @
--
{-# INLINE pollCounts #-}
pollCounts ::
(IsStream t, MonadAsync m)
=> (t m Int -> t m Int)
-> Fold m Int b
-> t m a
-> t m a
pollCounts transf f xs =
D.fromStreamD
$ D.pollCounts (D.toStreamD . transf . D.fromStreamD) f
$ (D.toStreamD xs)
-- | Calls the supplied function with the number of elements consumed
-- every @n@ seconds. The given function is run in a separate thread
-- until the end of the stream. In case there is an exception in the