diff --git a/pkg/arvo/sys/lull.hoon b/pkg/arvo/sys/lull.hoon index 8ddc9d0c58..76ceb3b4d5 100644 --- a/pkg/arvo/sys/lull.hoon +++ b/pkg/arvo/sys/lull.hoon @@ -1523,7 +1523,8 @@ :: events since then. :: +$ channel - $: :: channel-state: expiration time or the duct currently listening + $: mode=?(%json %jam) + :: channel-state: expiration time or the duct currently listening :: :: For each channel, there is at most one open EventSource :: connection. A 400 is issues on duplicate attempts to connect to the diff --git a/pkg/arvo/sys/vane/eyre.hoon b/pkg/arvo/sys/vane/eyre.hoon index e19f48d609..9b43321916 100644 --- a/pkg/arvo/sys/vane/eyre.hoon +++ b/pkg/arvo/sys/vane/eyre.hoon @@ -70,7 +70,7 @@ ++ axle $: :: date: date at which http-server's state was updated to this data structure :: - date=%~2022.7.26 + date=%~2023.3.15 :: server-state: state of inbound requests :: =server-state @@ -119,9 +119,12 @@ $% :: %ack: acknowledges that the client has received events up to :id :: [%ack event-id=@ud] - :: %poke: pokes an application, translating :json to :mark. + :: %poke: pokes an application, validating :noun against :mark :: - [%poke request-id=@ud ship=@p app=term mark=@tas =json] + [%poke request-id=@ud ship=@p app=term mark=@tas =noun] + :: %poke-json: pokes an application, translating :json to :mark + :: + [%poke-json request-id=@ud ship=@p app=term mark=@tas =json] :: %watch: subscribes to an application path :: [%subscribe request-id=@ud ship=@p app=term =path] @@ -200,11 +203,32 @@ (sub u.sus ack) :: +parse-channel-request: parses a list of channel-requests :: +++ parse-channel-request + |= [mode=?(%json %jam) body=octs] + ^- (each (list channel-request) @t) + ?- mode + %json + ?~ maybe-json=(de-json:html q.body) + |+'put body not json' + ?~ maybe-requests=(parse-channel-request-json u.maybe-json) + |+'invalid channel json' + &+u.maybe-requests + :: + %jam + ?~ maybe-noun=(bind (slaw %uw q.body) cue) + |+'invalid request format' + ?~ maybe-reqs=((soft (list channel-request)) u.maybe-noun) + ~& [%miss u.maybe-noun] + |+'invalid request data' + &+u.maybe-reqs + == +:: +parse-channel-request-json: parses a json list of channel-requests +:: :: Parses a json array into a list of +channel-request. If any of the items :: in the list fail to parse, the entire thing fails so we can 400 properly :: to the client. :: -++ parse-channel-request +++ parse-channel-request-json |= request-list=json ^- (unit (list channel-request)) :: parse top @@ -220,7 +244,9 @@ ?: =('ack' u.maybe-key) ((pe %ack (ot event-id+ni ~)) item) ?: =('poke' u.maybe-key) - ((pe %poke (ot id+ni ship+(su fed:ag) app+so mark+(su sym) json+some ~)) item) + %. item + %+ pe %poke-json + (ot id+ni ship+(su fed:ag) app+so mark+(su sym) json+some ~) ?: =('subscribe' u.maybe-key) %. item %+ pe %subscribe @@ -1130,6 +1156,11 @@ :: %^ return-static-data-on-duct 400 'text/html' (error-page 400 authenticated url.request "malformed channel url") + =/ mode=?(%json %jam) + ::TODO go off file extention instead? + ?+ i.t.site.request-line %json + %channel-jam %jam + == :: channel-id: unique channel id parsed out of url :: =+ channel-id=i.t.t.site.request-line @@ -1137,13 +1168,13 @@ ?: =('PUT' method.request) :: PUT methods starts/modifies a channel, and returns a result immediately :: - (on-put-request channel-id request) + (on-put-request channel-id mode request) :: ?: =('GET' method.request) - (on-get-request channel-id request) + (on-get-request channel-id mode request) ?: =('POST' method.request) :: POST methods are used solely for deleting channels - (on-put-request channel-id request) + (on-put-request channel-id mode request) :: ~& %session-not-a-put [~ state] @@ -1198,7 +1229,7 @@ :: state. :: ++ update-timeout-timer-for - |= channel-id=@t + |= [mode=?(%json %jam) channel-id=@t] ^+ ..update-timeout-timer-for :: when our callback should fire :: @@ -1210,7 +1241,7 @@ %_ ..update-timeout-timer-for session.channel-state.state %+ ~(put by session.channel-state.state) channel-id - [[%& expiration-time duct] 0 now ~ ~ ~ ~] + [mode [%& expiration-time duct] 0 now ~ ~ ~ ~] :: moves [(set-timeout-move channel-id expiration-time) moves] @@ -1262,13 +1293,16 @@ :: client in text/event-stream format. :: ++ on-get-request - |= [channel-id=@t =request:http] + |= [channel-id=@t mode=?(%json %jam) =request:http] ^- [(list move) server-state] :: if there's no channel-id, we must 404 :: ?~ maybe-channel=(~(get by session.channel-state.state) channel-id) %^ return-static-data-on-duct 404 'text/html' (error-page 404 %.y url.request ~) + :: + ::TODO consider forbidding connection if !=(mode mode.u.maybe-channel) + :: :: when opening an event-stream, we must cancel our timeout timer :: if there's no duct already bound. Else, kill the old request :: and replace it @@ -1313,8 +1347,10 @@ =/ sign (channel-event-to-sign u.maybe-channel request-id channel-event) ?~ sign $ - ?~ jive=(sign-to-json u.maybe-channel request-id u.sign) $ - $(events [(event-json-to-wall id +.u.jive) events]) + =/ said + (sign-to-tape u.maybe-channel(mode mode) request-id u.sign) + ?~ said $ + $(events [(event-tape-to-wall id +.u.said) events]) :: send the start event to the client :: =^ http-moves state @@ -1346,13 +1382,18 @@ :: =/ heartbeat-time=@da (add now ~s20) =/ heartbeat (set-heartbeat-move channel-id heartbeat-time) - :: clear the event queue, record the duct for future output and - :: record heartbeat-time for possible future cancel + :: clear the event queue, record the mode & duct for future output, + :: and record heartbeat-time for possible future cancel :: =. session.channel-state.state %+ ~(jab by session.channel-state.state) channel-id |= =channel - channel(events ~, state [%| duct], heartbeat (some [heartbeat-time duct])) + %_ channel + mode mode + events ~ + state [%| duct] + heartbeat (some [heartbeat-time duct]) + == :: [[heartbeat :(weld http-moves cancel-moves moves)] state] :: +acknowledge-events: removes events before :last-event-id on :channel-id @@ -1377,26 +1418,23 @@ :: a set of commands in JSON format in the body of the message. :: ++ on-put-request - |= [channel-id=@t =request:http] + |= [channel-id=@t mode=?(%json %jam) =request:http] ^- [(list move) server-state] :: error when there's no body :: ?~ body.request %^ return-static-data-on-duct 400 'text/html' (error-page 400 %.y url.request "no put body") - :: if the incoming body isn't json, this is a bad request, 400. + :: if we cannot parse requests from the body, give an error :: - ?~ maybe-json=(de-json:html q.u.body.request) + =/ maybe-requests=(each (list channel-request) @t) + (parse-channel-request mode u.body.request) + ?: ?=(%| -.maybe-requests) %^ return-static-data-on-duct 400 'text/html' - (error-page 400 %.y url.request "put body not json") - :: parse the json into an array of +channel-request items - :: - ?~ maybe-requests=(parse-channel-request u.maybe-json) - %^ return-static-data-on-duct 400 'text/html' - (error-page 400 %.y url.request "invalid channel json") + (error-page 400 & url.request (trip p.maybe-requests)) :: while weird, the request list could be empty :: - ?: =(~ u.maybe-requests) + ?: =(~ p.maybe-requests) %^ return-static-data-on-duct 400 'text/html' (error-page 400 %.y url.request "empty list of actions") :: check for the existence of the channel-id @@ -1405,10 +1443,10 @@ :: :channel-timeout from now. if we have one which has a timer, update :: that timer. :: - =. ..on-put-request (update-timeout-timer-for channel-id) + =. ..on-put-request (update-timeout-timer-for mode channel-id) :: for each request, execute the action passed in :: - =+ requests=u.maybe-requests + =+ requests=p.maybe-requests :: gall-moves: put moves here first so we can flop for ordering :: :: TODO: Have an error state where any invalid duplicate subscriptions @@ -1439,7 +1477,7 @@ requests t.requests == :: - %poke + ?(%poke %poke-json) :: =. gall-moves :_ gall-moves @@ -1447,7 +1485,12 @@ :^ duct %pass /channel/poke/[channel-id]/(scot %ud request-id.i.requests) =, i.requests :* %g %deal `sock`[our ship] app - `task:agent:gall`[%poke-as mark %json !>(json)] + ^- task:agent:gall + :+ %poke-as mark + ?- -.i.requests + %poke [%noun !>(noun)] + %poke-json [%json !>(json)] + == == :: $(requests t.requests) @@ -1580,18 +1623,16 @@ :: if conversion succeeds, we *can* send it. if the client is actually :: connected, we *will* send it immediately. :: - =/ jive=(unit (quip move json)) - (sign-to-json u.channel request-id sign) - =/ json=(unit json) - ?~(jive ~ `+.u.jive) - =? moves ?=(^ jive) - (weld moves -.u.jive) - =* sending &(?=([%| *] state.u.channel) ?=(^ json)) + =/ said=(unit (quip move tape)) + (sign-to-tape u.channel request-id sign) + =? moves ?=(^ said) + (weld moves -.u.said) + =* sending &(?=([%| *] state.u.channel) ?=(^ said)) :: =/ next-id next-id.u.channel :: if we can send it, store the event as unacked :: - =? events.u.channel ?=(^ json) + =? events.u.channel ?=(^ said) %- ~(put to events.u.channel) [next-id request-id (sign-to-channel-event sign)] :: if it makes sense to do so, send the event to the client @@ -1607,11 +1648,11 @@ :: ^= data %- wall-to-octs - (event-json-to-wall next-id (need json)) + (event-tape-to-wall next-id +:(need said)) :: complete=%.n == - =? next-id ?=(^ json) +(next-id) + =? next-id ?=(^ said) +(next-id) :: update channel's unacked counts, find out if clogged :: =^ clogged unacked.u.channel @@ -1619,7 +1660,7 @@ :: and of course don't count events we can't send as unacked. :: ?: ?| !?=(%fact -.sign) - ?=(~ json) + ?=(~ said) == [| unacked.u.channel] =/ num=@ud @@ -1632,7 +1673,7 @@ :: if we're clogged, or we ran into an event we can't serialize, :: kill this gall subscription. :: - =* kicking |(clogged ?=(~ json)) + =* kicking |(clogged ?=(~ said)) =? moves kicking :_ moves ::NOTE this shouldn't crash because we @@ -1662,8 +1703,8 @@ :: ^= data %- wall-to-octs - %+ event-json-to-wall next-id - +:(need (sign-to-json u.channel request-id %kick ~)) + %+ event-tape-to-wall next-id + +:(need (sign-to-tape u.channel request-id %kick ~)) :: complete=%.n == @@ -1716,6 +1757,17 @@ ?: ?=(%| -.res) ((slog leaf+"eyre: stale fact of mark {(trip have)}" ~) ~) `[%fact have p.res] + :: +sign-to-tape: render sign from request-id in specified mode + :: + ++ sign-to-tape + |= [=channel request-id=@ud =sign:agent:gall] + ^- (unit (quip move tape)) + ?- mode.channel + %json %+ bind (sign-to-json channel request-id sign) + |=((quip move json) [+<- (en-json:html +<+)]) + %jam =- `[~ (scow %uw (jam -))] + [request-id (sign-to-channel-event sign)] + == :: +sign-to-json: render sign from request-id as json channel event :: ++ sign-to-json @@ -1785,12 +1837,12 @@ == == :: - ++ event-json-to-wall - ~% %eyre-json-to-wall ..part ~ - |= [event-id=@ud =json] + ++ event-tape-to-wall + ~% %eyre-tape-to-wall ..part ~ + |= [event-id=@ud =tape] ^- wall :~ (weld "id: " (format-ud-as-integer event-id)) - (weld "data: " (en-json:html json)) + (weld "data: " tape) "" == :: @@ -2097,6 +2149,8 @@ :: =/ request-line (parse-request-line url) =/ parsed-url=(list @t) site.request-line + =? parsed-url ?=([%'~' %channel-jam *] parsed-url) + parsed-url(i.t %channel) :: =/ bindings bindings.state |- @@ -2573,6 +2627,9 @@ :: ?^ error.sign [[duct %slip %d %flog %crud %wake u.error.sign]~ http-server-gate] + ::NOTE we are not concerned with expiring channels that are still in + :: use. we require acks for messages, which bump their session's + :: timer. channels have their own expiry timer, too. :: remove cookies that have expired :: =* sessions sessions.authentication-state.server-state.ax @@ -2612,15 +2669,8 @@ :: +load: migrate old state to new state (called on vane reload) :: ++ load - => |% - ++ axle-old - %+ cork - axle - |= =axle - axle(date %~2020.10.18) - -- - |= old=$%(axle axle-old) - ^+ ..^$ + |^ |= old=$%(axle axle-any) + ^+ ..^^$ :: ?- -.old %~2020.10.18 @@ -2640,8 +2690,57 @@ ?& ?=(^ secure.ports.server-state.old) ?=(^ secure.http-config.server-state.old) == - ..^$(ax old) + $(old (axle-0-to-1)) + :: + %~2023.3.15 + ..^^$(ax old) == + :: + +$ axle-any + $% axle + axle-0 + == + :: + ++ axle-0-to-1 + |= ax=axle-0 + ^- axle + :- %~2023.3.15 + ^- server-state + ::TODO add /~/channel-jam binding + %= server-state.ax + session.channel-state + (~(run by session.channel-state.server-state.ax) (lead %json)) + == + :: + +$ axle-0 + $: date=?(%~2022.7.26 %~2020.10.18) + $= server-state + $: bindings=(list [=binding =duct =action]) + =cors-registry + connections=(map duct outstanding-connection) + =authentication-state + channel-state=channel-state-0 + domains=(set turf) + =http-config + ports=[insecure=@ud secure=(unit @ud)] + outgoing-duct=duct + == == + :: + +$ channel-state-0 + $: session=(map @t channel-0) + duct-to-key=(map duct @t) + == + :: + +$ channel-0 + $: state=(each timer duct) + next-id=@ud + last-ack=@da + events=(qeu [id=@ud request-id=@ud =channel-event]) + unacked=(map @ud @ud) + subscriptions=(map @ud [ship=@p app=term =path duc=duct]) + heartbeat=(unit timer) + == + -- :: +stay: produce current state :: ++ stay `axle`ax