mirror of
https://github.com/composewell/streamly.git
synced 2024-10-05 15:29:09 +03:00
Remove the two queue solution for async streams
Since the stream tail is always a single entry in the queue, this cannot help.
This commit is contained in:
parent
a0f352938a
commit
9992b820f2
@ -134,15 +134,7 @@ mkEnqueue chan runner = do
|
||||
runInIO <- askRunInIO
|
||||
return
|
||||
$ let f stream = do
|
||||
-- When using parConcatMap with lazy dispatch we enqueue the
|
||||
-- outer stream tail and then map a stream generator on the
|
||||
-- head, which is also queued. If we pick both head and tail
|
||||
-- with equal priority we may keep blowing up the tail into
|
||||
-- more and more streams. To avoid that we give preference to
|
||||
-- the inner streams when picking up for execution. This
|
||||
-- requires two work queues, one for outer stream and one for
|
||||
-- inner. Here we enqueue the outer loop stream.
|
||||
liftIO $ enqueue chan False (runInIO, runner f stream)
|
||||
liftIO $ enqueue chan (runInIO, runner f stream)
|
||||
-- XXX In case of eager dispatch we can just directly dispatch
|
||||
-- a worker with the tail stream here rather than first queuing
|
||||
-- and then dispatching a worker which dequeues the work. The
|
||||
|
@ -48,69 +48,33 @@ import Streamly.Internal.Data.Stream.Channel.Type
|
||||
-- Concurrent streams with first-come-first serve results
|
||||
------------------------------------------------------------------------------
|
||||
|
||||
-- | We use two queues, one outer and the other inner. When entries are present
|
||||
-- in both the queues, inner queue is given preference when dequeuing.
|
||||
--
|
||||
-- Normally entries are queued to the inner queue only. The parConcatMap
|
||||
-- implementation makes use of the outer queue as well. The tail of the outer
|
||||
-- stream is queued to the outer queue whereas the inner loop streams are
|
||||
-- queued to the inner queue so that inner streams are given preference,
|
||||
-- otherwise we might just keep generating streams from the outer stream and
|
||||
-- not use them fast enough. We need to go depth first rather than breadth
|
||||
-- first.
|
||||
--
|
||||
-- If we do not use outer and inner distinction there are two problematic
|
||||
-- cases. The outer stream gets executed faster than inner and may keep adding
|
||||
-- more entries. When we queue it back on the work queue it may be the first
|
||||
-- one to be picked if it is on top of the LIFO.
|
||||
--
|
||||
-- Normally, when using parConcatMap the outer queue would have only one entry
|
||||
-- which is the tail of the outer stream. However, when manually queueing
|
||||
-- streams on the channel using 'toChannelK' you could queue to outer or inner,
|
||||
-- in which case outer queue may have multiple entries.
|
||||
--
|
||||
{-# INLINE enqueueLIFO #-}
|
||||
enqueueLIFO ::
|
||||
Channel m a
|
||||
-- | (outer queue, inner queue)
|
||||
-> IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
|
||||
-> Bool -- True means put it on inner queue, otherwise outer
|
||||
-> IORef [(RunInIO m, K.StreamK m a)]
|
||||
-> (RunInIO m, K.StreamK m a)
|
||||
-> IO ()
|
||||
enqueueLIFO sv q inner m = do
|
||||
atomicModifyIORefCAS_ q $ \(xs, ys) ->
|
||||
if inner then (xs, m : ys) else (m : xs, ys)
|
||||
enqueueLIFO sv q m = do
|
||||
atomicModifyIORefCAS_ q (m :)
|
||||
ringDoorBell (doorBellOnWorkQ sv) (outputDoorBell sv)
|
||||
|
||||
-- | We need to know whether an entry was dequeued from the outer q or inner q
|
||||
-- because when we consume it partially and q it back on the q we need to know
|
||||
-- which q to put it back on.
|
||||
data QResult a =
|
||||
QEmpty
|
||||
| QOuter a -- ^ Entry dequeued from outer q
|
||||
| QInner a -- ^ Entry dequeued from inner q
|
||||
|
||||
-- | Dequeues from inner q first and if it is empty then dequeue from the
|
||||
-- outer.
|
||||
{-# INLINE dequeue #-}
|
||||
dequeue :: MonadIO m =>
|
||||
-- | (outer queue, inner queue)
|
||||
IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
|
||||
-> m (QResult (RunInIO m, K.StreamK m a))
|
||||
IORef [(RunInIO m, K.StreamK m a)]
|
||||
-> m (Maybe (RunInIO m, K.StreamK m a))
|
||||
dequeue qref =
|
||||
liftIO
|
||||
$ atomicModifyIORefCAS qref
|
||||
$ \case
|
||||
(xs, y : ys) -> ((xs, ys), QInner y)
|
||||
(x : xs, ys) -> ((xs, ys), QOuter x)
|
||||
x -> (x, QEmpty)
|
||||
(x : xs) -> (xs, Just x)
|
||||
x -> (x, Nothing)
|
||||
|
||||
data WorkerStatus = Continue | Suspend
|
||||
|
||||
{-# INLINE workLoopLIFO #-}
|
||||
workLoopLIFO
|
||||
:: MonadRunInIO m
|
||||
=> IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
|
||||
=> IORef [(RunInIO m, K.StreamK m a)]
|
||||
-> Channel m a
|
||||
-> Maybe WorkerInfo
|
||||
-> m ()
|
||||
@ -121,14 +85,10 @@ workLoopLIFO qref sv winfo = run
|
||||
run = do
|
||||
work <- dequeue qref
|
||||
case work of
|
||||
QEmpty ->
|
||||
liftIO $ stopWith winfo sv
|
||||
QInner (RunInIO runin, m) ->
|
||||
process runin m True
|
||||
QOuter (RunInIO runin, m) ->
|
||||
process runin m False
|
||||
Nothing -> liftIO $ stopWith winfo sv
|
||||
Just (RunInIO runin, m) -> process runin m
|
||||
|
||||
process runin m inner = do
|
||||
process runin m = do
|
||||
-- XXX when we finish we need to send the monadic state back to
|
||||
-- the parent so that the state can be merged back. We capture
|
||||
-- and return the state in the stop continuation.
|
||||
@ -160,7 +120,7 @@ workLoopLIFO qref sv winfo = run
|
||||
then K.foldStreamShared undefined yieldk single (return Continue) r
|
||||
else do
|
||||
runInIO <- askRunInIO
|
||||
liftIO $ enqueueLIFO sv qref inner (runInIO, r)
|
||||
liftIO $ enqueueLIFO sv qref (runInIO, r)
|
||||
return Suspend
|
||||
|
||||
-- We duplicate workLoop for yield limit and no limit cases because it has
|
||||
@ -171,7 +131,7 @@ workLoopLIFO qref sv winfo = run
|
||||
{-# INLINE workLoopLIFOLimited #-}
|
||||
workLoopLIFOLimited
|
||||
:: forall m a. MonadRunInIO m
|
||||
=> IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
|
||||
=> IORef [(RunInIO m, K.StreamK m a)]
|
||||
-> Channel m a
|
||||
-> Maybe WorkerInfo
|
||||
-> m ()
|
||||
@ -185,14 +145,10 @@ workLoopLIFOLimited qref sv winfo = run
|
||||
run = do
|
||||
work <- dequeue qref
|
||||
case work of
|
||||
QEmpty ->
|
||||
liftIO $ stopWith winfo sv
|
||||
QInner item ->
|
||||
process item True
|
||||
QOuter item ->
|
||||
process item False
|
||||
Nothing -> liftIO $ stopWith winfo sv
|
||||
Just item -> process item
|
||||
|
||||
process item@(RunInIO runin, m) inner = do
|
||||
process item@(RunInIO runin, m) = do
|
||||
-- XXX This is just a best effort minimization of concurrency
|
||||
-- to the yield limit. If the stream is made of concurrent
|
||||
-- streams we do not reserve the yield limit in the constituent
|
||||
@ -216,7 +172,7 @@ workLoopLIFOLimited qref sv winfo = run
|
||||
-- Avoid any side effects, undo the yield limit decrement if we
|
||||
-- never yielded anything.
|
||||
else liftIO $ do
|
||||
enqueueLIFO sv qref inner item
|
||||
enqueueLIFO sv qref item
|
||||
incrementYieldLimit (remainingWork sv)
|
||||
stopWith winfo sv
|
||||
|
||||
@ -236,7 +192,7 @@ workLoopLIFOLimited qref sv winfo = run
|
||||
else do
|
||||
runInIO <- askRunInIO
|
||||
liftIO $ incrementYieldLimit (remainingWork sv)
|
||||
liftIO $ enqueueLIFO sv qref inner (runInIO, r)
|
||||
liftIO $ enqueueLIFO sv qref (runInIO, r)
|
||||
return Suspend
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
@ -923,10 +879,7 @@ getLifoSVar mrun cfg = do
|
||||
active <- newIORef 0
|
||||
wfw <- newIORef False
|
||||
running <- newIORef Set.empty
|
||||
q <- newIORef
|
||||
( [] :: [(RunInIO m, K.StreamK m a)]
|
||||
, [] :: [(RunInIO m, K.StreamK m a)]
|
||||
)
|
||||
q <- newIORef ([] :: [(RunInIO m, K.StreamK m a)])
|
||||
-- Sequence number is incremented whenever something is de-queued,
|
||||
-- therefore, first sequence number would be 0
|
||||
aheadQ <- newIORef ([], -1)
|
||||
@ -943,8 +896,8 @@ getLifoSVar mrun cfg = do
|
||||
-- We are reading it without lock, the result would be reliable only if no
|
||||
-- worker is pending.
|
||||
let isWorkFinished _ = do
|
||||
(xs, ys) <- readIORef q
|
||||
return (null xs && null ys)
|
||||
xs <- readIORef q
|
||||
return (null xs)
|
||||
|
||||
let isWorkFinishedLimited sv = do
|
||||
yieldsDone <-
|
||||
@ -963,7 +916,7 @@ getLifoSVar mrun cfg = do
|
||||
-> (Channel m a -> m [ChildEvent a])
|
||||
-> (Channel m a -> m Bool)
|
||||
-> (Channel m a -> IO Bool)
|
||||
-> (IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
|
||||
-> (IORef [(RunInIO m, K.StreamK m a)]
|
||||
-> Channel m a
|
||||
-> Maybe WorkerInfo
|
||||
-> m())
|
||||
@ -984,10 +937,9 @@ getLifoSVar mrun cfg = do
|
||||
then workLoopAhead aheadQ outH sv
|
||||
else wloop q sv
|
||||
, enqueue =
|
||||
\inner ->
|
||||
if inOrder
|
||||
then enqueueAhead sv aheadQ
|
||||
else enqueueLIFO sv q inner
|
||||
else enqueueLIFO sv q
|
||||
, eagerDispatch = when eagerEval $ void $ dispatchWorker 0 sv
|
||||
, isWorkDone =
|
||||
if inOrder
|
||||
|
@ -198,7 +198,7 @@ getFifoSVar mrun cfg = do
|
||||
, postProcess = postProc sv
|
||||
, workerThreads = running
|
||||
, workLoop = wloop q sv
|
||||
, enqueue = \_ -> enqueueFIFO sv q
|
||||
, enqueue = enqueueFIFO sv q
|
||||
, eagerDispatch = return ()
|
||||
, isWorkDone = workDone sv
|
||||
, isQueueDone = workDone sv
|
||||
|
@ -117,11 +117,7 @@ import Test.Inspection (inspect, hasNoTypeClassesExcept)
|
||||
toChannelK :: MonadRunInIO m => Channel m a -> K.StreamK m a -> m ()
|
||||
toChannelK chan m = do
|
||||
runIn <- askRunInIO
|
||||
-- The second argument to enqeue is used in case of lazy on-demand
|
||||
-- scheduling. See comments in mkEnqueue. By default we enqueue on the
|
||||
-- inner work q (True). When using concatMap the outer loop is enqueued on
|
||||
-- the outer work q.
|
||||
liftIO $ enqueue chan True (runIn, m)
|
||||
liftIO $ enqueue chan (runIn, m)
|
||||
|
||||
-- INLINE for fromStreamK/toStreamK fusion
|
||||
|
||||
|
@ -181,13 +181,10 @@ data Channel m a = Channel
|
||||
-- as there is work to do.
|
||||
, eagerDispatch :: m ()
|
||||
|
||||
-- | Enqueue a stream for evaluation on the channel. The first argument is
|
||||
-- used only when 'ordered' or 'interleaved' is NOT set. In that case the
|
||||
-- queue has two priority levels, True means higher priority and False
|
||||
-- means lower priority. The first element of the tuple is the runner
|
||||
-- function which is used to run the stream actions in a specific monadic
|
||||
-- context.
|
||||
, enqueue :: Bool -> (RunInIO m, StreamK m a) -> IO ()
|
||||
-- | Enqueue a stream for evaluation on the channel. The first element of
|
||||
-- the tuple is the runner function which is used to run the stream actions
|
||||
-- in a specific monadic context.
|
||||
, enqueue :: (RunInIO m, StreamK m a) -> IO ()
|
||||
|
||||
-- | Determine if the work queue is empty, therefore, there is no more work
|
||||
-- to do.
|
||||
|
Loading…
Reference in New Issue
Block a user