mirror of
https://github.com/urbit/shrub.git
synced 2024-12-20 09:21:42 +03:00
Merge pull request #5923 from urbit/ted/recork-better
ames: recork better
This commit is contained in:
commit
8eb8a1da29
@ -607,7 +607,9 @@
|
|||||||
:: life: our $life; how many times we've rekeyed
|
:: life: our $life; how many times we've rekeyed
|
||||||
:: crypto-core: interface for encryption and signing
|
:: crypto-core: interface for encryption and signing
|
||||||
:: bug: debug printing configuration
|
:: bug: debug printing configuration
|
||||||
:: corks: wires for cork flows pending publisher update
|
:: corks(STALE):wires for cork flows pending publisher update
|
||||||
|
::
|
||||||
|
:: Note: .corks is only still present for unreleased migration reasons
|
||||||
::
|
::
|
||||||
+$ ames-state
|
+$ ames-state
|
||||||
$: peers=(map ship ship-state)
|
$: peers=(map ship ship-state)
|
||||||
@ -764,7 +766,7 @@
|
|||||||
[%hear =message-num =ack-meat]
|
[%hear =message-num =ack-meat]
|
||||||
[%near =naxplanation]
|
[%near =naxplanation]
|
||||||
[%prod ~]
|
[%prod ~]
|
||||||
[%wake recork=?]
|
[%wake ~]
|
||||||
==
|
==
|
||||||
:: $message-pump-gift: effect from |message-pump
|
:: $message-pump-gift: effect from |message-pump
|
||||||
::
|
::
|
||||||
@ -795,7 +797,7 @@
|
|||||||
$% [%hear =message-num =fragment-num]
|
$% [%hear =message-num =fragment-num]
|
||||||
[%done =message-num lag=@dr]
|
[%done =message-num lag=@dr]
|
||||||
[%halt ~]
|
[%halt ~]
|
||||||
[%wake recork=? current=message-num]
|
[%wake current=message-num]
|
||||||
[%prod ~]
|
[%prod ~]
|
||||||
==
|
==
|
||||||
:: $packet-pump-gift: effect from |packet-pump
|
:: $packet-pump-gift: effect from |packet-pump
|
||||||
@ -1812,17 +1814,7 @@
|
|||||||
event-core
|
event-core
|
||||||
(request-attestation u.ship)
|
(request-attestation u.ship)
|
||||||
::
|
::
|
||||||
|^
|
?. ?=([%recork ~] wire)
|
||||||
?. ?=([%recork ~] wire) (handle-single-wire wire recork=|)
|
|
||||||
=/ wires=(list ^wire) ~(tap in corks.ames-state)
|
|
||||||
|- ^+ event-core
|
|
||||||
?^ wires
|
|
||||||
$(wires t.wires, event-core (handle-single-wire i.wires recork=&))
|
|
||||||
(emit duct %pass /recork %b %wait `@da`(add now ~m20))
|
|
||||||
::
|
|
||||||
++ handle-single-wire
|
|
||||||
|= [=^wire recork=?]
|
|
||||||
^+ event-core
|
|
||||||
=/ res=(unit [her=ship =bone]) (parse-pump-timer-wire wire)
|
=/ res=(unit [her=ship =bone]) (parse-pump-timer-wire wire)
|
||||||
?~ res
|
?~ res
|
||||||
%- (slog leaf+"ames: got timer for strange wire: {<wire>}" ~)
|
%- (slog leaf+"ames: got timer for strange wire: {<wire>}" ~)
|
||||||
@ -1835,11 +1827,22 @@
|
|||||||
[leaf+"ames: got timer for strange ship: {<her.u.res>}, ignoring" ~]
|
[leaf+"ames: got timer for strange ship: {<her.u.res>}, ignoring" ~]
|
||||||
::
|
::
|
||||||
=/ =channel [[our her.u.res] now channel-state -.u.state]
|
=/ =channel [[our her.u.res] now channel-state -.u.state]
|
||||||
|
abet:(on-wake:(make-peer-core u.state channel) bone.u.res error)
|
||||||
::
|
::
|
||||||
=< abet
|
=. event-core
|
||||||
%- on-wake:(make-peer-core u.state channel)
|
(emit duct %pass /recork %b %wait `@da`(add now ~m20))
|
||||||
[recork bone.u.res error]
|
:: recork up to one bone per peer
|
||||||
--
|
::
|
||||||
|
=/ pez ~(tap by peers.ames-state)
|
||||||
|
|- ^+ event-core
|
||||||
|
?~ pez event-core
|
||||||
|
=+ [her sat]=i.pez
|
||||||
|
?. ?=(%known -.sat)
|
||||||
|
$(pez t.pez)
|
||||||
|
=* peer-state +.sat
|
||||||
|
=/ =channel [[our her] now channel-state -.peer-state]
|
||||||
|
=/ peer-core (make-peer-core peer-state channel)
|
||||||
|
$(pez t.pez, event-core abet:recork-one:peer-core)
|
||||||
:: +on-init: first boot; subscribe to our info from jael
|
:: +on-init: first boot; subscribe to our info from jael
|
||||||
::
|
::
|
||||||
++ on-init
|
++ on-init
|
||||||
@ -2454,15 +2457,8 @@
|
|||||||
:: +on-wake: handle timer expiration
|
:: +on-wake: handle timer expiration
|
||||||
::
|
::
|
||||||
++ on-wake
|
++ on-wake
|
||||||
|= [recork=? =bone error=(unit tang)]
|
|= [=bone error=(unit tang)]
|
||||||
^+ peer-core
|
^+ peer-core
|
||||||
:: ignore spurious pump timers for %cork's
|
|
||||||
::
|
|
||||||
:: This is here to fix canaries that had spurious pump timers.
|
|
||||||
::
|
|
||||||
?: &(!recork (~(has in closing.peer-state) bone))
|
|
||||||
~> %slog.0^leaf/"ames: dropping pump wake while recorking {<bone>}"
|
|
||||||
peer-core
|
|
||||||
:: if we previously errored out, print and reset timer for later
|
:: if we previously errored out, print and reset timer for later
|
||||||
::
|
::
|
||||||
:: This really shouldn't happen, but if it does, make sure we
|
:: This really shouldn't happen, but if it does, make sure we
|
||||||
@ -2531,7 +2527,7 @@
|
|||||||
peer-core
|
peer-core
|
||||||
:: maybe resend some timed out packets
|
:: maybe resend some timed out packets
|
||||||
::
|
::
|
||||||
(run-message-pump bone %wake recork)
|
(run-message-pump bone %wake ~)
|
||||||
:: +send-shut-packet: fire encrypted packet at rcvr and maybe sponsors
|
:: +send-shut-packet: fire encrypted packet at rcvr and maybe sponsors
|
||||||
::
|
::
|
||||||
++ send-shut-packet
|
++ send-shut-packet
|
||||||
@ -2553,6 +2549,30 @@
|
|||||||
our-life.channel her-life.channel
|
our-life.channel her-life.channel
|
||||||
==
|
==
|
||||||
peer-core
|
peer-core
|
||||||
|
:: +recork-one: re-send the next %cork to the peer
|
||||||
|
::
|
||||||
|
++ recork-one
|
||||||
|
^+ peer-core
|
||||||
|
=/ boz (sort ~(tap in closing.peer-state) lte)
|
||||||
|
|- ^+ peer-core
|
||||||
|
?~ boz peer-core
|
||||||
|
=/ pum=message-pump-state (~(got by snd.peer-state) i.boz)
|
||||||
|
?. =(next current):pum
|
||||||
|
$(boz t.boz)
|
||||||
|
:: sanity check on the message pump state
|
||||||
|
::
|
||||||
|
?. ?& =(~ unsent-messages.pum)
|
||||||
|
=(~ unsent-fragments.pum)
|
||||||
|
=(~ live.packet-pump-state.pum)
|
||||||
|
==
|
||||||
|
~> %slog.0^leaf/"ames: bad pump state {<[her.channel i.boz]>}"
|
||||||
|
$(boz t.boz)
|
||||||
|
:: no outstanding messages, so send a new %cork
|
||||||
|
::
|
||||||
|
:: TODO use +trace
|
||||||
|
~> %slog.0^leaf/"ames: recork {<[her.channel i.boz]>}"
|
||||||
|
=/ =plea [%$ /flow [%cork ~]]
|
||||||
|
(on-memo i.boz plea %plea)
|
||||||
:: +got-duct: look up $duct by .bone, asserting already bound
|
:: +got-duct: look up $duct by .bone, asserting already bound
|
||||||
::
|
::
|
||||||
++ got-duct
|
++ got-duct
|
||||||
@ -2633,7 +2653,6 @@
|
|||||||
::
|
::
|
||||||
=. message-pump (run-packet-pump:message-pump %done message-num *@dr)
|
=. message-pump (run-packet-pump:message-pump %done message-num *@dr)
|
||||||
=/ =wire (make-pump-timer-wire her.channel bone)
|
=/ =wire (make-pump-timer-wire her.channel bone)
|
||||||
=. corks.ames-state (~(del in corks.ames-state) wire)
|
|
||||||
=/ nack-bone=^bone (mix 0b10 bone)
|
=/ nack-bone=^bone (mix 0b10 bone)
|
||||||
=? rcv.peer-state (~(has by rcv.peer-state) nack-bone)
|
=? rcv.peer-state (~(has by rcv.peer-state) nack-bone)
|
||||||
:: if the publisher was behind we remove nacks received on that bone
|
:: if the publisher was behind we remove nacks received on that bone
|
||||||
@ -2650,8 +2669,10 @@
|
|||||||
by-duct.ossuary (~(del by by-duct.ossuary) (got-duct bone))
|
by-duct.ossuary (~(del by by-duct.ossuary) (got-duct bone))
|
||||||
by-bone.ossuary (~(del by by-bone.ossuary) bone)
|
by-bone.ossuary (~(del by by-bone.ossuary) bone)
|
||||||
==
|
==
|
||||||
peer-core
|
:: since we got one cork ack, try the next one
|
||||||
:: +on-pump-krock: if we get a nack for a cork, add it to the recork set
|
::
|
||||||
|
recork-one
|
||||||
|
:: +on-pump-kroc: if we get a nack for a cork, add it to the recork set
|
||||||
::
|
::
|
||||||
++ on-pump-kroc
|
++ on-pump-kroc
|
||||||
|= =^bone
|
|= =^bone
|
||||||
@ -2820,25 +2841,11 @@
|
|||||||
::
|
::
|
||||||
?. (~(has in krocs.peer-state) target-bone)
|
?. (~(has in krocs.peer-state) target-bone)
|
||||||
peer-core
|
peer-core
|
||||||
=/ =message-pump-state
|
|
||||||
(~(gut by snd.peer-state) target-bone *message-pump-state)
|
|
||||||
=/ message-pump
|
|
||||||
(make-message-pump message-pump-state channel %.y target-bone)
|
|
||||||
:: we don't process the gifts here and instead wait for
|
|
||||||
:: the timer to handle the %cork plea added to the pump
|
|
||||||
::
|
|
||||||
=^ * message-pump-state
|
|
||||||
%- work:message-pump
|
|
||||||
%memo^(dedup-message (jim [%$ /flow [%cork ~]]))
|
|
||||||
=. snd.peer-state
|
|
||||||
(~(put by snd.peer-state) target-bone message-pump-state)
|
|
||||||
:: if we get a naxplanation for a %cork, the publisher is behind
|
:: if we get a naxplanation for a %cork, the publisher is behind
|
||||||
:: receiving the OTA, so we set up a timer to retry in one day.
|
:: receiving the OTA. The /recork timer will retry eventually.
|
||||||
::
|
::
|
||||||
%- %+ trace msg.veb
|
%- %+ trace msg.veb
|
||||||
|.("old publisher, resend %cork on bone={<target-bone>} in ~m20")
|
|.("old publisher, %cork nacked on bone={<target-bone>}")
|
||||||
=/ =wire (make-pump-timer-wire her.channel target-bone)
|
|
||||||
=. corks.ames-state (~(put in corks.ames-state) wire)
|
|
||||||
peer-core
|
peer-core
|
||||||
:: +on-sink-plea: handle request message received by |message-sink
|
:: +on-sink-plea: handle request message received by |message-sink
|
||||||
::
|
::
|
||||||
@ -2912,15 +2919,12 @@
|
|||||||
|= task=message-pump-task
|
|= task=message-pump-task
|
||||||
^+ [gifts state]
|
^+ [gifts state]
|
||||||
::
|
::
|
||||||
=. message-pump (dispatch-task task)
|
=~ (dispatch-task task)
|
||||||
=. message-pump feed-packets
|
feed-packets
|
||||||
:: don't set new pump timer if triggered by a recork timer
|
|
||||||
::
|
|
||||||
=? message-pump !=([%wake recork=&] task)
|
|
||||||
(run-packet-pump %halt ~)
|
(run-packet-pump %halt ~)
|
||||||
::
|
assert
|
||||||
=. message-pump assert
|
|
||||||
[(flop gifts) state]
|
[(flop gifts) state]
|
||||||
|
==
|
||||||
:: +dispatch-task: perform task-specific processing
|
:: +dispatch-task: perform task-specific processing
|
||||||
::
|
::
|
||||||
++ dispatch-task
|
++ dispatch-task
|
||||||
@ -2930,7 +2934,7 @@
|
|||||||
?- -.task
|
?- -.task
|
||||||
%prod (run-packet-pump %prod ~)
|
%prod (run-packet-pump %prod ~)
|
||||||
%memo (on-memo message-blob.task)
|
%memo (on-memo message-blob.task)
|
||||||
%wake (run-packet-pump %wake recork.task current.state)
|
%wake (run-packet-pump %wake current.state)
|
||||||
%hear
|
%hear
|
||||||
?- -.ack-meat.task
|
?- -.ack-meat.task
|
||||||
%&
|
%&
|
||||||
@ -3178,7 +3182,7 @@
|
|||||||
?- -.task
|
?- -.task
|
||||||
%hear (on-hear [message-num fragment-num]:task)
|
%hear (on-hear [message-num fragment-num]:task)
|
||||||
%done (on-done message-num.task)
|
%done (on-done message-num.task)
|
||||||
%wake (on-wake recork.task current.task)
|
%wake (on-wake current.task)
|
||||||
%prod on-prod
|
%prod on-prod
|
||||||
%halt set-wake
|
%halt set-wake
|
||||||
==
|
==
|
||||||
@ -3205,7 +3209,7 @@
|
|||||||
:: +on-wake: handle packet timeout
|
:: +on-wake: handle packet timeout
|
||||||
::
|
::
|
||||||
++ on-wake
|
++ on-wake
|
||||||
|= [recork=? current=message-num]
|
|= current=message-num
|
||||||
^+ packet-pump
|
^+ packet-pump
|
||||||
:: assert temporal coherence
|
:: assert temporal coherence
|
||||||
::
|
::
|
||||||
|
Loading…
Reference in New Issue
Block a user