eyre: add support for noun-based channels

Adds a "mode" to channels, which can be set to either %json (current
behavior) or %jam. For %jam channels, aside from the SSE framing, all
communication happens through @uw-encoded jammed nouns. This applies to
both outgoing channel events, as well as incoming channel requests.

We choose @uw-style encoding because raw bytestreams are fragile and
cannot work inside the SSE stream context.

Currently, a separate endpoint (/~/channel-jam/etc) is used to indicate
%jam as the desired mode for a channel. We will probably want to make
this a bit cleaner, not least because it's not currently implemented as
a formal standalone endpoint, but also to give it stronger aesthetic
equivalence with the existing channel endpoint. Putting the mode in the
file extension is a tempting option here, but semantically not quite
right.

Connecting to the same channel across multiple modes is currently
supported, but it's untested, and unclear whether this is desirable or
not.
This commit is contained in:
fang 2022-07-04 17:05:13 +02:00 committed by xiphiness
parent 25c4740945
commit 10fe204c9e
2 changed files with 159 additions and 59 deletions

View File

@ -1523,7 +1523,8 @@
:: events since then.
::
+$ channel
$: :: channel-state: expiration time or the duct currently listening
$: mode=?(%json %jam)
:: channel-state: expiration time or the duct currently listening
::
:: For each channel, there is at most one open EventSource
:: connection. A 400 is issues on duplicate attempts to connect to the

View File

@ -70,7 +70,7 @@
++ axle
$: :: date: date at which http-server's state was updated to this data structure
::
date=%~2022.7.26
date=%~2023.3.15
:: server-state: state of inbound requests
::
=server-state
@ -119,9 +119,12 @@
$% :: %ack: acknowledges that the client has received events up to :id
::
[%ack event-id=@ud]
:: %poke: pokes an application, translating :json to :mark.
:: %poke: pokes an application, validating :noun against :mark
::
[%poke request-id=@ud ship=@p app=term mark=@tas =json]
[%poke request-id=@ud ship=@p app=term mark=@tas =noun]
:: %poke-json: pokes an application, translating :json to :mark
::
[%poke-json request-id=@ud ship=@p app=term mark=@tas =json]
:: %watch: subscribes to an application path
::
[%subscribe request-id=@ud ship=@p app=term =path]
@ -200,11 +203,32 @@
(sub u.sus ack)
:: +parse-channel-request: parses a list of channel-requests
::
++ parse-channel-request
|= [mode=?(%json %jam) body=octs]
^- (each (list channel-request) @t)
?- mode
%json
?~ maybe-json=(de-json:html q.body)
|+'put body not json'
?~ maybe-requests=(parse-channel-request-json u.maybe-json)
|+'invalid channel json'
&+u.maybe-requests
::
%jam
?~ maybe-noun=(bind (slaw %uw q.body) cue)
|+'invalid request format'
?~ maybe-reqs=((soft (list channel-request)) u.maybe-noun)
~& [%miss u.maybe-noun]
|+'invalid request data'
&+u.maybe-reqs
==
:: +parse-channel-request-json: parses a json list of channel-requests
::
:: Parses a json array into a list of +channel-request. If any of the items
:: in the list fail to parse, the entire thing fails so we can 400 properly
:: to the client.
::
++ parse-channel-request
++ parse-channel-request-json
|= request-list=json
^- (unit (list channel-request))
:: parse top
@ -220,7 +244,9 @@
?: =('ack' u.maybe-key)
((pe %ack (ot event-id+ni ~)) item)
?: =('poke' u.maybe-key)
((pe %poke (ot id+ni ship+(su fed:ag) app+so mark+(su sym) json+some ~)) item)
%. item
%+ pe %poke-json
(ot id+ni ship+(su fed:ag) app+so mark+(su sym) json+some ~)
?: =('subscribe' u.maybe-key)
%. item
%+ pe %subscribe
@ -1130,6 +1156,11 @@
::
%^ return-static-data-on-duct 400 'text/html'
(error-page 400 authenticated url.request "malformed channel url")
=/ mode=?(%json %jam)
::TODO go off file extention instead?
?+ i.t.site.request-line %json
%channel-jam %jam
==
:: channel-id: unique channel id parsed out of url
::
=+ channel-id=i.t.t.site.request-line
@ -1137,13 +1168,13 @@
?: =('PUT' method.request)
:: PUT methods starts/modifies a channel, and returns a result immediately
::
(on-put-request channel-id request)
(on-put-request channel-id mode request)
::
?: =('GET' method.request)
(on-get-request channel-id request)
(on-get-request channel-id mode request)
?: =('POST' method.request)
:: POST methods are used solely for deleting channels
(on-put-request channel-id request)
(on-put-request channel-id mode request)
::
~& %session-not-a-put
[~ state]
@ -1198,7 +1229,7 @@
:: state.
::
++ update-timeout-timer-for
|= channel-id=@t
|= [mode=?(%json %jam) channel-id=@t]
^+ ..update-timeout-timer-for
:: when our callback should fire
::
@ -1210,7 +1241,7 @@
%_ ..update-timeout-timer-for
session.channel-state.state
%+ ~(put by session.channel-state.state) channel-id
[[%& expiration-time duct] 0 now ~ ~ ~ ~]
[mode [%& expiration-time duct] 0 now ~ ~ ~ ~]
::
moves
[(set-timeout-move channel-id expiration-time) moves]
@ -1262,13 +1293,16 @@
:: client in text/event-stream format.
::
++ on-get-request
|= [channel-id=@t =request:http]
|= [channel-id=@t mode=?(%json %jam) =request:http]
^- [(list move) server-state]
:: if there's no channel-id, we must 404
::
?~ maybe-channel=(~(get by session.channel-state.state) channel-id)
%^ return-static-data-on-duct 404 'text/html'
(error-page 404 %.y url.request ~)
::
::TODO consider forbidding connection if !=(mode mode.u.maybe-channel)
::
:: when opening an event-stream, we must cancel our timeout timer
:: if there's no duct already bound. Else, kill the old request
:: and replace it
@ -1313,8 +1347,10 @@
=/ sign
(channel-event-to-sign u.maybe-channel request-id channel-event)
?~ sign $
?~ jive=(sign-to-json u.maybe-channel request-id u.sign) $
$(events [(event-json-to-wall id +.u.jive) events])
=/ said
(sign-to-tape u.maybe-channel(mode mode) request-id u.sign)
?~ said $
$(events [(event-tape-to-wall id +.u.said) events])
:: send the start event to the client
::
=^ http-moves state
@ -1346,13 +1382,18 @@
::
=/ heartbeat-time=@da (add now ~s20)
=/ heartbeat (set-heartbeat-move channel-id heartbeat-time)
:: clear the event queue, record the duct for future output and
:: record heartbeat-time for possible future cancel
:: clear the event queue, record the mode & duct for future output,
:: and record heartbeat-time for possible future cancel
::
=. session.channel-state.state
%+ ~(jab by session.channel-state.state) channel-id
|= =channel
channel(events ~, state [%| duct], heartbeat (some [heartbeat-time duct]))
%_ channel
mode mode
events ~
state [%| duct]
heartbeat (some [heartbeat-time duct])
==
::
[[heartbeat :(weld http-moves cancel-moves moves)] state]
:: +acknowledge-events: removes events before :last-event-id on :channel-id
@ -1377,26 +1418,23 @@
:: a set of commands in JSON format in the body of the message.
::
++ on-put-request
|= [channel-id=@t =request:http]
|= [channel-id=@t mode=?(%json %jam) =request:http]
^- [(list move) server-state]
:: error when there's no body
::
?~ body.request
%^ return-static-data-on-duct 400 'text/html'
(error-page 400 %.y url.request "no put body")
:: if the incoming body isn't json, this is a bad request, 400.
:: if we cannot parse requests from the body, give an error
::
?~ maybe-json=(de-json:html q.u.body.request)
=/ maybe-requests=(each (list channel-request) @t)
(parse-channel-request mode u.body.request)
?: ?=(%| -.maybe-requests)
%^ return-static-data-on-duct 400 'text/html'
(error-page 400 %.y url.request "put body not json")
:: parse the json into an array of +channel-request items
::
?~ maybe-requests=(parse-channel-request u.maybe-json)
%^ return-static-data-on-duct 400 'text/html'
(error-page 400 %.y url.request "invalid channel json")
(error-page 400 & url.request (trip p.maybe-requests))
:: while weird, the request list could be empty
::
?: =(~ u.maybe-requests)
?: =(~ p.maybe-requests)
%^ return-static-data-on-duct 400 'text/html'
(error-page 400 %.y url.request "empty list of actions")
:: check for the existence of the channel-id
@ -1405,10 +1443,10 @@
:: :channel-timeout from now. if we have one which has a timer, update
:: that timer.
::
=. ..on-put-request (update-timeout-timer-for channel-id)
=. ..on-put-request (update-timeout-timer-for mode channel-id)
:: for each request, execute the action passed in
::
=+ requests=u.maybe-requests
=+ requests=p.maybe-requests
:: gall-moves: put moves here first so we can flop for ordering
::
:: TODO: Have an error state where any invalid duplicate subscriptions
@ -1439,7 +1477,7 @@
requests t.requests
==
::
%poke
?(%poke %poke-json)
::
=. gall-moves
:_ gall-moves
@ -1447,7 +1485,12 @@
:^ duct %pass /channel/poke/[channel-id]/(scot %ud request-id.i.requests)
=, i.requests
:* %g %deal `sock`[our ship] app
`task:agent:gall`[%poke-as mark %json !>(json)]
^- task:agent:gall
:+ %poke-as mark
?- -.i.requests
%poke [%noun !>(noun)]
%poke-json [%json !>(json)]
==
==
::
$(requests t.requests)
@ -1580,18 +1623,16 @@
:: if conversion succeeds, we *can* send it. if the client is actually
:: connected, we *will* send it immediately.
::
=/ jive=(unit (quip move json))
(sign-to-json u.channel request-id sign)
=/ json=(unit json)
?~(jive ~ `+.u.jive)
=? moves ?=(^ jive)
(weld moves -.u.jive)
=* sending &(?=([%| *] state.u.channel) ?=(^ json))
=/ said=(unit (quip move tape))
(sign-to-tape u.channel request-id sign)
=? moves ?=(^ said)
(weld moves -.u.said)
=* sending &(?=([%| *] state.u.channel) ?=(^ said))
::
=/ next-id next-id.u.channel
:: if we can send it, store the event as unacked
::
=? events.u.channel ?=(^ json)
=? events.u.channel ?=(^ said)
%- ~(put to events.u.channel)
[next-id request-id (sign-to-channel-event sign)]
:: if it makes sense to do so, send the event to the client
@ -1607,11 +1648,11 @@
::
^= data
%- wall-to-octs
(event-json-to-wall next-id (need json))
(event-tape-to-wall next-id +:(need said))
::
complete=%.n
==
=? next-id ?=(^ json) +(next-id)
=? next-id ?=(^ said) +(next-id)
:: update channel's unacked counts, find out if clogged
::
=^ clogged unacked.u.channel
@ -1619,7 +1660,7 @@
:: and of course don't count events we can't send as unacked.
::
?: ?| !?=(%fact -.sign)
?=(~ json)
?=(~ said)
==
[| unacked.u.channel]
=/ num=@ud
@ -1632,7 +1673,7 @@
:: if we're clogged, or we ran into an event we can't serialize,
:: kill this gall subscription.
::
=* kicking |(clogged ?=(~ json))
=* kicking |(clogged ?=(~ said))
=? moves kicking
:_ moves
::NOTE this shouldn't crash because we
@ -1662,8 +1703,8 @@
::
^= data
%- wall-to-octs
%+ event-json-to-wall next-id
+:(need (sign-to-json u.channel request-id %kick ~))
%+ event-tape-to-wall next-id
+:(need (sign-to-tape u.channel request-id %kick ~))
::
complete=%.n
==
@ -1716,6 +1757,17 @@
?: ?=(%| -.res)
((slog leaf+"eyre: stale fact of mark {(trip have)}" ~) ~)
`[%fact have p.res]
:: +sign-to-tape: render sign from request-id in specified mode
::
++ sign-to-tape
|= [=channel request-id=@ud =sign:agent:gall]
^- (unit (quip move tape))
?- mode.channel
%json %+ bind (sign-to-json channel request-id sign)
|=((quip move json) [+<- (en-json:html +<+)])
%jam =- `[~ (scow %uw (jam -))]
[request-id (sign-to-channel-event sign)]
==
:: +sign-to-json: render sign from request-id as json channel event
::
++ sign-to-json
@ -1785,12 +1837,12 @@
==
==
::
++ event-json-to-wall
~% %eyre-json-to-wall ..part ~
|= [event-id=@ud =json]
++ event-tape-to-wall
~% %eyre-tape-to-wall ..part ~
|= [event-id=@ud =tape]
^- wall
:~ (weld "id: " (format-ud-as-integer event-id))
(weld "data: " (en-json:html json))
(weld "data: " tape)
""
==
::
@ -2097,6 +2149,8 @@
::
=/ request-line (parse-request-line url)
=/ parsed-url=(list @t) site.request-line
=? parsed-url ?=([%'~' %channel-jam *] parsed-url)
parsed-url(i.t %channel)
::
=/ bindings bindings.state
|-
@ -2573,6 +2627,9 @@
::
?^ error.sign
[[duct %slip %d %flog %crud %wake u.error.sign]~ http-server-gate]
::NOTE we are not concerned with expiring channels that are still in
:: use. we require acks for messages, which bump their session's
:: timer. channels have their own expiry timer, too.
:: remove cookies that have expired
::
=* sessions sessions.authentication-state.server-state.ax
@ -2612,15 +2669,8 @@
:: +load: migrate old state to new state (called on vane reload)
::
++ load
=> |%
++ axle-old
%+ cork
axle
|= =axle
axle(date %~2020.10.18)
--
|= old=$%(axle axle-old)
^+ ..^$
|^ |= old=$%(axle axle-any)
^+ ..^^$
::
?- -.old
%~2020.10.18
@ -2640,8 +2690,57 @@
?& ?=(^ secure.ports.server-state.old)
?=(^ secure.http-config.server-state.old)
==
..^$(ax old)
$(old (axle-0-to-1))
::
%~2023.3.15
..^^$(ax old)
==
::
+$ axle-any
$% axle
axle-0
==
::
++ axle-0-to-1
|= ax=axle-0
^- axle
:- %~2023.3.15
^- server-state
::TODO add /~/channel-jam binding
%= server-state.ax
session.channel-state
(~(run by session.channel-state.server-state.ax) (lead %json))
==
::
+$ axle-0
$: date=?(%~2022.7.26 %~2020.10.18)
$= server-state
$: bindings=(list [=binding =duct =action])
=cors-registry
connections=(map duct outstanding-connection)
=authentication-state
channel-state=channel-state-0
domains=(set turf)
=http-config
ports=[insecure=@ud secure=(unit @ud)]
outgoing-duct=duct
== ==
::
+$ channel-state-0
$: session=(map @t channel-0)
duct-to-key=(map duct @t)
==
::
+$ channel-0
$: state=(each timer duct)
next-id=@ud
last-ack=@da
events=(qeu [id=@ud request-id=@ud =channel-event])
unacked=(map @ud @ud)
subscriptions=(map @ud [ship=@p app=term =path duc=duct])
heartbeat=(unit timer)
==
--
:: +stay: produce current state
::
++ stay `axle`ax