Idris2/tests/idris2/real001/Channel.idr

222 lines
7.8 KiB
Idris

module Channel
import Linear
import Data.IORef
import Data.List
import System.Concurrency
public export
data Actions : Type -> Type where
Send : (a : Type) -> (a -> Actions b) -> Actions b
Recv : (a : Type) -> (a -> Actions b) -> Actions b
Close : Actions ()
public export
dual : Actions a -> Actions a
dual (Send x f) = Recv x (\val => dual (f val))
dual (Recv x f) = Send x (\val => dual (f val))
dual Close = Close
public export
data Protocol : Type -> Type where
Request : (a : Type) -> Protocol a
Respond : (a : Type) -> Protocol a
Bind : Protocol a -> (a -> Protocol b) -> Protocol b
Loop : Inf (Protocol a) -> Protocol a
Done : Protocol ()
public export
(>>=) : Protocol a -> (a -> Protocol b) -> Protocol b
(>>=) = Bind
public export
(>>) : Protocol a -> Protocol b -> Protocol b
ma >> mb = ma >>= \ _ => mb
public export
ClientK : Protocol a -> (a -> Actions b) -> Actions b
ClientK (Request a) k = Send a k
ClientK (Respond a) k = Recv a k
ClientK (Bind act next) k = ClientK act (\res => ClientK (next res) k)
ClientK (Loop proto) k = ClientK proto k
ClientK Done k = k ()
public export
ServerK : Protocol a -> (a -> Actions b) -> Actions b
ServerK (Request a) k = Recv a k
ServerK (Respond a) k = Send a k
ServerK (Bind act next) k = ServerK act (\res => ServerK (next res) k)
ServerK (Loop proto) k = ServerK proto k
ServerK Done k = k ()
public export
AsClient : Protocol a -> Actions ()
AsClient proto = ClientK proto (\res => Close)
public export
AsServer : Protocol a -> Actions ()
AsServer proto = ServerK proto (\res => Close)
public export
data QueueEntry : Type where
Entry : (1 val : any) -> QueueEntry
data Stack : Type -> Type where
Nil : Stack a
(::) : (1 _ : a) -> (1 xs : Stack a) -> Stack a
reverseOnto : (1 _ : Stack a) -> (1 _ : Stack a) -> Stack a
reverseOnto acc [] = acc
reverseOnto acc (x::xs) = reverseOnto (x::acc) xs
reverse : (1 _ : Stack a) -> Stack a
reverse = reverseOnto []
public export
record RawMsgQueue where
constructor MkRawMsgQueue
1 inputStack : Stack QueueEntry
1 outputStack : Stack QueueEntry
newQueue : RawMsgQueue
newQueue = MkRawMsgQueue [] []
enqueue : (1 val : a) -> RawMsgQueue -> RawMsgQueue
enqueue item q = { inputStack $= (Entry item ::) } q
dequeue : RawMsgQueue -> Maybe (RawMsgQueue, QueueEntry)
dequeue q
= case outputStack q of
[] => case reverse (inputStack q) of
[] => Nothing
(x :: xs) => Just ({ outputStack := xs, inputStack := [] } q, x)
(x :: xs) => Just ({ outputStack := xs } q, x)
public export
data Channel : {p : Protocol b} -> Actions a -> Type where
MkChannel : (lock : Mutex) ->
(cond_lock : Mutex) ->
(cond : Condition) ->
(myInbox : IORef RawMsgQueue) ->
(remoteInbox : IORef RawMsgQueue) -> Channel {p} actions
public export
Client : Protocol a -> Type
Client p = Channel {p} (AsClient p)
public export
Server : Protocol a -> Type
Server p = Channel {p} (AsServer p)
mkChannels : (p : Protocol a) -> One IO (Client p, Server p)
mkChannels p
= do clientInbox <- lift $ newIORef newQueue
serverInbox <- lift $ newIORef newQueue
lock <- lift $ makeMutex
cond_lock <- lift $ makeMutex
cond <- lift $ makeCondition
pure (MkChannel lock cond_lock cond clientInbox serverInbox,
MkChannel lock cond_lock cond serverInbox clientInbox)
export
send : (1 chan : Channel {p} (Send ty next)) -> (val : ty) ->
One IO (Channel {p} (next val))
send (MkChannel lock cond_lock cond local remote) val
= do lift $ mutexAcquire lock
q <- lift $ readIORef remote
lift $ writeIORef remote (enqueue val q)
lift $ mutexRelease lock
lift $ mutexAcquire cond_lock
lift $ conditionSignal cond
lift $ mutexRelease cond_lock
pure (MkChannel lock cond_lock cond local remote)
-- returns Nothing if the message isn't there
export
tryRecv : forall b, a .
{0 ty : Type} ->
{0 next : (ty -> Actions a)} ->
{0 p : Protocol b} ->
(1 chan : Channel {p} (Recv ty next)) ->
One IO (Res (Maybe ty) (\r => case r of
Nothing => Channel {p} (Recv ty next)
Just val => Channel {p} (next val)))
tryRecv (MkChannel lock cond_lock cond local remote)
= do lift $ mutexAcquire lock
rq <- lift $ readIORef local
case dequeue rq of
Nothing =>
do lift $ mutexRelease lock
pure (Nothing # MkChannel lock cond_lock cond local remote)
Just (rq', Entry {any} val) =>
do lift $ writeIORef local rq'
lift $ mutexRelease lock
pure (Just (believe_me {a=any} val) #
MkChannel lock cond_lock cond local remote)
-- blocks until the message is there
export
recv : (1 chan : Channel {p} (Recv ty next)) ->
One IO (Res ty (\val => Channel {p} (next val)))
recv (MkChannel lock cond_lock cond local remote)
= do lift $ mutexAcquire lock
rq <- lift $ readIORef local
case dequeue rq of
Nothing => -- ... no message, so wait for condition and try again
do lift $ mutexRelease lock
lift $ mutexAcquire cond_lock
lift $ conditionWait cond cond_lock
lift $ mutexRelease cond_lock
recv (MkChannel lock cond_lock cond local remote)
Just (rq', Entry {any} val) =>
do lift $ writeIORef local rq'
lift $ mutexRelease lock
pure (believe_me {a=any} val #
MkChannel lock cond_lock cond local remote)
export
timeoutRecv : forall b, a .
{0 ty : Type} ->
{0 next : (ty -> Actions a)} ->
{0 p : Protocol b} ->
(1 chan : Channel {p} (Recv ty next)) ->
(timeout : Int) ->
One IO (Res (Maybe ty) (\r => case r of
Nothing => Channel {p} (Recv ty next)
Just val => Channel {p} (next val)))
timeoutRecv (MkChannel lock cond_lock cond local remote) timeout
= do lift $ mutexAcquire lock
rq <- lift $ readIORef local
case dequeue rq of
Nothing => -- ... no message, so wait for condition and try again with tryRecv
do lift $ mutexRelease lock
lift $ mutexAcquire cond_lock
lift $ conditionWaitTimeout cond cond_lock timeout
lift $ mutexRelease cond_lock
res # chan <- tryRecv {ty} {next} (MkChannel lock cond_lock cond local remote)
case res of
Nothing => pure (Nothing # chan)
Just res => pure (Just res # chan)
Just (rq', Entry {any} val) =>
do lift $ writeIORef local rq'
lift $ mutexRelease lock
pure (Just (believe_me {a=any} val) #
MkChannel lock cond_lock cond local remote)
export
close : (1 chan : Channel Close) -> Any IO ()
close (MkChannel _ _ _ _ _) = pure ()
export
fork : {p : _} -> ((1 chan : Server p) -> Any IO ()) ->
One IO (Client p)
fork proc
= do (cchan, MkChannel a b c d e) <- mkChannels p
-- deconstruct and reconstruct is a hack to work around the fact that
-- 'run' doesn't express the function is only used once in its type, because
-- 'Monad' and 'Applicative' don't express linearity... ugh!
lift $ ignore $ fork (run (proc (MkChannel a b c d e)))
pure cchan