diff --git a/src/Streamly/SVar.hs b/src/Streamly/SVar.hs index beba55678..6ef6a7e86 100644 --- a/src/Streamly/SVar.hs +++ b/src/Streamly/SVar.hs @@ -346,7 +346,7 @@ data SVar t m a = SVar -- Combined/aggregate parameters , maxWorkerLimit :: Limit , maxBufferLimit :: Limit - , remainingYields :: Maybe (IORef Count) + , remainingWork :: Maybe (IORef Count) , yieldRateInfo :: Maybe YieldRateInfo -- Used only by bounded SVar types @@ -779,7 +779,7 @@ doFork action exHandler = exHandler runInIO (return tid) --- XXX Can we make access to remainingYields and yieldRateInfo fields in sv +-- XXX Can we make access to remainingWork and yieldRateInfo fields in sv -- faster, along with the fields in sv required by send? -- XXX make it noinline -- @@ -793,7 +793,7 @@ doFork action exHandler = {-# INLINE decrementYieldLimit #-} decrementYieldLimit :: SVar t m a -> IO Bool decrementYieldLimit sv = - case remainingYields sv of + case remainingWork sv of Nothing -> return True Just ref -> do r <- atomicModifyIORefCAS ref $ \x -> (x - 1, x) @@ -804,7 +804,7 @@ decrementYieldLimit sv = {-# INLINE decrementYieldLimitPost #-} decrementYieldLimitPost :: SVar t m a -> IO Bool decrementYieldLimitPost sv = - case remainingYields sv of + case remainingWork sv of Nothing -> return True Just ref -> do r <- atomicModifyIORefCAS ref $ \x -> (x - 1, x) @@ -813,7 +813,7 @@ decrementYieldLimitPost sv = {-# INLINE incrementYieldLimit #-} incrementYieldLimit :: SVar t m a -> IO () incrementYieldLimit sv = - case remainingYields sv of + case remainingWork sv of Nothing -> return () Just ref -> atomicModifyIORefCAS_ ref (+ 1) @@ -1339,6 +1339,9 @@ dispatchWorker yieldCount sv = do -- XXX in case of Ahead streams we should not send more than one worker -- when the work queue is done but heap is not done. done <- liftIO $ isWorkDone sv + -- Note, "done" may not mean that the work is actually finished if there + -- are workers active, because there may be a worker which has not yet + -- queued the leftover work. if (not done) then do qDone <- liftIO $ isQueueDone sv @@ -1353,7 +1356,7 @@ dispatchWorker yieldCount sv = do -- executing. In that case we should either configure the maxWorker -- count to higher or use parallel style instead of ahead or async -- style. - limit <- case remainingYields sv of + limit <- case remainingWork sv of Nothing -> return workerLimit Just ref -> do n <- liftIO $ readIORef ref @@ -1368,12 +1371,12 @@ dispatchWorker yieldCount sv = do let dispatch = pushWorker yieldCount sv >> return True in case limit of Unlimited -> dispatch - -- Note that the use of remainingYields and workerCount is not + -- Note that the use of remainingWork and workerCount is not -- atomic and the counts may even have changed between reading and -- using them here, so this is just approximate logic and we cannot -- rely on it for correctness. We may actually dispatch more -- workers than required. - Limited lim | active < (fromIntegral lim) -> dispatch + Limited lim | lim > 0 -> dispatch _ -> return False else do when (active <= 0) $ pushWorker 0 sv @@ -1474,9 +1477,9 @@ estimateWorkers workerLimit svarYields gainLossYields in assert (adjustedLat > 0) $ if wLatency <= adjustedLat then PartialWorker deltaYields - else ManyWorkers ( fromIntegral - $ withLimit - $ wLatency `div` adjustedLat) deltaYields + else let workers = withLimit $ wLatency `div` adjustedLat + limited = min workers (fromIntegral deltaYields) + in ManyWorkers (fromIntegral limited) deltaYields else let expectedDuration = fromIntegral effectiveYields * targetLat sleepTime = expectedDuration - svarElapsed @@ -2001,7 +2004,7 @@ getAheadSVar st f = do let getSVar sv readOutput postProc = SVar { outputQueue = outQ - , remainingYields = yl + , remainingWork = yl , maxBufferLimit = getMaxBuffer st , maxWorkerLimit = getMaxThreads st , yieldRateInfo = rateInfo @@ -2049,7 +2052,7 @@ getAheadSVar st f = do isQueueDoneAhead sv q = do queueDone <- checkEmpty q yieldsDone <- - case remainingYields sv of + case remainingWork sv of Just yref -> do n <- readIORef yref return (n <= 0) @@ -2097,7 +2100,7 @@ getParallelSVar st = do let sv = SVar { outputQueue = outQ - , remainingYields = yl + , remainingWork = yl , maxBufferLimit = Unlimited , maxWorkerLimit = Unlimited -- Used only for diagnostics diff --git a/src/Streamly/Streams/Async.hs b/src/Streamly/Streams/Async.hs index 62a2b53c8..020689877 100644 --- a/src/Streamly/Streams/Async.hs +++ b/src/Streamly/Streams/Async.hs @@ -283,7 +283,7 @@ getLifoSVar st = do let isWorkFinishedLimited sv = do yieldsDone <- - case remainingYields sv of + case remainingWork sv of Just ref -> do n <- readIORef ref return (n <= 0) @@ -293,7 +293,7 @@ getLifoSVar st = do let getSVar sv readOutput postProc workDone wloop = SVar { outputQueue = outQ - , remainingYields = yl + , remainingWork = yl , maxBufferLimit = getMaxBuffer st , maxWorkerLimit = getMaxThreads st , yieldRateInfo = rateInfo @@ -382,7 +382,7 @@ getFifoSVar st = do let isWorkFinished _ = nullQ q let isWorkFinishedLimited sv = do yieldsDone <- - case remainingYields sv of + case remainingWork sv of Just ref -> do n <- readIORef ref return (n <= 0) @@ -392,7 +392,7 @@ getFifoSVar st = do let getSVar sv readOutput postProc workDone wloop = SVar { outputQueue = outQ - , remainingYields = yl + , remainingWork = yl , maxBufferLimit = getMaxBuffer st , maxWorkerLimit = getMaxThreads st , yieldRateInfo = rateInfo diff --git a/test/Main.hs b/test/Main.hs index 8fb9c400b..340541865 100644 --- a/test/Main.hs +++ b/test/Main.hs @@ -84,19 +84,19 @@ main = hspec $ do `shouldReturn` ([0,0,2,2,4,4,8,8]) describe "restricts concurrency and cleans up extra tasks" $ do - it "take 1 asyncly" $ checkCleanup asyncly (S.take 1) - it "take 1 wAsyncly" $ checkCleanup wAsyncly (S.take 1) - it "take 1 aheadly" $ checkCleanup aheadly (S.take 1) + it "take 1 asyncly" $ checkCleanup 2 asyncly (S.take 1) + it "take 1 wAsyncly" $ checkCleanup 2 wAsyncly (S.take 1) + it "take 1 aheadly" $ checkCleanup 2 aheadly (S.take 1) - it "takeWhile (< 0) asyncly" $ checkCleanup asyncly (S.takeWhile (< 0)) - it "takeWhile (< 0) wAsyncly" $ checkCleanup wAsyncly (S.takeWhile (< 0)) - it "takeWhile (< 0) aheadly" $ checkCleanup aheadly (S.takeWhile (< 0)) + it "takeWhile (< 0) asyncly" $ checkCleanup 2 asyncly (S.takeWhile (< 0)) + it "takeWhile (< 0) wAsyncly" $ checkCleanup 2 wAsyncly (S.takeWhile (< 0)) + it "takeWhile (< 0) aheadly" $ checkCleanup 2 aheadly (S.takeWhile (< 0)) #ifdef DEVBUILD -- parallely fails on CI machines, may need more difference in times of -- the events, but that would make tests even slower. - it "take 1 parallely" $ checkCleanup parallely (S.take 1) - it "takeWhile (< 0) parallely" $ checkCleanup parallely (S.takeWhile (< 0)) + it "take 1 parallely" $ checkCleanup 3 parallely (S.take 1) + it "takeWhile (< 0) parallely" $ checkCleanup 3 parallely (S.takeWhile (< 0)) testFoldOpsCleanup "head" S.head testFoldOpsCleanup "null" S.null @@ -138,10 +138,11 @@ main = hspec $ do describe "Parallel mappend time order check" $ parallelCheck parallely mappend checkCleanup :: IsStream t - => (t IO Int -> SerialT IO Int) + => Int + -> (t IO Int -> SerialT IO Int) -> (t IO Int -> t IO Int) -> IO () -checkCleanup t op = do +checkCleanup d t op = do r <- newIORef (-1 :: Int) runStream . serially $ do _ <- t $ op $ delay r 0 S.|: delay r 1 S.|: delay r 2 S.|: S.nil @@ -151,7 +152,7 @@ checkCleanup t op = do res <- readIORef r res `shouldBe` 0 where - delay ref i = threadDelay (i*200000) >> writeIORef ref i >> return i + delay ref i = threadDelay (i*d*100000) >> writeIORef ref i >> return i #ifdef DEVBUILD checkCleanupFold :: IsStream t diff --git a/test/MaxRate.hs b/test/MaxRate.hs index 9b1fa8113..40d6dac73 100644 --- a/test/MaxRate.hs +++ b/test/MaxRate.hs @@ -105,12 +105,13 @@ main = hspec $ do in describe "wAsyncly no consumer delay and 1 sec producer delay" $ do forM_ rates (\r -> measureRate "wAsyncly" wAsyncly r 0 1 range) - -- XXX does not work well at a million ops per second, need to fix. - let rates = [1, 10, 100, 1000, 10000, 100000] + let rates = [1, 10, 100, 1000, 10000, 100000, 1000000] in describe "aheadly no consumer delay no producer delay" $ do forM_ rates (\r -> measureRate "aheadly" aheadly r 0 0 range) - let rates = [1, 10, 100, 1000, 10000, 25000] + -- XXX after the change to stop workers when the heap is clearing + -- thi does not work well at a 25000 ops per second, need to fix. + let rates = [1, 10, 100, 1000, 10000, 12500] in describe "aheadly no consumer delay and 1 sec producer delay" $ do forM_ rates (\r -> measureRate "aheadly" aheadly r 0 1 range)