Merge pull request #5886 from urbit/yu/gall-rq-global-cork-timer

gall, ames: fix subscription desync (stage ii)
This commit is contained in:
fang 2022-07-28 17:46:45 +02:00 committed by GitHub
commit 731e27d5a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 857 additions and 254 deletions

View File

@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:c45166ff0f8ab8dc1552bcef519c77c0afa6ca52f8ed1ba31ed632012667d619
size 8674763
oid sha256:cab0dd267cc5c17eb0d0164e876556ae7975fd5db59a738d741ecd767cca8594
size 8760359

View File

@ -0,0 +1,8 @@
:: Helm: Set Gall Verbosity by Agent
::
/? 310
::
:- %say
|= [^ dudes=(list dude:gall) ~]
:- %helm-gall-sift
dudes

View File

@ -0,0 +1,11 @@
:: Helm: Adjust Gall verbosity
::
:: List of diagnostic flags is in verb:gall in zuse.hoon, documented in
:: gall.hoon
::
/? 310
::
:- %say
|= [^ veb=(list verb:gall) ~]
:- %helm-gall-verb
veb

View File

@ -77,7 +77,8 @@
::
++ de-gill :: gill from wire
|= way=wire ^- gill:gall
?>(?=([@ @ ~] way) [(slav %p i.way) i.t.way])
~| way
?>(?=([@ @ *] way) [(slav %p i.way) i.t.way])
--
:: TODO: remove .ost
::

View File

@ -230,6 +230,14 @@
|= veb=(list verb:ames) =< abet
(emit %pass /helm %arvo %a %spew veb)
::
++ poke-gall-sift
|= dudes=(list dude:gall) =< abet
(emit %pass /helm %arvo %g %sift dudes)
::
++ poke-gall-verb
|= veb=(list verb:gall) =< abet
(emit %pass /helm %arvo %g %spew veb)
::
++ poke-ames-wake
|= ~ =< abet
(emit %pass /helm %arvo %a %stir '')
@ -269,6 +277,8 @@
%helm-code =;(f (f !<(_+<.f vase)) poke-code)
%helm-cors-approve =;(f (f !<(_+<.f vase)) poke-cors-approve)
%helm-cors-reject =;(f (f !<(_+<.f vase)) poke-cors-reject)
%helm-gall-sift =;(f (f !<(_+<.f vase)) poke-gall-sift)
%helm-gall-verb =;(f (f !<(_+<.f vase)) poke-gall-verb)
%helm-hi =;(f (f !<(_+<.f vase)) poke-hi)
%helm-knob =;(f (f !<(_+<.f vase)) poke-knob)
%helm-pans =;(f (f !<(_+<.f vase)) poke-pans)

View File

@ -351,6 +351,7 @@
:: %hear: packet from unix
:: %heed: track peer's responsiveness; gives %clog if slow
:: %jilt: stop tracking peer's responsiveness
:: %cork: request to delete message flow
:: %plea: request to send message
::
:: System and Lifecycle Tasks
@ -367,6 +368,7 @@
$% [%hear =lane =blob]
[%heed =ship]
[%jilt =ship]
[%cork =ship]
$>(%plea vane-task)
::
$>(%born vane-task)
@ -511,6 +513,9 @@
:: entry and emit a nack to the local vane that asked us to send
:: the message.
:: heeds: listeners for %clog notifications
:: closing: bones closed on the sender side
:: corked: bones closed on both sender and receiver
:: krocs: bones that need to be sent again to the publisher
::
+$ peer-state
$: $: =symmetric-key
@ -526,6 +531,9 @@
rcv=(map bone message-sink-state)
nax=(set [=bone =message-num])
heeds=(set duct)
closing=(set bone)
corked=(set bone)
krocs=(set bone)
==
:: $qos: quality of service; how is our connection to a peer doing?
::
@ -1655,11 +1663,12 @@
$>(%trim vane-task) :: trim state
$>(%vega vane-task) :: report upgrade
$>(%plea vane-task) :: network request
[%spew veb=(list verb)] :: set verbosity
[%sift dudes=(list dude)] :: per agent
== ::
+$ bitt (map duct (pair ship path)) :: incoming subs
+$ boat :: outgoing subs
%+ map [=wire =ship =term] ::
[acked=? =path] ::
+$ boat (map [=wire =ship =term] [acked=? =path]) :: outgoing subs
+$ boar (map [=wire =ship =term] nonce=@) :: and their nonces
+$ bowl :: standard app state
$: $: our=ship :: host
src=ship :: guest
@ -1695,6 +1704,9 @@
$% [%raw-fact =mark =noun]
sign:agent
==
:: TODO: add more flags?
::
+$ verb ?(%odd)
::
:: +agent: app core
::

View File

@ -598,6 +598,26 @@
::
+$ naxplanation [=message-num =error]
::
+| %statics
::
:: $ames-state: state for entire vane
::
:: peers: states of connections to other ships
:: unix-duct: handle to give moves to unix
:: life: our $life; how many times we've rekeyed
:: crypto-core: interface for encryption and signing
:: bug: debug printing configuration
:: corks: wires for cork flows pending publisher update
::
+$ ames-state
$: peers=(map ship ship-state)
=unix=duct
=life
crypto-core=acru:ames
=bug
corks=(set wire)
==
::
+$ ames-state-4 ames-state-5
+$ ames-state-5
$: peers=(map ship ship-state-5)
@ -628,17 +648,36 @@
heeds=(set duct)
==
::
+| %statics
+$ ames-state-6
$: peers=(map ship ship-state-6)
=unix=duct
=life
crypto-core=acru:ames
=bug
==
::
:: $ames-state: state for entire vane
+$ ship-state-6
$% [%alien alien-agenda]
[%known peer-state-6]
==
::
:: peers: states of connections to other ships
:: unix-duct: handle to give moves to unix
:: life: our $life; how many times we've rekeyed
:: crypto-core: interface for encryption and signing
:: bug: debug printing configuration
+$ peer-state-6
$: $: =symmetric-key
=life
=rift
=public-key
sponsor=ship
==
route=(unit [direct=? =lane])
=qos
=ossuary
snd=(map bone message-pump-state)
rcv=(map bone message-sink-state)
nax=(set [=bone =message-num])
heeds=(set duct)
==
::
+$ ames-state
+$ ames-state-7
$: peers=(map ship ship-state)
=unix=duct
=life
@ -730,12 +769,16 @@
:: $message-pump-gift: effect from |message-pump
::
:: %done: report message acknowledgment
:: %cork: kill flow
:: %kroc: recork this bone
:: %send: emit message fragment
:: %wait: set a new timer at .date
:: %rest: cancel timer at .date
::
+$ message-pump-gift
$% [%done =message-num error=(unit error)]
[%cork ~]
[%kroc =bone]
[%send =static-fragment]
[%wait date=@da]
[%rest date=@da]
@ -774,7 +817,7 @@
:: .ok: %.y unless previous failed attempt
::
+$ message-sink-task
$% [%done ok=?]
$% [%done ok=? cork=?]
[%drop =message-num]
[%hear =lane =shut-packet ok=?]
==
@ -786,6 +829,7 @@
+$ message-sink-gift
$% [%memo =message-num message=*]
[%send =message-num =ack-meat]
[%cork ~]
==
--
:: external vane interface
@ -795,7 +839,7 @@
::
=< =* adult-gate .
=| queued-events=(qeu queued-event)
=| cached-state=(unit [%5 ames-state-5])
=| cached-state=(unit $%([%5 ames-state-5] [%6 ames-state-6] [%7 ames-state-7]))
::
|= [now=@da eny=@ rof=roof]
=* larval-gate .
@ -821,7 +865,9 @@
::
?: &(?=(^ cached-state) ?=(~ queued-events))
=^ moves adult-gate (call:adult-core duct dud task)
(molt moves)
%- molt
~> %slog.0^leaf/"ames: init daily recork timer"
:_(moves [duct %pass /recork %b %wait `@da`(add now ~d1)])
:: %born: set .unix-duct and start draining .queued-events
::
?: ?=(%born -.task)
@ -903,9 +949,12 @@
:: .queued-events has been cleared; metamorphose
::
?~ queued-events
?: ?=(^ cached-state) (molt moves)
~> %slog.0^leaf/"ames: metamorphosis"
[moves adult-gate]
?. ?=(^ cached-state)
~> %slog.0^leaf/"ames: metamorphosis"
[moves adult-gate]
%- molt
~> %slog.0^leaf/"ames: init daily recork timer"
:_(moves [duct %pass /recork %b %wait `@da`(add now ~d1)])
:: set timer to drain next event
::
=. moves :_(moves [duct %pass /larva %b %wait now])
@ -913,7 +962,7 @@
:: lifecycle arms; mostly pass-throughs to the contained adult ames
::
++ scry scry:adult-core
++ stay [%6 %larva queued-events ames-state.adult-gate]
++ stay [%8 %larva queued-events ames-state.adult-gate]
++ load
|= $= old
$% $: %4
@ -931,6 +980,20 @@
[%adult state=ames-state-5]
== ==
$: %6
$% $: %larva
events=(qeu queued-event)
state=ames-state-6
==
[%adult state=ames-state-6]
== ==
$: %7
$% $: %larva
events=(qeu queued-event)
state=ames-state-7
==
[%adult state=ames-state-7]
== ==
$: %8
$% $: %larva
events=(qeu queued-event)
state=_ames-state.adult-gate
@ -955,22 +1018,47 @@
=. queued-events events.old
larval-gate
::
[%6 %adult *] (load:adult-core %6 state.old)
[%6 %adult *]
=. cached-state `[%6 state.old]
~> %slog.0^leaf/"ames: larva reload"
larval-gate
::
[%6 %larva *]
~> %slog.0^leaf/"ames: larva: load"
=. queued-events events.old
=. adult-gate (load:adult-core %6 state.old)
larval-gate
::
[%7 %adult *]
=. cached-state `[%7 state.old]
~> %slog.0^leaf/"ames: larva reload"
larval-gate
::
[%7 %larva *]
~> %slog.0^leaf/"ames: larva: load"
=. queued-events events.old
larval-gate
::
[%8 %adult *] (load:adult-core %8 state.old)
::
[%8 %larva *]
~> %slog.1^leaf/"ames: larva: load"
=. queued-events events.old
=. adult-gate (load:adult-core %8 state.old)
larval-gate
::
==
:: +molt: re-evolve to adult-ames
::
++ molt
|= moves=(list move)
^- (quip move _adult-gate)
=? cached-state &(?=(^ cached-state) ?=(%5 +<.cached-state))
`%6^(state-5-to-6:load:adult-core +.u.cached-state)
=? cached-state &(?=(^ cached-state) ?=(%6 +<.cached-state))
`%7^(state-6-to-7:load:adult-core +.u.cached-state)
=. ames-state.adult-gate
?> ?=(^ cached-state)
(state-5-to-6:load:adult-core +.u.cached-state)
?> &(?=(^ cached-state) ?=(%7 +<.cached-state))
(state-7-to-8:load:adult-core +.u.cached-state)
=. cached-state ~
~> %slog.0^leaf/"ames: metamorphosis reload"
[moves adult-gate]
@ -1015,6 +1103,7 @@
%trim on-trim:event-core
%vega on-vega:event-core
%plea (on-plea:event-core [ship plea]:task)
%cork (on-cork:event-core ship.task)
==
::
[moves ames-gate]
@ -1045,20 +1134,23 @@
[moves ames-gate]
:: +stay: extract state before reload
::
++ stay [%6 %adult ames-state]
++ stay [%8 %adult ames-state]
:: +load: load in old state after reload
::
++ load
=< |= old-state=[%6 ^ames-state]
=< |= $= old-state
$% [%8 ^ames-state]
==
^+ ames-gate
?> ?=(%6 -.old-state)
?> ?=(%8 -.old-state)
ames-gate(ames-state +.old-state)
::
|%
:: +state-4-to-5 called from larval-ames
::
++ state-4-to-5
|= ames-state=ames-state-4
^- ames-state-4
^- ames-state-5
=. peers.ames-state
%- ~(run by peers.ames-state)
|= ship-state=ship-state-4
@ -1076,11 +1168,11 @@
::
++ state-5-to-6
|= ames-state=ames-state-5
^- ^^ames-state
^- ames-state-6
:_ +.ames-state
%- ~(rut by peers.ames-state)
|= [=ship ship-state=ship-state-5]
^- ^ship-state
^- ship-state-6
?. ?=(%known -.ship-state)
ship-state
=/ peer-state=peer-state-5 +.ship-state
@ -1091,12 +1183,37 @@
;; @ud
=< q.q %- need %- need
(rof ~ %j `beam`[[our %rift %da now] /(scot %p ship)])
=/ =^peer-state
:_ +.peer-state
=, -.peer-state
[symmetric-key life rift public-key sponsor]
:- -.ship-state
:_ +.peer-state
=, -.peer-state
[symmetric-key life rift public-key sponsor]
:: +state-6-to-7 called from larval-ames
::
++ state-6-to-7
|= ames-state=ames-state-6
^- ames-state-7
:_ +.ames-state
%- ~(run by peers.ames-state)
|= ship-state=ship-state-6
^- ^ship-state
[-.ship-state peer-state]
?. ?=(%known -.ship-state)
ship-state
:- %known
^- peer-state
:- +<.ship-state
[route qos ossuary snd rcv nax heeds ~ ~ ~]:ship-state
:: +state-7-to-8 called from larval-ames
::
++ state-7-to-8
|= ames-state=ames-state-7
^- ^^ames-state
:* peers.ames-state
unix-duct.ames-state
life.ames-state
crypto-core.ames-state
bug.ames-state
*(set wire)
==
--
:: +scry: dereference namespace
::
@ -1134,6 +1251,7 @@
:: /ax/peers/[ship]/forward-lane (list lane)
:: /ax/bones/[ship] [snd=(set bone) rcv=(set bone)]
:: /ax/snd-bones/[ship]/[bone] vase
:: /ax/corks (list wire)
::
?. ?=(%x ren) ~
?+ tyl ~
@ -1209,6 +1327,9 @@
=/ res
u.mps
``noun+!>(!>(res))
::
[%corks ~]
``noun+!>(~(tap in corks.ames-state))
==
--
:: |per-event: inner event-handling core
@ -1271,13 +1392,15 @@
++ send-ack
|= =bone
^+ event-core
abet:(run-message-sink:peer-core bone %done ok=%.y)
=/ cork=? (~(has in closing.peer-state) bone)
abet:(run-message-sink:peer-core bone %done ok=%.y cork)
:: failed; send message nack packet
::
++ send-nack
|= [=bone =^error]
^+ event-core
=. event-core abet:(run-message-sink:peer-core bone %done ok=%.n)
=. event-core
abet:(run-message-sink:peer-core bone %done ok=%.n cork=%.n)
=/ =^peer-state (got-peer-state her)
=/ =^channel [[our her] now channel-state -.peer-state]
:: construct nack-trace message, referencing .failed $message-num
@ -1638,7 +1761,35 @@
=/ sndr [our our-life.channel]
=/ rcvr [ship her-life.channel]
"plea {<sndr^rcvr^bone=bone^vane.plea^path.plea>}"
:: since flow kill goes like:
:: client vane cork task -> client ames pass cork as plea ->
:: -> server ames sinks plea -> server ames +on-plea (we are here);
:: if it's %cork plea passed to ames from its sink,
:: give %done and process flow closing after +on-take-done call
::
?: &(=(vane.plea %a) =(path.plea `path`/close) ?=(~ payload.plea))
(emit duct %give %done ~)
abet:(on-memo:(make-peer-core peer-state channel) bone plea %plea)
:: +on-cork: handle request to kill a flow
::
++ on-cork
|= =ship
^+ event-core
=/ ship-state (~(get by peers.ames-state) ship)
::
?> ?=([~ %known *] ship-state)
=/ =peer-state +.u.ship-state
=/ =channel [[our ship] now channel-state -.peer-state]
::
=^ =bone ossuary.peer-state (bind-duct ossuary.peer-state duct)
=/ =plea [%$ /flow [%cork ~]]
::
=. closing.peer-state (~(put in closing.peer-state) bone)
%- %^ trace msg.veb ship
|. ^- tape
=/ sndr [our our-life.channel]
=/ rcvr [ship her-life.channel]
"cork plea {<sndr^rcvr^bone=bone^vane.plea^path.plea>}"
abet:(on-memo:(make-peer-core peer-state channel) bone plea %plea)
:: +on-take-wake: receive wakeup or error notification from behn
::
@ -1660,19 +1811,32 @@
event-core
(request-attestation u.ship)
::
=/ res=(unit [her=ship =bone]) (parse-pump-timer-wire wire)
?~ res
%- (slog leaf+"ames: got timer for strange wire: {<wire>}" ~)
event-core
|^
?. ?=([%recork ~] wire) (handle-single-wire wire)
=/ wires=(list ^wire) ~(tap in corks.ames-state)
|- ^+ event-core
?^ wires
$(wires t.wires, event-core (handle-single-wire i.wires))
(emit duct %pass /recork %b %wait `@da`(add now ~d1))
::
=/ state=(unit peer-state) (get-peer-state her.u.res)
?~ state
%- (slog leaf+"ames: got timer for strange ship: {<her.u.res>}, ignoring" ~)
event-core
::
=/ =channel [[our her.u.res] now channel-state -.u.state]
::
abet:(on-wake:(make-peer-core u.state channel) bone.u.res error)
++ handle-single-wire
|= =^wire
^+ event-core
=/ res=(unit [her=ship =bone]) (parse-pump-timer-wire wire)
?~ res
%- (slog leaf+"ames: got timer for strange wire: {<wire>}" ~)
event-core
::
=/ state=(unit peer-state) (get-peer-state her.u.res)
?~ state
%. event-core
%- slog
[leaf+"ames: got timer for strange ship: {<her.u.res>}, ignoring" ~]
::
=/ =channel [[our her.u.res] now channel-state -.u.state]
::
abet:(on-wake:(make-peer-core u.state channel) bone.u.res error)
--
:: +on-init: first boot; subscribe to our info from jael
::
++ on-init
@ -1718,7 +1882,6 @@
::
?- public-keys-result
[%diff @ %rift *]
:: event-core
(on-publ-rift [who to.diff]:public-keys-result)
::
[%diff @ %keys *]
@ -1926,6 +2089,7 @@
::
=. qos.peer-state [%unborn now]
=. life.peer-state life.point
=. rift.peer-state rift.point
=. public-key.peer-state public-key
=. symmetric-key.peer-state symmetric-key
=. sponsor.peer-state
@ -2349,6 +2513,12 @@
=(1 current:(~(got by snd.peer-state) bone))
==
(send-blob | her.channel (attestation-packet [her her-life]:channel))
?: (~(has in corked.peer-state) bone)
:: if the bone was corked the flow doesn't exist anymore
:: TODO: clean up corked bones in the peer state when it's _safe_?
:: (e.g. if this bone is N blocks behind the next one)
::
peer-core
:: maybe resend some timed out packets
::
(run-message-pump bone %wake ~)
@ -2390,7 +2560,8 @@
=/ =message-pump-state
(~(gut by snd.peer-state) bone *message-pump-state)
::
=/ message-pump (make-message-pump message-pump-state channel)
=/ close=? (~(has in closing.peer-state) bone)
=+ message-pump=(make-message-pump message-pump-state channel close bone)
=^ pump-gifts message-pump-state (work:message-pump task)
=. snd.peer-state (~(put by snd.peer-state) bone message-pump-state)
:: process effects from |message-pump
@ -2401,6 +2572,8 @@
=. peer-core
?- -.gift
%done (on-pump-done [message-num error]:gift)
%cork (on-pump-cork current.message-pump-state)
%kroc (on-pump-kroc bone:gift)
%send (on-pump-send static-fragment.gift)
%wait (on-pump-wait date.gift)
%rest (on-pump-rest date.gift)
@ -2411,6 +2584,16 @@
++ on-pump-done
|= [=message-num error=(unit error)]
^+ peer-core
?: ?& =(1 (end 0 bone))
=(1 (end 0 (rsh 0 bone)))
(~(has in corked.peer-state) (mix 0b10 bone))
==
%- %+ trace msg.veb
=/ dat [her.channel bone=bone message-num=message-num -.task]
|.("remove naxplanation flow {<dat>}")
=. snd.peer-state
(~(del by snd.peer-state) bone)
peer-core
:: if odd bone, ack is on "subscription update" message; no-op
::
?: =(1 (end 0 bone))
@ -2423,13 +2606,53 @@
=/ target-bone=^bone (mix 0b10 bone)
::
(run-message-sink target-bone %drop message-num)
?: &(close ?=(%near -.task))
:: if the bone belongs to a closing flow and we got a naxplanation,
:: don't relay the ack to the client vane, and wait for the next try
::
peer-core
:: not a nack-trace bone; relay ack to client vane
::
(emit (got-duct bone) %give %done error)
:: +on-pump-cork: kill flow on cork sender side
::
++ on-pump-cork
|= =message-num
^+ peer-core
:: clear all packets from this message from the packet pump
::
=. message-pump (run-packet-pump:message-pump %done message-num *@dr)
=/ =wire (make-pump-timer-wire her.channel bone)
=. corks.ames-state (~(del in corks.ames-state) wire)
=/ nack-bone=^bone (mix 0b10 bone)
=? rcv.peer-state (~(has by rcv.peer-state) nack-bone)
:: if the publisher was behind we remove nacks received on that bone
::
(~(del by rcv.peer-state) nack-bone)
=. peer-state
=, peer-state
%_ peer-state
snd (~(del by snd) bone)
rcv (~(del by rcv) bone)
corked (~(put in corked) bone)
closing (~(del in closing) bone)
krocs (~(del in krocs) bone)
by-duct.ossuary (~(del by by-duct.ossuary) (got-duct bone))
by-bone.ossuary (~(del by by-bone.ossuary) bone)
==
peer-core
:: +on-pump-krock: if we get a nack for a cork, add it to the recork set
::
++ on-pump-kroc
|= =^bone
^+ peer-core
=. krocs.peer-state (~(put in krocs.peer-state) bone)
peer-core
:: +on-pump-send: emit message fragment requested by |message-pump
::
++ on-pump-send
|=(f=static-fragment (send-shut-packet bone [message-num %& +]:f))
|= f=static-fragment
(send-shut-packet bone [message-num %& +]:f)
:: +on-pump-wait: relay |message-pump's set-timer request
::
++ on-pump-wait
@ -2454,13 +2677,15 @@
++ run-message-sink
|= [=bone task=message-sink-task]
^+ peer-core
?: (~(has in corked.peer-state) bone) peer-core
:: pass .task to the |message-sink and apply state mutations
::
=/ =message-sink-state
(~(gut by rcv.peer-state) bone *message-sink-state)
::
=/ message-sink (make-message-sink message-sink-state channel)
=^ sink-gifts message-sink-state (work:message-sink task)
=/ closing=? (~(has in closing.peer-state) bone)
=^ sink-gifts message-sink-state (work:message-sink closing task)
=. rcv.peer-state (~(put by rcv.peer-state) bone message-sink-state)
:: process effects from |message-sink
::
@ -2471,8 +2696,32 @@
?- -.gift
%memo (on-sink-memo [message-num message]:gift)
%send (on-sink-send [message-num ack-meat]:gift)
%cork on-sink-cork
==
$(sink-gifts t.sink-gifts)
:: +on-sink-cork: handle flow kill after server ames has taken %done
::
++ on-sink-cork
^+ peer-core
=/ =message-pump-state
(~(gut by snd.peer-state) bone *message-pump-state)
=? peer-core ?=(^ next-wake.packet-pump-state.message-pump-state)
=* next-wake u.next-wake.packet-pump-state.message-pump-state
=/ =wire (make-pump-timer-wire her.channel bone)
:: resetting timer for boons
::
(emit [/ames]~ %pass wire %b %rest next-wake)
=/ nax-bone=^bone (mix 0b10 bone)
=. peer-state
=, peer-state
%_ peer-state
rcv (~(del by rcv) bone)
snd (~(del by snd) bone)
corked (~(put in corked) bone)
closing (~(del in closing) bone)
krocs (~(del in krocs) bone)
==
peer-core
:: +on-sink-send: emit ack packet as requested by |message-sink
::
++ on-sink-send
@ -2506,10 +2755,14 @@
++ on-sink-boon
|= [=message-num message=*]
^+ peer-core
?: ?| (~(has in closing.peer-state) bone)
(~(has in corked.peer-state) bone)
==
peer-core
:: send ack unconditionally
::
=. peer-core (emit (got-duct bone) %give %boon message)
=. peer-core (run-message-sink bone %done ok=%.y)
=. peer-core (run-message-sink bone %done ok=%.y cork=%.n)
::
?. ?=([%hear * * ok=%.n] task)
:: fresh boon; give message to client vane
@ -2547,52 +2800,91 @@
=+ ;; =naxplanation message
:: ack nack-trace message (only applied if we don't later crash)
::
=. peer-core (run-message-sink bone %done ok=%.y)
=. peer-core (run-message-sink bone %done ok=%.y cork=%.n)
:: flip .bone's second bit to find referenced flow
::
=/ target-bone=^bone (mix 0b10 bone)
:: notify |message-pump that this message got naxplained
::
(run-message-pump target-bone %near naxplanation)
=. peer-core (run-message-pump target-bone %near naxplanation)
::
?. (~(has in krocs.peer-state) target-bone)
peer-core
=/ =message-pump-state
(~(gut by snd.peer-state) target-bone *message-pump-state)
=/ message-pump
(make-message-pump message-pump-state channel %.y target-bone)
:: we don't process the gifts here and instead wait for
:: the timer to handle the %cork plea added to the pump
::
=^ * message-pump-state
%- work:message-pump
%memo^(dedup-message (jim [%$ /flow [%cork ~]]))
=. snd.peer-state
(~(put by snd.peer-state) target-bone message-pump-state)
:: if we get a naxplanation for a %cork, the publisher is behind
:: receiving the OTA, so we set up a timer to retry in one day.
::
%- %+ trace msg.veb
|.("old publisher, resend %cork on bone={<target-bone>} in ~d1")
=/ =wire (make-pump-timer-wire her.channel target-bone)
=. corks.ames-state (~(put in corks.ames-state) wire)
peer-core
:: +on-sink-plea: handle request message received by |message-sink
::
++ on-sink-plea
|= [=message-num message=*]
^+ peer-core
?: ?| (~(has in closing.peer-state) bone)
(~(has in corked.peer-state) bone)
==
peer-core
|^
%- %+ trace msg.veb
=/ dat [her.channel bone=bone message-num=message-num]
|.("sink plea {<dat>}")
:: is this the first time we're trying to process this message?
::
?. ?=([%hear * * ok=%.n] task)
:: fresh plea; pass to client vane
::
=+ ;; =plea message
::
=/ =wire (make-bone-wire her.channel her-rift.channel bone)
?: ?=([%hear * * ok=%.n] task)
:: we previously crashed on this message; send nack
::
nack-plea
:: fresh plea; pass to client vane
::
=+ ;; =plea message
=/ =wire (make-bone-wire her.channel her-rift.channel bone)
::
?. =(vane.plea %$)
?+ vane.plea ~| %ames-evil-vane^our^her.channel^vane.plea !!
%c (emit duct %pass wire %c %plea her.channel plea)
%g (emit duct %pass wire %g %plea her.channel plea)
%j (emit duct %pass wire %j %plea her.channel plea)
==
:: we previously crashed on this message; send nack
:: a %cork plea is handled using %$ as the recipient vane to
:: account for publishers that still handle ames-to-ames %pleas
::
=. peer-core (run-message-sink bone %done ok=%.n)
:: also send nack-trace with blank .error for security
?> &(?=([%cork *] payload.plea) ?=(%flow -.path.plea))
=. closing.peer-state (~(put in closing.peer-state) bone)
(emit duct %pass wire %a %plea her.channel [%a /close ~])
::
=/ nack-trace-bone=^bone (mix 0b10 bone)
=/ =naxplanation [message-num *error]
=/ =message-blob (jam naxplanation)
::
(run-message-pump nack-trace-bone %memo message-blob)
++ nack-plea
^+ peer-core
=. peer-core (run-message-sink bone %done ok=%.n cork=%.n)
:: send nack-trace with blank .error for security
::
=/ nack-trace-bone=^bone (mix 0b10 bone)
=/ =naxplanation [message-num *error]
=/ =message-blob (jam naxplanation)
::
(run-message-pump nack-trace-bone %memo message-blob)
--
--
--
--
:: +make-message-pump: constructor for |message-pump
::
++ make-message-pump
|= [state=message-pump-state =channel]
|= [state=message-pump-state =channel closing=? =bone]
=* veb veb.bug.channel
=| gifts=(list message-pump-gift)
::
@ -2627,11 +2919,35 @@
%memo (on-memo message-blob.task)
%wake (run-packet-pump %wake current.state)
%hear
?- -.ack-meat.task
%& (on-hear [message-num fragment-num=p.ack-meat]:task)
%| (on-done [message-num ?:(ok.p.ack-meat [%ok ~] [%nack ~])]:task)
?- -.ack-meat.task
%&
(on-hear [message-num fragment-num=p.ack-meat]:task)
::
%|
=/ cork=?
=/ top-live
(pry:packet-queue:*make-packet-pump live.packet-pump-state.state)
:: If we send a %cork and get an ack, we can know by
:: sequence number that the ack is for the %cork message
::
?& closing
?=(^ top-live)
=(0 ~(wyt in unsent-messages.state))
=(0 (lent unsent-fragments.state))
=(1 ~(wyt by live.packet-pump-state.state))
=(message-num:task message-num.key.u.top-live)
==
=* ack p.ack-meat.task
=? message-pump &(cork !ok.ack) (give [%kroc bone])
=. message-pump
%- on-done
[[message-num:task ?:(ok.ack [%ok ~] [%nack ~])] cork]
?. &(!ok.ack cork) message-pump
%. message-pump
%+ trace odd.veb
|.("got nack for %cork {<bone=bone message-num=message-num:task>}")
==
%near (on-done [message-num %naxplanation error]:naxplanation.task)
%near (on-done [[message-num %naxplanation error]:naxplanation.task %&])
==
:: +on-memo: handle request to send a message
::
@ -2663,10 +2979,11 @@
:: flows.
::
++ on-done
|= [=message-num =ack]
|= [[=message-num =ack] cork=?]
^+ message-pump
:: unsent messages from the future should never get acked
::
~| [message-num next.state]
?> (lth message-num next.state)
:: ignore duplicate message acks
::
@ -2725,6 +3042,7 @@
?- -.u.cur
%ok
=. message-pump (give %done current.state ~)
=? message-pump cork (give %cork ~)
$(current.state +(current.state))
::
%nack
@ -3270,20 +3588,20 @@
:: +work: handle a $message-sink-task
::
++ work
|= task=message-sink-task
|= [closing=? task=message-sink-task]
^+ [gifts state]
::
=- [(flop gifts) state]
::
?- -.task
%done (on-done ok.task)
%done (on-done ok.task cork.task)
%drop (on-drop message-num.task)
%hear (on-hear [lane shut-packet ok]:task)
%hear (on-hear closing [lane shut-packet ok]:task)
==
:: +on-hear: receive message fragment, possibly completing message
::
++ on-hear
|= [=lane =shut-packet ok=?]
|= [closing=? =lane =shut-packet ok=?]
^+ message-sink
:: we know this is a fragment, not an ack; expose into namespace
::
@ -3320,13 +3638,14 @@
:: doesn't happen for boons.
::
?: (lte seq last-heard.state)
?: is-last-fragment
:: drop last packet since we don't know whether to ack or nack
?: &(is-last-fragment !closing)
:: if not from a closing bone, drop last packet,
:: since we don't know whether to ack or nack
::
%- %+ trace rcv.veb
|. ^- tape
=/ data
:* her.channel seq=seq
:* her.channel seq=seq bone=bone
fragment-num=fragment-num num-fragments=num-fragments
la=last-acked.state lh=last-heard.state
==
@ -3336,7 +3655,9 @@
::
%- %+ trace rcv.veb |.
=/ data
[seq=seq fragment-num=fragment-num num-fragments=num-fragments]
:* seq=seq fragment-num=fragment-num
num-fragments=num-fragments closing=closing
==
"send ack-1 {<data>}"
(give %send seq %& fragment-num)
:: last-heard<seq<10+last-heard; this is a packet in a live message
@ -3425,7 +3746,7 @@
:: +on-done: handle confirmation of message processing from vane
::
++ on-done
|= ok=?
|= [ok=? cork=?]
^+ message-sink
::
=^ pending pending-vane-ack.state ~(get to pending-vane-ack.state)
@ -3435,6 +3756,7 @@
=? nax.state !ok (~(put in nax.state) message-num)
::
=. message-sink (give %send message-num %| ok lag=`@dr`0)
=? message-sink cork (give %cork ~)
=/ next ~(top to pending-vane-ack.state)
?~ next
message-sink

View File

@ -4,17 +4,47 @@
::
::::
|= our=ship
:: veb: verbosity flags
::
=/ veb-all-off
:: TODO: add more flags?
::
:* odd=`?`%.n :: unusual events
==
=, gall
=>
|%
+| %helpers
:: +trace: print if .verb is set and we're tracking .dude
::
++ trace
|= [verb=? =dude dudes=(set dude) print=tang]
^+ same
?. verb
same
?. => [dude=dude dudes=dudes in=in]
~+ |(=(~ dudes) (~(has in dudes) dude))
same
(slog print)
::
:: $bug: debug printing configuration
::
:: veb: verbosity toggles
:: dudes: app filter; if ~, print for all
::
+$ bug
$: veb=_veb-all-off
dudes=(set dude)
==
::
+| %main
::
:: $move: Arvo-level move
::
+$ move [=duct move=(wind note-arvo gift-arvo)]
:: $state-8: overall gall state, versioned
:: $state-9: overall gall state, versioned
::
+$ state-8 [%8 state]
+$ state-9 [%9 state]
:: $state: overall gall state
::
:: system-duct: TODO document
@ -22,6 +52,7 @@
:: contacts: other ships we're in communication with
:: yokes: running agents
:: blocked: moves to agents that haven't been started yet
:: bug: debug printing configuration
::
+$ state
$: system-duct=duct
@ -29,13 +60,8 @@
contacts=(set ship)
yokes=(map term yoke)
blocked=(map term (qeu blocked-move))
=bug
==
:: $watches: subscribers and publications
::
:: TODO: rename this, to $ties?
:: TODO: rename $boat and $bitt and document
::
+$ watches [inbound=bitt outbound=boat]
:: $routes: new cuff; TODO: document
::
+$ routes
@ -45,19 +71,26 @@
:: $yoke: agent runner state
::
:: control-duct: TODO document
:: live: is this agent running? TODO document better
:: run-nonce: unique for each rebuild
:: sub-nonce: app-wide global %watch nonce
:: live: is this agent running? TODO document boarer
:: stats: TODO document
:: watches: incoming and outgoing subscription state
:: bitt: incoming subscriptions
:: boat: outgoing subscriptions
:: boar: and their nonces
:: agent: agent core
:: beak: compilation source
:: marks: mark conversion requests
::
+$ yoke
$: control-duct=duct
nonce=@t
live=? ::TODO remove, replaced by -.agent
run-nonce=@t
sub-nonce=_1
live=?
=stats
=watches
=bitt
=boat
=boar
agent=(each agent vase)
=beak
marks=(map duct mark)
@ -108,6 +141,7 @@
%poke
%leave
%missing
%cork
==
:: |migrate: data structures for upgrades
::
@ -116,21 +150,25 @@
:: $spore: structures for update, produced by +stay
::
+$ spore
$: %8
$: %9
system-duct=duct
outstanding=(map [wire duct] (qeu remote-request))
contacts=(set ship)
eggs=(map term egg)
blocked=(map term (qeu blocked-move))
=bug
==
:: $egg: migratory agent state; $yoke with .old-state instead of .agent
::
+$ egg
$: control-duct=duct
nonce=@t
run-nonce=@t
sub-nonce=@
live=?
=stats
=watches
=bitt
=boat
=boar
old-state=(each vase vase)
=beak
marks=(map duct mark)
@ -139,6 +177,7 @@
:: pupal gall core, on upgrade
::
=< =* adult-gate .
=| spore-tag=@ud
=| =spore
|= [now=@da eny=@uvJ rof=roof]
=* pupal-gate .
@ -164,10 +203,12 @@
[^duct %pass /whiz/gall %$ %whiz ~]~
=/ adult adult-core
=. state.adult
[%8 system-duct outstanding contacts yokes=~ blocked]:spore
=/ mo-core (mo-abed:mo:adult duct)
[%9 system-duct outstanding contacts yokes=~ blocked bug]:spore
=/ mo-core (mo-abed:mo:adult system-duct.state.adult)
=/ apps=(list [dap=term =egg]) ~(tap by eggs.spore)
:: upgrade %base apps and suspend others
::
=. mo-core
=/ apps=(list [dap=term =egg]) ~(tap by eggs.spore)
|- ^+ mo-core
?~ apps mo-core
?. =(%base q.beak.egg.i.apps)
@ -185,6 +226,24 @@
(mean u.tan)
ap-core
$(apps t.apps, mo-core ap-abet:ap-core)
:: FIXME: ! kiln: %base not installed
:: ! /lib/hood/kiln/hoon:<[461 23].[461 42]>
:: ! /base/gall-lyv
:: kill subscriptions when upgrading to gall request queue fix
::
:: =? mo-core (lth spore-tag %9)
:: |- ^+ mo-core
:: ?~ apps mo-core
:: ~> %slog.[0 leaf+"gall: +ap-kill-down {<dap.i.apps>}"]
:: =/ ap-core (ap-abut:ap:mo-core i.apps)
:: =. ap-core
:: =/ boats=(list [=wire =dock])
:: ~(tap in ~(key by boat.egg.i.apps))
:: |- ^+ ap-core
:: ?~ boats ap-core
:: =/ [=wire =dock] i.boats
:: $(boats t.boats, ap-core (ap-kill-down:ap-core wire dock))
:: $(apps t.apps, mo-core ap-abet:ap-core)
=. mo-core (mo-subscribe-to-agent-builds:mo-core now)
=^ moves adult-gate mo-abet:mo-core
=? moves ?=(^ fec) (weld moves [u.fec]~)
@ -223,9 +282,10 @@
::
++ load
|^ |= old=spore-any
=? old ?=(%7 -.old)
(spore-7-to-8 old)
?> ?=(%8 -.old)
=. spore-tag `@ud`-.old
=? old ?=(%7 -.old) (spore-7-to-8 old)
=? old ?=(%8 -.old) (spore-8-to-9 old)
?> ?=(%9 -.old)
=. spore old
?. =(~ eggs.spore)
pupal-gate
@ -234,32 +294,78 @@
state spore(eggs *(map term yoke))
==
::
+$ spore-any $%(^spore spore-7)
+$ spore-any $%(^spore spore-8 spore-7)
+$ spore-7
$: %7
wipe-eyre-subs=_| ::NOTE band-aid for #3196
system-duct=duct
outstanding=(map [wire duct] (qeu remote-request))
contacts=(set ship)
eggs=(map term egg)
eggs=(map term egg-7)
blocked=(map term (qeu blocked-move))
==
::
+$ spore-8
$: %8
system-duct=duct
outstanding=(map [wire duct] (qeu remote-request))
contacts=(set ship)
eggs=(map term egg-8)
blocked=(map term (qeu blocked-move))
==
::
+$ egg-7 egg-8
+$ egg-8
$: control-duct=duct
run-nonce=@t
live=?
=stats
watches=watches-8
old-state=(each vase vase)
=beak
marks=(map duct mark)
==
::
+$ watches-8 [inbound=bitt outbound=boat-8]
+$ boat-8 (map [wire ship term] [acked=? =path])
::
++ spore-7-to-8
|= old=spore-7
^- ^spore
^- spore-8
:- %8
=. eggs.old
%- ~(urn by eggs.old)
|= [a=term e=egg]
|= [a=term e=egg-7]
::NOTE kiln will kick off appropriate app revival
e(old-state [%| p.old-state.e])
+>.old
::
++ spore-8-to-9
|= old=spore-8
^- ^spore
=- old(- %9, eggs -, blocked [blocked.old *bug])
%- ~(run by eggs.old)
|= =egg-8
^- egg
=/ [=bitt =boat =boar] (watches-8-to-9 watches.egg-8)
:* control-duct.egg-8
run-nonce.egg-8
sub-nonce=0
live.egg-8
stats.egg-8
bitt boat boar
[old-state beak marks]:egg-8
==
::
++ watches-8-to-9
|= watches-8
^- [bitt boat boar]
[inbound outbound (~(run by outbound) |=([acked=? =path] nonce=0))]
--
--
:: adult gall vane interface, for type compatibility with pupa
::
=| state=state-8
=| state=state-9
|= [now=@da eny=@uvJ rof=roof]
=* gall-payload .
=< ~% %gall-wrap ..mo ~
@ -280,6 +386,12 @@
++ mo
~% %gall-mo +> ~
|_ [hen=duct moves=(list move)]
::
++ trace
|= [verb=? =dude print=tang]
^+ same
(^trace verb dude dudes.bug.state print)
::
:: +mo-abed: initialise state with the provided duct
:: +mo-abet: finalize, reversing moves
:: +mo-pass: prepend a standard %pass to the current list of moves
@ -357,7 +469,7 @@
control-duct hen
beak bek
agent &+agent
nonce (scot %uw (end 5 (shas %yoke-nonce eny)))
run-nonce (scot %uw (end 5 (shas %yoke-nonce eny)))
==
::
=/ old mo-core
@ -457,8 +569,12 @@
=. outstanding.state
=/ stand
(~(gut by outstanding.state) [wire hen] *(qeu remote-request))
(~(put by outstanding.state) [wire hen] (~(put to stand) -.deal))
(mo-pass wire note-arvo)
%+ ~(put by outstanding.state) [wire hen]
(~(gas to stand) ?.(?=(%leave -.deal) ~[-.deal] ~[%leave %cork]))
=. mo-core (mo-pass wire note-arvo)
?. ?=(%leave -.deal)
mo-core
(mo-pass wire [%a [%cork ship]])
:: +mo-track-ship: subscribe to ames and jael for notices about .ship
::
++ mo-track-ship
@ -680,7 +796,12 @@
(~(put to *(qeu remote-request)) %missing)
~| [full-wire=full-wire hen=hen stand=stand]
=^ rr stand ~(get to stand)
[rr (~(put by outstanding.state) [full-wire hen] stand)]
~? &(=(rr %cork) ?=(^ stand))
[%outstanding-queue-not-empty wire hen]
:- rr
?: ?=(%cork rr)
(~(del by outstanding.state) [full-wire hen])
(~(put by outstanding.state) [full-wire hen] stand)
:: non-null case of wire is old, remove on next breach after
:: 2019/12
::
@ -696,6 +817,7 @@
%watch (mo-give %unto %watch-ack err)
%poke (mo-give %unto %poke-ack err)
%leave mo-core
%cork mo-core
%missing (mo-give:(mo-give %unto %watch-ack err) %unto %poke-ack err)
==
::
@ -738,7 +860,7 @@
?~ yoke
%- (slog leaf+"gall: {<dap>} dead, got {<+<.sign-arvo>}" ~)
mo-core
?. =(nonce.u.yoke i.t.wire)
?. =(run-nonce.u.yoke i.t.wire)
%- (slog leaf+"gall: got old {<+<.sign-arvo>} for {<dap>}" ~)
mo-core
?. ?=([?(%gall %behn) %unto *] sign-arvo)
@ -962,6 +1084,28 @@
%d (mo-give %unto %raw-fact mark.ames-response noun.ames-response)
%x (mo-give %unto %kick ~)
==
:: +mo-spew: handle request to set verbosity toggles on debug output
::
++ mo-spew
|= verbs=(list verb)
^+ mo-core
:: start from all %.n's, then flip requested toggles
::
=. veb.bug.state
%+ roll verbs
|= [=verb acc=_veb-all-off]
^+ veb.bug.state
?- verb
%odd acc(odd %.y)
==
mo-core
:: +mo-sift: handle request to filter debug output by agent
::
++ mo-sift
|= dudes=(list dude)
^+ mo-core
=. dudes.bug.state (sy dudes)
mo-core
:: +ap: agent engine
::
:: An inner, agent-level core. The sample refers to the agent we're
@ -976,6 +1120,12 @@
agent-config=(list (each suss tang))
=yoke
==
::
++ trace
|= [verb=? print=tang]
^+ same
(^trace verb agent-name print)
::
++ ap-core .
:: +ap-abed: initialise state for an agent, with the supplied routes.
::
@ -1036,11 +1186,9 @@
::
++ ap-nuke
^+ ap-core
=/ out=(list [[=wire =ship =term] ? =path])
~(tap by outbound.watches.yoke)
=/ inbound-paths=(set path)
%- silt
%+ turn ~(tap by inbound.watches.yoke)
%+ turn ~(tap by bitt.yoke)
|= [=duct =ship =path]
path
=/ will=(list card:agent:gall)
@ -1048,7 +1196,7 @@
?: =(~ inbound-paths)
~
[%give %kick ~(tap in inbound-paths) ~]~
%+ turn ~(tap by outbound.watches.yoke)
%+ turn ~(tap by boat.yoke)
|= [[=wire =ship =term] ? =path]
[%pass wire %agent [ship term] %leave ~]
=^ maybe-tang ap-core (ap-ingest ~ |.([will *agent]))
@ -1133,12 +1281,13 @@
tang.neet
==
=. wire
:^ %use agent-name run-nonce.yoke
?- -.neet
%agent [%out (scot %p ship.neet) name.neet wire]
%huck [%out (scot %p ship.neet) name.neet wire]
%arvo [(scot %p attributing.agent-routes) wire]
==
=. wire [%use agent-name nonce.yoke wire]
::
=/ =note-arvo
?- -.neet
%arvo note-arvo.neet
@ -1152,8 +1301,7 @@
++ ap-breach
|= =ship
^+ ap-core
=/ in=(list [=duct =^ship =path])
~(tap by inbound.watches.yoke)
=/ in=(list [=duct =^ship =path]) ~(tap by bitt.yoke)
|- ^+ ap-core
?^ in
=? ap-core =(ship ship.i.in)
@ -1161,15 +1309,19 @@
core(agent-duct agent-duct)
$(in t.in)
::
=/ out=(list [[=wire =^ship =term] ? =path])
~(tap by outbound.watches.yoke)
=/ out=(list [[=wire =^ship =term] ? =path nonce=@])
%+ turn ~(tap by boat.yoke)
|= [key=[wire ^ship term] val=[? path]]
:- key
val(+ [+.val (~(got by boar.yoke) key)])
|- ^+ ap-core
?~ out
ap-core
=? ap-core =(ship ship.i.out)
=/ core
=. agent-duct system-duct.state
=/ way [%out (scot %p ship) term.i.out wire.i.out]
=/ way
[%out (scot %p ship) term.i.out (scot %ud nonce.i.out) wire.i.out]
(ap-specific-take way %kick ~)
core(agent-duct agent-duct)
$(out t.out)
@ -1184,8 +1336,7 @@
|= =ship
^+ ap-core
::
=/ in=(list [=duct =^ship =path])
~(tap by inbound.watches.yoke)
=/ in=(list [=duct =^ship =path]) ~(tap by bitt.yoke)
|- ^+ ap-core
?~ in ap-core
::
@ -1206,7 +1357,7 @@
?~ target-paths
?~ target-ship
~[agent-duct]
%+ murn ~(tap by inbound.watches.yoke)
%+ murn ~(tap by bitt.yoke)
|= [=duct =ship =path]
^- (unit ^duct)
?: =(target-ship `ship)
@ -1221,7 +1372,7 @@
++ ap-ducts-from-path
|= [target-path=path target-ship=(unit ship)]
^- (list duct)
%+ murn ~(tap by inbound.watches.yoke)
%+ murn ~(tap by bitt.yoke)
|= [=duct =ship =path]
^- (unit ^duct)
?: ?& =(target-path path)
@ -1284,15 +1435,6 @@
?: ?=(%& -.res)
``want^p.res
((slog leaf+"peek failed tube from {(trip have)} to {(trip want)}" ~) ~)
:: +ap-update-subscription: update subscription.
::
++ ap-update-subscription
~/ %ap-update-subscription
|= [is-ok=? =other=ship other-agent=term =wire]
^+ ap-core
?: is-ok
ap-core
(ap-kill-down wire [other-ship other-agent])
:: +ap-move: send move
::
++ ap-move
@ -1316,8 +1458,8 @@
attributing.agent-routes :: guest
agent-name :: agent
== ::
:* wex=outbound.watches.yoke :: outgoing
sup=inbound.watches.yoke :: incoming
:* wex=boat.yoke :: outgoing
sup=bitt.yoke :: incoming
== ::
:* act=change.stats.yoke :: tick
eny=eny.stats.yoke :: nonce
@ -1352,9 +1494,8 @@
~/ %ap-subscribe
|= pax=path
^+ ap-core
=/ incoming [attributing.agent-routes pax]
=. inbound.watches.yoke
(~(put by inbound.watches.yoke) agent-duct incoming)
=/ incoming [attributing.agent-routes pax]
=. bitt.yoke (~(put by bitt.yoke) agent-duct incoming)
=^ maybe-tang ap-core
%+ ap-ingest %watch-ack |.
(on-watch:ap-agent-core pax)
@ -1404,6 +1545,7 @@
=/ other-agent i.t.t.wire
=/ =dock [other-ship other-agent]
=/ agent-wire t.t.t.wire
=/ nonce=@ 0
::
=^ =sign:agent ap-core
?. ?=(%raw-fact -.unto)
@ -1424,48 +1566,106 @@
%- ap-move :_ ~
:^ hen %pass /nowhere
[%c %warp our q.beak.yoke ~ %sing %b case /[mark.unto]]
|^ ^+ ap-core
:: %poke-ack has no nonce; ingest directly
::
?: ?=(%poke-ack -.sign)
ingest-and-check-error
:: if .agent-wire matches, it's an old pre-nonce subscription
::
?: (~(has by boat.yoke) sub-key)
run-sign
:: if an app happened to use a null wire, no-op
::
?: =(~ agent-wire)
on-missing
=/ has-nonce=(unit @ud) (slaw %ud (head agent-wire))
?: &(?=(~ has-nonce) ?=(%kick -.sign))
on-weird-kick
:: pop nonce off .agent-wire and match against stored subscription
::
?> ?=(^ has-nonce)
=: nonce u.has-nonce
agent-wire (tail agent-wire)
==
?~ got=(~(get by boar.yoke) sub-key)
on-missing
?: =(nonce.u.got nonce)
run-sign
(on-bad-nonce nonce.u.got)
::
:: if subscription ack or close, handle before calling user code
::
=? outbound.watches.yoke ?=(%kick -.sign)
%- ~(del by outbound.watches.yoke)
[agent-wire dock]
?: ?& ?=(%watch-ack -.sign)
!(~(has by outbound.watches.yoke) [agent-wire dock])
==
%- %: slog
++ sub-key [agent-wire dock]
++ ingest (ap-ingest ~ |.((on-agent:ap-agent-core agent-wire sign)))
++ run-sign
?- -.sign
%poke-ack !!
%fact
=^ tan ap-core ingest
?~ tan ap-core
=. ap-core (ap-kill-down sub-key)
(ap-error -.sign leaf/"take %fact failed, closing subscription" u.tan)
::
%kick
=: boar.yoke (~(del by boar.yoke) sub-key)
boat.yoke (~(del by boat.yoke) sub-key)
==
ingest-and-check-error
::
%watch-ack
?. (~(has by boat.yoke) sub-key)
%. ap-core
%+ trace odd.veb.bug.state :~
leaf+"{<agent-name>}: got ack for nonexistent subscription"
leaf+"{<dock>}: {<agent-wire>}"
>wire=wire<
~
==
ap-core
=? boar.yoke ?=(^ p.sign) (~(del by boar.yoke) sub-key)
::
=. boat.yoke
?^ p.sign (~(del by boat.yoke) sub-key)
::
%+ ~(jab by boat.yoke) sub-key
|= val=[acked=? =path]
%. val(acked &)
%^ trace &(odd.veb.bug.state acked.val)
leaf/"{<agent-name>} 2nd watch-ack on {<val>}" ~
::
ingest-and-check-error
==
::
=? outbound.watches.yoke ?=(%watch-ack -.sign)
?^ p.sign
%- ~(del by outbound.watches.yoke)
[agent-wire dock]
%+ ~(jab by outbound.watches.yoke) [agent-wire dock]
|= [acked=? =path]
=. .
?. acked
.
%- =/ =tape
"{<agent-name>}: received 2nd watch-ack on {<wire dock path>}"
(slog leaf+tape ~)
.
[& path]
++ on-missing
%. ap-core
%+ trace odd.veb.bug.state :~
leaf+"{<agent-name>}: got {<-.sign>} for nonexistent subscription"
leaf+"{<dock>}: {<[nonce=nonce agent-wire]>}"
>wire=wire<
==
::
=^ maybe-tang ap-core
%+ ap-ingest ~ |.
(on-agent:ap-agent-core agent-wire sign)
:: if failed %fact handling, kill subscription
++ on-weird-kick
%. run-sign
%+ trace odd.veb.bug.state :~
leaf+"{<agent-name>}: got %kick for nonexistent subscription"
leaf+"{<dock>}: {<agent-wire>}"
>wire=wire<
==
::
=? ap-core ?=(%fact -.sign)
(ap-update-subscription =(~ maybe-tang) p.dock q.dock agent-wire)
?^ maybe-tang
(ap-error -.sign leaf/"closing subscription" u.maybe-tang)
ap-core
++ on-bad-nonce
|= stored-nonce=@
%. ap-core
%- slog :~
=/ nonces [expected=stored-nonce got=nonce]
=/ ok |(?=(?(%fact %kick) -.sign) =(~ p.sign))
leaf+"{<agent-name>}: stale {<-.sign>} {<nonces>} ok={<ok>}"
::
leaf+"{<dock>}: {<agent-wire>}"
>wire=wire<
==
::
++ ingest-and-check-error
^+ ap-core
=^ tan ap-core ingest
?~(tan ap-core (ap-error -.sign leaf/"take {<-.sign>} failed" u.tan))
--
:: +ap-install: install wrapper.
::
++ ap-install
@ -1499,24 +1699,18 @@
::
++ ap-silent-delete
^+ ap-core
::
%= ap-core
inbound.watches.yoke
(~(del by inbound.watches.yoke) agent-duct)
==
ap-core(bitt.yoke (~(del by bitt.yoke) agent-duct))
:: +ap-load-delete: load delete.
::
++ ap-load-delete
^+ ap-core
::
=/ maybe-incoming
(~(get by inbound.watches.yoke) agent-duct)
=/ maybe-incoming (~(get by bitt.yoke) agent-duct)
?~ maybe-incoming
ap-core
::
=/ incoming u.maybe-incoming
=. inbound.watches.yoke
(~(del by inbound.watches.yoke) agent-duct)
=/ incoming u.maybe-incoming
=. bitt.yoke (~(del by bitt.yoke) agent-duct)
::
=^ maybe-tang ap-core
%+ ap-ingest ~ |.
@ -1621,11 +1815,10 @@
`ap-core
::
=. agent.yoke &++.p.result
=/ moves (zing (turn -.p.result ap-from-internal))
=. inbound.watches.yoke
(ap-handle-kicks moves)
=/ moves (zing (turn -.p.result ap-from-internal))
=. bitt.yoke (ap-handle-kicks moves)
(ap-handle-peers moves)
:: +ap-handle-kicks: handle cancels of inbound.watches
:: +ap-handle-kicks: handle cancels of bitt.watches
::
++ ap-handle-kicks
~/ %ap-handle-kicks
@ -1641,8 +1834,8 @@
::
=/ quit-map=bitt
(malt (turn quits |=(=duct [duct *[ship path]])))
(~(dif by inbound.watches.yoke) quit-map)
:: +ap-handle-peers: handle new outbound.watches
(~(dif by bitt.yoke) quit-map)
:: +ap-handle-peers: handle new boat.watches
::
++ ap-handle-peers
~/ %ap-handle-peers
@ -1656,34 +1849,62 @@
?: ?=([* %pass * %g %deal * * %leave *] move)
=/ =wire p.move.move
?> ?=([%use @ @ %out @ @ *] wire)
=/ short-wire t.t.t.t.t.t.wire
=/ =dock [q.p q]:q.move.move
=. outbound.watches.yoke
(~(del by outbound.watches.yoke) [short-wire dock])
=/ =dock [q.p q]:q.move.move
=/ sys-wire=^wire (scag 6 `^wire`wire)
=/ sub-wire=^wire (slag 6 `^wire`wire)
::
?. (~(has by boat.yoke) sub-wire dock)
%. $(moves t.moves)
%^ trace odd.veb.bug.state
leaf/"gall: {<agent-name>} missing subscription, got %leave" ~
=/ nonce=@ (~(got by boar.yoke) sub-wire dock)
=. p.move.move
%+ weld sys-wire
?: =(nonce 0)
:: skip adding nonce to pre-nonce subscription wires
::
sub-wire
[(scot %ud nonce) sub-wire]
=: boat.yoke (~(del by boat.yoke) [sub-wire dock])
boar.yoke (~(del by boar.yoke) [sub-wire dock])
==
:: if nonce = 0, this was a pre-nonce subscription so later
:: subscriptions need to start subscribing on the next nonce
::
=? sub-nonce.yoke =(nonce 0) +(sub-nonce.yoke)
$(moves t.moves, new-moves [move new-moves])
?. ?=([* %pass * %g %deal * * ?(%watch %watch-as) *] move)
$(moves t.moves, new-moves [move new-moves])
=/ =wire p.move.move
?> ?=([%use @ @ %out @ @ *] wire)
=/ short-wire t.t.t.t.t.t.wire
=/ =dock [q.p q]:q.move.move
=/ =path
?- -.r.q.move.move
%watch path.r.q.move.move
%watch-as path.r.q.move.move
==
?: (~(has by outbound.watches.yoke) short-wire dock)
=/ sys-wire=^wire (scag 6 `^wire`wire)
=/ sub-wire=^wire (slag 6 `^wire`wire)
=/ [=dock =deal] [[q.p q] r]:q.move.move
::
?: (~(has by boat.yoke) sub-wire dock)
=. ap-core
=/ =tang
~[leaf+"subscribe wire not unique" >agent-name< >short-wire< >dock<]
=/ have
(~(got by outbound.watches.yoke) short-wire dock)
~[leaf+"subscribe wire not unique" >agent-name< >sub-wire< >dock<]
=/ have (~(got by boat.yoke) sub-wire dock)
%- (slog >out=have< tang)
(ap-error %watch-not-unique tang) :: reentrant, maybe bad?
$(moves t.moves)
=. outbound.watches.yoke
(~(put by outbound.watches.yoke) [short-wire dock] [| path])
$(moves t.moves, new-moves [move new-moves])
::
=. p.move.move
(weld sys-wire [(scot %ud sub-nonce.yoke) sub-wire])
%_ $
moves t.moves
new-moves [move new-moves]
sub-nonce.yoke +(sub-nonce.yoke)
::
boat.yoke
%+ ~(put by boat.yoke) [sub-wire dock]
:- acked=|
path=?+(-.deal !! %watch path.deal, %watch-as path.deal)
::
boar.yoke
(~(put by boar.yoke) [sub-wire dock] sub-nonce.yoke)
==
--
--
:: +call: request
@ -1726,6 +1947,8 @@
%jolt mo-abet:(mo-jolt:mo-core dude.task our desk.task)
%idle mo-abet:(mo-idle:mo-core dude.task)
%nuke mo-abet:(mo-nuke:mo-core dude.task)
%spew mo-abet:(mo-spew:mo-core veb.task)
%sift mo-abet:(mo-sift:mo-core dudes.task)
%trim [~ gall-payload]
%vega [~ gall-payload]
==
@ -1797,6 +2020,19 @@
acc
(~(put in acc) [dude -.agent.yoke])
::
?: ?& =(%n care)
?=([@ @ ^] path)
=([%$ %da now] coin)
=(our ship)
==
?~ yok=(~(get by yokes.state) dap)
[~ ~]
=/ [=^ship =term =wire]
[(slav %p i.path) i.t.path t.t.path]
?~ nonce=(~(get by boar.u.yok) [wire ship term])
[~ ~]
[~ ~ atom+!>(u.nonce)]
::
?. =(our ship)
~
?. =([%$ %da now] coin)

View File

@ -354,46 +354,48 @@
!> (snag 0 `(list move:ames)`moves6)
==
::
++ test-comet-message-flow ^- tang
:: same as test-message-flow, but ~nec will send a sendkeys packet to request
:: comet's self-attestation directly
::
=^ moves0 nec (call nec ~[/g/talk] %plea our-comet %g /talk [%get %post])
=^ moves1 comet (call comet ~[//unix] %hear (snag-packet 0 moves0))
=^ moves2 comet
=/ =point:ames
:* rift=1
life=2
keys=[[life=2 [crypto-suite=1 `@`nec-pub]] ~ ~]
sponsor=`~nec
==
%- take
:^ comet /public-keys ~[//unix]
^- sign:ames
[%jael %public-keys %full [n=[~nec point] ~ ~]]
:: give comet's self-attestation to ~nec; at this point, we have established
:: a channel, and can proceed as usual
::
=^ moves3 nec (call nec ~[//unix] %hear (snag-packet 0 moves2))
=^ moves4 comet (call comet ~[//unix] %hear (snag-packet 0 moves3))
=^ moves5 comet (take comet /bone/~nec/0/1 ~[//unix] %g %done ~)
=^ moves6 nec (call nec ~[//unix] %hear (snag-packet 0 moves5))
=^ moves7 comet (take comet /bone/~nec/0/1 ~[//unix] %g %boon [%post 'first1!!'])
=^ moves8 nec (call nec ~[//unix] %hear (snag-packet 0 moves7))
::
;: weld
%+ expect-eq
!> [~[//unix] %pass /qos %d %flog %text "; ~nec is your neighbor"]
!> (snag 0 `(list move:ames)`moves4)
::
%+ expect-eq
!> [~[//unix] %pass /qos %d %flog %text "; {<our-comet>} is your neighbor"]
!> (snag 0 `(list move:ames)`moves6)
::
%+ expect-eq
!> [~[/g/talk] %give %boon [%post 'first1!!']]
!> (snag 0 `(list move:ames)`moves8)
==
::TODO crashes in (snag 0 moves5), presumably due to subtle changes around
:: #5886. fix and re-enable!
:: ++ test-comet-message-flow ^- tang
:: :: same as test-message-flow, but ~nec will send a sendkeys packet to request
:: :: comet's self-attestation directly
:: ::
:: =^ moves0 nec (call nec ~[/g/talk] %plea our-comet %g /talk [%get %post])
:: =^ moves1 comet (call comet ~[//unix] %hear (snag-packet 0 moves0))
:: =^ moves2 comet
:: =/ =point:ames
:: :* rift=1
:: life=2
:: keys=[[life=2 [crypto-suite=1 `@`nec-pub]] ~ ~]
:: sponsor=`~nec
:: ==
:: %- take
:: :^ comet /public-keys ~[//unix]
:: ^- sign:ames
:: [%jael %public-keys %full [n=[~nec point] ~ ~]]
:: :: give comet's self-attestation to ~nec; at this point, we have established
:: :: a channel, and can proceed as usual
:: ::
:: =^ moves3 nec (call nec ~[//unix] %hear (snag-packet 0 moves2))
:: =^ moves4 comet (call comet ~[//unix] %hear (snag-packet 0 moves3))
:: =^ moves5 comet (take comet /bone/~nec/0/1 ~[//unix] %g %done ~)
:: =^ moves6 nec (call nec ~[//unix] %hear (snag-packet 0 moves5))
:: =^ moves7 comet (take comet /bone/~nec/0/1 ~[//unix] %g %boon [%post 'first1!!'])
:: =^ moves8 nec (call nec ~[//unix] %hear (snag-packet 0 moves7))
:: ::
:: ;: weld
:: %+ expect-eq
:: !> [~[//unix] %pass /qos %d %flog %text "; ~nec is your neighbor"]
:: !> (snag 0 `(list move:ames)`moves4)
:: ::
:: %+ expect-eq
:: !> [~[//unix] %pass /qos %d %flog %text "; {<our-comet>} is your neighbor"]
:: !> (snag 0 `(list move:ames)`moves6)
:: ::
:: %+ expect-eq
:: !> [~[/g/talk] %give %boon [%post 'first1!!']]
:: !> (snag 0 `(list move:ames)`moves8)
:: ==
::
++ test-comet-comet-message-flow ^- tang
:: same as test-message-flow, but the comets need to exchange

View File

@ -27,6 +27,7 @@
%da s+(scot %da p.c)
%tas s+(scot %tas p.c)
%ud (numb p.c)
%uv s+(scot %uv p.c)
==
++ foreign-desk
|= [s=^ship =desk]