Merge branch 'next/arvo' into ted/kill-subs

This commit is contained in:
Ted Blackman 2022-08-05 11:24:55 +03:00
commit f750de9aac

View File

@ -764,7 +764,7 @@
[%hear =message-num =ack-meat] [%hear =message-num =ack-meat]
[%near =naxplanation] [%near =naxplanation]
[%prod ~] [%prod ~]
[%wake ~] [%wake recork=?]
== ==
:: $message-pump-gift: effect from |message-pump :: $message-pump-gift: effect from |message-pump
:: ::
@ -795,7 +795,7 @@
$% [%hear =message-num =fragment-num] $% [%hear =message-num =fragment-num]
[%done =message-num lag=@dr] [%done =message-num lag=@dr]
[%halt ~] [%halt ~]
[%wake current=message-num] [%wake recork=? current=message-num]
[%prod ~] [%prod ~]
== ==
:: $packet-pump-gift: effect from |packet-pump :: $packet-pump-gift: effect from |packet-pump
@ -867,7 +867,7 @@
=^ moves adult-gate (call:adult-core duct dud task) =^ moves adult-gate (call:adult-core duct dud task)
%- molt %- molt
~> %slog.0^leaf/"ames: init daily recork timer" ~> %slog.0^leaf/"ames: init daily recork timer"
:_(moves [duct %pass /recork %b %wait `@da`(add now ~d1)]) :_(moves [duct %pass /recork %b %wait `@da`(add now ~m20)])
:: %born: set .unix-duct and start draining .queued-events :: %born: set .unix-duct and start draining .queued-events
:: ::
?: ?=(%born -.task) ?: ?=(%born -.task)
@ -954,7 +954,7 @@
[moves adult-gate] [moves adult-gate]
%- molt %- molt
~> %slog.0^leaf/"ames: init daily recork timer" ~> %slog.0^leaf/"ames: init daily recork timer"
:_(moves [duct %pass /recork %b %wait `@da`(add now ~d1)]) :_(moves [duct %pass /recork %b %wait `@da`(add now ~m20)])
:: set timer to drain next event :: set timer to drain next event
:: ::
=. moves :_(moves [duct %pass /larva %b %wait now]) =. moves :_(moves [duct %pass /larva %b %wait now])
@ -1482,9 +1482,10 @@
=/ snds=(list (list [ship bone message-pump-state])) =/ snds=(list (list [ship bone message-pump-state]))
%+ turn states %+ turn states
|= [=ship peer-state] |= [=ship peer-state]
%+ turn ~(tap by snd) %+ murn ~(tap by snd)
|= [=bone =message-pump-state] |= [=bone =message-pump-state]
[ship bone message-pump-state] ?: (~(has in closing) bone) ~
`[ship bone message-pump-state]
=/ next-wakes =/ next-wakes
%+ turn `(list [ship bone message-pump-state])`(zing snds) %+ turn `(list [ship bone message-pump-state])`(zing snds)
|= [=ship =bone message-pump-state] |= [=ship =bone message-pump-state]
@ -1498,7 +1499,7 @@
%- silt %- silt
;; (list [@da ^duct]) ;; (list [@da ^duct])
=< q.q %- need %- need =< q.q %- need %- need
(rof ~ %b [[our %timers da+now] /]) (rof ~ %bx [[our %$ da+now] /debug/timers])
=/ to-stir =/ to-stir
%+ skip next-real-wakes %+ skip next-real-wakes
|= [=ship =bone =@da] |= [=ship =bone =@da]
@ -1812,15 +1813,15 @@
(request-attestation u.ship) (request-attestation u.ship)
:: ::
|^ |^
?. ?=([%recork ~] wire) (handle-single-wire wire) ?. ?=([%recork ~] wire) (handle-single-wire wire recork=|)
=/ wires=(list ^wire) ~(tap in corks.ames-state) =/ wires=(list ^wire) ~(tap in corks.ames-state)
|- ^+ event-core |- ^+ event-core
?^ wires ?^ wires
$(wires t.wires, event-core (handle-single-wire i.wires)) $(wires t.wires, event-core (handle-single-wire i.wires recork=&))
(emit duct %pass /recork %b %wait `@da`(add now ~d1)) (emit duct %pass /recork %b %wait `@da`(add now ~m20))
:: ::
++ handle-single-wire ++ handle-single-wire
|= =^wire |= [=^wire recork=?]
^+ event-core ^+ event-core
=/ res=(unit [her=ship =bone]) (parse-pump-timer-wire wire) =/ res=(unit [her=ship =bone]) (parse-pump-timer-wire wire)
?~ res ?~ res
@ -1835,7 +1836,9 @@
:: ::
=/ =channel [[our her.u.res] now channel-state -.u.state] =/ =channel [[our her.u.res] now channel-state -.u.state]
:: ::
abet:(on-wake:(make-peer-core u.state channel) bone.u.res error) =< abet
%- on-wake:(make-peer-core u.state channel)
[recork bone.u.res error]
-- --
:: +on-init: first boot; subscribe to our info from jael :: +on-init: first boot; subscribe to our info from jael
:: ::
@ -2451,8 +2454,15 @@
:: +on-wake: handle timer expiration :: +on-wake: handle timer expiration
:: ::
++ on-wake ++ on-wake
|= [=bone error=(unit tang)] |= [recork=? =bone error=(unit tang)]
^+ peer-core ^+ peer-core
:: ignore spurious pump timers for %cork's
::
:: This is here to fix canaries that had spurious pump timers.
::
?: &(!recork (~(has in closing.peer-state) bone))
~> %slog.0^leaf/"ames: dropping pump wake while recorking {<bone>}"
peer-core
:: if we previously errored out, print and reset timer for later :: if we previously errored out, print and reset timer for later
:: ::
:: This really shouldn't happen, but if it does, make sure we :: This really shouldn't happen, but if it does, make sure we
@ -2521,7 +2531,7 @@
peer-core peer-core
:: maybe resend some timed out packets :: maybe resend some timed out packets
:: ::
(run-message-pump bone %wake ~) (run-message-pump bone %wake recork)
:: +send-shut-packet: fire encrypted packet at rcvr and maybe sponsors :: +send-shut-packet: fire encrypted packet at rcvr and maybe sponsors
:: ::
++ send-shut-packet ++ send-shut-packet
@ -2826,7 +2836,7 @@
:: receiving the OTA, so we set up a timer to retry in one day. :: receiving the OTA, so we set up a timer to retry in one day.
:: ::
%- %+ trace msg.veb %- %+ trace msg.veb
|.("old publisher, resend %cork on bone={<target-bone>} in ~d1") |.("old publisher, resend %cork on bone={<target-bone>} in ~m20")
=/ =wire (make-pump-timer-wire her.channel target-bone) =/ =wire (make-pump-timer-wire her.channel target-bone)
=. corks.ames-state (~(put in corks.ames-state) wire) =. corks.ames-state (~(put in corks.ames-state) wire)
peer-core peer-core
@ -2902,12 +2912,15 @@
|= task=message-pump-task |= task=message-pump-task
^+ [gifts state] ^+ [gifts state]
:: ::
=~ (dispatch-task task) =. message-pump (dispatch-task task)
feed-packets =. message-pump feed-packets
:: don't set new pump timer if triggered by a recork timer
::
=? message-pump !=([%wake recork=&] task)
(run-packet-pump %halt ~) (run-packet-pump %halt ~)
assert ::
=. message-pump assert
[(flop gifts) state] [(flop gifts) state]
==
:: +dispatch-task: perform task-specific processing :: +dispatch-task: perform task-specific processing
:: ::
++ dispatch-task ++ dispatch-task
@ -2917,7 +2930,7 @@
?- -.task ?- -.task
%prod (run-packet-pump %prod ~) %prod (run-packet-pump %prod ~)
%memo (on-memo message-blob.task) %memo (on-memo message-blob.task)
%wake (run-packet-pump %wake current.state) %wake (run-packet-pump %wake recork.task current.state)
%hear %hear
?- -.ack-meat.task ?- -.ack-meat.task
%& %&
@ -2983,7 +2996,13 @@
^+ message-pump ^+ message-pump
:: unsent messages from the future should never get acked :: unsent messages from the future should never get acked
:: ::
~| [message-num next.state] ~| :* bone=bone
mnum=message-num
next=next.state
unsent-messages=~(wyt in unsent-messages.state)
unsent-fragments=(lent unsent-fragments.state)
any-live=!=(~ live.packet-pump-state.state)
==
?> (lth message-num next.state) ?> (lth message-num next.state)
:: ignore duplicate message acks :: ignore duplicate message acks
:: ::
@ -3159,7 +3178,7 @@
?- -.task ?- -.task
%hear (on-hear [message-num fragment-num]:task) %hear (on-hear [message-num fragment-num]:task)
%done (on-done message-num.task) %done (on-done message-num.task)
%wake (on-wake current.task) %wake (on-wake recork.task current.task)
%prod on-prod %prod on-prod
%halt set-wake %halt set-wake
== ==
@ -3186,7 +3205,7 @@
:: +on-wake: handle packet timeout :: +on-wake: handle packet timeout
:: ::
++ on-wake ++ on-wake
|= current=message-num |= [recork=? current=message-num]
^+ packet-pump ^+ packet-pump
:: assert temporal coherence :: assert temporal coherence
:: ::