mirror of
https://github.com/composewell/streamly.git
synced 2024-11-10 12:47:22 +03:00
Change the order of yld and stp continuations
The names were incorrect as per the definition of the type. This problem occurred because the order of the arguments was changed at some point but we missed changing these.
This commit is contained in:
parent
bee5f1d70e
commit
d35674ea02
@ -580,14 +580,14 @@ workLoopAhead q heap st sv winfo = do
|
||||
-- The only difference between forkSVarAsync and this is that we run the left
|
||||
-- computation without a shared SVar.
|
||||
forkSVarAhead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
|
||||
forkSVarAhead m1 m2 = mkStream $ \st stp sng yld -> do
|
||||
forkSVarAhead m1 m2 = mkStream $ \st yld sng stp -> do
|
||||
sv <- newAheadVar st (concurrently (toStream m1) (toStream m2))
|
||||
workLoopAhead
|
||||
foldStream st stp sng yld (fromSVar sv)
|
||||
foldStream st yld sng stp (fromSVar sv)
|
||||
where
|
||||
concurrently ma mb = mkStream $ \st stp sng yld -> do
|
||||
concurrently ma mb = mkStream $ \st yld sng stp -> do
|
||||
liftIO $ enqueue (fromJust $ streamVar st) mb
|
||||
foldStream st stp sng yld ma
|
||||
foldStream st yld sng stp ma
|
||||
|
||||
-- | Polymorphic version of the 'Semigroup' operation '<>' of 'AheadT'.
|
||||
-- Merges two streams sequentially but with concurrent lookahead.
|
||||
@ -595,15 +595,15 @@ forkSVarAhead m1 m2 = mkStream $ \st stp sng yld -> do
|
||||
-- @since 0.3.0
|
||||
{-# INLINE ahead #-}
|
||||
ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
|
||||
ahead m1 m2 = mkStream $ \st stp sng yld ->
|
||||
ahead m1 m2 = mkStream $ \st yld sng stp ->
|
||||
case streamVar st of
|
||||
Just sv | svarStyle sv == AheadVar -> do
|
||||
liftIO $ enqueue sv (toStream m2)
|
||||
-- Always run the left side on a new SVar to avoid complexity in
|
||||
-- sequencing results. This means the left side cannot further
|
||||
-- split into more ahead computations on the same SVar.
|
||||
foldStream st stp sng yld m1
|
||||
_ -> foldStreamShared st stp sng yld (forkSVarAhead m1 m2)
|
||||
foldStream st yld sng stp m1
|
||||
_ -> foldStreamShared st yld sng stp (forkSVarAhead m1 m2)
|
||||
|
||||
-- | XXX we can implement it more efficienty by directly implementing instead
|
||||
-- of combining streams using ahead.
|
||||
|
@ -584,26 +584,26 @@ newWAsyncVar st m = do
|
||||
|
||||
forkSVarAsync :: (IsStream t, MonadAsync m)
|
||||
=> SVarStyle -> t m a -> t m a -> t m a
|
||||
forkSVarAsync style m1 m2 = mkStream $ \st stp sng yld -> do
|
||||
forkSVarAsync style m1 m2 = mkStream $ \st yld sng stp -> do
|
||||
sv <- case style of
|
||||
AsyncVar -> newAsyncVar st (concurrently (toStream m1) (toStream m2))
|
||||
WAsyncVar -> newWAsyncVar st (concurrently (toStream m1) (toStream m2))
|
||||
_ -> error "illegal svar type"
|
||||
foldStream st stp sng yld $ fromSVar sv
|
||||
foldStream st yld sng stp $ fromSVar sv
|
||||
where
|
||||
concurrently ma mb = mkStream $ \st stp sng yld -> do
|
||||
concurrently ma mb = mkStream $ \st yld sng stp -> do
|
||||
liftIO $ enqueue (fromJust $ streamVar st) mb
|
||||
foldStreamShared st stp sng yld ma
|
||||
foldStreamShared st yld sng stp ma
|
||||
|
||||
{-# INLINE joinStreamVarAsync #-}
|
||||
joinStreamVarAsync :: (IsStream t, MonadAsync m)
|
||||
=> SVarStyle -> t m a -> t m a -> t m a
|
||||
joinStreamVarAsync style m1 m2 = mkStream $ \st stp sng yld ->
|
||||
joinStreamVarAsync style m1 m2 = mkStream $ \st yld sng stp ->
|
||||
case streamVar st of
|
||||
Just sv | svarStyle sv == style -> do
|
||||
liftIO $ enqueue sv (toStream m2)
|
||||
foldStreamShared st stp sng yld m1
|
||||
_ -> foldStreamShared st stp sng yld (forkSVarAsync style m1 m2)
|
||||
foldStreamShared st yld sng stp m1
|
||||
_ -> foldStreamShared st yld sng stp (forkSVarAsync style m1 m2)
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- Semigroup and Monoid style compositions for parallel actions
|
||||
|
Loading…
Reference in New Issue
Block a user