mirror of
https://github.com/idris-lang/Idris2.git
synced 2024-12-24 20:23:11 +03:00
222 lines
7.8 KiB
Idris
222 lines
7.8 KiB
Idris
module Channel
|
|
|
|
import Linear
|
|
import Data.IORef
|
|
import Data.List
|
|
|
|
import System.Concurrency
|
|
|
|
||| The actions that remain to be performed on one end of a channel.
||| The index is the result type carried through the continuations;
||| `Close` fixes it to `()`.
public export
data Actions : Type -> Type where
     ||| Send a value of type `a`; the remaining actions may depend on it.
     Send : (a : Type) -> (a -> Actions b) -> Actions b
     ||| Receive a value of type `a`; the remaining actions may depend on it.
     Recv : (a : Type) -> (a -> Actions b) -> Actions b
     ||| Nothing left to do except close the channel.
     Close : Actions ()
|
|
|
|
||| Swap the direction of every action: each `Send` becomes a `Recv` and
||| each `Recv` becomes a `Send`, recursively through the continuations.
||| `Close` is left as it is.
public export
dual : Actions a -> Actions a
dual (Send ty cont) = Recv ty (\v => dual (cont v))
dual (Recv ty cont) = Send ty (\v => dual (cont v))
dual Close = Close
|
|
|
|
||| A description of a conversation between a client and a server.
||| `ClientK` and `ServerK` translate it into the `Actions` of each side.
public export
data Protocol : Type -> Type where
     ||| A value of type `a` travelling from client to server
     ||| (`ClientK` maps it to `Send`, `ServerK` to `Recv`).
     Request : (a : Type) -> Protocol a
     ||| A value of type `a` travelling from server to client
     ||| (`ClientK` maps it to `Recv`, `ServerK` to `Send`).
     Respond : (a : Type) -> Protocol a
     ||| Run one protocol, then continue with a protocol chosen from its result.
     Bind : Protocol a -> (a -> Protocol b) -> Protocol b
     ||| A delayed (`Inf`) protocol; `ClientK`/`ServerK` simply continue with it.
     Loop : Inf (Protocol a) -> Protocol a
     ||| The end of the protocol.
     Done : Protocol ()
|
|
|
|
||| Sequencing operator for `Protocol`; it is exactly `Bind`.
public export
(>>=) : Protocol a -> (a -> Protocol b) -> Protocol b
(>>=) first rest = Bind first rest
|
|
|
|
||| Sequence two protocols, discarding the result of the first.
public export
(>>) : Protocol a -> Protocol b -> Protocol b
(>>) first second = first >>= (\_ => second)
|
|
|
|
||| Translate a protocol into the client's actions, in continuation-passing
||| style: `cont` says what to do with the protocol's result once the
||| protocol itself has been played out.
|||
||| The client sends on `Request` and receives on `Respond`.
public export
ClientK : Protocol a -> (a -> Actions b) -> Actions b
ClientK (Request ty) cont = Send ty cont
ClientK (Respond ty) cont = Recv ty cont
ClientK (Bind first rest) cont
    = ClientK first (\x => ClientK (rest x) cont)
ClientK (Loop body) cont = ClientK body cont
ClientK Done cont = cont ()
|
|
|
|
||| Translate a protocol into the server's actions, in continuation-passing
||| style: `cont` says what to do with the protocol's result once the
||| protocol itself has been played out.
|||
||| The server receives on `Request` and sends on `Respond`.
public export
ServerK : Protocol a -> (a -> Actions b) -> Actions b
ServerK (Request ty) cont = Recv ty cont
ServerK (Respond ty) cont = Send ty cont
ServerK (Bind first rest) cont
    = ServerK first (\x => ServerK (rest x) cont)
ServerK (Loop body) cont = ServerK body cont
ServerK Done cont = cont ()
|
|
|
|
||| The client's complete list of actions for a protocol: play the protocol
||| via `ClientK`, ignore its result, and finish with `Close`.
public export
AsClient : Protocol a -> Actions ()
AsClient p = ClientK p (\_ => Close)
|
|
|
|
||| The server's complete list of actions for a protocol: play the protocol
||| via `ServerK`, ignore its result, and finish with `Close`.
public export
AsServer : Protocol a -> Actions ()
AsServer p = ServerK p (\_ => Close)
|
|
|
|
||| A single queued message. The type of the payload (`any`) is an implicit
||| argument of the constructor and does not appear in `QueueEntry` itself,
||| so entries of different types can share one queue.
public export
data QueueEntry : Type where
     ||| Wrap a payload, which is held linearly.
     Entry : (1 val : any) -> QueueEntry
|
|
|
|
||| A list whose elements and tail are both held linearly.
||| The constructor names allow the usual `[]` / `::` list syntax.
data Stack : Type -> Type where
     ||| The empty stack.
     Nil : Stack a
     ||| Push one element on top of a stack.
     (::) : (1 _ : a) -> (1 xs : Stack a) -> Stack a
|
|
|
|
||| Move the elements of the second stack, one at a time from the top, onto
||| the first stack (the accumulator). The result is the second stack
||| reversed, followed by the original accumulator.
reverseOnto : (1 _ : Stack a) -> (1 _ : Stack a) -> Stack a
reverseOnto done [] = done
reverseOnto done (top :: rest) = reverseOnto (top :: done) rest
|
|
|
|
||| Reverse a linear stack by accumulating onto an empty one.
reverse : (1 _ : Stack a) -> Stack a
reverse xs = reverseOnto [] xs
|
|
|
|
||| A FIFO message queue built from two stacks.
public export
record RawMsgQueue where
  constructor MkRawMsgQueue
  ||| Newly enqueued entries, most recent on top.
  1 inputStack : Stack QueueEntry
  ||| Entries ready to be dequeued, oldest on top.
  1 outputStack : Stack QueueEntry
|
|
|
|
||| A queue with no entries on either stack.
newQueue : RawMsgQueue
newQueue = MkRawMsgQueue Nil Nil
|
|
|
|
||| Add a value to the back of the queue by wrapping it in an `Entry` and
||| pushing it onto the input stack. The output stack is left untouched.
enqueue : (1 val : a) -> RawMsgQueue -> RawMsgQueue
enqueue item (MkRawMsgQueue pending ready)
    = MkRawMsgQueue (Entry item :: pending) ready
|
|
|
|
||| Remove the oldest entry from the queue.
|||
||| Returns `Nothing` when both stacks are empty; otherwise the updated
||| queue together with the removed entry.
dequeue : RawMsgQueue -> Maybe (RawMsgQueue, QueueEntry)
-- Something is already waiting on the output stack: just pop it.
dequeue (MkRawMsgQueue pending (x :: ready))
    = Just (MkRawMsgQueue pending ready, x)
-- Output stack exhausted: refill it by reversing the input stack, so the
-- oldest pending entry ends up on top.
dequeue (MkRawMsgQueue pending [])
    = case reverse pending of
           [] => Nothing
           (x :: ready) => Just (MkRawMsgQueue [] ready, x)
|
|
|
|
||| One end of a two-way channel. The type records the protocol `p` being
||| followed and the actions that remain for this end.
public export
data Channel : {p : Protocol b} -> Actions a -> Type where
     ||| @ lock        held while an inbox is read or written
     ||| @ cond_lock   the mutex used together with `cond`
     ||| @ cond        signalled by `send` after a message has been enqueued
     ||| @ myInbox     the queue this end receives from
     ||| @ remoteInbox the queue this end sends to
     MkChannel : (lock : Mutex) ->
                 (cond_lock : Mutex) ->
                 (cond : Condition) ->
                 (myInbox : IORef RawMsgQueue) ->
                 (remoteInbox : IORef RawMsgQueue) ->
                 Channel {p} actions
|
|
|
|
||| The type of the client's end of a channel at the start of a protocol.
public export
Client : Protocol a -> Type
Client proto = Channel {p = proto} (AsClient proto)
|
|
|
|
||| The type of the server's end of a channel at the start of a protocol.
public export
Server : Protocol a -> Type
Server proto = Channel {p = proto} (AsServer proto)
|
|
|
|
||| Create both ends of a fresh channel for protocol `p`.
|||
||| The two ends share the same `lock`, `cond_lock` and `cond`; each end's
||| inbox is the other end's remote inbox.
mkChannels : (p : Protocol a) -> One IO (Client p, Server p)
mkChannels p = do
    toClient <- lift $ newIORef newQueue
    toServer <- lift $ newIORef newQueue
    queueLock <- lift $ makeMutex
    signalLock <- lift $ makeMutex
    signal <- lift $ makeCondition
    pure ( MkChannel queueLock signalLock signal toClient toServer
         , MkChannel queueLock signalLock signal toServer toClient
         )
|
|
|
|
||| Send a value along a channel whose next action is `Send`.
|||
||| The value is appended to the remote end's inbox while `lock` is held;
||| afterwards the condition is signalled (under `cond_lock`).
||| Returns the channel advanced to `next val`.
export
send : (1 chan : Channel {p} (Send ty next)) -> (val : ty) ->
       One IO (Channel {p} (next val))
send (MkChannel lock cond_lock cond mine theirs) val = do
    -- Append the message to the other end's inbox.
    lift $ mutexAcquire lock
    pending <- lift $ readIORef theirs
    lift $ writeIORef theirs (enqueue val pending)
    lift $ mutexRelease lock

    -- Signal the condition that receivers wait on.
    lift $ mutexAcquire cond_lock
    lift $ conditionSignal cond
    lift $ mutexRelease cond_lock
    pure (MkChannel lock cond_lock cond mine theirs)
|
|
|
|
||| Receive a message without blocking.
|||
||| Returns `Nothing` together with the unchanged channel if no message is
||| waiting in the local inbox; otherwise `Just val` together with the
||| channel advanced to `next val`.
export
tryRecv : forall b, a .
          {0 ty : Type} ->
          {0 next : (ty -> Actions a)} ->
          {0 p : Protocol b} ->
          (1 chan : Channel {p} (Recv ty next)) ->
          One IO (Res (Maybe ty) (\r => case r of
                                             Nothing => Channel {p} (Recv ty next)
                                             Just val => Channel {p} (next val)))
tryRecv (MkChannel lock cond_lock cond mine theirs) = do
    lift $ mutexAcquire lock
    inbox <- lift $ readIORef mine
    case dequeue inbox of
         Nothing => do
             lift $ mutexRelease lock
             pure (Nothing # MkChannel lock cond_lock cond mine theirs)
         -- The queue does not record the payload's type, so the payload is
         -- cast with believe_me to the type the channel expects here.
         Just (inbox', Entry {any} val) => do
             lift $ writeIORef mine inbox'
             lift $ mutexRelease lock
             pure (Just (believe_me {a=any} val) #
                   MkChannel lock cond_lock cond mine theirs)
|
|
|
|
||| Receive the next message, blocking until one is available.
|||
||| Returns the received value together with the channel advanced to
||| `next val`.
export
recv : (1 chan : Channel {p} (Recv ty next)) ->
       One IO (Res ty (\val => Channel {p} (next val)))
recv (MkChannel lock cond_lock cond local remote)
    = do lift $ mutexAcquire lock
         rq <- lift $ readIORef local
         case dequeue rq of
              -- No message yet, so wait for the condition and try again.
              --
              -- cond_lock is taken *before* lock is released. `send` enqueues
              -- under lock and only then takes cond_lock to signal, so a
              -- sender that enqueues after our emptiness check cannot signal
              -- until conditionWait has released cond_lock, i.e. until we are
              -- actually waiting. Releasing lock first would leave a window
              -- in which the signal is sent with nobody waiting and is lost.
              --
              -- Lock order is always lock -> cond_lock; nothing in this
              -- module acquires lock while holding cond_lock.
              Nothing =>
                do lift $ mutexAcquire cond_lock
                   lift $ mutexRelease lock
                   lift $ conditionWait cond cond_lock
                   lift $ mutexRelease cond_lock
                   -- Re-check the queue from scratch after waking up.
                   recv (MkChannel lock cond_lock cond local remote)
              -- The queue does not record the payload's type, so the payload
              -- is cast with believe_me to the type the channel expects here.
              Just (rq', Entry {any} val) =>
                do lift $ writeIORef local rq'
                   lift $ mutexRelease lock
                   pure (believe_me {a=any} val #
                         MkChannel lock cond_lock cond local remote)
|
|
|
|
||| Receive a message, waiting at most once on the condition (with a
||| timeout) if none is immediately available.
|||
||| If the inbox is empty, waits on the condition via `conditionWaitTimeout`
||| and then makes a single `tryRecv` attempt.
|||
||| @ timeout passed unchanged to `conditionWaitTimeout`
|||           (NOTE(review): units are whatever that function expects; confirm)
|||
||| Returns `Nothing` with the unchanged channel if there is still no
||| message after the wait; otherwise `Just val` with the channel advanced
||| to `next val`.
export
timeoutRecv : forall b, a .
              {0 ty : Type} ->
              {0 next : (ty -> Actions a)} ->
              {0 p : Protocol b} ->
              (1 chan : Channel {p} (Recv ty next)) ->
              (timeout : Int) ->
              One IO (Res (Maybe ty) (\r => case r of
                                                 Nothing => Channel {p} (Recv ty next)
                                                 Just val => Channel {p} (next val)))
timeoutRecv (MkChannel lock cond_lock cond local remote) timeout
    = do lift $ mutexAcquire lock
         rq <- lift $ readIORef local
         case dequeue rq of
              -- No message, so wait for the condition and try again with
              -- tryRecv.
              --
              -- cond_lock is taken *before* lock is released. `send` enqueues
              -- under lock and only then takes cond_lock to signal, so a
              -- message arriving after our emptiness check cannot be
              -- signalled until we are actually waiting. Releasing lock first
              -- would let that signal be lost, and we would sleep for the
              -- whole timeout even though a message is already queued.
              --
              -- Lock order is always lock -> cond_lock; cond_lock is released
              -- before tryRecv takes lock again.
              Nothing =>
                do lift $ mutexAcquire cond_lock
                   lift $ mutexRelease lock
                   lift $ conditionWaitTimeout cond cond_lock timeout
                   lift $ mutexRelease cond_lock
                   res # chan <- tryRecv {ty} {next} (MkChannel lock cond_lock cond local remote)
                   -- The channel's type depends on `res`, so each case is
                   -- returned separately rather than as `pure (res # chan)`.
                   case res of
                        Nothing => pure (Nothing # chan)
                        Just res => pure (Just res # chan)
              -- The queue does not record the payload's type, so the payload
              -- is cast with believe_me to the type the channel expects here.
              Just (rq', Entry {any} val) =>
                do lift $ writeIORef local rq'
                   lift $ mutexRelease lock
                   pure (Just (believe_me {a=any} val) #
                         MkChannel lock cond_lock cond local remote)
|
|
|
|
||| Consume a channel whose only remaining action is `Close`.
||| Performs no I/O; it only uses up the linear channel value.
export
close : (1 chan : Channel Close) -> Any IO ()
close (MkChannel _ _ _ _ _)
    = pure ()
|
|
|
|
||| Create a channel for protocol `p`, start `proc` on the server end in a
||| new thread, and return the client end.
export
fork : {p : _} -> ((1 chan : Server p) -> Any IO ()) ->
       One IO (Client p)
fork proc = do
    (client, MkChannel lock cond_lock cond inbox outbox) <- mkChannels p
    -- deconstruct and reconstruct is a hack to work around the fact that
    -- 'run' doesn't express the function is only used once in its type, because
    -- 'Monad' and 'Applicative' don't express linearity... ugh!
    lift $ ignore $ fork (run (proc (MkChannel lock cond_lock cond inbox outbox)))
    pure client
|