diff --git a/pkg/arvo/sys/vane/ames.hoon b/pkg/arvo/sys/vane/ames.hoon index 16b5d53cf..64d410698 100644 --- a/pkg/arvo/sys/vane/ames.hoon +++ b/pkg/arvo/sys/vane/ames.hoon @@ -871,12 +871,12 @@ ?^ dud ?+ -.task (on-crud:event-core -.task tang.u.dud) - %hear (on-hole:event-core [lane blob]:task) + %hear (on-hear:event-core lane.task blob.task dud) == :: ?- -.task %born on-born:event-core - %hear (on-hear:event-core [lane blob]:task) + %hear (on-hear:event-core [lane blob ~]:task) %heed (on-heed:event-core ship.task) %init on-init:event-core %jilt (on-jilt:event-core ship.task) @@ -1194,15 +1194,15 @@ =/ =channel [[our ship] now channel-state -.peer-state] abet:on-jilt:(make-peer-core peer-state channel) :: +on-hear: handle raw packet receipt - :: +on-hole: handle packet crash notification :: - ++ on-hear |=([l=lane b=blob] (on-hear-packet l (decode-packet b) ok=&)) - ++ on-hole |=([l=lane b=blob] (on-hear-packet l (decode-packet b) ok=|)) + ++ on-hear + |= [l=lane b=blob d=(unit goof)] + (on-hear-packet l (decode-packet b) d) :: +on-hear-packet: handle mildly processed packet receipt :: ++ on-hear-packet ~/ %on-hear-packet - |= [=lane =packet ok=?] + |= [=lane =packet dud=(unit goof)] ^+ event-core :: ?: =(our sndr.packet) @@ -1226,7 +1226,7 @@ :: ++ on-hear-forward ~/ %on-hear-forward - |= [=lane =packet ok=?] + |= [=lane =packet dud=(unit goof)] ^+ event-core %- %^ trace for.veb sndr.packet |.("forward: {} -> {}") @@ -1246,7 +1246,7 @@ :: ++ on-hear-open ~/ %on-hear-open - |= [=lane =packet ok=?] + |= [=lane =packet dud=(unit goof)] ^+ event-core :: assert the comet can't pretend to be a moon or other address :: @@ -1283,7 +1283,7 @@ :: ++ on-hear-shut ~/ %on-hear-shut - |= [=lane =packet ok=?] + |= [=lane =packet dud=(unit goof)] ^+ event-core =/ sndr-state (~(get by peers.ames-state) sndr.packet) :: if we don't know them, maybe enqueue a jael %public-keys request @@ -1338,7 +1338,7 @@ :: perform peer-specific handling of packet :: =/ peer-core (make-peer-core peer-state channel) - abet:(on-hear-shut-packet:peer-core lane shut-packet ok) + abet:(on-hear-shut-packet:peer-core lane shut-packet dud) :: +on-take-boon: receive request to give message to peer :: ++ on-take-boon @@ -1373,7 +1373,7 @@ |. ^- tape =/ sndr [our our-life.channel] =/ rcvr [ship her-life.channel] - "plea {}" + "plea {}" :: abet:(on-memo:(make-peer-core peer-state channel) bone plea %plea) :: +on-take-wake: receive wakeup or error notification from behn @@ -1897,7 +1897,7 @@ :: +on-hear-shut-packet: handle receipt of ack or message fragment :: ++ on-hear-shut-packet - |= [=lane =shut-packet ok=?] + |= [=lane =shut-packet dud=(unit goof)] ^+ peer-core :: update and print connection status :: @@ -1906,12 +1906,15 @@ =/ =bone bone.shut-packet :: ?: ?=(%& -.meat.shut-packet) - (run-message-sink bone %hear lane shut-packet ok) - :: ignore .ok for |message-pump; just try again on error + (run-message-sink bone %hear lane shut-packet ?=(~ dud)) + :: Just try again on error, printing trace :: :: Note this implies that vanes should never crash on %done, :: since we have no way to continue using the flow if they do. :: + =+ ?~ dud ~ + %. ~ + (slog leaf+"ames: crashed on message ack" >mote.u.dud< tang.u.dud) (run-message-pump bone %hear [message-num +.meat]:shut-packet) :: +on-memo: handle request to send message :: @@ -2197,11 +2200,12 @@ ?. ?=([%hear * * ok=%.n] task) :: fresh boon; give message to client vane :: - %- (trace msg.veb |.("boon {}")) + %- (trace msg.veb |.("boon {}")) peer-core :: we previously crashed on this message; notify client vane :: - %- (trace msg.veb |.("crashed on boon {}")) + %- %+ trace msg.veb + |.("crashed on boon {}") boon-to-lost :: +boon-to-lost: convert all boons to losts :: @@ -2219,7 +2223,7 @@ ++ on-sink-nack-trace |= [=message-num message=*] ^+ peer-core - %- (trace msg.veb |.("nack trace {}")) + %- (trace msg.veb |.("nack trace {}")) :: =+ ;; =naxplanation message :: ack nack-trace message (only applied if we don't later crash) @@ -2236,7 +2240,7 @@ ++ on-sink-plea |= [=message-num message=*] ^+ peer-core - %- (trace msg.veb |.("plea {}")) + %- (trace msg.veb |.("plea {}")) :: is this the first time we're trying to process this message? :: ?. ?=([%hear * * ok=%.n] task) @@ -2346,7 +2350,8 @@ :: ignore duplicate message acks :: ?: (lth message-num current.state) - %- (trace snd.veb |.("duplicate done {}")) + %- %+ trace snd.veb + |.("duplicate done {}") message-pump :: ignore duplicate and future acks :: @@ -2381,6 +2386,19 @@ :: =. queued-message-acks.state (~(del by queued-message-acks.state) current.state) + :: clear all packets from this message from the packet pump + :: + :: Note we did this when the original packet came in, a few lines + :: above. It's not clear why, but it doesn't always clear the + :: packets when it's not the current message. As a workaround, + :: we clear the packets again when we catch up to this packet. + :: + :: This is slightly inefficient because we run this twice for + :: each packet and it may emit a few unnecessary packets, but + :: but it's not incorrect. pump-metrics are updated only once, + :: at the time when we actually delete the packet. + :: + =. message-pump (run-packet-pump %done current.state lag=*@dr) :: give %done to vane if we're ready :: ?- -.u.cur @@ -2646,7 +2664,7 @@ =(0 (mod counter.metrics.state 20)) == same - (trace snd.veb |.("{<[fragment-num show:gauge]>}")) + (trace snd.veb |.("send: {<[fragment=fragment-num show:gauge]>}")) :: .resends is backward, so fold backward and emit :: =. packet-pump @@ -2705,7 +2723,7 @@ =- =. metrics.state metrics.- =. live.state live.- :: - %- (trace snd.veb |.("done {}")) + %- (trace snd.veb |.("done {}")) (fast-resend-after-ack message-num `fragment-num`0) :: ^+ [metrics=metrics.state live=live.state] @@ -2936,7 +2954,8 @@ :: ignore messages from far future; limit to 10 in progress :: ?: (gte seq (add 10 last-acked.state)) - %- (trace odd.veb |.("future %hear {}")) + %- %+ trace odd.veb + |.("future %hear {}") message-sink :: =/ is-last-fragment=? =(+(fragment-num) num-fragments) @@ -2946,12 +2965,13 @@ ?. is-last-fragment :: single packet ack :: - %- (trace rcv.veb |.("send dupe ack {}")) + %- %+ trace rcv.veb + |.("send dupe ack {}") (give %send seq %& fragment-num) :: whole message (n)ack :: =/ ok=? !(~(has in nax.state) seq) - %- (trace rcv.veb |.("send dupe message ack {} ok={}")) + %- (trace rcv.veb |.("send dupe message ack {} ok={}")) (give %send seq %| ok lag=`@dr`0) :: last-acked}")) + %- %+ trace rcv.veb |. + =/ data + [seq=seq fragment-num=fragment-num num-fragments=num-fragments] + "send ack-1 {}" (give %send seq %& fragment-num) :: last-heard}" message-sink - %- (trace rcv.veb |.("send dupe ack {}")) + %- %+ trace rcv.veb + |.("send dupe ack {}") (give %send seq %& fragment-num) :: new fragment; store in state and check if message is done :: @@ -3014,7 +3039,10 @@ :: ack any packet other than the last one, and continue either way :: =? message-sink !is-last-fragment - %- (trace rcv.veb |.("send ack-2 {}")) + %- %+ trace rcv.veb |. + =/ data + [seq=seq fragment-num=fragment-num num-fragments=num-fragments] + "send ack-2 {}" (give %send seq %& fragment-num) :: enqueue all completed messages starting at +(last-heard.state) :: @@ -3037,7 +3065,7 @@ =. live-messages.state (~(del by live-messages.state) seq) :: %- %+ trace msg.veb - |.("hear {} {} {}kb") + |.("hear {} {} {}kb") =/ message=* (assemble-fragments [num-fragments fragments]:u.live) =. message-sink (enqueue-to-vane seq message) ::