Merge pull request #4143 from urbit/philip/ames

ames: Fix stuck flows caused by %strange-current
This commit is contained in:
Philip Monk 2020-12-17 16:23:59 -08:00 committed by GitHub
commit c824284764
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -871,12 +871,12 @@
?^ dud
?+ -.task
(on-crud:event-core -.task tang.u.dud)
%hear (on-hole:event-core [lane blob]:task)
%hear (on-hear:event-core lane.task blob.task dud)
==
::
?- -.task
%born on-born:event-core
%hear (on-hear:event-core [lane blob]:task)
%hear (on-hear:event-core [lane blob ~]:task)
%heed (on-heed:event-core ship.task)
%init on-init:event-core
%jilt (on-jilt:event-core ship.task)
@ -1194,15 +1194,15 @@
=/ =channel [[our ship] now channel-state -.peer-state]
abet:on-jilt:(make-peer-core peer-state channel)
:: +on-hear: handle raw packet receipt
:: +on-hole: handle packet crash notification
::
++ on-hear |=([l=lane b=blob] (on-hear-packet l (decode-packet b) ok=&))
++ on-hole |=([l=lane b=blob] (on-hear-packet l (decode-packet b) ok=|))
++ on-hear
|= [l=lane b=blob d=(unit goof)]
(on-hear-packet l (decode-packet b) d)
:: +on-hear-packet: handle mildly processed packet receipt
::
++ on-hear-packet
~/ %on-hear-packet
|= [=lane =packet ok=?]
|= [=lane =packet dud=(unit goof)]
^+ event-core
::
?: =(our sndr.packet)
@ -1226,7 +1226,7 @@
::
++ on-hear-forward
~/ %on-hear-forward
|= [=lane =packet ok=?]
|= [=lane =packet dud=(unit goof)]
^+ event-core
%- %^ trace for.veb sndr.packet
|.("forward: {<sndr.packet>} -> {<rcvr.packet>}")
@ -1246,7 +1246,7 @@
::
++ on-hear-open
~/ %on-hear-open
|= [=lane =packet ok=?]
|= [=lane =packet dud=(unit goof)]
^+ event-core
:: assert the comet can't pretend to be a moon or other address
::
@ -1283,7 +1283,7 @@
::
++ on-hear-shut
~/ %on-hear-shut
|= [=lane =packet ok=?]
|= [=lane =packet dud=(unit goof)]
^+ event-core
=/ sndr-state (~(get by peers.ames-state) sndr.packet)
:: if we don't know them, maybe enqueue a jael %public-keys request
@ -1338,7 +1338,7 @@
:: perform peer-specific handling of packet
::
=/ peer-core (make-peer-core peer-state channel)
abet:(on-hear-shut-packet:peer-core lane shut-packet ok)
abet:(on-hear-shut-packet:peer-core lane shut-packet dud)
:: +on-take-boon: receive request to give message to peer
::
++ on-take-boon
@ -1373,7 +1373,7 @@
|. ^- tape
=/ sndr [our our-life.channel]
=/ rcvr [ship her-life.channel]
"plea {<sndr^rcvr^bone^vane.plea^path.plea>}"
"plea {<sndr^rcvr^bone=bone^vane.plea^path.plea>}"
::
abet:(on-memo:(make-peer-core peer-state channel) bone plea %plea)
:: +on-take-wake: receive wakeup or error notification from behn
@ -1897,7 +1897,7 @@
:: +on-hear-shut-packet: handle receipt of ack or message fragment
::
++ on-hear-shut-packet
|= [=lane =shut-packet ok=?]
|= [=lane =shut-packet dud=(unit goof)]
^+ peer-core
:: update and print connection status
::
@ -1906,12 +1906,15 @@
=/ =bone bone.shut-packet
::
?: ?=(%& -.meat.shut-packet)
(run-message-sink bone %hear lane shut-packet ok)
:: ignore .ok for |message-pump; just try again on error
(run-message-sink bone %hear lane shut-packet ?=(~ dud))
:: Just try again on error, printing trace
::
:: Note this implies that vanes should never crash on %done,
:: since we have no way to continue using the flow if they do.
::
=+ ?~ dud ~
%. ~
(slog leaf+"ames: crashed on message ack" >mote.u.dud< tang.u.dud)
(run-message-pump bone %hear [message-num +.meat]:shut-packet)
:: +on-memo: handle request to send message
::
@ -2197,11 +2200,12 @@
?. ?=([%hear * * ok=%.n] task)
:: fresh boon; give message to client vane
::
%- (trace msg.veb |.("boon {<her.channel^bone -.task>}"))
%- (trace msg.veb |.("boon {<her.channel^bone=bone -.task>}"))
peer-core
:: we previously crashed on this message; notify client vane
::
%- (trace msg.veb |.("crashed on boon {<her.channel^bone -.task>}"))
%- %+ trace msg.veb
|.("crashed on boon {<her.channel^bone=bone -.task>}")
boon-to-lost
:: +boon-to-lost: convert all boons to losts
::
@ -2219,7 +2223,7 @@
++ on-sink-nack-trace
|= [=message-num message=*]
^+ peer-core
%- (trace msg.veb |.("nack trace {<her.channel^bone>}"))
%- (trace msg.veb |.("nack trace {<her.channel^bone=bone>}"))
::
=+ ;; =naxplanation message
:: ack nack-trace message (only applied if we don't later crash)
@ -2236,7 +2240,7 @@
++ on-sink-plea
|= [=message-num message=*]
^+ peer-core
%- (trace msg.veb |.("plea {<her.channel^bone>}"))
%- (trace msg.veb |.("plea {<her.channel^bone=bone>}"))
:: is this the first time we're trying to process this message?
::
?. ?=([%hear * * ok=%.n] task)
@ -2346,7 +2350,8 @@
:: ignore duplicate message acks
::
?: (lth message-num current.state)
%- (trace snd.veb |.("duplicate done {<current.state message-num>}"))
%- %+ trace snd.veb
|.("duplicate done {<current=current.state message-num=message-num>}")
message-pump
:: ignore duplicate and future acks
::
@ -2381,6 +2386,19 @@
::
=. queued-message-acks.state
(~(del by queued-message-acks.state) current.state)
:: clear all packets from this message from the packet pump
::
:: Note we did this when the original packet came in, a few lines
:: above. It's not clear why, but it doesn't always clear the
:: packets when it's not the current message. As a workaround,
:: we clear the packets again when we catch up to this packet.
::
:: This is slightly inefficient because we run this twice for
:: each packet and it may emit a few unnecessary packets, but
:: but it's not incorrect. pump-metrics are updated only once,
:: at the time when we actually delete the packet.
::
=. message-pump (run-packet-pump %done current.state lag=*@dr)
:: give %done to vane if we're ready
::
?- -.u.cur
@ -2646,7 +2664,7 @@
=(0 (mod counter.metrics.state 20))
==
same
(trace snd.veb |.("{<[fragment-num show:gauge]>}"))
(trace snd.veb |.("send: {<[fragment=fragment-num show:gauge]>}"))
:: .resends is backward, so fold backward and emit
::
=. packet-pump
@ -2705,7 +2723,7 @@
=- =. metrics.state metrics.-
=. live.state live.-
::
%- (trace snd.veb |.("done {<message-num^show:gauge>}"))
%- (trace snd.veb |.("done {<message-num=message-num^show:gauge>}"))
(fast-resend-after-ack message-num `fragment-num`0)
::
^+ [metrics=metrics.state live=live.state]
@ -2936,7 +2954,8 @@
:: ignore messages from far future; limit to 10 in progress
::
?: (gte seq (add 10 last-acked.state))
%- (trace odd.veb |.("future %hear {<seq^last-acked.state>}"))
%- %+ trace odd.veb
|.("future %hear {<seq=seq^last-acked=last-acked.state>}")
message-sink
::
=/ is-last-fragment=? =(+(fragment-num) num-fragments)
@ -2946,12 +2965,13 @@
?. is-last-fragment
:: single packet ack
::
%- (trace rcv.veb |.("send dupe ack {<seq^fragment-num>}"))
%- %+ trace rcv.veb
|.("send dupe ack {<seq=seq^fragment-num=fragment-num>}")
(give %send seq %& fragment-num)
:: whole message (n)ack
::
=/ ok=? !(~(has in nax.state) seq)
%- (trace rcv.veb |.("send dupe message ack {<seq>} ok={<ok>}"))
%- (trace rcv.veb |.("send dupe message ack {<seq=seq>} ok={<ok>}"))
(give %send seq %| ok lag=`@dr`0)
:: last-acked<seq<=last-heard; heard message, unprocessed
::
@ -2965,15 +2985,18 @@
%- %+ trace rcv.veb
|. ^- tape
=/ data
:* her.channel seq
fragment-num num-fragments
:* her.channel seq=seq
fragment-num=fragment-num num-fragments=num-fragments
la=last-acked.state lh=last-heard.state
==
"hear last in-progress {<data>}"
message-sink
:: ack all other packets
::
%- (trace rcv.veb |.("send ack-1 {<seq^fragment-num^num-fragments>}"))
%- %+ trace rcv.veb |.
=/ data
[seq=seq fragment-num=fragment-num num-fragments=num-fragments]
"send ack-1 {<data>}"
(give %send seq %& fragment-num)
:: last-heard<seq<10+last-heard; this is a packet in a live message
::
@ -2996,10 +3019,12 @@
?: already-heard-fragment
?: is-last-fragment
%- %+ trace rcv.veb |.
=/ data [her.channel seq last-heard.state last-acked.state]
=/ data
[her.channel seq=seq lh=last-heard.state la=last-acked.state]
"hear last dupe {<data>}"
message-sink
%- (trace rcv.veb |.("send dupe ack {<her.channel^seq^fragment-num>}"))
%- %+ trace rcv.veb
|.("send dupe ack {<her.channel^seq=seq^fragment-num=fragment-num>}")
(give %send seq %& fragment-num)
:: new fragment; store in state and check if message is done
::
@ -3014,7 +3039,10 @@
:: ack any packet other than the last one, and continue either way
::
=? message-sink !is-last-fragment
%- (trace rcv.veb |.("send ack-2 {<seq^fragment-num^num-fragments>}"))
%- %+ trace rcv.veb |.
=/ data
[seq=seq fragment-num=fragment-num num-fragments=num-fragments]
"send ack-2 {<data>}"
(give %send seq %& fragment-num)
:: enqueue all completed messages starting at +(last-heard.state)
::
@ -3037,7 +3065,7 @@
=. live-messages.state (~(del by live-messages.state) seq)
::
%- %+ trace msg.veb
|.("hear {<her.channel>} {<seq>} {<num-fragments.u.live>}kb")
|.("hear {<her.channel>} {<seq=seq>} {<num-fragments.u.live>}kb")
=/ message=* (assemble-fragments [num-fragments fragments]:u.live)
=. message-sink (enqueue-to-vane seq message)
::