Merge pull request #5923 from urbit/ted/recork-better

ames: recork better
This commit is contained in:
fang 2022-08-05 11:42:45 +02:00 committed by GitHub
commit 8eb8a1da29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -607,7 +607,9 @@
:: life: our $life; how many times we've rekeyed :: life: our $life; how many times we've rekeyed
:: crypto-core: interface for encryption and signing :: crypto-core: interface for encryption and signing
:: bug: debug printing configuration :: bug: debug printing configuration
:: corks: wires for cork flows pending publisher update :: corks(STALE):wires for cork flows pending publisher update
::
:: Note: .corks is only still present for unreleased migration reasons
:: ::
+$ ames-state +$ ames-state
$: peers=(map ship ship-state) $: peers=(map ship ship-state)
@ -764,7 +766,7 @@
[%hear =message-num =ack-meat] [%hear =message-num =ack-meat]
[%near =naxplanation] [%near =naxplanation]
[%prod ~] [%prod ~]
[%wake recork=?] [%wake ~]
== ==
:: $message-pump-gift: effect from |message-pump :: $message-pump-gift: effect from |message-pump
:: ::
@ -795,7 +797,7 @@
$% [%hear =message-num =fragment-num] $% [%hear =message-num =fragment-num]
[%done =message-num lag=@dr] [%done =message-num lag=@dr]
[%halt ~] [%halt ~]
[%wake recork=? current=message-num] [%wake current=message-num]
[%prod ~] [%prod ~]
== ==
:: $packet-pump-gift: effect from |packet-pump :: $packet-pump-gift: effect from |packet-pump
@ -1812,17 +1814,7 @@
event-core event-core
(request-attestation u.ship) (request-attestation u.ship)
:: ::
|^ ?. ?=([%recork ~] wire)
?. ?=([%recork ~] wire) (handle-single-wire wire recork=|)
=/ wires=(list ^wire) ~(tap in corks.ames-state)
|- ^+ event-core
?^ wires
$(wires t.wires, event-core (handle-single-wire i.wires recork=&))
(emit duct %pass /recork %b %wait `@da`(add now ~m20))
::
++ handle-single-wire
|= [=^wire recork=?]
^+ event-core
=/ res=(unit [her=ship =bone]) (parse-pump-timer-wire wire) =/ res=(unit [her=ship =bone]) (parse-pump-timer-wire wire)
?~ res ?~ res
%- (slog leaf+"ames: got timer for strange wire: {<wire>}" ~) %- (slog leaf+"ames: got timer for strange wire: {<wire>}" ~)
@ -1835,11 +1827,22 @@
[leaf+"ames: got timer for strange ship: {<her.u.res>}, ignoring" ~] [leaf+"ames: got timer for strange ship: {<her.u.res>}, ignoring" ~]
:: ::
=/ =channel [[our her.u.res] now channel-state -.u.state] =/ =channel [[our her.u.res] now channel-state -.u.state]
abet:(on-wake:(make-peer-core u.state channel) bone.u.res error)
:: ::
=< abet =. event-core
%- on-wake:(make-peer-core u.state channel) (emit duct %pass /recork %b %wait `@da`(add now ~m20))
[recork bone.u.res error] :: recork up to one bone per peer
-- ::
=/ pez ~(tap by peers.ames-state)
|- ^+ event-core
?~ pez event-core
=+ [her sat]=i.pez
?. ?=(%known -.sat)
$(pez t.pez)
=* peer-state +.sat
=/ =channel [[our her] now channel-state -.peer-state]
=/ peer-core (make-peer-core peer-state channel)
$(pez t.pez, event-core abet:recork-one:peer-core)
:: +on-init: first boot; subscribe to our info from jael :: +on-init: first boot; subscribe to our info from jael
:: ::
++ on-init ++ on-init
@ -2454,15 +2457,8 @@
:: +on-wake: handle timer expiration :: +on-wake: handle timer expiration
:: ::
++ on-wake ++ on-wake
|= [recork=? =bone error=(unit tang)] |= [=bone error=(unit tang)]
^+ peer-core ^+ peer-core
:: ignore spurious pump timers for %cork's
::
:: This is here to fix canaries that had spurious pump timers.
::
?: &(!recork (~(has in closing.peer-state) bone))
~> %slog.0^leaf/"ames: dropping pump wake while recorking {<bone>}"
peer-core
:: if we previously errored out, print and reset timer for later :: if we previously errored out, print and reset timer for later
:: ::
:: This really shouldn't happen, but if it does, make sure we :: This really shouldn't happen, but if it does, make sure we
@ -2531,7 +2527,7 @@
peer-core peer-core
:: maybe resend some timed out packets :: maybe resend some timed out packets
:: ::
(run-message-pump bone %wake recork) (run-message-pump bone %wake ~)
:: +send-shut-packet: fire encrypted packet at rcvr and maybe sponsors :: +send-shut-packet: fire encrypted packet at rcvr and maybe sponsors
:: ::
++ send-shut-packet ++ send-shut-packet
@ -2553,6 +2549,30 @@
our-life.channel her-life.channel our-life.channel her-life.channel
== ==
peer-core peer-core
:: +recork-one: re-send the next %cork to the peer
::
++ recork-one
^+ peer-core
=/ boz (sort ~(tap in closing.peer-state) lte)
|- ^+ peer-core
?~ boz peer-core
=/ pum=message-pump-state (~(got by snd.peer-state) i.boz)
?. =(next current):pum
$(boz t.boz)
:: sanity check on the message pump state
::
?. ?& =(~ unsent-messages.pum)
=(~ unsent-fragments.pum)
=(~ live.packet-pump-state.pum)
==
~> %slog.0^leaf/"ames: bad pump state {<[her.channel i.boz]>}"
$(boz t.boz)
:: no outstanding messages, so send a new %cork
::
:: TODO use +trace
~> %slog.0^leaf/"ames: recork {<[her.channel i.boz]>}"
=/ =plea [%$ /flow [%cork ~]]
(on-memo i.boz plea %plea)
:: +got-duct: look up $duct by .bone, asserting already bound :: +got-duct: look up $duct by .bone, asserting already bound
:: ::
++ got-duct ++ got-duct
@ -2633,7 +2653,6 @@
:: ::
=. message-pump (run-packet-pump:message-pump %done message-num *@dr) =. message-pump (run-packet-pump:message-pump %done message-num *@dr)
=/ =wire (make-pump-timer-wire her.channel bone) =/ =wire (make-pump-timer-wire her.channel bone)
=. corks.ames-state (~(del in corks.ames-state) wire)
=/ nack-bone=^bone (mix 0b10 bone) =/ nack-bone=^bone (mix 0b10 bone)
=? rcv.peer-state (~(has by rcv.peer-state) nack-bone) =? rcv.peer-state (~(has by rcv.peer-state) nack-bone)
:: if the publisher was behind we remove nacks received on that bone :: if the publisher was behind we remove nacks received on that bone
@ -2650,8 +2669,10 @@
by-duct.ossuary (~(del by by-duct.ossuary) (got-duct bone)) by-duct.ossuary (~(del by by-duct.ossuary) (got-duct bone))
by-bone.ossuary (~(del by by-bone.ossuary) bone) by-bone.ossuary (~(del by by-bone.ossuary) bone)
== ==
peer-core :: since we got one cork ack, try the next one
:: +on-pump-krock: if we get a nack for a cork, add it to the recork set ::
recork-one
:: +on-pump-kroc: if we get a nack for a cork, add it to the recork set
:: ::
++ on-pump-kroc ++ on-pump-kroc
|= =^bone |= =^bone
@ -2820,25 +2841,11 @@
:: ::
?. (~(has in krocs.peer-state) target-bone) ?. (~(has in krocs.peer-state) target-bone)
peer-core peer-core
=/ =message-pump-state
(~(gut by snd.peer-state) target-bone *message-pump-state)
=/ message-pump
(make-message-pump message-pump-state channel %.y target-bone)
:: we don't process the gifts here and instead wait for
:: the timer to handle the %cork plea added to the pump
::
=^ * message-pump-state
%- work:message-pump
%memo^(dedup-message (jim [%$ /flow [%cork ~]]))
=. snd.peer-state
(~(put by snd.peer-state) target-bone message-pump-state)
:: if we get a naxplanation for a %cork, the publisher is behind :: if we get a naxplanation for a %cork, the publisher is behind
:: receiving the OTA, so we set up a timer to retry in one day. :: receiving the OTA. The /recork timer will retry eventually.
:: ::
%- %+ trace msg.veb %- %+ trace msg.veb
|.("old publisher, resend %cork on bone={<target-bone>} in ~m20") |.("old publisher, %cork nacked on bone={<target-bone>}")
=/ =wire (make-pump-timer-wire her.channel target-bone)
=. corks.ames-state (~(put in corks.ames-state) wire)
peer-core peer-core
:: +on-sink-plea: handle request message received by |message-sink :: +on-sink-plea: handle request message received by |message-sink
:: ::
@ -2912,15 +2919,12 @@
|= task=message-pump-task |= task=message-pump-task
^+ [gifts state] ^+ [gifts state]
:: ::
=. message-pump (dispatch-task task) =~ (dispatch-task task)
=. message-pump feed-packets feed-packets
:: don't set new pump timer if triggered by a recork timer
::
=? message-pump !=([%wake recork=&] task)
(run-packet-pump %halt ~) (run-packet-pump %halt ~)
:: assert
=. message-pump assert
[(flop gifts) state] [(flop gifts) state]
==
:: +dispatch-task: perform task-specific processing :: +dispatch-task: perform task-specific processing
:: ::
++ dispatch-task ++ dispatch-task
@ -2930,7 +2934,7 @@
?- -.task ?- -.task
%prod (run-packet-pump %prod ~) %prod (run-packet-pump %prod ~)
%memo (on-memo message-blob.task) %memo (on-memo message-blob.task)
%wake (run-packet-pump %wake recork.task current.state) %wake (run-packet-pump %wake current.state)
%hear %hear
?- -.ack-meat.task ?- -.ack-meat.task
%& %&
@ -3178,7 +3182,7 @@
?- -.task ?- -.task
%hear (on-hear [message-num fragment-num]:task) %hear (on-hear [message-num fragment-num]:task)
%done (on-done message-num.task) %done (on-done message-num.task)
%wake (on-wake recork.task current.task) %wake (on-wake current.task)
%prod on-prod %prod on-prod
%halt set-wake %halt set-wake
== ==
@ -3205,7 +3209,7 @@
:: +on-wake: handle packet timeout :: +on-wake: handle packet timeout
:: ::
++ on-wake ++ on-wake
|= [recork=? current=message-num] |= current=message-num
^+ packet-pump ^+ packet-pump
:: assert temporal coherence :: assert temporal coherence
:: ::