ames: clean all stale %watches in on-kroc

This removes the logic from cleaning up stale subscriptions in %gall,
leaving +ap-rake as it was, and moves it to the +on-kroc arm in %ames.

Failed subscriptions from nacking a %watch plea that were
not properly corked (fixed in https://github.com/urbit/urbit/pull/6102)
are a subset of the more general "stale re-subscription" issue, so
we take care of all stale flows at the same time, by focusing on the
current  subscription—leaving all others to be corked automatically—and
checking if it received a nack, to subsequently cork it.
This commit is contained in:
yosoyubik 2023-01-07 10:14:01 +01:00
parent ad712caccf
commit 1e04e9498e
3 changed files with 48 additions and 53 deletions

View File

@ -1810,7 +1810,7 @@
[%load =load] :: load agent
[%nuke =dude] :: delete agent
[%doff dude=(unit dude) ship=(unit ship)] :: kill subscriptions
[%rake dude=(unit dude) mode=?(%o %z %r) dry=?] :: reclaim old subs
[%rake dude=(unit dude) all=?] :: reclaim old subs
$>(%init vane-task) :: set owner
$>(%trim vane-task) :: trim state
$>(%vega vane-task) :: report upgrade

View File

@ -1918,7 +1918,7 @@
=/ rcvr [ship her-life.channel]
"cork plea {<sndr rcvr bone=bone vane.plea path.plea>}"
abet:(on-memo:(make-peer-core peer-state channel) bone plea %plea)
:: +on-kroc: cork all stale flows from failed subscriptions
:: +on-kroc: cork all flows from failed subscriptions
::
++ on-kroc
|= dry=?
@ -1927,27 +1927,48 @@
|= [[=ship =ship-state] core=_event-core]
?. ?=(%known -.ship-state) core
=/ =peer-state:ames ?>(?=(%known -.ship-state) +.ship-state)
=/ dudes ;; (map dude:gall nonce=@)
=< q.q %- need %- need
(rof ~ %gf `beam`[[our %$ da+now] /])
::
%+ roll ~(tap by rcv.peer-state)
|= [[=bone *] core=_core]
?. &(=(0 (end 0 bone)) =(1 (end 0 (rsh 0 bone))))
:: not a naxplanation ack bone
=/ subs=(jar path [bone nonce=@])
%+ roll ~(tap by snd.peer-state)
|= [[=bone *] subs=(jar path [bone nonce=@])]
?~ duct=(~(get by by-bone.ossuary.peer-state) bone) subs
?. ?=([* [%gall %use sub=@ @ %out @ @ nonce=@ pub=@ *] *] u.duct)
subs
=/ nonce=@ (rash i.t.t.t.t.t.t.t.i.t.u.duct dem)
=/ =wire i.t.u.duct
:: don't include the sub-nonce in the key
::
core
:: by only corking forward flows that have received
:: a nack we avoid corking the current subscription
(~(add ja subs) (weld (scag 7 wire) (slag 8 wire)) [bone nonce])
%+ roll ~(tap by subs)
|= [[=wire flows=(list [bone @])] core=_core]
::
%+ roll flows
|= [[=bone sub-nonce=@] core=_core]
?: (~(has in closing.peer-state) bone) core
=/ app=term ?>(?=([%gall %use sub=@ *] wire) i.t.t.wire)
=/ nonce=@ud (dec (~(got by dudes) app))
=/ =path (slag 7 wire)
=/ log=tape "[bone={<bone>} nonce={<sub-nonce>} agent={<app>}] {<path>}"
=; corkable=?
?.(corkable core (%*(on-cork core cork-bone `bone) ship))
:: check if the current subscription also failed
::
?. =(sub-nonce nonce)
%. !dry
(trace |(dry odd.veb) ship |.((weld "stale %watch plea " log)))
:: we can safely cork the current subscription
:: if it received a nack on backward bone
::
=+ target=(mix 0b10 bone)
:: make sure that the nack bone has a forward flow
:: not a naxplanation ack bone
::
?~ duct=(~(get by by-bone.ossuary.peer-state) target)
core
?. ?=([* [%gall %use sub=@ @ %out @ @ nonce=@ pub=@ *] *] u.duct)
core
%- %^ trace |(dry odd.veb) ship
|.("kroc failed %watch plea {<bone=target>}")
?: dry core
(%*(on-cork core cork-bone `target) ship)
?. &(=(0 (end 0 target)) =(1 (end 0 (rsh 0 target))))
|
%. !dry
(trace |(dry odd.veb) ship |.((weld "failed %watch plea " log)))
:: +on-take-wake: receive wakeup or error notification from behn
::
++ on-take-wake

View File

@ -244,7 +244,7 @@
:: +mo-rake: send %cork's for old subscriptions if needed
::
++ mo-rake
|= [dude=(unit dude) mode=?(%o %z %r) dry=?]
|= [dude=(unit dude) all=?]
^+ mo-core
=/ apps=(list (pair term yoke))
?~ dude ~(tap by yokes.state)
@ -252,7 +252,7 @@
|- ^+ mo-core
?~ apps mo-core
=/ ap-core (ap-yoke:ap p.i.apps [~ our] q.i.apps)
$(apps t.apps, mo-core ap-abet:(ap-rake:ap-core mode dry))
$(apps t.apps, mo-core ap-abet:(ap-rake:ap-core all))
:: +mo-receive-core: receives an app core built by %ford.
::
:: Presuming we receive a good core, we first check to see if the agent
@ -1512,49 +1512,36 @@
=? ap-core (gte 1 (~(got by boar.yoke) wyr dok))
(ap-pass wyr %agent dok %leave ~)
$(subs t.subs)
:: +ap-rake: clean up the dead subscriptions
::
:: =mode %o: kill old pre-nonce subscriptions
:: =mode %z: kill old pre-nonce subscriptions, including sub-nonce = 0
:: =mode %r: kills all stale resubscription flows
:: +ap-rake: clean up the dead %leave's
::
++ ap-rake
|= [mode=?(%o %z %r) dry=?]
|= all=?
=/ subs ~(tap in ~(key by boat.yoke))
|^ ^+ ap-core
?~ subs ap-core
=/ [=wire =dock] i.subs
=/ non=@ud (~(got by boar.yoke) wire dock)
~& [mode sub-nonce=sub-nonce.yoke nonce=non]
?. &(=(%zer mode) =(0 non))
=/ non (~(got by boar.yoke) wire dock)
?: &(!all =(0 non))
$(subs t.subs)
?~ per=(scry-peer-state p.dock)
$(subs t.subs)
:: skip current subscription in %res mode
::
?. ?& =(%res mode)
!=(sub-nonce.yoke 0)
(lth non (dec sub-nonce.yoke))
==
$(subs t.subs)
::
=/ sub-nonce=@t ?.(=(%res mode) '0' (scot %ud non))
=/ dud=(set duct)
=/ mod=^wire
:* %gall %use agent-name run-nonce.yoke
%out (scot %p p.dock) q.dock
sub-nonce wire
'0' wire
==
%- ~(rep by by-duct.ossuary.u.per)
|= [[=duct =bone] out=(set duct)]
^+ out
?. ?& ?=([* [%gall %use @ @ %out @ @ @ *] *] duct)
=(mod i.t.duct(i.t.t.t.t.t.t.t sub-nonce))
=(mod i.t.duct(i.t.t.t.t.t.t.t '0'))
==
out
?: (~(has in closing.u.per) bone) out
~> %slog.0^leaf+"gall: rake {<i.t.duct>}"
?:(dry out (~(put in out) duct))
(~(put in out) duct)
::
%- ap-move
(turn ~(tap in dud) |=(d=duct [+.d %pass -.d %a %cork p.dock]))
@ -2002,19 +1989,6 @@
[~ ~]
[~ ~ atom+!>(u.nonce)]
::
?: ?& =(%r care)
?=(~ path)
=([%$ %da now] coin)
=(our ship)
==
=| agents=(list [app=term sub-nonce=@ud run-nonce=@t =boat =boar])
:+ ~ ~
:- %noun !> ^+ agents
%+ roll ~(tap by yokes.state)
|= [[agent=term =yoke] agents=_agents]
:_ agents
[agent [sub-nonce run-nonce boat boar]:yoke]
::
?. =(our ship)
~
?. =([%$ %da now] coin)