Rename concatMapDivK to concatMapHeadK

This commit is contained in:
Harendra Kumar 2024-02-15 03:09:01 +05:30
parent a7de37d86a
commit 7f6deea1e6

View File

@ -262,22 +262,23 @@ parTwo modifier stream1 stream2 =
-- Evaluator
-------------------------------------------------------------------------------
-- | @concatMapDivK useTail useHead stream@, divides the stream in head and
-- tail, maps a stream generator on the head and maps an action on the tail of
-- a stream. Returns the stream generated by the head.
-- | @concatMapHeadK consumeTail mapHead stream@, maps a stream generation
-- function on the head element and performs a side effect on the tail.
--
-- Used for concurrent evaluation of streams using a Channel.
{-# INLINE concatMapDivK #-}
concatMapDivK :: Monad m =>
-- Used for concurrent evaluation of streams using a Channel. A worker
-- evaluating the stream would queue the tail and go on to evaluate the head.
-- The tail is picked up by another worker which does the same.
{-# INLINE concatMapHeadK #-}
concatMapHeadK :: Monad m =>
(K.StreamK m a -> m ()) -- ^ Queue the tail
-> (a -> K.StreamK m b) -- ^ Generate a stream from the head
-> K.StreamK m a
-> K.StreamK m b
concatMapDivK useTail useHead stream =
concatMapHeadK consumeTail mapHead stream =
K.mkStream $ \st yld sng stp -> do
let foldShared = K.foldStreamShared st yld sng stp
single a = foldShared $ useHead a
yieldk a r = useTail r >> single a
single a = foldShared $ mapHead a
yieldk a r = consumeTail r >> single a
in K.foldStreamShared (adaptState st) yieldk single stp stream
-------------------------------------------------------------------------------
@ -338,7 +339,7 @@ mkEnqueue chan runner = do
parConcatMapChanK :: MonadAsync m =>
Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanK chan f stream =
let run q = concatMapDivK q f
let run q = concatMapHeadK q f
in K.concatMapEffect (`run` stream) (mkEnqueue chan run)
-- K.parConcatMap (_appendWithChanK chan) f stream
@ -347,7 +348,7 @@ parConcatMapChanKAny :: MonadAsync m =>
Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanKAny chan f stream =
let done = K.nilM (shutdown chan)
run q = concatMapDivK q (\x -> K.append (f x) done)
run q = concatMapHeadK q (\x -> K.append (f x) done)
in K.concatMapEffect (`run` stream) (mkEnqueue chan run)
{-# INLINE parConcatMapChanKFirst #-}
@ -355,7 +356,7 @@ parConcatMapChanKFirst :: MonadAsync m =>
Channel m b -> (a -> K.StreamK m b) -> K.StreamK m a -> K.StreamK m b
parConcatMapChanKFirst chan f stream =
let done = K.nilM (shutdown chan)
run q = concatMapDivK q f
run q = concatMapHeadK q f
in K.concatEffect $ do
res <- K.uncons stream
case res of