mirror of
https://github.com/composewell/streamly.git
synced 2024-09-19 15:37:48 +03:00
implement maxYields to put a limit on total yields of a stream
maxYields is used to limit the concurrent executions of a stream when it it is immediately followed by a "take" limiting the size of the stream. Also fix the maxBuffer implementation of aheadly.
This commit is contained in:
parent
516cab57bf
commit
cf87d1132e
@ -165,6 +165,7 @@ import qualified Prelude
|
||||
import qualified System.IO as IO
|
||||
|
||||
import Streamly.SVar (MonadAsync, defState, rstState)
|
||||
import Streamly.Streams.SVar (maxYields)
|
||||
import Streamly.Streams.StreamK (IsStream(..))
|
||||
import Streamly.Streams.Serial (SerialT)
|
||||
|
||||
@ -688,7 +689,7 @@ filterM p m = fromStreamD $ D.filterM p $ toStreamD m
|
||||
-- @since 0.1.0
|
||||
{-# INLINE take #-}
|
||||
take :: (IsStream t, Monad m) => Int -> t m a -> t m a
|
||||
take n m = fromStreamS $ S.take n $ toStreamS m
|
||||
take n m = fromStreamS $ S.take n $ toStreamS (maxYields (Just n) m)
|
||||
|
||||
-- | End the stream as soon as the predicate fails on an element.
|
||||
--
|
||||
|
@ -37,7 +37,7 @@ module Streamly.SVar
|
||||
, atomicModifyIORefCAS
|
||||
, ChildEvent (..)
|
||||
, AheadHeapEntry (..)
|
||||
, send
|
||||
, sendYield
|
||||
, sendStop
|
||||
, enqueueLIFO
|
||||
, workLoopLIFO
|
||||
@ -46,7 +46,6 @@ module Streamly.SVar
|
||||
, enqueueAhead
|
||||
, pushWorkerPar
|
||||
|
||||
, maxHeap
|
||||
, queueEmptyAhead
|
||||
, dequeueAhead
|
||||
, dequeueFromHeap
|
||||
@ -158,6 +157,7 @@ data SVar t m a =
|
||||
|
||||
-- Shared output queue (events, length)
|
||||
, outputQueue :: IORef ([ChildEvent a], Int)
|
||||
, maxYieldLimit :: Maybe (IORef Int)
|
||||
, outputDoorBell :: MVar () -- signal the consumer about output
|
||||
, readOutputQ :: m [ChildEvent a]
|
||||
, postProcess :: m Bool
|
||||
@ -187,9 +187,10 @@ data SVar t m a =
|
||||
}
|
||||
|
||||
data State t m a = State
|
||||
{ streamVar :: Maybe (SVar t m a)
|
||||
{ streamVar :: Maybe (SVar t m a)
|
||||
, yieldLimit :: Maybe Int
|
||||
, threadsHigh :: Int
|
||||
, bufferHigh :: Int
|
||||
, bufferHigh :: Int
|
||||
}
|
||||
|
||||
defaultMaxThreads, defaultMaxBuffer :: Int
|
||||
@ -199,15 +200,23 @@ defaultMaxBuffer = 1500
|
||||
defState :: State t m a
|
||||
defState = State
|
||||
{ streamVar = Nothing
|
||||
, yieldLimit = Nothing
|
||||
, threadsHigh = defaultMaxThreads
|
||||
, bufferHigh = defaultMaxBuffer
|
||||
}
|
||||
|
||||
-- XXX if perf gets affected we can have all the Nothing params in a single
|
||||
-- structure so that we reset is fast. We can also use rewrite rules such that
|
||||
-- reset occurs only in concurrent streams to reduce the impact on serial
|
||||
-- streams.
|
||||
-- We can optimize this so that we clear it only if it is a Just value, it
|
||||
-- results in slightly better perf for zip/zipM but the performance of scan
|
||||
-- worsens a lot, it does not fuse.
|
||||
rstState :: State t m a -> State t m b
|
||||
rstState st = st {streamVar = Nothing}
|
||||
rstState st = st
|
||||
{ streamVar = Nothing
|
||||
, yieldLimit = Nothing
|
||||
}
|
||||
|
||||
#ifdef DIAGNOSTICS
|
||||
{-# NOINLINE dumpSVar #-}
|
||||
@ -337,7 +346,6 @@ doFork action exHandler =
|
||||
|
||||
-- | This function is used by the producer threads to queue output for the
|
||||
-- consumer thread to consume. Returns whether the queue has more space.
|
||||
{-# NOINLINE send #-}
|
||||
send :: Int -> SVar t m a -> ChildEvent a -> IO Bool
|
||||
send maxOutputQLen sv msg = do
|
||||
len <- atomicModifyIORefCAS (outputQueue sv) $ \(es, n) ->
|
||||
@ -355,6 +363,15 @@ send maxOutputQLen sv msg = do
|
||||
void $ tryPutMVar (outputDoorBell sv) ()
|
||||
return (len < maxOutputQLen || maxOutputQLen < 0)
|
||||
|
||||
{-# NOINLINE sendYield #-}
|
||||
sendYield :: Int -> SVar t m a -> ChildEvent a -> IO Bool
|
||||
sendYield maxOutputQLen sv msg = do
|
||||
ylimit <- case maxYieldLimit sv of
|
||||
Nothing -> return True
|
||||
Just ref -> atomicModifyIORefCAS ref $ \x -> (x - 1, x > 1)
|
||||
r <- send maxOutputQLen sv msg
|
||||
return $ r && ylimit
|
||||
|
||||
{-# NOINLINE sendStop #-}
|
||||
sendStop :: SVar t m a -> IO ()
|
||||
sendStop sv = do
|
||||
@ -534,9 +551,6 @@ enqueueAhead sv q m = do
|
||||
--
|
||||
-- XXX review for livelock
|
||||
--
|
||||
maxHeap :: Int
|
||||
maxHeap = 1500
|
||||
|
||||
{-# INLINE queueEmptyAhead #-}
|
||||
queueEmptyAhead :: MonadIO m => IORef ([t m a], Int) -> m Bool
|
||||
queueEmptyAhead q = liftIO $ do
|
||||
@ -675,7 +689,15 @@ dispatchWorker maxWorkerLimit sv = do
|
||||
-- executing. In that case we should either configure the maxWorker
|
||||
-- count to higher or use parallel style instead of ahead or async
|
||||
-- style.
|
||||
when (cnt < maxWorkerLimit || maxWorkerLimit < 0) $ pushWorker sv
|
||||
limit <- case maxYieldLimit sv of
|
||||
Nothing -> return maxWorkerLimit
|
||||
Just x -> do
|
||||
lim <- liftIO $ readIORef x
|
||||
return $
|
||||
if maxWorkerLimit > 0
|
||||
then min maxWorkerLimit lim
|
||||
else lim
|
||||
when (cnt < limit || limit < 0) $ pushWorker sv
|
||||
|
||||
{-# NOINLINE sendWorkerWait #-}
|
||||
sendWorkerWait :: MonadAsync m => Int -> SVar t m a -> m ()
|
||||
@ -816,6 +838,9 @@ getAheadSVar st f = do
|
||||
wfw <- newIORef False
|
||||
running <- newIORef S.empty
|
||||
q <- newIORef ([], -1)
|
||||
yl <- case yieldLimit st of
|
||||
Nothing -> return Nothing
|
||||
Just x -> Just <$> newIORef x
|
||||
|
||||
#ifdef DIAGNOSTICS
|
||||
disp <- newIORef 0
|
||||
@ -826,6 +851,7 @@ getAheadSVar st f = do
|
||||
#endif
|
||||
let sv =
|
||||
SVar { outputQueue = outQ
|
||||
, maxYieldLimit = yl
|
||||
, outputDoorBell = outQMv
|
||||
, readOutputQ = readOutputQBounded (threadsHigh st) sv
|
||||
, postProcess = postProcessBounded sv
|
||||
@ -879,6 +905,7 @@ getParallelSVar = do
|
||||
#endif
|
||||
let sv =
|
||||
SVar { outputQueue = outQ
|
||||
, maxYieldLimit = Nothing
|
||||
, outputDoorBell = outQMv
|
||||
, readOutputQ = readOutputQPar sv
|
||||
, postProcess = allThreadsDone sv
|
||||
|
@ -37,7 +37,7 @@ import Control.Monad.State.Class (MonadState(..))
|
||||
import Control.Monad.Trans.Class (MonadTrans(lift))
|
||||
import Data.Atomics (atomicModifyIORefCAS_)
|
||||
import Data.Heap (Heap, Entry(..))
|
||||
import Data.IORef (IORef)
|
||||
import Data.IORef (IORef, readIORef)
|
||||
import Data.Maybe (fromJust)
|
||||
import Data.Semigroup (Semigroup(..))
|
||||
|
||||
@ -51,7 +51,7 @@ import qualified Streamly.Streams.StreamK as K
|
||||
|
||||
#ifdef DIAGNOSTICS
|
||||
import Control.Monad (when)
|
||||
import Data.IORef (writeIORef, readIORef)
|
||||
import Data.IORef (writeIORef)
|
||||
#endif
|
||||
import Prelude hiding (map)
|
||||
|
||||
@ -128,7 +128,14 @@ workLoopAhead st q heap = runHeap
|
||||
toHeap seqNo ent = do
|
||||
hp <- liftIO $ atomicModifyIORefCAS heap $ \(h, snum) ->
|
||||
((H.insert (Entry seqNo ent) h, snum), h)
|
||||
if H.size hp <= maxHeap
|
||||
(_, len) <- liftIO $ readIORef (outputQueue sv)
|
||||
let maxHeap = maxBuf - len
|
||||
limit <- case maxYieldLimit sv of
|
||||
Nothing -> return maxHeap
|
||||
Just ref -> do
|
||||
r <- liftIO $ readIORef ref
|
||||
return $ if r >= 0 then r else maxHeap
|
||||
if H.size hp <= limit
|
||||
then runHeap
|
||||
else liftIO $ sendStop sv
|
||||
|
||||
@ -136,7 +143,7 @@ workLoopAhead st q heap = runHeap
|
||||
yieldToHeap seqNo a r = toHeap seqNo (AheadEntryStream (a `K.cons` r))
|
||||
|
||||
singleOutput seqNo a = do
|
||||
continue <- liftIO $ send maxBuf sv (ChildYield a)
|
||||
continue <- liftIO $ sendYield maxBuf sv (ChildYield a)
|
||||
if continue
|
||||
then runQueueToken seqNo
|
||||
else liftIO $ do
|
||||
@ -144,7 +151,7 @@ workLoopAhead st q heap = runHeap
|
||||
sendStop sv
|
||||
|
||||
yieldOutput seqNo a r = do
|
||||
continue <- liftIO $ send maxBuf sv (ChildYield a)
|
||||
continue <- liftIO $ sendYield maxBuf sv (ChildYield a)
|
||||
if continue
|
||||
then unStream r st (runQueueToken seqNo)
|
||||
(singleOutput seqNo)
|
||||
@ -241,7 +248,7 @@ aheadS m1 m2 = Stream $ \st stp sng yld -> do
|
||||
-- sequencing results. This means the left side cannot further
|
||||
-- split into more ahead computations on the same SVar.
|
||||
unStream m1 (rstState st) stp sng yld
|
||||
_ -> unStream (forkSVarAhead m1 m2) (rstState st) stp sng yld
|
||||
_ -> unStream (forkSVarAhead m1 m2) st stp sng yld
|
||||
|
||||
-- | XXX we can implement it more efficienty by directly implementing instead
|
||||
-- of combining streams using ahead.
|
||||
|
@ -72,10 +72,10 @@ runStreamLIFO st q m stop = unStream m st stop single yieldk
|
||||
sv = fromJust $ streamVar st
|
||||
maxBuf = bufferHigh st
|
||||
single a = do
|
||||
res <- liftIO $ send maxBuf sv (ChildYield a)
|
||||
res <- liftIO $ sendYield maxBuf sv (ChildYield a)
|
||||
if res then stop else liftIO $ sendStop sv
|
||||
yieldk a r = do
|
||||
res <- liftIO $ send maxBuf sv (ChildYield a)
|
||||
res <- liftIO $ sendYield maxBuf sv (ChildYield a)
|
||||
if res
|
||||
then (unStream r) st stop single yieldk
|
||||
else liftIO $ enqueueLIFO sv q r >> sendStop sv
|
||||
@ -97,10 +97,10 @@ runStreamFIFO st q m stop = unStream m st stop single yieldk
|
||||
sv = fromJust $ streamVar st
|
||||
maxBuf = bufferHigh st
|
||||
single a = do
|
||||
res <- liftIO $ send maxBuf sv (ChildYield a)
|
||||
res <- liftIO $ sendYield maxBuf sv (ChildYield a)
|
||||
if res then stop else liftIO $ sendStop sv
|
||||
yieldk a r = do
|
||||
res <- liftIO $ send maxBuf sv (ChildYield a)
|
||||
res <- liftIO $ sendYield maxBuf sv (ChildYield a)
|
||||
liftIO (enqueueFIFO sv q r)
|
||||
if res then stop else liftIO $ sendStop sv
|
||||
|
||||
@ -121,6 +121,9 @@ getLifoSVar st = do
|
||||
wfw <- newIORef False
|
||||
running <- newIORef S.empty
|
||||
q <- newIORef []
|
||||
yl <- case yieldLimit st of
|
||||
Nothing -> return Nothing
|
||||
Just x -> Just <$> newIORef x
|
||||
#ifdef DIAGNOSTICS
|
||||
disp <- newIORef 0
|
||||
maxWrk <- newIORef 0
|
||||
@ -131,6 +134,7 @@ getLifoSVar st = do
|
||||
let checkEmpty = null <$> readIORef q
|
||||
let sv =
|
||||
SVar { outputQueue = outQ
|
||||
, maxYieldLimit = yl
|
||||
, outputDoorBell = outQMv
|
||||
, readOutputQ = readOutputQBounded (threadsHigh st) sv
|
||||
, postProcess = postProcessBounded sv
|
||||
@ -163,6 +167,9 @@ getFifoSVar st = do
|
||||
wfw <- newIORef False
|
||||
running <- newIORef S.empty
|
||||
q <- newQ
|
||||
yl <- case yieldLimit st of
|
||||
Nothing -> return Nothing
|
||||
Just x -> Just <$> newIORef x
|
||||
#ifdef DIAGNOSTICS
|
||||
disp <- newIORef 0
|
||||
maxWrk <- newIORef 0
|
||||
@ -172,6 +179,7 @@ getFifoSVar st = do
|
||||
#endif
|
||||
let sv =
|
||||
SVar { outputQueue = outQ
|
||||
, maxYieldLimit = yl
|
||||
, outputDoorBell = outQMv
|
||||
, readOutputQ = readOutputQBounded (threadsHigh st) sv
|
||||
, postProcess = postProcessBounded sv
|
||||
@ -307,11 +315,11 @@ forkSVarAsync style m1 m2 = Stream $ \st stp sng yld -> do
|
||||
{-# INLINE joinStreamVarAsync #-}
|
||||
joinStreamVarAsync :: MonadAsync m
|
||||
=> SVarStyle -> Stream m a -> Stream m a -> Stream m a
|
||||
joinStreamVarAsync style m1 m2 = Stream $ \st stp sng yld ->
|
||||
joinStreamVarAsync style m1 m2 = Stream $ \st stp sng yld -> do
|
||||
case streamVar st of
|
||||
Just sv | svarStyle sv == style ->
|
||||
liftIO (enqueue sv m2) >> unStream m1 st stp sng yld
|
||||
_ -> unStream (forkSVarAsync style m1 m2) (rstState st) stp sng yld
|
||||
_ -> unStream (forkSVarAsync style m1 m2) st stp sng yld
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
-- Semigroup and Monoid style compositions for parallel actions
|
||||
|
@ -67,7 +67,7 @@ runOne st m = unStream m st stop single yieldk
|
||||
|
||||
sv = fromJust $ streamVar st
|
||||
stop = liftIO $ sendStop sv
|
||||
sendit a = liftIO $ send (-1) sv (ChildYield a)
|
||||
sendit a = liftIO $ sendYield (-1) sv (ChildYield a)
|
||||
single a = sendit a >> stop
|
||||
-- XXX there is no flow control in parallel case. We should perhaps use a
|
||||
-- queue and queue it back on that and exit the thread when the outputQueue
|
||||
|
@ -9,6 +9,8 @@
|
||||
{-# LANGUAGE UnboxedTuples #-}
|
||||
{-# LANGUAGE UndecidableInstances #-} -- XXX
|
||||
|
||||
#include "inline.h"
|
||||
|
||||
-- |
|
||||
-- Module : Streamly.Streams.SVar
|
||||
-- Copyright : (c) 2017 Harendra Kumar
|
||||
@ -25,6 +27,7 @@ module Streamly.Streams.SVar
|
||||
, toSVar
|
||||
, maxThreads
|
||||
, maxBuffer
|
||||
, maxYields
|
||||
)
|
||||
where
|
||||
|
||||
@ -32,6 +35,7 @@ import Control.Monad.Catch (throwM)
|
||||
|
||||
import Streamly.SVar
|
||||
import Streamly.Streams.StreamK
|
||||
import Streamly.Streams.Serial (SerialT)
|
||||
|
||||
-- MVar diagnostics has some overhead - around 5% on asyncly null benchmark, we
|
||||
-- can keep it on in production to debug problems quickly if and when they
|
||||
@ -85,6 +89,10 @@ fromSVar sv = fromStream $ fromStreamVar sv
|
||||
toSVar :: (IsStream t, MonadAsync m) => SVar Stream m a -> t m a -> m ()
|
||||
toSVar sv m = toStreamVar sv (toStream m)
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
-- Concurrency control
|
||||
-------------------------------------------------------------------------------
|
||||
--
|
||||
-- | Specify the maximum number of threads that can be spawned concurrently
|
||||
-- when using concurrent streams. This is not the grand total number of threads
|
||||
-- but maximum threads at each point of concurrency.
|
||||
@ -92,11 +100,16 @@ toSVar sv m = toStreamVar sv (toStream m)
|
||||
-- there is no limit. The default value is 1500.
|
||||
--
|
||||
-- @since 0.4.0
|
||||
{-# INLINE_NORMAL maxThreads #-}
|
||||
maxThreads :: IsStream t => Int -> t m a -> t m a
|
||||
maxThreads n m = fromStream $ Stream $ \st stp sng yld -> do
|
||||
let n' = if n == 0 then defaultMaxThreads else n
|
||||
unStream (toStream m) (st {threadsHigh = n'}) stp sng yld
|
||||
|
||||
{-# RULES "maxThreadsSerial serial" maxThreads = maxThreadsSerial #-}
|
||||
maxThreadsSerial :: Int -> SerialT m a -> SerialT m a
|
||||
maxThreadsSerial _ = id
|
||||
|
||||
-- | Specify the maximum size of the buffer for storing the results from
|
||||
-- concurrent computations. If the buffer becomes full we stop spawning more
|
||||
-- concurrent tasks until there is space in the buffer.
|
||||
@ -104,7 +117,25 @@ maxThreads n m = fromStream $ Stream $ \st stp sng yld -> do
|
||||
-- there is no limit. The default value is 1500.
|
||||
--
|
||||
-- @since 0.4.0
|
||||
{-# INLINE_NORMAL maxBuffer #-}
|
||||
maxBuffer :: IsStream t => Int -> t m a -> t m a
|
||||
maxBuffer n m = fromStream $ Stream $ \st stp sng yld -> do
|
||||
let n' = if n == 0 then defaultMaxBuffer else n
|
||||
unStream (toStream m) (st {bufferHigh = n'}) stp sng yld
|
||||
|
||||
{-# RULES "maxBuffer serial" maxBuffer = maxBufferSerial #-}
|
||||
maxBufferSerial :: Int -> SerialT m a -> SerialT m a
|
||||
maxBufferSerial _ = id
|
||||
|
||||
-- Stop concurrent dispatches after this limit. This is useful in API's like
|
||||
-- "take" where we want to dispatch only upto the number of elements "take"
|
||||
-- needs. This value applies only to the immediate next level and is not
|
||||
-- inherited by everything in enclosed scope.
|
||||
{-# INLINE_NORMAL maxYields #-}
|
||||
maxYields :: IsStream t => Maybe Int -> t m a -> t m a
|
||||
maxYields n m = fromStream $ Stream $ \st stp sng yld -> do
|
||||
unStream (toStream m) (st {yieldLimit = n}) stp sng yld
|
||||
|
||||
{-# RULES "maxYields serial" maxYields = maxYieldsSerial #-}
|
||||
maxYieldsSerial :: Maybe Int -> SerialT m a -> SerialT m a
|
||||
maxYieldsSerial _ = id
|
||||
|
11
test/Main.hs
11
test/Main.hs
@ -380,6 +380,10 @@ main = hspec $ do
|
||||
describe "Composed MonadThrow parallely" $ composeWithMonadThrow parallely
|
||||
describe "Composed MonadThrow aheadly" $ composeWithMonadThrow aheadly
|
||||
|
||||
describe "take on infinite concurrent stream" $ takeInfinite asyncly
|
||||
describe "take on infinite concurrent stream" $ takeInfinite wAsyncly
|
||||
describe "take on infinite concurrent stream" $ takeInfinite aheadly
|
||||
|
||||
it "asyncly crosses thread limit (2000 threads)" $
|
||||
runStream (asyncly $ fold $
|
||||
replicate 2000 $ S.yieldM $ threadDelay 1000000)
|
||||
@ -390,6 +394,13 @@ main = hspec $ do
|
||||
replicate 4000 $ S.yieldM $ threadDelay 1000000)
|
||||
`shouldReturn` ()
|
||||
|
||||
takeInfinite :: IsStream t => (t IO Int -> SerialT IO Int) -> Spec
|
||||
takeInfinite t = do
|
||||
it "take 1" $
|
||||
(runStream $ t $
|
||||
S.take 1 $ S.repeatM (print "hello" >> return (1::Int)))
|
||||
`shouldReturn` ()
|
||||
|
||||
-- XXX need to test that we have promptly cleaned up everything after the error
|
||||
-- XXX We can also check the output that we are expected to get before the
|
||||
-- error occurs.
|
||||
|
Loading…
Reference in New Issue
Block a user