From bfb1624a57632a44816fcb6f5b4755c6a60c8eba Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Fri, 5 Aug 2022 23:46:05 +0530 Subject: [PATCH] Add more Prelude.Serial benchmarks to Data.Stream And refactor for simplification and code-reuse. --- .github/workflows/regression-check.yml | 2 +- .hlint.yaml | 10 +- benchmark/Streamly/Benchmark/Data/Stream.hs | 20 +- .../Streamly/Benchmark/Data/Stream/Common.hs | 434 +++++++++++++- .../Benchmark/Data/Stream/Eliminate.hs | 30 +- .../Benchmark/Data/Stream/Exceptions.hs | 108 ++-- .../Stream/{NestedStream.hs => Expand.hs} | 158 ++--- .../Benchmark/Data/Stream/Generate.hs | 119 ++-- .../Streamly/Benchmark/Data/Stream/Lift.hs | 92 ++- .../Benchmark/Data/Stream/NestedFold.hs | 488 ---------------- .../Streamly/Benchmark/Data/Stream/Reduce.hs | 549 +++++++++++++++--- .../{Transformation.hs => Transform.hs} | 320 +++++----- benchmark/streamly-benchmarks.cabal | 61 +- hie.yaml | 10 +- 14 files changed, 1324 insertions(+), 1077 deletions(-) rename benchmark/Streamly/Benchmark/Data/Stream/{NestedStream.hs => Expand.hs} (78%) delete mode 100644 benchmark/Streamly/Benchmark/Data/Stream/NestedFold.hs rename benchmark/Streamly/Benchmark/Data/Stream/{Transformation.hs => Transform.hs} (72%) diff --git a/.github/workflows/regression-check.yml b/.github/workflows/regression-check.yml index df4436bb2..8b0fdd887 100644 --- a/.github/workflows/regression-check.yml +++ b/.github/workflows/regression-check.yml @@ -25,11 +25,11 @@ jobs: Data.Parser.ParserD Data.Parser.ParserK Data.SmallArray - Data.Stream Data.Stream.StreamD Data.Stream.StreamDK Data.Stream.StreamK:6 Data.Unfold + Data.Stream FileSystem.Handle Prelude.Ahead Prelude.Async:12 diff --git a/.hlint.yaml b/.hlint.yaml index 4c6bcdeaa..0cddd3572 100644 --- a/.hlint.yaml +++ b/.hlint.yaml @@ -26,11 +26,11 @@ - ignore: {name: "Use fmap"} # Warnings ignored in specific places -- ignore: {name: "Use ++", within: Stream.Transformation} -- ignore: {name: "Use mapM", within: Stream.Transformation} -- ignore: {name: "Use traverse", within: Stream.Transformation} -- ignore: {name: "Redundant <*", within: Stream.NestedStream} -- ignore: {name: "Use ++", within: Stream.NestedStream} +- ignore: {name: "Use ++", within: Stream.Transform} +- ignore: {name: "Use mapM", within: Stream.Transform} +- ignore: {name: "Use traverse", within: Stream.Transform} +- ignore: {name: "Redundant <*", within: Stream.Expand} +- ignore: {name: "Use ++", within: Stream.Reduce} - ignore: {name: "Use ++", within: Stream.Split} - ignore: {name: "Redundant bracket", within: Stream.Split} - ignore: {name: "Use isDigit", within: Streamly.Internal.Unicode.Char.Parser} diff --git a/benchmark/Streamly/Benchmark/Data/Stream.hs b/benchmark/Streamly/Benchmark/Data/Stream.hs index 031ea96c2..66d8b1098 100644 --- a/benchmark/Streamly/Benchmark/Data/Stream.hs +++ b/benchmark/Streamly/Benchmark/Data/Stream.hs @@ -15,15 +15,14 @@ import Streamly.Benchmark.Common.Handle (mkHandleBenchEnv) import qualified Stream.Eliminate as Elimination import qualified Stream.Exceptions as Exceptions +import qualified Stream.Expand as NestedStream import qualified Stream.Generate as Generation import qualified Stream.Lift as Lift -import qualified Stream.Reduce as Reduction -import qualified Stream.Transformation as Transformation +import qualified Stream.Reduce as NestedFold #ifdef USE_PRELUDE -import qualified Stream.NestedFold as NestedFold -import qualified Stream.NestedStream as NestedStream import qualified Stream.Split as Split #endif +import qualified Stream.Transform as Transformation import Streamly.Benchmark.Common @@ -45,15 +44,14 @@ main = do where allBenchmarks env size = Prelude.concat - [ Elimination.benchmarks moduleName size + [ Generation.benchmarks moduleName size + , Elimination.benchmarks moduleName size , Exceptions.benchmarks moduleName env size - , Generation.benchmarks moduleName size - , Lift.benchmarks moduleName size - , Reduction.benchmarks moduleName size - , Transformation.benchmarks moduleName size #ifdef USE_PRELUDE - , NestedFold.benchmarks moduleName size - , NestedStream.benchmarks moduleName size , Split.benchmarks moduleName env #endif + , Transformation.benchmarks moduleName size + , NestedFold.benchmarks moduleName size + , Lift.benchmarks moduleName size + , NestedStream.benchmarks moduleName size ] diff --git a/benchmark/Streamly/Benchmark/Data/Stream/Common.hs b/benchmark/Streamly/Benchmark/Data/Stream/Common.hs index ae975c0e9..6251a40fd 100644 --- a/benchmark/Streamly/Benchmark/Data/Stream/Common.hs +++ b/benchmark/Streamly/Benchmark/Data/Stream/Common.hs @@ -1,3 +1,8 @@ +{-# LANGUAGE ConstraintKinds #-} +{-# LANGUAGE CPP #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE ScopedTypeVariables #-} + -- | -- Module : Stream.Common -- Copyright : (c) 2018 Composewell Technologies @@ -5,40 +10,164 @@ -- Maintainer : streamly@composewell.com module Stream.Common - ( sourceUnfoldr + ( MonadAsync + + -- Generation + , enumerateFromTo + , replicate + , unfoldrM + , fromListM + + , append + , append2 + + -- Elimination + , drain + , foldl' + , scanl' + + -- Benchmark stream generation + , sourceUnfoldr , sourceUnfoldrM , sourceUnfoldrAction + , sourceConcatMapId + , sourceFromFoldable + , sourceFromFoldableM + + -- Benchmark stream elimination , benchIOSink , benchIOSrc + + -- Benchmarking functions + , concatStreamsWith + , concatPairsWith + , apDiscardFst + , apDiscardSnd + , apLiftA2 + , toNullAp + , monadThen + , toNullM + , toNullM3 + , filterAllOutM + , filterAllInM + , filterSome + , breakAfterSome + , toListM + , toListSome + , composeN + , mapN + , mapM + , transformMapM + , transformComposeMapM + , transformTeeMapM + , transformZipMapM ) where -import Streamly.Internal.Data.Stream (Stream, unfold) -import qualified Streamly.Internal.Data.Fold as Fold -import qualified Streamly.Internal.Data.Stream as Stream -import qualified Streamly.Internal.Data.Unfold as UF -import Control.DeepSeq (NFData) -import Gauge -import Prelude hiding (mapM) +import Control.Applicative (liftA2) +import Control.Exception (try) +import GHC.Exception (ErrorCall) +import Streamly.Internal.Data.Stream (Stream) import System.Random (randomRIO) -{-# INLINE toNull #-} -toNull :: Monad m => Stream m a -> m () -toNull = Stream.fold Fold.drain +import qualified Streamly.Internal.Data.Fold as Fold +import qualified Streamly.Internal.Data.Pipe as Pipe + +#ifdef USE_PRELUDE +import Streamly.Prelude (foldl', scanl') +import qualified Streamly.Internal.Data.Stream.IsStream as Stream +import qualified Streamly.Prelude as Stream +import Streamly.Benchmark.Prelude + ( composeN, sourceUnfoldr, sourceUnfoldr, sourceFromFoldable + , sourceFromFoldableM, sourceUnfoldrAction, sourceConcatMapId, benchIOSink + , concatStreamsWith, concatPairsWith + ) +#else +import Control.DeepSeq (NFData) +import Streamly.Internal.Data.Stream (unfold) +import qualified Streamly.Internal.Data.Stream as Stream +import qualified Streamly.Internal.Data.Unfold as Unfold +#endif + +import Gauge +import Prelude hiding (mapM, replicate) + +#ifdef USE_PRELUDE +type MonadAsync m = Stream.MonadAsync m +#else +type MonadAsync = Monad +#endif + +{-# INLINE append #-} +append :: Stream m a -> Stream m a -> Stream m a +#ifdef USE_PRELUDE +append = Stream.serial +#else +append = Stream.append +#endif + +{-# INLINE append2 #-} +append2 :: Monad m => Stream m a -> Stream m a -> Stream m a +#ifdef USE_PRELUDE +append2 = Stream.append +#else +append2 = Stream.append2 +#endif + +{-# INLINE drain #-} +drain :: Monad m => Stream m a -> m () +drain = Stream.fold Fold.drain + +{-# INLINE enumerateFromTo #-} +enumerateFromTo :: Monad m => Int -> Int -> Stream m Int +#ifdef USE_PRELUDE +enumerateFromTo = Stream.enumerateFromTo +#else +enumerateFromTo from to = Stream.unfold Unfold.enumerateFromTo (from, to) +#endif + +{-# INLINE replicate #-} +replicate :: Monad m => Int -> a -> Stream m a +#ifdef USE_PRELUDE +replicate = Stream.replicate +#else +replicate n = Stream.unfold (Unfold.replicateM n) . return +#endif + +{-# INLINE unfoldrM #-} +unfoldrM :: MonadAsync m => (b -> m (Maybe (a, b))) -> b -> Stream m a +#ifdef USE_PRELUDE +unfoldrM = Stream.unfoldrM +#else +unfoldrM step = Stream.unfold (Unfold.unfoldrM step) +#endif + +{-# INLINE fromListM #-} +fromListM :: MonadAsync m => [m a] -> Stream m a +#ifdef USE_PRELUDE +fromListM = Stream.fromListM +#else +fromListM = Stream.unfold Unfold.fromListM +#endif {-# INLINE sourceUnfoldrM #-} -sourceUnfoldrM :: Monad m => Int -> Int -> Stream m Int -sourceUnfoldrM count start = unfold (UF.unfoldrM step) start +sourceUnfoldrM :: MonadAsync m => Int -> Int -> Stream m Int +sourceUnfoldrM count start = unfoldrM step start + where + step cnt = if cnt > start + count then return Nothing else return (Just (cnt, cnt + 1)) +#ifndef USE_PRELUDE {-# INLINE sourceUnfoldr #-} sourceUnfoldr :: Monad m => Int -> Int -> Stream m Int -sourceUnfoldr count start = unfold (UF.unfoldr step) start +sourceUnfoldr count start = unfold (Unfold.unfoldr step) start + where + step cnt = if cnt > start + count then Nothing @@ -46,19 +175,30 @@ sourceUnfoldr count start = unfold (UF.unfoldr step) start {-# INLINE sourceUnfoldrAction #-} sourceUnfoldrAction :: (Monad m1, Monad m) => Int -> Int -> Stream m (m1 Int) -sourceUnfoldrAction value n = unfold (UF.unfoldr step) n +sourceUnfoldrAction value n = unfold (Unfold.unfoldr step) n + where + step cnt = if cnt > n + value then Nothing else Just (return cnt, cnt + 1) +{-# INLINE sourceFromFoldable #-} +sourceFromFoldable :: Int -> Int -> Stream m Int +sourceFromFoldable value n = Stream.fromFoldable [n..n+value] + +{-# INLINE sourceFromFoldableM #-} +sourceFromFoldableM :: Monad m => Int -> Int -> Stream m Int +sourceFromFoldableM value n = Stream.fromFoldableM (fmap return [n..n+value]) + {-# INLINE benchIOSink #-} benchIOSink :: (NFData b) => Int -> String -> (Stream IO Int -> IO b) -> Benchmark benchIOSink value name f = bench name $ nfIO $ randomRIO (1,1) >>= f . sourceUnfoldrM value +#endif -- | Takes a source, and uses it with a default drain/fold method. {-# INLINE benchIOSrc #-} @@ -67,4 +207,266 @@ benchIOSrc -> (Int -> Stream IO a) -> Benchmark benchIOSrc name f = - bench name $ nfIO $ randomRIO (1,1) >>= toNull . f + bench name $ nfIO $ randomRIO (1,1) >>= drain . f + +#ifndef USE_PRELUDE +{-# INLINE concatStreamsWith #-} +concatStreamsWith + :: (Stream IO Int -> Stream IO Int -> Stream IO Int) + -> Int + -> Int + -> Int + -> IO () +concatStreamsWith op outer inner n = + drain $ Stream.concatMapWith op + (sourceUnfoldrM inner) + (sourceUnfoldrM outer n) + +{-# INLINE concatPairsWith #-} +concatPairsWith + :: (Stream IO Int -> Stream IO Int -> Stream IO Int) + -> Int + -> Int + -> Int + -> IO () +concatPairsWith op outer inner n = + drain $ Stream.concatPairsWith op + (sourceUnfoldrM inner) + (sourceUnfoldrM outer n) + +{-# INLINE sourceConcatMapId #-} +sourceConcatMapId :: (Monad m) + => Int -> Int -> Stream m (Stream m Int) +sourceConcatMapId value n = + Stream.fromFoldable $ fmap (Stream.fromEffect . return) [n..n+value] +#endif + +{-# INLINE apDiscardFst #-} +apDiscardFst :: MonadAsync m => + Int -> Int -> m () +apDiscardFst linearCount start = drain $ + sourceUnfoldrM nestedCount2 start + *> sourceUnfoldrM nestedCount2 start + + where + + nestedCount2 = round (fromIntegral linearCount**(1/2::Double)) + +{-# INLINE apDiscardSnd #-} +apDiscardSnd :: MonadAsync m => Int -> Int -> m () +apDiscardSnd linearCount start = drain $ + sourceUnfoldrM nestedCount2 start + <* sourceUnfoldrM nestedCount2 start + + where + + nestedCount2 = round (fromIntegral linearCount**(1/2::Double)) + +{-# INLINE apLiftA2 #-} +apLiftA2 :: MonadAsync m => Int -> Int -> m () +apLiftA2 linearCount start = drain $ + liftA2 (+) (sourceUnfoldrM nestedCount2 start) + (sourceUnfoldrM nestedCount2 start) + + where + + nestedCount2 = round (fromIntegral linearCount**(1/2::Double)) + +{-# INLINE toNullAp #-} +toNullAp :: MonadAsync m => Int -> Int -> m () +toNullAp linearCount start = drain $ + (+) <$> sourceUnfoldrM nestedCount2 start + <*> sourceUnfoldrM nestedCount2 start + + where + + nestedCount2 = round (fromIntegral linearCount**(1/2::Double)) + +{-# INLINE monadThen #-} +monadThen :: MonadAsync m => Int -> Int -> m () +monadThen linearCount start = drain $ do + sourceUnfoldrM nestedCount2 start >> + sourceUnfoldrM nestedCount2 start + + where + + nestedCount2 = round (fromIntegral linearCount**(1/2::Double)) + +{-# INLINE toNullM #-} +toNullM :: MonadAsync m => Int -> Int -> m () +toNullM linearCount start = drain $ do + x <- sourceUnfoldrM nestedCount2 start + y <- sourceUnfoldrM nestedCount2 start + return $ x + y + + where + + nestedCount2 = round (fromIntegral linearCount**(1/2::Double)) + +{-# INLINE toNullM3 #-} +toNullM3 :: MonadAsync m => Int -> Int -> m () +toNullM3 linearCount start = drain $ do + x <- sourceUnfoldrM nestedCount3 start + y <- sourceUnfoldrM nestedCount3 start + z <- sourceUnfoldrM nestedCount3 start + return $ x + y + z + where + nestedCount3 = round (fromIntegral linearCount**(1/3::Double)) + +{-# INLINE filterAllOutM #-} +filterAllOutM :: MonadAsync m => Int -> Int -> m () +filterAllOutM linearCount start = drain $ do + x <- sourceUnfoldrM nestedCount2 start + y <- sourceUnfoldrM nestedCount2 start + let s = x + y + if s < 0 + then return s + else Stream.nil + where + nestedCount2 = round (fromIntegral linearCount**(1/2::Double)) + +{-# INLINE filterAllInM #-} +filterAllInM :: MonadAsync m => Int -> Int -> m () +filterAllInM linearCount start = drain $ do + x <- sourceUnfoldrM nestedCount2 start + y <- sourceUnfoldrM nestedCount2 start + let s = x + y + if s > 0 + then return s + else Stream.nil + where + nestedCount2 = round (fromIntegral linearCount**(1/2::Double)) + +{-# INLINE filterSome #-} +filterSome :: MonadAsync m => Int -> Int -> m () +filterSome linearCount start = drain $ do + x <- sourceUnfoldrM nestedCount2 start + y <- sourceUnfoldrM nestedCount2 start + let s = x + y + if s > 1100000 + then return s + else Stream.nil + where + nestedCount2 = round (fromIntegral linearCount**(1/2::Double)) + +{-# INLINE breakAfterSome #-} +breakAfterSome :: Int -> Int -> IO () +breakAfterSome linearCount start = do + (_ :: Either ErrorCall ()) <- try $ drain $ do + x <- sourceUnfoldrM nestedCount2 start + y <- sourceUnfoldrM nestedCount2 start + let s = x + y + if s > 1100000 + then error "break" + else return s + return () + where + nestedCount2 = round (fromIntegral linearCount**(1/2::Double)) + +{-# INLINE toListM #-} +toListM :: MonadAsync m => Int -> Int -> m [Int] +toListM linearCount start = Stream.fold Fold.toList $ do + x <- sourceUnfoldrM nestedCount2 start + y <- sourceUnfoldrM nestedCount2 start + return $ x + y + where + nestedCount2 = round (fromIntegral linearCount**(1/2::Double)) + +-- Taking a specified number of elements is very expensive in logict so we have +-- a test to measure the same. +{-# INLINE toListSome #-} +toListSome :: MonadAsync m => Int -> Int -> m [Int] +toListSome linearCount start = + Stream.fold Fold.toList $ Stream.take 10000 $ do + x <- sourceUnfoldrM nestedCount2 start + y <- sourceUnfoldrM nestedCount2 start + return $ x + y + where + nestedCount2 = round (fromIntegral linearCount**(1/2::Double)) + +#ifndef USE_PRELUDE +{-# INLINE composeN #-} +composeN :: + (Monad m) + => Int + -> (Stream m Int -> Stream m Int) + -> Stream m Int + -> m () +composeN n f = + case n of + 1 -> drain . f + 2 -> drain . f . f + 3 -> drain . f . f . f + 4 -> drain . f . f . f . f + _ -> undefined +#endif + +{-# INLINE mapN #-} +mapN :: + Monad m + => Int + -> Stream m Int + -> m () +mapN n = composeN n $ fmap (+ 1) + +{-# INLINE mapM #-} +mapM :: + MonadAsync m + => Int + -> Stream m Int + -> m () +mapM n = composeN n $ Stream.mapM return + +#ifndef USE_PRELUDE +foldl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> m b +foldl' f z = Stream.fold (Fold.foldl' f z) + +scanl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b +scanl' f z = Stream.scan (Fold.foldl' f z) +#endif + +{-# INLINE transformMapM #-} +transformMapM :: + (Monad m) + => Int + -> Stream m Int + -> m () +transformMapM n = composeN n $ Stream.transform (Pipe.mapM return) + +{-# INLINE transformComposeMapM #-} +transformComposeMapM :: + (Monad m) + => Int + -> Stream m Int + -> m () +transformComposeMapM n = + composeN n $ + Stream.transform + (Pipe.mapM (\x -> return (x + 1)) `Pipe.compose` + Pipe.mapM (\x -> return (x + 2))) + +{-# INLINE transformTeeMapM #-} +transformTeeMapM :: + (Monad m) + => Int + -> Stream m Int + -> m () +transformTeeMapM n = + composeN n $ + Stream.transform + (Pipe.mapM (\x -> return (x + 1)) `Pipe.tee` + Pipe.mapM (\x -> return (x + 2))) + +{-# INLINE transformZipMapM #-} +transformZipMapM :: + (Monad m) + => Int + -> Stream m Int + -> m () +transformZipMapM n = + composeN n $ + Stream.transform + (Pipe.zipWith + (+) + (Pipe.mapM (\x -> return (x + 1))) + (Pipe.mapM (\x -> return (x + 2)))) diff --git a/benchmark/Streamly/Benchmark/Data/Stream/Eliminate.hs b/benchmark/Streamly/Benchmark/Data/Stream/Eliminate.hs index 087d50fb6..647d86610 100644 --- a/benchmark/Streamly/Benchmark/Data/Stream/Eliminate.hs +++ b/benchmark/Streamly/Benchmark/Data/Stream/Eliminate.hs @@ -35,14 +35,13 @@ import qualified Streamly.Internal.Data.Stream.StreamD as D #endif import qualified Streamly.Internal.Data.Fold as Fold #ifdef USE_PRELUDE -import qualified Streamly.Prelude as S import qualified Streamly.Internal.Data.Stream.IsStream as S -import qualified Streamly.Internal.Data.Stream.IsStream as Internal #else import qualified Streamly.Internal.Data.Stream as S #endif import Gauge +-- XXX Replace SerialT with Stream import Streamly.Internal.Data.Stream.Serial (SerialT) #ifdef USE_PRELUDE import Streamly.Prelude (fromSerial) @@ -56,7 +55,6 @@ import Stream.Common ) #endif import Streamly.Benchmark.Common - import Prelude hiding (length, sum, or, and, any, all, notElem, elem, (!!), lookup, repeat, minimum, maximum, product, last, mapM_, init) import qualified Prelude @@ -66,6 +64,7 @@ import qualified Prelude repeat :: (Monad m, S.IsStream t) => Int -> Int -> t m Int repeat count = S.take count . S.repeat #endif + ------------------------------------------------------------------------------- -- Elimination ------------------------------------------------------------------------------- @@ -281,6 +280,7 @@ uncons s = do case r of Nothing -> return () Just (_, t) -> uncons t + #ifdef USE_PRELUDE {-# INLINE init #-} init :: Monad m => SerialT m a -> m () @@ -308,6 +308,7 @@ foldrToStream = S.foldr S.cons S.nil {-# INLINE foldrMBuild #-} foldrMBuild :: Monad m => SerialT m Int -> m [Int] foldrMBuild = S.foldrM (\x xs -> (x :) <$> xs) (return []) + #ifdef USE_PRELUDE {-# INLINE foldl'Reduce #-} foldl'Reduce :: Monad m => SerialT m Int -> m Int @@ -411,12 +412,13 @@ drainWhile = S.drainWhile (const True) {-# INLINE (!!) #-} (!!) :: Monad m => Int -> SerialT m Int -> m (Maybe Int) -(!!) = flip (Internal.!!) +(!!) = flip (S.!!) {-# INLINE lookup #-} lookup :: Monad m => Int -> SerialT m Int -> m (Maybe Int) lookup val = S.lookup val . S.map (\x -> (x, x)) #endif + o_1_space_elimination_folds :: Int -> [Benchmark] o_1_space_elimination_folds value = [ bgroup "elimination" @@ -445,10 +447,10 @@ o_1_space_elimination_folds value = ] , bgroup "Identity" [ benchIdentitySink value "foldrMElem" (foldrMElem value) - , benchPureSink value "foldrMToListLength" - (Prelude.length . runIdentity . foldrMBuild) , benchIdentitySink value "foldrToStreamLength" (S.fold Fold.length . runIdentity . foldrToStream) + , benchPureSink value "foldrMToListLength" + (Prelude.length . runIdentity . foldrMBuild) ] ] @@ -467,7 +469,7 @@ o_1_space_elimination_folds value = , benchIOSink value "mapM_" mapM_ -- this is too fast, causes all benchmarks reported in ns - --, benchIOSink value "head" head + -- , benchIOSink value "head" head , benchIOSink value "last" last , benchIOSink value "length" length , benchIOSink value "sum" sum @@ -476,9 +478,8 @@ o_1_space_elimination_folds value = , benchIOSink value "maximum" maximum , benchIOSink value "minimumBy" minimumBy , benchIOSink value "minimum" minimum -#ifdef USE_PRELUDE + , bench "the" $ nfIO $ randomRIO (1,1) >>= the . repeat value -#endif , benchIOSink value "find" (find value) , benchIOSink value "findM" (findM value) -- , benchIOSink value "lookupFirst" (lookup 1) @@ -487,7 +488,7 @@ o_1_space_elimination_folds value = , benchIOSink value "findIndex" (findIndex value) , benchIOSink value "elemIndex" (elemIndex value) -- this is too fast, causes all benchmarks reported in ns - -- , benchIOSink value "null" S.null + -- , benchIOSink value "null" S.null , benchIOSink value "elem" (elem value) , benchIOSink value "notElem" (notElem value) , benchIOSink value "all" (all value) @@ -504,6 +505,7 @@ o_1_space_elimination_folds value = ------------------------------------------------------------------------------- -- Buffered Transformations by fold ------------------------------------------------------------------------------- + #ifdef USE_PRELUDE {-# INLINE foldl'Build #-} foldl'Build :: Monad m => SerialT m Int -> m [Int] @@ -559,14 +561,15 @@ o_n_space_elimination_foldr value = , benchIOSink value "foldrM/reduce/IO (sum)" foldrMReduce ] ] + #ifdef USE_PRELUDE o_n_heap_elimination_toList :: Int -> [Benchmark] o_n_heap_elimination_toList value = [ bgroup "toList" -- Converting the stream to a list or pure stream in a strict monad - [ benchIOSink value "toListRev" Internal.toListRev + [ benchIOSink value "toListRev" S.toListRev , benchIOSink value "toStreamRev" - (Internal.toStreamRev :: (SerialT IO Int -> IO (SerialT Identity Int))) + (S.toStreamRev :: (SerialT IO Int -> IO (SerialT Identity Int))) ] ] @@ -576,10 +579,11 @@ o_n_space_elimination_toList value = -- Converting the stream to a list or pure stream in a strict monad [ benchIOSink value "toList" S.toList , benchIOSink value "toStream" - (Internal.toStream :: (SerialT IO Int -> IO (SerialT Identity Int))) + (S.toStream :: (SerialT IO Int -> IO (SerialT Identity Int))) ] ] #endif + ------------------------------------------------------------------------------- -- Multi-stream folds ------------------------------------------------------------------------------- diff --git a/benchmark/Streamly/Benchmark/Data/Stream/Exceptions.hs b/benchmark/Streamly/Benchmark/Data/Stream/Exceptions.hs index b7cdc3a2a..819fc4467 100644 --- a/benchmark/Streamly/Benchmark/Data/Stream/Exceptions.hs +++ b/benchmark/Streamly/Benchmark/Data/Stream/Exceptions.hs @@ -7,6 +7,7 @@ -- Portability : GHC {-# LANGUAGE CPP #-} +{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE ScopedTypeVariables #-} #ifdef __HADDOCK_VERSION__ @@ -21,26 +22,24 @@ module Stream.Exceptions (benchmarks) where import Control.Exception (SomeException, Exception, throwIO) +import Stream.Common (drain, enumerateFromTo) +import Streamly.Internal.Data.Stream.Serial (SerialT) import System.IO (Handle, hClose, hPutChar) import qualified Data.IORef as Ref import qualified Data.Map.Strict as Map - +import qualified Stream.Common as Common import qualified Streamly.FileSystem.Handle as FH import qualified Streamly.Internal.Data.Unfold as IUF import qualified Streamly.Internal.FileSystem.Handle as IFH #ifdef USE_PRELUDE import qualified Streamly.Internal.Data.Stream.IsStream as Stream -import qualified Streamly.Prelude as S #else import qualified Streamly.Internal.Data.Stream as Stream -import qualified Streamly.Internal.Data.Stream as S -import qualified Streamly.Internal.Data.Fold as Fold import qualified Streamly.Internal.Data.Unfold as Unfold #endif import Gauge hiding (env) -import Streamly.Internal.Data.Stream.Serial (SerialT) import Prelude hiding (last, length) import Streamly.Benchmark.Common import Streamly.Benchmark.Common.Handle @@ -55,28 +54,14 @@ import qualified Streamly.Internal.Data.Stream.StreamD as D -- stream exceptions ------------------------------------------------------------------------------- -drain :: SerialT IO a -> IO () +{-# INLINE replicateM #-} +replicateM :: Common.MonadAsync m => Int -> m a -> SerialT m a #ifdef USE_PRELUDE -drain = Stream.drain +replicateM = Stream.replicateM #else -drain = Stream.fold Fold.drain +replicateM = Stream.unfold . Unfold.replicateM #endif -enumerateFromTo :: Int -> Int -> SerialT IO Int -#ifdef USE_PRELUDE -enumerateFromTo length from = S.enumerateFromTo from (from + length) -#else -enumerateFromTo length from = Stream.unfold Unfold.enumerateFromTo (from, from + length) -#endif - -sourceRef :: (Num b) => Int -> Int -> Ref.IORef b -> SerialT IO b -#ifdef USE_PRELUDE -sourceRef length from ref = S.replicateM (from + length) -#else -sourceRef length from ref = Stream.unfold (Unfold.replicateM (from + length)) -#endif - $ Ref.modifyIORef' ref (+ 1) >> Ref.readIORef ref - data BenchException = BenchException1 | BenchException2 @@ -89,31 +74,32 @@ retryNoneSimple length from = drain $ Stream.retry (Map.singleton BenchException1 length) - (const S.nil) + (const Stream.nil) source where - source = enumerateFromTo length from + source = enumerateFromTo from (from + length) retryNone :: Int -> Int -> IO () retryNone length from = do ref <- Ref.newIORef (0 :: Int) drain - $ Stream.retry (Map.singleton BenchException1 length) (const S.nil) + $ Stream.retry (Map.singleton BenchException1 length) (const Stream.nil) $ source ref where - source = sourceRef length from - + source ref = + replicateM (from + length) + $ Ref.modifyIORef' ref (+ 1) >> Ref.readIORef ref retryAll :: Int -> Int -> IO () retryAll length from = do ref <- Ref.newIORef 0 drain $ Stream.retry - (Map.singleton BenchException1 (length + from)) (const S.nil) + (Map.singleton BenchException1 (length + from)) (const Stream.nil) $ source ref where @@ -131,11 +117,11 @@ retryUnknown :: Int -> Int -> IO () retryUnknown length from = do drain $ Stream.retry (Map.singleton BenchException1 length) (const source) - $ throwIO BenchException2 `S.before` S.nil + $ throwIO BenchException2 `Stream.before` Stream.nil where - source = enumerateFromTo length from + source = enumerateFromTo from (from + length) o_1_space_serial_exceptions :: Int -> [Benchmark] @@ -156,8 +142,8 @@ o_1_space_serial_exceptions length = -- | Send the file contents to /dev/null with exception handling readWriteOnExceptionStream :: Handle -> Handle -> IO () readWriteOnExceptionStream inh devNull = - let readEx = S.onException (hClose inh) (S.unfold FH.read inh) - in S.fold (FH.write devNull) readEx + let readEx = Stream.onException (hClose inh) (Stream.unfold FH.read inh) + in Stream.fold (FH.write devNull) readEx #ifdef INSPECTION inspect $ hasNoTypeClasses 'readWriteOnExceptionStream @@ -166,9 +152,9 @@ inspect $ hasNoTypeClasses 'readWriteOnExceptionStream -- | Send the file contents to /dev/null with exception handling readWriteHandleExceptionStream :: Handle -> Handle -> IO () readWriteHandleExceptionStream inh devNull = - let handler (_e :: SomeException) = S.fromEffect (hClose inh >> return 10) - readEx = S.handle handler (S.unfold FH.read inh) - in S.fold (FH.write devNull) readEx + let handler (_e :: SomeException) = Stream.fromEffect (hClose inh >> return 10) + readEx = Stream.handle handler (Stream.unfold FH.read inh) + in Stream.fold (FH.write devNull) readEx #ifdef INSPECTION inspect $ hasNoTypeClasses 'readWriteHandleExceptionStream @@ -177,8 +163,8 @@ inspect $ hasNoTypeClasses 'readWriteHandleExceptionStream -- | Send the file contents to /dev/null with exception handling readWriteFinally_Stream :: Handle -> Handle -> IO () readWriteFinally_Stream inh devNull = - let readEx = Stream.finally_ (hClose inh) (S.unfold FH.read inh) - in S.fold (FH.write devNull) readEx + let readEx = Stream.finally_ (hClose inh) (Stream.unfold FH.read inh) + in Stream.fold (FH.write devNull) readEx #ifdef INSPECTION inspect $ hasNoTypeClasses 'readWriteFinally_Stream @@ -186,8 +172,8 @@ inspect $ hasNoTypeClasses 'readWriteFinally_Stream readWriteFinallyStream :: Handle -> Handle -> IO () readWriteFinallyStream inh devNull = - let readEx = S.finally (hClose inh) (S.unfold FH.read inh) - in S.fold (FH.write devNull) readEx + let readEx = Stream.finally (hClose inh) (Stream.unfold FH.read inh) + in Stream.fold (FH.write devNull) readEx -- | Send the file contents to /dev/null with exception handling fromToBytesBracket_Stream :: Handle -> Handle -> IO () @@ -202,7 +188,7 @@ inspect $ hasNoTypeClasses 'fromToBytesBracket_Stream fromToBytesBracketStream :: Handle -> Handle -> IO () fromToBytesBracketStream inh devNull = - let readEx = S.bracket (return ()) (\_ -> hClose inh) + let readEx = Stream.bracket (return ()) (\_ -> hClose inh) (\_ -> IFH.getBytes inh) in IFH.putBytes devNull readEx @@ -210,8 +196,8 @@ readWriteBeforeAfterStream :: Handle -> Handle -> IO () readWriteBeforeAfterStream inh devNull = let readEx = Stream.after (hClose inh) - $ Stream.before (hPutChar devNull 'A') (S.unfold FH.read inh) - in S.fold (FH.write devNull) readEx + $ Stream.before (hPutChar devNull 'A') (Stream.unfold FH.read inh) + in Stream.fold (FH.write devNull) readEx #ifdef INSPECTION inspect $ 'readWriteBeforeAfterStream `hasNoType` ''D.Step @@ -219,8 +205,8 @@ inspect $ 'readWriteBeforeAfterStream `hasNoType` ''D.Step readWriteAfterStream :: Handle -> Handle -> IO () readWriteAfterStream inh devNull = - let readEx = Stream.after (hClose inh) (S.unfold FH.read inh) - in S.fold (FH.write devNull) readEx + let readEx = Stream.after (hClose inh) (Stream.unfold FH.read inh) + in Stream.fold (FH.write devNull) readEx #ifdef INSPECTION inspect $ 'readWriteAfterStream `hasNoType` ''D.Step @@ -228,8 +214,8 @@ inspect $ 'readWriteAfterStream `hasNoType` ''D.Step readWriteAfter_Stream :: Handle -> Handle -> IO () readWriteAfter_Stream inh devNull = - let readEx = Stream.after_ (hClose inh) (S.unfold FH.read inh) - in S.fold (FH.write devNull) readEx + let readEx = Stream.after_ (hClose inh) (Stream.unfold FH.read inh) + in Stream.fold (FH.write devNull) readEx #ifdef INSPECTION inspect $ hasNoTypeClasses 'readWriteAfter_Stream @@ -239,25 +225,25 @@ inspect $ 'readWriteAfter_Stream `hasNoType` ''D.Step o_1_space_copy_stream_exceptions :: BenchEnv -> [Benchmark] o_1_space_copy_stream_exceptions env = [ bgroup "exceptions" - [ mkBenchSmall "S.onException" env $ \inh _ -> + [ mkBenchSmall "Stream.onException" env $ \inh _ -> readWriteOnExceptionStream inh (nullH env) - , mkBenchSmall "S.handle" env $ \inh _ -> + , mkBenchSmall "Stream.handle" env $ \inh _ -> readWriteHandleExceptionStream inh (nullH env) - , mkBenchSmall "S.finally_" env $ \inh _ -> + , mkBenchSmall "Stream.finally_" env $ \inh _ -> readWriteFinally_Stream inh (nullH env) - , mkBenchSmall "S.finally" env $ \inh _ -> + , mkBenchSmall "Stream.finally" env $ \inh _ -> readWriteFinallyStream inh (nullH env) - , mkBenchSmall "S.after . S.before" env $ \inh _ -> + , mkBenchSmall "Stream.after . Stream.before" env $ \inh _ -> readWriteBeforeAfterStream inh (nullH env) - , mkBenchSmall "S.after" env $ \inh _ -> + , mkBenchSmall "Stream.after" env $ \inh _ -> readWriteAfterStream inh (nullH env) - , mkBenchSmall "S.after_" env $ \inh _ -> + , mkBenchSmall "Stream.after_" env $ \inh _ -> readWriteAfter_Stream inh (nullH env) ] , bgroup "exceptions/fromToBytes" - [ mkBenchSmall "S.bracket_" env $ \inh _ -> + [ mkBenchSmall "Stream.bracket_" env $ \inh _ -> fromToBytesBracket_Stream inh (nullH env) - , mkBenchSmall "S.bracket" env $ \inh _ -> + , mkBenchSmall "Stream.bracket" env $ \inh _ -> fromToBytesBracketStream inh (nullH env) ] ] @@ -314,7 +300,7 @@ toChunksBracket_ inh devNull = (return ()) (\_ -> hClose inh) (\_ -> IFH.getChunks inh) - in S.fold (IFH.writeChunks devNull) readEx + in Stream.fold (IFH.writeChunks devNull) readEx #ifdef INSPECTION inspect $ hasNoTypeClasses 'toChunksBracket_ @@ -322,18 +308,18 @@ inspect $ hasNoTypeClasses 'toChunksBracket_ toChunksBracket :: Handle -> Handle -> IO () toChunksBracket inh devNull = - let readEx = S.bracket + let readEx = Stream.bracket (return ()) (\_ -> hClose inh) (\_ -> IFH.getChunks inh) - in S.fold (IFH.writeChunks devNull) readEx + in Stream.fold (IFH.writeChunks devNull) readEx o_1_space_copy_exceptions_toChunks :: BenchEnv -> [Benchmark] o_1_space_copy_exceptions_toChunks env = [ bgroup "exceptions/toChunks" - [ mkBench "S.bracket_" env $ \inH _ -> + [ mkBench "Stream.bracket_" env $ \inH _ -> toChunksBracket_ inH (nullH env) - , mkBench "S.bracket" env $ \inH _ -> + , mkBench "Stream.bracket" env $ \inH _ -> toChunksBracket inH (nullH env) ] ] diff --git a/benchmark/Streamly/Benchmark/Data/Stream/NestedStream.hs b/benchmark/Streamly/Benchmark/Data/Stream/Expand.hs similarity index 78% rename from benchmark/Streamly/Benchmark/Data/Stream/NestedStream.hs rename to benchmark/Streamly/Benchmark/Data/Stream/Expand.hs index 7f3e90450..8d5dfd267 100644 --- a/benchmark/Streamly/Benchmark/Data/Stream/NestedStream.hs +++ b/benchmark/Streamly/Benchmark/Data/Stream/Expand.hs @@ -1,5 +1,5 @@ -- | --- Module : Stream.NestedStream +-- Module : Stream.Expand -- Copyright : (c) 2018 Composewell Technologies -- License : BSD-3-Clause -- Maintainer : streamly@composewell.com @@ -18,9 +18,10 @@ {-# OPTIONS_GHC -fplugin Test.Inspection.Plugin #-} #endif -module Stream.NestedStream (benchmarks) where +module Stream.Expand (benchmarks) where import Control.Monad.Trans.Class (lift) +import Streamly.Internal.Data.Stream.Serial (SerialT) import qualified Control.Applicative as AP @@ -31,14 +32,23 @@ import Test.Inspection import qualified Streamly.Internal.Data.Stream.StreamD as D #endif -import qualified Streamly.Prelude as S -import qualified Streamly.Internal.Data.Stream.IsStream as Internal +import qualified Stream.Common as Common +#ifdef USE_PRELUDE +import qualified Streamly.Internal.Data.Stream.IsStream as S +import Streamly.Benchmark.Prelude hiding + (benchIOSrc, sourceUnfoldrM, apDiscardFst, apDiscardSnd, apLiftA2, toNullAp + , monadThen, toNullM, toNullM3, filterAllInM, filterAllOutM, filterSome + , breakAfterSome, toListM, toListSome) +#else +import Streamly.Benchmark.Prelude (benchIO) +import qualified Streamly.Internal.Data.Stream as S +#endif import qualified Streamly.Internal.Data.Unfold as UF +import qualified Streamly.Internal.Data.Fold as Fold import Gauge -import Streamly.Prelude (SerialT, fromSerial, serial) +import Stream.Common hiding (append2) import Streamly.Benchmark.Common -import Streamly.Benchmark.Prelude import Prelude hiding (concatMap) ------------------------------------------------------------------------------- @@ -56,13 +66,14 @@ iterateN g initial count = f count initial -- Iterate a transformation over a singleton stream {-# INLINE iterateSingleton #-} -iterateSingleton :: S.MonadAsync m +iterateSingleton :: Monad m => (Int -> SerialT m Int -> SerialT m Int) -> Int -> Int -> SerialT m Int iterateSingleton g count n = iterateN g (return n) count +{- -- XXX need to check why this is slower than the explicit recursion above, even -- if the above code is written in a foldr like head recursive way. We also -- need to try this with foldlM' once #150 is fixed. @@ -70,12 +81,13 @@ iterateSingleton g count n = iterateN g (return n) count -- foldrM and any related fusion issues. {-# INLINE _iterateSingleton #-} _iterateSingleton :: - S.MonadAsync m + Monad m => (Int -> SerialT m Int -> SerialT m Int) -> Int -> Int -> SerialT m Int _iterateSingleton g value n = S.foldrM g (return n) $ sourceIntFromTo value n +-} ------------------------------------------------------------------------------- -- Multi-Stream @@ -88,34 +100,34 @@ _iterateSingleton g value n = S.foldrM g (return n) $ sourceIntFromTo value n {-# INLINE serial2 #-} serial2 :: Int -> Int -> IO () serial2 count n = - S.drain $ - S.serial (sourceUnfoldrM count n) (sourceUnfoldrM count (n + 1)) + drain $ + Common.append (sourceUnfoldrM count n) (sourceUnfoldrM count (n + 1)) {-# INLINE serial4 #-} serial4 :: Int -> Int -> IO () serial4 count n = - S.drain $ - S.serial - (S.serial (sourceUnfoldrM count n) (sourceUnfoldrM count (n + 1))) - (S.serial + drain $ + Common.append + (Common.append (sourceUnfoldrM count n) (sourceUnfoldrM count (n + 1))) + (Common.append (sourceUnfoldrM count (n + 2)) (sourceUnfoldrM count (n + 3))) {-# INLINE append2 #-} append2 :: Int -> Int -> IO () append2 count n = - S.drain $ - Internal.append (sourceUnfoldrM count n) (sourceUnfoldrM count (n + 1)) + drain $ + Common.append2 (sourceUnfoldrM count n) (sourceUnfoldrM count (n + 1)) {-# INLINE append4 #-} append4 :: Int -> Int -> IO () append4 count n = - S.drain $ - Internal.append - (Internal.append + drain $ + Common.append2 + (Common.append2 (sourceUnfoldrM count n) (sourceUnfoldrM count (n + 1))) - (Internal.append + (Common.append2 (sourceUnfoldrM count (n + 2)) (sourceUnfoldrM count (n + 3))) @@ -139,22 +151,24 @@ o_1_space_joining value = -- Concat Foldable containers ------------------------------------------------------------------------------- +#ifdef USE_PRELUDE o_1_space_concatFoldable :: Int -> [Benchmark] o_1_space_concatFoldable value = [ bgroup "concat-foldable" - [ benchIOSrc fromSerial "foldMapWith (<>) (List)" + [ benchIOSrc "foldMapWith (<>) (List)" (sourceFoldMapWith value) - , benchIOSrc fromSerial "foldMapWith (<>) (Stream)" + , benchIOSrc "foldMapWith (<>) (Stream)" (sourceFoldMapWithStream value) - , benchIOSrc fromSerial "foldMapWithM (<>) (List)" + , benchIOSrc "foldMapWithM (<>) (List)" (sourceFoldMapWithM value) - , benchIOSrc fromSerial "S.concatFoldableWith (<>) (List)" + , benchIOSrc "S.concatFoldableWith (<>) (List)" (concatFoldableWith value) - , benchIOSrc fromSerial "S.concatForFoldableWith (<>) (List)" + , benchIOSrc "S.concatForFoldableWith (<>) (List)" (concatForFoldableWith value) - , benchIOSrc fromSerial "foldMapM (List)" (sourceFoldMapM value) + , benchIOSrc "foldMapM (List)" (sourceFoldMapM value) ] ] +#endif ------------------------------------------------------------------------------- -- Concat @@ -165,14 +179,14 @@ o_1_space_concatFoldable value = {-# INLINE concatMap #-} concatMap :: Int -> Int -> Int -> IO () concatMap outer inner n = - S.drain $ S.concatMap + drain $ S.concatMap (\_ -> sourceUnfoldrM inner n) (sourceUnfoldrM outer n) {-# INLINE concatMapM #-} concatMapM :: Int -> Int -> Int -> IO () concatMapM outer inner n = - S.drain $ S.concatMapM + drain $ S.concatMapM (\_ -> return $ sourceUnfoldrM inner n) (sourceUnfoldrM outer n) @@ -186,7 +200,7 @@ inspect $ 'concatMap `hasNoType` ''SPEC {-# INLINE concatMapPure #-} concatMapPure :: Int -> Int -> Int -> IO () concatMapPure outer inner n = - S.drain $ S.concatMap + drain $ S.concatMap (\_ -> sourceUnfoldr inner n) (sourceUnfoldr outer n) @@ -200,7 +214,7 @@ inspect $ 'concatMapPure `hasNoType` ''SPEC {-# INLINE concatMapRepl #-} concatMapRepl :: Int -> Int -> Int -> IO () concatMapRepl outer inner n = - S.drain $ S.concatMap (S.replicate inner) (sourceUnfoldrM outer n) + drain $ S.concatMap (Common.replicate inner) (sourceUnfoldrM outer n) #ifdef INSPECTION inspect $ hasNoTypeClasses 'concatMapRepl @@ -211,7 +225,7 @@ inspect $ 'concatMapRepl `hasNoType` ''SPEC {-# INLINE concatMapWithSerial #-} concatMapWithSerial :: Int -> Int -> Int -> IO () -concatMapWithSerial = concatStreamsWith S.serial +concatMapWithSerial = concatStreamsWith Common.append #ifdef INSPECTION inspect $ hasNoTypeClasses 'concatMapWithSerial @@ -220,7 +234,7 @@ inspect $ 'concatMapWithSerial `hasNoType` ''SPEC {-# INLINE concatMapWithAppend #-} concatMapWithAppend :: Int -> Int -> Int -> IO () -concatMapWithAppend = concatStreamsWith Internal.append +concatMapWithAppend = concatStreamsWith Common.append2 #ifdef INSPECTION inspect $ hasNoTypeClasses 'concatMapWithAppend @@ -231,11 +245,11 @@ inspect $ 'concatMapWithAppend `hasNoType` ''SPEC {-# INLINE concatPairWithSerial #-} concatPairWithSerial :: Int -> Int -> Int -> IO () -concatPairWithSerial = concatPairsWith Internal.serial +concatPairWithSerial = concatPairsWith Common.append {-# INLINE concatPairWithAppend #-} concatPairWithAppend :: Int -> Int -> Int -> IO () -concatPairWithAppend = concatPairsWith Internal.append +concatPairWithAppend = concatPairsWith Common.append2 -- unfoldMany @@ -244,7 +258,7 @@ concatPairWithAppend = concatPairsWith Internal.append {-# INLINE unfoldManyRepl #-} unfoldManyRepl :: Int -> Int -> Int -> IO () unfoldManyRepl outer inner n = - S.drain + drain $ S.unfoldMany (UF.lmap return (UF.replicateM inner)) (sourceUnfoldrM outer n) @@ -266,7 +280,7 @@ o_1_space_concat value = sqrtVal `seq` (concatMapPure 1 value) -- This is for comparison with foldMapWith - , benchIOSrc fromSerial "concatMapId (n of 1) (fromFoldable)" + , benchIOSrc "concatMapId (n of 1) (fromFoldable)" (S.concatMap id . sourceConcatMapId value) , benchIOSrc1 "concatMap (n of 1)" @@ -284,8 +298,8 @@ o_1_space_concat value = sqrtVal `seq` (concatMapM 1 value) -- This is for comparison with foldMapWith - , benchIOSrc fromSerial "concatMapWithId (n of 1) (fromFoldable)" - (S.concatMapWith serial id . sourceConcatMapId value) + , benchIOSrc "concatMapWithId (n of 1) (fromFoldable)" + (S.concatMapWith Common.append id . sourceConcatMapId value) , benchIOSrc1 "concatMapWith (n of 1)" (concatMapWithSerial value 1) @@ -343,23 +357,23 @@ o_n_space_concat value = sqrtVal `seq` o_1_space_applicative :: Int -> [Benchmark] o_1_space_applicative value = [ bgroup "Applicative" - [ benchIO "(*>) (sqrt n x sqrt n)" $ apDiscardFst value fromSerial - , benchIO "(<*) (sqrt n x sqrt n)" $ apDiscardSnd value fromSerial - , benchIO "(<*>) (sqrt n x sqrt n)" $ toNullAp value fromSerial - , benchIO "liftA2 (sqrt n x sqrt n)" $ apLiftA2 value fromSerial + [ benchIO "(*>) (sqrt n x sqrt n)" $ apDiscardFst value + , benchIO "(<*) (sqrt n x sqrt n)" $ apDiscardSnd value + , benchIO "(<*>) (sqrt n x sqrt n)" $ toNullAp value + , benchIO "liftA2 (sqrt n x sqrt n)" $ apLiftA2 value ] ] o_n_space_applicative :: Int -> [Benchmark] o_n_space_applicative value = [ bgroup "Applicative" - [ benchIOSrc fromSerial "(*>) (n times)" $ + [ benchIOSrc "(*>) (n times)" $ iterateSingleton ((*>) . pure) value - , benchIOSrc fromSerial "(<*) (n times)" $ - iterateSingleton (\x xs -> xs <* pure x) value - , benchIOSrc fromSerial "(<*>) (n times)" $ + , benchIOSrc "(<*) (n times)" $ + iterateSingleton (\x xs -> xs <* pure x) value + , benchIOSrc "(<*>) (n times)" $ iterateSingleton (\x xs -> pure (+ x) <*> xs) value - , benchIOSrc fromSerial "liftA2 (n times)" $ + , benchIOSrc "liftA2 (n times)" $ iterateSingleton (AP.liftA2 (+) . pure) value ] ] @@ -371,18 +385,18 @@ o_n_space_applicative value = o_1_space_monad :: Int -> [Benchmark] o_1_space_monad value = [ bgroup "Monad" - [ benchIO "(>>) (sqrt n x sqrt n)" $ monadThen value fromSerial - , benchIO "(>>=) (sqrt n x sqrt n)" $ toNullM value fromSerial + [ benchIO "(>>) (sqrt n x sqrt n)" $ monadThen value + , benchIO "(>>=) (sqrt n x sqrt n)" $ toNullM value , benchIO "(>>=) (sqrt n x sqrt n) (filterAllOut)" $ - filterAllOutM value fromSerial + filterAllOutM value , benchIO "(>>=) (sqrt n x sqrt n) (filterAllIn)" $ - filterAllInM value fromSerial + filterAllInM value , benchIO "(>>=) (sqrt n x sqrt n) (filterSome)" $ - filterSome value fromSerial + filterSome value , benchIO "(>>=) (sqrt n x sqrt n) (breakAfterSome)" $ - breakAfterSome value fromSerial + breakAfterSome value , benchIO "(>>=) (cubert n x cubert n x cubert n)" $ - toNullM3 value fromSerial + toNullM3 value ] ] @@ -400,16 +414,16 @@ sieve s = do o_n_space_monad :: Int -> [Benchmark] o_n_space_monad value = [ bgroup "Monad" - [ benchIOSrc fromSerial "(>>) (n times)" $ + [ benchIOSrc "(>>) (n times)" $ iterateSingleton ((>>) . pure) value - , benchIOSrc fromSerial "(>>=) (n times)" $ + , benchIOSrc "(>>=) (n times)" $ iterateSingleton (\x xs -> xs >>= \y -> return (x + y)) value , benchIO "(>>=) (sqrt n x sqrt n) (toList)" $ - toListM value fromSerial + toListM value , benchIO "(>>=) (sqrt n x sqrt n) (toListSome)" $ - toListSome value fromSerial + toListSome value , benchIO "naive prime sieve (n/4)" - (\n -> S.sum $ sieve $ S.enumerateFromTo 2 (value `div` 4 + n)) + (\n -> S.fold Fold.sum $ sieve $ enumerateFromTo 2 (value `div` 4 + n)) ] ] @@ -421,22 +435,22 @@ toKv :: Int -> (Int, Int) toKv p = (p, p) {-# INLINE joinWith #-} -joinWith :: (S.MonadAsync m) => +joinWith :: Common.MonadAsync m => ((Int -> Int -> Bool) -> SerialT m Int -> SerialT m Int -> SerialT m b) -> Int -> Int -> m () joinWith j val i = - S.drain $ j (==) (sourceUnfoldrM val i) (sourceUnfoldrM val (val `div` 2)) + drain $ j (==) (sourceUnfoldrM val i) (sourceUnfoldrM val (val `div` 2)) {-# INLINE joinMapWith #-} -joinMapWith :: (S.MonadAsync m) => +joinMapWith :: Common.MonadAsync m => (SerialT m (Int, Int) -> SerialT m (Int, Int) -> SerialT m b) -> Int -> Int -> m () joinMapWith j val i = - S.drain + drain $ j (fmap toKv (sourceUnfoldrM val i)) (fmap toKv (sourceUnfoldrM val (val `div` 2))) @@ -446,21 +460,21 @@ o_n_heap_buffering value = [ bgroup "buffered" [ benchIOSrc1 "joinInner (sqrtVal)" - $ joinWith Internal.joinInner sqrtVal + $ joinWith S.joinInner sqrtVal , benchIOSrc1 "joinInnerMap" - $ joinMapWith Internal.joinInnerMap halfVal + $ joinMapWith S.joinInnerMap halfVal , benchIOSrc1 "joinLeft (sqrtVal)" - $ joinWith Internal.joinLeft sqrtVal + $ joinWith S.joinLeft sqrtVal , benchIOSrc1 "joinLeftMap " - $ joinMapWith Internal.joinLeftMap halfVal + $ joinMapWith S.joinLeftMap halfVal , benchIOSrc1 "joinOuter (sqrtVal)" - $ joinWith Internal.joinOuter sqrtVal + $ joinWith S.joinOuter sqrtVal , benchIOSrc1 "joinOuterMap" - $ joinMapWith Internal.joinOuterMap halfVal + $ joinMapWith S.joinOuterMap halfVal , benchIOSrc1 "intersectBy (sqrtVal)" - $ joinWith Internal.intersectBy sqrtVal + $ joinWith S.intersectBy sqrtVal , benchIOSrc1 "intersectBySorted" - $ joinMapWith (Internal.intersectBySorted compare) halfVal + $ joinMapWith (S.intersectBySorted compare) halfVal ] ] @@ -482,7 +496,9 @@ benchmarks moduleName size = [ -- multi-stream o_1_space_joining size +#ifdef USE_PRELUDE , o_1_space_concatFoldable size +#endif , o_1_space_concat size , o_1_space_applicative size diff --git a/benchmark/Streamly/Benchmark/Data/Stream/Generate.hs b/benchmark/Streamly/Benchmark/Data/Stream/Generate.hs index 781376c20..5f8157a38 100644 --- a/benchmark/Streamly/Benchmark/Data/Stream/Generate.hs +++ b/benchmark/Streamly/Benchmark/Data/Stream/Generate.hs @@ -13,22 +13,29 @@ module Stream.Generate (benchmarks) where import Data.Functor.Identity (Identity) +import qualified Stream.Common as Common #ifdef USE_PRELUDE import qualified GHC.Exts as GHC import qualified Streamly.Prelude as S +import qualified Streamly.Internal.Data.Stream.IsStream as Stream import qualified Prelude -#endif +#else import qualified Streamly.Internal.Data.Stream as Stream -import qualified Streamly.Internal.Data.Unfold as Unfold +#endif import Gauge import Streamly.Benchmark.Common import Streamly.Internal.Data.Stream.Serial (SerialT) #ifdef USE_PRELUDE -import Streamly.Benchmark.Prelude -import Streamly.Prelude (fromSerial, MonadAsync) +import Streamly.Prelude (MonadAsync) +import Stream.Common hiding (MonadAsync, replicate, enumerateFromTo) +import Streamly.Benchmark.Prelude hiding + (benchIOSrc, sourceUnfoldrM, apDiscardFst, apDiscardSnd, apLiftA2, toNullAp + , monadThen, toNullM, toNullM3, filterAllInM, filterAllOutM, filterSome + , breakAfterSome, toListM, toListSome) +#else +import Stream.Common #endif -import qualified Stream.Common as SC import System.IO.Unsafe (unsafeInterleaveIO) @@ -41,6 +48,7 @@ import Prelude hiding (repeat, replicate, iterate) ------------------------------------------------------------------------------- -- fromList ------------------------------------------------------------------------------- + #ifdef USE_PRELUDE {-# INLINE sourceIsList #-} sourceIsList :: Int -> Int -> SerialT Identity Int @@ -126,42 +134,6 @@ fromIndices value n = S.take value $ S.fromIndices (+ n) {-# INLINE fromIndicesM #-} fromIndicesM :: (MonadAsync m, S.IsStream t) => Int -> Int -> t m Int fromIndicesM value n = S.take value $ S.fromIndicesM (return <$> (+ n)) - -o_1_space_generation_prel :: Int -> [Benchmark] -o_1_space_generation_prel value = - [ bgroup "generation" - [ benchIOSrc fromSerial "unfoldr" (sourceUnfoldr value) - , benchIOSrc fromSerial "unfoldrM" (sourceUnfoldrM value) - , benchIOSrc fromSerial "repeat" (repeat value) - , benchIOSrc fromSerial "repeatM" (repeatM value) - , benchIOSrc fromSerial "replicate" (replicate value) - , benchIOSrc fromSerial "replicateM" (replicateM value) - , benchIOSrc fromSerial "iterate" (iterate value) - , benchIOSrc fromSerial "iterateM" (iterateM value) - , benchIOSrc fromSerial "fromIndices" (fromIndices value) - , benchIOSrc fromSerial "fromIndicesM" (fromIndicesM value) - , benchIOSrc fromSerial "intFromTo" (sourceIntFromTo value) - , benchIOSrc fromSerial "intFromThenTo" (sourceIntFromThenTo value) - , benchIOSrc fromSerial "integerFromStep" (sourceIntegerFromStep value) - , benchIOSrc fromSerial "fracFromThenTo" (sourceFracFromThenTo value) - , benchIOSrc fromSerial "fracFromTo" (sourceFracFromTo value) - , benchIOSrc fromSerial "fromList" (sourceFromList value) - , benchPureSrc "IsList.fromList" (sourceIsList value) - , benchPureSrc "IsString.fromString" (sourceIsString value) - , benchIOSrc fromSerial "fromListM" (sourceFromListM value) - , benchIOSrc fromSerial "enumerateFrom" (enumerateFrom value) - , benchIOSrc fromSerial "enumerateFromTo" (enumerateFromTo value) - , benchIOSrc fromSerial "enumerateFromThen" (enumerateFromThen value) - , benchIOSrc fromSerial "enumerateFromThenTo" (enumerateFromThenTo value) - , benchIOSrc fromSerial "enumerate" (enumerate value) - , benchIOSrc fromSerial "enumerateTo" (enumerateTo value) - - -- These essentially test cons and consM - , benchIOSrc fromSerial "fromFoldable" (sourceFromFoldable value) - , benchIOSrc fromSerial "fromFoldableM" (sourceFromFoldableM value) - , benchIOSrc fromSerial "absTimes" $ absTimes value - ] - ] #endif {-# INLINE mfixUnfold #-} @@ -170,29 +142,51 @@ mfixUnfold count start = Stream.mfix f where f action = do let incr n act = fmap ((+n) . snd) $ unsafeInterleaveIO act - x <- Stream.unfold Unfold.fromListM [incr 1 action, incr 2 action] - y <- SC.sourceUnfoldr count start + x <- Common.fromListM [incr 1 action, incr 2 action] + y <- Common.sourceUnfoldr count start return (x, y) -{-# INLINE fromFoldable #-} -fromFoldable :: Int -> Int -> SerialT m Int -fromFoldable count start = - Stream.fromFoldable (Prelude.enumFromTo count start) - -{-# INLINE fromFoldableM #-} -fromFoldableM :: Monad m => Int -> Int -> SerialT m Int -fromFoldableM count start = - Stream.fromFoldableM (fmap return (Prelude.enumFromTo count start)) - o_1_space_generation :: Int -> [Benchmark] o_1_space_generation value = [ bgroup "generation" - [ SC.benchIOSrc "unfold" (SC.sourceUnfoldr value) - , SC.benchIOSrc "fromFoldable" (fromFoldable value) - , SC.benchIOSrc "fromFoldableM" (fromFoldableM value) - , SC.benchIOSrc "mfix_10" (mfixUnfold 10) - , SC.benchIOSrc "mfix_100" (mfixUnfold 100) - , SC.benchIOSrc "mfix_1000" (mfixUnfold 1000) + [ benchIOSrc "unfoldr" (sourceUnfoldr value) + , benchIOSrc "unfoldrM" (sourceUnfoldrM value) +#ifdef USE_PRELUDE + , benchIOSrc "repeat" (repeat value) + , benchIOSrc "repeatM" (repeatM value) + , benchIOSrc "replicate" (replicate value) + , benchIOSrc "replicateM" (replicateM value) + , benchIOSrc "iterate" (iterate value) + , benchIOSrc "iterateM" (iterateM value) + , benchIOSrc "fromIndices" (fromIndices value) + , benchIOSrc "fromIndicesM" (fromIndicesM value) + , benchIOSrc "intFromTo" (sourceIntFromTo value) + , benchIOSrc "intFromThenTo" (sourceIntFromThenTo value) + , benchIOSrc "integerFromStep" (sourceIntegerFromStep value) + , benchIOSrc "fracFromThenTo" (sourceFracFromThenTo value) + , benchIOSrc "fracFromTo" (sourceFracFromTo value) + , benchIOSrc "fromList" (sourceFromList value) + , benchPureSrc "IsList.fromList" (sourceIsList value) + , benchPureSrc "IsString.fromString" (sourceIsString value) + , benchIOSrc "fromListM" (sourceFromListM value) + , benchIOSrc "enumerateFrom" (enumerateFrom value) + , benchIOSrc "enumerateFromTo" (enumerateFromTo value) + , benchIOSrc "enumerateFromThen" (enumerateFromThen value) + , benchIOSrc "enumerateFromThenTo" (enumerateFromThenTo value) + , benchIOSrc "enumerate" (enumerate value) + , benchIOSrc "enumerateTo" (enumerateTo value) +#endif + + -- These essentially test cons and consM + , benchIOSrc "fromFoldable" (sourceFromFoldable value) + , benchIOSrc "fromFoldableM" (sourceFromFoldableM value) + +#ifdef USE_PRELUDE + , benchIOSrc "absTimes" $ absTimes value +#endif + , Common.benchIOSrc "mfix_10" (mfixUnfold 10) + , Common.benchIOSrc "mfix_100" (mfixUnfold 100) + , Common.benchIOSrc "mfix_1000" (mfixUnfold 1000) ] ] @@ -218,11 +212,6 @@ o_n_heap_generation value = -- benchmarks :: String -> Int -> [Benchmark] benchmarks moduleName size = - [ -#ifdef USE_PRELUDE - bgroup (o_1_space_prefix moduleName) (o_1_space_generation_prel size) - , -#endif - bgroup (o_1_space_prefix moduleName) (o_1_space_generation size) + [ bgroup (o_1_space_prefix moduleName) (o_1_space_generation size) , bgroup (o_n_heap_prefix moduleName) (o_n_heap_generation size) ] diff --git a/benchmark/Streamly/Benchmark/Data/Stream/Lift.hs b/benchmark/Streamly/Benchmark/Data/Stream/Lift.hs index 6347ad23a..5e0d7437b 100644 --- a/benchmark/Streamly/Benchmark/Data/Stream/Lift.hs +++ b/benchmark/Streamly/Benchmark/Data/Stream/Lift.hs @@ -11,21 +11,23 @@ module Stream.Lift (benchmarks) where -#ifdef USE_PRELUDE +import Control.DeepSeq (NFData(..)) import Control.Monad.Trans.Class (lift) import Control.Monad.State.Strict (StateT, get, put, MonadState) -import Streamly.Prelude (fromSerial) -import Streamly.Benchmark.Prelude -import qualified Control.Monad.State.Strict as State -import qualified Streamly.Prelude as Stream -import qualified Streamly.Internal.Data.Stream.IsStream as Internal -#else -import Control.DeepSeq (NFData(..)) import Data.Functor.Identity (Identity) -import Stream.Common (sourceUnfoldr, sourceUnfoldrM, benchIOSrc) +import Stream.Common (sourceUnfoldr, sourceUnfoldrM, benchIOSrc, drain) import System.Random (randomRIO) +#ifdef USE_PRELUDE +import Streamly.Benchmark.Prelude hiding + (sourceUnfoldr, sourceUnfoldrM, benchIOSrc) +import qualified Streamly.Internal.Data.Stream.IsStream as Stream +#else +import Streamly.Benchmark.Prelude (benchIO) import qualified Streamly.Internal.Data.Stream as Stream #endif +import qualified Streamly.Internal.Data.Fold as Fold +import qualified Stream.Common as Common +import qualified Control.Monad.State.Strict as State import Gauge import Streamly.Internal.Data.Stream.Serial (SerialT) @@ -36,11 +38,11 @@ import Prelude hiding (reverse, tail) ------------------------------------------------------------------------------- -- Monad transformation (hoisting etc.) ------------------------------------------------------------------------------- -#ifdef USE_PRELUDE + {-# INLINE sourceUnfoldrState #-} -sourceUnfoldrState :: (Stream.IsStream t, Stream.MonadAsync m) - => Int -> Int -> t (StateT Int m) Int -sourceUnfoldrState value n = Stream.unfoldrM step n +sourceUnfoldrState :: Common.MonadAsync m => + Int -> Int -> SerialT (StateT Int m) Int +sourceUnfoldrState value n = Common.unfoldrM step n where step cnt = if cnt > n + value @@ -51,27 +53,36 @@ sourceUnfoldrState value n = Stream.unfoldrM step n return (Just (s, cnt + 1)) {-# INLINE evalStateT #-} -evalStateT :: Stream.MonadAsync m => Int -> Int -> SerialT m Int +evalStateT :: Common.MonadAsync m => Int -> Int -> SerialT m Int evalStateT value n = - Internal.evalStateT (return 0) (sourceUnfoldrState value n) + Stream.evalStateT (return 0) (sourceUnfoldrState value n) {-# INLINE withState #-} -withState :: Stream.MonadAsync m => Int -> Int -> SerialT m Int +withState :: Common.MonadAsync m => Int -> Int -> SerialT m Int withState value n = - Internal.evalStateT - (return (0 :: Int)) (Internal.liftInner (sourceUnfoldrM value n)) + Stream.evalStateT + (return (0 :: Int)) (Stream.liftInner (sourceUnfoldrM value n)) + +{-# INLINE benchHoistSink #-} +benchHoistSink + :: (NFData b) + => Int -> String -> (SerialT Identity Int -> IO b) -> Benchmark +benchHoistSink value name f = + bench name $ nfIO $ randomRIO (1,1) >>= f . sourceUnfoldr value o_1_space_hoisting :: Int -> [Benchmark] o_1_space_hoisting value = [ bgroup "hoisting" - [ benchIOSrc fromSerial "evalState" (evalStateT value) - , benchIOSrc fromSerial "withState" (withState value) + [ benchIOSrc "evalState" (evalStateT value) + , benchIOSrc "withState" (withState value) + , benchHoistSink value "generally" + ((\xs -> Stream.fold Fold.length xs :: IO Int) . Stream.generally) ] ] {-# INLINE iterateStateIO #-} iterateStateIO :: - (Stream.MonadAsync m) + Monad m => Int -> StateT Int m Int iterateStateIO n = do @@ -95,7 +106,7 @@ iterateStateT n = do {-# INLINE iterateState #-} {-# SPECIALIZE iterateState :: Int -> SerialT (StateT Int IO) Int #-} iterateState :: - (Stream.MonadAsync m, MonadState Int m) + MonadState Int m => Int -> SerialT m Int iterateState n = do @@ -112,38 +123,12 @@ o_n_heap_transformer value = [ benchIO "StateT Int IO (n times) (baseline)" $ \n -> State.evalStateT (iterateStateIO n) value , benchIO "SerialT (StateT Int IO) (n times)" $ \n -> - State.evalStateT (Stream.drain (iterateStateT n)) value + State.evalStateT (drain (iterateStateT n)) value , benchIO "MonadState Int m => SerialT m Int" $ \n -> - State.evalStateT (Stream.drain (iterateState n)) value + State.evalStateT (drain (iterateState n)) value ] ] -#else -{-# INLINE benchHoistSink #-} -benchHoistSink - :: (NFData b) - => Int -> String -> (SerialT Identity Int -> IO b) -> Benchmark -benchHoistSink value name f = - bench name $ nfIO $ randomRIO (1,1) >>= f . sourceUnfoldr value --- XXX We should be using sourceUnfoldrM for fair comparison with IO monad, but --- we can't use it as it requires MonadAsync constraint. - -{-# INLINE liftInner #-} -liftInner :: Monad m => Int -> Int -> SerialT m Int -liftInner value n = - Stream.evalStateT - (return (0 :: Int)) (Stream.liftInner (sourceUnfoldrM value n)) - -o_1_space_generation :: Int -> [Benchmark] -o_1_space_generation value = - [ bgroup "lift" - [ benchHoistSink value "length . generally" - ((\(_ :: SerialT IO Int) -> return 8 :: IO Int) . Stream.generally) - - , benchIOSrc "liftInner/evalStateT" (liftInner value) - ] - ] -#endif ------------------------------------------------------------------------------- -- Main ------------------------------------------------------------------------------- @@ -153,11 +138,6 @@ o_1_space_generation value = -- benchmarks :: String -> Int -> [Benchmark] benchmarks moduleName size = - [ -#ifdef USE_PRELUDE - bgroup (o_1_space_prefix moduleName) (o_1_space_hoisting size) + [ bgroup (o_1_space_prefix moduleName) (o_1_space_hoisting size) , bgroup (o_n_heap_prefix moduleName) (o_n_heap_transformer size) -#else - bgroup (o_1_space_prefix moduleName) (o_1_space_generation size) -#endif ] diff --git a/benchmark/Streamly/Benchmark/Data/Stream/NestedFold.hs b/benchmark/Streamly/Benchmark/Data/Stream/NestedFold.hs deleted file mode 100644 index 555453d3f..000000000 --- a/benchmark/Streamly/Benchmark/Data/Stream/NestedFold.hs +++ /dev/null @@ -1,488 +0,0 @@ --- | --- Module : Stream.NestedFold --- Copyright : (c) 2018 Composewell Technologies --- License : BSD-3-Clause --- Maintainer : streamly@composewell.com - -{-# LANGUAGE CPP #-} -{-# LANGUAGE FlexibleContexts #-} -{-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE DeriveAnyClass #-} -{-# LANGUAGE DeriveGeneric #-} -{-# LANGUAGE RankNTypes #-} - -module Stream.NestedFold (benchmarks) where - -import Control.DeepSeq (NFData(..)) -import Control.Monad (when) -import Control.Monad.IO.Class (MonadIO(..)) -import Data.Monoid (Sum(..)) -import Data.Proxy (Proxy(..)) -import Data.HashMap.Strict (HashMap) -import GHC.Generics (Generic) -import Streamly.Internal.Data.IsMap.HashMap () - -import qualified Streamly.Internal.Data.Refold.Type as Refold -import qualified Streamly.Internal.Data.Fold as FL -import qualified Streamly.Internal.Data.Stream.IsStream as Internal -import qualified Streamly.Prelude as S - -import Gauge -import Streamly.Prelude (SerialT, fromSerial) -import Streamly.Benchmark.Common -import Streamly.Benchmark.Prelude -import Prelude hiding (reverse, tail) - -------------------------------------------------------------------------------- --- Iteration/looping utilities -------------------------------------------------------------------------------- - -{-# INLINE iterateN #-} -iterateN :: (Int -> a -> a) -> a -> Int -> a -iterateN g initial count = f count initial - - where - - f (0 :: Int) x = x - f i x = f (i - 1) (g i x) - --- Iterate a transformation over a singleton stream -{-# INLINE iterateSingleton #-} -iterateSingleton :: S.MonadAsync m - => (Int -> SerialT m Int -> SerialT m Int) - -> Int - -> Int - -> SerialT m Int -iterateSingleton g count n = iterateN g (return n) count - --- XXX need to check why this is slower than the explicit recursion above, even --- if the above code is written in a foldr like head recursive way. We also --- need to try this with foldlM' once #150 is fixed. --- However, it is perhaps best to keep the iteration benchmarks independent of --- foldrM and any related fusion issues. -{-# INLINE _iterateSingleton #-} -_iterateSingleton :: - S.MonadAsync m - => (Int -> SerialT m Int -> SerialT m Int) - -> Int - -> Int - -> SerialT m Int -_iterateSingleton g value n = S.foldrM g (return n) $ sourceIntFromTo value n - --- Apply transformation g count times on a stream of length len -{-# INLINE iterateSource #-} -iterateSource :: - S.MonadAsync m - => (SerialT m Int -> SerialT m Int) - -> Int - -> Int - -> Int - -> SerialT m Int -iterateSource g count len n = f count (sourceUnfoldrM len n) - - where - - f (0 :: Int) stream = stream - f i stream = f (i - 1) (g stream) - -------------------------------------------------------------------------------- --- Functor -------------------------------------------------------------------------------- - -o_n_space_functor :: Int -> [Benchmark] -o_n_space_functor value = - [ bgroup "Functor" - [ benchIO "(+) (n times) (baseline)" $ \i0 -> - iterateN (\i acc -> acc >>= \n -> return $ i + n) (return i0) value - , benchIOSrc fromSerial "(<$) (n times)" $ - iterateSingleton (<$) value - , benchIOSrc fromSerial "fmap (n times)" $ - iterateSingleton (fmap . (+)) value - {- - , benchIOSrc fromSerial "_(<$) (n times)" $ - _iterateSingleton (<$) value - , benchIOSrc fromSerial "_fmap (n times)" $ - _iterateSingleton (fmap . (+)) value - -} - ] - ] - -------------------------------------------------------------------------------- --- Grouping transformations -------------------------------------------------------------------------------- - -{-# INLINE groups #-} -groups :: MonadIO m => SerialT m Int -> m () -groups = S.drain . S.groups FL.drain - --- XXX Change this test when the order of comparison is later changed -{-# INLINE groupsByGT #-} -groupsByGT :: MonadIO m => SerialT m Int -> m () -groupsByGT = S.drain . S.groupsBy (>) FL.drain - -{-# INLINE groupsByEq #-} -groupsByEq :: MonadIO m => SerialT m Int -> m () -groupsByEq = S.drain . S.groupsBy (==) FL.drain - --- XXX Change this test when the order of comparison is later changed -{-# INLINE groupsByRollingLT #-} -groupsByRollingLT :: MonadIO m => SerialT m Int -> m () -groupsByRollingLT = - S.drain . S.groupsByRolling (<) FL.drain - -{-# INLINE groupsByRollingEq #-} -groupsByRollingEq :: MonadIO m => SerialT m Int -> m () -groupsByRollingEq = - S.drain . S.groupsByRolling (==) FL.drain - -{-# INLINE foldMany #-} -foldMany :: Monad m => SerialT m Int -> m () -foldMany = - S.drain - . S.map getSum - . Internal.foldMany (FL.take 2 FL.mconcat) - . S.map Sum - -{-# INLINE refoldMany #-} -refoldMany :: Monad m => SerialT m Int -> m () -refoldMany = - S.drain - . S.map getSum - . Internal.refoldMany (Refold.take 2 Refold.sconcat) (return mempty) - . S.map Sum - -{-# INLINE foldIterateM #-} -foldIterateM :: Monad m => SerialT m Int -> m () -foldIterateM = - S.drain - . S.map getSum - . Internal.foldIterateM - (return . FL.take 2 . FL.sconcat) (return (Sum 0)) - . S.map Sum - -{-# INLINE refoldIterateM #-} -refoldIterateM :: Monad m => SerialT m Int -> m () -refoldIterateM = - S.drain - . S.map getSum - . Internal.refoldIterateM - (Refold.take 2 Refold.sconcat) (return (Sum 0)) - . S.map Sum - -o_1_space_grouping :: Int -> [Benchmark] -o_1_space_grouping value = - -- Buffering operations using heap proportional to group/window sizes. - [ bgroup "grouping" - [ benchIOSink value "groups" groups - , benchIOSink value "groupsByGT" groupsByGT - , benchIOSink value "groupsByEq" groupsByEq - , benchIOSink value "groupsByRollingLT" groupsByRollingLT - , benchIOSink value "groupsByRollingEq" groupsByRollingEq - , benchIOSink value "foldMany" foldMany - , benchIOSink value "refoldMany" refoldMany - , benchIOSink value "foldIterateM" foldIterateM - , benchIOSink value "refoldIterateM" refoldIterateM - ] - ] - -------------------------------------------------------------------------------- --- Size conserving transformations (reordering, buffering, etc.) -------------------------------------------------------------------------------- - -{-# INLINE reverse #-} -reverse :: MonadIO m => Int -> SerialT m Int -> m () -reverse n = composeN n S.reverse - -{-# INLINE reverse' #-} -reverse' :: MonadIO m => Int -> SerialT m Int -> m () -reverse' n = composeN n Internal.reverse' - -o_n_heap_buffering :: Int -> [Benchmark] -o_n_heap_buffering value = - [ bgroup "buffered" - [ - -- Reversing a stream - benchIOSink value "reverse" (reverse 1) - , benchIOSink value "reverse'" (reverse' 1) - - , benchIOSink value "mkAsync" (mkAsync fromSerial) - ] - ] - -------------------------------------------------------------------------------- --- Grouping/Splitting -------------------------------------------------------------------------------- - -{-# INLINE classifySessionsOf #-} -classifySessionsOf :: S.MonadAsync m => (Int -> Int) -> SerialT m Int -> m () -classifySessionsOf getKey = - S.drain - . Internal.classifySessionsOf - (const (return False)) 3 (FL.take 10 FL.sum) - . Internal.timestamped - . S.map (\x -> (getKey x, x)) - -{-# INLINE classifySessionsOfHash #-} -classifySessionsOfHash :: S.MonadAsync m => - (Int -> Int) -> SerialT m Int -> m () -classifySessionsOfHash getKey = - S.drain - . Internal.classifySessionsByGeneric - (Proxy :: Proxy (HashMap k)) - 1 False (const (return False)) 3 (FL.take 10 FL.sum) - . Internal.timestamped - . S.map (\x -> (getKey x, x)) - -o_n_space_grouping :: Int -> [Benchmark] -o_n_space_grouping value = - -- Buffering operations using heap proportional to group/window sizes. - [ bgroup "grouping" - [ benchIOSink value "classifySessionsOf (10000 buckets)" - (classifySessionsOf (getKey 10000)) - , benchIOSink value "classifySessionsOf (64 buckets)" - (classifySessionsOf (getKey 64)) - , benchIOSink value "classifySessionsOfHash (10000 buckets)" - (classifySessionsOfHash (getKey 10000)) - , benchIOSink value "classifySessionsOfHash (64 buckets)" - (classifySessionsOfHash (getKey 64)) - ] - ] - - where - - getKey n = (`mod` n) - -------------------------------------------------------------------------------- --- Mixed Transformation -------------------------------------------------------------------------------- - -{-# INLINE scanMap #-} -scanMap :: MonadIO m => Int -> SerialT m Int -> m () -scanMap n = composeN n $ S.map (subtract 1) . S.scanl' (+) 0 - -{-# INLINE dropMap #-} -dropMap :: MonadIO m => Int -> SerialT m Int -> m () -dropMap n = composeN n $ S.map (subtract 1) . S.drop 1 - -{-# INLINE dropScan #-} -dropScan :: MonadIO m => Int -> SerialT m Int -> m () -dropScan n = composeN n $ S.scanl' (+) 0 . S.drop 1 - -{-# INLINE takeDrop #-} -takeDrop :: MonadIO m => Int -> Int -> SerialT m Int -> m () -takeDrop value n = composeN n $ S.drop 1 . S.take (value + 1) - -{-# INLINE takeScan #-} -takeScan :: MonadIO m => Int -> Int -> SerialT m Int -> m () -takeScan value n = composeN n $ S.scanl' (+) 0 . S.take (value + 1) - -{-# INLINE takeMap #-} -takeMap :: MonadIO m => Int -> Int -> SerialT m Int -> m () -takeMap value n = composeN n $ S.map (subtract 1) . S.take (value + 1) - -{-# INLINE filterDrop #-} -filterDrop :: MonadIO m => Int -> Int -> SerialT m Int -> m () -filterDrop value n = composeN n $ S.drop 1 . S.filter (<= (value + 1)) - -{-# INLINE filterTake #-} -filterTake :: MonadIO m => Int -> Int -> SerialT m Int -> m () -filterTake value n = composeN n $ S.take (value + 1) . S.filter (<= (value + 1)) - -{-# INLINE filterScan #-} -filterScan :: MonadIO m => Int -> SerialT m Int -> m () -filterScan n = composeN n $ S.scanl' (+) 0 . S.filter (<= maxBound) - -{-# INLINE filterScanl1 #-} -filterScanl1 :: MonadIO m => Int -> SerialT m Int -> m () -filterScanl1 n = composeN n $ S.scanl1' (+) . S.filter (<= maxBound) - -{-# INLINE filterMap #-} -filterMap :: MonadIO m => Int -> Int -> SerialT m Int -> m () -filterMap value n = composeN n $ S.map (subtract 1) . S.filter (<= (value + 1)) - -------------------------------------------------------------------------------- --- Scan and fold -------------------------------------------------------------------------------- - -data Pair a b = - Pair !a !b - deriving (Generic, NFData) - -{-# INLINE sumProductFold #-} -sumProductFold :: Monad m => SerialT m Int -> m (Int, Int) -sumProductFold = S.foldl' (\(s, p) x -> (s + x, p * x)) (0, 1) - -{-# INLINE sumProductScan #-} -sumProductScan :: Monad m => SerialT m Int -> m (Pair Int Int) -sumProductScan = - S.foldl' (\(Pair _ p) (s0, x) -> Pair s0 (p * x)) (Pair 0 1) . - S.scanl' (\(s, _) x -> (s + x, x)) (0, 0) - -{-# INLINE foldl'ReduceMap #-} -foldl'ReduceMap :: Monad m => SerialT m Int -> m Int -foldl'ReduceMap = fmap (+ 1) . S.foldl' (+) 0 - -o_1_space_transformations_mixed :: Int -> [Benchmark] -o_1_space_transformations_mixed value = - -- scanl-map and foldl-map are equivalent to the scan and fold in the foldl - -- library. If scan/fold followed by a map is efficient enough we may not - -- need monolithic implementations of these. - [ bgroup "mixed" - [ benchIOSink value "scanl-map" (scanMap 1) - , benchIOSink value "foldl-map" foldl'ReduceMap - , benchIOSink value "sum-product-fold" sumProductFold - , benchIOSink value "sum-product-scan" sumProductScan - ] - ] - -o_1_space_transformations_mixedX4 :: Int -> [Benchmark] -o_1_space_transformations_mixedX4 value = - [ bgroup "mixedX4" - [ benchIOSink value "scan-map" (scanMap 4) - , benchIOSink value "drop-map" (dropMap 4) - , benchIOSink value "drop-scan" (dropScan 4) - , benchIOSink value "take-drop" (takeDrop value 4) - , benchIOSink value "take-scan" (takeScan value 4) - , benchIOSink value "take-map" (takeMap value 4) - , benchIOSink value "filter-drop" (filterDrop value 4) - , benchIOSink value "filter-take" (filterTake value 4) - , benchIOSink value "filter-scan" (filterScan 4) - , benchIOSink value "filter-scanl1" (filterScanl1 4) - , benchIOSink value "filter-map" (filterMap value 4) - ] - ] - -------------------------------------------------------------------------------- --- Iterating a transformation over and over again -------------------------------------------------------------------------------- - --- this is quadratic -{-# INLINE iterateScan #-} -iterateScan :: S.MonadAsync m => Int -> Int -> Int -> SerialT m Int -iterateScan = iterateSource (S.scanl' (+) 0) - --- this is quadratic -{-# INLINE iterateScanl1 #-} -iterateScanl1 :: S.MonadAsync m => Int -> Int -> Int -> SerialT m Int -iterateScanl1 = iterateSource (S.scanl1' (+)) - -{-# INLINE iterateMapM #-} -iterateMapM :: S.MonadAsync m => Int -> Int -> Int -> SerialT m Int -iterateMapM = iterateSource (S.mapM return) - -{-# INLINE iterateFilterEven #-} -iterateFilterEven :: S.MonadAsync m => Int -> Int -> Int -> SerialT m Int -iterateFilterEven = iterateSource (S.filter even) - -{-# INLINE iterateTakeAll #-} -iterateTakeAll :: S.MonadAsync m => Int -> Int -> Int -> Int -> SerialT m Int -iterateTakeAll value = iterateSource (S.take (value + 1)) - -{-# INLINE iterateDropOne #-} -iterateDropOne :: S.MonadAsync m => Int -> Int -> Int -> SerialT m Int -iterateDropOne = iterateSource (S.drop 1) - -{-# INLINE iterateDropWhileFalse #-} -iterateDropWhileFalse :: S.MonadAsync m - => Int -> Int -> Int -> Int -> SerialT m Int -iterateDropWhileFalse value = iterateSource (S.dropWhile (> (value + 1))) - -{-# INLINE iterateDropWhileTrue #-} -iterateDropWhileTrue :: S.MonadAsync m - => Int -> Int -> Int -> Int -> SerialT m Int -iterateDropWhileTrue value = iterateSource (S.dropWhile (<= (value + 1))) - -{-# INLINE tail #-} -tail :: Monad m => SerialT m a -> m () -tail s = S.tail s >>= mapM_ tail - -{-# INLINE nullHeadTail #-} -nullHeadTail :: Monad m => SerialT m Int -> m () -nullHeadTail s = do - r <- S.null s - when (not r) $ do - _ <- S.head s - S.tail s >>= mapM_ nullHeadTail - --- Head recursive operations. -o_n_stack_iterated :: Int -> [Benchmark] -o_n_stack_iterated value = by10 `seq` by100 `seq` - [ bgroup "iterated" - [ benchIOSrc fromSerial "mapM (n/10 x 10)" $ iterateMapM by10 10 - , benchIOSrc fromSerial "scanl' (quadratic) (n/100 x 100)" $ - iterateScan by100 100 - , benchIOSrc fromSerial "scanl1' (n/10 x 10)" $ iterateScanl1 by10 10 - , benchIOSrc fromSerial "filterEven (n/10 x 10)" $ - iterateFilterEven by10 10 - , benchIOSrc fromSerial "takeAll (n/10 x 10)" $ - iterateTakeAll value by10 10 - , benchIOSrc fromSerial "dropOne (n/10 x 10)" $ iterateDropOne by10 10 - , benchIOSrc fromSerial "dropWhileFalse (n/10 x 10)" $ - iterateDropWhileFalse value by10 10 - , benchIOSrc fromSerial "dropWhileTrue (n/10 x 10)" $ - iterateDropWhileTrue value by10 10 - , benchIOSink value "tail" tail - , benchIOSink value "nullHeadTail" nullHeadTail - ] - ] - - where - - by10 = value `div` 10 - by100 = value `div` 100 - -------------------------------------------------------------------------------- --- Pipes -------------------------------------------------------------------------------- - -o_1_space_pipes :: Int -> [Benchmark] -o_1_space_pipes value = - [ bgroup "pipes" - [ benchIOSink value "mapM" (transformMapM fromSerial 1) - , benchIOSink value "compose" (transformComposeMapM fromSerial 1) - , benchIOSink value "tee" (transformTeeMapM fromSerial 1) -#ifdef DEVBUILD - -- XXX this take 1 GB memory to compile - , benchIOSink value "zip" (transformZipMapM fromSerial 1) -#endif - ] - ] - -o_1_space_pipesX4 :: Int -> [Benchmark] -o_1_space_pipesX4 value = - [ bgroup "pipesX4" - [ benchIOSink value "mapM" (transformMapM fromSerial 4) - , benchIOSink value "compose" (transformComposeMapM fromSerial 4) - , benchIOSink value "tee" (transformTeeMapM fromSerial 4) -#ifdef DEVBUILD - -- XXX this take 1 GB memory to compile - , benchIOSink value "zip" (transformZipMapM fromSerial 4) -#endif - ] - ] - -------------------------------------------------------------------------------- --- Main -------------------------------------------------------------------------------- - --- In addition to gauge options, the number of elements in the stream can be --- passed using the --stream-size option. --- -benchmarks :: String -> Int -> [Benchmark] -benchmarks moduleName size = - [ bgroup (o_1_space_prefix moduleName) $ Prelude.concat - [ o_1_space_grouping size - , o_1_space_transformations_mixed size - , o_1_space_transformations_mixedX4 size - - -- pipes - , o_1_space_pipes size - , o_1_space_pipesX4 size - ] - , bgroup (o_n_stack_prefix moduleName) (o_n_stack_iterated size) - , bgroup (o_n_heap_prefix moduleName) $ Prelude.concat - [ o_n_space_grouping size - , o_n_space_functor size - , o_n_heap_buffering size - ] - ] diff --git a/benchmark/Streamly/Benchmark/Data/Stream/Reduce.hs b/benchmark/Streamly/Benchmark/Data/Stream/Reduce.hs index 0ae247c81..626ed7bc7 100644 --- a/benchmark/Streamly/Benchmark/Data/Stream/Reduce.hs +++ b/benchmark/Streamly/Benchmark/Data/Stream/Reduce.hs @@ -7,118 +7,533 @@ {-# LANGUAGE CPP #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE RankNTypes #-} module Stream.Reduce (benchmarks) where -import Control.Monad.Catch (MonadCatch) -import Data.Monoid (Sum(Sum), getSum) -import Stream.Common (benchIOSink) -import Streamly.Benchmark.Common (o_1_space_prefix) -import Streamly.Internal.Data.Stream (Stream) +import Control.DeepSeq (NFData(..)) +import Control.Monad.IO.Class (MonadIO(..)) +import Data.Monoid (Sum(..)) +import GHC.Generics (Generic) +import Streamly.Internal.Data.IsMap.HashMap () +import Streamly.Internal.Data.Stream.Serial (SerialT) -import qualified Streamly.Internal.Data.Stream as Stream -import qualified Streamly.Internal.Data.Parser as PR -import qualified Streamly.Internal.Data.Parser.ParserD as ParserD - -import Prelude hiding (length, sum, or, and, any, all, notElem, elem, (!!), - lookup, repeat, minimum, maximum, product, last, mapM_, init) +import qualified Streamly.Internal.Data.Refold.Type as Refold +import qualified Streamly.Internal.Data.Fold as FL +import qualified Stream.Common as Common +#ifdef USE_PRELUDE +import Control.Monad (when) +import Data.Proxy (Proxy(..)) +import Data.HashMap.Strict (HashMap) +import qualified Streamly.Internal.Data.Stream.IsStream as S +import Streamly.Prelude (fromSerial) +import Streamly.Benchmark.Prelude hiding + ( benchIOSrc, sourceUnfoldrM, apDiscardFst, apDiscardSnd, apLiftA2 + , toNullAp, monadThen, toNullM, toNullM3, filterAllInM, filterAllOutM + , filterSome, breakAfterSome, toListM, toListSome, transformMapM + , transformComposeMapM, transformTeeMapM, transformZipMapM) +#else +import Streamly.Benchmark.Prelude (benchIO) +import qualified Streamly.Internal.Data.Stream as S +#endif import Gauge +import Streamly.Benchmark.Common +import Stream.Common +import Prelude hiding (reverse, tail) -import qualified Streamly.Data.Fold as FL -import qualified Streamly.Internal.Data.Refold.Type as Refold -import Control.Monad.IO.Class (MonadIO) +------------------------------------------------------------------------------- +-- Iteration/looping utilities +------------------------------------------------------------------------------- + +{-# INLINE iterateN #-} +iterateN :: (Int -> a -> a) -> a -> Int -> a +iterateN g initial count = f count initial + + where + + f (0 :: Int) x = x + f i x = f (i - 1) (g i x) + +-- Iterate a transformation over a singleton stream +{-# INLINE iterateSingleton #-} +iterateSingleton :: Monad m + => (Int -> SerialT m Int -> SerialT m Int) + -> Int + -> Int + -> SerialT m Int +iterateSingleton g count n = iterateN g (return n) count + +{- +-- XXX need to check why this is slower than the explicit recursion above, even +-- if the above code is written in a foldr like head recursive way. We also +-- need to try this with foldlM' once #150 is fixed. +-- However, it is perhaps best to keep the iteration benchmarks independent of +-- foldrM and any related fusion issues. +{-# INLINE _iterateSingleton #-} +_iterateSingleton :: + Monad m + => (Int -> SerialT m Int -> SerialT m Int) + -> Int + -> Int + -> SerialT m Int +_iterateSingleton g value n = S.foldrM g (return n) $ sourceIntFromTo value n +-} + +-- Apply transformation g count times on a stream of length len +{-# INLINE iterateSource #-} +iterateSource :: + MonadAsync m + => (SerialT m Int -> SerialT m Int) + -> Int + -> Int + -> Int + -> SerialT m Int +iterateSource g count len n = f count (sourceUnfoldrM len n) + + where + + f (0 :: Int) stream = stream + f i stream = f (i - 1) (g stream) + +------------------------------------------------------------------------------- +-- Functor +------------------------------------------------------------------------------- + +o_n_space_functor :: Int -> [Benchmark] +o_n_space_functor value = + [ bgroup "Functor" + [ benchIO "(+) (n times) (baseline)" $ \i0 -> + iterateN (\i acc -> acc >>= \n -> return $ i + n) (return i0) value + , benchIOSrc "(<$) (n times)" $ + iterateSingleton (<$) value + , benchIOSrc "fmap (n times)" $ + iterateSingleton (fmap . (+)) value + {- + , benchIOSrc fromSerial "_(<$) (n times)" $ + _iterateSingleton (<$) value + , benchIOSrc fromSerial "_fmap (n times)" $ + _iterateSingleton (fmap . (+)) value + -} + ] + ] + +------------------------------------------------------------------------------- +-- Grouping transformations +------------------------------------------------------------------------------- + +#ifdef USE_PRELUDE +{-# INLINE groups #-} +groups :: MonadIO m => SerialT m Int -> m () +groups = Common.drain . S.groups FL.drain + +-- XXX Change this test when the order of comparison is later changed +{-# INLINE groupsByGT #-} +groupsByGT :: MonadIO m => SerialT m Int -> m () +groupsByGT = Common.drain . S.groupsBy (>) FL.drain + +{-# INLINE groupsByEq #-} +groupsByEq :: MonadIO m => SerialT m Int -> m () +groupsByEq = Common.drain . S.groupsBy (==) FL.drain + +-- XXX Change this test when the order of comparison is later changed +{-# INLINE groupsByRollingLT #-} +groupsByRollingLT :: MonadIO m => SerialT m Int -> m () +groupsByRollingLT = + Common.drain . S.groupsByRolling (<) FL.drain + +{-# INLINE groupsByRollingEq #-} +groupsByRollingEq :: MonadIO m => SerialT m Int -> m () +groupsByRollingEq = + Common.drain . S.groupsByRolling (==) FL.drain +#endif {-# INLINE foldMany #-} -foldMany :: Monad m => Stream m Int -> m () +foldMany :: Monad m => SerialT m Int -> m () foldMany = - Stream.fold FL.drain + Common.drain . fmap getSum - . Stream.foldMany (FL.take 2 FL.mconcat) + . S.foldMany (FL.take 2 FL.mconcat) . fmap Sum {-# INLINE foldManyPost #-} -foldManyPost :: Monad m => Stream m Int -> m () +foldManyPost :: Monad m => SerialT m Int -> m () foldManyPost = - Stream.fold FL.drain + Common.drain . fmap getSum - . Stream.foldManyPost (FL.take 2 FL.mconcat) + . S.foldManyPost (FL.take 2 FL.mconcat) . fmap Sum {-# INLINE refoldMany #-} -refoldMany :: Monad m => Stream m Int -> m () +refoldMany :: Monad m => SerialT m Int -> m () refoldMany = - Stream.fold FL.drain + Common.drain . fmap getSum - . Stream.refoldMany (Refold.take 2 Refold.sconcat) (return mempty) + . S.refoldMany (Refold.take 2 Refold.sconcat) (return mempty) . fmap Sum {-# INLINE foldIterateM #-} -foldIterateM :: Monad m => Stream m Int -> m () +foldIterateM :: Monad m => SerialT m Int -> m () foldIterateM = - Stream.fold FL.drain + Common.drain . fmap getSum - . Stream.foldIterateM + . S.foldIterateM (return . FL.take 2 . FL.sconcat) (return (Sum 0)) . fmap Sum {-# INLINE refoldIterateM #-} -refoldIterateM :: Monad m => Stream m Int -> m () +refoldIterateM :: Monad m => SerialT m Int -> m () refoldIterateM = - Stream.fold FL.drain + Common.drain . fmap getSum - . Stream.refoldIterateM + . S.refoldIterateM (Refold.take 2 Refold.sconcat) (return (Sum 0)) . fmap Sum -{-# INLINE parseMany #-} -parseMany :: MonadCatch m => Int -> Stream m Int -> m () -parseMany n = - Stream.fold FL.drain - . fmap getSum - . Stream.parseMany (PR.fromFold $ FL.take n FL.mconcat) - . fmap Sum - -{-# INLINE parseManyD #-} -parseManyD :: MonadCatch m => Int -> Stream m Int -> m () -parseManyD n = - Stream.fold FL.drain - . fmap getSum - . Stream.parseManyD (ParserD.fromFold $ FL.take n FL.mconcat) - . fmap Sum - -{-# INLINE parseIterate #-} -parseIterate :: MonadCatch m => Int -> Stream m Int -> m () -parseIterate n = - Stream.fold FL.drain - . fmap getSum - . Stream.parseIterate (\_ -> PR.fromFold $ FL.take n FL.mconcat) 0 - . fmap Sum - -{-# INLINE arraysOf #-} -arraysOf :: (MonadCatch m, MonadIO m) => Int -> Stream m Int -> m () -arraysOf n = - Stream.fold FL.drain - . Stream.arraysOf n - o_1_space_grouping :: Int -> [Benchmark] o_1_space_grouping value = -- Buffering operations using heap proportional to group/window sizes. - [ bgroup "reduce" - [ benchIOSink value "foldMany" foldMany + [ bgroup "grouping" + [ +#ifdef USE_PRELUDE + benchIOSink value "groups" groups + , benchIOSink value "groupsByGT" groupsByGT + , benchIOSink value "groupsByEq" groupsByEq + , benchIOSink value "groupsByRollingLT" groupsByRollingLT + , benchIOSink value "groupsByRollingEq" groupsByRollingEq + , +#endif + -- XXX parseMany/parseIterate benchmarks are in the Parser/ParserD + -- modules we can bring those here. arraysOf benchmarks are in + -- Parser/ParserD/Array.Stream/FileSystem.Handle. + benchIOSink value "foldMany" foldMany , benchIOSink value "foldManyPost" foldManyPost , benchIOSink value "refoldMany" refoldMany , benchIOSink value "foldIterateM" foldIterateM , benchIOSink value "refoldIterateM" refoldIterateM - , benchIOSink value "parseMany" $ parseMany value - , benchIOSink value "parseManyD" $ parseManyD value - , benchIOSink value "parseIterate" $ parseIterate value - , benchIOSink value "arraysOf" $ arraysOf value ] ] +------------------------------------------------------------------------------- +-- Size conserving transformations (reordering, buffering, etc.) +------------------------------------------------------------------------------- + +{-# INLINE reverse #-} +reverse :: MonadIO m => Int -> SerialT m Int -> m () +reverse n = composeN n S.reverse + +{-# INLINE reverse' #-} +reverse' :: MonadIO m => Int -> SerialT m Int -> m () +reverse' n = composeN n S.reverse' + +o_n_heap_buffering :: Int -> [Benchmark] +o_n_heap_buffering value = + [ bgroup "buffered" + [ + -- Reversing a stream + benchIOSink value "reverse" (reverse 1) + , benchIOSink value "reverse'" (reverse' 1) + +#ifdef USE_PRELUDE + , benchIOSink value "mkAsync" (mkAsync fromSerial) +#endif + ] + ] + +------------------------------------------------------------------------------- +-- Grouping/Splitting +------------------------------------------------------------------------------- + +#ifdef USE_PRELUDE +{-# INLINE classifySessionsOf #-} +classifySessionsOf :: MonadAsync m => (Int -> Int) -> SerialT m Int -> m () +classifySessionsOf getKey = + Common.drain + . S.classifySessionsOf + (const (return False)) 3 (FL.take 10 FL.sum) + . S.timestamped + . fmap (\x -> (getKey x, x)) + +{-# INLINE classifySessionsOfHash #-} +classifySessionsOfHash :: MonadAsync m => + (Int -> Int) -> SerialT m Int -> m () +classifySessionsOfHash getKey = + Common.drain + . S.classifySessionsByGeneric + (Proxy :: Proxy (HashMap k)) + 1 False (const (return False)) 3 (FL.take 10 FL.sum) + . S.timestamped + . fmap (\x -> (getKey x, x)) + +o_n_space_grouping :: Int -> [Benchmark] +o_n_space_grouping value = + -- Buffering operations using heap proportional to group/window sizes. + [ bgroup "grouping" + [ benchIOSink value "classifySessionsOf (10000 buckets)" + (classifySessionsOf (getKey 10000)) + , benchIOSink value "classifySessionsOf (64 buckets)" + (classifySessionsOf (getKey 64)) + , benchIOSink value "classifySessionsOfHash (10000 buckets)" + (classifySessionsOfHash (getKey 10000)) + , benchIOSink value "classifySessionsOfHash (64 buckets)" + (classifySessionsOfHash (getKey 64)) + ] + ] + + where + + getKey n = (`mod` n) +#endif + +------------------------------------------------------------------------------- +-- Mixed Transformation +------------------------------------------------------------------------------- + +{-# INLINE scanMap #-} +scanMap :: MonadIO m => Int -> SerialT m Int -> m () +scanMap n = composeN n $ fmap (subtract 1) . Common.scanl' (+) 0 + +{-# INLINE dropMap #-} +dropMap :: MonadIO m => Int -> SerialT m Int -> m () +dropMap n = composeN n $ fmap (subtract 1) . S.drop 1 + +{-# INLINE dropScan #-} +dropScan :: MonadIO m => Int -> SerialT m Int -> m () +dropScan n = composeN n $ Common.scanl' (+) 0 . S.drop 1 + +{-# INLINE takeDrop #-} +takeDrop :: MonadIO m => Int -> Int -> SerialT m Int -> m () +takeDrop value n = composeN n $ S.drop 1 . S.take (value + 1) + +{-# INLINE takeScan #-} +takeScan :: MonadIO m => Int -> Int -> SerialT m Int -> m () +takeScan value n = composeN n $ Common.scanl' (+) 0 . S.take (value + 1) + +{-# INLINE takeMap #-} +takeMap :: MonadIO m => Int -> Int -> SerialT m Int -> m () +takeMap value n = composeN n $ fmap (subtract 1) . S.take (value + 1) + +{-# INLINE filterDrop #-} +filterDrop :: MonadIO m => Int -> Int -> SerialT m Int -> m () +filterDrop value n = composeN n $ S.drop 1 . S.filter (<= (value + 1)) + +{-# INLINE filterTake #-} +filterTake :: MonadIO m => Int -> Int -> SerialT m Int -> m () +filterTake value n = composeN n $ S.take (value + 1) . S.filter (<= (value + 1)) + +{-# INLINE filterScan #-} +filterScan :: MonadIO m => Int -> SerialT m Int -> m () +filterScan n = composeN n $ Common.scanl' (+) 0 . S.filter (<= maxBound) + +#ifdef USE_PRELUDE +{-# INLINE filterScanl1 #-} +filterScanl1 :: MonadIO m => Int -> SerialT m Int -> m () +filterScanl1 n = composeN n $ S.scanl1' (+) . S.filter (<= maxBound) +#endif + +{-# INLINE filterMap #-} +filterMap :: MonadIO m => Int -> Int -> SerialT m Int -> m () +filterMap value n = composeN n $ fmap (subtract 1) . S.filter (<= (value + 1)) + +------------------------------------------------------------------------------- +-- Scan and fold +------------------------------------------------------------------------------- + +data Pair a b = + Pair !a !b + deriving (Generic, NFData) + +{-# INLINE sumProductFold #-} +sumProductFold :: Monad m => SerialT m Int -> m (Int, Int) +sumProductFold = Common.foldl' (\(s, p) x -> (s + x, p * x)) (0, 1) + +{-# INLINE sumProductScan #-} +sumProductScan :: Monad m => SerialT m Int -> m (Pair Int Int) +sumProductScan = + Common.foldl' (\(Pair _ p) (s0, x) -> Pair s0 (p * x)) (Pair 0 1) . + Common.scanl' (\(s, _) x -> (s + x, x)) (0, 0) + +{-# INLINE foldl'ReduceMap #-} +foldl'ReduceMap :: Monad m => SerialT m Int -> m Int +foldl'ReduceMap = fmap (+ 1) . Common.foldl' (+) 0 + +o_1_space_transformations_mixed :: Int -> [Benchmark] +o_1_space_transformations_mixed value = + -- scanl-map and foldl-map are equivalent to the scan and fold in the foldl + -- library. If scan/fold followed by a map is efficient enough we may not + -- need monolithic implementations of these. + [ bgroup "mixed" + [ benchIOSink value "scanl-map" (scanMap 1) + , benchIOSink value "foldl-map" foldl'ReduceMap + , benchIOSink value "sum-product-fold" sumProductFold + , benchIOSink value "sum-product-scan" sumProductScan + ] + ] + +o_1_space_transformations_mixedX4 :: Int -> [Benchmark] +o_1_space_transformations_mixedX4 value = + [ bgroup "mixedX4" + [ benchIOSink value "scan-map" (scanMap 4) + , benchIOSink value "drop-map" (dropMap 4) + , benchIOSink value "drop-scan" (dropScan 4) + , benchIOSink value "take-drop" (takeDrop value 4) + , benchIOSink value "take-scan" (takeScan value 4) + , benchIOSink value "take-map" (takeMap value 4) + , benchIOSink value "filter-drop" (filterDrop value 4) + , benchIOSink value "filter-take" (filterTake value 4) + , benchIOSink value "filter-scan" (filterScan 4) +#ifdef USE_PRELUDE + , benchIOSink value "filter-scanl1" (filterScanl1 4) +#endif + , benchIOSink value "filter-map" (filterMap value 4) + ] + ] + +------------------------------------------------------------------------------- +-- Iterating a transformation over and over again +------------------------------------------------------------------------------- + +-- this is quadratic +{-# INLINE iterateScan #-} +iterateScan :: MonadAsync m => Int -> Int -> Int -> SerialT m Int +iterateScan = iterateSource (Common.scanl' (+) 0) + +#ifdef USE_PRELUDE +-- this is quadratic +{-# INLINE iterateScanl1 #-} +iterateScanl1 :: MonadAsync m => Int -> Int -> Int -> SerialT m Int +iterateScanl1 = iterateSource (S.scanl1' (+)) +#endif + +{-# INLINE iterateMapM #-} +iterateMapM :: MonadAsync m => Int -> Int -> Int -> SerialT m Int +iterateMapM = iterateSource (S.mapM return) + +{-# INLINE iterateFilterEven #-} +iterateFilterEven :: MonadAsync m => Int -> Int -> Int -> SerialT m Int +iterateFilterEven = iterateSource (S.filter even) + +{-# INLINE iterateTakeAll #-} +iterateTakeAll :: MonadAsync m => Int -> Int -> Int -> Int -> SerialT m Int +iterateTakeAll value = iterateSource (S.take (value + 1)) + +{-# INLINE iterateDropOne #-} +iterateDropOne :: MonadAsync m => Int -> Int -> Int -> SerialT m Int +iterateDropOne = iterateSource (S.drop 1) + +{-# INLINE iterateDropWhileFalse #-} +iterateDropWhileFalse :: MonadAsync m + => Int -> Int -> Int -> Int -> SerialT m Int +iterateDropWhileFalse value = iterateSource (S.dropWhile (> (value + 1))) + +{-# INLINE iterateDropWhileTrue #-} +iterateDropWhileTrue :: MonadAsync m + => Int -> Int -> Int -> Int -> SerialT m Int +iterateDropWhileTrue value = iterateSource (S.dropWhile (<= (value + 1))) + +#ifdef USE_PRELUDE +{-# INLINE tail #-} +tail :: Monad m => SerialT m a -> m () +tail s = S.tail s >>= mapM_ tail + +{-# INLINE nullHeadTail #-} +nullHeadTail :: Monad m => SerialT m Int -> m () +nullHeadTail s = do + r <- S.null s + when (not r) $ do + _ <- S.head s + S.tail s >>= mapM_ nullHeadTail +#endif + +-- Head recursive operations. +o_n_stack_iterated :: Int -> [Benchmark] +o_n_stack_iterated value = by10 `seq` by100 `seq` + [ bgroup "iterated" + [ benchIOSrc "mapM (n/10 x 10)" $ iterateMapM by10 10 + , benchIOSrc "scanl' (quadratic) (n/100 x 100)" $ + iterateScan by100 100 +#ifdef USE_PRELUDE + , benchIOSrc "scanl1' (n/10 x 10)" $ iterateScanl1 by10 10 +#endif + , benchIOSrc "filterEven (n/10 x 10)" $ + iterateFilterEven by10 10 + , benchIOSrc "takeAll (n/10 x 10)" $ + iterateTakeAll value by10 10 + , benchIOSrc "dropOne (n/10 x 10)" $ iterateDropOne by10 10 + , benchIOSrc "dropWhileFalse (n/10 x 10)" $ + iterateDropWhileFalse value by10 10 + , benchIOSrc "dropWhileTrue (n/10 x 10)" $ + iterateDropWhileTrue value by10 10 +#ifdef USE_PRELUDE + , benchIOSink value "tail" tail + , benchIOSink value "nullHeadTail" nullHeadTail +#endif + ] + ] + + where + + by10 = value `div` 10 + by100 = value `div` 100 + +------------------------------------------------------------------------------- +-- Pipes +------------------------------------------------------------------------------- + +o_1_space_pipes :: Int -> [Benchmark] +o_1_space_pipes value = + [ bgroup "pipes" + [ benchIOSink value "mapM" (transformMapM 1) + , benchIOSink value "compose" (transformComposeMapM 1) + , benchIOSink value "tee" (transformTeeMapM 1) +#ifdef DEVBUILD + -- XXX this take 1 GB memory to compile + , benchIOSink value "zip" (transformZipMapM 1) +#endif + ] + ] + +o_1_space_pipesX4 :: Int -> [Benchmark] +o_1_space_pipesX4 value = + [ bgroup "pipesX4" + [ benchIOSink value "mapM" (transformMapM 4) + , benchIOSink value "compose" (transformComposeMapM 4) + , benchIOSink value "tee" (transformTeeMapM 4) +#ifdef DEVBUILD + -- XXX this take 1 GB memory to compile + , benchIOSink value "zip" (transformZipMapM 4) +#endif + ] + ] + +------------------------------------------------------------------------------- +-- Main +------------------------------------------------------------------------------- + +-- In addition to gauge options, the number of elements in the stream can be +-- passed using the --stream-size option. +-- benchmarks :: String -> Int -> [Benchmark] benchmarks moduleName size = - [ bgroup (o_1_space_prefix moduleName) $ o_1_space_grouping size - ] + [ bgroup (o_1_space_prefix moduleName) $ Prelude.concat + [ o_1_space_grouping size + , o_1_space_transformations_mixed size + , o_1_space_transformations_mixedX4 size + + -- pipes + , o_1_space_pipes size + , o_1_space_pipesX4 size + ] + , bgroup (o_n_stack_prefix moduleName) (o_n_stack_iterated size) + , bgroup (o_n_heap_prefix moduleName) $ Prelude.concat + [ +#ifdef USE_PRELUDE + o_n_space_grouping size + , +#endif + o_n_space_functor size + , o_n_heap_buffering size + ] + ] diff --git a/benchmark/Streamly/Benchmark/Data/Stream/Transformation.hs b/benchmark/Streamly/Benchmark/Data/Stream/Transform.hs similarity index 72% rename from benchmark/Streamly/Benchmark/Data/Stream/Transformation.hs rename to benchmark/Streamly/Benchmark/Data/Stream/Transform.hs index fa5909965..8a1574792 100644 --- a/benchmark/Streamly/Benchmark/Data/Stream/Transformation.hs +++ b/benchmark/Streamly/Benchmark/Data/Stream/Transform.hs @@ -1,5 +1,5 @@ -- | --- Module : Stream.Transformation +-- Module : Stream.Transform -- Copyright : (c) 2018 Composewell Technologies -- License : BSD-3-Clause -- Maintainer : streamly@composewell.com @@ -18,36 +18,37 @@ {-# OPTIONS_GHC -fplugin Test.Inspection.Plugin #-} #endif -module Stream.Transformation (benchmarks) where +module Stream.Transform (benchmarks) where +import Control.DeepSeq (NFData(..)) import Control.Monad.IO.Class (MonadIO(..)) +import Data.Functor.Identity (Identity) import System.Random (randomRIO) import qualified Streamly.Internal.Data.Fold as FL -#ifdef USE_PRELUDE -import Streamly.Prelude (fromSerial, MonadAsync) -import Streamly.Benchmark.Prelude -import Streamly.Internal.Data.Time.Units -import qualified Streamly.Benchmark.Prelude as BP -import qualified Streamly.Prelude as S -import qualified Streamly.Internal.Data.Stream.IsStream as Internal -import qualified Streamly.Internal.Data.Unfold as Unfold -#else -import Control.DeepSeq (NFData(..)) -import Data.Functor.Identity (Identity) -import Stream.Common -import qualified Streamly.Internal.Data.Stream as Stream -import qualified Streamly.Internal.Data.Stream as S -import qualified Streamly.Internal.Data.Stream as Internal import qualified Prelude +import qualified Stream.Common as Common +import qualified Streamly.Internal.Data.Unfold as Unfold +#ifdef USE_PRELUDE +import Streamly.Benchmark.Prelude hiding + ( benchIOSrc, sourceUnfoldrM, apDiscardFst, apDiscardSnd, apLiftA2, toNullAp + , monadThen, toNullM, toNullM3, filterAllInM, filterAllOutM, filterSome + , breakAfterSome, toListM, toListSome, transformMapM, transformComposeMapM + , transformTeeMapM, transformZipMapM, mapN, mapM) +import qualified Streamly.Internal.Data.Stream.IsStream as Stream +import Streamly.Internal.Data.Time.Units +#else +import Streamly.Benchmark.Prelude (benchIO) +import qualified Streamly.Internal.Data.Stream as Stream #endif import Gauge import Streamly.Internal.Data.Stream.Serial (SerialT) +import Stream.Common hiding (scanl') import Streamly.Benchmark.Common -import Prelude hiding (sequence, mapM, fmap) +import Prelude hiding (sequence, mapM) ------------------------------------------------------------------------------- -- Pipelines (stream-to-stream transformations) @@ -60,7 +61,7 @@ import Prelude hiding (sequence, mapM, fmap) ------------------------------------------------------------------------------- -- Traversable Instance ------------------------------------------------------------------------------- -#ifndef USE_PRELUDE + {-# INLINE traversableTraverse #-} traversableTraverse :: SerialT Identity Int -> IO (SerialT Identity Int) traversableTraverse = traverse return @@ -95,111 +96,89 @@ o_n_space_traversable value = , benchPureSinkIO value "sequence" traversableSequence ] ] -#endif -#ifdef USE_PRELUDE -{-# INLINE composeNG #-} -composeNG :: - (S.IsStream t, Monad m) - => Int - -> (t m Int -> S.SerialT m Int) - -> t m Int - -> m () -composeNG = BP.composeN -#else -{-# INLINE composeNG #-} -composeNG :: - (Monad m) - => Int - -> (SerialT m Int -> SerialT m Int) - -> SerialT m Int - -> m () -composeNG _n = return $ Stream.fold FL.drain -#endif ------------------------------------------------------------------------------- -- maps and scans ------------------------------------------------------------------------------- - -{-# INLINE scan #-} -scan :: MonadIO m => Int -> SerialT m Int -> m () -scan n = composeNG n $ S.scan FL.sum - -{-# INLINE tap #-} -tap :: MonadIO m => Int -> SerialT m Int -> m () -tap n = composeNG n $ S.tap FL.sum - #ifdef USE_PRELUDE {-# INLINE scanl' #-} scanl' :: MonadIO m => Int -> SerialT m Int -> m () -scanl' n = composeNG n $ S.scanl' (+) 0 +scanl' n = composeN n $ Stream.scanl' (+) 0 {-# INLINE scanlM' #-} scanlM' :: MonadIO m => Int -> SerialT m Int -> m () -scanlM' n = composeNG n $ S.scanlM' (\b a -> return $ b + a) (return 0) +scanlM' n = composeN n $ Stream.scanlM' (\b a -> return $ b + a) (return 0) {-# INLINE scanl1' #-} scanl1' :: MonadIO m => Int -> SerialT m Int -> m () -scanl1' n = composeNG n $ S.scanl1' (+) +scanl1' n = composeN n $ Stream.scanl1' (+) {-# INLINE scanl1M' #-} scanl1M' :: MonadIO m => Int -> SerialT m Int -> m () -scanl1M' n = composeNG n $ S.scanl1M' (\b a -> return $ b + a) +scanl1M' n = composeN n $ Stream.scanl1M' (\b a -> return $ b + a) +#endif +{-# INLINE scan #-} +scan :: MonadIO m => Int -> SerialT m Int -> m () +scan n = composeN n $ Stream.scan FL.sum + +#ifdef USE_PRELUDE {-# INLINE postscanl' #-} postscanl' :: MonadIO m => Int -> SerialT m Int -> m () -postscanl' n = composeNG n $ S.postscanl' (+) 0 +postscanl' n = composeN n $ Stream.postscanl' (+) 0 {-# INLINE postscanlM' #-} postscanlM' :: MonadIO m => Int -> SerialT m Int -> m () -postscanlM' n = composeNG n $ S.postscanlM' (\b a -> return $ b + a) (return 0) - +postscanlM' n = composeN n $ Stream.postscanlM' (\b a -> return $ b + a) (return 0) +#endif {-# INLINE postscan #-} postscan :: MonadIO m => Int -> SerialT m Int -> m () -postscan n = composeNG n $ S.postscan FL.sum +postscan n = composeN n $ Stream.postscan FL.sum {-# INLINE sequence #-} -sequence :: - (S.IsStream t, S.MonadAsync m) - => (t m Int -> S.SerialT m Int) - -> t m (m Int) - -> m () -sequence t = S.drain . t . S.sequence +sequence :: MonadAsync m => SerialT m (m Int) -> m () +sequence = Common.drain . Stream.sequence +{-# INLINE tap #-} +tap :: MonadIO m => Int -> SerialT m Int -> m () +tap n = composeN n $ Stream.tap FL.sum + +#ifdef USE_PRELUDE {-# INLINE pollCounts #-} pollCounts :: Int -> SerialT IO Int -> IO () pollCounts n = - composeN n (Internal.pollCounts (const True) f) + composeN n (Stream.pollCounts (const True) f) where - f = S.drain . Internal.rollingMap2 (-) . Internal.delayPost 1 + f = Stream.drain . Stream.rollingMap2 (-) . Stream.delayPost 1 {-# INLINE timestamped #-} -timestamped :: (S.MonadAsync m) => SerialT m Int -> m () -timestamped = S.drain . Internal.timestamped - -{-# INLINE trace #-} -trace :: MonadAsync m => Int -> SerialT m Int -> m () -trace n = composeNG n $ Internal.trace return +timestamped :: (MonadAsync m) => SerialT m Int -> m () +timestamped = Stream.drain . Stream.timestamped #endif {-# INLINE foldrS #-} foldrS :: MonadIO m => Int -> SerialT m Int -> m () -foldrS n = composeNG n $ Internal.foldrS S.cons S.nil +foldrS n = composeN n $ Stream.foldrS Stream.cons Stream.nil {-# INLINE foldrSMap #-} foldrSMap :: MonadIO m => Int -> SerialT m Int -> m () -foldrSMap n = composeNG n $ Internal.foldrS (\x xs -> x + 1 `S.cons` xs) S.nil +foldrSMap n = composeN n $ Stream.foldrS (\x xs -> x + 1 `Stream.cons` xs) Stream.nil {-# INLINE foldrT #-} foldrT :: MonadIO m => Int -> SerialT m Int -> m () -foldrT n = composeNG n $ Internal.foldrT S.cons S.nil +foldrT n = composeN n $ Stream.foldrT Stream.cons Stream.nil {-# INLINE foldrTMap #-} foldrTMap :: MonadIO m => Int -> SerialT m Int -> m () -foldrTMap n = composeNG n $ Internal.foldrT (\x xs -> x + 1 `S.cons` xs) S.nil +foldrTMap n = composeN n $ Stream.foldrT (\x xs -> x + 1 `Stream.cons` xs) Stream.nil + +{-# INLINE trace #-} +trace :: MonadAsync m => Int -> SerialT m Int -> m () +trace n = composeN n $ Stream.trace return o_1_space_mapping :: Int -> [Benchmark] o_1_space_mapping value = @@ -211,13 +190,14 @@ o_1_space_mapping value = , benchIOSink value "foldrSMap" (foldrSMap 1) , benchIOSink value "foldrT" (foldrT 1) , benchIOSink value "foldrTMap" (foldrTMap 1) -#ifdef USE_PRELUDE - -- Mapping - , benchIOSink value "map" (mapN fromSerial 1) - , bench "sequence" $ nfIO $ randomRIO (1, 1000) >>= \n -> - sequence fromSerial (sourceUnfoldrAction value n) - , benchIOSink value "mapM" (mapM fromSerial 1) + -- Mapping + , benchIOSink value "map" (mapN 1) + , bench "sequence" $ nfIO $ randomRIO (1, 1000) >>= \n -> + sequence (sourceUnfoldrAction value n) + , benchIOSink value "mapM" (mapM 1) + , benchIOSink value "tap" (tap 1) +#ifdef USE_PRELUDE , benchIOSink value "pollCounts 1 second" (pollCounts 1) , benchIOSink value "timestamped" timestamped @@ -228,48 +208,46 @@ o_1_space_mapping value = , benchIOSink value "scanl1M'" (scanl1M' 1) , benchIOSink value "postscanl'" (postscanl' 1) , benchIOSink value "postscanlM'" (postscanlM' 1) - , benchIOSink value "postscan" (postscan 1) #endif , benchIOSink value "scan" (scan 1) - , benchIOSink value "tap" (tap 1) + , benchIOSink value "postscan" (postscan 1) ] ] - -#ifdef USE_PRELUDE o_1_space_mappingX4 :: Int -> [Benchmark] o_1_space_mappingX4 value = [ bgroup "mappingX4" - [ benchIOSink value "map" (mapN fromSerial 4) - , benchIOSink value "mapM" (mapM fromSerial 4) + [ benchIOSink value "map" (mapN 4) + , benchIOSink value "mapM" (mapM 4) , benchIOSink value "trace" (trace 4) +#ifdef USE_PRELUDE , benchIOSink value "scanl'" (scanl' 4) , benchIOSink value "scanl1'" (scanl1' 4) , benchIOSink value "scanlM'" (scanlM' 4) , benchIOSink value "scanl1M'" (scanl1M' 4) , benchIOSink value "postscanl'" (postscanl' 4) , benchIOSink value "postscanlM'" (postscanlM' 4) +#endif ] ] - {-# INLINE sieveScan #-} sieveScan :: Monad m => SerialT m Int -> SerialT m Int sieveScan = - S.mapMaybe snd - . S.scanlM' (\(primes, _) n -> do + Stream.mapMaybe snd + . Stream.scan (FL.foldlM' (\(primes, _) n -> do return $ let ps = takeWhile (\p -> p * p <= n) primes in if all (\p -> n `mod` p /= 0) ps then (primes ++ [n], Just n) - else (primes, Nothing)) (return ([2], Just 2)) + else (primes, Nothing)) (return ([2], Just 2))) o_n_space_mapping :: Int -> [Benchmark] o_n_space_mapping value = [ bgroup "mapping" [ benchIO "naive prime sieve" - (\n -> S.sum $ sieveScan $ S.enumerateFromTo 2 (value + n)) + (\n -> Stream.fold FL.sum $ sieveScan $ Common.enumerateFromTo 2 (value + n)) ] ] @@ -280,64 +258,59 @@ o_n_space_mapping value = o_1_space_functor :: Int -> [Benchmark] o_1_space_functor value = [ bgroup "Functor" - [ benchIOSink value "fmap" (fmapN fromSerial 1) - , benchIOSink value "fmap x 4" (fmapN fromSerial 4) + [ benchIOSink value "fmap" (mapN 1) + , benchIOSink value "fmap x 4" (mapN 4) ] ] -#else -{-# INLINE foldFilterEven #-} -foldFilterEven :: MonadIO m => SerialT m Int -> m () -foldFilterEven = Stream.fold FL.drain . Stream.foldFilter (FL.satisfy even) -#endif + ------------------------------------------------------------------------------- -- Size reducing transformations (filtering) ------------------------------------------------------------------------------- {-# INLINE filterEven #-} filterEven :: MonadIO m => Int -> SerialT m Int -> m () -filterEven n = composeNG n $ S.filter even +filterEven n = composeN n $ Stream.filter even {-# INLINE filterAllOut #-} filterAllOut :: MonadIO m => Int -> Int -> SerialT m Int -> m () -filterAllOut value n = composeNG n $ S.filter (> (value + 1)) +filterAllOut value n = composeN n $ Stream.filter (> (value + 1)) {-# INLINE filterAllIn #-} filterAllIn :: MonadIO m => Int -> Int -> SerialT m Int -> m () -filterAllIn value n = composeNG n $ S.filter (<= (value + 1)) +filterAllIn value n = composeN n $ Stream.filter (<= (value + 1)) {-# INLINE filterMEven #-} filterMEven :: MonadIO m => Int -> SerialT m Int -> m () -filterMEven n = composeNG n $ S.filterM (return . even) +filterMEven n = composeN n $ Stream.filterM (return . even) {-# INLINE filterMAllOut #-} filterMAllOut :: MonadIO m => Int -> Int -> SerialT m Int -> m () -filterMAllOut value n = composeNG n $ S.filterM (\x -> return $ x > (value + 1)) +filterMAllOut value n = composeN n $ Stream.filterM (\x -> return $ x > (value + 1)) {-# INLINE filterMAllIn #-} filterMAllIn :: MonadIO m => Int -> Int -> SerialT m Int -> m () -filterMAllIn value n = composeNG n $ S.filterM (\x -> return $ x <= (value + 1)) +filterMAllIn value n = composeN n $ Stream.filterM (\x -> return $ x <= (value + 1)) {-# INLINE _takeOne #-} _takeOne :: MonadIO m => Int -> SerialT m Int -> m () -_takeOne n = composeNG n $ S.take 1 +_takeOne n = composeN n $ Stream.take 1 {-# INLINE takeAll #-} takeAll :: MonadIO m => Int -> Int -> SerialT m Int -> m () -takeAll value n = composeNG n $ S.take (value + 1) +takeAll value n = composeN n $ Stream.take (value + 1) {-# INLINE takeWhileTrue #-} takeWhileTrue :: MonadIO m => Int -> Int -> SerialT m Int -> m () -takeWhileTrue value n = composeNG n $ S.takeWhile (<= (value + 1)) +takeWhileTrue value n = composeN n $ Stream.takeWhile (<= (value + 1)) {-# INLINE takeWhileMTrue #-} takeWhileMTrue :: MonadIO m => Int -> Int -> SerialT m Int -> m () -takeWhileMTrue value n = composeNG n $ S.takeWhileM (return . (<= (value + 1))) +takeWhileMTrue value n = composeN n $ Stream.takeWhileM (return . (<= (value + 1))) #ifdef USE_PRELUDE {-# INLINE takeInterval #-} takeInterval :: NanoSecond64 -> Int -> SerialT IO Int -> IO () -takeInterval i n = composeN n (Internal.takeInterval i) - +takeInterval i n = composeN n (Stream.takeInterval i) #ifdef INSPECTION -- inspect $ hasNoType 'takeInterval ''SPEC @@ -348,33 +321,33 @@ inspect $ hasNoTypeClasses 'takeInterval {-# INLINE dropOne #-} dropOne :: MonadIO m => Int -> SerialT m Int -> m () -dropOne n = composeNG n $ S.drop 1 +dropOne n = composeN n $ Stream.drop 1 {-# INLINE dropAll #-} dropAll :: MonadIO m => Int -> Int -> SerialT m Int -> m () -dropAll value n = composeNG n $ S.drop (value + 1) +dropAll value n = composeN n $ Stream.drop (value + 1) {-# INLINE dropWhileTrue #-} dropWhileTrue :: MonadIO m => Int -> Int -> SerialT m Int -> m () -dropWhileTrue value n = composeNG n $ S.dropWhile (<= (value + 1)) +dropWhileTrue value n = composeN n $ Stream.dropWhile (<= (value + 1)) {-# INLINE dropWhileMTrue #-} dropWhileMTrue :: MonadIO m => Int -> Int -> SerialT m Int -> m () -dropWhileMTrue value n = composeNG n $ S.dropWhileM (return . (<= (value + 1))) +dropWhileMTrue value n = composeN n $ Stream.dropWhileM (return . (<= (value + 1))) {-# INLINE dropWhileFalse #-} dropWhileFalse :: MonadIO m => Int -> Int -> SerialT m Int -> m () -dropWhileFalse value n = composeNG n $ S.dropWhile (> (value + 1)) +dropWhileFalse value n = composeN n $ Stream.dropWhile (> (value + 1)) #ifdef USE_PRELUDE -- XXX Decide on the time interval {-# INLINE _intervalsOfSum #-} _intervalsOfSum :: MonadAsync m => Double -> Int -> SerialT m Int -> m () -_intervalsOfSum i n = composeN n (S.intervalsOf i FL.sum) +_intervalsOfSum i n = composeN n (Stream.intervalsOf i FL.sum) {-# INLINE dropInterval #-} dropInterval :: NanoSecond64 -> Int -> SerialT IO Int -> IO () -dropInterval i n = composeN n (Internal.dropInterval i) +dropInterval i n = composeN n (Stream.dropInterval i) #ifdef INSPECTION inspect $ hasNoTypeClasses 'dropInterval @@ -384,42 +357,40 @@ inspect $ hasNoTypeClasses 'dropInterval {-# INLINE findIndices #-} findIndices :: MonadIO m => Int -> Int -> SerialT m Int -> m () -findIndices value n = composeNG n $ S.findIndices (== (value + 1)) +findIndices value n = composeN n $ Stream.findIndices (== (value + 1)) {-# INLINE elemIndices #-} elemIndices :: MonadIO m => Int -> Int -> SerialT m Int -> m () -elemIndices value n = composeNG n $ S.elemIndices (value + 1) +elemIndices value n = composeN n $ Stream.elemIndices (value + 1) {-# INLINE deleteBy #-} deleteBy :: MonadIO m => Int -> Int -> SerialT m Int -> m () -deleteBy value n = composeNG n $ S.deleteBy (>=) (value + 1) +deleteBy value n = composeN n $ Stream.deleteBy (>=) (value + 1) -- uniq . uniq == uniq, composeN 2 ~ composeN 1 {-# INLINE uniq #-} uniq :: MonadIO m => Int -> SerialT m Int -> m () -uniq n = composeNG n S.uniq +uniq n = composeN n Stream.uniq -#ifdef USE_PRELUDE {-# INLINE mapMaybe #-} mapMaybe :: MonadIO m => Int -> SerialT m Int -> m () mapMaybe n = - composeNG n $ - S.mapMaybe + composeN n $ + Stream.mapMaybe (\x -> if odd x then Nothing else Just x) {-# INLINE mapMaybeM #-} -mapMaybeM :: S.MonadAsync m => Int -> SerialT m Int -> m () +mapMaybeM :: MonadAsync m => Int -> SerialT m Int -> m () mapMaybeM n = - composeNG n $ - S.mapMaybeM + composeN n $ + Stream.mapMaybeM (\x -> if odd x then return Nothing else return $ Just x) -#endif o_1_space_filtering :: Int -> [Benchmark] o_1_space_filtering value = @@ -431,9 +402,6 @@ o_1_space_filtering value = , benchIOSink value "filterM-even" (filterMEven 1) , benchIOSink value "filterM-all-out" (filterMAllOut value 1) , benchIOSink value "filterM-all-in" (filterMAllIn value 1) -#ifndef USE_PRELUDE - , benchIOSink value "foldFilter-even" foldFilterEven -#endif -- Trimming , benchIOSink value "take-all" (takeAll value 1) @@ -460,18 +428,17 @@ o_1_space_filtering value = , benchIOSink value "deleteBy" (deleteBy value 1) , benchIOSink value "uniq" (uniq 1) -#ifdef USE_PRELUDE + -- Map and filter , benchIOSink value "mapMaybe" (mapMaybe 1) , benchIOSink value "mapMaybeM" (mapMaybeM 1) -#endif + -- Searching (stateful map and filter) , benchIOSink value "findIndices" (findIndices value 1) , benchIOSink value "elemIndices" (elemIndices value 1) ] ] - o_1_space_filteringX4 :: Int -> [Benchmark] o_1_space_filteringX4 value = [ bgroup "filteringX4" @@ -483,8 +450,6 @@ o_1_space_filteringX4 value = , benchIOSink value "filterM-all-out" (filterMAllOut value 4) , benchIOSink value "filterM-all-in" (filterMAllIn value 4) - --, benchIOSink value "foldFilter-even" (foldFilterEven 4) - -- trimming , benchIOSink value "take-all" (takeAll value 4) , benchIOSink value "takeWhile-true" (takeWhileTrue value 4) @@ -501,11 +466,10 @@ o_1_space_filteringX4 value = , benchIOSink value "uniq" (uniq 4) -#ifdef USE_PRELUDE -- map and filter , benchIOSink value "mapMaybe" (mapMaybe 4) , benchIOSink value "mapMaybeM" (mapMaybeM 4) -#endif + -- searching , benchIOSink value "findIndices" (findIndices value 4) , benchIOSink value "elemIndices" (elemIndices value 4) @@ -515,29 +479,28 @@ o_1_space_filteringX4 value = ------------------------------------------------------------------------------- -- Size increasing transformations (insertions) ------------------------------------------------------------------------------- -#ifdef USE_PRELUDE + {-# INLINE intersperse #-} -intersperse :: S.MonadAsync m => Int -> Int -> SerialT m Int -> m () -intersperse value n = composeNG n $ S.intersperse (value + 1) +intersperse :: MonadAsync m => Int -> Int -> SerialT m Int -> m () +intersperse value n = composeN n $ Stream.intersperse (value + 1) {-# INLINE intersperseM #-} -intersperseM :: S.MonadAsync m => Int -> Int -> SerialT m Int -> m () -intersperseM value n = composeNG n $ S.intersperseM (return $ value + 1) - -{-# INLINE interposeSuffix #-} -interposeSuffix :: S.MonadAsync m => Int -> Int -> SerialT m Int -> m () -interposeSuffix value n = - composeNG n $ Internal.interposeSuffix (value + 1) Unfold.identity - -{-# INLINE intercalateSuffix #-} -intercalateSuffix :: S.MonadAsync m => Int -> Int -> SerialT m Int -> m () -intercalateSuffix value n = - composeNG n $ Internal.intercalateSuffix Unfold.identity (value + 1) +intersperseM :: MonadAsync m => Int -> Int -> SerialT m Int -> m () +intersperseM value n = composeN n $ Stream.intersperseM (return $ value + 1) {-# INLINE insertBy #-} insertBy :: MonadIO m => Int -> Int -> SerialT m Int -> m () -insertBy value n = composeNG n $ S.insertBy compare (value + 1) +insertBy value n = composeN n $ Stream.insertBy compare (value + 1) +{-# INLINE interposeSuffix #-} +interposeSuffix :: Monad m => Int -> Int -> SerialT m Int -> m () +interposeSuffix value n = + composeN n $ Stream.interposeSuffix (value + 1) Unfold.identity + +{-# INLINE intercalateSuffix #-} +intercalateSuffix :: Monad m => Int -> Int -> SerialT m Int -> m () +intercalateSuffix value n = + composeN n $ Stream.intercalateSuffix Unfold.identity (value + 1) o_1_space_inserting :: Int -> [Benchmark] o_1_space_inserting value = @@ -554,21 +517,21 @@ o_1_space_insertingX4 :: Int -> [Benchmark] o_1_space_insertingX4 value = [ bgroup "insertingX4" [ benchIOSink value "intersperse" (intersperse value 4) - -- , benchIOSink value "insertBy" (insertBy value 4) + , benchIOSink value "insertBy" (insertBy value 4) ] ] -#endif + ------------------------------------------------------------------------------- -- Indexing ------------------------------------------------------------------------------- -#ifdef USE_PRELUDE + {-# INLINE indexed #-} indexed :: MonadIO m => Int -> SerialT m Int -> m () -indexed n = composeN n (S.map snd . S.indexed) +indexed n = composeN n (fmap snd . Stream.indexed) {-# INLINE indexedR #-} indexedR :: MonadIO m => Int -> Int -> SerialT m Int -> m () -indexedR value n = composeN n (S.map snd . S.indexedR value) +indexedR value n = composeN n (fmap snd . Stream.indexedR value) o_1_space_indexing :: Int -> [Benchmark] o_1_space_indexing value = @@ -586,24 +549,6 @@ o_1_space_indexingX4 value = ] ] -#else -{-# INLINE indexed #-} -indexed :: MonadIO m => SerialT m Int -> m () -indexed = Stream.fold FL.drain . Prelude.fmap snd . Stream.indexed - -{-# INLINE indexedR #-} -indexedR :: MonadIO m => Int -> SerialT m Int -> m () -indexedR value = Stream.fold FL.drain . (Prelude.fmap snd . Stream.indexedR value) - -o_1_space_indexing :: Int -> [Benchmark] -o_1_space_indexing value = - [ bgroup "indexing" - [ benchIOSink value "indexed" indexed - , benchIOSink value "indexedR" (indexedR value) - ] - ] -#endif - ------------------------------------------------------------------------------- -- Main ------------------------------------------------------------------------------- @@ -614,23 +559,18 @@ o_1_space_indexing value = benchmarks :: String -> Int -> [Benchmark] benchmarks moduleName size = [ bgroup (o_1_space_prefix moduleName) $ Prelude.concat - [ o_1_space_mapping size - , o_1_space_indexing size + [ o_1_space_functor size + , o_1_space_mapping size + , o_1_space_mappingX4 size , o_1_space_filtering size , o_1_space_filteringX4 size -#ifdef USE_PRELUDE - , o_1_space_functor size - , o_1_space_mappingX4 size , o_1_space_inserting size , o_1_space_insertingX4 size + , o_1_space_indexing size , o_1_space_indexingX4 size -#endif ] - - , bgroup (o_n_space_prefix moduleName) $ -#ifdef USE_PRELUDE - o_n_space_mapping size -#else - o_n_space_traversable size -#endif + , bgroup (o_n_space_prefix moduleName) $ Prelude.concat + [ o_n_space_traversable size + , o_n_space_mapping size + ] ] diff --git a/benchmark/streamly-benchmarks.cabal b/benchmark/streamly-benchmarks.cabal index 866025127..50ba480e0 100644 --- a/benchmark/streamly-benchmarks.cabal +++ b/benchmark/streamly-benchmarks.cabal @@ -65,9 +65,10 @@ flag bench-core default: False flag use-prelude - description: Use Streamly.Prelude instead of Streamly.Data.Stream for serial benchmarks + description: Use Prelude instead of Data.Stream for serial benchmarks manual: True default: False + ------------------------------------------------------------------------------- -- Common stanzas ------------------------------------------------------------------------------- @@ -208,6 +209,33 @@ common bench-options-threaded -- Serial Streams ------------------------------------------------------------------------------- +benchmark Data.Stream + import: bench-options + type: exitcode-stdio-1.0 + hs-source-dirs: Streamly/Benchmark/Data + main-is: Stream.hs + other-modules: + Stream.Generate + Stream.Eliminate + Stream.Transform + Stream.Reduce + Stream.Expand + Stream.Exceptions + Stream.Lift + Stream.Common + if flag(use-prelude) + other-modules: + Stream.Split + if flag(bench-core) || impl(ghcjs) + buildable: False + else + buildable: True + if flag(limit-build-mem) + if flag(dev) + ghc-options: +RTS -M3500M -RTS + else + ghc-options: +RTS -M2500M -RTS + benchmark Prelude.WSerial import: bench-options type: exitcode-stdio-1.0 @@ -387,37 +415,6 @@ benchmark Data.Parser ------------------------------------------------------------------------------- -- Raw Streams ------------------------------------------------------------------------------- -benchmark Data.Stream - import: bench-options - type: exitcode-stdio-1.0 - hs-source-dirs: Streamly/Benchmark/Data - main-is: Stream.hs - other-modules: - Stream.Eliminate - Stream.Exceptions - Stream.Generate - Stream.Lift - Stream.Transformation - if flag(use-prelude) - other-modules: - Stream.NestedFold - Stream.NestedStream - Stream.Split - else - other-modules: - Stream.Common - Stream.Reduce - - if flag(bench-core) || impl(ghcjs) - buildable: False - else - buildable: True - build-depends: exceptions >= 0.8 && < 0.11 - if flag(limit-build-mem) - if flag(dev) - ghc-options: +RTS -M3500M -RTS - else - ghc-options: +RTS -M2000M -RTS benchmark Data.Stream.StreamD import: bench-options diff --git a/hie.yaml b/hie.yaml index c073c6870..fb13a46f7 100644 --- a/hie.yaml +++ b/hie.yaml @@ -30,7 +30,15 @@ cradle: component: "bench:Data.Stream.StreamD" - path: "./benchmark/Streamly/Benchmark/Data/Stream/StreamK.hs" component: "bench:Data.Stream.StreamK" - - path: "./benchmark/Streamly/Benchmark/Data/Stream/*.hs" + - path: "./benchmark/Streamly/Benchmark/Data/Stream/Common.hs" + component: "bench:Data.Stream" + - path: "./benchmark/Streamly/Benchmark/Data/Stream/Expand.hs" + component: "bench:Data.Stream" + - path: "./benchmark/Streamly/Benchmark/Data/Stream/Generate.hs" + component: "bench:Data.Stream" + - path: "./benchmark/Streamly/Benchmark/Data/Stream/Reduce.hs" + component: "bench:Data.Stream" + - path: "./benchmark/Streamly/Benchmark/Data/Stream/Transform.hs" component: "bench:Data.Stream" - path: "./benchmark/Streamly/Benchmark/Data/Unfold.hs" component: "bench:Data.Unfold"