Use zipWith from StreamD in zipAsyncWith etc.

Improves perf of zipAsyncWith by 4x and applicative ZipAsync by 2x.

Not sure why applicative does not show the same perf as zipAsyncWith.
This commit is contained in:
Harendra Kumar 2019-11-21 21:31:04 +05:30
parent 36bfc45842
commit e16a980c5f
2 changed files with 38 additions and 43 deletions

View File

@ -248,8 +248,8 @@ module Streamly.Internal.Prelude
-- ** Zipping
, zipWith
, zipWithM
, Z.zipAsyncWith
, Z.zipAsyncWithM
, zipAsyncWith
, zipAsyncWithM
-- ** Nested Streams
, concatMapM
@ -435,7 +435,7 @@ import Streamly.Internal.Data.Fold.Types (Fold (..), Fold2 (..))
import Streamly.Internal.Data.Unfold.Types (Unfold)
import Streamly.Internal.Memory.Array.Types (Array, writeNUnsafe)
-- import Streamly.Memory.Ring (Ring)
import Streamly.Internal.Data.SVar (MonadAsync, defState)
import Streamly.Internal.Data.SVar (MonadAsync, defState, adaptState)
import Streamly.Streams.Async (mkAsync')
import Streamly.Streams.Combinators (inspectMode, maxYields)
import Streamly.Streams.Prelude
@ -456,11 +456,9 @@ import qualified Streamly.Internal.Data.Fold.Types as FL
import qualified Streamly.Streams.Prelude as P
import qualified Streamly.Streams.StreamK as K
import qualified Streamly.Streams.StreamD as D
import qualified Streamly.Streams.Zip as Z
#ifdef USE_STREAMK_ONLY
import qualified Streamly.Streams.StreamK as S
import qualified Streamly.Streams.Zip as S
#else
import qualified Streamly.Streams.StreamD as S
#endif
@ -2034,6 +2032,34 @@ zipWithM f m1 m2 = fromStreamS $ S.zipWithM f (toStreamS m1) (toStreamS m2)
zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c
zipWith f m1 m2 = fromStreamS $ S.zipWith f (toStreamS m1) (toStreamS m2)
------------------------------------------------------------------------------
-- Parallel Zipping
------------------------------------------------------------------------------
-- | Like 'zipWith' but zips concurrently i.e. both the streams being zipped
-- are generated concurrently.
--
-- @since 0.1.0
{-# INLINE zipAsyncWith #-}
zipAsyncWith :: (IsStream t, MonadAsync m)
=> (a -> b -> c) -> t m a -> t m b -> t m c
zipAsyncWith f m1 m2 = K.mkStream $ \st stp sng yld -> do
ma <- Par.mkParallel (adaptState st) m1
mb <- Par.mkParallel (adaptState st) m2
K.foldStream st stp sng yld $ zipWith f ma mb
-- | Like 'zipWithM' but zips concurrently i.e. both the streams being zipped
-- are generated concurrently.
--
-- @since 0.4.0
{-# INLINABLE zipAsyncWithM #-}
zipAsyncWithM :: (IsStream t, MonadAsync m)
=> (a -> b -> m c) -> t m a -> t m b -> t m c
zipAsyncWithM f m1 m2 = K.mkStream $ \st stp sng yld -> do
ma <- Par.mkParallel (adaptState st) m1
mb <- Par.mkParallel (adaptState st) m2
K.foldStream st stp sng yld $ zipWithM f ma mb
------------------------------------------------------------------------------
-- Comparison
------------------------------------------------------------------------------

View File

@ -20,12 +20,7 @@
--
module Streamly.Streams.Zip
(
K.zipWith
, K.zipWithM
, zipAsyncWith
, zipAsyncWithM
, ZipSerialM
ZipSerialM
, ZipSerial
, ZipStream -- deprecated
, zipSerially
@ -53,10 +48,10 @@ import Text.Read (Lexeme(Ident), lexP, parens, prec, readPrec, readListPrec,
readListPrecDefault)
import Prelude hiding (map, repeat, zipWith)
import Streamly.Streams.StreamK (IsStream(..), Stream, mkStream, foldStream)
import Streamly.Streams.Parallel (mkParallel)
import Streamly.Streams.StreamK (IsStream(..), Stream)
import Streamly.Streams.Serial (map)
import Streamly.Internal.Data.SVar (MonadAsync, adaptState)
import Streamly.Internal.Data.SVar (MonadAsync)
import Streamly.Internal.Prelude (zipWith, zipAsyncWith)
import qualified Streamly.Streams.Prelude as P
import qualified Streamly.Streams.StreamK as K
@ -136,39 +131,12 @@ instance Monad m => Functor (ZipSerialM m) where
instance Monad m => Applicative (ZipSerialM m) where
pure = ZipSerialM . K.repeat
(<*>) = K.zipWith id
{-# INLINE (<*>) #-}
(<*>) = zipWith id
FOLDABLE_INSTANCE(ZipSerialM)
TRAVERSABLE_INSTANCE(ZipSerialM)
------------------------------------------------------------------------------
-- Parallel Zipping
------------------------------------------------------------------------------
-- | Like 'zipWith' but zips concurrently i.e. both the streams being zipped
-- are generated concurrently.
--
-- @since 0.1.0
{-# INLINABLE zipAsyncWith #-}
zipAsyncWith :: (IsStream t, MonadAsync m)
=> (a -> b -> c) -> t m a -> t m b -> t m c
zipAsyncWith f m1 m2 = mkStream $ \st stp sng yld -> do
ma <- mkParallel (adaptState st) m1
mb <- mkParallel (adaptState st) m2
foldStream st stp sng yld (K.zipWith f ma mb)
-- | Like 'zipWithM' but zips concurrently i.e. both the streams being zipped
-- are generated concurrently.
--
-- @since 0.4.0
{-# INLINABLE zipAsyncWithM #-}
zipAsyncWithM :: (IsStream t, MonadAsync m)
=> (a -> b -> m c) -> t m a -> t m b -> t m c
zipAsyncWithM f m1 m2 = mkStream $ \st stp sng yld -> do
ma <- mkParallel (adaptState st) m1
mb <- mkParallel (adaptState st) m2
foldStream st stp sng yld (K.zipWithM f ma mb)
------------------------------------------------------------------------------
-- Parallely Zipping Streams
------------------------------------------------------------------------------
@ -233,4 +201,5 @@ instance Monad m => Functor (ZipAsyncM m) where
instance MonadAsync m => Applicative (ZipAsyncM m) where
pure = ZipAsyncM . K.repeat
{-# INLINE (<*>) #-}
m1 <*> m2 = zipAsyncWith id m1 m2