diff --git a/benchmark/Concurrent.hs b/benchmark/Concurrent.hs new file mode 100644 index 00000000..b6f5e1cf --- /dev/null +++ b/benchmark/Concurrent.hs @@ -0,0 +1,89 @@ +{-# LANGUAGE RankNTypes #-} +-- | +-- Module : Main +-- Copyright : (c) 2018 Harendra Kumar +-- +-- License : BSD3 +-- Maintainer : harendra.kumar@gmail.com + +import Control.Concurrent +import Control.Monad (when) +-- import Data.IORef + +import Gauge +import Streamly +import qualified Streamly.Prelude as S + +------------------------------------------------------------------------------- +-- Append +------------------------------------------------------------------------------- + +-- Single work item yielded per thread +{-# INLINE append #-} +append :: IsStream t + => Int -> Int -> Int -> (t IO Int -> SerialT IO Int) -> IO () +append buflen tcount d t = + let work = (\i -> when (d /= 0) (threadDelay d) >> return i) + in S.drain + $ t + $ maxBuffer buflen + $ maxThreads (-1) + $ S.fromFoldableM $ map work [1..tcount] + +-- Big stream of items yielded per thread +{-# INLINE concated #-} +concated :: Int -> Int -> Int -> Int -> (forall a. SerialT IO a -> SerialT IO a -> SerialT IO a) -> IO () +concated buflen threads d elems t = + let work = (\i -> (S.replicateM i ((when (d /= 0) (threadDelay d)) >> return i))) + in S.drain + $ adapt + $ maxThreads (-1) + $ maxBuffer buflen + $ S.concatMapBy t work + $ S.replicate threads elems + +appendGroup :: Int -> Int -> Int -> [Benchmark] +appendGroup buflen threads delay = + [ -- bench "serial" $ nfIO $ append buflen threads delay serially + bench "ahead" $ nfIO $ append buflen threads delay aheadly + , bench "async" $ nfIO $ append buflen threads delay asyncly + , bench "wAsync" $ nfIO $ append buflen threads delay wAsyncly + , bench "parallel" $ nfIO $ append buflen threads delay parallely + ] + +concatGroup :: Int -> Int -> Int -> Int -> [Benchmark] +concatGroup buflen threads delay n = + [ bench "serial" $ nfIO $ concated buflen threads delay n serial + , bench "ahead" $ nfIO $ concated buflen threads delay n ahead + , bench "async" $ nfIO $ concated buflen threads delay n async + , bench "wAsync" $ nfIO $ concated buflen threads delay n wAsync + , bench "parallel" $ nfIO $ concated buflen threads delay n parallel + ] + +main :: IO () +main = do + defaultMain + [ -- bgroup "append/buf-1-threads-10k-0sec" (appendGroup 1 10000 0) + -- , bgroup "append/buf-100-threads-100k-0sec" (appendGroup 100 100000 0) + bgroup "append/buf-10k-threads-10k-5sec" (appendGroup 10000 10000 5000000) + -- bgroup "concat/buf-1-threads-100k-count-1" (concatGroup 1 100000 0 1) + -- bgroup "concat/buf-1-threads-1-count-10m" (concatGroup 1 1 0 10000000) + , bgroup "concat/buf-100-threads-100-count-500k" (concatGroup 100 100 0 500000) + {- + , bgroup "forkIO-5000ms-10k" $ + let delay = threadDelay 5000000 + count = 10000 :: Int + list = [1..count] + work i = delay >> return i + in + [ bench "discard" $ nfIO $ do + mapM_ (\i -> forkIO $ work i >> return ()) list + threadDelay 6000000 + , bench "collect" $ nfIO $ do + ref <- newIORef [] + mapM_ (\i -> forkIO $ work i >>= + \j -> atomicModifyIORef ref $ \xs -> (j : xs, ())) list + threadDelay 6000000 + ] + -} + ] diff --git a/streamly.cabal b/streamly.cabal index 82cdd347..38df6df1 100644 --- a/streamly.cabal +++ b/streamly.cabal @@ -710,6 +710,39 @@ benchmark fileio , typed-process >= 0.2.3 && < 0.3 , deepseq >= 1.4.1 && < 1.5 +benchmark concurrent + type: exitcode-stdio-1.0 + hs-source-dirs: benchmark + main-is: Concurrent.hs + default-language: Haskell2010 + ghc-options: -O2 -Wall -fspec-constr-recursive=10 -with-rtsopts "-T" + + if flag(has-llvm) + ghc-options: -fllvm + + if flag(dev) + cpp-options: -DDEVBUILD + ghc-options: -Wmissed-specialisations + -Wall-missed-specialisations + -fno-ignore-asserts + if impl(ghc >= 8.0) + ghc-options: -Wcompat + -Wunrecognised-warning-flags + -Widentities + -Wincomplete-record-updates + -Wincomplete-uni-patterns + -Wredundant-constraints + -Wnoncanonical-monad-instances + -Wnoncanonical-monadfail-instances + if flag(dev) + buildable: True + build-depends: + streamly + , base >= 4.8 && < 5 + , gauge >= 0.2.4 && < 0.3 + else + buildable: False + ------------------------------------------------------------------------------- -- Internal benchmarks for unexposed modules -------------------------------------------------------------------------------