mirror of
https://github.com/awkward-squad/ki.git
synced 2024-10-03 22:57:51 +03:00
awaitSTM -> await
This commit is contained in:
parent
278d330e1c
commit
308751e5a4
@ -1,4 +1,4 @@
|
||||
-- | The `ki` API, generalized to use 'MonadIO' and 'MonadUnliftIO'.
|
||||
-- | The `ki` API, generalized to use 'MonadUnliftIO'.
|
||||
--
|
||||
-- For a variant of this API that does not use @<https://hackage.haskell.org/package/unliftio-core unliftio-core>@, see
|
||||
-- @<https://hackage.haskell.org/package/ki ki>@.
|
||||
@ -10,14 +10,13 @@ module Ki.Unlifted
|
||||
Ki.Thread,
|
||||
fork,
|
||||
forktry,
|
||||
await,
|
||||
Ki.await,
|
||||
|
||||
-- * Extended API
|
||||
fork_,
|
||||
forkWith,
|
||||
forkWith_,
|
||||
forktryWith,
|
||||
Ki.awaitSTM,
|
||||
|
||||
-- ** Thread options
|
||||
Ki.ThreadOpts (..),
|
||||
@ -32,17 +31,11 @@ module Ki.Unlifted
|
||||
where
|
||||
|
||||
import Control.Exception (Exception)
|
||||
import Control.Monad.IO.Class (MonadIO (liftIO))
|
||||
import Control.Monad.IO.Unlift (MonadUnliftIO (withRunInIO))
|
||||
import Data.Void (Void)
|
||||
import qualified Ki
|
||||
import Prelude
|
||||
|
||||
-- | See 'Ki.await'.
|
||||
await :: forall a m. MonadIO m => Ki.Thread a -> m a
|
||||
await thread =
|
||||
liftIO (Ki.await thread)
|
||||
|
||||
-- | See 'Ki.fork'.
|
||||
fork :: forall a m. MonadUnliftIO m => Ki.Scope -> m a -> m (Ki.Thread a)
|
||||
fork scope action =
|
||||
|
@ -62,7 +62,6 @@ module Ki
|
||||
forkWith,
|
||||
forkWith_,
|
||||
forktryWith,
|
||||
awaitSTM,
|
||||
|
||||
-- ** Thread options
|
||||
ThreadOpts (..),
|
||||
@ -83,7 +82,6 @@ import Ki.Scope
|
||||
ThreadAffinity (..),
|
||||
ThreadOpts (..),
|
||||
await,
|
||||
awaitSTM,
|
||||
defaultThreadOpts,
|
||||
fork,
|
||||
forkWith,
|
||||
|
@ -1,5 +1,3 @@
|
||||
{-# LANGUAGE MagicHash #-}
|
||||
|
||||
module Ki.Scope
|
||||
( Scope,
|
||||
scoped,
|
||||
@ -7,7 +5,6 @@ module Ki.Scope
|
||||
--
|
||||
Thread,
|
||||
await,
|
||||
awaitSTM,
|
||||
fork,
|
||||
forkWith,
|
||||
forkWith_,
|
||||
@ -432,20 +429,12 @@ forktryWith scope opts action = do
|
||||
Just _ -> True
|
||||
|
||||
-- | Wait for a thread to terminate, and return its value.
|
||||
await :: Thread a -> IO a
|
||||
await thread =
|
||||
await :: Thread a -> STM a
|
||||
await (Thread _threadId doAwait) =
|
||||
-- If *they* are deadlocked, we will *both* will be delivered a wakeup from the RTS. We want to shrug this exception
|
||||
-- off, because afterwards they'll have put to the result var. But don't shield indefinitely, once will cover this use
|
||||
-- case and prevent any accidental infinite loops.
|
||||
tryEither (\BlockedIndefinitelyOnSTM -> go) pure go
|
||||
where
|
||||
go =
|
||||
atomically (awaitSTM thread)
|
||||
|
||||
-- | @STM@ variant of 'Ki.await'.
|
||||
awaitSTM :: Thread a -> STM a
|
||||
awaitSTM (Thread _threadId doAwait) =
|
||||
doAwait
|
||||
tryEitherSTM (\BlockedIndefinitelyOnSTM -> doAwait) pure doAwait
|
||||
|
||||
-- TODO more docs
|
||||
-- No precondition on masking state
|
||||
@ -467,3 +456,8 @@ propagateException parentThreadId exception childExceptionVar =
|
||||
tryEither :: Exception e => (e -> IO b) -> (a -> IO b) -> IO a -> IO b
|
||||
tryEither onFailure onSuccess action =
|
||||
join (catch (onSuccess <$> action) (pure . onFailure))
|
||||
|
||||
-- Like try, but with continuations
|
||||
tryEitherSTM :: Exception e => (e -> STM b) -> (a -> STM b) -> STM a -> STM b
|
||||
tryEitherSTM onFailure onSuccess action =
|
||||
join (catchSTM (onSuccess <$> action) (pure . onFailure))
|
||||
|
@ -15,7 +15,7 @@ main =
|
||||
"Unit tests"
|
||||
[ testCase "`fork` throws ErrorCall when the scope is closed" do
|
||||
scope <- Ki.scoped pure
|
||||
(Ki.await =<< Ki.fork scope (pure ())) `shouldThrow` ErrorCall "ki: scope closed"
|
||||
(atomically . Ki.await =<< Ki.fork scope (pure ())) `shouldThrow` ErrorCall "ki: scope closed"
|
||||
pure (),
|
||||
testCase "`wait` succeeds when no threads are alive" do
|
||||
Ki.scoped (atomically . Ki.wait),
|
||||
@ -30,7 +30,7 @@ main =
|
||||
mask \restore -> do
|
||||
thread :: Ki.Thread () <- Ki.fork scope (throwIO A)
|
||||
restore (atomically (Ki.wait scope)) `catch` \(e :: SomeException) -> print e
|
||||
Ki.await thread,
|
||||
atomically (Ki.await thread),
|
||||
testCase "`fork` forks in unmasked state regardless of parent's masking state" do
|
||||
Ki.scoped \scope -> do
|
||||
_ <- Ki.fork scope (getMaskingState `shouldReturn` Unmasked)
|
||||
@ -80,24 +80,24 @@ main =
|
||||
testCase "`forktry` can catch sync exceptions" do
|
||||
Ki.scoped \scope -> do
|
||||
result :: Ki.Thread (Either A ()) <- Ki.forktry scope (throw A)
|
||||
Ki.await result `shouldReturn` Left A,
|
||||
atomically (Ki.await result) `shouldReturn` Left A,
|
||||
testCase "`forktry` can propagate sync exceptions" do
|
||||
(`shouldThrow` A) do
|
||||
Ki.scoped \scope -> do
|
||||
thread :: Ki.Thread (Either A2 ()) <- Ki.forktry scope (throw A)
|
||||
Ki.await thread,
|
||||
atomically (Ki.await thread),
|
||||
testCase "`forktry` propagates async exceptions" do
|
||||
(`shouldThrow` B) do
|
||||
Ki.scoped \scope -> do
|
||||
thread :: Ki.Thread (Either B ()) <- Ki.forktry scope (throw B)
|
||||
Ki.await thread,
|
||||
atomically (Ki.await thread),
|
||||
testCase "`forktry` puts exceptions after propagating" do
|
||||
(`shouldThrow` A2) do
|
||||
Ki.scoped \scope -> do
|
||||
mask \restore -> do
|
||||
thread :: Ki.Thread (Either A ()) <- Ki.forktry scope (throwIO A2)
|
||||
restore (atomically (Ki.wait scope)) `catch` \(_ :: SomeException) -> pure ()
|
||||
Ki.await thread
|
||||
atomically (Ki.await thread)
|
||||
]
|
||||
|
||||
data A = A
|
||||
|
Loading…
Reference in New Issue
Block a user