ames: move +on-kroc logic to |close-flows

+on-kroc was cluttered with ad-hoc logic to indentify stale flows from
failed resubscriptions that were not properly %corked. Here we move
that logic to a generator that, if not in dry mode, will call %ames with a
(list  [ship bone]) to %cork them.

Another option would be to move the logic in the generator to a state
update in ames, which will trigger possibly thousands of %ames messages
to be sent, on every ship that runs the state migration—these flows are
not causing a problem that neds to be addressed, and only take extra
space.

If we decide that this needs to be run by everyone, one solution could be
to set up a timer (maybe taking advantage of the fact that ships don't get
the OTA a the same time) that will eventually poke %hood with a
%helm-ames-kroc task.
This commit is contained in:
yosoyubik 2023-06-07 15:55:50 +02:00
parent 2c854d1285
commit 68db0b4e03
4 changed files with 119 additions and 76 deletions

View File

@ -4,6 +4,88 @@
:: To actually close the flows, run with |close-flows, =dry |
::
:- %say
|= [^ arg=~ dry=?]
|= [[now=@da eny=@uvJ bec=beak] arg=~ peer=(unit @p) dry=? veb=?]
::
[%helm-ames-kroc dry]
=/ peers-map
.^((map ship ?(%alien %known)) %ax /(scot %p p.bec)//(scot %da now)/peers)
::
=/ peers=(list ship)
%+ murn ~(tap by peers-map)
|= [=ship val=?(%alien %known)]
?: =(ship p.bec)
~ :: this is weird, but we saw it
?- val
%alien ~
%known (some ship)
==
::
=; bones=(list [ship bone])
:- %helm-ames-kroc
~? dry "#{<(lent bones)>} flows can be closed"
dry^bones
::
%+ roll peers
|= [=ship bones=(list [ship bone])]
?: &(?=(^ peer) !=(u.peer ship))
bones
::
=+ .^ =ship-state:ames
%ax /(scot %p p.bec)//(scot %da now)/peers/(scot %p ship)
==
=/ =peer-state:ames ?>(?=(%known -.ship-state) +.ship-state)
|^
=/ subs=(jar path [bone sub-nonce=@]) resubscriptions
%+ roll ~(tap by subs)
|= [[=wire flows=(list [bone sub-nonce=@])] bones=_bones]
::
%- tail
%+ roll flows
|= [[=bone nonce=@] resubs=_(lent flows) bones=_bones]
=/ app=term ?>(?=([%gall %use sub=@ *] wire) i.t.t.wire)
=/ =path (slag 7 wire)
=/ log=tape "[bone={<bone>} agent={<app>} nonce={<nonce>}] {<path>}"
=; corkable=?
=? bones corkable [[ship bone] bones]
(dec resubs)^bones
:: checks if this is a stale re-subscription
::
?. =(resubs 1)
~? veb [ship (weld "stale %watch plea " log)]
&
:: the current subscription can be safely corked if there
:: is a flow with a naxplanation ack on a backward bone
::
=+ backward-bone=(mix 0b10 bone)
?. =(%2 (mod backward-bone 4))
|
?~ (~(get by rcv.peer-state) backward-bone)
|
~? veb [ship (weld "failed %watch plea " log)]
&
::
++ resubscriptions
%+ roll ~(tap by snd.peer-state)
|= $: [=forward=bone message-pump-state:ames]
subs=(jar path [bone sub-nonce=@])
==
?: (~(has in closing.peer-state) forward-bone)
~? veb
:- ship
%+ weld "stale flow bone={<forward-bone>} in closing, "
"#{<~(wyt in live:packet-pump-state)>} packets retrying"
subs
?~ duct=(~(get by by-bone.ossuary.peer-state) forward-bone)
subs
?. ?=([* [%gall %use @ @ %out @ @ nonce=@ @ *] *] u.duct)
subs
=/ =wire i.t.u.duct
=/ nonce=(unit @) (slaw %ud &8.wire)
%- ~(add ja subs)
:: 0 for old pre-nonce subscriptions
::
:_ [forward-bone ?~(nonce 0 u.nonce)]
?~ nonce wire
:: don't include the sub-nonce in the key
::
(weld (scag 7 wire) (slag 8 wire))
--

View File

@ -246,8 +246,9 @@
(emit %pass /helm %arvo %a %stir '')
::
++ poke-ames-kroc
|= dry=? =< abet
(emit %pass /helm %arvo %a %kroc dry)
|= [dry=? bones=(list [ship bone])] =< abet
?: dry this
(emit %pass /helm %arvo %a %kroc bones)
::
++ poke-ames-cong
|= cong=[msg=@ud mem=@ud] =< abet

View File

@ -765,7 +765,7 @@
:: %heed: track peer's responsiveness; gives %clog if slow
:: %jilt: stop tracking peer's responsiveness
:: %cork: request to delete message flow
:: %kroc: request to delete stale message flows
:: %kroc: request to delete specific message flows, from their bones
:: %plea: request to send message
::
:: Remote Scry Tasks
@ -792,7 +792,7 @@
[%heed =ship]
[%jilt =ship]
[%cork =ship]
[%kroc dry=?]
[%kroc bones=(list [ship bone])]
$>(%plea vane-task)
::
[%keen spar]
@ -1190,6 +1190,7 @@
:: rto: retransmission timeout
:: rtt: roundtrip time estimate, low-passed using EWMA
:: rttvar: mean deviation of .rtt, also low-passed with EWMA
:: num-live: how many packets sent, awaiting ack
:: ssthresh: slow-start threshold
:: cwnd: congestion window; max unacked packets
::

View File

@ -1141,6 +1141,16 @@
num-live=@ud
counter=@ud
==
::
+$ queued-event-14
$% [%call =duct wrapped-task=(hobo task-14)]
[%take =wire =duct =sign]
==
::
+$ task-14
$% [%kroc dry=?]
$<(%kroc task)
==
:: $bug: debug printing configuration
::
:: veb: verbosity toggles
@ -1473,7 +1483,7 @@
== ==
$: %14
$% $: %larva
events=(qeu queued-event)
events=(qeu queued-event-14)
state=ames-state-14
==
[%adult state=ames-state-14]
@ -1600,7 +1610,7 @@
[%14 %larva *]
~> %slog.1^leaf/"ames: larva: load"
=. cached-state `[%14 state.old]
=. queued-events events.old
=. queued-events (event-14-to-15 events.old)
larval-gate
::
[%15 %adult *] (load:adult-core %15 state.old)
@ -1625,6 +1635,18 @@
%= e
wrapped-task ?.(?=(%snub -.task) task [%snub %deny ships.task])
==
::
++ event-14-to-15
|= events=(qeu queued-event-14)
^- (qeu queued-event)
%- ~(rep in events)
|= [e=queued-event-14 q=(qeu queued-event)]
%- ~(put to q) ^- queued-event
?. ?=(%call -.e) e
=/ task=task-14 ((harden task-14) wrapped-task.e)
%= e
wrapped-task ?.(?=(%kroc -.task) task [%kroc ~])
==
--
:: +molt: re-evolve to adult-ames
::
@ -2200,74 +2222,11 @@
:: +on-kroc: cork all stale flows from failed subscriptions
::
++ on-kroc
|= dry=?
|= bones=(list [ship bone])
^+ event-core
:: no-op
::
?: & %.(event-core (slog leaf/"ames: %kroc task not ready" ~))
::
=; [corks=@ core=_event-core]
?. dry core
%.(core (slog leaf/"ames: #{<corks>} flows can be corked" ~))
::
%+ roll ~(tap by peers.ames-state)
|= [[=ship =ship-state] corks=@ core=_event-core]
?. ?=(%known -.ship-state)
corks^core
=/ =peer-state:ames ?>(?=(%known -.ship-state) +.ship-state)
=/ subs=(jar path [bone sub-nonce=@])
%+ roll ~(tap by snd.peer-state)
|= $: [=forward=bone message-pump-state:ames]
subs=(jar path [bone sub-nonce=@])
==
?: (~(has in closing.peer-state) forward-bone)
%. subs
%^ ev-trace &(dry odd.veb) ship
|.
%+ weld "stale flow bone={<forward-bone>} in closing, "
"#{<~(wyt in live:packet-pump-state)>} packets retrying"
?~ duct=(~(get by by-bone.ossuary.peer-state) forward-bone)
subs
?. ?=([* [%gall %use sub=@ @ %out @ @ nonce=@ pub=@ *] *] u.duct)
subs
=/ =wire i.t.u.duct
=/ nonce=(unit @) (rush (snag 7 wire) dem)
%- ~(add ja subs)
:: 0 for old pre-nonce subscriptions
::
:_ [forward-bone ?~(nonce 0 u.nonce)]
?~ nonce wire
:: don't include the sub-nonce in the key
::
(weld (scag 7 wire) (slag 8 wire))
%+ roll ~(tap by subs)
|= [[=wire flows=(list [bone sub-nonce=@])] corks=_corks core=_core]
::
%- tail
%+ roll (sort flows |=([[@ n=@] [@ m=@]] (lte n m)))
|= [[=bone nonce=@] resubs=_(lent flows) corks=_corks core=_core]
=/ app=term ?>(?=([%gall %use sub=@ *] wire) i.t.t.wire)
=/ =path (slag 7 wire)
=/ log=tape "[bone={<bone>} agent={<app>} nonce={<nonce>}] {<path>}"
=; corkable=?
=? corks corkable +(corks)
=? core &(corkable !dry) (%*(on-cork core cork-bone `bone) ship)
(dec resubs)^corks^core
:: checks if this is a stale re-subscription
::
?. =(resubs 1)
%. &
(ev-trace &(dry odd.veb) ship |.((weld "stale %watch plea " log)))
:: the current subscription can be safely corked if there
:: is a flow with a naxplanation ack on a backward bone
::
=+ backward-bone=(mix 0b10 bone)
?. =(2 (mod backward-bone 4))
|
?~ (~(get by rcv.peer-state) backward-bone)
|
%. &
(ev-trace &(dry odd.veb) ship |.((weld "failed %watch plea " log)))
%+ roll bones
|= [[=ship =bone] co=_event-core]
(%*(on-cork co cork-bone `bone) ship)
:: +on-take-wake: receive wakeup or error notification from behn
::
++ on-take-wake
@ -4677,7 +4636,7 @@
%vega on-vega:event-core
%plea (on-plea:event-core [ship plea]:task)
%cork (on-cork:event-core ship.task)
%kroc (on-kroc:event-core dry.task)
%kroc (on-kroc:event-core bones.task)
::
%keen (on-keen:event-core +.task)
%yawn (on-cancel-scry:event-core | +.task)