Mirror of https://github.com/composewell/streamly.git (synced 2024-11-10 12:47:22 +03:00)
Add support for large streams in benchmarks
* Instead of using hard-coded numbers, scale them based on the stream size.
* Add concatMapWith benchmarks for concurrent streams.
* Add a linear-async-long benchmark that works on streams of 10 million elements.

We need large streams to detect memory leak issues. Specifically, we could not figure out the concatMapWith memory leak issue without a stream of at least tens of millions of elements. For the long benchmarks we use the gauge defaults so that we do not run many iterations/samples.
parent 76c3f2b40f
commit 998f496af7
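Note: the size-scaling scheme can be sketched as a small self-contained Haskell module. This is only an illustration of the idea, not benchmark code from the repository — the module wrapper and main are added here, while the value constants mirror the CPP block changed in the diff below:

{-# LANGUAGE CPP #-}

module Main (main) where

-- One stream-size constant drives all benchmark sizes instead of hard-coded
-- numbers; the values mirror the CPP block changed in this commit.
value :: Int
#ifdef LONG_BENCHMARKS
value = 10000000   -- 10 million elements, large enough to expose memory leaks
#elif defined(LINEAR_ASYNC)
value = 10000
#else
value = 100000
#endif

-- A label such as "concatMap (2,x/2)" reads as (outer streams, elements per
-- inner stream) with x = value, so the total work stays close to `value`
-- whatever the build flavour.
main :: IO ()
main = print (2 :: Int, value `div` 2)

Building with -DLONG_BENCHMARKS, as the new linear-async-long benchmark below does, switches the same benchmark code to 10-million-element streams, which is what exposes the concatMapWith memory leak.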
@@ -368,46 +368,46 @@ main =
, benchIOSink "insertBy" (Ops.insertBy 4)
]
, bgroup "joining"
[ benchIOSrc1 "zip (2x50K)" (Ops.zip 50000)
, benchIOSrc1 "zipM (2x50K)" (Ops.zipM 50000)
, benchIOSrc1 "mergeBy (2x50K)" (Ops.mergeBy 50000)
, benchIOSrc1 "serial (2x50K)" (Ops.serial2 50000)
, benchIOSrc1 "append (2x50K)" (Ops.append2 50000)
, benchIOSrc1 "serial (2x2x25K)" (Ops.serial4 25000)
, benchIOSrc1 "append (2x2x25K)" (Ops.append4 25000)
, benchIOSrc1 "wSerial (2x50K)" Ops.wSerial2
, benchIOSrc1 "interleave (2x50K)" Ops.interleave2
, benchIOSrc1 "roundRobin (2x50K)" Ops.roundRobin2
[ benchIOSrc1 "zip (2,x/2)" (Ops.zip (Ops.value `div` 2))
, benchIOSrc1 "zipM (2,x/2)" (Ops.zipM (Ops.value `div` 2))
, benchIOSrc1 "mergeBy (2,x/2)" (Ops.mergeBy (Ops.value `div` 2))
, benchIOSrc1 "serial (2,x/2)" (Ops.serial2 (Ops.value `div` 2))
, benchIOSrc1 "append (2,x/2)" (Ops.append2 (Ops.value `div` 2))
, benchIOSrc1 "serial (2,2,x/4)" (Ops.serial4 (Ops.value `div` 4))
, benchIOSrc1 "append (2,2,x/4)" (Ops.append4 (Ops.value `div` 4))
, benchIOSrc1 "wSerial (2,x/2)" Ops.wSerial2
, benchIOSrc1 "interleave (2,x/2)" Ops.interleave2
, benchIOSrc1 "roundRobin (2,x/2)" Ops.roundRobin2
]
, bgroup "concat-foldable"
[ benchIOSrc serially "foldMapWith (1x100K)" Ops.sourceFoldMapWith
, benchIOSrc serially "foldMapWithM (1x100K)" Ops.sourceFoldMapWithM
, benchIOSrc serially "foldMapM (1x100K)" Ops.sourceFoldMapM
, benchIOSrc serially "foldWithConcatMapId (1x100K)" Ops.sourceConcatMapId
[ benchIOSrc serially "foldMapWith" Ops.sourceFoldMapWith
, benchIOSrc serially "foldMapWithM" Ops.sourceFoldMapWithM
, benchIOSrc serially "foldMapM" Ops.sourceFoldMapM
, benchIOSrc serially "foldWithConcatMapId" Ops.sourceConcatMapId
]
, bgroup "concat-serial"
[ benchIOSrc1 "concatMapPure (2x50K)" (Ops.concatMapPure 2 50000)
, benchIOSrc1 "concatMap (2x50K)" (Ops.concatMap 2 50000)
, benchIOSrc1 "concatMap (50Kx2)" (Ops.concatMap 50000 2)
, benchIOSrc1 "concatMapRepl (25Kx4)" Ops.concatMapRepl4xN
, benchIOSrc1 "concatUnfoldRepl (25Kx4)" Ops.concatUnfoldRepl4xN
[ benchIOSrc1 "concatMapPure (2,x/2)" (Ops.concatMapPure 2 (Ops.value `div` 2))
, benchIOSrc1 "concatMap (2,x/2)" (Ops.concatMap 2 (Ops.value `div` 2))
, benchIOSrc1 "concatMap (x/2,2)" (Ops.concatMap (Ops.value `div` 2) 2)
, benchIOSrc1 "concatMapRepl (x/4,4)" Ops.concatMapRepl4xN
, benchIOSrc1 "concatUnfoldRepl (x/4,4)" Ops.concatUnfoldRepl4xN

, benchIOSrc1 "concatMapWithSerial (2x50K)"
(Ops.concatMapWithSerial 2 50000)
, benchIOSrc1 "concatMapWithSerial (50Kx2)"
(Ops.concatMapWithSerial 50000 2)
, benchIOSrc1 "concatMapWithSerial (2,x/2)"
(Ops.concatMapWithSerial 2 (Ops.value `div` 2))
, benchIOSrc1 "concatMapWithSerial (x/2,2)"
(Ops.concatMapWithSerial (Ops.value `div` 2) 2)

, benchIOSrc1 "concatMapWithAppend (2x50K)"
(Ops.concatMapWithAppend 2 50000)
, benchIOSrc1 "concatMapWithAppend (2,x/2)"
(Ops.concatMapWithAppend 2 (Ops.value `div` 2))
]
, bgroup "concat-interleave"
[ benchIOSrc1 "concatMapWithWSerial (2x50K)"
(Ops.concatMapWithWSerial 2 50000)
, benchIOSrc1 "concatMapWithWSerial (50Kx2)"
(Ops.concatMapWithWSerial 50000 2)
, benchIOSrc1 "concatUnfoldInterleaveRepl (25Kx4)"
[ benchIOSrc1 "concatMapWithWSerial (2,x/2)"
(Ops.concatMapWithWSerial 2 (Ops.value `div` 2))
, benchIOSrc1 "concatMapWithWSerial (x/2,2)"
(Ops.concatMapWithWSerial (Ops.value `div` 2) 2)
, benchIOSrc1 "concatUnfoldInterleaveRepl (x/4,4)"
Ops.concatUnfoldInterleaveRepl4xN
, benchIOSrc1 "concatUnfoldRoundrobinRepl (25Kx4)"
, benchIOSrc1 "concatUnfoldRoundrobinRepl (x/4,4)"
Ops.concatUnfoldRoundrobinRepl4xN
]
-- scanl-map and foldl-map are equivalent to the scan and fold in the foldl
@@ -1,3 +1,4 @@
{-# LANGUAGE CPP #-}
-- |
-- Module : Main
-- Copyright : (c) 2018 Harendra Kumar
@@ -31,6 +32,10 @@ benchSrcIO
benchSrcIO t name f
= bench name $ nfIO $ randomRIO (1,1) >>= Ops.toNull t . f

{-# INLINE benchMonadicSrcIO #-}
benchMonadicSrcIO :: String -> (Int -> IO ()) -> Benchmark
benchMonadicSrcIO name f = bench name $ nfIO $ randomRIO (1,1) >>= f

{-
_benchId :: NFData b => String -> (Ops.Stream m Int -> Identity b) -> Benchmark
_benchId name f = bench name $ nf (runIdentity . f) (Ops.source 10)
@@ -38,7 +43,13 @@ _benchId name f = bench name $ nf (runIdentity . f) (Ops.source 10)

main :: IO ()
main =
defaultMain
defaultMainWith (defaultConfig
{ timeLimit = Just 1
, minDuration = 0
#ifdef LONG_BENCHMARKS
, includeFirstIter = True
#endif
})
[ bgroup "asyncly"
[ benchSrcIO asyncly "unfoldr" Ops.sourceUnfoldr
, benchSrcIO asyncly "unfoldrM" Ops.sourceUnfoldrM
@@ -52,8 +63,12 @@ main =
, benchIO "mapM" $ Ops.mapM asyncly 1
, benchSrcIO asyncly "unfoldrM maxThreads 1"
(maxThreads 1 . Ops.sourceUnfoldrM)
, benchSrcIO asyncly "unfoldrM maxBuffer 1 (1000 ops)"
(maxBuffer 1 . Ops.sourceUnfoldrMN 1000)
, benchSrcIO asyncly "unfoldrM maxBuffer 1 (x/10 ops)"
(maxBuffer 1 . Ops.sourceUnfoldrMN (Ops.value `div` 10))
, benchMonadicSrcIO "concatMapWith (2,x/2)"
(Ops.concatStreamsWith async 2 (Ops.value `div` 2))
, benchMonadicSrcIO "concatMapWith (x/2,2)"
(Ops.concatStreamsWith async (Ops.value `div` 2) 2)
]
, bgroup "wAsyncly"
[ benchSrcIO wAsyncly "unfoldr" Ops.sourceUnfoldr
@@ -66,6 +81,10 @@ main =
, benchIO "map" $ Ops.map' wAsyncly 1
, benchIO "fmap" $ Ops.fmap' wAsyncly 1
, benchIO "mapM" $ Ops.mapM wAsyncly 1
, benchMonadicSrcIO "concatMapWith (2,x/2)"
(Ops.concatStreamsWith wAsync 2 (Ops.value `div` 2))
, benchMonadicSrcIO "concatMapWith (x/2,2)"
(Ops.concatStreamsWith wAsync (Ops.value `div` 2) 2)
]
-- unfoldr and fromFoldable are always serial and therefore the same for
-- all stream types.
@@ -81,9 +100,13 @@ main =
, benchIO "mapM" $ Ops.mapM aheadly 1
, benchSrcIO aheadly "unfoldrM maxThreads 1"
(maxThreads 1 . Ops.sourceUnfoldrM)
, benchSrcIO aheadly "unfoldrM maxBuffer 1 (1000 ops)"
(maxBuffer 1 . Ops.sourceUnfoldrMN 1000)
, benchSrcIO aheadly "unfoldrM maxBuffer 1 (x/10 ops)"
(maxBuffer 1 . Ops.sourceUnfoldrMN (Ops.value `div` 10))
-- , benchSrcIO aheadly "fromFoldable" Ops.sourceFromFoldable
, benchMonadicSrcIO "concatMapWith (2,x/2)"
(Ops.concatStreamsWith ahead 2 (Ops.value `div` 2))
, benchMonadicSrcIO "concatMapWith (x/2,2)"
(Ops.concatStreamsWith ahead (Ops.value `div` 2) 2)
]
-- XXX need to use smaller streams to finish in reasonable time
, bgroup "parallely"
@@ -104,5 +127,9 @@ main =
-- Parallel stages in a pipeline
, benchIO "parAppMap" Ops.parAppMap
, benchIO "parAppSum" Ops.parAppSum
, benchMonadicSrcIO "concatMapWith (2,x/2)"
(Ops.concatStreamsWith parallel 2 (Ops.value `div` 2))
, benchMonadicSrcIO "concatMapWith (x/10,10)"
(Ops.concatStreamsWith parallel (Ops.value `div` 10) 10)
]
]
@@ -10,6 +10,7 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE RankNTypes #-}

#ifdef __HADDOCK_VERSION__
#undef INSPECTION
@@ -53,7 +54,11 @@ import qualified Streamly.Internal.Data.Pipe as Pipe
import qualified Streamly.Internal.Data.Stream.Parallel as Par

value, maxValue, value2 :: Int
#ifdef LINEAR_ASYNC

-- To detect memory leak issues we need larger streams
#ifdef LONG_BENCHMARKS
value = 10000000
#elif defined(LINEAR_ASYNC)
value = 10000
#else
value = 100000
@@ -818,23 +823,29 @@ inspect $ hasNoTypeClasses 'concatMapRepl4xN

-- concatMapWith

{-# INLINE concatMapWithSerial #-}
concatMapWithSerial :: Int -> Int -> Int -> IO ()
concatMapWithSerial outer inner n =
S.drain $ S.concatMapWith S.serial
{-# INLINE concatStreamsWith #-}
concatStreamsWith
:: (forall c. S.SerialT IO c -> S.SerialT IO c -> S.SerialT IO c)
-> Int
-> Int
-> Int
-> IO ()
concatStreamsWith op outer inner n =
S.drain $ S.concatMapWith op
(\_ -> sourceUnfoldrMN inner n)
(sourceUnfoldrMN outer n)

{-# INLINE concatMapWithSerial #-}
concatMapWithSerial :: Int -> Int -> Int -> IO ()
concatMapWithSerial = concatStreamsWith S.serial

#ifdef INSPECTION
inspect $ hasNoTypeClasses 'concatMapWithSerial
#endif

{-# INLINE concatMapWithAppend #-}
concatMapWithAppend :: Int -> Int -> Int -> IO ()
concatMapWithAppend outer inner n =
S.drain $ S.concatMapWith Internal.append
(\_ -> sourceUnfoldrMN inner n)
(sourceUnfoldrMN outer n)
concatMapWithAppend = concatStreamsWith Internal.append

#ifdef INSPECTION
inspect $ hasNoTypeClasses 'concatMapWithAppend
@@ -842,10 +853,7 @@ inspect $ hasNoTypeClasses 'concatMapWithAppend

{-# INLINE concatMapWithWSerial #-}
concatMapWithWSerial :: Int -> Int -> Int -> IO ()
concatMapWithWSerial outer inner n =
S.drain $ S.concatMapWith S.wSerial
(\_ -> sourceUnfoldrMN inner n)
(sourceUnfoldrMN outer n)
concatMapWithWSerial = concatStreamsWith S.wSerial

#ifdef INSPECTION
inspect $ hasNoTypeClasses 'concatMapWithWSerial
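Note: a self-contained sketch of how the generalized concatStreamsWith above is driven by the new concurrent concatMapWith benchmarks. The imports, the monomorphic type, the re-sketched sourceUnfoldrMN and the main wrapper are assumptions added here for illustration; S.concatMapWith, S.drain, S.unfoldrM and the async joining operation are the same functions this diff relies on (in older streamly versions some of them may live in internal modules):

import Streamly (SerialT, async)
import qualified Streamly.Prelude as S

-- Generate a monadic stream of `count` elements starting from `start`,
-- mirroring the sourceUnfoldrMN helper used by the benchmarks.
sourceUnfoldrMN :: Int -> Int -> SerialT IO Int
sourceUnfoldrMN count start = S.unfoldrM step start
  where
    step i
        | i >= start + count = return Nothing
        | otherwise          = return (Just (i, i + 1))

-- Join `outer` streams of `inner` elements each with the given operation,
-- then drain the result.
concatStreamsWith
    :: (SerialT IO Int -> SerialT IO Int -> SerialT IO Int)
    -> Int -> Int -> Int -> IO ()
concatStreamsWith op outer inner n =
    S.drain $ S.concatMapWith op
        (\_ -> sourceUnfoldrMN inner n)
        (sourceUnfoldrMN outer n)

-- Roughly what the new "concatMapWith (2,x/2)" asyncly benchmark measures.
main :: IO ()
main = concatStreamsWith async 2 (100000 `div` 2) 1

Passing S.serial, S.wSerial, Internal.append, wAsync, ahead or parallel instead of async gives the other benchmark variants added in this commit.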
@@ -598,6 +598,26 @@ benchmark linear-async
build-depends: template-haskell >= 2.14 && < 2.16
, inspection-testing >= 0.4 && < 0.5

benchmark linear-async-long
import: bench-options
cpp-options: -DLONG_BENCHMARKS
type: exitcode-stdio-1.0
hs-source-dirs: benchmark
main-is: LinearAsync.hs
other-modules: Streamly.Benchmark.Prelude
build-depends:
streamly
, base >= 4.8 && < 5
, deepseq >= 1.4.1 && < 1.5
, random >= 1.0 && < 2.0
, gauge >= 0.2.4 && < 0.3
if impl(ghc < 8.0)
build-depends:
transformers >= 0.4 && < 0.6
if flag(inspection)
build-depends: template-haskell >= 2.14 && < 2.16
, inspection-testing >= 0.4 && < 0.5

benchmark linear-rate
import: bench-options
type: exitcode-stdio-1.0