Merge branch 'yu/gall-cork-wip' into yu/gall-rq-wire-ames-flow-kill

This commit is contained in:
yosoyubik 2022-05-17 14:41:54 +02:00
commit a2cfffb483
9 changed files with 242 additions and 108 deletions

View File

@ -206,14 +206,13 @@
'ship'^(ship s)
'path'^(path p)
==
:: TODO: display subscription nonce
::
++ outgoing
|= =boat:gall
^- json
:- %a
%+ turn ~(tap by boat)
|= [[w=wire s=^ship t=term] [a=? p=^path nonce=@]]
|= [[w=wire s=^ship t=term] [a=? p=^path]]
%- pairs
:~ 'wire'^(path w)
'ship'^(ship s)

View File

@ -607,7 +607,7 @@
==
:_ state
%+ murn ~(tap by wex.bowl)
|= [[=wire =ship =term] [acked=? =path nonce=@]]
|= [[=wire =ship =term] [acked=? =path]]
^- (unit card)
?. ?& ?=([%thread @ *] wire)
=(tid i.t.wire)

View File

@ -0,0 +1,8 @@
:: Helm: Set Gall Verbosity by Agent
::
/? 310
::
:- %say
|= [^ dudes=(list dude:gall) ~]
:- %helm-gall-sift
dudes

View File

@ -0,0 +1,11 @@
:: Helm: Adjust Gall verbosity
::
:: List of diagnostic flags is in verb:gall in zuse.hoon, documented in
:: gall.hoon
::
/? 310
::
:- %say
|= [^ veb=(list verb:gall) ~]
:- %helm-gall-verb
veb

View File

@ -200,6 +200,14 @@
|= veb=(list verb:ames) =< abet
(emit %pass /helm %arvo %a %spew veb)
::
++ poke-gall-sift
|= dudes=(list dude:gall) =< abet
(emit %pass /helm %arvo %g %sift dudes)
::
++ poke-gall-verb
|= veb=(list verb:gall) =< abet
(emit %pass /helm %arvo %g %spew veb)
::
++ poke-ames-wake
|= ~ =< abet
(emit %pass /helm %arvo %a %stir '')
@ -237,6 +245,8 @@
%helm-code =;(f (f !<(_+<.f vase)) poke-code)
%helm-cors-approve =;(f (f !<(_+<.f vase)) poke-cors-approve)
%helm-cors-reject =;(f (f !<(_+<.f vase)) poke-cors-reject)
%helm-gall-sift =;(f (f !<(_+<.f vase)) poke-gall-sift)
%helm-gall-verb =;(f (f !<(_+<.f vase)) poke-gall-verb)
%helm-hi =;(f (f !<(_+<.f vase)) poke-hi)
%helm-knob =;(f (f !<(_+<.f vase)) poke-knob)
%helm-mass =;(f (f !<(_+<.f vase)) poke-mass)

View File

@ -1653,11 +1653,12 @@
$>(%trim vane-task) :: trim state
$>(%vega vane-task) :: report upgrade
$>(%plea vane-task) :: network request
[%spew veb=(list verb)] :: set verbosity
[%sift dudes=(list dude)] :: per agent
== ::
+$ bitt (map duct (pair ship path)) :: incoming subs
+$ boat :: outgoing subs
%+ map [=wire =ship =term] ::
[acked=? =path nonce=@] ::
+$ boat (map [=wire =ship =term] [acked=? =path]) :: outgoing subs
+$ beat (map [=wire =ship =term] nonce=@) ::
+$ bowl :: standard app state
$: $: our=ship :: host
src=ship :: guest
@ -1693,6 +1694,9 @@
$% [%raw-fact =mark =noun]
sign:agent
==
:: TODO: add more flags?
::
+$ verb ?(%odd)
::
:: +agent: app core
::

View File

@ -816,6 +816,13 @@
|= [now=@da eny=@ rof=roof]
=* larval-gate .
=* adult-core (adult-gate +<)
=< |%
++ call ^call
++ load ^load
++ scry ^scry
++ stay ^stay
++ take ^take
--
|%
:: +call: handle request $task
::
@ -828,18 +835,8 @@
?^ dud
~|(%ames-larval-call-dud (mean tang.u.dud))
::
=/ update-ready=?
?& ?=(^ cached-state)
?=(~ queued-events)
==
?: update-ready
=. ames-state.adult-gate
%- state-6-to-7:load:adult-core
?> ?=(^ cached-state)
(state-5-to-6:load:adult-core +.u.cached-state)
=. cached-state ~
~> %slog.1^leaf/"ames: metamorphosis reload"
[~ adult-gate]
?: &(?=(^ cached-state) ?=(~ queued-events))
(molt ~)
:: %born: set .unix-duct and start draining .queued-events
::
?: ?=(%born -.task)
@ -869,11 +866,17 @@
~|(%ames-larval-take-dud (mean tang.u.dud))
:: enqueue event if not a larval drainage timer
::
=? queued-events !=(/larva wire)
(~(put to queued-events) %take wire duct sign)
:: start drainage timer if have regressed from adult ames
::
?: ?& !=(/larva wire)
?=(^ cached-state)
==
[[duct %pass /larva %b %wait now]~ larval-gate]
:: XX what to do with errors?
::
?. =(/larva wire)
=. queued-events (~(put to queued-events) %take wire duct sign)
[~ larval-gate]
?. =(/larva wire) [~ larval-gate]
:: larval event drainage timer; pop and process a queued event
::
?. ?=([%behn %wake *] sign)
@ -912,21 +915,10 @@
%call (call:adult-core [duct ~ wrapped-task]:+.first-event)
%take (take:adult-core [wire duct ~ sign]:+.first-event)
==
=/ update-ready=?
?& ?=(^ cached-state)
?=(~ queued-events)
==
?: update-ready
=. ames-state.adult-gate
%- state-6-to-7:load:adult-core
?> ?=(^ cached-state)
(state-5-to-6:load:adult-core +.u.cached-state)
=. cached-state ~
~> %slog.1^leaf/"ames: metamorphosis reload"
[moves adult-gate]
:: .queued-events has been cleared; metamorphose
::
?~ queued-events
?: ?=(^ cached-state) (molt moves)
~> %slog.0^leaf/"ames: metamorphosis"
[moves adult-gate]
:: set timer to drain next event
@ -1002,6 +994,18 @@
larval-gate
::
==
:: +molt: re-evolve to adult-ames
::
++ molt
|= moves=(list move)
^- (quip move _adult-gate)
=. ames-state.adult-gate
%- state-6-to-7:load:adult-core
?> ?=(^ cached-state)
(state-5-to-6:load:adult-core +.u.cached-state)
=. cached-state ~
~> %slog.1^leaf/"ames: metamorphosis reload"
[~ adult-gate]
--
:: adult ames, after metamorphosis from larva
::
@ -2410,6 +2414,11 @@
=/ target-bone=^bone (mix 0b10 bone)
::
(run-message-sink target-bone %drop message-num)
?: &(closing ?=(%near -.task))
:: if the bone belongs to a closing flow and we got a naxplanation,
:: don't relay the ack to the client vane, and wait for the next try
::
peer-core
:: not a nack-trace bone; relay ack to client vane
::
(emit (got-duct bone) %give %done error)
@ -2602,6 +2611,8 @@
:: if we get a naxplanation for a %cork, the publisher is behind
:: receiving the OTA, so we set up a timer to retry in one hour.
::
%- %+ trace msg.veb
|.("resend %cork on bone={<target-bone>} in ~h1")
=/ =wire (make-pump-timer-wire her.channel target-bone)
(emit [/ames]~ %pass wire %b %wait `@da`(add now ~h1))
:: +on-sink-plea: handle request message received by |message-sink

View File

@ -4,15 +4,45 @@
::
::::
|= our=ship
:: veb: verbosity flags
::
=/ veb-all-off
:: TODO: add more flags?
::
:* odd=`?`%.n :: unusual events
==
=, gall
=>
|%
+| %helpers
:: +trace: print if .verb is set and we're tracking .dude
::
++ trace
|= [verb=? =dude dudes=(set dude) print=tang]
^+ same
?. verb
same
?. => [dude=dude dudes=dudes in=in]
~+ |(=(~ dudes) (~(has in dudes) dude))
same
(slog print)
::
:: $bug: debug printing configuration
::
:: veb: verbosity toggles
:: dudes: app filter; if ~, print for all
::
+$ bug
$: veb=_veb-all-off
dudes=(set dude)
==
::
+| %main
::
:: $move: Arvo-level move
::
+$ move [=duct move=(wind note-arvo gift-arvo)]
:: $state-8: overall gall state, versioned
:: $state-9: overall gall state, versioned
::
+$ state-9 [%9 state]
:: $state: overall gall state
@ -22,6 +52,7 @@
:: contacts: other ships we're in communication with
:: yokes: running agents
:: blocked: moves to agents that haven't been started yet
:: bug: debug printing configuration
::
+$ state
$: system-duct=duct
@ -29,13 +60,14 @@
contacts=(set ship)
yokes=(map term yoke)
blocked=(map term (qeu blocked-move))
=bug
==
:: $watches: subscribers and publications
::
:: TODO: rename this, to $ties?
:: TODO: rename $boat and $bitt and document
:: TODO: document
::
+$ watches [inbound=bitt outbound=boat]
+$ watches [=bitt =boat =beat]
:: $routes: new cuff; TODO: document
::
+$ routes
@ -126,6 +158,7 @@
contacts=(set ship)
eggs=(map term egg)
blocked=(map term (qeu blocked-move))
=bug
==
:: $egg: migratory agent state; $yoke with .old-state instead of .agent
::
@ -169,7 +202,7 @@
[^duct %pass /whiz/gall %$ %whiz ~]~
=/ adult adult-core
=. state.adult
[%9 system-duct outstanding contacts yokes=~ blocked]:spore
[%9 system-duct outstanding contacts yokes=~ blocked bug]:spore
=/ mo-core (mo-abed:mo:adult duct)
=. mo-core
=/ apps=(list [dap=term =egg]) ~(tap by eggs.spore)
@ -288,7 +321,7 @@
++ spore-8-to-9
|= old=spore-8
^- ^spore
=- old(- %9, eggs -)
=- old(- %9, eggs -, blocked [blocked.old *bug])
%- ~(run by eggs.old)
|= =egg-8
^- egg
@ -297,16 +330,14 @@
sub-nonce=0
live.egg-8
stats.egg-8
[inbound.watches.egg-8 (boat-8-to-9 outbound.watches.egg-8)]
(watches-8-to-9 watches.egg-8)
[old-state beak marks]:egg-8
==
::
++ boat-8-to-9
|= =boat-8
^- boat
%- ~(run by boat-8)
|= [acked=? =path]
[acked path nonce=0]
++ watches-8-to-9
|= watches-8
^- watches
[inbound outbound (~(run by outbound) |=([acked=? =path] nonce=0))]
--
--
:: adult gall vane interface, for type compatibility with pupa
@ -332,6 +363,12 @@
++ mo
~% %gall-mo +> ~
|_ [hen=duct moves=(list move)]
::
++ trace
|= [verb=? =dude print=tang]
^+ same
(^trace verb dude dudes.bug.state print)
::
:: +mo-abed: initialise state with the provided duct
:: +mo-abet: finalize, reversing moves
:: +mo-pass: prepend a standard %pass to the current list of moves
@ -1024,6 +1061,28 @@
%d (mo-give %unto %raw-fact mark.ames-response noun.ames-response)
%x (mo-give %unto %kick ~)
==
:: +mo-spew: handle request to set verbosity toggles on debug output
::
++ mo-spew
|= verbs=(list verb)
^+ mo-core
:: start from all %.n's, then flip requested toggles
::
=. veb.bug.state
%+ roll verbs
|= [=verb acc=_veb-all-off]
^+ veb.bug.state
?- verb
%odd acc(odd %.y)
==
mo-core
:: +mo-sift: handle request to filter debug output by agent
::
++ mo-sift
|= dudes=(list dude)
^+ mo-core
=. dudes.bug.state (sy dudes)
mo-core
:: +ap: agent engine
::
:: An inner, agent-level core. The sample refers to the agent we're
@ -1038,6 +1097,12 @@
agent-config=(list (each suss tang))
=yoke
==
::
++ trace
|= [verb=? print=tang]
^+ same
(^trace verb agent-name print)
::
++ ap-core .
:: +ap-abed: initialise state for an agent, with the supplied routes.
::
@ -1098,11 +1163,11 @@
::
++ ap-nuke
^+ ap-core
=/ out=(list [[=wire =ship =term] ? =path nonce=@])
~(tap by outbound.watches.yoke)
=/ out=(list [[=wire =ship =term] ? =path])
~(tap by boat.watches.yoke)
=/ inbound-paths=(set path)
%- silt
%+ turn ~(tap by inbound.watches.yoke)
%+ turn ~(tap by bitt.watches.yoke)
|= [=duct =ship =path]
path
=/ will=(list card:agent:gall)
@ -1110,8 +1175,8 @@
?: =(~ inbound-paths)
~
[%give %kick ~(tap in inbound-paths) ~]~
%+ turn ~(tap by outbound.watches.yoke)
|= [[=wire =ship =term] ? =path nonce=@]
%+ turn ~(tap by boat.watches.yoke)
|= [[=wire =ship =term] ? =path]
[%pass wire %agent [ship term] %leave ~]
=^ maybe-tang ap-core (ap-ingest ~ |.([will *agent]))
ap-core
@ -1216,7 +1281,7 @@
|= =ship
^+ ap-core
=/ in=(list [=duct =^ship =path])
~(tap by inbound.watches.yoke)
~(tap by bitt.watches.yoke)
|- ^+ ap-core
?^ in
=? ap-core =(ship ship.i.in)
@ -1225,7 +1290,10 @@
$(in t.in)
::
=/ out=(list [[=wire =^ship =term] ? =path nonce=@])
~(tap by outbound.watches.yoke)
%+ turn ~(tap by boat.watches.yoke)
|= [key=[wire ^ship term] val=[? path]]
:- key
val(+ [+.val (~(got by beat.watches.yoke) key)])
|- ^+ ap-core
?~ out
ap-core
@ -1249,7 +1317,7 @@
^+ ap-core
::
=/ in=(list [=duct =^ship =path])
~(tap by inbound.watches.yoke)
~(tap by bitt.watches.yoke)
|- ^+ ap-core
?~ in ap-core
::
@ -1270,7 +1338,7 @@
?~ target-paths
?~ target-ship
~[agent-duct]
%+ murn ~(tap by inbound.watches.yoke)
%+ murn ~(tap by bitt.watches.yoke)
|= [=duct =ship =path]
^- (unit ^duct)
?: =(target-ship `ship)
@ -1285,7 +1353,7 @@
++ ap-ducts-from-path
|= [target-path=path target-ship=(unit ship)]
^- (list duct)
%+ murn ~(tap by inbound.watches.yoke)
%+ murn ~(tap by bitt.watches.yoke)
|= [=duct =ship =path]
^- (unit ^duct)
?: ?& =(target-path path)
@ -1371,8 +1439,8 @@
attributing.agent-routes :: guest
agent-name :: agent
== ::
:* wex=outbound.watches.yoke :: outgoing
sup=inbound.watches.yoke :: incoming
:* wex=boat.watches.yoke :: outgoing
sup=bitt.watches.yoke :: incoming
== ::
:* act=change.stats.yoke :: tick
eny=eny.stats.yoke :: nonce
@ -1408,8 +1476,8 @@
|= pax=path
^+ ap-core
=/ incoming [attributing.agent-routes pax]
=. inbound.watches.yoke
(~(put by inbound.watches.yoke) agent-duct incoming)
=. bitt.watches.yoke
(~(put by bitt.watches.yoke) agent-duct incoming)
=^ maybe-tang ap-core
%+ ap-ingest %watch-ack |.
(on-watch:ap-agent-core pax)
@ -1487,7 +1555,7 @@
ingest-and-check-error
:: if .agent-wire matches, it's an old pre-nonce subscription
::
?: (~(has by outbound.watches.yoke) sub-key)
?: (~(has by boat.watches.yoke) sub-key)
run-sign
:: if an app happened to use a null wire, no-op
::
@ -1502,7 +1570,7 @@
=: nonce u.has-nonce
agent-wire (tail agent-wire)
==
=/ got (~(get by outbound.watches.yoke) sub-key)
=/ got (~(get by beat.watches.yoke) sub-key)
?~ got
on-missing
?. =(nonce.u.got nonce)
@ -1521,36 +1589,39 @@
(ap-error -.sign leaf/"take %fact failed, closing subscription" u.tan)
::
%kick
=. outbound.watches.yoke
(~(del by outbound.watches.yoke) sub-key)
=: beat.watches.yoke (~(del by beat.watches.yoke) sub-key)
boat.watches.yoke (~(del by boat.watches.yoke) sub-key)
==
::
ingest-and-check-error
::
%watch-ack
?. (~(has by outbound.watches.yoke) sub-key)
%- %: slog
leaf+"{<agent-name>}: got ack for nonexistent subscription"
leaf+"{<dock>}: {<agent-wire>}"
>wire=wire<
~
==
ap-core
=. outbound.watches.yoke
?. (~(has by boat.watches.yoke) sub-key)
%. ap-core
%+ trace odd.veb.bug.state :~
leaf+"{<agent-name>}: got ack for nonexistent subscription"
leaf+"{<dock>}: {<agent-wire>}"
>wire=wire<
==
=? beat.watches.yoke ?=(^ p.sign)
(~(del by beat.watches.yoke) sub-key)
::
=. boat.watches.yoke
?^ p.sign
(~(del by outbound.watches.yoke) sub-key)
(~(del by boat.watches.yoke) sub-key)
::
%+ ~(jab by outbound.watches.yoke) sub-key
|= val=[acked=? =path nonce=@]
=? . acked.val
%.(. (slog leaf+"{<agent-name>} 2nd watch-ack on {<val>}" ~))
val(acked &)
%+ ~(jab by boat.watches.yoke) sub-key
|= val=[acked=? =path]
%. val(acked &)
%^ trace &(odd.veb.bug.state acked.val)
leaf/"{<agent-name>} 2nd watch-ack on {<val>}" ~
::
ingest-and-check-error
==
::
++ on-missing
%. ap-core
%- slog :~
%+ trace odd.veb.bug.state :~
leaf+"{<agent-name>}: got {<-.sign>} for nonexistent subscription"
leaf+"{<dock>}: {<[nonce=nonce agent-wire]>}"
>wire=wire<
@ -1558,7 +1629,7 @@
::
++ on-weird-kick
%. run-sign
%- slog :~
%+ trace odd.veb.bug.state :~
leaf+"{<agent-name>}: got %kick for nonexistent subscription"
leaf+"{<dock>}: {<agent-wire>}"
>wire=wire<
@ -1616,8 +1687,8 @@
^+ ap-core
::
%= ap-core
inbound.watches.yoke
(~(del by inbound.watches.yoke) agent-duct)
bitt.watches.yoke
(~(del by bitt.watches.yoke) agent-duct)
==
:: +ap-load-delete: load delete.
::
@ -1625,13 +1696,13 @@
^+ ap-core
::
=/ maybe-incoming
(~(get by inbound.watches.yoke) agent-duct)
(~(get by bitt.watches.yoke) agent-duct)
?~ maybe-incoming
ap-core
::
=/ incoming u.maybe-incoming
=. inbound.watches.yoke
(~(del by inbound.watches.yoke) agent-duct)
=. bitt.watches.yoke
(~(del by bitt.watches.yoke) agent-duct)
::
=^ maybe-tang ap-core
%+ ap-ingest ~ |.
@ -1737,10 +1808,10 @@
::
=. agent.yoke &++.p.result
=/ moves (zing (turn -.p.result ap-from-internal))
=. inbound.watches.yoke
=. bitt.watches.yoke
(ap-handle-kicks moves)
(ap-handle-peers moves)
:: +ap-handle-kicks: handle cancels of inbound.watches
:: +ap-handle-kicks: handle cancels of bitt.watches
::
++ ap-handle-kicks
~/ %ap-handle-kicks
@ -1756,8 +1827,8 @@
::
=/ quit-map=bitt
(malt (turn quits |=(=duct [duct *[ship path]])))
(~(dif by inbound.watches.yoke) quit-map)
:: +ap-handle-peers: handle new outbound.watches
(~(dif by bitt.watches.yoke) quit-map)
:: +ap-handle-peers: handle new boat.watches
::
++ ap-handle-peers
~/ %ap-handle-peers
@ -1775,17 +1846,21 @@
=/ sys-wire=^wire (scag 6 `^wire`wire)
=/ sub-wire=^wire (slag 6 `^wire`wire)
::
?. (~(has by outbound.watches.yoke) sub-wire dock)
=; =tang
%- (slog tang)
$(moves t.moves)
[leaf+"gall: {<agent-name>} missing subscription, got %leave"]~
=/ have=[acked=? =path nonce=@]
(~(got by outbound.watches.yoke) sub-wire dock)
?. (~(has by boat.watches.yoke) sub-wire dock)
%. $(moves t.moves)
%^ trace odd.veb.bug.state
leaf/"gall: {<agent-name>} missing subscription, got %leave" ~
=/ nonce=@ (~(got by beat.watches.yoke) sub-wire dock)
=. p.move.move
(weld sys-wire [(scot %ud nonce.have) sub-wire])
=. outbound.watches.yoke
(~(del by outbound.watches.yoke) [sub-wire dock])
%+ weld sys-wire
?: =(nonce 0)
:: skip adding nonce to pre-nonce subscription wires
::
sub-wire
[(scot %ud nonce) sub-wire]
=: boat.watches.yoke (~(del by boat.watches.yoke) [sub-wire dock])
beat.watches.yoke (~(del by beat.watches.yoke) [sub-wire dock])
==
$(moves t.moves, new-moves [move new-moves])
?. ?=([* %pass * %g %deal * * ?(%watch %watch-as) *] move)
$(moves t.moves, new-moves [move new-moves])
@ -1795,12 +1870,11 @@
=/ sub-wire=^wire (slag 6 `^wire`wire)
=/ [=dock =deal] [[q.p q] r]:q.move.move
::
?: (~(has by outbound.watches.yoke) sub-wire dock)
?: (~(has by boat.watches.yoke) sub-wire dock)
=. ap-core
=/ =tang
~[leaf+"subscribe wire not unique" >agent-name< >sub-wire< >dock<]
=/ have
(~(got by outbound.watches.yoke) sub-wire dock)
=/ have (~(got by boat.watches.yoke) sub-wire dock)
%- (slog >out=have< tang)
(ap-error %watch-not-unique tang) :: reentrant, maybe bad?
$(moves t.moves)
@ -1812,11 +1886,13 @@
new-moves [move new-moves]
sub-nonce.yoke +(sub-nonce.yoke)
::
outbound.watches.yoke
%+ ~(put by outbound.watches.yoke) [sub-wire dock]
:+ acked=|
path=?+(-.deal !! %watch path.deal, %watch-as path.deal)
sub-nonce.yoke
boat.watches.yoke
%+ ~(put by boat.watches.yoke) [sub-wire dock]
:- acked=|
path=?+(-.deal !! %watch path.deal, %watch-as path.deal)
::
beat.watches.yoke
(~(put by beat.watches.yoke) [sub-wire dock] sub-nonce.yoke)
==
--
--
@ -1860,6 +1936,8 @@
%jolt mo-abet:(mo-jolt:mo-core dude.task our desk.task)
%idle mo-abet:(mo-idle:mo-core dude.task)
%nuke mo-abet:(mo-nuke:mo-core dude.task)
%spew mo-abet:(mo-spew:mo-core veb.task)
%sift mo-abet:(mo-sift:mo-core dudes.task)
%trim [~ gall-payload]
%vega [~ gall-payload]
==
@ -1931,6 +2009,19 @@
acc
(~(put in acc) [dude -.agent.yoke])
::
?: ?& =(%n care)
?=([@ @ ^] path)
=([%$ %da now] coin)
=(our ship)
==
?~ yok=(~(get by yokes.state) dap)
[~ ~]
=/ [=^ship =term =wire]
[(slav %p i.path) i.t.path t.t.path]
?~ nonce=(~(get by beat.watches.u.yok) [wire ship term])
[~ ~]
[~ ~ atom+!>(u.nonce)]
::
?. =(our ship)
~
?. =([%$ %da now] coin)

View File

@ -84,7 +84,7 @@
%+ sort ~(tap by wex.bowl)
|= [[[a=wire *] *] [[b=wire *] *]]
(aor a b)
|= [[=wire =ship =term] [acked=? =path nonce=@]]
|= [[=wire =ship =term] [acked=? =path]]
^- (unit tank)
=; relevant=?
?. relevant ~