Add concat/append benchmark for concurrent streams

Measure space usage in interesting cases.

The motivation for these benchmarks is parallel stream consuming a lot of stack
space in concatMap case. These benchmarks will hopefully catch any such
problems in future.

These benchmarks may take a lot of time to allow memory consumption to slowly
buildup to noticeable amount in case there is a problem. Therefore they are
enabled only in dev builds. We can use `--include-first-iter`, `--min-duration
0` options in gauge to run them in reasonable time. They need to be run with
`--measure-with` option to allow isolated runs, otherwise memory measurement
won't be correct.
This commit is contained in:
Harendra Kumar 2019-06-28 02:54:35 +05:30
parent ed7f4ab1a6
commit d6eb138165
2 changed files with 122 additions and 0 deletions

89
benchmark/Concurrent.hs Normal file
View File

@ -0,0 +1,89 @@
{-# LANGUAGE RankNTypes #-}
-- |
-- Module : Main
-- Copyright : (c) 2018 Harendra Kumar
--
-- License : BSD3
-- Maintainer : harendra.kumar@gmail.com
import Control.Concurrent
import Control.Monad (when)
-- import Data.IORef
import Gauge
import Streamly
import qualified Streamly.Prelude as S
-------------------------------------------------------------------------------
-- Append
-------------------------------------------------------------------------------
-- Single work item yielded per thread
{-# INLINE append #-}
append :: IsStream t
=> Int -> Int -> Int -> (t IO Int -> SerialT IO Int) -> IO ()
append buflen tcount d t =
let work = (\i -> when (d /= 0) (threadDelay d) >> return i)
in S.drain
$ t
$ maxBuffer buflen
$ maxThreads (-1)
$ S.fromFoldableM $ map work [1..tcount]
-- Big stream of items yielded per thread
{-# INLINE concated #-}
concated :: Int -> Int -> Int -> Int -> (forall a. SerialT IO a -> SerialT IO a -> SerialT IO a) -> IO ()
concated buflen threads d elems t =
let work = (\i -> (S.replicateM i ((when (d /= 0) (threadDelay d)) >> return i)))
in S.drain
$ adapt
$ maxThreads (-1)
$ maxBuffer buflen
$ S.concatMapBy t work
$ S.replicate threads elems
appendGroup :: Int -> Int -> Int -> [Benchmark]
appendGroup buflen threads delay =
[ -- bench "serial" $ nfIO $ append buflen threads delay serially
bench "ahead" $ nfIO $ append buflen threads delay aheadly
, bench "async" $ nfIO $ append buflen threads delay asyncly
, bench "wAsync" $ nfIO $ append buflen threads delay wAsyncly
, bench "parallel" $ nfIO $ append buflen threads delay parallely
]
concatGroup :: Int -> Int -> Int -> Int -> [Benchmark]
concatGroup buflen threads delay n =
[ bench "serial" $ nfIO $ concated buflen threads delay n serial
, bench "ahead" $ nfIO $ concated buflen threads delay n ahead
, bench "async" $ nfIO $ concated buflen threads delay n async
, bench "wAsync" $ nfIO $ concated buflen threads delay n wAsync
, bench "parallel" $ nfIO $ concated buflen threads delay n parallel
]
main :: IO ()
main = do
defaultMain
[ -- bgroup "append/buf-1-threads-10k-0sec" (appendGroup 1 10000 0)
-- , bgroup "append/buf-100-threads-100k-0sec" (appendGroup 100 100000 0)
bgroup "append/buf-10k-threads-10k-5sec" (appendGroup 10000 10000 5000000)
-- bgroup "concat/buf-1-threads-100k-count-1" (concatGroup 1 100000 0 1)
-- bgroup "concat/buf-1-threads-1-count-10m" (concatGroup 1 1 0 10000000)
, bgroup "concat/buf-100-threads-100-count-500k" (concatGroup 100 100 0 500000)
{-
, bgroup "forkIO-5000ms-10k" $
let delay = threadDelay 5000000
count = 10000 :: Int
list = [1..count]
work i = delay >> return i
in
[ bench "discard" $ nfIO $ do
mapM_ (\i -> forkIO $ work i >> return ()) list
threadDelay 6000000
, bench "collect" $ nfIO $ do
ref <- newIORef []
mapM_ (\i -> forkIO $ work i >>=
\j -> atomicModifyIORef ref $ \xs -> (j : xs, ())) list
threadDelay 6000000
]
-}
]

View File

@ -710,6 +710,39 @@ benchmark fileio
, typed-process >= 0.2.3 && < 0.3 , typed-process >= 0.2.3 && < 0.3
, deepseq >= 1.4.1 && < 1.5 , deepseq >= 1.4.1 && < 1.5
benchmark concurrent
type: exitcode-stdio-1.0
hs-source-dirs: benchmark
main-is: Concurrent.hs
default-language: Haskell2010
ghc-options: -O2 -Wall -fspec-constr-recursive=10 -with-rtsopts "-T"
if flag(has-llvm)
ghc-options: -fllvm
if flag(dev)
cpp-options: -DDEVBUILD
ghc-options: -Wmissed-specialisations
-Wall-missed-specialisations
-fno-ignore-asserts
if impl(ghc >= 8.0)
ghc-options: -Wcompat
-Wunrecognised-warning-flags
-Widentities
-Wincomplete-record-updates
-Wincomplete-uni-patterns
-Wredundant-constraints
-Wnoncanonical-monad-instances
-Wnoncanonical-monadfail-instances
if flag(dev)
buildable: True
build-depends:
streamly
, base >= 4.8 && < 5
, gauge >= 0.2.4 && < 0.3
else
buildable: False
------------------------------------------------------------------------------- -------------------------------------------------------------------------------
-- Internal benchmarks for unexposed modules -- Internal benchmarks for unexposed modules
------------------------------------------------------------------------------- -------------------------------------------------------------------------------