Fix a rate control issue in ordered streams

We were consing an evaluated stream element back into the stream and
putting it on the heap. This caused the measured latency of that item to be
artificially low the next time it was yielded, which threw off rate control.
Harendra Kumar 2024-06-26 01:39:35 +05:30
parent d6ccf9598c
commit 15418a8f2c
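
For context, the code path affected is a concurrently evaluated stream configured
for ordered output with a rate limit. A minimal usage sketch, assuming the
streamly 0.10 public API names parMapM, ordered and avgRate (not part of this
commit):

import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Data.Stream.Prelude as Stream

-- Map an effect concurrently but deliver results in input order while the
-- scheduler targets an average rate. Rate control relies on measured
-- per-element worker latency, which is what this fix corrects for stream
-- elements parked on the reordering heap.
orderedRateExample :: IO [Int]
orderedRateExample =
    Stream.toList
        $ Stream.parMapM (Stream.ordered True . Stream.avgRate 100)
            (\x -> pure (x * 2))
        $ Stream.fromList [1 .. 10 :: Int]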


@@ -279,18 +279,38 @@ atomicModifyIORef_ ref f =
atomicModifyIORef ref $ \x -> (f x, ())
data AheadHeapEntry m a =
AheadEntryNull
| AheadEntryPure a
| AheadEntryStream (RunInIO m, K.StreamK m a)
AheadEntryNull -- ^ an empty result, required for sequencing
| AheadEntryPure a -- ^ a yielded value
-- ^ A stream with its head possibly evaluated, and tail unevaluated
| AheadEntryStream (RunInIO m, Maybe a, K.StreamK m a)
data HeapDequeueResult m a =
-- | Not dequeued because someone is processing the heap. This is indicated
-- by the second component of the heap IORef tuple being set to 'Nothing'.
Clearing
-- | Not dequeued because the seq no. of the top entry is not the next one
-- expected in sequence, so we have to wait.
| Waiting Int
-- | Dequeued successfully, the seq no. of the top entry is the next one
-- expected in sequence.
| Ready (Entry Int (AheadHeapEntry m a))
-- | The heap is stored in an IORef along with a sequence number. When the
-- sequence number is set to 'Nothing' it means we are processing the heap. The
-- type of the dequeued entry would be 'Clearing' in this case. When the
-- sequence number in the IORef is set to 'Just' then it is the next expected
-- sequence number. If the dequeued entry matches this expected sequence
-- number then it is 'Ready' and dequeued, otherwise it is 'Waiting'. When we
-- return 'Clearing' or 'Waiting', the heap is not modified i.e. nothing is
-- dequeued.
--
-- Note: when we have n streams, each consisting of multiple items, composed
-- with "ordered" execution, each component stream is treated as one item with
-- the sequence number assigned to it, and all of its elements are yielded
-- serially.
{-# INLINE dequeueFromHeap #-}
dequeueFromHeap
:: IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
-- ^ (heap, Maybe sequence-no).
-> IO (HeapDequeueResult m a)
dequeueFromHeap hpVar =
atomicModifyIORef hpVar $ \pair@(hp, snum) ->
@@ -305,6 +325,10 @@ dequeueFromHeap hpVar =
else assert (seqNo >= n) (pair, Waiting n)
Nothing -> (pair, Waiting n)
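
For illustration, here is the same Clearing/Waiting/Ready protocol in a
standalone toy form, with a plain Data.Map standing in for the heap; the names
are made up and this is not the channel code:

import Data.IORef (IORef, atomicModifyIORef)
import qualified Data.Map.Strict as Map

-- Toy mirror of HeapDequeueResult.
data ToyResult a = ToyClearing | ToyWaiting Int | ToyReady Int a

-- Dequeue the minimum entry only if it carries the expected sequence number.
-- On success the slot is set to Nothing to mark "clearing in progress"; on
-- Clearing or Waiting the map is left untouched.
toyDequeue :: IORef (Map.Map Int a, Maybe Int) -> IO (ToyResult a)
toyDequeue ref =
    atomicModifyIORef ref $ \pair@(hp, slot) ->
        case slot of
            Nothing -> (pair, ToyClearing)            -- someone is draining
            Just expected ->
                case Map.lookupMin hp of
                    Just (seqNo, a)
                        | seqNo == expected ->
                            ((Map.deleteMin hp, Nothing), ToyReady seqNo a)
                    _ -> (pair, ToyWaiting expected)  -- gap or empty heap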
-- | Called only when the heap is being processed to transfer entries to output
-- queue. Matches the sequence number of the dequeued entry with the supplied
-- sequence number to determine if the entry is 'Ready' or 'Waiting'. The heap
-- is not modified if we return 'Waiting', i.e. the entry is not dequeued.
{-# INLINE dequeueFromHeapSeq #-}
dequeueFromHeapSeq
:: IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
@@ -497,77 +521,27 @@ abortExecution sv winfo = do
-- 2) make the other threads queue and go away if draining is in progress
--
-- In both cases we give the drainer a chance to run more often.
-- | Move entries from the heap to the channel's output queue. Only those
-- entries which are in correct order are transferred. Stop whenever a missing
-- sequence number is encountered.
--
-- We enter this function only when we have verified that the sequence number
-- passed to it is the next expected sequence number.
processHeap
:: MonadRunInIO m
=> IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int)
=> IORef ([K.StreamK m a], Int) -- ^ work queue
-> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) -- ^ heap
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry m a
-> Int
-> Bool -- we are draining the heap before we stop
-> AheadHeapEntry m a -- heap entry dequeued from top of heap
-> Int -- seq no. of the heap entry, this is the next correct seq no.
-> Bool -- True if we are draining the heap when we are finally stopping
-> m ()
processHeap q heap sv winfo entry sno stopping = loopHeap sno entry
where
stopIfNeeded ent seqNo r = do
stopIt <- liftIO $ preStopCheck sv heap
if stopIt
then liftIO $ do
-- put the entry back in the heap and stop
requeueOnHeapTop heap (Entry seqNo ent) seqNo
stopWith winfo sv
else runStreamWithYieldLimit True seqNo r
loopHeap seqNo ent =
case ent of
AheadEntryNull -> nextHeap seqNo
AheadEntryPure a -> do
-- Use 'send' directly so that we do not account this in worker
-- latency as this will not be the real latency.
-- Don't stop the worker in this case as we are just
-- transferring available results from heap to outputQueue.
void
$ liftIO
$ sendEvent
(outputQueue sv) (outputDoorBell sv) (ChildYield a)
nextHeap seqNo
AheadEntryStream (RunInIO runin, r) -> do
if stopping
then stopIfNeeded ent seqNo r
else do
res <- liftIO $ runin (runStreamWithYieldLimit True seqNo r)
restoreM res
nextHeap prevSeqNo = do
res <- liftIO $ dequeueFromHeapSeq heap (prevSeqNo + 1)
case res of
Ready (Entry seqNo hent) -> loopHeap seqNo hent
Clearing -> liftIO $ stopWith winfo sv
Waiting _ ->
if stopping
then do
r <- liftIO $ preStopCheck sv heap
if r
then liftIO $ stopWith winfo sv
else processWorkQueue prevSeqNo
else inline processWorkQueue prevSeqNo
processWorkQueue prevSeqNo = do
yieldLimitOk <- liftIO $ decrementYieldLimit (remainingWork sv)
if yieldLimitOk
then do
work <- dequeueAhead q
case work of
Nothing -> liftIO $ stopWith winfo sv
Just (m, seqNo) -> do
if seqNo == prevSeqNo + 1
then processWithToken q heap sv winfo m seqNo
else processWithoutToken q heap sv winfo m seqNo
else liftIO $ abortExecution sv winfo
-- We do not stop the worker on buffer full here as we want to proceed to
-- nextHeap anyway so that we can clear any subsequent entries. We stop
-- only in yield continuation where we may have a remaining stream to be
@@ -576,6 +550,10 @@ processHeap q heap sv winfo entry sno stopping = loopHeap sno entry
void $ liftIO $ yieldWith winfo sv a
nextHeap seqNo
yieldStreamFromHeap seqNo a r = do
continue <- liftIO $ yieldWith winfo sv a
runStreamWithYieldLimit continue seqNo r
-- XXX when we have an unfinished stream on the heap we cannot account all
-- the yields of that stream until it finishes, so if we have picked up
-- and executed more actions beyond that in the parent stream and put them
@@ -598,15 +576,93 @@ processHeap q heap sv winfo entry sno stopping = loopHeap sno entry
r
else do
runIn <- askRunInIO
let ent = Entry seqNo (AheadEntryStream (runIn, r))
let ent = Entry seqNo (AheadEntryStream (runIn, Nothing, r))
liftIO $ do
requeueOnHeapTop heap ent seqNo
incrementYieldLimit (remainingWork sv)
stopWith winfo sv
yieldStreamFromHeap seqNo a r = do
continue <- liftIO $ yieldWith winfo sv a
runStreamWithYieldLimit continue seqNo r
processWorkQueue prevSeqNo = do
yieldLimitOk <- liftIO $ decrementYieldLimit (remainingWork sv)
if yieldLimitOk
then do
work <- dequeueAhead q
case work of
Nothing -> liftIO $ stopWith winfo sv
Just (m, seqNo) -> do
if seqNo == prevSeqNo + 1
then processWithToken q heap sv winfo m seqNo
else processWithoutToken q heap sv winfo m seqNo
else liftIO $ abortExecution sv winfo
nextHeap prevSeqNo = do
res <- liftIO $ dequeueFromHeapSeq heap (prevSeqNo + 1)
case res of
Ready (Entry seqNo hent) -> loopHeap seqNo hent
Clearing -> liftIO $ stopWith winfo sv
Waiting _ ->
if stopping
then do
r <- liftIO $ preStopCheck sv heap
if r
then liftIO $ stopWith winfo sv
else processWorkQueue prevSeqNo
else inline processWorkQueue prevSeqNo
-- The main loop processing the heap. The seqNo passed in is the next one in
-- sequence, i.e. the one we should be sending to the output next.
loopHeap seqNo ent =
case ent of
AheadEntryNull -> nextHeap seqNo
AheadEntryPure a -> do
-- Use 'send' directly so that we do not account this in worker
-- latency as this will not be the real latency.
-- Don't stop the worker in this case as we are just
-- transferring available results from heap to outputQueue.
void
$ liftIO
$ sendEvent
(outputQueue sv) (outputDoorBell sv) (ChildYield a)
nextHeap seqNo
AheadEntryStream (RunInIO runin, Just a, r) -> do
let
action = do
-- XXX deduplicate this code with the same code above
void
$ liftIO
$ sendEvent
(outputQueue sv) (outputDoorBell sv) (ChildYield a)
runStreamWithYieldLimit True seqNo r
go = do
res <- liftIO $ runin action
restoreM res
if stopping
then do
stopIt <- liftIO $ preStopCheck sv heap
if stopIt
then liftIO $ do
-- put the entry back in the heap and stop
requeueOnHeapTop heap (Entry seqNo ent) seqNo
stopWith winfo sv
else go
else go
AheadEntryStream (RunInIO runin, Nothing, r) -> do
-- XXX deduplicate this code with the code above
let
action = runStreamWithYieldLimit True seqNo r
go = do
res <- liftIO $ runin action
restoreM res
if stopping
then do
stopIt <- liftIO $ preStopCheck sv heap
if stopIt
then liftIO $ do
-- put the entry back in the heap and stop
requeueOnHeapTop heap (Entry seqNo ent) seqNo
stopWith winfo sv
else go
else go
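
One possible shape for the deduplication that the XXX notes above ask for,
sketched as a hypothetical where-bound helper reusing the names already in
scope in processHeap (not part of this patch):

-- Handle an AheadEntryStream whose head may already be evaluated: forward
-- the stored head, if any, with sendEvent so that it is not accounted in
-- worker latency, then run the tail under its RunInIO.
processStreamEntry ent seqNo (RunInIO runin) mbHead r = do
    let sendOld a =
            void
                $ liftIO
                $ sendEvent
                    (outputQueue sv) (outputDoorBell sv) (ChildYield a)
        action = do
            maybe (return ()) sendOld mbHead
            runStreamWithYieldLimit True seqNo r
        go = liftIO (runin action) >>= restoreM
    if stopping
    then do
        stopIt <- liftIO $ preStopCheck sv heap
        if stopIt
        then liftIO $ do
            -- put the entry back in the heap and stop
            requeueOnHeapTop heap (Entry seqNo ent) seqNo
            stopWith winfo sv
        else go
    else go

Both AheadEntryStream alternatives in loopHeap could then collapse into a
single call, passing Just a or Nothing for the head.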
{-# NOINLINE drainHeap #-}
drainHeap
@@ -625,6 +681,12 @@ drainHeap q heap sv winfo = do
data HeapStatus = HContinue | HStop
-- XXX Rename to processOutOfOrder
-- | Without token means the worker is working on an item which is not the next in
-- sequence, therefore, the output has to be placed on the heap rather than
-- sending it directly to the output queue.
--
processWithoutToken
:: MonadRunInIO m
=> IORef ([K.StreamK m a], Int)
@@ -646,11 +708,15 @@ processWithoutToken q heap sv winfo m seqNo = do
toHeap AheadEntryNull
mrun = runInIO $ svarMrun sv
-- XXX When StreamD streams are converted to StreamK, even for singleton
-- streams we have a yield and a stop. That can cause perf overhead in case
-- of concurrent workers. We should always create streams with a "single"
-- continuation.
r <- liftIO $ mrun $
K.foldStreamShared undefined
(\a r -> do
runIn <- askRunInIO
toHeap $ AheadEntryStream (runIn, K.cons a r))
toHeap $ AheadEntryStream (runIn, Just a, r))
(toHeap . AheadEntryPure)
stopk
m
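
The intent of that fold, shown loosely on plain lists (a toy analogy only;
unlike lists, StreamK can distinguish a final "single" value from a yield):
one evaluation step is classified into the three AheadHeapEntry shapes, with
the evaluated head kept aside instead of being consed back onto the tail.

data ToyEntry a = ToyNull | ToyPure a | ToyStream (Maybe a) [a]

-- Classify one evaluation step of a (toy) stream.
toyStep :: [a] -> ToyEntry a
toyStep []       = ToyNull                -- stopped without a value
toyStep [a]      = ToyPure a              -- a single, final value
toyStep (a : as) = ToyStream (Just a) as  -- evaluated head, unevaluated tail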
@@ -678,6 +744,8 @@ processWithoutToken q heap sv winfo m seqNo = do
writeIORef (maxHeapSize $ svarStats sv) (H.size newHp)
heapOk <- liftIO $ underMaxHeap sv newHp
-- XXX Refactor to use join points
status <-
case yieldRateInfo sv of
Nothing -> return HContinue
@@ -705,6 +773,14 @@ processWithoutToken q heap sv winfo m seqNo = do
data TokenWorkerStatus = TokenContinue Int | TokenSuspend
-- XXX Rename to processInOrder
-- | With token means this worker is working on an item which is the next in
-- sequence, therefore, it can be yielded directly to the output queue,
-- avoiding the heap.
--
-- Before suspending the worker has the responsibility to transfer all the
-- in-sequence entries from the heap to the output queue.
processWithToken
:: MonadRunInIO m
=> IORef ([K.StreamK m a], Int)
@@ -761,7 +837,7 @@ processWithToken q heap sv winfo action sno = do
r
else do
runIn <- askRunInIO
let ent = Entry seqNo (AheadEntryStream (runIn, r))
let ent = Entry seqNo (AheadEntryStream (runIn, Nothing, r))
liftIO $ requeueOnHeapTop heap ent seqNo
liftIO $ incrementYieldLimit (remainingWork sv)
return TokenSuspend
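
The "transfer all in-sequence entries before suspending" responsibility
described in the doc comment above, as a pure toy skeleton (plain Data.Map,
made-up names; the real drain also sends events and checks limits):

import qualified Data.Map.Strict as Map

-- Move consecutive entries out of the reorder map starting at the expected
-- sequence number; stop at the first gap and report the next number to wait
-- for.
toyDrain :: Int -> Map.Map Int a -> ([a], Map.Map Int a, Int)
toyDrain expected hp =
    case Map.lookupMin hp of
        Just (seqNo, a)
            | seqNo == expected ->
                let (as, hp', next) = toyDrain (expected + 1) (Map.deleteMin hp)
                 in (a : as, hp', next)
        _ -> ([], hp, expected)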
@@ -870,9 +946,13 @@ getLifoSVar :: forall m a. MonadRunInIO m =>
RunInIO m -> Config -> IO (Channel m a)
getLifoSVar mrun cfg = do
outQ <- newIORef ([], 0)
-- the second component of the tuple is "Nothing" when heap is being
-- cleared, "Just n" when we are expecting sequence number n to arrive
-- before we can start clearing the heap.
-- The second component of the heap IORef tuple is:
--
-- * "Nothing" when we are in the process of clearing the heap i.e. when
-- we are processing the heap and transferring entries from the heap to the
-- output queue
-- * "Just n" when we are expecting sequence number n to arrive before we
-- can start clearing the heap.
outH <- newIORef (H.empty, Just 0)
outQMv <- newEmptyMVar
active <- newIORef 0
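
And the complementary transition for the comment above, in the same toy model
as the earlier dequeue sketch (a hypothetical helper, not the channel code):
once the drain finishes, the clearer publishes the next expected sequence
number, moving the slot back from Nothing to Just.

import Data.IORef (IORef, atomicModifyIORef)
import qualified Data.Map.Strict as Map

-- After transferring entries up to sequence number lastSeq to the output
-- queue, hand the slot back so that workers know which number to wait for
-- next.
toyReleaseClearing :: IORef (Map.Map Int a, Maybe Int) -> Int -> IO ()
toyReleaseClearing ref lastSeq =
    atomicModifyIORef ref $ \(hp, _) -> ((hp, Just (lastSeq + 1)), ())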