diff --git a/bin/solid.pill b/bin/solid.pill index a79fa3353c..f30f656060 100644 --- a/bin/solid.pill +++ b/bin/solid.pill @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:c45166ff0f8ab8dc1552bcef519c77c0afa6ca52f8ed1ba31ed632012667d619 -size 8674763 +oid sha256:cab0dd267cc5c17eb0d0164e876556ae7975fd5db59a738d741ecd767cca8594 +size 8760359 diff --git a/pkg/arvo/gen/hood/gall-sift.hoon b/pkg/arvo/gen/hood/gall-sift.hoon new file mode 100644 index 0000000000..9f02165ee6 --- /dev/null +++ b/pkg/arvo/gen/hood/gall-sift.hoon @@ -0,0 +1,8 @@ +:: Helm: Set Gall Verbosity by Agent +:: +/? 310 +:: +:- %say +|= [^ dudes=(list dude:gall) ~] +:- %helm-gall-sift +dudes diff --git a/pkg/arvo/gen/hood/gall-verb.hoon b/pkg/arvo/gen/hood/gall-verb.hoon new file mode 100644 index 0000000000..32b4457014 --- /dev/null +++ b/pkg/arvo/gen/hood/gall-verb.hoon @@ -0,0 +1,11 @@ +:: Helm: Adjust Gall verbosity +:: +:: List of diagnostic flags is in verb:gall in zuse.hoon, documented in +:: gall.hoon +:: +/? 310 +:: +:- %say +|= [^ veb=(list verb:gall) ~] +:- %helm-gall-verb +veb diff --git a/pkg/arvo/lib/hood/drum.hoon b/pkg/arvo/lib/hood/drum.hoon index 0ec1b49590..649e48e254 100644 --- a/pkg/arvo/lib/hood/drum.hoon +++ b/pkg/arvo/lib/hood/drum.hoon @@ -77,7 +77,8 @@ :: ++ de-gill :: gill from wire |= way=wire ^- gill:gall - ?>(?=([@ @ ~] way) [(slav %p i.way) i.t.way]) + ~| way + ?>(?=([@ @ *] way) [(slav %p i.way) i.t.way]) -- :: TODO: remove .ost :: diff --git a/pkg/arvo/lib/hood/helm.hoon b/pkg/arvo/lib/hood/helm.hoon index 3fb38c7806..b0a8ba926c 100644 --- a/pkg/arvo/lib/hood/helm.hoon +++ b/pkg/arvo/lib/hood/helm.hoon @@ -230,6 +230,14 @@ |= veb=(list verb:ames) =< abet (emit %pass /helm %arvo %a %spew veb) :: +++ poke-gall-sift + |= dudes=(list dude:gall) =< abet + (emit %pass /helm %arvo %g %sift dudes) +:: +++ poke-gall-verb + |= veb=(list verb:gall) =< abet + (emit %pass /helm %arvo %g %spew veb) +:: ++ poke-ames-wake |= ~ =< abet (emit %pass /helm %arvo %a %stir '') @@ -269,6 +277,8 @@ %helm-code =;(f (f !<(_+<.f vase)) poke-code) %helm-cors-approve =;(f (f !<(_+<.f vase)) poke-cors-approve) %helm-cors-reject =;(f (f !<(_+<.f vase)) poke-cors-reject) + %helm-gall-sift =;(f (f !<(_+<.f vase)) poke-gall-sift) + %helm-gall-verb =;(f (f !<(_+<.f vase)) poke-gall-verb) %helm-hi =;(f (f !<(_+<.f vase)) poke-hi) %helm-knob =;(f (f !<(_+<.f vase)) poke-knob) %helm-pans =;(f (f !<(_+<.f vase)) poke-pans) diff --git a/pkg/arvo/sys/lull.hoon b/pkg/arvo/sys/lull.hoon index 827cad33dd..41c3c9c41d 100644 --- a/pkg/arvo/sys/lull.hoon +++ b/pkg/arvo/sys/lull.hoon @@ -351,6 +351,7 @@ :: %hear: packet from unix :: %heed: track peer's responsiveness; gives %clog if slow :: %jilt: stop tracking peer's responsiveness + :: %cork: request to delete message flow :: %plea: request to send message :: :: System and Lifecycle Tasks @@ -367,6 +368,7 @@ $% [%hear =lane =blob] [%heed =ship] [%jilt =ship] + [%cork =ship] $>(%plea vane-task) :: $>(%born vane-task) @@ -511,6 +513,9 @@ :: entry and emit a nack to the local vane that asked us to send :: the message. :: heeds: listeners for %clog notifications + :: closing: bones closed on the sender side + :: corked: bones closed on both sender and receiver + :: krocs: bones that need to be sent again to the publisher :: +$ peer-state $: $: =symmetric-key @@ -526,6 +531,9 @@ rcv=(map bone message-sink-state) nax=(set [=bone =message-num]) heeds=(set duct) + closing=(set bone) + corked=(set bone) + krocs=(set bone) == :: $qos: quality of service; how is our connection to a peer doing? :: @@ -1655,11 +1663,12 @@ $>(%trim vane-task) :: trim state $>(%vega vane-task) :: report upgrade $>(%plea vane-task) :: network request + [%spew veb=(list verb)] :: set verbosity + [%sift dudes=(list dude)] :: per agent == :: +$ bitt (map duct (pair ship path)) :: incoming subs - +$ boat :: outgoing subs - %+ map [=wire =ship =term] :: - [acked=? =path] :: + +$ boat (map [=wire =ship =term] [acked=? =path]) :: outgoing subs + +$ boar (map [=wire =ship =term] nonce=@) :: and their nonces +$ bowl :: standard app state $: $: our=ship :: host src=ship :: guest @@ -1695,6 +1704,9 @@ $% [%raw-fact =mark =noun] sign:agent == + :: TODO: add more flags? + :: + +$ verb ?(%odd) :: :: +agent: app core :: diff --git a/pkg/arvo/sys/vane/ames.hoon b/pkg/arvo/sys/vane/ames.hoon index b2b4f8c9bc..e583634f5c 100644 --- a/pkg/arvo/sys/vane/ames.hoon +++ b/pkg/arvo/sys/vane/ames.hoon @@ -598,6 +598,26 @@ :: +$ naxplanation [=message-num =error] :: ++| %statics +:: +:: $ames-state: state for entire vane +:: +:: peers: states of connections to other ships +:: unix-duct: handle to give moves to unix +:: life: our $life; how many times we've rekeyed +:: crypto-core: interface for encryption and signing +:: bug: debug printing configuration +:: corks: wires for cork flows pending publisher update +:: ++$ ames-state + $: peers=(map ship ship-state) + =unix=duct + =life + crypto-core=acru:ames + =bug + corks=(set wire) + == +:: +$ ames-state-4 ames-state-5 +$ ames-state-5 $: peers=(map ship ship-state-5) @@ -628,17 +648,36 @@ heeds=(set duct) == :: -+| %statics ++$ ames-state-6 + $: peers=(map ship ship-state-6) + =unix=duct + =life + crypto-core=acru:ames + =bug + == :: -:: $ames-state: state for entire vane ++$ ship-state-6 + $% [%alien alien-agenda] + [%known peer-state-6] + == :: -:: peers: states of connections to other ships -:: unix-duct: handle to give moves to unix -:: life: our $life; how many times we've rekeyed -:: crypto-core: interface for encryption and signing -:: bug: debug printing configuration ++$ peer-state-6 + $: $: =symmetric-key + =life + =rift + =public-key + sponsor=ship + == + route=(unit [direct=? =lane]) + =qos + =ossuary + snd=(map bone message-pump-state) + rcv=(map bone message-sink-state) + nax=(set [=bone =message-num]) + heeds=(set duct) + == :: -+$ ames-state ++$ ames-state-7 $: peers=(map ship ship-state) =unix=duct =life @@ -730,12 +769,16 @@ :: $message-pump-gift: effect from |message-pump :: :: %done: report message acknowledgment +:: %cork: kill flow +:: %kroc: recork this bone :: %send: emit message fragment :: %wait: set a new timer at .date :: %rest: cancel timer at .date :: +$ message-pump-gift $% [%done =message-num error=(unit error)] + [%cork ~] + [%kroc =bone] [%send =static-fragment] [%wait date=@da] [%rest date=@da] @@ -774,7 +817,7 @@ :: .ok: %.y unless previous failed attempt :: +$ message-sink-task - $% [%done ok=?] + $% [%done ok=? cork=?] [%drop =message-num] [%hear =lane =shut-packet ok=?] == @@ -786,6 +829,7 @@ +$ message-sink-gift $% [%memo =message-num message=*] [%send =message-num =ack-meat] + [%cork ~] == -- :: external vane interface @@ -795,7 +839,7 @@ :: =< =* adult-gate . =| queued-events=(qeu queued-event) - =| cached-state=(unit [%5 ames-state-5]) + =| cached-state=(unit $%([%5 ames-state-5] [%6 ames-state-6] [%7 ames-state-7])) :: |= [now=@da eny=@ rof=roof] =* larval-gate . @@ -821,7 +865,9 @@ :: ?: &(?=(^ cached-state) ?=(~ queued-events)) =^ moves adult-gate (call:adult-core duct dud task) - (molt moves) + %- molt + ~> %slog.0^leaf/"ames: init daily recork timer" + :_(moves [duct %pass /recork %b %wait `@da`(add now ~d1)]) :: %born: set .unix-duct and start draining .queued-events :: ?: ?=(%born -.task) @@ -903,9 +949,12 @@ :: .queued-events has been cleared; metamorphose :: ?~ queued-events - ?: ?=(^ cached-state) (molt moves) - ~> %slog.0^leaf/"ames: metamorphosis" - [moves adult-gate] + ?. ?=(^ cached-state) + ~> %slog.0^leaf/"ames: metamorphosis" + [moves adult-gate] + %- molt + ~> %slog.0^leaf/"ames: init daily recork timer" + :_(moves [duct %pass /recork %b %wait `@da`(add now ~d1)]) :: set timer to drain next event :: =. moves :_(moves [duct %pass /larva %b %wait now]) @@ -913,7 +962,7 @@ :: lifecycle arms; mostly pass-throughs to the contained adult ames :: ++ scry scry:adult-core - ++ stay [%6 %larva queued-events ames-state.adult-gate] + ++ stay [%8 %larva queued-events ames-state.adult-gate] ++ load |= $= old $% $: %4 @@ -931,6 +980,20 @@ [%adult state=ames-state-5] == == $: %6 + $% $: %larva + events=(qeu queued-event) + state=ames-state-6 + == + [%adult state=ames-state-6] + == == + $: %7 + $% $: %larva + events=(qeu queued-event) + state=ames-state-7 + == + [%adult state=ames-state-7] + == == + $: %8 $% $: %larva events=(qeu queued-event) state=_ames-state.adult-gate @@ -955,22 +1018,47 @@ =. queued-events events.old larval-gate :: - [%6 %adult *] (load:adult-core %6 state.old) + [%6 %adult *] + =. cached-state `[%6 state.old] + ~> %slog.0^leaf/"ames: larva reload" + larval-gate :: [%6 %larva *] ~> %slog.0^leaf/"ames: larva: load" =. queued-events events.old - =. adult-gate (load:adult-core %6 state.old) larval-gate + :: + [%7 %adult *] + =. cached-state `[%7 state.old] + ~> %slog.0^leaf/"ames: larva reload" + larval-gate + :: + [%7 %larva *] + ~> %slog.0^leaf/"ames: larva: load" + =. queued-events events.old + larval-gate + :: + [%8 %adult *] (load:adult-core %8 state.old) + :: + [%8 %larva *] + ~> %slog.1^leaf/"ames: larva: load" + =. queued-events events.old + =. adult-gate (load:adult-core %8 state.old) + larval-gate + :: == :: +molt: re-evolve to adult-ames :: ++ molt |= moves=(list move) ^- (quip move _adult-gate) + =? cached-state &(?=(^ cached-state) ?=(%5 +<.cached-state)) + `%6^(state-5-to-6:load:adult-core +.u.cached-state) + =? cached-state &(?=(^ cached-state) ?=(%6 +<.cached-state)) + `%7^(state-6-to-7:load:adult-core +.u.cached-state) =. ames-state.adult-gate - ?> ?=(^ cached-state) - (state-5-to-6:load:adult-core +.u.cached-state) + ?> &(?=(^ cached-state) ?=(%7 +<.cached-state)) + (state-7-to-8:load:adult-core +.u.cached-state) =. cached-state ~ ~> %slog.0^leaf/"ames: metamorphosis reload" [moves adult-gate] @@ -1015,6 +1103,7 @@ %trim on-trim:event-core %vega on-vega:event-core %plea (on-plea:event-core [ship plea]:task) + %cork (on-cork:event-core ship.task) == :: [moves ames-gate] @@ -1045,20 +1134,23 @@ [moves ames-gate] :: +stay: extract state before reload :: -++ stay [%6 %adult ames-state] +++ stay [%8 %adult ames-state] :: +load: load in old state after reload :: ++ load - =< |= old-state=[%6 ^ames-state] + =< |= $= old-state + $% [%8 ^ames-state] + == ^+ ames-gate - ?> ?=(%6 -.old-state) + ?> ?=(%8 -.old-state) ames-gate(ames-state +.old-state) + :: |% :: +state-4-to-5 called from larval-ames :: ++ state-4-to-5 |= ames-state=ames-state-4 - ^- ames-state-4 + ^- ames-state-5 =. peers.ames-state %- ~(run by peers.ames-state) |= ship-state=ship-state-4 @@ -1076,11 +1168,11 @@ :: ++ state-5-to-6 |= ames-state=ames-state-5 - ^- ^^ames-state + ^- ames-state-6 :_ +.ames-state %- ~(rut by peers.ames-state) |= [=ship ship-state=ship-state-5] - ^- ^ship-state + ^- ship-state-6 ?. ?=(%known -.ship-state) ship-state =/ peer-state=peer-state-5 +.ship-state @@ -1091,12 +1183,37 @@ ;; @ud =< q.q %- need %- need (rof ~ %j `beam`[[our %rift %da now] /(scot %p ship)]) - =/ =^peer-state - :_ +.peer-state - =, -.peer-state - [symmetric-key life rift public-key sponsor] + :- -.ship-state + :_ +.peer-state + =, -.peer-state + [symmetric-key life rift public-key sponsor] + :: +state-6-to-7 called from larval-ames + :: + ++ state-6-to-7 + |= ames-state=ames-state-6 + ^- ames-state-7 + :_ +.ames-state + %- ~(run by peers.ames-state) + |= ship-state=ship-state-6 ^- ^ship-state - [-.ship-state peer-state] + ?. ?=(%known -.ship-state) + ship-state + :- %known + ^- peer-state + :- +<.ship-state + [route qos ossuary snd rcv nax heeds ~ ~ ~]:ship-state + :: +state-7-to-8 called from larval-ames + :: + ++ state-7-to-8 + |= ames-state=ames-state-7 + ^- ^^ames-state + :* peers.ames-state + unix-duct.ames-state + life.ames-state + crypto-core.ames-state + bug.ames-state + *(set wire) + == -- :: +scry: dereference namespace :: @@ -1134,6 +1251,7 @@ :: /ax/peers/[ship]/forward-lane (list lane) :: /ax/bones/[ship] [snd=(set bone) rcv=(set bone)] :: /ax/snd-bones/[ship]/[bone] vase + :: /ax/corks (list wire) :: ?. ?=(%x ren) ~ ?+ tyl ~ @@ -1209,6 +1327,9 @@ =/ res u.mps ``noun+!>(!>(res)) + :: + [%corks ~] + ``noun+!>(~(tap in corks.ames-state)) == -- :: |per-event: inner event-handling core @@ -1271,13 +1392,15 @@ ++ send-ack |= =bone ^+ event-core - abet:(run-message-sink:peer-core bone %done ok=%.y) + =/ cork=? (~(has in closing.peer-state) bone) + abet:(run-message-sink:peer-core bone %done ok=%.y cork) :: failed; send message nack packet :: ++ send-nack |= [=bone =^error] ^+ event-core - =. event-core abet:(run-message-sink:peer-core bone %done ok=%.n) + =. event-core + abet:(run-message-sink:peer-core bone %done ok=%.n cork=%.n) =/ =^peer-state (got-peer-state her) =/ =^channel [[our her] now channel-state -.peer-state] :: construct nack-trace message, referencing .failed $message-num @@ -1638,7 +1761,35 @@ =/ sndr [our our-life.channel] =/ rcvr [ship her-life.channel] "plea {}" + :: since flow kill goes like: + :: client vane cork task -> client ames pass cork as plea -> + :: -> server ames sinks plea -> server ames +on-plea (we are here); + :: if it's %cork plea passed to ames from its sink, + :: give %done and process flow closing after +on-take-done call :: + ?: &(=(vane.plea %a) =(path.plea `path`/close) ?=(~ payload.plea)) + (emit duct %give %done ~) + abet:(on-memo:(make-peer-core peer-state channel) bone plea %plea) + :: +on-cork: handle request to kill a flow + :: + ++ on-cork + |= =ship + ^+ event-core + =/ ship-state (~(get by peers.ames-state) ship) + :: + ?> ?=([~ %known *] ship-state) + =/ =peer-state +.u.ship-state + =/ =channel [[our ship] now channel-state -.peer-state] + :: + =^ =bone ossuary.peer-state (bind-duct ossuary.peer-state duct) + =/ =plea [%$ /flow [%cork ~]] + :: + =. closing.peer-state (~(put in closing.peer-state) bone) + %- %^ trace msg.veb ship + |. ^- tape + =/ sndr [our our-life.channel] + =/ rcvr [ship her-life.channel] + "cork plea {}" abet:(on-memo:(make-peer-core peer-state channel) bone plea %plea) :: +on-take-wake: receive wakeup or error notification from behn :: @@ -1660,19 +1811,32 @@ event-core (request-attestation u.ship) :: - =/ res=(unit [her=ship =bone]) (parse-pump-timer-wire wire) - ?~ res - %- (slog leaf+"ames: got timer for strange wire: {}" ~) - event-core + |^ + ?. ?=([%recork ~] wire) (handle-single-wire wire) + =/ wires=(list ^wire) ~(tap in corks.ames-state) + |- ^+ event-core + ?^ wires + $(wires t.wires, event-core (handle-single-wire i.wires)) + (emit duct %pass /recork %b %wait `@da`(add now ~d1)) :: - =/ state=(unit peer-state) (get-peer-state her.u.res) - ?~ state - %- (slog leaf+"ames: got timer for strange ship: {}, ignoring" ~) - event-core - :: - =/ =channel [[our her.u.res] now channel-state -.u.state] - :: - abet:(on-wake:(make-peer-core u.state channel) bone.u.res error) + ++ handle-single-wire + |= =^wire + ^+ event-core + =/ res=(unit [her=ship =bone]) (parse-pump-timer-wire wire) + ?~ res + %- (slog leaf+"ames: got timer for strange wire: {}" ~) + event-core + :: + =/ state=(unit peer-state) (get-peer-state her.u.res) + ?~ state + %. event-core + %- slog + [leaf+"ames: got timer for strange ship: {}, ignoring" ~] + :: + =/ =channel [[our her.u.res] now channel-state -.u.state] + :: + abet:(on-wake:(make-peer-core u.state channel) bone.u.res error) + -- :: +on-init: first boot; subscribe to our info from jael :: ++ on-init @@ -1718,7 +1882,6 @@ :: ?- public-keys-result [%diff @ %rift *] - :: event-core (on-publ-rift [who to.diff]:public-keys-result) :: [%diff @ %keys *] @@ -1926,6 +2089,7 @@ :: =. qos.peer-state [%unborn now] =. life.peer-state life.point + =. rift.peer-state rift.point =. public-key.peer-state public-key =. symmetric-key.peer-state symmetric-key =. sponsor.peer-state @@ -2349,6 +2513,12 @@ =(1 current:(~(got by snd.peer-state) bone)) == (send-blob | her.channel (attestation-packet [her her-life]:channel)) + ?: (~(has in corked.peer-state) bone) + :: if the bone was corked the flow doesn't exist anymore + :: TODO: clean up corked bones in the peer state when it's _safe_? + :: (e.g. if this bone is N blocks behind the next one) + :: + peer-core :: maybe resend some timed out packets :: (run-message-pump bone %wake ~) @@ -2390,7 +2560,8 @@ =/ =message-pump-state (~(gut by snd.peer-state) bone *message-pump-state) :: - =/ message-pump (make-message-pump message-pump-state channel) + =/ close=? (~(has in closing.peer-state) bone) + =+ message-pump=(make-message-pump message-pump-state channel close bone) =^ pump-gifts message-pump-state (work:message-pump task) =. snd.peer-state (~(put by snd.peer-state) bone message-pump-state) :: process effects from |message-pump @@ -2401,6 +2572,8 @@ =. peer-core ?- -.gift %done (on-pump-done [message-num error]:gift) + %cork (on-pump-cork current.message-pump-state) + %kroc (on-pump-kroc bone:gift) %send (on-pump-send static-fragment.gift) %wait (on-pump-wait date.gift) %rest (on-pump-rest date.gift) @@ -2411,6 +2584,16 @@ ++ on-pump-done |= [=message-num error=(unit error)] ^+ peer-core + ?: ?& =(1 (end 0 bone)) + =(1 (end 0 (rsh 0 bone))) + (~(has in corked.peer-state) (mix 0b10 bone)) + == + %- %+ trace msg.veb + =/ dat [her.channel bone=bone message-num=message-num -.task] + |.("remove naxplanation flow {}") + =. snd.peer-state + (~(del by snd.peer-state) bone) + peer-core :: if odd bone, ack is on "subscription update" message; no-op :: ?: =(1 (end 0 bone)) @@ -2423,13 +2606,53 @@ =/ target-bone=^bone (mix 0b10 bone) :: (run-message-sink target-bone %drop message-num) + ?: &(close ?=(%near -.task)) + :: if the bone belongs to a closing flow and we got a naxplanation, + :: don't relay the ack to the client vane, and wait for the next try + :: + peer-core :: not a nack-trace bone; relay ack to client vane :: (emit (got-duct bone) %give %done error) + :: +on-pump-cork: kill flow on cork sender side + :: + ++ on-pump-cork + |= =message-num + ^+ peer-core + :: clear all packets from this message from the packet pump + :: + =. message-pump (run-packet-pump:message-pump %done message-num *@dr) + =/ =wire (make-pump-timer-wire her.channel bone) + =. corks.ames-state (~(del in corks.ames-state) wire) + =/ nack-bone=^bone (mix 0b10 bone) + =? rcv.peer-state (~(has by rcv.peer-state) nack-bone) + :: if the publisher was behind we remove nacks received on that bone + :: + (~(del by rcv.peer-state) nack-bone) + =. peer-state + =, peer-state + %_ peer-state + snd (~(del by snd) bone) + rcv (~(del by rcv) bone) + corked (~(put in corked) bone) + closing (~(del in closing) bone) + krocs (~(del in krocs) bone) + by-duct.ossuary (~(del by by-duct.ossuary) (got-duct bone)) + by-bone.ossuary (~(del by by-bone.ossuary) bone) + == + peer-core + :: +on-pump-krock: if we get a nack for a cork, add it to the recork set + :: + ++ on-pump-kroc + |= =^bone + ^+ peer-core + =. krocs.peer-state (~(put in krocs.peer-state) bone) + peer-core :: +on-pump-send: emit message fragment requested by |message-pump :: ++ on-pump-send - |=(f=static-fragment (send-shut-packet bone [message-num %& +]:f)) + |= f=static-fragment + (send-shut-packet bone [message-num %& +]:f) :: +on-pump-wait: relay |message-pump's set-timer request :: ++ on-pump-wait @@ -2454,13 +2677,15 @@ ++ run-message-sink |= [=bone task=message-sink-task] ^+ peer-core + ?: (~(has in corked.peer-state) bone) peer-core :: pass .task to the |message-sink and apply state mutations :: =/ =message-sink-state (~(gut by rcv.peer-state) bone *message-sink-state) :: =/ message-sink (make-message-sink message-sink-state channel) - =^ sink-gifts message-sink-state (work:message-sink task) + =/ closing=? (~(has in closing.peer-state) bone) + =^ sink-gifts message-sink-state (work:message-sink closing task) =. rcv.peer-state (~(put by rcv.peer-state) bone message-sink-state) :: process effects from |message-sink :: @@ -2471,8 +2696,32 @@ ?- -.gift %memo (on-sink-memo [message-num message]:gift) %send (on-sink-send [message-num ack-meat]:gift) + %cork on-sink-cork == $(sink-gifts t.sink-gifts) + :: +on-sink-cork: handle flow kill after server ames has taken %done + :: + ++ on-sink-cork + ^+ peer-core + =/ =message-pump-state + (~(gut by snd.peer-state) bone *message-pump-state) + =? peer-core ?=(^ next-wake.packet-pump-state.message-pump-state) + =* next-wake u.next-wake.packet-pump-state.message-pump-state + =/ =wire (make-pump-timer-wire her.channel bone) + :: resetting timer for boons + :: + (emit [/ames]~ %pass wire %b %rest next-wake) + =/ nax-bone=^bone (mix 0b10 bone) + =. peer-state + =, peer-state + %_ peer-state + rcv (~(del by rcv) bone) + snd (~(del by snd) bone) + corked (~(put in corked) bone) + closing (~(del in closing) bone) + krocs (~(del in krocs) bone) + == + peer-core :: +on-sink-send: emit ack packet as requested by |message-sink :: ++ on-sink-send @@ -2506,10 +2755,14 @@ ++ on-sink-boon |= [=message-num message=*] ^+ peer-core + ?: ?| (~(has in closing.peer-state) bone) + (~(has in corked.peer-state) bone) + == + peer-core :: send ack unconditionally :: =. peer-core (emit (got-duct bone) %give %boon message) - =. peer-core (run-message-sink bone %done ok=%.y) + =. peer-core (run-message-sink bone %done ok=%.y cork=%.n) :: ?. ?=([%hear * * ok=%.n] task) :: fresh boon; give message to client vane @@ -2547,52 +2800,91 @@ =+ ;; =naxplanation message :: ack nack-trace message (only applied if we don't later crash) :: - =. peer-core (run-message-sink bone %done ok=%.y) + =. peer-core (run-message-sink bone %done ok=%.y cork=%.n) :: flip .bone's second bit to find referenced flow :: =/ target-bone=^bone (mix 0b10 bone) :: notify |message-pump that this message got naxplained :: - (run-message-pump target-bone %near naxplanation) + =. peer-core (run-message-pump target-bone %near naxplanation) + :: + ?. (~(has in krocs.peer-state) target-bone) + peer-core + =/ =message-pump-state + (~(gut by snd.peer-state) target-bone *message-pump-state) + =/ message-pump + (make-message-pump message-pump-state channel %.y target-bone) + :: we don't process the gifts here and instead wait for + :: the timer to handle the %cork plea added to the pump + :: + =^ * message-pump-state + %- work:message-pump + %memo^(dedup-message (jim [%$ /flow [%cork ~]])) + =. snd.peer-state + (~(put by snd.peer-state) target-bone message-pump-state) + :: if we get a naxplanation for a %cork, the publisher is behind + :: receiving the OTA, so we set up a timer to retry in one day. + :: + %- %+ trace msg.veb + |.("old publisher, resend %cork on bone={} in ~d1") + =/ =wire (make-pump-timer-wire her.channel target-bone) + =. corks.ames-state (~(put in corks.ames-state) wire) + peer-core :: +on-sink-plea: handle request message received by |message-sink :: ++ on-sink-plea |= [=message-num message=*] ^+ peer-core + ?: ?| (~(has in closing.peer-state) bone) + (~(has in corked.peer-state) bone) + == + peer-core + |^ %- %+ trace msg.veb =/ dat [her.channel bone=bone message-num=message-num] |.("sink plea {}") :: is this the first time we're trying to process this message? :: - ?. ?=([%hear * * ok=%.n] task) - :: fresh plea; pass to client vane - :: - =+ ;; =plea message - :: - =/ =wire (make-bone-wire her.channel her-rift.channel bone) + ?: ?=([%hear * * ok=%.n] task) + :: we previously crashed on this message; send nack :: + nack-plea + :: fresh plea; pass to client vane + :: + =+ ;; =plea message + =/ =wire (make-bone-wire her.channel her-rift.channel bone) + :: + ?. =(vane.plea %$) ?+ vane.plea ~| %ames-evil-vane^our^her.channel^vane.plea !! %c (emit duct %pass wire %c %plea her.channel plea) %g (emit duct %pass wire %g %plea her.channel plea) %j (emit duct %pass wire %j %plea her.channel plea) == - :: we previously crashed on this message; send nack + :: a %cork plea is handled using %$ as the recipient vane to + :: account for publishers that still handle ames-to-ames %pleas :: - =. peer-core (run-message-sink bone %done ok=%.n) - :: also send nack-trace with blank .error for security + ?> &(?=([%cork *] payload.plea) ?=(%flow -.path.plea)) + =. closing.peer-state (~(put in closing.peer-state) bone) + (emit duct %pass wire %a %plea her.channel [%a /close ~]) :: - =/ nack-trace-bone=^bone (mix 0b10 bone) - =/ =naxplanation [message-num *error] - =/ =message-blob (jam naxplanation) - :: - (run-message-pump nack-trace-bone %memo message-blob) + ++ nack-plea + ^+ peer-core + =. peer-core (run-message-sink bone %done ok=%.n cork=%.n) + :: send nack-trace with blank .error for security + :: + =/ nack-trace-bone=^bone (mix 0b10 bone) + =/ =naxplanation [message-num *error] + =/ =message-blob (jam naxplanation) + :: + (run-message-pump nack-trace-bone %memo message-blob) + -- -- -- -- :: +make-message-pump: constructor for |message-pump :: ++ make-message-pump - |= [state=message-pump-state =channel] + |= [state=message-pump-state =channel closing=? =bone] =* veb veb.bug.channel =| gifts=(list message-pump-gift) :: @@ -2627,11 +2919,35 @@ %memo (on-memo message-blob.task) %wake (run-packet-pump %wake current.state) %hear - ?- -.ack-meat.task - %& (on-hear [message-num fragment-num=p.ack-meat]:task) - %| (on-done [message-num ?:(ok.p.ack-meat [%ok ~] [%nack ~])]:task) + ?- -.ack-meat.task + %& + (on-hear [message-num fragment-num=p.ack-meat]:task) + :: + %| + =/ cork=? + =/ top-live + (pry:packet-queue:*make-packet-pump live.packet-pump-state.state) + :: If we send a %cork and get an ack, we can know by + :: sequence number that the ack is for the %cork message + :: + ?& closing + ?=(^ top-live) + =(0 ~(wyt in unsent-messages.state)) + =(0 (lent unsent-fragments.state)) + =(1 ~(wyt by live.packet-pump-state.state)) + =(message-num:task message-num.key.u.top-live) + == + =* ack p.ack-meat.task + =? message-pump &(cork !ok.ack) (give [%kroc bone]) + =. message-pump + %- on-done + [[message-num:task ?:(ok.ack [%ok ~] [%nack ~])] cork] + ?. &(!ok.ack cork) message-pump + %. message-pump + %+ trace odd.veb + |.("got nack for %cork {}") == - %near (on-done [message-num %naxplanation error]:naxplanation.task) + %near (on-done [[message-num %naxplanation error]:naxplanation.task %&]) == :: +on-memo: handle request to send a message :: @@ -2663,10 +2979,11 @@ :: flows. :: ++ on-done - |= [=message-num =ack] + |= [[=message-num =ack] cork=?] ^+ message-pump :: unsent messages from the future should never get acked :: + ~| [message-num next.state] ?> (lth message-num next.state) :: ignore duplicate message acks :: @@ -2725,6 +3042,7 @@ ?- -.u.cur %ok =. message-pump (give %done current.state ~) + =? message-pump cork (give %cork ~) $(current.state +(current.state)) :: %nack @@ -3270,20 +3588,20 @@ :: +work: handle a $message-sink-task :: ++ work - |= task=message-sink-task + |= [closing=? task=message-sink-task] ^+ [gifts state] :: =- [(flop gifts) state] :: ?- -.task - %done (on-done ok.task) + %done (on-done ok.task cork.task) %drop (on-drop message-num.task) - %hear (on-hear [lane shut-packet ok]:task) + %hear (on-hear closing [lane shut-packet ok]:task) == :: +on-hear: receive message fragment, possibly completing message :: ++ on-hear - |= [=lane =shut-packet ok=?] + |= [closing=? =lane =shut-packet ok=?] ^+ message-sink :: we know this is a fragment, not an ack; expose into namespace :: @@ -3320,13 +3638,14 @@ :: doesn't happen for boons. :: ?: (lte seq last-heard.state) - ?: is-last-fragment - :: drop last packet since we don't know whether to ack or nack + ?: &(is-last-fragment !closing) + :: if not from a closing bone, drop last packet, + :: since we don't know whether to ack or nack :: %- %+ trace rcv.veb |. ^- tape =/ data - :* her.channel seq=seq + :* her.channel seq=seq bone=bone fragment-num=fragment-num num-fragments=num-fragments la=last-acked.state lh=last-heard.state == @@ -3336,7 +3655,9 @@ :: %- %+ trace rcv.veb |. =/ data - [seq=seq fragment-num=fragment-num num-fragments=num-fragments] + :* seq=seq fragment-num=fragment-num + num-fragments=num-fragments closing=closing + == "send ack-1 {}" (give %send seq %& fragment-num) :: last-heard |% ++| %helpers +:: +trace: print if .verb is set and we're tracking .dude +:: +++ trace + |= [verb=? =dude dudes=(set dude) print=tang] + ^+ same + ?. verb + same + ?. => [dude=dude dudes=dudes in=in] + ~+ |(=(~ dudes) (~(has in dudes) dude)) + same + (slog print) +:: +:: $bug: debug printing configuration +:: +:: veb: verbosity toggles +:: dudes: app filter; if ~, print for all +:: ++$ bug + $: veb=_veb-all-off + dudes=(set dude) + == +:: +| %main :: :: $move: Arvo-level move :: +$ move [=duct move=(wind note-arvo gift-arvo)] -:: $state-8: overall gall state, versioned +:: $state-9: overall gall state, versioned :: -+$ state-8 [%8 state] ++$ state-9 [%9 state] :: $state: overall gall state :: :: system-duct: TODO document @@ -22,6 +52,7 @@ :: contacts: other ships we're in communication with :: yokes: running agents :: blocked: moves to agents that haven't been started yet +:: bug: debug printing configuration :: +$ state $: system-duct=duct @@ -29,13 +60,8 @@ contacts=(set ship) yokes=(map term yoke) blocked=(map term (qeu blocked-move)) + =bug == -:: $watches: subscribers and publications -:: -:: TODO: rename this, to $ties? -:: TODO: rename $boat and $bitt and document -:: -+$ watches [inbound=bitt outbound=boat] :: $routes: new cuff; TODO: document :: +$ routes @@ -45,19 +71,26 @@ :: $yoke: agent runner state :: :: control-duct: TODO document -:: live: is this agent running? TODO document better +:: run-nonce: unique for each rebuild +:: sub-nonce: app-wide global %watch nonce +:: live: is this agent running? TODO document boarer :: stats: TODO document -:: watches: incoming and outgoing subscription state +:: bitt: incoming subscriptions +:: boat: outgoing subscriptions +:: boar: and their nonces :: agent: agent core :: beak: compilation source :: marks: mark conversion requests :: +$ yoke $: control-duct=duct - nonce=@t - live=? ::TODO remove, replaced by -.agent + run-nonce=@t + sub-nonce=_1 + live=? =stats - =watches + =bitt + =boat + =boar agent=(each agent vase) =beak marks=(map duct mark) @@ -108,6 +141,7 @@ %poke %leave %missing + %cork == :: |migrate: data structures for upgrades :: @@ -116,21 +150,25 @@ :: $spore: structures for update, produced by +stay :: +$ spore - $: %8 + $: %9 system-duct=duct outstanding=(map [wire duct] (qeu remote-request)) contacts=(set ship) eggs=(map term egg) blocked=(map term (qeu blocked-move)) + =bug == :: $egg: migratory agent state; $yoke with .old-state instead of .agent :: +$ egg $: control-duct=duct - nonce=@t + run-nonce=@t + sub-nonce=@ live=? =stats - =watches + =bitt + =boat + =boar old-state=(each vase vase) =beak marks=(map duct mark) @@ -139,6 +177,7 @@ :: pupal gall core, on upgrade :: =< =* adult-gate . + =| spore-tag=@ud =| =spore |= [now=@da eny=@uvJ rof=roof] =* pupal-gate . @@ -164,10 +203,12 @@ [^duct %pass /whiz/gall %$ %whiz ~]~ =/ adult adult-core =. state.adult - [%8 system-duct outstanding contacts yokes=~ blocked]:spore - =/ mo-core (mo-abed:mo:adult duct) + [%9 system-duct outstanding contacts yokes=~ blocked bug]:spore + =/ mo-core (mo-abed:mo:adult system-duct.state.adult) + =/ apps=(list [dap=term =egg]) ~(tap by eggs.spore) + :: upgrade %base apps and suspend others + :: =. mo-core - =/ apps=(list [dap=term =egg]) ~(tap by eggs.spore) |- ^+ mo-core ?~ apps mo-core ?. =(%base q.beak.egg.i.apps) @@ -185,6 +226,24 @@ (mean u.tan) ap-core $(apps t.apps, mo-core ap-abet:ap-core) + :: FIXME: ! kiln: %base not installed + :: ! /lib/hood/kiln/hoon:<[461 23].[461 42]> + :: ! /base/gall-lyv + :: kill subscriptions when upgrading to gall request queue fix + :: + :: =? mo-core (lth spore-tag %9) + :: |- ^+ mo-core + :: ?~ apps mo-core + :: ~> %slog.[0 leaf+"gall: +ap-kill-down {}"] + :: =/ ap-core (ap-abut:ap:mo-core i.apps) + :: =. ap-core + :: =/ boats=(list [=wire =dock]) + :: ~(tap in ~(key by boat.egg.i.apps)) + :: |- ^+ ap-core + :: ?~ boats ap-core + :: =/ [=wire =dock] i.boats + :: $(boats t.boats, ap-core (ap-kill-down:ap-core wire dock)) + :: $(apps t.apps, mo-core ap-abet:ap-core) =. mo-core (mo-subscribe-to-agent-builds:mo-core now) =^ moves adult-gate mo-abet:mo-core =? moves ?=(^ fec) (weld moves [u.fec]~) @@ -223,9 +282,10 @@ :: ++ load |^ |= old=spore-any - =? old ?=(%7 -.old) - (spore-7-to-8 old) - ?> ?=(%8 -.old) + =. spore-tag `@ud`-.old + =? old ?=(%7 -.old) (spore-7-to-8 old) + =? old ?=(%8 -.old) (spore-8-to-9 old) + ?> ?=(%9 -.old) =. spore old ?. =(~ eggs.spore) pupal-gate @@ -234,32 +294,78 @@ state spore(eggs *(map term yoke)) == :: - +$ spore-any $%(^spore spore-7) + +$ spore-any $%(^spore spore-8 spore-7) +$ spore-7 $: %7 wipe-eyre-subs=_| ::NOTE band-aid for #3196 system-duct=duct outstanding=(map [wire duct] (qeu remote-request)) contacts=(set ship) - eggs=(map term egg) + eggs=(map term egg-7) blocked=(map term (qeu blocked-move)) == :: + +$ spore-8 + $: %8 + system-duct=duct + outstanding=(map [wire duct] (qeu remote-request)) + contacts=(set ship) + eggs=(map term egg-8) + blocked=(map term (qeu blocked-move)) + == + :: + +$ egg-7 egg-8 + +$ egg-8 + $: control-duct=duct + run-nonce=@t + live=? + =stats + watches=watches-8 + old-state=(each vase vase) + =beak + marks=(map duct mark) + == + :: + +$ watches-8 [inbound=bitt outbound=boat-8] + +$ boat-8 (map [wire ship term] [acked=? =path]) + :: ++ spore-7-to-8 |= old=spore-7 - ^- ^spore + ^- spore-8 :- %8 =. eggs.old %- ~(urn by eggs.old) - |= [a=term e=egg] + |= [a=term e=egg-7] ::NOTE kiln will kick off appropriate app revival e(old-state [%| p.old-state.e]) +>.old + :: + ++ spore-8-to-9 + |= old=spore-8 + ^- ^spore + =- old(- %9, eggs -, blocked [blocked.old *bug]) + %- ~(run by eggs.old) + |= =egg-8 + ^- egg + =/ [=bitt =boat =boar] (watches-8-to-9 watches.egg-8) + :* control-duct.egg-8 + run-nonce.egg-8 + sub-nonce=0 + live.egg-8 + stats.egg-8 + bitt boat boar + [old-state beak marks]:egg-8 + == + :: + ++ watches-8-to-9 + |= watches-8 + ^- [bitt boat boar] + [inbound outbound (~(run by outbound) |=([acked=? =path] nonce=0))] -- -- :: adult gall vane interface, for type compatibility with pupa :: -=| state=state-8 +=| state=state-9 |= [now=@da eny=@uvJ rof=roof] =* gall-payload . =< ~% %gall-wrap ..mo ~ @@ -280,6 +386,12 @@ ++ mo ~% %gall-mo +> ~ |_ [hen=duct moves=(list move)] + :: + ++ trace + |= [verb=? =dude print=tang] + ^+ same + (^trace verb dude dudes.bug.state print) + :: :: +mo-abed: initialise state with the provided duct :: +mo-abet: finalize, reversing moves :: +mo-pass: prepend a standard %pass to the current list of moves @@ -357,7 +469,7 @@ control-duct hen beak bek agent &+agent - nonce (scot %uw (end 5 (shas %yoke-nonce eny))) + run-nonce (scot %uw (end 5 (shas %yoke-nonce eny))) == :: =/ old mo-core @@ -457,8 +569,12 @@ =. outstanding.state =/ stand (~(gut by outstanding.state) [wire hen] *(qeu remote-request)) - (~(put by outstanding.state) [wire hen] (~(put to stand) -.deal)) - (mo-pass wire note-arvo) + %+ ~(put by outstanding.state) [wire hen] + (~(gas to stand) ?.(?=(%leave -.deal) ~[-.deal] ~[%leave %cork])) + =. mo-core (mo-pass wire note-arvo) + ?. ?=(%leave -.deal) + mo-core + (mo-pass wire [%a [%cork ship]]) :: +mo-track-ship: subscribe to ames and jael for notices about .ship :: ++ mo-track-ship @@ -680,7 +796,12 @@ (~(put to *(qeu remote-request)) %missing) ~| [full-wire=full-wire hen=hen stand=stand] =^ rr stand ~(get to stand) - [rr (~(put by outstanding.state) [full-wire hen] stand)] + ~? &(=(rr %cork) ?=(^ stand)) + [%outstanding-queue-not-empty wire hen] + :- rr + ?: ?=(%cork rr) + (~(del by outstanding.state) [full-wire hen]) + (~(put by outstanding.state) [full-wire hen] stand) :: non-null case of wire is old, remove on next breach after :: 2019/12 :: @@ -696,6 +817,7 @@ %watch (mo-give %unto %watch-ack err) %poke (mo-give %unto %poke-ack err) %leave mo-core + %cork mo-core %missing (mo-give:(mo-give %unto %watch-ack err) %unto %poke-ack err) == :: @@ -738,7 +860,7 @@ ?~ yoke %- (slog leaf+"gall: {} dead, got {<+<.sign-arvo>}" ~) mo-core - ?. =(nonce.u.yoke i.t.wire) + ?. =(run-nonce.u.yoke i.t.wire) %- (slog leaf+"gall: got old {<+<.sign-arvo>} for {}" ~) mo-core ?. ?=([?(%gall %behn) %unto *] sign-arvo) @@ -962,6 +1084,28 @@ %d (mo-give %unto %raw-fact mark.ames-response noun.ames-response) %x (mo-give %unto %kick ~) == + :: +mo-spew: handle request to set verbosity toggles on debug output + :: + ++ mo-spew + |= verbs=(list verb) + ^+ mo-core + :: start from all %.n's, then flip requested toggles + :: + =. veb.bug.state + %+ roll verbs + |= [=verb acc=_veb-all-off] + ^+ veb.bug.state + ?- verb + %odd acc(odd %.y) + == + mo-core + :: +mo-sift: handle request to filter debug output by agent + :: + ++ mo-sift + |= dudes=(list dude) + ^+ mo-core + =. dudes.bug.state (sy dudes) + mo-core :: +ap: agent engine :: :: An inner, agent-level core. The sample refers to the agent we're @@ -976,6 +1120,12 @@ agent-config=(list (each suss tang)) =yoke == + :: + ++ trace + |= [verb=? print=tang] + ^+ same + (^trace verb agent-name print) + :: ++ ap-core . :: +ap-abed: initialise state for an agent, with the supplied routes. :: @@ -1036,11 +1186,9 @@ :: ++ ap-nuke ^+ ap-core - =/ out=(list [[=wire =ship =term] ? =path]) - ~(tap by outbound.watches.yoke) =/ inbound-paths=(set path) %- silt - %+ turn ~(tap by inbound.watches.yoke) + %+ turn ~(tap by bitt.yoke) |= [=duct =ship =path] path =/ will=(list card:agent:gall) @@ -1048,7 +1196,7 @@ ?: =(~ inbound-paths) ~ [%give %kick ~(tap in inbound-paths) ~]~ - %+ turn ~(tap by outbound.watches.yoke) + %+ turn ~(tap by boat.yoke) |= [[=wire =ship =term] ? =path] [%pass wire %agent [ship term] %leave ~] =^ maybe-tang ap-core (ap-ingest ~ |.([will *agent])) @@ -1133,12 +1281,13 @@ tang.neet == =. wire + :^ %use agent-name run-nonce.yoke ?- -.neet %agent [%out (scot %p ship.neet) name.neet wire] %huck [%out (scot %p ship.neet) name.neet wire] %arvo [(scot %p attributing.agent-routes) wire] == - =. wire [%use agent-name nonce.yoke wire] + :: =/ =note-arvo ?- -.neet %arvo note-arvo.neet @@ -1152,8 +1301,7 @@ ++ ap-breach |= =ship ^+ ap-core - =/ in=(list [=duct =^ship =path]) - ~(tap by inbound.watches.yoke) + =/ in=(list [=duct =^ship =path]) ~(tap by bitt.yoke) |- ^+ ap-core ?^ in =? ap-core =(ship ship.i.in) @@ -1161,15 +1309,19 @@ core(agent-duct agent-duct) $(in t.in) :: - =/ out=(list [[=wire =^ship =term] ? =path]) - ~(tap by outbound.watches.yoke) + =/ out=(list [[=wire =^ship =term] ? =path nonce=@]) + %+ turn ~(tap by boat.yoke) + |= [key=[wire ^ship term] val=[? path]] + :- key + val(+ [+.val (~(got by boar.yoke) key)]) |- ^+ ap-core ?~ out ap-core =? ap-core =(ship ship.i.out) =/ core =. agent-duct system-duct.state - =/ way [%out (scot %p ship) term.i.out wire.i.out] + =/ way + [%out (scot %p ship) term.i.out (scot %ud nonce.i.out) wire.i.out] (ap-specific-take way %kick ~) core(agent-duct agent-duct) $(out t.out) @@ -1184,8 +1336,7 @@ |= =ship ^+ ap-core :: - =/ in=(list [=duct =^ship =path]) - ~(tap by inbound.watches.yoke) + =/ in=(list [=duct =^ship =path]) ~(tap by bitt.yoke) |- ^+ ap-core ?~ in ap-core :: @@ -1206,7 +1357,7 @@ ?~ target-paths ?~ target-ship ~[agent-duct] - %+ murn ~(tap by inbound.watches.yoke) + %+ murn ~(tap by bitt.yoke) |= [=duct =ship =path] ^- (unit ^duct) ?: =(target-ship `ship) @@ -1221,7 +1372,7 @@ ++ ap-ducts-from-path |= [target-path=path target-ship=(unit ship)] ^- (list duct) - %+ murn ~(tap by inbound.watches.yoke) + %+ murn ~(tap by bitt.yoke) |= [=duct =ship =path] ^- (unit ^duct) ?: ?& =(target-path path) @@ -1284,15 +1435,6 @@ ?: ?=(%& -.res) ``want^p.res ((slog leaf+"peek failed tube from {(trip have)} to {(trip want)}" ~) ~) - :: +ap-update-subscription: update subscription. - :: - ++ ap-update-subscription - ~/ %ap-update-subscription - |= [is-ok=? =other=ship other-agent=term =wire] - ^+ ap-core - ?: is-ok - ap-core - (ap-kill-down wire [other-ship other-agent]) :: +ap-move: send move :: ++ ap-move @@ -1316,8 +1458,8 @@ attributing.agent-routes :: guest agent-name :: agent == :: - :* wex=outbound.watches.yoke :: outgoing - sup=inbound.watches.yoke :: incoming + :* wex=boat.yoke :: outgoing + sup=bitt.yoke :: incoming == :: :* act=change.stats.yoke :: tick eny=eny.stats.yoke :: nonce @@ -1352,9 +1494,8 @@ ~/ %ap-subscribe |= pax=path ^+ ap-core - =/ incoming [attributing.agent-routes pax] - =. inbound.watches.yoke - (~(put by inbound.watches.yoke) agent-duct incoming) + =/ incoming [attributing.agent-routes pax] + =. bitt.yoke (~(put by bitt.yoke) agent-duct incoming) =^ maybe-tang ap-core %+ ap-ingest %watch-ack |. (on-watch:ap-agent-core pax) @@ -1404,6 +1545,7 @@ =/ other-agent i.t.t.wire =/ =dock [other-ship other-agent] =/ agent-wire t.t.t.wire + =/ nonce=@ 0 :: =^ =sign:agent ap-core ?. ?=(%raw-fact -.unto) @@ -1424,48 +1566,106 @@ %- ap-move :_ ~ :^ hen %pass /nowhere [%c %warp our q.beak.yoke ~ %sing %b case /[mark.unto]] + |^ ^+ ap-core + :: %poke-ack has no nonce; ingest directly + :: + ?: ?=(%poke-ack -.sign) + ingest-and-check-error + :: if .agent-wire matches, it's an old pre-nonce subscription + :: + ?: (~(has by boat.yoke) sub-key) + run-sign + :: if an app happened to use a null wire, no-op + :: + ?: =(~ agent-wire) + on-missing + =/ has-nonce=(unit @ud) (slaw %ud (head agent-wire)) + ?: &(?=(~ has-nonce) ?=(%kick -.sign)) + on-weird-kick + :: pop nonce off .agent-wire and match against stored subscription + :: + ?> ?=(^ has-nonce) + =: nonce u.has-nonce + agent-wire (tail agent-wire) + == + ?~ got=(~(get by boar.yoke) sub-key) + on-missing + ?: =(nonce.u.got nonce) + run-sign + (on-bad-nonce nonce.u.got) :: - :: if subscription ack or close, handle before calling user code - :: - =? outbound.watches.yoke ?=(%kick -.sign) - %- ~(del by outbound.watches.yoke) - [agent-wire dock] - ?: ?& ?=(%watch-ack -.sign) - !(~(has by outbound.watches.yoke) [agent-wire dock]) - == - %- %: slog + ++ sub-key [agent-wire dock] + ++ ingest (ap-ingest ~ |.((on-agent:ap-agent-core agent-wire sign))) + ++ run-sign + ?- -.sign + %poke-ack !! + %fact + =^ tan ap-core ingest + ?~ tan ap-core + =. ap-core (ap-kill-down sub-key) + (ap-error -.sign leaf/"take %fact failed, closing subscription" u.tan) + :: + %kick + =: boar.yoke (~(del by boar.yoke) sub-key) + boat.yoke (~(del by boat.yoke) sub-key) + == + ingest-and-check-error + :: + %watch-ack + ?. (~(has by boat.yoke) sub-key) + %. ap-core + %+ trace odd.veb.bug.state :~ leaf+"{}: got ack for nonexistent subscription" leaf+"{}: {}" >wire=wire< - ~ == - ap-core + =? boar.yoke ?=(^ p.sign) (~(del by boar.yoke) sub-key) + :: + =. boat.yoke + ?^ p.sign (~(del by boat.yoke) sub-key) + :: + %+ ~(jab by boat.yoke) sub-key + |= val=[acked=? =path] + %. val(acked &) + %^ trace &(odd.veb.bug.state acked.val) + leaf/"{} 2nd watch-ack on {}" ~ + :: + ingest-and-check-error + == :: - =? outbound.watches.yoke ?=(%watch-ack -.sign) - ?^ p.sign - %- ~(del by outbound.watches.yoke) - [agent-wire dock] - %+ ~(jab by outbound.watches.yoke) [agent-wire dock] - |= [acked=? =path] - =. . - ?. acked - . - %- =/ =tape - "{}: received 2nd watch-ack on {}" - (slog leaf+tape ~) - . - [& path] + ++ on-missing + %. ap-core + %+ trace odd.veb.bug.state :~ + leaf+"{}: got {<-.sign>} for nonexistent subscription" + leaf+"{}: {<[nonce=nonce agent-wire]>}" + >wire=wire< + == :: - =^ maybe-tang ap-core - %+ ap-ingest ~ |. - (on-agent:ap-agent-core agent-wire sign) - :: if failed %fact handling, kill subscription + ++ on-weird-kick + %. run-sign + %+ trace odd.veb.bug.state :~ + leaf+"{}: got %kick for nonexistent subscription" + leaf+"{}: {}" + >wire=wire< + == :: - =? ap-core ?=(%fact -.sign) - (ap-update-subscription =(~ maybe-tang) p.dock q.dock agent-wire) - ?^ maybe-tang - (ap-error -.sign leaf/"closing subscription" u.maybe-tang) - ap-core + ++ on-bad-nonce + |= stored-nonce=@ + %. ap-core + %- slog :~ + =/ nonces [expected=stored-nonce got=nonce] + =/ ok |(?=(?(%fact %kick) -.sign) =(~ p.sign)) + leaf+"{}: stale {<-.sign>} {} ok={}" + :: + leaf+"{}: {}" + >wire=wire< + == + :: + ++ ingest-and-check-error + ^+ ap-core + =^ tan ap-core ingest + ?~(tan ap-core (ap-error -.sign leaf/"take {<-.sign>} failed" u.tan)) + -- :: +ap-install: install wrapper. :: ++ ap-install @@ -1499,24 +1699,18 @@ :: ++ ap-silent-delete ^+ ap-core - :: - %= ap-core - inbound.watches.yoke - (~(del by inbound.watches.yoke) agent-duct) - == + ap-core(bitt.yoke (~(del by bitt.yoke) agent-duct)) :: +ap-load-delete: load delete. :: ++ ap-load-delete ^+ ap-core :: - =/ maybe-incoming - (~(get by inbound.watches.yoke) agent-duct) + =/ maybe-incoming (~(get by bitt.yoke) agent-duct) ?~ maybe-incoming ap-core :: - =/ incoming u.maybe-incoming - =. inbound.watches.yoke - (~(del by inbound.watches.yoke) agent-duct) + =/ incoming u.maybe-incoming + =. bitt.yoke (~(del by bitt.yoke) agent-duct) :: =^ maybe-tang ap-core %+ ap-ingest ~ |. @@ -1621,11 +1815,10 @@ `ap-core :: =. agent.yoke &++.p.result - =/ moves (zing (turn -.p.result ap-from-internal)) - =. inbound.watches.yoke - (ap-handle-kicks moves) + =/ moves (zing (turn -.p.result ap-from-internal)) + =. bitt.yoke (ap-handle-kicks moves) (ap-handle-peers moves) - :: +ap-handle-kicks: handle cancels of inbound.watches + :: +ap-handle-kicks: handle cancels of bitt.watches :: ++ ap-handle-kicks ~/ %ap-handle-kicks @@ -1641,8 +1834,8 @@ :: =/ quit-map=bitt (malt (turn quits |=(=duct [duct *[ship path]]))) - (~(dif by inbound.watches.yoke) quit-map) - :: +ap-handle-peers: handle new outbound.watches + (~(dif by bitt.yoke) quit-map) + :: +ap-handle-peers: handle new boat.watches :: ++ ap-handle-peers ~/ %ap-handle-peers @@ -1656,34 +1849,62 @@ ?: ?=([* %pass * %g %deal * * %leave *] move) =/ =wire p.move.move ?> ?=([%use @ @ %out @ @ *] wire) - =/ short-wire t.t.t.t.t.t.wire - =/ =dock [q.p q]:q.move.move - =. outbound.watches.yoke - (~(del by outbound.watches.yoke) [short-wire dock]) + =/ =dock [q.p q]:q.move.move + =/ sys-wire=^wire (scag 6 `^wire`wire) + =/ sub-wire=^wire (slag 6 `^wire`wire) + :: + ?. (~(has by boat.yoke) sub-wire dock) + %. $(moves t.moves) + %^ trace odd.veb.bug.state + leaf/"gall: {} missing subscription, got %leave" ~ + =/ nonce=@ (~(got by boar.yoke) sub-wire dock) + =. p.move.move + %+ weld sys-wire + ?: =(nonce 0) + :: skip adding nonce to pre-nonce subscription wires + :: + sub-wire + [(scot %ud nonce) sub-wire] + =: boat.yoke (~(del by boat.yoke) [sub-wire dock]) + boar.yoke (~(del by boar.yoke) [sub-wire dock]) + == + :: if nonce = 0, this was a pre-nonce subscription so later + :: subscriptions need to start subscribing on the next nonce + :: + =? sub-nonce.yoke =(nonce 0) +(sub-nonce.yoke) $(moves t.moves, new-moves [move new-moves]) ?. ?=([* %pass * %g %deal * * ?(%watch %watch-as) *] move) $(moves t.moves, new-moves [move new-moves]) =/ =wire p.move.move ?> ?=([%use @ @ %out @ @ *] wire) - =/ short-wire t.t.t.t.t.t.wire - =/ =dock [q.p q]:q.move.move - =/ =path - ?- -.r.q.move.move - %watch path.r.q.move.move - %watch-as path.r.q.move.move - == - ?: (~(has by outbound.watches.yoke) short-wire dock) + =/ sys-wire=^wire (scag 6 `^wire`wire) + =/ sub-wire=^wire (slag 6 `^wire`wire) + =/ [=dock =deal] [[q.p q] r]:q.move.move + :: + ?: (~(has by boat.yoke) sub-wire dock) =. ap-core =/ =tang - ~[leaf+"subscribe wire not unique" >agent-name< >short-wire< >dock<] - =/ have - (~(got by outbound.watches.yoke) short-wire dock) + ~[leaf+"subscribe wire not unique" >agent-name< >sub-wire< >dock<] + =/ have (~(got by boat.yoke) sub-wire dock) %- (slog >out=have< tang) (ap-error %watch-not-unique tang) :: reentrant, maybe bad? $(moves t.moves) - =. outbound.watches.yoke - (~(put by outbound.watches.yoke) [short-wire dock] [| path]) - $(moves t.moves, new-moves [move new-moves]) + :: + =. p.move.move + (weld sys-wire [(scot %ud sub-nonce.yoke) sub-wire]) + %_ $ + moves t.moves + new-moves [move new-moves] + sub-nonce.yoke +(sub-nonce.yoke) + :: + boat.yoke + %+ ~(put by boat.yoke) [sub-wire dock] + :- acked=| + path=?+(-.deal !! %watch path.deal, %watch-as path.deal) + :: + boar.yoke + (~(put by boar.yoke) [sub-wire dock] sub-nonce.yoke) + == -- -- :: +call: request @@ -1726,6 +1947,8 @@ %jolt mo-abet:(mo-jolt:mo-core dude.task our desk.task) %idle mo-abet:(mo-idle:mo-core dude.task) %nuke mo-abet:(mo-nuke:mo-core dude.task) + %spew mo-abet:(mo-spew:mo-core veb.task) + %sift mo-abet:(mo-sift:mo-core dudes.task) %trim [~ gall-payload] %vega [~ gall-payload] == @@ -1797,6 +2020,19 @@ acc (~(put in acc) [dude -.agent.yoke]) :: + ?: ?& =(%n care) + ?=([@ @ ^] path) + =([%$ %da now] coin) + =(our ship) + == + ?~ yok=(~(get by yokes.state) dap) + [~ ~] + =/ [=^ship =term =wire] + [(slav %p i.path) i.t.path t.t.path] + ?~ nonce=(~(get by boar.u.yok) [wire ship term]) + [~ ~] + [~ ~ atom+!>(u.nonce)] + :: ?. =(our ship) ~ ?. =([%$ %da now] coin) diff --git a/pkg/arvo/tests/sys/vane/ames.hoon b/pkg/arvo/tests/sys/vane/ames.hoon index 668ab28124..511198c320 100644 --- a/pkg/arvo/tests/sys/vane/ames.hoon +++ b/pkg/arvo/tests/sys/vane/ames.hoon @@ -354,46 +354,48 @@ !> (snag 0 `(list move:ames)`moves6) == :: -++ test-comet-message-flow ^- tang - :: same as test-message-flow, but ~nec will send a sendkeys packet to request - :: comet's self-attestation directly - :: - =^ moves0 nec (call nec ~[/g/talk] %plea our-comet %g /talk [%get %post]) - =^ moves1 comet (call comet ~[//unix] %hear (snag-packet 0 moves0)) - =^ moves2 comet - =/ =point:ames - :* rift=1 - life=2 - keys=[[life=2 [crypto-suite=1 `@`nec-pub]] ~ ~] - sponsor=`~nec - == - %- take - :^ comet /public-keys ~[//unix] - ^- sign:ames - [%jael %public-keys %full [n=[~nec point] ~ ~]] - :: give comet's self-attestation to ~nec; at this point, we have established - :: a channel, and can proceed as usual - :: - =^ moves3 nec (call nec ~[//unix] %hear (snag-packet 0 moves2)) - =^ moves4 comet (call comet ~[//unix] %hear (snag-packet 0 moves3)) - =^ moves5 comet (take comet /bone/~nec/0/1 ~[//unix] %g %done ~) - =^ moves6 nec (call nec ~[//unix] %hear (snag-packet 0 moves5)) - =^ moves7 comet (take comet /bone/~nec/0/1 ~[//unix] %g %boon [%post 'first1!!']) - =^ moves8 nec (call nec ~[//unix] %hear (snag-packet 0 moves7)) - :: - ;: weld - %+ expect-eq - !> [~[//unix] %pass /qos %d %flog %text "; ~nec is your neighbor"] - !> (snag 0 `(list move:ames)`moves4) - :: - %+ expect-eq - !> [~[//unix] %pass /qos %d %flog %text "; {} is your neighbor"] - !> (snag 0 `(list move:ames)`moves6) - :: - %+ expect-eq - !> [~[/g/talk] %give %boon [%post 'first1!!']] - !> (snag 0 `(list move:ames)`moves8) - == +::TODO crashes in (snag 0 moves5), presumably due to subtle changes around +:: #5886. fix and re-enable! +:: ++ test-comet-message-flow ^- tang +:: :: same as test-message-flow, but ~nec will send a sendkeys packet to request +:: :: comet's self-attestation directly +:: :: +:: =^ moves0 nec (call nec ~[/g/talk] %plea our-comet %g /talk [%get %post]) +:: =^ moves1 comet (call comet ~[//unix] %hear (snag-packet 0 moves0)) +:: =^ moves2 comet +:: =/ =point:ames +:: :* rift=1 +:: life=2 +:: keys=[[life=2 [crypto-suite=1 `@`nec-pub]] ~ ~] +:: sponsor=`~nec +:: == +:: %- take +:: :^ comet /public-keys ~[//unix] +:: ^- sign:ames +:: [%jael %public-keys %full [n=[~nec point] ~ ~]] +:: :: give comet's self-attestation to ~nec; at this point, we have established +:: :: a channel, and can proceed as usual +:: :: +:: =^ moves3 nec (call nec ~[//unix] %hear (snag-packet 0 moves2)) +:: =^ moves4 comet (call comet ~[//unix] %hear (snag-packet 0 moves3)) +:: =^ moves5 comet (take comet /bone/~nec/0/1 ~[//unix] %g %done ~) +:: =^ moves6 nec (call nec ~[//unix] %hear (snag-packet 0 moves5)) +:: =^ moves7 comet (take comet /bone/~nec/0/1 ~[//unix] %g %boon [%post 'first1!!']) +:: =^ moves8 nec (call nec ~[//unix] %hear (snag-packet 0 moves7)) +:: :: +:: ;: weld +:: %+ expect-eq +:: !> [~[//unix] %pass /qos %d %flog %text "; ~nec is your neighbor"] +:: !> (snag 0 `(list move:ames)`moves4) +:: :: +:: %+ expect-eq +:: !> [~[//unix] %pass /qos %d %flog %text "; {} is your neighbor"] +:: !> (snag 0 `(list move:ames)`moves6) +:: :: +:: %+ expect-eq +:: !> [~[/g/talk] %give %boon [%post 'first1!!']] +:: !> (snag 0 `(list move:ames)`moves8) +:: == :: ++ test-comet-comet-message-flow ^- tang :: same as test-message-flow, but the comets need to exchange diff --git a/pkg/garden/lib/treaty.hoon b/pkg/garden/lib/treaty.hoon index bea8fc750b..65d64b366b 100644 --- a/pkg/garden/lib/treaty.hoon +++ b/pkg/garden/lib/treaty.hoon @@ -27,6 +27,7 @@ %da s+(scot %da p.c) %tas s+(scot %tas p.c) %ud (numb p.c) + %uv s+(scot %uv p.c) == ++ foreign-desk |= [s=^ship =desk]