mirror of
https://github.com/urbit/shrub.git
synced 2024-12-19 00:13:12 +03:00
ames: handle bounce %cork from old publisher
Because the publisher will send the cork plea back to the subscriber on the next bone, we are not able to know the bone for the original cork. To handle it, we add the cork bone to the plea path still wip: it keeps resending the cork plea faster than its ~h1 timer
This commit is contained in:
parent
070d0a7d4b
commit
17d52b8535
@ -1345,7 +1345,8 @@
|
||||
++ send-nack
|
||||
|= [=bone =^error]
|
||||
^+ event-core
|
||||
=. event-core abet:(run-message-sink:peer-core bone %done ok=%.n cork=%.n)
|
||||
=. event-core
|
||||
abet:(run-message-sink:peer-core bone %done ok=%.n cork=%.n)
|
||||
=/ =^peer-state (got-peer-state her)
|
||||
=/ =^channel [[our her] now channel-state -.peer-state]
|
||||
:: construct nack-trace message, referencing .failed $message-num
|
||||
@ -1726,9 +1727,8 @@
|
||||
=/ =peer-state +.u.ship-state
|
||||
=/ =channel [[our ship] now channel-state -.peer-state]
|
||||
::
|
||||
=/ =plea [%a /flow [%cork ~]]
|
||||
::
|
||||
=^ =bone ossuary.peer-state (bind-duct ossuary.peer-state duct)
|
||||
=/ =plea [%a /flow/(scot %ud bone) [%cork ~]]
|
||||
::
|
||||
=. closing.peer-state (~(put in closing.peer-state) bone)
|
||||
abet:(on-memo:(make-peer-core peer-state channel) bone plea %plea)
|
||||
@ -2697,8 +2697,12 @@
|
||||
::
|
||||
=. peer-core (run-message-pump target-bone %near naxplanation)
|
||||
::
|
||||
?. (~(has in closing.peer-state) target-bone)
|
||||
peer-core
|
||||
?. ?& (~(has in closing.peer-state) target-bone)
|
||||
(~(has by by-bone.ossuary.peer-state) target-bone)
|
||||
==
|
||||
%. peer-core
|
||||
%+ trace odd.veb
|
||||
|.("weird %cork on bone={<target-bone>}")
|
||||
=/ =message-pump-state
|
||||
(~(gut by snd.peer-state) target-bone *message-pump-state)
|
||||
=/ message-pump
|
||||
@ -2707,17 +2711,18 @@
|
||||
:: the timer to handle the %cork plea added to the pump
|
||||
::
|
||||
=^ * message-pump-state
|
||||
=/ =wire /flow/[(scot %ud target-bone)]
|
||||
%- work:message-pump
|
||||
%memo^(dedup-message (jim [%a /flow [%cork ~]]))
|
||||
%memo^(dedup-message (jim [%a wire [%cork ~]]))
|
||||
=. snd.peer-state
|
||||
(~(put by snd.peer-state) target-bone message-pump-state)
|
||||
:: if we get a naxplanation for a %cork, the publisher is behind
|
||||
:: receiving the OTA, so we set up a timer to retry in one hour.
|
||||
::
|
||||
%- %+ trace msg.veb
|
||||
|.("resend %cork on bone={<target-bone>} in ~h1")
|
||||
|.("old publisher, resend %cork on bone={<target-bone>} in ~h1")
|
||||
=/ =wire (make-pump-timer-wire her.channel target-bone)
|
||||
(emit [/ames]~ %pass wire %b %wait `@da`(add now ~h1))
|
||||
(emit [/ames-recork]~ %pass wire %b %wait `@da`(add now ~h1))
|
||||
:: +on-sink-plea: handle request message received by |message-sink
|
||||
::
|
||||
++ on-sink-plea
|
||||
@ -2727,40 +2732,72 @@
|
||||
(~(has in corked.peer-state) bone)
|
||||
==
|
||||
peer-core
|
||||
|^
|
||||
%- %+ trace msg.veb
|
||||
=/ dat [her.channel bone=bone message-num=message-num]
|
||||
|.("sink plea {<dat>}")
|
||||
:: is this the first time we're trying to process this message?
|
||||
::
|
||||
?. ?=([%hear * * ok=%.n] task)
|
||||
:: fresh plea; pass to client vane
|
||||
?: ?=([%hear * * ok=%.n] task)
|
||||
:: we previously crashed on this message; send nack
|
||||
::
|
||||
=+ ;; =plea message
|
||||
=/ =wire (make-bone-wire her.channel her-rift.channel bone)
|
||||
::
|
||||
::
|
||||
?: =(vane.plea %a)
|
||||
:: only ames-to-ames %cork pleas are handled
|
||||
::
|
||||
~| %non-cork-ames-plea^our^her.channel^path.plea
|
||||
?> &(?=([%cork *] payload.plea) =(path.plea `path`/flow))
|
||||
=. closing.peer-state (~(put in closing.peer-state) bone)
|
||||
(emit duct %pass wire %a %plea her.channel [%a /close ~])
|
||||
nack-plea
|
||||
:: fresh plea; pass to client vane
|
||||
::
|
||||
=+ ;; =plea message
|
||||
=/ =wire (make-bone-wire her.channel her-rift.channel bone)
|
||||
::
|
||||
?. =(vane.plea %a)
|
||||
?+ vane.plea ~| %ames-evil-vane^our^her.channel^vane.plea !!
|
||||
%c (emit duct %pass wire %c %plea her.channel plea)
|
||||
%g (emit duct %pass wire %g %plea her.channel plea)
|
||||
%j (emit duct %pass wire %j %plea her.channel plea)
|
||||
==
|
||||
:: we previously crashed on this message; send nack
|
||||
:: only ames-to-ames %cork pleas are handled
|
||||
::
|
||||
=. peer-core (run-message-sink bone %done ok=%.n cork=%.n)
|
||||
:: also send nack-trace with blank .error for security
|
||||
~| %non-cork-ames-plea^our^her.channel^path.plea
|
||||
?> &(?=([%cork *] payload.plea) ?=([%flow ^] path.plea))
|
||||
?: (~(has by by-bone.ossuary.peer-state) bone)
|
||||
=. closing.peer-state (~(put in closing.peer-state) bone)
|
||||
(emit duct %pass wire %a %plea her.channel [%a /close ~])
|
||||
:: a bone for %cork that is not in our ossuary comes from a
|
||||
:: publisher that still handles non-cork ames-to-ames %pleas
|
||||
::
|
||||
=/ nack-trace-bone=^bone (mix 0b10 bone)
|
||||
=/ =naxplanation [message-num *error]
|
||||
=/ =message-blob (jam naxplanation)
|
||||
=/ cork-bone=^bone (slav %ud +<.path.plea)
|
||||
:: we nack the %plea that was created from receiving the cork
|
||||
::
|
||||
(run-message-pump nack-trace-bone %memo message-blob)
|
||||
=. peer-core nack-plea
|
||||
?. (~(has in closing.peer-state) cork-bone)
|
||||
%. peer-core
|
||||
%+ trace odd.veb
|
||||
|.("got weird %cork on bone={<cork-bone>} coming from bone={<bone>}")
|
||||
:: TODO: refactor see +on-sink-nack-trace
|
||||
::
|
||||
=/ =message-pump-state
|
||||
(~(gut by snd.peer-state) cork-bone *message-pump-state)
|
||||
=/ message-pump (make-message-pump message-pump-state channel %.y)
|
||||
=^ * message-pump-state
|
||||
%- work:message-pump
|
||||
%memo^(dedup-message (jim [%a path.plea [%cork ~]]))
|
||||
=. snd.peer-state
|
||||
(~(put by snd.peer-state) cork-bone message-pump-state)
|
||||
%- %+ trace msg.veb
|
||||
|. %+ weld "old publisher, resend %cork on"
|
||||
"bone={<cork-bone>} from bone={<bone>} in ~h1"
|
||||
=/ =^wire (make-pump-timer-wire her.channel cork-bone)
|
||||
(emit [/ames-recork-old]~ %pass wire %b %wait `@da`(add now ~h1))
|
||||
::
|
||||
++ nack-plea
|
||||
^+ peer-core
|
||||
=. peer-core (run-message-sink bone %done ok=%.n cork=%.n)
|
||||
:: send nack-trace with blank .error for security
|
||||
::
|
||||
=/ nack-trace-bone=^bone (mix 0b10 bone)
|
||||
=/ =naxplanation [message-num *error]
|
||||
=/ =message-blob (jam naxplanation)
|
||||
::
|
||||
(run-message-pump nack-trace-bone %memo message-blob)
|
||||
--
|
||||
--
|
||||
--
|
||||
--
|
||||
|
Loading…
Reference in New Issue
Block a user