diff --git a/pkg/arvo/sys/vane/ames.hoon b/pkg/arvo/sys/vane/ames.hoon index 9605039e26..ad6fa54fe1 100644 --- a/pkg/arvo/sys/vane/ames.hoon +++ b/pkg/arvo/sys/vane/ames.hoon @@ -1182,577 +1182,6 @@ => |% :: XX out of here :: - :: +make-message-pump: constructor for |message-pump - :: - ++ make-message-pump - |= [state=message-pump-state =channel closing=? =bone] - =* veb veb.bug.channel - =| gifts=(list message-pump-gift) - :: - |% - ++ message-pump . - ++ give |=(gift=message-pump-gift message-pump(gifts [gift gifts])) - ++ packet-pump (make-packet-pump packet-pump-state.state channel) - ++ trace - |= [verb=? print=(trap tape)] - ^+ same - (^trace verb her.channel ships.bug.channel print) - :: +work: handle a $message-pump-task - :: - ++ work - |= task=message-pump-task - ^+ [gifts state] - :: - =~ (dispatch-task task) - feed-packets - (run-packet-pump %halt ~) - assert - [(flop gifts) state] - == - :: +dispatch-task: perform task-specific processing - :: - ++ dispatch-task - |= task=message-pump-task - ^+ message-pump - :: - ?- -.task - %prod (run-packet-pump %prod ~) - %memo (on-memo message-blob.task) - %wake (run-packet-pump %wake current.state) - %hear - ?- -.ack-meat.task - %& - (on-hear [message-num fragment-num=p.ack-meat]:task) - :: - %| - =/ cork=? - =/ top-live - (pry:packet-queue:*make-packet-pump live.packet-pump-state.state) - :: If we send a %cork and get an ack, we can know by - :: sequence number that the ack is for the %cork message - :: - ?& closing - ?=(^ top-live) - =(0 ~(wyt in unsent-messages.state)) - =(0 (lent unsent-fragments.state)) - =(1 ~(wyt by live.packet-pump-state.state)) - =(message-num:task message-num.key.u.top-live) - == - =* ack p.ack-meat.task - =? message-pump &(cork !ok.ack) (give [%kroc bone]) - =. message-pump - %- on-done - [[message-num:task ?:(ok.ack [%ok ~] [%nack ~])] cork] - ?. &(!ok.ack cork) message-pump - %. message-pump - %+ trace odd.veb - |.("got nack for %cork {}") - == - %near (on-done [[message-num %naxplanation error]:naxplanation.task %&]) - == - :: +on-memo: handle request to send a message - :: - ++ on-memo - |= =message-blob - ^+ message-pump - :: - =. unsent-messages.state (~(put to unsent-messages.state) message-blob) - message-pump - :: +on-hear: handle packet acknowledgment - :: - ++ on-hear - |= [=message-num =fragment-num] - ^+ message-pump - :: pass to |packet-pump unless duplicate or future ack - :: - ?. (is-message-num-in-range message-num) - %- (trace snd.veb |.("hear pump out of range")) - message-pump - (run-packet-pump %hear message-num fragment-num) - :: +on-done: handle message acknowledgment - :: - :: A nack-trace message counts as a valid message nack on the - :: original failed message. - :: - :: This prevents us from having to wait for a message nack packet, - :: which would mean we couldn't immediately ack the nack-trace - :: message, which would in turn violate the semantics of backward - :: flows. - :: - ++ on-done - |= [[=message-num =ack] cork=?] - ^+ message-pump - :: unsent messages from the future should never get acked - :: - ~| :* bone=bone - mnum=message-num - next=next.state - unsent-messages=~(wyt in unsent-messages.state) - unsent-fragments=(lent unsent-fragments.state) - any-live=!=(~ live.packet-pump-state.state) - == - ?> (lth message-num next.state) - :: ignore duplicate message acks - :: - ?: (lth message-num current.state) - %- %+ trace snd.veb - |.("duplicate done {}") - message-pump - :: ignore duplicate and future acks - :: - ?. (is-message-num-in-range message-num) - message-pump - :: clear and print .unsent-fragments if nonempty - :: - =? unsent-fragments.state - &(=(current next) ?=(^ unsent-fragments)):state - :: - ~> %slog.0^leaf/"ames: early message ack {}" - ~ - :: clear all packets from this message from the packet pump - :: - =. message-pump (run-packet-pump %done message-num lag=*@dr) - :: enqueue this ack to be sent back to local client vane - :: - :: Don't clobber a naxplanation with just a nack packet. - :: - =? queued-message-acks.state - =/ old (~(get by queued-message-acks.state) message-num) - !?=([~ %naxplanation *] old) - (~(put by queued-message-acks.state) message-num ack) - :: emit local acks from .queued-message-acks until incomplete - :: - |- ^+ message-pump - :: if .current hasn't been fully acked, we're done - :: - ?~ cur=(~(get by queued-message-acks.state) current.state) - message-pump - :: .current is complete; pop, emit local ack, and try next message - :: - =. queued-message-acks.state - (~(del by queued-message-acks.state) current.state) - :: clear all packets from this message from the packet pump - :: - :: Note we did this when the original packet came in, a few lines - :: above. It's not clear why, but it doesn't always clear the - :: packets when it's not the current message. As a workaround, - :: we clear the packets again when we catch up to this packet. - :: - :: This is slightly inefficient because we run this twice for - :: each packet and it may emit a few unnecessary packets, but - :: but it's not incorrect. pump-metrics are updated only once, - :: at the time when we actually delete the packet. - :: - =. message-pump (run-packet-pump %done current.state lag=*@dr) - :: give %done to vane if we're ready - :: - ?- -.u.cur - %ok - =. message-pump - :: don't give %done for corks - :: - ?: cork (give %cork ~) - (give %done current.state ~) - $(current.state +(current.state)) - :: - %nack - message-pump - :: - %naxplanation - =. message-pump (give %done current.state `error.u.cur) - $(current.state +(current.state)) - == - :: +is-message-num-in-range: %.y unless duplicate or future ack - :: - ++ is-message-num-in-range - |= =message-num - ^- ? - :: - ?: (gte message-num next.state) - %.n - ?: (lth message-num current.state) - %.n - !(~(has by queued-message-acks.state) message-num) - :: +feed-packets: give packets to |packet-pump until full - :: - ++ feed-packets - :: if nothing to send, no-op - :: - ?: &(=(~ unsent-messages) =(~ unsent-fragments)):state - message-pump - :: we have unsent fragments of the current message; feed them - :: - ?. =(~ unsent-fragments.state) - =/ res (feed:packet-pump unsent-fragments.state) - =+ [unsent packet-pump-gifts packet-pump-state]=res - :: - =. unsent-fragments.state unsent - =. packet-pump-state.state packet-pump-state - :: - =. message-pump (process-packet-pump-gifts packet-pump-gifts) - :: if it sent all of them, feed it more; otherwise, we're done - :: - ?~ unsent - feed-packets - message-pump - :: .unsent-messages is nonempty; pop a message off and feed it - :: - =^ =message-blob unsent-messages.state ~(get to unsent-messages.state) - :: break .message into .chunks and set as .unsent-fragments - :: - =. unsent-fragments.state (split-message next.state message-blob) - :: try to feed packets from the next message - :: - =. next.state +(next.state) - feed-packets - :: +run-packet-pump: call +work:packet-pump and process results - :: - ++ run-packet-pump - |= =packet-pump-task - ^+ message-pump - :: - =^ packet-pump-gifts packet-pump-state.state - (work:packet-pump packet-pump-task) - :: - (process-packet-pump-gifts packet-pump-gifts) - :: +process-packet-pump-gifts: pass |packet-pump effects up the chain - :: - ++ process-packet-pump-gifts - |= packet-pump-gifts=(list packet-pump-gift) - ^+ message-pump - :: - ?~ packet-pump-gifts - message-pump - =. message-pump (give i.packet-pump-gifts) - :: - $(packet-pump-gifts t.packet-pump-gifts) - :: +assert: sanity checks to isolate error cases - :: - ++ assert - ^+ message-pump - =/ top-live - (pry:packet-queue:*make-packet-pump live.packet-pump-state.state) - ?. |(?=(~ top-live) (lte current.state message-num.key.u.top-live)) - ~| [%strange-current current=current.state key.u.top-live] - !! - message-pump - -- - :: +make-packet-pump: construct |packet-pump core - :: - ++ make-packet-pump - |= [state=packet-pump-state =channel] - =* veb veb.bug.channel - =| gifts=(list packet-pump-gift) - |% - ++ packet-pump . - ++ give |=(packet-pump-gift packet-pump(gifts [+< gifts])) - ++ trace - |= [verb=? print=(trap tape)] - ^+ same - (^trace verb her.channel ships.bug.channel print) - :: +packet-queue: type for all sent fragments, ordered by sequence number - :: - ++ packet-queue - %- (ordered-map live-packet-key live-packet-val) - lte-packets - :: +live-packets: number of sent packets awaiting ack - :: - ++ live-packets - ^- @ud - ~(wyt by live.state) - :: +gauge: inflate a |pump-gauge to track congestion control - :: - ++ gauge (make-pump-gauge metrics.state live-packets [now her bug]:channel) - :: +work: handle $packet-pump-task request - :: - ++ work - |= task=packet-pump-task - ^+ [gifts state] - :: - =- [(flop gifts) state] - :: - ?- -.task - %hear (on-hear [message-num fragment-num]:task) - %done (on-done message-num.task) - %wake (on-wake current.task) - %prod on-prod - %halt set-wake - == - :: +on-prod: reset congestion control, re-send packets - :: - ++ on-prod - ^+ packet-pump - ?: =(~ next-wake.state) - packet-pump - :: - =. metrics.state %*(. *pump-metrics counter counter.metrics.state) - =. live.state - %+ run:packet-queue live.state - |=(p=live-packet-val p(- *packet-state)) - :: - =/ sot (max 1 num-slots:gauge) - =/ liv live.state - |- ^+ packet-pump - ?: =(0 sot) packet-pump - ?: =(~ liv) packet-pump - =^ hed liv (pop:packet-queue liv) - =. packet-pump (give %send (to-static-fragment hed)) - $(sot (dec sot)) - :: +on-wake: handle packet timeout - :: - ++ on-wake - |= current=message-num - ^+ packet-pump - :: assert temporal coherence - :: - ?< =(~ next-wake.state) - =. next-wake.state ~ - :: tell congestion control a packet timed out - :: - =. metrics.state on-timeout:gauge - :: re-send first packet and update its state in-place - :: - =- =* res - - =. live.state live.res - =? packet-pump ?=(^ static-fragment) - %- %+ trace snd.veb - =/ nums [message-num fragment-num]:u.static-fragment.res - |.("dead {}") - (give %send u.static-fragment.res) - packet-pump - :: - =| acc=(unit static-fragment) - ^+ [static-fragment=acc live=live.state] - :: - %^ (dip:packet-queue _acc) live.state acc - |= $: acc=_acc - key=live-packet-key - val=live-packet-val - == - ^- [new-val=(unit live-packet-val) stop=? _acc] - :: if already acked later message, don't resend - :: - ?: (lth message-num.key current) - %- %- slog :_ ~ - leaf+"ames: strange wake queue, expected {}, got {}" - [~ stop=%.n ~] - :: packet has expired; update it in-place, stop, and produce it - :: - =. last-sent.val now.channel - =. retries.val +(retries.val) - :: - [`val stop=%.y `(to-static-fragment key val)] - :: +feed: try to send a list of packets, returning unsent and effects - :: - ++ feed - |= fragments=(list static-fragment) - ^+ [fragments gifts state] - :: return unsent back to caller and reverse effects to finalize - :: - =- [unsent (flop gifts) state] - :: - ^+ [unsent=fragments packet-pump] - :: bite off as many fragments as we can send - :: - =/ num-slots num-slots:gauge - =/ sent (scag num-slots fragments) - =/ unsent (slag num-slots fragments) - :: - :- unsent - ^+ packet-pump - :: if nothing to send, we're done - :: - ?~ sent packet-pump - :: convert $static-fragment's into +ordered-set [key val] pairs - :: - =/ send-list - %+ turn sent - |= static-fragment - ^- [key=live-packet-key val=live-packet-val] - :: - :- [message-num fragment-num] - :- [sent-date=now.channel retries=0 skips=0] - [num-fragments fragment] - :: update .live and .metrics - :: - =. live.state (gas:packet-queue live.state send-list) - :: TMI - :: - => .(sent `(list static-fragment)`sent) - :: emit a $packet-pump-gift for each packet to send - :: - %+ roll sent - |= [packet=static-fragment core=_packet-pump] - (give:core %send packet) - :: +fast-resend-after-ack: resend timed out packets - :: - :: After we finally receive an ack, we want to resend all the live - :: packets that have been building up. - :: - ++ fast-resend-after-ack - |= [=message-num =fragment-num] - ^+ packet-pump - =; res=[resends=(list static-fragment) live=_live.state] - =. live.state live.res - %+ reel resends.res - |= [packet=static-fragment core=_packet-pump] - (give:core %send packet) - :: - =/ acc - resends=*(list static-fragment) - :: - %^ (dip:packet-queue _acc) live.state acc - |= $: acc=_acc - key=live-packet-key - val=live-packet-val - == - ^- [new-val=(unit live-packet-val) stop=? _acc] - ?: (lte-packets key [message-num fragment-num]) - [new-val=`val stop=%.n acc] - :: - ?: (gth (next-expiry:gauge key val) now.channel) - [new-val=`val stop=%.y acc] - :: - =. last-sent.val now.channel - =. resends.acc [(to-static-fragment key val) resends.acc] - [new-val=`val stop=%.n acc] - :: +on-hear: handle ack on a live packet - :: - :: If the packet was in our queue, delete it and update our - :: metrics, possibly re-sending skipped packets. Otherwise, no-op. - :: - ++ on-hear - |= [=message-num =fragment-num] - ^+ packet-pump - :: - =- :: if no sent packet matches the ack, don't apply mutations or effects - :: - ?. found.- - %- (trace snd.veb |.("miss {}")) - packet-pump - :: - =. metrics.state metrics.- - =. live.state live.- - %- ?. ?| =(0 fragment-num) - =(0 (mod counter.metrics.state 20)) - == - same - (trace snd.veb |.("send: {}")) - :: .resends is backward, so fold backward and emit - :: - =. packet-pump - %+ reel resends.- - |= [packet=static-fragment core=_packet-pump] - (give:core %send packet) - (fast-resend-after-ack message-num fragment-num) - :: - =/ acc - :* found=`?`%.n - resends=*(list static-fragment) - metrics=metrics.state - == - :: - ^+ [acc live=live.state] - :: - %^ (dip:packet-queue _acc) live.state acc - |= $: acc=_acc - key=live-packet-key - val=live-packet-val - == - ^- [new-val=(unit live-packet-val) stop=? _acc] - :: - =/ gauge (make-pump-gauge metrics.acc live-packets [now her bug]:channel) - :: is this the acked packet? - :: - ?: =(key [message-num fragment-num]) - :: delete acked packet, update metrics, and stop traversal - :: - =. found.acc %.y - =. metrics.acc (on-ack:gauge -.val) - [new-val=~ stop=%.y acc] - :: is this a duplicate ack? - :: - ?. (lte-packets key [message-num fragment-num]) - :: stop, nothing more to do - :: - [new-val=`val stop=%.y acc] - :: ack was on later packet; mark skipped, tell gauge, and continue - :: - =. skips.val +(skips.val) - =^ resend metrics.acc (on-skipped-packet:gauge -.val) - ?. resend - [new-val=`val stop=%.n acc] - :: - =. last-sent.val now.channel - =. retries.val +(retries.val) - =. resends.acc [(to-static-fragment key val) resends.acc] - [new-val=`val stop=%.n acc] - :: +on-done: apply ack to all packets from .message-num - :: - ++ on-done - |= =message-num - ^+ packet-pump - :: - =- =. metrics.state metrics.- - =. live.state live.- - :: - %- (trace snd.veb |.("done {}")) - (fast-resend-after-ack message-num `fragment-num`0) - :: - ^+ [metrics=metrics.state live=live.state] - :: - %^ (dip:packet-queue pump-metrics) live.state acc=metrics.state - |= $: metrics=pump-metrics - key=live-packet-key - val=live-packet-val - == - ^- [new-val=(unit live-packet-val) stop=? pump-metrics] - :: - =/ gauge (make-pump-gauge metrics live-packets [now her bug]:channel) - :: if we get an out-of-order ack for a message, skip until it - :: - ?: (lth message-num.key message-num) - [new-val=`val stop=%.n metrics] - :: if packet was from acked message, delete it and continue - :: - ?: =(message-num.key message-num) - [new-val=~ stop=%.n metrics=(on-ack:gauge -.val)] - :: we've gone past the acked message; we're done - :: - [new-val=`val stop=%.y metrics] - :: +set-wake: set, unset, or reset timer, emitting moves - :: - ++ set-wake - ^+ packet-pump - :: if nonempty .live, pry at head to get next wake time - :: - =/ new-wake=(unit @da) - ?~ head=(pry:packet-queue live.state) - ~ - `(next-expiry:gauge u.head) - :: no-op if no change - :: - ?: =(new-wake next-wake.state) packet-pump - :: unset old timer if non-null - :: - =? packet-pump !=(~ next-wake.state) - =/ old (need next-wake.state) - =. next-wake.state ~ - (give %rest old) - :: set new timer if non-null - :: - =? packet-pump ?=(^ new-wake) - =. next-wake.state new-wake - (give %wait u.new-wake) - :: - packet-pump - -- - :: +to-static-fragment: convenience function for |packet-pump - :: - ++ to-static-fragment - |= [live-packet-key live-packet-val] - ^- static-fragment - [message-num num-fragments fragment-num fragment] :: +make-pump-gauge: construct |pump-gauge congestion control core :: ++ make-pump-gauge @@ -2136,6 +1565,7 @@ %- %^ ev-trace odd.veb her |.("parsing old wire: {(spud wire)}") peer-core + =< abet ?~ error (send-ack bone) (send-nack bone u.error) @@ -2144,14 +1574,14 @@ :: ++ send-ack |= =bone - ^+ event-core + ^+ peer-core =/ cork=? (~(has in closing.peer-state) bone) - abet:(run-message-sink:peer-core bone %done ok=%.y cork) + (run-message-sink:peer-core bone %done ok=%.y cork) :: failed; send message nack packet :: ++ send-nack |= [=bone =^error] - ^+ event-core + ^+ peer-core =. event-core abet:(run-message-sink:peer-core bone %done ok=%.n cork=%.n) =/ =^peer-state (got-peer-state her) @@ -2166,7 +1596,8 @@ =. peer-core (pe peer-state channel) =/ nack-trace-bone=^bone (mix 0b10 bone) :: - abet:(run-message-pump:peer-core nack-trace-bone %memo message-blob) + =+ pump-core=(mu:peer-core nack-trace-bone *message-pump-state) + abet:(call:abed:pump-core %memo message-blob) -- :: +on-sift: handle request to filter debug output by ship :: @@ -2223,8 +1654,9 @@ =/ peer-core (pe u.par channel) =/ bones ~(tap in ~(key by snd.u.par)) |- ^+ event-core - ?~ bones abet:peer-core - =. peer-core (run-message-pump:peer-core i.bones %prod ~) + ?~ bones abet:peer-core + =/ pump-core (mu:peer-core i.bones *message-pump-state) + =. peer-core abet:(call:abed:pump-core %prod ~) $(bones t.bones) -- :: +on-cong: adjust congestion control parameters @@ -3196,6 +2628,7 @@ (~(put by peers.ames-state) her.channel %known peer-state) :: event-core + :: ++ pe-trace |= [verb=? print=(trap tape)] ^+ same @@ -3209,6 +2642,8 @@ ~| %dangling-bone^her.channel^bone (~(got by by-bone.ossuary.peer-state) bone) :: + ++ pump-core |=(=bone (mu bone *message-pump-state)) + :: +| %tasks :: ++ on-heed peer-core(heeds.peer-state (~(put in heeds.peer-state) duct)) @@ -3268,7 +2703,8 @@ ?. msg.veb ~ :- >[bone=bone message-num=message-num meat=meat]:shut-packet< tang.u.dud - (run-message-pump bone %hear [message-num +.meat]:shut-packet) + =< abet + (call:abed:(pump-core bone) %hear [message-num +.meat]:shut-packet) :: +on-memo: handle request to send message :: ++ on-memo @@ -3284,7 +2720,7 @@ peer-core :: =/ =message-blob (dedup-message (jim payload)) - =. peer-core (run-message-pump bone %memo message-blob) + =. peer-core abet:(call:abed:(pump-core bone) %memo message-blob) :: ?: ?& =(%boon valence) (gte now (add ~s30 last-contact.qos.peer-state)) @@ -3364,7 +2800,7 @@ peer-core :: maybe resend some timed out packets :: - (run-message-pump bone %wake ~) + abet:(call:abed:(pump-core bone) %wake ~) :: +| %implementation :: +dedup-message: replace with any existing copy of this message @@ -3501,38 +2937,245 @@ (on-memo i.boz plea %plea) :: +| %internals - :: +run-message-pump: process $message-pump-task and its effects + :: +mu: constructor for |message-pump :: - ++ run-message-pump - |= [=bone task=message-pump-task] - ^+ peer-core - :: pass .task to the |message-pump and apply state mutations + ++ mu + |= [=bone state=message-pump-state] + |% :: - =/ =message-pump-state - (~(gut by snd.peer-state) bone *message-pump-state) + +| %helpers :: - =/ close=? (~(has in closing.peer-state) bone) - =+ message-pump=(make-message-pump message-pump-state channel close bone) - =^ pump-gifts message-pump-state (work:message-pump task) - =. snd.peer-state (~(put by snd.peer-state) bone message-pump-state) - :: process effects from |message-pump + ++ pump . + ++ abed + pump(state (~(gut by snd.peer-state) bone *message-pump-state)) :: - |^ ^+ peer-core - ?~ pump-gifts peer-core - =* gift i.pump-gifts - =. peer-core - ?- -.gift - %done (on-pump-done [message-num error]:gift) - %cork (on-pump-cork current.message-pump-state) - %kroc (on-pump-kroc bone:gift) - %send (on-pump-send static-fragment.gift) - %wait (on-pump-wait date.gift) - %rest (on-pump-rest date.gift) + ++ abet + :: if the bone was corked, it's been removed from the state, + :: so we avoid adding it again. + :: + =? snd.peer-state !corked (~(put by snd.peer-state) bone state) + peer-core + :: + ++ packet-pump (pu packet-pump-state.state) + ++ mu-trace + |= [verb=? print=(trap tape)] + ^+ same + (trace verb her.channel ships.bug.channel print) + :: + ++ closing (~(has in closing.peer-state) bone) + ++ corked (~(has in corked.peer-state) bone) + :: +is-message-num-in-range: %.y unless duplicate or future ack + :: + ++ is-message-num-in-range + |= =message-num + ^- ? + :: + ?: (gte message-num next.state) + %.n + ?: (lth message-num current.state) + %.n + !(~(has by queued-message-acks.state) message-num) + :: + +| %entry-points + :: +call: handle a $message-pump-task + :: + ++ call + |= task=message-pump-task + ^+ pump + :: + =. pump =~((dispatch-task task) feed-packets) + =+ top=top-live:packet-pump + :: sanity check to isolate error cases + :: + ?. |(?=(~ top) (lte current.state message-num.key.u.top)) + ~| [%strange-current current=current.state key.u.top] + !! + :: maybe trigger a timer based on congestion control calculations + :: + abet:(call:packet-pump %halt ~) + :: + +| %tasks + :: +dispatch-task: perform task-specific processing + :: + ++ dispatch-task + |= task=message-pump-task + ^+ pump + :: + ?- -.task + %memo (on-memo message-blob.task) + %prod abet:(call:packet-pump %prod ~) + %wake abet:(call:packet-pump %wake current.state) + %near %- on-done + [[message-num %naxplanation error]:naxplanation.task %&] + %hear + ?- -.ack-meat.task + %& + (on-hear [message-num fragment-num=p.ack-meat]:task) + :: + %| + =/ cork=? + =+ top=top-live:packet-pump + :: If we send a %cork and get an ack, we can know by + :: sequence number that the ack is for the %cork message + :: + ?& closing + ?=(^ top) + =(0 ~(wyt in unsent-messages.state)) + =(0 (lent unsent-fragments.state)) + =(1 ~(wyt by live.packet-pump-state.state)) + =(message-num:task message-num.key.u.top) + == + =* ack p.ack-meat.task + =. pump + %- on-done + [[message-num:task ?:(ok.ack [%ok ~] [%nack ~])] cork] + ?. &(!ok.ack cork) pump + %. pump + %+ mu-trace odd.veb |. + "got nack for %cork {}" + == == + :: +on-memo: handle request to send a message + :: + ++ on-memo + |= blob=message-blob + pump(unsent-messages.state (~(put to unsent-messages.state) blob)) + :: +on-hear: handle packet acknowledgment + :: + ++ on-hear + |= [=message-num =fragment-num] + ^+ pump + :: pass to |packet-pump unless duplicate or future ack + :: + ?. (is-message-num-in-range message-num) + %. pump + (mu-trace snd.veb |.("hear pump out of range")) + abet:(call:packet-pump %hear message-num fragment-num) + :: +on-done: handle message acknowledgment + :: + :: A nack-trace message counts as a valid message nack on the + :: original failed message. + :: + :: This prevents us from having to wait for a message nack packet, + :: which would mean we couldn't immediately ack the nack-trace + :: message, which would in turn violate the semantics of backward + :: flows. + :: + ++ on-done + |= [[=message-num =ack] cork=?] + ^+ pump + :: unsent messages from the future should never get acked + :: + ~| :* bone=bone + mnum=message-num + next=next.state + unsent-messages=~(wyt in unsent-messages.state) + unsent-fragments=(lent unsent-fragments.state) + any-live=!=(~ live.packet-pump-state.state) == - $(pump-gifts t.pump-gifts) - :: +on-pump-done: handle |message-pump's report of message (n)ack + ?> (lth message-num next.state) + :: ignore duplicate message acks + :: + ?: (lth message-num current.state) + %- %+ mu-trace snd.veb + |.("duplicate done {}") + pump + :: ignore duplicate and future acks + :: + ?. (is-message-num-in-range message-num) + pump + :: clear and print .unsent-fragments if nonempty + :: + =? unsent-fragments.state + &(=(current next) ?=(^ unsent-fragments)):state + :: + ~> %slog.0^leaf/"ames: early message ack {}" + ~ + :: clear all packets from this message from the packet pump + :: + =. pump abet:(call:packet-pump %done message-num lag=*@dr) + :: enqueue this ack to be sent back to local client vane + :: + :: Don't clobber a naxplanation with just a nack packet. + :: + =? queued-message-acks.state + =/ old (~(get by queued-message-acks.state) message-num) + !?=([~ %naxplanation *] old) + (~(put by queued-message-acks.state) message-num ack) + :: emit local acks from .queued-message-acks until incomplete + :: + |- ^+ pump + :: if .current hasn't been fully acked, we're done + :: + ?~ cur=(~(get by queued-message-acks.state) current.state) + pump + :: .current is complete; pop, emit local ack, and try next message + :: + =. queued-message-acks.state + (~(del by queued-message-acks.state) current.state) + :: clear all packets from this message from the packet pump + :: + :: Note we did this when the original packet came in, a few lines + :: above. It's not clear why, but it doesn't always clear the + :: packets when it's not the current message. As a workaround, + :: we clear the packets again when we catch up to this packet. + :: + :: This is slightly inefficient because we run this twice for + :: each packet and it may emit a few unnecessary packets, but + :: but it's not incorrect. pump-metrics are updated only once, + :: at the time when we actually delete the packet. + :: + =. pump abet:(call:packet-pump %done current.state lag=*@dr) + :: give %done to vane if we're ready + :: + ?- -.u.cur + %ok + =. peer-core + :: don't give %done for corks + :: + ?: cork (pump-cork current.state) + (pump-done current.state ~) + $(current.state +(current.state)) + :: + %nack + pump + :: + %naxplanation + =. peer-core (pump-done current.state `error.u.cur) + $(current.state +(current.state)) + == :: - ++ on-pump-done + +| %implementation + :: +feed-packets: give packets to |packet-pump until full + :: + ++ feed-packets + :: if nothing to send, no-op + :: + ?: &(=(~ unsent-messages) =(~ unsent-fragments)):state + pump + :: we have unsent fragments of the current message; feed them + :: + ?. =(~ unsent-fragments.state) + + :: we have unsent fragments of the current message; feed them + :: + =^ unsent pump abut:(feed:packet-pump unsent-fragments.state) + =. unsent-fragments.state unsent + :: if it sent all of them, feed it more; otherwise, we're done + :: + ?~(unsent feed-packets pump) + :: .unsent-messages is nonempty; pop a message off and feed it + :: + =^ =message-blob unsent-messages.state ~(get to unsent-messages.state) + :: break .message into .chunks and set as .unsent-fragments + :: + =. unsent-fragments.state (split-message next.state message-blob) + :: try to feed packets from the next message + :: + =. next.state +(next.state) + feed-packets + :: +pump-done: handle |message-pump's report of message (n)ack + :: + ++ pump-done |= [=message-num error=(unit error)] ^+ peer-core ?: ?& =(1 (end 0 bone)) @@ -3542,22 +3185,24 @@ %- %+ pe-trace msg.veb =/ dat [her.channel bone=bone message-num=message-num -.task] |.("remove naxplanation flow {}") - =. snd.peer-state - (~(del by snd.peer-state) bone) + :: XX FIXME this happens before abet, so we'd put the bone in again + :: we check for this case in abet:mu; test that it works + :: + =. snd.peer-state (~(del by snd.peer-state) bone) peer-core :: if odd bone, ack is on "subscription update" message; no-op :: - ?: =(1 (end 0 bone)) - peer-core + ?: =(1 (end 0 bone)) peer-core :: even bone; is this bone a nack-trace bone? :: ?: =(1 (end 0 (rsh 0 bone))) :: nack-trace bone; assume .ok, clear nack from |message-sink :: =/ target-bone=^bone (mix 0b10 bone) - :: + :: XX refactor + ::abet:(call:abed:(mi target-bone *message-sink-state) %drop message-num) (run-message-sink target-bone %drop message-num) - ?: &(close ?=(%near -.task)) + ?: &(closing ?=(%near -.task)) :: if the bone belongs to a closing flow and we got a naxplanation, :: don't relay the ack to the client vane, and wait for the next try :: @@ -3565,14 +3210,15 @@ :: not a nack-trace bone; relay ack to client vane :: (pe-emit (got-duct bone) %give %done error) - :: +on-pump-cork: kill flow on cork sender side + :: XX impure +abet pattern + :: +pump-cork: kill flow on cork sender side :: - ++ on-pump-cork + ++ pump-cork |= =message-num ^+ peer-core :: clear all packets from this message from the packet pump :: - =. message-pump (run-packet-pump:message-pump %done message-num *@dr) + =. pump abet:(call:packet-pump %done message-num lag=*@dr) =/ =wire (make-pump-timer-wire her.channel bone) =/ nack-bone=^bone (mix 0b10 bone) =? rcv.peer-state (~(has by rcv.peer-state) nack-bone) @@ -3593,36 +3239,345 @@ :: since we got one cork ack, try the next one :: recork-one - :: +on-pump-kroc: if we get a nack for a cork, add it to the recork set + :: XX refactor wait/rest + :: +pump-wait: relay |message-pump's set-timer request :: - ++ on-pump-kroc - |= =^bone - ^+ peer-core - =. krocs.peer-state (~(put in krocs.peer-state) bone) - peer-core - :: +on-pump-send: emit message fragment requested by |message-pump - :: - ++ on-pump-send - |= f=static-fragment - (send-shut-packet bone [message-num %& +]:f) - :: +on-pump-wait: relay |message-pump's set-timer request - :: - ++ on-pump-wait + ++ pump-wait |= date=@da ^+ peer-core - :: - =/ =wire (make-pump-timer-wire her.channel bone) - =/ duct ~[/ames] - (pe-emit duct %pass wire %b %wait date) - :: +on-pump-rest: relay |message-pump's unset-timer request + %+ pe-emit ~[/ames] + [%pass (make-pump-timer-wire her.channel bone) %b %wait date] + :: +pump-rest: relay |message-pump's unset-timer request :: - ++ on-pump-rest + ++ pump-rest |= date=@da ^+ peer-core + %+ pe-emit ~[/ames] + [%pass (make-pump-timer-wire her.channel bone) %b %rest date] + :: +pu: construct |packet-pump core + :: + ++ pu + |= state=packet-pump-state :: - =/ =wire (make-pump-timer-wire her.channel bone) - =/ duct ~[/ames] - (pe-emit duct %pass wire %b %rest date) + =| unsent=(list static-fragment) + |% + +| %helpers + ++ pack . + :: XX +abut: abet with gifts + :: + ++ abut [unsent abet] + ++ abet pump(packet-pump-state.state state) + ++ pu-trace + |= [verb=? print=(trap tape)] + ^+ same + (trace verb her.channel ships.bug.channel print) + :: + ++ pu-emit |=(=note (pe-emit pump-duct %pass pump-wire note)) + :: +packet-queue: type for all sent fragments, ordered by sequence number + :: + ++ packet-queue + %- (ordered-map live-packet-key live-packet-val) + lte-packets + :: +live-packets: number of sent packets awaiting ack + :: + ++ live-packets + ^- @ud + ~(wyt by live.state) + :: +gauge: inflate a |pump-gauge to track congestion control + :: + ++ gauge (make-pump-gauge metrics.state live-packets [now her bug]:channel) + :: +to-static-fragment: convenience function for |packet-pump + :: + ++ to-static-fragment + |= [live-packet-key live-packet-val] + ^- static-fragment + [message-num num-fragments fragment-num fragment] + :: + ++ pump-wire (make-pump-timer-wire her.channel bone) + ++ pump-duct ~[/ames] + ++ top-live (pry:packet-queue live.state) + :: + +| %entry-points + :: + ++ call + |= task=packet-pump-task + ^+ pack + :: + ?- -.task + %hear (on-hear [message-num fragment-num]:task) + %done (on-done message-num.task) + %wake (on-wake current.task) + %prod on-prod + %halt set-wake + == + :: +feed: try to send a list of packets, returning unsent ones + :: + ++ feed + |= fragments=(list static-fragment) + ^+ pack + :: bite off as many fragments as we can send + :: + =/ num-slots num-slots:gauge + =/ sent (scag num-slots fragments) + =. unsent (slag num-slots fragments) + :: if nothing to send, we're done + :: + ?~ sent pack + :: convert $static-fragment's into +ordered-set [key val] pairs + :: + =/ send-list + %+ turn sent + |= static-fragment + ^- [key=live-packet-key val=live-packet-val] + :: + :- [message-num fragment-num] + :- [sent-date=now.channel retries=0 skips=0] + [num-fragments fragment] + :: update .live and .metrics + :: + =. live.state (gas:packet-queue live.state send-list) + :: TMI + :: + => .(sent `(list static-fragment)`sent) + :: emit a $shut-packet for each packet to send + :: + =. peer-core + %+ roll sent + |= [packet=static-fragment core=_peer-core] + (send-shut-packet bone [message-num %& +]:packet) + pack + :: + +| %tasks + :: +on-prod: reset congestion control, re-send packets + :: + ++ on-prod + ^+ pack + ?: =(~ next-wake.state) + pack + :: + =. metrics.state %*(. *pump-metrics counter counter.metrics.state) + =. live.state + %+ run:packet-queue live.state + |=(p=live-packet-val p(- *packet-state)) + :: + =/ sot (max 1 num-slots:gauge) + =/ liv live.state + |- ^+ pack + ?: =(0 sot) pack + ?: =(~ liv) pack + =^ hed liv (pop:packet-queue liv) + =. peer-core + (send-shut-packet bone [message-num %& +]:(to-static-fragment hed)) + $(sot (dec sot)) + :: +on-wake: handle packet timeout + :: + ++ on-wake + |= current=message-num + ^+ pack + :: assert temporal coherence + :: + ?< =(~ next-wake.state) + =. next-wake.state ~ + :: tell congestion control a packet timed out + :: + =. metrics.state on-timeout:gauge + :: re-send first packet and update its state in-place + :: + =- =* res - + =. live.state live.res + =? peer-core ?=(^ static-fragment) + %- %+ pu-trace snd.veb + =/ nums [message-num fragment-num]:u.static-fragment.res + |.("dead {}") + (send-shut-packet bone [message-num %& +]:u.static-fragment) + pack + :: + =| acc=(unit static-fragment) + ^+ [static-fragment=acc live=live.state] + :: + %^ (dip:packet-queue _acc) live.state acc + |= $: acc=_acc + key=live-packet-key + val=live-packet-val + == + ^- [new-val=(unit live-packet-val) stop=? _acc] + :: if already acked later message, don't resend + :: + ?: (lth message-num.key current) + %- %- slog :_ ~ + leaf+"ames: strange wake queue, expected {}, got {}" + [~ stop=%.n ~] + :: packet has expired; update it in-place, stop, and produce it + :: + =. last-sent.val now.channel + =. retries.val +(retries.val) + :: + [`val stop=%.y `(to-static-fragment key val)] + :: +fast-resend-after-ack: resend timed out packets + :: + :: After we finally receive an ack, we want to resend all the live + :: packets that have been building up. + :: + ++ fast-resend-after-ack + |= [=message-num =fragment-num] + ^+ pack + =; res=[resends=(list static-fragment) live=_live.state] + =. live.state live.res + =. peer-core + %+ reel resends.res + |= [packet=static-fragment core=_peer-core] + (send-shut-packet bone [message-num %& +]:packet) + pack + :: + =/ acc + resends=*(list static-fragment) + :: + %^ (dip:packet-queue _acc) live.state acc + |= $: acc=_acc + key=live-packet-key + val=live-packet-val + == + ^- [new-val=(unit live-packet-val) stop=? _acc] + ?: (lte-packets key [message-num fragment-num]) + [new-val=`val stop=%.n acc] + :: + ?: (gth (next-expiry:gauge key val) now.channel) + [new-val=`val stop=%.y acc] + :: + =. last-sent.val now.channel + =. resends.acc [(to-static-fragment key val) resends.acc] + [new-val=`val stop=%.n acc] + :: +on-hear: handle ack on a live packet + :: + :: If the packet was in our queue, delete it and update our + :: metrics, possibly re-sending skipped packets. Otherwise, no-op. + :: + ++ on-hear + |= [=message-num =fragment-num] + ^+ pack + :: + =- :: if no sent packet matches the ack, don't apply mutations or effects + :: + ?. found.- + %- (pu-trace snd.veb |.("miss {}")) + pack + :: + =. metrics.state metrics.- + =. live.state live.- + %- ?. ?| =(0 fragment-num) + =(0 (mod counter.metrics.state 20)) + == + same + (pu-trace snd.veb |.("send: {}")) + :: .resends is backward, so fold backward and emit + :: + =. peer-core + %+ reel resends.- + |= [packet=static-fragment core=_peer-core] + (send-shut-packet bone [message-num %& +]:packet) + (fast-resend-after-ack message-num fragment-num) + :: + =/ acc + :* found=`?`%.n + resends=*(list static-fragment) + metrics=metrics.state + == + :: + ^+ [acc live=live.state] + :: + %^ (dip:packet-queue _acc) live.state acc + |= $: acc=_acc + key=live-packet-key + val=live-packet-val + == + ^- [new-val=(unit live-packet-val) stop=? _acc] + :: + =/ gauge (make-pump-gauge metrics.acc live-packets [now her bug]:channel) + :: is this the acked packet? + :: + ?: =(key [message-num fragment-num]) + :: delete acked packet, update metrics, and stop traversal + :: + =. found.acc %.y + =. metrics.acc (on-ack:gauge -.val) + [new-val=~ stop=%.y acc] + :: is this a duplicate ack? + :: + ?. (lte-packets key [message-num fragment-num]) + :: stop, nothing more to do + :: + [new-val=`val stop=%.y acc] + :: ack was on later packet; mark skipped, tell gauge, and continue + :: + =. skips.val +(skips.val) + =^ resend metrics.acc (on-skipped-packet:gauge -.val) + ?. resend + [new-val=`val stop=%.n acc] + :: + =. last-sent.val now.channel + =. retries.val +(retries.val) + =. resends.acc [(to-static-fragment key val) resends.acc] + [new-val=`val stop=%.n acc] + :: +on-done: apply ack to all packets from .message-num + :: + ++ on-done + |= =message-num + ^+ pack + :: + =- =. metrics.state metrics.- + =. live.state live.- + :: + %- (pu-trace snd.veb |.("done {}")) + (fast-resend-after-ack message-num `fragment-num`0) + :: + ^+ [metrics=metrics.state live=live.state] + :: + %^ (dip:packet-queue pump-metrics) live.state acc=metrics.state + |= $: metrics=pump-metrics + key=live-packet-key + val=live-packet-val + == + ^- [new-val=(unit live-packet-val) stop=? pump-metrics] + :: + =/ gauge (make-pump-gauge metrics live-packets [now her bug]:channel) + :: if we get an out-of-order ack for a message, skip until it + :: + ?: (lth message-num.key message-num) + [new-val=`val stop=%.n metrics] + :: if packet was from acked message, delete it and continue + :: + ?: =(message-num.key message-num) + [new-val=~ stop=%.n metrics=(on-ack:gauge -.val)] + :: we've gone past the acked message; we're done + :: + [new-val=`val stop=%.y metrics] + :: +set-wake: set, unset, or reset timer, emitting moves + :: + ++ set-wake + ^+ pack + :: if nonempty .live, pry at head to get next wake time + :: + =/ new-wake=(unit @da) + ?~ head=(pry:packet-queue live.state) + ~ + `(next-expiry:gauge u.head) + :: no-op if no change + :: + ?: =(new-wake next-wake.state) pack + :: unset old timer if non-null + :: + =? peer-core !=(~ next-wake.state) + (pu-emit %b %rest (need next-wake.state)) + :: set new timer if non-null + :: + =? peer-core ?=(^ new-wake) + (pu-emit %b %wait u.new-wake) + :: + =? next-wake.state !=(~ next-wake.state) ~ :: unset + =? next-wake.state ?=(^ new-wake) new-wake :: reset + :: + pack + -- -- :: +run-message-sink: process $message-sink-task and its effects :: @@ -3762,18 +3717,19 @@ =. peer-core (run-message-sink bone %done ok=%.y cork=%.n) :: flip .bone's second bit to find referenced flow :: - =/ target-bone=^bone (mix 0b10 bone) + =/ target=^bone (mix 0b10 bone) :: notify |message-pump that this message got naxplained :: - =. peer-core (run-message-pump target-bone %near naxplanation) + =. peer-core + abet:(call:abed:(pump-core target) %near naxplanation) :: - ?. (~(has in krocs.peer-state) target-bone) + ?. (~(has in krocs.peer-state) target) peer-core :: if we get a naxplanation for a %cork, the publisher is behind :: receiving the OTA. The /recork timer will retry eventually. :: %- %+ pe-trace msg.veb - |.("old publisher, %cork nacked on bone={}") + |.("old publisher, %cork nacked on bone={}") peer-core :: +on-sink-plea: handle request message received by |message-sink :: @@ -3821,7 +3777,7 @@ =/ =naxplanation [message-num *error] =/ =message-blob (jam naxplanation) :: - (run-message-pump nack-trace-bone %memo message-blob) + abet:(call:abed:(pump-core nack-trace-bone) %memo message-blob) -- -- --