Add low rate examples, maxYields support in rate test

Harendra Kumar 2024-02-17 02:09:04 +05:30
parent 486f5ee49d
commit 66745dc16e
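
For orientation, a minimal, self-contained sketch (not part of this commit, illustrative only) of the rate-control combinators this test exercises, assuming streamly 0.10's Streamly.Data.Stream.Prelude API: avgRate asks the concurrent evaluator to yield roughly the given number of elements per second, so taking N elements at rate R should take about N/R seconds of wall-clock time, which is roughly the bound that the durationShouldBe expectedRange check below enforces.

    import qualified Streamly.Data.Fold as Fold
    import qualified Streamly.Data.Stream as Stream
    import qualified Streamly.Data.Stream.Prelude as Stream

    -- Yield elements concurrently at roughly 10 per second, stop after 20
    -- yields, and count them; the run should take about 2 seconds.
    main :: IO ()
    main = do
        n <- Stream.fold Fold.length
            $ Stream.take 20
            $ Stream.parMapM (Stream.avgRate 10 . Stream.maxBuffer 100) pure
            $ Stream.enumerateFrom (1 :: Int)
        print n -- expect 20

The hunks below extend the rate lists down to 0.1 elements per second, count the yields with Fold.length so the result can be checked against the expected yield count, and keep the Stream.maxYields call commented out, noting that it needs a fix for ordered streams at high rates.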


@@ -8,13 +8,16 @@
 module Streamly.Test.Data.Stream.Rate (main) where
 
-import Streamly.Data.Stream.Prelude (Stream, Config)
+import Data.Int (Int64)
+import Streamly.Data.Stream.Prelude (Config)
 import Streamly.Internal.Data.Time.Clock (getTime, Clock(..))
 import Streamly.Internal.Data.Time.Units
     (NanoSecond64, diffAbsTime64, fromRelTime64)
+import System.Mem (performMajorGC)
 
 import qualified Streamly.Data.Fold as Fold
-import qualified Streamly.Data.Stream.Prelude as Stream
+import qualified Streamly.Data.Stream as Stream
+import qualified Streamly.Internal.Data.Stream.Prelude as Stream
 
 import Control.Concurrent
 import Control.Monad
@@ -46,40 +49,57 @@ measureRate' ::
     -> Spec
 measureRate'
     desc modifier buffers threads rval consumerDelay producerDelay expectedRange = do
-    let
-        cfg =
-              Stream.maxBuffer buffers
-            . Stream.maxThreads threads
-            . case rval of
-                Left r -> Stream.avgRate r
-                Right _ -> Stream.rate Nothing
-            . modifier
-        threadAction =
-            case rval of
-                Left r ->
-                      Stream.take (round $ 10 * r)
-                    . Stream.parRepeatM cfg
-                Right n ->
-                    Stream.parReplicateM cfg n
-        rateDesc = case rval of
-            Left r -> " rate: " <> show r
-            Right n -> " count: " <> show n
     it (desc <> rateDesc
         <> " buffers: " <> show buffers
         <> " threads: " <> show threads
         <> ", consumer latency: " <> show consumerDelay
         <> ", producer latency: " <> show producerDelay)
-        $ durationShouldBe expectedRange $
-            Stream.fold Fold.drain
+        $ runTest
+
+    where
+
+    -- Keep a minimum of 2 for the very low rate cases, otherwise the
+    -- timing would be 0 because we will finish as soon as the first result
+    -- arrives.
+    yieldCount :: Int
+    yieldCount = case rval of
+        Left r -> max 2 (round (10 * r))
+        Right n -> max 2 n
+
+    rateDesc = (case rval of
+        Left r -> ", rate: " <> show r <> ", count: " <> show yieldCount
+        Right n -> ", count: " <> show n) <> ","
+
+    cfg (_n :: Maybe Int64) =
+          modifier
+        . Stream.maxBuffer buffers
+        -- . Stream.inspect True
+        . Stream.maxThreads threads
+        . case rval of
+            Left r -> Stream.avgRate r
+            Right _ -> Stream.rate Nothing
+        -- XXX it comes out less than expected for ordered streams at high
+        -- rates, need to fix.
+        -- . Stream.maxYields (Just (fromIntegral yieldCount))
+
+    threadAction f =
+        case rval of
+            Left _ ->
+                  Stream.take yieldCount
+                $ Stream.parMapM (cfg (Just (fromIntegral yieldCount))) f
+                $ Stream.enumerateFrom (1 :: Int)
+            Right _ ->
+                Stream.parReplicateM
+                    (cfg (Just (fromIntegral yieldCount))) yieldCount (f 1)
+
+    runTest = do
+        durationShouldBe expectedRange $ do
+            res <- Stream.fold Fold.length
                 $ (if consumerDelay > 0
                     then Stream.mapM $ \x ->
                         threadDelay (toMicroSecs consumerDelay) >> return x
                     else id)
-                $ threadAction $ do
+                $ threadAction $ \_idx -> do
                     let (t1, t2) = producerDelay
                     r <- if t1 == t2
                         then return $ round $ toMicroSecs t1
@@ -93,7 +113,15 @@ measureRate'
                     -- putStrLn $ "delay took: " <> show delta
                     -- when (delta > 2) $ do
                     --     putStrLn $ "delay took high: " <> show delta
+                    -- putStrLn $ "Done: " ++ show idx
                     return (1 :: Int)
+            when (res /= yieldCount) $
+                error $ "expected yield count: " ++ show yieldCount
+                    ++ " actual: " ++ show res
+        -- To ensure that when we use "inspect" option on the channel, GC
+        -- occurs and cleans up the channel to print the debug info.
+        performMajorGC
 
 measureRateVariable ::
     String
@@ -168,29 +196,36 @@ main = hspec $ do
         measureThreads "ordered" (Stream.ordered True) 1 5
         measureThreads "ordered" (Stream.ordered True) 5 5
+        measureThreads "interleaved" (Stream.interleaved True) (-1) 5
+        measureThreads "interleaved" (Stream.interleaved True) 1 5
+        measureThreads "interleaved" (Stream.interleaved True) 5 5
 
+    describe "max rate possible (count / time)" $ do
+        measureRate "async" (Stream.rate Nothing) 1000000 0 0 (0, 1e9)
+
     let range = (8,12)
 
     -- Note that because after the last yield we don't wait, the last period
     -- will be effectively shorter. This becomes significant when the rates are
    -- lower (1 or lower). For rate 1 we lose 1 second in the end and for rate
     -- 10 0.1 second.
-    let rates = [1, 10, 100, 1000, 10000
+    let rates = [0.1, 1, 10, 100, 1000, 10000
 #ifndef __GHCJS__
             , 100000, 1000000
 #endif
             ]
-     in describe "asyncly no consumer delay no producer delay" $
+     in describe "async no consumer delay no producer delay" $
            forM_ rates (\r -> measureRate "async" id r 0 0 range)
 
     -- XXX try staggering the dispatches to achieve higher rates
     -- Producer delay causes a lot of threads to be created, consuming large
     -- amounts of memory at higher rates.
-    let rates = [1, 10, 100
+    let rates = [0.1, 1, 10, 100
 #if !defined(__GHCJS__) && defined USE_LARGE_MEMORY
             1000, 10000, 25000
 #endif
             ]
-     in describe "asyncly no consumer delay and 1 sec producer delay" $
+     in describe "async no consumer delay and 1 sec producer delay" $
            forM_ rates (\r -> measureRate "async" id r 0 1 range)
 
     -- At lower rates (1/10) this is likely to vary quite a bit depending on
@@ -200,11 +235,11 @@ main = hspec $ do
             , 1000, 10000, 25000
 #endif
             ]
-     in describe "asyncly, no consumer delay and variable producer delay" $
+     in describe "async, no consumer delay and variable producer delay" $
            forM_ rates $ \r ->
                measureRateVariable "async" id r 0 (0.1, 3) range
 
-    let rates = [1, 10, 100, 1000, 10000
+    let rates = [0.1, 1, 10, 100, 1000, 10000
 #ifndef __GHCJS__
             , 100000, 1000000
 #endif
@@ -212,7 +247,7 @@ main = hspec $ do
     in describe "interleaved, no consumer delay no producer delay" $
            forM_ rates (\r -> measureRate "interleaved" (Stream.interleaved True) r 0 0 range)
 
-    let rates = [1, 10, 100, 1000
+    let rates = [0.1, 1, 10, 100, 1000
 #if !defined(__GHCJS__) && defined USE_LARGE_MEMORY
             , 10000, 25000
 #endif
@@ -220,7 +255,7 @@ main = hspec $ do
     in describe "interleaved, no consumer delay and 1 sec producer delay" $
            forM_ rates (\r -> measureRate "interleaved" (Stream.interleaved True) r 0 1 range)
 
-    let rates = [1, 10, 100, 1000, 10000
+    let rates = [0.1, 1, 10, 100, 1000, 10000
 #ifndef __GHCJS__
             , 100000, 1000000
 #endif
@@ -230,7 +265,7 @@ main = hspec $ do
     -- XXX after the change to stop workers when the heap is clearing
     -- thi does not work well at a 25000 ops per second, need to fix.
-    let rates = [1, 10, 100, 1000
+    let rates = [0.1, 1, 10, 100, 1000
 #if !defined(__GHCJS__) && defined USE_LARGE_MEMORY
             , 10000, 12500
 #endif