From fd039f01cb39bf1e37993ca15610484348528943 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Tue, 13 Feb 2024 13:40:15 +0530 Subject: [PATCH] Fix work queue for parConcatMap --- src/Streamly/Internal/Data/Stream/Channel/Operations.hs | 8 ++++++-- src/Streamly/Internal/Data/Stream/Concurrent.hs | 9 ++++++++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/Streamly/Internal/Data/Stream/Channel/Operations.hs b/src/Streamly/Internal/Data/Stream/Channel/Operations.hs index de149f240..f73fd338a 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Operations.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Operations.hs @@ -90,9 +90,13 @@ import Test.Inspection (inspect, hasNoTypeClassesExcept) -- be read back from the SVar using 'fromSVar'. {-# INLINE toChannelK #-} toChannelK :: MonadRunInIO m => Channel m a -> K.StreamK m a -> m () -toChannelK sv m = do +toChannelK chan m = do runIn <- askRunInIO - liftIO $ enqueue sv False (runIn, m) + -- The second argument to enqueue is used in case of lazy on-demand + -- scheduling. See comments in mkEnqueue. By default we enqueue on the + -- inner work q (True). When using concatMap the outer loop is enqueued on + -- the outer work q. + liftIO $ enqueue chan True (runIn, m) -- INLINE for fromStreamK/toStreamK fusion diff --git a/src/Streamly/Internal/Data/Stream/Concurrent.hs b/src/Streamly/Internal/Data/Stream/Concurrent.hs index a8c3bcaa0..cea4f1c8f 100644 --- a/src/Streamly/Internal/Data/Stream/Concurrent.hs +++ b/src/Streamly/Internal/Data/Stream/Concurrent.hs @@ -295,7 +295,14 @@ mkEnqueue chan runner = do runInIO <- askRunInIO return $ let q stream = do - -- Enqueue the outer loop + -- When using parConcatMap with lazy dispatch we enqueue the + -- outer stream tail and then map a stream generator on the + -- head, which is also queued. If we pick both head and tail + -- with equal priority we may keep blowing up the tail into + -- more and more streams. 
To avoid that we give preference to + -- the inner streams when picking up for execution. This + -- requires two work queues, one for outer stream and one for + -- inner. Here we enqueue the outer loop stream. liftIO $ enqueue chan False (runInIO, runner q stream) -- XXX In case of eager dispatch we can just directly dispatch -- a worker with the tail stream here rather than first queuing