Add a benchmark for Data.Stream.Async

This commit is contained in:
Harendra Kumar 2022-09-11 23:55:35 +05:30
parent 8a0d9ad39f
commit 5e089b4a36
4 changed files with 215 additions and 2 deletions

View File

@ -0,0 +1,193 @@
{-# LANGUAGE FlexibleContexts #-}
-- |
-- Module : Main
-- Copyright : (c) 2018 Composewell Technologies
--
-- License : BSD3
-- Maintainer : streamly@composewell.com
import Stream.Common
(composeN, benchIO, benchIOSink, benchIOSrc, sourceUnfoldrM)
import Streamly.Data.Stream (Stream)
import Streamly.Internal.Data.Stream.Async (MonadAsync)
import qualified Data.List as List
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Internal.Data.Stream.Async as Async
import Gauge
import Prelude hiding (mapM)
import Streamly.Benchmark.Common
moduleName :: String
moduleName = "Data.Stream.Async"
-- XXX Write inspection tests to make sure no dictionaries are being passed
-- around to find specialization issues. Could be really bad for perf.
-------------------------------------------------------------------------------
-- Mapping
-------------------------------------------------------------------------------
{-# INLINE mapM #-}
mapM ::
MonadAsync m
=> Int
-> Stream m Int
-> m ()
mapM n = composeN n $ Async.mapM return
o_1_space_mapping :: Int -> [Benchmark]
o_1_space_mapping value =
[ bgroup "mapping"
[ benchIOSink value "mapM" $ mapM 1
]
]
-------------------------------------------------------------------------------
-- Size conserving transformations (reordering, buffering, etc.)
-------------------------------------------------------------------------------
o_n_heap_buffering :: Int -> [Benchmark]
o_n_heap_buffering value =
[ bgroup "buffered"
[benchIOSink value "mkAsync" (Stream.fold Fold.drain . Async.eval)]
]
-------------------------------------------------------------------------------
-- Joining
-------------------------------------------------------------------------------
{-# INLINE async2 #-}
async2 :: Int -> Int -> IO ()
async2 count n =
Stream.fold Fold.drain $
sourceUnfoldrM count n `Async.append` sourceUnfoldrM count (n + 1)
o_1_space_joining :: Int -> [Benchmark]
o_1_space_joining value =
[ bgroup "joining"
[ benchIOSrc1 "async (2 of n/2)" (async2 (value `div` 2))
]
]
-------------------------------------------------------------------------------
-- Concat
-------------------------------------------------------------------------------
{-# INLINE sourceFoldMapWith #-}
sourceFoldMapWith :: Int -> Int -> Stream IO Int
sourceFoldMapWith value n =
Async.concatMap Stream.fromPure $ Stream.fromList [n..n+value]
{-# INLINE sourceFoldMapWithStream #-}
sourceFoldMapWithStream :: Int -> Int -> Stream IO Int
sourceFoldMapWithStream value n =
Async.concatMap Stream.fromPure $ Stream.enumerateFromTo n (n + value)
{-# INLINE concatFoldableWith #-}
concatFoldableWith :: Int -> Int -> Stream IO Int
concatFoldableWith value n =
let step x =
if x <= n + value
then Just (Stream.fromPure x, x + 1)
else Nothing
list = List.unfoldr step n
in Async.concatList list
o_1_space_concatFoldable :: Int -> [Benchmark]
o_1_space_concatFoldable value =
[ bgroup "concat-foldable"
[ benchIOSrc "foldMapWith (<>) (List)"
(sourceFoldMapWith value)
, benchIOSrc "foldMapWith (<>) (Stream)"
(sourceFoldMapWithStream value)
, benchIOSrc "S.concatFoldableWith (<>) (List)"
(concatFoldableWith value)
]
]
{-# INLINE concatMapStreamsWith #-}
concatMapStreamsWith
:: Int
-> Int
-> Int
-> IO ()
concatMapStreamsWith outer inner n =
Stream.fold Fold.drain
$ Async.concatMap (sourceUnfoldrM inner) (sourceUnfoldrM outer n)
{-# INLINE concatFmapStreamsWith #-}
concatFmapStreamsWith
:: Int
-> Int
-> Int
-> IO ()
concatFmapStreamsWith outer inner n =
Stream.fold Fold.drain
$ Async.concat
$ fmap (sourceUnfoldrM inner) (sourceUnfoldrM outer n)
o_1_space_concatMap :: Int -> [Benchmark]
o_1_space_concatMap value =
value2 `seq`
[ bgroup "concat"
[ benchIO "concatMapWith (n of 1)"
(concatMapStreamsWith value 1)
, benchIO "concatMapWith (sqrt x of sqrt x)"
(concatMapStreamsWith value2 value2)
, benchIO "concatMapWith (1 of n)"
(concatMapStreamsWith 1 value)
, benchIO "concat . fmap (n of 1)"
(concatFmapStreamsWith value 1)
]
]
where
value2 = round $ sqrt (fromIntegral value :: Double)
-------------------------------------------------------------------------------
-- Monadic outer product
-------------------------------------------------------------------------------
{-# INLINE toNullAp #-}
toNullAp :: Int -> Int -> IO ()
toNullAp linearCount start =
Stream.fold Fold.drain
$ Async.apply
(fmap (+) (sourceUnfoldrM nestedCount2 start))
(sourceUnfoldrM nestedCount2 start)
where
nestedCount2 = round (fromIntegral linearCount**(1/2::Double))
o_1_space_outerProduct :: Int -> [Benchmark]
o_1_space_outerProduct value =
[ bgroup "monad-outer-product"
[ benchIO "toNullAp" $ toNullAp value
]
]
-------------------------------------------------------------------------------
-- Main
-------------------------------------------------------------------------------
main :: IO ()
main = runWithCLIOpts defaultStreamSize allBenchmarks
where
allBenchmarks value =
[ bgroup (o_1_space_prefix moduleName) $ concat
[ o_1_space_mapping value
, o_1_space_concatFoldable value
, o_1_space_concatMap value
, o_1_space_outerProduct value
, o_1_space_joining value
]
, bgroup (o_n_heap_prefix moduleName) (o_n_heap_buffering value)
]

View File

@ -244,6 +244,18 @@ benchmark Data.Stream
else
ghc-options: +RTS -M2500M -RTS
benchmark Data.Stream.Async
import: bench-options-threaded
type: exitcode-stdio-1.0
hs-source-dirs: Streamly/Benchmark/Data/Stream/, Streamly/Benchmark/Data/
main-is: Async.hs
other-modules:
Stream.Common
if flag(use-streamly-core) || impl(ghcjs)
buildable: False
else
buildable: True
benchmark Prelude.WSerial
import: bench-options
type: exitcode-stdio-1.0

View File

@ -40,6 +40,8 @@ cradle:
component: "bench:Data.Stream"
- path: "./benchmark/Streamly/Benchmark/Data/Stream/Transform.hs"
component: "bench:Data.Stream"
- path: "./benchmark/Streamly/Benchmark/Data/Stream/Async.hs"
component: "bench:Data.Stream.Async"
- path: "./benchmark/Streamly/Benchmark/Data/Unfold.hs"
component: "bench:Data.Unfold"
- path: "./benchmark/Streamly/Benchmark/FileSystem/Handle/Read.hs"

View File

@ -20,8 +20,6 @@ targets =
)
-- Streams
, ("Data.Stream", ["serial_wserial_cmp"])
, ("Prelude.WSerial", ["serial_wserial_cmp"])
, ("Data.Stream",
[ "prelude_serial_grp"
, "infinite_grp"
@ -29,6 +27,14 @@ targets =
, "serial_async_cmp"
]
)
, ("Data.Stream.Async",
[ "prelude_concurrent_grp"
, "infinite_grp"
, "concurrent_cmp"
, "serial_async_cmp"
]
)
, ("Prelude.WSerial", ["serial_wserial_cmp"])
, ("Prelude.WSerial",
[ "prelude_serial_grp"
, "infinite_grp"