diff --git a/pkg/arvo/app/dbug.hoon b/pkg/arvo/app/dbug.hoon index f0dc3c810a..6867e83bb5 100644 --- a/pkg/arvo/app/dbug.hoon +++ b/pkg/arvo/app/dbug.hoon @@ -206,13 +206,14 @@ 'ship'^(ship s) 'path'^(path p) == + :: TODO: display subscription nonce :: ++ outgoing |= =boat:gall ^- json :- %a %+ turn ~(tap by boat) - |= [[w=wire s=^ship t=term] [a=? p=^path]] + |= [[w=wire s=^ship t=term] [a=? p=^path nonce=@]] %- pairs :~ 'wire'^(path w) 'ship'^(ship s) diff --git a/pkg/arvo/app/spider.hoon b/pkg/arvo/app/spider.hoon index 83e360d2d1..821f2ac5e1 100644 --- a/pkg/arvo/app/spider.hoon +++ b/pkg/arvo/app/spider.hoon @@ -607,7 +607,7 @@ == :_ state %+ murn ~(tap by wex.bowl) - |= [[=wire =ship =term] [acked=? =path]] + |= [[=wire =ship =term] [acked=? =path nonce=@]] ^- (unit card) ?. ?& ?=([%thread @ *] wire) =(tid i.t.wire) diff --git a/pkg/arvo/sys/lull.hoon b/pkg/arvo/sys/lull.hoon index f2d6e98f04..fba3c4d693 100644 --- a/pkg/arvo/sys/lull.hoon +++ b/pkg/arvo/sys/lull.hoon @@ -351,6 +351,7 @@ :: %hear: packet from unix :: %heed: track peer's responsiveness; gives %clog if slow :: %jilt: stop tracking peer's responsiveness + :: %cork: request to delete message flow :: %plea: request to send message :: :: System and Lifecycle Tasks @@ -366,6 +367,7 @@ $% [%hear =lane =blob] [%heed =ship] [%jilt =ship] + [%cork =ship] $>(%plea vane-task) :: $>(%born vane-task) @@ -524,6 +526,8 @@ rcv=(map bone message-sink-state) nax=(set [=bone =message-num]) heeds=(set duct) + closing=(set bone) + corked=(set bone) == :: $qos: quality of service; how is our connection to a peer doing? :: @@ -1653,7 +1657,7 @@ +$ bitt (map duct (pair ship path)) :: incoming subs +$ boat :: outgoing subs %+ map [=wire =ship =term] :: - [acked=? =path] :: + [acked=? =path nonce=@] :: +$ bowl :: standard app state $: $: our=ship :: host src=ship :: guest diff --git a/pkg/arvo/sys/vane/ames.hoon b/pkg/arvo/sys/vane/ames.hoon index c7ba830d06..2877503838 100644 --- a/pkg/arvo/sys/vane/ames.hoon +++ b/pkg/arvo/sys/vane/ames.hoon @@ -586,6 +586,24 @@ :: +$ naxplanation [=message-num =error] :: ++| %statics +:: +:: $ames-state: state for entire vane +:: +:: peers: states of connections to other ships +:: unix-duct: handle to give moves to unix +:: life: our $life; how many times we've rekeyed +:: crypto-core: interface for encryption and signing +:: bug: debug printing configuration +:: ++$ ames-state + $: peers=(map ship ship-state) + =unix=duct + =life + crypto-core=acru:ames + =bug + == +:: +$ ames-state-4 ames-state-5 +$ ames-state-5 $: peers=(map ship ship-state-5) @@ -616,23 +634,34 @@ heeds=(set duct) == :: -+| %statics -:: -:: $ames-state: state for entire vane -:: -:: peers: states of connections to other ships -:: unix-duct: handle to give moves to unix -:: life: our $life; how many times we've rekeyed -:: crypto-core: interface for encryption and signing -:: bug: debug printing configuration -:: -+$ ames-state - $: peers=(map ship ship-state) ++$ ames-state-6 + $: peers=(map ship ship-state-6) =unix=duct =life crypto-core=acru:ames =bug == +:: ++$ ship-state-6 + $% [%alien alien-agenda] + [%known peer-state-6] + == +:: ++$ peer-state-6 + $: $: =symmetric-key + =life + =rift + =public-key + sponsor=ship + == + route=(unit [direct=? =lane]) + =qos + =ossuary + snd=(map bone message-pump-state) + rcv=(map bone message-sink-state) + nax=(set [=bone =message-num]) + heeds=(set duct) + == :: $bug: debug printing configuration :: :: veb: verbosity toggles @@ -716,12 +745,14 @@ :: $message-pump-gift: effect from |message-pump :: :: %done: report message acknowledgment +:: %cork: kill flow :: %send: emit message fragment :: %wait: set a new timer at .date :: %rest: cancel timer at .date :: +$ message-pump-gift $% [%done =message-num error=(unit error)] + [%cork ~] [%send =static-fragment] [%wait date=@da] [%rest date=@da] @@ -758,7 +789,7 @@ :: .ok: %.y unless previous failed attempt :: +$ message-sink-task - $% [%done ok=?] + $% [%done ok=? cork=?] [%drop =message-num] [%hear =lane =shut-packet ok=?] == @@ -770,6 +801,7 @@ +$ message-sink-gift $% [%memo =message-num message=*] [%send =message-num =ack-meat] + [%cork ~] == -- :: external vane interface @@ -779,7 +811,7 @@ :: =< =* adult-gate . =| queued-events=(qeu queued-event) - =| cached-state=(unit [%5 ames-state-5]) + =| cached-state=(unit [%6 ames-state-6]) :: |= [now=@da eny=@ rof=roof] =* larval-gate . @@ -803,7 +835,7 @@ ?: update-ready =. ames-state.adult-gate ?> ?=(^ cached-state) - (state-5-to-6:load:adult-core +.u.cached-state) + (state-6-to-7:load:adult-core +.u.cached-state) =. cached-state ~ ~> %slog.1^leaf/"ames: metamorphosis reload" [~ adult-gate] @@ -886,7 +918,7 @@ ?: update-ready =. ames-state.adult-gate ?> ?=(^ cached-state) - (state-5-to-6:load:adult-core +.u.cached-state) + (state-6-to-7:load:adult-core +.u.cached-state) =. cached-state ~ ~> %slog.1^leaf/"ames: metamorphosis reload" [moves adult-gate] @@ -920,6 +952,13 @@ [%adult state=ames-state-5] == == $: %6 + $% $: %larva + events=(qeu queued-event) + state=ames-state-6 + == + [%adult state=ames-state-6] + == == + $: %7 $% $: %larva events=(qeu queued-event) state=_ames-state.adult-gate @@ -935,23 +974,32 @@ =. adult-gate (load:adult-core %4 state.old) larval-gate :: - [%5 %adult *] - =. cached-state `[%5 state.old] - ~> %slog.1^leaf/"ames: larva reload" - larval-gate + [%5 %adult *] (load:adult-core %5 state.old) :: [%5 %larva *] ~> %slog.1^leaf/"ames: larva: load" =. queued-events events.old + =. adult-gate (load:adult-core %5 state.old) larval-gate :: - [%6 %adult *] (load:adult-core %6 state.old) + [%6 %adult *] + =. cached-state `[%6 state.old] + ~> %slog.1^leaf/"ames: larva reload" + larval-gate :: [%6 %larva *] ~> %slog.1^leaf/"ames: larva: load" =. queued-events events.old - =. adult-gate (load:adult-core %6 state.old) larval-gate + :: + [%7 %adult *] (load:adult-core %7 state.old) + :: + [%7 %larva *] + ~> %slog.1^leaf/"ames: larva: load" + =. queued-events events.old + =. adult-gate (load:adult-core %7 state.old) + larval-gate + :: == -- :: adult ames, after metamorphosis from larva @@ -993,6 +1041,7 @@ %trim on-trim:event-core %vega on-vega:event-core %plea (on-plea:event-core [ship plea]:task) + %cork (on-cork:event-core ship.task) == :: [moves ames-gate] @@ -1030,19 +1079,23 @@ =< |= $= old-state $% [%4 ames-state-4] [%5 ames-state-5] - [%6 ^ames-state] + [%6 ames-state-6] + [%7 ^ames-state] == ^+ ames-gate =? old-state ?=(%4 -.old-state) %5^(state-4-to-5 +.old-state) - :: XX this would crash with ames-state-5 but load is never + =? old-state ?=(%5 -.old-state) %6^(state-5-to-6 +.old-state) + ?< ?=(%6 -.old-state) + :: XX this would crash with ames-state-6 but load is never :: called with it -- the upgrade is handled by the larval load :: - ?> ?=(%6 -.old-state) + ?> ?=(%7 -.old-state) ames-gate(ames-state +.old-state) + :: |% ++ state-4-to-5 |= ames-state=ames-state-4 - ^- ames-state-4 + ^- ames-state-5 =. peers.ames-state %- ~(run by peers.ames-state) |= ship-state=ship-state-4 @@ -1059,11 +1112,11 @@ :: ++ state-5-to-6 |= ames-state=ames-state-5 - ^- ^^ames-state + ^- ames-state-6 :_ +.ames-state %- ~(rut by peers.ames-state) |= [=ship ship-state=ship-state-5] - ^- ^ship-state + ^- ship-state-6 ?. ?=(%known -.ship-state) ship-state =/ peer-state=peer-state-5 +.ship-state @@ -1074,12 +1127,24 @@ ;; @ud =< q.q %- need %- need (rof ~ %j `beam`[[our %rift %da now] /(scot %p ship)]) - =/ =^peer-state - :_ +.peer-state - =, -.peer-state - [symmetric-key life rift public-key sponsor] + :- -.ship-state + :_ +.peer-state + =, -.peer-state + [symmetric-key life rift public-key sponsor] + :: + ++ state-6-to-7 + |= ames-state=ames-state-6 + ^- ^^ames-state + :_ +.ames-state + %- ~(run by peers.ames-state) + |= ship-state=ship-state-6 ^- ^ship-state - [-.ship-state peer-state] + ?. ?=(%known -.ship-state) + ship-state + :- %known + ^- peer-state + :- +<.ship-state + [route qos ossuary snd rcv nax heeds ~ ~]:ship-state -- :: +scry: dereference namespace :: @@ -1254,13 +1319,14 @@ ++ send-ack |= =bone ^+ event-core - abet:(run-message-sink:peer-core bone %done ok=%.y) + =/ cork=? (~(has in closing.peer-state) bone) + abet:(run-message-sink:peer-core bone %done ok=%.y cork) :: failed; send message nack packet :: ++ send-nack |= [=bone =^error] ^+ event-core - =. event-core abet:(run-message-sink:peer-core bone %done ok=%.n) + =. event-core abet:(run-message-sink:peer-core bone %done ok=%.n cork=%.n) =/ =^peer-state (got-peer-state her) =/ =^channel [[our her] now channel-state -.peer-state] :: construct nack-trace message, referencing .failed $message-num @@ -1575,7 +1641,31 @@ =/ sndr [our our-life.channel] =/ rcvr [ship her-life.channel] "plea {}" + :: since flow kill goes like: + :: client vane cork task -> client ames pass cork as plea -> + :: -> server ames sinks plea -> server ames +on-plea (we are here); + :: if it's %cork plea passed to ames from its sink, + :: give %done and process flow closing after +on-take-done call :: + ?: &(=(vane.plea %a) =(path.plea `path`/flow) ?=([%cork *] payload.plea)) + (emit duct %give %done ~) + abet:(on-memo:(make-peer-core peer-state channel) bone plea %plea) + :: +on-cork: handle request to kill a flow + :: + ++ on-cork + |= =ship + ^+ event-core + =/ ship-state (~(get by peers.ames-state) ship) + :: + ?> ?=([~ %known *] ship-state) + =/ =peer-state +.u.ship-state + =/ =channel [[our ship] now channel-state -.peer-state] + :: + =/ =plea [%a /flow [%cork ~]] + :: + =^ =bone ossuary.peer-state (bind-duct ossuary.peer-state duct) + :: + =. closing.peer-state (~(put in closing.peer-state) bone) abet:(on-memo:(make-peer-core peer-state channel) bone plea %plea) :: +on-take-wake: receive wakeup or error notification from behn :: @@ -1641,7 +1731,6 @@ :: ?- public-keys-result [%diff @ %rift *] - :: event-core (on-publ-rift [who to.diff]:public-keys-result) :: [%diff @ %keys *] @@ -2289,7 +2378,8 @@ =/ =message-pump-state (~(gut by snd.peer-state) bone *message-pump-state) :: - =/ message-pump (make-message-pump message-pump-state channel) + =/ closing=? (~(has in closing.peer-state) bone) + =/ message-pump (make-message-pump message-pump-state channel closing) =^ pump-gifts message-pump-state (work:message-pump task) =. snd.peer-state (~(put by snd.peer-state) bone message-pump-state) :: process effects from |message-pump @@ -2300,6 +2390,7 @@ =. peer-core ?- -.gift %done (on-pump-done [message-num error]:gift) + %cork on-pump-cork %send (on-pump-send static-fragment.gift) %wait (on-pump-wait date.gift) %rest (on-pump-rest date.gift) @@ -2325,10 +2416,26 @@ :: not a nack-trace bone; relay ack to client vane :: (emit (got-duct bone) %give %done error) + :: +on-pump-cork: kill flow on cork sender side + :: + ++ on-pump-cork + ^+ peer-core + =. peer-state + =, peer-state + %_ peer-state + snd (~(del by snd) bone) + rcv (~(del by rcv) bone) + corked (~(put in corked) bone) + closing (~(del in closing) bone) + by-duct.ossuary (~(del by by-duct.ossuary) (got-duct bone)) + by-bone.ossuary (~(del by by-bone.ossuary) bone) + == + peer-core :: +on-pump-send: emit message fragment requested by |message-pump :: ++ on-pump-send - |=(f=static-fragment (send-shut-packet bone [message-num %& +]:f)) + |= f=static-fragment + (send-shut-packet bone [message-num %& +]:f) :: +on-pump-wait: relay |message-pump's set-timer request :: ++ on-pump-wait @@ -2353,6 +2460,7 @@ ++ run-message-sink |= [=bone task=message-sink-task] ^+ peer-core + ?: (~(has in corked.peer-state) bone) peer-core :: pass .task to the |message-sink and apply state mutations :: =/ =message-sink-state @@ -2370,8 +2478,30 @@ ?- -.gift %memo (on-sink-memo [message-num message]:gift) %send (on-sink-send [message-num ack-meat]:gift) + %cork on-sink-cork == $(sink-gifts t.sink-gifts) + :: +on-sink-cork: handle flow kill after server ames has taken %done + :: + ++ on-sink-cork + ^+ peer-core + =/ =message-pump-state + (~(gut by snd.peer-state) bone *message-pump-state) + =? peer-core ?=(^ next-wake.packet-pump-state.message-pump-state) + =* next-wake u.next-wake.packet-pump-state.message-pump-state + =/ =wire (make-pump-timer-wire her.channel bone) + :: resetting timer for boons + :: + (emit [/ames]~ %pass wire %b %rest next-wake) + =. peer-state + =, peer-state + %_ peer-state + rcv (~(del by rcv) bone) + snd (~(del by snd) bone) + corked (~(put in corked) bone) + closing (~(del in closing) bone) + == + peer-core :: +on-sink-send: emit ack packet as requested by |message-sink :: ++ on-sink-send @@ -2405,10 +2535,14 @@ ++ on-sink-boon |= [=message-num message=*] ^+ peer-core + ?: ?| (~(has in closing.peer-state) bone) + (~(has in corked.peer-state) bone) + == + peer-core :: send ack unconditionally :: =. peer-core (emit (got-duct bone) %give %boon message) - =. peer-core (run-message-sink bone %done ok=%.y) + =. peer-core (run-message-sink bone %done ok=%.y cork=%.n) :: ?. ?=([%hear * * ok=%.n] task) :: fresh boon; give message to client vane @@ -2446,7 +2580,7 @@ =+ ;; =naxplanation message :: ack nack-trace message (only applied if we don't later crash) :: - =. peer-core (run-message-sink bone %done ok=%.y) + =. peer-core (run-message-sink bone %done ok=%.y cork=%.n) :: flip .bone's second bit to find referenced flow :: =/ target-bone=^bone (mix 0b10 bone) @@ -2458,6 +2592,10 @@ ++ on-sink-plea |= [=message-num message=*] ^+ peer-core + ?: ?| (~(has in closing.peer-state) bone) + (~(has in corked.peer-state) bone) + == + peer-core %- %+ trace msg.veb =/ dat [her.channel bone=bone message-num=message-num] |.("sink plea {}") @@ -2467,6 +2605,10 @@ :: fresh plea; pass to client vane :: =+ ;; =plea message + :: if this plea is %cork, put to closing + :: + =? closing.peer-state ?=([%cork *] payload.plea) + (~(put in closing.peer-state) bone) :: =/ =wire (make-bone-wire her.channel her-rift.channel bone) :: @@ -2477,7 +2619,7 @@ == :: we previously crashed on this message; send nack :: - =. peer-core (run-message-sink bone %done ok=%.n) + =. peer-core (run-message-sink bone %done ok=%.n cork=%.n) :: also send nack-trace with blank .error for security :: =/ nack-trace-bone=^bone (mix 0b10 bone) @@ -2491,7 +2633,7 @@ :: +make-message-pump: constructor for |message-pump :: ++ make-message-pump - |= [state=message-pump-state =channel] + |= [state=message-pump-state =channel closing=?] =* veb veb.bug.channel =| gifts=(list message-pump-gift) :: @@ -2525,11 +2667,27 @@ %memo (on-memo message-blob.task) %wake (run-packet-pump %wake current.state) %hear - ?- -.ack-meat.task - %& (on-hear [message-num fragment-num=p.ack-meat]:task) - %| (on-done [message-num ?:(ok.p.ack-meat [%ok ~] [%nack ~])]:task) + ?- -.ack-meat.task + %& + (on-hear [message-num fragment-num=p.ack-meat]:task) + :: + %| + =/ cork=? + =/ top-live + (pry:packet-queue:*make-packet-pump live.packet-pump-state.state) + :: If we send a %cork and get an ack, we can know by + :: sequence number that the ack is for the %cork message + :: + ?& closing + ?=(^ top-live) + =(1 ~(wyt by live.packet-pump-state.state)) + =(message-num:task message-num.key.u.top-live) + == + =* ack p.ack-meat.task + %- on-done + [[message-num:task ?:(ok.ack [%ok ~] [%nack ~])] cork] == - %near (on-done [message-num %naxplanation error]:naxplanation.task) + %near (on-done [[message-num %naxplanation error]:naxplanation.task %&]) == :: +on-memo: handle request to send a message :: @@ -2561,7 +2719,7 @@ :: flows. :: ++ on-done - |= [=message-num =ack] + |= [[=message-num =ack] cork=?] ^+ message-pump :: unsent messages from the future should never get acked :: @@ -2623,6 +2781,7 @@ ?- -.u.cur %ok =. message-pump (give %done current.state ~) + =? message-pump cork (give %cork ~) $(current.state +(current.state)) :: %nack @@ -3154,7 +3313,7 @@ =- [(flop gifts) state] :: ?- -.task - %done (on-done ok.task) + %done (on-done ok.task cork.task) %drop (on-drop message-num.task) %hear (on-hear [lane shut-packet ok]:task) == @@ -3303,7 +3462,7 @@ :: +on-done: handle confirmation of message processing from vane :: ++ on-done - |= ok=? + |= [ok=? cork=?] ^+ message-sink :: =^ pending pending-vane-ack.state ~(get to pending-vane-ack.state) @@ -3313,6 +3472,7 @@ =? nax.state !ok (~(put in nax.state) message-num) :: =. message-sink (give %send message-num %| ok lag=`@dr`0) + =? message-sink cork (give %cork ~) =/ next ~(top to pending-vane-ack.state) ?~ next message-sink diff --git a/pkg/arvo/sys/vane/gall.hoon b/pkg/arvo/sys/vane/gall.hoon index af07252e2c..793b7bd006 100644 --- a/pkg/arvo/sys/vane/gall.hoon +++ b/pkg/arvo/sys/vane/gall.hoon @@ -14,7 +14,7 @@ +$ move [=duct move=(wind note-arvo gift-arvo)] :: $state-8: overall gall state, versioned :: -+$ state-8 [%8 state] ++$ state-9 [%9 state] :: $state: overall gall state :: :: system-duct: TODO document @@ -45,6 +45,8 @@ :: $yoke: agent runner state :: :: control-duct: TODO document +:: run-nonce: unique for each rebuild +:: sub-nonce: app-wide global %watch nonce :: live: is this agent running? TODO document better :: stats: TODO document :: watches: incoming and outgoing subscription state @@ -54,8 +56,9 @@ :: +$ yoke $: control-duct=duct - nonce=@t - live=? ::TODO remove, replaced by -.agent + run-nonce=@t + sub-nonce=_1 + live=? =stats =watches agent=(each agent vase) @@ -108,6 +111,7 @@ %poke %leave %missing + %cork == :: |migrate: data structures for upgrades :: @@ -116,7 +120,7 @@ :: $spore: structures for update, produced by +stay :: +$ spore - $: %8 + $: %9 system-duct=duct outstanding=(map [wire duct] (qeu remote-request)) contacts=(set ship) @@ -127,7 +131,8 @@ :: +$ egg $: control-duct=duct - nonce=@t + run-nonce=@t + sub-nonce=@ live=? =stats =watches @@ -164,7 +169,7 @@ [^duct %pass /whiz/gall %$ %whiz ~]~ =/ adult adult-core =. state.adult - [%8 system-duct outstanding contacts yokes=~ blocked]:spore + [%9 system-duct outstanding contacts yokes=~ blocked]:spore =/ mo-core (mo-abed:mo:adult duct) =. mo-core =/ apps=(list [dap=term =egg]) ~(tap by eggs.spore) @@ -223,9 +228,9 @@ :: ++ load |^ |= old=spore-any - =? old ?=(%7 -.old) - (spore-7-to-8 old) - ?> ?=(%8 -.old) + =? old ?=(%7 -.old) (spore-7-to-8 old) + =? old ?=(%8 -.old) (spore-8-to-9 old) + ?> ?=(%9 -.old) =. spore old ?. =(~ eggs.spore) pupal-gate @@ -234,32 +239,79 @@ state spore(eggs *(map term yoke)) == :: - +$ spore-any $%(^spore spore-7) + +$ spore-any $%(^spore spore-8 spore-7) +$ spore-7 $: %7 wipe-eyre-subs=_| ::NOTE band-aid for #3196 system-duct=duct outstanding=(map [wire duct] (qeu remote-request)) contacts=(set ship) - eggs=(map term egg) + eggs=(map term egg-7) blocked=(map term (qeu blocked-move)) == :: + +$ spore-8 + $: %8 + system-duct=duct + outstanding=(map [wire duct] (qeu remote-request)) + contacts=(set ship) + eggs=(map term egg-8) + blocked=(map term (qeu blocked-move)) + == + :: + +$ egg-7 egg-8 + +$ egg-8 + $: control-duct=duct + run-nonce=@t + live=? + =stats + watches=watches-8 + old-state=(each vase vase) + =beak + marks=(map duct mark) + == + :: + +$ watches-8 [inbound=bitt outbound=boat-8] + +$ boat-8 (map [wire ship term] [acked=? =path]) + :: ++ spore-7-to-8 |= old=spore-7 - ^- ^spore + ^- spore-8 :- %8 =. eggs.old %- ~(urn by eggs.old) - |= [a=term e=egg] + |= [a=term e=egg-7] ::NOTE kiln will kick off appropriate app revival e(old-state [%| p.old-state.e]) +>.old + :: + ++ spore-8-to-9 + |= old=spore-8 + ^- ^spore + =- old(- %9, eggs -) + %- ~(run by eggs.old) + |= =egg-8 + ^- egg + :* control-duct.egg-8 + run-nonce.egg-8 + sub-nonce=0 + live.egg-8 + stats.egg-8 + [inbound.watches.egg-8 (boat-8-to-9 outbound.watches.egg-8)] + [old-state beak marks]:egg-8 + == + :: + ++ boat-8-to-9 + |= =boat-8 + ^- boat + %- ~(run by boat-8) + |= [acked=? =path] + [acked path nonce=0] -- -- :: adult gall vane interface, for type compatibility with pupa :: -=| state=state-8 +=| state=state-9 |= [now=@da eny=@uvJ rof=roof] =* gall-payload . =< ~% %gall-wrap ..mo ~ @@ -357,7 +409,7 @@ control-duct hen beak bek agent &+agent - nonce (scot %uw (end 5 (shas %yoke-nonce eny))) + run-nonce (scot %uw (end 5 (shas %yoke-nonce eny))) == :: =/ old mo-core @@ -457,8 +509,12 @@ =. outstanding.state =/ stand (~(gut by outstanding.state) [wire hen] *(qeu remote-request)) - (~(put by outstanding.state) [wire hen] (~(put to stand) -.deal)) - (mo-pass wire note-arvo) + %+ ~(put by outstanding.state) [wire hen] + (~(gas to stand) ?.(?=(%leave -.deal) ~[-.deal] ~[%leave %cork])) + =. mo-core (mo-pass wire note-arvo) + ?. ?=(%leave -.deal) + mo-core + (mo-pass wire [%a [%cork ship]]) :: +mo-track-ship: subscribe to ames and jael for notices about .ship :: ++ mo-track-ship @@ -680,7 +736,12 @@ (~(put to *(qeu remote-request)) %missing) ~| [full-wire=full-wire hen=hen stand=stand] =^ rr stand ~(get to stand) - [rr (~(put by outstanding.state) [full-wire hen] stand)] + ~? &(=(rr %cork) ?=(^ stand)) + [%outstanding-queue-not-empty wire hen] + :- rr + ?: ?=(%cork rr) + (~(del by outstanding.state) [full-wire hen]) + (~(put by outstanding.state) [full-wire hen] stand) :: non-null case of wire is old, remove on next breach after :: 2019/12 :: @@ -696,6 +757,7 @@ %watch (mo-give %unto %watch-ack err) %poke (mo-give %unto %poke-ack err) %leave mo-core + %cork mo-core %missing (mo-give:(mo-give %unto %watch-ack err) %unto %poke-ack err) == :: @@ -738,7 +800,7 @@ ?~ yoke %- (slog leaf+"gall: {} dead, got {<+<.sign-arvo>}" ~) mo-core - ?. =(nonce.u.yoke i.t.wire) + ?. =(run-nonce.u.yoke i.t.wire) %- (slog leaf+"gall: got old {<+<.sign-arvo>} for {}" ~) mo-core ?. ?=([?(%gall %behn) %unto *] sign-arvo) @@ -1036,7 +1098,7 @@ :: ++ ap-nuke ^+ ap-core - =/ out=(list [[=wire =ship =term] ? =path]) + =/ out=(list [[=wire =ship =term] ? =path nonce=@]) ~(tap by outbound.watches.yoke) =/ inbound-paths=(set path) %- silt @@ -1049,7 +1111,7 @@ ~ [%give %kick ~(tap in inbound-paths) ~]~ %+ turn ~(tap by outbound.watches.yoke) - |= [[=wire =ship =term] ? =path] + |= [[=wire =ship =term] ? =path nonce=@] [%pass wire %agent [ship term] %leave ~] =^ maybe-tang ap-core (ap-ingest ~ |.([will *agent])) ap-core @@ -1133,12 +1195,13 @@ tang.neet == =. wire + :^ %use agent-name run-nonce.yoke ?- -.neet %agent [%out (scot %p ship.neet) name.neet wire] %huck [%out (scot %p ship.neet) name.neet wire] %arvo [(scot %p attributing.agent-routes) wire] == - =. wire [%use agent-name nonce.yoke wire] + :: =/ =note-arvo ?- -.neet %arvo note-arvo.neet @@ -1161,7 +1224,7 @@ core(agent-duct agent-duct) $(in t.in) :: - =/ out=(list [[=wire =^ship =term] ? =path]) + =/ out=(list [[=wire =^ship =term] ? =path nonce=@]) ~(tap by outbound.watches.yoke) |- ^+ ap-core ?~ out @@ -1169,7 +1232,8 @@ =? ap-core =(ship ship.i.out) =/ core =. agent-duct system-duct.state - =/ way [%out (scot %p ship) term.i.out wire.i.out] + =/ way + [%out (scot %p ship) term.i.out (scot %ud nonce.i.out) wire.i.out] (ap-specific-take way %kick ~) core(agent-duct agent-duct) $(out t.out) @@ -1284,15 +1348,6 @@ ?: ?=(%& -.res) ``want^p.res ((slog leaf+"peek failed tube from {(trip have)} to {(trip want)}" ~) ~) - :: +ap-update-subscription: update subscription. - :: - ++ ap-update-subscription - ~/ %ap-update-subscription - |= [is-ok=? =other=ship other-agent=term =wire] - ^+ ap-core - ?: is-ok - ap-core - (ap-kill-down wire [other-ship other-agent]) :: +ap-move: send move :: ++ ap-move @@ -1404,6 +1459,7 @@ =/ other-agent i.t.t.wire =/ =dock [other-ship other-agent] =/ agent-wire t.t.t.wire + =/ nonce=@ 0 :: =^ =sign:agent ap-core ?. ?=(%raw-fact -.unto) @@ -1424,48 +1480,107 @@ %- ap-move :_ ~ :^ hen %pass /nowhere [%c %warp our q.beak.yoke ~ %sing %b case /[mark.unto]] - :: - :: if subscription ack or close, handle before calling user code - :: - =? outbound.watches.yoke ?=(%kick -.sign) - %- ~(del by outbound.watches.yoke) - [agent-wire dock] - ?: ?& ?=(%watch-ack -.sign) - !(~(has by outbound.watches.yoke) [agent-wire dock]) - == - %- %: slog - leaf+"{}: got ack for nonexistent subscription" - leaf+"{}: {}" - >wire=wire< - ~ + |^ ^+ ap-core + :: %poke-ack has no nonce; ingest directly + :: + ?: ?=(%poke-ack -.sign) + ingest-and-check-error + :: if .agent-wire matches, it's an old pre-nonce subscription + :: + ?: (~(has by outbound.watches.yoke) sub-key) + run-sign + :: if an app happened to use a null wire, no-op + :: + ?: =(~ agent-wire) + on-missing + =/ has-nonce=(unit @ud) (slaw %ud (head agent-wire)) + ?: &(?=(~ has-nonce) ?=(%kick -.sign)) + on-weird-kick + :: pop nonce off .agent-wire and match against stored subscription + :: + ?> ?=(^ has-nonce) + =: nonce u.has-nonce + agent-wire (tail agent-wire) == - ap-core + =/ got (~(get by outbound.watches.yoke) sub-key) + ?~ got + on-missing + ?. =(nonce.u.got nonce) + (on-bad-nonce nonce.u.got) + run-sign :: - =? outbound.watches.yoke ?=(%watch-ack -.sign) - ?^ p.sign - %- ~(del by outbound.watches.yoke) - [agent-wire dock] - %+ ~(jab by outbound.watches.yoke) [agent-wire dock] - |= [acked=? =path] - =. . - ?. acked - . - %- =/ =tape - "{}: received 2nd watch-ack on {}" - (slog leaf+tape ~) - . - [& path] + ++ sub-key [agent-wire dock] + ++ ingest (ap-ingest ~ |.((on-agent:ap-agent-core agent-wire sign))) + ++ run-sign + ?- -.sign + %poke-ack !! + %fact + =^ tan ap-core ingest + ?~ tan ap-core + =. ap-core (ap-kill-down sub-key) + (ap-error -.sign leaf/"take %fact failed, closing subscription" u.tan) + :: + %kick + =. outbound.watches.yoke + (~(del by outbound.watches.yoke) sub-key) + :: + ingest-and-check-error + :: + %watch-ack + ?. (~(has by outbound.watches.yoke) sub-key) + %- %: slog + leaf+"{}: got ack for nonexistent subscription" + leaf+"{}: {}" + >wire=wire< + ~ + == + ap-core + =. outbound.watches.yoke + ?^ p.sign + (~(del by outbound.watches.yoke) sub-key) + :: + %+ ~(jab by outbound.watches.yoke) sub-key + |= val=[acked=? =path nonce=@] + =? . acked.val + %.(. (slog leaf+"{} 2nd watch-ack on {}" ~)) + val(acked &) + :: + ingest-and-check-error + == :: - =^ maybe-tang ap-core - %+ ap-ingest ~ |. - (on-agent:ap-agent-core agent-wire sign) - :: if failed %fact handling, kill subscription + ++ on-missing + %. ap-core + %- slog :~ + leaf+"{}: got {<-.sign>} for nonexistent subscription" + leaf+"{}: {<[nonce=nonce agent-wire]>}" + >wire=wire< + == :: - =? ap-core ?=(%fact -.sign) - (ap-update-subscription =(~ maybe-tang) p.dock q.dock agent-wire) - ?^ maybe-tang - (ap-error -.sign leaf/"closing subscription" u.maybe-tang) - ap-core + ++ on-weird-kick + %. run-sign + %- slog :~ + leaf+"{}: got %kick for nonexistent subscription" + leaf+"{}: {}" + >wire=wire< + == + :: + ++ on-bad-nonce + |= stored-nonce=@ + %. ap-core + %- slog :~ + =/ nonces [expected=stored-nonce got=nonce] + =/ ok |(?=(?(%fact %kick) -.sign) =(~ p.sign)) + leaf+"{}: stale %watch-ack {} ok={}" + :: + leaf+"{}: {}" + >wire=wire< + == + :: + ++ ingest-and-check-error + ^+ ap-core + =^ tan ap-core ingest + ?~(tan ap-core (ap-error -.sign leaf/"take {<-.sign>} failed" u.tan)) + -- :: +ap-install: install wrapper. :: ++ ap-install @@ -1656,34 +1771,53 @@ ?: ?=([* %pass * %g %deal * * %leave *] move) =/ =wire p.move.move ?> ?=([%use @ @ %out @ @ *] wire) - =/ short-wire t.t.t.t.t.t.wire - =/ =dock [q.p q]:q.move.move + =/ =dock [q.p q]:q.move.move + =/ sys-wire=^wire (scag 6 `^wire`wire) + =/ sub-wire=^wire (slag 6 `^wire`wire) + :: + ?. (~(has by outbound.watches.yoke) sub-wire dock) + =; =tang + %- (slog tang) + $(moves t.moves) + [leaf+"gall: {} missing subscription, got %leave"]~ + =/ have=[acked=? =path nonce=@] + (~(got by outbound.watches.yoke) sub-wire dock) + =. p.move.move + (weld sys-wire [(scot %ud nonce.have) sub-wire]) =. outbound.watches.yoke - (~(del by outbound.watches.yoke) [short-wire dock]) + (~(del by outbound.watches.yoke) [sub-wire dock]) $(moves t.moves, new-moves [move new-moves]) ?. ?=([* %pass * %g %deal * * ?(%watch %watch-as) *] move) $(moves t.moves, new-moves [move new-moves]) =/ =wire p.move.move ?> ?=([%use @ @ %out @ @ *] wire) - =/ short-wire t.t.t.t.t.t.wire - =/ =dock [q.p q]:q.move.move - =/ =path - ?- -.r.q.move.move - %watch path.r.q.move.move - %watch-as path.r.q.move.move - == - ?: (~(has by outbound.watches.yoke) short-wire dock) + =/ sys-wire=^wire (scag 6 `^wire`wire) + =/ sub-wire=^wire (slag 6 `^wire`wire) + =/ [=dock =deal] [[q.p q] r]:q.move.move + :: + ?: (~(has by outbound.watches.yoke) sub-wire dock) =. ap-core =/ =tang - ~[leaf+"subscribe wire not unique" >agent-name< >short-wire< >dock<] + ~[leaf+"subscribe wire not unique" >agent-name< >sub-wire< >dock<] =/ have - (~(got by outbound.watches.yoke) short-wire dock) + (~(got by outbound.watches.yoke) sub-wire dock) %- (slog >out=have< tang) (ap-error %watch-not-unique tang) :: reentrant, maybe bad? $(moves t.moves) - =. outbound.watches.yoke - (~(put by outbound.watches.yoke) [short-wire dock] [| path]) - $(moves t.moves, new-moves [move new-moves]) + :: + =. p.move.move + (weld sys-wire [(scot %ud sub-nonce.yoke) sub-wire]) + %_ $ + moves t.moves + new-moves [move new-moves] + sub-nonce.yoke +(sub-nonce.yoke) + :: + outbound.watches.yoke + %+ ~(put by outbound.watches.yoke) [sub-wire dock] + :+ acked=| + path=?+(-.deal !! %watch path.deal, %watch-as path.deal) + sub-nonce.yoke + == -- -- :: +call: request diff --git a/pkg/base-dev/lib/dbug.hoon b/pkg/base-dev/lib/dbug.hoon index ce98619e85..c61286669d 100644 --- a/pkg/base-dev/lib/dbug.hoon +++ b/pkg/base-dev/lib/dbug.hoon @@ -84,7 +84,7 @@ %+ sort ~(tap by wex.bowl) |= [[[a=wire *] *] [[b=wire *] *]] (aor a b) - |= [[=wire =ship =term] [acked=? =path]] + |= [[=wire =ship =term] [acked=? =path nonce=@]] ^- (unit tank) =; relevant=? ?. relevant ~