From ebdfa837f28455bfdf2590bb9ba4558ec770ba42 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Sat, 25 Aug 2018 03:40:26 +0530 Subject: [PATCH] implement rateBuffer to forget distant gains/losses --- src/Streamly/SVar.hs | 88 ++++++++++++++++++++++++++---------- src/Streamly/Streams/SVar.hs | 30 ++++++------ test/MaxRate.hs | 3 ++ test/rate-timestamp.hs | 2 +- 4 files changed, 83 insertions(+), 40 deletions(-) diff --git a/src/Streamly/SVar.hs b/src/Streamly/SVar.hs index 173014fbf..773956463 100644 --- a/src/Streamly/SVar.hs +++ b/src/Streamly/SVar.hs @@ -235,16 +235,32 @@ data WorkerInfo = WorkerInfo , workerLatencyStart :: IORef (Count, TimeSpec) } --- | Specify the stream yield rate in yields per second (@Hertz@). We try to --- keep the average rate at 'rateGoal', if the actual average rate goes above --- or below the goal we try to recover it by increasing or decreasing the --- instantaneous rate but keeping it within 'rateLow' and 'rateHigh' limits. + +-- | Specifies the stream yield rate in yields per second (@Hertz@). +-- We keep accumulating yield credits at 'rateGoal'. At any point of time we +-- allow only as many yields as we have accumulated as per 'rateGoal' since the +-- start of time. If the consumer or the producer is slower or faster, the +-- actual rate may fall behind or exceed 'rateGoal'. We try to recover the gap +-- between the two by increasing or decreasing the pull rate from the producer. +-- However, if the gap becomes more than 'rateBuffer' we try to recover only as +-- much as 'rateBuffer'. +-- +-- 'rateLow' puts a bound on how low the instantaneous rate can go when +-- recovering the rate gap. In other words, it determines the maximum yield +-- latency. Similarly, 'rateHigh' puts a bound on how high the instantaneous +-- rate can go when recovering the rate gap. In other words, it determines the +-- minimum yield latency. We reduce the latency by increasing concurrency, +-- therefore we can say that it puts an upper bound on concurrency. +-- +-- If the 'rateGoal' is 0 or negative the stream never yields a value. +-- If the 'rateBuffer' is 0 or negative we do not attempt to recover. -- -- @since 0.5.0 data Rate = Rate - { rateLow :: Double -- ^ The lower rate limit - , rateGoal :: Double -- ^ The target rate we want to achieve - , rateHigh :: Double -- ^ The upper rate limit + { rateLow :: Double -- ^ The lower rate limit + , rateGoal :: Double -- ^ The target rate we want to achieve + , rateHigh :: Double -- ^ The upper rate limit + , rateBuffer :: Int -- ^ Maximum slack from the goal } data LatencyRange = LatencyRange @@ -254,8 +270,10 @@ data LatencyRange = LatencyRange -- Rate control. data YieldRateInfo = YieldRateInfo - { svarLatencyTarget :: NanoSecs - , svarLatencyRange :: LatencyRange + { svarLatencyTarget :: NanoSecs + , svarLatencyRange :: LatencyRange + , svarRateBuffer :: Int + , svarGainedLostYields :: IORef Count -- Actual latency/througput as seen from the consumer side, we count the -- yields and the time it took to generates those yields. This is used to @@ -585,22 +603,23 @@ dumpSVarStats sv ss style = do minLat <- readIORef $ minWorkerLatency ss maxLat <- readIORef $ maxWorkerLatency ss (avgCnt, avgTime) <- readIORef $ avgWorkerLatency ss - (svarCnt, svarLat) <- case yieldRateInfo sv of - Nothing -> return (0, 0) + (svarCnt, svarGainLossCnt, svarLat) <- case yieldRateInfo sv of + Nothing -> return (0, 0, 0) Just yinfo -> do (cnt, startTime) <- readIORef $ svarAllTimeLatency yinfo if cnt > 0 then do t <- readIORef (svarStopTime ss) + gl <- readIORef (svarGainedLostYields yinfo) case t of Nothing -> do now <- getTime Monotonic let interval = toNanoSecs (now - startTime) - return $ (cnt, interval `div` fromIntegral cnt) + return $ (cnt, gl, interval `div` fromIntegral cnt) Just stopTime -> do let interval = toNanoSecs (stopTime - startTime) - return $ (cnt, interval `div` fromIntegral cnt) - else return (0, 0) + return $ (cnt, gl, interval `div` fromIntegral cnt) + else return (0, 0, 0) return $ unlines [ "total dispatches = " ++ show dispatches @@ -629,6 +648,9 @@ dumpSVarStats sv ss style = do ++ (if svarCnt > 0 then "\nSVar yield count = " ++ show svarCnt else "") + ++ (if svarGainLossCnt > 0 + then "\nSVar gain/loss yield count = " ++ show svarGainLossCnt + else "") ] {-# NOINLINE dumpSVar #-} @@ -1301,12 +1323,14 @@ data Work estimateWorkers :: Limit -> Count + -> Count -> NanoSecs -> NanoSecs -> NanoSecs -> LatencyRange -> Work -estimateWorkers workerLimit svarYields svarElapsed wLatency targetLat range = +estimateWorkers workerLimit svarYields gainLossYields + svarElapsed wLatency targetLat range = -- XXX we can have a maxEfficiency combinator as well which runs the -- producer at the maximal efficiency i.e. the number of workers are chosen -- such that the latency is minimum or within a range. Or we can call it @@ -1339,7 +1363,8 @@ estimateWorkers workerLimit svarYields svarElapsed wLatency targetLat range = -- When the worker latency is lower than required latency we begin with -- a yield and then wait rather than first waiting and then yielding. targetYields = (svarElapsed + wLatency + targetLat - 1) `div` targetLat - deltaYields = fromIntegral targetYields - svarYields + effectiveYields = svarYields + gainLossYields + deltaYields = fromIntegral targetYields - effectiveYields -- We recover the deficit by running at a higher/lower rate for a -- certain amount of time. To keep the effective rate in reasonable @@ -1362,7 +1387,7 @@ estimateWorkers workerLimit svarYields svarElapsed wLatency targetLat range = $ withLimit $ wLatency `div` adjustedLat) deltaYields else - let expectedDuration = (fromIntegral svarYields) * targetLat + let expectedDuration = fromIntegral effectiveYields * targetLat sleepTime = expectedDuration - svarElapsed maxSleepTime = maxLatency range - wLatency s = min sleepTime maxSleepTime @@ -1407,8 +1432,9 @@ isBeyondMaxRate sv yinfo = do now <- getTime Monotonic let duration = fromInteger $ toNanoSecs $ now - tstamp let targetLat = svarLatencyTarget yinfo - let work = estimateWorkers (maxWorkerLimit sv) count duration wLatency - targetLat (svarLatencyRange yinfo) + gainLoss <- readIORef (svarGainedLostYields yinfo) + let work = estimateWorkers (maxWorkerLimit sv) count gainLoss duration + wLatency targetLat (svarLatencyRange yinfo) cnt <- readIORef $ workerCount sv return $ case work of -- XXX set the worker's maxYields or polling interval based on yields @@ -1536,7 +1562,8 @@ dispatchWorkerPaced sv = do let workerLimit = maxWorkerLimit sv let targetLat = svarLatencyTarget yinfo let range = svarLatencyRange yinfo - let work = estimateWorkers workerLimit svarYields svarElapsed + gainLoss <- liftIO $ readIORef (svarGainedLostYields yinfo) + let work = estimateWorkers workerLimit svarYields gainLoss svarElapsed wLatency targetLat range -- XXX we need to take yieldLimit into account here. If we are at the @@ -1563,12 +1590,15 @@ dispatchWorkerPaced sv = do return False PartialWorker yields -> do assert (yields > 0) (return ()) + updateGainedLostYields yinfo yields + done <- allThreadsDone sv when done $ void $ dispatchWorker yields sv return False ManyWorkers netWorkers yields -> do assert (netWorkers >= 1) (return ()) assert (yields >= 0) (return ()) + updateGainedLostYields yinfo yields let periodRef = workerPollingInterval yinfo ycnt = max 1 $ yields `div` fromIntegral netWorkers @@ -1595,6 +1625,15 @@ dispatchWorkerPaced sv = do where + updateGainedLostYields yinfo yields = do + let buf = fromIntegral $ svarRateBuffer yinfo + when (yields /= 0 && abs yields > buf) $ do + let delta = + if yields > 0 + then yields - buf + else yields + buf + liftIO $ modifyIORef (svarGainedLostYields yinfo) (+ delta) + dispatchN n = do if n == 0 then return True @@ -1806,16 +1845,16 @@ getYieldRateInfo st = do -- convert rate in Hertz to latency in Nanoseconds let rateToLatency r = if r <= 0 then maxBound else round $ 1.0e9 / r case getStreamRate st of - Just (Rate low goal high) -> + Just (Rate low goal high buf) -> let l = rateToLatency goal minl = rateToLatency high maxl = rateToLatency low - in mkYieldRateInfo l (LatencyRange minl maxl) + in mkYieldRateInfo l (LatencyRange minl maxl) buf Nothing -> return Nothing where - mkYieldRateInfo latency latRange = do + mkYieldRateInfo latency latRange buf = do measured <- newIORef 0 wcur <- newIORef (0,0) wcol <- newIORef (0,0) @@ -1823,10 +1862,13 @@ getYieldRateInfo st = do wlong <- newIORef (0,now) period <- newIORef 1 stopTime <- newIORef $ fromNanoSecs 0 + gainLoss <- newIORef (Count 0) return $ Just YieldRateInfo { svarLatencyTarget = latency , svarLatencyRange = latRange + , svarRateBuffer = buf + , svarGainedLostYields = gainLoss , workerBootstrapLatency = getStreamLatency st , workerPollingInterval = period , workerMeasuredLatency = measured diff --git a/src/Streamly/Streams/SVar.hs b/src/Streamly/Streams/SVar.hs index 977f8ddcf..ceb9fe2bc 100644 --- a/src/Streamly/Streams/SVar.hs +++ b/src/Streamly/Streams/SVar.hs @@ -188,11 +188,9 @@ maxBufferSerial :: Int -> SerialT m a -> SerialT m a maxBufferSerial _ = id -} --- | Specify the pull rate of a stream in number of yields per second --- (i.e. @Hertz@). A 'Nothing' value resets the rate to default which is --- unlimited. If the specified rate is 0 or negative the stream never yields a --- value. --- When the rate is specified, concurrent production may be ramped up or down +-- | Specify the pull rate of a stream. +-- A 'Nothing' value resets the rate to default which is unlimited. When the +-- rate is specified, concurrent production may be ramped up or down -- automatically to achieve the specified yield rate. The specific behavior for -- different styles of 'Rate' specifications is documented under 'Rate'. The -- effective maximum production rate achieved by a stream is governed by: @@ -207,11 +205,11 @@ maxBufferSerial _ = id rate :: IsStream t => Maybe Rate -> t m a -> t m a rate r m = fromStream $ Stream $ \st stp sng yld -> do case r of - Just (Rate low goal _) | goal < low -> + Just (Rate low goal _ _) | goal < low -> error "rate: Target rate cannot be lower than minimum rate." - Just (Rate _ goal high) | goal > high -> + Just (Rate _ goal high _) | goal > high -> error "rate: Target rate cannot be greater than maximum rate." - Just (Rate low _ high) | low > high -> + Just (Rate low _ high _) | low > high -> error "rate: Minimum rate cannot be greater than maximum rate." _ -> unStream (toStream m) (setStreamRate r st) stp sng yld @@ -221,7 +219,7 @@ yieldRateSerial :: Double -> SerialT m a -> SerialT m a yieldRateSerial _ = id -} --- | Same as @rate (Just $ Rate (r/2) r (2*r))@ +-- | Same as @rate (Just $ Rate (r/2) r (2*r) maxBound)@ -- -- Specifies the average production rate of a stream in number of yields -- per second (i.e. @Hertz@). Concurrent production is ramped up or down @@ -231,9 +229,9 @@ yieldRateSerial _ = id -- -- @since 0.5.0 avgRate :: IsStream t => Double -> t m a -> t m a -avgRate r = rate (Just $ Rate (r/2) r (2*r)) +avgRate r = rate (Just $ Rate (r/2) r (2*r) maxBound) --- | Same as @rate (Just $ Rate r r (2*r))@ +-- | Same as @rate (Just $ Rate r r (2*r) maxBound)@ -- -- Specifies the minimum rate at which the stream should yield values. As -- far as possible the yield rate would never be allowed to go below the @@ -242,9 +240,9 @@ avgRate r = rate (Just $ Rate (r/2) r (2*r)) -- -- @since 0.5.0 minRate :: IsStream t => Double -> t m a -> t m a -minRate r = rate (Just $ Rate r r (2*r)) +minRate r = rate (Just $ Rate r r (2*r) maxBound) --- | Same as @rate (Just $ Rate (r/2) r r)@ +-- | Same as @rate (Just $ Rate (r/2) r r maxBound)@ -- -- Specifies the maximum rate at which the stream should yield values. As -- far as possible the yield rate would never be allowed to go above the @@ -255,9 +253,9 @@ minRate r = rate (Just $ Rate r r (2*r)) -- -- @since 0.5.0 maxRate :: IsStream t => Double -> t m a -> t m a -maxRate r = rate (Just $ Rate (r/2) r r) +maxRate r = rate (Just $ Rate (r/2) r r maxBound) --- | Same as @rate (Just $ Rate r r r)@ +-- | Same as @rate (Just $ Rate r r r 0)@ -- -- Specifies a constant yield rate. If for some reason the actual rate -- goes above or below the specified rate we do not try to recover it by @@ -267,7 +265,7 @@ maxRate r = rate (Just $ Rate (r/2) r r) -- -- @since 0.5.0 constRate :: IsStream t => Double -> t m a -> t m a -constRate r = rate (Just $ Rate r r r) +constRate r = rate (Just $ Rate r r r 0) -- | Specify the average latency, in nanoseconds, of a single threaded action -- in a concurrent composition. Streamly can measure the latencies, but that is diff --git a/test/MaxRate.hs b/test/MaxRate.hs index b76ef366f..9b1fa8113 100644 --- a/test/MaxRate.hs +++ b/test/MaxRate.hs @@ -115,8 +115,11 @@ main = hspec $ do forM_ rates (\r -> measureRate "aheadly" aheadly r 0 1 range) describe "asyncly with 1 sec producer delay and some consumer delay" $ do + -- ideally it should take 10 x 1 + 1 seconds forM_ [1] (\r -> measureRate "asyncly" asyncly r 1 1 (11, 16)) + -- ideally it should take 10 x 2 + 1 seconds forM_ [1] (\r -> measureRate "asyncly" asyncly r 2 1 (21, 23)) + -- ideally it should take 10 x 3 + 1 seconds forM_ [1] (\r -> measureRate "asyncly" asyncly r 3 1 (31, 33)) describe "aheadly with 1 sec producer delay and some consumer delay" $ do diff --git a/test/rate-timestamp.hs b/test/rate-timestamp.hs index 43f417648..52fc36b86 100644 --- a/test/rate-timestamp.hs +++ b/test/rate-timestamp.hs @@ -14,7 +14,7 @@ withTimeStamp msg = do -- acidRain :: MonadAsync m => SerialT m Event producer = asyncly - $ rate (Just $ AvgRate 1) + $ avgRate 1 $ S.repeatM $ liftIO $ do withTimeStamp "produced"