lull, eyre: actually send events on clogged channel reconnect

This commit is contained in:
pkova 2023-04-20 16:18:52 +03:00
parent f3d2df087a
commit a1fd3a6792
2 changed files with 104 additions and 57 deletions

View File

@ -1520,7 +1520,7 @@
$% $>(%poke-ack sign:agent:gall)
$>(%watch-ack sign:agent:gall)
$>(%kick sign:agent:gall)
[%fact =mark =noun]
[%fact =desk =mark =noun]
==
:: channel: connection to the browser
::

View File

@ -70,7 +70,7 @@
++ axle
$: :: date: date at which http-server's state was updated to this data structure
::
date=%~2023.4.11
date=%~2023.4.19
:: server-state: state of inbound requests
::
=server-state
@ -1395,11 +1395,8 @@
::NOTE these will only fail if the mark and/or json types changed,
:: since conversion failure also gets caught during first receive.
:: we can't do anything about this, so consider it unsupported.
=/ sign
(channel-event-to-sign u.maybe-channel request-id channel-event)
?~ sign $
=/ said
(sign-to-tape u.maybe-channel request-id u.sign)
(channel-event-to-tape u.maybe-channel request-id channel-event)
?~ said $
$(events [(event-tape-to-wall id +.u.said) events])
:: send the start event to the client
@ -1433,7 +1430,7 @@
::
=/ heartbeat-time=@da (add now ~s20)
=/ heartbeat (set-heartbeat-move channel-id heartbeat-time)
:: clear the event queue, record the mode & duct for future output,
:: record the mode & duct for future output,
:: and record heartbeat-time for possible future cancel
::
=. session.channel-state.state
@ -1441,7 +1438,6 @@
|= =channel
%_ channel
mode mode
events ~
state [%| duct]
heartbeat (some [heartbeat-time duct])
==
@ -1679,8 +1675,12 @@
:: if conversion succeeds, we *can* send it. if the client is actually
:: connected, we *will* send it immediately.
::
=/ maybe-channel-event=(unit channel-event)
(sign-to-channel-event sign u.channel request-id)
?~ maybe-channel-event [~ state]
=/ =channel-event u.maybe-channel-event
=/ said=(unit (quip move tape))
(sign-to-tape u.channel request-id sign)
(channel-event-to-tape u.channel request-id channel-event)
=? moves ?=(^ said)
(weld moves -.u.said)
=* sending &(?=([%| *] state.u.channel) ?=(^ said))
@ -1690,7 +1690,7 @@
::
=? events.u.channel ?=(^ said)
%- ~(put to events.u.channel)
[next-id request-id (sign-to-channel-event sign)]
[next-id request-id channel-event]
:: if it makes sense to do so, send the event to the client
::
=? moves sending
@ -1728,7 +1728,7 @@
:: if we're clogged, or we ran into an event we can't serialize,
:: kill this gall subscription.
::
=* msg=tape "on {(trip channel-id)} for {(trip request-id)}"
=* msg=tape "on {(trip channel-id)} for {(scow %ud request-id)}"
=/ kicking=?
?: clogged
((trace 0 |.("clogged {msg}")) &)
@ -1752,7 +1752,9 @@
subscriptions (~(del by subscriptions.u.channel) request-id)
unacked (~(del by unacked.u.channel) request-id)
events %- ~(put to events.u.channel)
[next-id request-id (sign-to-channel-event %kick ~)]
:+ next-id
request-id
(need (sign-to-channel-event [%kick ~] u.channel request-id))
==
:: if a client is connected, send the kick event to them
::
@ -1765,7 +1767,7 @@
^= data
%- wall-to-octs
%+ event-tape-to-wall next-id
+:(need (sign-to-tape u.channel request-id %kick ~))
+:(need (channel-event-to-tape u.channel request-id %kick ~))
::
complete=%.n
==
@ -1780,10 +1782,12 @@
:: +sign-to-channel-event: strip the vase from a sign:agent:gall
::
++ sign-to-channel-event
|= =sign:agent:gall
^- channel-event
?. ?=(%fact -.sign) sign
[%fact [p q.q]:cage.sign]
|= [=sign:agent:gall =channel request-id=@ud]
^- (unit channel-event)
?. ?=(%fact -.sign) `sign
?~ desk=(app-to-desk channel request-id) ~
:- ~
[%fact u.desk [p q.q]:cage.sign]
:: +app-to-desk
::
++ app-to-desk
@ -1791,70 +1795,51 @@
^- (unit desk)
=/ sub (~(get by subscriptions.channel) request-id)
?~ sub
((trace 0 |.("no subscription for request-id {(trip request-id)}")) ~)
((trace 0 |.("no subscription for request-id {(scow %ud request-id)}")) ~)
=/ des=(unit (unit cage))
(rof ~ %gd [our app.u.sub da+now] ~)
?. ?=([~ ~ *] des)
((trace 0 |.("no desk for app {<app.u.sub>}")) ~)
`!<(=desk q.u.u.des)
:: +channel-event-to-sign: attempt to recover a sign from a channel-event
:: +channel-event-to-tape: render channel-event from request-id in specified mode
::
++ channel-event-to-sign
~% %eyre-channel-event-to-sign ..part ~
|= [=channel request-id=@ud event=channel-event]
^- (unit sign:agent:gall)
?. ?=(%fact -.event) `event
:: rebuild vase for fact data
::
=/ des=(unit desk) (app-to-desk channel request-id)
?~ des ~
=* have=mark mark.event
=/ val=(unit (unit cage))
(rof ~ %cb [our u.des da+now] /[have])
?. ?=([~ ~ *] val)
((trace 0 |.("no mark {(trip have)}")) ~)
=+ !<(=dais:clay q.u.u.val)
=/ res (mule |.((vale:dais noun.event)))
?: ?=(%| -.res)
((trace 0 |.("stale fact of mark {(trip have)}")) ~)
`[%fact have p.res]
:: +sign-to-tape: render sign from request-id in specified mode
::
++ sign-to-tape
|= [=channel request-id=@ud =sign:agent:gall]
++ channel-event-to-tape
|= [=channel request-id=@ud =channel-event]
^- (unit (quip move tape))
?- mode.channel
%json %+ bind (sign-to-json channel request-id sign)
%json %+ bind (channel-event-to-json channel request-id channel-event)
|=((quip move json) [+<- (en-json:html +<+)])
%jam =- `[~ (scow %uw (jam -))]
[request-id (sign-to-channel-event sign)]
[request-id channel-event]
==
:: +sign-to-json: render sign from request-id as json channel event
:: +channel-event-to-json: render channel event as json channel event
::
++ sign-to-json
~% %sign-to-json ..part ~
|= [=channel request-id=@ud =sign:agent:gall]
++ channel-event-to-json
~% %eyre-channel-event-to-json ..part ~
|= [=channel request-id=@ud event=channel-event]
^- (unit (quip move json))
:: for facts, we try to convert the result to json
::
=/ [from=(unit [=desk =mark]) jsyn=(unit sign:agent:gall)]
?. ?=(%fact -.sign) [~ `sign]
?: ?=(%json p.cage.sign) [~ `sign]
?. ?=(%fact -.event) [~ `event]
?: ?=(%json mark.event)
?~ jsin=((soft json) noun.event)
%. [~ ~]
(slog leaf+"eyre: dropping fake json for {(scow %ud request-id)}" ~)
[~ `[%fact %json !>(u.jsin)]]
:: find and use tube from fact mark to json
::
=/ des=(unit desk) (app-to-desk channel request-id)
?~ des [~ ~]
::
=* have=mark p.cage.sign
=* have=mark mark.event
=/ convert=(unit vase)
=/ cag=(unit (unit cage))
(rof ~ %cf [our u.des da+now] /[have]/json)
(rof ~ %cf [our desk.event da+now] /[have]/json)
?. ?=([~ ~ *] cag) ~
`q.u.u.cag
?~ convert
((trace 0 |.("no convert from {(trip have)} to json")) [~ ~])
~| "conversion failed from {(trip have)} to json"
[`[u.des have] `[%fact %json (slym u.convert q.q.cage.sign)]]
[`[desk.event have] `[%fact %json (slym u.convert noun.event)]]
?~ jsyn ~
%- some
:- ?~ from ~
@ -2768,7 +2753,8 @@
[date=%~2022.7.26 server-state=server-state-0]
[date=%~2023.2.17 server-state=server-state-1]
[date=%~2023.3.16 server-state=server-state-2]
[date=%~2023.4.11 =server-state]
[date=%~2023.4.11 server-state=server-state-3]
[date=%~2023.4.19 =server-state]
==
::
+$ server-state-0
@ -2817,11 +2803,44 @@
$: state=(each timer duct)
next-id=@ud
last-ack=@da
events=(qeu [id=@ud request-id=@ud =channel-event])
events=(qeu [id=@ud request-id=@ud channel-event=channel-event-3])
unacked=(map @ud @ud)
subscriptions=(map @ud [ship=@p app=term =path duc=duct])
heartbeat=(unit timer)
==
+$ server-state-3
$: bindings=(list [=binding =duct =action])
cache=(map url=@t [aeon=@ud val=(unit cache-entry)])
=cors-registry
connections=(map duct outstanding-connection)
=authentication-state
channel-state=channel-state-3 :: <- new
domains=(set turf)
=http-config
ports=[insecure=@ud secure=(unit @ud)]
outgoing-duct=duct
verb=@
==
+$ channel-state-3
$: session=(map @t channel-3)
duct-to-key=(map duct @t)
==
+$ channel-3
$: mode=?(%json %jam) :: <- new
state=(each timer duct)
next-id=@ud
last-ack=@da
events=(qeu [id=@ud request-id=@ud channel-event=channel-event-3])
unacked=(map @ud @ud)
subscriptions=(map @ud [ship=@p app=term =path duc=duct])
heartbeat=(unit timer)
==
+$ channel-event-3
$% $>(%poke-ack sign:agent:gall)
$>(%watch-ack sign:agent:gall)
$>(%kick sign:agent:gall)
[%fact =mark =noun]
==
--
|= old=axle-any
^+ http-server-gate
@ -2868,6 +2887,34 @@
==
::
%~2023.4.11
::
:: Prior to this desks were not part of events.channel.
:: When serializing we used to rely on the desk stored in
:: subscriptions.channel, but this state is deleted when we clog.
:: This migration adds the desk to events.channel, but we can not
:: scry in +load to populate the desks in the old events,
:: so we just kick all subscriptions on all channels.
::
=; new-channel-sessions
%= $
-.old %~2023.4.19
session.channel-state.server-state.old new-channel-sessions
==
%- ~(run by session.channel-state.server-state.old)
|= c=channel-3
=; new-events
c(events new-events, unacked ~, subscriptions ~)
=| events=(qeu [id=@ud request-id=@ud =channel-event])
=/ l ~(tap in ~(key by subscriptions.c))
|-
?~ l events
%= $
l t.l
next-id.c +(next-id.c)
events (~(put to events) [next-id.c i.l %kick ~])
==
::
%~2023.4.19
http-server-gate(ax old)
==
:: +stay: produce current state