2020-05-19 20:25:18 +03:00
|
|
|
module Channel
|
|
|
|
|
|
|
|
import Linear
|
|
|
|
import Data.IORef
|
|
|
|
import Data.List
|
|
|
|
|
|
|
|
import System.Concurrency.Raw
|
|
|
|
|
|
|
|
public export
|
|
|
|
data Actions : Type -> Type where
|
|
|
|
Send : (a : Type) -> (a -> Actions b) -> Actions b
|
|
|
|
Recv : (a : Type) -> (a -> Actions b) -> Actions b
|
|
|
|
Close : Actions ()
|
|
|
|
|
|
|
|
public export
|
|
|
|
dual : Actions a -> Actions a
|
|
|
|
dual (Send x f) = Recv x (\val => dual (f val))
|
|
|
|
dual (Recv x f) = Send x (\val => dual (f val))
|
|
|
|
dual Close = Close
|
|
|
|
|
|
|
|
public export
|
|
|
|
data Protocol : Type -> Type where
|
|
|
|
Request : (a : Type) -> Protocol a
|
|
|
|
Respond : (a : Type) -> Protocol a
|
|
|
|
Bind : Protocol a -> (a -> Protocol b) -> Protocol b
|
|
|
|
Loop : Inf (Protocol a) -> Protocol a
|
|
|
|
Done : Protocol ()
|
|
|
|
|
|
|
|
public export
|
|
|
|
(>>=) : Protocol a -> (a -> Protocol b) -> Protocol b
|
|
|
|
(>>=) = Bind
|
|
|
|
|
|
|
|
public export
|
|
|
|
ClientK : Protocol a -> (a -> Actions b) -> Actions b
|
|
|
|
ClientK (Request a) k = Send a k
|
|
|
|
ClientK (Respond a) k = Recv a k
|
|
|
|
ClientK (Bind act next) k = ClientK act (\res => ClientK (next res) k)
|
|
|
|
ClientK (Loop proto) k = ClientK proto k
|
|
|
|
ClientK Done k = k ()
|
|
|
|
|
|
|
|
public export
|
|
|
|
ServerK : Protocol a -> (a -> Actions b) -> Actions b
|
|
|
|
ServerK (Request a) k = Recv a k
|
|
|
|
ServerK (Respond a) k = Send a k
|
|
|
|
ServerK (Bind act next) k = ServerK act (\res => ServerK (next res) k)
|
|
|
|
ServerK (Loop proto) k = ServerK proto k
|
|
|
|
ServerK Done k = k ()
|
|
|
|
|
|
|
|
public export
|
|
|
|
AsClient : Protocol a -> Actions ()
|
|
|
|
AsClient proto = ClientK proto (\res => Close)
|
|
|
|
|
|
|
|
public export
|
|
|
|
AsServer : Protocol a -> Actions ()
|
|
|
|
AsServer proto = ServerK proto (\res => Close)
|
|
|
|
|
|
|
|
public export
|
|
|
|
data QueueEntry : Type where
|
|
|
|
Entry : (1 val : any) -> QueueEntry
|
|
|
|
|
Remove linearity subtyping
It's disappointing to have to do this, but I think necessary because
various issue reports have shown it to be unsound (at least as far as
inference goes) and, at the very least, confusing. This patch brings us
back to the basic rules of QTT.
On the one hand, this makes the 1 multiplicity less useful, because it
means we can't flag arguments as being used exactly once which would be
useful for optimisation purposes as well as precision in the type. On
the other hand, it removes some complexity (and a hack) from
unification, and has the advantage of being correct! Also, I still
consider the 1 multiplicity an experiment.
We can still do interesting things like protocol state tracking, which
is my primary motivation at least.
Ideally, if the 1 multiplicity is going to be more generall useful,
we'll need some kind of way of doing multiplicity polymorphism in the
future. I don't think subtyping is the way (I've pretty much always come
to regret adding some form of subtyping).
Fixes #73 (and maybe some others).
2020-12-27 22:58:35 +03:00
|
|
|
data Stack : Type -> Type where
|
|
|
|
Nil : Stack a
|
|
|
|
(::) : (1 _ : a) -> (1 xs : Stack a) -> Stack a
|
|
|
|
|
|
|
|
reverseOnto : (1 _ : Stack a) -> (1 _ : Stack a) -> Stack a
|
|
|
|
reverseOnto acc [] = acc
|
|
|
|
reverseOnto acc (x::xs) = reverseOnto (x::acc) xs
|
|
|
|
|
|
|
|
reverse : (1 _ : Stack a) -> Stack a
|
|
|
|
reverse = reverseOnto []
|
|
|
|
|
2020-05-19 20:25:18 +03:00
|
|
|
public export
|
|
|
|
record RawMsgQueue where
|
|
|
|
constructor MkRawMsgQueue
|
Remove linearity subtyping
It's disappointing to have to do this, but I think necessary because
various issue reports have shown it to be unsound (at least as far as
inference goes) and, at the very least, confusing. This patch brings us
back to the basic rules of QTT.
On the one hand, this makes the 1 multiplicity less useful, because it
means we can't flag arguments as being used exactly once which would be
useful for optimisation purposes as well as precision in the type. On
the other hand, it removes some complexity (and a hack) from
unification, and has the advantage of being correct! Also, I still
consider the 1 multiplicity an experiment.
We can still do interesting things like protocol state tracking, which
is my primary motivation at least.
Ideally, if the 1 multiplicity is going to be more generall useful,
we'll need some kind of way of doing multiplicity polymorphism in the
future. I don't think subtyping is the way (I've pretty much always come
to regret adding some form of subtyping).
Fixes #73 (and maybe some others).
2020-12-27 22:58:35 +03:00
|
|
|
1 inputStack : Stack QueueEntry
|
|
|
|
1 outputStack : Stack QueueEntry
|
2020-05-19 20:25:18 +03:00
|
|
|
|
|
|
|
newQueue : RawMsgQueue
|
|
|
|
newQueue = MkRawMsgQueue [] []
|
|
|
|
|
|
|
|
enqueue : (1 val : a) -> RawMsgQueue -> RawMsgQueue
|
|
|
|
enqueue item q = record { inputStack $= (Entry item ::) } q
|
|
|
|
|
|
|
|
dequeue : RawMsgQueue -> Maybe (RawMsgQueue, QueueEntry)
|
|
|
|
dequeue q
|
|
|
|
= case outputStack q of
|
|
|
|
[] => case reverse (inputStack q) of
|
|
|
|
[] => Nothing
|
|
|
|
(x :: xs) => Just (record { outputStack = xs, inputStack = [] } q, x)
|
|
|
|
(x :: xs) => Just (record { outputStack = xs } q, x)
|
|
|
|
|
|
|
|
public export
|
|
|
|
data Channel : {p : Protocol b} -> Actions a -> Type where
|
|
|
|
MkChannel : (lock : Mutex) ->
|
|
|
|
(cond_lock : Mutex) ->
|
|
|
|
(cond : Condition) ->
|
|
|
|
(myInbox : IORef RawMsgQueue) ->
|
|
|
|
(remoteInbox : IORef RawMsgQueue) -> Channel {p} actions
|
|
|
|
|
|
|
|
public export
|
|
|
|
Client : Protocol a -> Type
|
|
|
|
Client p = Channel {p} (AsClient p)
|
|
|
|
|
|
|
|
public export
|
|
|
|
Server : Protocol a -> Type
|
|
|
|
Server p = Channel {p} (AsServer p)
|
|
|
|
|
|
|
|
mkChannels : (p : Protocol a) -> One IO (Client p, Server p)
|
|
|
|
mkChannels p
|
|
|
|
= do clientInbox <- lift $ newIORef newQueue
|
|
|
|
serverInbox <- lift $ newIORef newQueue
|
|
|
|
lock <- lift $ makeMutex
|
|
|
|
cond_lock <- lift $ makeMutex
|
|
|
|
cond <- lift $ makeCondition
|
|
|
|
pure (MkChannel lock cond_lock cond clientInbox serverInbox,
|
|
|
|
MkChannel lock cond_lock cond serverInbox clientInbox)
|
|
|
|
|
|
|
|
export
|
Remove linearity subtyping
It's disappointing to have to do this, but I think necessary because
various issue reports have shown it to be unsound (at least as far as
inference goes) and, at the very least, confusing. This patch brings us
back to the basic rules of QTT.
On the one hand, this makes the 1 multiplicity less useful, because it
means we can't flag arguments as being used exactly once which would be
useful for optimisation purposes as well as precision in the type. On
the other hand, it removes some complexity (and a hack) from
unification, and has the advantage of being correct! Also, I still
consider the 1 multiplicity an experiment.
We can still do interesting things like protocol state tracking, which
is my primary motivation at least.
Ideally, if the 1 multiplicity is going to be more generall useful,
we'll need some kind of way of doing multiplicity polymorphism in the
future. I don't think subtyping is the way (I've pretty much always come
to regret adding some form of subtyping).
Fixes #73 (and maybe some others).
2020-12-27 22:58:35 +03:00
|
|
|
send : (1 chan : Channel {p} (Send ty next)) -> (val : ty) ->
|
2020-05-19 20:25:18 +03:00
|
|
|
One IO (Channel {p} (next val))
|
|
|
|
send (MkChannel lock cond_lock cond local remote) val
|
|
|
|
= do lift $ mutexAcquire lock
|
|
|
|
q <- lift $ readIORef remote
|
|
|
|
lift $ writeIORef remote (enqueue val q)
|
|
|
|
lift $ mutexRelease lock
|
|
|
|
|
|
|
|
lift $ mutexAcquire cond_lock
|
|
|
|
lift $ conditionSignal cond
|
|
|
|
lift $ mutexRelease cond_lock
|
|
|
|
pure (MkChannel lock cond_lock cond local remote)
|
|
|
|
|
|
|
|
-- returns Nothing if the message isn't there
|
|
|
|
export
|
|
|
|
tryRecv : forall b, a .
|
|
|
|
{0 ty : Type} ->
|
|
|
|
{0 next : (ty -> Actions a)} ->
|
|
|
|
{0 p : Protocol b} ->
|
|
|
|
(1 chan : Channel {p} (Recv ty next)) ->
|
|
|
|
One IO (Res (Maybe ty) (\r => case r of
|
|
|
|
Nothing => Channel {p} (Recv ty next)
|
|
|
|
Just val => Channel {p} (next val)))
|
|
|
|
tryRecv (MkChannel lock cond_lock cond local remote)
|
|
|
|
= do lift $ mutexAcquire lock
|
|
|
|
rq <- lift $ readIORef local
|
|
|
|
case dequeue rq of
|
|
|
|
Nothing =>
|
|
|
|
do lift $ mutexRelease lock
|
2020-06-12 13:18:12 +03:00
|
|
|
pure (Nothing # MkChannel lock cond_lock cond local remote)
|
2020-05-19 20:25:18 +03:00
|
|
|
Just (rq', Entry {any} val) =>
|
|
|
|
do lift $ writeIORef local rq'
|
|
|
|
lift $ mutexRelease lock
|
2020-06-12 13:18:12 +03:00
|
|
|
pure (Just (believe_me {a=any} val) #
|
2020-05-19 20:25:18 +03:00
|
|
|
MkChannel lock cond_lock cond local remote)
|
|
|
|
|
|
|
|
-- blocks until the message is there
|
|
|
|
export
|
|
|
|
recv : (1 chan : Channel {p} (Recv ty next)) ->
|
|
|
|
One IO (Res ty (\val => Channel {p} (next val)))
|
|
|
|
recv (MkChannel lock cond_lock cond local remote)
|
|
|
|
= do lift $ mutexAcquire lock
|
|
|
|
rq <- lift $ readIORef local
|
|
|
|
case dequeue rq of
|
|
|
|
Nothing => -- ... no message, so wait for condition and try again
|
|
|
|
do lift $ mutexRelease lock
|
|
|
|
lift $ mutexAcquire cond_lock
|
|
|
|
lift $ conditionWait cond cond_lock
|
|
|
|
lift $ mutexRelease cond_lock
|
|
|
|
recv (MkChannel lock cond_lock cond local remote)
|
|
|
|
Just (rq', Entry {any} val) =>
|
|
|
|
do lift $ writeIORef local rq'
|
|
|
|
lift $ mutexRelease lock
|
2020-06-12 13:18:12 +03:00
|
|
|
pure (believe_me {a=any} val #
|
2020-05-19 20:25:18 +03:00
|
|
|
MkChannel lock cond_lock cond local remote)
|
|
|
|
|
|
|
|
export
|
|
|
|
timeoutRecv : forall b, a .
|
|
|
|
{0 ty : Type} ->
|
|
|
|
{0 next : (ty -> Actions a)} ->
|
|
|
|
{0 p : Protocol b} ->
|
|
|
|
(1 chan : Channel {p} (Recv ty next)) ->
|
|
|
|
(timeout : Int) ->
|
|
|
|
One IO (Res (Maybe ty) (\r => case r of
|
|
|
|
Nothing => Channel {p} (Recv ty next)
|
|
|
|
Just val => Channel {p} (next val)))
|
|
|
|
timeoutRecv (MkChannel lock cond_lock cond local remote) timeout
|
|
|
|
= do lift $ mutexAcquire lock
|
|
|
|
rq <- lift $ readIORef local
|
|
|
|
case dequeue rq of
|
|
|
|
Nothing => -- ... no message, so wait for condition and try again with tryRecv
|
|
|
|
do lift $ mutexRelease lock
|
|
|
|
lift $ mutexAcquire cond_lock
|
|
|
|
lift $ conditionWaitTimeout cond cond_lock timeout
|
|
|
|
lift $ mutexRelease cond_lock
|
2020-06-12 13:18:12 +03:00
|
|
|
res # chan <- tryRecv {ty} {next} (MkChannel lock cond_lock cond local remote)
|
2020-05-19 20:25:18 +03:00
|
|
|
case res of
|
2020-06-12 13:18:12 +03:00
|
|
|
Nothing => pure (Nothing # chan)
|
|
|
|
Just res => pure (Just res # chan)
|
2020-05-19 20:25:18 +03:00
|
|
|
Just (rq', Entry {any} val) =>
|
|
|
|
do lift $ writeIORef local rq'
|
|
|
|
lift $ mutexRelease lock
|
2020-06-12 13:18:12 +03:00
|
|
|
pure (Just (believe_me {a=any} val) #
|
2020-05-19 20:25:18 +03:00
|
|
|
MkChannel lock cond_lock cond local remote)
|
|
|
|
|
|
|
|
export
|
|
|
|
close : (1 chan : Channel Close) -> Any IO ()
|
|
|
|
close (MkChannel _ _ _ _ _) = pure ()
|
|
|
|
|
|
|
|
export
|
|
|
|
fork : {p : _} -> ((1 chan : Server p) -> Any IO ()) ->
|
|
|
|
One IO (Client p)
|
|
|
|
fork proc
|
|
|
|
= do (cchan, MkChannel a b c d e) <- mkChannels p
|
|
|
|
-- deconstruct and reconstruct is a hack to work around the fact that
|
|
|
|
-- 'run' doesn't express the function is only used once in its type, because
|
|
|
|
-- 'Monad' and 'Applicative' don't express linearity... ugh!
|
|
|
|
lift $ fork (run (proc (MkChannel a b c d e)))
|
|
|
|
pure cchan
|