mirror of
https://github.com/composewell/streamly.git
synced 2024-09-20 16:08:20 +03:00
Fix a rate control issue
We were not sending enough workers in the end when the streaming is finishing. Also, limit the estimated workers to the number of yields required.
This commit is contained in:
parent
e583f563e0
commit
cca869f7ed
@ -346,7 +346,7 @@ data SVar t m a = SVar
|
||||
-- Combined/aggregate parameters
|
||||
, maxWorkerLimit :: Limit
|
||||
, maxBufferLimit :: Limit
|
||||
, remainingYields :: Maybe (IORef Count)
|
||||
, remainingWork :: Maybe (IORef Count)
|
||||
, yieldRateInfo :: Maybe YieldRateInfo
|
||||
|
||||
-- Used only by bounded SVar types
|
||||
@ -779,7 +779,7 @@ doFork action exHandler =
|
||||
exHandler
|
||||
runInIO (return tid)
|
||||
|
||||
-- XXX Can we make access to remainingYields and yieldRateInfo fields in sv
|
||||
-- XXX Can we make access to remainingWork and yieldRateInfo fields in sv
|
||||
-- faster, along with the fields in sv required by send?
|
||||
-- XXX make it noinline
|
||||
--
|
||||
@ -793,7 +793,7 @@ doFork action exHandler =
|
||||
{-# INLINE decrementYieldLimit #-}
|
||||
decrementYieldLimit :: SVar t m a -> IO Bool
|
||||
decrementYieldLimit sv =
|
||||
case remainingYields sv of
|
||||
case remainingWork sv of
|
||||
Nothing -> return True
|
||||
Just ref -> do
|
||||
r <- atomicModifyIORefCAS ref $ \x -> (x - 1, x)
|
||||
@ -804,7 +804,7 @@ decrementYieldLimit sv =
|
||||
{-# INLINE decrementYieldLimitPost #-}
|
||||
decrementYieldLimitPost :: SVar t m a -> IO Bool
|
||||
decrementYieldLimitPost sv =
|
||||
case remainingYields sv of
|
||||
case remainingWork sv of
|
||||
Nothing -> return True
|
||||
Just ref -> do
|
||||
r <- atomicModifyIORefCAS ref $ \x -> (x - 1, x)
|
||||
@ -813,7 +813,7 @@ decrementYieldLimitPost sv =
|
||||
{-# INLINE incrementYieldLimit #-}
|
||||
incrementYieldLimit :: SVar t m a -> IO ()
|
||||
incrementYieldLimit sv =
|
||||
case remainingYields sv of
|
||||
case remainingWork sv of
|
||||
Nothing -> return ()
|
||||
Just ref -> atomicModifyIORefCAS_ ref (+ 1)
|
||||
|
||||
@ -1339,6 +1339,9 @@ dispatchWorker yieldCount sv = do
|
||||
-- XXX in case of Ahead streams we should not send more than one worker
|
||||
-- when the work queue is done but heap is not done.
|
||||
done <- liftIO $ isWorkDone sv
|
||||
-- Note, "done" may not mean that the work is actually finished if there
|
||||
-- are workers active, because there may be a worker which has not yet
|
||||
-- queued the leftover work.
|
||||
if (not done)
|
||||
then do
|
||||
qDone <- liftIO $ isQueueDone sv
|
||||
@ -1353,7 +1356,7 @@ dispatchWorker yieldCount sv = do
|
||||
-- executing. In that case we should either configure the maxWorker
|
||||
-- count to higher or use parallel style instead of ahead or async
|
||||
-- style.
|
||||
limit <- case remainingYields sv of
|
||||
limit <- case remainingWork sv of
|
||||
Nothing -> return workerLimit
|
||||
Just ref -> do
|
||||
n <- liftIO $ readIORef ref
|
||||
@ -1368,12 +1371,12 @@ dispatchWorker yieldCount sv = do
|
||||
let dispatch = pushWorker yieldCount sv >> return True
|
||||
in case limit of
|
||||
Unlimited -> dispatch
|
||||
-- Note that the use of remainingYields and workerCount is not
|
||||
-- Note that the use of remainingWork and workerCount is not
|
||||
-- atomic and the counts may even have changed between reading and
|
||||
-- using them here, so this is just approximate logic and we cannot
|
||||
-- rely on it for correctness. We may actually dispatch more
|
||||
-- workers than required.
|
||||
Limited lim | active < (fromIntegral lim) -> dispatch
|
||||
Limited lim | lim > 0 -> dispatch
|
||||
_ -> return False
|
||||
else do
|
||||
when (active <= 0) $ pushWorker 0 sv
|
||||
@ -1474,9 +1477,9 @@ estimateWorkers workerLimit svarYields gainLossYields
|
||||
in assert (adjustedLat > 0) $
|
||||
if wLatency <= adjustedLat
|
||||
then PartialWorker deltaYields
|
||||
else ManyWorkers ( fromIntegral
|
||||
$ withLimit
|
||||
$ wLatency `div` adjustedLat) deltaYields
|
||||
else let workers = withLimit $ wLatency `div` adjustedLat
|
||||
limited = min workers (fromIntegral deltaYields)
|
||||
in ManyWorkers (fromIntegral limited) deltaYields
|
||||
else
|
||||
let expectedDuration = fromIntegral effectiveYields * targetLat
|
||||
sleepTime = expectedDuration - svarElapsed
|
||||
@ -2001,7 +2004,7 @@ getAheadSVar st f = do
|
||||
|
||||
let getSVar sv readOutput postProc = SVar
|
||||
{ outputQueue = outQ
|
||||
, remainingYields = yl
|
||||
, remainingWork = yl
|
||||
, maxBufferLimit = getMaxBuffer st
|
||||
, maxWorkerLimit = getMaxThreads st
|
||||
, yieldRateInfo = rateInfo
|
||||
@ -2049,7 +2052,7 @@ getAheadSVar st f = do
|
||||
isQueueDoneAhead sv q = do
|
||||
queueDone <- checkEmpty q
|
||||
yieldsDone <-
|
||||
case remainingYields sv of
|
||||
case remainingWork sv of
|
||||
Just yref -> do
|
||||
n <- readIORef yref
|
||||
return (n <= 0)
|
||||
@ -2097,7 +2100,7 @@ getParallelSVar st = do
|
||||
|
||||
let sv =
|
||||
SVar { outputQueue = outQ
|
||||
, remainingYields = yl
|
||||
, remainingWork = yl
|
||||
, maxBufferLimit = Unlimited
|
||||
, maxWorkerLimit = Unlimited
|
||||
-- Used only for diagnostics
|
||||
|
@ -283,7 +283,7 @@ getLifoSVar st = do
|
||||
|
||||
let isWorkFinishedLimited sv = do
|
||||
yieldsDone <-
|
||||
case remainingYields sv of
|
||||
case remainingWork sv of
|
||||
Just ref -> do
|
||||
n <- readIORef ref
|
||||
return (n <= 0)
|
||||
@ -293,7 +293,7 @@ getLifoSVar st = do
|
||||
|
||||
let getSVar sv readOutput postProc workDone wloop = SVar
|
||||
{ outputQueue = outQ
|
||||
, remainingYields = yl
|
||||
, remainingWork = yl
|
||||
, maxBufferLimit = getMaxBuffer st
|
||||
, maxWorkerLimit = getMaxThreads st
|
||||
, yieldRateInfo = rateInfo
|
||||
@ -382,7 +382,7 @@ getFifoSVar st = do
|
||||
let isWorkFinished _ = nullQ q
|
||||
let isWorkFinishedLimited sv = do
|
||||
yieldsDone <-
|
||||
case remainingYields sv of
|
||||
case remainingWork sv of
|
||||
Just ref -> do
|
||||
n <- readIORef ref
|
||||
return (n <= 0)
|
||||
@ -392,7 +392,7 @@ getFifoSVar st = do
|
||||
|
||||
let getSVar sv readOutput postProc workDone wloop = SVar
|
||||
{ outputQueue = outQ
|
||||
, remainingYields = yl
|
||||
, remainingWork = yl
|
||||
, maxBufferLimit = getMaxBuffer st
|
||||
, maxWorkerLimit = getMaxThreads st
|
||||
, yieldRateInfo = rateInfo
|
||||
|
23
test/Main.hs
23
test/Main.hs
@ -84,19 +84,19 @@ main = hspec $ do
|
||||
`shouldReturn` ([0,0,2,2,4,4,8,8])
|
||||
|
||||
describe "restricts concurrency and cleans up extra tasks" $ do
|
||||
it "take 1 asyncly" $ checkCleanup asyncly (S.take 1)
|
||||
it "take 1 wAsyncly" $ checkCleanup wAsyncly (S.take 1)
|
||||
it "take 1 aheadly" $ checkCleanup aheadly (S.take 1)
|
||||
it "take 1 asyncly" $ checkCleanup 2 asyncly (S.take 1)
|
||||
it "take 1 wAsyncly" $ checkCleanup 2 wAsyncly (S.take 1)
|
||||
it "take 1 aheadly" $ checkCleanup 2 aheadly (S.take 1)
|
||||
|
||||
it "takeWhile (< 0) asyncly" $ checkCleanup asyncly (S.takeWhile (< 0))
|
||||
it "takeWhile (< 0) wAsyncly" $ checkCleanup wAsyncly (S.takeWhile (< 0))
|
||||
it "takeWhile (< 0) aheadly" $ checkCleanup aheadly (S.takeWhile (< 0))
|
||||
it "takeWhile (< 0) asyncly" $ checkCleanup 2 asyncly (S.takeWhile (< 0))
|
||||
it "takeWhile (< 0) wAsyncly" $ checkCleanup 2 wAsyncly (S.takeWhile (< 0))
|
||||
it "takeWhile (< 0) aheadly" $ checkCleanup 2 aheadly (S.takeWhile (< 0))
|
||||
|
||||
#ifdef DEVBUILD
|
||||
-- parallely fails on CI machines, may need more difference in times of
|
||||
-- the events, but that would make tests even slower.
|
||||
it "take 1 parallely" $ checkCleanup parallely (S.take 1)
|
||||
it "takeWhile (< 0) parallely" $ checkCleanup parallely (S.takeWhile (< 0))
|
||||
it "take 1 parallely" $ checkCleanup 3 parallely (S.take 1)
|
||||
it "takeWhile (< 0) parallely" $ checkCleanup 3 parallely (S.takeWhile (< 0))
|
||||
|
||||
testFoldOpsCleanup "head" S.head
|
||||
testFoldOpsCleanup "null" S.null
|
||||
@ -138,10 +138,11 @@ main = hspec $ do
|
||||
describe "Parallel mappend time order check" $ parallelCheck parallely mappend
|
||||
|
||||
checkCleanup :: IsStream t
|
||||
=> (t IO Int -> SerialT IO Int)
|
||||
=> Int
|
||||
-> (t IO Int -> SerialT IO Int)
|
||||
-> (t IO Int -> t IO Int)
|
||||
-> IO ()
|
||||
checkCleanup t op = do
|
||||
checkCleanup d t op = do
|
||||
r <- newIORef (-1 :: Int)
|
||||
runStream . serially $ do
|
||||
_ <- t $ op $ delay r 0 S.|: delay r 1 S.|: delay r 2 S.|: S.nil
|
||||
@ -151,7 +152,7 @@ checkCleanup t op = do
|
||||
res <- readIORef r
|
||||
res `shouldBe` 0
|
||||
where
|
||||
delay ref i = threadDelay (i*200000) >> writeIORef ref i >> return i
|
||||
delay ref i = threadDelay (i*d*100000) >> writeIORef ref i >> return i
|
||||
|
||||
#ifdef DEVBUILD
|
||||
checkCleanupFold :: IsStream t
|
||||
|
@ -105,12 +105,13 @@ main = hspec $ do
|
||||
in describe "wAsyncly no consumer delay and 1 sec producer delay" $ do
|
||||
forM_ rates (\r -> measureRate "wAsyncly" wAsyncly r 0 1 range)
|
||||
|
||||
-- XXX does not work well at a million ops per second, need to fix.
|
||||
let rates = [1, 10, 100, 1000, 10000, 100000]
|
||||
let rates = [1, 10, 100, 1000, 10000, 100000, 1000000]
|
||||
in describe "aheadly no consumer delay no producer delay" $ do
|
||||
forM_ rates (\r -> measureRate "aheadly" aheadly r 0 0 range)
|
||||
|
||||
let rates = [1, 10, 100, 1000, 10000, 25000]
|
||||
-- XXX after the change to stop workers when the heap is clearing
|
||||
-- thi does not work well at a 25000 ops per second, need to fix.
|
||||
let rates = [1, 10, 100, 1000, 10000, 12500]
|
||||
in describe "aheadly no consumer delay and 1 sec producer delay" $ do
|
||||
forM_ rates (\r -> measureRate "aheadly" aheadly r 0 1 range)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user