mirror of
https://github.com/ilyakooo0/urbit.git
synced 2025-01-05 22:03:50 +03:00
ames: fix ack queueing
This commit is contained in:
parent
64e8318657
commit
dbeb94f179
@ -422,6 +422,13 @@
|
||||
:: $naxplanation: nack trace; explains which message failed and why
|
||||
::
|
||||
+$ naxplanation [=message-num =error]
|
||||
:: $ack: positive ack, nack packet, or nack trace
|
||||
::
|
||||
+$ ack
|
||||
$% [%ok ~]
|
||||
[%nack ~]
|
||||
[%naxplanation =error]
|
||||
==
|
||||
::
|
||||
+| %statics
|
||||
::
|
||||
@ -520,7 +527,7 @@
|
||||
:: When we hear a message ack (positive or negative), we treat that
|
||||
:: as though all fragments have been acked. If this message is not
|
||||
:: .current, then it's a future message and .current has not yet been
|
||||
:: acked, so we place the message in .queued-message-acks.
|
||||
:: acked, so we place the ack in .queued-message-acks.
|
||||
::
|
||||
:: If we hear a message ack before we've sent all the
|
||||
:: fragments for that message, clear .unsent-fragments. If the
|
||||
@ -561,7 +568,7 @@
|
||||
next=_`message-num`1
|
||||
unsent-messages=(qeu message-blob)
|
||||
unsent-fragments=(list static-fragment)
|
||||
queued-message-acks=(map message-num ok=?)
|
||||
queued-message-acks=(map message-num ack)
|
||||
=packet-pump-state
|
||||
==
|
||||
+$ static-fragment
|
||||
@ -723,11 +730,13 @@
|
||||
::
|
||||
:: %memo: packetize and send application-level message
|
||||
:: %hear: handle receipt of ack on fragment or message
|
||||
:: %near: handle receipt of naxplanation
|
||||
:: %wake: handle timer firing
|
||||
::
|
||||
+$ message-pump-task
|
||||
$% [%memo =message-blob]
|
||||
[%hear =message-num =ack-meat]
|
||||
[%near =naxplanation]
|
||||
[%wake ~]
|
||||
==
|
||||
:: $message-pump-gift: effect from |message-pump
|
||||
@ -738,7 +747,7 @@
|
||||
:: %rest: cancel timer at .date
|
||||
::
|
||||
+$ message-pump-gift
|
||||
$% [%done =message-num ok=?]
|
||||
$% [%done =message-num error=(unit error)]
|
||||
[%send =static-fragment]
|
||||
[%wait date=@da]
|
||||
[%rest date=@da]
|
||||
@ -2076,25 +2085,9 @@
|
||||
:: flip .bone's second bit to find referenced flow
|
||||
::
|
||||
=/ target-bone=^bone (mix 0b10 bone)
|
||||
=/ nax-key [target-bone message-num.naxplanation]
|
||||
:: if we haven't heard a message nack, pretend we have
|
||||
:: notify |message-pump that this message got naxplained
|
||||
::
|
||||
:: The nack-trace message counts as a valid message nack on
|
||||
:: the original failed message.
|
||||
::
|
||||
:: This prevents us from having to wait for a message nack
|
||||
:: packet, which would mean we couldn't immediately ack the
|
||||
:: nack-trace message, which would in turn violate the
|
||||
:: semantics of backward flows.
|
||||
::
|
||||
=? peer-core !(~(has in nax.peer-state) nax-key)
|
||||
%- run-message-pump
|
||||
[target-bone %hear message-num.naxplanation %| ok=%.n lag=`@dr`0]
|
||||
:: clear the nack from our state and relay to vane
|
||||
::
|
||||
=. nax.peer-state (~(del in nax.peer-state) nax-key)
|
||||
::
|
||||
(emit (got-duct target-bone) %give %done `error.naxplanation)
|
||||
(run-message-pump target-bone %near naxplanation)
|
||||
:: +on-sink-plea: handle request message received by |message-sink
|
||||
::
|
||||
++ on-sink-plea
|
||||
@ -2162,8 +2155,10 @@
|
||||
%hear
|
||||
?- -.ack-meat.task
|
||||
%& (on-hear [message-num fragment-num=p.ack-meat]:task)
|
||||
%| (on-done [message-num [ok lag]:p.ack-meat]:task)
|
||||
== ==
|
||||
%| (on-done [message-num ?:(ok.p.ack-meat [%ok ~] [%nack ~])]:task)
|
||||
==
|
||||
%near (on-done [message-num %naxplanation error]:naxplanation.task)
|
||||
==
|
||||
:: +on-memo: handle request to send a message
|
||||
::
|
||||
++ on-memo
|
||||
@ -2185,11 +2180,16 @@
|
||||
(run-packet-pump %hear message-num fragment-num)
|
||||
:: +on-done: handle message acknowledgment
|
||||
::
|
||||
:: A nack-trace message counts as a valid message nack on the
|
||||
:: original failed message.
|
||||
::
|
||||
:: This prevents us from having to wait for a message nack packet,
|
||||
:: which would mean we couldn't immediately ack the nack-trace
|
||||
:: message, which would in turn violate the semantics of backward
|
||||
:: flows.
|
||||
::
|
||||
++ on-done
|
||||
:: check-old: loop terminator variable
|
||||
::
|
||||
=/ check-old=? %.y
|
||||
|= [=message-num ok=? lag=@dr]
|
||||
|= [=message-num =ack]
|
||||
^+ message-pump
|
||||
:: unsent messages from the future should never get acked
|
||||
::
|
||||
@ -2199,18 +2199,6 @@
|
||||
?: (lth message-num current.state)
|
||||
%- (trace snd.veb |.("duplicate done {<current.state message-num>}"))
|
||||
message-pump
|
||||
:: future nack implies positive ack on all earlier messages
|
||||
::
|
||||
?: &(!ok check-old)
|
||||
|- ^+ message-pump
|
||||
:: base case: current message got nacked; handle same as ack
|
||||
::
|
||||
?: =(message-num current.state)
|
||||
^$(check-old %.n)
|
||||
:: recursive case: future message got nacked
|
||||
::
|
||||
=. message-pump ^$(ok %.y, message-num current.state)
|
||||
$
|
||||
:: ignore duplicate and future acks
|
||||
::
|
||||
?. (is-message-num-in-range message-num)
|
||||
@ -2224,27 +2212,40 @@
|
||||
~
|
||||
:: clear all packets from this message from the packet pump
|
||||
::
|
||||
=. message-pump (run-packet-pump %done message-num lag)
|
||||
=. message-pump (run-packet-pump %done message-num lag=*@dr)
|
||||
:: enqueue this ack to be sent back to local client vane
|
||||
::
|
||||
=. queued-message-acks.state
|
||||
(~(put by queued-message-acks.state) message-num ok)
|
||||
:: Don't clobber a naxplanation with just a nack packet.
|
||||
::
|
||||
=? queued-message-acks.state
|
||||
=/ old (~(get by queued-message-acks.state) message-num)
|
||||
!?=([~ %naxplanation *] old)
|
||||
(~(put by queued-message-acks.state) message-num ack)
|
||||
:: emit local acks from .queued-message-acks until incomplete
|
||||
::
|
||||
|- ^+ message-pump
|
||||
:: if .current hasn't been fully acked, we're done
|
||||
::
|
||||
?~ ack=(~(get by queued-message-acks.state) current.state)
|
||||
?~ cur=(~(get by queued-message-acks.state) current.state)
|
||||
message-pump
|
||||
:: .current is complete; pop, emit local ack, and try next message
|
||||
::
|
||||
=. queued-message-acks.state
|
||||
(~(del by queued-message-acks.state) current.state)
|
||||
:: give %done to vane
|
||||
:: give %done to vane if we're ready
|
||||
::
|
||||
=. message-pump (give %done current.state ok.u.ack)
|
||||
?- -.u.cur
|
||||
%ok
|
||||
=. message-pump (give %done current.state ~)
|
||||
$(current.state +(current.state))
|
||||
::
|
||||
$(current.state +(current.state))
|
||||
%nack
|
||||
message-pump
|
||||
::
|
||||
%naxplanation
|
||||
=. message-pump (give %done current.state `error.u.cur)
|
||||
$(current.state +(current.state))
|
||||
==
|
||||
:: +is-message-num-in-range: %.y unless duplicate or future ack
|
||||
::
|
||||
++ is-message-num-in-range
|
||||
|
Loading…
Reference in New Issue
Block a user