Merge pull request #6648 from urbit/yu/enable-close-flows

ames: move +on-kroc logic to |close-flows
This commit is contained in:
Pyry Kovanen 2023-07-24 18:30:53 +03:00 committed by GitHub
commit 65b257b96a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 229 additions and 136 deletions

View File

@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:8f26569c70bdf4950323c9578c602e5b5cea9da57131ce0a8ccc7fee7a26cde8
size 6739960
oid sha256:3fbab4ec845202c742a2ab029b485e0449febc96a4c8877ec88d10684aba09b0
size 6709899

View File

@ -4,6 +4,88 @@
:: To actually close the flows, run with |close-flows, =dry |
::
:- %say
|= [^ arg=~ dry=?]
|= [[now=@da eny=@uvJ bec=beak] arg=~ peer=(unit @p) dry=? veb=?]
::
[%helm-ames-kroc dry]
=/ peers-map
.^((map ship ?(%alien %known)) %ax /(scot %p p.bec)//(scot %da now)/peers)
::
=/ peers=(list ship)
%+ murn ~(tap by peers-map)
|= [=ship val=?(%alien %known)]
?: =(ship p.bec)
~ :: this is weird, but we saw it
?- val
%alien ~
%known (some ship)
==
::
=; bones=(list [ship bone])
:- %helm-ames-kroc
~? dry "#{<(lent bones)>} flows can be closed"
dry^bones
::
%+ roll peers
|= [=ship bones=(list [ship bone])]
?: &(?=(^ peer) !=(u.peer ship))
bones
::
=+ .^ =ship-state:ames
%ax /(scot %p p.bec)//(scot %da now)/peers/(scot %p ship)
==
=/ =peer-state:ames ?>(?=(%known -.ship-state) +.ship-state)
|^
=/ subs=(jar path [bone sub-nonce=@]) resubscriptions
%+ roll ~(tap by subs)
|= [[=wire flows=(list [bone sub-nonce=@])] bones=_bones]
::
%- flop %- tail
%+ roll (sort flows |=([[@ n=@] [@ m=@]] (lte n m)))
|= [[=bone nonce=@] resubs=_(lent flows) bones=_bones]
=/ app=term ?>(?=([%gall %use sub=@ *] wire) i.t.t.wire)
=/ =path (slag 7 wire)
=/ log=tape "[bone={<bone>} agent={<app>} nonce={<nonce>}] {<path>}"
=; corkable=?
=? bones corkable [[ship bone] bones]
(dec resubs)^bones
:: checks if this is a stale re-subscription
::
?. =(resubs 1)
~? veb [ship (weld "stale %watch plea " log)]
&
:: the current subscription can be safely corked if there
:: is a flow with a naxplanation ack on a backward bone
::
=+ backward-bone=(mix 0b10 bone)
?. =(%2 (mod backward-bone 4))
|
?~ (~(get by rcv.peer-state) backward-bone)
|
~? veb [ship (weld "failed %watch plea " log)]
&
::
++ resubscriptions
%+ roll ~(tap by snd.peer-state)
|= $: [=forward=bone message-pump-state:ames]
subs=(jar path [bone sub-nonce=@])
==
?: (~(has in closing.peer-state) forward-bone)
~? veb
:- ship
%+ weld "stale flow bone={<forward-bone>} in closing, "
"#{<~(wyt in live:packet-pump-state)>} packets retrying"
subs
?~ duct=(~(get by by-bone.ossuary.peer-state) forward-bone)
subs
?. ?=([* [%gall %use sub=@ @ %out @ @ *] *] u.duct)
subs
=/ =wire i.t.u.duct
=/ nonce=(unit @) ?~((slag 7 wire) ~ (slaw %ud &8.wire))
%- ~(add ja subs)
:: 0 for old pre-nonce subscriptions
::
:_ [forward-bone ?~(nonce 0 u.nonce)]
?~ nonce wire
:: don't include the sub-nonce in the key
::
(weld (scag 7 wire) (slag 8 wire))
--

View File

@ -4,24 +4,26 @@
:: |stale-flows, =veb %2 :: stale flows that keep (re)trying to connect
:: |stale-flows, =veb %21 :: ... per app (only forward)
:: |stale-flows, =veb %3 :: stale resubscriptions
:: |stale-flows, =veb %4 :: print live naxplanation flows
::
=> |%
+$ subs (jar path [ship bone @])
+$ subs (jar path [ship bone @ close=?])
+$ pags (jar app=term [dst=term =ship =path]) :: per-agent
+$ naks (set [ship bone])
:: verbosity
::
+$ veb ?(%0 %1 %2 %21 %3 %31)
+$ veb ?(%0 %1 %2 %21 %3 %4 ~)
::
++ resubs
|= [=subs =veb]
^- @
::=/ sorted
:: %+ sort ~(tap by subs)
:: |=([a=(list *) b=(list *)] (lte (lent a) (lent b)))
%+ roll ~(tap by subs)::sorted
|= [[k=path v=(list [ship bone @])] num=@]
%+ roll ~(tap by subs)
|= [[k=path v=(list [ship bone @ close=?])] num=@]
=/ in-close=@
(roll v |=([[@ @ @ c=?] n=@] ?:(c +(n) n)))
~? &(=(%3 veb) (gth (lent v) 1))
%+ weld ?: =(in-close 0) ""
"[#{<in-close>} %close] "
"#{<(dec (lent v))>} stale resubs on {<k>}"
?. (gth (lent v) 1) num
(add (dec (lent v)) num)
@ -29,7 +31,7 @@
::
:- %say
|= $: [now=@da eny=@uvJ bec=beak]
[arg=~ dry=? =veb]
[arg=~ peer=(unit @p) dry=? =veb]
==
::
=/ peers-map
@ -45,12 +47,14 @@
%known (some ship)
==
::
=; [[=subs =pags backward=@ forward=@] =naks]
=; [[=subs =pags close=@ incoming=@ outgoing=@ nax=@] =naks]
:- %tang %- flop
%+ weld
:~ leaf+"#{<~(wyt in naks)>} flows from %nacking %watches"
leaf+"#{<backward>} live backward flows with (keep retrying)"
leaf+"#{<forward>} live forward flows with (keep retrying)"
leaf+"#{<incoming>} live backward flows"
leaf+"#{<outgoing>} live forward flows"
leaf+"#{<nax>} live naxplanations"
leaf+"#{<close>} flows in closing"
leaf+"#{<(resubs subs veb)>} stale resubscriptions"
==
?. =(%21 veb) ~
@ -60,13 +64,17 @@
(gth (lent v) (lent w))
|= [app=term v=(list [dst=term =ship =path])]
:- %leaf
%+ weld "#{<(lent v)>} flows for {<app>} with >10 retries"
%+ weld "#{<(lent v)>} flows for {<app>}"
?. =(1 (lent v)) ~
?> ?=(^ v)
" on {<ship.i.v>} to {<dst.i.v>} at {<path.i.v>}"
::
%+ roll peers
|= [=ship [=subs p=pags b=@ f=@] n=naks]
|= [=ship [=subs p=pags cl=@ in=@ ou=@ na=@] =naks]
?: ?& ?=(^ peer)
!=(u.peer ship)
==
+<+
=+ .^ =ship-state:ames
%ax /(scot %p p.bec)//(scot %da now)/peers/(scot %p ship)
==
@ -78,55 +86,63 @@
%+ roll ~(tap by snd.peer-state)
|= $: [=bone message-pump-state:ames]
subs=_subs pags=_p
backward=_b forward=_f
close=_cl
incoming=_in outgoing=_ou nax=_na
==
=, packet-pump-state
=+ closing=(~(has ^in closing.peer-state) bone)
:- ?~ duct=(~(get by by-bone.ossuary.peer-state) bone) subs
?. ?=([* [%gall %use sub=@ @ %out @ @ nonce=@ pub=@ *] *] u.duct)
?. ?=([* [%gall %use sub=@ @ %out @ @ *] *] u.duct)
subs
=/ =wire i.t.u.duct
=/ nonce=(unit @) (rush i.t.t.t.t.t.t.t.i.t.u.duct dem)
=/ =wire i.t.u.duct
=/ nonce=(unit @ud)
?~ (slag 7 wire) ~
(slaw %ud &8.wire)
%- ~(add ja subs)
:_ [ship bone ?~(nonce 0 u.nonce)] :: 0, 1?
?~ nonce wire
:_ [ship bone ?~(nonce 0 u.nonce) closing] :: 0 = pre-nonce subscriptions
?~ nonce
wire
:: don't include the sub-nonce in the key
::
(weld (scag 7 wire) (slag 8 wire))
%+ roll ~(tap in live)
|= $: [[msg=@ frag=@] [packet-state:ames *]]
pags=_pags
out=[b=_backward f=_forward]
==
::
?~ live [pags close incoming outgoing nax]
:: only forward flows
::
=? pags &(=(0 (end 0 bone)) (gth tries 10))
=? pags =(%0 (mod bone 4))
?~ duct=(~(get by by-bone.ossuary.peer-state) bone)
pags
?. ?=([* [%gall %use sub=@ @ %out @ @ nonce=@ pub=@ *] *] u.duct)
?. ?=([* [%gall %use sub=@ @ %out @ @ *] *] u.duct)
pags
=/ =wire i.t.u.duct
(~(add ja pags) (snag 2 wire) (snag 8 wire) ship (slag 9 wire))
(~(add ja pags) (snag 2 wire) (snag 6 wire) ship (slag 7 wire))
::
~? &(=(%2 veb) (gth tries 10))
=+ arrow=?:(=(0 (end 0 bone)) "<-" "->")
=+ closing=(~(has in closing.peer-state) bone)
%+ weld "{arrow} ({(cite:title ship)}) bone=#{<bone>} "
"closing={<closing>} msg=#{<msg>} frag=#{<frag>} #{<tries>}"
:- pags
=? out (gth tries 10)
?: =(0 (end 0 bone))
[b.out +(f.out)]
[+(b.out) f.out]
out
=? close closing +(close)
~? =(%2 veb)
=/ arrow=tape
?+ (mod bone 4) ~|([%odd-bone bone] !!)
%0 "<-"
%1 "->"
%3 "<-"
==
"{arrow} ({(cite:title ship)}) bone=#{<bone>} closing={<closing>}"
::
=/ is-nax=? =(%3 (mod bone 4))
~? &(=(%4 veb) is-nax)
"nax: ({(cite:title ship)}) bone=#{<bone>} closing={<closing>}"
:+ pags close
?+ (mod bone 4) ~|([%odd-bone bone] !!)
%0 [incoming +(outgoing) nax]
%1 [+(incoming) outgoing nax]
%3 [incoming outgoing +(nax)]
==
::
++ nacks
%+ roll ~(tap by rcv.peer-state)
|= [[=bone *] nacks=_n]
|= [[=bone *] n=_naks]
?. &(=(0 (end 0 bone)) =(1 (end 0 (rsh 0 bone))))
:: not a naxplanation ack bone
::
nacks
n
:: by only corking forward flows that have received
:: a nack we avoid corking the current subscription
::
@ -134,16 +150,18 @@
:: make sure that the nack bone has a forward flow
::
?~ duct=(~(get by by-bone.ossuary.peer-state) target)
nacks
n
?. ?=([* [%gall %use sub=@ @ %out @ @ nonce=@ pub=@ *] *] u.duct)
nacks
n
=/ =wire i.t.u.duct
=+ closing=(~(has ^in closing.peer-state) bone)
?> ?=([%gall %use sub=@ @ %out @ @ nonce=@ pub=@ *] wire)
=/ app=term i.t.t.wire
=/ nonce=@
=- ?~(- 0 u.-)
(rush i.t.t.t.t.t.t.t.wire dem)
=/ =path t.t.t.t.t.t.t.t.wire
~? =(%1 veb) "[bone={<target>} nonce={<nonce>} agent={<app>}] {<path>}"
(~(put in nacks) [ship target])
(slaw %ud &8.wire)
=/ =path |8.wire
~? =(%1 veb)
"[bone={<target>} nonce={<nonce>} agent={<app>} close={<closing>}] {<path>}"
(~(put ^in n) [ship target])
--

View File

@ -246,8 +246,9 @@
(emit %pass /helm %arvo %a %stir '')
::
++ poke-ames-kroc
|= dry=? =< abet
(emit %pass /helm %arvo %a %kroc dry)
|= [dry=? bones=(list [ship bone])] =< abet
?: dry this
(emit %pass /helm %arvo %a %kroc bones)
::
++ poke-ames-cong
|= cong=[msg=@ud mem=@ud] =< abet

View File

@ -767,7 +767,7 @@
:: %heed: track peer's responsiveness; gives %clog if slow
:: %jilt: stop tracking peer's responsiveness
:: %cork: request to delete message flow
:: %kroc: request to delete stale message flows
:: %kroc: request to delete specific message flows, from their bones
:: %plea: request to send message
:: %deep: deferred calls to %ames, from itself
::
@ -791,11 +791,12 @@
:: %vega: kernel reload notification
::
+$ task
$+ ames-task
$% [%hear =lane =blob]
[%heed =ship]
[%jilt =ship]
[%cork =ship]
[%kroc dry=?]
[%kroc bones=(list [ship bone])]
$>(%plea vane-task)
[%deep =deep]
::

View File

@ -601,6 +601,7 @@
==
::
+$ bug-9
$+ bug-9
$: veb=_[`?`%.n `?`%.n `?`%.n `?`%.n `?`%.n `?`%.n `?`%.n]
ships=(set ship)
==
@ -615,7 +616,7 @@
==
::
+$ ship-state-6
$+ peer-state-6
$+ ship-state-6
$% [%alien alien-agenda-12]
[%known peer-state-6]
==
@ -1158,6 +1159,31 @@
num-live=@ud
counter=@ud
==
::
+$ queued-event-11-and-15
$+ queued-event-11-and-15
$% [%call =duct wrapped-task=(hobo task-11-and-15)]
[%take =wire =duct =sign]
==
::
+$ task-11-and-15
$+ task-11-and-15
$% [%kroc dry=?]
[%snub ships=(list ship)]
$<(?(%snub %kroc) task)
==
::
+$ queued-event-15
$+ queued-event-15
$% [%call =duct wrapped-task=(hobo task-15)]
[%take =wire =duct =sign]
==
::
+$ task-15
$+ task-15
$% [%kroc dry=?]
$<(%kroc task)
==
:: $bug: debug printing configuration
::
:: veb: verbosity toggles
@ -1474,42 +1500,42 @@
== ==
$: %10
$% $: %larva
events=(qeu queued-event-11)
events=(qeu queued-event-11-and-15)
state=ames-state-10
==
[%adult state=ames-state-10]
== ==
$: %11
$% $: %larva
events=(qeu queued-event-11)
events=(qeu queued-event-11-and-15)
state=ames-state-11
==
[%adult state=ames-state-11]
== ==
$: %12
$% $: %larva
events=(qeu queued-event)
events=(qeu queued-event-15)
state=ames-state-12
==
[%adult state=ames-state-12]
== ==
$: %13
$% $: %larva
events=(qeu queued-event)
events=(qeu queued-event-15)
state=ames-state-13
==
[%adult state=ames-state-13]
== ==
$: %14
$% $: %larva
events=(qeu queued-event)
events=(qeu queued-event-15)
state=ames-state-14
==
[%adult state=ames-state-14]
== ==
$: %15
$% $: %larva
events=(qeu queued-event)
events=(qeu queued-event-15)
state=ames-state-15
==
[%adult state=ames-state-15]
@ -1592,7 +1618,7 @@
[%10 %larva *]
~> %slog.1^leaf/"ames: larva: load"
=. cached-state `[%10 state.old]
=. queued-events (event-11-to-12 events.old)
=. queued-events (event-11-to-16 events.old)
larval-gate
::
[%11 %adult *]
@ -1603,7 +1629,7 @@
[%11 %larva *]
~> %slog.1^leaf/"ames: larva: load"
=. cached-state `[%11 state.old]
=. queued-events (event-11-to-12 events.old)
=. queued-events (event-11-to-16 events.old)
larval-gate
::
[%12 %adult *]
@ -1614,7 +1640,7 @@
[%12 %larva *]
~> %slog.1^leaf/"ames: larva: load"
=. cached-state `[%12 state.old]
=. queued-events events.old
=. queued-events (event-15-to-16 events.old)
larval-gate
::
[%13 %adult *]
@ -1625,7 +1651,7 @@
[%13 %larva *]
~> %slog.1^leaf/"ames: larva: load"
=. cached-state `[%13 state.old]
=. queued-events events.old
=. queued-events (event-15-to-16 events.old)
larval-gate
::
[%14 %adult *]
@ -1636,7 +1662,7 @@
[%14 %larva *]
~> %slog.1^leaf/"ames: larva: load"
=. cached-state `[%14 state.old]
=. queued-events events.old
=. queued-events (event-15-to-16 events.old)
larval-gate
::
[%15 %adult *]
@ -1647,7 +1673,7 @@
[%15 %larva *]
~> %slog.1^leaf/"ames: larva: load"
=. cached-state `[%15 state.old]
=. queued-events events.old
=. queued-events (event-15-to-16 events.old)
larval-gate
::
[%16 %adult *] (load:adult-core %16 state.old)
@ -1672,6 +1698,34 @@
%= e
wrapped-task ?.(?=(%snub -.task) task [%snub %deny ships.task])
==
::
++ event-11-to-16
|= events=(qeu queued-event-11-and-15)
^- (qeu queued-event)
%- ~(rep in events)
|= [e=queued-event-11-and-15 q=(qeu queued-event)]
%- ~(put to q) ^- queued-event
?. ?=(%call -.e) e
=/ task=task-11-and-15 ((harden task-11-and-15) wrapped-task.e)
%= e
wrapped-task
?+ -.task task
%snub [%snub %deny ships.task]
%kroc [%kroc ~]
==
==
::
++ event-15-to-16
|= events=(qeu queued-event-15)
^- (qeu queued-event)
%- ~(rep in events)
|= [e=queued-event-15 q=(qeu queued-event)]
%- ~(put to q) ^- queued-event
?. ?=(%call -.e) e
=/ task=task-15 ((harden task-15) wrapped-task.e)
%= e
wrapped-task ?.(?=(%kroc -.task) task [%kroc ~])
==
--
:: +molt: re-evolve to adult-ames
::
@ -2275,74 +2329,11 @@
:: +on-kroc: cork all stale flows from failed subscriptions
::
++ on-kroc
|= dry=?
|= bones=(list [ship bone])
^+ event-core
:: no-op
::
?: & %.(event-core (slog leaf/"ames: %kroc task not ready" ~))
::
=; [corks=@ core=_event-core]
?. dry core
%.(core (slog leaf/"ames: #{<corks>} flows can be corked" ~))
::
%+ roll ~(tap by peers.ames-state)
|= [[=ship =ship-state] corks=@ core=_event-core]
?. ?=(%known -.ship-state)
corks^core
=/ =peer-state:ames ?>(?=(%known -.ship-state) +.ship-state)
=/ subs=(jar path [bone sub-nonce=@])
%+ roll ~(tap by snd.peer-state)
|= $: [=forward=bone message-pump-state:ames]
subs=(jar path [bone sub-nonce=@])
==
?: (~(has in closing.peer-state) forward-bone)
%. subs
%^ ev-trace &(dry odd.veb) ship
|.
%+ weld "stale flow bone={<forward-bone>} in closing, "
"#{<~(wyt in live:packet-pump-state)>} packets retrying"
?~ duct=(~(get by by-bone.ossuary.peer-state) forward-bone)
subs
?. ?=([* [%gall %use sub=@ @ %out @ @ nonce=@ pub=@ *] *] u.duct)
subs
=/ =wire i.t.u.duct
=/ nonce=(unit @) (rush (snag 7 wire) dem)
%- ~(add ja subs)
:: 0 for old pre-nonce subscriptions
::
:_ [forward-bone ?~(nonce 0 u.nonce)]
?~ nonce wire
:: don't include the sub-nonce in the key
::
(weld (scag 7 wire) (slag 8 wire))
%+ roll ~(tap by subs)
|= [[=wire flows=(list [bone sub-nonce=@])] corks=_corks core=_core]
::
%- tail
%+ roll (sort flows |=([[@ n=@] [@ m=@]] (lte n m)))
|= [[=bone nonce=@] resubs=_(lent flows) corks=_corks core=_core]
=/ app=term ?>(?=([%gall %use sub=@ *] wire) i.t.t.wire)
=/ =path (slag 7 wire)
=/ log=tape "[bone={<bone>} agent={<app>} nonce={<nonce>}] {<path>}"
=; corkable=?
=? corks corkable +(corks)
=? core &(corkable !dry) (%*(on-cork core cork-bone `bone) ship)
(dec resubs)^corks^core
:: checks if this is a stale re-subscription
::
?. =(resubs 1)
%. &
(ev-trace &(dry odd.veb) ship |.((weld "stale %watch plea " log)))
:: the current subscription can be safely corked if there
:: is a flow with a naxplanation ack on a backward bone
::
=+ backward-bone=(mix 0b10 bone)
?. =(2 (mod backward-bone 4))
|
?~ (~(get by rcv.peer-state) backward-bone)
|
%. &
(ev-trace &(dry odd.veb) ship |.((weld "failed %watch plea " log)))
%+ roll bones
|= [[=ship =bone] co=_event-core]
(%*(on-cork co cork-bone `bone) ship)
:: +on-deep: deferred %ames calls from itself
::
++ on-deep
@ -4862,7 +4853,7 @@
%vega on-vega:event-core
%plea (on-plea:event-core [ship plea]:task)
%cork (on-cork:event-core ship.task)
%kroc (on-kroc:event-core dry.task)
%kroc (on-kroc:event-core bones.task)
%deep (on-deep:event-core deep.task)
::
%keen (on-keen:event-core +.task)