Add two queues (outer and inner) in Async streams

This avoids the possibility of out-of-order execution of the outer loop
growing the queue to an unbounded size.
This commit is contained in:
Harendra Kumar 2022-10-04 20:10:26 +05:30
parent a3203ff9b1
commit 34a3a294be
5 changed files with 131 additions and 99 deletions

View File

@ -190,7 +190,10 @@ mkEnqueue :: MonadAsync m =>
mkEnqueue chan runner = do
runInIO <- askRunInIO
$ let q stream = liftIO $ enqueue chan (runInIO, runner q stream) in q
$ let q stream =
-- Enqueue the outer loop
liftIO $ enqueue chan False (runInIO, runner q stream)
in q
-- XXX Can be renamed to concatMapWithK if we move concatMapWithK to higher
-- level module. We can keep only Channel based ops in this module.

View File

@ -48,67 +48,86 @@ import Streamly.Internal.Data.Stream.Channel.Types
{-# INLINE enqueueLIFO #-}
enqueueLIFO ::
Channel m a
-> IORef [(RunInIO m, K.Stream m a)]
-> IORef ([(RunInIO m, K.Stream m a)], [(RunInIO m, K.Stream m a)])
-> Bool
-> (RunInIO m, K.Stream m a)
-> IO ()
enqueueLIFO sv q m = do
atomicModifyIORefCAS_ q $ \ms -> m : ms
enqueueLIFO sv q inner m = do
atomicModifyIORefCAS_ q $ \(xs, ys) ->
if inner then (xs, m : ys) else (m : xs, ys)
ringDoorBell (needDoorBell sv) (outputDoorBell sv)
-- | Result of a 'dequeue' attempt on the pair of work queues: 'QEmpty' when
-- both queues had no items, 'QOuter' for an item taken from the first (outer)
-- queue, and 'QInner' for an item taken from the second (inner) queue.
data QResult a = QEmpty | QOuter a | QInner a
-- | Atomically remove one work item from the pair of queues held in the
-- 'IORef'. The first component of the pair is the outer queue and the second
-- is the inner queue. The inner queue is checked first, so an outer item is
-- returned only when the inner queue is empty.
{-# INLINE dequeue #-}
dequeue :: MonadIO m =>
IORef ([(RunInIO m, K.Stream m a)], [(RunInIO m, K.Stream m a)])
-> m (QResult (RunInIO m, K.Stream m a))
dequeue qref =
$ atomicModifyIORefCAS qref
$ \case
-- Inner queue is non-empty: pop its head, leave the outer queue untouched.
(xs, y : ys) -> ((xs, ys), QInner y)
-- Inner queue is empty (previous case did not match): pop the outer head.
(x : xs, ys) -> ((xs, ys), QOuter x)
-- Both queues are empty: leave the state unchanged.
x -> (x, QEmpty)
-- | Outcome of running one work item in a worker loop: on 'Continue' the loop
-- dequeues the next item, on 'Suspend' the worker calls @stop@ and exits the
-- loop.
data WorkerStatus = Continue | Suspend
{-# INLINE workLoopLIFO #-}
:: MonadRunInIO m
=> IORef [(RunInIO m, K.Stream m a)]
-- -> State Stream m a
=> IORef ([(RunInIO m, K.Stream m a)], [(RunInIO m, K.Stream m a)])
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopLIFO q sv winfo = run
workLoopLIFO qref sv winfo = run
run = do
work <- dequeue
work <- dequeue qref
case work of
Nothing -> liftIO $ stop sv winfo
Just (RunInIO runin, m) -> do
-- XXX when we finish we need to send the monadic state back to
-- the parent so that the state can be merged back. We capture
-- and return the state in the stop continuation.
-- Instead of using the run function we can just restore the
-- monad state here. That way it can work easily for
-- distributed case as well.
r <- liftIO $ runin $
(return Continue)
res <- restoreM r
case res of
Continue -> run
Suspend -> liftIO $ stop sv winfo
QEmpty ->
liftIO $ stop sv winfo
QInner (RunInIO runin, m) ->
process runin m True
QOuter (RunInIO runin, m) ->
process runin m False
single a = do
res <- liftIO $ yield sv winfo a
return $ if res then Continue else Suspend
process runin m inner = do
-- XXX when we finish we need to send the monadic state back to
-- the parent so that the state can be merged back. We capture
-- and return the state in the stop continuation.
-- Instead of using the run function we can just restore the
-- monad state here. That way it can work easily for
-- distributed case as well.
r <- liftIO $ runin $
(return Continue)
res <- restoreM r
case res of
Continue -> run
Suspend -> liftIO $ stop sv winfo
yieldk a r = do
res <- liftIO $ yield sv winfo a
if res
then K.foldStreamShared undefined yieldk single (return Continue) r
else do
runInIO <- askRunInIO
liftIO $ enqueueLIFO sv q (runInIO, r)
return Suspend
dequeue = liftIO $ atomicModifyIORefCAS q $ \case
[] -> ([], Nothing)
x : xs -> (xs, Just x)
single a = do
res <- liftIO $ yield sv winfo a
return $ if res then Continue else Suspend
yieldk a r = do
res <- liftIO $ yield sv winfo a
if res
then K.foldStreamShared undefined yieldk single (return Continue) r
else do
runInIO <- askRunInIO
liftIO $ enqueueLIFO sv qref inner (runInIO, r)
return Suspend
-- We duplicate workLoop for yield limit and no limit cases because it has
-- around 40% performance overhead in the worst case.
@ -118,12 +137,11 @@ workLoopLIFO q sv winfo = run
{-# INLINE workLoopLIFOLimited #-}
:: forall m a. MonadRunInIO m
=> IORef [(RunInIO m, K.Stream m a)]
-- -> State Stream m a
=> IORef ([(RunInIO m, K.Stream m a)], [(RunInIO m, K.Stream m a)])
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopLIFOLimited q sv winfo = run
workLoopLIFOLimited qref sv winfo = run
@ -131,57 +149,61 @@ workLoopLIFOLimited q sv winfo = run
liftIO (incrementYieldLimit (remainingWork sv)) >> return Continue
run = do
work <- dequeue
work <- dequeue qref
case work of
Nothing -> liftIO $ stop sv winfo
Just (RunInIO runin, m) -> do
-- XXX This is just a best effort minimization of concurrency
-- to the yield limit. If the stream is made of concurrent
-- streams we do not reserve the yield limit in the constituent
-- streams before executing the action. This can be done
-- though, by sharing the yield limit ref with downstream
-- actions via state passing. Just a todo.
yieldLimitOk <- liftIO $ decrementYieldLimit (remainingWork sv)
if yieldLimitOk
then do
r <- liftIO $ runin $
res <- restoreM r
case res of
Continue -> run
Suspend -> liftIO $ stop sv winfo
-- Avoid any side effects, undo the yield limit decrement if we
-- never yielded anything.
else liftIO $ do
enqueueLIFO sv q (RunInIO runin, m)
incrementYieldLimit (remainingWork sv)
stop sv winfo
QEmpty ->
liftIO $ stop sv winfo
QInner item ->
process item True
QOuter item ->
process item False
single a = do
res <- liftIO $ yield sv winfo a
return $ if res then Continue else Suspend
-- XXX can we pass on the yield limit downstream to limit the concurrency
-- of constituent streams.
yieldk a r = do
res <- liftIO $ yield sv winfo a
process item@(RunInIO runin, m) inner = do
-- XXX This is just a best effort minimization of concurrency
-- to the yield limit. If the stream is made of concurrent
-- streams we do not reserve the yield limit in the constituent
-- streams before executing the action. This can be done
-- though, by sharing the yield limit ref with downstream
-- actions via state passing. Just a todo.
yieldLimitOk <- liftIO $ decrementYieldLimit (remainingWork sv)
if res && yieldLimitOk
then K.foldStreamShared undefined yieldk single incrContinue r
else do
runInIO <- askRunInIO
liftIO $ incrementYieldLimit (remainingWork sv)
liftIO $ enqueueLIFO sv q (runInIO, r)
return Suspend
if yieldLimitOk
then do
r <- liftIO $ runin $
res <- restoreM r
case res of
Continue -> run
Suspend -> liftIO $ stop sv winfo
-- Avoid any side effects, undo the yield limit decrement if we
-- never yielded anything.
else liftIO $ do
enqueueLIFO sv qref inner item
incrementYieldLimit (remainingWork sv)
stop sv winfo
dequeue = liftIO $ atomicModifyIORefCAS q $ \case
[] -> ([], Nothing)
x : xs -> (xs, Just x)
single a = do
res <- liftIO $ yield sv winfo a
return $ if res then Continue else Suspend
-- XXX can we pass on the yield limit downstream to limit the
-- concurrency of constituent streams.
yieldk a r = do
res <- liftIO $ yield sv winfo a
yieldLimitOk <- liftIO $ decrementYieldLimit (remainingWork sv)
if res && yieldLimitOk
then K.foldStreamShared undefined yieldk single incrContinue r
else do
runInIO <- askRunInIO
liftIO $ incrementYieldLimit (remainingWork sv)
liftIO $ enqueueLIFO sv qref inner (runInIO, r)
return Suspend
-- SVar creation
@ -201,7 +223,10 @@ getLifoSVar mrun cfg = do
active <- newIORef 0
wfw <- newIORef False
running <- newIORef Set.empty
q <- newIORef ([] :: [(RunInIO m, K.Stream m a)])
q <- newIORef
( [] :: [(RunInIO m, K.Stream m a)]
, [] :: [(RunInIO m, K.Stream m a)]
yl <- case getYieldLimit cfg of
Nothing -> return Nothing
Just x -> Just <$> newIORef x
@ -210,7 +235,11 @@ getLifoSVar mrun cfg = do
stats <- newSVarStats
tid <- myThreadId
let isWorkFinished _ = null <$> readIORef q
-- We read the queue without taking a lock, so the result is reliable only
-- if no worker is pending.
let isWorkFinished _ = do
(xs, ys) <- readIORef q
return (null xs && null ys)
let isWorkFinishedLimited sv = do
yieldsDone <-
@ -219,14 +248,14 @@ getLifoSVar mrun cfg = do
n <- readIORef ref
return (n <= 0)
Nothing -> return False
qEmpty <- null <$> readIORef q
qEmpty <- isWorkFinished sv
return $ qEmpty || yieldsDone
let getSVar :: Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef [(RunInIO m, K.Stream m a)]
-> (IORef ([(RunInIO m, K.Stream m a)], [(RunInIO m, K.Stream m a)])
-> Channel m a
-> Maybe WorkerInfo
-> m())

View File

@ -197,7 +197,7 @@ getFifoSVar mrun cfg = do
, postProcess = postProc sv
, workerThreads = running
, workLoop = wloop q sv
, enqueue = enqueueFIFO sv q
, enqueue = \_ -> enqueueFIFO sv q
, isWorkDone = workDone sv
, isQueueDone = workDone sv
, needDoorBell = wfw

View File

@ -96,7 +96,7 @@ toChannelK :: (MonadIO m, MonadBaseControl IO m) =>
Channel m a -> K.Stream m a -> m ()
toChannelK sv m = do
runIn <- askRunInIO
liftIO $ enqueue sv (runIn, m)
liftIO $ enqueue sv False (runIn, m)
-- INLINE for fromStreamK/toStreamK fusion

View File

@ -84,7 +84,7 @@ data Channel m a = Channel
, remainingWork :: Maybe (IORef Count)
, yieldRateInfo :: Maybe YieldRateInfo
, enqueue :: (RunInIO m, Stream m a) -> IO ()
, enqueue :: Bool -> (RunInIO m, Stream m a) -> IO ()
, isWorkDone :: IO Bool
, isQueueDone :: IO Bool
, needDoorBell :: IORef Bool