mirror of
https://github.com/composewell/streamly.git
synced 2024-10-26 11:39:05 +03:00
Use static argument transformation for better perf
It seems to make a significant difference in CPU cycles.
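For readers unfamiliar with the term, the following is a minimal sketch of a static argument transformation, not code from this repository. The names `drainPlain`, `drainSAT`, and `step` are hypothetical. Arguments that never change across recursive calls are captured once by a local `go` loop instead of being passed again on every iteration, which is the same shape of change this commit applies to `sendWorkerWait`.

```haskell
module Main (main) where

import Control.Monad (when)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- Before: the unchanging arguments @step@ and @ref@ are passed again on
-- every recursive call.
drainPlain :: Int -> IORef Int -> IO ()
drainPlain step ref = do
    n <- readIORef ref
    when (n > 0) $ do
        writeIORef ref (n - step)
        drainPlain step ref

-- After: the static arguments are bound once and the local loop @go@
-- closes over them, so the recursion itself takes no arguments.
drainSAT :: Int -> IORef Int -> IO ()
drainSAT step ref = go

    where

    go = do
        n <- readIORef ref
        when (n > 0) $ do
            writeIORef ref (n - step)
            go

main :: IO ()
main = do
    r1 <- newIORef 10
    drainPlain 3 r1
    r2 <- newIORef 10
    drainSAT 3 r2
    a <- readIORef r1
    b <- readIORef r2
    -- Both versions compute the same result; only the loop shape differs.
    print (a == b)
```

Whether the transformed version is actually faster depends on the compiler and the call site, so any gain should be confirmed by benchmarking, as the commit message suggests was done here.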
parent 13563a0032
commit 4eaeb10c97
@@ -291,81 +291,88 @@ sendWorkerWait
     -> (Channel m a -> m Bool)
     -> Channel m a
     -> m ()
-sendWorkerWait eager delay dispatch sv = do
-    -- Note that we are guaranteed to have at least one outstanding worker when
-    -- we enter this function. So if we sleep we are guaranteed to be woken up
-    -- by an outputDoorBell, when the worker exits.
+sendWorkerWait eager delay dispatch sv = go

-    liftIO $ delay sv
-    (_, n) <- liftIO $ readIORef (outputQueue sv)
-    when (n <= 0 || eager) $ do
-        -- The queue may be empty temporarily if the worker has dequeued the
-        -- work item but has not enqueued the remaining part yet. For the same
-        -- reason, a worker may come back if it tries to dequeue and finds the
-        -- queue empty, even though the whole work has not finished yet.
+    where

-        -- If we find that the queue is empty, but it may be empty
-        -- temporarily, when we checked it. If that's the case we might
-        -- sleep indefinitely unless the active workers produce some
-        -- output. We may deadlock especially if the output from the active
-        -- workers depends on the future workers that we may never send.
-        -- So in case the queue was temporarily empty set a flag to inform
-        -- the enqueue to send us a doorbell.
+    go = do

-        -- Note that this is just a best effort mechanism to avoid a
-        -- deadlock. Deadlocks may still happen if for some weird reason
-        -- the consuming computation shares an MVar or some other resource
-        -- with the producing computation and gets blocked on that resource
-        -- and therefore cannot do any pushworker to add more threads to
-        -- the producer. In such cases the programmer should use a parallel
-        -- style so that all the producers are scheduled immediately and
-        -- unconditionally. We can also use a separate monitor thread to
-        -- push workers instead of pushing them from the consumer, but then
-        -- we are no longer using pull based concurrency rate adaptation.
-        --
-        -- XXX update this in the tutorial.
-        --
-        -- Having pending active workers does not mean that we are guaranteed
-        -- to be woken up if we sleep. In case of Ahead streams, there may be
-        -- queued items in the heap even though the outputQueue is empty, and
-        -- we may have active workers which are deadlocked on those items to be
-        -- processed by the consumer. We should either guarantee that any
-        -- worker, before returning, clears the heap or we send a worker to
-        -- clear it. Normally we always send a worker if no output is seen, but
-        -- if the thread limit is reached or we are using pacing then we may
-        -- not send a worker. See the concurrentApplication test in the tests,
-        -- that test case requires at least one yield from the producer to not
-        -- deadlock, if the last worker's output is stuck in the heap then this
-        -- test fails. This problem can be extended to n threads when the
-        -- consumer may depend on the evaluation of next n items in the
-        -- producer stream.
+        -- Note that we are guaranteed to have at least one outstanding worker
+        -- when we enter this function. So if we sleep we are guaranteed to be
+        -- woken up by an outputDoorBell, when the worker exits.

-        -- register for the outputDoorBell before we check the queue so that if
-        -- we sleep because the queue was empty we are guaranteed to get a
-        -- doorbell on the next enqueue.
+        liftIO $ delay sv
+        (_, n) <- liftIO $ readIORef (outputQueue sv)
+        when (n <= 0 || eager) $ do
+            -- The queue may be empty temporarily if the worker has dequeued
+            -- the work item but has not enqueued the remaining part yet. For
+            -- the same reason, a worker may come back if it tries to dequeue
+            -- and finds the queue empty, even though the whole work has not
+            -- finished yet.

-        liftIO $ atomicModifyIORefCAS_ (needDoorBell sv) $ const True
-        liftIO storeLoadBarrier
-        canDoMore <- dispatch sv
+            -- If we find that the queue is empty, but it may be empty
+            -- temporarily, when we checked it. If that's the case we might
+            -- sleep indefinitely unless the active workers produce some
+            -- output. We may deadlock especially if the output from the active
+            -- workers depends on the future workers that we may never send.
+            -- So in case the queue was temporarily empty set a flag to inform
+            -- the enqueue to send us a doorbell.

-        -- XXX test for the case when we miss sending a worker when the worker
-        -- count is more than 1500.
-        --
-        -- XXX Assert here that if the heap is not empty then there is at
-        -- least one outstanding worker. Otherwise we could be sleeping
-        -- forever.
+            -- Note that this is just a best effort mechanism to avoid a
+            -- deadlock. Deadlocks may still happen if for some weird reason
+            -- the consuming computation shares an MVar or some other resource
+            -- with the producing computation and gets blocked on that resource
+            -- and therefore cannot do any pushworker to add more threads to
+            -- the producer. In such cases the programmer should use a parallel
+            -- style so that all the producers are scheduled immediately and
+            -- unconditionally. We can also use a separate monitor thread to
+            -- push workers instead of pushing them from the consumer, but then
+            -- we are no longer using pull based concurrency rate adaptation.
+            --
+            -- XXX update this in the tutorial.
+            --
+            -- Having pending active workers does not mean that we are
+            -- guaranteed to be woken up if we sleep. In case of Ahead streams,
+            -- there may be queued items in the heap even though the
+            -- outputQueue is empty, and we may have active workers which are
+            -- deadlocked on those items to be processed by the consumer. We
+            -- should either guarantee that any worker, before returning,
+            -- clears the heap or we send a worker to clear it. Normally we
+            -- always send a worker if no output is seen, but if the thread
+            -- limit is reached or we are using pacing then we may not send a
+            -- worker. See the concurrentApplication test in the tests, that
+            -- test case requires at least one yield from the producer to not
+            -- deadlock, if the last worker's output is stuck in the heap then
+            -- this test fails. This problem can be extended to n threads when
+            -- the consumer may depend on the evaluation of next n items in the
+            -- producer stream.

-        if canDoMore
-        then sendWorkerWait eager delay dispatch sv
-        else do
-            liftIO
-                $ withDiagMVar
-                    (svarInspectMode sv)
-                    (dumpSVar sv)
-                    "sendWorkerWait: nothing to do"
-                $ takeMVar (outputDoorBell sv)
-            (_, len) <- liftIO $ readIORef (outputQueue sv)
-            when (len <= 0) $ sendWorkerWait eager delay dispatch sv
+            -- register for the outputDoorBell before we check the queue so
+            -- that if we sleep because the queue was empty we are guaranteed
+            -- to get a doorbell on the next enqueue.
+
+            liftIO $ atomicModifyIORefCAS_ (needDoorBell sv) $ const True
+            liftIO storeLoadBarrier
+            canDoMore <- dispatch sv
+
+            -- XXX test for the case when we miss sending a worker when the
+            -- worker count is more than 1500.
+            --
+            -- XXX Assert here that if the heap is not empty then there is at
+            -- least one outstanding worker. Otherwise we could be sleeping
+            -- forever.
+
+            if canDoMore
+            then go
+            else do
+                liftIO
+                    $ withDiagMVar
+                        (svarInspectMode sv)
+                        (dumpSVar sv)
+                        "sendWorkerWait: nothing to do"
+                    $ takeMVar (outputDoorBell sv)
+                (_, len) <- liftIO $ readIORef (outputQueue sv)
+                when (len <= 0) go

 -- | Start the evaluation of the channel's work queue by kicking off a worker.
 -- Note: Work queue must not be empty otherwise the worker will exit without
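The comments in `sendWorkerWait` describe registering for the doorbell before checking the queue, so that an enqueue racing with the check cannot be missed. The following is a minimal, self-contained sketch of that ordering using only `base`. `MiniChan`, its fields, `enqueue`, `waitForOutput`, and `demo` are hypothetical names, not streamly's API, and `atomicWriteIORef` / `atomicModifyIORef'` are assumed here as stand-ins for the `atomicModifyIORefCAS_` and `storeLoadBarrier` calls used in the real code.

```haskell
module Main (main) where

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar, tryPutMVar)
import Control.Monad (void, when)
import Data.IORef
    (IORef, newIORef, atomicModifyIORef', atomicWriteIORef)

-- Hypothetical stand-in for the channel; not streamly's Channel type.
data MiniChan = MiniChan
    { mcQueue :: IORef [Int]     -- output queue, newest item first
    , mcNeedBell :: IORef Bool   -- set by the consumer to request a wakeup
    , mcBell :: MVar ()          -- the doorbell itself
    }

newMiniChan :: IO MiniChan
newMiniChan = MiniChan <$> newIORef [] <*> newIORef False <*> newEmptyMVar

-- Producer side: enqueue first, then ring only if a wakeup was requested.
enqueue :: MiniChan -> Int -> IO ()
enqueue ch x = do
    atomicModifyIORef' (mcQueue ch) (\xs -> (x : xs, ()))
    need <- atomicModifyIORef' (mcNeedBell ch) (\b -> (False, b))
    when need $ void $ tryPutMVar (mcBell ch) ()

-- Consumer side: request the doorbell *before* checking the queue, so an
-- enqueue that lands between the check and the sleep still wakes us up.
waitForOutput :: MiniChan -> IO [Int]
waitForOutput ch = go

    where

    go = do
        atomicWriteIORef (mcNeedBell ch) True
        xs <- atomicModifyIORef' (mcQueue ch) (\q -> ([], q))
        if null xs
        then takeMVar (mcBell ch) >> go   -- sleep, then re-check
        else return (reverse xs)

demo :: IO [Int]
demo = do
    ch <- newMiniChan
    void $ forkIO $ threadDelay 10000 >> enqueue ch 42
    waitForOutput ch

main :: IO ()
main = demo >>= print
```

A stale ring left over from an earlier enqueue only causes one extra pass through `go`, which is why the consumer re-checks the queue after every wakeup rather than assuming output is present.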