Merge branch 'philip/stuck-flow' (#2071)

* origin/philip/stuck-flow:
  ames: recover from mismatched message nums

Signed-off-by: Jared Tobin <jared@tlon.io>
This commit is contained in:
Jared Tobin 2019-12-12 15:49:53 +08:00
commit 2aa86e3121
No known key found for this signature in database
GPG Key ID: 0E4647D58F8A69E4

View File

@ -768,7 +768,7 @@
$% [%hear =message-num =fragment-num]
[%done =message-num lag=@dr]
[%halt ~]
[%wake ~]
[%wake current=message-num]
==
:: $packet-pump-gift: effect from |packet-pump
::
@ -2226,6 +2226,7 @@
=~ (dispatch-task task)
feed-packets
(run-packet-pump %halt ~)
assert
[(flop gifts) state]
==
:: +dispatch-task: perform task-specific processing
@ -2236,7 +2237,7 @@
::
?- -.task
%memo (on-memo message-blob.task)
%wake (run-packet-pump task)
%wake (run-packet-pump %wake current.state)
%hear
?- -.ack-meat.task
%& (on-hear [message-num fragment-num=p.ack-meat]:task)
@ -2395,6 +2396,16 @@
=. message-pump (give i.packet-pump-gifts)
::
$(packet-pump-gifts t.packet-pump-gifts)
:: +assert: sanity checks to isolate error cases
::
++ assert
^+ message-pump
=/ top-live
(peek:packet-queue:*make-packet-pump live.packet-pump-state.state)
?. |(?=(~ top-live) (gte current.state message-num.key.u.top-live))
~| [%strange-current current=current.state key.u.top-live]
!!
message-pump
--
:: +make-packet-pump: construct |packet-pump core
::
@ -2424,12 +2435,13 @@
?- -.task
%hear (on-hear [message-num fragment-num]:task)
%done (on-done message-num.task)
%wake on-wake
%wake (on-wake current.task)
%halt set-wake
==
:: +on-wake: handle packet timeout
::
++ on-wake
|= current=message-num
^+ packet-pump
:: assert temporal coherence
::
@ -2443,13 +2455,14 @@
::
=- =* res -
=. live.state live.res
=. packet-pump (give %send static-fragment.res)
%- %+ trace snd.veb
=/ nums [message-num fragment-num]:static-fragment.res
|.("dead {<nums^show:gauge>}")
=? packet-pump ?=(^ static-fragment)
%- %+ trace snd.veb
=/ nums [message-num fragment-num]:u.static-fragment.res
|.("dead {<nums^show:gauge>}")
(give %send u.static-fragment.res)
packet-pump
::
=| acc=static-fragment
=| acc=(unit static-fragment)
^+ [static-fragment=acc live=live.state]
::
%^ (traverse:packet-queue _acc) live.state acc
@ -2458,12 +2471,18 @@
val=live-packet-val
==
^- [new-val=(unit live-packet-val) stop=? _acc]
:: if already acked later message, don't resend
::
?: (lth message-num.key current)
%- %- slog :_ ~
leaf+"ames: strange wake queue, expected {<current>}, got {<key>}"
[~ stop=%.n ~]
:: packet has expired; update it in-place, stop, and produce it
::
=. last-sent.val now.channel
=. retries.val +(retries.val)
::
[`val stop=%.y (to-static-fragment key val)]
[`val stop=%.y `(to-static-fragment key val)]
:: +feed: try to send a list of packets, returning unsent and effects
::
++ feed