mirror of
https://github.com/ilyakooo0/urbit.git
synced 2024-12-03 02:35:52 +03:00
ames: simplify recork timer
The previous recork timer queued up %cork messages without sending them. It also relied on making sure pump timers didn't get set for recork bones. This was fragile. The new design enqueues up to one new %cork message per ship during each recork timer, based on the state of the flow. If the flow is closing but there are no outstanding messages in it, then it needs to be recorked. Flows will be recorked in ascending numerical order by bone.
This commit is contained in:
parent
aad5fa6fae
commit
54cd1a5eca
@ -607,7 +607,9 @@
|
||||
:: life: our $life; how many times we've rekeyed
|
||||
:: crypto-core: interface for encryption and signing
|
||||
:: bug: debug printing configuration
|
||||
:: corks: wires for cork flows pending publisher update
|
||||
:: corks(STALE):wires for cork flows pending publisher update
|
||||
::
|
||||
:: Note: .corks is only still present for unreleased migration reasons
|
||||
::
|
||||
+$ ames-state
|
||||
$: peers=(map ship ship-state)
|
||||
@ -764,7 +766,7 @@
|
||||
[%hear =message-num =ack-meat]
|
||||
[%near =naxplanation]
|
||||
[%prod ~]
|
||||
[%wake recork=?]
|
||||
[%wake ~]
|
||||
==
|
||||
:: $message-pump-gift: effect from |message-pump
|
||||
::
|
||||
@ -795,7 +797,7 @@
|
||||
$% [%hear =message-num =fragment-num]
|
||||
[%done =message-num lag=@dr]
|
||||
[%halt ~]
|
||||
[%wake recork=? current=message-num]
|
||||
[%wake current=message-num]
|
||||
[%prod ~]
|
||||
==
|
||||
:: $packet-pump-gift: effect from |packet-pump
|
||||
@ -1812,17 +1814,7 @@
|
||||
event-core
|
||||
(request-attestation u.ship)
|
||||
::
|
||||
|^
|
||||
?. ?=([%recork ~] wire) (handle-single-wire wire recork=|)
|
||||
=/ wires=(list ^wire) ~(tap in corks.ames-state)
|
||||
|- ^+ event-core
|
||||
?^ wires
|
||||
$(wires t.wires, event-core (handle-single-wire i.wires recork=&))
|
||||
(emit duct %pass /recork %b %wait `@da`(add now ~m20))
|
||||
::
|
||||
++ handle-single-wire
|
||||
|= [=^wire recork=?]
|
||||
^+ event-core
|
||||
?. ?=([%recork ~] wire)
|
||||
=/ res=(unit [her=ship =bone]) (parse-pump-timer-wire wire)
|
||||
?~ res
|
||||
%- (slog leaf+"ames: got timer for strange wire: {<wire>}" ~)
|
||||
@ -1835,11 +1827,46 @@
|
||||
[leaf+"ames: got timer for strange ship: {<her.u.res>}, ignoring" ~]
|
||||
::
|
||||
=/ =channel [[our her.u.res] now channel-state -.u.state]
|
||||
::
|
||||
=< abet
|
||||
%- on-wake:(make-peer-core u.state channel)
|
||||
[recork bone.u.res error]
|
||||
--
|
||||
abet:(on-wake:(make-peer-core u.state channel) bone.u.res error)
|
||||
::
|
||||
=. event-core
|
||||
(emit duct %pass /recork %b %wait `@da`(add now ~m20))
|
||||
::
|
||||
=/ pez ~(tap by peers.ames-state)
|
||||
|- ^+ event-core
|
||||
=* ship-loop $
|
||||
?~ pez event-core
|
||||
=+ [her sat]=i.pez
|
||||
?. ?=(%known -.sat)
|
||||
ship-loop(pez t.pez)
|
||||
=* peer-state +.sat
|
||||
=/ boz (sort ~(tap in closing.peer-state) lte)
|
||||
|- ^+ event-core
|
||||
=* bone-loop $
|
||||
?~ boz ship-loop(pez t.pez)
|
||||
=/ pum=message-pump-state (~(got by snd.peer-state) i.boz)
|
||||
?: =(next current):pum
|
||||
bone-loop(boz t.boz)
|
||||
:: sanity check on the message pump state
|
||||
::
|
||||
?. ?& =(~ unsent-messages.pum)
|
||||
=(~ unsent-fragments.pum)
|
||||
=(~ live.packet-pump-state.pum)
|
||||
==
|
||||
~> %slog.0^leaf/"ames: incoherent pump state {<[her i.boz]>}"
|
||||
bone-loop(boz t.boz)
|
||||
:: no outstanding messages, so send a new %cork
|
||||
::
|
||||
:: TODO use +trace
|
||||
~> %slog.0^leaf/"ames: recork {<[her i.boz]>}"
|
||||
::
|
||||
=. event-core
|
||||
=/ =channel [[our her] now channel-state -.peer-state]
|
||||
=/ peer-core (make-peer-core peer-state channel)
|
||||
=/ =plea [%$ /flow [%cork ~]]
|
||||
abet:(on-memo:peer-core i.boz plea %plea)
|
||||
::
|
||||
ship-loop(pez t.pez)
|
||||
:: +on-init: first boot; subscribe to our info from jael
|
||||
::
|
||||
++ on-init
|
||||
@ -2454,15 +2481,8 @@
|
||||
:: +on-wake: handle timer expiration
|
||||
::
|
||||
++ on-wake
|
||||
|= [recork=? =bone error=(unit tang)]
|
||||
|= [=bone error=(unit tang)]
|
||||
^+ peer-core
|
||||
:: ignore spurious pump timers for %cork's
|
||||
::
|
||||
:: This is here to fix canaries that had spurious pump timers.
|
||||
::
|
||||
?: &(!recork (~(has in closing.peer-state) bone))
|
||||
~> %slog.0^leaf/"ames: dropping pump wake while recorking {<bone>}"
|
||||
peer-core
|
||||
:: if we previously errored out, print and reset timer for later
|
||||
::
|
||||
:: This really shouldn't happen, but if it does, make sure we
|
||||
@ -2531,7 +2551,7 @@
|
||||
peer-core
|
||||
:: maybe resend some timed out packets
|
||||
::
|
||||
(run-message-pump bone %wake recork)
|
||||
(run-message-pump bone %wake ~)
|
||||
:: +send-shut-packet: fire encrypted packet at rcvr and maybe sponsors
|
||||
::
|
||||
++ send-shut-packet
|
||||
@ -2633,7 +2653,6 @@
|
||||
::
|
||||
=. message-pump (run-packet-pump:message-pump %done message-num *@dr)
|
||||
=/ =wire (make-pump-timer-wire her.channel bone)
|
||||
=. corks.ames-state (~(del in corks.ames-state) wire)
|
||||
=/ nack-bone=^bone (mix 0b10 bone)
|
||||
=? rcv.peer-state (~(has by rcv.peer-state) nack-bone)
|
||||
:: if the publisher was behind we remove nacks received on that bone
|
||||
@ -2820,25 +2839,11 @@
|
||||
::
|
||||
?. (~(has in krocs.peer-state) target-bone)
|
||||
peer-core
|
||||
=/ =message-pump-state
|
||||
(~(gut by snd.peer-state) target-bone *message-pump-state)
|
||||
=/ message-pump
|
||||
(make-message-pump message-pump-state channel %.y target-bone)
|
||||
:: we don't process the gifts here and instead wait for
|
||||
:: the timer to handle the %cork plea added to the pump
|
||||
::
|
||||
=^ * message-pump-state
|
||||
%- work:message-pump
|
||||
%memo^(dedup-message (jim [%$ /flow [%cork ~]]))
|
||||
=. snd.peer-state
|
||||
(~(put by snd.peer-state) target-bone message-pump-state)
|
||||
:: if we get a naxplanation for a %cork, the publisher is behind
|
||||
:: receiving the OTA, so we set up a timer to retry in one day.
|
||||
:: receiving the OTA. The /recork timer will retry eventually.
|
||||
::
|
||||
%- %+ trace msg.veb
|
||||
|.("old publisher, resend %cork on bone={<target-bone>} in ~m20")
|
||||
=/ =wire (make-pump-timer-wire her.channel target-bone)
|
||||
=. corks.ames-state (~(put in corks.ames-state) wire)
|
||||
|.("old publisher, %cork nacked on bone={<target-bone>}")
|
||||
peer-core
|
||||
:: +on-sink-plea: handle request message received by |message-sink
|
||||
::
|
||||
@ -2912,15 +2917,12 @@
|
||||
|= task=message-pump-task
|
||||
^+ [gifts state]
|
||||
::
|
||||
=. message-pump (dispatch-task task)
|
||||
=. message-pump feed-packets
|
||||
:: don't set new pump timer if triggered by a recork timer
|
||||
::
|
||||
=? message-pump !=([%wake recork=&] task)
|
||||
(run-packet-pump %halt ~)
|
||||
::
|
||||
=. message-pump assert
|
||||
[(flop gifts) state]
|
||||
=~ (dispatch-task task)
|
||||
feed-packets
|
||||
(run-packet-pump %halt ~)
|
||||
assert
|
||||
[(flop gifts) state]
|
||||
==
|
||||
:: +dispatch-task: perform task-specific processing
|
||||
::
|
||||
++ dispatch-task
|
||||
@ -2930,7 +2932,7 @@
|
||||
?- -.task
|
||||
%prod (run-packet-pump %prod ~)
|
||||
%memo (on-memo message-blob.task)
|
||||
%wake (run-packet-pump %wake recork.task current.state)
|
||||
%wake (run-packet-pump %wake current.state)
|
||||
%hear
|
||||
?- -.ack-meat.task
|
||||
%&
|
||||
@ -3178,7 +3180,7 @@
|
||||
?- -.task
|
||||
%hear (on-hear [message-num fragment-num]:task)
|
||||
%done (on-done message-num.task)
|
||||
%wake (on-wake recork.task current.task)
|
||||
%wake (on-wake current.task)
|
||||
%prod on-prod
|
||||
%halt set-wake
|
||||
==
|
||||
@ -3205,7 +3207,7 @@
|
||||
:: +on-wake: handle packet timeout
|
||||
::
|
||||
++ on-wake
|
||||
|= [recork=? current=message-num]
|
||||
|= current=message-num
|
||||
^+ packet-pump
|
||||
:: assert temporal coherence
|
||||
::
|
||||
|
Loading…
Reference in New Issue
Block a user