Implement bound threads

This also adds forkOS(N) and isCurrentThreadBound to MonadConc, a
breaking change.

Note: forkOSWithUnmask(N) is NOT added to MonadConc, as it isn't
supported in base-4.8 (GHC 7.10).  See #132 for the action on this.

A bound thread under test gets a dedicated worker thread, which is
forked bound using the underlying MonadConc.  This worker is used for
all lifted actions, with execution as normal otherwise.
This commit is contained in:
Michael Walker 2017-10-09 20:37:57 +01:00
parent 31d29c11ea
commit 45256193c0
13 changed files with 312 additions and 83 deletions

View File

@ -14,6 +14,18 @@ This project is versioned according to the [Package Versioning Policy](https://p
- **Git tag** [concurrency-1.3.0.0][]
- **Hackage** https://hackage.haskell.org/package/concurrency-1.3.0.0
### Control.Monad.Conc.Class
- `MonadConc` now supports bound threads with new `forkOS`, `forkOSN`, and `isCurrentThreadBound`
functions. (#126)
- New `runInBoundThread` and `runInUnboundThread` functions. (#126)
- The `rtsSupportsBoundThreads` definition is now the definition from Control.Concurrent
re-exported, not just `False`. (#126)
Note that bound threads are only supported if you compile with GHC and link with -threaded.
[concurrency-1.3.0.0]: https://github.com/barrucadu/dejafu/releases/tag/concurrency-1.3.0.0

View File

@ -17,16 +17,16 @@
-- This module captures in a typeclass the interface of concurrency
-- monads.
--
-- __Deviations:__ An instance of @MonadConc@ is not required to be
-- an instance of @MonadFix@, unlike @IO@. The @CRef@, @MVar@, and
-- __Deviations:__ An instance of @MonadConc@ is not required to be an
-- instance of @MonadFix@, unlike @IO@. The @CRef@, @MVar@, and
-- @Ticket@ types are not required to be instances of @Show@ or @Eq@,
-- unlike their normal counterparts. The @threadCapability@,
-- @threadWaitRead@, @threadWaitWrite@, @threadWaitReadSTM@,
-- @threadWaitWriteSTM@, and @mkWeakThreadId@ functions are not
-- provided. The @threadDelay@ function is not required to delay the
-- thread, merely to yield it. Bound threads are not supported. The
-- @BlockedIndefinitelyOnMVar@ (and similar) exceptions are /not/
-- thrown during testing, so do not rely on them at all.
-- thread, merely to yield it. The @BlockedIndefinitelyOnMVar@ (and
-- similar) exceptions are /not/ thrown during testing, so do not rely
-- on them at all.
module Control.Monad.Conc.Class
( MonadConc(..)
@ -35,18 +35,31 @@ module Control.Monad.Conc.Class
, forkFinally
, killThread
-- ** Bound threads
-- | Support for multiple operating system threads and bound threads
-- as described below is currently only available in the GHC runtime
-- system if you use the -threaded option when linking.
--
-- Other Haskell systems do not currently support multiple operating
-- system threads.
--
-- A bound thread is a haskell thread that is bound to an operating
-- system thread. While the bound thread is still scheduled by the
-- Haskell run-time system, the operating system thread takes care
-- of all the foreign calls made by the bound thread.
--
-- To a foreign library, the bound thread will look exactly like an
-- ordinary operating system thread created using OS functions like
-- pthread_create or CreateThread.
, IO.rtsSupportsBoundThreads
, runInBoundThread
, runInUnboundThread
-- ** Named Threads
, forkN
, forkOnN
-- ** Bound Threads
-- | @MonadConc@ does not support bound threads, if you need that
-- sort of thing you will have to use regular @IO@.
, rtsSupportsBoundThreads
, isCurrentThreadBound
-- * Exceptions
, throw
, catch
@ -109,7 +122,7 @@ import qualified Control.Monad.Writer.Strict as WS
-- Every @MonadConc@ has an associated 'MonadSTM', transactions of
-- which can be run atomically.
--
-- @since 1.0.0.0
-- @since 1.3.0.0
class ( Applicative m, Monad m
, MonadCatch m, MonadThrow m, MonadMask m
, MonadSTM (STM m)
@ -118,6 +131,8 @@ class ( Applicative m, Monad m
{-# MINIMAL
(forkWithUnmask | forkWithUnmaskN)
, (forkOnWithUnmask | forkOnWithUnmaskN)
, (forkOS | forkOSN)
, isCurrentThreadBound
, getNumCapabilities
, setNumCapabilities
, myThreadId
@ -194,10 +209,6 @@ class ( Applicative m, Monad m
-- | Like 'forkWithUnmask', but the thread is given a name which may
-- be used to present more useful debugging information.
--
-- If an empty name is given, the @ThreadId@ is used. If names
-- conflict, successive threads with the same name are given a
-- numeric suffix, counting up from 1.
--
-- > forkWithUnmaskN _ = forkWithUnmask
--
-- @since 1.0.0.0
@ -234,6 +245,35 @@ class ( Applicative m, Monad m
forkOnWithUnmaskN :: String -> Int -> ((forall a. m a -> m a) -> m ()) -> m (ThreadId m)
forkOnWithUnmaskN _ = forkOnWithUnmask
-- | Fork a computation to happen in a /bound thread/, which is
-- necessary if you need to call foreign (non-Haskell) libraries
-- that make use of thread-local state, such as OpenGL.
--
-- > forkOS = forkOSN ""
--
-- @since 1.3.0.0
forkOS :: m () -> m (ThreadId m)
forkOS = forkOSN ""
-- | Like 'forkOS', but the thread is given a name which may be used
-- to present more useful debugging information.
--
-- > forkOSN _ = forkOS
--
-- @since 1.3.0.0
forkOSN :: String -> m () -> m (ThreadId m)
forkOSN _ = forkOS
-- | Returns 'True' if the calling thread is bound, that is, if it
-- is safe to use foreign libraries that rely on thread-local state
-- from the calling thread.
--
-- This will always be false if your program is not compiled with
-- the threaded runtime.
--
-- @since 1.3.0.0
isCurrentThreadBound :: m Bool
-- | Get the number of Haskell threads that can run simultaneously.
--
-- @since 1.0.0.0
@ -279,10 +319,6 @@ class ( Applicative m, Monad m
-- | Create a new empty @MVar@, but it is given a name which may be
-- used to present more useful debugging information.
--
-- If an empty name is given, a counter starting from 0 is used. If
-- names conflict, successive @MVar@s with the same name are given a
-- numeric suffix, counting up from 1.
--
-- > newEmptyMVarN _ = newEmptyMVar
--
-- @since 1.0.0.0
@ -343,10 +379,6 @@ class ( Applicative m, Monad m
-- | Create a new reference, but it is given a name which may be
-- used to present more useful debugging information.
--
-- If an empty name is given, a counter starting from 0 is used. If
-- names conflict, successive @CRef@s with the same name are given a
-- numeric suffix, counting up from 1.
--
-- > newCRefN _ = newCRef
--
-- @since 1.0.0.0
@ -479,10 +511,6 @@ killThread tid = throwTo tid ThreadKilled
-- | Like 'fork', but the thread is given a name which may be used to
-- present more useful debugging information.
--
-- If no name is given, the @ThreadId@ is used. If names conflict,
-- successive threads with the same name are given a numeric suffix,
-- counting up from 1.
--
-- @since 1.0.0.0
forkN :: MonadConc m => String -> m () -> m (ThreadId m)
forkN name ma = forkWithUnmaskN name (const ma)
@ -490,27 +518,57 @@ forkN name ma = forkWithUnmaskN name (const ma)
-- | Like 'forkOn', but the thread is given a name which may be used
-- to present more useful debugging information.
--
-- If no name is given, the @ThreadId@ is used. If names conflict,
-- successive threads with the same name are given a numeric suffix,
-- counting up from 1.
--
-- @since 1.0.0.0
forkOnN :: MonadConc m => String -> Int -> m () -> m (ThreadId m)
forkOnN name i ma = forkOnWithUnmaskN name i (const ma)
-- Bound Threads
-- | Provided for compatibility, always returns 'False'.
-- | Run the computation passed as the first argument. If the calling
-- thread is not /bound/, a bound thread is created temporarily.
-- @runInBoundThread@ doesn't finish until the inner computation
-- finishes.
--
-- @since 1.0.0.0
rtsSupportsBoundThreads :: Bool
rtsSupportsBoundThreads = False
-- | Provided for compatibility, always returns 'False'.
-- You can wrap a series of foreign function calls that rely on
-- thread-local state with @runInBoundThread@ so that you can use them
-- without knowing whether the current thread is /bound/.
--
-- @since 1.0.0.0
isCurrentThreadBound :: MonadConc m => m Bool
isCurrentThreadBound = pure False
-- @since 1.3.0.0
runInBoundThread :: MonadConc m => m a -> m a
runInBoundThread =
runInThread (not <$> isCurrentThreadBound) (forkOSN "runInBoundThread")
-- | Run the computation passed as the first argument. If the calling
-- thread is /bound/, an unbound thread is created temporarily using
-- @fork@. @runInBoundThread@ doesn't finish until the inner
-- computation finishes.
--
-- Use this function /only/ in the rare case that you have actually
-- observed a performance loss due to the use of bound threads. A
-- program that doesn't need its main thread to be bound and makes
-- /heavy/ use of concurrency (e.g. a web server), might want to wrap
-- its @main@ action in @runInUnboundThread@.
--
-- Note that exceptions which are thrown to the current thread are
-- thrown in turn to the thread that is executing the given
-- computation. This ensures there's always a way of killing the
-- forked thread.
--
-- @since 1.3.0.0
runInUnboundThread :: MonadConc m => m a -> m a
runInUnboundThread =
runInThread isCurrentThreadBound (forkN "runInUnboundThread")
-- | Helper for 'runInBoundThread' and 'runInUnboundThread'
runInThread :: MonadConc m => m Bool -> (m () -> m (ThreadId m)) -> m a -> m a
runInThread check dofork action = do
flag <- check
if flag
then do
mv <- newEmptyMVar
mask $ \restore -> do
tid <- dofork $ Ca.try (restore action) >>= putMVar mv
let wait = takeMVar mv `catch` \(e :: SomeException) -> throwTo tid e >> wait
wait >>= either (\(e :: SomeException) -> throw e) pure
else action
-- Exceptions
@ -575,10 +633,6 @@ newMVar a = do
-- | Create a new @MVar@ containing a value, but it is given a name
-- which may be used to present more useful debugging information.
--
-- If no name is given, a counter starting from 0 is used. If names
-- conflict, successive @MVar@s with the same name are given a numeric
-- suffix, counting up from 1.
--
-- @since 1.0.0.0
newMVarN :: MonadConc m => String -> a -> m (MVar m a)
newMVarN n a = do
@ -620,10 +674,15 @@ instance MonadConc IO where
fork = IO.forkIO
forkOn = IO.forkOn
forkOS = IO.forkOS
forkWithUnmask = IO.forkIOWithUnmask
forkOnWithUnmask = IO.forkOnWithUnmask
forkOSN n ma = forkOS $ do
labelMe n
ma
forkWithUnmaskN n ma = forkWithUnmask $ \umask -> do
labelMe n
ma umask
@ -632,6 +691,8 @@ instance MonadConc IO where
labelMe n
ma umask
isCurrentThreadBound = IO.isCurrentThreadBound
getNumCapabilities = IO.getNumCapabilities
setNumCapabilities = IO.setNumCapabilities
readMVar = IO.readMVar
@ -698,11 +759,16 @@ instance MonadConc m => MonadConc (IsConc m) where
fork ma = toIsConc (fork $ unIsConc ma)
forkOn i ma = toIsConc (forkOn i $ unIsConc ma)
forkOS ma = toIsConc (forkOS $ unIsConc ma)
forkOSN n ma = toIsConc (forkOSN n $ unIsConc ma)
forkWithUnmask ma = toIsConc (forkWithUnmask (\umask -> unIsConc $ ma (\mx -> toIsConc (umask $ unIsConc mx))))
forkWithUnmaskN n ma = toIsConc (forkWithUnmaskN n (\umask -> unIsConc $ ma (\mx -> toIsConc (umask $ unIsConc mx))))
forkOnWithUnmask i ma = toIsConc (forkOnWithUnmask i (\umask -> unIsConc $ ma (\mx -> toIsConc (umask $ unIsConc mx))))
forkOnWithUnmaskN n i ma = toIsConc (forkOnWithUnmaskN n i (\umask -> unIsConc $ ma (\mx -> toIsConc (umask $ unIsConc mx))))
isCurrentThreadBound = toIsConc isCurrentThreadBound
getNumCapabilities = toIsConc getNumCapabilities
setNumCapabilities = toIsConc . setNumCapabilities
myThreadId = toIsConc myThreadId
@ -744,12 +810,17 @@ instance C => MonadConc (T m) where { \
\
fork = liftedF F fork ; \
forkOn = liftedF F . forkOn ; \
forkOS = liftedF F forkOS ; \
\
forkOSN = liftedF F . forkOSN ; \
\
forkWithUnmask = liftedFork F forkWithUnmask ; \
forkWithUnmaskN n = liftedFork F (forkWithUnmaskN n ) ; \
forkOnWithUnmask i = liftedFork F (forkOnWithUnmask i) ; \
forkOnWithUnmaskN n i = liftedFork F (forkOnWithUnmaskN n i) ; \
\
isCurrentThreadBound = lift isCurrentThreadBound ; \
\
getNumCapabilities = lift getNumCapabilities ; \
setNumCapabilities = lift . setNumCapabilities ; \
myThreadId = lift myThreadId ; \

View File

@ -1,6 +1,8 @@
module Cases.MultiThreaded where
import Control.Exception (ArithException(..))
import Control.Monad.IO.Class (liftIO)
import qualified Control.Concurrent as C
import Test.DejaFu (Failure(..), gives, gives', isUncaughtException)
import Test.Framework (Test)
@ -36,6 +38,41 @@ threadingTests = toTestList
x <- newCRef Nothing
_ <- fork . writeCRef x $ Just ()
readCRef x
, djfuT "The main thread is bound" (gives' [(True, True)]) $ do
b1 <- isCurrentThreadBound
-- check the thread is *really* bound
b2 <- liftIO C.isCurrentThreadBound
pure (b1, b2)
, djfuT "A thread started with forkOS is bound" (gives' [(True, True)]) $ do
v <- newEmptyMVar
forkOS $ do
b1 <- isCurrentThreadBound
b2 <- liftIO C.isCurrentThreadBound
putMVar v (b1, b2)
readMVar v
, djfuT "A thread started with fork is not bound" (gives' [False]) $ do
v <- newEmptyMVar
fork $ putMVar v =<< isCurrentThreadBound
readMVar v
, djfuT "An action can be run in an unbound thread" (gives' [(True, False)]) $ do
v <- newEmptyMVar
forkOS $ do
b1 <- isCurrentThreadBound
b2 <- runInUnboundThread isCurrentThreadBound
putMVar v (b1, b2)
readMVar v
, djfuT "An action can be run in a bound thread" (gives' [(False, True)]) $ do
v <- newEmptyMVar
fork $ do
b1 <- isCurrentThreadBound
b2 <- runInBoundThread isCurrentThreadBound
putMVar v (b1, b2)
readMVar v
]
--------------------------------------------------------------------------------

View File

@ -55,5 +55,8 @@ executable dejafu-tests
, test-framework
, test-framework-hunit
, test-framework-quickcheck2
if impl(ghc < 8.0.1)
build-depends: transformers
-- hs-source-dirs:
default-language: Haskell2010
ghc-options: -threaded

View File

@ -23,6 +23,11 @@ This project is versioned according to the [Package Versioning Policy](https://p
- The `autocheckIO`, `dejafuIO`, `dejafusIO`, `autocheckWayIO`, `dejafuWayIO`, `dejafusWayIO`,
`dejafuDiscardIO`, `runTestM`, and `runTestWayM` functions are now gone.
### Test.DejaFu.Common
- New `ForkOS` and `IsCurrentThreadBound` thread actions. (#126)
- New `WillForkOS` and `WillIsCurrentThreadBound` lookaheads. (#126)
### Test.DejaFu.Conc
- The `ConcST` type alias is gone.
@ -34,6 +39,8 @@ This project is versioned according to the [Package Versioning Policy](https://p
- The `runConcurrent` function now has a `MonadConc` constraint.
- If bound threads are supported, the main thread when testing is bound. (#126)
### Test.DejaFu.SCT
- All testing functions now require a `MonadConc` constraint:

View File

@ -77,7 +77,7 @@ module Test.DejaFu
-- to the list of results produced.
--
-- If you simply wish to check that something is deterministic, see
-- the 'autocheck' and 'autocheckIO' functions.
-- the 'autocheck' function.
--
-- These functions use a Total Store Order (TSO) memory model for
-- unsynchronised actions, see \"Testing under Alternative Memory
@ -317,7 +317,7 @@ autocheck = autocheckWay defaultWay defaultMemType
-- with as few as two threads and two pre-emptions, which is part of
-- what 'dejafus' uses.
--
-- __Warning:__ Using largers bounds will almost certainly
-- __Warning:__ Using larger bounds will almost certainly
-- significantly increase the time taken to test!
--
-- @since 1.0.0.0

View File

@ -276,10 +276,14 @@ initialIdSource = Id 0 0 0 0 [] [] [] []
-- | All the actions that a thread can perform.
--
-- @since 0.9.0.0
-- @since 1.0.0.0
data ThreadAction =
Fork ThreadId
-- ^ Start a new thread.
| ForkOS ThreadId
-- ^ Start a new bound thread.
| IsCurrentThreadBound
-- ^ Check if the current thread is bound.
| MyThreadId
-- ^ Get the 'ThreadId' of the current thread.
| GetNumCapabilities Int
@ -368,6 +372,7 @@ data ThreadAction =
instance NFData ThreadAction where
rnf (Fork t) = rnf t
rnf (ForkOS t) = rnf t
rnf (ThreadDelay n) = rnf n
rnf (GetNumCapabilities c) = rnf c
rnf (SetNumCapabilities c) = rnf c
@ -450,10 +455,14 @@ tvarsRead act = S.fromList $ case act of
-- | A one-step look-ahead at what a thread will do next.
--
-- @since 0.9.0.0
-- @since 1.0.0.0
data Lookahead =
WillFork
-- ^ Will start a new thread.
| WillForkOS
-- ^ Will start a new bound thread.
| WillIsCurrentThreadBound
-- ^ Will check if the current thread is bound.
| WillMyThreadId
-- ^ Will get the 'ThreadId'.
| WillGetNumCapabilities
@ -556,6 +565,8 @@ instance NFData Lookahead where
-- @since 0.4.0.0
rewind :: ThreadAction -> Maybe Lookahead
rewind (Fork _) = Just WillFork
rewind (ForkOS _) = Just WillForkOS
rewind IsCurrentThreadBound = Just WillIsCurrentThreadBound
rewind MyThreadId = Just WillMyThreadId
rewind (GetNumCapabilities _) = Just WillGetNumCapabilities
rewind (SetNumCapabilities i) = Just (WillSetNumCapabilities i)
@ -600,6 +611,7 @@ rewind StopSubconcurrency = Just WillStopSubconcurrency
-- @since 0.4.0.0
willRelease :: Lookahead -> Bool
willRelease WillFork = True
willRelease WillForkOS = True
willRelease WillYield = True
willRelease (WillThreadDelay _) = True
willRelease (WillPutMVar _) = True
@ -834,7 +846,8 @@ showTrace trc = intercalate "\n" $ go False trc : strkey where
-- @since 0.7.3.0
threadNames :: Trace -> [(Int, String)]
threadNames = mapMaybe go where
go (_, _, Fork (ThreadId (Just name) i)) = Just (i, name)
go (_, _, Fork (ThreadId (Just name) i)) = Just (i, name)
go (_, _, ForkOS (ThreadId (Just name) i)) = Just (i, name)
go _ = Nothing
-- | Count the number of pre-emptions in a schedule prefix.

View File

@ -2,6 +2,7 @@
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeFamilies #-}
-- |
@ -122,8 +123,11 @@ instance Monad n => C.MonadConc (ConcT r n) where
-- ----------
forkWithUnmaskN n ma = toConc (AFork n (\umask -> runCont (unC $ ma $ wrap umask) (\_ -> AStop (pure ()))))
forkWithUnmaskN n ma = toConc (AFork n (\umask -> runCont (unC $ ma $ wrap umask) (\_ -> AStop (pure ()))))
forkOnWithUnmaskN n _ = C.forkWithUnmaskN n
forkOSN n ma = forkOSWithUnmaskN n (const ma)
isCurrentThreadBound = toConc AIsBound
-- This implementation lies and returns 2 until a value is set. This
-- will potentially avoid special-case behaviour for 1 capability,
@ -171,6 +175,12 @@ instance Monad n => C.MonadConc (ConcT r n) where
atomically = toConc . AAtom
-- move this into the instance defn when forkOSWithUnmaskN is added to MonadConc in 2018
forkOSWithUnmaskN :: Applicative n => String -> ((forall a. ConcT r n a -> ConcT r n a) -> ConcT r n ()) -> ConcT r n ThreadId
forkOSWithUnmaskN n ma
| C.rtsSupportsBoundThreads = toConc (AForkOS n (\umask -> runCont (unC $ ma $ wrap umask) (\_ -> AStop (pure ()))))
| otherwise = fail "RTS doesn't support multiple OS threads (use ghc -threaded when linking)"
-- | Run a concurrent computation with a given 'Scheduler' and initial
-- state, returning a failure reason on error. Also returned is the
-- final state of the scheduler, and an execution trace.

View File

@ -17,6 +17,8 @@ module Test.DejaFu.Conc.Internal where
import Control.Exception (MaskingState(..),
toException)
import Control.Monad.Conc.Class (MonadConc,
rtsSupportsBoundThreads)
import Control.Monad.Ref (MonadRef, newRef, readRef,
writeRef)
import Data.Functor (void)
@ -45,23 +47,27 @@ type SeqTrace
-- | Run a concurrent computation with a given 'Scheduler' and initial
-- state, returning a failure reason on error. Also returned is the
-- final state of the scheduler, and an execution trace.
runConcurrency :: MonadRef r n
=> Scheduler g
-> MemType
-> g
-> IdSource
-> Int
-> M n r a
-> n (Either Failure a, Context n r g, SeqTrace, Maybe (ThreadId, ThreadAction))
runConcurrency :: (MonadConc n, MonadRef r n)
=> Scheduler g
-> MemType
-> g
-> IdSource
-> Int
-> M n r a
-> n (Either Failure a, Context n r g, SeqTrace, Maybe (ThreadId, ThreadAction))
runConcurrency sched memtype g idsrc caps ma = do
(c, ref) <- runRefCont AStop (Just . Right) (runM ma)
let threads0 = launch' Unmasked initialThread (const c) M.empty
threads <- (if rtsSupportsBoundThreads then makeBound initialThread else pure) threads0
let ctx = Context { cSchedState = g
, cIdSource = idsrc
, cThreads = launch' Unmasked initialThread (const c) M.empty
, cThreads = threads
, cWriteBuf = emptyBuffer
, cCaps = caps
}
(finalCtx, trace, finalAction) <- runThreads sched memtype ref ctx
let finalThreads = cThreads finalCtx
mapM_ (`kill` finalThreads) (M.keys finalThreads)
out <- readRef ref
pure (efromJust "runConcurrency" out, finalCtx, trace, finalAction)
@ -75,7 +81,7 @@ data Context n r g = Context
}
-- | Run a collection of threads, until there are no threads left.
runThreads :: MonadRef r n
runThreads :: (MonadConc n, MonadRef r n)
=> Scheduler g
-> MemType
-> r (Maybe (Either Failure a))
@ -159,7 +165,7 @@ data Act
-- | Run a single thread one step, by dispatching on the type of
-- 'Action'.
stepThread :: forall n r g. MonadRef r n
stepThread :: forall n r g. (MonadConc n, MonadRef r n)
=> Scheduler g
-- ^ The scheduler.
-> MemType
@ -174,9 +180,21 @@ stepThread :: forall n r g. MonadRef r n
stepThread sched memtype tid action ctx = case action of
-- start a new thread, assigning it the next 'ThreadId'
AFork n a b -> pure $
let threads' = launch tid newtid a (cThreads ctx)
(idSource', newtid) = nextTId n (cIdSource ctx)
in (Right ctx { cThreads = goto (b newtid) tid threads', cIdSource = idSource' }, Single (Fork newtid))
let threads' = launch tid newtid a (cThreads ctx)
(idSource', newtid) = nextTId n (cIdSource ctx)
in (Right ctx { cThreads = goto (b newtid) tid threads', cIdSource = idSource' }, Single (Fork newtid))
-- start a new bound thread, assigning it the next 'ThreadId'
AForkOS n a b -> do
let (idSource', newtid) = nextTId n (cIdSource ctx)
let threads' = launch tid newtid a (cThreads ctx)
threads'' <- makeBound newtid threads'
pure (Right ctx { cThreads = goto (b newtid) tid threads'', cIdSource = idSource' }, Single (Fork newtid))
-- check if the current thread is bound
AIsBound c ->
let isBound = isJust (_bound =<< M.lookup tid (cThreads ctx))
in simple (goto (c isBound) tid (cThreads ctx)) IsCurrentThreadBound
-- get the 'ThreadId' of the current thread
AMyTId c -> simple (goto (c tid) tid (cThreads ctx)) MyThreadId
@ -316,7 +334,7 @@ stepThread sched memtype tid action ctx = case action of
-- lift an action from the underlying monad into the @Conc@
-- computation.
ALift na -> do
a <- na
a <- runLiftedAct tid (cThreads ctx) na
simple (goto a tid (cThreads ctx)) LiftIO
-- throw an exception, and propagate it to the appropriate
@ -367,7 +385,10 @@ stepThread sched memtype tid action ctx = case action of
AReturn c -> simple (goto c tid (cThreads ctx)) Return
-- kill the current thread.
AStop na -> na >> simple (kill tid (cThreads ctx)) Stop
AStop na -> do
na
threads' <- kill tid (cThreads ctx)
simple threads' Stop
-- run a subconcurrent computation.
ASub ma c
@ -394,7 +415,9 @@ stepThread sched memtype tid action ctx = case action of
Just ts' -> simple ts' act
Nothing
| t == initialThread -> pure (Left (UncaughtException some), Single act)
| otherwise -> simple (kill t ts) act
| otherwise -> do
ts' <- kill t ts
simple ts' act
-- helper for actions which only change the threads.
simple threads' act = pure (Right ctx { cThreads = threads' }, Single act)

View File

@ -109,7 +109,9 @@ runCont = runM
-- primitives of the concurrency. 'spawn' is absent as it is
-- implemented in terms of 'newEmptyMVar', 'fork', and 'putMVar'.
data Action n r =
AFork String ((forall b. M n r b -> M n r b) -> Action n r) (ThreadId -> Action n r)
AFork String ((forall b. M n r b -> M n r b) -> Action n r) (ThreadId -> Action n r)
| AForkOS String ((forall b. M n r b -> M n r b) -> Action n r) (ThreadId -> Action n r)
| AIsBound (Bool -> Action n r)
| AMyTId (ThreadId -> Action n r)
| AGetNumCapabilities (Int -> Action n r)
@ -155,6 +157,8 @@ data Action n r =
-- | Look as far ahead in the given continuation as possible.
lookahead :: Action n r -> Lookahead
lookahead (AFork _ _ _) = WillFork
lookahead (AForkOS _ _ _) = WillForkOS
lookahead (AIsBound _) = WillIsCurrentThreadBound
lookahead (AMyTId _) = WillMyThreadId
lookahead (AGetNumCapabilities _) = WillGetNumCapabilities
lookahead (ASetNumCapabilities i _) = WillSetNumCapabilities i

View File

@ -13,6 +13,7 @@
-- form part of the public interface of this library.
module Test.DejaFu.Conc.Internal.Threading where
import qualified Control.Concurrent.Classy as C
import Control.Exception (Exception, MaskingState(..),
SomeException, fromException)
import Data.List (intersect)
@ -40,11 +41,23 @@ data Thread n r = Thread
-- ^ Stack of exception handlers
, _masking :: MaskingState
-- ^ The exception masking state.
, _bound :: Maybe (BoundThread n r)
-- ^ State for the associated bound thread, if it exists.
}
-- | The state of a bound thread.
data BoundThread n r = BoundThread
{ _runboundIO :: C.MVar n (n (Action n r))
-- ^ Run an @IO@ action in the bound thread by writing to this.
, _getboundIO :: C.MVar n (Action n r)
-- ^ Get the result of the above by reading from this.
, _boundTId :: C.ThreadId n
-- ^ Thread ID
}
-- | Construct a thread with just one action
mkthread :: Action n r -> Thread n r
mkthread c = Thread c Nothing [] Unmasked
mkthread c = Thread c Nothing [] Unmasked Nothing
--------------------------------------------------------------------------------
-- * Blocking
@ -122,15 +135,11 @@ launch parent tid a threads = launch' ms tid a threads where
-- | Start a thread with the given ID and masking state. This must not already be in use!
launch' :: MaskingState -> ThreadId -> ((forall b. M n r b -> M n r b) -> Action n r) -> Threads n r -> Threads n r
launch' ms tid a = M.insert tid thread where
thread = Thread { _continuation = a umask, _blocking = Nothing, _handlers = [], _masking = ms }
thread = Thread (a umask) Nothing [] ms Nothing
umask mb = resetMask True Unmasked >> mb >>= \b -> resetMask False ms >> pure b
resetMask typ m = cont $ \k -> AResetMask typ True m $ k ()
-- | Kill a thread.
kill :: ThreadId -> Threads n r -> Threads n r
kill = M.delete
-- | Block a thread.
block :: BlockedOn -> ThreadId -> Threads n r -> Threads n r
block blockedOn = M.adjust $ \thread -> thread { _blocking = Just blockedOn }
@ -147,3 +156,44 @@ wake blockedOn threads = (unblock <$> threads, M.keys $ M.filter isBlocked threa
isBlocked thread = case (_blocking thread, blockedOn) of
(Just (OnTVar tvids), OnTVar blockedOn') -> tvids `intersect` blockedOn' /= []
(theblock, _) -> theblock == Just blockedOn
-------------------------------------------------------------------------------
-- ** Bound threads
-- | Turn a thread into a bound thread.
makeBound :: C.MonadConc n => ThreadId -> Threads n r -> n (Threads n r)
makeBound tid threads = do
runboundIO <- C.newEmptyMVar
getboundIO <- C.newEmptyMVar
btid <- C.forkOSN ("bound worker for '" ++ show tid ++ "'") (go runboundIO getboundIO)
let bt = BoundThread runboundIO getboundIO btid
pure (M.adjust (\t -> t { _bound = Just bt }) tid threads)
where
go runboundIO getboundIO =
let loop = do
na <- C.takeMVar runboundIO
C.putMVar getboundIO =<< na
loop
in loop
-- | Kill a thread and remove it from the thread map.
--
-- If the thread is bound, the worker thread is cleaned up.
kill :: C.MonadConc n => ThreadId -> Threads n r -> n (Threads n r)
kill tid threads = case M.lookup tid threads of
Just thread -> case _bound thread of
Just bt -> do
C.killThread (_boundTId bt)
pure (M.delete tid threads)
Nothing -> pure (M.delete tid threads)
Nothing -> pure threads
-- | Run an action.
--
-- If the thread is bound, the action is run in the worker thread.
runLiftedAct :: C.MonadConc n => ThreadId -> Threads n r -> n (Action n r) -> n (Action n r)
runLiftedAct tid threads ma = case _bound =<< M.lookup tid threads of
Just bt -> do
C.putMVar (_runboundIO bt) ma
C.takeMVar (_getboundIO bt)
Nothing -> ma

View File

@ -17,9 +17,8 @@ module Test.HUnit.DejaFu
( -- * Unit testing
-- | This is supported by the 'Assertable' and 'Testable' instances
-- for 'ConcIO'. These instances tries all executions,
-- reporting as failures the cases which throw an 'HUnitFailure'
-- exception.
-- for 'ConcIO'. These instances try all executions, reporting as
-- failures the cases which throw an 'HUnitFailure' exception.
--
-- @instance Testable (ConcIO ())@
-- @instance Assertable (ConcIO ())@

View File

@ -25,7 +25,7 @@ module Test.Tasty.DejaFu
-- @instance IsOption Bounds@
-- @instance IsOption MemType@
-- * Property testing
-- * Unit testing
testAuto
, testDejafu
, testDejafus