mirror of
https://github.com/composewell/streamly.git
synced 2024-09-20 07:58:27 +03:00
Start new worker when existing ones are slow
This commit is contained in:
parent
3e1239048e
commit
e2b2acd6b5
@ -26,10 +26,11 @@ where
|
||||
|
||||
import Control.Applicative (Alternative (..))
|
||||
import Control.Concurrent (ThreadId, forkIO, killThread,
|
||||
myThreadId)
|
||||
myThreadId, threadDelay)
|
||||
import Control.Concurrent.STM (TBQueue, atomically, newTBQueue,
|
||||
readTBQueue, tryReadTBQueue,
|
||||
writeTBQueue, isFullTBQueue)
|
||||
tryReadTBQueue, writeTBQueue,
|
||||
isEmptyTBQueue, isFullTBQueue,
|
||||
peekTBQueue)
|
||||
import Control.Exception (SomeException (..))
|
||||
import qualified Control.Exception.Lifted as EL
|
||||
import Control.Monad (ap, liftM, MonadPlus(..), mzero,
|
||||
@ -184,10 +185,14 @@ push ctx = run (Just ctx) (dequeueLoop ctx)
|
||||
|
||||
where
|
||||
|
||||
send msg = atomically $ writeTBQueue (childChannel ctx) msg
|
||||
send msg = atomically $ writeTBQueue (childChannel ctx) msg
|
||||
stop = do
|
||||
workQEmpty <- liftIO $ atomically $ isEmptyTBQueue (pendingWork ctx)
|
||||
if (not workQEmpty) then push ctx
|
||||
else liftIO $ myThreadId >>= \tid -> send (ChildStop tid Nothing)
|
||||
yield a _ Nothing = liftIO $ myThreadId >>= \tid -> send (ChildDone tid a)
|
||||
yield a c (Just r) = liftIO (send (ChildYield a)) >> run c r
|
||||
run c m = (runAsyncT m) c (push ctx) yield
|
||||
run c m = (runAsyncT m) c stop yield
|
||||
|
||||
-- Thread tracking has a significant performance overhead (~20% on empty
|
||||
-- threads, it will be lower for heavy threads). It is needed for two reasons:
|
||||
@ -237,7 +242,7 @@ handleException e ctx tid = do
|
||||
-- exceptions get propagated to the top level computation and can be handled
|
||||
-- there.
|
||||
{-# NOINLINE pullWorker #-}
|
||||
pullWorker :: (MonadIO m, MonadThrow m) => Context m a -> AsyncT m a
|
||||
pullWorker :: MonadAsync m => Context m a -> AsyncT m a
|
||||
pullWorker ctx = AsyncT $ \pctx stp yld -> do
|
||||
let continue = (runAsyncT (pullWorker ctx)) pctx stp yld
|
||||
yield a = yld a pctx (Just (pullWorker ctx))
|
||||
@ -245,16 +250,27 @@ pullWorker ctx = AsyncT $ \pctx stp yld -> do
|
||||
done <- f ctx tid
|
||||
if done then finish else cont
|
||||
|
||||
ev <- liftIO $ atomically $ readTBQueue (childChannel ctx)
|
||||
case ev of
|
||||
ChildYield a -> yield a
|
||||
ChildDone tid a ->
|
||||
threadOp tid delThread (yld a pctx Nothing) (yield a)
|
||||
ChildStop tid e ->
|
||||
case e of
|
||||
Nothing -> threadOp tid delThread stp continue
|
||||
Just ex -> handleException ex ctx tid
|
||||
ChildCreate tid -> threadOp tid addThread stp continue
|
||||
res <- liftIO $ atomically $ tryReadTBQueue (childChannel ctx)
|
||||
case res of
|
||||
Nothing -> do
|
||||
liftIO $ threadDelay 4
|
||||
let workQ = pendingWork ctx
|
||||
outQ = childChannel ctx
|
||||
workQEmpty <- liftIO $ atomically $ isEmptyTBQueue workQ
|
||||
outQEmpty <- liftIO $ atomically $ isEmptyTBQueue outQ
|
||||
when (not workQEmpty && outQEmpty) $ pushWorker ctx
|
||||
void $ liftIO $ atomically $ peekTBQueue (childChannel ctx)
|
||||
continue
|
||||
Just ev ->
|
||||
case ev of
|
||||
ChildYield a -> yield a
|
||||
ChildDone tid a ->
|
||||
threadOp tid delThread (yld a pctx Nothing) (yield a)
|
||||
ChildStop tid e ->
|
||||
case e of
|
||||
Nothing -> threadOp tid delThread stp continue
|
||||
Just ex -> handleException ex ctx tid
|
||||
ChildCreate tid -> threadOp tid addThread stp continue
|
||||
|
||||
-- If an exception occurs we push it to the channel so that it can handled by
|
||||
-- the parent. 'Paused' exceptions are to be collected at the top level.
|
||||
@ -265,6 +281,18 @@ handleChildException pchan e = do
|
||||
tid <- myThreadId
|
||||
atomically $ writeTBQueue pchan (ChildStop tid (Just e))
|
||||
|
||||
-- This function is different than "forkWorker" because we have to directly
|
||||
-- insert the threadIds here and cannot use the channel to send ChildCreate
|
||||
-- unlike on the push side. If we do that, the first thread's done message
|
||||
-- may arrive even before the second thread is forked, in that case
|
||||
-- pullWorker will falsely detect that all threads are over.
|
||||
{-# INLINE pushWorker #-}
|
||||
pushWorker :: MonadAsync m => Context m a -> m ()
|
||||
pushWorker ctx = do
|
||||
let chan = childChannel ctx
|
||||
tid <- doFork (push ctx) (handleChildException chan)
|
||||
liftIO $ modifyIORef (runningThreads ctx) $ (\s -> S.insert tid s)
|
||||
|
||||
-- | Split the original computation in a pull-push pair. The original
|
||||
-- computation pulls from a Channel while m1 and m2 push to the channel.
|
||||
{-# NOINLINE pullFork #-}
|
||||
@ -276,19 +304,9 @@ pullFork m1 m2 = AsyncT $ \_ stp yld -> do
|
||||
|
||||
where
|
||||
|
||||
-- This function is different than "forkWorker" because we have to directly
|
||||
-- insert the threadIds here and cannot use the channel to send ChildCreate
|
||||
-- unlike on the push side. If we do that, the first thread's done message
|
||||
-- may arrive even before the second thread is forked, in that case
|
||||
-- pullWorker will falsely detect that all threads are over.
|
||||
pushWorker ctx = do
|
||||
let chan = childChannel ctx
|
||||
tid <- doFork (push ctx) (handleChildException chan)
|
||||
liftIO $ modifyIORef (runningThreads ctx) $ (\s -> S.insert tid s)
|
||||
|
||||
newContext = do
|
||||
channel <- atomically $ newTBQueue 16
|
||||
work <- atomically $ newTBQueue 16
|
||||
channel <- atomically $ newTBQueue 32
|
||||
work <- atomically $ newTBQueue 32
|
||||
running <- newIORef S.empty
|
||||
done <- newIORef S.empty
|
||||
return $ Context { childChannel = channel
|
||||
@ -354,11 +372,7 @@ dequeueLoop :: MonadAsync m => Context m a -> AsyncT m a
|
||||
dequeueLoop ctx = AsyncT $ \_ stp yld -> do
|
||||
work <- liftIO $ atomically $ tryReadTBQueue (pendingWork ctx)
|
||||
case work of
|
||||
Nothing -> do
|
||||
let chan = childChannel ctx
|
||||
tid <- liftIO myThreadId
|
||||
liftIO $ atomically $ writeTBQueue chan (ChildStop tid Nothing)
|
||||
case () of {} -- keep the typechecker happy
|
||||
Nothing -> stp
|
||||
Just m -> do
|
||||
let stop = (runAsyncT (dequeueLoop ctx)) Nothing stp yld
|
||||
yield a c Nothing = yld a c (Just (dequeueLoop ctx))
|
||||
|
Loading…
Reference in New Issue
Block a user