Merge pull request #6490 from urbit/pkova/fix-eyre-clog

lull, eyre: actually send events after clogged channel reconnect
This commit is contained in:
Pyry Kovanen 2023-04-25 16:23:52 +03:00 committed by GitHub
commit cdd406f9dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 69 additions and 57 deletions

View File

@ -1520,7 +1520,7 @@
$% $>(%poke-ack sign:agent:gall)
$>(%watch-ack sign:agent:gall)
$>(%kick sign:agent:gall)
[%fact =mark =noun]
[%fact =desk =mark =noun]
==
:: channel: connection to the browser
::

View File

@ -1395,11 +1395,8 @@
::NOTE these will only fail if the mark and/or json types changed,
:: since conversion failure also gets caught during first receive.
:: we can't do anything about this, so consider it unsupported.
=/ sign
(channel-event-to-sign u.maybe-channel request-id channel-event)
?~ sign $
=/ said
(sign-to-tape u.maybe-channel request-id u.sign)
(channel-event-to-tape u.maybe-channel request-id channel-event)
?~ said $
$(events [(event-tape-to-wall id +.u.said) events])
:: send the start event to the client
@ -1433,8 +1430,8 @@
::
=/ heartbeat-time=@da (add now ~s20)
=/ heartbeat (set-heartbeat-move channel-id heartbeat-time)
:: record the duct for future output and
:: record heartbeat-time for possible future cancel
:: record the mode & duct for future output,
:: and record heartbeat-time for possible future cancel
::
=. session.channel-state.state
%+ ~(jab by session.channel-state.state) channel-id
@ -1678,8 +1675,12 @@
:: if conversion succeeds, we *can* send it. if the client is actually
:: connected, we *will* send it immediately.
::
=/ maybe-channel-event=(unit channel-event)
(sign-to-channel-event sign u.channel request-id)
?~ maybe-channel-event [~ state]
=/ =channel-event u.maybe-channel-event
=/ said=(unit (quip move tape))
(sign-to-tape u.channel request-id sign)
(channel-event-to-tape u.channel request-id channel-event)
=? moves ?=(^ said)
(weld moves -.u.said)
=* sending &(?=([%| *] state.u.channel) ?=(^ said))
@ -1689,7 +1690,7 @@
::
=? events.u.channel ?=(^ said)
%- ~(put to events.u.channel)
[next-id request-id (sign-to-channel-event sign)]
[next-id request-id channel-event]
:: if it makes sense to do so, send the event to the client
::
=? moves sending
@ -1727,7 +1728,7 @@
:: if we're clogged, or we ran into an event we can't serialize,
:: kill this gall subscription.
::
=* msg=tape "on {(trip channel-id)} for {(trip request-id)}"
=* msg=tape "on {(trip channel-id)} for {(scow %ud request-id)}"
=/ kicking=?
?: clogged
((trace 0 |.("clogged {msg}")) &)
@ -1751,7 +1752,9 @@
subscriptions (~(del by subscriptions.u.channel) request-id)
unacked (~(del by unacked.u.channel) request-id)
events %- ~(put to events.u.channel)
[next-id request-id (sign-to-channel-event %kick ~)]
:+ next-id
request-id
(need (sign-to-channel-event [%kick ~] u.channel request-id))
==
:: if a client is connected, send the kick event to them
::
@ -1764,7 +1767,7 @@
^= data
%- wall-to-octs
%+ event-tape-to-wall next-id
+:(need (sign-to-tape u.channel request-id %kick ~))
+:(need (channel-event-to-tape u.channel request-id %kick ~))
::
complete=%.n
==
@ -1779,10 +1782,12 @@
:: +sign-to-channel-event: strip the vase from a sign:agent:gall
::
++ sign-to-channel-event
|= =sign:agent:gall
^- channel-event
?. ?=(%fact -.sign) sign
[%fact [p q.q]:cage.sign]
|= [=sign:agent:gall =channel request-id=@ud]
^- (unit channel-event)
?. ?=(%fact -.sign) `sign
?~ desk=(app-to-desk channel request-id) ~
:- ~
[%fact u.desk [p q.q]:cage.sign]
:: +app-to-desk
::
++ app-to-desk
@ -1790,70 +1795,51 @@
^- (unit desk)
=/ sub (~(get by subscriptions.channel) request-id)
?~ sub
((trace 0 |.("no subscription for request-id {(trip request-id)}")) ~)
((trace 0 |.("no subscription for request-id {(scow %ud request-id)}")) ~)
=/ des=(unit (unit cage))
(rof ~ %gd [our app.u.sub da+now] ~)
?. ?=([~ ~ *] des)
((trace 0 |.("no desk for app {<app.u.sub>}")) ~)
`!<(=desk q.u.u.des)
:: +channel-event-to-sign: attempt to recover a sign from a channel-event
:: +channel-event-to-tape: render channel-event from request-id in specified mode
::
++ channel-event-to-sign
~% %eyre-channel-event-to-sign ..part ~
|= [=channel request-id=@ud event=channel-event]
^- (unit sign:agent:gall)
?. ?=(%fact -.event) `event
:: rebuild vase for fact data
::
=/ des=(unit desk) (app-to-desk channel request-id)
?~ des ~
=* have=mark mark.event
=/ val=(unit (unit cage))
(rof ~ %cb [our u.des da+now] /[have])
?. ?=([~ ~ *] val)
((trace 0 |.("no mark {(trip have)}")) ~)
=+ !<(=dais:clay q.u.u.val)
=/ res (mule |.((vale:dais noun.event)))
?: ?=(%| -.res)
((trace 0 |.("stale fact of mark {(trip have)}")) ~)
`[%fact have p.res]
:: +sign-to-tape: render sign from request-id in specified mode
::
++ sign-to-tape
|= [=channel request-id=@ud =sign:agent:gall]
++ channel-event-to-tape
|= [=channel request-id=@ud =channel-event]
^- (unit (quip move tape))
?- mode.channel
%json %+ bind (sign-to-json channel request-id sign)
%json %+ bind (channel-event-to-json channel request-id channel-event)
|=((quip move json) [+<- (en-json:html +<+)])
%jam =- `[~ (scow %uw (jam -))]
[request-id (sign-to-channel-event sign)]
[request-id channel-event]
==
:: +sign-to-json: render sign from request-id as json channel event
:: +channel-event-to-json: render channel event as json channel event
::
++ sign-to-json
~% %sign-to-json ..part ~
|= [=channel request-id=@ud =sign:agent:gall]
++ channel-event-to-json
~% %eyre-channel-event-to-json ..part ~
|= [=channel request-id=@ud event=channel-event]
^- (unit (quip move json))
:: for facts, we try to convert the result to json
::
=/ [from=(unit [=desk =mark]) jsyn=(unit sign:agent:gall)]
?. ?=(%fact -.sign) [~ `sign]
?: ?=(%json p.cage.sign) [~ `sign]
?. ?=(%fact -.event) [~ `event]
?: ?=(%json mark.event)
?~ jsin=((soft json) noun.event)
%. [~ ~]
(slog leaf+"eyre: dropping fake json for {(scow %ud request-id)}" ~)
[~ `[%fact %json !>(u.jsin)]]
:: find and use tube from fact mark to json
::
=/ des=(unit desk) (app-to-desk channel request-id)
?~ des [~ ~]
::
=* have=mark p.cage.sign
=* have=mark mark.event
=/ convert=(unit vase)
=/ cag=(unit (unit cage))
(rof ~ %cf [our u.des da+now] /[have]/json)
(rof ~ %cf [our desk.event da+now] /[have]/json)
?. ?=([~ ~ *] cag) ~
`q.u.u.cag
?~ convert
((trace 0 |.("no convert from {(trip have)} to json")) [~ ~])
~| "conversion failed from {(trip have)} to json"
[`[u.des have] `[%fact %json (slym u.convert q.q.cage.sign)]]
[`[desk.event have] `[%fact %json (slym u.convert noun.event)]]
?~ jsyn ~
%- some
:- ?~ from ~
@ -2816,11 +2802,17 @@
$: state=(each timer duct)
next-id=@ud
last-ack=@da
events=(qeu [id=@ud request-id=@ud =channel-event])
events=(qeu [id=@ud request-id=@ud channel-event=channel-event-2])
unacked=(map @ud @ud)
subscriptions=(map @ud [ship=@p app=term =path duc=duct])
heartbeat=(unit timer)
==
+$ channel-event-2
$% $>(%poke-ack sign:agent:gall)
$>(%watch-ack sign:agent:gall)
$>(%kick sign:agent:gall)
[%fact =mark =noun]
==
--
|= old=axle-any
^+ http-server-gate
@ -2853,16 +2845,36 @@
%~2023.2.17
$(old [%~2023.3.16 [bindings ~ +]:server-state.old])
::
:: inits channel mode
:: inits channel mode and desks in unacked events
::
%~2023.3.16
::
:: Prior to this desks were not part of events.channel.
:: When serializing we used to rely on the desk stored in
:: subscriptions.channel, but this state is deleted when we clog.
:: This migration adds the desk to events.channel, but we can not
:: scry in +load to populate the desks in the old events,
:: so we just kick all subscriptions on all channels.
%= $
date.old %~2023.4.11
::
server-state.old
%= server-state.old
session.channel-state
(~(run by session.channel-state.server-state.old) (lead %json))
%- ~(run by session.channel-state.server-state.old)
|= c=channel-2
=; new-events
:- %json
c(events new-events, unacked ~, subscriptions ~)
=| events=(qeu [id=@ud request-id=@ud =channel-event])
=/ l ~(tap in ~(key by subscriptions.c))
|-
?~ l events
%= $
l t.l
next-id.c +(next-id.c)
events (~(put to events) [next-id.c i.l %kick ~])
==
==
==
::