Add benchmarks for eagerEval option

commit 0945bc327b
parent 4eaeb10c97
Author: Harendra Kumar
Date: 2022-10-06 15:08:58 +05:30
7 changed files with 310 additions and 207 deletions
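
The change factors the Async benchmark bodies out into a shared Stream.AsyncCommon module and parameterizes each group on a (Config -> Config) modifier: the existing Data.Stream.Async executable instantiates the groups with id, while the new Data.Stream.AsyncEager executable instantiates them with Async.eagerEval. A minimal sketch of how such a modifier is applied to a single combinator, using only names that appear in this diff (the composition with Async.maxThreads mentioned in the comment is an assumed example, not part of the commit):

import Streamly.Data.Stream (Stream)
import Streamly.Internal.Data.Stream.Async (Config, MonadAsync)
import qualified Streamly.Internal.Data.Stream.Async as Async

-- Map an effectful function over a stream concurrently, with eager
-- evaluation enabled via the Config modifier of the *With variant.
eagerMapM :: MonadAsync m => (Int -> m Int) -> Stream m Int -> Stream m Int
eagerMapM f = Async.mapMWith Async.eagerEval f

-- Config modifiers are plain functions, so they compose with (.), e.g.
-- Async.eagerEval . Async.maxThreads 4 (maxThreads is assumed here).
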

View File

@@ -6,219 +6,20 @@
-- License : BSD3
-- Maintainer : streamly@composewell.com
import Stream.Common
(composeN, benchIO, benchIOSink, benchIOSrc, sourceUnfoldrM)
import Streamly.Data.Stream (Stream)
import Streamly.Internal.Data.Stream.Async (MonadAsync)
import qualified Data.List as List
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Internal.Data.Stream.Async as Async
import Gauge
import Prelude hiding (mapM)
import Streamly.Benchmark.Common
import Stream.AsyncCommon (allBenchmarks, interleaveBenchmarks)
import Streamly.Benchmark.Common (runWithCLIOpts, defaultStreamSize)
moduleName :: String
moduleName = "Data.Stream.Async"
-- XXX Write inspection tests to make sure no dictionaries are being passed
-- around to find specialization issues. Could be really bad for perf.
-------------------------------------------------------------------------------
-- Mapping
-------------------------------------------------------------------------------
{-# INLINE mapM #-}
mapM ::
MonadAsync m
=> Int
-> Stream m Int
-> m ()
mapM n = composeN n $ Async.mapM return
o_1_space_mapping :: Int -> [Benchmark]
o_1_space_mapping value =
[ bgroup "mapping"
[ benchIOSink value "mapM" $ mapM 1
]
]
-------------------------------------------------------------------------------
-- Size conserving transformations (reordering, buffering, etc.)
-------------------------------------------------------------------------------
o_n_heap_buffering :: Int -> [Benchmark]
o_n_heap_buffering value =
[ bgroup "buffered"
[benchIOSink value "mkAsync" (Stream.fold Fold.drain . Async.eval)]
]
-------------------------------------------------------------------------------
-- Joining
-------------------------------------------------------------------------------
{-# INLINE async2 #-}
async2 :: Int -> Int -> IO ()
async2 count n =
Stream.fold Fold.drain $
sourceUnfoldrM count n `Async.append` sourceUnfoldrM count (n + 1)
{-# INLINE concatAsync2 #-}
concatAsync2 :: Int -> Int -> IO ()
concatAsync2 count n =
Stream.fold Fold.drain $
Async.concatList [sourceUnfoldrM count n, sourceUnfoldrM count (n + 1)]
{-# INLINE interleave2 #-}
interleave2 :: Int -> Int -> IO ()
interleave2 count n =
Stream.fold Fold.drain $
sourceUnfoldrM count n `Async.interleave` sourceUnfoldrM count (n + 1)
o_1_space_joining :: Int -> [Benchmark]
o_1_space_joining value =
[ bgroup "joining"
[ benchIOSrc1 "async (2 of n/2)" (async2 (value `div` 2))
, benchIOSrc1 "concat async (2 of n/2)" (concatAsync2 (value `div` 2))
, benchIOSrc1 "interleave (2 of n/2)" (interleave2 (value `div` 2))
]
]
-------------------------------------------------------------------------------
-- Concat
-------------------------------------------------------------------------------
{-# INLINE sourceFoldMapWith #-}
sourceFoldMapWith :: Int -> Int -> Stream IO Int
sourceFoldMapWith value n =
Async.concatMap Stream.fromPure $ Stream.fromList [n..n+value]
{-# INLINE sourceFoldMapWithStream #-}
sourceFoldMapWithStream :: Int -> Int -> Stream IO Int
sourceFoldMapWithStream value n =
Async.concatMap Stream.fromPure $ Stream.enumerateFromTo n (n + value)
{-# INLINE concatFoldableWith #-}
concatFoldableWith :: Int -> Int -> Stream IO Int
concatFoldableWith value n =
let step x =
if x <= n + value
then Just (Stream.fromPure x, x + 1)
else Nothing
list = List.unfoldr step n
in Async.concatList list
o_1_space_concatFoldable :: Int -> [Benchmark]
o_1_space_concatFoldable value =
[ bgroup "concat-foldable"
[ benchIOSrc "foldMapWith (<>) (List)"
(sourceFoldMapWith value)
, benchIOSrc "foldMapWith (<>) (Stream)"
(sourceFoldMapWithStream value)
, benchIOSrc "S.concatFoldableWith (<>) (List)"
(concatFoldableWith value)
]
]
{-# INLINE concatMapStreamsWith #-}
concatMapStreamsWith
:: Int
-> Int
-> Int
-> IO ()
concatMapStreamsWith outer inner n =
Stream.fold Fold.drain
$ Async.concatMap (sourceUnfoldrM inner) (sourceUnfoldrM outer n)
{-# INLINE concatFmapStreamsWith #-}
concatFmapStreamsWith
:: Int
-> Int
-> Int
-> IO ()
concatFmapStreamsWith outer inner n =
Stream.fold Fold.drain
$ Async.concat
$ fmap (sourceUnfoldrM inner) (sourceUnfoldrM outer n)
{-# INLINE concatMapInterleaveStreamsWith #-}
concatMapInterleaveStreamsWith
:: Int
-> Int
-> Int
-> IO ()
concatMapInterleaveStreamsWith outer inner n =
Stream.fold Fold.drain
$ Async.concatMapInterleave
(sourceUnfoldrM inner) (sourceUnfoldrM outer n)
o_1_space_concatMap :: Int -> [Benchmark]
o_1_space_concatMap value =
value2 `seq`
[ bgroup "concat"
[ benchIO "concatMapWith (n of 1)"
(concatMapStreamsWith value 1)
, benchIO "concatMapWith (sqrt x of sqrt x)"
(concatMapStreamsWith value2 value2)
, benchIO "concatMapWith (1 of n)"
(concatMapStreamsWith 1 value)
, benchIO "concat . fmap (n of 1)"
(concatFmapStreamsWith value 1)
, benchIO "concatMapInterleaveWith (n of 1)"
(concatMapInterleaveStreamsWith value 1)
, benchIO "concatMapInterleaveWith (sqrt x of sqrt x)"
(concatMapInterleaveStreamsWith value2 value2)
, benchIO "concatMapInterleaveWith (1 of n)"
(concatMapInterleaveStreamsWith 1 value)
]
]
where
value2 = round $ sqrt (fromIntegral value :: Double)
-------------------------------------------------------------------------------
-- Monadic outer product
-------------------------------------------------------------------------------
{-# INLINE toNullAp #-}
toNullAp :: Int -> Int -> IO ()
toNullAp linearCount start =
Stream.fold Fold.drain
$ Async.apply
(fmap (+) (sourceUnfoldrM nestedCount2 start))
(sourceUnfoldrM nestedCount2 start)
where
nestedCount2 = round (fromIntegral linearCount**(1/2::Double))
o_1_space_outerProduct :: Int -> [Benchmark]
o_1_space_outerProduct value =
[ bgroup "monad-outer-product"
[ benchIO "toNullAp" $ toNullAp value
]
]
-------------------------------------------------------------------------------
-- Main
-------------------------------------------------------------------------------
main :: IO ()
main = runWithCLIOpts defaultStreamSize allBenchmarks
where
allBenchmarks value =
[ bgroup (o_1_space_prefix moduleName) $ concat
[ o_1_space_mapping value
, o_1_space_concatFoldable value
, o_1_space_concatMap value
, o_1_space_outerProduct value
, o_1_space_joining value
]
, bgroup (o_n_heap_prefix moduleName) (o_n_heap_buffering value)
]
main =
runWithCLIOpts defaultStreamSize
(\value ->
allBenchmarks moduleName id value
<> interleaveBenchmarks moduleName id value
)

View File

@@ -0,0 +1,253 @@
{-# LANGUAGE FlexibleContexts #-}
-- |
-- Module : Stream.AsyncCommon
-- Copyright : (c) 2018 Composewell Technologies
--
-- License : BSD3
-- Maintainer : streamly@composewell.com
module Stream.AsyncCommon
( allBenchmarks
, interleaveBenchmarks
)
where
import Stream.Common
(composeN, benchIO, benchIOSink, benchIOSrc, sourceUnfoldrM)
import Streamly.Data.Stream (Stream)
import Streamly.Internal.Data.Stream.Async (MonadAsync, Config)
import qualified Data.List as List
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Internal.Data.Stream.Async as Async
import Gauge
import Prelude hiding (mapM)
import Streamly.Benchmark.Common
-- XXX Write inspection tests to make sure no dictionaries are being passed
-- around to find specialization issues. Could be really bad for perf.
-------------------------------------------------------------------------------
-- Mapping
-------------------------------------------------------------------------------
{-# INLINE mapM #-}
mapM ::
MonadAsync m
=> (Config -> Config)
-> Int
-> Stream m Int
-> m ()
mapM f n = composeN n $ Async.mapMWith f return
o_1_space_mapping :: Int -> (Config -> Config) -> [Benchmark]
o_1_space_mapping value f =
[ bgroup "mapping"
[ benchIOSink value "mapM" $ mapM f 1
]
]
-------------------------------------------------------------------------------
-- Size conserving transformations (reordering, buffering, etc.)
-------------------------------------------------------------------------------
o_n_heap_buffering :: Int -> (Config -> Config) -> [Benchmark]
o_n_heap_buffering value f =
[ bgroup "buffered"
[ benchIOSink value "mkAsync"
(Stream.fold Fold.drain . Async.evalWith f)
]
]
-------------------------------------------------------------------------------
-- Joining
-------------------------------------------------------------------------------
{-# INLINE async2 #-}
async2 :: (Config -> Config) -> Int -> Int -> IO ()
async2 f count n =
Stream.fold Fold.drain
$ Async.appendWith f
(sourceUnfoldrM count n) (sourceUnfoldrM count (n + 1))
{-# INLINE concatAsync2 #-}
concatAsync2 :: (Config -> Config) -> Int -> Int -> IO ()
concatAsync2 f count n =
Stream.fold Fold.drain
$ Async.concatWith f
$ Stream.fromList
[sourceUnfoldrM count n, sourceUnfoldrM count (n + 1)]
{-# INLINE interleave2 #-}
interleave2 :: (Config -> Config) -> Int -> Int -> IO ()
interleave2 f count n =
Stream.fold Fold.drain
$ Async.interleaveWith f
(sourceUnfoldrM count n) (sourceUnfoldrM count (n + 1))
o_1_space_joining :: Int -> (Config -> Config) -> [Benchmark]
o_1_space_joining value f =
[ bgroup "joining"
[ benchIOSrc1 "async (2 of n/2)" (async2 f (value `div` 2))
, benchIOSrc1 "concat async (2 of n/2)" (concatAsync2 f (value `div` 2))
, benchIOSrc1 "interleave (2 of n/2)" (interleave2 f (value `div` 2))
]
]
-------------------------------------------------------------------------------
-- Concat
-------------------------------------------------------------------------------
{-# INLINE sourceFoldMapWith #-}
sourceFoldMapWith :: (Config -> Config) -> Int -> Int -> Stream IO Int
sourceFoldMapWith f value n =
Async.concatMapWith f Stream.fromPure $ Stream.fromList [n..n+value]
{-# INLINE sourceFoldMapWithStream #-}
sourceFoldMapWithStream :: (Config -> Config) -> Int -> Int -> Stream IO Int
sourceFoldMapWithStream f value n =
Async.concatMapWith f Stream.fromPure
$ Stream.enumerateFromTo n (n + value)
{-# INLINE concatFoldableWith #-}
concatFoldableWith :: (Config -> Config) -> Int -> Int -> Stream IO Int
concatFoldableWith f value n =
let step x =
if x <= n + value
then Just (Stream.fromPure x, x + 1)
else Nothing
list = List.unfoldr step n
in Async.concatWith f (Stream.fromList list)
o_1_space_concatFoldable :: Int -> (Config -> Config) -> [Benchmark]
o_1_space_concatFoldable value f =
[ bgroup "concat-foldable"
[ benchIOSrc "foldMapWith (<>) (List)"
(sourceFoldMapWith f value)
, benchIOSrc "foldMapWith (<>) (Stream)"
(sourceFoldMapWithStream f value)
, benchIOSrc "S.concatFoldableWith (<>) (List)"
(concatFoldableWith f value)
]
]
{-# INLINE concatMapStreamsWith #-}
concatMapStreamsWith
:: (Config -> Config)
-> Int
-> Int
-> Int
-> IO ()
concatMapStreamsWith f outer inner n =
Stream.fold Fold.drain
$ Async.concatMapWith f (sourceUnfoldrM inner) (sourceUnfoldrM outer n)
{-# INLINE concatFmapStreamsWith #-}
concatFmapStreamsWith
:: (Config -> Config)
-> Int
-> Int
-> Int
-> IO ()
concatFmapStreamsWith f outer inner n =
Stream.fold Fold.drain
$ Async.concatWith f
$ fmap (sourceUnfoldrM inner) (sourceUnfoldrM outer n)
{-# INLINE concatMapInterleaveStreamsWith #-}
concatMapInterleaveStreamsWith
:: (Config -> Config)
-> Int
-> Int
-> Int
-> IO ()
concatMapInterleaveStreamsWith f outer inner n =
Stream.fold Fold.drain
$ Async.concatMapInterleaveWith f
(sourceUnfoldrM inner) (sourceUnfoldrM outer n)
o_1_space_concatMap :: Int -> (Config -> Config) -> [Benchmark]
o_1_space_concatMap value f =
value2 `seq`
[ bgroup "concat"
[ benchIO "concatMapWith (n of 1)"
(concatMapStreamsWith f value 1)
, benchIO "concatMapWith (sqrt x of sqrt x)"
(concatMapStreamsWith f value2 value2)
, benchIO "concatMapWith (1 of n)"
(concatMapStreamsWith f 1 value)
, benchIO "concat . fmap (n of 1)"
(concatFmapStreamsWith f value 1)
]
]
where
value2 = round $ sqrt (fromIntegral value :: Double)
-- XXX These do not work with the eager option
o_1_space_concatMapInterleave :: Int -> (Config -> Config) -> [Benchmark]
o_1_space_concatMapInterleave value f =
value2 `seq`
[ bgroup "concat"
[ benchIO "concatMapInterleaveWith (n of 1)"
(concatMapInterleaveStreamsWith f value 1)
, benchIO "concatMapInterleaveWith (sqrt x of sqrt x)"
(concatMapInterleaveStreamsWith f value2 value2)
, benchIO "concatMapInterleaveWith (1 of n)"
(concatMapInterleaveStreamsWith f 1 value)
]
]
where
value2 = round $ sqrt (fromIntegral value :: Double)
-------------------------------------------------------------------------------
-- Monadic outer product
-------------------------------------------------------------------------------
{-# INLINE toNullAp #-}
toNullAp :: (Config -> Config) -> Int -> Int -> IO ()
toNullAp f linearCount start =
Stream.fold Fold.drain
$ Async.applyWith f
(fmap (+) (sourceUnfoldrM nestedCount2 start))
(sourceUnfoldrM nestedCount2 start)
where
nestedCount2 = round (fromIntegral linearCount**(1/2::Double))
o_1_space_outerProduct :: Int -> (Config -> Config) -> [Benchmark]
o_1_space_outerProduct value f =
[ bgroup "monad-outer-product"
[ benchIO "toNullAp" $ toNullAp f value
]
]
-------------------------------------------------------------------------------
-- Benchmark sets
-------------------------------------------------------------------------------
allBenchmarks :: String -> (Config -> Config) -> Int -> [Benchmark]
allBenchmarks moduleName modifier value =
[ bgroup (o_1_space_prefix moduleName) $ concat
[ o_1_space_mapping value modifier
, o_1_space_concatFoldable value modifier
, o_1_space_concatMap value modifier
, o_1_space_outerProduct value modifier
, o_1_space_joining value modifier
]
, bgroup (o_n_heap_prefix moduleName)
(o_n_heap_buffering value modifier)
]
interleaveBenchmarks :: String -> (Config -> Config) -> Int -> [Benchmark]
interleaveBenchmarks moduleName modifier value =
[ bgroup
(o_1_space_prefix moduleName)
(o_1_space_concatMapInterleave value modifier)
]

View File

@@ -0,0 +1,23 @@
{-# LANGUAGE FlexibleContexts #-}
-- |
-- Module : Main
-- Copyright : (c) 2018 Composewell Technologies
--
-- License : BSD3
-- Maintainer : streamly@composewell.com
import Stream.AsyncCommon (allBenchmarks)
import Streamly.Benchmark.Common (runWithCLIOpts, defaultStreamSize)
import qualified Streamly.Internal.Data.Stream.Async as Async
moduleName :: String
moduleName = "Data.Stream.AsyncEager"
-------------------------------------------------------------------------------
-- Main
-------------------------------------------------------------------------------
main :: IO ()
main =
runWithCLIOpts defaultStreamSize (allBenchmarks moduleName Async.eagerEval)
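
The split also makes further configuration variants cheap to add: any (Config -> Config) value can instantiate the shared groups in the same way. A hypothetical sketch (the AsyncVariant name is a placeholder; only names from the diff are used otherwise):

import Stream.AsyncCommon (allBenchmarks)
import Streamly.Benchmark.Common (runWithCLIOpts, defaultStreamSize)
import Streamly.Internal.Data.Stream.Async (Config)

-- Instantiate the shared benchmark groups with an arbitrary Config
-- modifier, mirroring what the Async (id) and AsyncEager (eagerEval)
-- executables do.
runVariant :: (Config -> Config) -> IO ()
runVariant modifier =
    runWithCLIOpts defaultStreamSize
        (allBenchmarks "Data.Stream.AsyncVariant" modifier)
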

View File

@@ -51,6 +51,8 @@ rtsOpts exeName benchName0 = unwords [general, exeSpecific, benchSpecific]
"-K8M -M64M"
| "Data.Stream/o-n-space.grouping." `isPrefixOf` benchName = ""
| "Data.Stream/o-n-space." `isPrefixOf` benchName = "-K4M"
| "Data.Stream.AsyncEager/o-1-space.monad-outer-product.toNullAp" `isPrefixOf` benchName = "-M1024M -K4M"
| "Data.Stream.AsyncEager/o-1-space." `isPrefixOf` benchName = "-M512M -K4M"
| "Prelude.WSerial/o-n-space." `isPrefixOf` benchName = "-K4M"
| "Prelude.Async/o-n-space.monad-outer-product." `isPrefixOf` benchName =
"-K4M"

View File

@@ -250,6 +250,20 @@ benchmark Data.Stream.Async
hs-source-dirs: Streamly/Benchmark/Data/Stream/, Streamly/Benchmark/Data/
main-is: Async.hs
other-modules:
Stream.AsyncCommon
Stream.Common
if flag(use-streamly-core) || impl(ghcjs)
buildable: False
else
buildable: True
benchmark Data.Stream.AsyncEager
import: bench-options-threaded
type: exitcode-stdio-1.0
hs-source-dirs: Streamly/Benchmark/Data/Stream/, Streamly/Benchmark/Data/
main-is: AsyncEager.hs
other-modules:
Stream.AsyncCommon
Stream.Common
if flag(use-streamly-core) || impl(ghcjs)
buildable: False

View File

@@ -42,6 +42,10 @@ cradle:
component: "bench:Data.Stream"
- path: "./benchmark/Streamly/Benchmark/Data/Stream/Async.hs"
component: "bench:Data.Stream.Async"
- path: "./benchmark/Streamly/Benchmark/Data/Stream/AsyncCommon.hs"
component: "bench:Data.Stream.Async"
- path: "./benchmark/Streamly/Benchmark/Data/Stream/AsyncEager.hs"
component: "bench:Data.Stream.AsyncEager"
- path: "./benchmark/Streamly/Benchmark/Data/Unfold.hs"
component: "bench:Data.Unfold"
- path: "./benchmark/Streamly/Benchmark/FileSystem/Handle/Read.hs"

View File

@@ -34,6 +34,12 @@ targets =
, "serial_async_cmp"
]
)
, ("Data.Stream.AsyncEager",
[ "prelude_concurrent_grp"
, "infinite_grp"
, "concurrent_cmp"
]
)
, ("Prelude.WSerial", ["serial_wserial_cmp"])
, ("Prelude.WSerial",
[ "prelude_serial_grp"