Overhaul of concurrency primitives (#968)

Co-authored-by: Guillaume ALLAIS <guillaume.allais@ens-lyon.org>
This commit is contained in:
Wen Kokke 2021-02-05 16:16:20 +00:00 committed by GitHub
parent 7f495999bd
commit bd683938bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
59 changed files with 756 additions and 149 deletions

View File

@ -10,6 +10,61 @@ Changes since Idris 2 v0.2.1
Library changes:
* Overhaul of the concurrency primitives:
- Renamed `System.Concurrency.Raw` to `System.Concurrency`.
- Modified the implementation of `Prelude.IO.fork` in the Chez Scheme RTS, which
now returns a semaphore instead of a thread object. This allows the main
thread to wait for the child thread to finish (see next bullet). The Racket
implementation already returned a thread descriptor, which could be used to
wait for the thread to finish.
- Added `Prelude.IO.threadWait` which waits for a thread, identified by a
`ThreadID`, to finish. This operation is supported by both the Chez Scheme and
the Racket RTS'es.
- Added semaphores to `System.Concurrency`, supported by both the Chez Scheme
and Racket RTS'es.
- Added barriers to `System.Concurrency`, supported by both the Chez Scheme
and Racket RTS'es.
- Added synchronous channels to `System.Concurrency`, supported by both the Chez
Scheme and Racket RTS'es.
- Fixed the support for mutexes in the Racket RTS. Formerly, they were
implemented with semaphores, and calling`mutexRelease` multiple times would
increment the internal counter multiple times, allowing multiple concurrent
`mutexAcquire` operations to succeed simultaneously. Currently, `mutexRelease`
fails when called on a mutex which isn't owned. (However, `mutexRelease` does
not check whether the mutex is in fact owned by the current thread, which may
be a bug.)
- Modified the support for condition variables in the Racket RTS. Formerly,
they were implemented using synchronous channels, meaning that:
+ `conditionSignal` was a blocking operation; and
+ calling `conditionSignal` on a condition variable on which no thread
was waiting would wake the next thread to call `conditionWait`, whereas
condition variables are supposed to be stateless, and only wake threads
already in the queue.
The implementation was replaced with an implementation based on asynchronous
channels and mutexes, based on the following paper:
https://www.microsoft.com/en-us/research/wp-content/uploads/2004/12/ImplementingCVs.pdf
- Removed `threadID` and `blodwen-thisthread`. Formerly, in the Chez Scheme
backend, this function returned "the thread id of the current thread" as a
value of type `ThreadID`. However, `fork` returned a "thread object" as a
value of type `ThreadID`. These are *different kinds of values* in Chez
Scheme. As there was nothing one could do with a value of type `ThreadID`, I
chose to remove `threadID`, as it allowed me to implement `threadWait` more
easily.
- Renamed `blodwen-lock` and `blodwen-unlock` to `blodwen-mutex-acquire` and
`blodwen-mutex-release` for consistency, as these functions are referred to
with acquire and release both in Chez Scheme and in the Idris2 concurrency
module.
* Added `Data.HVect` in `contrib`, for heterogeneous vectors.
* Various other library functions added throughout `base` and `contrib`

View File

@ -8773,7 +8773,7 @@
(define CompilerC-45SchemeC-45Racket-showRacketString (lambda (arg-0) (let ((sc0 arg-0)) (case (vector-ref sc0 0) ((0) (lambda (eta-0) eta-0)) (else (let ((e-2 (vector-ref sc0 1))) (let ((e-3 (vector-ref sc0 2))) (let ((sc1 e-2)) (cond ((equal? sc1 #\") (lambda (eta-0) (PreludeC-45TypesC-45Strings-C-43C-43 "\\\"" ((CompilerC-45SchemeC-45Racket-showRacketString e-3) eta-0))))(else (lambda (eta-0) ((CompilerC-45SchemeC-45Racket-showRacketChar e-2) ((CompilerC-45SchemeC-45Racket-showRacketString e-3) eta-0)))))))))))))
(define CompilerC-45SchemeC-45Racket-showRacketChar (lambda (arg-0) (let ((sc0 arg-0)) (cond ((equal? sc0 #\\) (lambda (arg-1) (PreludeC-45TypesC-45Strings-C-43C-43 "\\\\" arg-1)))(else (CompilerC-45SchemeC-45Racket-case--showRacketChar-477 arg-0 (PreludeC-45Basics-C-124C-124 (PreludeC-45EqOrd-C-60_Ord_Char arg-0 (PreludeC-45Types-chr 32)) (lambda () (PreludeC-45EqOrd-C-62_Ord_Char arg-0 (PreludeC-45Types-chr 126))))))))))
(define CompilerC-45SchemeC-45Racket-schemeCall (lambda (arg-0 arg-1 arg-2 arg-3) (let ((call (PreludeC-45TypesC-45Strings-C-43C-43 "(" (PreludeC-45TypesC-45Strings-C-43C-43 arg-1 (PreludeC-45TypesC-45Strings-C-43C-43 " " (PreludeC-45TypesC-45Strings-C-43C-43 (CoreC-45NameC-45Namespace-showSep " " (PreludeC-45Types-map_Functor_List 'erased 'erased (lambda (eta-0) (CompilerC-45SchemeC-45Common-schName eta-0)) arg-2)) ")")))))) (let ((sc0 arg-3)) (case (vector-ref sc0 0) ((14) (lambda (eta-0) (vector 1 (CompilerC-45SchemeC-45Common-mkWorld call))))(else (lambda (eta-0) (vector 1 call))))))))
(define CompilerC-45SchemeC-45Racket-schHeader (lambda (arg-0) (PreludeC-45TypesC-45Strings-C-43C-43 "#lang racket/base\u000a" (PreludeC-45TypesC-45Strings-C-43C-43 "; @generated\u000a" (PreludeC-45TypesC-45Strings-C-43C-43 "(require racket/future)\u000a" (PreludeC-45TypesC-45Strings-C-43C-43 "(require racket/math)\u000a" (PreludeC-45TypesC-45Strings-C-43C-43 "(require racket/system)\u000a" (PreludeC-45TypesC-45Strings-C-43C-43 "(require rnrs/bytevectors-6)\u000a" (PreludeC-45TypesC-45Strings-C-43C-43 "(require rnrs/io/ports-6)\u000a" (PreludeC-45TypesC-45Strings-C-43C-43 "(require srfi/19)\u000a" (PreludeC-45TypesC-45Strings-C-43C-43 "(require ffi/unsafe ffi/unsafe/define)\u000a" (PreludeC-45TypesC-45Strings-C-43C-43 arg-0 "(let ()\u000a"))))))))))))
(define CompilerC-45SchemeC-45Racket-schHeader (lambda (arg-0) (PreludeC-45TypesC-45Strings-C-43C-43 "#lang racket/base\u000a" (PreludeC-45TypesC-45Strings-C-43C-43 "; @generated\u000a" (PreludeC-45TypesC-45Strings-C-43C-43 "(require racket/async-channel)\u000a" (PreludeC-45TypesC-45Strings-C-43C-43 "(require racket/future)\u000a" (PreludeC-45TypesC-45Strings-C-43C-43 "(require racket/math)\u000a" (PreludeC-45TypesC-45Strings-C-43C-43 "(require racket/system)\u000a" (PreludeC-45TypesC-45Strings-C-43C-43 "(require rnrs/bytevectors-6)\u000a" (PreludeC-45TypesC-45Strings-C-43C-43 "(require rnrs/io/ports-6)\u000a" (PreludeC-45TypesC-45Strings-C-43C-43 "(require srfi/19)\u000a" (PreludeC-45TypesC-45Strings-C-43C-43 "(require ffi/unsafe ffi/unsafe/define)\u000a" (PreludeC-45TypesC-45Strings-C-43C-43 arg-0 "(let ()\u000a")))))))))))))
(define CompilerC-45SchemeC-45Racket-schFooter (lambda () ") (collect-garbage)"))
(define CompilerC-45SchemeC-45Racket-schFgnDef (lambda (arg-0 arg-1 arg-2 arg-3 arg-4 arg-5 arg-6 arg-7) (let ((sc0 arg-7)) (case (vector-ref sc0 0) ((2) (let ((e-0 (vector-ref sc0 1))) (let ((e-1 (vector-ref sc0 2))) (let ((e-2 (vector-ref sc0 3))) (let ((argns (CompilerC-45SchemeC-45Racket-mkArgs 0 e-1))) (let ((allargns (PreludeC-45Types-map_Functor_List 'erased 'erased (lambda (eta-0) (Builtin-fst 'erased 'erased eta-0)) argns))) (let ((useargns (PreludeC-45Types-map_Functor_List 'erased 'erased (lambda (eta-0) (Builtin-fst 'erased 'erased eta-0)) (DataC-45List-filter 'erased (lambda (eta-0) (Builtin-snd 'erased 'erased eta-0)) argns)))) (lambda (eta-0) (let ((act-24 ((CoreC-45Core-traverseC-39 'erased 'erased (lambda (eta-1) (CompilerC-45SchemeC-45Racket-mkStruct arg-3 eta-1)) e-1 (vector 0 )) eta-0))) (let ((sc1 act-24)) (case (vector-ref sc1 0) ((0) (let ((e-3 (vector-ref sc1 1))) (vector 0 e-3))) (else (let ((e-5 (vector-ref sc1 1))) (let ((act-25 ((CompilerC-45SchemeC-45Racket-mkStruct arg-3 e-2) eta-0))) (let ((sc2 act-25)) (case (vector-ref sc2 0) ((0) (let ((e-3 (vector-ref sc2 1))) (vector 0 e-3))) (else (let ((e-6 (vector-ref sc2 1))) (let ((act-26 ((CompilerC-45SchemeC-45Racket-useCC arg-0 arg-1 arg-2 arg-4 arg-5 e-0 (DataC-45List-zip 'erased 'erased useargns e-1) e-2) eta-0))) (let ((sc3 act-26)) (case (vector-ref sc3 0) ((0) (let ((e-3 (vector-ref sc3 1))) (vector 0 e-3))) (else (let ((e-7 (vector-ref sc3 1))) (let ((sc4 e-7)) (let ((e-4 (vector-ref sc4 1))) (let ((e-3 (vector-ref sc4 2))) (let ((act-27 (let ((act-27 (unbox arg-1))) (vector 1 act-27)))) (let ((sc5 act-27)) (case (vector-ref sc5 0) ((0) (let ((e-8 (vector-ref sc5 1))) (vector 0 e-8))) (else (let ((e-8 (vector-ref sc5 1))) (let ((act-28 ((CoreC-45Context-full_HasNames_Name (let ((sc6 e-8)) (let ((e-35 (vector-ref sc6 1))) e-35)) arg-6) eta-0))) (let ((sc6 act-28)) (case (vector-ref sc6 0) ((0) (let ((e-9 (vector-ref sc6 1))) (vector 0 e-9))) (else (let ((e-9 (vector-ref sc6 1))) (vector 1 (vector 0 (PreludeC-45TypesC-45Strings-C-43C-43 ((PreludeC-45Interfaces-concat 'erased 'erased (vector 0 (vector 0 (lambda (acc) (lambda (elem) (lambda (func) (lambda (init) (lambda (input) (PreludeC-45Types-foldr_Foldable_List 'erased 'erased func init input)))))) (lambda (elem) (lambda (acc) (lambda (func) (lambda (init) (lambda (input) (PreludeC-45Types-foldl_Foldable_List 'erased 'erased func init input)))))) (lambda (elem) (lambda (arg-821) (PreludeC-45Types-null_Foldable_List 'erased arg-821)))) (vector 0 (lambda (arg-8) (lambda (arg-9) (PreludeC-45Types-C-60C-43C-62_Semigroup_String arg-8 arg-9))) (PreludeC-45Types-neutral_Monoid_String)))) e-5) (PreludeC-45TypesC-45Strings-C-43C-43 e-6 e-4)) (PreludeC-45TypesC-45Strings-C-43C-43 "(define " (PreludeC-45TypesC-45Strings-C-43C-43 (CompilerC-45SchemeC-45Common-schName e-9) (PreludeC-45TypesC-45Strings-C-43C-43 " (lambda (" (PreludeC-45TypesC-45Strings-C-43C-43 (CoreC-45NameC-45Namespace-showSep " " (PreludeC-45Types-map_Functor_List 'erased 'erased (lambda (eta-1) (CompilerC-45SchemeC-45Common-schName eta-1)) allargns)) (PreludeC-45TypesC-45Strings-C-43C-43 ") " (PreludeC-45TypesC-45Strings-C-43C-43 e-3 "))\u000a"))))))))))))))))))))))))))))))))))))))))))))(else (lambda (eta-0) (vector 1 (vector 0 "" ""))))))))
(define CompilerC-45SchemeC-45Racket-rktToC (lambda (arg-0 arg-1) (let ((sc0 arg-0)) (case (vector-ref sc0 0) ((8) (PreludeC-45TypesC-45Strings-C-43C-43 "(char->integer " (PreludeC-45TypesC-45Strings-C-43C-43 arg-1 ")")))(else arg-1)))))

View File

@ -0,0 +1,191 @@
module System.Concurrency
-- At the moment this is pretty fundamentally tied to the Scheme RTS.
-- Given that different back ends will have entirely different threading
-- models, it might be unavoidable, but we might want to think about possible
-- primitives that back ends should support.
-- Thread mailboxes
%foreign "scheme:blodwen-set-thread-data"
prim__setThreadData : {a : Type} -> a -> PrimIO ()
%foreign "scheme:blodwen-get-thread-data"
prim__getThreadData : (a : Type) -> PrimIO a
export
setThreadData : HasIO io => {a : Type} -> a -> io ()
setThreadData val = primIO (prim__setThreadData val)
export
getThreadData : HasIO io => (a : Type) -> io a
getThreadData a = primIO (prim__getThreadData a)
-- Mutexes
export
data Mutex : Type where [external]
%foreign "scheme:blodwen-make-mutex"
prim__makeMutex : PrimIO Mutex
%foreign "scheme:blodwen-mutex-acquire"
prim__mutexAcquire : Mutex -> PrimIO ()
%foreign "scheme:blodwen-mutex-release"
prim__mutexRelease : Mutex -> PrimIO ()
||| Creates and returns a new mutex.
export
makeMutex : HasIO io => io Mutex
makeMutex = primIO prim__makeMutex
||| Acquires the mutex identified by `mutex`. The thread blocks until the mutex
||| has been acquired.
|||
||| Mutexes are recursive in Posix threads terminology, which means that the
||| calling thread can use mutex-acquire to (re)acquire a mutex it already has.
||| In this case, an equal number of mutex-release calls is necessary to release
||| the mutex.
export
mutexAcquire : HasIO io => Mutex -> io ()
mutexAcquire m = primIO (prim__mutexAcquire m)
||| Releases the mutex identified by `mutex`. Unpredictable behavior results if
||| the mutex is not owned by the calling thread.
export
mutexRelease : HasIO io => Mutex -> io ()
mutexRelease m = primIO (prim__mutexRelease m)
-- Condition variables
export
data Condition : Type where [external]
%foreign "scheme:blodwen-make-condition"
prim__makeCondition : PrimIO Condition
%foreign "scheme:blodwen-condition-wait"
prim__conditionWait : Condition -> Mutex -> PrimIO ()
%foreign "scheme:blodwen-condition-wait-timeout"
prim__conditionWaitTimeout : Condition -> Mutex -> Int -> PrimIO ()
%foreign "scheme:blodwen-condition-signal"
prim__conditionSignal : Condition -> PrimIO ()
%foreign "scheme:blodwen-condition-broadcast"
prim__conditionBroadcast : Condition -> PrimIO ()
||| Creates and returns a new condition variable.
export
makeCondition : HasIO io => io Condition
makeCondition = primIO prim__makeCondition
||| Waits up to the specified timeout for the condition identified by the
||| condition variable `cond`. The calling thread must have acquired the mutex
||| identified by `mutex` at the time `conditionWait` is called. The mutex is
||| released as a side effect of the call to `conditionWait`. When a thread is
||| later released from the condition variable by one of the procedures
||| described below, the mutex is reacquired and `conditionWait` returns.
export
conditionWait : HasIO io => Condition -> Mutex -> io ()
conditionWait cond mutex = primIO (prim__conditionWait cond mutex)
||| Variant of `conditionWait` with a timeout in microseconds.
||| When the timeout expires, the thread is released, `mutex` is reacquired, and
||| `conditionWaitTimeout` returns.
export
conditionWaitTimeout : HasIO io => Condition -> Mutex -> Int -> io ()
conditionWaitTimeout cond mutex timeout = primIO (prim__conditionWaitTimeout cond mutex timeout)
||| Releases one of the threads waiting for the condition identified by `cond`.
export
conditionSignal : HasIO io => Condition -> io ()
conditionSignal c = primIO (prim__conditionSignal c)
||| Releases all of the threads waiting for the condition identified by `cond`.
export
conditionBroadcast : HasIO io => Condition -> io ()
conditionBroadcast c = primIO (prim__conditionBroadcast c)
-- Semaphores
export
data Semaphore : Type where [external]
%foreign "scheme:blodwen-make-semaphore"
prim__makeSemaphore : Int -> PrimIO Semaphore
%foreign "scheme:blodwen-semaphore-post"
prim__semaphorePost : Semaphore -> PrimIO ()
%foreign "scheme:blodwen-semaphore-wait"
prim__semaphoreWait : Semaphore -> PrimIO ()
||| Creates and returns a new semaphore with the counter initially set to `init`.
export
makeSemaphore : HasIO io => Int -> io Semaphore
makeSemaphore init = primIO (prim__makeSemaphore init)
||| Increments the semaphore's internal counter.
export
semaphorePost : HasIO io => Semaphore -> io ()
semaphorePost sema = primIO (prim__semaphorePost sema)
||| Blocks until the internal counter for semaphore sema is non-zero. When the
||| counter is non-zero, it is decremented and `semaphoreWait` returns.
export
semaphoreWait : HasIO io => Semaphore -> io ()
semaphoreWait sema = primIO (prim__semaphoreWait sema)
-- Barriers
||| A barrier enables multiple threads to synchronize the beginning of some
||| computation.
export
data Barrier : Type where [external]
%foreign "scheme:blodwen-make-barrier"
prim__makeBarrier : Int -> PrimIO Barrier
%foreign "scheme:blodwen-barrier-wait"
prim__barrierWait : Barrier -> PrimIO ()
||| Creates a new barrier that can block a given number of threads.
export
makeBarrier : HasIO io => Int -> io Barrier
makeBarrier numThreads = primIO (prim__makeBarrier numThreads)
||| Blocks the current thread until all threads have rendezvoused here.
export
barrierWait : HasIO io => Barrier -> io ()
barrierWait barrier = primIO (prim__barrierWait barrier)
-- Channels
export
data Channel : Type -> Type where [external]
%foreign "scheme:blodwen-make-channel"
prim__makeChannel : PrimIO (Channel a)
%foreign "scheme:blodwen-channel-get"
prim__channelGet : Channel a -> PrimIO a
%foreign "scheme:blodwen-channel-put"
prim__channelPut : Channel a -> a -> PrimIO ()
||| Creates and returns a new channel. The channel can be used with channelGet
||| to receive a value through the channel. The channel can be used with
||| channelPut to send a value through the channel.
export
makeChannel : HasIO io => io (Channel a)
makeChannel = primIO prim__makeChannel
||| Blocks until a sender is ready to provide a value through `chan`. The result
||| is the sent value.
export
channelGet : HasIO io => Channel a -> io a
channelGet chan = primIO (prim__channelGet chan)
||| Blocks until a receiver is ready to accept the value `val` through `chan`.
export
channelPut : HasIO io => Channel a -> a -> io ()
channelPut chan val = primIO (prim__channelPut chan val)

View File

@ -1,84 +0,0 @@
module System.Concurrency.Raw
-- At the moment this is pretty fundamentally tied to the Scheme RTS
-- Given that different back ends will have entirely different threading
-- models, it might be unavoidable, but we might want to think about possible
-- primitives that back ends should support.
%foreign "scheme:blodwen-thisthread"
prim__threadID : PrimIO ThreadID
%foreign "scheme:blodwen-set-thread-data"
prim__setThreadData : {a : Type} -> a -> PrimIO ()
%foreign "scheme:blodwen-get-thread-data"
prim__getThreadData : (a : Type) -> PrimIO a
-- Mutexes and condition variables.
export
threadID : IO ThreadID
threadID = primIO prim__threadID
export
setThreadData : {a : Type} -> a -> IO ()
setThreadData val = primIO (prim__setThreadData val)
export
getThreadData : (a : Type) -> IO a
getThreadData a = primIO (prim__getThreadData a)
export
data Mutex : Type where [external]
%foreign "scheme:blodwen-mutex"
prim__makeMutex : PrimIO Mutex
%foreign "scheme:blodwen-lock"
prim__mutexAcquire : Mutex -> PrimIO ()
%foreign "scheme:blodwen-unlock"
prim__mutexRelease : Mutex -> PrimIO ()
export
makeMutex : HasIO io => io Mutex
makeMutex = primIO prim__makeMutex
export
mutexAcquire : HasIO io => Mutex -> io ()
mutexAcquire m = primIO (prim__mutexAcquire m)
export
mutexRelease : HasIO io => Mutex -> io ()
mutexRelease m = primIO (prim__mutexRelease m)
export
data Condition : Type where [external]
%foreign "scheme:blodwen-condition"
prim__makeCondition : PrimIO Condition
%foreign "scheme:blodwen-condition-wait"
prim__conditionWait : Condition -> Mutex -> PrimIO ()
%foreign "scheme:blodwen-condition-wait-timeout"
prim__conditionWaitTimeout : Condition -> Mutex -> Int -> PrimIO ()
%foreign "scheme:blodwen-condition-signal"
prim__conditionSignal : Condition -> PrimIO ()
%foreign "scheme:blodwen-condition-broadcast"
prim__conditionBroadcast : Condition -> PrimIO ()
export
makeCondition : HasIO io => io Condition
makeCondition = primIO prim__makeCondition
export
conditionWait : HasIO io => Condition -> Mutex -> io ()
conditionWait c m = primIO (prim__conditionWait c m)
||| Timeout is in microseconds
export
conditionWaitTimeout : HasIO io => Condition -> Mutex -> Int -> io ()
conditionWaitTimeout c m t = primIO (prim__conditionWaitTimeout c m t)
export
conditionSignal : HasIO io => Condition -> io ()
conditionSignal c = primIO (prim__conditionSignal c)
export
conditionBroadcast : HasIO io => Condition -> io ()
conditionBroadcast c = primIO (prim__conditionBroadcast c)

View File

@ -74,7 +74,7 @@ modules = Control.App,
Language.Reflection.TTImp,
System,
System.Concurrency.Raw,
System.Concurrency,
System.Clock,
System.Directory,
System.File,

View File

@ -8,8 +8,8 @@ export
data Future : Type -> Type where [external]
%extern prim__makeFuture : {0 a : Type} -> Lazy a -> Future a
%foreign "racket:blodwen-await-future"
"scheme:blodwen-await-future"
%foreign "scheme:blodwen-await-future"
prim__awaitFuture : {0 a : Type} -> Future a -> a
export

View File

@ -123,6 +123,14 @@ export
fork : (1 prog : IO ()) -> IO ThreadID
fork act = fromPrim (prim__fork (toPrim act))
%foreign "scheme:blodwen-thread-wait"
export
prim__threadWait : (1 threadID : ThreadID) -> PrimIO ()
export
threadWait : (1 threadID : ThreadID) -> IO ()
threadWait threadID = fromPrim (prim__threadWait threadID)
%foreign "C:idris2_readString, libidris2_support"
export
prim__getErrno : Int

View File

@ -44,6 +44,7 @@ schHeader : String -> String
schHeader libs
= "#lang racket/base\n" ++
"; @generated\n" ++
"(require racket/async-channel)\n" ++ -- for asynchronous channels
"(require racket/future)\n" ++ -- for parallelism/concurrency
"(require racket/math)\n" ++ -- for math ops
"(require racket/system)\n" ++ -- for system

View File

@ -195,10 +195,21 @@
;; Threads
(define blodwen-thread-data (make-thread-parameter #f))
(define-record thread-handle (semaphore))
(define (blodwen-thread p)
(fork-thread (lambda () (p (vector 0)))))
(define (blodwen-thread proc)
(let [(sema (blodwen-make-semaphore 0))]
(fork-thread (lambda () (proc (vector 0)) (blodwen-semaphore-post sema)))
(make-thread-handle sema)
))
(define (blodwen-thread-wait handle)
(blodwen-semaphore-wait (thread-handle-semaphore handle)))
;; Thread mailboxes
(define blodwen-thread-data
(make-thread-parameter #f))
(define (blodwen-get-thread-data ty)
(blodwen-thread-data))
@ -206,19 +217,103 @@
(define (blodwen-set-thread-data a)
(blodwen-thread-data a))
(define (blodwen-mutex) (make-mutex))
(define (blodwen-lock m) (mutex-acquire m))
(define (blodwen-unlock m) (mutex-release m))
(define (blodwen-thisthread) (get-thread-id))
;; Semaphore
(define (blodwen-condition) (make-condition))
(define (blodwen-condition-wait c m) (condition-wait c m))
(define (blodwen-condition-wait-timeout c m t)
(let ((sec (div t 1000000))
(micro (mod t 1000000)))
(condition-wait c m (make-time 'time-duration (* 1000 micro) sec))))
(define (blodwen-condition-signal c) (condition-signal c))
(define (blodwen-condition-broadcast c) (condition-broadcast c))
(define-record semaphore (box mutex condition))
(define (blodwen-make-semaphore init)
(make-semaphore (box init) (make-mutex) (make-condition)))
(define (blodwen-semaphore-post sema)
(with-mutex (semaphore-mutex sema)
(let [(sema-box (semaphore-box sema))]
(set-box! sema-box (+ (unbox sema-box) 1))
(condition-signal (semaphore-condition sema))
)))
(define (blodwen-semaphore-wait sema)
(with-mutex (semaphore-mutex sema)
(let [(sema-box (semaphore-box sema))]
(when (= (unbox sema-box) 0)
(condition-wait (semaphore-condition sema) (semaphore-mutex sema)))
(set-box! sema-box (- (unbox sema-box) 1))
)))
;; Barrier
(define-record barrier (count-box num-threads mutex cond))
(define (blodwen-make-barrier num-threads)
(make-barrier (box 0) num-threads (make-mutex) (make-condition)))
(define (blodwen-barrier-wait barrier)
(let [(count-box (barrier-count-box barrier))
(num-threads (barrier-num-threads barrier))
(mutex (barrier-mutex barrier))
(condition (barrier-cond barrier))]
(with-mutex mutex
(let* [(count-old (unbox count-box))
(count-new (+ count-old 1))]
(set-box! count-box count-new)
(if (= count-new num-threads)
(condition-broadcast condition)
(condition-wait condition mutex))
))))
;; Channel
(define-record channel (box mutex semaphore-get semaphore-put))
(define (blodwen-make-channel ty)
(make-channel
(box '())
(make-mutex)
(blodwen-make-semaphore 0)
(blodwen-make-semaphore 0)))
(define (blodwen-channel-get ty chan)
(blodwen-semaphore-post (channel-semaphore-get chan))
(blodwen-semaphore-wait (channel-semaphore-put chan))
(with-mutex (channel-mutex chan)
(let* [(chan-box (channel-box chan))
(chan-msg-queue (unbox chan-box))]
(set-box! chan-box (cdr chan-msg-queue))
(car chan-msg-queue)
)))
(define (blodwen-channel-put ty chan val)
(with-mutex (channel-mutex chan)
(let* [(chan-box (channel-box chan))
(chan-msg-queue (unbox chan-box))]
(set-box! chan-box (append chan-msg-queue (list val)))))
(blodwen-semaphore-post (channel-semaphore-put chan))
(blodwen-semaphore-wait (channel-semaphore-get chan)))
;; Mutex
(define (blodwen-make-mutex)
(make-mutex))
(define (blodwen-mutex-acquire mutex)
(mutex-acquire mutex))
(define (blodwen-mutex-release mutex)
(mutex-release mutex))
;; Condition variable
(define (blodwen-make-condition)
(make-condition))
(define (blodwen-condition-wait condition mutex)
(condition-wait condition mutex))
(define (blodwen-condition-wait-timeout condition mutex timeout)
(let* [(sec (div timeout 1000000))
(micro (mod timeout 1000000))]
(condition-wait condition mutex (make-time 'time-duration (* 1000 micro) sec))))
(define (blodwen-condition-signal condition)
(condition-signal condition))
(define (blodwen-condition-broadcast condition)
(condition-broadcast condition))
;; Future
(define-record future-internal (result ready mutex signal))
(define (blodwen-make-future work)

View File

@ -186,10 +186,15 @@
;; Threads
(define blodwen-thread-data (make-thread-cell #f))
(define (blodwen-thread proc)
(thread (lambda () (proc (vector 0)))))
(define (blodwen-thread p)
(thread (lambda () (p (vector 0)))))
(define (blodwen-thread-wait handle)
(thread-wait handle))
;; Thread mailboxes
(define blodwen-thread-data (make-thread-cell #f))
(define (blodwen-get-thread-data ty)
(thread-cell-ref blodwen-thread-data))
@ -197,22 +202,92 @@
(define (blodwen-set-thread-data a)
(thread-cell-set! blodwen-thread-data a))
(define (blodwen-mutex) (make-semaphore 1))
(define (blodwen-lock m) (semaphore-post m))
(define (blodwen-unlock m) (semaphore-wait m))
(define (blodwen-thisthread) (current-thread))
;; Semaphores
(define (blodwen-make-semaphore init)
(make-semaphore init))
(define (blodwen-semaphore-post sema)
(semaphore-post sema))
(define (blodwen-semaphore-wait sema)
(semaphore-wait sema))
;; Barriers
(struct barrier (count-box num-threads mutex semaphore))
(define (blodwen-make-barrier num-threads)
(barrier (box 0) num-threads (blodwen-make-mutex) (make-semaphore 0)))
(define (blodwen-barrier-wait barrier)
(blodwen-mutex-acquire (barrier-mutex barrier))
(let* [(count-box (barrier-count-box barrier))
(count-old (unbox count-box))
(count-new (+ count-old 1))
(sema (barrier-semaphore barrier))]
(set-box! count-box count-new)
(blodwen-mutex-release (barrier-mutex barrier))
(when (= count-new (barrier-num-threads barrier)) (semaphore-post sema))
(semaphore-wait sema)
(semaphore-post sema)
))
;; Channels
(define (blodwen-make-channel ty)
(make-channel))
(define (blodwen-channel-get ty chan)
(channel-get chan))
(define (blodwen-channel-put ty chan val)
(channel-put chan val))
;; Mutex
(define (blodwen-make-mutex)
(make-semaphore 1))
(define (blodwen-mutex-acquire sema)
(semaphore-wait sema))
(define (blodwen-mutex-release sema)
(if (semaphore-try-wait? sema)
(blodwen-error-quit "Exception in mutexRelease: thread does not own mutex")
(semaphore-post sema)))
;; Condition Variables
(define (blodwen-make-condition)
(make-async-channel))
(define (blodwen-condition-wait ach mutex)
;; Pre-condition: this threads holds `mutex'.
(let [(sema (make-semaphore 0))]
(async-channel-put ach sema)
(blodwen-mutex-release mutex)
(sync sema)
(blodwen-mutex-acquire mutex)))
(define (blodwen-condition-wait-timeout ach mutex timeout)
;; Pre-condition: this threads holds `mutex'.
(let [(sema (make-semaphore 0))]
(async-channel-put ach sema)
(blodwen-mutex-release mutex)
(sync/timeout (/ timeout 1000000) sema)
(blodwen-mutex-acquire mutex)))
(define (blodwen-condition-signal ach)
(let [(sema (async-channel-try-get ach))]
(when sema (semaphore-post sema))))
(define (blodwen-condition-broadcast ach)
(letrec [(loop (lambda ()
(let [(sema (async-channel-try-get ach))]
(when sema ((semaphore-post sema) (loop))))))]
loop))
(define (blodwen-condition) (make-channel))
(define (blodwen-condition-wait c m)
(blodwen-unlock m) ;; consistency with interface for posix condition variables
(sync c)
(blodwen-lock m))
(define (blodwen-condition-wait-timeout c m t)
(blodwen-unlock m) ;; consistency with interface for posix condition variables
(sync/timeout (/ t 1000000) c)
(blodwen-lock m))
(define (blodwen-condition-signal c)
(channel-put c 'ready))
(define (blodwen-make-future work) (future work))
(define (blodwen-await-future ty future) (touch future))

View File

@ -20,17 +20,18 @@ import Lib
ttimpTests : TestPool
ttimpTests = MkTestPool []
["basic001", "basic002", "basic003", "basic004", "basic005",
"basic006",
"coverage001", "coverage002",
"dot001",
"eta001", "eta002",
"lazy001",
"nest001", "nest002",
"perf001", "perf002", "perf003",
"record001", "record002", "record003",
"qtt001", "qtt003",
"total001", "total002", "total003"]
[ "basic001", "basic002", "basic003", "basic004", "basic005"
, "basic006"
, "coverage001", "coverage002"
, "dot001"
, "eta001", "eta002"
, "lazy001"
, "nest001", "nest002"
, "perf001", "perf002", "perf003"
, "record001", "record002", "record003"
, "qtt001", "qtt003"
, "total001", "total002", "total003"
]
idrisTestsBasic : TestPool
idrisTestsBasic = MkTestPool []
@ -157,21 +158,25 @@ idrisTests = MkTestPool []
typeddTests : TestPool
typeddTests = MkTestPool []
["chapter01", "chapter02", "chapter03", "chapter04", "chapter05",
"chapter06", "chapter07", "chapter08", "chapter09", "chapter10",
"chapter11", "chapter12", "chapter13", "chapter14"]
[ "chapter01", "chapter02", "chapter03", "chapter04", "chapter05"
, "chapter06", "chapter07", "chapter08", "chapter09", "chapter10"
, "chapter11", "chapter12", "chapter13", "chapter14"
]
chezTests : TestPool
chezTests = MkTestPool [Chez]
["chez001", "chez002", "chez003", "chez004", "chez005", "chez006",
"chez007", "chez008", "chez009", "chez010", "chez011", "chez012",
"chez013", "chez014", "chez015", "chez016", "chez017", "chez018",
"chez019", "chez020", "chez021", "chez022", "chez023", "chez024",
"chez025", "chez026", "chez027", "chez028", "chez029", "chez030",
"chez031",
"concurrency001",
"perf001",
"reg001"]
[ "chez001", "chez002", "chez003", "chez004", "chez005", "chez006"
, "chez007", "chez008", "chez009", "chez010", "chez011", "chez012"
, "chez013", "chez014", "chez015", "chez016", "chez017", "chez018"
, "chez019", "chez020", "chez021", "chez022", "chez023", "chez024"
, "chez025", "chez026", "chez027", "chez028", "chez029", "chez030"
, "chez031"
, "futures001"
, "semaphores001"
, "semaphores002"
, "perf001"
, "reg001"
]
refcTests : TestPool
refcTests = MkTestPool [C]
@ -179,7 +184,10 @@ refcTests = MkTestPool [C]
racketTests : TestPool
racketTests = MkTestPool [Racket]
["concurrency001"]
[ "forkjoin001"
, "semaphores001", "semaphores002"
, "futures001"
]
nodeTests : TestPool
nodeTests = MkTestPool [Node]
@ -195,15 +203,18 @@ nodeTests = MkTestPool [Node]
ideModeTests : TestPool
ideModeTests = MkTestPool []
[ "ideMode001", "ideMode002", "ideMode003", "ideMode004" ]
[ "ideMode001", "ideMode002", "ideMode003", "ideMode004"
]
preludeTests : TestPool
preludeTests = MkTestPool []
[ "reg001" ]
[ "reg001"
]
templateTests : TestPool
templateTests = MkTestPool []
[ "simple-test", "ttimp", "with-ipkg" ]
[ "simple-test", "ttimp", "with-ipkg"
]
main : IO ()
main = runner

View File

@ -0,0 +1,15 @@
module Main
import System
import System.Concurrency
main : IO ()
main = do
barrier <- makeBarrier 3
threadIDs <- for [1,2,3] $ \n => fork $ do
putStrLn "Hello"
barrierWait barrier
putStrLn "Goodbye"
for threadIDs $ \threadID =>
threadWait threadID
sleep 1

View File

@ -0,0 +1,6 @@
Hello
Hello
Hello
Goodbye
Goodbye
Goodbye

View File

View File

@ -0,0 +1 @@
$1 --no-banner --no-color --console-width 0 --cg chez Main.idr --exec main

View File

@ -0,0 +1,16 @@
module Main
import System
import System.Concurrency
main : IO ()
main = do
chan <- makeChannel
threadID <- fork $ do
channelPut chan "Hello"
channelPut chan "Goodbye"
val <- channelGet chan
putStrLn val
val <- channelGet chan
putStrLn val
sleep 1

View File

@ -0,0 +1,2 @@
Hello
Goodbye

View File

@ -0,0 +1 @@
$1 --no-banner --no-color --console-width 0 --cg chez Main.idr --exec main

View File

@ -0,0 +1,11 @@
module Main
import System
main : IO ()
main = do
threadID <- fork $ do
sleep 1
putStrLn "Hello"
threadWait threadID
putStrLn "Goodbye"

View File

@ -0,0 +1,2 @@
Hello
Goodbye

View File

@ -0,0 +1 @@
$1 --no-banner --no-color --console-width 0 --cg chez Main.idr --exec main

View File

@ -0,0 +1,13 @@
module Main
import System.Concurrency
main : IO ()
main = do
sema <- makeSemaphore 0
semaphorePost sema
semaphorePost sema
putStrLn "Hello"
semaphoreWait sema
semaphoreWait sema
putStrLn "Goodbye"

View File

@ -0,0 +1,2 @@
Hello
Goodbye

View File

@ -0,0 +1 @@
$1 --no-banner --no-color --console-width 0 --cg chez Main.idr --exec main

View File

@ -0,0 +1,14 @@
module Main
import System.Concurrency
main : IO ()
main = do
sema <- makeSemaphore 0
fork $ do
putStrLn "Hello"
semaphorePost sema
semaphorePost sema
semaphoreWait sema
semaphoreWait sema
putStrLn "Goodbye"

View File

@ -0,0 +1,2 @@
Hello
Goodbye

View File

@ -0,0 +1 @@
$1 --no-banner --no-color --console-width 0 --cg chez Main.idr --exec main

View File

@ -4,7 +4,7 @@ import Linear
import Data.IORef
import Data.List
import System.Concurrency.Raw
import System.Concurrency
public export
data Actions : Type -> Type where

View File

@ -0,0 +1,15 @@
module Main
import System
import System.Concurrency
main : IO ()
main = do
barrier <- makeBarrier 3
threadIDs <- for [1,2,3] $ \n => fork $ do
putStrLn "Hello"
barrierWait barrier
putStrLn "Goodbye"
for threadIDs $ \threadID =>
threadWait threadID
sleep 1

View File

@ -0,0 +1,6 @@
Hello
Hello
Hello
Goodbye
Goodbye
Goodbye

View File

View File

@ -0,0 +1 @@
$1 --no-banner --no-color --console-width 0 --cg chez Main.idr --exec main

View File

@ -0,0 +1,16 @@
module Main
import System
import System.Concurrency
main : IO ()
main = do
chan <- makeChannel
threadID <- fork $ do
channelPut chan "Hello"
channelPut chan "Goodbye"
val <- channelGet chan
putStrLn val
val <- channelGet chan
putStrLn val
sleep 1

View File

@ -0,0 +1,2 @@
Hello
Goodbye

View File

@ -0,0 +1 @@
$1 --no-banner --no-color --console-width 0 --cg racket Main.idr --exec main

View File

@ -0,0 +1,21 @@
module Main
import Debug.Trace
import System
import System.Concurrency
main : IO ()
main = do
mutex <- makeMutex
cond <- makeCondition
threadID <- fork $ do
putStrLn "Hello"
conditionSignal cond
mutexAcquire mutex
conditionWait cond mutex
putStrLn "Goodbye"
mutexRelease mutex
threadWait threadID

View File

@ -0,0 +1,2 @@
Hello
Goodbye

View File

@ -0,0 +1 @@
$1 --no-banner --no-color --console-width 0 --cg racket Main.idr --exec main

View File

@ -0,0 +1,21 @@
module Main
import Debug.Trace
import System
import System.Concurrency
main : IO ()
main = do
mutex <- makeMutex
cond <- makeCondition
threadID <- fork $ do
mutexAcquire mutex
conditionWait cond mutex
putStrLn "Goodbye"
mutexRelease mutex
putStrLn "Hello"
conditionSignal cond
threadWait threadID

View File

@ -0,0 +1,2 @@
Hello
Goodbye

View File

@ -0,0 +1 @@
$1 --no-banner --no-color --console-width 0 --cg racket Main.idr --exec main

View File

@ -0,0 +1,28 @@
module Main
import Debug.Trace
import System
import System.Concurrency
main : IO ()
main = do
mutex <- makeMutex
cond <- makeCondition
thread1 <- fork $ do
mutexAcquire mutex
conditionWait cond mutex
putStrLn "Goodbye"
mutexRelease mutex
thread2 <- fork $ do
mutexAcquire mutex
conditionWait cond mutex
putStrLn "Goodbye"
mutexRelease mutex
putStrLn "Hello"
conditionBroadcast cond
threadWait thread1
threadWait thread2

View File

@ -0,0 +1,2 @@
Hello
Goodbye

View File

@ -0,0 +1 @@
$1 --no-banner --no-color --console-width 0 --cg racket Main.idr --exec main

View File

@ -0,0 +1,11 @@
module Main
import System
main : IO ()
main = do
threadID <- fork $ do
sleep 1
putStrLn "Hello"
threadWait threadID
putStrLn "Goodbye"

View File

@ -0,0 +1,2 @@
Hello
Goodbye

View File

@ -0,0 +1 @@
$1 --no-banner --no-color --console-width 0 --cg racket Main.idr --exec main

View File

@ -0,0 +1,13 @@
module Main
import System.Concurrency
main : IO ()
main = do
sema <- makeSemaphore 0
semaphorePost sema
semaphorePost sema
putStrLn "Hello"
semaphoreWait sema
semaphoreWait sema
putStrLn "Goodbye"

View File

@ -0,0 +1,2 @@
Hello
Goodbye

View File

@ -0,0 +1 @@
$1 --no-banner --no-color --console-width 0 --cg racket Main.idr --exec main

View File

@ -0,0 +1,14 @@
module Main
import System.Concurrency
main : IO ()
main = do
sema <- makeSemaphore 0
fork $ do
putStrLn "Hello"
semaphorePost sema
semaphorePost sema
semaphoreWait sema
semaphoreWait sema
putStrLn "Goodbye"

View File

@ -0,0 +1,2 @@
Hello
Goodbye

View File

@ -0,0 +1 @@
$1 --no-banner --no-color --console-width 0 --cg racket Main.idr --exec main