diff --git a/src/Streamly/Internal/Data/Stream/Channel/Append.hs b/src/Streamly/Internal/Data/Stream/Channel/Append.hs index 0b9e13a56..387599091 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Append.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Append.hs @@ -279,18 +279,38 @@ atomicModifyIORef_ ref f = atomicModifyIORef ref $ \x -> (f x, ()) data AheadHeapEntry m a = - AheadEntryNull - | AheadEntryPure a - | AheadEntryStream (RunInIO m, K.StreamK m a) + AheadEntryNull -- ^ an empty result, required for sequencing + | AheadEntryPure a -- ^ a yielded value + -- ^ A stream with its head possibly evaluated, and tail unevaluated + | AheadEntryStream (RunInIO m, Maybe a, K.StreamK m a) data HeapDequeueResult m a = + -- | Not dequeued because someone is processing the heap. This is indicated + -- by the second component of the heap IORef tuple being set to 'Nothing'. Clearing + -- | Not dequeued because the seq no. of the top entry is not the next one + -- expected in seqeunce, we have to wait. | Waiting Int + -- | dequeued successfully, the seq no. of the top entry is the next one + -- expected in sequence. | Ready (Entry Int (AheadHeapEntry m a)) +-- | The heap is stored in an IORef along with a sequence number. When the +-- sequence number is set to 'Nothing' it means we are processing the heap. The +-- type of the dequeued entry would be 'Clearing' in this case. When the +-- sequence number in the IORef is set to 'Just' then it is the next expected +-- sequence number. If the dequeued entry matches with this expected sequence +-- number then it is 'Ready' and dequeued otherwise it is 'Waiting'. When we +-- return 'Clearing' or 'Waiting', the heap is not modified i.e. nothing is +-- dequeued. +-- +-- Note, when we have n streams each consisting of multiple items composed with +-- "ordered" execution then the entire stream is treated as one item with the +-- given sequence number and all of its elements are yielded serially. {-# INLINE dequeueFromHeap #-} dequeueFromHeap :: IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) + -- ^ (heap, Maybe sequence-no). -> IO (HeapDequeueResult m a) dequeueFromHeap hpVar = atomicModifyIORef hpVar $ \pair@(hp, snum) -> @@ -305,6 +325,10 @@ dequeueFromHeap hpVar = else assert (seqNo >= n) (pair, Waiting n) Nothing -> (pair, Waiting n) +-- | Called only when the heap is being processed to transfer entries to output +-- queue. Matches the sequence number of the dequeued entry with the supplied +-- sequence number to determine if the entry is 'Ready' or 'Waiting'. Heap is +-- not modified if we return 'Waiting' i.e. entry is not dequeued. {-# INLINE dequeueFromHeapSeq #-} dequeueFromHeapSeq :: IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) @@ -497,77 +521,27 @@ abortExecution sv winfo = do -- 2) make the other threads queue and go away if draining is in progress -- -- In both cases we give the drainer a chance to run more often. + +-- | Move entries from the heap to the channel's output queue. Only those +-- entries which are in correct order are transferred. Stop whenever a missing +-- sequence number is encountered. -- +-- We enter this function only when we have verified that the sequence number +-- passed to it is the next expected sequence number. processHeap :: MonadRunInIO m - => IORef ([K.StreamK m a], Int) - -> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) + => IORef ([K.StreamK m a], Int) -- ^ work queue + -> IORef (Heap (Entry Int (AheadHeapEntry m a)), Maybe Int) -- ^ heap -> Channel m a -> Maybe WorkerInfo - -> AheadHeapEntry m a - -> Int - -> Bool -- we are draining the heap before we stop + -> AheadHeapEntry m a -- heap entry dequeued from top of heap + -> Int -- seq no. of the heap entry, this is the next correct seq no. + -> Bool -- True if we are draining the heap when we are finally stopping -> m () processHeap q heap sv winfo entry sno stopping = loopHeap sno entry where - stopIfNeeded ent seqNo r = do - stopIt <- liftIO $ preStopCheck sv heap - if stopIt - then liftIO $ do - -- put the entry back in the heap and stop - requeueOnHeapTop heap (Entry seqNo ent) seqNo - stopWith winfo sv - else runStreamWithYieldLimit True seqNo r - - loopHeap seqNo ent = - case ent of - AheadEntryNull -> nextHeap seqNo - AheadEntryPure a -> do - -- Use 'send' directly so that we do not account this in worker - -- latency as this will not be the real latency. - -- Don't stop the worker in this case as we are just - -- transferring available results from heap to outputQueue. - void - $ liftIO - $ sendEvent - (outputQueue sv) (outputDoorBell sv) (ChildYield a) - nextHeap seqNo - AheadEntryStream (RunInIO runin, r) -> do - if stopping - then stopIfNeeded ent seqNo r - else do - res <- liftIO $ runin (runStreamWithYieldLimit True seqNo r) - restoreM res - - nextHeap prevSeqNo = do - res <- liftIO $ dequeueFromHeapSeq heap (prevSeqNo + 1) - case res of - Ready (Entry seqNo hent) -> loopHeap seqNo hent - Clearing -> liftIO $ stopWith winfo sv - Waiting _ -> - if stopping - then do - r <- liftIO $ preStopCheck sv heap - if r - then liftIO $ stopWith winfo sv - else processWorkQueue prevSeqNo - else inline processWorkQueue prevSeqNo - - processWorkQueue prevSeqNo = do - yieldLimitOk <- liftIO $ decrementYieldLimit (remainingWork sv) - if yieldLimitOk - then do - work <- dequeueAhead q - case work of - Nothing -> liftIO $ stopWith winfo sv - Just (m, seqNo) -> do - if seqNo == prevSeqNo + 1 - then processWithToken q heap sv winfo m seqNo - else processWithoutToken q heap sv winfo m seqNo - else liftIO $ abortExecution sv winfo - -- We do not stop the worker on buffer full here as we want to proceed to -- nextHeap anyway so that we can clear any subsequent entries. We stop -- only in yield continuation where we may have a remaining stream to be @@ -576,6 +550,10 @@ processHeap q heap sv winfo entry sno stopping = loopHeap sno entry void $ liftIO $ yieldWith winfo sv a nextHeap seqNo + yieldStreamFromHeap seqNo a r = do + continue <- liftIO $ yieldWith winfo sv a + runStreamWithYieldLimit continue seqNo r + -- XXX when we have an unfinished stream on the heap we cannot account all -- the yields of that stream until it finishes, so if we have picked up -- and executed more actions beyond that in the parent stream and put them @@ -598,15 +576,93 @@ processHeap q heap sv winfo entry sno stopping = loopHeap sno entry r else do runIn <- askRunInIO - let ent = Entry seqNo (AheadEntryStream (runIn, r)) + let ent = Entry seqNo (AheadEntryStream (runIn, Nothing, r)) liftIO $ do requeueOnHeapTop heap ent seqNo incrementYieldLimit (remainingWork sv) stopWith winfo sv - yieldStreamFromHeap seqNo a r = do - continue <- liftIO $ yieldWith winfo sv a - runStreamWithYieldLimit continue seqNo r + processWorkQueue prevSeqNo = do + yieldLimitOk <- liftIO $ decrementYieldLimit (remainingWork sv) + if yieldLimitOk + then do + work <- dequeueAhead q + case work of + Nothing -> liftIO $ stopWith winfo sv + Just (m, seqNo) -> do + if seqNo == prevSeqNo + 1 + then processWithToken q heap sv winfo m seqNo + else processWithoutToken q heap sv winfo m seqNo + else liftIO $ abortExecution sv winfo + + nextHeap prevSeqNo = do + res <- liftIO $ dequeueFromHeapSeq heap (prevSeqNo + 1) + case res of + Ready (Entry seqNo hent) -> loopHeap seqNo hent + Clearing -> liftIO $ stopWith winfo sv + Waiting _ -> + if stopping + then do + r <- liftIO $ preStopCheck sv heap + if r + then liftIO $ stopWith winfo sv + else processWorkQueue prevSeqNo + else inline processWorkQueue prevSeqNo + + -- The main loop processing the heap. The seqNo is correct in sequence, + -- this is the one we should be sending to output next. + loopHeap seqNo ent = + case ent of + AheadEntryNull -> nextHeap seqNo + AheadEntryPure a -> do + -- Use 'send' directly so that we do not account this in worker + -- latency as this will not be the real latency. + -- Don't stop the worker in this case as we are just + -- transferring available results from heap to outputQueue. + void + $ liftIO + $ sendEvent + (outputQueue sv) (outputDoorBell sv) (ChildYield a) + nextHeap seqNo + AheadEntryStream (RunInIO runin, Just a, r) -> do + let + action = do + -- XXX deduplicate this code with the same code above + void + $ liftIO + $ sendEvent + (outputQueue sv) (outputDoorBell sv) (ChildYield a) + runStreamWithYieldLimit True seqNo r + go = do + res <- liftIO $ runin action + restoreM res + if stopping + then do + stopIt <- liftIO $ preStopCheck sv heap + if stopIt + then liftIO $ do + -- put the entry back in the heap and stop + requeueOnHeapTop heap (Entry seqNo ent) seqNo + stopWith winfo sv + else go + else go + AheadEntryStream (RunInIO runin, Nothing, r) -> do + -- XXX deuplicate this code with the code above + let + action = runStreamWithYieldLimit True seqNo r + go = do + res <- liftIO $ runin action + restoreM res + if stopping + then do + stopIt <- liftIO $ preStopCheck sv heap + if stopIt + then liftIO $ do + -- put the entry back in the heap and stop + requeueOnHeapTop heap (Entry seqNo ent) seqNo + stopWith winfo sv + else go + else go {-# NOINLINE drainHeap #-} drainHeap @@ -625,6 +681,12 @@ drainHeap q heap sv winfo = do data HeapStatus = HContinue | HStop +-- XXX Rename to processOutOfOrder + +-- | Without token means the worker is working on an item which not the next in +-- sequence, therefore, the output has to be placed on the heap rather than +-- sending it directly to the output queue. +-- processWithoutToken :: MonadRunInIO m => IORef ([K.StreamK m a], Int) @@ -646,11 +708,15 @@ processWithoutToken q heap sv winfo m seqNo = do toHeap AheadEntryNull mrun = runInIO $ svarMrun sv + -- XXX When StreamD streams are converted to StreamK, even for singleton + -- streams we have a yield and a stop. That can cause perf overhead in case + -- of concurrent workers. We should always create streams with a "single" + -- continuation. r <- liftIO $ mrun $ K.foldStreamShared undefined (\a r -> do runIn <- askRunInIO - toHeap $ AheadEntryStream (runIn, K.cons a r)) + toHeap $ AheadEntryStream (runIn, Just a, r)) (toHeap . AheadEntryPure) stopk m @@ -678,6 +744,8 @@ processWithoutToken q heap sv winfo m seqNo = do writeIORef (maxHeapSize $ svarStats sv) (H.size newHp) heapOk <- liftIO $ underMaxHeap sv newHp + + -- XXX Refactor to use join points status <- case yieldRateInfo sv of Nothing -> return HContinue @@ -705,6 +773,14 @@ processWithoutToken q heap sv winfo m seqNo = do data TokenWorkerStatus = TokenContinue Int | TokenSuspend +-- XXX Rename to processInOrder + +-- | With token means this worker is working on an item which is the next in +-- sequence, therefore, it can be yielded directly to the output queue, +-- avoiding the heap. +-- +-- Before suspending the worker has the responsibility to transfer all the +-- in-sequence entries from the heap to the output queue. processWithToken :: MonadRunInIO m => IORef ([K.StreamK m a], Int) @@ -761,7 +837,7 @@ processWithToken q heap sv winfo action sno = do r else do runIn <- askRunInIO - let ent = Entry seqNo (AheadEntryStream (runIn, r)) + let ent = Entry seqNo (AheadEntryStream (runIn, Nothing, r)) liftIO $ requeueOnHeapTop heap ent seqNo liftIO $ incrementYieldLimit (remainingWork sv) return TokenSuspend @@ -870,9 +946,13 @@ getLifoSVar :: forall m a. MonadRunInIO m => RunInIO m -> Config -> IO (Channel m a) getLifoSVar mrun cfg = do outQ <- newIORef ([], 0) - -- the second component of the tuple is "Nothing" when heap is being - -- cleared, "Just n" when we are expecting sequence number n to arrive - -- before we can start clearing the heap. + -- The second component of the heap IORef tuple is: + -- + -- * "Nothing" when we are in the process of clearing the heap i.e. when + -- we are procssing the heap and transferring entries from the heap to the + -- output queue + -- * "Just n" when we are expecting sequence number n to arrive before we + -- can start clearing the heap. outH <- newIORef (H.empty, Just 0) outQMv <- newEmptyMVar active <- newIORef 0