mirror of
https://github.com/barrucadu/dejafu.git
synced 2024-12-19 03:21:49 +03:00
Implement throwTo and killThread
This commit is contained in:
parent
2a79b78901
commit
8faee29b27
@ -3,11 +3,16 @@
|
|||||||
|
|
||||||
-- | This module captures in a typeclass the interface of concurrency
|
-- | This module captures in a typeclass the interface of concurrency
|
||||||
-- monads.
|
-- monads.
|
||||||
module Control.Monad.Conc.Class where
|
module Control.Monad.Conc.Class
|
||||||
|
( MonadConc(..)
|
||||||
|
-- * Utilities
|
||||||
|
, spawn
|
||||||
|
, killThread
|
||||||
|
) where
|
||||||
|
|
||||||
import Control.Concurrent (forkIO)
|
import Control.Concurrent (forkIO)
|
||||||
import Control.Concurrent.MVar (MVar, readMVar, newEmptyMVar, putMVar, tryPutMVar, takeMVar, tryTakeMVar)
|
import Control.Concurrent.MVar (MVar, readMVar, newEmptyMVar, putMVar, tryPutMVar, takeMVar, tryTakeMVar)
|
||||||
import Control.Exception (Exception)
|
import Control.Exception (Exception, AsyncException(ThreadKilled))
|
||||||
import Control.Monad (unless)
|
import Control.Monad (unless)
|
||||||
import Control.Monad.Catch (MonadCatch, MonadThrow, catch, throwM)
|
import Control.Monad.Catch (MonadCatch, MonadThrow, catch, throwM)
|
||||||
import Control.Monad.STM (STM)
|
import Control.Monad.STM (STM)
|
||||||
@ -61,19 +66,6 @@ class ( Monad m, MonadCatch m, MonadThrow m
|
|||||||
-- | Get the @ThreadId@ of the current thread.
|
-- | Get the @ThreadId@ of the current thread.
|
||||||
myThreadId :: m (ThreadId m)
|
myThreadId :: m (ThreadId m)
|
||||||
|
|
||||||
-- | Create a concurrent computation for the provided action, and
|
|
||||||
-- return a @CVar@ which can be used to query the result.
|
|
||||||
--
|
|
||||||
-- > spawn ma = do
|
|
||||||
-- > cvar <- newEmptyCVar
|
|
||||||
-- > _ <- fork $ ma >>= putCVar cvar
|
|
||||||
-- > return cvar
|
|
||||||
spawn :: m a -> m (CVar m a)
|
|
||||||
spawn ma = do
|
|
||||||
cvar <- newEmptyCVar
|
|
||||||
_ <- fork $ ma >>= putCVar cvar
|
|
||||||
return cvar
|
|
||||||
|
|
||||||
-- | Create a new empty @CVar@.
|
-- | Create a new empty @CVar@.
|
||||||
newEmptyCVar :: m (CVar m a)
|
newEmptyCVar :: m (CVar m a)
|
||||||
|
|
||||||
@ -126,6 +118,11 @@ class ( Monad m, MonadCatch m, MonadThrow m
|
|||||||
catch :: Exception e => m a -> (e -> m a) -> m a
|
catch :: Exception e => m a -> (e -> m a) -> m a
|
||||||
catch = Control.Monad.Catch.catch
|
catch = Control.Monad.Catch.catch
|
||||||
|
|
||||||
|
-- | Throw an exception to the target thread. This blocks until the
|
||||||
|
-- exception is delivered, and it is just as if the target thread
|
||||||
|
-- had raised it with 'throw'. This can interrupt a blocked action.
|
||||||
|
throwTo :: Exception e => ThreadId m => e -> m ()
|
||||||
|
|
||||||
-- | Runs its argument, just as if the @_concNoTest@ weren't there.
|
-- | Runs its argument, just as if the @_concNoTest@ weren't there.
|
||||||
--
|
--
|
||||||
-- > _concNoTest x = x
|
-- > _concNoTest x = x
|
||||||
@ -155,9 +152,24 @@ instance MonadConc IO where
|
|||||||
readCVar = readMVar
|
readCVar = readMVar
|
||||||
fork = forkIO
|
fork = forkIO
|
||||||
myThreadId = C.myThreadId
|
myThreadId = C.myThreadId
|
||||||
|
throwTo = C.throwTo
|
||||||
newEmptyCVar = newEmptyMVar
|
newEmptyCVar = newEmptyMVar
|
||||||
putCVar = putMVar
|
putCVar = putMVar
|
||||||
tryPutCVar = tryPutMVar
|
tryPutCVar = tryPutMVar
|
||||||
takeCVar = takeMVar
|
takeCVar = takeMVar
|
||||||
tryTakeCVar = tryTakeMVar
|
tryTakeCVar = tryTakeMVar
|
||||||
atomically = S.atomically
|
atomically = S.atomically
|
||||||
|
|
||||||
|
-- | Create a concurrent computation for the provided action, and
|
||||||
|
-- return a @CVar@ which can be used to query the result.
|
||||||
|
spawn :: MonadConc m => m a -> m (CVar m a)
|
||||||
|
spawn ma = do
|
||||||
|
cvar <- newEmptyCVar
|
||||||
|
_ <- fork $ ma >>= putCVar cvar
|
||||||
|
return cvar
|
||||||
|
|
||||||
|
-- | Raise the 'ThreadKilled' exception in the target thread. Note
|
||||||
|
-- that if the thread is prepared to catch this exception, it won't
|
||||||
|
-- actually kill it.
|
||||||
|
killThread :: MonadConc m => ThreadId m => m ()
|
||||||
|
killThread tid = throwTo tid ThreadKilled
|
||||||
|
@ -18,6 +18,8 @@ module Test.DejaFu.Deterministic
|
|||||||
, spawn
|
, spawn
|
||||||
, atomically
|
, atomically
|
||||||
, throw
|
, throw
|
||||||
|
, throwTo
|
||||||
|
, killThread
|
||||||
, catch
|
, catch
|
||||||
|
|
||||||
-- * Communication: CVars
|
-- * Communication: CVars
|
||||||
@ -75,6 +77,7 @@ instance C.MonadConc (Conc t) where
|
|||||||
|
|
||||||
fork = fork
|
fork = fork
|
||||||
myThreadId = myThreadId
|
myThreadId = myThreadId
|
||||||
|
throwTo = throwTo
|
||||||
newEmptyCVar = newEmptyCVar
|
newEmptyCVar = newEmptyCVar
|
||||||
putCVar = putCVar
|
putCVar = putCVar
|
||||||
tryPutCVar = tryPutCVar
|
tryPutCVar = tryPutCVar
|
||||||
@ -149,6 +152,18 @@ tryTakeCVar cvar = C $ cont $ ATryTake $ unV cvar
|
|||||||
throw :: Exception e => e -> Conc t a
|
throw :: Exception e => e -> Conc t a
|
||||||
throw e = C $ cont $ \_ -> AThrow (SomeException e)
|
throw e = C $ cont $ \_ -> AThrow (SomeException e)
|
||||||
|
|
||||||
|
-- | Throw an exception to the target thread. This blocks until the
|
||||||
|
-- exception is delivered, and it is just as if the target thread had
|
||||||
|
-- raised it with 'throw'. This can interrupt a blocked action.
|
||||||
|
throwTo :: Exception e => ThreadId -> e -> Conc t ()
|
||||||
|
throwTo tid e = C $ cont $ \c -> AThrowTo tid (SomeException e) $ c ()
|
||||||
|
|
||||||
|
-- | Raise the 'ThreadKilled' exception in the target thread. Note
|
||||||
|
-- that if the thread is prepared to catch this exception, it won't
|
||||||
|
-- actually kill it.
|
||||||
|
killThread :: ThreadId => Conc t ()
|
||||||
|
killThread = C.killThread
|
||||||
|
|
||||||
-- | Catch an exception raised by 'throw'. This __cannot__ catch
|
-- | Catch an exception raised by 'throw'. This __cannot__ catch
|
||||||
-- errors, such as evaluating 'undefined', or division by zero. If you
|
-- errors, such as evaluating 'undefined', or division by zero. If you
|
||||||
-- need that, use Control.Exception.catch and 'ConcIO'.
|
-- need that, use Control.Exception.catch and 'ConcIO'.
|
||||||
|
@ -22,6 +22,8 @@ module Test.DejaFu.Deterministic.IO
|
|||||||
, spawn
|
, spawn
|
||||||
, atomically
|
, atomically
|
||||||
, throw
|
, throw
|
||||||
|
, throwTo
|
||||||
|
, killThread
|
||||||
, catch
|
, catch
|
||||||
|
|
||||||
-- * Communication: CVars
|
-- * Communication: CVars
|
||||||
@ -79,6 +81,7 @@ instance C.MonadConc (ConcIO t) where
|
|||||||
|
|
||||||
fork = fork
|
fork = fork
|
||||||
myThreadId = myThreadId
|
myThreadId = myThreadId
|
||||||
|
throwTo = throwTo
|
||||||
newEmptyCVar = newEmptyCVar
|
newEmptyCVar = newEmptyCVar
|
||||||
putCVar = putCVar
|
putCVar = putCVar
|
||||||
tryPutCVar = tryPutCVar
|
tryPutCVar = tryPutCVar
|
||||||
@ -154,6 +157,18 @@ tryTakeCVar cvar = C $ cont $ ATryTake $ unV cvar
|
|||||||
throw :: Exception e => e -> ConcIO t a
|
throw :: Exception e => e -> ConcIO t a
|
||||||
throw e = C $ cont $ \_ -> AThrow (SomeException e)
|
throw e = C $ cont $ \_ -> AThrow (SomeException e)
|
||||||
|
|
||||||
|
-- | Throw an exception to the target thread. This blocks until the
|
||||||
|
-- exception is delivered, and it is just as if the target thread had
|
||||||
|
-- raised it with 'throw'. This can interrupt a blocked action.
|
||||||
|
throwTo :: Exception e => ThreadId -> e -> ConcIO t ()
|
||||||
|
throwTo tid e = C $ cont $ \c -> AThrowTo tid (SomeException e) $ c ()
|
||||||
|
|
||||||
|
-- | Raise the 'ThreadKilled' exception in the target thread. Note
|
||||||
|
-- that if the thread is prepared to catch this exception, it won't
|
||||||
|
-- actually kill it.
|
||||||
|
killThread :: ThreadId => ConcIO t ()
|
||||||
|
killThread = C.killThread
|
||||||
|
|
||||||
-- | Catch an exception raised by 'throw'. This __cannot__ catch
|
-- | Catch an exception raised by 'throw'. This __cannot__ catch
|
||||||
-- errors, such as evaluating 'undefined', or division by zero. If you
|
-- errors, such as evaluating 'undefined', or division by zero. If you
|
||||||
-- need that, use Control.Exception.catch and 'liftIO'.
|
-- need that, use Control.Exception.catch and 'liftIO'.
|
||||||
|
@ -5,9 +5,10 @@
|
|||||||
-- functions.
|
-- functions.
|
||||||
module Test.DejaFu.Deterministic.Internal where
|
module Test.DejaFu.Deterministic.Internal where
|
||||||
|
|
||||||
|
import Control.Applicative ((<$>))
|
||||||
import Control.DeepSeq (NFData(..))
|
import Control.DeepSeq (NFData(..))
|
||||||
import Control.Exception (Exception, SomeException(..), fromException)
|
import Control.Exception (Exception, SomeException(..), fromException)
|
||||||
import Control.Monad (liftM, when)
|
import Control.Monad (when)
|
||||||
import Control.Monad.Cont (Cont, runCont)
|
import Control.Monad.Cont (Cont, runCont)
|
||||||
import Control.State
|
import Control.State
|
||||||
import Data.List (intersect)
|
import Data.List (intersect)
|
||||||
@ -49,6 +50,7 @@ data Action n r s =
|
|||||||
| ANew (CVarId -> n (Action n r s))
|
| ANew (CVarId -> n (Action n r s))
|
||||||
| ALift (n (Action n r s))
|
| ALift (n (Action n r s))
|
||||||
| AThrow SomeException
|
| AThrow SomeException
|
||||||
|
| AThrowTo ThreadId SomeException (Action n r s)
|
||||||
| forall a e. Exception e => ACatching (e -> M n r s a) (M n r s a) (a -> Action n r s)
|
| forall a e. Exception e => ACatching (e -> M n r s a) (M n r s a) (a -> Action n r s)
|
||||||
| APopCatching (Action n r s)
|
| APopCatching (Action n r s)
|
||||||
| AStop
|
| AStop
|
||||||
@ -141,6 +143,8 @@ data ThreadAction =
|
|||||||
-- ^ Pop the innermost exception handler from the stack.
|
-- ^ Pop the innermost exception handler from the stack.
|
||||||
| Throw
|
| Throw
|
||||||
-- ^ Throw an exception.
|
-- ^ Throw an exception.
|
||||||
|
| ThrowTo ThreadId
|
||||||
|
-- ^ Throw an exception to a thread.
|
||||||
| Killed
|
| Killed
|
||||||
-- ^ Killed by an uncaught exception.
|
-- ^ Killed by an uncaught exception.
|
||||||
| Lift
|
| Lift
|
||||||
@ -189,12 +193,12 @@ instance NFData Failure where
|
|||||||
-- state, returning a 'Just' if it terminates, and 'Nothing' if a
|
-- state, returning a 'Just' if it terminates, and 'Nothing' if a
|
||||||
-- deadlock is detected. Also returned is the final state of the
|
-- deadlock is detected. Also returned is the final state of the
|
||||||
-- scheduler, and an execution trace.
|
-- scheduler, and an execution trace.
|
||||||
runFixed :: Monad n => Fixed n r s -> (forall x. s n r x -> CTVarId -> n (Result x, CTVarId))
|
runFixed :: (Functor n, Monad n) => Fixed n r s -> (forall x. s n r x -> CTVarId -> n (Result x, CTVarId))
|
||||||
-> Scheduler g -> g -> M n r s a -> n (Either Failure a, g, Trace)
|
-> Scheduler g -> g -> M n r s a -> n (Either Failure a, g, Trace)
|
||||||
runFixed fixed runstm sched s ma = (\(e,g,_,t) -> (e,g,t)) `liftM` runFixed' fixed runstm sched s initialIdSource ma
|
runFixed fixed runstm sched s ma = (\(e,g,_,t) -> (e,g,t)) <$> runFixed' fixed runstm sched s initialIdSource ma
|
||||||
|
|
||||||
-- | Same as 'runFixed', be parametrised by an 'IdSource'.
|
-- | Same as 'runFixed', be parametrised by an 'IdSource'.
|
||||||
runFixed' :: Monad n => Fixed n r s -> (forall x. s n r x -> CTVarId -> n (Result x, CTVarId))
|
runFixed' :: (Functor n, Monad n) => Fixed n r s -> (forall x. s n r x -> CTVarId -> n (Result x, CTVarId))
|
||||||
-> Scheduler g -> g -> IdSource -> M n r s a -> n (Either Failure a, g, IdSource, Trace)
|
-> Scheduler g -> g -> IdSource -> M n r s a -> n (Either Failure a, g, IdSource, Trace)
|
||||||
runFixed' fixed runstm sched s idSource ma = do
|
runFixed' fixed runstm sched s idSource ma = do
|
||||||
ref <- newRef (wref fixed) Nothing
|
ref <- newRef (wref fixed) Nothing
|
||||||
@ -230,7 +234,7 @@ data Handler n r s = forall e. Exception e => Handler (e -> Action n r s)
|
|||||||
-- which can deal with it.
|
-- which can deal with it.
|
||||||
propagate :: SomeException -> [Handler n r s] -> Maybe (Action n r s, [Handler n r s])
|
propagate :: SomeException -> [Handler n r s] -> Maybe (Action n r s, [Handler n r s])
|
||||||
propagate _ [] = Nothing
|
propagate _ [] = Nothing
|
||||||
propagate e (Handler h:hs) = maybe (propagate e hs) (\act -> Just (act, hs)) $ fmap h e' where
|
propagate e (Handler h:hs) = maybe (propagate e hs) (\act -> Just (act, hs)) $ h <$> e' where
|
||||||
e' = fromException e
|
e' = fromException e
|
||||||
|
|
||||||
-- | The number of ID parameters was getting a bit unwieldy, so this
|
-- | The number of ID parameters was getting a bit unwieldy, so this
|
||||||
@ -261,7 +265,7 @@ initialIdSource = Id 0 0 0
|
|||||||
-- efficient to prepend to a list than append. As this function isn't
|
-- efficient to prepend to a list than append. As this function isn't
|
||||||
-- exposed to users of the library, this is just an internal gotcha to
|
-- exposed to users of the library, this is just an internal gotcha to
|
||||||
-- watch out for.
|
-- watch out for.
|
||||||
runThreads :: Monad n => Fixed n r s -> (forall x. s n r x -> CTVarId -> n (Result x, CTVarId))
|
runThreads :: (Functor n, Monad n) => Fixed n r s -> (forall x. s n r x -> CTVarId -> n (Result x, CTVarId))
|
||||||
-> Scheduler g -> g -> Threads n r s -> IdSource -> r (Maybe (Either Failure a)) -> n (g, IdSource, Trace)
|
-> Scheduler g -> g -> Threads n r s -> IdSource -> r (Maybe (Either Failure a)) -> n (g, IdSource, Trace)
|
||||||
runThreads fixed runstm sched origg origthreads idsrc ref = go idsrc [] (-1) origg origthreads where
|
runThreads fixed runstm sched origg origthreads idsrc ref = go idsrc [] (-1) origg origthreads where
|
||||||
go idSource sofar prior g threads
|
go idSource sofar prior g threads
|
||||||
@ -294,9 +298,9 @@ runThreads fixed runstm sched origg origthreads idsrc ref = go idsrc [] (-1) ori
|
|||||||
isBlocked = isJust . (\(_,b,_) -> b) $ fromJust thread
|
isBlocked = isJust . (\(_,b,_) -> b) $ fromJust thread
|
||||||
isNonexistant = isNothing thread
|
isNonexistant = isNothing thread
|
||||||
isTerminated = 0 `notElem` M.keys threads
|
isTerminated = 0 `notElem` M.keys threads
|
||||||
isDeadlocked = M.null runnable && (((~= OnCVarFull undefined) `fmap` M.lookup 0 threads) == Just True ||
|
isDeadlocked = M.null runnable && (((~= OnCVarFull undefined) <$> M.lookup 0 threads) == Just True ||
|
||||||
((~= OnCVarEmpty undefined) `fmap` M.lookup 0 threads) == Just True)
|
((~= OnCVarEmpty undefined) <$> M.lookup 0 threads) == Just True)
|
||||||
isSTMLocked = M.null runnable && ((~= OnCTVar []) `fmap` M.lookup 0 threads) == Just True
|
isSTMLocked = M.null runnable && ((~= OnCTVar []) <$> M.lookup 0 threads) == Just True
|
||||||
|
|
||||||
runconc ma i = do { (a,_,i',_) <- runFixed' fixed runstm sched g i ma; return (a,i') }
|
runconc ma i = do { (a,_,i',_) <- runFixed' fixed runstm sched g i ma; return (a,i') }
|
||||||
|
|
||||||
@ -312,7 +316,7 @@ runThreads fixed runstm sched origg origthreads idsrc ref = go idsrc [] (-1) ori
|
|||||||
|
|
||||||
-- | Run a single thread one step, by dispatching on the type of
|
-- | Run a single thread one step, by dispatching on the type of
|
||||||
-- 'Action'.
|
-- 'Action'.
|
||||||
stepThread :: Monad n => Fixed n r s
|
stepThread :: (Functor n, Monad n) => Fixed n r s
|
||||||
-> (forall x. M n r s x -> IdSource -> n (Either Failure x, IdSource))
|
-> (forall x. M n r s x -> IdSource -> n (Either Failure x, IdSource))
|
||||||
-- ^ Run a 'MonadConc' computation atomically.
|
-- ^ Run a 'MonadConc' computation atomically.
|
||||||
-> (forall x. s n r x -> CTVarId -> n (Result x, CTVarId))
|
-> (forall x. s n r x -> CTVarId -> n (Result x, CTVarId))
|
||||||
@ -338,6 +342,7 @@ stepThread fixed runconc runstm action idSource tid threads = case action of
|
|||||||
ANew na -> stepNew na
|
ANew na -> stepNew na
|
||||||
ALift na -> stepLift na
|
ALift na -> stepLift na
|
||||||
AThrow e -> stepThrow e
|
AThrow e -> stepThrow e
|
||||||
|
AThrowTo t e c -> stepThrowTo t e c
|
||||||
ACatching h ma c -> stepCatching h ma c
|
ACatching h ma c -> stepCatching h ma c
|
||||||
APopCatching a -> stepPopCatching a
|
APopCatching a -> stepPopCatching a
|
||||||
ANoTest ma a -> stepNoTest ma a
|
ANoTest ma a -> stepNoTest ma a
|
||||||
@ -345,7 +350,7 @@ stepThread fixed runconc runstm action idSource tid threads = case action of
|
|||||||
|
|
||||||
where
|
where
|
||||||
-- | Start a new thread, assigning it the next 'ThreadId'
|
-- | Start a new thread, assigning it the next 'ThreadId'
|
||||||
stepFork a b = return $ Right (goto (b tid) tid threads', idSource', Fork newtid) where
|
stepFork a b = return $ Right (goto (b newtid) tid threads', idSource', Fork newtid) where
|
||||||
threads' = launch newtid a threads
|
threads' = launch newtid a threads
|
||||||
(idSource', newtid) = nextTId idSource
|
(idSource', newtid) = nextTId idSource
|
||||||
|
|
||||||
@ -406,12 +411,23 @@ stepThread fixed runconc runstm action idSource tid threads = case action of
|
|||||||
-- | Throw an exception, and propagate it to the appropriate
|
-- | Throw an exception, and propagate it to the appropriate
|
||||||
-- handler.
|
-- handler.
|
||||||
stepThrow e = return $
|
stepThrow e = return $
|
||||||
case propagate e . (\(_,_,c) -> c) . fromJust $ M.lookup tid threads of
|
case propagate e . (\(_,_,hs) -> hs) . fromJust $ M.lookup tid threads of
|
||||||
Just (act, hs) ->
|
Just (act, hs) ->
|
||||||
let threads' = M.alter (\(Just (_, Nothing, _)) -> Just (act, Nothing, hs)) tid threads
|
let threads' = M.alter (\(Just (_, Nothing, _)) -> Just (act, Nothing, hs)) tid threads
|
||||||
in Right (threads', idSource, Throw)
|
in Right (threads', idSource, Throw)
|
||||||
Nothing -> Left UncaughtException
|
Nothing -> Left UncaughtException
|
||||||
|
|
||||||
|
-- | Throw an exception to the target thread, and propagate it to
|
||||||
|
-- the appropriate handler.
|
||||||
|
stepThrowTo t e c = return $
|
||||||
|
let threads' = goto c tid threads
|
||||||
|
in case propagate e . (\(_,_,hs) -> hs) <$> M.lookup t threads of
|
||||||
|
Just (Just (act, hs)) -> Right (M.insert t (act, Nothing, hs) threads', idSource, ThrowTo t)
|
||||||
|
Just Nothing
|
||||||
|
| t == 0 -> Left UncaughtException
|
||||||
|
| otherwise -> Right (kill t threads', idSource, ThrowTo t)
|
||||||
|
Nothing -> Right (threads', idSource, ThrowTo t)
|
||||||
|
|
||||||
-- | Create a new @CVar@, using the next 'CVarId'.
|
-- | Create a new @CVar@, using the next 'CVarId'.
|
||||||
stepNew na = do
|
stepNew na = do
|
||||||
let (idSource', newcvid) = nextCVId idSource
|
let (idSource', newcvid) = nextCVId idSource
|
||||||
|
@ -227,5 +227,6 @@ interesting _ (BlockedRead _) = True
|
|||||||
interesting _ (BlockedTake _) = True
|
interesting _ (BlockedTake _) = True
|
||||||
interesting _ (STM _) = True
|
interesting _ (STM _) = True
|
||||||
interesting _ BlockedSTM = True
|
interesting _ BlockedSTM = True
|
||||||
|
interesting _ (ThrowTo _) = True
|
||||||
interesting l Lift = l
|
interesting l Lift = l
|
||||||
interesting _ _ = False
|
interesting _ _ = False
|
||||||
|
Loading…
Reference in New Issue
Block a user