diff --git a/pkg/arvo/gen/hood/ames-prod.hoon b/pkg/arvo/gen/hood/ames-prod.hoon new file mode 100644 index 000000000..4641f430b --- /dev/null +++ b/pkg/arvo/gen/hood/ames-prod.hoon @@ -0,0 +1,4 @@ +:- %say +|= [^ ships=(list ship) ~] +:- %helm-ames-prod +ships diff --git a/pkg/arvo/lib/hood/helm.hoon b/pkg/arvo/lib/hood/helm.hoon index 0f71669d6..4579ec4a7 100644 --- a/pkg/arvo/lib/hood/helm.hoon +++ b/pkg/arvo/lib/hood/helm.hoon @@ -142,6 +142,10 @@ !! abet:(flog %text "< {}: {(trip mes)}") :: +++ poke-ames-prod + |= ships=(list ship) + abet:(emit %pass /helm/prod %arvo %a %prod ships) +:: ++ poke-atom |= ato=@ =+ len=(scow %ud (met 3 ato)) @@ -195,6 +199,7 @@ ++ poke |= [=mark =vase] ?+ mark ~|([%poke-helm-bad-mark mark] !!) + %helm-ames-prod =;(f (f !<(_+<.f vase)) poke-ames-prod) %helm-ames-sift =;(f (f !<(_+<.f vase)) poke-ames-sift) %helm-ames-verb =;(f (f !<(_+<.f vase)) poke-ames-verb) %helm-ames-wake =;(f (f !<(_+<.f vase)) poke-ames-wake) diff --git a/pkg/arvo/sys/lull.hoon b/pkg/arvo/sys/lull.hoon index e20f41ec9..17777cc84 100644 --- a/pkg/arvo/sys/lull.hoon +++ b/pkg/arvo/sys/lull.hoon @@ -357,6 +357,7 @@ :: :: %born: process restart notification :: %init: vane boot + :: %prod: re-send a packet per flow, to all peers if .ships is ~ :: %sift: limit verbosity to .ships :: %spew: set verbosity toggles :: %trim: release memory @@ -370,6 +371,7 @@ :: $>(%born vane-task) $>(%init vane-task) + [%prod ships=(list ship)] [%sift ships=(list ship)] [%spew veb=(list verb)] [%stir arg=@t] diff --git a/pkg/arvo/sys/vane/ames.hoon b/pkg/arvo/sys/vane/ames.hoon index fb09a2ebe..1d658e80e 100644 --- a/pkg/arvo/sys/vane/ames.hoon +++ b/pkg/arvo/sys/vane/ames.hoon @@ -654,12 +654,14 @@ :: %memo: packetize and send application-level message :: %hear: handle receipt of ack on fragment or message :: %near: handle receipt of naxplanation +:: $prod: reset congestion control :: %wake: handle timer firing :: +$ message-pump-task $% [%memo =message-blob] [%hear =message-num =ack-meat] [%near =naxplanation] + [%prod ~] [%wake ~] == :: $message-pump-gift: effect from |message-pump @@ -681,12 +683,14 @@ :: %done: deal with message acknowledgment :: %halt: finish event, possibly updating timer :: %wake: handle timer firing +:: %prod: reset congestion control :: +$ packet-pump-task $% [%hear =message-num =fragment-num] [%done =message-num lag=@dr] [%halt ~] [%wake current=message-num] + [%prod ~] == :: $packet-pump-gift: effect from |packet-pump :: @@ -898,6 +902,7 @@ %heed (on-heed:event-core ship.task) %init on-init:event-core %jilt (on-jilt:event-core ship.task) + %prod (on-prod:event-core ships.task) %sift (on-sift:event-core ships.task) %spew (on-spew:event-core veb.task) %stir (on-stir:event-core arg.task) @@ -1158,6 +1163,29 @@ %rot acc(rot %.y) == event-core + :: +on-prod: re-send a packet per flow to each of .ships + :: + ++ on-prod + |= ships=(list ship) + ^+ event-core + =? ships =(~ ships) ~(tap in ~(key by peers.ames-state)) + |^ ^+ event-core + ?~ ships event-core + $(ships t.ships, event-core (prod-peer i.ships)) + :: + ++ prod-peer + |= her=ship + ^+ event-core + =/ par (get-peer-state her) + ?~ par event-core + =/ =channel [[our her] now channel-state -.u.par] + =/ peer-core (make-peer-core u.par channel) + =/ bones ~(tap in ~(key by snd.u.par)) + |- ^+ event-core + ?~ bones abet:peer-core + =. peer-core (run-message-pump:peer-core i.bones %prod ~) + $(bones t.bones) + -- :: +on-stir: start timers for any flow that lack them :: :: .arg is unused, meant to ease future debug commands @@ -2348,6 +2376,7 @@ ^+ message-pump :: ?- -.task + %prod (run-packet-pump %prod ~) %memo (on-memo message-blob.task) %wake (run-packet-pump %wake current.state) %hear @@ -2566,8 +2595,29 @@ %hear (on-hear [message-num fragment-num]:task) %done (on-done message-num.task) %wake (on-wake current.task) + %prod on-prod %halt set-wake == + :: +on-prod: reset congestion control, re-send packets + :: + ++ on-prod + ^+ packet-pump + ?: =(~ next-wake.state) + packet-pump + :: + =. metrics.state %*(. *pump-metrics counter counter.metrics.state) + =. live.state + %+ run:packet-queue live.state + |=(p=live-packet-val p(- *packet-state)) + :: + =/ sot (max 1 num-slots:gauge) + =/ liv live.state + |- ^+ packet-pump + ?: =(0 sot) packet-pump + ?: =(~ liv) packet-pump + =^ hed liv (pop:packet-queue live.state) + =. packet-pump (give %send (to-static-fragment hed)) + $(sot (dec sot)) :: +on-wake: handle packet timeout :: ++ on-wake @@ -2576,7 +2626,6 @@ :: assert temporal coherence :: ?< =(~ next-wake.state) - ?> (gte now.channel (need next-wake.state)) =. next-wake.state ~ :: tell congestion control a packet timed out ::