track and clear nax in |message-still

This commit is contained in:
Ted Blackman 2019-06-18 17:38:25 -07:00
parent 1d8f29ee4c
commit 47665d5843

View File

@ -530,6 +530,7 @@
last-heard=message-num
pending-vane-ack=(qeu [=message-num =message])
live-messages=(map message-num partial-rcv-message)
nax=(set message-num)
==
:: $partial-rcv-message: message for which we've received some fragments
::
@ -740,10 +741,12 @@
:: $message-still-task: job for |message-still
::
:: %done: receive confirmation from vane of processing or failure
:: %forget-nack: clear .message-num from .nax.state
:: %hear: handle receiving a message fragment packet
::
+$ message-still-task
$% [%done ok=?]
[%forget-nack =message-num]
[%hear =lane =shut-packet]
==
:: $message-still-gift: effect from |message-still
@ -846,13 +849,34 @@
++ event-core .
++ abet [(flop moves) ames-state]
++ emit |=(=move event-core(moves [move moves]))
::
:: +on-aver: handle notice from vane that it processed a message
::
++ on-aver
|= [=wire error=(unit error)]
^+ event-core
::
!!
=+ ^- [her=ship =bone] (parse-bone-wire wire)
::
=/ =peer-state
=- ?>(?=(%known -<) ->)
(~(got by peers.ames-state) her)
::
=/ =channel [[our her] now +>.ames-state -.peer-state]
=/ peer-core (make-peer-core peer-state channel)
=/ ok=? ?=(~ error)
:: send message (n)ack packet
::
=. event-core abet:(run-message-still:peer-core bone %done ok)
:: if positive ack, we're done
::
?: ok
event-core
:: send nack-trace message on designated bone
::
=/ nack-trace-bone=^bone (mix 0b10 bone)
=. peer-core (make-peer-core peer-state channel)
::
abet:(run-message-pump:peer-core nack-trace-bone %send /a/nax error)
::
::
++ on-hear
@ -1103,6 +1127,10 @@
++ process-ack-message
|= [=message-num ok=?]
^+ peer-core
:: is this bone a nack-trace bone?
::
?: =(1 (end 0 1 (rsh 0 1 bone)))
!!
:: positive ack gets emitted trivially
::
?: ok
@ -1175,7 +1203,8 @@
:: odd .bone; "request" message to pass to vane before acking
::
?: =(1 (end 0 1 bone))
=/ =wire msg-path
=/ =wire (make-bone-wire her.channel bone)
::
?- i.msg-path
%a ~| %pass-to-ames^her.channel !!
%c (emit duct %pass wire %c %buzz her.channel message.gift)
@ -1795,6 +1824,7 @@
::
?- -.task
%done (on-done ok.task)
%forget-nack (on-forget-nack message-num.task)
%hear (on-hear [lane shut-packet]:task)
==
:: +on-hear: receive message fragment, possibly completing message
@ -1822,9 +1852,10 @@
:: single packet ack
::
(give %send-ack seq %& fragment-num)
:: whole message (n)ack TODO nack lookup
:: whole message (n)ack
::
(give %send-ack seq %| ok=%.y lag=`@dr`0)
=/ ok=? (~(has in nax.state) seq)
(give %send-ack seq %| ok lag=`@dr`0)
:: last-acked<seq<=last-heard; heard message, unprocessed
::
?: (lte seq last-heard.state)
@ -1894,16 +1925,14 @@
=. message-still (enqueue-to-vane seq message)
::
$(seq +(seq))
::
:: +enqueue-to-vane: enqueue message to be sent to local vane
::
++ enqueue-to-vane
|= [seq=message-num =message]
^+ message-still
::
=/ empty=? =(~ pending-vane-ack.state)
::
=. pending-vane-ack.state (~(put to pending-vane-ack.state) seq message)
::
?. empty
message-still
(give %hear-message seq message)
@ -1914,10 +1943,21 @@
^+ message-still
::
=^ pending pending-vane-ack.state ~(get to pending-vane-ack.state)
=. last-acked.state +(last-acked.state)
=/ =message-num message-num.p.pending
::
=. last-acked.state +(last-acked.state)
=? nax.state !ok (~(put in nax.state) message-num)
::
(give %send-ack message-num %| ok lag=`@dr`0)
::
::
++ on-forget-nack
|= =message-num
^+ message-still
::
=. nax.state (~(del in nax.state) message-num)
::
message-still
--
:: +assemble-fragments: concatenate fragments into a $message
::
@ -1951,6 +1991,21 @@
:+ (add 4 next-bone.ossuary)
(~(put by by-duct.ossuary) duct next-bone.ossuary)
(~(put by by-bone.ossuary) next-bone.ossuary duct)
:: +make-bone-wire: encode ship and bone in wire for sending to vane
::
++ make-bone-wire
|= [her=ship =bone]
^- wire
::
/bone/(scot %p her)/(scot %ud bone)
:: +parse-bone-wire: decode ship and bone from wire from local vane
::
++ parse-bone-wire
|= =wire
^- [her=ship =bone]
::
?> ?=([%bone @ @ ~] wire)
[`@p`(slav %p i.t.wire) `@ud`(slav %ud i.t.t.wire)]
:: +make-pump-timer-wire: construct wire for |packet-pump timer
::
++ make-pump-timer-wire