diff --git a/pkg/arvo/sys/vane/ames.hoon b/pkg/arvo/sys/vane/ames.hoon index 0bc81b634b..76bb2e63fe 100644 --- a/pkg/arvo/sys/vane/ames.hoon +++ b/pkg/arvo/sys/vane/ames.hoon @@ -1345,7 +1345,8 @@ ++ send-nack |= [=bone =^error] ^+ event-core - =. event-core abet:(run-message-sink:peer-core bone %done ok=%.n cork=%.n) + =. event-core + abet:(run-message-sink:peer-core bone %done ok=%.n cork=%.n) =/ =^peer-state (got-peer-state her) =/ =^channel [[our her] now channel-state -.peer-state] :: construct nack-trace message, referencing .failed $message-num @@ -1726,9 +1727,8 @@ =/ =peer-state +.u.ship-state =/ =channel [[our ship] now channel-state -.peer-state] :: - =/ =plea [%a /flow [%cork ~]] - :: =^ =bone ossuary.peer-state (bind-duct ossuary.peer-state duct) + =/ =plea [%a /flow/(scot %ud bone) [%cork ~]] :: =. closing.peer-state (~(put in closing.peer-state) bone) abet:(on-memo:(make-peer-core peer-state channel) bone plea %plea) @@ -2697,8 +2697,12 @@ :: =. peer-core (run-message-pump target-bone %near naxplanation) :: - ?. (~(has in closing.peer-state) target-bone) - peer-core + ?. ?& (~(has in closing.peer-state) target-bone) + (~(has by by-bone.ossuary.peer-state) target-bone) + == + %. peer-core + %+ trace odd.veb + |.("weird %cork on bone={}") =/ =message-pump-state (~(gut by snd.peer-state) target-bone *message-pump-state) =/ message-pump @@ -2707,17 +2711,18 @@ :: the timer to handle the %cork plea added to the pump :: =^ * message-pump-state + =/ =wire /flow/[(scot %ud target-bone)] %- work:message-pump - %memo^(dedup-message (jim [%a /flow [%cork ~]])) + %memo^(dedup-message (jim [%a wire [%cork ~]])) =. snd.peer-state (~(put by snd.peer-state) target-bone message-pump-state) :: if we get a naxplanation for a %cork, the publisher is behind :: receiving the OTA, so we set up a timer to retry in one hour. :: %- %+ trace msg.veb - |.("resend %cork on bone={} in ~h1") + |.("old publisher, resend %cork on bone={} in ~h1") =/ =wire (make-pump-timer-wire her.channel target-bone) - (emit [/ames]~ %pass wire %b %wait `@da`(add now ~h1)) + (emit [/ames-recork]~ %pass wire %b %wait `@da`(add now ~h1)) :: +on-sink-plea: handle request message received by |message-sink :: ++ on-sink-plea @@ -2727,40 +2732,72 @@ (~(has in corked.peer-state) bone) == peer-core + |^ %- %+ trace msg.veb =/ dat [her.channel bone=bone message-num=message-num] |.("sink plea {}") :: is this the first time we're trying to process this message? :: - ?. ?=([%hear * * ok=%.n] task) - :: fresh plea; pass to client vane + ?: ?=([%hear * * ok=%.n] task) + :: we previously crashed on this message; send nack :: - =+ ;; =plea message - =/ =wire (make-bone-wire her.channel her-rift.channel bone) - :: - :: - ?: =(vane.plea %a) - :: only ames-to-ames %cork pleas are handled - :: - ~| %non-cork-ames-plea^our^her.channel^path.plea - ?> &(?=([%cork *] payload.plea) =(path.plea `path`/flow)) - =. closing.peer-state (~(put in closing.peer-state) bone) - (emit duct %pass wire %a %plea her.channel [%a /close ~]) + nack-plea + :: fresh plea; pass to client vane + :: + =+ ;; =plea message + =/ =wire (make-bone-wire her.channel her-rift.channel bone) + :: + ?. =(vane.plea %a) ?+ vane.plea ~| %ames-evil-vane^our^her.channel^vane.plea !! %c (emit duct %pass wire %c %plea her.channel plea) %g (emit duct %pass wire %g %plea her.channel plea) %j (emit duct %pass wire %j %plea her.channel plea) == - :: we previously crashed on this message; send nack + :: only ames-to-ames %cork pleas are handled :: - =. peer-core (run-message-sink bone %done ok=%.n cork=%.n) - :: also send nack-trace with blank .error for security + ~| %non-cork-ames-plea^our^her.channel^path.plea + ?> &(?=([%cork *] payload.plea) ?=([%flow ^] path.plea)) + ?: (~(has by by-bone.ossuary.peer-state) bone) + =. closing.peer-state (~(put in closing.peer-state) bone) + (emit duct %pass wire %a %plea her.channel [%a /close ~]) + :: a bone for %cork that is not in our ossuary comes from a + :: publisher that still handles non-cork ames-to-ames %pleas :: - =/ nack-trace-bone=^bone (mix 0b10 bone) - =/ =naxplanation [message-num *error] - =/ =message-blob (jam naxplanation) + =/ cork-bone=^bone (slav %ud +<.path.plea) + :: we nack the %plea that was created from receiving the cork :: - (run-message-pump nack-trace-bone %memo message-blob) + =. peer-core nack-plea + ?. (~(has in closing.peer-state) cork-bone) + %. peer-core + %+ trace odd.veb + |.("got weird %cork on bone={} coming from bone={}") + :: TODO: refactor see +on-sink-nack-trace + :: + =/ =message-pump-state + (~(gut by snd.peer-state) cork-bone *message-pump-state) + =/ message-pump (make-message-pump message-pump-state channel %.y) + =^ * message-pump-state + %- work:message-pump + %memo^(dedup-message (jim [%a path.plea [%cork ~]])) + =. snd.peer-state + (~(put by snd.peer-state) cork-bone message-pump-state) + %- %+ trace msg.veb + |. %+ weld "old publisher, resend %cork on" + "bone={} from bone={} in ~h1" + =/ =^wire (make-pump-timer-wire her.channel cork-bone) + (emit [/ames-recork-old]~ %pass wire %b %wait `@da`(add now ~h1)) + :: + ++ nack-plea + ^+ peer-core + =. peer-core (run-message-sink bone %done ok=%.n cork=%.n) + :: send nack-trace with blank .error for security + :: + =/ nack-trace-bone=^bone (mix 0b10 bone) + =/ =naxplanation [message-num *error] + =/ =message-blob (jam naxplanation) + :: + (run-message-pump nack-trace-bone %memo message-blob) + -- -- -- --