Merge pull request #6472 from urbit/x/json-bgon

eyre: add support for noun-based channels
This commit is contained in:
Josh Lehman 2023-04-12 12:32:12 -07:00 committed by GitHub
commit 03e85551f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 234 additions and 134 deletions

View File

@ -7,7 +7,7 @@
|_ mud=@
++ grow
|%
++ mime [/application/octet-stream (as-octs mud)]
++ mime [/application/x-urb-jam (as-octs mud)]
--
++ grab
|% :: convert from

View File

@ -1536,7 +1536,8 @@
:: events since then.
::
+$ channel
$: :: channel-state: expiration time or the duct currently listening
$: mode=?(%json %jam)
:: channel-state: expiration time or the duct currently listening
::
:: For each channel, there is at most one open EventSource
:: connection. A 400 is issues on duplicate attempts to connect to the

View File

@ -67,8 +67,12 @@
:: more structures
::
|%
+$ axle
$: %~2023.3.16
++ axle
$: :: date: date at which http-server's state was updated to this data structure
::
date=%~2023.4.11
:: server-state: state of inbound requests
::
=server-state
==
:: +server-state: state relating to open inbound HTTP connections
@ -121,9 +125,12 @@
$% :: %ack: acknowledges that the client has received events up to :id
::
[%ack event-id=@ud]
:: %poke: pokes an application, translating :json to :mark.
:: %poke: pokes an application, validating :noun against :mark
::
[%poke request-id=@ud ship=@p app=term mark=@tas =json]
[%poke request-id=@ud ship=@p app=term mark=@tas =noun]
:: %poke-json: pokes an application, translating :json to :mark
::
[%poke-json request-id=@ud ship=@p app=term mark=@tas =json]
:: %watch: subscribes to an application path
::
[%subscribe request-id=@ud ship=@p app=term =path]
@ -200,13 +207,44 @@
%+ ~(put by unacked) rid
?: (lte u.sus ack) 0
(sub u.sus ack)
:: +find-channel-mode: deduce requested mode from headers
::
++ find-channel-mode
|= [met=method:http hes=header-list:http]
^- ?(%json %jam)
=+ ^- [hed=@t jam=@t]
?: ?=(%'GET' met) ['x-channel-format' 'application/x-urb-jam']
['content-type' 'application/x-urb-jam']
=+ typ=(bind (get-header:http hed hes) :(cork trip cass crip))
?:(=(`jam typ) %jam %json)
:: +parse-channel-request: parses a list of channel-requests
::
++ parse-channel-request
|= [mode=?(%json %jam) body=octs]
^- (each (list channel-request) @t)
?- mode
%json
?~ maybe-json=(de-json:html q.body)
|+'put body not json'
?~ maybe-requests=(parse-channel-request-json u.maybe-json)
|+'invalid channel json'
&+u.maybe-requests
::
%jam
?~ maybe-noun=(bind (slaw %uw q.body) cue)
|+'invalid request format'
?~ maybe-reqs=((soft (list channel-request)) u.maybe-noun)
~& [%miss u.maybe-noun]
|+'invalid request data'
&+u.maybe-reqs
==
:: +parse-channel-request-json: parses a json list of channel-requests
::
:: Parses a json array into a list of +channel-request. If any of the items
:: in the list fail to parse, the entire thing fails so we can 400 properly
:: to the client.
::
++ parse-channel-request
++ parse-channel-request-json
|= request-list=json
^- (unit (list channel-request))
:: parse top
@ -222,7 +260,9 @@
?: =('ack' u.maybe-key)
((pe %ack (ot event-id+ni ~)) item)
?: =('poke' u.maybe-key)
((pe %poke (ot id+ni ship+(su fed:ag) app+so mark+(su sym) json+some ~)) item)
%. item
%+ pe %poke-json
(ot id+ni ship+(su fed:ag) app+so mark+(su sym) json+some ~)
?: =('subscribe' u.maybe-key)
%. item
%+ pe %subscribe
@ -1234,7 +1274,7 @@
:: state.
::
++ update-timeout-timer-for
|= channel-id=@t
|= [mode=?(%json %jam) channel-id=@t]
^+ ..update-timeout-timer-for
:: when our callback should fire
::
@ -1246,7 +1286,7 @@
%_ ..update-timeout-timer-for
session.channel-state.state
%+ ~(put by session.channel-state.state) channel-id
[[%& expiration-time duct] 0 now ~ ~ ~ ~]
[mode [%& expiration-time duct] 0 now ~ ~ ~ ~]
::
moves
[(set-timeout-move channel-id expiration-time) moves]
@ -1301,10 +1341,19 @@
|= [channel-id=@t =request:http]
^- [(list move) server-state]
:: if there's no channel-id, we must 404
::TODO but arm description says otherwise?
::
?~ maybe-channel=(~(get by session.channel-state.state) channel-id)
%^ return-static-data-on-duct 404 'text/html'
(error-page 404 %.y url.request ~)
:: find the requested "mode" and make sure it doesn't conflict
::
=/ mode=?(%json %jam)
(find-channel-mode %'GET' header-list.request)
?. =(mode mode.u.maybe-channel)
%^ return-static-data-on-duct 406 'text/html'
=; msg=tape (error-page 406 %.y url.request msg)
"channel already established in {(trip mode.u.maybe-channel)} mode"
:: when opening an event-stream, we must cancel our timeout timer
:: if there's no duct already bound. Else, kill the old request
:: and replace it
@ -1349,8 +1398,10 @@
=/ sign
(channel-event-to-sign u.maybe-channel request-id channel-event)
?~ sign $
?~ jive=(sign-to-json u.maybe-channel request-id u.sign) $
$(events [(event-json-to-wall id +.u.jive) events])
=/ said
(sign-to-tape u.maybe-channel request-id u.sign)
?~ said $
$(events [(event-tape-to-wall id +.u.said) events])
:: send the start event to the client
::
=^ http-moves state
@ -1382,13 +1433,18 @@
::
=/ heartbeat-time=@da (add now ~s20)
=/ heartbeat (set-heartbeat-move channel-id heartbeat-time)
:: clear the event queue, record the duct for future output and
:: record heartbeat-time for possible future cancel
:: clear the event queue, record the mode & duct for future output,
:: and record heartbeat-time for possible future cancel
::
=. session.channel-state.state
%+ ~(jab by session.channel-state.state) channel-id
|= =channel
channel(events ~, state [%| duct], heartbeat (some [heartbeat-time duct]))
%_ channel
mode mode
events ~
state [%| duct]
heartbeat (some [heartbeat-time duct])
==
::
[[heartbeat :(weld http-moves cancel-moves moves)] state]
:: +acknowledge-events: removes events before :last-event-id on :channel-id
@ -1420,19 +1476,19 @@
?~ body.request
%^ return-static-data-on-duct 400 'text/html'
(error-page 400 %.y url.request "no put body")
:: if the incoming body isn't json, this is a bad request, 400.
::
?~ maybe-json=(de-json:html q.u.body.request)
%^ return-static-data-on-duct 400 'text/html'
(error-page 400 %.y url.request "put body not json")
:: parse the json into an array of +channel-request items
=/ mode=?(%json %jam)
(find-channel-mode %'PUT' header-list.request)
:: if we cannot parse requests from the body, give an error
::
?~ maybe-requests=(parse-channel-request u.maybe-json)
=/ maybe-requests=(each (list channel-request) @t)
(parse-channel-request mode u.body.request)
?: ?=(%| -.maybe-requests)
%^ return-static-data-on-duct 400 'text/html'
(error-page 400 %.y url.request "invalid channel json")
(error-page 400 & url.request (trip p.maybe-requests))
:: while weird, the request list could be empty
::
?: =(~ u.maybe-requests)
?: =(~ p.maybe-requests)
%^ return-static-data-on-duct 400 'text/html'
(error-page 400 %.y url.request "empty list of actions")
:: check for the existence of the channel-id
@ -1441,10 +1497,10 @@
:: :channel-timeout from now. if we have one which has a timer, update
:: that timer.
::
=. ..on-put-request (update-timeout-timer-for channel-id)
=. ..on-put-request (update-timeout-timer-for mode channel-id)
:: for each request, execute the action passed in
::
=+ requests=u.maybe-requests
=+ requests=p.maybe-requests
:: gall-moves: put moves here first so we can flop for ordering
::
:: TODO: Have an error state where any invalid duplicate subscriptions
@ -1475,7 +1531,7 @@
requests t.requests
==
::
%poke
?(%poke %poke-json)
::
=. gall-moves
:_ gall-moves
@ -1483,7 +1539,12 @@
:^ duct %pass /channel/poke/[channel-id]/(scot %ud request-id.i.requests)
=, i.requests
:* %g %deal `sock`[our ship] app
`task:agent:gall`[%poke-as mark %json !>(json)]
^- task:agent:gall
:+ %poke-as mark
?- -.i.requests
%poke [%noun !>(noun)]
%poke-json [%json !>(json)]
==
==
::
$(requests t.requests)
@ -1618,18 +1679,16 @@
:: if conversion succeeds, we *can* send it. if the client is actually
:: connected, we *will* send it immediately.
::
=/ jive=(unit (quip move json))
(sign-to-json u.channel request-id sign)
=/ json=(unit json)
?~(jive ~ `+.u.jive)
=? moves ?=(^ jive)
(weld moves -.u.jive)
=* sending &(?=([%| *] state.u.channel) ?=(^ json))
=/ said=(unit (quip move tape))
(sign-to-tape u.channel request-id sign)
=? moves ?=(^ said)
(weld moves -.u.said)
=* sending &(?=([%| *] state.u.channel) ?=(^ said))
::
=/ next-id next-id.u.channel
:: if we can send it, store the event as unacked
::
=? events.u.channel ?=(^ json)
=? events.u.channel ?=(^ said)
%- ~(put to events.u.channel)
[next-id request-id (sign-to-channel-event sign)]
:: if it makes sense to do so, send the event to the client
@ -1645,11 +1704,11 @@
::
^= data
%- wall-to-octs
(event-json-to-wall next-id (need json))
(event-tape-to-wall next-id +:(need said))
::
complete=%.n
==
=? next-id ?=(^ json) +(next-id)
=? next-id ?=(^ said) +(next-id)
:: update channel's unacked counts, find out if clogged
::
=^ clogged unacked.u.channel
@ -1657,7 +1716,7 @@
:: and of course don't count events we can't send as unacked.
::
?: ?| !?=(%fact -.sign)
?=(~ json)
?=(~ said)
==
[| unacked.u.channel]
=/ num=@ud
@ -1673,7 +1732,7 @@
=/ kicking=?
?: clogged
((trace 0 |.("clogged {msg}")) &)
?. ?=(~ json) |
?. ?=(~ said) |
((trace 0 |.("can't serialize event, kicking {msg}")) &)
=? moves kicking
:_ moves
@ -1705,8 +1764,8 @@
::
^= data
%- wall-to-octs
%+ event-json-to-wall next-id
+:(need (sign-to-json u.channel request-id %kick ~))
%+ event-tape-to-wall next-id
+:(need (sign-to-tape u.channel request-id %kick ~))
::
complete=%.n
==
@ -1759,6 +1818,17 @@
?: ?=(%| -.res)
((trace 0 |.("stale fact of mark {(trip have)}")) ~)
`[%fact have p.res]
:: +sign-to-tape: render sign from request-id in specified mode
::
++ sign-to-tape
|= [=channel request-id=@ud =sign:agent:gall]
^- (unit (quip move tape))
?- mode.channel
%json %+ bind (sign-to-json channel request-id sign)
|=((quip move json) [+<- (en-json:html +<+)])
%jam =- `[~ (scow %uw (jam -))]
[request-id (sign-to-channel-event sign)]
==
:: +sign-to-json: render sign from request-id as json channel event
::
++ sign-to-json
@ -1827,12 +1897,12 @@
==
==
::
++ event-json-to-wall
~% %eyre-json-to-wall ..part ~
|= [event-id=@ud =json]
++ event-tape-to-wall
~% %eyre-tape-to-wall ..part ~
|= [event-id=@ud =tape]
^- wall
:~ (weld "id: " (format-ud-as-integer event-id))
(weld "data: " (en-json:html json))
(weld "data: " tape)
""
==
::
@ -2041,7 +2111,7 @@
::
=. connections.state
%. (~(del by connections.state) duct)
(trace 2 |.("{<duct>} completed"))
(trace 2 |.("{<duct>} completed"))
state
::
++ error-connection
@ -2152,6 +2222,8 @@
::
=/ request-line (parse-request-line url)
=/ parsed-url=(list @t) site.request-line
=? parsed-url ?=([%'~' %channel-jam *] parsed-url)
parsed-url(i.t %channel)
::
=/ bindings bindings.state
|-
@ -2648,6 +2720,9 @@
::
?^ error.sign
[[duct %slip %d %flog %crud %wake u.error.sign]~ http-server-gate]
::NOTE we are not concerned with expiring channels that are still in
:: use. we require acks for messages, which bump their session's
:: timer. channels have their own expiry timer, too.
:: remove cookies that have expired
::
=* sessions sessions.authentication-state.server-state.ax
@ -2689,100 +2764,111 @@
++ load
=> |%
+$ axle-any
$% [%~2020.10.18 =server-state-0]
[%~2022.7.26 =server-state-0]
[%~2023.2.17 =server-state-1]
[%~2023.3.16 =server-state]
$% [date=%~2020.10.18 server-state=server-state-0]
[date=%~2022.7.26 server-state=server-state-0]
[date=%~2023.2.17 server-state=server-state-1]
[date=%~2023.3.16 server-state=server-state-2]
[date=%~2023.4.11 =server-state]
==
::
+$ server-state-0
$: bindings=(list [=binding =duct =action])
=cors-registry
connections=(map duct outstanding-connection)
=authentication-state
=channel-state
channel-state=channel-state-2
domains=(set turf)
=http-config
ports=[insecure=@ud secure=(unit @ud)]
outgoing-duct=duct
==
::
+$ server-state-1
$: bindings=(list [=binding =duct =action])
=cors-registry
connections=(map duct outstanding-connection)
=authentication-state
=channel-state
channel-state=channel-state-2
domains=(set turf)
=http-config
ports=[insecure=@ud secure=(unit @ud)]
outgoing-duct=duct
verb=@ :: <- new
==
::
+$ server-state-2
$: bindings=(list [=binding =duct =action])
cache=(map url=@t [aeon=@ud val=(unit cache-entry)]) :: <- new
=cors-registry
connections=(map duct outstanding-connection)
=authentication-state
channel-state=channel-state-2
domains=(set turf)
=http-config
ports=[insecure=@ud secure=(unit @ud)]
outgoing-duct=duct
verb=@
==
+$ channel-state-2
$: session=(map @t channel-2)
duct-to-key=(map duct @t)
==
+$ channel-2
$: state=(each timer duct)
next-id=@ud
last-ack=@da
events=(qeu [id=@ud request-id=@ud =channel-event])
unacked=(map @ud @ud)
subscriptions=(map @ud [ship=@p app=term =path duc=duct])
heartbeat=(unit timer)
==
--
|= old=axle-any
^+ ..^$
^+ http-server-gate
?- -.old
::
:: adds /~/name
::
%~2020.10.18
=, server-state-0.old
%= ..^$
ax ^- axle
:* %~2023.3.16
(insert-binding [[~ /~/name] outgoing-duct [%name ~]] bindings)
*(map url=@t [aeon=@ud val=(unit cache-entry)])
cors-registry
connections
authentication-state
channel-state
domains
http-config
ports
outgoing-duct
0
== ==
%= $
date.old %~2022.7.26
::
bindings.server-state.old
%+ insert-binding
[[~ /~/name] outgoing-duct.server-state.old [%name ~]]
bindings.server-state.old
==
::
:: enables https redirects if certificate configured
:: inits .verb
::
%~2022.7.26
=, server-state-0.old
%= ..^$
ax ^- axle
:* %~2023.3.16
bindings
*(map url=@t [aeon=@ud val=(unit cache-entry)])
cors-registry
connections
authentication-state
channel-state
domains
http-config
ports
outgoing-duct
0
== ==
::
%~2023.2.17
=, server-state-1.old
%= ..^$
ax ^- axle
:* %~2023.3.16
bindings
*(map url=@t [aeon=@ud val=(unit cache-entry)])
cors-registry
connections
authentication-state
channel-state
domains
http-config
ports
outgoing-duct
verb
== ==
::
%~2023.3.16
:: enable https redirects if certificate configured
::
=. redirect.http-config.server-state.old
?& ?=(^ secure.ports.server-state.old)
?=(^ secure.http-config.server-state.old)
==
..^$(ax old)
$(old [%~2023.2.17 server-state.old(|8 [|8 verb=0]:server-state.old)])
::
:: inits .cache
::
%~2023.2.17
$(old [%~2023.3.16 [bindings ~ +]:server-state.old])
::
:: inits channel mode
::
%~2023.3.16
%= $
date.old %~2023.4.11
::
server-state.old
%= server-state.old
session.channel-state
(~(run by session.channel-state.server-state.old) (lead %json))
==
==
::
%~2023.4.11
http-server-gate(ax old)
==
:: +stay: produce current state
::

View File

@ -643,17 +643,30 @@
!> (rush '192.168.1.1' simplified-url-parser:eyre-gate)
==
::
++ test-parse-channel-request
++ test-parse-channel-request-jam
;: weld
%+ expect-eq
!> `[%ack 5]~
!> %- parse-channel-request:eyre-gate
(need (de-json:html '[{"action": "ack", "event-id": 5}]'))
!> &+[%ack 5]~
!> %+ parse-channel-request:eyre-gate %jam
(as-octs:mimes:html (scot %uw (jam [%ack 5]~)))
::
%+ expect-eq
!> `[%poke 0 ~nec %app1 %app-type [%n '5']]~
!> %- parse-channel-request:eyre-gate
%- need %- de-json:html
!> |+'invalid request data'
!> %+ parse-channel-request:eyre-gate %jam
(as-octs:mimes:html (scot %uw (jam [%not %a %chanreq %list])))
==
::
++ test-parse-channel-request-json
;: weld
%+ expect-eq
!> &+[%ack 5]~
!> %+ parse-channel-request:eyre-gate %json
(as-octs:mimes:html '[{"action": "ack", "event-id": 5}]')
::
%+ expect-eq
!> &+[%poke-json 0 ~nec %app1 %app-type [%n '5']]~
!> %+ parse-channel-request:eyre-gate %json
%- as-octs:mimes:html
'''
[{"action": "poke",
"id": 0,
@ -664,9 +677,9 @@
'''
::
%+ expect-eq
!> `[%subscribe 1 ~sampyl-sipnym %hall /this/path]~
!> %- parse-channel-request:eyre-gate
%- need %- de-json:html
!> &+[%subscribe 1 ~sampyl-sipnym %hall /this/path]~
!> %+ parse-channel-request:eyre-gate %json
%- as-octs:mimes:html
'''
[{"action": "subscribe",
"id": 1,
@ -676,9 +689,9 @@
'''
::
%+ expect-eq
!> `[%unsubscribe 2 1]~
!> %- parse-channel-request:eyre-gate
%- need %- de-json:html
!> &+[%unsubscribe 2 1]~
!> %+ parse-channel-request:eyre-gate %json
%- as-octs:mimes:html
'''
[{"action": "unsubscribe",
"id": 2,
@ -686,30 +699,30 @@
'''
::
%+ expect-eq
!> ~
!> %- parse-channel-request:eyre-gate
%- need %- de-json:html
!> |+'invalid channel json'
!> %+ parse-channel-request:eyre-gate %json
%- as-octs:mimes:html
'[{"noaction": "noaction"}]'
::
%+ expect-eq
!> ~
!> %- parse-channel-request:eyre-gate
%- need %- de-json:html
!> |+'invalid channel json'
!> %+ parse-channel-request:eyre-gate %json
%- as-octs:mimes:html
'[{"action": "bad-action"}]'
::
%+ expect-eq
!> ~
!> %- parse-channel-request:eyre-gate
%- need %- de-json:html
!> |+'invalid channel json'
!> %+ parse-channel-request:eyre-gate %json
%- as-octs:mimes:html
'[{"action": "ack", "event-id": 5}, {"action": "bad-action"}]'
::
%+ expect-eq
!> :- ~
!> :- %&
:~ [%ack 9]
[%poke 3 ~bud %wut %wut-type [%a [%n '2'] [%n '1'] ~]]
[%poke-json 3 ~bud %wut %wut-type [%a [%n '2'] [%n '1'] ~]]
==
!> %- parse-channel-request:eyre-gate
%- need %- de-json:html
!> %+ parse-channel-request:eyre-gate %json
%- as-octs:mimes:html
'''
[{"action": "ack", "event-id": 9},
{"action": "poke",