ames: continue processing memos after %done

This commit is contained in:
Philip Monk 2019-11-27 10:22:20 -08:00
parent 7ddd1225e0
commit 74b0f66850
No known key found for this signature in database
GPG Key ID: B66E1F02604E44EC
5 changed files with 77 additions and 33 deletions

View File

@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1 version https://git-lfs.github.com/spec/v1
oid sha256:2ae2f7487f7cade8f01733929bcc595c580f3e6e06d8b38a6ce2674799515b7b oid sha256:3148fd26f7db5aa846a1f6c66510f6a7cdf774c6b4f9cd5d09ffaad54f6a1887
size 10123015 size 10044742

View File

@ -3,7 +3,7 @@
:: allow sending chat messages to foreign paths based on write perms :: allow sending chat messages to foreign paths based on write perms
:: ::
/- *permission-store, *chat-hook, *invite-store /- *permission-store, *chat-hook, *invite-store
/+ *chat-json, default-agent /+ *chat-json, default-agent, verb
|% |%
+$ card card:agent:gall +$ card card:agent:gall
:: ::
@ -31,6 +31,7 @@
-- --
=| state-zero =| state-zero
=* state - =* state -
%+ verb |
^- agent:gall ^- agent:gall
=< =<
|_ bol=bowl:gall |_ bol=bowl:gall

View File

@ -1,7 +0,0 @@
:: Helm: bonk ames
::
:::: /hoon/bonk/hood/gen
::
/? 310
:- %say
|=({^ ~ ~} helm-bonk+~)

View File

@ -91,7 +91,6 @@
:~ %lens :~ %lens
%clock %clock
%dojo %dojo
%modulo
%launch %launch
%publish %publish
%weather %weather

View File

@ -62,9 +62,9 @@
=> =>
=/ veb =/ veb
:* pak=%.n :* pak=%.n
snd=%.n snd=%.y
rcv=%.n rcv=%.y
odd=%.n odd=%.y
msg=%.y msg=%.y
== ==
|% |%
@ -2210,14 +2210,7 @@
:: ::
++ packet-queue ++ packet-queue
%- (ordered-map live-packet-key live-packet-val) %- (ordered-map live-packet-key live-packet-val)
|= [a=live-packet-key b=live-packet-key] lte-packets
^- ?
::
?: (lth message-num.a message-num.b)
%.y
?: (gth message-num.a message-num.b)
%.n
(lte fragment-num.a fragment-num.b)
:: +gauge: inflate a |pump-gauge to track congestion control :: +gauge: inflate a |pump-gauge to track congestion control
:: ::
++ gauge (make-pump-gauge now.channel metrics.state) ++ gauge (make-pump-gauge now.channel metrics.state)
@ -2253,7 +2246,8 @@
=. live.state live.res =. live.state live.res
=. packet-pump (give %send static-fragment.res) =. packet-pump (give %send static-fragment.res)
%- %+ trace snd.veb %- %+ trace snd.veb
|.("dead {<fragment-num.static-fragment.res^show:gauge>}") =/ nums [message-num fragment-num]:static-fragment.res
|.("dead {<nums^show:gauge>}")
packet-pump packet-pump
:: ::
=| acc=static-fragment =| acc=static-fragment
@ -2314,6 +2308,42 @@
%+ roll sent %+ roll sent
|= [packet=static-fragment core=_packet-pump] |= [packet=static-fragment core=_packet-pump]
(give:core %send packet) (give:core %send packet)
:: +fast-resend-after-ack: resend timed out packets
::
:: After we finally receive an ack, we want to resend all the live
:: packets that have been building up.
::
++ fast-resend-after-ack
|= [=message-num =fragment-num]
^+ packet-pump
=; res=[resends=(list static-fragment) live=_live.state]
~& > resends=resends.res
=. live.state live.res
%+ reel resends.res
|= [packet=static-fragment core=_packet-pump]
(give:core %send packet)
::
=/ gauge (make-pump-gauge now.channel metrics.state)
=/ acc
resends=*(list static-fragment)
::
%^ (traverse:packet-queue _acc) live.state acc
|= $: acc=_acc
key=live-packet-key
val=live-packet-val
==
^- [new-val=(unit live-packet-val) stop=? _acc]
?: (lte-packets key [message-num fragment-num])
~& %no-resend
[new-val=`val stop=%.n acc]
::
~& > [next=(next-expiry:gauge key val) now=now.channel]
?: (gth (next-expiry:gauge key val) now.channel)
[new-val=`val stop=%.y acc]
::
=. last-sent.val now.channel
=. resends.acc [(to-static-fragment key val) resends.acc]
[new-val=`val stop=%.n acc]
:: +on-hear: handle ack on a live packet :: +on-hear: handle ack on a live packet
:: ::
:: If the packet was in our queue, delete it and update our :: If the packet was in our queue, delete it and update our
@ -2339,9 +2369,11 @@
|.("{<[fragment-num show:gauge]>}") |.("{<[fragment-num show:gauge]>}")
:: .resends is backward, so fold backward and emit :: .resends is backward, so fold backward and emit
:: ::
=. packet-pump
%+ reel resends.- %+ reel resends.-
|= [packet=static-fragment core=_packet-pump] |= [packet=static-fragment core=_packet-pump]
(give:core %send packet) (give:core %send packet)
(fast-resend-after-ack message-num fragment-num)
:: ::
=/ acc =/ acc
:* found=`?`%.n :* found=`?`%.n
@ -2388,7 +2420,7 @@
=. live.state live.- =. live.state live.-
:: ::
%- (trace snd.veb |.("done {<message-num^show:gauge>}")) %- (trace snd.veb |.("done {<message-num^show:gauge>}"))
packet-pump (fast-resend-after-ack message-num `fragment-num`0)
:: ::
^+ [metrics=metrics.state live=live.state] ^+ [metrics=metrics.state live=live.state]
:: ::
@ -2429,7 +2461,7 @@
=/ new-wake=(unit @da) =/ new-wake=(unit @da)
?~ head=(peek:packet-queue live.state) ?~ head=(peek:packet-queue live.state)
~ ~
`next-expiry:gauge `(next-expiry:gauge u.head)
:: no-op if no change :: no-op if no change
:: ::
?: =(new-wake next-wake.state) packet-pump ?: =(new-wake next-wake.state) packet-pump
@ -2466,8 +2498,9 @@
:: delay, as opposed to an actual packet loss. :: delay, as opposed to an actual packet loss.
:: ::
++ next-expiry ++ next-expiry
|= [live-packet-key live-packet-val]
^- @da ^- @da
(add now rto) (add last-sent rto)
:: +num-slots: how many packets can we send right now? :: +num-slots: how many packets can we send right now?
:: ::
++ num-slots ++ num-slots
@ -2630,6 +2663,9 @@
(give %send seq %| ok lag=`@dr`0) (give %send seq %| ok lag=`@dr`0)
:: last-acked<seq<=last-heard; heard message, unprocessed :: last-acked<seq<=last-heard; heard message, unprocessed
:: ::
:: Only true if we've heard some packets we haven't acked, which
:: doesn't happen for boons.
::
?: (lte seq last-heard.state) ?: (lte seq last-heard.state)
?: is-last-fragment ?: is-last-fragment
:: drop last packet since we don't know whether to ack or nack :: drop last packet since we don't know whether to ack or nack
@ -2699,7 +2735,7 @@
=. last-heard.state +(last-heard.state) =. last-heard.state +(last-heard.state)
=. live-messages.state (~(del by live-messages.state) seq) =. live-messages.state (~(del by live-messages.state) seq)
:: ::
%- (trace msg.veb |.("hear {<her.channel>} {<num-fragments.u.live>}kb")) %- (trace msg.veb |.("hear {<her.channel>} {<seq>} {<num-fragments.u.live>}kb"))
=/ message=* (assemble-fragments [num-fragments fragments]:u.live) =/ message=* (assemble-fragments [num-fragments fragments]:u.live)
=. message-sink (enqueue-to-vane seq message) =. message-sink (enqueue-to-vane seq message)
:: ::
@ -2727,7 +2763,11 @@
=. last-acked.state +(last-acked.state) =. last-acked.state +(last-acked.state)
=? nax.state !ok (~(put in nax.state) message-num) =? nax.state !ok (~(put in nax.state) message-num)
:: ::
(give %send message-num %| ok lag=`@dr`0) =. message-sink (give %send message-num %| ok lag=`@dr`0)
=/ next ~(top to pending-vane-ack.state)
?~ next
message-sink
(give %memo u.next)
:: +on-drop: drop .message-num from our .nax state :: +on-drop: drop .message-num from our .nax state
:: ::
++ on-drop ++ on-drop
@ -2752,13 +2792,24 @@
[%live %unborn] `"; {(scow %p ship)} has sunk" [%live %unborn] `"; {(scow %p ship)} has sunk"
[%dead %unborn] `"; {(scow %p ship)} has sunk" [%dead %unborn] `"; {(scow %p ship)} has sunk"
== ==
:: +lte-packets: yes if a is before b
::
++ lte-packets
|= [a=live-packet-key b=live-packet-key]
^- ?
::
?: (lth message-num.a message-num.b)
%.y
?: (gth message-num.a message-num.b)
%.n
(lte fragment-num.a fragment-num.b)
:: +split-message: split message into kilobyte-sized fragments :: +split-message: split message into kilobyte-sized fragments
:: ::
++ split-message ++ split-message
|= [=message-num =message-blob] |= [=message-num =message-blob]
^- (list static-fragment) ^- (list static-fragment)
:: ::
=/ fragments=(list fragment) (rip 20 message-blob) =/ fragments=(list fragment) (rip 13 message-blob)
=/ num-fragments=fragment-num (lent fragments) =/ num-fragments=fragment-num (lent fragments)
=| counter=@ =| counter=@
:: ::
@ -2783,7 +2834,7 @@
$(index +(index), sorted [(~(got by fragments) index) sorted]) $(index +(index), sorted [(~(got by fragments) index) sorted])
:: ::
%- cue %- cue
%+ can 20 %+ can 13
%+ turn (flop sorted) %+ turn (flop sorted)
|=(a=@ [1 a]) |=(a=@ [1 a])
:: +bind-duct: find or make new $bone for .duct in .ossuary :: +bind-duct: find or make new $bone for .duct in .ossuary