From 8d9e9c20f57bb76b88bc0d16df980aa84df7ac9b Mon Sep 17 00:00:00 2001 From: yosoyubik Date: Wed, 22 Feb 2023 10:50:20 +0100 Subject: [PATCH] ames: refactor message-sink --- pkg/arvo/sys/vane/ames.hoon | 983 +++++++++++++++++------------------- 1 file changed, 459 insertions(+), 524 deletions(-) diff --git a/pkg/arvo/sys/vane/ames.hoon b/pkg/arvo/sys/vane/ames.hoon index b4073d77bc..6d15875f94 100644 --- a/pkg/arvo/sys/vane/ames.hoon +++ b/pkg/arvo/sys/vane/ames.hoon @@ -864,7 +864,7 @@ :: .ok: %.y unless previous failed attempt :: +$ message-sink-task - $% [%done ok=? cork=?] + $% [%done ok=?] [%drop =message-num] [%hear =lane =shut-packet ok=?] == @@ -1179,211 +1179,6 @@ [moz larval-core(cached-state ~)] -- :: -=> |% - :: XX out of here - :: - :: +make-message-sink: construct |message-sink message receiver core - :: - ++ make-message-sink - |= [state=message-sink-state =channel] - =* veb veb.bug.channel - =| gifts=(list message-sink-gift) - |% - ++ message-sink . - ++ give |=(message-sink-gift message-sink(gifts [+< gifts])) - ++ trace - |= [verb=? print=(trap tape)] - ^+ same - (^trace verb her.channel ships.bug.channel print) - :: +work: handle a $message-sink-task - :: - ++ work - |= [closing=? task=message-sink-task] - ^+ [gifts state] - :: - =- [(flop gifts) state] - :: - ?- -.task - %done (on-done ok.task cork.task) - %drop (on-drop message-num.task) - %hear (on-hear closing [lane shut-packet ok]:task) - == - :: +on-hear: receive message fragment, possibly completing message - :: - ++ on-hear - |= [closing=? =lane =shut-packet ok=?] - ^+ message-sink - :: we know this is a fragment, not an ack; expose into namespace - :: - ?> ?=(%& -.meat.shut-packet) - =+ [num-fragments fragment-num fragment]=+.meat.shut-packet - :: seq: message sequence number, for convenience - :: - =/ seq message-num.shut-packet - :: ignore messages from far future; limit to 10 in progress - :: - ?: (gte seq (add 10 last-acked.state)) - %- %+ trace odd.veb - |.("future %hear {}") - message-sink - :: - =/ is-last-fragment=? =(+(fragment-num) num-fragments) - :: always ack a dupe! - :: - ?: (lte seq last-acked.state) - ?. is-last-fragment - :: single packet ack - :: - %- %+ trace rcv.veb - |.("send dupe ack {}") - (give %send seq %& fragment-num) - :: whole message (n)ack - :: - =/ ok=? !(~(has in nax.state) seq) - %- (trace rcv.veb |.("send dupe message ack {} ok={}")) - (give %send seq %| ok lag=`@dr`0) - :: last-acked}" - message-sink - :: ack all other packets - :: - %- %+ trace rcv.veb |. - =/ data - :* seq=seq fragment-num - num-fragments closing=closing - == - "send ack-1 {}" - (give %send seq %& fragment-num) - :: last-heard (gth num-fragments.u.existing fragment-num) - ?> =(num-fragments.u.existing num-fragments) - :: - u.existing - :: - =/ already-heard-fragment=? - (~(has by fragments.partial-rcv-message) fragment-num) - :: ack dupes except for the last fragment, in which case drop - :: - ?: already-heard-fragment - ?: is-last-fragment - %- %+ trace rcv.veb |. - =/ data - [her.channel seq=seq lh=last-heard.state la=last-acked.state] - "hear last dupe {}" - message-sink - %- %+ trace rcv.veb - |.("send dupe ack {}") - (give %send seq %& fragment-num) - :: new fragment; store in state and check if message is done - :: - =. num-received.partial-rcv-message - +(num-received.partial-rcv-message) - :: - =. fragments.partial-rcv-message - (~(put by fragments.partial-rcv-message) fragment-num fragment) - :: - =. live-messages.state - (~(put by live-messages.state) seq partial-rcv-message) - :: ack any packet other than the last one, and continue either way - :: - =? message-sink !is-last-fragment - %- %+ trace rcv.veb |. - =/ data - [seq=seq fragment-num num-fragments] - "send ack-2 {}" - (give %send seq %& fragment-num) - :: enqueue all completed messages starting at +(last-heard.state) - :: - |- ^+ message-sink - :: if this is not the next message to ack, we're done - :: - ?. =(seq +(last-heard.state)) - message-sink - :: if we haven't heard anything from this message, we're done - :: - ?~ live=(~(get by live-messages.state) seq) - message-sink - :: if the message isn't done yet, we're done - :: - ?. =(num-received num-fragments):u.live - message-sink - :: we have whole message; update state, assemble, and send to vane - :: - =. last-heard.state +(last-heard.state) - =. live-messages.state (~(del by live-messages.state) seq) - :: - %- %+ trace msg.veb - |.("hear {} {} {}kb") - =/ message=* (assemble-fragments [num-fragments fragments]:u.live) - =. message-sink (enqueue-to-vane seq message) - :: - $(seq +(seq)) - :: +enqueue-to-vane: enqueue message to be sent to local vane - :: - ++ enqueue-to-vane - |= [seq=message-num message=*] - ^+ message-sink - :: - =/ empty=? =(~ pending-vane-ack.state) - =. pending-vane-ack.state (~(put to pending-vane-ack.state) seq message) - ?. empty - message-sink - (give %memo seq message) - :: +on-done: handle confirmation of message processing from vane - :: - ++ on-done - |= [ok=? cork=?] - ^+ message-sink - :: - =^ pending pending-vane-ack.state ~(get to pending-vane-ack.state) - =/ =message-num message-num.p.pending - :: - =. last-acked.state +(last-acked.state) - =? nax.state !ok (~(put in nax.state) message-num) - :: - =. message-sink (give %send message-num %| ok lag=`@dr`0) - =? message-sink cork (give %cork ~) - =/ next ~(top to pending-vane-ack.state) - ?~ next - message-sink - (give %memo u.next) - :: +on-drop: drop .message-num from our .nax state - :: - ++ on-drop - |= =message-num - ^+ message-sink - :: - =. nax.state (~(del in nax.state) message-num) - :: - message-sink - -- - - -- -:: => :: |ev: inner event-handling core :: ~% %per-event ..decode-packet ~ @@ -1467,24 +1262,25 @@ |.("parsing old wire: {(spud wire)}") peer-core =< abet - ?~ error - (send-ack bone) - (send-nack bone u.error) + ?~(error (send-ack bone) (send-nack bone u.error)) :: :: if processing succeded, send positive ack packet and exit :: ++ send-ack |= =bone ^+ peer-core - =/ cork=? (~(has in closing.peer-state) bone) - (run-message-sink:peer-core bone %done ok=%.y cork) + =+ sink-core=(mi:peer-core bone *message-sink-state) + :: handle cork only deals with bones that are in closing + :: + (handle-cork:abet:(call:abed:sink-core %done ok=%.y) bone) :: failed; send message nack packet :: ++ send-nack |= [=bone =^error] ^+ peer-core - =. event-core - abet:(run-message-sink:peer-core bone %done ok=%.n cork=%.n) + =/ sink-core (mi:peer-core bone *message-sink-state) + =. peer-core abet:(call:abed:sink-core %done ok=%.n) + =. event-core abet:peer-core =/ =^peer-state (got-peer-state her) =/ =^channel [[our her] now channel-state -.peer-state] :: construct nack-trace message, referencing .failed $message-num @@ -1686,7 +1482,7 @@ ^+ event-core %- %^ ev-trace for.veb sndr.packet |.("forward: {} -> {}") - :: set .origin.packet if it doesn't already have one, re-encode, and send + :: set .origin.packet if it doesn't have one, re-encode, and send :: =? origin.packet &(?=(~ origin.packet) !=(%czar (clan:title sndr.packet))) @@ -2166,16 +1962,16 @@ :: (on-publ-full (my [ship point]~)) :: - =/ =peer-state +.u.ship-state - :: - =/ =private-key sec:ex:crypto-core.ames-state + =/ =peer-state +.u.ship-state + =/ =private-key sec:ex:crypto-core.ames-state =. symmetric-key.peer-state (derive-symmetric-key public-key private-key) :: - =. life.peer-state life - =. public-key.peer-state public-key + =. life.peer-state life + =. public-key.peer-state public-key :: - =. peers.ames-state (~(put by peers.ames-state) ship %known peer-state) + =. peers.ames-state + (~(put by peers.ames-state) ship %known peer-state) event-core :: +on-publ-sponsor: handle new or lost sponsor for peer :: @@ -2267,9 +2063,10 @@ :: ignore aliens :: event-core - =/ =peer-state +.u.ship-state - =. rift.peer-state rift - =. peers.ames-state (~(put by peers.ames-state) ship %known peer-state) + =/ =peer-state +.u.ship-state + =. rift.peer-state rift + =. peers.ames-state + (~(put by peers.ames-state) ship %known peer-state) event-core :: ++ insert-peer-state @@ -2490,6 +2287,7 @@ ++ pe |= [=peer-state =channel] =* veb veb.bug.channel + =* her her.channel |% :: +| %helpers @@ -2498,31 +2296,33 @@ ++ pe-emit |=(move peer-core(event-core (emit +<))) ++ abet ^+ event-core - :: =. peers.ames-state - (~(put by peers.ames-state) her.channel %known peer-state) - :: + (~(put by peers.ames-state) her %known peer-state) event-core :: ++ pe-trace |= [verb=? print=(trap tape)] ^+ same - (ev-trace verb her.channel print) + (ev-trace verb her print) :: :: +got-duct: look up $duct by .bone, asserting already bound :: ++ got-duct |= =bone ^- ^duct - ~| %dangling-bone^her.channel^bone + ~| %dangling-bone^her^bone (~(got by by-bone.ossuary.peer-state) bone) :: ++ pump-core |=(=bone (mu bone *message-pump-state)) + ++ sink-core |=(=bone (mi bone *message-sink-state)) :: +| %tasks :: - ++ on-heed peer-core(heeds.peer-state (~(put in heeds.peer-state) duct)) - ++ on-jilt peer-core(heeds.peer-state (~(del in heeds.peer-state) duct)) + ++ on-heed + peer-core(heeds.peer-state (~(put in heeds.peer-state) duct)) + :: + ++ on-jilt + peer-core(heeds.peer-state (~(del in heeds.peer-state) duct)) :: +update-qos: update and maybe print connection status :: ++ update-qos @@ -2533,8 +2333,7 @@ :: if no update worth reporting, we're done :: =/ text - %^ qos-update-text her.channel old-qos - [new-qos kay.veb ships.bug.ames-state] + (qos-update-text her old-qos [new-qos kay.veb ships.bug.ames-state]) ?~ text peer-core :: print message @@ -2561,8 +2360,8 @@ %. ~ %- slog :_ tang.u.dud - leaf+"ames: {} fragment crashed {}" - (run-message-sink bone %hear lane shut-packet ?=(~ dud)) + leaf+"ames: {} fragment crashed {}" + abet:(call:abed:(sink-core bone) %hear lane shut-packet ?=(~ dud)) :: benign ack on corked bone :: ?: (~(has in corked.peer-state) bone) @@ -2574,7 +2373,7 @@ :: =+ ?~ dud ~ %. ~ - %+ slog leaf+"ames: {} ack crashed {}" + %+ slog leaf+"ames: {} ack crashed {}" ?. msg.veb ~ :- >[bone=bone message-num=message-num meat=meat]:shut-packet< tang.u.dud @@ -2618,15 +2417,15 @@ :: ?~ message-pump-state=(~(get by snd.peer-state) bone) peer-core - ?~ next-wake.packet-pump-state.u.message-pump-state - peer-core + =* packet-state packet-pump-state.u.message-pump-state + ?~ next-wake.packet-state peer-core :: If we crashed because we woke up too early, assume another :: timer is already set. :: - ?: (lth now.channel u.next-wake.packet-pump-state.u.message-pump-state) + ?: (lth now.channel u.next-wake.packet-state) peer-core :: - =/ =wire (make-pump-timer-wire her.channel bone) + =/ =wire (make-pump-timer-wire her bone) (pe-emit duct %pass wire %b %wait (add now.channel ~s30)) :: update and print connection state :: @@ -2650,7 +2449,7 @@ ?& ?=(%dead -.qos.peer-state) ?=(^ route.peer-state) direct.u.route.peer-state - !=(%czar (clan:title her.channel)) + !=(%czar (clan:title her)) == route.peer-state(direct.u %.n) :: resend comet attestation packet if first message times out @@ -2666,7 +2465,7 @@ ?& ?=(%pawn (clan:title our)) =(1 current:(~(got by snd.peer-state) bone)) == - (send-blob | her.channel (attestation-packet [her her-life]:channel)) + (send-blob | her (attestation-packet [her her-life]:channel)) ?: (~(has in corked.peer-state) bone) :: if the bone was corked the flow doesn't exist anymore :: TODO: clean up corked bones in the peer state when it's _safe_? @@ -2696,8 +2495,8 @@ ~(tap by snd.ship-state.i.peers-l) |- ^+ message-blob =* bone-loop $ - ?~ snd-l - peer-loop(peers-l t.peers-l) + ?~ snd-l peer-loop(peers-l t.peers-l) + =* unsent-fragments unsent-fragments.message-pump-state.i.snd-l =/ blob-l=(list ^message-blob) ~(tap to unsent-messages.message-pump-state.i.snd-l) |- ^+ message-blob @@ -2706,12 +2505,10 @@ ?: =(i.blob-l message-blob) i.blob-l blob-loop(blob-l t.blob-l) - ?~ unsent-fragments.message-pump-state.i.snd-l - bone-loop(snd-l t.snd-l) - ?: =(message-blob fragment.i.unsent-fragments.message-pump-state.i.snd-l) - `@`fragment.i.unsent-fragments.message-pump-state.i.snd-l + ?~ unsent-fragments bone-loop(snd-l t.snd-l) + ?: =(message-blob fragment.i.unsent-fragments) + `@`fragment.i.unsent-fragments bone-loop(snd-l t.snd-l) - :: +check-clog: notify clients if peer has stopped responding :: ++ check-clog @@ -2726,45 +2523,44 @@ ?: =(0 (end 0 bone)) ~ `u=message-pump-state - :: - =/ clogged=? - |^ &(nuf-messages nuf-memory) - :: +nuf-messages: are there enough messages to mark as clogged? - :: - ++ nuf-messages - =| num=@ud - |- ^- ? - ?~ pumps | - =. num - ;: add num - (sub [next current]:i.pumps) - ~(wyt in unsent-messages.i.pumps) - == - ?: (gte num msg.cong.ames-state) - & - $(pumps t.pumps) - :: +nuf-memory: is enough memory used to mark as clogged? - :: - ++ nuf-memory - =| mem=@ud - |- ^- ? - ?~ pumps | - =. mem - %+ add - %- ~(rep in unsent-messages.i.pumps) - |=([a=@ b=_mem] (add b (met 3 a))) - ?~ unsent-fragments.i.pumps 0 - (met 3 fragment.i.unsent-fragments.i.pumps) - ?: (gte mem mem.cong.ames-state) - & - $(pumps t.pumps) - -- :: if clogged, notify client vane :: - ?. clogged + =; clogged=? + =? peer-core clogged + %+ roll ~(tap in heeds.peer-state) + |=([d=^duct core=_peer-core] (pe-emit:core d %give %clog her)) peer-core - %+ roll ~(tap in heeds.peer-state) - |=([d=^duct core=_peer-core] (pe-emit:core d %give %clog her.channel)) + |^ &(nuf-messages nuf-memory) + :: +nuf-messages: are there enough messages to mark as clogged? + :: + ++ nuf-messages + =| num=@ud + |- ^- ? + ?~ pumps | + =. num + ;: add num + (sub [next current]:i.pumps) + ~(wyt in unsent-messages.i.pumps) + == + ?: (gte num msg.cong.ames-state) + & + $(pumps t.pumps) + :: +nuf-memory: is enough memory used to mark as clogged? + :: + ++ nuf-memory + =| mem=@ud + |- ^- ? + ?~ pumps | + =. mem + %+ add + %- ~(rep in unsent-messages.i.pumps) + |=([a=@ b=_mem] (add b (met 3 a))) + ?~ unsent-fragments.i.pumps 0 + (met 3 fragment.i.unsent-fragments.i.pumps) + ?: (gte mem mem.cong.ames-state) + & + $(pumps t.pumps) + -- :: +send-shut-packet: fire encrypted packet at rcvr and maybe sponsors :: ++ send-shut-packet @@ -2777,12 +2573,12 @@ :: here. :: =. event-core - %^ send-blob | her.channel + %^ send-blob | her %- encode-packet %: encode-shut-packet shut-packet(bone (mix 1 bone.shut-packet)) symmetric-key.channel - our her.channel + our her our-life.channel her-life.channel == peer-core @@ -2802,15 +2598,46 @@ =(~ unsent-fragments.pum) =(~ live.packet-pump-state.pum) == - ~> %slog.0^leaf/"ames: bad pump state {}" + ~> %slog.0^leaf/"ames: bad pump state {}" $(boz t.boz) :: no outstanding messages, so send a new %cork :: :: TODO use +trace - ~> %slog.0^leaf/"ames: recork {}" + ~> %slog.0^leaf/"ames: recork {}" =/ =plea [%$ /flow [%cork ~]] (on-memo i.boz plea %plea) :: + :: +handle-cork: handle flow kill after server ames has taken %done + :: + ++ handle-cork + |= =bone + ^+ peer-core + ?. (~(has in closing.peer-state) bone) peer-core + =/ =message-pump-state + (~(gut by snd.peer-state) bone *message-pump-state) + =? peer-core ?=(^ next-wake.packet-pump-state.message-pump-state) + =* next-wake u.next-wake.packet-pump-state.message-pump-state + =/ =wire (make-pump-timer-wire her bone) + :: resetting timer for boons + :: + (pe-emit [/ames]~ %pass wire %b %rest next-wake) + =/ nax-bone=^bone (mix 0b10 bone) + =? peer-core (~(has by snd.peer-state) nax-bone) + %. peer-core + %+ pe-trace odd.veb + |.("remove naxplanation flow {<[her bone=nax-bone]>}") + =. peer-state + =, peer-state + %_ peer-state + :: preemptively delete nax flows (e.g. nacks for %watches) + :: + snd (~(del by (~(del by snd) bone)) nax-bone) + rcv (~(del by rcv) bone) + corked (~(put in corked) bone) + closing (~(del in closing) bone) + krocs (~(del in krocs) bone) + == + peer-core +| %internals :: +mu: constructor for |message-pump :: @@ -2835,7 +2662,7 @@ ++ mu-trace |= [verb=? print=(trap tape)] ^+ same - (trace verb her.channel ships.bug.channel print) + (trace verb her ships.bug.channel print) :: ++ closing (~(has in closing.peer-state) bone) ++ corked (~(has in corked.peer-state) bone) @@ -2900,14 +2727,14 @@ =(1 ~(wyt by live.packet-pump-state.state)) =(message-num:task message-num.key.u.top) == - =* ack p.ack-meat.task + =+ [ack msg]=[p.ack-meat message-num]:task =. pump %- on-done - [[message-num:task ?:(ok.ack [%ok ~] [%nack ~])] cork] + [[msg ?:(ok.ack [%ok ~] [%nack ~])] cork] ?. &(!ok.ack cork) pump %. pump - %+ mu-trace odd.veb |. - "got nack for %cork {}" + %+ mu-trace odd.veb + |.("got nack for %cork {}") == == :: +on-memo: handle request to send a message :: @@ -2951,9 +2778,9 @@ :: ignore duplicate message acks :: ?: (lth message-num current.state) - %- %+ mu-trace snd.veb - |.("duplicate done {}") - pump + %. pump + %+ mu-trace snd.veb |. + "duplicate done {}" :: ignore duplicate and future acks :: ?. (is-message-num-in-range message-num) @@ -2963,7 +2790,7 @@ =? unsent-fragments.state &(=(current next) ?=(^ unsent-fragments)):state :: - ~> %slog.0^leaf/"ames: early message ack {}" + ~> %slog.0^leaf/"ames: early message ack {}" ~ :: clear all packets from this message from the packet pump :: @@ -3040,7 +2867,8 @@ ?~(unsent feed-packets pump) :: .unsent-messages is nonempty; pop a message off and feed it :: - =^ =message-blob unsent-messages.state ~(get to unsent-messages.state) + =^ =message-blob unsent-messages.state + ~(get to unsent-messages.state) :: break .message into .chunks and set as .unsent-fragments :: =. unsent-fragments.state (split-message next.state message-blob) @@ -3058,10 +2886,9 @@ (~(has in corked.peer-state) (mix 0b10 bone)) == %- %+ pe-trace msg.veb - =/ dat [her.channel bone=bone message-num=message-num -.task] + =/ dat [her bone=bone message-num=message-num -.task] |.("remove naxplanation flow {}") - :: XX FIXME this happens before abet, so we'd put the bone in again - :: we check for this case in abet:mu; test that it works + :: XX we avoid re-adding the bone in abet:mu; test that it works :: =. snd.peer-state (~(del by snd.peer-state) bone) peer-core @@ -3074,12 +2901,11 @@ :: nack-trace bone; assume .ok, clear nack from |message-sink :: =/ target-bone=^bone (mix 0b10 bone) - :: XX refactor - ::abet:(call:abed:(mi target-bone *message-sink-state) %drop message-num) - (run-message-sink target-bone %drop message-num) + =< abet + (call:abed:(mi target-bone *message-sink-state) %drop message-num) ?: &(closing ?=(%near -.task)) - :: if the bone belongs to a closing flow and we got a naxplanation, - :: don't relay the ack to the client vane, and wait for the next try + :: if the bone belongs to a closing flow and we got a + :: naxplanation, don't relay ack to the client vane :: peer-core :: not a nack-trace bone; relay ack to client vane @@ -3094,10 +2920,10 @@ :: clear all packets from this message from the packet pump :: =. pump abet:(call:packet-pump %done message-num lag=*@dr) - =/ =wire (make-pump-timer-wire her.channel bone) + =/ =wire (make-pump-timer-wire her bone) =/ nack-bone=^bone (mix 0b10 bone) =? rcv.peer-state (~(has by rcv.peer-state) nack-bone) - :: if the publisher was behind we remove nacks received on that bone + :: if the publisher was behind we remove nacks on that bone :: (~(del by rcv.peer-state) nack-bone) =. peer-state @@ -3121,14 +2947,14 @@ |= date=@da ^+ peer-core %+ pe-emit ~[/ames] - [%pass (make-pump-timer-wire her.channel bone) %b %wait date] + [%pass (make-pump-timer-wire her bone) %b %wait date] :: +pump-rest: relay |message-pump's unset-timer request :: ++ pump-rest |= date=@da ^+ peer-core %+ pe-emit ~[/ames] - [%pass (make-pump-timer-wire her.channel bone) %b %rest date] + [%pass (make-pump-timer-wire her bone) %b %rest date] :: +pu: construct |packet-pump core :: ++ pu @@ -3138,17 +2964,17 @@ |% +| %helpers ++ pack . - :: XX +abut: abet with gifts + :: +abut: abet with gifts :: ++ abut [unsent abet] ++ abet pump(packet-pump-state.state state) ++ pu-trace |= [verb=? print=(trap tape)] ^+ same - (trace verb her.channel ships.bug.channel print) + (trace verb her ships.bug.channel print) :: ++ pu-emit |=(=note (pe-emit pump-duct %pass pump-wire note)) - :: +packet-queue: type for all sent fragments, ordered by sequence number + :: +packet-queue: type for all sent fragments (order: seq number) :: ++ packet-queue %- (ordered-map live-packet-key live-packet-val) @@ -3168,7 +2994,7 @@ ^- static-fragment [message-num num-fragments fragment-num fragment] :: - ++ pump-wire (make-pump-timer-wire her.channel bone) + ++ pump-wire (make-pump-timer-wire her bone) ++ pump-duct ~[/ames] ++ top-live (pry:packet-queue live.state) :: @@ -3177,7 +3003,6 @@ ++ call |= task=packet-pump-task ^+ pack - :: ?- -.task %hear (on-hear [message-num fragment-num]:task) %done (on-done message-num.task) @@ -3210,7 +3035,7 @@ [num-fragments fragment] :: update .live and .metrics :: - =. live.state (gas:packet-queue live.state send-list) + =. live.state (gas:packet-queue live.state send-list) :: TMI :: => .(sent `(list static-fragment)`sent) @@ -3230,7 +3055,8 @@ ?: =(~ next-wake.state) pack :: - =. metrics.state %*(. *pump-metrics counter counter.metrics.state) + =. metrics.state + %*(. *pump-metrics counter counter.metrics.state) =. live.state %+ run:packet-queue live.state |=(p=live-packet-val p(- *packet-state)) @@ -3242,7 +3068,8 @@ ?: =(~ liv) pack =^ hed liv (pop:packet-queue liv) =. peer-core - (send-shut-packet bone [message-num %& +]:(to-static-fragment hed)) + %+ send-shut-packet bone + [message-num %& +]:(to-static-fragment hed) $(sot (dec sot)) :: +on-wake: handle packet timeout :: @@ -3256,20 +3083,18 @@ :: tell congestion control a packet timed out :: =. metrics.state on-timeout:gauge + =| acc=(unit static-fragment) :: re-send first packet and update its state in-place :: - =- =* res - - =. live.state live.res + =; [static-fragment=_acc live=_live.state] + =. live.state live =? peer-core ?=(^ static-fragment) %- %+ pu-trace snd.veb - =/ nums [message-num fragment-num]:u.static-fragment.res + =/ nums [message-num fragment-num]:u.static-fragment |.("dead {}") (send-shut-packet bone [message-num %& +]:u.static-fragment) pack :: - =| acc=(unit static-fragment) - ^+ [static-fragment=acc live=live.state] - :: %^ (dip:packet-queue _acc) live.state acc |= $: acc=_acc key=live-packet-key @@ -3279,9 +3104,9 @@ :: if already acked later message, don't resend :: ?: (lth message-num.key current) - %- %- slog :_ ~ - leaf+"ames: strange wake queue, expected {}, got {}" - [~ stop=%.n ~] + %. [~ stop=%.n ~] + %- slog :_ ~ :- %leaf + "ames: strange wake queue, expected {}, got {}" :: packet has expired; update it in-place, stop, and produce it :: =. last-sent.val now.channel @@ -3290,8 +3115,8 @@ [`val stop=%.y `(to-static-fragment key val)] :: +fast-resend-after-ack: resend timed out packets :: - :: After we finally receive an ack, we want to resend all the live - :: packets that have been building up. + :: After we finally receive an ack, we want to resend all the + :: live packets that have been building up. :: ++ fast-resend-after-ack |= [=message-num =fragment-num] @@ -3325,13 +3150,14 @@ :: +on-hear: handle ack on a live packet :: :: If the packet was in our queue, delete it and update our - :: metrics, possibly re-sending skipped packets. Otherwise, no-op. + :: metrics, possibly re-sending skipped packets. Otherwise, no-op :: ++ on-hear |= [=message-num =fragment-num] ^+ pack :: - =- :: if no sent packet matches the ack, don't apply mutations or effects + =- :: if no sent packet matches the ack, + :: don't apply mutations or effects :: ?. found.- %- (pu-trace snd.veb |.("miss {}")) @@ -3343,7 +3169,8 @@ =(0 (mod counter.metrics.state 20)) == same - (pu-trace snd.veb |.("send: {}")) + %+ pu-trace snd.veb + |.("send: {}") :: .resends is backward, so fold backward and emit :: =. peer-core @@ -3382,7 +3209,7 @@ :: stop, nothing more to do :: [new-val=`val stop=%.y acc] - :: ack was on later packet; mark skipped, tell gauge, and continue + :: ack was on later packet; mark skipped, tell gauge, & continue :: =. skips.val +(skips.val) =^ resend metrics.acc (on-skipped-packet:gauge -.val) @@ -3402,8 +3229,8 @@ =- =. metrics.state metrics.- =. live.state live.- :: - %- (pu-trace snd.veb |.("done {}")) - (fast-resend-after-ack message-num `fragment-num`0) + %. (fast-resend-after-ack message-num `fragment-num`0) + (pu-trace snd.veb |.("done {}")) :: ^+ [metrics=metrics.state live=live.state] :: @@ -3458,7 +3285,7 @@ :: ++ ga |= pump-metrics - =* ship her.channel + =* ship her =* now now.channel =* metrics +< |% @@ -3471,8 +3298,8 @@ :: +next-expiry: when should a newly sent fresh packet time out? :: :: Use rtt + 4*sigma, where sigma is the mean deviation of rtt. - :: This should make it unlikely that a packet would time out from a - :: delay, as opposed to an actual packet loss. + :: This should make it unlikely that a packet would time out + :: from a delay, as opposed to an actual packet loss. :: ++ next-expiry |= [live-packet-key live-packet-val] @@ -3497,8 +3324,8 @@ (lth cwnd ssthresh) :: +in-recovery: %.y iff we're recovering from a skipped packet :: - :: We finish recovering when .live-packets finally dips back down to - :: .cwnd. + :: We finish recovering when .live-packets finally dips back + :: down to .cwnd. :: ++ in-recovery ^- ? @@ -3531,7 +3358,7 @@ ^- pump-metrics :: =. counter +(counter) - :: if below congestion threshold, add 1; else, add avg. 1 / cwnd + :: if below congestion threshold, add 1; else, add avg 1 / cwnd :: =. cwnd ?: in-slow-start @@ -3541,10 +3368,10 @@ :: ?. =(0 retries.packet-state) metrics - :: rtt-datum: new rtt measurement based on this packet roundtrip + :: rtt-datum: new rtt measurement based on packet roundtrip :: =/ rtt-datum=@dr (sub-safe now last-sent.packet-state) - :: rtt-error: difference between this rtt measurement and expected + :: rtt-error: difference between this measurement and expected :: =/ rtt-error=@dr ?: (gte rtt-datum rtt) @@ -3552,13 +3379,13 @@ (sub rtt rtt-datum) :: exponential weighting ratio for .rtt and .rttvar :: - %- %+ ga-trace ges.veb - |.("ack update {}") =. rtt (div (add rtt-datum (mul rtt 7)) 8) =. rttvar (div (add rtt-error (mul rttvar 7)) 8) =. rto (clamp-rto (add rtt (mul 4 rttvar))) :: - metrics + %. metrics + %+ ga-trace ges.veb |. + "ack update {}" :: +on-skipped-packet: handle misordered ack :: ++ on-skipped-packet @@ -3586,205 +3413,313 @@ -- -- -- - :: +run-message-sink: process $message-sink-task and its effects + :: +mi: construct |message-sink message receiver core :: - ++ run-message-sink - |= [=bone task=message-sink-task] - ^+ peer-core - ?: (~(has in corked.peer-state) bone) peer-core - :: pass .task to the |message-sink and apply state mutations + ++ mi + |= [=bone state=message-sink-state] + |% :: - =/ =message-sink-state - (~(gut by rcv.peer-state) bone *message-sink-state) + +| %helpers :: - =/ message-sink (make-message-sink message-sink-state channel) - =/ closing=? (~(has in closing.peer-state) bone) - =^ sink-gifts message-sink-state (work:message-sink closing task) - =. rcv.peer-state (~(put by rcv.peer-state) bone message-sink-state) - :: process effects from |message-sink + ++ sink . + ++ abed + sink(state (~(gut by rcv.peer-state) bone *message-sink-state)) :: - |^ ^+ peer-core - ?~ sink-gifts peer-core - =* gift i.sink-gifts - =. peer-core - ?- -.gift - %memo (on-sink-memo [message-num message]:gift) - %send (on-sink-send [message-num ack-meat]:gift) - %cork on-sink-cork - == - $(sink-gifts t.sink-gifts) - :: +on-sink-cork: handle flow kill after server ames has taken %done + ++ abet + peer-core(rcv.peer-state (~(put by rcv.peer-state) bone state)) :: - ++ on-sink-cork - ^+ peer-core - =/ =message-pump-state - (~(gut by snd.peer-state) bone *message-pump-state) - =? peer-core ?=(^ next-wake.packet-pump-state.message-pump-state) - =* next-wake u.next-wake.packet-pump-state.message-pump-state - =/ =wire (make-pump-timer-wire her.channel bone) - :: resetting timer for boons - :: - (pe-emit [/ames]~ %pass wire %b %rest next-wake) - =/ nax-bone=^bone (mix 0b10 bone) - =? peer-core (~(has by snd.peer-state) nax-bone) - %. peer-core - %+ pe-trace odd.veb - =/ dat [her.channel bone=nax-bone message-num=message-num -.task] - |.("remove naxplanation flow {}") - =. peer-state - =, peer-state - %_ peer-state - :: preemptively delete nax flows (e.g. nacks for initial %watches) + ++ mi-trace + |= [verb=? print=(trap tape)] + ^+ same + (trace verb her ships.bug.channel print) + :: + ++ closing (~(has in closing.peer-state) bone) + ++ corked (~(has in corked.peer-state) bone) + ++ pump-core |=(=^bone (mu bone *message-pump-state)) + :: + +| %entry-points + :: +call: handle a $message-sink-task + :: + ++ call + |= task=message-sink-task + ^+ sink + ?: corked sink + ?- -.task + %drop sink(nax.state (~(del in nax.state) message-num.task)) + %done (done ok.task) + %hear (hear [lane shut-packet ok]:task) + == + :: + +| %tasks + :: +hear: receive message fragment, possibly completing message + :: + ++ hear + |= [=lane =shut-packet ok=?] + ^+ sink + :: we know this is a fragment, not an ack; expose into namespace + :: + ?> ?=(%& -.meat.shut-packet) + =+ [num-fragments fragment-num fragment]=+.meat.shut-packet + :: seq: message sequence number, for convenience + :: + =/ seq message-num.shut-packet + :: ignore messages from far future; limit to 10 in progress + :: + ?: (gte seq (add 10 last-acked.state)) + %- %+ mi-trace odd.veb + |.("future %hear {}") + sink + :: + =/ is-last-fragment=? =(+(fragment-num) num-fragments) + :: always ack a dupe! + :: + ?: (lte seq last-acked.state) + ?. is-last-fragment + :: single packet ack :: - snd (~(del by (~(del by snd) bone)) nax-bone) - rcv (~(del by rcv) bone) - corked (~(put in corked) bone) - closing (~(del in closing) bone) - krocs (~(del in krocs) bone) - == - peer-core - :: +on-sink-send: emit ack packet as requested by |message-sink + =. peer-core (send-shut-packet bone seq %| %& fragment-num) + %. sink + %+ mi-trace rcv.veb + |.("send dupe ack {}") + :: whole message (n)ack + :: + =/ ok=? !(~(has in nax.state) seq) + =. peer-core (send-shut-packet bone seq %| %| ok lag=`@dr`0) + %. sink + %+ mi-trace rcv.veb + |.("send dupe message ack {} ok={}") + :: last-acked}" + sink + :: ack all other packets + :: + =. peer-core (send-shut-packet bone seq %| %& fragment-num) + %- %+ mi-trace rcv.veb |. + =/ data + :* seq=seq fragment-num=fragment-num + num-fragments=num-fragments closing=closing + == + "send ack-1 {}" + sink + :: last-heard (gth num-fragments.u.existing fragment-num) + ?> =(num-fragments.u.existing num-fragments) + :: + u.existing + :: + =/ already-heard-fragment=? + (~(has by fragments.partial-rcv-message) fragment-num) + :: ack dupes except for the last fragment, in which case drop + :: + ?: already-heard-fragment + ?: is-last-fragment + %- %+ mi-trace rcv.veb |. + =/ data + [her seq=seq lh=last-heard.state la=last-acked.state] + "hear last dupe {}" + sink + =. peer-core (send-shut-packet bone seq %| %& fragment-num) + %. sink + %+ mi-trace rcv.veb + |.("send dupe ack {}") + :: new fragment; store in state and check if message is done + :: + =. num-received.partial-rcv-message + +(num-received.partial-rcv-message) + :: + =. fragments.partial-rcv-message + (~(put by fragments.partial-rcv-message) fragment-num fragment) + :: + =. live-messages.state + (~(put by live-messages.state) seq partial-rcv-message) + :: ack any packet other than the last one, and continue either way + :: + =? peer-core !is-last-fragment + %- %+ mi-trace rcv.veb |. + =/ data + [seq=seq fragment-num=fragment-num fragments=num-fragments] + "send ack-2 {}" + (send-shut-packet bone seq %| %& fragment-num) + :: enqueue all completed messages starting at +(last-heard.state) + :: + |- ^+ sink + :: if this is not the next message to ack, we're done + :: + ?. =(seq +(last-heard.state)) + sink + :: if we haven't heard anything from this message, we're done + :: + ?~ live=(~(get by live-messages.state) seq) + sink + :: if the message isn't done yet, we're done + :: + ?. =(num-received num-fragments):u.live + sink + :: we have whole message; update state, assemble, and send to vane + :: + =. last-heard.state +(last-heard.state) + =. live-messages.state (~(del by live-messages.state) seq) + :: + %- %+ mi-trace msg.veb + |.("hear {} {} {}kb") + =/ message=* (assemble-fragments [num-fragments fragments]:u.live) + =/ empty=? =(~ pending-vane-ack.state) + :: enqueue message to be sent to local vane + :: + =. pending-vane-ack.state + (~(put to pending-vane-ack.state) seq message) + :: + =? sink empty (handle-sink seq message ok) + :: + $(seq +(seq)) + :: +done: handle confirmation of message processing from vane :: - ++ on-sink-send - |=([num=message-num ack=ack-meat] (send-shut-packet bone num %| ack)) - :: +on-sink-memo: dispatch message received by |message-sink + ++ done + |= ok=? + ^+ sink + :: + =^ pending pending-vane-ack.state ~(get to pending-vane-ack.state) + =/ =message-num message-num.p.pending + :: + =. last-acked.state +(last-acked.state) + =? nax.state !ok (~(put in nax.state) message-num) + :: + =. peer-core (send-shut-packet bone message-num %| %| ok lag=`@dr`0) + ?~ next=~(top to pending-vane-ack.state) + sink + (handle-sink message-num.u.next message.u.next ok) + :: + +| %implementation + :: +handle-sink: dispatch message :: :: odd bone: %plea request message :: even bone, 0 second bit: %boon response message :: even bone, 1 second bit: nack-trace %boon message :: - ++ on-sink-memo - ?: =(1 (end 0 bone)) - on-sink-plea - ?: =(0 (end 0 (rsh 0 bone))) - on-sink-boon - on-sink-nack-trace - :: +on-sink-boon: handle response message received by |message-sink - :: - :: .bone must be mapped in .ossuary.peer-state, or we crash. - :: This means a malformed message will kill a flow. We - :: could change this to a no-op if we had some sort of security - :: reporting. - :: - :: Note that if we had several consecutive packets in the queue - :: and crashed while processing any of them, the %hole card - :: will turn *all* of them into losts/nacks. - :: - :: TODO: This handles a previous crash in the client vane, but not in - :: Ames itself. - :: - ++ on-sink-boon - |= [=message-num message=*] - ^+ peer-core - ?: ?| (~(has in closing.peer-state) bone) - (~(has in corked.peer-state) bone) - == - peer-core - :: send ack unconditionally - :: - =. peer-core (pe-emit (got-duct bone) %give %boon message) - =. peer-core (run-message-sink bone %done ok=%.y cork=%.n) - :: - ?. ?=([%hear * * ok=%.n] task) - :: fresh boon; give message to client vane + ++ handle-sink + |= [=message-num message=* ok=?] + |^ ^+ sink + ?: =(1 (end 0 bone)) sink-plea + ?: =(0 (end 0 (rsh 0 bone))) sink-boon + sink-nack + :: XX FIXME: impure +abet pattern + ++ sink-plea + ^+ sink + ?: |(closing corked) sink + %- %+ mi-trace msg.veb + =/ dat [her bone=bone message-num=message-num] + |.("sink plea {}") + =; pe=_peer-core + =. peer-core pe + =? sink !ok (call %done ok=%.n) + sink + ?. ok + :: send nack-trace with blank .error for security + :: + =/ nack-bone=^bone (mix 0b10 bone) + =/ =message-blob (jam [message-num *error]) + abet:(call:abed:(pump-core nack-bone) %memo message-blob) + =+ ;; =plea message + =/ =wire (make-bone-wire her her-rift.channel bone) :: - %- %+ pe-trace msg.veb - =/ dat [her.channel bone=bone message-num=message-num -.task] - |.("sink boon {}") - peer-core - :: we previously crashed on this message; notify client vane + ?. =(vane.plea %$) + ?+ vane.plea ~| %ames-evil-vane^our^her^vane.plea !! + %c (pe-emit duct %pass wire %c %plea her plea) + %g (pe-emit duct %pass wire %g %plea her plea) + %j (pe-emit duct %pass wire %j %plea her plea) + == + :: a %cork plea is handled using %$ as the recipient vane to + :: account for publishers that still handle ames-to-ames %pleas + :: + ?> &(?=([%cork *] payload.plea) ?=(%flow -.path.plea)) + :: XX FIXME impure +abet pattern... + :: + =. closing.peer-state (~(put in closing.peer-state) bone) + (pe-emit duct %pass wire %a %plea her [%a /close ~]) :: - %- %+ pe-trace msg.veb - =/ dat [her.channel bone=bone message-num=message-num -.task] - |.("crashed on sink boon {}") - boon-to-lost - :: +boon-to-lost: convert all boons to losts - :: - ++ boon-to-lost - ^+ peer-core - =. moves - %+ turn moves - |= =move - ?. ?=([* %give %boon *] move) - move - [duct.move %give %lost ~] - peer-core - :: +on-sink-nack-trace: handle nack-trace received by |message-sink - :: - ++ on-sink-nack-trace - |= [=message-num message=*] - ^+ peer-core - %- %+ pe-trace msg.veb - =/ dat [her.channel bone=bone message-num=message-num] - |.("sink naxplanation {}") + :: +sink-boon: handle response message, acking unconditionally :: - =+ ;; =naxplanation message - :: ack nack-trace message (only applied if we don't later crash) + :: .bone must be mapped in .ossuary.peer-state, or we crash. + :: This means a malformed message will kill a flow. We + :: could change this to a no-op if we had some sort of security + :: reporting. :: - =. peer-core (run-message-sink bone %done ok=%.y cork=%.n) - :: flip .bone's second bit to find referenced flow + :: Note that if we had several consecutive packets in the queue + :: and crashed while processing any of them, the %hole card + :: will turn *all* of them into losts/nacks. :: - =/ target=^bone (mix 0b10 bone) - :: notify |message-pump that this message got naxplained + :: TODO: This handles a previous crash in the client vane, but + :: not in %ames itself. :: - =. peer-core - abet:(call:abed:(pump-core target) %near naxplanation) + ++ sink-boon + ^+ sink + ?: |(closing corked) sink + %- %+ mi-trace msg.veb |. + :: XX -.task not visible, FIXME + :: + =/ dat [her bone=bone message-num=message-num] + ?:(ok "sink boon {}" "crashed on sink boon {}") + =? moves !ok + :: we previously crashed on this message; notify client vane + :: + %+ turn moves + |= =move + ?. ?=([* %give %boon *] move) move + [duct.move %give %lost ~] + :: + =. peer-core (pe-emit (got-duct bone) %give %boon message) + :: send ack unconditionally + :: + (call %done ok=%.y) :: - ?. (~(has in krocs.peer-state) target) - peer-core - :: if we get a naxplanation for a %cork, the publisher is behind - :: receiving the OTA. The /recork timer will retry eventually. - :: - %- %+ pe-trace msg.veb + ++ sink-nack + ^+ sink + %- %+ mi-trace msg.veb + =/ dat [her bone=bone message-num=message-num] + |.("sink naxplanation {}") + :: flip .bone's second bit to find referenced flow + :: + =/ target=^bone (mix 0b10 bone) + :: XX not used, remove + =? peer-core (~(has in krocs.peer-state) target) + :: if we get a naxplanation for a %cork, the publisher hans't + :: received the OTA. The /recork timer will retry eventually. + :: + %. peer-core + %+ pe-trace msg.veb |.("old publisher, %cork nacked on bone={}") - peer-core - :: +on-sink-plea: handle request message received by |message-sink - :: - ++ on-sink-plea - |= [=message-num message=*] - ^+ peer-core - ?: ?| (~(has in closing.peer-state) bone) - (~(has in corked.peer-state) bone) - == - peer-core - |^ - %- %+ pe-trace msg.veb - =/ dat [her.channel bone=bone message-num=message-num] - |.("sink plea {}") - :: is this the first time we're trying to process this message? - :: - ?: ?=([%hear * * ok=%.n] task) - :: we previously crashed on this message; send nack + =. peer-core + :: notify |message-pump that this message got naxplained + :: + =< abet + (call:abed:(pump-core target) %near ;;(naxplanation message)) + :: ack nack-trace message (only applied if we don't later crash) :: - nack-plea - :: fresh plea; pass to client vane - :: - =+ ;; =plea message - =/ =wire (make-bone-wire her.channel her-rift.channel bone) - :: - ?. =(vane.plea %$) - ?+ vane.plea ~| %ames-evil-vane^our^her.channel^vane.plea !! - %c (pe-emit duct %pass wire %c %plea her.channel plea) - %g (pe-emit duct %pass wire %g %plea her.channel plea) - %j (pe-emit duct %pass wire %j %plea her.channel plea) - == - :: a %cork plea is handled using %$ as the recipient vane to - :: account for publishers that still handle ames-to-ames %pleas - :: - ?> &(?=([%cork *] payload.plea) ?=(%flow -.path.plea)) - =. closing.peer-state (~(put in closing.peer-state) bone) - (pe-emit duct %pass wire %a %plea her.channel [%a /close ~]) - :: - ++ nack-plea - ^+ peer-core - =. peer-core (run-message-sink bone %done ok=%.n cork=%.n) - :: send nack-trace with blank .error for security - :: - =/ nack-trace-bone=^bone (mix 0b10 bone) - =/ =naxplanation [message-num *error] - =/ =message-blob (jam naxplanation) - :: - abet:(call:abed:(pump-core nack-trace-bone) %memo message-blob) + (call %done ok=%.y) -- -- --