diff --git a/pkg/arvo/gen/hood/close-flows.hoon b/pkg/arvo/gen/hood/close-flows.hoon index 8804c9e34..19ec1949f 100644 --- a/pkg/arvo/gen/hood/close-flows.hoon +++ b/pkg/arvo/gen/hood/close-flows.hoon @@ -4,6 +4,88 @@ :: To actually close the flows, run with |close-flows, =dry | :: :- %say -|= [^ arg=~ dry=?] +|= [[now=@da eny=@uvJ bec=beak] arg=~ peer=(unit @p) dry=? veb=?] :: -[%helm-ames-kroc dry] +=/ peers-map + .^((map ship ?(%alien %known)) %ax /(scot %p p.bec)//(scot %da now)/peers) +:: +=/ peers=(list ship) + %+ murn ~(tap by peers-map) + |= [=ship val=?(%alien %known)] + ?: =(ship p.bec) + ~ :: this is weird, but we saw it + ?- val + %alien ~ + %known (some ship) + == +:: +=; bones=(list [ship bone]) + :- %helm-ames-kroc + ~? dry "#{<(lent bones)>} flows can be closed" + dry^bones +:: +%+ roll peers +|= [=ship bones=(list [ship bone])] +?: &(?=(^ peer) !=(u.peer ship)) + bones +:: +=+ .^ =ship-state:ames + %ax /(scot %p p.bec)//(scot %da now)/peers/(scot %p ship) + == +=/ =peer-state:ames ?>(?=(%known -.ship-state) +.ship-state) +|^ +=/ subs=(jar path [bone sub-nonce=@]) resubscriptions +%+ roll ~(tap by subs) +|= [[=wire flows=(list [bone sub-nonce=@])] bones=_bones] +:: +%- tail +%+ roll flows +|= [[=bone nonce=@] resubs=_(lent flows) bones=_bones] +=/ app=term ?>(?=([%gall %use sub=@ *] wire) i.t.t.wire) +=/ =path (slag 7 wire) +=/ log=tape "[bone={} agent={} nonce={}] {}" +=; corkable=? + =? bones corkable [[ship bone] bones] + (dec resubs)^bones +:: checks if this is a stale re-subscription +:: +?. =(resubs 1) + ~? veb [ship (weld "stale %watch plea " log)] + & +:: the current subscription can be safely corked if there +:: is a flow with a naxplanation ack on a backward bone +:: +=+ backward-bone=(mix 0b10 bone) +?. =(%2 (mod backward-bone 4)) + | +?~ (~(get by rcv.peer-state) backward-bone) + | +~? veb [ship (weld "failed %watch plea " log)] +& +:: +++ resubscriptions + %+ roll ~(tap by snd.peer-state) + |= $: [=forward=bone message-pump-state:ames] + subs=(jar path [bone sub-nonce=@]) + == + ?: (~(has in closing.peer-state) forward-bone) + ~? veb + :- ship + %+ weld "stale flow bone={} in closing, " + "#{<~(wyt in live:packet-pump-state)>} packets retrying" + subs + ?~ duct=(~(get by by-bone.ossuary.peer-state) forward-bone) + subs + ?. ?=([* [%gall %use @ @ %out @ @ nonce=@ @ *] *] u.duct) + subs + =/ =wire i.t.u.duct + =/ nonce=(unit @) (slaw %ud &8.wire) + %- ~(add ja subs) + :: 0 for old pre-nonce subscriptions + :: + :_ [forward-bone ?~(nonce 0 u.nonce)] + ?~ nonce wire + :: don't include the sub-nonce in the key + :: + (weld (scag 7 wire) (slag 8 wire)) +-- diff --git a/pkg/arvo/lib/hood/helm.hoon b/pkg/arvo/lib/hood/helm.hoon index 399a17f7f..be88dd65a 100644 --- a/pkg/arvo/lib/hood/helm.hoon +++ b/pkg/arvo/lib/hood/helm.hoon @@ -246,8 +246,9 @@ (emit %pass /helm %arvo %a %stir '') :: ++ poke-ames-kroc - |= dry=? =< abet - (emit %pass /helm %arvo %a %kroc dry) + |= [dry=? bones=(list [ship bone])] =< abet + ?: dry this + (emit %pass /helm %arvo %a %kroc bones) :: ++ poke-ames-cong |= cong=[msg=@ud mem=@ud] =< abet diff --git a/pkg/arvo/sys/lull.hoon b/pkg/arvo/sys/lull.hoon index a70e98e4f..530e83e57 100644 --- a/pkg/arvo/sys/lull.hoon +++ b/pkg/arvo/sys/lull.hoon @@ -765,7 +765,7 @@ :: %heed: track peer's responsiveness; gives %clog if slow :: %jilt: stop tracking peer's responsiveness :: %cork: request to delete message flow - :: %kroc: request to delete stale message flows + :: %kroc: request to delete specific message flows, from their bones :: %plea: request to send message :: :: Remote Scry Tasks @@ -792,7 +792,7 @@ [%heed =ship] [%jilt =ship] [%cork =ship] - [%kroc dry=?] + [%kroc bones=(list [ship bone])] $>(%plea vane-task) :: [%keen spar] @@ -1190,6 +1190,7 @@ :: rto: retransmission timeout :: rtt: roundtrip time estimate, low-passed using EWMA :: rttvar: mean deviation of .rtt, also low-passed with EWMA + :: num-live: how many packets sent, awaiting ack :: ssthresh: slow-start threshold :: cwnd: congestion window; max unacked packets :: diff --git a/pkg/arvo/sys/vane/ames.hoon b/pkg/arvo/sys/vane/ames.hoon index d29227d34..be3d2ab98 100644 --- a/pkg/arvo/sys/vane/ames.hoon +++ b/pkg/arvo/sys/vane/ames.hoon @@ -1141,6 +1141,16 @@ num-live=@ud counter=@ud == +:: ++$ queued-event-14 + $% [%call =duct wrapped-task=(hobo task-14)] + [%take =wire =duct =sign] + == +:: ++$ task-14 + $% [%kroc dry=?] + $<(%kroc task) + == :: $bug: debug printing configuration :: :: veb: verbosity toggles @@ -1473,7 +1483,7 @@ == == $: %14 $% $: %larva - events=(qeu queued-event) + events=(qeu queued-event-14) state=ames-state-14 == [%adult state=ames-state-14] @@ -1600,7 +1610,7 @@ [%14 %larva *] ~> %slog.1^leaf/"ames: larva: load" =. cached-state `[%14 state.old] - =. queued-events events.old + =. queued-events (event-14-to-15 events.old) larval-gate :: [%15 %adult *] (load:adult-core %15 state.old) @@ -1625,6 +1635,18 @@ %= e wrapped-task ?.(?=(%snub -.task) task [%snub %deny ships.task]) == + :: + ++ event-14-to-15 + |= events=(qeu queued-event-14) + ^- (qeu queued-event) + %- ~(rep in events) + |= [e=queued-event-14 q=(qeu queued-event)] + %- ~(put to q) ^- queued-event + ?. ?=(%call -.e) e + =/ task=task-14 ((harden task-14) wrapped-task.e) + %= e + wrapped-task ?.(?=(%kroc -.task) task [%kroc ~]) + == -- :: +molt: re-evolve to adult-ames :: @@ -2200,74 +2222,11 @@ :: +on-kroc: cork all stale flows from failed subscriptions :: ++ on-kroc - |= dry=? + |= bones=(list [ship bone]) ^+ event-core - :: no-op - :: - ?: & %.(event-core (slog leaf/"ames: %kroc task not ready" ~)) - :: - =; [corks=@ core=_event-core] - ?. dry core - %.(core (slog leaf/"ames: #{} flows can be corked" ~)) - :: - %+ roll ~(tap by peers.ames-state) - |= [[=ship =ship-state] corks=@ core=_event-core] - ?. ?=(%known -.ship-state) - corks^core - =/ =peer-state:ames ?>(?=(%known -.ship-state) +.ship-state) - =/ subs=(jar path [bone sub-nonce=@]) - %+ roll ~(tap by snd.peer-state) - |= $: [=forward=bone message-pump-state:ames] - subs=(jar path [bone sub-nonce=@]) - == - ?: (~(has in closing.peer-state) forward-bone) - %. subs - %^ ev-trace &(dry odd.veb) ship - |. - %+ weld "stale flow bone={} in closing, " - "#{<~(wyt in live:packet-pump-state)>} packets retrying" - ?~ duct=(~(get by by-bone.ossuary.peer-state) forward-bone) - subs - ?. ?=([* [%gall %use sub=@ @ %out @ @ nonce=@ pub=@ *] *] u.duct) - subs - =/ =wire i.t.u.duct - =/ nonce=(unit @) (rush (snag 7 wire) dem) - %- ~(add ja subs) - :: 0 for old pre-nonce subscriptions - :: - :_ [forward-bone ?~(nonce 0 u.nonce)] - ?~ nonce wire - :: don't include the sub-nonce in the key - :: - (weld (scag 7 wire) (slag 8 wire)) - %+ roll ~(tap by subs) - |= [[=wire flows=(list [bone sub-nonce=@])] corks=_corks core=_core] - :: - %- tail - %+ roll (sort flows |=([[@ n=@] [@ m=@]] (lte n m))) - |= [[=bone nonce=@] resubs=_(lent flows) corks=_corks core=_core] - =/ app=term ?>(?=([%gall %use sub=@ *] wire) i.t.t.wire) - =/ =path (slag 7 wire) - =/ log=tape "[bone={} agent={} nonce={}] {}" - =; corkable=? - =? corks corkable +(corks) - =? core &(corkable !dry) (%*(on-cork core cork-bone `bone) ship) - (dec resubs)^corks^core - :: checks if this is a stale re-subscription - :: - ?. =(resubs 1) - %. & - (ev-trace &(dry odd.veb) ship |.((weld "stale %watch plea " log))) - :: the current subscription can be safely corked if there - :: is a flow with a naxplanation ack on a backward bone - :: - =+ backward-bone=(mix 0b10 bone) - ?. =(2 (mod backward-bone 4)) - | - ?~ (~(get by rcv.peer-state) backward-bone) - | - %. & - (ev-trace &(dry odd.veb) ship |.((weld "failed %watch plea " log))) + %+ roll bones + |= [[=ship =bone] co=_event-core] + (%*(on-cork co cork-bone `bone) ship) :: +on-take-wake: receive wakeup or error notification from behn :: ++ on-take-wake @@ -4677,7 +4636,7 @@ %vega on-vega:event-core %plea (on-plea:event-core [ship plea]:task) %cork (on-cork:event-core ship.task) - %kroc (on-kroc:event-core dry.task) + %kroc (on-kroc:event-core bones.task) :: %keen (on-keen:event-core +.task) %yawn (on-cancel-scry:event-core | +.task)