ames: per-lane congestion control

This commit is contained in:
Liam Fitzgerald 2022-02-15 08:47:38 -06:00
parent b2051fcc2b
commit a36dc3aaff
3 changed files with 82 additions and 52 deletions

View File

@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1 version https://git-lfs.github.com/spec/v1
oid sha256:357e851fbd779f9d19c0bad88befcff118ce81cdfc7c3f05fddca1c434e3754a oid sha256:b82eedaf051560580868736e4fea667dc1ef5404f7fd694fe0c1d14886e50816
size 24334333 size 24661918

View File

@ -2,6 +2,7 @@
:: %lull: arvo structures :: %lull: arvo structures
:: ::
=> ..part => ..part
~% %lull ..part ~
|% |%
++ lull %330 ++ lull %330
:: :: :: :: :: ::
@ -960,7 +961,6 @@
$: order=(map path @ud) $: order=(map path @ud)
seq=@ud seq=@ud
keens=((mop @ud keen-state) lte) keens=((mop @ud keen-state) lte)
metrics=pump-metrics
== ==
+$ keen-state +$ keen-state
$: wan=(list want) :: request packets $: wan=(list want) :: request packets
@ -970,7 +970,7 @@
num-received=@ud num-received=@ud
next-wake=(unit @da) next-wake=(unit @da)
listeners=(set duct) listeners=(set duct)
last-sent=(list [@ud @da]) metrics=pump-metrics
== ==
+$ want +$ want
$: fra=@ud $: fra=@ud

View File

@ -1107,7 +1107,7 @@
%- ~(run by peers.old-state) %- ~(run by peers.old-state)
|= =ship-state |= =ship-state
?. ?=(%known -.ship-state) ship-state ?. ?=(%known -.ship-state) ship-state
ship-state(metrics.scry *pump-metrics) ship-state(scry *scry-state)
ames-gate(ames-state +.old-state) ames-gate(ames-state +.old-state)
:: ::
++ state-5-to-6 ++ state-5-to-6
@ -2541,14 +2541,8 @@
~ ~
`pe-core(ship s, peer u.sta) `pe-core(ship s, peer u.sta)
:: ::
++ pe-gauge
=| =bug
(make-pump-gauge now metrics.scry ship bug(ges.veb &))
::
++ pe-abet ++ pe-abet
^+ event-core ^+ event-core
:: ~& show:pe-gauge
::~& num-slots:pe-gauge
=. peers.ames-state =. peers.ames-state
(~(put by peers.ames-state) ship known/peer) (~(put by peers.ames-state) ship known/peer)
event-core event-core
@ -2598,16 +2592,35 @@
== ==
++ ke-core . ++ ke-core .
++ ke-abet ++ ke-abet
=. ke-core ke-set-wake ^+ pe-core
=/ gone=? =/ gone=?
=, keen =, keen
:: num-fragments is 0 when unknown (i.e. no response :: num-fragments is 0 when unknown (i.e. no response
:: yet) :: yet)
&(!=(0 num-fragments) =(num-fragments num-received)) &(!=(0 num-fragments) =(num-fragments num-received))
?: gone
ke-abet-gone
=. ke-core ke-set-wake
=. keens.scry =. keens.scry
?: gone +:(del:orm keens.scry keen-id)
(put:orm keens.scry keen-id keen) (put:orm keens.scry keen-id keen)
=? order.scry gone pe-core
::
++ ke-show
=, keen
:* wan=(lent wan)
nex=(lent nex)
hav=(lent hav)
num-fragments=num-fragments
num-received=num-received
next-wake=next-wake
metrics=metrics
==
::
++ ke-abet-gone
=. ke-core ke-set-wake
=. keens.scry
+:(del:orm keens.scry keen-id)
=. order.scry
(~(del by order.scry) path) (~(del by order.scry) path)
pe-core pe-core
:: ::
@ -2634,21 +2647,27 @@
:: ::
++ ke-on-ack ++ ke-on-ack
=| marked=(list want) =| marked=(list want)
=/ wan wan.keen
|= fra=@ud |= fra=@ud
=/ og ke-core =/ og ke-core
|- ^- [found=? _ke-core] |- ^- [found=? _ke-core]
?: =(~ wan.keen) [| og] ?: =(~ wan)
=^ =want wan.keen wan.keen ~& missing-ack/fra
[| og]
=^ =want wan wan
?: =(fra fra.want) ?: =(fra fra.want)
[& ke-core(wan.keen (welt marked wan.keen))] =. metrics.keen
(on-ack:ke-gauge +>.want)
[& ke-core(wan.keen (welt marked wan))]
=. skips.want +(skips.want) =. skips.want +(skips.want)
=^ resend=? metrics.scry =^ resend=? metrics.keen
(on-skipped-packet:ke-gauge +>.want) (on-skipped-packet:ke-gauge +>.want)
?. resend ?. resend
$(marked [want marked]) $(marked [want marked])
=. tries.want +(tries.want) =. tries.want +(tries.want)
=. last-sent.want now =. last-sent.want now
=. ke-core (ke-send [fra hoot]:want) =. ke-core
(ke-resend [fra hoot]:want)
$(marked [want marked]) $(marked [want marked])
:: ::
++ ke-start ++ ke-start
@ -2656,6 +2675,9 @@
?> =(num-fragments.keen 0) ?> =(num-fragments.keen 0)
=/ fra=@ 1 =/ fra=@ 1
=/ req (ke-encode-req fra) =/ req (ke-encode-req fra)
=/ =want [fra req now 1 0]
=. wan.keen ~[want]
=. metrics.keen (on-sent:ke-gauge 1)
=- ke-core(event-core -) =- ke-core(event-core -)
%- emit %- emit
[unix-duct.ames-state %give %send pe-lane `@ux`req] [unix-duct.ames-state %give %send pe-lane `@ux`req]
@ -2676,35 +2698,50 @@
%+ turn (gulf 1 siz.rawr) %+ turn (gulf 1 siz.rawr)
|= fra=@ud |= fra=@ud
^- want ^- want
[fra (ke-encode-req fra) now 1 0] [fra (ke-encode-req fra) now 0 0]
:: ::
%_ keen %_ keen
num-fragments siz.rawr num-fragments siz.rawr
wan paz wan paz
nex paz nex (tail paz)
== ==
:: +ke-continue: send packets according to normal congestion flow :: +ke-continue: send packets according to normal congestion flow
:: ::
++ ke-continue ++ ke-continue
=| inx=@ud =| inx=@ud
=| sent=(list @ud)
=/ max num-slots:ke-gauge =/ max num-slots:ke-gauge
|- ^+ ke-core |- ^+ ke-core
?: =(~ nex.keen) ke-core ?: |(=(~ nex.keen) =(inx max))
?: =(inx max) ke-core (ke-update-last-sent (flop sent))
=^ =want nex.keen nex.keen =^ =want nex.keen nex.keen
=. metrics.keen (on-sent:ke-gauge 1)
=. ke-core (ke-emit hoot.want) =. ke-core (ke-emit hoot.want)
$(inx +(inx)) $(inx +(inx), sent [fra.want sent])
:: ::
++ ke-send ++ ke-update-last-sent
!.
=| naw=(list want)
|= sent=(list @ud)
?: =(~ wan.keen)
?~ sent
=. wan.keen
(welt naw wan.keen)
ke-core
~& bad-update-fra/sent !!
=^ =want wan.keen wan.keen
?~ sent
=. wan.keen
(welt [want naw] wan.keen)
ke-core
?. =(i.sent fra.want)
$(naw [want naw])
=. last-sent.want now
=. tries.want +(tries.want)
$(sent t.sent, naw [want naw])
::
++ ke-resend
|= [fra=@ud =hoot] |= [fra=@ud =hoot]
=. metrics.scry (on-sent:ke-gauge 1)
=. nex.keen
=| xen=(list want)
|- ^+ nex.keen
=^ =want nex.keen nex.keen
?: =(fra fra.want)
(welt xen nex.keen)
$(xen [want xen])
(ke-emit hoot) (ke-emit hoot)
:: ::
++ ke-emit ++ ke-emit
@ -2723,15 +2760,6 @@
?> =((lent hav) num-received) ?> =((lent hav) num-received)
(decode-response-msg num-fragments hav) (decode-response-msg num-fragments hav)
:: ::
++ ke-update-congestion
!.
|= fra=@ud
^+ metrics.scry
?< =(~ wan.keen)
=^ =want wan.keen wan.keen
?. =(fra fra.want)
$
(on-ack:ke-gauge +>.want)
++ ke-is-dupe-ack ++ ke-is-dupe-ack
|= fra=@ud |= fra=@ud
^- ? ^- ?
@ -2745,9 +2773,10 @@
:: ::
++ ke-rcv ++ ke-rcv
|= [fra=@ud =purr =lane:ames] |= [fra=@ud =purr =lane:ames]
=/ =rawr (decode-response-packet purr)
^+ ke-core ^+ ke-core
=/ =rawr (decode-response-packet purr)
=/ og ke-core =/ og ke-core
=. pe-core (pe-update-qos %live last-contact=now)
:: handle empty :: handle empty
?: =(0 siz.rawr) ?: =(0 siz.rawr)
?> =(~ dat.rawr) ?> =(~ dat.rawr)
@ -2756,8 +2785,6 @@
~| wanted/+(num-received.keen) ~| wanted/+(num-received.keen)
?: (ke-is-dupe-ack fra) ke-core ?: (ke-is-dupe-ack fra) ke-core
:: update congestion, or fill details :: update congestion, or fill details
=? metrics.scry !=(0 num-fragments.keen)
(ke-update-congestion fra)
:: ::
=? ke-core =(0 num-fragments.keen) =? ke-core =(0 num-fragments.keen)
?> =(fra 1) ?> =(fra 1)
@ -2781,16 +2808,15 @@
ke-continue ke-continue
:: ::
++ ke-gauge ++ ke-gauge
=| =bug =* bug bug.ames-state
:: =. bug ges.veb.bug & (make-pump-gauge now metrics.keen ship bug)
(make-pump-gauge now metrics.scry ship bug)
:: ::
++ ke-timer-wire ++ ke-timer-wire
`wire`(welp /fine/behn/wake/(scot %p ship) path) `wire`(welp /fine/behn/wake/(scot %p ship) path)
:: ::
++ ke-pass-timer ++ ke-pass-timer
|= =note |= =note
ke-core(event-core (emit duct %pass ke-timer-wire note)) ke-core(event-core (emit unix-duct.ames-state %pass ke-timer-wire note))
:: ::
++ ke-wait |=(tim=@da (ke-pass-timer %b %wait tim)) ++ ke-wait |=(tim=@da (ke-pass-timer %b %wait tim))
++ ke-rest |=(tim=@da (ke-pass-timer %b %rest tim)) ++ ke-rest |=(tim=@da (ke-pass-timer %b %rest tim))
@ -2813,6 +2839,7 @@
:: ::
++ ke-take-wake ++ ke-take-wake
^+ ke-core ^+ ke-core
=. next-wake.keen ~
=. pe-core %- pe-update-qos =. pe-core %- pe-update-qos
=/ expiry=@da (add ~s30 last-contact.qos.peer) =/ expiry=@da (add ~s30 last-contact.qos.peer)
=? -.qos.peer =? -.qos.peer
@ -2827,13 +2854,13 @@
!=(%czar (clan:title ship)) !=(%czar (clan:title ship))
== ==
route.peer(direct.u %.n) route.peer(direct.u %.n)
=. metrics.keen on-timeout:ke-gauge
:: ::
=. metrics.scry on-timeout:ke-gauge
?> ?=(^ wan.keen) ?> ?=(^ wan.keen)
=: tries.i.wan.keen +(tries.i.wan.keen) =: tries.i.wan.keen +(tries.i.wan.keen)
last-sent.i.wan.keen now last-sent.i.wan.keen now
== ==
(ke-send [fra hoot]:i.wan.keen) (ke-resend [fra hoot]:i.wan.keen)
-- --
-- --
++ on-keen ++ on-keen
@ -2898,6 +2925,9 @@
|= [=wire error=(unit tang)] |= [=wire error=(unit tang)]
^+ event-core ^+ event-core
~| fine-on-take-wake/wire ~| fine-on-take-wake/wire
?^ error
%- (slog leaf/"bad wake" u.error)
event-core
:: TODO: handle error case :: TODO: handle error case
?> ?=([@ *] wire) ?> ?=([@ *] wire)
=/ =ship (slav %p i.wire) =/ =ship (slav %p i.wire)