ames: |ames-prod to reset congestion control

This commit is contained in:
Ted Blackman 2021-08-28 22:39:57 +03:00
parent f6d29fb630
commit e2fd0b61e4
4 changed files with 61 additions and 1 deletions

View File

@ -0,0 +1,4 @@
:- %say
|= [^ ships=(list ship) ~]
:- %helm-ames-prod
ships

View File

@ -142,6 +142,10 @@
!!
abet:(flog %text "< {<src.bowl>}: {(trip mes)}")
::
++ poke-ames-prod
|= ships=(list ship)
abet:(emit %pass /helm/prod %arvo %a %prod ships)
::
++ poke-atom
|= ato=@
=+ len=(scow %ud (met 3 ato))
@ -195,6 +199,7 @@
++ poke
|= [=mark =vase]
?+ mark ~|([%poke-helm-bad-mark mark] !!)
%helm-ames-prod =;(f (f !<(_+<.f vase)) poke-ames-prod)
%helm-ames-sift =;(f (f !<(_+<.f vase)) poke-ames-sift)
%helm-ames-verb =;(f (f !<(_+<.f vase)) poke-ames-verb)
%helm-ames-wake =;(f (f !<(_+<.f vase)) poke-ames-wake)

View File

@ -357,6 +357,7 @@
::
:: %born: process restart notification
:: %init: vane boot
:: %prod: re-send a packet per flow, to all peers if .ships is ~
:: %sift: limit verbosity to .ships
:: %spew: set verbosity toggles
:: %trim: release memory
@ -370,6 +371,7 @@
::
$>(%born vane-task)
$>(%init vane-task)
[%prod ships=(list ship)]
[%sift ships=(list ship)]
[%spew veb=(list verb)]
[%stir arg=@t]

View File

@ -654,12 +654,14 @@
:: %memo: packetize and send application-level message
:: %hear: handle receipt of ack on fragment or message
:: %near: handle receipt of naxplanation
:: $prod: reset congestion control
:: %wake: handle timer firing
::
+$ message-pump-task
$% [%memo =message-blob]
[%hear =message-num =ack-meat]
[%near =naxplanation]
[%prod ~]
[%wake ~]
==
:: $message-pump-gift: effect from |message-pump
@ -681,12 +683,14 @@
:: %done: deal with message acknowledgment
:: %halt: finish event, possibly updating timer
:: %wake: handle timer firing
:: %prod: reset congestion control
::
+$ packet-pump-task
$% [%hear =message-num =fragment-num]
[%done =message-num lag=@dr]
[%halt ~]
[%wake current=message-num]
[%prod ~]
==
:: $packet-pump-gift: effect from |packet-pump
::
@ -898,6 +902,7 @@
%heed (on-heed:event-core ship.task)
%init on-init:event-core
%jilt (on-jilt:event-core ship.task)
%prod (on-prod:event-core ships.task)
%sift (on-sift:event-core ships.task)
%spew (on-spew:event-core veb.task)
%stir (on-stir:event-core arg.task)
@ -1158,6 +1163,29 @@
%rot acc(rot %.y)
==
event-core
:: +on-prod: re-send a packet per flow to each of .ships
::
++ on-prod
|= ships=(list ship)
^+ event-core
=? ships =(~ ships) ~(tap in ~(key by peers.ames-state))
|^ ^+ event-core
?~ ships event-core
$(ships t.ships, event-core (prod-peer i.ships))
::
++ prod-peer
|= her=ship
^+ event-core
=/ par (get-peer-state her)
?~ par event-core
=/ =channel [[our her] now channel-state -.u.par]
=/ peer-core (make-peer-core u.par channel)
=/ bones ~(tap in ~(key by snd.u.par))
|- ^+ event-core
?~ bones abet:peer-core
=. peer-core (run-message-pump:peer-core i.bones %prod ~)
$(bones t.bones)
--
:: +on-stir: start timers for any flow that lack them
::
:: .arg is unused, meant to ease future debug commands
@ -2348,6 +2376,7 @@
^+ message-pump
::
?- -.task
%prod (run-packet-pump %prod ~)
%memo (on-memo message-blob.task)
%wake (run-packet-pump %wake current.state)
%hear
@ -2566,8 +2595,29 @@
%hear (on-hear [message-num fragment-num]:task)
%done (on-done message-num.task)
%wake (on-wake current.task)
%prod on-prod
%halt set-wake
==
:: +on-prod: reset congestion control, re-send packets
::
++ on-prod
^+ packet-pump
?: =(~ next-wake.state)
packet-pump
::
=. metrics.state %*(. *pump-metrics counter counter.metrics.state)
=. live.state
%+ run:packet-queue live.state
|=(p=live-packet-val p(- *packet-state))
::
=/ sot (max 1 num-slots:gauge)
=/ liv live.state
|- ^+ packet-pump
?: =(0 sot) packet-pump
?: =(~ liv) packet-pump
=^ hed liv (pop:packet-queue live.state)
=. packet-pump (give %send (to-static-fragment hed))
$(sot (dec sot))
:: +on-wake: handle packet timeout
::
++ on-wake
@ -2576,7 +2626,6 @@
:: assert temporal coherence
::
?< =(~ next-wake.state)
?> (gte now.channel (need next-wake.state))
=. next-wake.state ~
:: tell congestion control a packet timed out
::