alef: reno-style congestion control (todo: misordered acks)

This commit is contained in:
Ted Blackman 2019-09-27 07:45:22 -04:00
parent 7f3917107b
commit 087adacc15

View File

@ -160,6 +160,24 @@
?: |(?=(~ rest.l) (mor key.n.a key.n.rest.l))
a(l rest.l)
rest.l(r a(r r.rest.l))
:: +del: delete .key from .a if it exists, producing value iff deleted
::
++ del
|= [a=(tree item) =key]
^- [(unit val) (tree item)]
::
?~ a [~ ~]
:: we found .key at the root; delete and rebalance
::
?: =(key key.n.a)
[`val.n.a (nip a)]
:: recurse left or right to find .key
::
?: (compare key key.n.a)
=+ [found lef]=$(a l.a)
[found a(l lef)]
=+ [found rig]=$(a r.a)
[found a(r rig)]
:: +nip: remove root; for internal use
::
++ nip
@ -524,11 +542,16 @@
queued-message-acks=(map message-num ok=?)
=packet-pump-state
==
+$ static-fragment
$: =message-num
num-fragments=fragment-num
=fragment-num
=fragment
==
:: $packet-pump-state: persistent state for |packet-pump
::
:: next-wake: last timer we've set, or null
:: live: packets in flight; sent but not yet acked
:: lost: packets to retry, since they timed out with no ack
:: metrics: congestion control information
::
+$ packet-pump-state
@ -536,38 +559,44 @@
live=(tree [live-packet-key live-packet-val])
metrics=pump-metrics
==
:: $pump-metrics: congestion control statistics for the |pump-gauge
:: $pump-metrics: congestion control state for a |packet-pump
::
:: num-live: number of sent packets in flight
:: num-lost: number of expired packets
:: last-sent-at: last date at which we sent a packet
:: last-dead-at: most recently packet expiry
:: rtt: roundtrip time estimate
:: max-live: current window size
:: This is an Ames adaptation of TCP's Reno congestion control
:: algorithm. The information signals and their responses are
:: identical to Reno's; the implementation differs because Ames
:: acknowledgments differ from TCP's and because we're using
:: functional data structures.
::
:: If .skips reaches 3, we perform a fast retransmit and fast
:: recovery. This corresponds to Reno's handling of "three duplicate
:: acks".
::
:: rto: retransmission timeout
:: rtt: roundtrip time estimate, low-passed using EWMA
:: rttvar: mean deviation of .rtt, also low-passed with EWMA
:: num-live: how many packets sent, awaiting ack
:: ssthresh: slow-start threshold
:: cwnd: congestion window; max unacked packets
:: skips: how many misordered acks we've received
::
+$ pump-metrics
$: num-live=@ud
last-sent-at=@da
$: rto=_~s1
rtt=_~s1
max-live=_2
skipped=@ud
rttvar=_~s1
ssthresh=_1.000.000
cwnd=_1
num-live=@ud
num-skips=@ud
==
+$ live-packet-key [=message-num =fragment-num]
+$ live-packet-val
$: sent-packet-state
$: packet-state
num-fragments=fragment-num
=fragment
==
+$ sent-packet-state
$: expiry=@da
sent-date=@da
retried=?
==
+$ static-fragment
$: =message-num
num-fragments=fragment-num
=fragment-num
=fragment
+$ packet-state
$: last-sent=@da
retries=@ud
==
:: $message-still-state: state of |message-still to assemble messages
::
@ -1706,6 +1735,7 @@
^+ peer-core
::
=/ =wire (make-pump-timer-wire her.channel bone)
=/ duct ~[/ames]
(emit duct %pass wire %b %wait date)
:: +on-pump-rest: relay |message-pump's unset-timer request
::
@ -1714,6 +1744,7 @@
^+ peer-core
::
=/ =wire (make-pump-timer-wire her.channel bone)
=/ duct ~[/ames]
(emit duct %pass wire %b %rest date)
--
:: +run-message-still: process $message-still-task and its effects
@ -2070,29 +2101,29 @@
?- -.task
%hear (on-hear [message-num fragment-num]:task)
%done (on-done message-num.task)
%wake resend-lost(next-wake.state ~)
%wake on-wake
%halt set-wake
==
:: +resend-lost: resend as many lost packets as .gauge will allow
:: +on-wake: handle packet timeout
::
++ resend-lost
++ on-wake
^+ packet-pump
:: assert temporal coherence
::
=- =. packet-pump core.-
=. live.state live.-
~? !=(0 num-sent.-) %resent-lost^num-sent.-
?< =(~ next-wake.state)
?> (gte now.channel (need next-wake.state))
=. next-wake.state ~
:: tell congestion control a packet timed out
::
=. metrics.state on-timeout:gauge
:: re-send first packet and update its state in-place
::
=- =. live.state live.-
=. packet-pump (give %send static-fragment.-)
packet-pump
:: acc: state to thread through traversal
::
:: num-slots: start with max retries; decrement on each resend
::
=| $= acc
$: num-slots=_num-retry-slots:gauge
num-sent=@ud
core=_packet-pump
==
::
^+ [acc live=live.state]
=| acc=static-fragment
^+ [static-fragment=acc live=live.state]
::
%^ (traverse:packet-queue _acc) live.state acc
|= $: acc=_acc
@ -2100,37 +2131,15 @@
val=live-packet-val
==
^- [new-val=(unit live-packet-val) stop=? _acc]
:: load mutant environment
:: packet has expired; update it in-place, stop, and produce it
::
=. packet-pump core.acc
:: if we can't send any more packets, we're done
::
?: =(0 num-slots.acc)
[`val stop=%.y acc]
:: if the packet hasn't expired, we're done
::
?: (gte expiry.val now.channel)
[`val stop=%.y acc]
:: packet has expired so re-send it
=. last-sent.val now.channel
=. retries.val +(retries.val)
::
=/ =static-fragment
=> [key val]
[message-num num-fragments fragment-num fragment]
[message-num num-fragments fragment-num fragment]:[key val]
::
=. packet-pump (give %send static-fragment)
=. metrics.state (on-resent:gauge -.val)
:: update $sent-packet-state in .val and continue
::
=. expiry.val (next-retry-expiry:gauge -.val)
=. sent-date.val now.channel
=. retried.val %.y
:: update .acc, writing back .packet-pump
::
=. num-sent.acc +(num-sent.acc)
=. num-slots.acc (dec num-slots.acc)
=. core.acc packet-pump
::
[`val stop=%.n acc]
[`val stop=%.y static-fragment]
:: +feed: try to send a list of packets, returning unsent and effects
::
++ feed
@ -2138,13 +2147,9 @@
^+ [fragments gifts state]
:: return unsent back to caller and reverse effects to finalize
::
=- ::::::::~&~&~& %ames-feed^(lent fragments)^%unsent^(lent unsent)
[unsent (flop gifts) state]
=- [unsent (flop gifts) state]
::
^+ [unsent=fragments packet-pump]
:: resend lost packets first, possibly adjusting congestion control
::
=. packet-pump resend-lost
:: bite off as many fragments as we can send
::
=/ num-slots num-slots:gauge
@ -2164,9 +2169,7 @@
^- [key=live-packet-key val=live-packet-val]
::
:- [message-num fragment-num]
:- :+ expiry=next-expiry:gauge
sent-date=now.channel
retried=%.n
:- [sent-date=now.channel retries=0]
[num-fragments fragment]
:: update .live and .metrics
::
@ -2179,60 +2182,27 @@
::
|- ^+ packet-pump
?~ sent packet-pump
::::::~&~&~& %sent^[message-num fragment-num]:i.sent
::
=. packet-pump (give %send i.sent)
$(sent t.sent)
:: +on-hear: handle ack on a live packet
::
:: Traverse .live from the head, marking packets as lost until we
:: find the acked packet. Then delete the acked packet and try to
:: resend lost packets.
::
:: If we don't find the acked packet, no-op: no mutations, effects,
:: or resending of lost packets.
:: If the packet was in our queue, delete it and update our
:: metrics. Otherwise, no-op.
::
++ on-hear
|= [=message-num =fragment-num]
|= key=live-packet-key
^+ packet-pump
::
=- :: if no sent packet matches the ack, don't apply mutations or effects
:: TODO handle misordered ack
=^ packet=(unit live-packet-val) live.state
(del:packet-queue live.state key)
::
?. found.-
::~> %slog.0^leaf/"ames: hear: no-op {(scow %ud message-num)} {(scow %ud fragment-num)}"
?~ packet
packet-pump
::::::~&~&~& %ames-hear-ack^message-num^fragment-num
::
=. metrics.state metrics.-
=. live.state live.-
resend-lost
::
^- $: [found=? metrics=pump-metrics]
live=(tree [live-packet-key live-packet-val])
==
::
=/ acc=[found=? metrics=pump-metrics] [%.n metrics.state]
::
%^ (traverse:packet-queue _acc) live.state acc
|= $: acc=_acc
key=live-packet-key
val=live-packet-val
==
^- [new-val=(unit live-packet-val) stop=? _acc]
::
=/ gauge (make-pump-gauge now.channel metrics.acc)
:: is this the acked packet?
::
?: =(key [message-num fragment-num])
:: delete acked packet, update metrics, and stop traversal
::
:+ new-val=~
stop=%.y
[found=%.y metrics=(on-ack:gauge -.val)]
:: ack was out of order; mark expired, tell gauge, and continue
::
:+ new-val=`val(expiry `@da`0)
stop=%.n
[found=%.n metrics=(on-skipped-packet:gauge -.val)]
=. metrics.state (on-ack:gauge -.u.packet)
packet-pump
:: +on-done: apply ack to all packets from .message-num
::
++ on-done
@ -2242,8 +2212,7 @@
=- =. metrics.state metrics.-
=. live.state live.-
::
::::::~&~&~& %done^metrics.state
resend-lost
packet-pump
::
^- $: metrics=pump-metrics
live=(tree [live-packet-key live-packet-val])
@ -2258,9 +2227,10 @@
::
=/ gauge (make-pump-gauge now.channel metrics)
:: if ack was out of order, mark expired and continue
:: TODO redo skipped packet logic
::
?: (lth message-num.key message-num)
:+ new-val=`val(expiry `@da`0)
:+ new-val=`val
stop=%.n
metrics=(on-skipped-packet:gauge -.val)
:: if packet was from acked message, delete it and continue
@ -2279,7 +2249,7 @@
=/ new-wake=(unit @da)
?~ head=(peek:packet-queue live.state)
~
`expiry.val.u.head
`next-expiry:gauge
:: no-op if no change
::
?: =(new-wake next-wake.state) packet-pump
@ -2305,87 +2275,93 @@
|%
:: +next-expiry: when should a newly sent fresh packet time out?
::
:: Use rtt + 4*sigma, where sigma is the mean deviation of rtt.
:: This should make it unlikely that a packet would time out from a
:: delay, as opposed to an actual packet loss.
::
++ next-expiry
^- @da
(add now (mul 2 rtt))
:: +next-retry-expiry: when should a resent packet time out?
::
++ next-retry-expiry
|= sent-packet-state
^- @da
next-expiry
:: +has-slot: can we send a packet right now?
::
++ has-slot
^- ?
(gth num-slots 0)
(add now rto)
:: +num-slots: how many packets can we send right now?
::
++ num-slots
^- @ud
?. (gth max-live num-live)
0
(sub max-live num-live)
:: +num-retry-slots: how many lost packets can we resend right now?
::
++ num-retry-slots
^- @ud
max-live
:: +on-skipped-packet: adjust metrics based on a misordered ack
::
++ on-skipped-packet
|= sent-packet-state
^- pump-metrics
::
%_ metrics
skipped +(skipped)
==
:: +on-ack: adjust metrics based on a packet getting acknowledged
::
++ on-ack
|= sent-packet-state
^- pump-metrics
::
=? metrics (gth skipped 0)
::::::~&~&~& %skipped^skipped
%_ metrics
skipped 0
==
::
%_ metrics
num-live (dec num-live)
max-live +(max-live)
rtt (smooth-rtt-since sent-date)
==
(sub-safe cwnd num-live)
:: +on-sent: adjust metrics based on sending .num-sent fresh packets
::
++ on-sent
|= num-sent=@ud
^- pump-metrics
::
%_ metrics
last-sent-at now
num-live (add num-sent num-live)
==
:: +on-resent: adjust metrics based on retrying an expired packet
=. num-live (add num-live num-sent)
?> (lte num-live cwnd)
metrics
:: +on-ack: adjust metrics based on a packet getting acknowledged
::
++ on-resent
|= sent-packet-state
++ on-ack
|= =packet-state
^- pump-metrics
::
%_ metrics
last-sent-at now
max-live (max 1 (div max-live 2))
rtt (smooth-rtt-since sent-date)
==
:: +smooth-rtt-since: calculate new low-passed roundtrip time
=. num-live (dec num-live)
:: if below congestion threshold, add 1; else, add avg. 1 / cwnd
::
++ smooth-rtt-since
|= start=@da
%+ min ~s30
=- (div - 4)
%+ add (mul 3 rtt)
(sub now start)
=. cwnd
?: in-slow-start
+(cwnd)
(add cwnd !=(0 (mod (mug now) cwnd)))
:: if this was a re-send, don't adjust rtt or downstream state
::
?. =(0 retries.packet-state)
metrics
:: rtt-datum: new rtt measurement based on this packet roundtrip
::
=/ rtt-datum=@dr (sub-safe now last-sent.packet-state)
:: rtt-error: difference between this rtt measurement and expected
::
=/ rtt-error=@dr
?: (gte rtt-datum rtt)
(sub rtt-datum rtt)
(sub rtt rtt-datum)
:: exponential weighting ratio for .rtt and .rttvar
::
=. rtt (div (add rtt-datum (mul rtt 7)) 8)
=. rttvar (div (add rtt-error (mul rttvar 7)) 8)
=. rto (clamp-rto (add rtt (mul 4 rttvar)))
::
metrics
:: +on-skipped-packet: TODO
::
++ on-skipped-packet
|= packet-state
^- pump-metrics
metrics
:: +on-timeout: (re)enter slow-start mode on packet loss
::
++ on-timeout
^- pump-metrics
::
=: ssthresh (div cwnd 2)
cwnd 1
rto (clamp-rto (mul rto 2))
==
metrics
:: +clamp-rto: apply min and max to an .rto value
::
++ clamp-rto
|= rto=@dr
^+ rto
(min ~m2 (max ^~((div ~s1 5)) rto))
:: +in-slow-start: produces %.y iff we're in "slow-start" mode
::
++ in-slow-start
^- ?
(lth cwnd ssthresh)
:: +sub-safe: subtract with underflow protection
::
++ sub-safe
|= [a=@ b=@]
^- @
?:((lte a b) 0 (sub a b))
--
:: +make-message-still: construct |message-still message receiver core
::