Fix work queue for parConcatMap

Author: Harendra Kumar, 2024-02-13 13:40:15 +05:30
parent 8722c4ce9f
commit fd039f01cb
2 changed files with 14 additions and 3 deletions


@@ -90,9 +90,13 @@ import Test.Inspection (inspect, hasNoTypeClassesExcept)
 -- be read back from the SVar using 'fromSVar'.
 {-# INLINE toChannelK #-}
 toChannelK :: MonadRunInIO m => Channel m a -> K.StreamK m a -> m ()
-toChannelK sv m = do
+toChannelK chan m = do
     runIn <- askRunInIO
-    liftIO $ enqueue sv False (runIn, m)
+    -- The second argument to enqueue is used in case of lazy on-demand
+    -- scheduling. See comments in mkEnqueue. By default we enqueue on the
+    -- inner work q (True). When using concatMap the outer loop is enqueued on
+    -- the outer work q.
+    liftIO $ enqueue chan True (runIn, m)
 
 -- INLINE for fromStreamK/toStreamK fusion

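The comment added above refers to the channel's two internal work queues. As a rough standalone sketch only, not streamly's actual implementation (the names WorkQueues, enqueueWork and dequeueWork are invented for illustration), the idea of tagging work with a Bool and having workers prefer the inner queue could look like this:

import Data.IORef (IORef, atomicModifyIORef', newIORef)

-- Hypothetical channel state: one FIFO per kind of work.
data WorkQueues a = WorkQueues
    { innerQ :: IORef [a] -- streams generated by the concatMap function
    , outerQ :: IORef [a] -- outer loop (stream tail) continuations
    }

newWorkQueues :: IO (WorkQueues a)
newWorkQueues = WorkQueues <$> newIORef [] <*> newIORef []

-- Mirrors the Bool passed to 'enqueue' in the diff: True selects the inner
-- queue (the default in toChannelK), False the outer queue (used for the
-- outer loop in mkEnqueue).
enqueueWork :: WorkQueues a -> Bool -> a -> IO ()
enqueueWork q inner x = atomicModifyIORef' ref (\xs -> (xs ++ [x], ()))
  where ref = if inner then innerQ q else outerQ q

-- Workers drain inner work first; the outer loop, which only produces more
-- inner work, is picked up only when no inner work is pending.
dequeueWork :: WorkQueues a -> IO (Maybe a)
dequeueWork q = do
    r <- pop (innerQ q)
    case r of
        Just _  -> return r
        Nothing -> pop (outerQ q)
  where
    pop ref = atomicModifyIORef' ref $ \xs ->
        case xs of
            []     -> ([], Nothing)
            (y:ys) -> (ys, Just y)

The second changed file documents the same scheme in mkEnqueue, which places the outer loop on the outer work queue: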

@@ -295,7 +295,14 @@ mkEnqueue chan runner = do
     runInIO <- askRunInIO
     return
         $ let q stream = do
-                -- Enqueue the outer loop
+                -- When using parConcatMap with lazy dispatch we enqueue the
+                -- outer stream tail and then map a stream generator on the
+                -- head, which is also queued. If we pick both head and tail
+                -- with equal priority we may keep blowing up the tail into
+                -- more and more streams. To avoid that we give preference to
+                -- the inner streams when picking up for execution. This
+                -- requires two work queues, one for outer stream and one for
+                -- inner. Here we enqueue the outer loop stream.
                 liftIO $ enqueue chan False (runInIO, runner q stream)
                 -- XXX In case of eager dispatch we can just directly dispatch
                 -- a worker with the tail stream here rather than first queuing
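To see what the fix is about at the API level, here is a hedged usage sketch of parConcatMap (module and function names as in streamly's public Streamly.Data.Stream.Prelude API; treat the exact imports and the default scheduling behavior as assumptions to verify): each element of the outer stream is expanded into an inner stream and the inner streams are evaluated concurrently, so with two work queues the workers prefer already generated inner streams over expanding the outer stream further.

import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Data.Stream.Prelude as Stream

main :: IO ()
main = do
    xs <- Stream.fold Fold.toList
        -- 'id' keeps the default (lazy, on-demand) concurrency config
        $ Stream.parConcatMap id
            -- inner stream generated per outer element
            (\i -> Stream.fromList [i * 10, i * 10 + 1])
            -- outer stream
            (Stream.fromList [1 .. 3 :: Int])
    -- output order may vary because inner streams are evaluated concurrently
    print xs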