mirror of
https://github.com/composewell/streamly.git
synced 2024-10-06 15:58:20 +03:00
Add some locking notes
This commit is contained in:
parent
6fd821cedc
commit
a7556e89bb
@ -258,6 +258,9 @@ data YieldRateInfo = YieldRateInfo
|
||||
{ svarLatencyTarget :: NanoSecond64
|
||||
, svarLatencyRange :: LatencyRange
|
||||
, svarRateBuffer :: Int
|
||||
|
||||
-- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
|
||||
-- read by the worker threads
|
||||
, svarGainedLostYields :: IORef Count
|
||||
|
||||
-- Actual latency/througput as seen from the consumer side, we count the
|
||||
@ -266,6 +269,8 @@ data YieldRateInfo = YieldRateInfo
|
||||
-- rate. The idle time of workers is adjusted in this, so that we only
|
||||
-- account for the rate when the consumer actually demands data.
|
||||
-- XXX interval latency is enough, we can move this under diagnostics build
|
||||
-- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
|
||||
-- read by the worker threads
|
||||
, svarAllTimeLatency :: IORef (Count, AbsTime)
|
||||
|
||||
-- XXX Worker latency specified by the user to be used before the first
|
||||
@ -278,6 +283,8 @@ data YieldRateInfo = YieldRateInfo
|
||||
-- long time, in such cases the consumer can change it.
|
||||
-- 0 means no latency computation
|
||||
-- XXX this is derivable from workerMeasuredLatency, can be removed.
|
||||
-- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
|
||||
-- read by the worker threads
|
||||
, workerPollingInterval :: IORef Count
|
||||
|
||||
-- This is in progress latency stats maintained by the workers which we
|
||||
@ -286,6 +293,10 @@ data YieldRateInfo = YieldRateInfo
|
||||
-- is all yields, the second count is only those yields for which the
|
||||
-- latency was measured to be non-zero (note that if the timer resolution
|
||||
-- is low the measured latency may be zero e.g. on JS platform).
|
||||
-- [LOCKING] Locked access. Modified by the consumer thread as well as
|
||||
-- worker threads. Workers modify it periodically based on
|
||||
-- workerPollingInterval and not on every yield to reduce the locking
|
||||
-- overhead.
|
||||
-- (allYieldCount, yieldCount, timeTaken)
|
||||
, workerPendingLatency :: IORef (Count, Count, NanoSecond64)
|
||||
|
||||
@ -294,10 +305,14 @@ data YieldRateInfo = YieldRateInfo
|
||||
-- bucket until we have stats for a sufficient period and then we reset it
|
||||
-- to start collecting for the next period and retain the computed average
|
||||
-- latency for the last period in workerMeasuredLatency.
|
||||
-- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
|
||||
-- read by the worker threads
|
||||
-- (allYieldCount, yieldCount, timeTaken)
|
||||
, workerCollectedLatency :: IORef (Count, Count, NanoSecond64)
|
||||
|
||||
-- Latency as measured by workers, aggregated for the last period.
|
||||
-- [LOCKING] Unlocked access. Modified by the consumer thread and unsafely
|
||||
-- read by the worker threads
|
||||
, workerMeasuredLatency :: IORef NanoSecond64
|
||||
}
|
||||
|
||||
@ -313,6 +328,7 @@ data SVarStats = SVarStats {
|
||||
, svarStopTime :: IORef (Maybe AbsTime)
|
||||
}
|
||||
|
||||
-- This is essentially a 'Maybe Word' type
|
||||
data Limit = Unlimited | Limited Word deriving Show
|
||||
|
||||
-- When to stop the composed stream.
|
||||
@ -341,7 +357,19 @@ data SVar t m a = SVar
|
||||
-- avoid constructing and reversing the list. Possibly we can also avoid
|
||||
-- the GC copying overhead. When the size increases we should be able to
|
||||
-- allocate the array in chunks.
|
||||
--
|
||||
-- [LOCKING] Frequent locked access. This is updated by workers on each
|
||||
-- yield and once in a while read by the consumer thread. This could have
|
||||
-- big locking overhead if the number of workers is high.
|
||||
--
|
||||
-- XXX We can use a per-CPU data structure to reduce the locking overhead.
|
||||
-- However, a per-cpu structure cannot guarantee the exact sequence in
|
||||
-- which the elements were added, though that may not be important.
|
||||
, outputQueue :: IORef ([ChildEvent a], Int)
|
||||
|
||||
-- [LOCKING] Infrequent MVar. Used when the outputQ transitions from empty
|
||||
-- to non-empty, or a work item is queued by a worker to the work queue and
|
||||
-- needDoorBell is set by the consumer.
|
||||
, outputDoorBell :: MVar () -- signal the consumer about output
|
||||
, readOutputQ :: m [ChildEvent a]
|
||||
, postProcess :: m Bool
|
||||
@ -360,7 +388,13 @@ data SVar t m a = SVar
|
||||
, workLoop :: Maybe WorkerInfo -> m ()
|
||||
|
||||
-- Shared, thread tracking
|
||||
-- [LOCKING] Updated unlocked only by consumer thread in case of
|
||||
-- Async/Ahead style SVars. Updated locked by worker threads in case of
|
||||
-- Parallel style.
|
||||
, workerThreads :: IORef (Set ThreadId)
|
||||
-- [LOCKING] Updated locked by consumer thread when dispatching a worker
|
||||
-- and by the worker threads when the thread stops. This is read unsafely
|
||||
-- at several places where we want to rely on an approximate value.
|
||||
, workerCount :: IORef Int
|
||||
, accountThread :: ThreadId -> m ()
|
||||
, workerStopMVar :: MVar ()
|
||||
@ -852,8 +886,12 @@ doFork action (RunInIO mrun) exHandler =
|
||||
-- if the application is distributed then inc/dec of a shared variable may be
|
||||
-- very costly.
|
||||
--
|
||||
-- A worker decrements the yield limit before it executes an action. However,
|
||||
-- the action may not result in an element being yielded, in that case we have
|
||||
-- to increment the yield limit.
|
||||
--
|
||||
-- Note that we need it to be an Int type so that we have the ability to undo a
|
||||
-- decrement that takes below zero.
|
||||
-- decrement that takes it below zero.
|
||||
{-# INLINE decrementYieldLimit #-}
|
||||
decrementYieldLimit :: SVar t m a -> IO Bool
|
||||
decrementYieldLimit sv =
|
||||
@ -1426,9 +1464,8 @@ dispatchWorker yieldCount sv = do
|
||||
if not done
|
||||
then do
|
||||
qDone <- liftIO $ isQueueDone sv
|
||||
-- Note that the worker count is only decremented during event
|
||||
-- processing in fromStreamVar and therefore it is safe to read and
|
||||
-- use it without a lock.
|
||||
-- This count may not be accurate as it is decremented by the workers
|
||||
-- and we have no synchronization with that decrement.
|
||||
active <- liftIO $ readIORef $ workerCount sv
|
||||
if not qDone
|
||||
then do
|
||||
|
Loading…
Reference in New Issue
Block a user