mirror of
https://github.com/ilyakooo0/urbit.git
synced 2024-12-21 05:41:43 +03:00
Merge pull request #6129 from urbit/yu/clean-flows
ames: add |close-flows
This commit is contained in:
commit
7de9c45c38
@ -1,3 +1,3 @@
|
||||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:55f1bcccf861b3a2247aabb6ec3349829272ef8c36cfc8a16ff0930aac87a24d
|
||||
size 5748196
|
||||
oid sha256:2035ef65290065edbd99a86f9f5a36978617bc1983131fa474a9a5c0e91dc15d
|
||||
size 5998440
|
||||
|
9
pkg/arvo/gen/hood/close-flows.hoon
Normal file
9
pkg/arvo/gen/hood/close-flows.hoon
Normal file
@ -0,0 +1,9 @@
|
||||
:: |close-flows: corks all stale ames flows
|
||||
::
|
||||
:: It runs in dry mode by default, printing the flows that can be closed.
|
||||
:: To actually close the flows, run with |close-flows, =dry |
|
||||
::
|
||||
:- %say
|
||||
|= [^ arg=~ dry=?]
|
||||
::
|
||||
[%helm-kroc dry]
|
141
pkg/arvo/gen/stale-flows.hoon
Normal file
141
pkg/arvo/gen/stale-flows.hoon
Normal file
@ -0,0 +1,141 @@
|
||||
:: +stale-flows: prints number of ames flows that can be closed
|
||||
::
|
||||
:: |stale-flows, =veb %1 :: flows from nacking initial subscriptions
|
||||
:: |stale-flows, =veb %2 :: stale flows that keep (re)trying to connect
|
||||
:: |stale-flows, =veb %21 :: ... per app (only forward)
|
||||
:: |stale-flows, =veb %3 :: stale resubscriptions
|
||||
::
|
||||
=> |%
|
||||
+$ subs (jar path [ship bone @])
|
||||
+$ pags (jar app=term [dst=term =ship =path]) :: per-agent
|
||||
+$ naks (set [ship bone])
|
||||
:: verbosity
|
||||
::
|
||||
+$ veb ?(%0 %1 %2 %21 %3 %31)
|
||||
::
|
||||
++ resubs
|
||||
|= [=subs =veb]
|
||||
^- @
|
||||
::=/ sorted
|
||||
:: %+ sort ~(tap by subs)
|
||||
:: |=([a=(list *) b=(list *)] (lte (lent a) (lent b)))
|
||||
%+ roll ~(tap by subs)::sorted
|
||||
|= [[k=path v=(list [ship bone @])] num=@]
|
||||
~? &(=(%3 veb) (gth (lent v) 1))
|
||||
"#{<(dec (lent v))>} stale resubs on {<k>}"
|
||||
?. (gth (lent v) 1) num
|
||||
(add (lent v) num)
|
||||
--
|
||||
::
|
||||
:- %say
|
||||
|= $: [now=@da eny=@uvJ bec=beak]
|
||||
[arg=~ dry=? =veb]
|
||||
==
|
||||
::
|
||||
=/ peers-map
|
||||
.^((map ship ?(%alien %known)) %ax /(scot %p p.bec)//(scot %da now)/peers)
|
||||
::
|
||||
=/ peers=(list ship)
|
||||
%+ murn ~(tap by peers-map)
|
||||
|= [=ship val=?(%alien %known)]
|
||||
?: =(ship p.bec)
|
||||
~ :: this is weird, but we saw it
|
||||
?- val
|
||||
%alien ~
|
||||
%known (some ship)
|
||||
==
|
||||
::
|
||||
=; [[=subs =pags backward=@ forward=@] =naks]
|
||||
:- %tang %- flop
|
||||
%+ weld
|
||||
:~ leaf+"#{<~(wyt in naks)>} flows from %nacking %watches"
|
||||
leaf+"#{<backward>} backward flows with >10 retries"
|
||||
leaf+"#{<forward>} forward flows with >10 retries"
|
||||
leaf+"#{<(resubs subs veb)>} stale resubscriptions"
|
||||
==
|
||||
?. =(%21 veb) ~
|
||||
:- leaf+"----------------------------------"
|
||||
%+ turn %+ sort ~(tap by pags)
|
||||
|= [[* v=(list)] [* w=(list)]]
|
||||
(gth (lent v) (lent w))
|
||||
|= [app=term v=(list [dst=term =ship =path])]
|
||||
:- %leaf
|
||||
%+ weld "#{<(lent v)>} flows for {<app>} with >10 retries"
|
||||
?. =(1 (lent v)) ~
|
||||
?> ?=(^ v)
|
||||
" on {<ship.i.v>} to {<dst.i.v>} at {<path.i.v>}"
|
||||
::
|
||||
%+ roll peers
|
||||
|= [=ship [=subs p=pags b=@ f=@] n=naks]
|
||||
=+ .^ =ship-state:ames
|
||||
%ax /(scot %p p.bec)//(scot %da now)/peers/(scot %p ship)
|
||||
==
|
||||
=/ =peer-state:ames ?>(?=(%known -.ship-state) +.ship-state)
|
||||
::
|
||||
|^ [stale nacks]
|
||||
::
|
||||
++ stale
|
||||
%+ roll ~(tap by snd.peer-state)
|
||||
|= [[=bone message-pump-state:ames] subs=_subs pags=_p backward=_b forward=_f]
|
||||
=, packet-pump-state
|
||||
:- ?~ duct=(~(get by by-bone.ossuary.peer-state) bone) subs
|
||||
?. ?=([* [%gall %use sub=@ @ %out @ @ nonce=@ pub=@ *] *] u.duct)
|
||||
subs
|
||||
=/ =wire i.t.u.duct
|
||||
=/ nonce=(unit @) (rush i.t.t.t.t.t.t.t.i.t.u.duct dem)
|
||||
%- ~(add ja subs)
|
||||
:_ [ship bone ?~(nonce 0 u.nonce)] :: 0, 1?
|
||||
?~ nonce wire
|
||||
:: don't include the sub-nonce in the key
|
||||
::
|
||||
(weld (scag 7 wire) (slag 8 wire))
|
||||
%+ roll ~(tap in live)
|
||||
|= [[* [packet-state:ames *]] pags=_pags out=[b=_backward f=_forward]]
|
||||
::
|
||||
:: only forward flows
|
||||
::
|
||||
=? pags &(=(0 (end 0 bone)) (gth retries 10))
|
||||
?~ duct=(~(get by by-bone.ossuary.peer-state) bone)
|
||||
pags
|
||||
?. ?=([* [%gall %use sub=@ @ %out @ @ nonce=@ pub=@ *] *] u.duct)
|
||||
pags
|
||||
=/ =wire i.t.u.duct
|
||||
(~(add ja pags) (snag 2 wire) (snag 8 wire) ship (slag 9 wire))
|
||||
::
|
||||
~? &(=(%2 veb) (gth retries 10))
|
||||
=+ arrow=?:(=(0 (end 0 bone)) "<-" "->")
|
||||
"{arrow} ({(cite:title ship)}) bone #{<bone>}, retries: #{<retries>}"
|
||||
:- pags
|
||||
=? out (gth retries 10)
|
||||
?: =(0 (end 0 bone))
|
||||
[b.out +(f.out)]
|
||||
[+(b.out) f.out]
|
||||
out
|
||||
::
|
||||
++ nacks
|
||||
%+ roll ~(tap by rcv.peer-state)
|
||||
|= [[=bone *] nacks=_n]
|
||||
?. &(=(0 (end 0 bone)) =(1 (end 0 (rsh 0 bone))))
|
||||
:: not a naxplanation ack bone
|
||||
::
|
||||
nacks
|
||||
:: by only corking forward flows that have received
|
||||
:: a nack we avoid corking the current subscription
|
||||
::
|
||||
=+ target=(mix 0b10 bone)
|
||||
:: make sure that the nack bone has a forward flow
|
||||
::
|
||||
?~ duct=(~(get by by-bone.ossuary.peer-state) target)
|
||||
nacks
|
||||
?. ?=([* [%gall %use sub=@ @ %out @ @ nonce=@ pub=@ *] *] u.duct)
|
||||
nacks
|
||||
=/ =wire i.t.u.duct
|
||||
?> ?=([%gall %use sub=@ @ %out @ @ nonce=@ pub=@ *] wire)
|
||||
=/ app=term i.t.t.wire
|
||||
=/ nonce=@
|
||||
=- ?~(- 0 u.-)
|
||||
(rush i.t.t.t.t.t.t.t.wire dem)
|
||||
=/ =path t.t.t.t.t.t.t.t.wire
|
||||
~? =(%1 veb) "[bone={<target>} nonce={<nonce>} agent={<app>}] {<path>}"
|
||||
(~(put in nacks) [ship target])
|
||||
--
|
@ -245,6 +245,10 @@
|
||||
|= ~ =< abet
|
||||
(emit %pass /helm %arvo %a %stir '')
|
||||
::
|
||||
++ poke-kroc
|
||||
|= dry=? =< abet
|
||||
(emit [%pass /helm/kroc %arvo %a %kroc dry])
|
||||
::
|
||||
++ poke-knob
|
||||
|= [error-tag=@tas level=?(%hush %soft %loud)] =< abet
|
||||
(emit %pass /helm %arvo %d %knob error-tag level)
|
||||
@ -280,6 +284,7 @@
|
||||
%helm-ames-sift =;(f (f !<(_+<.f vase)) poke-ames-sift)
|
||||
%helm-ames-verb =;(f (f !<(_+<.f vase)) poke-ames-verb)
|
||||
%helm-ames-wake =;(f (f !<(_+<.f vase)) poke-ames-wake)
|
||||
%helm-kroc =;(f (f !<(_+<.f vase)) poke-kroc)
|
||||
%helm-atom =;(f (f !<(_+<.f vase)) poke-atom)
|
||||
%helm-automass =;(f (f !<(_+<.f vase)) poke-automass)
|
||||
%helm-cancel-automass =;(f (f !<(_+<.f vase)) poke-cancel-automass)
|
||||
|
@ -352,6 +352,7 @@
|
||||
:: %heed: track peer's responsiveness; gives %clog if slow
|
||||
:: %jilt: stop tracking peer's responsiveness
|
||||
:: %cork: request to delete message flow
|
||||
:: %kroc: request to delete stale message flows
|
||||
:: %plea: request to send message
|
||||
::
|
||||
:: System and Lifecycle Tasks
|
||||
@ -370,6 +371,7 @@
|
||||
[%heed =ship]
|
||||
[%jilt =ship]
|
||||
[%cork =ship]
|
||||
[%kroc dry=?]
|
||||
$>(%plea vane-task)
|
||||
::
|
||||
$>(%born vane-task)
|
||||
|
@ -1183,6 +1183,7 @@
|
||||
%vega on-vega:event-core
|
||||
%plea (on-plea:event-core [ship plea]:task)
|
||||
%cork (on-cork:event-core ship.task)
|
||||
%kroc (on-kroc:event-core dry.task)
|
||||
==
|
||||
::
|
||||
[moves ames-gate]
|
||||
@ -1448,6 +1449,7 @@
|
||||
~% %event-gate ..per-event ~
|
||||
|= [[now=@da eny=@ rof=roof] =duct =ames-state]
|
||||
=* veb veb.bug.ames-state
|
||||
=| cork-bone=(unit bone) :: modified by +on-kroc
|
||||
~% %event-core ..$ ~
|
||||
|%
|
||||
++ event-core .
|
||||
@ -1899,7 +1901,15 @@
|
||||
=/ =peer-state +.u.ship-state
|
||||
=/ =channel [[our ship] now channel-state -.peer-state]
|
||||
::
|
||||
=^ =bone ossuary.peer-state (bind-duct ossuary.peer-state duct)
|
||||
=/ [=bone ossuary=_ossuary.peer-state]
|
||||
?^ cork-bone [u.cork-bone ossuary.peer-state]
|
||||
(bind-duct ossuary.peer-state duct)
|
||||
=. ossuary.peer-state ossuary
|
||||
::
|
||||
?. (~(has by by-bone.ossuary.peer-state) bone)
|
||||
%. event-core
|
||||
%^ trace odd.veb ship
|
||||
|.("trying to cork {<bone=bone>}, not in the ossuary, ignoring")
|
||||
::
|
||||
=. closing.peer-state (~(put in closing.peer-state) bone)
|
||||
%- %^ trace msg.veb ship
|
||||
@ -1908,6 +1918,68 @@
|
||||
=/ rcvr [ship her-life.channel]
|
||||
"cork plea {<sndr rcvr bone=bone vane.plea path.plea>}"
|
||||
abet:(on-memo:(make-peer-core peer-state channel) bone plea %plea)
|
||||
:: +on-kroc: cork all flows from failed subscriptions
|
||||
::
|
||||
++ on-kroc
|
||||
|= dry=?
|
||||
^+ event-core
|
||||
::
|
||||
=; [corks=@ core=_event-core]
|
||||
?. dry core
|
||||
%.(core (slog leaf/"ames: #{<corks>} flows can be corked" ~))
|
||||
::
|
||||
%+ roll ~(tap by peers.ames-state)
|
||||
|= [[=ship =ship-state] corks=@ core=_event-core]
|
||||
?. ?=(%known -.ship-state)
|
||||
corks^core
|
||||
=/ =peer-state:ames ?>(?=(%known -.ship-state) +.ship-state)
|
||||
=/ subs=(jar path [bone sub-nonce=@])
|
||||
%+ roll ~(tap by snd.peer-state)
|
||||
|= [[=forward=bone *] subs=(jar path [bone sub-nonce=@])]
|
||||
?: (~(has in closing.peer-state) forward-bone)
|
||||
subs
|
||||
?~ duct=(~(get by by-bone.ossuary.peer-state) forward-bone)
|
||||
subs
|
||||
?. ?=([* [%gall %use sub=@ @ %out @ @ nonce=@ pub=@ *] *] u.duct)
|
||||
subs
|
||||
=/ =wire i.t.u.duct
|
||||
=/ nonce=(unit @) (rush (snag 7 wire) dem)
|
||||
%- ~(add ja subs)
|
||||
:: 0 for old pre-nonce subscriptions
|
||||
::
|
||||
:_ [forward-bone ?~(nonce 0 u.nonce)]
|
||||
?~ nonce wire
|
||||
:: don't include the sub-nonce in the key
|
||||
::
|
||||
(weld (scag 7 wire) (slag 8 wire))
|
||||
%+ roll ~(tap by subs)
|
||||
|= [[=wire flows=(list [bone sub-nonce=@])] corks=_corks core=_core]
|
||||
::
|
||||
%- tail
|
||||
%+ roll (sort flows |=([[@ n=@] [@ m=@]] (lte n m)))
|
||||
|= [[=bone nonce=@] resubs=_(lent flows) corks=_corks core=_core]
|
||||
=/ app=term ?>(?=([%gall %use sub=@ *] wire) i.t.t.wire)
|
||||
=/ =path (slag 7 wire)
|
||||
=/ log=tape "[bone={<bone>} agent={<app>} nonce={<nonce>}] {<path>}"
|
||||
=; corkable=?
|
||||
=? corks corkable +(corks)
|
||||
=? core &(corkable !dry) (%*(on-cork core cork-bone `bone) ship)
|
||||
(dec resubs)^corks^core
|
||||
:: checks if this is a stale re-subscription
|
||||
::
|
||||
?. =(resubs 1)
|
||||
%. &
|
||||
(trace &(dry odd.veb) ship |.((weld "stale %watch plea " log)))
|
||||
:: the current subscription can be safely corked if there
|
||||
:: is a flow with a naxplanation ack on a backward bone
|
||||
::
|
||||
=+ backward-bone=(mix 0b10 bone)
|
||||
?. =(2 (mod backward-bone 4))
|
||||
|
|
||||
?~ (~(get by rcv.peer-state) backward-bone)
|
||||
|
|
||||
%. &
|
||||
(trace &(dry odd.veb) ship |.((weld "failed %watch plea " log)))
|
||||
:: +on-take-wake: receive wakeup or error notification from behn
|
||||
::
|
||||
++ on-take-wake
|
||||
|
Loading…
Reference in New Issue
Block a user