Add a middle point for concatMapWith benchmark

* We are testing two extreme cases, add a middle case as well where the outer
  and inner streams are of equal size.

* Enable some pure benchmarks as well

* Separate the zip benchmarks in a separate group as they are scalable
  (memory consumption does not increase with stream size) and parallel
  benchmarks are not scalable.
This commit is contained in:
Harendra Kumar 2020-01-16 15:58:48 +05:30
parent afc02e6970
commit 22bfa72599
2 changed files with 39 additions and 14 deletions

View File

@ -51,7 +51,8 @@ main :: IO ()
main = do
-- XXX Fix indentation
(value, cfg, benches) <- parseCLIOpts defaultStreamSize
value `seq` runMode (mode cfg) cfg benches
let value2 = round $ sqrt $ (fromIntegral value :: Double)
value2 `seq` value `seq` runMode (mode cfg) cfg benches
[ bgroup "asyncly"
[ benchSrcIO asyncly "unfoldr" (Ops.sourceUnfoldr value)
, benchSrcIO asyncly "unfoldrM" (Ops.sourceUnfoldrM value)
@ -69,6 +70,8 @@ main = do
(maxBuffer 1 . Ops.sourceUnfoldrMN (value `div` 10))
, benchMonadicSrcIO "concatMapWith (2,x/2)"
(Ops.concatStreamsWith async 2 (value `div` 2))
, benchMonadicSrcIO "concatMapWith (sqrt x,sqrt x)"
(Ops.concatStreamsWith async value2 value2)
, benchMonadicSrcIO "concatMapWith (x/2,2)"
(Ops.concatStreamsWith async (value `div` 2) 2)
]
@ -83,8 +86,22 @@ main = do
, benchIO value "map" $ Ops.map' wAsyncly 1
, benchIO value "fmap" $ Ops.fmap' wAsyncly 1
, benchIO value "mapM" $ Ops.mapM wAsyncly 1
, benchSrcIO wAsyncly "unfoldrM maxThreads 1"
(maxThreads 1 . Ops.sourceUnfoldrM value)
, benchSrcIO wAsyncly "unfoldrM maxBuffer 1 (x/10 ops)"
(maxBuffer 1 . Ops.sourceUnfoldrMN (value `div` 10))
-- When we merge streams using wAsync the size of the queue increases
-- slowly because of the binary composition adding just one more item
-- to the work queue only after every scheduling pass through the
-- work queue.
--
-- We should see the memory consumption increasing slowly if these
-- benchmarks are left to run on infinite number of streams of infinite
-- sizes.
, benchMonadicSrcIO "concatMapWith (2,x/2)"
(Ops.concatStreamsWith wAsync 2 (value `div` 2))
, benchMonadicSrcIO "concatMapWith (sqrt x,sqrt x)"
(Ops.concatStreamsWith wAsync value2 value2)
, benchMonadicSrcIO "concatMapWith (x/2,2)"
(Ops.concatStreamsWith wAsync (value `div` 2) 2)
]
@ -94,7 +111,7 @@ main = do
[ benchSrcIO aheadly "unfoldr" (Ops.sourceUnfoldr value)
, benchSrcIO aheadly "unfoldrM" (Ops.sourceUnfoldrM value)
, benchSrcIO aheadly "fromFoldableM" (Ops.sourceFromFoldableM value)
-- , benchSrcIO aheadly "foldMapWith" Ops.sourceFoldMapWith
, benchSrcIO aheadly "foldMapWith" (Ops.sourceFoldMapWith value)
, benchSrcIO aheadly "foldMapWithM" (Ops.sourceFoldMapWithM value)
, benchSrcIO aheadly "foldMapM" (Ops.sourceFoldMapM value)
, benchIO value "map" $ Ops.map' aheadly 1
@ -104,35 +121,43 @@ main = do
(maxThreads 1 . Ops.sourceUnfoldrM value)
, benchSrcIO aheadly "unfoldrM maxBuffer 1 (x/10 ops)"
(maxBuffer 1 . Ops.sourceUnfoldrMN (value `div` 10))
-- , benchSrcIO aheadly "fromFoldable" Ops.sourceFromFoldable
, benchSrcIO aheadly "fromFoldable" (Ops.sourceFromFoldable value)
, benchMonadicSrcIO "concatMapWith (2,x/2)"
(Ops.concatStreamsWith ahead 2 (value `div` 2))
, benchMonadicSrcIO "concatMapWith (sqrt x,sqrt x)"
(Ops.concatStreamsWith ahead value2 value2)
, benchMonadicSrcIO "concatMapWith (x/2,2)"
(Ops.concatStreamsWith ahead (value `div` 2) 2)
]
-- XXX need to use smaller streams to finish in reasonable time
, bgroup "parallely"
[ benchSrcIO parallely "unfoldr" (Ops.sourceUnfoldr value)
[ -- unfoldr is pure and works serially irrespective of the stream type
benchSrcIO parallely "unfoldr" (Ops.sourceUnfoldr value)
, benchSrcIO parallely "unfoldrM" (Ops.sourceUnfoldrM value)
--, benchSrcIO parallely "fromFoldable" Ops.sourceFromFoldable
, benchSrcIO parallely "fromFoldable" (Ops.sourceFromFoldable value)
, benchSrcIO parallely "fromFoldableM" (Ops.sourceFromFoldableM value)
-- , benchSrcIO parallely "foldMapWith" Ops.sourceFoldMapWith
, benchSrcIO parallely "foldMapWith" (Ops.sourceFoldMapWith value)
, benchSrcIO parallely "foldMapWithM" (Ops.sourceFoldMapWithM value)
, benchSrcIO parallely "foldMapM" (Ops.sourceFoldMapM value)
-- map/fmap are pure and therefore no concurrency would be added on top
-- of what the source stream (i.e. unfoldrM) provides.
, benchIO value "map" $ Ops.map' parallely 1
, benchIO value "fmap" $ Ops.fmap' parallely 1
, benchIO value "mapM" $ Ops.mapM parallely 1
-- Zip has only one parallel flavor
, benchIO value "zip" Ops.zipAsync
, benchMonadicSrcIO "concatMapWith (2,x/2)"
(Ops.concatStreamsWith parallel 2 (value `div` 2))
, benchMonadicSrcIO "concatMapWith (sqrt x,sqrt x)"
(Ops.concatStreamsWith parallel value2 value2)
, benchMonadicSrcIO "concatMapWith (x/2,2)"
(Ops.concatStreamsWith parallel (value `div` 2) 2)
]
, bgroup "zip"
[ benchIO value "zip" Ops.zipAsync
, benchIO value "zipM" Ops.zipAsyncM
, benchIO value "zipAp" Ops.zipAsyncAp
, benchIO value "fmap zipAsyncly" $ Ops.fmap' zipAsyncly 1
-- Parallel stages in a pipeline
, benchIO value "parAppMap" Ops.parAppMap
, benchIO value "parAppSum" Ops.parAppSum
, benchMonadicSrcIO "concatMapWith (2,x/2)"
(Ops.concatStreamsWith parallel 2 (value `div` 2))
, benchMonadicSrcIO "concatMapWith (x/10,10)"
(Ops.concatStreamsWith parallel (value `div` 10) 10)
]
]

View File

@ -875,8 +875,8 @@ concatStreamsWith
-> IO ()
concatStreamsWith op outer inner n =
S.drain $ S.concatMapWith op
(\_ -> sourceUnfoldrMN inner n)
(sourceUnfoldrMN outer n)
(\i -> sourceUnfoldrMN (i + inner) i)
(sourceUnfoldrMN (n + outer) n)
{-# INLINE concatMapWithSerial #-}
concatMapWithSerial :: Int -> Int -> Int -> IO ()