Merge pull request #5886 from urbit/yu/gall-rq-global-cork-timer

gall, ames: fix subscription desync (stage ii)
This commit is contained in:
fang 2022-07-28 17:46:45 +02:00 committed by GitHub
commit 731e27d5a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 857 additions and 254 deletions

View File

@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1 version https://git-lfs.github.com/spec/v1
oid sha256:c45166ff0f8ab8dc1552bcef519c77c0afa6ca52f8ed1ba31ed632012667d619 oid sha256:cab0dd267cc5c17eb0d0164e876556ae7975fd5db59a738d741ecd767cca8594
size 8674763 size 8760359

View File

@ -0,0 +1,8 @@
:: Helm: Set Gall Verbosity by Agent
::
/? 310
::
:- %say
|= [^ dudes=(list dude:gall) ~]
:- %helm-gall-sift
dudes

View File

@ -0,0 +1,11 @@
:: Helm: Adjust Gall verbosity
::
:: List of diagnostic flags is in verb:gall in zuse.hoon, documented in
:: gall.hoon
::
/? 310
::
:- %say
|= [^ veb=(list verb:gall) ~]
:- %helm-gall-verb
veb

View File

@ -77,7 +77,8 @@
:: ::
++ de-gill :: gill from wire ++ de-gill :: gill from wire
|= way=wire ^- gill:gall |= way=wire ^- gill:gall
?>(?=([@ @ ~] way) [(slav %p i.way) i.t.way]) ~| way
?>(?=([@ @ *] way) [(slav %p i.way) i.t.way])
-- --
:: TODO: remove .ost :: TODO: remove .ost
:: ::

View File

@ -230,6 +230,14 @@
|= veb=(list verb:ames) =< abet |= veb=(list verb:ames) =< abet
(emit %pass /helm %arvo %a %spew veb) (emit %pass /helm %arvo %a %spew veb)
:: ::
++ poke-gall-sift
|= dudes=(list dude:gall) =< abet
(emit %pass /helm %arvo %g %sift dudes)
::
++ poke-gall-verb
|= veb=(list verb:gall) =< abet
(emit %pass /helm %arvo %g %spew veb)
::
++ poke-ames-wake ++ poke-ames-wake
|= ~ =< abet |= ~ =< abet
(emit %pass /helm %arvo %a %stir '') (emit %pass /helm %arvo %a %stir '')
@ -269,6 +277,8 @@
%helm-code =;(f (f !<(_+<.f vase)) poke-code) %helm-code =;(f (f !<(_+<.f vase)) poke-code)
%helm-cors-approve =;(f (f !<(_+<.f vase)) poke-cors-approve) %helm-cors-approve =;(f (f !<(_+<.f vase)) poke-cors-approve)
%helm-cors-reject =;(f (f !<(_+<.f vase)) poke-cors-reject) %helm-cors-reject =;(f (f !<(_+<.f vase)) poke-cors-reject)
%helm-gall-sift =;(f (f !<(_+<.f vase)) poke-gall-sift)
%helm-gall-verb =;(f (f !<(_+<.f vase)) poke-gall-verb)
%helm-hi =;(f (f !<(_+<.f vase)) poke-hi) %helm-hi =;(f (f !<(_+<.f vase)) poke-hi)
%helm-knob =;(f (f !<(_+<.f vase)) poke-knob) %helm-knob =;(f (f !<(_+<.f vase)) poke-knob)
%helm-pans =;(f (f !<(_+<.f vase)) poke-pans) %helm-pans =;(f (f !<(_+<.f vase)) poke-pans)

View File

@ -351,6 +351,7 @@
:: %hear: packet from unix :: %hear: packet from unix
:: %heed: track peer's responsiveness; gives %clog if slow :: %heed: track peer's responsiveness; gives %clog if slow
:: %jilt: stop tracking peer's responsiveness :: %jilt: stop tracking peer's responsiveness
:: %cork: request to delete message flow
:: %plea: request to send message :: %plea: request to send message
:: ::
:: System and Lifecycle Tasks :: System and Lifecycle Tasks
@ -367,6 +368,7 @@
$% [%hear =lane =blob] $% [%hear =lane =blob]
[%heed =ship] [%heed =ship]
[%jilt =ship] [%jilt =ship]
[%cork =ship]
$>(%plea vane-task) $>(%plea vane-task)
:: ::
$>(%born vane-task) $>(%born vane-task)
@ -511,6 +513,9 @@
:: entry and emit a nack to the local vane that asked us to send :: entry and emit a nack to the local vane that asked us to send
:: the message. :: the message.
:: heeds: listeners for %clog notifications :: heeds: listeners for %clog notifications
:: closing: bones closed on the sender side
:: corked: bones closed on both sender and receiver
:: krocs: bones that need to be sent again to the publisher
:: ::
+$ peer-state +$ peer-state
$: $: =symmetric-key $: $: =symmetric-key
@ -526,6 +531,9 @@
rcv=(map bone message-sink-state) rcv=(map bone message-sink-state)
nax=(set [=bone =message-num]) nax=(set [=bone =message-num])
heeds=(set duct) heeds=(set duct)
closing=(set bone)
corked=(set bone)
krocs=(set bone)
== ==
:: $qos: quality of service; how is our connection to a peer doing? :: $qos: quality of service; how is our connection to a peer doing?
:: ::
@ -1655,11 +1663,12 @@
$>(%trim vane-task) :: trim state $>(%trim vane-task) :: trim state
$>(%vega vane-task) :: report upgrade $>(%vega vane-task) :: report upgrade
$>(%plea vane-task) :: network request $>(%plea vane-task) :: network request
[%spew veb=(list verb)] :: set verbosity
[%sift dudes=(list dude)] :: per agent
== :: == ::
+$ bitt (map duct (pair ship path)) :: incoming subs +$ bitt (map duct (pair ship path)) :: incoming subs
+$ boat :: outgoing subs +$ boat (map [=wire =ship =term] [acked=? =path]) :: outgoing subs
%+ map [=wire =ship =term] :: +$ boar (map [=wire =ship =term] nonce=@) :: and their nonces
[acked=? =path] ::
+$ bowl :: standard app state +$ bowl :: standard app state
$: $: our=ship :: host $: $: our=ship :: host
src=ship :: guest src=ship :: guest
@ -1695,6 +1704,9 @@
$% [%raw-fact =mark =noun] $% [%raw-fact =mark =noun]
sign:agent sign:agent
== ==
:: TODO: add more flags?
::
+$ verb ?(%odd)
:: ::
:: +agent: app core :: +agent: app core
:: ::

View File

@ -598,6 +598,26 @@
:: ::
+$ naxplanation [=message-num =error] +$ naxplanation [=message-num =error]
:: ::
+| %statics
::
:: $ames-state: state for entire vane
::
:: peers: states of connections to other ships
:: unix-duct: handle to give moves to unix
:: life: our $life; how many times we've rekeyed
:: crypto-core: interface for encryption and signing
:: bug: debug printing configuration
:: corks: wires for cork flows pending publisher update
::
+$ ames-state
$: peers=(map ship ship-state)
=unix=duct
=life
crypto-core=acru:ames
=bug
corks=(set wire)
==
::
+$ ames-state-4 ames-state-5 +$ ames-state-4 ames-state-5
+$ ames-state-5 +$ ames-state-5
$: peers=(map ship ship-state-5) $: peers=(map ship ship-state-5)
@ -628,17 +648,36 @@
heeds=(set duct) heeds=(set duct)
== ==
:: ::
+| %statics +$ ames-state-6
$: peers=(map ship ship-state-6)
=unix=duct
=life
crypto-core=acru:ames
=bug
==
:: ::
:: $ames-state: state for entire vane +$ ship-state-6
$% [%alien alien-agenda]
[%known peer-state-6]
==
:: ::
:: peers: states of connections to other ships +$ peer-state-6
:: unix-duct: handle to give moves to unix $: $: =symmetric-key
:: life: our $life; how many times we've rekeyed =life
:: crypto-core: interface for encryption and signing =rift
:: bug: debug printing configuration =public-key
sponsor=ship
==
route=(unit [direct=? =lane])
=qos
=ossuary
snd=(map bone message-pump-state)
rcv=(map bone message-sink-state)
nax=(set [=bone =message-num])
heeds=(set duct)
==
:: ::
+$ ames-state +$ ames-state-7
$: peers=(map ship ship-state) $: peers=(map ship ship-state)
=unix=duct =unix=duct
=life =life
@ -730,12 +769,16 @@
:: $message-pump-gift: effect from |message-pump :: $message-pump-gift: effect from |message-pump
:: ::
:: %done: report message acknowledgment :: %done: report message acknowledgment
:: %cork: kill flow
:: %kroc: recork this bone
:: %send: emit message fragment :: %send: emit message fragment
:: %wait: set a new timer at .date :: %wait: set a new timer at .date
:: %rest: cancel timer at .date :: %rest: cancel timer at .date
:: ::
+$ message-pump-gift +$ message-pump-gift
$% [%done =message-num error=(unit error)] $% [%done =message-num error=(unit error)]
[%cork ~]
[%kroc =bone]
[%send =static-fragment] [%send =static-fragment]
[%wait date=@da] [%wait date=@da]
[%rest date=@da] [%rest date=@da]
@ -774,7 +817,7 @@
:: .ok: %.y unless previous failed attempt :: .ok: %.y unless previous failed attempt
:: ::
+$ message-sink-task +$ message-sink-task
$% [%done ok=?] $% [%done ok=? cork=?]
[%drop =message-num] [%drop =message-num]
[%hear =lane =shut-packet ok=?] [%hear =lane =shut-packet ok=?]
== ==
@ -786,6 +829,7 @@
+$ message-sink-gift +$ message-sink-gift
$% [%memo =message-num message=*] $% [%memo =message-num message=*]
[%send =message-num =ack-meat] [%send =message-num =ack-meat]
[%cork ~]
== ==
-- --
:: external vane interface :: external vane interface
@ -795,7 +839,7 @@
:: ::
=< =* adult-gate . =< =* adult-gate .
=| queued-events=(qeu queued-event) =| queued-events=(qeu queued-event)
=| cached-state=(unit [%5 ames-state-5]) =| cached-state=(unit $%([%5 ames-state-5] [%6 ames-state-6] [%7 ames-state-7]))
:: ::
|= [now=@da eny=@ rof=roof] |= [now=@da eny=@ rof=roof]
=* larval-gate . =* larval-gate .
@ -821,7 +865,9 @@
:: ::
?: &(?=(^ cached-state) ?=(~ queued-events)) ?: &(?=(^ cached-state) ?=(~ queued-events))
=^ moves adult-gate (call:adult-core duct dud task) =^ moves adult-gate (call:adult-core duct dud task)
(molt moves) %- molt
~> %slog.0^leaf/"ames: init daily recork timer"
:_(moves [duct %pass /recork %b %wait `@da`(add now ~d1)])
:: %born: set .unix-duct and start draining .queued-events :: %born: set .unix-duct and start draining .queued-events
:: ::
?: ?=(%born -.task) ?: ?=(%born -.task)
@ -903,9 +949,12 @@
:: .queued-events has been cleared; metamorphose :: .queued-events has been cleared; metamorphose
:: ::
?~ queued-events ?~ queued-events
?: ?=(^ cached-state) (molt moves) ?. ?=(^ cached-state)
~> %slog.0^leaf/"ames: metamorphosis" ~> %slog.0^leaf/"ames: metamorphosis"
[moves adult-gate] [moves adult-gate]
%- molt
~> %slog.0^leaf/"ames: init daily recork timer"
:_(moves [duct %pass /recork %b %wait `@da`(add now ~d1)])
:: set timer to drain next event :: set timer to drain next event
:: ::
=. moves :_(moves [duct %pass /larva %b %wait now]) =. moves :_(moves [duct %pass /larva %b %wait now])
@ -913,7 +962,7 @@
:: lifecycle arms; mostly pass-throughs to the contained adult ames :: lifecycle arms; mostly pass-throughs to the contained adult ames
:: ::
++ scry scry:adult-core ++ scry scry:adult-core
++ stay [%6 %larva queued-events ames-state.adult-gate] ++ stay [%8 %larva queued-events ames-state.adult-gate]
++ load ++ load
|= $= old |= $= old
$% $: %4 $% $: %4
@ -931,6 +980,20 @@
[%adult state=ames-state-5] [%adult state=ames-state-5]
== == == ==
$: %6 $: %6
$% $: %larva
events=(qeu queued-event)
state=ames-state-6
==
[%adult state=ames-state-6]
== ==
$: %7
$% $: %larva
events=(qeu queued-event)
state=ames-state-7
==
[%adult state=ames-state-7]
== ==
$: %8
$% $: %larva $% $: %larva
events=(qeu queued-event) events=(qeu queued-event)
state=_ames-state.adult-gate state=_ames-state.adult-gate
@ -955,22 +1018,47 @@
=. queued-events events.old =. queued-events events.old
larval-gate larval-gate
:: ::
[%6 %adult *] (load:adult-core %6 state.old) [%6 %adult *]
=. cached-state `[%6 state.old]
~> %slog.0^leaf/"ames: larva reload"
larval-gate
:: ::
[%6 %larva *] [%6 %larva *]
~> %slog.0^leaf/"ames: larva: load" ~> %slog.0^leaf/"ames: larva: load"
=. queued-events events.old =. queued-events events.old
=. adult-gate (load:adult-core %6 state.old)
larval-gate larval-gate
::
[%7 %adult *]
=. cached-state `[%7 state.old]
~> %slog.0^leaf/"ames: larva reload"
larval-gate
::
[%7 %larva *]
~> %slog.0^leaf/"ames: larva: load"
=. queued-events events.old
larval-gate
::
[%8 %adult *] (load:adult-core %8 state.old)
::
[%8 %larva *]
~> %slog.1^leaf/"ames: larva: load"
=. queued-events events.old
=. adult-gate (load:adult-core %8 state.old)
larval-gate
::
== ==
:: +molt: re-evolve to adult-ames :: +molt: re-evolve to adult-ames
:: ::
++ molt ++ molt
|= moves=(list move) |= moves=(list move)
^- (quip move _adult-gate) ^- (quip move _adult-gate)
=? cached-state &(?=(^ cached-state) ?=(%5 +<.cached-state))
`%6^(state-5-to-6:load:adult-core +.u.cached-state)
=? cached-state &(?=(^ cached-state) ?=(%6 +<.cached-state))
`%7^(state-6-to-7:load:adult-core +.u.cached-state)
=. ames-state.adult-gate =. ames-state.adult-gate
?> ?=(^ cached-state) ?> &(?=(^ cached-state) ?=(%7 +<.cached-state))
(state-5-to-6:load:adult-core +.u.cached-state) (state-7-to-8:load:adult-core +.u.cached-state)
=. cached-state ~ =. cached-state ~
~> %slog.0^leaf/"ames: metamorphosis reload" ~> %slog.0^leaf/"ames: metamorphosis reload"
[moves adult-gate] [moves adult-gate]
@ -1015,6 +1103,7 @@
%trim on-trim:event-core %trim on-trim:event-core
%vega on-vega:event-core %vega on-vega:event-core
%plea (on-plea:event-core [ship plea]:task) %plea (on-plea:event-core [ship plea]:task)
%cork (on-cork:event-core ship.task)
== ==
:: ::
[moves ames-gate] [moves ames-gate]
@ -1045,20 +1134,23 @@
[moves ames-gate] [moves ames-gate]
:: +stay: extract state before reload :: +stay: extract state before reload
:: ::
++ stay [%6 %adult ames-state] ++ stay [%8 %adult ames-state]
:: +load: load in old state after reload :: +load: load in old state after reload
:: ::
++ load ++ load
=< |= old-state=[%6 ^ames-state] =< |= $= old-state
$% [%8 ^ames-state]
==
^+ ames-gate ^+ ames-gate
?> ?=(%6 -.old-state) ?> ?=(%8 -.old-state)
ames-gate(ames-state +.old-state) ames-gate(ames-state +.old-state)
::
|% |%
:: +state-4-to-5 called from larval-ames :: +state-4-to-5 called from larval-ames
:: ::
++ state-4-to-5 ++ state-4-to-5
|= ames-state=ames-state-4 |= ames-state=ames-state-4
^- ames-state-4 ^- ames-state-5
=. peers.ames-state =. peers.ames-state
%- ~(run by peers.ames-state) %- ~(run by peers.ames-state)
|= ship-state=ship-state-4 |= ship-state=ship-state-4
@ -1076,11 +1168,11 @@
:: ::
++ state-5-to-6 ++ state-5-to-6
|= ames-state=ames-state-5 |= ames-state=ames-state-5
^- ^^ames-state ^- ames-state-6
:_ +.ames-state :_ +.ames-state
%- ~(rut by peers.ames-state) %- ~(rut by peers.ames-state)
|= [=ship ship-state=ship-state-5] |= [=ship ship-state=ship-state-5]
^- ^ship-state ^- ship-state-6
?. ?=(%known -.ship-state) ?. ?=(%known -.ship-state)
ship-state ship-state
=/ peer-state=peer-state-5 +.ship-state =/ peer-state=peer-state-5 +.ship-state
@ -1091,12 +1183,37 @@
;; @ud ;; @ud
=< q.q %- need %- need =< q.q %- need %- need
(rof ~ %j `beam`[[our %rift %da now] /(scot %p ship)]) (rof ~ %j `beam`[[our %rift %da now] /(scot %p ship)])
=/ =^peer-state :- -.ship-state
:_ +.peer-state :_ +.peer-state
=, -.peer-state =, -.peer-state
[symmetric-key life rift public-key sponsor] [symmetric-key life rift public-key sponsor]
:: +state-6-to-7 called from larval-ames
::
++ state-6-to-7
|= ames-state=ames-state-6
^- ames-state-7
:_ +.ames-state
%- ~(run by peers.ames-state)
|= ship-state=ship-state-6
^- ^ship-state ^- ^ship-state
[-.ship-state peer-state] ?. ?=(%known -.ship-state)
ship-state
:- %known
^- peer-state
:- +<.ship-state
[route qos ossuary snd rcv nax heeds ~ ~ ~]:ship-state
:: +state-7-to-8 called from larval-ames
::
++ state-7-to-8
|= ames-state=ames-state-7
^- ^^ames-state
:* peers.ames-state
unix-duct.ames-state
life.ames-state
crypto-core.ames-state
bug.ames-state
*(set wire)
==
-- --
:: +scry: dereference namespace :: +scry: dereference namespace
:: ::
@ -1134,6 +1251,7 @@
:: /ax/peers/[ship]/forward-lane (list lane) :: /ax/peers/[ship]/forward-lane (list lane)
:: /ax/bones/[ship] [snd=(set bone) rcv=(set bone)] :: /ax/bones/[ship] [snd=(set bone) rcv=(set bone)]
:: /ax/snd-bones/[ship]/[bone] vase :: /ax/snd-bones/[ship]/[bone] vase
:: /ax/corks (list wire)
:: ::
?. ?=(%x ren) ~ ?. ?=(%x ren) ~
?+ tyl ~ ?+ tyl ~
@ -1209,6 +1327,9 @@
=/ res =/ res
u.mps u.mps
``noun+!>(!>(res)) ``noun+!>(!>(res))
::
[%corks ~]
``noun+!>(~(tap in corks.ames-state))
== ==
-- --
:: |per-event: inner event-handling core :: |per-event: inner event-handling core
@ -1271,13 +1392,15 @@
++ send-ack ++ send-ack
|= =bone |= =bone
^+ event-core ^+ event-core
abet:(run-message-sink:peer-core bone %done ok=%.y) =/ cork=? (~(has in closing.peer-state) bone)
abet:(run-message-sink:peer-core bone %done ok=%.y cork)
:: failed; send message nack packet :: failed; send message nack packet
:: ::
++ send-nack ++ send-nack
|= [=bone =^error] |= [=bone =^error]
^+ event-core ^+ event-core
=. event-core abet:(run-message-sink:peer-core bone %done ok=%.n) =. event-core
abet:(run-message-sink:peer-core bone %done ok=%.n cork=%.n)
=/ =^peer-state (got-peer-state her) =/ =^peer-state (got-peer-state her)
=/ =^channel [[our her] now channel-state -.peer-state] =/ =^channel [[our her] now channel-state -.peer-state]
:: construct nack-trace message, referencing .failed $message-num :: construct nack-trace message, referencing .failed $message-num
@ -1638,7 +1761,35 @@
=/ sndr [our our-life.channel] =/ sndr [our our-life.channel]
=/ rcvr [ship her-life.channel] =/ rcvr [ship her-life.channel]
"plea {<sndr^rcvr^bone=bone^vane.plea^path.plea>}" "plea {<sndr^rcvr^bone=bone^vane.plea^path.plea>}"
:: since flow kill goes like:
:: client vane cork task -> client ames pass cork as plea ->
:: -> server ames sinks plea -> server ames +on-plea (we are here);
:: if it's %cork plea passed to ames from its sink,
:: give %done and process flow closing after +on-take-done call
:: ::
?: &(=(vane.plea %a) =(path.plea `path`/close) ?=(~ payload.plea))
(emit duct %give %done ~)
abet:(on-memo:(make-peer-core peer-state channel) bone plea %plea)
:: +on-cork: handle request to kill a flow
::
++ on-cork
|= =ship
^+ event-core
=/ ship-state (~(get by peers.ames-state) ship)
::
?> ?=([~ %known *] ship-state)
=/ =peer-state +.u.ship-state
=/ =channel [[our ship] now channel-state -.peer-state]
::
=^ =bone ossuary.peer-state (bind-duct ossuary.peer-state duct)
=/ =plea [%$ /flow [%cork ~]]
::
=. closing.peer-state (~(put in closing.peer-state) bone)
%- %^ trace msg.veb ship
|. ^- tape
=/ sndr [our our-life.channel]
=/ rcvr [ship her-life.channel]
"cork plea {<sndr^rcvr^bone=bone^vane.plea^path.plea>}"
abet:(on-memo:(make-peer-core peer-state channel) bone plea %plea) abet:(on-memo:(make-peer-core peer-state channel) bone plea %plea)
:: +on-take-wake: receive wakeup or error notification from behn :: +on-take-wake: receive wakeup or error notification from behn
:: ::
@ -1660,19 +1811,32 @@
event-core event-core
(request-attestation u.ship) (request-attestation u.ship)
:: ::
=/ res=(unit [her=ship =bone]) (parse-pump-timer-wire wire) |^
?~ res ?. ?=([%recork ~] wire) (handle-single-wire wire)
%- (slog leaf+"ames: got timer for strange wire: {<wire>}" ~) =/ wires=(list ^wire) ~(tap in corks.ames-state)
event-core |- ^+ event-core
?^ wires
$(wires t.wires, event-core (handle-single-wire i.wires))
(emit duct %pass /recork %b %wait `@da`(add now ~d1))
:: ::
=/ state=(unit peer-state) (get-peer-state her.u.res) ++ handle-single-wire
?~ state |= =^wire
%- (slog leaf+"ames: got timer for strange ship: {<her.u.res>}, ignoring" ~) ^+ event-core
event-core =/ res=(unit [her=ship =bone]) (parse-pump-timer-wire wire)
:: ?~ res
=/ =channel [[our her.u.res] now channel-state -.u.state] %- (slog leaf+"ames: got timer for strange wire: {<wire>}" ~)
:: event-core
abet:(on-wake:(make-peer-core u.state channel) bone.u.res error) ::
=/ state=(unit peer-state) (get-peer-state her.u.res)
?~ state
%. event-core
%- slog
[leaf+"ames: got timer for strange ship: {<her.u.res>}, ignoring" ~]
::
=/ =channel [[our her.u.res] now channel-state -.u.state]
::
abet:(on-wake:(make-peer-core u.state channel) bone.u.res error)
--
:: +on-init: first boot; subscribe to our info from jael :: +on-init: first boot; subscribe to our info from jael
:: ::
++ on-init ++ on-init
@ -1718,7 +1882,6 @@
:: ::
?- public-keys-result ?- public-keys-result
[%diff @ %rift *] [%diff @ %rift *]
:: event-core
(on-publ-rift [who to.diff]:public-keys-result) (on-publ-rift [who to.diff]:public-keys-result)
:: ::
[%diff @ %keys *] [%diff @ %keys *]
@ -1926,6 +2089,7 @@
:: ::
=. qos.peer-state [%unborn now] =. qos.peer-state [%unborn now]
=. life.peer-state life.point =. life.peer-state life.point
=. rift.peer-state rift.point
=. public-key.peer-state public-key =. public-key.peer-state public-key
=. symmetric-key.peer-state symmetric-key =. symmetric-key.peer-state symmetric-key
=. sponsor.peer-state =. sponsor.peer-state
@ -2349,6 +2513,12 @@
=(1 current:(~(got by snd.peer-state) bone)) =(1 current:(~(got by snd.peer-state) bone))
== ==
(send-blob | her.channel (attestation-packet [her her-life]:channel)) (send-blob | her.channel (attestation-packet [her her-life]:channel))
?: (~(has in corked.peer-state) bone)
:: if the bone was corked the flow doesn't exist anymore
:: TODO: clean up corked bones in the peer state when it's _safe_?
:: (e.g. if this bone is N blocks behind the next one)
::
peer-core
:: maybe resend some timed out packets :: maybe resend some timed out packets
:: ::
(run-message-pump bone %wake ~) (run-message-pump bone %wake ~)
@ -2390,7 +2560,8 @@
=/ =message-pump-state =/ =message-pump-state
(~(gut by snd.peer-state) bone *message-pump-state) (~(gut by snd.peer-state) bone *message-pump-state)
:: ::
=/ message-pump (make-message-pump message-pump-state channel) =/ close=? (~(has in closing.peer-state) bone)
=+ message-pump=(make-message-pump message-pump-state channel close bone)
=^ pump-gifts message-pump-state (work:message-pump task) =^ pump-gifts message-pump-state (work:message-pump task)
=. snd.peer-state (~(put by snd.peer-state) bone message-pump-state) =. snd.peer-state (~(put by snd.peer-state) bone message-pump-state)
:: process effects from |message-pump :: process effects from |message-pump
@ -2401,6 +2572,8 @@
=. peer-core =. peer-core
?- -.gift ?- -.gift
%done (on-pump-done [message-num error]:gift) %done (on-pump-done [message-num error]:gift)
%cork (on-pump-cork current.message-pump-state)
%kroc (on-pump-kroc bone:gift)
%send (on-pump-send static-fragment.gift) %send (on-pump-send static-fragment.gift)
%wait (on-pump-wait date.gift) %wait (on-pump-wait date.gift)
%rest (on-pump-rest date.gift) %rest (on-pump-rest date.gift)
@ -2411,6 +2584,16 @@
++ on-pump-done ++ on-pump-done
|= [=message-num error=(unit error)] |= [=message-num error=(unit error)]
^+ peer-core ^+ peer-core
?: ?& =(1 (end 0 bone))
=(1 (end 0 (rsh 0 bone)))
(~(has in corked.peer-state) (mix 0b10 bone))
==
%- %+ trace msg.veb
=/ dat [her.channel bone=bone message-num=message-num -.task]
|.("remove naxplanation flow {<dat>}")
=. snd.peer-state
(~(del by snd.peer-state) bone)
peer-core
:: if odd bone, ack is on "subscription update" message; no-op :: if odd bone, ack is on "subscription update" message; no-op
:: ::
?: =(1 (end 0 bone)) ?: =(1 (end 0 bone))
@ -2423,13 +2606,53 @@
=/ target-bone=^bone (mix 0b10 bone) =/ target-bone=^bone (mix 0b10 bone)
:: ::
(run-message-sink target-bone %drop message-num) (run-message-sink target-bone %drop message-num)
?: &(close ?=(%near -.task))
:: if the bone belongs to a closing flow and we got a naxplanation,
:: don't relay the ack to the client vane, and wait for the next try
::
peer-core
:: not a nack-trace bone; relay ack to client vane :: not a nack-trace bone; relay ack to client vane
:: ::
(emit (got-duct bone) %give %done error) (emit (got-duct bone) %give %done error)
:: +on-pump-cork: kill flow on cork sender side
::
++ on-pump-cork
|= =message-num
^+ peer-core
:: clear all packets from this message from the packet pump
::
=. message-pump (run-packet-pump:message-pump %done message-num *@dr)
=/ =wire (make-pump-timer-wire her.channel bone)
=. corks.ames-state (~(del in corks.ames-state) wire)
=/ nack-bone=^bone (mix 0b10 bone)
=? rcv.peer-state (~(has by rcv.peer-state) nack-bone)
:: if the publisher was behind we remove nacks received on that bone
::
(~(del by rcv.peer-state) nack-bone)
=. peer-state
=, peer-state
%_ peer-state
snd (~(del by snd) bone)
rcv (~(del by rcv) bone)
corked (~(put in corked) bone)
closing (~(del in closing) bone)
krocs (~(del in krocs) bone)
by-duct.ossuary (~(del by by-duct.ossuary) (got-duct bone))
by-bone.ossuary (~(del by by-bone.ossuary) bone)
==
peer-core
:: +on-pump-krock: if we get a nack for a cork, add it to the recork set
::
++ on-pump-kroc
|= =^bone
^+ peer-core
=. krocs.peer-state (~(put in krocs.peer-state) bone)
peer-core
:: +on-pump-send: emit message fragment requested by |message-pump :: +on-pump-send: emit message fragment requested by |message-pump
:: ::
++ on-pump-send ++ on-pump-send
|=(f=static-fragment (send-shut-packet bone [message-num %& +]:f)) |= f=static-fragment
(send-shut-packet bone [message-num %& +]:f)
:: +on-pump-wait: relay |message-pump's set-timer request :: +on-pump-wait: relay |message-pump's set-timer request
:: ::
++ on-pump-wait ++ on-pump-wait
@ -2454,13 +2677,15 @@
++ run-message-sink ++ run-message-sink
|= [=bone task=message-sink-task] |= [=bone task=message-sink-task]
^+ peer-core ^+ peer-core
?: (~(has in corked.peer-state) bone) peer-core
:: pass .task to the |message-sink and apply state mutations :: pass .task to the |message-sink and apply state mutations
:: ::
=/ =message-sink-state =/ =message-sink-state
(~(gut by rcv.peer-state) bone *message-sink-state) (~(gut by rcv.peer-state) bone *message-sink-state)
:: ::
=/ message-sink (make-message-sink message-sink-state channel) =/ message-sink (make-message-sink message-sink-state channel)
=^ sink-gifts message-sink-state (work:message-sink task) =/ closing=? (~(has in closing.peer-state) bone)
=^ sink-gifts message-sink-state (work:message-sink closing task)
=. rcv.peer-state (~(put by rcv.peer-state) bone message-sink-state) =. rcv.peer-state (~(put by rcv.peer-state) bone message-sink-state)
:: process effects from |message-sink :: process effects from |message-sink
:: ::
@ -2471,8 +2696,32 @@
?- -.gift ?- -.gift
%memo (on-sink-memo [message-num message]:gift) %memo (on-sink-memo [message-num message]:gift)
%send (on-sink-send [message-num ack-meat]:gift) %send (on-sink-send [message-num ack-meat]:gift)
%cork on-sink-cork
== ==
$(sink-gifts t.sink-gifts) $(sink-gifts t.sink-gifts)
:: +on-sink-cork: handle flow kill after server ames has taken %done
::
++ on-sink-cork
^+ peer-core
=/ =message-pump-state
(~(gut by snd.peer-state) bone *message-pump-state)
=? peer-core ?=(^ next-wake.packet-pump-state.message-pump-state)
=* next-wake u.next-wake.packet-pump-state.message-pump-state
=/ =wire (make-pump-timer-wire her.channel bone)
:: resetting timer for boons
::
(emit [/ames]~ %pass wire %b %rest next-wake)
=/ nax-bone=^bone (mix 0b10 bone)
=. peer-state
=, peer-state
%_ peer-state
rcv (~(del by rcv) bone)
snd (~(del by snd) bone)
corked (~(put in corked) bone)
closing (~(del in closing) bone)
krocs (~(del in krocs) bone)
==
peer-core
:: +on-sink-send: emit ack packet as requested by |message-sink :: +on-sink-send: emit ack packet as requested by |message-sink
:: ::
++ on-sink-send ++ on-sink-send
@ -2506,10 +2755,14 @@
++ on-sink-boon ++ on-sink-boon
|= [=message-num message=*] |= [=message-num message=*]
^+ peer-core ^+ peer-core
?: ?| (~(has in closing.peer-state) bone)
(~(has in corked.peer-state) bone)
==
peer-core
:: send ack unconditionally :: send ack unconditionally
:: ::
=. peer-core (emit (got-duct bone) %give %boon message) =. peer-core (emit (got-duct bone) %give %boon message)
=. peer-core (run-message-sink bone %done ok=%.y) =. peer-core (run-message-sink bone %done ok=%.y cork=%.n)
:: ::
?. ?=([%hear * * ok=%.n] task) ?. ?=([%hear * * ok=%.n] task)
:: fresh boon; give message to client vane :: fresh boon; give message to client vane
@ -2547,52 +2800,91 @@
=+ ;; =naxplanation message =+ ;; =naxplanation message
:: ack nack-trace message (only applied if we don't later crash) :: ack nack-trace message (only applied if we don't later crash)
:: ::
=. peer-core (run-message-sink bone %done ok=%.y) =. peer-core (run-message-sink bone %done ok=%.y cork=%.n)
:: flip .bone's second bit to find referenced flow :: flip .bone's second bit to find referenced flow
:: ::
=/ target-bone=^bone (mix 0b10 bone) =/ target-bone=^bone (mix 0b10 bone)
:: notify |message-pump that this message got naxplained :: notify |message-pump that this message got naxplained
:: ::
(run-message-pump target-bone %near naxplanation) =. peer-core (run-message-pump target-bone %near naxplanation)
::
?. (~(has in krocs.peer-state) target-bone)
peer-core
=/ =message-pump-state
(~(gut by snd.peer-state) target-bone *message-pump-state)
=/ message-pump
(make-message-pump message-pump-state channel %.y target-bone)
:: we don't process the gifts here and instead wait for
:: the timer to handle the %cork plea added to the pump
::
=^ * message-pump-state
%- work:message-pump
%memo^(dedup-message (jim [%$ /flow [%cork ~]]))
=. snd.peer-state
(~(put by snd.peer-state) target-bone message-pump-state)
:: if we get a naxplanation for a %cork, the publisher is behind
:: receiving the OTA, so we set up a timer to retry in one day.
::
%- %+ trace msg.veb
|.("old publisher, resend %cork on bone={<target-bone>} in ~d1")
=/ =wire (make-pump-timer-wire her.channel target-bone)
=. corks.ames-state (~(put in corks.ames-state) wire)
peer-core
:: +on-sink-plea: handle request message received by |message-sink :: +on-sink-plea: handle request message received by |message-sink
:: ::
++ on-sink-plea ++ on-sink-plea
|= [=message-num message=*] |= [=message-num message=*]
^+ peer-core ^+ peer-core
?: ?| (~(has in closing.peer-state) bone)
(~(has in corked.peer-state) bone)
==
peer-core
|^
%- %+ trace msg.veb %- %+ trace msg.veb
=/ dat [her.channel bone=bone message-num=message-num] =/ dat [her.channel bone=bone message-num=message-num]
|.("sink plea {<dat>}") |.("sink plea {<dat>}")
:: is this the first time we're trying to process this message? :: is this the first time we're trying to process this message?
:: ::
?. ?=([%hear * * ok=%.n] task) ?: ?=([%hear * * ok=%.n] task)
:: fresh plea; pass to client vane :: we previously crashed on this message; send nack
::
=+ ;; =plea message
::
=/ =wire (make-bone-wire her.channel her-rift.channel bone)
:: ::
nack-plea
:: fresh plea; pass to client vane
::
=+ ;; =plea message
=/ =wire (make-bone-wire her.channel her-rift.channel bone)
::
?. =(vane.plea %$)
?+ vane.plea ~| %ames-evil-vane^our^her.channel^vane.plea !! ?+ vane.plea ~| %ames-evil-vane^our^her.channel^vane.plea !!
%c (emit duct %pass wire %c %plea her.channel plea) %c (emit duct %pass wire %c %plea her.channel plea)
%g (emit duct %pass wire %g %plea her.channel plea) %g (emit duct %pass wire %g %plea her.channel plea)
%j (emit duct %pass wire %j %plea her.channel plea) %j (emit duct %pass wire %j %plea her.channel plea)
== ==
:: we previously crashed on this message; send nack :: a %cork plea is handled using %$ as the recipient vane to
:: account for publishers that still handle ames-to-ames %pleas
:: ::
=. peer-core (run-message-sink bone %done ok=%.n) ?> &(?=([%cork *] payload.plea) ?=(%flow -.path.plea))
:: also send nack-trace with blank .error for security =. closing.peer-state (~(put in closing.peer-state) bone)
(emit duct %pass wire %a %plea her.channel [%a /close ~])
:: ::
=/ nack-trace-bone=^bone (mix 0b10 bone) ++ nack-plea
=/ =naxplanation [message-num *error] ^+ peer-core
=/ =message-blob (jam naxplanation) =. peer-core (run-message-sink bone %done ok=%.n cork=%.n)
:: :: send nack-trace with blank .error for security
(run-message-pump nack-trace-bone %memo message-blob) ::
=/ nack-trace-bone=^bone (mix 0b10 bone)
=/ =naxplanation [message-num *error]
=/ =message-blob (jam naxplanation)
::
(run-message-pump nack-trace-bone %memo message-blob)
--
-- --
-- --
-- --
:: +make-message-pump: constructor for |message-pump :: +make-message-pump: constructor for |message-pump
:: ::
++ make-message-pump ++ make-message-pump
|= [state=message-pump-state =channel] |= [state=message-pump-state =channel closing=? =bone]
=* veb veb.bug.channel =* veb veb.bug.channel
=| gifts=(list message-pump-gift) =| gifts=(list message-pump-gift)
:: ::
@ -2627,11 +2919,35 @@
%memo (on-memo message-blob.task) %memo (on-memo message-blob.task)
%wake (run-packet-pump %wake current.state) %wake (run-packet-pump %wake current.state)
%hear %hear
?- -.ack-meat.task ?- -.ack-meat.task
%& (on-hear [message-num fragment-num=p.ack-meat]:task) %&
%| (on-done [message-num ?:(ok.p.ack-meat [%ok ~] [%nack ~])]:task) (on-hear [message-num fragment-num=p.ack-meat]:task)
::
%|
=/ cork=?
=/ top-live
(pry:packet-queue:*make-packet-pump live.packet-pump-state.state)
:: If we send a %cork and get an ack, we can know by
:: sequence number that the ack is for the %cork message
::
?& closing
?=(^ top-live)
=(0 ~(wyt in unsent-messages.state))
=(0 (lent unsent-fragments.state))
=(1 ~(wyt by live.packet-pump-state.state))
=(message-num:task message-num.key.u.top-live)
==
=* ack p.ack-meat.task
=? message-pump &(cork !ok.ack) (give [%kroc bone])
=. message-pump
%- on-done
[[message-num:task ?:(ok.ack [%ok ~] [%nack ~])] cork]
?. &(!ok.ack cork) message-pump
%. message-pump
%+ trace odd.veb
|.("got nack for %cork {<bone=bone message-num=message-num:task>}")
== ==
%near (on-done [message-num %naxplanation error]:naxplanation.task) %near (on-done [[message-num %naxplanation error]:naxplanation.task %&])
== ==
:: +on-memo: handle request to send a message :: +on-memo: handle request to send a message
:: ::
@ -2663,10 +2979,11 @@
:: flows. :: flows.
:: ::
++ on-done ++ on-done
|= [=message-num =ack] |= [[=message-num =ack] cork=?]
^+ message-pump ^+ message-pump
:: unsent messages from the future should never get acked :: unsent messages from the future should never get acked
:: ::
~| [message-num next.state]
?> (lth message-num next.state) ?> (lth message-num next.state)
:: ignore duplicate message acks :: ignore duplicate message acks
:: ::
@ -2725,6 +3042,7 @@
?- -.u.cur ?- -.u.cur
%ok %ok
=. message-pump (give %done current.state ~) =. message-pump (give %done current.state ~)
=? message-pump cork (give %cork ~)
$(current.state +(current.state)) $(current.state +(current.state))
:: ::
%nack %nack
@ -3270,20 +3588,20 @@
:: +work: handle a $message-sink-task :: +work: handle a $message-sink-task
:: ::
++ work ++ work
|= task=message-sink-task |= [closing=? task=message-sink-task]
^+ [gifts state] ^+ [gifts state]
:: ::
=- [(flop gifts) state] =- [(flop gifts) state]
:: ::
?- -.task ?- -.task
%done (on-done ok.task) %done (on-done ok.task cork.task)
%drop (on-drop message-num.task) %drop (on-drop message-num.task)
%hear (on-hear [lane shut-packet ok]:task) %hear (on-hear closing [lane shut-packet ok]:task)
== ==
:: +on-hear: receive message fragment, possibly completing message :: +on-hear: receive message fragment, possibly completing message
:: ::
++ on-hear ++ on-hear
|= [=lane =shut-packet ok=?] |= [closing=? =lane =shut-packet ok=?]
^+ message-sink ^+ message-sink
:: we know this is a fragment, not an ack; expose into namespace :: we know this is a fragment, not an ack; expose into namespace
:: ::
@ -3320,13 +3638,14 @@
:: doesn't happen for boons. :: doesn't happen for boons.
:: ::
?: (lte seq last-heard.state) ?: (lte seq last-heard.state)
?: is-last-fragment ?: &(is-last-fragment !closing)
:: drop last packet since we don't know whether to ack or nack :: if not from a closing bone, drop last packet,
:: since we don't know whether to ack or nack
:: ::
%- %+ trace rcv.veb %- %+ trace rcv.veb
|. ^- tape |. ^- tape
=/ data =/ data
:* her.channel seq=seq :* her.channel seq=seq bone=bone
fragment-num=fragment-num num-fragments=num-fragments fragment-num=fragment-num num-fragments=num-fragments
la=last-acked.state lh=last-heard.state la=last-acked.state lh=last-heard.state
== ==
@ -3336,7 +3655,9 @@
:: ::
%- %+ trace rcv.veb |. %- %+ trace rcv.veb |.
=/ data =/ data
[seq=seq fragment-num=fragment-num num-fragments=num-fragments] :* seq=seq fragment-num=fragment-num
num-fragments=num-fragments closing=closing
==
"send ack-1 {<data>}" "send ack-1 {<data>}"
(give %send seq %& fragment-num) (give %send seq %& fragment-num)
:: last-heard<seq<10+last-heard; this is a packet in a live message :: last-heard<seq<10+last-heard; this is a packet in a live message
@ -3425,7 +3746,7 @@
:: +on-done: handle confirmation of message processing from vane :: +on-done: handle confirmation of message processing from vane
:: ::
++ on-done ++ on-done
|= ok=? |= [ok=? cork=?]
^+ message-sink ^+ message-sink
:: ::
=^ pending pending-vane-ack.state ~(get to pending-vane-ack.state) =^ pending pending-vane-ack.state ~(get to pending-vane-ack.state)
@ -3435,6 +3756,7 @@
=? nax.state !ok (~(put in nax.state) message-num) =? nax.state !ok (~(put in nax.state) message-num)
:: ::
=. message-sink (give %send message-num %| ok lag=`@dr`0) =. message-sink (give %send message-num %| ok lag=`@dr`0)
=? message-sink cork (give %cork ~)
=/ next ~(top to pending-vane-ack.state) =/ next ~(top to pending-vane-ack.state)
?~ next ?~ next
message-sink message-sink

View File

@ -4,17 +4,47 @@
:: ::
:::: ::::
|= our=ship |= our=ship
:: veb: verbosity flags
::
=/ veb-all-off
:: TODO: add more flags?
::
:* odd=`?`%.n :: unusual events
==
=, gall =, gall
=> =>
|% |%
+| %helpers
:: +trace: print if .verb is set and we're tracking .dude
::
++ trace
|= [verb=? =dude dudes=(set dude) print=tang]
^+ same
?. verb
same
?. => [dude=dude dudes=dudes in=in]
~+ |(=(~ dudes) (~(has in dudes) dude))
same
(slog print)
::
:: $bug: debug printing configuration
::
:: veb: verbosity toggles
:: dudes: app filter; if ~, print for all
::
+$ bug
$: veb=_veb-all-off
dudes=(set dude)
==
::
+| %main +| %main
:: ::
:: $move: Arvo-level move :: $move: Arvo-level move
:: ::
+$ move [=duct move=(wind note-arvo gift-arvo)] +$ move [=duct move=(wind note-arvo gift-arvo)]
:: $state-8: overall gall state, versioned :: $state-9: overall gall state, versioned
:: ::
+$ state-8 [%8 state] +$ state-9 [%9 state]
:: $state: overall gall state :: $state: overall gall state
:: ::
:: system-duct: TODO document :: system-duct: TODO document
@ -22,6 +52,7 @@
:: contacts: other ships we're in communication with :: contacts: other ships we're in communication with
:: yokes: running agents :: yokes: running agents
:: blocked: moves to agents that haven't been started yet :: blocked: moves to agents that haven't been started yet
:: bug: debug printing configuration
:: ::
+$ state +$ state
$: system-duct=duct $: system-duct=duct
@ -29,13 +60,8 @@
contacts=(set ship) contacts=(set ship)
yokes=(map term yoke) yokes=(map term yoke)
blocked=(map term (qeu blocked-move)) blocked=(map term (qeu blocked-move))
=bug
== ==
:: $watches: subscribers and publications
::
:: TODO: rename this, to $ties?
:: TODO: rename $boat and $bitt and document
::
+$ watches [inbound=bitt outbound=boat]
:: $routes: new cuff; TODO: document :: $routes: new cuff; TODO: document
:: ::
+$ routes +$ routes
@ -45,19 +71,26 @@
:: $yoke: agent runner state :: $yoke: agent runner state
:: ::
:: control-duct: TODO document :: control-duct: TODO document
:: live: is this agent running? TODO document better :: run-nonce: unique for each rebuild
:: sub-nonce: app-wide global %watch nonce
:: live: is this agent running? TODO document boarer
:: stats: TODO document :: stats: TODO document
:: watches: incoming and outgoing subscription state :: bitt: incoming subscriptions
:: boat: outgoing subscriptions
:: boar: and their nonces
:: agent: agent core :: agent: agent core
:: beak: compilation source :: beak: compilation source
:: marks: mark conversion requests :: marks: mark conversion requests
:: ::
+$ yoke +$ yoke
$: control-duct=duct $: control-duct=duct
nonce=@t run-nonce=@t
live=? ::TODO remove, replaced by -.agent sub-nonce=_1
live=?
=stats =stats
=watches =bitt
=boat
=boar
agent=(each agent vase) agent=(each agent vase)
=beak =beak
marks=(map duct mark) marks=(map duct mark)
@ -108,6 +141,7 @@
%poke %poke
%leave %leave
%missing %missing
%cork
== ==
:: |migrate: data structures for upgrades :: |migrate: data structures for upgrades
:: ::
@ -116,21 +150,25 @@
:: $spore: structures for update, produced by +stay :: $spore: structures for update, produced by +stay
:: ::
+$ spore +$ spore
$: %8 $: %9
system-duct=duct system-duct=duct
outstanding=(map [wire duct] (qeu remote-request)) outstanding=(map [wire duct] (qeu remote-request))
contacts=(set ship) contacts=(set ship)
eggs=(map term egg) eggs=(map term egg)
blocked=(map term (qeu blocked-move)) blocked=(map term (qeu blocked-move))
=bug
== ==
:: $egg: migratory agent state; $yoke with .old-state instead of .agent :: $egg: migratory agent state; $yoke with .old-state instead of .agent
:: ::
+$ egg +$ egg
$: control-duct=duct $: control-duct=duct
nonce=@t run-nonce=@t
sub-nonce=@
live=? live=?
=stats =stats
=watches =bitt
=boat
=boar
old-state=(each vase vase) old-state=(each vase vase)
=beak =beak
marks=(map duct mark) marks=(map duct mark)
@ -139,6 +177,7 @@
:: pupal gall core, on upgrade :: pupal gall core, on upgrade
:: ::
=< =* adult-gate . =< =* adult-gate .
=| spore-tag=@ud
=| =spore =| =spore
|= [now=@da eny=@uvJ rof=roof] |= [now=@da eny=@uvJ rof=roof]
=* pupal-gate . =* pupal-gate .
@ -164,10 +203,12 @@
[^duct %pass /whiz/gall %$ %whiz ~]~ [^duct %pass /whiz/gall %$ %whiz ~]~
=/ adult adult-core =/ adult adult-core
=. state.adult =. state.adult
[%8 system-duct outstanding contacts yokes=~ blocked]:spore [%9 system-duct outstanding contacts yokes=~ blocked bug]:spore
=/ mo-core (mo-abed:mo:adult duct) =/ mo-core (mo-abed:mo:adult system-duct.state.adult)
=/ apps=(list [dap=term =egg]) ~(tap by eggs.spore)
:: upgrade %base apps and suspend others
::
=. mo-core =. mo-core
=/ apps=(list [dap=term =egg]) ~(tap by eggs.spore)
|- ^+ mo-core |- ^+ mo-core
?~ apps mo-core ?~ apps mo-core
?. =(%base q.beak.egg.i.apps) ?. =(%base q.beak.egg.i.apps)
@ -185,6 +226,24 @@
(mean u.tan) (mean u.tan)
ap-core ap-core
$(apps t.apps, mo-core ap-abet:ap-core) $(apps t.apps, mo-core ap-abet:ap-core)
:: FIXME: ! kiln: %base not installed
:: ! /lib/hood/kiln/hoon:<[461 23].[461 42]>
:: ! /base/gall-lyv
:: kill subscriptions when upgrading to gall request queue fix
::
:: =? mo-core (lth spore-tag %9)
:: |- ^+ mo-core
:: ?~ apps mo-core
:: ~> %slog.[0 leaf+"gall: +ap-kill-down {<dap.i.apps>}"]
:: =/ ap-core (ap-abut:ap:mo-core i.apps)
:: =. ap-core
:: =/ boats=(list [=wire =dock])
:: ~(tap in ~(key by boat.egg.i.apps))
:: |- ^+ ap-core
:: ?~ boats ap-core
:: =/ [=wire =dock] i.boats
:: $(boats t.boats, ap-core (ap-kill-down:ap-core wire dock))
:: $(apps t.apps, mo-core ap-abet:ap-core)
=. mo-core (mo-subscribe-to-agent-builds:mo-core now) =. mo-core (mo-subscribe-to-agent-builds:mo-core now)
=^ moves adult-gate mo-abet:mo-core =^ moves adult-gate mo-abet:mo-core
=? moves ?=(^ fec) (weld moves [u.fec]~) =? moves ?=(^ fec) (weld moves [u.fec]~)
@ -223,9 +282,10 @@
:: ::
++ load ++ load
|^ |= old=spore-any |^ |= old=spore-any
=? old ?=(%7 -.old) =. spore-tag `@ud`-.old
(spore-7-to-8 old) =? old ?=(%7 -.old) (spore-7-to-8 old)
?> ?=(%8 -.old) =? old ?=(%8 -.old) (spore-8-to-9 old)
?> ?=(%9 -.old)
=. spore old =. spore old
?. =(~ eggs.spore) ?. =(~ eggs.spore)
pupal-gate pupal-gate
@ -234,32 +294,78 @@
state spore(eggs *(map term yoke)) state spore(eggs *(map term yoke))
== ==
:: ::
+$ spore-any $%(^spore spore-7) +$ spore-any $%(^spore spore-8 spore-7)
+$ spore-7 +$ spore-7
$: %7 $: %7
wipe-eyre-subs=_| ::NOTE band-aid for #3196 wipe-eyre-subs=_| ::NOTE band-aid for #3196
system-duct=duct system-duct=duct
outstanding=(map [wire duct] (qeu remote-request)) outstanding=(map [wire duct] (qeu remote-request))
contacts=(set ship) contacts=(set ship)
eggs=(map term egg) eggs=(map term egg-7)
blocked=(map term (qeu blocked-move)) blocked=(map term (qeu blocked-move))
== ==
:: ::
+$ spore-8
$: %8
system-duct=duct
outstanding=(map [wire duct] (qeu remote-request))
contacts=(set ship)
eggs=(map term egg-8)
blocked=(map term (qeu blocked-move))
==
::
+$ egg-7 egg-8
+$ egg-8
$: control-duct=duct
run-nonce=@t
live=?
=stats
watches=watches-8
old-state=(each vase vase)
=beak
marks=(map duct mark)
==
::
+$ watches-8 [inbound=bitt outbound=boat-8]
+$ boat-8 (map [wire ship term] [acked=? =path])
::
++ spore-7-to-8 ++ spore-7-to-8
|= old=spore-7 |= old=spore-7
^- ^spore ^- spore-8
:- %8 :- %8
=. eggs.old =. eggs.old
%- ~(urn by eggs.old) %- ~(urn by eggs.old)
|= [a=term e=egg] |= [a=term e=egg-7]
::NOTE kiln will kick off appropriate app revival ::NOTE kiln will kick off appropriate app revival
e(old-state [%| p.old-state.e]) e(old-state [%| p.old-state.e])
+>.old +>.old
::
++ spore-8-to-9
|= old=spore-8
^- ^spore
=- old(- %9, eggs -, blocked [blocked.old *bug])
%- ~(run by eggs.old)
|= =egg-8
^- egg
=/ [=bitt =boat =boar] (watches-8-to-9 watches.egg-8)
:* control-duct.egg-8
run-nonce.egg-8
sub-nonce=0
live.egg-8
stats.egg-8
bitt boat boar
[old-state beak marks]:egg-8
==
::
++ watches-8-to-9
|= watches-8
^- [bitt boat boar]
[inbound outbound (~(run by outbound) |=([acked=? =path] nonce=0))]
-- --
-- --
:: adult gall vane interface, for type compatibility with pupa :: adult gall vane interface, for type compatibility with pupa
:: ::
=| state=state-8 =| state=state-9
|= [now=@da eny=@uvJ rof=roof] |= [now=@da eny=@uvJ rof=roof]
=* gall-payload . =* gall-payload .
=< ~% %gall-wrap ..mo ~ =< ~% %gall-wrap ..mo ~
@ -280,6 +386,12 @@
++ mo ++ mo
~% %gall-mo +> ~ ~% %gall-mo +> ~
|_ [hen=duct moves=(list move)] |_ [hen=duct moves=(list move)]
::
++ trace
|= [verb=? =dude print=tang]
^+ same
(^trace verb dude dudes.bug.state print)
::
:: +mo-abed: initialise state with the provided duct :: +mo-abed: initialise state with the provided duct
:: +mo-abet: finalize, reversing moves :: +mo-abet: finalize, reversing moves
:: +mo-pass: prepend a standard %pass to the current list of moves :: +mo-pass: prepend a standard %pass to the current list of moves
@ -357,7 +469,7 @@
control-duct hen control-duct hen
beak bek beak bek
agent &+agent agent &+agent
nonce (scot %uw (end 5 (shas %yoke-nonce eny))) run-nonce (scot %uw (end 5 (shas %yoke-nonce eny)))
== ==
:: ::
=/ old mo-core =/ old mo-core
@ -457,8 +569,12 @@
=. outstanding.state =. outstanding.state
=/ stand =/ stand
(~(gut by outstanding.state) [wire hen] *(qeu remote-request)) (~(gut by outstanding.state) [wire hen] *(qeu remote-request))
(~(put by outstanding.state) [wire hen] (~(put to stand) -.deal)) %+ ~(put by outstanding.state) [wire hen]
(mo-pass wire note-arvo) (~(gas to stand) ?.(?=(%leave -.deal) ~[-.deal] ~[%leave %cork]))
=. mo-core (mo-pass wire note-arvo)
?. ?=(%leave -.deal)
mo-core
(mo-pass wire [%a [%cork ship]])
:: +mo-track-ship: subscribe to ames and jael for notices about .ship :: +mo-track-ship: subscribe to ames and jael for notices about .ship
:: ::
++ mo-track-ship ++ mo-track-ship
@ -680,7 +796,12 @@
(~(put to *(qeu remote-request)) %missing) (~(put to *(qeu remote-request)) %missing)
~| [full-wire=full-wire hen=hen stand=stand] ~| [full-wire=full-wire hen=hen stand=stand]
=^ rr stand ~(get to stand) =^ rr stand ~(get to stand)
[rr (~(put by outstanding.state) [full-wire hen] stand)] ~? &(=(rr %cork) ?=(^ stand))
[%outstanding-queue-not-empty wire hen]
:- rr
?: ?=(%cork rr)
(~(del by outstanding.state) [full-wire hen])
(~(put by outstanding.state) [full-wire hen] stand)
:: non-null case of wire is old, remove on next breach after :: non-null case of wire is old, remove on next breach after
:: 2019/12 :: 2019/12
:: ::
@ -696,6 +817,7 @@
%watch (mo-give %unto %watch-ack err) %watch (mo-give %unto %watch-ack err)
%poke (mo-give %unto %poke-ack err) %poke (mo-give %unto %poke-ack err)
%leave mo-core %leave mo-core
%cork mo-core
%missing (mo-give:(mo-give %unto %watch-ack err) %unto %poke-ack err) %missing (mo-give:(mo-give %unto %watch-ack err) %unto %poke-ack err)
== ==
:: ::
@ -738,7 +860,7 @@
?~ yoke ?~ yoke
%- (slog leaf+"gall: {<dap>} dead, got {<+<.sign-arvo>}" ~) %- (slog leaf+"gall: {<dap>} dead, got {<+<.sign-arvo>}" ~)
mo-core mo-core
?. =(nonce.u.yoke i.t.wire) ?. =(run-nonce.u.yoke i.t.wire)
%- (slog leaf+"gall: got old {<+<.sign-arvo>} for {<dap>}" ~) %- (slog leaf+"gall: got old {<+<.sign-arvo>} for {<dap>}" ~)
mo-core mo-core
?. ?=([?(%gall %behn) %unto *] sign-arvo) ?. ?=([?(%gall %behn) %unto *] sign-arvo)
@ -962,6 +1084,28 @@
%d (mo-give %unto %raw-fact mark.ames-response noun.ames-response) %d (mo-give %unto %raw-fact mark.ames-response noun.ames-response)
%x (mo-give %unto %kick ~) %x (mo-give %unto %kick ~)
== ==
:: +mo-spew: handle request to set verbosity toggles on debug output
::
++ mo-spew
|= verbs=(list verb)
^+ mo-core
:: start from all %.n's, then flip requested toggles
::
=. veb.bug.state
%+ roll verbs
|= [=verb acc=_veb-all-off]
^+ veb.bug.state
?- verb
%odd acc(odd %.y)
==
mo-core
:: +mo-sift: handle request to filter debug output by agent
::
++ mo-sift
|= dudes=(list dude)
^+ mo-core
=. dudes.bug.state (sy dudes)
mo-core
:: +ap: agent engine :: +ap: agent engine
:: ::
:: An inner, agent-level core. The sample refers to the agent we're :: An inner, agent-level core. The sample refers to the agent we're
@ -976,6 +1120,12 @@
agent-config=(list (each suss tang)) agent-config=(list (each suss tang))
=yoke =yoke
== ==
::
++ trace
|= [verb=? print=tang]
^+ same
(^trace verb agent-name print)
::
++ ap-core . ++ ap-core .
:: +ap-abed: initialise state for an agent, with the supplied routes. :: +ap-abed: initialise state for an agent, with the supplied routes.
:: ::
@ -1036,11 +1186,9 @@
:: ::
++ ap-nuke ++ ap-nuke
^+ ap-core ^+ ap-core
=/ out=(list [[=wire =ship =term] ? =path])
~(tap by outbound.watches.yoke)
=/ inbound-paths=(set path) =/ inbound-paths=(set path)
%- silt %- silt
%+ turn ~(tap by inbound.watches.yoke) %+ turn ~(tap by bitt.yoke)
|= [=duct =ship =path] |= [=duct =ship =path]
path path
=/ will=(list card:agent:gall) =/ will=(list card:agent:gall)
@ -1048,7 +1196,7 @@
?: =(~ inbound-paths) ?: =(~ inbound-paths)
~ ~
[%give %kick ~(tap in inbound-paths) ~]~ [%give %kick ~(tap in inbound-paths) ~]~
%+ turn ~(tap by outbound.watches.yoke) %+ turn ~(tap by boat.yoke)
|= [[=wire =ship =term] ? =path] |= [[=wire =ship =term] ? =path]
[%pass wire %agent [ship term] %leave ~] [%pass wire %agent [ship term] %leave ~]
=^ maybe-tang ap-core (ap-ingest ~ |.([will *agent])) =^ maybe-tang ap-core (ap-ingest ~ |.([will *agent]))
@ -1133,12 +1281,13 @@
tang.neet tang.neet
== ==
=. wire =. wire
:^ %use agent-name run-nonce.yoke
?- -.neet ?- -.neet
%agent [%out (scot %p ship.neet) name.neet wire] %agent [%out (scot %p ship.neet) name.neet wire]
%huck [%out (scot %p ship.neet) name.neet wire] %huck [%out (scot %p ship.neet) name.neet wire]
%arvo [(scot %p attributing.agent-routes) wire] %arvo [(scot %p attributing.agent-routes) wire]
== ==
=. wire [%use agent-name nonce.yoke wire] ::
=/ =note-arvo =/ =note-arvo
?- -.neet ?- -.neet
%arvo note-arvo.neet %arvo note-arvo.neet
@ -1152,8 +1301,7 @@
++ ap-breach ++ ap-breach
|= =ship |= =ship
^+ ap-core ^+ ap-core
=/ in=(list [=duct =^ship =path]) =/ in=(list [=duct =^ship =path]) ~(tap by bitt.yoke)
~(tap by inbound.watches.yoke)
|- ^+ ap-core |- ^+ ap-core
?^ in ?^ in
=? ap-core =(ship ship.i.in) =? ap-core =(ship ship.i.in)
@ -1161,15 +1309,19 @@
core(agent-duct agent-duct) core(agent-duct agent-duct)
$(in t.in) $(in t.in)
:: ::
=/ out=(list [[=wire =^ship =term] ? =path]) =/ out=(list [[=wire =^ship =term] ? =path nonce=@])
~(tap by outbound.watches.yoke) %+ turn ~(tap by boat.yoke)
|= [key=[wire ^ship term] val=[? path]]
:- key
val(+ [+.val (~(got by boar.yoke) key)])
|- ^+ ap-core |- ^+ ap-core
?~ out ?~ out
ap-core ap-core
=? ap-core =(ship ship.i.out) =? ap-core =(ship ship.i.out)
=/ core =/ core
=. agent-duct system-duct.state =. agent-duct system-duct.state
=/ way [%out (scot %p ship) term.i.out wire.i.out] =/ way
[%out (scot %p ship) term.i.out (scot %ud nonce.i.out) wire.i.out]
(ap-specific-take way %kick ~) (ap-specific-take way %kick ~)
core(agent-duct agent-duct) core(agent-duct agent-duct)
$(out t.out) $(out t.out)
@ -1184,8 +1336,7 @@
|= =ship |= =ship
^+ ap-core ^+ ap-core
:: ::
=/ in=(list [=duct =^ship =path]) =/ in=(list [=duct =^ship =path]) ~(tap by bitt.yoke)
~(tap by inbound.watches.yoke)
|- ^+ ap-core |- ^+ ap-core
?~ in ap-core ?~ in ap-core
:: ::
@ -1206,7 +1357,7 @@
?~ target-paths ?~ target-paths
?~ target-ship ?~ target-ship
~[agent-duct] ~[agent-duct]
%+ murn ~(tap by inbound.watches.yoke) %+ murn ~(tap by bitt.yoke)
|= [=duct =ship =path] |= [=duct =ship =path]
^- (unit ^duct) ^- (unit ^duct)
?: =(target-ship `ship) ?: =(target-ship `ship)
@ -1221,7 +1372,7 @@
++ ap-ducts-from-path ++ ap-ducts-from-path
|= [target-path=path target-ship=(unit ship)] |= [target-path=path target-ship=(unit ship)]
^- (list duct) ^- (list duct)
%+ murn ~(tap by inbound.watches.yoke) %+ murn ~(tap by bitt.yoke)
|= [=duct =ship =path] |= [=duct =ship =path]
^- (unit ^duct) ^- (unit ^duct)
?: ?& =(target-path path) ?: ?& =(target-path path)
@ -1284,15 +1435,6 @@
?: ?=(%& -.res) ?: ?=(%& -.res)
``want^p.res ``want^p.res
((slog leaf+"peek failed tube from {(trip have)} to {(trip want)}" ~) ~) ((slog leaf+"peek failed tube from {(trip have)} to {(trip want)}" ~) ~)
:: +ap-update-subscription: update subscription.
::
++ ap-update-subscription
~/ %ap-update-subscription
|= [is-ok=? =other=ship other-agent=term =wire]
^+ ap-core
?: is-ok
ap-core
(ap-kill-down wire [other-ship other-agent])
:: +ap-move: send move :: +ap-move: send move
:: ::
++ ap-move ++ ap-move
@ -1316,8 +1458,8 @@
attributing.agent-routes :: guest attributing.agent-routes :: guest
agent-name :: agent agent-name :: agent
== :: == ::
:* wex=outbound.watches.yoke :: outgoing :* wex=boat.yoke :: outgoing
sup=inbound.watches.yoke :: incoming sup=bitt.yoke :: incoming
== :: == ::
:* act=change.stats.yoke :: tick :* act=change.stats.yoke :: tick
eny=eny.stats.yoke :: nonce eny=eny.stats.yoke :: nonce
@ -1352,9 +1494,8 @@
~/ %ap-subscribe ~/ %ap-subscribe
|= pax=path |= pax=path
^+ ap-core ^+ ap-core
=/ incoming [attributing.agent-routes pax] =/ incoming [attributing.agent-routes pax]
=. inbound.watches.yoke =. bitt.yoke (~(put by bitt.yoke) agent-duct incoming)
(~(put by inbound.watches.yoke) agent-duct incoming)
=^ maybe-tang ap-core =^ maybe-tang ap-core
%+ ap-ingest %watch-ack |. %+ ap-ingest %watch-ack |.
(on-watch:ap-agent-core pax) (on-watch:ap-agent-core pax)
@ -1404,6 +1545,7 @@
=/ other-agent i.t.t.wire =/ other-agent i.t.t.wire
=/ =dock [other-ship other-agent] =/ =dock [other-ship other-agent]
=/ agent-wire t.t.t.wire =/ agent-wire t.t.t.wire
=/ nonce=@ 0
:: ::
=^ =sign:agent ap-core =^ =sign:agent ap-core
?. ?=(%raw-fact -.unto) ?. ?=(%raw-fact -.unto)
@ -1424,48 +1566,106 @@
%- ap-move :_ ~ %- ap-move :_ ~
:^ hen %pass /nowhere :^ hen %pass /nowhere
[%c %warp our q.beak.yoke ~ %sing %b case /[mark.unto]] [%c %warp our q.beak.yoke ~ %sing %b case /[mark.unto]]
|^ ^+ ap-core
:: %poke-ack has no nonce; ingest directly
::
?: ?=(%poke-ack -.sign)
ingest-and-check-error
:: if .agent-wire matches, it's an old pre-nonce subscription
::
?: (~(has by boat.yoke) sub-key)
run-sign
:: if an app happened to use a null wire, no-op
::
?: =(~ agent-wire)
on-missing
=/ has-nonce=(unit @ud) (slaw %ud (head agent-wire))
?: &(?=(~ has-nonce) ?=(%kick -.sign))
on-weird-kick
:: pop nonce off .agent-wire and match against stored subscription
::
?> ?=(^ has-nonce)
=: nonce u.has-nonce
agent-wire (tail agent-wire)
==
?~ got=(~(get by boar.yoke) sub-key)
on-missing
?: =(nonce.u.got nonce)
run-sign
(on-bad-nonce nonce.u.got)
:: ::
:: if subscription ack or close, handle before calling user code ++ sub-key [agent-wire dock]
:: ++ ingest (ap-ingest ~ |.((on-agent:ap-agent-core agent-wire sign)))
=? outbound.watches.yoke ?=(%kick -.sign) ++ run-sign
%- ~(del by outbound.watches.yoke) ?- -.sign
[agent-wire dock] %poke-ack !!
?: ?& ?=(%watch-ack -.sign) %fact
!(~(has by outbound.watches.yoke) [agent-wire dock]) =^ tan ap-core ingest
== ?~ tan ap-core
%- %: slog =. ap-core (ap-kill-down sub-key)
(ap-error -.sign leaf/"take %fact failed, closing subscription" u.tan)
::
%kick
=: boar.yoke (~(del by boar.yoke) sub-key)
boat.yoke (~(del by boat.yoke) sub-key)
==
ingest-and-check-error
::
%watch-ack
?. (~(has by boat.yoke) sub-key)
%. ap-core
%+ trace odd.veb.bug.state :~
leaf+"{<agent-name>}: got ack for nonexistent subscription" leaf+"{<agent-name>}: got ack for nonexistent subscription"
leaf+"{<dock>}: {<agent-wire>}" leaf+"{<dock>}: {<agent-wire>}"
>wire=wire< >wire=wire<
~
== ==
ap-core =? boar.yoke ?=(^ p.sign) (~(del by boar.yoke) sub-key)
::
=. boat.yoke
?^ p.sign (~(del by boat.yoke) sub-key)
::
%+ ~(jab by boat.yoke) sub-key
|= val=[acked=? =path]
%. val(acked &)
%^ trace &(odd.veb.bug.state acked.val)
leaf/"{<agent-name>} 2nd watch-ack on {<val>}" ~
::
ingest-and-check-error
==
:: ::
=? outbound.watches.yoke ?=(%watch-ack -.sign) ++ on-missing
?^ p.sign %. ap-core
%- ~(del by outbound.watches.yoke) %+ trace odd.veb.bug.state :~
[agent-wire dock] leaf+"{<agent-name>}: got {<-.sign>} for nonexistent subscription"
%+ ~(jab by outbound.watches.yoke) [agent-wire dock] leaf+"{<dock>}: {<[nonce=nonce agent-wire]>}"
|= [acked=? =path] >wire=wire<
=. . ==
?. acked
.
%- =/ =tape
"{<agent-name>}: received 2nd watch-ack on {<wire dock path>}"
(slog leaf+tape ~)
.
[& path]
:: ::
=^ maybe-tang ap-core ++ on-weird-kick
%+ ap-ingest ~ |. %. run-sign
(on-agent:ap-agent-core agent-wire sign) %+ trace odd.veb.bug.state :~
:: if failed %fact handling, kill subscription leaf+"{<agent-name>}: got %kick for nonexistent subscription"
leaf+"{<dock>}: {<agent-wire>}"
>wire=wire<
==
:: ::
=? ap-core ?=(%fact -.sign) ++ on-bad-nonce
(ap-update-subscription =(~ maybe-tang) p.dock q.dock agent-wire) |= stored-nonce=@
?^ maybe-tang %. ap-core
(ap-error -.sign leaf/"closing subscription" u.maybe-tang) %- slog :~
ap-core =/ nonces [expected=stored-nonce got=nonce]
=/ ok |(?=(?(%fact %kick) -.sign) =(~ p.sign))
leaf+"{<agent-name>}: stale {<-.sign>} {<nonces>} ok={<ok>}"
::
leaf+"{<dock>}: {<agent-wire>}"
>wire=wire<
==
::
++ ingest-and-check-error
^+ ap-core
=^ tan ap-core ingest
?~(tan ap-core (ap-error -.sign leaf/"take {<-.sign>} failed" u.tan))
--
:: +ap-install: install wrapper. :: +ap-install: install wrapper.
:: ::
++ ap-install ++ ap-install
@ -1499,24 +1699,18 @@
:: ::
++ ap-silent-delete ++ ap-silent-delete
^+ ap-core ^+ ap-core
:: ap-core(bitt.yoke (~(del by bitt.yoke) agent-duct))
%= ap-core
inbound.watches.yoke
(~(del by inbound.watches.yoke) agent-duct)
==
:: +ap-load-delete: load delete. :: +ap-load-delete: load delete.
:: ::
++ ap-load-delete ++ ap-load-delete
^+ ap-core ^+ ap-core
:: ::
=/ maybe-incoming =/ maybe-incoming (~(get by bitt.yoke) agent-duct)
(~(get by inbound.watches.yoke) agent-duct)
?~ maybe-incoming ?~ maybe-incoming
ap-core ap-core
:: ::
=/ incoming u.maybe-incoming =/ incoming u.maybe-incoming
=. inbound.watches.yoke =. bitt.yoke (~(del by bitt.yoke) agent-duct)
(~(del by inbound.watches.yoke) agent-duct)
:: ::
=^ maybe-tang ap-core =^ maybe-tang ap-core
%+ ap-ingest ~ |. %+ ap-ingest ~ |.
@ -1621,11 +1815,10 @@
`ap-core `ap-core
:: ::
=. agent.yoke &++.p.result =. agent.yoke &++.p.result
=/ moves (zing (turn -.p.result ap-from-internal)) =/ moves (zing (turn -.p.result ap-from-internal))
=. inbound.watches.yoke =. bitt.yoke (ap-handle-kicks moves)
(ap-handle-kicks moves)
(ap-handle-peers moves) (ap-handle-peers moves)
:: +ap-handle-kicks: handle cancels of inbound.watches :: +ap-handle-kicks: handle cancels of bitt.watches
:: ::
++ ap-handle-kicks ++ ap-handle-kicks
~/ %ap-handle-kicks ~/ %ap-handle-kicks
@ -1641,8 +1834,8 @@
:: ::
=/ quit-map=bitt =/ quit-map=bitt
(malt (turn quits |=(=duct [duct *[ship path]]))) (malt (turn quits |=(=duct [duct *[ship path]])))
(~(dif by inbound.watches.yoke) quit-map) (~(dif by bitt.yoke) quit-map)
:: +ap-handle-peers: handle new outbound.watches :: +ap-handle-peers: handle new boat.watches
:: ::
++ ap-handle-peers ++ ap-handle-peers
~/ %ap-handle-peers ~/ %ap-handle-peers
@ -1656,34 +1849,62 @@
?: ?=([* %pass * %g %deal * * %leave *] move) ?: ?=([* %pass * %g %deal * * %leave *] move)
=/ =wire p.move.move =/ =wire p.move.move
?> ?=([%use @ @ %out @ @ *] wire) ?> ?=([%use @ @ %out @ @ *] wire)
=/ short-wire t.t.t.t.t.t.wire =/ =dock [q.p q]:q.move.move
=/ =dock [q.p q]:q.move.move =/ sys-wire=^wire (scag 6 `^wire`wire)
=. outbound.watches.yoke =/ sub-wire=^wire (slag 6 `^wire`wire)
(~(del by outbound.watches.yoke) [short-wire dock]) ::
?. (~(has by boat.yoke) sub-wire dock)
%. $(moves t.moves)
%^ trace odd.veb.bug.state
leaf/"gall: {<agent-name>} missing subscription, got %leave" ~
=/ nonce=@ (~(got by boar.yoke) sub-wire dock)
=. p.move.move
%+ weld sys-wire
?: =(nonce 0)
:: skip adding nonce to pre-nonce subscription wires
::
sub-wire
[(scot %ud nonce) sub-wire]
=: boat.yoke (~(del by boat.yoke) [sub-wire dock])
boar.yoke (~(del by boar.yoke) [sub-wire dock])
==
:: if nonce = 0, this was a pre-nonce subscription so later
:: subscriptions need to start subscribing on the next nonce
::
=? sub-nonce.yoke =(nonce 0) +(sub-nonce.yoke)
$(moves t.moves, new-moves [move new-moves]) $(moves t.moves, new-moves [move new-moves])
?. ?=([* %pass * %g %deal * * ?(%watch %watch-as) *] move) ?. ?=([* %pass * %g %deal * * ?(%watch %watch-as) *] move)
$(moves t.moves, new-moves [move new-moves]) $(moves t.moves, new-moves [move new-moves])
=/ =wire p.move.move =/ =wire p.move.move
?> ?=([%use @ @ %out @ @ *] wire) ?> ?=([%use @ @ %out @ @ *] wire)
=/ short-wire t.t.t.t.t.t.wire =/ sys-wire=^wire (scag 6 `^wire`wire)
=/ =dock [q.p q]:q.move.move =/ sub-wire=^wire (slag 6 `^wire`wire)
=/ =path =/ [=dock =deal] [[q.p q] r]:q.move.move
?- -.r.q.move.move ::
%watch path.r.q.move.move ?: (~(has by boat.yoke) sub-wire dock)
%watch-as path.r.q.move.move
==
?: (~(has by outbound.watches.yoke) short-wire dock)
=. ap-core =. ap-core
=/ =tang =/ =tang
~[leaf+"subscribe wire not unique" >agent-name< >short-wire< >dock<] ~[leaf+"subscribe wire not unique" >agent-name< >sub-wire< >dock<]
=/ have =/ have (~(got by boat.yoke) sub-wire dock)
(~(got by outbound.watches.yoke) short-wire dock)
%- (slog >out=have< tang) %- (slog >out=have< tang)
(ap-error %watch-not-unique tang) :: reentrant, maybe bad? (ap-error %watch-not-unique tang) :: reentrant, maybe bad?
$(moves t.moves) $(moves t.moves)
=. outbound.watches.yoke ::
(~(put by outbound.watches.yoke) [short-wire dock] [| path]) =. p.move.move
$(moves t.moves, new-moves [move new-moves]) (weld sys-wire [(scot %ud sub-nonce.yoke) sub-wire])
%_ $
moves t.moves
new-moves [move new-moves]
sub-nonce.yoke +(sub-nonce.yoke)
::
boat.yoke
%+ ~(put by boat.yoke) [sub-wire dock]
:- acked=|
path=?+(-.deal !! %watch path.deal, %watch-as path.deal)
::
boar.yoke
(~(put by boar.yoke) [sub-wire dock] sub-nonce.yoke)
==
-- --
-- --
:: +call: request :: +call: request
@ -1726,6 +1947,8 @@
%jolt mo-abet:(mo-jolt:mo-core dude.task our desk.task) %jolt mo-abet:(mo-jolt:mo-core dude.task our desk.task)
%idle mo-abet:(mo-idle:mo-core dude.task) %idle mo-abet:(mo-idle:mo-core dude.task)
%nuke mo-abet:(mo-nuke:mo-core dude.task) %nuke mo-abet:(mo-nuke:mo-core dude.task)
%spew mo-abet:(mo-spew:mo-core veb.task)
%sift mo-abet:(mo-sift:mo-core dudes.task)
%trim [~ gall-payload] %trim [~ gall-payload]
%vega [~ gall-payload] %vega [~ gall-payload]
== ==
@ -1797,6 +2020,19 @@
acc acc
(~(put in acc) [dude -.agent.yoke]) (~(put in acc) [dude -.agent.yoke])
:: ::
?: ?& =(%n care)
?=([@ @ ^] path)
=([%$ %da now] coin)
=(our ship)
==
?~ yok=(~(get by yokes.state) dap)
[~ ~]
=/ [=^ship =term =wire]
[(slav %p i.path) i.t.path t.t.path]
?~ nonce=(~(get by boar.u.yok) [wire ship term])
[~ ~]
[~ ~ atom+!>(u.nonce)]
::
?. =(our ship) ?. =(our ship)
~ ~
?. =([%$ %da now] coin) ?. =([%$ %da now] coin)

View File

@ -354,46 +354,48 @@
!> (snag 0 `(list move:ames)`moves6) !> (snag 0 `(list move:ames)`moves6)
== ==
:: ::
++ test-comet-message-flow ^- tang ::TODO crashes in (snag 0 moves5), presumably due to subtle changes around
:: same as test-message-flow, but ~nec will send a sendkeys packet to request :: #5886. fix and re-enable!
:: comet's self-attestation directly :: ++ test-comet-message-flow ^- tang
:: :: :: same as test-message-flow, but ~nec will send a sendkeys packet to request
=^ moves0 nec (call nec ~[/g/talk] %plea our-comet %g /talk [%get %post]) :: :: comet's self-attestation directly
=^ moves1 comet (call comet ~[//unix] %hear (snag-packet 0 moves0)) :: ::
=^ moves2 comet :: =^ moves0 nec (call nec ~[/g/talk] %plea our-comet %g /talk [%get %post])
=/ =point:ames :: =^ moves1 comet (call comet ~[//unix] %hear (snag-packet 0 moves0))
:* rift=1 :: =^ moves2 comet
life=2 :: =/ =point:ames
keys=[[life=2 [crypto-suite=1 `@`nec-pub]] ~ ~] :: :* rift=1
sponsor=`~nec :: life=2
== :: keys=[[life=2 [crypto-suite=1 `@`nec-pub]] ~ ~]
%- take :: sponsor=`~nec
:^ comet /public-keys ~[//unix] :: ==
^- sign:ames :: %- take
[%jael %public-keys %full [n=[~nec point] ~ ~]] :: :^ comet /public-keys ~[//unix]
:: give comet's self-attestation to ~nec; at this point, we have established :: ^- sign:ames
:: a channel, and can proceed as usual :: [%jael %public-keys %full [n=[~nec point] ~ ~]]
:: :: :: give comet's self-attestation to ~nec; at this point, we have established
=^ moves3 nec (call nec ~[//unix] %hear (snag-packet 0 moves2)) :: :: a channel, and can proceed as usual
=^ moves4 comet (call comet ~[//unix] %hear (snag-packet 0 moves3)) :: ::
=^ moves5 comet (take comet /bone/~nec/0/1 ~[//unix] %g %done ~) :: =^ moves3 nec (call nec ~[//unix] %hear (snag-packet 0 moves2))
=^ moves6 nec (call nec ~[//unix] %hear (snag-packet 0 moves5)) :: =^ moves4 comet (call comet ~[//unix] %hear (snag-packet 0 moves3))
=^ moves7 comet (take comet /bone/~nec/0/1 ~[//unix] %g %boon [%post 'first1!!']) :: =^ moves5 comet (take comet /bone/~nec/0/1 ~[//unix] %g %done ~)
=^ moves8 nec (call nec ~[//unix] %hear (snag-packet 0 moves7)) :: =^ moves6 nec (call nec ~[//unix] %hear (snag-packet 0 moves5))
:: :: =^ moves7 comet (take comet /bone/~nec/0/1 ~[//unix] %g %boon [%post 'first1!!'])
;: weld :: =^ moves8 nec (call nec ~[//unix] %hear (snag-packet 0 moves7))
%+ expect-eq :: ::
!> [~[//unix] %pass /qos %d %flog %text "; ~nec is your neighbor"] :: ;: weld
!> (snag 0 `(list move:ames)`moves4) :: %+ expect-eq
:: :: !> [~[//unix] %pass /qos %d %flog %text "; ~nec is your neighbor"]
%+ expect-eq :: !> (snag 0 `(list move:ames)`moves4)
!> [~[//unix] %pass /qos %d %flog %text "; {<our-comet>} is your neighbor"] :: ::
!> (snag 0 `(list move:ames)`moves6) :: %+ expect-eq
:: :: !> [~[//unix] %pass /qos %d %flog %text "; {<our-comet>} is your neighbor"]
%+ expect-eq :: !> (snag 0 `(list move:ames)`moves6)
!> [~[/g/talk] %give %boon [%post 'first1!!']] :: ::
!> (snag 0 `(list move:ames)`moves8) :: %+ expect-eq
== :: !> [~[/g/talk] %give %boon [%post 'first1!!']]
:: !> (snag 0 `(list move:ames)`moves8)
:: ==
:: ::
++ test-comet-comet-message-flow ^- tang ++ test-comet-comet-message-flow ^- tang
:: same as test-message-flow, but the comets need to exchange :: same as test-message-flow, but the comets need to exchange

View File

@ -27,6 +27,7 @@
%da s+(scot %da p.c) %da s+(scot %da p.c)
%tas s+(scot %tas p.c) %tas s+(scot %tas p.c)
%ud (numb p.c) %ud (numb p.c)
%uv s+(scot %uv p.c)
== ==
++ foreign-desk ++ foreign-desk
|= [s=^ship =desk] |= [s=^ship =desk]