Add mergeByMFused

This commit is contained in:
Harendra Kumar 2021-09-25 19:42:10 +05:30
parent d5399efac5
commit 42b4ff2bbb

View File

@ -45,7 +45,8 @@ module Streamly.Internal.Data.Stream.IsStream.Expand
, append
-- ** wSerial
-- | 'wSerial' is a CPS based stream interleaving functions. It can be
-- | 'wSerial' is a CPS based stream interleaving functions. Use
-- 'concatPairsWith wSerial' to interleave @n@ streams uniformly. It can be
-- used with 'concatMapWith' as well, however, the interleaving behavior of
-- @n@ streams would be asymmetric giving exponentially more weightage to
-- streams that come earlier in the composition.
@ -77,6 +78,7 @@ module Streamly.Internal.Data.Stream.IsStream.Expand
-- , merge
, mergeBy
, mergeByM
, mergeByMFused
, mergeAsyncBy
, mergeAsyncByM
@ -167,20 +169,15 @@ import Streamly.Internal.Data.Stream.IsStream.Common
( concatM, concatMapM, concatMap, smapM, fromPure, fromEffect, parallelFst
, zipWith, zipWithM)
import Streamly.Internal.Data.Stream.IsStream.Type
(IsStream(..), fromStreamS, toStreamS, fromStreamD, toStreamD)
(IsStream(..), fromStreamD, toStreamD)
import Streamly.Internal.Data.Unfold.Type (Unfold)
import qualified Streamly.Internal.Data.Stream.IsStream.Type as IsStream
import qualified Streamly.Internal.Data.Stream.Parallel as Par
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Data.Stream.StreamK as K (mergeBy)
import qualified Streamly.Internal.Data.Stream.StreamK as K (mergeBy, mergeByM)
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
#ifdef USE_STREAMK_ONLY
import qualified Streamly.Internal.Data.Stream.StreamK as S
#else
import qualified Streamly.Internal.Data.Stream.StreamD as S
#endif
import qualified Streamly.Internal.Data.Stream.Zip as Zip
import Prelude hiding (concat, concatMap, zipWith)
@ -706,6 +703,8 @@ zipAsyncWith f = zipAsyncWithM (\a b -> return (f a b))
--
-- @
--
-- See also: 'mergeByMFused'
--
-- @since 0.6.0
{-# INLINE mergeBy #-}
mergeBy :: IsStream t => (a -> a -> Ordering) -> t m a -> t m a -> t m a
@ -740,12 +739,30 @@ mergeBy f m1 m2 = fromStream $ K.mergeBy f (toStream m1) (toStream m2)
--
-- @
--
-- See also: 'mergeByMFused'
--
-- @since 0.6.0
{-# INLINABLE mergeByM #-}
mergeByM
:: (IsStream t, Monad m)
=> (a -> a -> m Ordering) -> t m a -> t m a -> t m a
mergeByM f m1 m2 = fromStreamS $ S.mergeByM f (toStreamS m1) (toStreamS m2)
mergeByM f m1 m2 = fromStream $ K.mergeByM f (toStream m1) (toStream m2)
-- XXX Fused versions should probably go to a separate module using the same
-- names for the combinators.
--
-- | Like 'mergeByM' but much faster, works best when merging statically known
-- number of streams. When merging more than two streams try to merge pairs and
-- pair pf pairs in a tree like structure.'mergeByM' works better with variable
-- number of streams being merged using 'concatPairsWith'.
--
-- /Internal/
{-# INLINE mergeByMFused #-}
mergeByMFused
:: (IsStream t, Monad m)
=> (a -> a -> m Ordering) -> t m a -> t m a -> t m a
mergeByMFused f m1 m2 =
fromStreamD $ D.mergeByM f (toStreamD m1) (toStreamD m2)
{-
-- | Like 'mergeByM' but stops merging as soon as any of the two streams stops.