Revert "Fix zipWith(M) to work concurrently according to the stream type"

This reverts commit 1ddcfc4634.

This requires MonadAsync constraint which breaks the existing
zipWith for pure streams e.g. 'SerialT Identity' (for example in
streaming-benchmarks package). We can possibly have different zipWith
APIs for concurrent zipping.
This commit is contained in:
Harendra Kumar 2021-06-22 12:36:02 +05:30
parent 65875852d5
commit e09428332c
4 changed files with 25 additions and 52 deletions

View File

@ -15,9 +15,6 @@
* `encodeLatin1` now silently truncates any character beyond 255 to incorrect
characters in the input stream. Use `encodeLatin1'` to recover previous
functionality.
* The zipping functions `Streamly.Prelude.zipWith` and
`Streamly.Prelude.zipWithM` are now applied concurrently for concurrent
streams.
### Breaking type changes

View File

@ -372,7 +372,7 @@ iterateDropWhileTrue streamLen iterStreamLen maxIters = iterateSource iterStream
-------------------------------------------------------------------------------
{-# INLINE zipWith #-}
zipWith :: S.MonadAsync m => Stream m Int -> m ()
zipWith :: Monad m => Stream m Int -> m ()
zipWith src = drain $ S.zipWith (,) src src
-------------------------------------------------------------------------------

View File

@ -995,40 +995,34 @@ mapMaybe f m = go m
-- Serial Zipping
------------------------------------------------------------------------------
-- XXX We can probably implement zipWith in terms of zipWithM
-- | Zip two streams serially using a pure zipping function. The zipping
-- function is applied concurrently for concurrent streams.
-- | Zip two streams serially using a pure zipping function.
--
-- @since 0.1.0
{-# INLINABLE zipWith #-}
zipWith ::
(IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c
zipWith :: IsStream t => (a -> b -> c) -> t m a -> t m b -> t m c
zipWith f = go
where
go mx my = mkStream $ \st yld sng stp -> do
let merge a ra =
let single2 b = sng (f a b)
runIt x = foldStream st yld sng stp x
yield2 b rb = runIt (return (f a b) `consM` go ra rb)
yield2 b rb = yld (f a b) (go ra rb)
in foldStream (adaptState st) yield2 single2 stp my
let single1 a = merge a nil
yield1 = merge
foldStream (adaptState st) yield1 single1 stp mx
-- | Zip two streams serially using a monadic zipping function. The zipping
-- function is applied concurrently for concurrent streams.
-- | Zip two streams serially using a monadic zipping function.
--
-- @since 0.1.0
{-# INLINABLE zipWithM #-}
zipWithM ::
(IsStream t, MonadAsync m) => (a -> b -> m c) -> t m a -> t m b -> t m c
zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c
zipWithM f m1 m2 = go m1 m2
where
go mx my = mkStream $ \st yld sng stp -> do
let merge a ra =
let runIt x = foldStream st yld sng stp x
single2 b = f a b >>= sng
yield2 b rb = runIt (f a b `consM` go ra rb)
yield2 b rb = f a b >>= \x -> runIt (x `cons` go ra rb)
in foldStream (adaptState st) yield2 single2 stp my
let single1 a = merge a nil
yield1 = merge

View File

@ -35,8 +35,6 @@ module Streamly.Internal.Data.Stream.Zip
)
where
#include "inline.hs"
import Control.Applicative (liftA2)
import Control.DeepSeq (NFData(..))
#if MIN_VERSION_deepseq(1,4,3)
@ -58,13 +56,18 @@ import Streamly.Internal.Data.Stream.StreamK (IsStream(..), Stream)
import Streamly.Internal.Data.Maybe.Strict (Maybe'(..), toMaybe)
import Streamly.Internal.Data.SVar (MonadAsync)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import qualified Streamly.Internal.Data.Stream.Parallel as Par
import qualified Streamly.Internal.Data.Stream.Prelude as P
(cmpBy, eqBy, foldl', foldr, fromList, toList)
import qualified Streamly.Internal.Data.Stream.StreamK as K
import qualified Streamly.Internal.Data.Stream.StreamD as D
(cmpBy, eqBy, foldl', foldr, fromList, toList, fromStreamS, toStreamS)
import qualified Streamly.Internal.Data.Stream.StreamK as K (repeat)
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
import qualified Streamly.Internal.Data.Stream.StreamD as D (zipWithM)
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
#ifdef USE_STREAMK_ONLY
import qualified Streamly.Internal.Data.Stream.StreamK as S (zipWith, zipWithM)
#else
import qualified Streamly.Internal.Data.Stream.StreamD as S (zipWith, zipWithM)
#endif
import Prelude hiding (map, repeat, zipWith, errorWithoutStackTrace)
@ -83,21 +86,11 @@ import Prelude hiding (map, repeat, zipWith, errorWithoutStackTrace)
-- | Like 'zipWith' but using a monadic zipping function.
--
-- @since 0.4.0
{-# INLINE_EARLY zipWithM #-}
zipWithM ::
(IsStream t, MonadAsync m) => (a -> b -> m c) -> t m a -> t m b -> t m c
zipWithM f m1 m2 =
K.fromStream $ K.zipWithM f (K.toStream m1) (K.toStream m2)
{-# INLINABLE zipWithM #-}
zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c
zipWithM f m1 m2 = P.fromStreamS $ S.zipWithM f (P.toStreamS m1) (P.toStreamS m2)
{-# RULES "zipWithM serial" zipWithM = zipWithMSerial #-}
{-# INLINE zipWithMSerial #-}
zipWithMSerial ::
Monad m => (a -> b -> m c) -> SerialT m a -> SerialT m b -> SerialT m c
zipWithMSerial f m1 m2 =
D.fromStreamD $ D.zipWithM f (D.toStreamD m1) (D.toStreamD m2)
-- | Zip two streams serially using a pure zipping function. The zipping
-- function is applied concurrently for concurrent streams.
-- | Zip two streams serially using a pure zipping function.
--
-- @
-- > S.toList $ S.zipWith (+) (S.fromList [1,2,3]) (S.fromList [4,5,6])
@ -105,17 +98,9 @@ zipWithMSerial f m1 m2 =
-- @
--
-- @since 0.1.0
{-# INLINE_EARLY zipWith #-}
zipWith ::
(IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c
zipWith f m1 m2 = K.fromStream $ K.zipWith f (K.toStream m1) (K.toStream m2)
{-# RULES "zipWith serial" zipWith = zipWithSerial #-}
{-# INLINE zipWithSerial #-}
zipWithSerial ::
Monad m => (a -> b -> c) -> SerialT m a -> SerialT m b -> SerialT m c
zipWithSerial f m1 m2 =
D.fromStreamD $ D.zipWith f (D.toStreamD m1) (D.toStreamD m2)
{-# INLINABLE zipWith #-}
zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c
zipWith f m1 m2 = P.fromStreamS $ S.zipWith f (P.toStreamS m1) (P.toStreamS m2)
------------------------------------------------------------------------------
-- Parallel Zipping
@ -220,10 +205,7 @@ instance Monad m => Functor (ZipSerialM m) where
instance Monad m => Applicative (ZipSerialM m) where
pure = ZipSerialM . K.repeat
{-# INLINE (<*>) #-}
m1 <*> m2 = fromStream $ toStream $ zipWithSerial id m1_ m2_
where
m1_ = fromStream (toStream m1)
m2_ = fromStream (toStream m2)
(<*>) = zipWith id
FOLDABLE_INSTANCE(ZipSerialM)
TRAVERSABLE_INSTANCE(ZipSerialM)