implement rateBuffer to forget distant gains/losses

This commit is contained in:
Harendra Kumar 2018-08-25 03:40:26 +05:30
parent 482b971ebb
commit ebdfa837f2
4 changed files with 83 additions and 40 deletions

View File

@ -235,16 +235,32 @@ data WorkerInfo = WorkerInfo
, workerLatencyStart :: IORef (Count, TimeSpec)
}
-- | Specify the stream yield rate in yields per second (@Hertz@). We try to
-- keep the average rate at 'rateGoal', if the actual average rate goes above
-- or below the goal we try to recover it by increasing or decreasing the
-- instantaneous rate but keeping it within 'rateLow' and 'rateHigh' limits.
-- | Specifies the stream yield rate in yields per second (@Hertz@).
-- We keep accumulating yield credits at 'rateGoal'. At any point of time we
-- allow only as many yields as we have accumulated as per 'rateGoal' since the
-- start of time. If the consumer or the producer is slower or faster, the
-- actual rate may fall behind or exceed 'rateGoal'. We try to recover the gap
-- between the two by increasing or decreasing the pull rate from the producer.
-- However, if the gap becomes more than 'rateBuffer' we try to recover only as
-- much as 'rateBuffer'.
--
-- 'rateLow' puts a bound on how low the instantaneous rate can go when
-- recovering the rate gap. In other words, it determines the maximum yield
-- latency. Similarly, 'rateHigh' puts a bound on how high the instantaneous
-- rate can go when recovering the rate gap. In other words, it determines the
-- minimum yield latency. We reduce the latency by increasing concurrency,
-- therefore we can say that it puts an upper bound on concurrency.
--
-- If the 'rateGoal' is 0 or negative the stream never yields a value.
-- If the 'rateBuffer' is 0 or negative we do not attempt to recover.
--
-- @since 0.5.0
data Rate = Rate
{ rateLow :: Double -- ^ The lower rate limit
, rateGoal :: Double -- ^ The target rate we want to achieve
, rateHigh :: Double -- ^ The upper rate limit
, rateBuffer :: Int -- ^ Maximum slack from the goal
}
data LatencyRange = LatencyRange
@ -256,6 +272,8 @@ data LatencyRange = LatencyRange
data YieldRateInfo = YieldRateInfo
{ svarLatencyTarget :: NanoSecs
, svarLatencyRange :: LatencyRange
, svarRateBuffer :: Int
, svarGainedLostYields :: IORef Count
-- Actual latency/througput as seen from the consumer side, we count the
-- yields and the time it took to generates those yields. This is used to
@ -585,22 +603,23 @@ dumpSVarStats sv ss style = do
minLat <- readIORef $ minWorkerLatency ss
maxLat <- readIORef $ maxWorkerLatency ss
(avgCnt, avgTime) <- readIORef $ avgWorkerLatency ss
(svarCnt, svarLat) <- case yieldRateInfo sv of
Nothing -> return (0, 0)
(svarCnt, svarGainLossCnt, svarLat) <- case yieldRateInfo sv of
Nothing -> return (0, 0, 0)
Just yinfo -> do
(cnt, startTime) <- readIORef $ svarAllTimeLatency yinfo
if cnt > 0
then do
t <- readIORef (svarStopTime ss)
gl <- readIORef (svarGainedLostYields yinfo)
case t of
Nothing -> do
now <- getTime Monotonic
let interval = toNanoSecs (now - startTime)
return $ (cnt, interval `div` fromIntegral cnt)
return $ (cnt, gl, interval `div` fromIntegral cnt)
Just stopTime -> do
let interval = toNanoSecs (stopTime - startTime)
return $ (cnt, interval `div` fromIntegral cnt)
else return (0, 0)
return $ (cnt, gl, interval `div` fromIntegral cnt)
else return (0, 0, 0)
return $ unlines
[ "total dispatches = " ++ show dispatches
@ -629,6 +648,9 @@ dumpSVarStats sv ss style = do
++ (if svarCnt > 0
then "\nSVar yield count = " ++ show svarCnt
else "")
++ (if svarGainLossCnt > 0
then "\nSVar gain/loss yield count = " ++ show svarGainLossCnt
else "")
]
{-# NOINLINE dumpSVar #-}
@ -1301,12 +1323,14 @@ data Work
estimateWorkers
:: Limit
-> Count
-> Count
-> NanoSecs
-> NanoSecs
-> NanoSecs
-> LatencyRange
-> Work
estimateWorkers workerLimit svarYields svarElapsed wLatency targetLat range =
estimateWorkers workerLimit svarYields gainLossYields
svarElapsed wLatency targetLat range =
-- XXX we can have a maxEfficiency combinator as well which runs the
-- producer at the maximal efficiency i.e. the number of workers are chosen
-- such that the latency is minimum or within a range. Or we can call it
@ -1339,7 +1363,8 @@ estimateWorkers workerLimit svarYields svarElapsed wLatency targetLat range =
-- When the worker latency is lower than required latency we begin with
-- a yield and then wait rather than first waiting and then yielding.
targetYields = (svarElapsed + wLatency + targetLat - 1) `div` targetLat
deltaYields = fromIntegral targetYields - svarYields
effectiveYields = svarYields + gainLossYields
deltaYields = fromIntegral targetYields - effectiveYields
-- We recover the deficit by running at a higher/lower rate for a
-- certain amount of time. To keep the effective rate in reasonable
@ -1362,7 +1387,7 @@ estimateWorkers workerLimit svarYields svarElapsed wLatency targetLat range =
$ withLimit
$ wLatency `div` adjustedLat) deltaYields
else
let expectedDuration = (fromIntegral svarYields) * targetLat
let expectedDuration = fromIntegral effectiveYields * targetLat
sleepTime = expectedDuration - svarElapsed
maxSleepTime = maxLatency range - wLatency
s = min sleepTime maxSleepTime
@ -1407,8 +1432,9 @@ isBeyondMaxRate sv yinfo = do
now <- getTime Monotonic
let duration = fromInteger $ toNanoSecs $ now - tstamp
let targetLat = svarLatencyTarget yinfo
let work = estimateWorkers (maxWorkerLimit sv) count duration wLatency
targetLat (svarLatencyRange yinfo)
gainLoss <- readIORef (svarGainedLostYields yinfo)
let work = estimateWorkers (maxWorkerLimit sv) count gainLoss duration
wLatency targetLat (svarLatencyRange yinfo)
cnt <- readIORef $ workerCount sv
return $ case work of
-- XXX set the worker's maxYields or polling interval based on yields
@ -1536,7 +1562,8 @@ dispatchWorkerPaced sv = do
let workerLimit = maxWorkerLimit sv
let targetLat = svarLatencyTarget yinfo
let range = svarLatencyRange yinfo
let work = estimateWorkers workerLimit svarYields svarElapsed
gainLoss <- liftIO $ readIORef (svarGainedLostYields yinfo)
let work = estimateWorkers workerLimit svarYields gainLoss svarElapsed
wLatency targetLat range
-- XXX we need to take yieldLimit into account here. If we are at the
@ -1563,12 +1590,15 @@ dispatchWorkerPaced sv = do
return False
PartialWorker yields -> do
assert (yields > 0) (return ())
updateGainedLostYields yinfo yields
done <- allThreadsDone sv
when done $ void $ dispatchWorker yields sv
return False
ManyWorkers netWorkers yields -> do
assert (netWorkers >= 1) (return ())
assert (yields >= 0) (return ())
updateGainedLostYields yinfo yields
let periodRef = workerPollingInterval yinfo
ycnt = max 1 $ yields `div` fromIntegral netWorkers
@ -1595,6 +1625,15 @@ dispatchWorkerPaced sv = do
where
updateGainedLostYields yinfo yields = do
let buf = fromIntegral $ svarRateBuffer yinfo
when (yields /= 0 && abs yields > buf) $ do
let delta =
if yields > 0
then yields - buf
else yields + buf
liftIO $ modifyIORef (svarGainedLostYields yinfo) (+ delta)
dispatchN n = do
if n == 0
then return True
@ -1806,16 +1845,16 @@ getYieldRateInfo st = do
-- convert rate in Hertz to latency in Nanoseconds
let rateToLatency r = if r <= 0 then maxBound else round $ 1.0e9 / r
case getStreamRate st of
Just (Rate low goal high) ->
Just (Rate low goal high buf) ->
let l = rateToLatency goal
minl = rateToLatency high
maxl = rateToLatency low
in mkYieldRateInfo l (LatencyRange minl maxl)
in mkYieldRateInfo l (LatencyRange minl maxl) buf
Nothing -> return Nothing
where
mkYieldRateInfo latency latRange = do
mkYieldRateInfo latency latRange buf = do
measured <- newIORef 0
wcur <- newIORef (0,0)
wcol <- newIORef (0,0)
@ -1823,10 +1862,13 @@ getYieldRateInfo st = do
wlong <- newIORef (0,now)
period <- newIORef 1
stopTime <- newIORef $ fromNanoSecs 0
gainLoss <- newIORef (Count 0)
return $ Just YieldRateInfo
{ svarLatencyTarget = latency
, svarLatencyRange = latRange
, svarRateBuffer = buf
, svarGainedLostYields = gainLoss
, workerBootstrapLatency = getStreamLatency st
, workerPollingInterval = period
, workerMeasuredLatency = measured

View File

@ -188,11 +188,9 @@ maxBufferSerial :: Int -> SerialT m a -> SerialT m a
maxBufferSerial _ = id
-}
-- | Specify the pull rate of a stream in number of yields per second
-- (i.e. @Hertz@). A 'Nothing' value resets the rate to default which is
-- unlimited. If the specified rate is 0 or negative the stream never yields a
-- value.
-- When the rate is specified, concurrent production may be ramped up or down
-- | Specify the pull rate of a stream.
-- A 'Nothing' value resets the rate to default which is unlimited. When the
-- rate is specified, concurrent production may be ramped up or down
-- automatically to achieve the specified yield rate. The specific behavior for
-- different styles of 'Rate' specifications is documented under 'Rate'. The
-- effective maximum production rate achieved by a stream is governed by:
@ -207,11 +205,11 @@ maxBufferSerial _ = id
rate :: IsStream t => Maybe Rate -> t m a -> t m a
rate r m = fromStream $ Stream $ \st stp sng yld -> do
case r of
Just (Rate low goal _) | goal < low ->
Just (Rate low goal _ _) | goal < low ->
error "rate: Target rate cannot be lower than minimum rate."
Just (Rate _ goal high) | goal > high ->
Just (Rate _ goal high _) | goal > high ->
error "rate: Target rate cannot be greater than maximum rate."
Just (Rate low _ high) | low > high ->
Just (Rate low _ high _) | low > high ->
error "rate: Minimum rate cannot be greater than maximum rate."
_ -> unStream (toStream m) (setStreamRate r st) stp sng yld
@ -221,7 +219,7 @@ yieldRateSerial :: Double -> SerialT m a -> SerialT m a
yieldRateSerial _ = id
-}
-- | Same as @rate (Just $ Rate (r/2) r (2*r))@
-- | Same as @rate (Just $ Rate (r/2) r (2*r) maxBound)@
--
-- Specifies the average production rate of a stream in number of yields
-- per second (i.e. @Hertz@). Concurrent production is ramped up or down
@ -231,9 +229,9 @@ yieldRateSerial _ = id
--
-- @since 0.5.0
avgRate :: IsStream t => Double -> t m a -> t m a
avgRate r = rate (Just $ Rate (r/2) r (2*r))
avgRate r = rate (Just $ Rate (r/2) r (2*r) maxBound)
-- | Same as @rate (Just $ Rate r r (2*r))@
-- | Same as @rate (Just $ Rate r r (2*r) maxBound)@
--
-- Specifies the minimum rate at which the stream should yield values. As
-- far as possible the yield rate would never be allowed to go below the
@ -242,9 +240,9 @@ avgRate r = rate (Just $ Rate (r/2) r (2*r))
--
-- @since 0.5.0
minRate :: IsStream t => Double -> t m a -> t m a
minRate r = rate (Just $ Rate r r (2*r))
minRate r = rate (Just $ Rate r r (2*r) maxBound)
-- | Same as @rate (Just $ Rate (r/2) r r)@
-- | Same as @rate (Just $ Rate (r/2) r r maxBound)@
--
-- Specifies the maximum rate at which the stream should yield values. As
-- far as possible the yield rate would never be allowed to go above the
@ -255,9 +253,9 @@ minRate r = rate (Just $ Rate r r (2*r))
--
-- @since 0.5.0
maxRate :: IsStream t => Double -> t m a -> t m a
maxRate r = rate (Just $ Rate (r/2) r r)
maxRate r = rate (Just $ Rate (r/2) r r maxBound)
-- | Same as @rate (Just $ Rate r r r)@
-- | Same as @rate (Just $ Rate r r r 0)@
--
-- Specifies a constant yield rate. If for some reason the actual rate
-- goes above or below the specified rate we do not try to recover it by
@ -267,7 +265,7 @@ maxRate r = rate (Just $ Rate (r/2) r r)
--
-- @since 0.5.0
constRate :: IsStream t => Double -> t m a -> t m a
constRate r = rate (Just $ Rate r r r)
constRate r = rate (Just $ Rate r r r 0)
-- | Specify the average latency, in nanoseconds, of a single threaded action
-- in a concurrent composition. Streamly can measure the latencies, but that is

View File

@ -115,8 +115,11 @@ main = hspec $ do
forM_ rates (\r -> measureRate "aheadly" aheadly r 0 1 range)
describe "asyncly with 1 sec producer delay and some consumer delay" $ do
-- ideally it should take 10 x 1 + 1 seconds
forM_ [1] (\r -> measureRate "asyncly" asyncly r 1 1 (11, 16))
-- ideally it should take 10 x 2 + 1 seconds
forM_ [1] (\r -> measureRate "asyncly" asyncly r 2 1 (21, 23))
-- ideally it should take 10 x 3 + 1 seconds
forM_ [1] (\r -> measureRate "asyncly" asyncly r 3 1 (31, 33))
describe "aheadly with 1 sec producer delay and some consumer delay" $ do

View File

@ -14,7 +14,7 @@ withTimeStamp msg = do
-- acidRain :: MonadAsync m => SerialT m Event
producer =
asyncly
$ rate (Just $ AvgRate 1)
$ avgRate 1
$ S.repeatM
$ liftIO $ do
withTimeStamp "produced"