Use StreamD for parallel zip

improves concurrent zip perf by 3x
This commit is contained in:
Harendra Kumar 2019-11-21 03:23:48 +05:30
parent 2b31b3d07b
commit 36bfc45842
2 changed files with 17 additions and 7 deletions

View File

@ -276,12 +276,22 @@ parallelMin = joinStreamVarPar ParallelVar StopAny
-- Convert a stream to parallel
------------------------------------------------------------------------------
mkParallel :: (IsStream t, MonadAsync m) => t m a -> m (t m a)
mkParallel m = do
_mkParallelK :: (IsStream t, MonadAsync m) => t m a -> m (t m a)
_mkParallelK m = do
sv <- newParallelVar StopNone defState
pushWorkerPar sv (runOne defState{streamVar = Just sv} $ toStream m)
return $ fromSVar sv
-- We may have to use it in higher order functions like concatMap, so use the
-- Normal phase for inlining.
{-# INLINE_NORMAL mkParallel #-}
mkParallel :: (IsStream t, MonadAsync m)
=> State Stream m a -> t m a -> m (t m a)
mkParallel st m = do
sv <- newParallelVar StopNone defState
pushWorkerParD st sv (D.toStreamD m)
return $ fromSVar sv
------------------------------------------------------------------------------
-- Stream to stream concurrent function application
------------------------------------------------------------------------------

View File

@ -54,7 +54,7 @@ import Text.Read (Lexeme(Ident), lexP, parens, prec, readPrec, readListPrec,
import Prelude hiding (map, repeat, zipWith)
import Streamly.Streams.StreamK (IsStream(..), Stream, mkStream, foldStream)
import Streamly.Streams.Async (mkAsync')
import Streamly.Streams.Parallel (mkParallel)
import Streamly.Streams.Serial (map)
import Streamly.Internal.Data.SVar (MonadAsync, adaptState)
@ -153,8 +153,8 @@ TRAVERSABLE_INSTANCE(ZipSerialM)
zipAsyncWith :: (IsStream t, MonadAsync m)
=> (a -> b -> c) -> t m a -> t m b -> t m c
zipAsyncWith f m1 m2 = mkStream $ \st stp sng yld -> do
ma <- mkAsync' (adaptState st) m1
mb <- mkAsync' (adaptState st) m2
ma <- mkParallel (adaptState st) m1
mb <- mkParallel (adaptState st) m2
foldStream st stp sng yld (K.zipWith f ma mb)
-- | Like 'zipWithM' but zips concurrently i.e. both the streams being zipped
@ -165,8 +165,8 @@ zipAsyncWith f m1 m2 = mkStream $ \st stp sng yld -> do
zipAsyncWithM :: (IsStream t, MonadAsync m)
=> (a -> b -> m c) -> t m a -> t m b -> t m c
zipAsyncWithM f m1 m2 = mkStream $ \st stp sng yld -> do
ma <- mkAsync' (adaptState st) m1
mb <- mkAsync' (adaptState st) m2
ma <- mkParallel (adaptState st) m1
mb <- mkParallel (adaptState st) m2
foldStream st stp sng yld (K.zipWithM f ma mb)
------------------------------------------------------------------------------