Add rate benchmarks for new concurrent streams

This commit is contained in:
Harendra Kumar 2024-02-15 08:16:29 +05:30
parent dd12c4f7cb
commit 14932904bd
2 changed files with 225 additions and 79 deletions

View File

@ -0,0 +1,129 @@
{-# LANGUAGE FlexibleContexts #-}
-- |
-- Module : Main
-- Copyright : (c) 2018 Composewell Technologies
--
-- License : BSD3
-- Maintainer : streamly@composewell.com
import Stream.Common (benchIOSrc, sourceUnfoldrM)
import Streamly.Data.Stream (Stream)
import Streamly.Internal.Data.Stream.Prelude (MonadAsync, Config)
import qualified Streamly.Data.Stream.Prelude as Stream
import Streamly.Benchmark.Common
import Test.Tasty.Bench
moduleName :: String
moduleName = "Data.Stream.Rate"
-------------------------------------------------------------------------------
-- Average Rate
-------------------------------------------------------------------------------
{-# INLINE rateNothing #-}
rateNothing :: MonadAsync m =>
((Config -> Config) -> Stream m Int -> Stream m Int)
-> (Config -> Config) -> Int -> Int -> Stream m Int
rateNothing f cfg value = f (Stream.rate Nothing . cfg) . sourceUnfoldrM value
{-# INLINE avgRate #-}
avgRate :: MonadAsync m =>
((Config -> Config) -> Stream m Int -> Stream m Int)
-> (Config -> Config) -> Int -> Double -> Int -> Stream m Int
avgRate f cfg value rt = f (Stream.avgRate rt . cfg) . sourceUnfoldrM value
{-
-- parEval should be maxThreads 1 anyway
{-# INLINE avgRateThreads1 #-}
avgRateThreads1 :: MonadAsync m =>
((Config -> Config) -> Stream m Int -> Stream m Int)
-> (Config -> Config) -> Int -> Double -> Int -> Stream m Int
avgRateThreads1 f cfg value rt =
f (Stream.maxThreads 1 . Stream.avgRate rt . cfg) . sourceUnfoldrM value
-}
{-# INLINE minRate #-}
minRate :: MonadAsync m =>
((Config -> Config) -> Stream m Int -> Stream m Int)
-> (Config -> Config) -> Int -> Double -> Int -> Stream m Int
minRate f cfg value rt = f (Stream.minRate rt . cfg) . sourceUnfoldrM value
{-# INLINE maxRate #-}
maxRate :: MonadAsync m =>
((Config -> Config) -> Stream m Int -> Stream m Int)
-> (Config -> Config) -> Int -> Double -> Int -> Stream m Int
maxRate f cfg value rt = f (Stream.minRate rt . cfg) . sourceUnfoldrM value
{-# INLINE constRate #-}
constRate :: MonadAsync m =>
((Config -> Config) -> Stream m Int -> Stream m Int)
-> (Config -> Config) -> Int -> Double -> Int -> Stream m Int
constRate f cfg value rt = f (Stream.constRate rt . cfg) . sourceUnfoldrM value
-- XXX arbitrarily large rate should be the same as rate Nothing
-- XXX Add tests for multiworker cases as well - parMapM
o_1_space_async :: Int -> [Benchmark]
o_1_space_async value =
[ bgroup
"default/parEval"
[ bgroup
"avgRate"
-- benchIO "unfoldr" $ toNull
-- benchIOSrc "unfoldrM" (sourceUnfoldrM value)
[ benchIOSrc "Nothing" $ rateNothing Stream.parEval id value
, benchIOSrc "1M" $ avgRate Stream.parEval id value 1000000
, benchIOSrc "3M" $ avgRate Stream.parEval id value 3000000
-- , benchIOSrc "10M/maxThreads1" $ avgRateThreads1 Stream.parEval value 10000000
, benchIOSrc "10M" $ avgRate Stream.parEval id value 10000000
, benchIOSrc "20M" $ avgRate Stream.parEval id value 20000000
]
, bgroup
"minRate"
[ benchIOSrc "1M" $ minRate Stream.parEval id value 1000000
, benchIOSrc "10M" $ minRate Stream.parEval id value 10000000
, benchIOSrc "20M" $ minRate Stream.parEval id value 20000000
]
, bgroup
"maxRate"
[ -- benchIOSrc "10K" $ maxRate value 10000
benchIOSrc "10M" $ maxRate Stream.parEval id value 10000000
]
, bgroup
"constRate"
[ -- benchIOSrc "10K" $ constRate value 10000
benchIOSrc "1M" $ constRate Stream.parEval id value 1000000
, benchIOSrc "10M" $ constRate Stream.parEval id value 10000000
]
]
]
o_1_space_ahead :: Int -> [Benchmark]
o_1_space_ahead value =
[ bgroup
"ordered/parEval"
[ benchIOSrc "avgRate/1M"
$ avgRate Stream.parEval (Stream.ordered True) value 1000000
, benchIOSrc "minRate/1M"
$ minRate Stream.parEval (Stream.ordered True) value 1000000
, benchIOSrc "maxRate/1M"
$ maxRate Stream.parEval (Stream.ordered True) value 1000000
, benchIOSrc "constRate/1M"
$ constRate Stream.parEval (Stream.ordered True) value 1000000
]
]
-------------------------------------------------------------------------------
-- Main
-------------------------------------------------------------------------------
main :: IO ()
main = runWithCLIOpts defaultStreamSize allBenchmarks
where
allBenchmarks value =
[ bgroup (o_1_space_prefix moduleName)
$ concat [o_1_space_async value, o_1_space_ahead value]]

View File

@ -296,6 +296,59 @@ benchmark Data.Stream.StreamDK
else
ghc-options: +RTS -M2500M -RTS
benchmark Data.StreamD
import: bench-options
type: exitcode-stdio-1.0
hs-source-dirs: Streamly/Benchmark/Data/Stream
main-is: StreamD.hs
if impl(ghcjs)
buildable: False
benchmark Data.StreamK
import: bench-options
type: exitcode-stdio-1.0
hs-source-dirs: Streamly/Benchmark/Data/Stream
main-is: StreamK.hs
if impl(ghcjs)
buildable: False
benchmark Data.Stream.ToStreamK
import: bench-options
type: exitcode-stdio-1.0
hs-source-dirs: Streamly/Benchmark/Data/Stream
main-is: ToStreamK.hs
if !flag(dev) || impl(ghcjs)
buildable: False
benchmark Data.StreamK.Alt
import: bench-options
type: exitcode-stdio-1.0
hs-source-dirs: Streamly/Benchmark/Data/Stream
main-is: StreamKAlt.hs
if !flag(dev) || impl(ghcjs)
buildable: False
benchmark Data.Unfold
import: bench-options
type: exitcode-stdio-1.0
hs-source-dirs: .
main-is: Streamly/Benchmark/Data/Unfold.hs
if flag(use-streamly-core) || impl(ghcjs)
buildable: False
else
buildable: True
executable nano-bench
import: bench-options
hs-source-dirs: .
main-is: NanoBenchmarks.hs
if !flag(dev)
buildable: False
-------------------------------------------------------------------------------
-- Concurrent Streams
-------------------------------------------------------------------------------
benchmark Data.Stream.Concurrent
import: bench-options-threaded
type: exitcode-stdio-1.0
@ -348,45 +401,23 @@ benchmark Data.Stream.ConcurrentOrdered
else
buildable: True
benchmark Data.Unbox
import: bench-options
benchmark Data.Stream.Rate
import: bench-options-threaded
type: exitcode-stdio-1.0
hs-source-dirs: .
cpp-options: -DUSE_UNBOX
main-is: Streamly/Benchmark/Data/Serialize.hs
benchmark Data.Unbox.Derive.TH
import: bench-options
type: exitcode-stdio-1.0
hs-source-dirs: .
cpp-options: -DUSE_UNBOX
cpp-options: -DUSE_TH
main-is: Streamly/Benchmark/Data/Serialize.hs
benchmark Data.Serialize
import: bench-options
type: exitcode-stdio-1.0
hs-source-dirs: .
cpp-options: -DUSE_TH
main-is: Streamly/Benchmark/Data/Serialize.hs
build-depends: QuickCheck, template-haskell
hs-source-dirs: Streamly/Benchmark/Data/Stream/, Streamly/Benchmark/Data/
main-is: Rate.hs
other-modules:
Streamly.Benchmark.Data.Serialize.TH
Streamly.Benchmark.Data.Serialize.RecCompatible
Streamly.Benchmark.Data.Serialize.RecNonCompatible
if flag(limit-build-mem)
ghc-options: +RTS -M1000M -RTS
benchmark Data.Unfold
import: bench-options
type: exitcode-stdio-1.0
hs-source-dirs: .
main-is: Streamly/Benchmark/Data/Unfold.hs
Stream.ConcurrentCommon
Stream.Common
if flag(use-streamly-core) || impl(ghcjs)
buildable: False
else
buildable: True
-------------------------------------------------------------------------------
-- Folds and Parsers
-------------------------------------------------------------------------------
benchmark Data.Fold
import: bench-options
type: exitcode-stdio-1.0
@ -459,49 +490,6 @@ benchmark Data.ParserK.Chunked.Generic
buildable: True
build-depends: exceptions >= 0.8 && < 0.11
-------------------------------------------------------------------------------
-- Raw Streams
-------------------------------------------------------------------------------
benchmark Data.StreamD
import: bench-options
type: exitcode-stdio-1.0
hs-source-dirs: Streamly/Benchmark/Data/Stream
main-is: StreamD.hs
if impl(ghcjs)
buildable: False
benchmark Data.StreamK
import: bench-options
type: exitcode-stdio-1.0
hs-source-dirs: Streamly/Benchmark/Data/Stream
main-is: StreamK.hs
if impl(ghcjs)
buildable: False
benchmark Data.Stream.ToStreamK
import: bench-options
type: exitcode-stdio-1.0
hs-source-dirs: Streamly/Benchmark/Data/Stream
main-is: ToStreamK.hs
if !flag(dev) || impl(ghcjs)
buildable: False
benchmark Data.StreamK.Alt
import: bench-options
type: exitcode-stdio-1.0
hs-source-dirs: Streamly/Benchmark/Data/Stream
main-is: StreamKAlt.hs
if !flag(dev) || impl(ghcjs)
buildable: False
executable nano-bench
import: bench-options
hs-source-dirs: .
main-is: NanoBenchmarks.hs
if !flag(dev)
buildable: False
-------------------------------------------------------------------------------
-- Streamly.Prelude
-------------------------------------------------------------------------------
@ -608,10 +596,6 @@ benchmark Prelude.Parallel
if flag(limit-build-mem)
ghc-options: +RTS -M2000M -RTS
-------------------------------------------------------------------------------
-- Concurrent Streams
-------------------------------------------------------------------------------
benchmark Prelude.Concurrent
import: bench-options-threaded
type: exitcode-stdio-1.0
@ -639,6 +623,39 @@ benchmark Prelude.Rate
if !flag(use-prelude)
buildable: False
-------------------------------------------------------------------------------
-- Serialization Benchmarks
-------------------------------------------------------------------------------
benchmark Data.Unbox
import: bench-options
type: exitcode-stdio-1.0
hs-source-dirs: .
cpp-options: -DUSE_UNBOX
main-is: Streamly/Benchmark/Data/Serialize.hs
benchmark Data.Unbox.Derive.TH
import: bench-options
type: exitcode-stdio-1.0
hs-source-dirs: .
cpp-options: -DUSE_UNBOX
cpp-options: -DUSE_TH
main-is: Streamly/Benchmark/Data/Serialize.hs
benchmark Data.Serialize
import: bench-options
type: exitcode-stdio-1.0
hs-source-dirs: .
cpp-options: -DUSE_TH
main-is: Streamly/Benchmark/Data/Serialize.hs
build-depends: QuickCheck, template-haskell
other-modules:
Streamly.Benchmark.Data.Serialize.TH
Streamly.Benchmark.Data.Serialize.RecCompatible
Streamly.Benchmark.Data.Serialize.RecNonCompatible
if flag(limit-build-mem)
ghc-options: +RTS -M1000M -RTS
-------------------------------------------------------------------------------
-- Array Benchmarks
-------------------------------------------------------------------------------