diff --git a/pkg/arvo/sys/vane/ames.hoon b/pkg/arvo/sys/vane/ames.hoon index afebdae12..a2e826c5d 100644 --- a/pkg/arvo/sys/vane/ames.hoon +++ b/pkg/arvo/sys/vane/ames.hoon @@ -768,7 +768,7 @@ $% [%hear =message-num =fragment-num] [%done =message-num lag=@dr] [%halt ~] - [%wake ~] + [%wake current=message-num] == :: $packet-pump-gift: effect from |packet-pump :: @@ -2222,6 +2222,7 @@ =~ (dispatch-task task) feed-packets (run-packet-pump %halt ~) + assert [(flop gifts) state] == :: +dispatch-task: perform task-specific processing @@ -2232,7 +2233,7 @@ :: ?- -.task %memo (on-memo message-blob.task) - %wake (run-packet-pump task) + %wake (run-packet-pump %wake current.state) %hear ?- -.ack-meat.task %& (on-hear [message-num fragment-num=p.ack-meat]:task) @@ -2391,6 +2392,16 @@ =. message-pump (give i.packet-pump-gifts) :: $(packet-pump-gifts t.packet-pump-gifts) + :: +assert: sanity checks to isolate error cases + :: + ++ assert + ^+ message-pump + =/ top-live + (peek:packet-queue:*make-packet-pump live.packet-pump-state.state) + ?. |(?=(~ top-live) (gte current.state message-num.key.u.top-live)) + ~| [%strange-current current=current.state key.u.top-live] + !! + message-pump -- :: +make-packet-pump: construct |packet-pump core :: @@ -2420,12 +2431,13 @@ ?- -.task %hear (on-hear [message-num fragment-num]:task) %done (on-done message-num.task) - %wake on-wake + %wake (on-wake current.task) %halt set-wake == :: +on-wake: handle packet timeout :: ++ on-wake + |= current=message-num ^+ packet-pump :: assert temporal coherence :: @@ -2439,13 +2451,14 @@ :: =- =* res - =. live.state live.res - =. packet-pump (give %send static-fragment.res) - %- %+ trace snd.veb - =/ nums [message-num fragment-num]:static-fragment.res - |.("dead {}") + =? packet-pump ?=(^ static-fragment) + %- %+ trace snd.veb + =/ nums [message-num fragment-num]:u.static-fragment.res + |.("dead {}") + (give %send u.static-fragment.res) packet-pump :: - =| acc=static-fragment + =| acc=(unit static-fragment) ^+ [static-fragment=acc live=live.state] :: %^ (traverse:packet-queue _acc) live.state acc @@ -2454,12 +2467,18 @@ val=live-packet-val == ^- [new-val=(unit live-packet-val) stop=? _acc] + :: if already acked later message, don't resend + :: + ?: (lth message-num.key current) + %- %- slog :_ ~ + leaf+"ames: strange wake queue, expected {}, got {}" + [~ stop=%.n ~] :: packet has expired; update it in-place, stop, and produce it :: =. last-sent.val now.channel =. retries.val +(retries.val) :: - [`val stop=%.y (to-static-fragment key val)] + [`val stop=%.y `(to-static-fragment key val)] :: +feed: try to send a list of packets, returning unsent and effects :: ++ feed