From 15418a8f2c86018e8ae1f4b2a7352b9ed11da7a8 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Wed, 26 Jun 2024 01:39:35 +0530 Subject: [PATCH] Fix a rate control issue in ordered streams We were consing an evaluated stream element back into the stream and putting it on heap. This caused the latency of that item to be very low next time it was yielded. --- .../Internal/Data/Stream/Channel/Append.hs | 226 ++++++++++++------ 1 file changed, 153 insertions(+), 73 deletions(-) diff --git a/src/Streamly/Internal/Data/Stream/Channel/Append.hs b/src/Streamly/Internal/Data/Stream/Channel/Append.hs index 0b9e13a56..387599091 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Append.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Append.hs @@ -279,18 +279,38 @@ atomicModifyIORef_ ref f = atomicModifyIORef ref $ \x -> (f x, ()) data AheadHeapEntry m a = - AheadEntryNull - | AheadEntryPure a - | AheadEntryStream (RunInIO m, K.StreamK m a) + AheadEntryNull -- ^ an empty result, required for sequencing + | AheadEntryPure a -- ^ a yielded value + -- | A stream with its head possibly evaluated, and tail unevaluated + | AheadEntryStream (RunInIO m, Maybe a, K.StreamK m a) data HeapDequeueResult m a = + -- | Not dequeued because someone is processing the heap. This is indicated + -- by the second component of the heap IORef tuple being set to 'Nothing'. Clearing + -- | Not dequeued because the seq no. of the top entry is not the next one + -- expected in sequence, we have to wait. | Waiting Int + -- | dequeued successfully, the seq no. of the top entry is the next one + -- expected in sequence. | Ready (Entry Int (AheadHeapEntry m a)) +-- | The heap is stored in an IORef along with a sequence number. When the +-- sequence number is set to 'Nothing' it means we are processing the heap. The +-- type of the dequeued entry would be 'Clearing' in this case. When the +-- sequence number in the IORef is set to 'Just' then it is the next expected +-- sequence number.
If the dequeued entry matches with this expected sequence +-- number then it is 'Ready' and dequeued otherwise it is 'Waiting'. When we +-- return 'Clearing' or 'Waiting', the heap is not modified i.e. nothing is +-- dequeued. +-- +-- Note, when we have n streams each consisting of multiple items composed with +-- "ordered" execution then the entire stream is treated as one item with the +-- given sequence number and all of its elements are yielded serially. {-# INLINE dequeueFromHeap #-} dequeueFromHeap :: IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) + -- ^ (heap, Maybe sequence-no). -> IO (HeapDequeueResult m a) dequeueFromHeap hpVar = atomicModifyIORef hpVar $ \pair@(hp, snum) -> @@ -305,6 +325,10 @@ dequeueFromHeap hpVar = else assert (seqNo >= n) (pair, Waiting n) Nothing -> (pair, Waiting n) +-- | Called only when the heap is being processed to transfer entries to output +-- queue. Matches the sequence number of the dequeued entry with the supplied +-- sequence number to determine if the entry is 'Ready' or 'Waiting'. Heap is +-- not modified if we return 'Waiting' i.e. entry is not dequeued. {-# INLINE dequeueFromHeapSeq #-} dequeueFromHeapSeq :: IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) @@ -497,77 +521,27 @@ abortExecution sv winfo = do -- 2) make the other threads queue and go away if draining is in progress -- -- In both cases we give the drainer a chance to run more often. + +-- | Move entries from the heap to the channel's output queue. Only those +-- entries which are in correct order are transferred. Stop whenever a missing +-- sequence number is encountered. -- +-- We enter this function only when we have verified that the sequence number +-- passed to it is the next expected sequence number. 
processHeap :: MonadRunInIO m - => IORef ([K.StreamK m a], Int) - -> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) + => IORef ([K.StreamK m a], Int) -- ^ work queue + -> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) -- ^ heap -> Channel m a -> Maybe WorkerInfo - -> AheadHeapEntry m a - -> Int - -> Bool -- we are draining the heap before we stop + -> AheadHeapEntry m a -- heap entry dequeued from top of heap + -> Int -- seq no. of the heap entry, this is the next correct seq no. + -> Bool -- True if we are draining the heap when we are finally stopping -> m () processHeap q heap sv winfo entry sno stopping = loopHeap sno entry where - stopIfNeeded ent seqNo r = do - stopIt <- liftIO $ preStopCheck sv heap - if stopIt - then liftIO $ do - -- put the entry back in the heap and stop - requeueOnHeapTop heap (Entry seqNo ent) seqNo - stopWith winfo sv - else runStreamWithYieldLimit True seqNo r - - loopHeap seqNo ent = - case ent of - AheadEntryNull -> nextHeap seqNo - AheadEntryPure a -> do - -- Use 'send' directly so that we do not account this in worker - -- latency as this will not be the real latency. - -- Don't stop the worker in this case as we are just - -- transferring available results from heap to outputQueue. 
- void - $ liftIO - $ sendEvent - (outputQueue sv) (outputDoorBell sv) (ChildYield a) - nextHeap seqNo - AheadEntryStream (RunInIO runin, r) -> do - if stopping - then stopIfNeeded ent seqNo r - else do - res <- liftIO $ runin (runStreamWithYieldLimit True seqNo r) - restoreM res - - nextHeap prevSeqNo = do - res <- liftIO $ dequeueFromHeapSeq heap (prevSeqNo + 1) - case res of - Ready (Entry seqNo hent) -> loopHeap seqNo hent - Clearing -> liftIO $ stopWith winfo sv - Waiting _ -> - if stopping - then do - r <- liftIO $ preStopCheck sv heap - if r - then liftIO $ stopWith winfo sv - else processWorkQueue prevSeqNo - else inline processWorkQueue prevSeqNo - - processWorkQueue prevSeqNo = do - yieldLimitOk <- liftIO $ decrementYieldLimit (remainingWork sv) - if yieldLimitOk - then do - work <- dequeueAhead q - case work of - Nothing -> liftIO $ stopWith winfo sv - Just (m, seqNo) -> do - if seqNo == prevSeqNo + 1 - then processWithToken q heap sv winfo m seqNo - else processWithoutToken q heap sv winfo m seqNo - else liftIO $ abortExecution sv winfo - -- We do not stop the worker on buffer full here as we want to proceed to -- nextHeap anyway so that we can clear any subsequent entries. 
We stop -- only in yield continuation where we may have a remaining stream to be @@ -576,6 +550,10 @@ processHeap q heap sv winfo entry sno stopping = loopHeap sno entry void $ liftIO $ yieldWith winfo sv a nextHeap seqNo + yieldStreamFromHeap seqNo a r = do + continue <- liftIO $ yieldWith winfo sv a + runStreamWithYieldLimit continue seqNo r + -- XXX when we have an unfinished stream on the heap we cannot account all -- the yields of that stream until it finishes, so if we have picked up -- and executed more actions beyond that in the parent stream and put them @@ -598,15 +576,93 @@ processHeap q heap sv winfo entry sno stopping = loopHeap sno entry r else do runIn <- askRunInIO - let ent = Entry seqNo (AheadEntryStream (runIn, r)) + let ent = Entry seqNo (AheadEntryStream (runIn, Nothing, r)) liftIO $ do requeueOnHeapTop heap ent seqNo incrementYieldLimit (remainingWork sv) stopWith winfo sv - yieldStreamFromHeap seqNo a r = do - continue <- liftIO $ yieldWith winfo sv a - runStreamWithYieldLimit continue seqNo r + processWorkQueue prevSeqNo = do + yieldLimitOk <- liftIO $ decrementYieldLimit (remainingWork sv) + if yieldLimitOk + then do + work <- dequeueAhead q + case work of + Nothing -> liftIO $ stopWith winfo sv + Just (m, seqNo) -> do + if seqNo == prevSeqNo + 1 + then processWithToken q heap sv winfo m seqNo + else processWithoutToken q heap sv winfo m seqNo + else liftIO $ abortExecution sv winfo + + nextHeap prevSeqNo = do + res <- liftIO $ dequeueFromHeapSeq heap (prevSeqNo + 1) + case res of + Ready (Entry seqNo hent) -> loopHeap seqNo hent + Clearing -> liftIO $ stopWith winfo sv + Waiting _ -> + if stopping + then do + r <- liftIO $ preStopCheck sv heap + if r + then liftIO $ stopWith winfo sv + else processWorkQueue prevSeqNo + else inline processWorkQueue prevSeqNo + + -- The main loop processing the heap. The seqNo is correct in sequence, + -- this is the one we should be sending to output next. 
+ loopHeap seqNo ent = + case ent of + AheadEntryNull -> nextHeap seqNo + AheadEntryPure a -> do + -- Use 'send' directly so that we do not account this in worker + -- latency as this will not be the real latency. + -- Don't stop the worker in this case as we are just + -- transferring available results from heap to outputQueue. + void + $ liftIO + $ sendEvent + (outputQueue sv) (outputDoorBell sv) (ChildYield a) + nextHeap seqNo + AheadEntryStream (RunInIO runin, Just a, r) -> do + let + action = do + -- XXX deduplicate this code with the same code above + void + $ liftIO + $ sendEvent + (outputQueue sv) (outputDoorBell sv) (ChildYield a) + runStreamWithYieldLimit True seqNo r + go = do + res <- liftIO $ runin action + restoreM res + if stopping + then do + stopIt <- liftIO $ preStopCheck sv heap + if stopIt + then liftIO $ do + -- put the entry back in the heap and stop + requeueOnHeapTop heap (Entry seqNo ent) seqNo + stopWith winfo sv + else go + else go + AheadEntryStream (RunInIO runin, Nothing, r) -> do + -- XXX deduplicate this code with the code above + let + action = runStreamWithYieldLimit True seqNo r + go = do + res <- liftIO $ runin action + restoreM res + if stopping + then do + stopIt <- liftIO $ preStopCheck sv heap + if stopIt + then liftIO $ do + -- put the entry back in the heap and stop + requeueOnHeapTop heap (Entry seqNo ent) seqNo + stopWith winfo sv + else go + else go {-# NOINLINE drainHeap #-} drainHeap @@ -625,6 +681,12 @@ drainHeap q heap sv winfo = do data HeapStatus = HContinue | HStop +-- XXX Rename to processOutOfOrder + +-- | Without token means the worker is working on an item which is not the next in +-- sequence, therefore, the output has to be placed on the heap rather than +-- sending it directly to the output queue.
+-- processWithoutToken :: MonadRunInIO m => IORef ([K.StreamK m a], Int) @@ -646,11 +708,15 @@ processWithoutToken q heap sv winfo m seqNo = do toHeap AheadEntryNull mrun = runInIO $ svarMrun sv + -- XXX When StreamD streams are converted to StreamK, even for singleton + -- streams we have a yield and a stop. That can cause perf overhead in case + -- of concurrent workers. We should always create streams with a "single" + -- continuation. r <- liftIO $ mrun $ K.foldStreamShared undefined (\a r -> do runIn <- askRunInIO - toHeap $ AheadEntryStream (runIn, K.cons a r)) + toHeap $ AheadEntryStream (runIn, Just a, r)) (toHeap . AheadEntryPure) stopk m @@ -678,6 +744,8 @@ processWithoutToken q heap sv winfo m seqNo = do writeIORef (maxHeapSize $ svarStats sv) (H.size newHp) heapOk <- liftIO $ underMaxHeap sv newHp + + -- XXX Refactor to use join points status <- case yieldRateInfo sv of Nothing -> return HContinue @@ -705,6 +773,14 @@ processWithoutToken q heap sv winfo m seqNo = do data TokenWorkerStatus = TokenContinue Int | TokenSuspend +-- XXX Rename to processInOrder + +-- | With token means this worker is working on an item which is the next in +-- sequence, therefore, it can be yielded directly to the output queue, +-- avoiding the heap. +-- +-- Before suspending the worker has the responsibility to transfer all the +-- in-sequence entries from the heap to the output queue. processWithToken :: MonadRunInIO m => IORef ([K.StreamK m a], Int) @@ -761,7 +837,7 @@ processWithToken q heap sv winfo action sno = do r else do runIn <- askRunInIO - let ent = Entry seqNo (AheadEntryStream (runIn, r)) + let ent = Entry seqNo (AheadEntryStream (runIn, Nothing, r)) liftIO $ requeueOnHeapTop heap ent seqNo liftIO $ incrementYieldLimit (remainingWork sv) return TokenSuspend @@ -870,9 +946,13 @@ getLifoSVar :: forall m a. 
MonadRunInIO m => RunInIO m -> Config -> IO (Channel m a) getLifoSVar mrun cfg = do outQ <- newIORef ([], 0) - -- the second component of the tuple is "Nothing" when heap is being - -- cleared, "Just n" when we are expecting sequence number n to arrive - -- before we can start clearing the heap. + -- The second component of the heap IORef tuple is: + -- + -- * "Nothing" when we are in the process of clearing the heap i.e. when + -- we are processing the heap and transferring entries from the heap to the + -- output queue + -- * "Just n" when we are expecting sequence number n to arrive before we + -- can start clearing the heap. outH <- newIORef (H.empty, Just 0) outQMv <- newEmptyMVar active <- newIORef 0