move renaming: %memo and %done

This commit is contained in:
Ted Blackman 2019-06-20 00:21:37 -07:00
parent d50ec5ddca
commit 40798d54a7
2 changed files with 119 additions and 116 deletions

View File

@ -449,7 +449,7 @@
:: The following equation is always true:
:: .next - .current == number of messages in flight
::
:: At the end of a task, |message-pump sends a %finalize task to
:: At the end of a task, |message-pump sends a %halt task to
:: |packet-pump, which can trigger a timer to be set or cleared based
:: on congestion control calculations. When it fires, the timer will
:: generally cause one or more packets to be resent.
@ -560,7 +560,7 @@
:: %sunk: a ship breached and has a new .rift
:: %vega: kernel reload notification
:: %wegh: request for memory usage report
:: %buzz: request to send message
:: %memo: request to send message
::
+$ task
$% [%born ~]
@ -571,7 +571,7 @@
[%sunk =ship =rift]
[%vega ~]
[%wegh ~]
[%buzz =ship =message]
[%memo =ship =message]
==
:: $gift: effect from ames
::
@ -589,7 +589,7 @@
::
:: When the local Ames receives either a positive message ack or a
:: combination of a nack and nack-trace (explained in more detail
:: below), it gives an %aver move to the local vane that had
:: below), it gives an %done move to the local vane that had
:: requested the original "forward flow" message be sent.
::
:: A "backward flow" message, which is similar to a response or a
@ -597,11 +597,11 @@
:: transmits the message to the peer's Ames, which gives the message
:: to the destination vane.
::
:: Ames will give a %buzz to a vane upon hearing the message from a
:: Ames will give a %memo to a vane upon hearing the message from a
:: remote. This message is a "backward flow" message, forming one of
:: potentially many responses to a "forward flow" message that a
:: local vane had passed to our local Ames, and which local Ames had
:: relayed to the remote. Ames gives the %buzz on the same duct the
:: relayed to the remote. Ames gives the %memo on the same duct the
:: local vane had originally used to pass Ames the "forward flow"
:: message.
::
@ -628,20 +628,20 @@
:: on all those messages, we apply positive acks when we hear the
:: nack-trace.
::
:: %buzz: message to vane from peer
:: %memo: message to vane from peer
:: %send: packet to unix
:: %aver: notify vane that peer (n)acked our message
:: %done: notify vane that peer (n)acked our message
::
+$ gift
$% [%buzz =message]
$% [%memo =message]
[%send =lane =blob]
[%aver error=(unit error)]
[%done error=(unit error)]
==
:: $note: request to other vane
::
:: TODO: specialize gall interface for subscription management
::
:: Ames passes a %buzz note to another vane when it receives a
:: Ames passes a %memo note to another vane when it receives a
:: message on a "forward flow" from a peer, originally passed from
:: one of the peer's vanes to the peer's Ames.
::
@ -651,20 +651,20 @@
[%rest date=@da]
== ==
$: %c
$% [%buzz =ship =message]
$% [%memo =ship =message]
== ==
$: %g
$% [%buzz =ship =message]
$% [%memo =ship =message]
== ==
$: %j
$% [%buzz =ship =message]
$% [%memo =ship =message]
[%pubs =ship]
[%turf ~]
[%vein ~]
== == ==
:: $sign: response from other vane
::
:: A vane gives a %buzz sign to Ames on a duct on which it had
:: A vane gives a %memo sign to Ames on a duct on which it had
:: previously received a message on a "forward flow". Ames will
:: transmit the message to the peer that had originally sent the
:: message on the forward flow. The peer's Ames will then give the
@ -676,16 +676,16 @@
$% [%wake error=(unit tang)]
== ==
$: %c
$% [%aver error=(unit error)]
[%buzz =message]
$% [%done error=(unit error)]
[%memo =message]
== ==
$: %g
$% [%aver error=(unit error)]
[%buzz =message]
$% [%done error=(unit error)]
[%memo =message]
== ==
$: %j
$% [%aver error=(unit error)]
[%buzz =message]
$% [%done error=(unit error)]
[%memo =message]
[%pubs public:able:jael]
[%turf turf=(list turf)]
[%vein =life vein=(map life ring)]
@ -693,38 +693,38 @@
:: $message-pump-task: job for |message-pump
::
:: %send: packetize and send application-level message
:: %hear-ack: handle receipt of ack on fragment or message
:: %hear: handle receipt of ack on fragment or message
:: %wake: handle timer firing
::
+$ message-pump-task
$% [%send =message]
[%hear-ack =message-num =ack-meat]
[%hear =message-num =ack-meat]
[%wake ~]
==
:: $message-pump-gift: effect from |message-pump
::
:: %ack-message: report message acknowledgment
:: %done: report message acknowledgment
:: %send: emit message fragment
:: %wait: set a new timer at .date
:: %rest: cancel timer at .date
::
+$ message-pump-gift
$% [%ack-message =message-num ok=?]
$% [%done =message-num ok=?]
[%send =static-fragment]
[%wait date=@da]
[%rest date=@da]
==
:: $packet-pump-task: job for |packet-pump
::
:: %hear-fragment-ack: deal with a packet acknowledgment
:: %hear-message-ack: deal with message acknowledgment
:: %finalize: finish event, possibly updating timer
:: %hear: deal with a packet acknowledgment
:: %done: deal with message acknowledgment
:: %halt: finish event, possibly updating timer
:: %wake: handle timer firing
::
+$ packet-pump-task
$% [%hear-fragment-ack =message-num =fragment-num]
[%hear-message-ack =message-num lag=@dr]
[%finalize ~]
$% [%hear =message-num =fragment-num]
[%done =message-num lag=@dr]
[%halt ~]
[%wake ~]
==
:: $packet-pump-gift: effect from |packet-pump
@ -741,23 +741,23 @@
:: $message-still-task: job for |message-still
::
:: %done: receive confirmation from vane of processing or failure
:: %forget-nack: clear .message-num from .nax.state
:: %drop: clear .message-num from .nax.state
:: %hear: handle receiving a message fragment packet
::
+$ message-still-task
$% [%done ok=?]
[%forget-nack =message-num]
[%drop =message-num]
[%hear =lane =shut-packet]
==
:: $message-still-gift: effect from |message-still
::
:: %hear-message: $message assembled from received packets, to be
:: sent to a local vane for processing
:: %send-ack: emit an ack packet
:: %memo: $message assembled from received packets,
:: to be sent to a local vane for processing
:: %send: emit an ack packet
::
+$ message-still-gift
$% [%hear-message =message-num =message]
[%send-ack =message-num =ack-meat]
$% [%memo =message-num =message]
[%send =message-num =ack-meat]
==
--
:: external vane interface
@ -792,7 +792,7 @@
%sunk !!
%vega !!
%wegh !!
%buzz (on-buzz:event-core [ship message]:task)
%memo (on-memo:event-core [ship message]:task)
==
::
[moves ames-gate]
@ -809,13 +809,13 @@
?- sign
[%b %wake *] (on-take-wake:event-core wire error.sign)
::
[%c %aver *] (on-aver:event-core wire error.sign)
[%g %aver *] (on-aver:event-core wire error.sign)
[%j %aver *] (on-aver:event-core wire error.sign)
[%c %done *] (on-aver:event-core wire error.sign)
[%g %done *] (on-aver:event-core wire error.sign)
[%j %done *] (on-aver:event-core wire error.sign)
::
[%c %buzz *] (on-take-buzz:event-core wire message.sign)
[%g %buzz *] (on-take-buzz:event-core wire message.sign)
[%j %buzz *] (on-take-buzz:event-core wire message.sign)
[%c %memo *] (on-take-memo:event-core wire message.sign)
[%g %memo *] (on-take-memo:event-core wire message.sign)
[%j %memo *] (on-take-memo:event-core wire message.sign)
::
[%j %pubs *] !!
[%j %turf *] !!
@ -1039,18 +1039,18 @@
(emit duct %pass /alien %j %pubs ship)
::
event-core
:: +on-take-buzz: receive request to give message to peer
:: +on-take-memo: receive request to give message to peer
::
++ on-take-buzz
++ on-take-memo
|= [=wire =message]
^+ event-core
::
=/ =ship ?>(?=([@ *] wire) `@p`(slav %p i.wire))
::
(on-buzz ship message)
:: +on-buzz: handle request to send message
(on-memo ship message)
:: +on-memo: handle request to send message
::
++ on-buzz
++ on-memo
|= [=ship =message]
^+ event-core
::
@ -1062,7 +1062,7 @@
=/ =peer-state +.u.rcvr-state
=/ =channel [[our ship] now +>.ames-state -.peer-state]
::
abet:(on-buzz:(make-peer-core peer-state channel) message)
abet:(on-memo:(make-peer-core peer-state channel) message)
:: +on-take-wake: receive wakeup or error notification from behn
::
++ on-take-wake
@ -1094,7 +1094,7 @@
::
?: ?=(%& -.meat.shut-packet)
(run-message-still bone %hear lane shut-packet)
(run-message-pump bone %hear-ack [message-num +.meat]:shut-packet)
(run-message-pump bone %hear [message-num +.meat]:shut-packet)
:: +run-message-pump: process $message-pump-task and its effects
::
++ run-message-pump
@ -1117,14 +1117,15 @@
=* gift i.pump-gifts
=. peer-core
?- -.gift
%ack-message (process-ack-message [message-num ok]:gift)
%send (process-send static-fragment.gift)
%wait (process-wait date.gift)
%rest (process-rest date.gift)
%done (process-done [message-num ok]:gift)
%send (process-send static-fragment.gift)
%wait (process-wait date.gift)
%rest (process-rest date.gift)
==
$(pump-gifts t.pump-gifts)
:: +process-done: handle |message-pump's report of message (n)ack
::
++ process-ack-message
++ process-done
|= [=message-num ok=?]
^+ peer-core
:: if odd bone, ack is on "subscription update" message; no-op
@ -1138,11 +1139,11 @@
::
=/ target-bone=^bone (mix 0b10 bone)
::
(run-message-still target-bone %forget-nack message-num)
(run-message-still target-bone %drop message-num)
:: not a nack-trace bone; positive ack gets emitted trivially
::
?: ok
(emit client-duct %give %aver error=~)
(emit client-duct %give %done error=~)
:: nack; enqueue, pending nack-trace message
::
=/ nax-key [bone message-num]
@ -1156,6 +1157,7 @@
=. nax.peer-state (~(put in nax.peer-state) nax-key)
::
peer-core
:: +process-send: emit ack packet requested by |message-pump
::
++ process-send
|= =static-fragment
@ -1169,6 +1171,7 @@
message-num.static-fragment
%& +.static-fragment
==
:: +process-wait: relay |message-pump's set-timer request
::
++ process-wait
|= date=@da
@ -1176,6 +1179,7 @@
::
=/ =wire (make-pump-timer-wire her.channel bone)
(emit client-duct %pass wire %b %wait date)
:: +process-rest: relay |message-pump's unset-timer request
::
++ process-rest
|= date=@da
@ -1205,7 +1209,20 @@
=* gift i.still-gifts
=. peer-core
?- -.gift
%hear-message
:: %send: emit ack packet as requested by |message-still
::
%send
%- send-shut-packet :*
our-life.channel
her-life.channel
bone
message-num.gift
%| ack-meat.gift
==
::
:: %memo: handle message received by |message-still
::
%memo
=/ msg-path=path path.message.gift
?> ?=([?(%a %c %g %j) *] msg-path)
:: odd .bone; "request" message to pass to vane before acking
@ -1215,9 +1232,9 @@
::
?- i.msg-path
%a ~| %pass-to-ames^her.channel !!
%c (emit duct %pass wire %c %buzz her.channel message.gift)
%g (emit duct %pass wire %g %buzz her.channel message.gift)
%j (emit duct %pass wire %j %buzz her.channel message.gift)
%c (emit duct %pass wire %c %memo her.channel message.gift)
%g (emit duct %pass wire %g %memo her.channel message.gift)
%j (emit duct %pass wire %j %memo her.channel message.gift)
==
:: even bone means backward flow; ack automatically
::
@ -1232,7 +1249,7 @@
::
=/ client-duct (~(got by by-bone.ossuary.peer-state) bone)
::
(emit client-duct %give %buzz message.gift)
(emit client-duct %give %memo message.gift)
:: .bone is a nack-trace; validate message
::
?> =(/a/nax `path`msg-path)
@ -1253,7 +1270,7 @@
::
=? peer-core !(~(has in nax.peer-state) nax-key)
%- run-message-pump
[target-bone %hear-ack message-num %| ok=%.n lag=`@dr`0]
[target-bone %hear message-num %| ok=%.n lag=`@dr`0]
:: clear the nack from our state and relay to vane
::
=. nax.peer-state (~(del in nax.peer-state) nax-key)
@ -1261,21 +1278,12 @@
=/ target-duct
(~(got by by-bone.ossuary.peer-state) target-bone)
::
(emit target-duct %give %aver `error)
::
%send-ack
%- send-shut-packet :*
our-life.channel
her-life.channel
bone
message-num.gift
%| ack-meat.gift
==
(emit target-duct %give %done `error)
==
$(still-gifts t.still-gifts)
:: +on-buzz: handle request to send message
:: +on-memo: handle request to send message
::
++ on-buzz
++ on-memo
|= =message
^+ peer-core
::
@ -1335,7 +1343,7 @@
::
=~ (dispatch-task task)
feed-packets
(run-packet-pump %finalize ~)
(run-packet-pump %halt ~)
[(flop gifts) state]
==
:: +dispatch-task: perform task-specific processing
@ -1347,13 +1355,10 @@
?- -.task
%send (on-send message.task)
%wake (run-packet-pump task)
%hear-ack
%hear
?- -.ack-meat.task
%& %- on-hear-fragment-ack
[message-num fragment-num=p.ack-meat]:task
::
%| %- on-hear-message-ack
[message-num [ok lag]:p.ack-meat]:task
%& (on-hear [message-num fragment-num=p.ack-meat]:task)
%| (on-done [message-num [ok lag]:p.ack-meat]:task)
== ==
:: +on-send: handle request to send a message
::
@ -1363,19 +1368,19 @@
::
=. unsent-messages.state (~(put to unsent-messages.state) message)
message-pump
:: +on-hear-fragment-ack: handle packet acknowledgment
:: +on-hear: handle packet acknowledgment
::
++ on-hear-fragment-ack
++ on-hear
|= [=message-num =fragment-num]
^+ message-pump
:: pass to |packet-pump unless duplicate or future ack
::
?. (is-message-num-in-range message-num)
message-pump
(run-packet-pump %hear-fragment-ack message-num fragment-num)
:: +on-hear-message-ack: handle message-level acknowledgment
(run-packet-pump %hear message-num fragment-num)
:: +on-done: handle message acknowledgment
::
++ on-hear-message-ack
++ on-done
|= [=message-num ok=? lag=@dr]
^+ message-pump
:: future nack implies positive ack on all earlier messages
@ -1403,7 +1408,7 @@
~
:: clear all packets from this message from the packet pump
::
=. message-pump (run-packet-pump %hear-message-ack message-num lag)
=. message-pump (run-packet-pump %done message-num lag)
:: enqueue this ack to be sent back to local client vane
::
=. queued-message-acks.state
@ -1420,7 +1425,7 @@
=. queued-message-acks.state
(~(del by queued-message-acks.state) current.state)
::
=. message-pump (give %ack-message current.state ok.u.ack)
=. message-pump (give %done current.state ok.u.ack)
::
$(current.state +(current.state))
:: +is-message-num-in-range: %.y unless duplicate or future ack
@ -1531,10 +1536,10 @@
=- [(flop gifts) state]
::
?- -.task
%hear-fragment-ack (on-hear-fragment-ack [message-num fragment-num]:task)
%hear-message-ack (on-hear-message-ack message-num.task)
%wake resend-lost(next-wake.state ~)
%finalize set-wake
%hear (on-hear [message-num fragment-num]:task)
%done (on-done message-num.task)
%wake resend-lost(next-wake.state ~)
%halt set-wake
==
:: +resend-lost: resend as many lost packets as .gauge will allow
::
@ -1640,7 +1645,7 @@
?~ sent packet-pump
=. packet-pump (give %send i.sent)
$(sent t.sent)
:: +on-hear-fragment-ack: handle ack on a live packet
:: +on-hear: handle ack on a live packet
::
:: Traverse .live from the head, marking packets as lost until we
:: find the acked packet. Then delete the acked packet and try to
@ -1649,7 +1654,7 @@
:: If we don't find the acked packet, no-op: no mutations, effects,
:: or resending of lost packets.
::
++ on-hear-fragment-ack
++ on-hear
|= [=message-num =fragment-num]
^+ packet-pump
::
@ -1694,9 +1699,9 @@
:+ new-val=`val(expiry `@da`0)
stop=%.n
[found=%.n metrics=(on-skipped-packet:gauge -.val)]
:: +on-hear-message-ack: apply ack to all packets from .message-num
:: +on-done: apply ack to all packets from .message-num
::
++ on-hear-message-ack
++ on-done
|= =message-num
^+ packet-pump
::
@ -1843,9 +1848,9 @@
=- [(flop gifts) state]
::
?- -.task
%done (on-done ok.task)
%forget-nack (on-forget-nack message-num.task)
%hear (on-hear [lane shut-packet]:task)
%done (on-done ok.task)
%drop (on-drop message-num.task)
%hear (on-hear [lane shut-packet]:task)
==
:: +on-hear: receive message fragment, possibly completing message
::
@ -1871,11 +1876,11 @@
?. is-last-fragment
:: single packet ack
::
(give %send-ack seq %& fragment-num)
(give %send seq %& fragment-num)
:: whole message (n)ack
::
=/ ok=? (~(has in nax.state) seq)
(give %send-ack seq %| ok lag=`@dr`0)
(give %send seq %| ok lag=`@dr`0)
:: last-acked<seq<=last-heard; heard message, unprocessed
::
?: (lte seq last-heard.state)
@ -1885,7 +1890,7 @@
message-still
:: ack all other packets
::
(give %send-ack seq %& fragment-num)
(give %send seq %& fragment-num)
:: last-heard<seq<10+last-heard; this is a packet in a live message
::
=/ =partial-rcv-message
@ -1906,7 +1911,7 @@
?: already-heard
?: is-last-fragment
message-still
(give %send-ack seq %& fragment-num)
(give %send seq %& fragment-num)
:: new fragment; store in state and check if message is done
::
=. num-received.partial-rcv-message
@ -1920,7 +1925,7 @@
:: ack any packet other than the last one, and continue either way
::
=? message-still !is-last-fragment
(give %send-ack seq %& fragment-num)
(give %send seq %& fragment-num)
:: enqueue all completed messages starting at +(last-heard.state)
::
|- ^+ message-still
@ -1955,7 +1960,7 @@
=. pending-vane-ack.state (~(put to pending-vane-ack.state) seq message)
?. empty
message-still
(give %hear-message seq message)
(give %memo seq message)
:: +on-done: handle confirmation of message processing from vane
::
++ on-done
@ -1968,10 +1973,10 @@
=. last-acked.state +(last-acked.state)
=? nax.state !ok (~(put in nax.state) message-num)
::
(give %send-ack message-num %| ok lag=`@dr`0)
(give %send message-num %| ok lag=`@dr`0)
:: +on-drop: drop .message-num from our .nax state
::
::
++ on-forget-nack
++ on-drop
|= =message-num
^+ message-still
::

View File

@ -227,7 +227,7 @@
::
=/ res1
%- call:alice-core
[~[/alice] *type %buzz ~doznec-doznec /g/talk [%first %post]]
[~[/alice] *type %memo ~doznec-doznec /g/talk [%first %post]]
::
::~& res1=-.res1
::
@ -252,7 +252,7 @@
::
=/ res3
%- take:bob-core
[/bone/~nec/1 ~[/bob] ** %g %aver ~]
[/bone/~nec/1 ~[/bob] ** %g %done ~]
::
::~& res3=-.res3
::
@ -271,10 +271,8 @@
%- call:alice-core
[~[/alice] *type %hear lane blob]
::
~& res4=-.res4
::
%+ expect-eq
!> :~ :+ ~[/alice] %give [%aver error=~]
!> :~ :+ ~[/alice] %give [%done error=~]
:+ ~[/alice] %pass
[/pump/~doznec-doznec/0 %b %rest ~2222.2.2..00.00.05]
==