Add more Prelude.Serial benchmarks to Data.Stream

And refactor for simplification and code-reuse.
Harendra Kumar 2022-08-05 23:46:05 +05:30
parent 8fbc085ef4
commit bfb1624a57
14 changed files with 1324 additions and 1077 deletions


@ -25,11 +25,11 @@ jobs:
Data.Parser.ParserD
Data.Parser.ParserK
Data.SmallArray
Data.Stream
Data.Stream.StreamD
Data.Stream.StreamDK
Data.Stream.StreamK:6
Data.Unfold
Data.Stream
FileSystem.Handle
Prelude.Ahead
Prelude.Async:12


@ -26,11 +26,11 @@
- ignore: {name: "Use fmap"}
# Warnings ignored in specific places
- ignore: {name: "Use ++", within: Stream.Transformation}
- ignore: {name: "Use mapM", within: Stream.Transformation}
- ignore: {name: "Use traverse", within: Stream.Transformation}
- ignore: {name: "Redundant <*", within: Stream.NestedStream}
- ignore: {name: "Use ++", within: Stream.NestedStream}
- ignore: {name: "Use ++", within: Stream.Transform}
- ignore: {name: "Use mapM", within: Stream.Transform}
- ignore: {name: "Use traverse", within: Stream.Transform}
- ignore: {name: "Redundant <*", within: Stream.Expand}
- ignore: {name: "Use ++", within: Stream.Reduce}
- ignore: {name: "Use ++", within: Stream.Split}
- ignore: {name: "Redundant bracket", within: Stream.Split}
- ignore: {name: "Use isDigit", within: Streamly.Internal.Unicode.Char.Parser}


@ -15,15 +15,14 @@ import Streamly.Benchmark.Common.Handle (mkHandleBenchEnv)
import qualified Stream.Eliminate as Elimination
import qualified Stream.Exceptions as Exceptions
import qualified Stream.Expand as NestedStream
import qualified Stream.Generate as Generation
import qualified Stream.Lift as Lift
import qualified Stream.Reduce as Reduction
import qualified Stream.Transformation as Transformation
import qualified Stream.Reduce as NestedFold
#ifdef USE_PRELUDE
import qualified Stream.NestedFold as NestedFold
import qualified Stream.NestedStream as NestedStream
import qualified Stream.Split as Split
#endif
import qualified Stream.Transform as Transformation
import Streamly.Benchmark.Common
@ -45,15 +44,14 @@ main = do
where
allBenchmarks env size = Prelude.concat
[ Elimination.benchmarks moduleName size
[ Generation.benchmarks moduleName size
, Elimination.benchmarks moduleName size
, Exceptions.benchmarks moduleName env size
, Generation.benchmarks moduleName size
, Lift.benchmarks moduleName size
, Reduction.benchmarks moduleName size
, Transformation.benchmarks moduleName size
#ifdef USE_PRELUDE
, NestedFold.benchmarks moduleName size
, NestedStream.benchmarks moduleName size
, Split.benchmarks moduleName env
#endif
, Transformation.benchmarks moduleName size
, NestedFold.benchmarks moduleName size
, Lift.benchmarks moduleName size
, NestedStream.benchmarks moduleName size
]


@ -1,3 +1,8 @@
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
-- |
-- Module : Stream.Common
-- Copyright : (c) 2018 Composewell Technologies
@ -5,40 +10,164 @@
-- Maintainer : streamly@composewell.com
module Stream.Common
( sourceUnfoldr
( MonadAsync
-- Generation
, enumerateFromTo
, replicate
, unfoldrM
, fromListM
, append
, append2
-- Elimination
, drain
, foldl'
, scanl'
-- Benchmark stream generation
, sourceUnfoldr
, sourceUnfoldrM
, sourceUnfoldrAction
, sourceConcatMapId
, sourceFromFoldable
, sourceFromFoldableM
-- Benchmark stream elimination
, benchIOSink
, benchIOSrc
-- Benchmarking functions
, concatStreamsWith
, concatPairsWith
, apDiscardFst
, apDiscardSnd
, apLiftA2
, toNullAp
, monadThen
, toNullM
, toNullM3
, filterAllOutM
, filterAllInM
, filterSome
, breakAfterSome
, toListM
, toListSome
, composeN
, mapN
, mapM
, transformMapM
, transformComposeMapM
, transformTeeMapM
, transformZipMapM
)
where
import Streamly.Internal.Data.Stream (Stream, unfold)
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Stream as Stream
import qualified Streamly.Internal.Data.Unfold as UF
import Control.DeepSeq (NFData)
import Gauge
import Prelude hiding (mapM)
import Control.Applicative (liftA2)
import Control.Exception (try)
import GHC.Exception (ErrorCall)
import Streamly.Internal.Data.Stream (Stream)
import System.Random (randomRIO)
{-# INLINE toNull #-}
toNull :: Monad m => Stream m a -> m ()
toNull = Stream.fold Fold.drain
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Pipe as Pipe
#ifdef USE_PRELUDE
import Streamly.Prelude (foldl', scanl')
import qualified Streamly.Internal.Data.Stream.IsStream as Stream
import qualified Streamly.Prelude as Stream
import Streamly.Benchmark.Prelude
( composeN, sourceUnfoldr, sourceUnfoldr, sourceFromFoldable
, sourceFromFoldableM, sourceUnfoldrAction, sourceConcatMapId, benchIOSink
, concatStreamsWith, concatPairsWith
)
#else
import Control.DeepSeq (NFData)
import Streamly.Internal.Data.Stream (unfold)
import qualified Streamly.Internal.Data.Stream as Stream
import qualified Streamly.Internal.Data.Unfold as Unfold
#endif
import Gauge
import Prelude hiding (mapM, replicate)
#ifdef USE_PRELUDE
type MonadAsync m = Stream.MonadAsync m
#else
type MonadAsync = Monad
#endif
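This CPP alias is what lets a single benchmark signature compile against both the old Streamly.Prelude API and the new Streamly.Internal.Data.Stream API. As an illustration (not part of the diff; hypothetical helper name, using only Stream.Common exports):
import Stream.Common (MonadAsync, drain, sourceUnfoldrM)
-- Under USE_PRELUDE the constraint expands to Stream.MonadAsync; without it,
-- to plain Monad, so the same benchmark body is reused verbatim.
drainN :: MonadAsync m => Int -> m ()
drainN n = drain (sourceUnfoldrM n 0)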
{-# INLINE append #-}
append :: Stream m a -> Stream m a -> Stream m a
#ifdef USE_PRELUDE
append = Stream.serial
#else
append = Stream.append
#endif
{-# INLINE append2 #-}
append2 :: Monad m => Stream m a -> Stream m a -> Stream m a
#ifdef USE_PRELUDE
append2 = Stream.append
#else
append2 = Stream.append2
#endif
{-# INLINE drain #-}
drain :: Monad m => Stream m a -> m ()
drain = Stream.fold Fold.drain
{-# INLINE enumerateFromTo #-}
enumerateFromTo :: Monad m => Int -> Int -> Stream m Int
#ifdef USE_PRELUDE
enumerateFromTo = Stream.enumerateFromTo
#else
enumerateFromTo from to = Stream.unfold Unfold.enumerateFromTo (from, to)
#endif
{-# INLINE replicate #-}
replicate :: Monad m => Int -> a -> Stream m a
#ifdef USE_PRELUDE
replicate = Stream.replicate
#else
replicate n = Stream.unfold (Unfold.replicateM n) . return
#endif
{-# INLINE unfoldrM #-}
unfoldrM :: MonadAsync m => (b -> m (Maybe (a, b))) -> b -> Stream m a
#ifdef USE_PRELUDE
unfoldrM = Stream.unfoldrM
#else
unfoldrM step = Stream.unfold (Unfold.unfoldrM step)
#endif
{-# INLINE fromListM #-}
fromListM :: MonadAsync m => [m a] -> Stream m a
#ifdef USE_PRELUDE
fromListM = Stream.fromListM
#else
fromListM = Stream.unfold Unfold.fromListM
#endif
{-# INLINE sourceUnfoldrM #-}
sourceUnfoldrM :: Monad m => Int -> Int -> Stream m Int
sourceUnfoldrM count start = unfold (UF.unfoldrM step) start
sourceUnfoldrM :: MonadAsync m => Int -> Int -> Stream m Int
sourceUnfoldrM count start = unfoldrM step start
where
step cnt =
if cnt > start + count
then return Nothing
else return (Just (cnt, cnt + 1))
#ifndef USE_PRELUDE
{-# INLINE sourceUnfoldr #-}
sourceUnfoldr :: Monad m => Int -> Int -> Stream m Int
sourceUnfoldr count start = unfold (UF.unfoldr step) start
sourceUnfoldr count start = unfold (Unfold.unfoldr step) start
where
step cnt =
if cnt > start + count
then Nothing
@ -46,19 +175,30 @@ sourceUnfoldr count start = unfold (UF.unfoldr step) start
{-# INLINE sourceUnfoldrAction #-}
sourceUnfoldrAction :: (Monad m1, Monad m) => Int -> Int -> Stream m (m1 Int)
sourceUnfoldrAction value n = unfold (UF.unfoldr step) n
sourceUnfoldrAction value n = unfold (Unfold.unfoldr step) n
where
step cnt =
if cnt > n + value
then Nothing
else Just (return cnt, cnt + 1)
{-# INLINE sourceFromFoldable #-}
sourceFromFoldable :: Int -> Int -> Stream m Int
sourceFromFoldable value n = Stream.fromFoldable [n..n+value]
{-# INLINE sourceFromFoldableM #-}
sourceFromFoldableM :: Monad m => Int -> Int -> Stream m Int
sourceFromFoldableM value n = Stream.fromFoldableM (fmap return [n..n+value])
{-# INLINE benchIOSink #-}
benchIOSink
:: (NFData b)
=> Int -> String -> (Stream IO Int -> IO b) -> Benchmark
benchIOSink value name f =
bench name $ nfIO $ randomRIO (1,1) >>= f . sourceUnfoldrM value
#endif
-- | Takes a source, and uses it with a default drain/fold method.
{-# INLINE benchIOSrc #-}
@ -67,4 +207,266 @@ benchIOSrc
-> (Int -> Stream IO a)
-> Benchmark
benchIOSrc name f =
bench name $ nfIO $ randomRIO (1,1) >>= toNull . f
bench name $ nfIO $ randomRIO (1,1) >>= drain . f
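benchIOSink feeds the standard sourceUnfoldrM stream of the given size into the measured sink, while benchIOSrc drains the measured source. A sketch of how they are wired into a Gauge group (hypothetical group name, mirroring the benchmark modules below):
import Gauge (Benchmark, bgroup)
import Stream.Common (benchIOSink, benchIOSrc, foldl', sourceUnfoldrM)
-- One source benchmark and one sink benchmark over the same stream shape.
exampleBenchmarks :: Int -> [Benchmark]
exampleBenchmarks value =
    [ bgroup "example"
        [ benchIOSrc "unfoldrM" (sourceUnfoldrM value)
        , benchIOSink value "sum" (foldl' (+) 0)
        ]
    ]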
#ifndef USE_PRELUDE
{-# INLINE concatStreamsWith #-}
concatStreamsWith
:: (Stream IO Int -> Stream IO Int -> Stream IO Int)
-> Int
-> Int
-> Int
-> IO ()
concatStreamsWith op outer inner n =
drain $ Stream.concatMapWith op
(sourceUnfoldrM inner)
(sourceUnfoldrM outer n)
{-# INLINE concatPairsWith #-}
concatPairsWith
:: (Stream IO Int -> Stream IO Int -> Stream IO Int)
-> Int
-> Int
-> Int
-> IO ()
concatPairsWith op outer inner n =
drain $ Stream.concatPairsWith op
(sourceUnfoldrM inner)
(sourceUnfoldrM outer n)
{-# INLINE sourceConcatMapId #-}
sourceConcatMapId :: (Monad m)
=> Int -> Int -> Stream m (Stream m Int)
sourceConcatMapId value n =
Stream.fromFoldable $ fmap (Stream.fromEffect . return) [n..n+value]
#endif
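Both combinators take the merge operation, the number of outer streams, the number of elements per inner stream, and a start seed; concatStreamsWith flattens the generated streams one after another via concatMapWith, while concatPairsWith combines them pairwise. An illustrative call (not part of the diff), flattening 100 streams of 1,000 elements each:
import Stream.Common (append, concatPairsWith, concatStreamsWith)
appendLinear, appendPairwise :: IO ()
appendLinear = concatStreamsWith append 100 1000 0   -- append streams one by one
appendPairwise = concatPairsWith append 100 1000 0   -- append streams in pairs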
{-# INLINE apDiscardFst #-}
apDiscardFst :: MonadAsync m =>
Int -> Int -> m ()
apDiscardFst linearCount start = drain $
sourceUnfoldrM nestedCount2 start
*> sourceUnfoldrM nestedCount2 start
where
nestedCount2 = round (fromIntegral linearCount**(1/2::Double))
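The nestedCount2 arithmetic, repeated by every nested benchmark below, shrinks each of the two nested sources to roughly the square root of the requested size so that the total work stays close to linearCount. A quick standalone check of that sizing (illustrative only):
-- For linearCount = 100000 this prints (316, 99856), i.e. the two nested
-- loops together perform approximately linearCount iterations.
main :: IO ()
main = do
    let linearCount = 100000 :: Int
        nestedCount2 = round (fromIntegral linearCount ** (1 / 2 :: Double)) :: Int
    print (nestedCount2, nestedCount2 * nestedCount2)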
{-# INLINE apDiscardSnd #-}
apDiscardSnd :: MonadAsync m => Int -> Int -> m ()
apDiscardSnd linearCount start = drain $
sourceUnfoldrM nestedCount2 start
<* sourceUnfoldrM nestedCount2 start
where
nestedCount2 = round (fromIntegral linearCount**(1/2::Double))
{-# INLINE apLiftA2 #-}
apLiftA2 :: MonadAsync m => Int -> Int -> m ()
apLiftA2 linearCount start = drain $
liftA2 (+) (sourceUnfoldrM nestedCount2 start)
(sourceUnfoldrM nestedCount2 start)
where
nestedCount2 = round (fromIntegral linearCount**(1/2::Double))
{-# INLINE toNullAp #-}
toNullAp :: MonadAsync m => Int -> Int -> m ()
toNullAp linearCount start = drain $
(+) <$> sourceUnfoldrM nestedCount2 start
<*> sourceUnfoldrM nestedCount2 start
where
nestedCount2 = round (fromIntegral linearCount**(1/2::Double))
{-# INLINE monadThen #-}
monadThen :: MonadAsync m => Int -> Int -> m ()
monadThen linearCount start = drain $ do
sourceUnfoldrM nestedCount2 start >>
sourceUnfoldrM nestedCount2 start
where
nestedCount2 = round (fromIntegral linearCount**(1/2::Double))
{-# INLINE toNullM #-}
toNullM :: MonadAsync m => Int -> Int -> m ()
toNullM linearCount start = drain $ do
x <- sourceUnfoldrM nestedCount2 start
y <- sourceUnfoldrM nestedCount2 start
return $ x + y
where
nestedCount2 = round (fromIntegral linearCount**(1/2::Double))
{-# INLINE toNullM3 #-}
toNullM3 :: MonadAsync m => Int -> Int -> m ()
toNullM3 linearCount start = drain $ do
x <- sourceUnfoldrM nestedCount3 start
y <- sourceUnfoldrM nestedCount3 start
z <- sourceUnfoldrM nestedCount3 start
return $ x + y + z
where
nestedCount3 = round (fromIntegral linearCount**(1/3::Double))
{-# INLINE filterAllOutM #-}
filterAllOutM :: MonadAsync m => Int -> Int -> m ()
filterAllOutM linearCount start = drain $ do
x <- sourceUnfoldrM nestedCount2 start
y <- sourceUnfoldrM nestedCount2 start
let s = x + y
if s < 0
then return s
else Stream.nil
where
nestedCount2 = round (fromIntegral linearCount**(1/2::Double))
{-# INLINE filterAllInM #-}
filterAllInM :: MonadAsync m => Int -> Int -> m ()
filterAllInM linearCount start = drain $ do
x <- sourceUnfoldrM nestedCount2 start
y <- sourceUnfoldrM nestedCount2 start
let s = x + y
if s > 0
then return s
else Stream.nil
where
nestedCount2 = round (fromIntegral linearCount**(1/2::Double))
{-# INLINE filterSome #-}
filterSome :: MonadAsync m => Int -> Int -> m ()
filterSome linearCount start = drain $ do
x <- sourceUnfoldrM nestedCount2 start
y <- sourceUnfoldrM nestedCount2 start
let s = x + y
if s > 1100000
then return s
else Stream.nil
where
nestedCount2 = round (fromIntegral linearCount**(1/2::Double))
{-# INLINE breakAfterSome #-}
breakAfterSome :: Int -> Int -> IO ()
breakAfterSome linearCount start = do
(_ :: Either ErrorCall ()) <- try $ drain $ do
x <- sourceUnfoldrM nestedCount2 start
y <- sourceUnfoldrM nestedCount2 start
let s = x + y
if s > 1100000
then error "break"
else return s
return ()
where
nestedCount2 = round (fromIntegral linearCount**(1/2::Double))
{-# INLINE toListM #-}
toListM :: MonadAsync m => Int -> Int -> m [Int]
toListM linearCount start = Stream.fold Fold.toList $ do
x <- sourceUnfoldrM nestedCount2 start
y <- sourceUnfoldrM nestedCount2 start
return $ x + y
where
nestedCount2 = round (fromIntegral linearCount**(1/2::Double))
-- Taking a specified number of elements is very expensive in logict, so we
-- have a benchmark to measure the same operation here.
{-# INLINE toListSome #-}
toListSome :: MonadAsync m => Int -> Int -> m [Int]
toListSome linearCount start =
Stream.fold Fold.toList $ Stream.take 10000 $ do
x <- sourceUnfoldrM nestedCount2 start
y <- sourceUnfoldrM nestedCount2 start
return $ x + y
where
nestedCount2 = round (fromIntegral linearCount**(1/2::Double))
#ifndef USE_PRELUDE
{-# INLINE composeN #-}
composeN ::
(Monad m)
=> Int
-> (Stream m Int -> Stream m Int)
-> Stream m Int
-> m ()
composeN n f =
case n of
1 -> drain . f
2 -> drain . f . f
3 -> drain . f . f . f
4 -> drain . f . f . f . f
_ -> undefined
#endif
{-# INLINE mapN #-}
mapN ::
Monad m
=> Int
-> Stream m Int
-> m ()
mapN n = composeN n $ fmap (+ 1)
{-# INLINE mapM #-}
mapM ::
MonadAsync m
=> Int
-> Stream m Int
-> m ()
mapM n = composeN n $ Stream.mapM return
#ifndef USE_PRELUDE
foldl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> m b
foldl' f z = Stream.fold (Fold.foldl' f z)
scanl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b
scanl' f z = Stream.scan (Fold.foldl' f z)
#endif
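These shims express the Prelude-style foldl' and scanl' in terms of the Fold API. A small usage sketch (not part of the diff, using only this module's exports):
import Stream.Common (drain, foldl', scanl', sourceUnfoldr)
-- Sum a small pure stream (0..10), then drain its running-sum scan.
example :: IO ()
example = do
    total <- foldl' (+) 0 (sourceUnfoldr 10 0)
    print total
    drain (scanl' (+) 0 (sourceUnfoldr 10 0))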
{-# INLINE transformMapM #-}
transformMapM ::
(Monad m)
=> Int
-> Stream m Int
-> m ()
transformMapM n = composeN n $ Stream.transform (Pipe.mapM return)
{-# INLINE transformComposeMapM #-}
transformComposeMapM ::
(Monad m)
=> Int
-> Stream m Int
-> m ()
transformComposeMapM n =
composeN n $
Stream.transform
(Pipe.mapM (\x -> return (x + 1)) `Pipe.compose`
Pipe.mapM (\x -> return (x + 2)))
{-# INLINE transformTeeMapM #-}
transformTeeMapM ::
(Monad m)
=> Int
-> Stream m Int
-> m ()
transformTeeMapM n =
composeN n $
Stream.transform
(Pipe.mapM (\x -> return (x + 1)) `Pipe.tee`
Pipe.mapM (\x -> return (x + 2)))
{-# INLINE transformZipMapM #-}
transformZipMapM ::
(Monad m)
=> Int
-> Stream m Int
-> m ()
transformZipMapM n =
composeN n $
Stream.transform
(Pipe.zipWith
(+)
(Pipe.mapM (\x -> return (x + 1)))
(Pipe.mapM (\x -> return (x + 2))))
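The four transform benchmarks above differ only in how the two Pipe stages are combined (compose, tee, or zipWith). A self-contained sketch of the same transform pattern outside the harness (assuming the internal Stream, Pipe and Fold modules used above):
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Pipe as Pipe
import qualified Streamly.Internal.Data.Stream as Stream
-- Add 3 to every element through two sequentially composed monadic pipe
-- stages, then sum the results.
addThreeAndSum :: Monad m => Stream.Stream m Int -> m Int
addThreeAndSum =
    Stream.fold Fold.sum
        . Stream.transform
              (Pipe.mapM (\x -> return (x + 1)) `Pipe.compose`
               Pipe.mapM (\x -> return (x + 2)))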


@ -35,14 +35,13 @@ import qualified Streamly.Internal.Data.Stream.StreamD as D
#endif
import qualified Streamly.Internal.Data.Fold as Fold
#ifdef USE_PRELUDE
import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Data.Stream.IsStream as S
import qualified Streamly.Internal.Data.Stream.IsStream as Internal
#else
import qualified Streamly.Internal.Data.Stream as S
#endif
import Gauge
-- XXX Replace SerialT with Stream
import Streamly.Internal.Data.Stream.Serial (SerialT)
#ifdef USE_PRELUDE
import Streamly.Prelude (fromSerial)
@ -56,7 +55,6 @@ import Stream.Common
)
#endif
import Streamly.Benchmark.Common
import Prelude hiding (length, sum, or, and, any, all, notElem, elem, (!!),
lookup, repeat, minimum, maximum, product, last, mapM_, init)
import qualified Prelude
@ -66,6 +64,7 @@ import qualified Prelude
repeat :: (Monad m, S.IsStream t) => Int -> Int -> t m Int
repeat count = S.take count . S.repeat
#endif
-------------------------------------------------------------------------------
-- Elimination
-------------------------------------------------------------------------------
@ -281,6 +280,7 @@ uncons s = do
case r of
Nothing -> return ()
Just (_, t) -> uncons t
#ifdef USE_PRELUDE
{-# INLINE init #-}
init :: Monad m => SerialT m a -> m ()
@ -308,6 +308,7 @@ foldrToStream = S.foldr S.cons S.nil
{-# INLINE foldrMBuild #-}
foldrMBuild :: Monad m => SerialT m Int -> m [Int]
foldrMBuild = S.foldrM (\x xs -> (x :) <$> xs) (return [])
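foldrMBuild rebuilds the stream as a list by consing inside the monad. Worked out by hand for a three-element stream (illustrative only, not part of the diff):
-- S.foldrM (\x xs -> (x :) <$> xs) (return []) over the elements 1, 2, 3
-- expands to the following, which evaluates to return [1,2,3]:
buildByHand :: Monad m => m [Int]
buildByHand = (1 :) <$> ((2 :) <$> ((3 :) <$> return []))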
#ifdef USE_PRELUDE
{-# INLINE foldl'Reduce #-}
foldl'Reduce :: Monad m => SerialT m Int -> m Int
@ -411,12 +412,13 @@ drainWhile = S.drainWhile (const True)
{-# INLINE (!!) #-}
(!!) :: Monad m => Int -> SerialT m Int -> m (Maybe Int)
(!!) = flip (Internal.!!)
(!!) = flip (S.!!)
{-# INLINE lookup #-}
lookup :: Monad m => Int -> SerialT m Int -> m (Maybe Int)
lookup val = S.lookup val . S.map (\x -> (x, x))
#endif
o_1_space_elimination_folds :: Int -> [Benchmark]
o_1_space_elimination_folds value =
[ bgroup "elimination"
@ -445,10 +447,10 @@ o_1_space_elimination_folds value =
]
, bgroup "Identity"
[ benchIdentitySink value "foldrMElem" (foldrMElem value)
, benchPureSink value "foldrMToListLength"
(Prelude.length . runIdentity . foldrMBuild)
, benchIdentitySink value "foldrToStreamLength"
(S.fold Fold.length . runIdentity . foldrToStream)
, benchPureSink value "foldrMToListLength"
(Prelude.length . runIdentity . foldrMBuild)
]
]
@ -467,7 +469,7 @@ o_1_space_elimination_folds value =
, benchIOSink value "mapM_" mapM_
-- this is too fast, causes all benchmarks reported in ns
--, benchIOSink value "head" head
-- , benchIOSink value "head" head
, benchIOSink value "last" last
, benchIOSink value "length" length
, benchIOSink value "sum" sum
@ -476,9 +478,8 @@ o_1_space_elimination_folds value =
, benchIOSink value "maximum" maximum
, benchIOSink value "minimumBy" minimumBy
, benchIOSink value "minimum" minimum
#ifdef USE_PRELUDE
, bench "the" $ nfIO $ randomRIO (1,1) >>= the . repeat value
#endif
, benchIOSink value "find" (find value)
, benchIOSink value "findM" (findM value)
-- , benchIOSink value "lookupFirst" (lookup 1)
@ -487,7 +488,7 @@ o_1_space_elimination_folds value =
, benchIOSink value "findIndex" (findIndex value)
, benchIOSink value "elemIndex" (elemIndex value)
-- this is too fast, causes all benchmarks reported in ns
-- , benchIOSink value "null" S.null
-- , benchIOSink value "null" S.null
, benchIOSink value "elem" (elem value)
, benchIOSink value "notElem" (notElem value)
, benchIOSink value "all" (all value)
@ -504,6 +505,7 @@ o_1_space_elimination_folds value =
-------------------------------------------------------------------------------
-- Buffered Transformations by fold
-------------------------------------------------------------------------------
#ifdef USE_PRELUDE
{-# INLINE foldl'Build #-}
foldl'Build :: Monad m => SerialT m Int -> m [Int]
@ -559,14 +561,15 @@ o_n_space_elimination_foldr value =
, benchIOSink value "foldrM/reduce/IO (sum)" foldrMReduce
]
]
#ifdef USE_PRELUDE
o_n_heap_elimination_toList :: Int -> [Benchmark]
o_n_heap_elimination_toList value =
[ bgroup "toList"
-- Converting the stream to a list or pure stream in a strict monad
[ benchIOSink value "toListRev" Internal.toListRev
[ benchIOSink value "toListRev" S.toListRev
, benchIOSink value "toStreamRev"
(Internal.toStreamRev :: (SerialT IO Int -> IO (SerialT Identity Int)))
(S.toStreamRev :: (SerialT IO Int -> IO (SerialT Identity Int)))
]
]
@ -576,10 +579,11 @@ o_n_space_elimination_toList value =
-- Converting the stream to a list or pure stream in a strict monad
[ benchIOSink value "toList" S.toList
, benchIOSink value "toStream"
(Internal.toStream :: (SerialT IO Int -> IO (SerialT Identity Int)))
(S.toStream :: (SerialT IO Int -> IO (SerialT Identity Int)))
]
]
#endif
-------------------------------------------------------------------------------
-- Multi-stream folds
-------------------------------------------------------------------------------


@ -7,6 +7,7 @@
-- Portability : GHC
{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
#ifdef __HADDOCK_VERSION__
@ -21,26 +22,24 @@
module Stream.Exceptions (benchmarks) where
import Control.Exception (SomeException, Exception, throwIO)
import Stream.Common (drain, enumerateFromTo)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import System.IO (Handle, hClose, hPutChar)
import qualified Data.IORef as Ref
import qualified Data.Map.Strict as Map
import qualified Stream.Common as Common
import qualified Streamly.FileSystem.Handle as FH
import qualified Streamly.Internal.Data.Unfold as IUF
import qualified Streamly.Internal.FileSystem.Handle as IFH
#ifdef USE_PRELUDE
import qualified Streamly.Internal.Data.Stream.IsStream as Stream
import qualified Streamly.Prelude as S
#else
import qualified Streamly.Internal.Data.Stream as Stream
import qualified Streamly.Internal.Data.Stream as S
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Unfold as Unfold
#endif
import Gauge hiding (env)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Prelude hiding (last, length)
import Streamly.Benchmark.Common
import Streamly.Benchmark.Common.Handle
@ -55,28 +54,14 @@ import qualified Streamly.Internal.Data.Stream.StreamD as D
-- stream exceptions
-------------------------------------------------------------------------------
drain :: SerialT IO a -> IO ()
{-# INLINE replicateM #-}
replicateM :: Common.MonadAsync m => Int -> m a -> SerialT m a
#ifdef USE_PRELUDE
drain = Stream.drain
replicateM = Stream.replicateM
#else
drain = Stream.fold Fold.drain
replicateM = Stream.unfold . Unfold.replicateM
#endif
enumerateFromTo :: Int -> Int -> SerialT IO Int
#ifdef USE_PRELUDE
enumerateFromTo length from = S.enumerateFromTo from (from + length)
#else
enumerateFromTo length from = Stream.unfold Unfold.enumerateFromTo (from, from + length)
#endif
sourceRef :: (Num b) => Int -> Int -> Ref.IORef b -> SerialT IO b
#ifdef USE_PRELUDE
sourceRef length from ref = S.replicateM (from + length)
#else
sourceRef length from ref = Stream.unfold (Unfold.replicateM (from + length))
#endif
$ Ref.modifyIORef' ref (+ 1) >> Ref.readIORef ref
data BenchException
= BenchException1
| BenchException2
@ -89,31 +74,32 @@ retryNoneSimple length from =
drain
$ Stream.retry
(Map.singleton BenchException1 length)
(const S.nil)
(const Stream.nil)
source
where
source = enumerateFromTo length from
source = enumerateFromTo from (from + length)
retryNone :: Int -> Int -> IO ()
retryNone length from = do
ref <- Ref.newIORef (0 :: Int)
drain
$ Stream.retry (Map.singleton BenchException1 length) (const S.nil)
$ Stream.retry (Map.singleton BenchException1 length) (const Stream.nil)
$ source ref
where
source = sourceRef length from
source ref =
replicateM (from + length)
$ Ref.modifyIORef' ref (+ 1) >> Ref.readIORef ref
retryAll :: Int -> Int -> IO ()
retryAll length from = do
ref <- Ref.newIORef 0
drain
$ Stream.retry
(Map.singleton BenchException1 (length + from)) (const S.nil)
(Map.singleton BenchException1 (length + from)) (const Stream.nil)
$ source ref
where
@ -131,11 +117,11 @@ retryUnknown :: Int -> Int -> IO ()
retryUnknown length from = do
drain
$ Stream.retry (Map.singleton BenchException1 length) (const source)
$ throwIO BenchException2 `S.before` S.nil
$ throwIO BenchException2 `Stream.before` Stream.nil
where
source = enumerateFromTo length from
source = enumerateFromTo from (from + length)
o_1_space_serial_exceptions :: Int -> [Benchmark]
@ -156,8 +142,8 @@ o_1_space_serial_exceptions length =
-- | Send the file contents to /dev/null with exception handling
readWriteOnExceptionStream :: Handle -> Handle -> IO ()
readWriteOnExceptionStream inh devNull =
let readEx = S.onException (hClose inh) (S.unfold FH.read inh)
in S.fold (FH.write devNull) readEx
let readEx = Stream.onException (hClose inh) (Stream.unfold FH.read inh)
in Stream.fold (FH.write devNull) readEx
#ifdef INSPECTION
inspect $ hasNoTypeClasses 'readWriteOnExceptionStream
@ -166,9 +152,9 @@ inspect $ hasNoTypeClasses 'readWriteOnExceptionStream
-- | Send the file contents to /dev/null with exception handling
readWriteHandleExceptionStream :: Handle -> Handle -> IO ()
readWriteHandleExceptionStream inh devNull =
let handler (_e :: SomeException) = S.fromEffect (hClose inh >> return 10)
readEx = S.handle handler (S.unfold FH.read inh)
in S.fold (FH.write devNull) readEx
let handler (_e :: SomeException) = Stream.fromEffect (hClose inh >> return 10)
readEx = Stream.handle handler (Stream.unfold FH.read inh)
in Stream.fold (FH.write devNull) readEx
#ifdef INSPECTION
inspect $ hasNoTypeClasses 'readWriteHandleExceptionStream
@ -177,8 +163,8 @@ inspect $ hasNoTypeClasses 'readWriteHandleExceptionStream
-- | Send the file contents to /dev/null with exception handling
readWriteFinally_Stream :: Handle -> Handle -> IO ()
readWriteFinally_Stream inh devNull =
let readEx = Stream.finally_ (hClose inh) (S.unfold FH.read inh)
in S.fold (FH.write devNull) readEx
let readEx = Stream.finally_ (hClose inh) (Stream.unfold FH.read inh)
in Stream.fold (FH.write devNull) readEx
#ifdef INSPECTION
inspect $ hasNoTypeClasses 'readWriteFinally_Stream
@ -186,8 +172,8 @@ inspect $ hasNoTypeClasses 'readWriteFinally_Stream
readWriteFinallyStream :: Handle -> Handle -> IO ()
readWriteFinallyStream inh devNull =
let readEx = S.finally (hClose inh) (S.unfold FH.read inh)
in S.fold (FH.write devNull) readEx
let readEx = Stream.finally (hClose inh) (Stream.unfold FH.read inh)
in Stream.fold (FH.write devNull) readEx
-- | Send the file contents to /dev/null with exception handling
fromToBytesBracket_Stream :: Handle -> Handle -> IO ()
@ -202,7 +188,7 @@ inspect $ hasNoTypeClasses 'fromToBytesBracket_Stream
fromToBytesBracketStream :: Handle -> Handle -> IO ()
fromToBytesBracketStream inh devNull =
let readEx = S.bracket (return ()) (\_ -> hClose inh)
let readEx = Stream.bracket (return ()) (\_ -> hClose inh)
(\_ -> IFH.getBytes inh)
in IFH.putBytes devNull readEx
@ -210,8 +196,8 @@ readWriteBeforeAfterStream :: Handle -> Handle -> IO ()
readWriteBeforeAfterStream inh devNull =
let readEx =
Stream.after (hClose inh)
$ Stream.before (hPutChar devNull 'A') (S.unfold FH.read inh)
in S.fold (FH.write devNull) readEx
$ Stream.before (hPutChar devNull 'A') (Stream.unfold FH.read inh)
in Stream.fold (FH.write devNull) readEx
#ifdef INSPECTION
inspect $ 'readWriteBeforeAfterStream `hasNoType` ''D.Step
@ -219,8 +205,8 @@ inspect $ 'readWriteBeforeAfterStream `hasNoType` ''D.Step
readWriteAfterStream :: Handle -> Handle -> IO ()
readWriteAfterStream inh devNull =
let readEx = Stream.after (hClose inh) (S.unfold FH.read inh)
in S.fold (FH.write devNull) readEx
let readEx = Stream.after (hClose inh) (Stream.unfold FH.read inh)
in Stream.fold (FH.write devNull) readEx
#ifdef INSPECTION
inspect $ 'readWriteAfterStream `hasNoType` ''D.Step
@ -228,8 +214,8 @@ inspect $ 'readWriteAfterStream `hasNoType` ''D.Step
readWriteAfter_Stream :: Handle -> Handle -> IO ()
readWriteAfter_Stream inh devNull =
let readEx = Stream.after_ (hClose inh) (S.unfold FH.read inh)
in S.fold (FH.write devNull) readEx
let readEx = Stream.after_ (hClose inh) (Stream.unfold FH.read inh)
in Stream.fold (FH.write devNull) readEx
#ifdef INSPECTION
inspect $ hasNoTypeClasses 'readWriteAfter_Stream
@ -239,25 +225,25 @@ inspect $ 'readWriteAfter_Stream `hasNoType` ''D.Step
o_1_space_copy_stream_exceptions :: BenchEnv -> [Benchmark]
o_1_space_copy_stream_exceptions env =
[ bgroup "exceptions"
[ mkBenchSmall "S.onException" env $ \inh _ ->
[ mkBenchSmall "Stream.onException" env $ \inh _ ->
readWriteOnExceptionStream inh (nullH env)
, mkBenchSmall "S.handle" env $ \inh _ ->
, mkBenchSmall "Stream.handle" env $ \inh _ ->
readWriteHandleExceptionStream inh (nullH env)
, mkBenchSmall "S.finally_" env $ \inh _ ->
, mkBenchSmall "Stream.finally_" env $ \inh _ ->
readWriteFinally_Stream inh (nullH env)
, mkBenchSmall "S.finally" env $ \inh _ ->
, mkBenchSmall "Stream.finally" env $ \inh _ ->
readWriteFinallyStream inh (nullH env)
, mkBenchSmall "S.after . S.before" env $ \inh _ ->
, mkBenchSmall "Stream.after . Stream.before" env $ \inh _ ->
readWriteBeforeAfterStream inh (nullH env)
, mkBenchSmall "S.after" env $ \inh _ ->
, mkBenchSmall "Stream.after" env $ \inh _ ->
readWriteAfterStream inh (nullH env)
, mkBenchSmall "S.after_" env $ \inh _ ->
, mkBenchSmall "Stream.after_" env $ \inh _ ->
readWriteAfter_Stream inh (nullH env)
]
, bgroup "exceptions/fromToBytes"
[ mkBenchSmall "S.bracket_" env $ \inh _ ->
[ mkBenchSmall "Stream.bracket_" env $ \inh _ ->
fromToBytesBracket_Stream inh (nullH env)
, mkBenchSmall "S.bracket" env $ \inh _ ->
, mkBenchSmall "Stream.bracket" env $ \inh _ ->
fromToBytesBracketStream inh (nullH env)
]
]
@ -314,7 +300,7 @@ toChunksBracket_ inh devNull =
(return ())
(\_ -> hClose inh)
(\_ -> IFH.getChunks inh)
in S.fold (IFH.writeChunks devNull) readEx
in Stream.fold (IFH.writeChunks devNull) readEx
#ifdef INSPECTION
inspect $ hasNoTypeClasses 'toChunksBracket_
@ -322,18 +308,18 @@ inspect $ hasNoTypeClasses 'toChunksBracket_
toChunksBracket :: Handle -> Handle -> IO ()
toChunksBracket inh devNull =
let readEx = S.bracket
let readEx = Stream.bracket
(return ())
(\_ -> hClose inh)
(\_ -> IFH.getChunks inh)
in S.fold (IFH.writeChunks devNull) readEx
in Stream.fold (IFH.writeChunks devNull) readEx
o_1_space_copy_exceptions_toChunks :: BenchEnv -> [Benchmark]
o_1_space_copy_exceptions_toChunks env =
[ bgroup "exceptions/toChunks"
[ mkBench "S.bracket_" env $ \inH _ ->
[ mkBench "Stream.bracket_" env $ \inH _ ->
toChunksBracket_ inH (nullH env)
, mkBench "S.bracket" env $ \inH _ ->
, mkBench "Stream.bracket" env $ \inH _ ->
toChunksBracket inH (nullH env)
]
]


@ -1,5 +1,5 @@
-- |
-- Module : Stream.NestedStream
-- Module : Stream.Expand
-- Copyright : (c) 2018 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
@ -18,9 +18,10 @@
{-# OPTIONS_GHC -fplugin Test.Inspection.Plugin #-}
#endif
module Stream.NestedStream (benchmarks) where
module Stream.Expand (benchmarks) where
import Control.Monad.Trans.Class (lift)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import qualified Control.Applicative as AP
@ -31,14 +32,23 @@ import Test.Inspection
import qualified Streamly.Internal.Data.Stream.StreamD as D
#endif
import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Data.Stream.IsStream as Internal
import qualified Stream.Common as Common
#ifdef USE_PRELUDE
import qualified Streamly.Internal.Data.Stream.IsStream as S
import Streamly.Benchmark.Prelude hiding
(benchIOSrc, sourceUnfoldrM, apDiscardFst, apDiscardSnd, apLiftA2, toNullAp
, monadThen, toNullM, toNullM3, filterAllInM, filterAllOutM, filterSome
, breakAfterSome, toListM, toListSome)
#else
import Streamly.Benchmark.Prelude (benchIO)
import qualified Streamly.Internal.Data.Stream as S
#endif
import qualified Streamly.Internal.Data.Unfold as UF
import qualified Streamly.Internal.Data.Fold as Fold
import Gauge
import Streamly.Prelude (SerialT, fromSerial, serial)
import Stream.Common hiding (append2)
import Streamly.Benchmark.Common
import Streamly.Benchmark.Prelude
import Prelude hiding (concatMap)
-------------------------------------------------------------------------------
@ -56,13 +66,14 @@ iterateN g initial count = f count initial
-- Iterate a transformation over a singleton stream
{-# INLINE iterateSingleton #-}
iterateSingleton :: S.MonadAsync m
iterateSingleton :: Monad m
=> (Int -> SerialT m Int -> SerialT m Int)
-> Int
-> Int
-> SerialT m Int
iterateSingleton g count n = iterateN g (return n) count
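iterateN applies g with a decreasing index, so iterateSingleton g count n wraps count applications of g around the singleton return n. Worked out for count = 3 with g i = fmap (+ i), the shape used by the Applicative and Monad groups below (illustrative only):
-- iterateN g x0 3 = g 1 (g 2 (g 3 x0)), so the singleton stream ends up
-- holding n + 3 + 2 + 1.
nestedByHand :: Monad f => Int -> f Int
nestedByHand n = fmap (+ 1) (fmap (+ 2) (fmap (+ 3) (return n)))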
{-
-- XXX need to check why this is slower than the explicit recursion above, even
-- if the above code is written in a foldr like head recursive way. We also
-- need to try this with foldlM' once #150 is fixed.
@ -70,12 +81,13 @@ iterateSingleton g count n = iterateN g (return n) count
-- foldrM and any related fusion issues.
{-# INLINE _iterateSingleton #-}
_iterateSingleton ::
S.MonadAsync m
Monad m
=> (Int -> SerialT m Int -> SerialT m Int)
-> Int
-> Int
-> SerialT m Int
_iterateSingleton g value n = S.foldrM g (return n) $ sourceIntFromTo value n
-}
-------------------------------------------------------------------------------
-- Multi-Stream
@ -88,34 +100,34 @@ _iterateSingleton g value n = S.foldrM g (return n) $ sourceIntFromTo value n
{-# INLINE serial2 #-}
serial2 :: Int -> Int -> IO ()
serial2 count n =
S.drain $
S.serial (sourceUnfoldrM count n) (sourceUnfoldrM count (n + 1))
drain $
Common.append (sourceUnfoldrM count n) (sourceUnfoldrM count (n + 1))
{-# INLINE serial4 #-}
serial4 :: Int -> Int -> IO ()
serial4 count n =
S.drain $
S.serial
(S.serial (sourceUnfoldrM count n) (sourceUnfoldrM count (n + 1)))
(S.serial
drain $
Common.append
(Common.append (sourceUnfoldrM count n) (sourceUnfoldrM count (n + 1)))
(Common.append
(sourceUnfoldrM count (n + 2))
(sourceUnfoldrM count (n + 3)))
{-# INLINE append2 #-}
append2 :: Int -> Int -> IO ()
append2 count n =
S.drain $
Internal.append (sourceUnfoldrM count n) (sourceUnfoldrM count (n + 1))
drain $
Common.append2 (sourceUnfoldrM count n) (sourceUnfoldrM count (n + 1))
{-# INLINE append4 #-}
append4 :: Int -> Int -> IO ()
append4 count n =
S.drain $
Internal.append
(Internal.append
drain $
Common.append2
(Common.append2
(sourceUnfoldrM count n)
(sourceUnfoldrM count (n + 1)))
(Internal.append
(Common.append2
(sourceUnfoldrM count (n + 2))
(sourceUnfoldrM count (n + 3)))
@ -139,22 +151,24 @@ o_1_space_joining value =
-- Concat Foldable containers
-------------------------------------------------------------------------------
#ifdef USE_PRELUDE
o_1_space_concatFoldable :: Int -> [Benchmark]
o_1_space_concatFoldable value =
[ bgroup "concat-foldable"
[ benchIOSrc fromSerial "foldMapWith (<>) (List)"
[ benchIOSrc "foldMapWith (<>) (List)"
(sourceFoldMapWith value)
, benchIOSrc fromSerial "foldMapWith (<>) (Stream)"
, benchIOSrc "foldMapWith (<>) (Stream)"
(sourceFoldMapWithStream value)
, benchIOSrc fromSerial "foldMapWithM (<>) (List)"
, benchIOSrc "foldMapWithM (<>) (List)"
(sourceFoldMapWithM value)
, benchIOSrc fromSerial "S.concatFoldableWith (<>) (List)"
, benchIOSrc "S.concatFoldableWith (<>) (List)"
(concatFoldableWith value)
, benchIOSrc fromSerial "S.concatForFoldableWith (<>) (List)"
, benchIOSrc "S.concatForFoldableWith (<>) (List)"
(concatForFoldableWith value)
, benchIOSrc fromSerial "foldMapM (List)" (sourceFoldMapM value)
, benchIOSrc "foldMapM (List)" (sourceFoldMapM value)
]
]
#endif
-------------------------------------------------------------------------------
-- Concat
@ -165,14 +179,14 @@ o_1_space_concatFoldable value =
{-# INLINE concatMap #-}
concatMap :: Int -> Int -> Int -> IO ()
concatMap outer inner n =
S.drain $ S.concatMap
drain $ S.concatMap
(\_ -> sourceUnfoldrM inner n)
(sourceUnfoldrM outer n)
{-# INLINE concatMapM #-}
concatMapM :: Int -> Int -> Int -> IO ()
concatMapM outer inner n =
S.drain $ S.concatMapM
drain $ S.concatMapM
(\_ -> return $ sourceUnfoldrM inner n)
(sourceUnfoldrM outer n)
@ -186,7 +200,7 @@ inspect $ 'concatMap `hasNoType` ''SPEC
{-# INLINE concatMapPure #-}
concatMapPure :: Int -> Int -> Int -> IO ()
concatMapPure outer inner n =
S.drain $ S.concatMap
drain $ S.concatMap
(\_ -> sourceUnfoldr inner n)
(sourceUnfoldr outer n)
@ -200,7 +214,7 @@ inspect $ 'concatMapPure `hasNoType` ''SPEC
{-# INLINE concatMapRepl #-}
concatMapRepl :: Int -> Int -> Int -> IO ()
concatMapRepl outer inner n =
S.drain $ S.concatMap (S.replicate inner) (sourceUnfoldrM outer n)
drain $ S.concatMap (Common.replicate inner) (sourceUnfoldrM outer n)
#ifdef INSPECTION
inspect $ hasNoTypeClasses 'concatMapRepl
@ -211,7 +225,7 @@ inspect $ 'concatMapRepl `hasNoType` ''SPEC
{-# INLINE concatMapWithSerial #-}
concatMapWithSerial :: Int -> Int -> Int -> IO ()
concatMapWithSerial = concatStreamsWith S.serial
concatMapWithSerial = concatStreamsWith Common.append
#ifdef INSPECTION
inspect $ hasNoTypeClasses 'concatMapWithSerial
@ -220,7 +234,7 @@ inspect $ 'concatMapWithSerial `hasNoType` ''SPEC
{-# INLINE concatMapWithAppend #-}
concatMapWithAppend :: Int -> Int -> Int -> IO ()
concatMapWithAppend = concatStreamsWith Internal.append
concatMapWithAppend = concatStreamsWith Common.append2
#ifdef INSPECTION
inspect $ hasNoTypeClasses 'concatMapWithAppend
@ -231,11 +245,11 @@ inspect $ 'concatMapWithAppend `hasNoType` ''SPEC
{-# INLINE concatPairWithSerial #-}
concatPairWithSerial :: Int -> Int -> Int -> IO ()
concatPairWithSerial = concatPairsWith Internal.serial
concatPairWithSerial = concatPairsWith Common.append
{-# INLINE concatPairWithAppend #-}
concatPairWithAppend :: Int -> Int -> Int -> IO ()
concatPairWithAppend = concatPairsWith Internal.append
concatPairWithAppend = concatPairsWith Common.append2
-- unfoldMany
@ -244,7 +258,7 @@ concatPairWithAppend = concatPairsWith Internal.append
{-# INLINE unfoldManyRepl #-}
unfoldManyRepl :: Int -> Int -> Int -> IO ()
unfoldManyRepl outer inner n =
S.drain
drain
$ S.unfoldMany
(UF.lmap return (UF.replicateM inner))
(sourceUnfoldrM outer n)
@ -266,7 +280,7 @@ o_1_space_concat value = sqrtVal `seq`
(concatMapPure 1 value)
-- This is for comparison with foldMapWith
, benchIOSrc fromSerial "concatMapId (n of 1) (fromFoldable)"
, benchIOSrc "concatMapId (n of 1) (fromFoldable)"
(S.concatMap id . sourceConcatMapId value)
, benchIOSrc1 "concatMap (n of 1)"
@ -284,8 +298,8 @@ o_1_space_concat value = sqrtVal `seq`
(concatMapM 1 value)
-- This is for comparison with foldMapWith
, benchIOSrc fromSerial "concatMapWithId (n of 1) (fromFoldable)"
(S.concatMapWith serial id . sourceConcatMapId value)
, benchIOSrc "concatMapWithId (n of 1) (fromFoldable)"
(S.concatMapWith Common.append id . sourceConcatMapId value)
, benchIOSrc1 "concatMapWith (n of 1)"
(concatMapWithSerial value 1)
@ -343,23 +357,23 @@ o_n_space_concat value = sqrtVal `seq`
o_1_space_applicative :: Int -> [Benchmark]
o_1_space_applicative value =
[ bgroup "Applicative"
[ benchIO "(*>) (sqrt n x sqrt n)" $ apDiscardFst value fromSerial
, benchIO "(<*) (sqrt n x sqrt n)" $ apDiscardSnd value fromSerial
, benchIO "(<*>) (sqrt n x sqrt n)" $ toNullAp value fromSerial
, benchIO "liftA2 (sqrt n x sqrt n)" $ apLiftA2 value fromSerial
[ benchIO "(*>) (sqrt n x sqrt n)" $ apDiscardFst value
, benchIO "(<*) (sqrt n x sqrt n)" $ apDiscardSnd value
, benchIO "(<*>) (sqrt n x sqrt n)" $ toNullAp value
, benchIO "liftA2 (sqrt n x sqrt n)" $ apLiftA2 value
]
]
o_n_space_applicative :: Int -> [Benchmark]
o_n_space_applicative value =
[ bgroup "Applicative"
[ benchIOSrc fromSerial "(*>) (n times)" $
[ benchIOSrc "(*>) (n times)" $
iterateSingleton ((*>) . pure) value
, benchIOSrc fromSerial "(<*) (n times)" $
iterateSingleton (\x xs -> xs <* pure x) value
, benchIOSrc fromSerial "(<*>) (n times)" $
, benchIOSrc "(<*) (n times)" $
iterateSingleton (\x xs -> xs <* pure x) value
, benchIOSrc "(<*>) (n times)" $
iterateSingleton (\x xs -> pure (+ x) <*> xs) value
, benchIOSrc fromSerial "liftA2 (n times)" $
, benchIOSrc "liftA2 (n times)" $
iterateSingleton (AP.liftA2 (+) . pure) value
]
]
@ -371,18 +385,18 @@ o_n_space_applicative value =
o_1_space_monad :: Int -> [Benchmark]
o_1_space_monad value =
[ bgroup "Monad"
[ benchIO "(>>) (sqrt n x sqrt n)" $ monadThen value fromSerial
, benchIO "(>>=) (sqrt n x sqrt n)" $ toNullM value fromSerial
[ benchIO "(>>) (sqrt n x sqrt n)" $ monadThen value
, benchIO "(>>=) (sqrt n x sqrt n)" $ toNullM value
, benchIO "(>>=) (sqrt n x sqrt n) (filterAllOut)" $
filterAllOutM value fromSerial
filterAllOutM value
, benchIO "(>>=) (sqrt n x sqrt n) (filterAllIn)" $
filterAllInM value fromSerial
filterAllInM value
, benchIO "(>>=) (sqrt n x sqrt n) (filterSome)" $
filterSome value fromSerial
filterSome value
, benchIO "(>>=) (sqrt n x sqrt n) (breakAfterSome)" $
breakAfterSome value fromSerial
breakAfterSome value
, benchIO "(>>=) (cubert n x cubert n x cubert n)" $
toNullM3 value fromSerial
toNullM3 value
]
]
@ -400,16 +414,16 @@ sieve s = do
o_n_space_monad :: Int -> [Benchmark]
o_n_space_monad value =
[ bgroup "Monad"
[ benchIOSrc fromSerial "(>>) (n times)" $
[ benchIOSrc "(>>) (n times)" $
iterateSingleton ((>>) . pure) value
, benchIOSrc fromSerial "(>>=) (n times)" $
, benchIOSrc "(>>=) (n times)" $
iterateSingleton (\x xs -> xs >>= \y -> return (x + y)) value
, benchIO "(>>=) (sqrt n x sqrt n) (toList)" $
toListM value fromSerial
toListM value
, benchIO "(>>=) (sqrt n x sqrt n) (toListSome)" $
toListSome value fromSerial
toListSome value
, benchIO "naive prime sieve (n/4)"
(\n -> S.sum $ sieve $ S.enumerateFromTo 2 (value `div` 4 + n))
(\n -> S.fold Fold.sum $ sieve $ enumerateFromTo 2 (value `div` 4 + n))
]
]
@ -421,22 +435,22 @@ toKv :: Int -> (Int, Int)
toKv p = (p, p)
{-# INLINE joinWith #-}
joinWith :: (S.MonadAsync m) =>
joinWith :: Common.MonadAsync m =>
((Int -> Int -> Bool) -> SerialT m Int -> SerialT m Int -> SerialT m b)
-> Int
-> Int
-> m ()
joinWith j val i =
S.drain $ j (==) (sourceUnfoldrM val i) (sourceUnfoldrM val (val `div` 2))
drain $ j (==) (sourceUnfoldrM val i) (sourceUnfoldrM val (val `div` 2))
{-# INLINE joinMapWith #-}
joinMapWith :: (S.MonadAsync m) =>
joinMapWith :: Common.MonadAsync m =>
(SerialT m (Int, Int) -> SerialT m (Int, Int) -> SerialT m b)
-> Int
-> Int
-> m ()
joinMapWith j val i =
S.drain
drain
$ j
(fmap toKv (sourceUnfoldrM val i))
(fmap toKv (sourceUnfoldrM val (val `div` 2)))
@ -446,21 +460,21 @@ o_n_heap_buffering value =
[ bgroup "buffered"
[
benchIOSrc1 "joinInner (sqrtVal)"
$ joinWith Internal.joinInner sqrtVal
$ joinWith S.joinInner sqrtVal
, benchIOSrc1 "joinInnerMap"
$ joinMapWith Internal.joinInnerMap halfVal
$ joinMapWith S.joinInnerMap halfVal
, benchIOSrc1 "joinLeft (sqrtVal)"
$ joinWith Internal.joinLeft sqrtVal
$ joinWith S.joinLeft sqrtVal
, benchIOSrc1 "joinLeftMap "
$ joinMapWith Internal.joinLeftMap halfVal
$ joinMapWith S.joinLeftMap halfVal
, benchIOSrc1 "joinOuter (sqrtVal)"
$ joinWith Internal.joinOuter sqrtVal
$ joinWith S.joinOuter sqrtVal
, benchIOSrc1 "joinOuterMap"
$ joinMapWith Internal.joinOuterMap halfVal
$ joinMapWith S.joinOuterMap halfVal
, benchIOSrc1 "intersectBy (sqrtVal)"
$ joinWith Internal.intersectBy sqrtVal
$ joinWith S.intersectBy sqrtVal
, benchIOSrc1 "intersectBySorted"
$ joinMapWith (Internal.intersectBySorted compare) halfVal
$ joinMapWith (S.intersectBySorted compare) halfVal
]
]
@ -482,7 +496,9 @@ benchmarks moduleName size =
[
-- multi-stream
o_1_space_joining size
#ifdef USE_PRELUDE
, o_1_space_concatFoldable size
#endif
, o_1_space_concat size
, o_1_space_applicative size


@ -13,22 +13,29 @@ module Stream.Generate (benchmarks) where
import Data.Functor.Identity (Identity)
import qualified Stream.Common as Common
#ifdef USE_PRELUDE
import qualified GHC.Exts as GHC
import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Data.Stream.IsStream as Stream
import qualified Prelude
#endif
#else
import qualified Streamly.Internal.Data.Stream as Stream
import qualified Streamly.Internal.Data.Unfold as Unfold
#endif
import Gauge
import Streamly.Benchmark.Common
import Streamly.Internal.Data.Stream.Serial (SerialT)
#ifdef USE_PRELUDE
import Streamly.Benchmark.Prelude
import Streamly.Prelude (fromSerial, MonadAsync)
import Streamly.Prelude (MonadAsync)
import Stream.Common hiding (MonadAsync, replicate, enumerateFromTo)
import Streamly.Benchmark.Prelude hiding
(benchIOSrc, sourceUnfoldrM, apDiscardFst, apDiscardSnd, apLiftA2, toNullAp
, monadThen, toNullM, toNullM3, filterAllInM, filterAllOutM, filterSome
, breakAfterSome, toListM, toListSome)
#else
import Stream.Common
#endif
import qualified Stream.Common as SC
import System.IO.Unsafe (unsafeInterleaveIO)
@ -41,6 +48,7 @@ import Prelude hiding (repeat, replicate, iterate)
-------------------------------------------------------------------------------
-- fromList
-------------------------------------------------------------------------------
#ifdef USE_PRELUDE
{-# INLINE sourceIsList #-}
sourceIsList :: Int -> Int -> SerialT Identity Int
@ -126,42 +134,6 @@ fromIndices value n = S.take value $ S.fromIndices (+ n)
{-# INLINE fromIndicesM #-}
fromIndicesM :: (MonadAsync m, S.IsStream t) => Int -> Int -> t m Int
fromIndicesM value n = S.take value $ S.fromIndicesM (return <$> (+ n))
o_1_space_generation_prel :: Int -> [Benchmark]
o_1_space_generation_prel value =
[ bgroup "generation"
[ benchIOSrc fromSerial "unfoldr" (sourceUnfoldr value)
, benchIOSrc fromSerial "unfoldrM" (sourceUnfoldrM value)
, benchIOSrc fromSerial "repeat" (repeat value)
, benchIOSrc fromSerial "repeatM" (repeatM value)
, benchIOSrc fromSerial "replicate" (replicate value)
, benchIOSrc fromSerial "replicateM" (replicateM value)
, benchIOSrc fromSerial "iterate" (iterate value)
, benchIOSrc fromSerial "iterateM" (iterateM value)
, benchIOSrc fromSerial "fromIndices" (fromIndices value)
, benchIOSrc fromSerial "fromIndicesM" (fromIndicesM value)
, benchIOSrc fromSerial "intFromTo" (sourceIntFromTo value)
, benchIOSrc fromSerial "intFromThenTo" (sourceIntFromThenTo value)
, benchIOSrc fromSerial "integerFromStep" (sourceIntegerFromStep value)
, benchIOSrc fromSerial "fracFromThenTo" (sourceFracFromThenTo value)
, benchIOSrc fromSerial "fracFromTo" (sourceFracFromTo value)
, benchIOSrc fromSerial "fromList" (sourceFromList value)
, benchPureSrc "IsList.fromList" (sourceIsList value)
, benchPureSrc "IsString.fromString" (sourceIsString value)
, benchIOSrc fromSerial "fromListM" (sourceFromListM value)
, benchIOSrc fromSerial "enumerateFrom" (enumerateFrom value)
, benchIOSrc fromSerial "enumerateFromTo" (enumerateFromTo value)
, benchIOSrc fromSerial "enumerateFromThen" (enumerateFromThen value)
, benchIOSrc fromSerial "enumerateFromThenTo" (enumerateFromThenTo value)
, benchIOSrc fromSerial "enumerate" (enumerate value)
, benchIOSrc fromSerial "enumerateTo" (enumerateTo value)
-- These essentially test cons and consM
, benchIOSrc fromSerial "fromFoldable" (sourceFromFoldable value)
, benchIOSrc fromSerial "fromFoldableM" (sourceFromFoldableM value)
, benchIOSrc fromSerial "absTimes" $ absTimes value
]
]
#endif
{-# INLINE mfixUnfold #-}
@ -170,29 +142,51 @@ mfixUnfold count start = Stream.mfix f
where
f action = do
let incr n act = fmap ((+n) . snd) $ unsafeInterleaveIO act
x <- Stream.unfold Unfold.fromListM [incr 1 action, incr 2 action]
y <- SC.sourceUnfoldr count start
x <- Common.fromListM [incr 1 action, incr 2 action]
y <- Common.sourceUnfoldr count start
return (x, y)
{-# INLINE fromFoldable #-}
fromFoldable :: Int -> Int -> SerialT m Int
fromFoldable count start =
Stream.fromFoldable (Prelude.enumFromTo count start)
{-# INLINE fromFoldableM #-}
fromFoldableM :: Monad m => Int -> Int -> SerialT m Int
fromFoldableM count start =
Stream.fromFoldableM (fmap return (Prelude.enumFromTo count start))
o_1_space_generation :: Int -> [Benchmark]
o_1_space_generation value =
[ bgroup "generation"
[ SC.benchIOSrc "unfold" (SC.sourceUnfoldr value)
, SC.benchIOSrc "fromFoldable" (fromFoldable value)
, SC.benchIOSrc "fromFoldableM" (fromFoldableM value)
, SC.benchIOSrc "mfix_10" (mfixUnfold 10)
, SC.benchIOSrc "mfix_100" (mfixUnfold 100)
, SC.benchIOSrc "mfix_1000" (mfixUnfold 1000)
[ benchIOSrc "unfoldr" (sourceUnfoldr value)
, benchIOSrc "unfoldrM" (sourceUnfoldrM value)
#ifdef USE_PRELUDE
, benchIOSrc "repeat" (repeat value)
, benchIOSrc "repeatM" (repeatM value)
, benchIOSrc "replicate" (replicate value)
, benchIOSrc "replicateM" (replicateM value)
, benchIOSrc "iterate" (iterate value)
, benchIOSrc "iterateM" (iterateM value)
, benchIOSrc "fromIndices" (fromIndices value)
, benchIOSrc "fromIndicesM" (fromIndicesM value)
, benchIOSrc "intFromTo" (sourceIntFromTo value)
, benchIOSrc "intFromThenTo" (sourceIntFromThenTo value)
, benchIOSrc "integerFromStep" (sourceIntegerFromStep value)
, benchIOSrc "fracFromThenTo" (sourceFracFromThenTo value)
, benchIOSrc "fracFromTo" (sourceFracFromTo value)
, benchIOSrc "fromList" (sourceFromList value)
, benchPureSrc "IsList.fromList" (sourceIsList value)
, benchPureSrc "IsString.fromString" (sourceIsString value)
, benchIOSrc "fromListM" (sourceFromListM value)
, benchIOSrc "enumerateFrom" (enumerateFrom value)
, benchIOSrc "enumerateFromTo" (enumerateFromTo value)
, benchIOSrc "enumerateFromThen" (enumerateFromThen value)
, benchIOSrc "enumerateFromThenTo" (enumerateFromThenTo value)
, benchIOSrc "enumerate" (enumerate value)
, benchIOSrc "enumerateTo" (enumerateTo value)
#endif
-- These essentially test cons and consM
, benchIOSrc "fromFoldable" (sourceFromFoldable value)
, benchIOSrc "fromFoldableM" (sourceFromFoldableM value)
#ifdef USE_PRELUDE
, benchIOSrc "absTimes" $ absTimes value
#endif
, Common.benchIOSrc "mfix_10" (mfixUnfold 10)
, Common.benchIOSrc "mfix_100" (mfixUnfold 100)
, Common.benchIOSrc "mfix_1000" (mfixUnfold 1000)
]
]
@ -218,11 +212,6 @@ o_n_heap_generation value =
--
benchmarks :: String -> Int -> [Benchmark]
benchmarks moduleName size =
[
#ifdef USE_PRELUDE
bgroup (o_1_space_prefix moduleName) (o_1_space_generation_prel size)
,
#endif
bgroup (o_1_space_prefix moduleName) (o_1_space_generation size)
[ bgroup (o_1_space_prefix moduleName) (o_1_space_generation size)
, bgroup (o_n_heap_prefix moduleName) (o_n_heap_generation size)
]


@ -11,21 +11,23 @@
module Stream.Lift (benchmarks) where
#ifdef USE_PRELUDE
import Control.DeepSeq (NFData(..))
import Control.Monad.Trans.Class (lift)
import Control.Monad.State.Strict (StateT, get, put, MonadState)
import Streamly.Prelude (fromSerial)
import Streamly.Benchmark.Prelude
import qualified Control.Monad.State.Strict as State
import qualified Streamly.Prelude as Stream
import qualified Streamly.Internal.Data.Stream.IsStream as Internal
#else
import Control.DeepSeq (NFData(..))
import Data.Functor.Identity (Identity)
import Stream.Common (sourceUnfoldr, sourceUnfoldrM, benchIOSrc)
import Stream.Common (sourceUnfoldr, sourceUnfoldrM, benchIOSrc, drain)
import System.Random (randomRIO)
#ifdef USE_PRELUDE
import Streamly.Benchmark.Prelude hiding
(sourceUnfoldr, sourceUnfoldrM, benchIOSrc)
import qualified Streamly.Internal.Data.Stream.IsStream as Stream
#else
import Streamly.Benchmark.Prelude (benchIO)
import qualified Streamly.Internal.Data.Stream as Stream
#endif
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Stream.Common as Common
import qualified Control.Monad.State.Strict as State
import Gauge
import Streamly.Internal.Data.Stream.Serial (SerialT)
@ -36,11 +38,11 @@ import Prelude hiding (reverse, tail)
-------------------------------------------------------------------------------
-- Monad transformation (hoisting etc.)
-------------------------------------------------------------------------------
#ifdef USE_PRELUDE
{-# INLINE sourceUnfoldrState #-}
sourceUnfoldrState :: (Stream.IsStream t, Stream.MonadAsync m)
=> Int -> Int -> t (StateT Int m) Int
sourceUnfoldrState value n = Stream.unfoldrM step n
sourceUnfoldrState :: Common.MonadAsync m =>
Int -> Int -> SerialT (StateT Int m) Int
sourceUnfoldrState value n = Common.unfoldrM step n
where
step cnt =
if cnt > n + value
@ -51,27 +53,36 @@ sourceUnfoldrState value n = Stream.unfoldrM step n
return (Just (s, cnt + 1))
{-# INLINE evalStateT #-}
evalStateT :: Stream.MonadAsync m => Int -> Int -> SerialT m Int
evalStateT :: Common.MonadAsync m => Int -> Int -> SerialT m Int
evalStateT value n =
Internal.evalStateT (return 0) (sourceUnfoldrState value n)
Stream.evalStateT (return 0) (sourceUnfoldrState value n)
{-# INLINE withState #-}
withState :: Stream.MonadAsync m => Int -> Int -> SerialT m Int
withState :: Common.MonadAsync m => Int -> Int -> SerialT m Int
withState value n =
Internal.evalStateT
(return (0 :: Int)) (Internal.liftInner (sourceUnfoldrM value n))
Stream.evalStateT
(return (0 :: Int)) (Stream.liftInner (sourceUnfoldrM value n))
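liftInner adds a StateT layer under an existing stream and evalStateT discharges it; withState above measures exactly that round trip. A small sketch of the same pattern outside the benchmark (assuming the internal Stream API used in this module):
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Stream as Stream
-- Lift a plain IO stream into StateT Int IO, then discharge the unused
-- state layer and count the elements.
countUnderState :: Stream.Stream IO Int -> IO Int
countUnderState s =
    Stream.fold Fold.length
        $ Stream.evalStateT (return (0 :: Int))
        $ Stream.liftInner s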
{-# INLINE benchHoistSink #-}
benchHoistSink
:: (NFData b)
=> Int -> String -> (SerialT Identity Int -> IO b) -> Benchmark
benchHoistSink value name f =
bench name $ nfIO $ randomRIO (1,1) >>= f . sourceUnfoldr value
o_1_space_hoisting :: Int -> [Benchmark]
o_1_space_hoisting value =
[ bgroup "hoisting"
[ benchIOSrc fromSerial "evalState" (evalStateT value)
, benchIOSrc fromSerial "withState" (withState value)
[ benchIOSrc "evalState" (evalStateT value)
, benchIOSrc "withState" (withState value)
, benchHoistSink value "generally"
((\xs -> Stream.fold Fold.length xs :: IO Int) . Stream.generally)
]
]
{-# INLINE iterateStateIO #-}
iterateStateIO ::
(Stream.MonadAsync m)
Monad m
=> Int
-> StateT Int m Int
iterateStateIO n = do
@ -95,7 +106,7 @@ iterateStateT n = do
{-# INLINE iterateState #-}
{-# SPECIALIZE iterateState :: Int -> SerialT (StateT Int IO) Int #-}
iterateState ::
(Stream.MonadAsync m, MonadState Int m)
MonadState Int m
=> Int
-> SerialT m Int
iterateState n = do
@ -112,38 +123,12 @@ o_n_heap_transformer value =
[ benchIO "StateT Int IO (n times) (baseline)" $ \n ->
State.evalStateT (iterateStateIO n) value
, benchIO "SerialT (StateT Int IO) (n times)" $ \n ->
State.evalStateT (Stream.drain (iterateStateT n)) value
State.evalStateT (drain (iterateStateT n)) value
, benchIO "MonadState Int m => SerialT m Int" $ \n ->
State.evalStateT (Stream.drain (iterateState n)) value
State.evalStateT (drain (iterateState n)) value
]
]
#else
{-# INLINE benchHoistSink #-}
benchHoistSink
:: (NFData b)
=> Int -> String -> (SerialT Identity Int -> IO b) -> Benchmark
benchHoistSink value name f =
bench name $ nfIO $ randomRIO (1,1) >>= f . sourceUnfoldr value
-- XXX We should be using sourceUnfoldrM for fair comparison with IO monad, but
-- we can't use it as it requires MonadAsync constraint.
{-# INLINE liftInner #-}
liftInner :: Monad m => Int -> Int -> SerialT m Int
liftInner value n =
Stream.evalStateT
(return (0 :: Int)) (Stream.liftInner (sourceUnfoldrM value n))
o_1_space_generation :: Int -> [Benchmark]
o_1_space_generation value =
[ bgroup "lift"
[ benchHoistSink value "length . generally"
((\(_ :: SerialT IO Int) -> return 8 :: IO Int) . Stream.generally)
, benchIOSrc "liftInner/evalStateT" (liftInner value)
]
]
#endif
-------------------------------------------------------------------------------
-- Main
-------------------------------------------------------------------------------
@ -153,11 +138,6 @@ o_1_space_generation value =
--
benchmarks :: String -> Int -> [Benchmark]
benchmarks moduleName size =
[
#ifdef USE_PRELUDE
bgroup (o_1_space_prefix moduleName) (o_1_space_hoisting size)
[ bgroup (o_1_space_prefix moduleName) (o_1_space_hoisting size)
, bgroup (o_n_heap_prefix moduleName) (o_n_heap_transformer size)
#else
bgroup (o_1_space_prefix moduleName) (o_1_space_generation size)
#endif
]


@ -1,488 +0,0 @@
-- |
-- Module : Stream.NestedFold
-- Copyright : (c) 2018 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE RankNTypes #-}
module Stream.NestedFold (benchmarks) where
import Control.DeepSeq (NFData(..))
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Monoid (Sum(..))
import Data.Proxy (Proxy(..))
import Data.HashMap.Strict (HashMap)
import GHC.Generics (Generic)
import Streamly.Internal.Data.IsMap.HashMap ()
import qualified Streamly.Internal.Data.Refold.Type as Refold
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Stream.IsStream as Internal
import qualified Streamly.Prelude as S
import Gauge
import Streamly.Prelude (SerialT, fromSerial)
import Streamly.Benchmark.Common
import Streamly.Benchmark.Prelude
import Prelude hiding (reverse, tail)
-------------------------------------------------------------------------------
-- Iteration/looping utilities
-------------------------------------------------------------------------------
{-# INLINE iterateN #-}
iterateN :: (Int -> a -> a) -> a -> Int -> a
iterateN g initial count = f count initial
where
f (0 :: Int) x = x
f i x = f (i - 1) (g i x)
-- Iterate a transformation over a singleton stream
{-# INLINE iterateSingleton #-}
iterateSingleton :: S.MonadAsync m
=> (Int -> SerialT m Int -> SerialT m Int)
-> Int
-> Int
-> SerialT m Int
iterateSingleton g count n = iterateN g (return n) count
-- XXX need to check why this is slower than the explicit recursion above, even
-- if the above code is written in a foldr like head recursive way. We also
-- need to try this with foldlM' once #150 is fixed.
-- However, it is perhaps best to keep the iteration benchmarks independent of
-- foldrM and any related fusion issues.
{-# INLINE _iterateSingleton #-}
_iterateSingleton ::
S.MonadAsync m
=> (Int -> SerialT m Int -> SerialT m Int)
-> Int
-> Int
-> SerialT m Int
_iterateSingleton g value n = S.foldrM g (return n) $ sourceIntFromTo value n
-- Apply transformation g count times on a stream of length len
{-# INLINE iterateSource #-}
iterateSource ::
S.MonadAsync m
=> (SerialT m Int -> SerialT m Int)
-> Int
-> Int
-> Int
-> SerialT m Int
iterateSource g count len n = f count (sourceUnfoldrM len n)
where
f (0 :: Int) stream = stream
f i stream = f (i - 1) (g stream)
-------------------------------------------------------------------------------
-- Functor
-------------------------------------------------------------------------------
o_n_space_functor :: Int -> [Benchmark]
o_n_space_functor value =
[ bgroup "Functor"
[ benchIO "(+) (n times) (baseline)" $ \i0 ->
iterateN (\i acc -> acc >>= \n -> return $ i + n) (return i0) value
, benchIOSrc fromSerial "(<$) (n times)" $
iterateSingleton (<$) value
, benchIOSrc fromSerial "fmap (n times)" $
iterateSingleton (fmap . (+)) value
{-
, benchIOSrc fromSerial "_(<$) (n times)" $
_iterateSingleton (<$) value
, benchIOSrc fromSerial "_fmap (n times)" $
_iterateSingleton (fmap . (+)) value
-}
]
]
-------------------------------------------------------------------------------
-- Grouping transformations
-------------------------------------------------------------------------------
{-# INLINE groups #-}
groups :: MonadIO m => SerialT m Int -> m ()
groups = S.drain . S.groups FL.drain
-- XXX Change this test when the order of comparison is later changed
{-# INLINE groupsByGT #-}
groupsByGT :: MonadIO m => SerialT m Int -> m ()
groupsByGT = S.drain . S.groupsBy (>) FL.drain
{-# INLINE groupsByEq #-}
groupsByEq :: MonadIO m => SerialT m Int -> m ()
groupsByEq = S.drain . S.groupsBy (==) FL.drain
-- XXX Change this test when the order of comparison is later changed
{-# INLINE groupsByRollingLT #-}
groupsByRollingLT :: MonadIO m => SerialT m Int -> m ()
groupsByRollingLT =
S.drain . S.groupsByRolling (<) FL.drain
{-# INLINE groupsByRollingEq #-}
groupsByRollingEq :: MonadIO m => SerialT m Int -> m ()
groupsByRollingEq =
S.drain . S.groupsByRolling (==) FL.drain
{-# INLINE foldMany #-}
foldMany :: Monad m => SerialT m Int -> m ()
foldMany =
S.drain
. S.map getSum
. Internal.foldMany (FL.take 2 FL.mconcat)
. S.map Sum
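-- Roughly, @foldMany (FL.take 2 FL.mconcat)@ splits the input into chunks of
-- at most two elements and mconcats each chunk, so a Sum-wrapped input of
-- [1,2,3,4,5] would yield the intermediate stream [3,7,5] before draining.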
{-# INLINE refoldMany #-}
refoldMany :: Monad m => SerialT m Int -> m ()
refoldMany =
S.drain
. S.map getSum
. Internal.refoldMany (Refold.take 2 Refold.sconcat) (return mempty)
. S.map Sum
{-# INLINE foldIterateM #-}
foldIterateM :: Monad m => SerialT m Int -> m ()
foldIterateM =
S.drain
. S.map getSum
. Internal.foldIterateM
(return . FL.take 2 . FL.sconcat) (return (Sum 0))
. S.map Sum
{-# INLINE refoldIterateM #-}
refoldIterateM :: Monad m => SerialT m Int -> m ()
refoldIterateM =
S.drain
. S.map getSum
. Internal.refoldIterateM
(Refold.take 2 Refold.sconcat) (return (Sum 0))
. S.map Sum
o_1_space_grouping :: Int -> [Benchmark]
o_1_space_grouping value =
-- Buffering operations using heap proportional to group/window sizes.
[ bgroup "grouping"
[ benchIOSink value "groups" groups
, benchIOSink value "groupsByGT" groupsByGT
, benchIOSink value "groupsByEq" groupsByEq
, benchIOSink value "groupsByRollingLT" groupsByRollingLT
, benchIOSink value "groupsByRollingEq" groupsByRollingEq
, benchIOSink value "foldMany" foldMany
, benchIOSink value "refoldMany" refoldMany
, benchIOSink value "foldIterateM" foldIterateM
, benchIOSink value "refoldIterateM" refoldIterateM
]
]
-------------------------------------------------------------------------------
-- Size conserving transformations (reordering, buffering, etc.)
-------------------------------------------------------------------------------
{-# INLINE reverse #-}
reverse :: MonadIO m => Int -> SerialT m Int -> m ()
reverse n = composeN n S.reverse
{-# INLINE reverse' #-}
reverse' :: MonadIO m => Int -> SerialT m Int -> m ()
reverse' n = composeN n Internal.reverse'
o_n_heap_buffering :: Int -> [Benchmark]
o_n_heap_buffering value =
[ bgroup "buffered"
[
-- Reversing a stream
benchIOSink value "reverse" (reverse 1)
, benchIOSink value "reverse'" (reverse' 1)
, benchIOSink value "mkAsync" (mkAsync fromSerial)
]
]
-------------------------------------------------------------------------------
-- Grouping/Splitting
-------------------------------------------------------------------------------
{-# INLINE classifySessionsOf #-}
classifySessionsOf :: S.MonadAsync m => (Int -> Int) -> SerialT m Int -> m ()
classifySessionsOf getKey =
S.drain
. Internal.classifySessionsOf
(const (return False)) 3 (FL.take 10 FL.sum)
. Internal.timestamped
. S.map (\x -> (getKey x, x))
{-# INLINE classifySessionsOfHash #-}
classifySessionsOfHash :: S.MonadAsync m =>
(Int -> Int) -> SerialT m Int -> m ()
classifySessionsOfHash getKey =
S.drain
. Internal.classifySessionsByGeneric
(Proxy :: Proxy (HashMap k))
1 False (const (return False)) 3 (FL.take 10 FL.sum)
. Internal.timestamped
. S.map (\x -> (getKey x, x))
o_n_space_grouping :: Int -> [Benchmark]
o_n_space_grouping value =
-- Buffering operations using heap proportional to group/window sizes.
[ bgroup "grouping"
[ benchIOSink value "classifySessionsOf (10000 buckets)"
(classifySessionsOf (getKey 10000))
, benchIOSink value "classifySessionsOf (64 buckets)"
(classifySessionsOf (getKey 64))
, benchIOSink value "classifySessionsOfHash (10000 buckets)"
(classifySessionsOfHash (getKey 10000))
, benchIOSink value "classifySessionsOfHash (64 buckets)"
(classifySessionsOfHash (getKey 64))
]
]
where
getKey n = (`mod` n)
-------------------------------------------------------------------------------
-- Mixed Transformation
-------------------------------------------------------------------------------
{-# INLINE scanMap #-}
scanMap :: MonadIO m => Int -> SerialT m Int -> m ()
scanMap n = composeN n $ S.map (subtract 1) . S.scanl' (+) 0
{-# INLINE dropMap #-}
dropMap :: MonadIO m => Int -> SerialT m Int -> m ()
dropMap n = composeN n $ S.map (subtract 1) . S.drop 1
{-# INLINE dropScan #-}
dropScan :: MonadIO m => Int -> SerialT m Int -> m ()
dropScan n = composeN n $ S.scanl' (+) 0 . S.drop 1
{-# INLINE takeDrop #-}
takeDrop :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
takeDrop value n = composeN n $ S.drop 1 . S.take (value + 1)
{-# INLINE takeScan #-}
takeScan :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
takeScan value n = composeN n $ S.scanl' (+) 0 . S.take (value + 1)
{-# INLINE takeMap #-}
takeMap :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
takeMap value n = composeN n $ S.map (subtract 1) . S.take (value + 1)
{-# INLINE filterDrop #-}
filterDrop :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
filterDrop value n = composeN n $ S.drop 1 . S.filter (<= (value + 1))
{-# INLINE filterTake #-}
filterTake :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
filterTake value n = composeN n $ S.take (value + 1) . S.filter (<= (value + 1))
{-# INLINE filterScan #-}
filterScan :: MonadIO m => Int -> SerialT m Int -> m ()
filterScan n = composeN n $ S.scanl' (+) 0 . S.filter (<= maxBound)
{-# INLINE filterScanl1 #-}
filterScanl1 :: MonadIO m => Int -> SerialT m Int -> m ()
filterScanl1 n = composeN n $ S.scanl1' (+) . S.filter (<= maxBound)
{-# INLINE filterMap #-}
filterMap :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
filterMap value n = composeN n $ S.map (subtract 1) . S.filter (<= (value + 1))
-------------------------------------------------------------------------------
-- Scan and fold
-------------------------------------------------------------------------------
data Pair a b =
Pair !a !b
deriving (Generic, NFData)
{-# INLINE sumProductFold #-}
sumProductFold :: Monad m => SerialT m Int -> m (Int, Int)
sumProductFold = S.foldl' (\(s, p) x -> (s + x, p * x)) (0, 1)
{-# INLINE sumProductScan #-}
sumProductScan :: Monad m => SerialT m Int -> m (Pair Int Int)
sumProductScan =
S.foldl' (\(Pair _ p) (s0, x) -> Pair s0 (p * x)) (Pair 0 1) .
S.scanl' (\(s, _) x -> (s + x, x)) (0, 0)
{-# INLINE foldl'ReduceMap #-}
foldl'ReduceMap :: Monad m => SerialT m Int -> m Int
foldl'ReduceMap = fmap (+ 1) . S.foldl' (+) 0
o_1_space_transformations_mixed :: Int -> [Benchmark]
o_1_space_transformations_mixed value =
-- scanl-map and foldl-map are equivalent to the scan and fold in the foldl
-- library. If scan/fold followed by a map is efficient enough, we may not
-- need monolithic implementations of these.
[ bgroup "mixed"
[ benchIOSink value "scanl-map" (scanMap 1)
, benchIOSink value "foldl-map" foldl'ReduceMap
, benchIOSink value "sum-product-fold" sumProductFold
, benchIOSink value "sum-product-scan" sumProductScan
]
]
o_1_space_transformations_mixedX4 :: Int -> [Benchmark]
o_1_space_transformations_mixedX4 value =
[ bgroup "mixedX4"
[ benchIOSink value "scan-map" (scanMap 4)
, benchIOSink value "drop-map" (dropMap 4)
, benchIOSink value "drop-scan" (dropScan 4)
, benchIOSink value "take-drop" (takeDrop value 4)
, benchIOSink value "take-scan" (takeScan value 4)
, benchIOSink value "take-map" (takeMap value 4)
, benchIOSink value "filter-drop" (filterDrop value 4)
, benchIOSink value "filter-take" (filterTake value 4)
, benchIOSink value "filter-scan" (filterScan 4)
, benchIOSink value "filter-scanl1" (filterScanl1 4)
, benchIOSink value "filter-map" (filterMap value 4)
]
]
-------------------------------------------------------------------------------
-- Iterating a transformation over and over again
-------------------------------------------------------------------------------
-- this is quadratic
{-# INLINE iterateScan #-}
iterateScan :: S.MonadAsync m => Int -> Int -> Int -> SerialT m Int
iterateScan = iterateSource (S.scanl' (+) 0)
-- this is quadratic
{-# INLINE iterateScanl1 #-}
iterateScanl1 :: S.MonadAsync m => Int -> Int -> Int -> SerialT m Int
iterateScanl1 = iterateSource (S.scanl1' (+))
{-# INLINE iterateMapM #-}
iterateMapM :: S.MonadAsync m => Int -> Int -> Int -> SerialT m Int
iterateMapM = iterateSource (S.mapM return)
{-# INLINE iterateFilterEven #-}
iterateFilterEven :: S.MonadAsync m => Int -> Int -> Int -> SerialT m Int
iterateFilterEven = iterateSource (S.filter even)
{-# INLINE iterateTakeAll #-}
iterateTakeAll :: S.MonadAsync m => Int -> Int -> Int -> Int -> SerialT m Int
iterateTakeAll value = iterateSource (S.take (value + 1))
{-# INLINE iterateDropOne #-}
iterateDropOne :: S.MonadAsync m => Int -> Int -> Int -> SerialT m Int
iterateDropOne = iterateSource (S.drop 1)
{-# INLINE iterateDropWhileFalse #-}
iterateDropWhileFalse :: S.MonadAsync m
=> Int -> Int -> Int -> Int -> SerialT m Int
iterateDropWhileFalse value = iterateSource (S.dropWhile (> (value + 1)))
{-# INLINE iterateDropWhileTrue #-}
iterateDropWhileTrue :: S.MonadAsync m
=> Int -> Int -> Int -> Int -> SerialT m Int
iterateDropWhileTrue value = iterateSource (S.dropWhile (<= (value + 1)))
{-# INLINE tail #-}
tail :: Monad m => SerialT m a -> m ()
tail s = S.tail s >>= mapM_ tail
{-# INLINE nullHeadTail #-}
nullHeadTail :: Monad m => SerialT m Int -> m ()
nullHeadTail s = do
r <- S.null s
when (not r) $ do
_ <- S.head s
S.tail s >>= mapM_ nullHeadTail
-- Head recursive operations.
o_n_stack_iterated :: Int -> [Benchmark]
o_n_stack_iterated value = by10 `seq` by100 `seq`
[ bgroup "iterated"
[ benchIOSrc fromSerial "mapM (n/10 x 10)" $ iterateMapM by10 10
, benchIOSrc fromSerial "scanl' (quadratic) (n/100 x 100)" $
iterateScan by100 100
, benchIOSrc fromSerial "scanl1' (n/10 x 10)" $ iterateScanl1 by10 10
, benchIOSrc fromSerial "filterEven (n/10 x 10)" $
iterateFilterEven by10 10
, benchIOSrc fromSerial "takeAll (n/10 x 10)" $
iterateTakeAll value by10 10
, benchIOSrc fromSerial "dropOne (n/10 x 10)" $ iterateDropOne by10 10
, benchIOSrc fromSerial "dropWhileFalse (n/10 x 10)" $
iterateDropWhileFalse value by10 10
, benchIOSrc fromSerial "dropWhileTrue (n/10 x 10)" $
iterateDropWhileTrue value by10 10
, benchIOSink value "tail" tail
, benchIOSink value "nullHeadTail" nullHeadTail
]
]
where
by10 = value `div` 10
by100 = value `div` 100
-------------------------------------------------------------------------------
-- Pipes
-------------------------------------------------------------------------------
o_1_space_pipes :: Int -> [Benchmark]
o_1_space_pipes value =
[ bgroup "pipes"
[ benchIOSink value "mapM" (transformMapM fromSerial 1)
, benchIOSink value "compose" (transformComposeMapM fromSerial 1)
, benchIOSink value "tee" (transformTeeMapM fromSerial 1)
#ifdef DEVBUILD
-- XXX this takes 1 GB of memory to compile
, benchIOSink value "zip" (transformZipMapM fromSerial 1)
#endif
]
]
o_1_space_pipesX4 :: Int -> [Benchmark]
o_1_space_pipesX4 value =
[ bgroup "pipesX4"
[ benchIOSink value "mapM" (transformMapM fromSerial 4)
, benchIOSink value "compose" (transformComposeMapM fromSerial 4)
, benchIOSink value "tee" (transformTeeMapM fromSerial 4)
#ifdef DEVBUILD
-- XXX this takes 1 GB of memory to compile
, benchIOSink value "zip" (transformZipMapM fromSerial 4)
#endif
]
]
-------------------------------------------------------------------------------
-- Main
-------------------------------------------------------------------------------
-- In addition to gauge options, the number of elements in the stream can be
-- passed using the --stream-size option.
--
benchmarks :: String -> Int -> [Benchmark]
benchmarks moduleName size =
[ bgroup (o_1_space_prefix moduleName) $ Prelude.concat
[ o_1_space_grouping size
, o_1_space_transformations_mixed size
, o_1_space_transformations_mixedX4 size
-- pipes
, o_1_space_pipes size
, o_1_space_pipesX4 size
]
, bgroup (o_n_stack_prefix moduleName) (o_n_stack_iterated size)
, bgroup (o_n_heap_prefix moduleName) $ Prelude.concat
[ o_n_space_grouping size
, o_n_space_functor size
, o_n_heap_buffering size
]
]

View File

@ -7,118 +7,533 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE RankNTypes #-}
module Stream.Reduce (benchmarks) where
import Control.Monad.Catch (MonadCatch)
import Data.Monoid (Sum(Sum), getSum)
import Stream.Common (benchIOSink)
import Streamly.Benchmark.Common (o_1_space_prefix)
import Streamly.Internal.Data.Stream (Stream)
import Control.DeepSeq (NFData(..))
import Control.Monad.IO.Class (MonadIO(..))
import Data.Monoid (Sum(..))
import GHC.Generics (Generic)
import Streamly.Internal.Data.IsMap.HashMap ()
import Streamly.Internal.Data.Stream.Serial (SerialT)
import qualified Streamly.Internal.Data.Stream as Stream
import qualified Streamly.Internal.Data.Parser as PR
import qualified Streamly.Internal.Data.Parser.ParserD as ParserD
import Prelude hiding (length, sum, or, and, any, all, notElem, elem, (!!),
lookup, repeat, minimum, maximum, product, last, mapM_, init)
import qualified Streamly.Internal.Data.Refold.Type as Refold
import qualified Streamly.Internal.Data.Fold as FL
import qualified Stream.Common as Common
#ifdef USE_PRELUDE
import Control.Monad (when)
import Data.Proxy (Proxy(..))
import Data.HashMap.Strict (HashMap)
import qualified Streamly.Internal.Data.Stream.IsStream as S
import Streamly.Prelude (fromSerial)
import Streamly.Benchmark.Prelude hiding
( benchIOSrc, sourceUnfoldrM, apDiscardFst, apDiscardSnd, apLiftA2
, toNullAp, monadThen, toNullM, toNullM3, filterAllInM, filterAllOutM
, filterSome, breakAfterSome, toListM, toListSome, transformMapM
, transformComposeMapM, transformTeeMapM, transformZipMapM)
#else
import Streamly.Benchmark.Prelude (benchIO)
import qualified Streamly.Internal.Data.Stream as S
#endif
import Gauge
import Streamly.Benchmark.Common
import Stream.Common
import Prelude hiding (reverse, tail)
import qualified Streamly.Data.Fold as FL
import qualified Streamly.Internal.Data.Refold.Type as Refold
import Control.Monad.IO.Class (MonadIO)
-------------------------------------------------------------------------------
-- Iteration/looping utilities
-------------------------------------------------------------------------------
{-# INLINE iterateN #-}
iterateN :: (Int -> a -> a) -> a -> Int -> a
iterateN g initial count = f count initial
where
f (0 :: Int) x = x
f i x = f (i - 1) (g i x)
-- Iterate a transformation over a singleton stream
{-# INLINE iterateSingleton #-}
iterateSingleton :: Monad m
=> (Int -> SerialT m Int -> SerialT m Int)
-> Int
-> Int
-> SerialT m Int
iterateSingleton g count n = iterateN g (return n) count
{-
-- XXX need to check why this is slower than the explicit recursion above, even
-- if the above code is written in a foldr-like, head-recursive way. We also
-- need to try this with foldlM' once #150 is fixed.
-- However, it is perhaps best to keep the iteration benchmarks independent of
-- foldrM and any related fusion issues.
{-# INLINE _iterateSingleton #-}
_iterateSingleton ::
Monad m
=> (Int -> SerialT m Int -> SerialT m Int)
-> Int
-> Int
-> SerialT m Int
_iterateSingleton g value n = S.foldrM g (return n) $ sourceIntFromTo value n
-}
-- Apply transformation g count times on a stream of length len
{-# INLINE iterateSource #-}
iterateSource ::
MonadAsync m
=> (SerialT m Int -> SerialT m Int)
-> Int
-> Int
-> Int
-> SerialT m Int
iterateSource g count len n = f count (sourceUnfoldrM len n)
where
f (0 :: Int) stream = stream
f i stream = f (i - 1) (g stream)
-------------------------------------------------------------------------------
-- Functor
-------------------------------------------------------------------------------
o_n_space_functor :: Int -> [Benchmark]
o_n_space_functor value =
[ bgroup "Functor"
[ benchIO "(+) (n times) (baseline)" $ \i0 ->
iterateN (\i acc -> acc >>= \n -> return $ i + n) (return i0) value
, benchIOSrc "(<$) (n times)" $
iterateSingleton (<$) value
, benchIOSrc "fmap (n times)" $
iterateSingleton (fmap . (+)) value
{-
, benchIOSrc fromSerial "_(<$) (n times)" $
_iterateSingleton (<$) value
, benchIOSrc fromSerial "_fmap (n times)" $
_iterateSingleton (fmap . (+)) value
-}
]
]
-------------------------------------------------------------------------------
-- Grouping transformations
-------------------------------------------------------------------------------
#ifdef USE_PRELUDE
{-# INLINE groups #-}
groups :: MonadIO m => SerialT m Int -> m ()
groups = Common.drain . S.groups FL.drain
-- XXX Change this test when the order of comparison is later changed
{-# INLINE groupsByGT #-}
groupsByGT :: MonadIO m => SerialT m Int -> m ()
groupsByGT = Common.drain . S.groupsBy (>) FL.drain
{-# INLINE groupsByEq #-}
groupsByEq :: MonadIO m => SerialT m Int -> m ()
groupsByEq = Common.drain . S.groupsBy (==) FL.drain
-- XXX Change this test when the order of comparison is later changed
{-# INLINE groupsByRollingLT #-}
groupsByRollingLT :: MonadIO m => SerialT m Int -> m ()
groupsByRollingLT =
Common.drain . S.groupsByRolling (<) FL.drain
{-# INLINE groupsByRollingEq #-}
groupsByRollingEq :: MonadIO m => SerialT m Int -> m ()
groupsByRollingEq =
Common.drain . S.groupsByRolling (==) FL.drain
#endif
{-# INLINE foldMany #-}
foldMany :: Monad m => Stream m Int -> m ()
foldMany :: Monad m => SerialT m Int -> m ()
foldMany =
Stream.fold FL.drain
Common.drain
. fmap getSum
. Stream.foldMany (FL.take 2 FL.mconcat)
. S.foldMany (FL.take 2 FL.mconcat)
. fmap Sum
{-# INLINE foldManyPost #-}
foldManyPost :: Monad m => Stream m Int -> m ()
foldManyPost :: Monad m => SerialT m Int -> m ()
foldManyPost =
Stream.fold FL.drain
Common.drain
. fmap getSum
. Stream.foldManyPost (FL.take 2 FL.mconcat)
. S.foldManyPost (FL.take 2 FL.mconcat)
. fmap Sum
{-# INLINE refoldMany #-}
refoldMany :: Monad m => Stream m Int -> m ()
refoldMany :: Monad m => SerialT m Int -> m ()
refoldMany =
Stream.fold FL.drain
Common.drain
. fmap getSum
. Stream.refoldMany (Refold.take 2 Refold.sconcat) (return mempty)
. S.refoldMany (Refold.take 2 Refold.sconcat) (return mempty)
. fmap Sum
{-# INLINE foldIterateM #-}
foldIterateM :: Monad m => Stream m Int -> m ()
foldIterateM :: Monad m => SerialT m Int -> m ()
foldIterateM =
Stream.fold FL.drain
Common.drain
. fmap getSum
. Stream.foldIterateM
. S.foldIterateM
(return . FL.take 2 . FL.sconcat) (return (Sum 0))
. fmap Sum
{-# INLINE refoldIterateM #-}
refoldIterateM :: Monad m => Stream m Int -> m ()
refoldIterateM :: Monad m => SerialT m Int -> m ()
refoldIterateM =
Stream.fold FL.drain
Common.drain
. fmap getSum
. Stream.refoldIterateM
. S.refoldIterateM
(Refold.take 2 Refold.sconcat) (return (Sum 0))
. fmap Sum
{-# INLINE parseMany #-}
parseMany :: MonadCatch m => Int -> Stream m Int -> m ()
parseMany n =
Stream.fold FL.drain
. fmap getSum
. Stream.parseMany (PR.fromFold $ FL.take n FL.mconcat)
. fmap Sum
{-# INLINE parseManyD #-}
parseManyD :: MonadCatch m => Int -> Stream m Int -> m ()
parseManyD n =
Stream.fold FL.drain
. fmap getSum
. Stream.parseManyD (ParserD.fromFold $ FL.take n FL.mconcat)
. fmap Sum
{-# INLINE parseIterate #-}
parseIterate :: MonadCatch m => Int -> Stream m Int -> m ()
parseIterate n =
Stream.fold FL.drain
. fmap getSum
. Stream.parseIterate (\_ -> PR.fromFold $ FL.take n FL.mconcat) 0
. fmap Sum
{-# INLINE arraysOf #-}
arraysOf :: (MonadCatch m, MonadIO m) => Int -> Stream m Int -> m ()
arraysOf n =
Stream.fold FL.drain
. Stream.arraysOf n
o_1_space_grouping :: Int -> [Benchmark]
o_1_space_grouping value =
-- Buffering operations using heap proportional to group/window sizes.
[ bgroup "reduce"
[ benchIOSink value "foldMany" foldMany
[ bgroup "grouping"
[
#ifdef USE_PRELUDE
benchIOSink value "groups" groups
, benchIOSink value "groupsByGT" groupsByGT
, benchIOSink value "groupsByEq" groupsByEq
, benchIOSink value "groupsByRollingLT" groupsByRollingLT
, benchIOSink value "groupsByRollingEq" groupsByRollingEq
,
#endif
-- XXX parseMany/parseIterate benchmarks are in the Parser/ParserD
-- modules; we can bring those here. arraysOf benchmarks are in
-- Parser/ParserD/Array.Stream/FileSystem.Handle.
benchIOSink value "foldMany" foldMany
, benchIOSink value "foldManyPost" foldManyPost
, benchIOSink value "refoldMany" refoldMany
, benchIOSink value "foldIterateM" foldIterateM
, benchIOSink value "refoldIterateM" refoldIterateM
, benchIOSink value "parseMany" $ parseMany value
, benchIOSink value "parseManyD" $ parseManyD value
, benchIOSink value "parseIterate" $ parseIterate value
, benchIOSink value "arraysOf" $ arraysOf value
]
]
-------------------------------------------------------------------------------
-- Size conserving transformations (reordering, buffering, etc.)
-------------------------------------------------------------------------------
{-# INLINE reverse #-}
reverse :: MonadIO m => Int -> SerialT m Int -> m ()
reverse n = composeN n S.reverse
{-# INLINE reverse' #-}
reverse' :: MonadIO m => Int -> SerialT m Int -> m ()
reverse' n = composeN n S.reverse'
o_n_heap_buffering :: Int -> [Benchmark]
o_n_heap_buffering value =
[ bgroup "buffered"
[
-- Reversing a stream
benchIOSink value "reverse" (reverse 1)
, benchIOSink value "reverse'" (reverse' 1)
#ifdef USE_PRELUDE
, benchIOSink value "mkAsync" (mkAsync fromSerial)
#endif
]
]
-------------------------------------------------------------------------------
-- Grouping/Splitting
-------------------------------------------------------------------------------
#ifdef USE_PRELUDE
{-# INLINE classifySessionsOf #-}
classifySessionsOf :: MonadAsync m => (Int -> Int) -> SerialT m Int -> m ()
classifySessionsOf getKey =
Common.drain
. S.classifySessionsOf
(const (return False)) 3 (FL.take 10 FL.sum)
. S.timestamped
. fmap (\x -> (getKey x, x))
{-# INLINE classifySessionsOfHash #-}
classifySessionsOfHash :: MonadAsync m =>
(Int -> Int) -> SerialT m Int -> m ()
classifySessionsOfHash getKey =
Common.drain
. S.classifySessionsByGeneric
(Proxy :: Proxy (HashMap k))
1 False (const (return False)) 3 (FL.take 10 FL.sum)
. S.timestamped
. fmap (\x -> (getKey x, x))
o_n_space_grouping :: Int -> [Benchmark]
o_n_space_grouping value =
-- Buffering operations using heap proportional to group/window sizes.
[ bgroup "grouping"
[ benchIOSink value "classifySessionsOf (10000 buckets)"
(classifySessionsOf (getKey 10000))
, benchIOSink value "classifySessionsOf (64 buckets)"
(classifySessionsOf (getKey 64))
, benchIOSink value "classifySessionsOfHash (10000 buckets)"
(classifySessionsOfHash (getKey 10000))
, benchIOSink value "classifySessionsOfHash (64 buckets)"
(classifySessionsOfHash (getKey 64))
]
]
where
getKey n = (`mod` n)
#endif
-------------------------------------------------------------------------------
-- Mixed Transformation
-------------------------------------------------------------------------------
{-# INLINE scanMap #-}
scanMap :: MonadIO m => Int -> SerialT m Int -> m ()
scanMap n = composeN n $ fmap (subtract 1) . Common.scanl' (+) 0
{-# INLINE dropMap #-}
dropMap :: MonadIO m => Int -> SerialT m Int -> m ()
dropMap n = composeN n $ fmap (subtract 1) . S.drop 1
{-# INLINE dropScan #-}
dropScan :: MonadIO m => Int -> SerialT m Int -> m ()
dropScan n = composeN n $ Common.scanl' (+) 0 . S.drop 1
{-# INLINE takeDrop #-}
takeDrop :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
takeDrop value n = composeN n $ S.drop 1 . S.take (value + 1)
{-# INLINE takeScan #-}
takeScan :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
takeScan value n = composeN n $ Common.scanl' (+) 0 . S.take (value + 1)
{-# INLINE takeMap #-}
takeMap :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
takeMap value n = composeN n $ fmap (subtract 1) . S.take (value + 1)
{-# INLINE filterDrop #-}
filterDrop :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
filterDrop value n = composeN n $ S.drop 1 . S.filter (<= (value + 1))
{-# INLINE filterTake #-}
filterTake :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
filterTake value n = composeN n $ S.take (value + 1) . S.filter (<= (value + 1))
{-# INLINE filterScan #-}
filterScan :: MonadIO m => Int -> SerialT m Int -> m ()
filterScan n = composeN n $ Common.scanl' (+) 0 . S.filter (<= maxBound)
#ifdef USE_PRELUDE
{-# INLINE filterScanl1 #-}
filterScanl1 :: MonadIO m => Int -> SerialT m Int -> m ()
filterScanl1 n = composeN n $ S.scanl1' (+) . S.filter (<= maxBound)
#endif
{-# INLINE filterMap #-}
filterMap :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
filterMap value n = composeN n $ fmap (subtract 1) . S.filter (<= (value + 1))
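-- Note: 'composeN' comes from the shared benchmark helpers; the working
-- assumption (the helper itself is not shown here) is that
--
--   composeN 4 f ~ Common.drain . f . f . f . f
--
-- so the X4 group below stacks four applications of the same transformation.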
-------------------------------------------------------------------------------
-- Scan and fold
-------------------------------------------------------------------------------
data Pair a b =
Pair !a !b
deriving (Generic, NFData)
{-# INLINE sumProductFold #-}
sumProductFold :: Monad m => SerialT m Int -> m (Int, Int)
sumProductFold = Common.foldl' (\(s, p) x -> (s + x, p * x)) (0, 1)
{-# INLINE sumProductScan #-}
sumProductScan :: Monad m => SerialT m Int -> m (Pair Int Int)
sumProductScan =
Common.foldl' (\(Pair _ p) (s0, x) -> Pair s0 (p * x)) (Pair 0 1) .
Common.scanl' (\(s, _) x -> (s + x, x)) (0, 0)
{-# INLINE foldl'ReduceMap #-}
foldl'ReduceMap :: Monad m => SerialT m Int -> m Int
foldl'ReduceMap = fmap (+ 1) . Common.foldl' (+) 0
o_1_space_transformations_mixed :: Int -> [Benchmark]
o_1_space_transformations_mixed value =
-- scanl-map and foldl-map are equivalent to the scan and fold in the foldl
-- library. If scan/fold followed by a map is efficient enough, we may not
-- need monolithic implementations of these.
[ bgroup "mixed"
[ benchIOSink value "scanl-map" (scanMap 1)
, benchIOSink value "foldl-map" foldl'ReduceMap
, benchIOSink value "sum-product-fold" sumProductFold
, benchIOSink value "sum-product-scan" sumProductScan
]
]
o_1_space_transformations_mixedX4 :: Int -> [Benchmark]
o_1_space_transformations_mixedX4 value =
[ bgroup "mixedX4"
[ benchIOSink value "scan-map" (scanMap 4)
, benchIOSink value "drop-map" (dropMap 4)
, benchIOSink value "drop-scan" (dropScan 4)
, benchIOSink value "take-drop" (takeDrop value 4)
, benchIOSink value "take-scan" (takeScan value 4)
, benchIOSink value "take-map" (takeMap value 4)
, benchIOSink value "filter-drop" (filterDrop value 4)
, benchIOSink value "filter-take" (filterTake value 4)
, benchIOSink value "filter-scan" (filterScan 4)
#ifdef USE_PRELUDE
, benchIOSink value "filter-scanl1" (filterScanl1 4)
#endif
, benchIOSink value "filter-map" (filterMap value 4)
]
]
-------------------------------------------------------------------------------
-- Iterating a transformation over and over again
-------------------------------------------------------------------------------
-- this is quadratic
{-# INLINE iterateScan #-}
iterateScan :: MonadAsync m => Int -> Int -> Int -> SerialT m Int
iterateScan = iterateSource (Common.scanl' (+) 0)
#ifdef USE_PRELUDE
-- this is quadratic
{-# INLINE iterateScanl1 #-}
iterateScanl1 :: MonadAsync m => Int -> Int -> Int -> SerialT m Int
iterateScanl1 = iterateSource (S.scanl1' (+))
#endif
{-# INLINE iterateMapM #-}
iterateMapM :: MonadAsync m => Int -> Int -> Int -> SerialT m Int
iterateMapM = iterateSource (S.mapM return)
{-# INLINE iterateFilterEven #-}
iterateFilterEven :: MonadAsync m => Int -> Int -> Int -> SerialT m Int
iterateFilterEven = iterateSource (S.filter even)
{-# INLINE iterateTakeAll #-}
iterateTakeAll :: MonadAsync m => Int -> Int -> Int -> Int -> SerialT m Int
iterateTakeAll value = iterateSource (S.take (value + 1))
{-# INLINE iterateDropOne #-}
iterateDropOne :: MonadAsync m => Int -> Int -> Int -> SerialT m Int
iterateDropOne = iterateSource (S.drop 1)
{-# INLINE iterateDropWhileFalse #-}
iterateDropWhileFalse :: MonadAsync m
=> Int -> Int -> Int -> Int -> SerialT m Int
iterateDropWhileFalse value = iterateSource (S.dropWhile (> (value + 1)))
{-# INLINE iterateDropWhileTrue #-}
iterateDropWhileTrue :: MonadAsync m
=> Int -> Int -> Int -> Int -> SerialT m Int
iterateDropWhileTrue value = iterateSource (S.dropWhile (<= (value + 1)))
#ifdef USE_PRELUDE
{-# INLINE tail #-}
tail :: Monad m => SerialT m a -> m ()
tail s = S.tail s >>= mapM_ tail
{-# INLINE nullHeadTail #-}
nullHeadTail :: Monad m => SerialT m Int -> m ()
nullHeadTail s = do
r <- S.null s
when (not r) $ do
_ <- S.head s
S.tail s >>= mapM_ nullHeadTail
#endif
-- Head recursive operations.
o_n_stack_iterated :: Int -> [Benchmark]
o_n_stack_iterated value = by10 `seq` by100 `seq`
[ bgroup "iterated"
[ benchIOSrc "mapM (n/10 x 10)" $ iterateMapM by10 10
, benchIOSrc "scanl' (quadratic) (n/100 x 100)" $
iterateScan by100 100
#ifdef USE_PRELUDE
, benchIOSrc "scanl1' (n/10 x 10)" $ iterateScanl1 by10 10
#endif
, benchIOSrc "filterEven (n/10 x 10)" $
iterateFilterEven by10 10
, benchIOSrc "takeAll (n/10 x 10)" $
iterateTakeAll value by10 10
, benchIOSrc "dropOne (n/10 x 10)" $ iterateDropOne by10 10
, benchIOSrc "dropWhileFalse (n/10 x 10)" $
iterateDropWhileFalse value by10 10
, benchIOSrc "dropWhileTrue (n/10 x 10)" $
iterateDropWhileTrue value by10 10
#ifdef USE_PRELUDE
, benchIOSink value "tail" tail
, benchIOSink value "nullHeadTail" nullHeadTail
#endif
]
]
where
by10 = value `div` 10
by100 = value `div` 100
-------------------------------------------------------------------------------
-- Pipes
-------------------------------------------------------------------------------
o_1_space_pipes :: Int -> [Benchmark]
o_1_space_pipes value =
[ bgroup "pipes"
[ benchIOSink value "mapM" (transformMapM 1)
, benchIOSink value "compose" (transformComposeMapM 1)
, benchIOSink value "tee" (transformTeeMapM 1)
#ifdef DEVBUILD
-- XXX this takes 1 GB of memory to compile
, benchIOSink value "zip" (transformZipMapM 1)
#endif
]
]
o_1_space_pipesX4 :: Int -> [Benchmark]
o_1_space_pipesX4 value =
[ bgroup "pipesX4"
[ benchIOSink value "mapM" (transformMapM 4)
, benchIOSink value "compose" (transformComposeMapM 4)
, benchIOSink value "tee" (transformTeeMapM 4)
#ifdef DEVBUILD
-- XXX this takes 1 GB of memory to compile
, benchIOSink value "zip" (transformZipMapM 4)
#endif
]
]
-------------------------------------------------------------------------------
-- Main
-------------------------------------------------------------------------------
-- In addition to gauge options, the number of elements in the stream can be
-- passed using the --stream-size option.
--
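-- As an illustrative invocation (the exact target and wrapper flag depend on
-- the local build setup):
--
-- > cabal bench Data.Stream --benchmark-options "--stream-size 100000"
--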
benchmarks :: String -> Int -> [Benchmark]
benchmarks moduleName size =
[ bgroup (o_1_space_prefix moduleName) $ o_1_space_grouping size
]
[ bgroup (o_1_space_prefix moduleName) $ Prelude.concat
[ o_1_space_grouping size
, o_1_space_transformations_mixed size
, o_1_space_transformations_mixedX4 size
-- pipes
, o_1_space_pipes size
, o_1_space_pipesX4 size
]
, bgroup (o_n_stack_prefix moduleName) (o_n_stack_iterated size)
, bgroup (o_n_heap_prefix moduleName) $ Prelude.concat
[
#ifdef USE_PRELUDE
o_n_space_grouping size
,
#endif
o_n_space_functor size
, o_n_heap_buffering size
]
]

View File

@ -1,5 +1,5 @@
-- |
-- Module : Stream.Transformation
-- Module : Stream.Transform
-- Copyright : (c) 2018 Composewell Technologies
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
@ -18,36 +18,37 @@
{-# OPTIONS_GHC -fplugin Test.Inspection.Plugin #-}
#endif
module Stream.Transformation (benchmarks) where
module Stream.Transform (benchmarks) where
import Control.DeepSeq (NFData(..))
import Control.Monad.IO.Class (MonadIO(..))
import Data.Functor.Identity (Identity)
import System.Random (randomRIO)
import qualified Streamly.Internal.Data.Fold as FL
#ifdef USE_PRELUDE
import Streamly.Prelude (fromSerial, MonadAsync)
import Streamly.Benchmark.Prelude
import Streamly.Internal.Data.Time.Units
import qualified Streamly.Benchmark.Prelude as BP
import qualified Streamly.Prelude as S
import qualified Streamly.Internal.Data.Stream.IsStream as Internal
import qualified Streamly.Internal.Data.Unfold as Unfold
#else
import Control.DeepSeq (NFData(..))
import Data.Functor.Identity (Identity)
import Stream.Common
import qualified Streamly.Internal.Data.Stream as Stream
import qualified Streamly.Internal.Data.Stream as S
import qualified Streamly.Internal.Data.Stream as Internal
import qualified Prelude
import qualified Stream.Common as Common
import qualified Streamly.Internal.Data.Unfold as Unfold
#ifdef USE_PRELUDE
import Streamly.Benchmark.Prelude hiding
( benchIOSrc, sourceUnfoldrM, apDiscardFst, apDiscardSnd, apLiftA2, toNullAp
, monadThen, toNullM, toNullM3, filterAllInM, filterAllOutM, filterSome
, breakAfterSome, toListM, toListSome, transformMapM, transformComposeMapM
, transformTeeMapM, transformZipMapM, mapN, mapM)
import qualified Streamly.Internal.Data.Stream.IsStream as Stream
import Streamly.Internal.Data.Time.Units
#else
import Streamly.Benchmark.Prelude (benchIO)
import qualified Streamly.Internal.Data.Stream as Stream
#endif
import Gauge
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Stream.Common hiding (scanl')
import Streamly.Benchmark.Common
import Prelude hiding (sequence, mapM, fmap)
import Prelude hiding (sequence, mapM)
-------------------------------------------------------------------------------
-- Pipelines (stream-to-stream transformations)
@ -60,7 +61,7 @@ import Prelude hiding (sequence, mapM, fmap)
-------------------------------------------------------------------------------
-- Traversable Instance
-------------------------------------------------------------------------------
#ifndef USE_PRELUDE
{-# INLINE traversableTraverse #-}
traversableTraverse :: SerialT Identity Int -> IO (SerialT Identity Int)
traversableTraverse = traverse return
@ -95,111 +96,89 @@ o_n_space_traversable value =
, benchPureSinkIO value "sequence" traversableSequence
]
]
#endif
#ifdef USE_PRELUDE
{-# INLINE composeNG #-}
composeNG ::
(S.IsStream t, Monad m)
=> Int
-> (t m Int -> S.SerialT m Int)
-> t m Int
-> m ()
composeNG = BP.composeN
#else
{-# INLINE composeNG #-}
composeNG ::
(Monad m)
=> Int
-> (SerialT m Int -> SerialT m Int)
-> SerialT m Int
-> m ()
composeNG _n = return $ Stream.fold FL.drain
#endif
-------------------------------------------------------------------------------
-- maps and scans
-------------------------------------------------------------------------------
{-# INLINE scan #-}
scan :: MonadIO m => Int -> SerialT m Int -> m ()
scan n = composeNG n $ S.scan FL.sum
{-# INLINE tap #-}
tap :: MonadIO m => Int -> SerialT m Int -> m ()
tap n = composeNG n $ S.tap FL.sum
#ifdef USE_PRELUDE
{-# INLINE scanl' #-}
scanl' :: MonadIO m => Int -> SerialT m Int -> m ()
scanl' n = composeNG n $ S.scanl' (+) 0
scanl' n = composeN n $ Stream.scanl' (+) 0
{-# INLINE scanlM' #-}
scanlM' :: MonadIO m => Int -> SerialT m Int -> m ()
scanlM' n = composeNG n $ S.scanlM' (\b a -> return $ b + a) (return 0)
scanlM' n = composeN n $ Stream.scanlM' (\b a -> return $ b + a) (return 0)
{-# INLINE scanl1' #-}
scanl1' :: MonadIO m => Int -> SerialT m Int -> m ()
scanl1' n = composeNG n $ S.scanl1' (+)
scanl1' n = composeN n $ Stream.scanl1' (+)
{-# INLINE scanl1M' #-}
scanl1M' :: MonadIO m => Int -> SerialT m Int -> m ()
scanl1M' n = composeNG n $ S.scanl1M' (\b a -> return $ b + a)
scanl1M' n = composeN n $ Stream.scanl1M' (\b a -> return $ b + a)
#endif
{-# INLINE scan #-}
scan :: MonadIO m => Int -> SerialT m Int -> m ()
scan n = composeN n $ Stream.scan FL.sum
#ifdef USE_PRELUDE
{-# INLINE postscanl' #-}
postscanl' :: MonadIO m => Int -> SerialT m Int -> m ()
postscanl' n = composeNG n $ S.postscanl' (+) 0
postscanl' n = composeN n $ Stream.postscanl' (+) 0
{-# INLINE postscanlM' #-}
postscanlM' :: MonadIO m => Int -> SerialT m Int -> m ()
postscanlM' n = composeNG n $ S.postscanlM' (\b a -> return $ b + a) (return 0)
postscanlM' n = composeN n $ Stream.postscanlM' (\b a -> return $ b + a) (return 0)
#endif
{-# INLINE postscan #-}
postscan :: MonadIO m => Int -> SerialT m Int -> m ()
postscan n = composeNG n $ S.postscan FL.sum
postscan n = composeN n $ Stream.postscan FL.sum
{-# INLINE sequence #-}
sequence ::
(S.IsStream t, S.MonadAsync m)
=> (t m Int -> S.SerialT m Int)
-> t m (m Int)
-> m ()
sequence t = S.drain . t . S.sequence
sequence :: MonadAsync m => SerialT m (m Int) -> m ()
sequence = Common.drain . Stream.sequence
{-# INLINE tap #-}
tap :: MonadIO m => Int -> SerialT m Int -> m ()
tap n = composeN n $ Stream.tap FL.sum
#ifdef USE_PRELUDE
{-# INLINE pollCounts #-}
pollCounts :: Int -> SerialT IO Int -> IO ()
pollCounts n =
composeN n (Internal.pollCounts (const True) f)
composeN n (Stream.pollCounts (const True) f)
where
f = S.drain . Internal.rollingMap2 (-) . Internal.delayPost 1
f = Stream.drain . Stream.rollingMap2 (-) . Stream.delayPost 1
{-# INLINE timestamped #-}
timestamped :: (S.MonadAsync m) => SerialT m Int -> m ()
timestamped = S.drain . Internal.timestamped
{-# INLINE trace #-}
trace :: MonadAsync m => Int -> SerialT m Int -> m ()
trace n = composeNG n $ Internal.trace return
timestamped :: (MonadAsync m) => SerialT m Int -> m ()
timestamped = Stream.drain . Stream.timestamped
#endif
{-# INLINE foldrS #-}
foldrS :: MonadIO m => Int -> SerialT m Int -> m ()
foldrS n = composeNG n $ Internal.foldrS S.cons S.nil
foldrS n = composeN n $ Stream.foldrS Stream.cons Stream.nil
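-- Rebuilding the stream with its own constructors should be the identity
-- transformation (foldrS cons nil behaves as 'id'), so the foldrS/foldrT
-- benchmarks below measure just the cost of routing elements through the
-- right fold.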
{-# INLINE foldrSMap #-}
foldrSMap :: MonadIO m => Int -> SerialT m Int -> m ()
foldrSMap n = composeNG n $ Internal.foldrS (\x xs -> x + 1 `S.cons` xs) S.nil
foldrSMap n = composeN n $ Stream.foldrS (\x xs -> x + 1 `Stream.cons` xs) Stream.nil
{-# INLINE foldrT #-}
foldrT :: MonadIO m => Int -> SerialT m Int -> m ()
foldrT n = composeNG n $ Internal.foldrT S.cons S.nil
foldrT n = composeN n $ Stream.foldrT Stream.cons Stream.nil
{-# INLINE foldrTMap #-}
foldrTMap :: MonadIO m => Int -> SerialT m Int -> m ()
foldrTMap n = composeNG n $ Internal.foldrT (\x xs -> x + 1 `S.cons` xs) S.nil
foldrTMap n = composeN n $ Stream.foldrT (\x xs -> x + 1 `Stream.cons` xs) Stream.nil
{-# INLINE trace #-}
trace :: MonadAsync m => Int -> SerialT m Int -> m ()
trace n = composeN n $ Stream.trace return
o_1_space_mapping :: Int -> [Benchmark]
o_1_space_mapping value =
@ -211,13 +190,14 @@ o_1_space_mapping value =
, benchIOSink value "foldrSMap" (foldrSMap 1)
, benchIOSink value "foldrT" (foldrT 1)
, benchIOSink value "foldrTMap" (foldrTMap 1)
#ifdef USE_PRELUDE
-- Mapping
, benchIOSink value "map" (mapN fromSerial 1)
, bench "sequence" $ nfIO $ randomRIO (1, 1000) >>= \n ->
sequence fromSerial (sourceUnfoldrAction value n)
, benchIOSink value "mapM" (mapM fromSerial 1)
-- Mapping
, benchIOSink value "map" (mapN 1)
, bench "sequence" $ nfIO $ randomRIO (1, 1000) >>= \n ->
sequence (sourceUnfoldrAction value n)
, benchIOSink value "mapM" (mapM 1)
, benchIOSink value "tap" (tap 1)
#ifdef USE_PRELUDE
, benchIOSink value "pollCounts 1 second" (pollCounts 1)
, benchIOSink value "timestamped" timestamped
@ -228,48 +208,46 @@ o_1_space_mapping value =
, benchIOSink value "scanl1M'" (scanl1M' 1)
, benchIOSink value "postscanl'" (postscanl' 1)
, benchIOSink value "postscanlM'" (postscanlM' 1)
, benchIOSink value "postscan" (postscan 1)
#endif
, benchIOSink value "scan" (scan 1)
, benchIOSink value "tap" (tap 1)
, benchIOSink value "postscan" (postscan 1)
]
]
#ifdef USE_PRELUDE
o_1_space_mappingX4 :: Int -> [Benchmark]
o_1_space_mappingX4 value =
[ bgroup "mappingX4"
[ benchIOSink value "map" (mapN fromSerial 4)
, benchIOSink value "mapM" (mapM fromSerial 4)
[ benchIOSink value "map" (mapN 4)
, benchIOSink value "mapM" (mapM 4)
, benchIOSink value "trace" (trace 4)
#ifdef USE_PRELUDE
, benchIOSink value "scanl'" (scanl' 4)
, benchIOSink value "scanl1'" (scanl1' 4)
, benchIOSink value "scanlM'" (scanlM' 4)
, benchIOSink value "scanl1M'" (scanl1M' 4)
, benchIOSink value "postscanl'" (postscanl' 4)
, benchIOSink value "postscanlM'" (postscanlM' 4)
#endif
]
]
{-# INLINE sieveScan #-}
sieveScan :: Monad m => SerialT m Int -> SerialT m Int
sieveScan =
S.mapMaybe snd
. S.scanlM' (\(primes, _) n -> do
Stream.mapMaybe snd
. Stream.scan (FL.foldlM' (\(primes, _) n -> do
return $
let ps = takeWhile (\p -> p * p <= n) primes
in if all (\p -> n `mod` p /= 0) ps
then (primes ++ [n], Just n)
else (primes, Nothing)) (return ([2], Just 2))
else (primes, Nothing)) (return ([2], Just 2)))
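-- Sketch of the behaviour: the scan threads the list of primes found so far
-- and emits @Just n@ only when no prime p with p * p <= n divides n; the
-- growing prime list is what places this benchmark in the n-space group.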
o_n_space_mapping :: Int -> [Benchmark]
o_n_space_mapping value =
[ bgroup "mapping"
[ benchIO "naive prime sieve"
(\n -> S.sum $ sieveScan $ S.enumerateFromTo 2 (value + n))
(\n -> Stream.fold FL.sum $ sieveScan $ Common.enumerateFromTo 2 (value + n))
]
]
@ -280,64 +258,59 @@ o_n_space_mapping value =
o_1_space_functor :: Int -> [Benchmark]
o_1_space_functor value =
[ bgroup "Functor"
[ benchIOSink value "fmap" (fmapN fromSerial 1)
, benchIOSink value "fmap x 4" (fmapN fromSerial 4)
[ benchIOSink value "fmap" (mapN 1)
, benchIOSink value "fmap x 4" (mapN 4)
]
]
#else
{-# INLINE foldFilterEven #-}
foldFilterEven :: MonadIO m => SerialT m Int -> m ()
foldFilterEven = Stream.fold FL.drain . Stream.foldFilter (FL.satisfy even)
#endif
-------------------------------------------------------------------------------
-- Size reducing transformations (filtering)
-------------------------------------------------------------------------------
{-# INLINE filterEven #-}
filterEven :: MonadIO m => Int -> SerialT m Int -> m ()
filterEven n = composeNG n $ S.filter even
filterEven n = composeN n $ Stream.filter even
{-# INLINE filterAllOut #-}
filterAllOut :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
filterAllOut value n = composeNG n $ S.filter (> (value + 1))
filterAllOut value n = composeN n $ Stream.filter (> (value + 1))
{-# INLINE filterAllIn #-}
filterAllIn :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
filterAllIn value n = composeNG n $ S.filter (<= (value + 1))
filterAllIn value n = composeN n $ Stream.filter (<= (value + 1))
{-# INLINE filterMEven #-}
filterMEven :: MonadIO m => Int -> SerialT m Int -> m ()
filterMEven n = composeNG n $ S.filterM (return . even)
filterMEven n = composeN n $ Stream.filterM (return . even)
{-# INLINE filterMAllOut #-}
filterMAllOut :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
filterMAllOut value n = composeNG n $ S.filterM (\x -> return $ x > (value + 1))
filterMAllOut value n = composeN n $ Stream.filterM (\x -> return $ x > (value + 1))
{-# INLINE filterMAllIn #-}
filterMAllIn :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
filterMAllIn value n = composeNG n $ S.filterM (\x -> return $ x <= (value + 1))
filterMAllIn value n = composeN n $ Stream.filterM (\x -> return $ x <= (value + 1))
{-# INLINE _takeOne #-}
_takeOne :: MonadIO m => Int -> SerialT m Int -> m ()
_takeOne n = composeNG n $ S.take 1
_takeOne n = composeN n $ Stream.take 1
{-# INLINE takeAll #-}
takeAll :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
takeAll value n = composeNG n $ S.take (value + 1)
takeAll value n = composeN n $ Stream.take (value + 1)
{-# INLINE takeWhileTrue #-}
takeWhileTrue :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
takeWhileTrue value n = composeNG n $ S.takeWhile (<= (value + 1))
takeWhileTrue value n = composeN n $ Stream.takeWhile (<= (value + 1))
{-# INLINE takeWhileMTrue #-}
takeWhileMTrue :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
takeWhileMTrue value n = composeNG n $ S.takeWhileM (return . (<= (value + 1)))
takeWhileMTrue value n = composeN n $ Stream.takeWhileM (return . (<= (value + 1)))
#ifdef USE_PRELUDE
{-# INLINE takeInterval #-}
takeInterval :: NanoSecond64 -> Int -> SerialT IO Int -> IO ()
takeInterval i n = composeN n (Internal.takeInterval i)
takeInterval i n = composeN n (Stream.takeInterval i)
#ifdef INSPECTION
-- inspect $ hasNoType 'takeInterval ''SPEC
@ -348,33 +321,33 @@ inspect $ hasNoTypeClasses 'takeInterval
{-# INLINE dropOne #-}
dropOne :: MonadIO m => Int -> SerialT m Int -> m ()
dropOne n = composeNG n $ S.drop 1
dropOne n = composeN n $ Stream.drop 1
{-# INLINE dropAll #-}
dropAll :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
dropAll value n = composeNG n $ S.drop (value + 1)
dropAll value n = composeN n $ Stream.drop (value + 1)
{-# INLINE dropWhileTrue #-}
dropWhileTrue :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
dropWhileTrue value n = composeNG n $ S.dropWhile (<= (value + 1))
dropWhileTrue value n = composeN n $ Stream.dropWhile (<= (value + 1))
{-# INLINE dropWhileMTrue #-}
dropWhileMTrue :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
dropWhileMTrue value n = composeNG n $ S.dropWhileM (return . (<= (value + 1)))
dropWhileMTrue value n = composeN n $ Stream.dropWhileM (return . (<= (value + 1)))
{-# INLINE dropWhileFalse #-}
dropWhileFalse :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
dropWhileFalse value n = composeNG n $ S.dropWhile (> (value + 1))
dropWhileFalse value n = composeN n $ Stream.dropWhile (> (value + 1))
#ifdef USE_PRELUDE
-- XXX Decide on the time interval
{-# INLINE _intervalsOfSum #-}
_intervalsOfSum :: MonadAsync m => Double -> Int -> SerialT m Int -> m ()
_intervalsOfSum i n = composeN n (S.intervalsOf i FL.sum)
_intervalsOfSum i n = composeN n (Stream.intervalsOf i FL.sum)
{-# INLINE dropInterval #-}
dropInterval :: NanoSecond64 -> Int -> SerialT IO Int -> IO ()
dropInterval i n = composeN n (Internal.dropInterval i)
dropInterval i n = composeN n (Stream.dropInterval i)
#ifdef INSPECTION
inspect $ hasNoTypeClasses 'dropInterval
@ -384,42 +357,40 @@ inspect $ hasNoTypeClasses 'dropInterval
{-# INLINE findIndices #-}
findIndices :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
findIndices value n = composeNG n $ S.findIndices (== (value + 1))
findIndices value n = composeN n $ Stream.findIndices (== (value + 1))
{-# INLINE elemIndices #-}
elemIndices :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
elemIndices value n = composeNG n $ S.elemIndices (value + 1)
elemIndices value n = composeN n $ Stream.elemIndices (value + 1)
{-# INLINE deleteBy #-}
deleteBy :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
deleteBy value n = composeNG n $ S.deleteBy (>=) (value + 1)
deleteBy value n = composeN n $ Stream.deleteBy (>=) (value + 1)
-- uniq . uniq == uniq, composeN 2 ~ composeN 1
{-# INLINE uniq #-}
uniq :: MonadIO m => Int -> SerialT m Int -> m ()
uniq n = composeNG n S.uniq
uniq n = composeN n Stream.uniq
#ifdef USE_PRELUDE
{-# INLINE mapMaybe #-}
mapMaybe :: MonadIO m => Int -> SerialT m Int -> m ()
mapMaybe n =
composeNG n $
S.mapMaybe
composeN n $
Stream.mapMaybe
(\x ->
if odd x
then Nothing
else Just x)
{-# INLINE mapMaybeM #-}
mapMaybeM :: S.MonadAsync m => Int -> SerialT m Int -> m ()
mapMaybeM :: MonadAsync m => Int -> SerialT m Int -> m ()
mapMaybeM n =
composeNG n $
S.mapMaybeM
composeN n $
Stream.mapMaybeM
(\x ->
if odd x
then return Nothing
else return $ Just x)
#endif
o_1_space_filtering :: Int -> [Benchmark]
o_1_space_filtering value =
@ -431,9 +402,6 @@ o_1_space_filtering value =
, benchIOSink value "filterM-even" (filterMEven 1)
, benchIOSink value "filterM-all-out" (filterMAllOut value 1)
, benchIOSink value "filterM-all-in" (filterMAllIn value 1)
#ifndef USE_PRELUDE
, benchIOSink value "foldFilter-even" foldFilterEven
#endif
-- Trimming
, benchIOSink value "take-all" (takeAll value 1)
@ -460,18 +428,17 @@ o_1_space_filtering value =
, benchIOSink value "deleteBy" (deleteBy value 1)
, benchIOSink value "uniq" (uniq 1)
#ifdef USE_PRELUDE
-- Map and filter
, benchIOSink value "mapMaybe" (mapMaybe 1)
, benchIOSink value "mapMaybeM" (mapMaybeM 1)
#endif
-- Searching (stateful map and filter)
, benchIOSink value "findIndices" (findIndices value 1)
, benchIOSink value "elemIndices" (elemIndices value 1)
]
]
o_1_space_filteringX4 :: Int -> [Benchmark]
o_1_space_filteringX4 value =
[ bgroup "filteringX4"
@ -483,8 +450,6 @@ o_1_space_filteringX4 value =
, benchIOSink value "filterM-all-out" (filterMAllOut value 4)
, benchIOSink value "filterM-all-in" (filterMAllIn value 4)
--, benchIOSink value "foldFilter-even" (foldFilterEven 4)
-- trimming
, benchIOSink value "take-all" (takeAll value 4)
, benchIOSink value "takeWhile-true" (takeWhileTrue value 4)
@ -501,11 +466,10 @@ o_1_space_filteringX4 value =
, benchIOSink value "uniq" (uniq 4)
#ifdef USE_PRELUDE
-- map and filter
, benchIOSink value "mapMaybe" (mapMaybe 4)
, benchIOSink value "mapMaybeM" (mapMaybeM 4)
#endif
-- searching
, benchIOSink value "findIndices" (findIndices value 4)
, benchIOSink value "elemIndices" (elemIndices value 4)
@ -515,29 +479,28 @@ o_1_space_filteringX4 value =
-------------------------------------------------------------------------------
-- Size increasing transformations (insertions)
-------------------------------------------------------------------------------
#ifdef USE_PRELUDE
{-# INLINE intersperse #-}
intersperse :: S.MonadAsync m => Int -> Int -> SerialT m Int -> m ()
intersperse value n = composeNG n $ S.intersperse (value + 1)
intersperse :: MonadAsync m => Int -> Int -> SerialT m Int -> m ()
intersperse value n = composeN n $ Stream.intersperse (value + 1)
{-# INLINE intersperseM #-}
intersperseM :: S.MonadAsync m => Int -> Int -> SerialT m Int -> m ()
intersperseM value n = composeNG n $ S.intersperseM (return $ value + 1)
{-# INLINE interposeSuffix #-}
interposeSuffix :: S.MonadAsync m => Int -> Int -> SerialT m Int -> m ()
interposeSuffix value n =
composeNG n $ Internal.interposeSuffix (value + 1) Unfold.identity
{-# INLINE intercalateSuffix #-}
intercalateSuffix :: S.MonadAsync m => Int -> Int -> SerialT m Int -> m ()
intercalateSuffix value n =
composeNG n $ Internal.intercalateSuffix Unfold.identity (value + 1)
intersperseM :: MonadAsync m => Int -> Int -> SerialT m Int -> m ()
intersperseM value n = composeN n $ Stream.intersperseM (return $ value + 1)
{-# INLINE insertBy #-}
insertBy :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
insertBy value n = composeNG n $ S.insertBy compare (value + 1)
insertBy value n = composeN n $ Stream.insertBy compare (value + 1)
{-# INLINE interposeSuffix #-}
interposeSuffix :: Monad m => Int -> Int -> SerialT m Int -> m ()
interposeSuffix value n =
composeN n $ Stream.interposeSuffix (value + 1) Unfold.identity
{-# INLINE intercalateSuffix #-}
intercalateSuffix :: Monad m => Int -> Int -> SerialT m Int -> m ()
intercalateSuffix value n =
composeN n $ Stream.intercalateSuffix Unfold.identity (value + 1)
o_1_space_inserting :: Int -> [Benchmark]
o_1_space_inserting value =
@ -554,21 +517,21 @@ o_1_space_insertingX4 :: Int -> [Benchmark]
o_1_space_insertingX4 value =
[ bgroup "insertingX4"
[ benchIOSink value "intersperse" (intersperse value 4)
-- , benchIOSink value "insertBy" (insertBy value 4)
, benchIOSink value "insertBy" (insertBy value 4)
]
]
#endif
-------------------------------------------------------------------------------
-- Indexing
-------------------------------------------------------------------------------
#ifdef USE_PRELUDE
{-# INLINE indexed #-}
indexed :: MonadIO m => Int -> SerialT m Int -> m ()
indexed n = composeN n (S.map snd . S.indexed)
indexed n = composeN n (fmap snd . Stream.indexed)
{-# INLINE indexedR #-}
indexedR :: MonadIO m => Int -> Int -> SerialT m Int -> m ()
indexedR value n = composeN n (S.map snd . S.indexedR value)
indexedR value n = composeN n (fmap snd . Stream.indexedR value)
o_1_space_indexing :: Int -> [Benchmark]
o_1_space_indexing value =
@ -586,24 +549,6 @@ o_1_space_indexingX4 value =
]
]
#else
{-# INLINE indexed #-}
indexed :: MonadIO m => SerialT m Int -> m ()
indexed = Stream.fold FL.drain . Prelude.fmap snd . Stream.indexed
{-# INLINE indexedR #-}
indexedR :: MonadIO m => Int -> SerialT m Int -> m ()
indexedR value = Stream.fold FL.drain . (Prelude.fmap snd . Stream.indexedR value)
o_1_space_indexing :: Int -> [Benchmark]
o_1_space_indexing value =
[ bgroup "indexing"
[ benchIOSink value "indexed" indexed
, benchIOSink value "indexedR" (indexedR value)
]
]
#endif
-------------------------------------------------------------------------------
-- Main
-------------------------------------------------------------------------------
@ -614,23 +559,18 @@ o_1_space_indexing value =
benchmarks :: String -> Int -> [Benchmark]
benchmarks moduleName size =
[ bgroup (o_1_space_prefix moduleName) $ Prelude.concat
[ o_1_space_mapping size
, o_1_space_indexing size
[ o_1_space_functor size
, o_1_space_mapping size
, o_1_space_mappingX4 size
, o_1_space_filtering size
, o_1_space_filteringX4 size
#ifdef USE_PRELUDE
, o_1_space_functor size
, o_1_space_mappingX4 size
, o_1_space_inserting size
, o_1_space_insertingX4 size
, o_1_space_indexing size
, o_1_space_indexingX4 size
#endif
]
, bgroup (o_n_space_prefix moduleName) $
#ifdef USE_PRELUDE
o_n_space_mapping size
#else
o_n_space_traversable size
#endif
, bgroup (o_n_space_prefix moduleName) $ Prelude.concat
[ o_n_space_traversable size
, o_n_space_mapping size
]
]

View File

@ -65,9 +65,10 @@ flag bench-core
default: False
flag use-prelude
description: Use Streamly.Prelude instead of Streamly.Data.Stream for serial benchmarks
description: Use Prelude instead of Data.Stream for serial benchmarks
manual: True
default: False
-------------------------------------------------------------------------------
-- Common stanzas
-------------------------------------------------------------------------------
@ -208,6 +209,33 @@ common bench-options-threaded
-- Serial Streams
-------------------------------------------------------------------------------
benchmark Data.Stream
import: bench-options
type: exitcode-stdio-1.0
hs-source-dirs: Streamly/Benchmark/Data
main-is: Stream.hs
other-modules:
Stream.Generate
Stream.Eliminate
Stream.Transform
Stream.Reduce
Stream.Expand
Stream.Exceptions
Stream.Lift
Stream.Common
if flag(use-prelude)
other-modules:
Stream.Split
if flag(bench-core) || impl(ghcjs)
buildable: False
else
buildable: True
if flag(limit-build-mem)
if flag(dev)
ghc-options: +RTS -M3500M -RTS
else
ghc-options: +RTS -M2500M -RTS
benchmark Prelude.WSerial
import: bench-options
type: exitcode-stdio-1.0
@ -387,37 +415,6 @@ benchmark Data.Parser
-------------------------------------------------------------------------------
-- Raw Streams
-------------------------------------------------------------------------------
benchmark Data.Stream
import: bench-options
type: exitcode-stdio-1.0
hs-source-dirs: Streamly/Benchmark/Data
main-is: Stream.hs
other-modules:
Stream.Eliminate
Stream.Exceptions
Stream.Generate
Stream.Lift
Stream.Transformation
if flag(use-prelude)
other-modules:
Stream.NestedFold
Stream.NestedStream
Stream.Split
else
other-modules:
Stream.Common
Stream.Reduce
if flag(bench-core) || impl(ghcjs)
buildable: False
else
buildable: True
build-depends: exceptions >= 0.8 && < 0.11
if flag(limit-build-mem)
if flag(dev)
ghc-options: +RTS -M3500M -RTS
else
ghc-options: +RTS -M2000M -RTS
benchmark Data.Stream.StreamD
import: bench-options

View File

@ -30,7 +30,15 @@ cradle:
component: "bench:Data.Stream.StreamD"
- path: "./benchmark/Streamly/Benchmark/Data/Stream/StreamK.hs"
component: "bench:Data.Stream.StreamK"
- path: "./benchmark/Streamly/Benchmark/Data/Stream/*.hs"
- path: "./benchmark/Streamly/Benchmark/Data/Stream/Common.hs"
component: "bench:Data.Stream"
- path: "./benchmark/Streamly/Benchmark/Data/Stream/Expand.hs"
component: "bench:Data.Stream"
- path: "./benchmark/Streamly/Benchmark/Data/Stream/Generate.hs"
component: "bench:Data.Stream"
- path: "./benchmark/Streamly/Benchmark/Data/Stream/Reduce.hs"
component: "bench:Data.Stream"
- path: "./benchmark/Streamly/Benchmark/Data/Stream/Transform.hs"
component: "bench:Data.Stream"
- path: "./benchmark/Streamly/Benchmark/Data/Unfold.hs"
component: "bench:Data.Unfold"