mirror of
https://github.com/ilyakooo0/streamly.git
synced 2024-10-06 21:27:35 +03:00
Gracefully deal with context EOF
When using a context asynchronously
This commit is contained in:
parent
bb195c2c64
commit
0d138d0970
@ -58,7 +58,7 @@ import Control.Concurrent (ThreadId, forkIO,
|
||||
myThreadId, threadDelay)
|
||||
import Control.Concurrent.MVar (MVar, newEmptyMVar, tryTakeMVar,
|
||||
tryPutMVar, takeMVar)
|
||||
import Control.Exception (SomeException (..))
|
||||
import Control.Exception (SomeException (..), Exception)
|
||||
import qualified Control.Exception.Lifted as EL
|
||||
import Control.Monad (ap, liftM, MonadPlus(..), mzero,
|
||||
when)
|
||||
@ -390,10 +390,21 @@ sendWorkerWait ctx = dispatch >> void (liftIO $ takeMVar (doorBell ctx))
|
||||
when (not done) $ (pushWorker ctx) >> dispatch
|
||||
|
||||
|
||||
data ContextUsedAfterEOF = ContextUsedAfterEOF deriving Show
|
||||
instance Exception ContextUsedAfterEOF
|
||||
|
||||
-- | Pull an AsyncT stream from a context
|
||||
{-# NOINLINE pullFromCtx #-}
|
||||
pullFromCtx :: MonadAsync m => Context m a -> AsyncT m a
|
||||
pullFromCtx ctx = AsyncT $ \_ stp yld -> do
|
||||
-- When using an async handle to the context, one may keep using a stale
|
||||
-- context even after it has been fullt drained. To detect it gracefully we
|
||||
-- raise an explicit exception.
|
||||
-- XXX if reading the IORef is costly we can use a flag in the context to
|
||||
-- indicate we are done.
|
||||
done <- allThreadsDone ctx
|
||||
when done $ throwM ContextUsedAfterEOF
|
||||
|
||||
res <- liftIO $ tryTakeMVar (doorBell ctx)
|
||||
when (isNothing res) $ sendWorkerWait ctx
|
||||
list <- liftIO $ atomicModifyIORefCAS (outputQueue ctx) $ \x -> ([], x)
|
||||
|
Loading…
Reference in New Issue
Block a user