First attempt at retargetting the %server demo onto %light channels.

This currently has a bunch of issues around canceling the channel, but
we have a minimal demo which is able to use Gall subscriptions from the
webpage over an EventSource.
This commit is contained in:
Elliot Glaysher 2019-01-09 15:43:43 -08:00
parent bee2b01fb6
commit 412a182c3e
2 changed files with 143 additions and 104 deletions

View File

@ -8,6 +8,7 @@
$% [%connect wire [(unit @t) (list @t)] %server]
[%wait wire @da]
[%http-response =raw-http-response:light]
[%diff %json json]
==
--
:: utilities:
@ -34,8 +35,10 @@
;h1:"Hello, {(trip name)}"
;p
; Time is
;span#time;
;span#time:"?????"
==
;button#start:"Start Timer"
;button#poke:"Random Poke"
;script(type "module", src "/~server/hello.js");
==
==
@ -47,76 +50,33 @@
import * as urb from '/~/channel/channel.js';
var c = urb.newChannel();
c.poke("zod", "server", "json", 5,
// The poke button just sends a poke
document.getElementById("poke").addEventListener("click", function(){
c.poke("zod", "server", "json", 5,
function() {
console.log("Poke worked");
},
function(err) {
console.log("Poke failed: " + err);
});
});
var evtSource = new EventSource("/~server/stream",
{ withCredentials: true } );
evtSource.onmessage = function(e) {
var message = document.getElementById("time");
message.innerHTML = e.data;
}
// The subscription sends the time which makes the thing work.
//
c.subscribe("zod", "server", "/timer",
function(err) {
console.log("Failed initial connection: " + err);
},
function(json) {
console.log("Subscription update: ", json);
var message = document.getElementById("time");
message.innerHTML = json;
},
function() {
console.log("Subscription quit");
});
'''
:: helper library that lets an app handle an EventSource.
::
:: TODO: This doesn't even attempt to deal with sequence numbers.
::
++ event-source
|_ m=(map =bone last-id=@ud)
++ abet m
:: +start-session: called by app to start a session and send first event
::
:: This creates a new session where we
::
++ start-session
|= [session=@ud =bone data=wall]
^- [(list move) _m]
::
:- :~ :* bone %http-response
%start 200
:~ ['content-type' 'text/event-stream']
['cache-control' 'no-cache']
==
(wall-to-output data)
complete=%.n
== ==
(~(put by m) bone 0)
:: +session-stopped: external notification that a session ended
::
++ session-stopped
|= =bone
^- _m
::
(~(del by m) bone)
:: +send-message: sends a message based on the continuation
::
++ send-message
|= [=bone data=wall]
^- [(list move) _m]
:- [bone %http-response %continue (wall-to-output data) complete=%.n]~
(~(jab by m) bone |=(a=@ud +(a)))
:: +wall-to-output: changes our raw text lines to a text/event-stream
::
++ wall-to-output
|= =wall
^- (unit octs)
:- ~
%- as-octs:mimes:html
%- crip
%- zing
%+ weld
%+ turn wall
|= t=tape
"data: {t}\0a"
::
[`tape`['\0a' ~] ~]
--
:: +require-authorization: redirect to the login page when unauthenticated
::
++ require-authorization
@ -138,7 +98,7 @@
|%
::
+$ state
$: events=(map =bone last-id=@ud)
$: next-timer=(unit @da)
==
--
::
@ -161,37 +121,30 @@
~& [%bound success]
[~ this]
::
++ handle-start-stream
|= =inbound-request:light
^- (quip move _this)
:: Start a session sending the current time
::
=^ moves events
(~(start-session event-source events) 0 ost.bow ["{<now.bow>}" ~])
::
:_ this
:- ^- move
[ost.bow %wait /timer (add now.bow ~s1)]
::
moves
:: +wake: responds to a %wait send from +handle-start-stream
::
++ wake
|= [wir=wire ~]
^- (quip move _this)
?. (~(has by events) ost.bow)
~& [%closed wir now.bow]
[~ this]
::
~& [%timer-tick wir now.bow]
::
=^ moves events
(~(send-message event-source events) ost.bow ["{<now.bow>}" ~])
=/ moves=(list move)
%+ turn (prey:pubsub:userlib /timer bow)
|= [=bone ^]
[bone %diff %json %s (scot %da now.bow)]
:: if we have outbound moves, say that we have another timer.
::
:_ this
:- ^- move
[ost.bow %wait /timer (add now.bow ~s1)]
moves
=. next-timer
?: ?=(^ moves)
`(add now.bow ~s1)
~
:: if we have any subscribers, add another timer for the future
::
=? moves ?=(^ moves)
[[ost.bow %wait /timer (add now.bow ~s1)] moves]
::
[moves this]
:: +poke-handle-http-request: received on a new connection established
::
++ poke-handle-http-request
@ -206,9 +159,6 @@
?~ back-path
'World'
i.back-path
?: =(name 'stream')
(handle-start-stream inbound-request)
~& [%name name]
::
?: =(name 'hello')
:_ this
@ -233,8 +183,22 @@
^- (quip move _this)
:: the only long lived connections we keep state about are the stream ones.
::
=. events
(~(session-stopped event-source events) ost.bow)
::
[~ this]
::
++ poke-json
|= =json
^- (quip move _this)
~& [%poke-json json]
[~ this]
::
++ peer-timer
|= pax/path
^- (quip move _this)
:: if we don't have a timer, set a timer.
?: ?=(^ next-timer)
~& [%already-have-a-timer next-timer]
[~ this]
::
:- [ost.bow %wait /timer (add now.bow ~s1)]~
this(next-timer `(unit @da)`[~ (add now.bow ~s1)])
--

View File

@ -283,7 +283,7 @@
==
:: channel-timeout: the delay before a channel should be reaped
::
++ channel-timeout ~h12
++ channel-timeout ~s45
--
:: utilities
::
@ -417,6 +417,8 @@
==
:: +channel-js: the urbit javascript interface
::
:: TODO: Must send 'acks' to the server.
::
++ channel-js
^- octs
%- as-octs:mimes:html
@ -431,7 +433,14 @@
Math.random().toString(16).slice(-6);
this.requestId = 1;
this.connection = null;
// the currently connected EventSource
//
this.eventSource = null;
// the id of the last EventSource event we received
//
this.lastEventId = 0;
// a registry of requestId to successFunc/failureFunc
//
@ -442,7 +451,7 @@
//
this.outstandingPokes = new Map();
// a registry of requestId to eventFunc/disconnectFunc
// a registry of requestId to subscription functions.
//
// These functions are registered during a +subscribe and are
// executed in the onServerEvent()/onServerError() callbacks. The
@ -460,37 +469,91 @@
this.outstandingPokes.set(
id, {"success": successFunc, "fail": failureFunc});
var req = new XMLHttpRequest();
req.open("PUT", this.channelURL());
req.setRequestHeader("Content-Type", "application/json");
// TODO: Need to stuff an "ack" in here, too.
var x = JSON.stringify([{
this.sendJSONToChannel({
"id": id,
"action": "poke",
"ship": ship,
"app": app,
"mark": mark,
"json": json
}]);
});
}
// subscribes to a path on an
//
subscribe(ship, app, path, connectionErrFunc, eventFunc, quitFunc) {
var id = this.nextId();
this.outstandingSubscriptions.set(
id, {"err": connectionErrFunc, "event": eventFunc, "quit": quitFunc});
this.sendJSONToChannel({
"id": id,
"action": "subscribe",
"ship": ship,
"app": app,
"path": path
});
}
// sends a JSON command command to the server.
//
// TODO: This should also bundle an acknowledgment of the last received
// request id.
//
sendJSONToChannel(j) {
var req = new XMLHttpRequest();
req.open("PUT", this.channelURL());
req.setRequestHeader("Content-Type", "application/json");
// TODO: Need to stuff an "ack" in here, too.
var x = JSON.stringify([j]);
req.send(x);
this.connectIfDisconnected();
}
// connects to the EventSource if we are not currently connected
//
connectIfDisconnected() {
if (this.connection)
if (this.eventSource)
return;
this.eventSource = new EventSource(this.channelURL(), {withCredentials:true});
this.eventSource.onmessage = e => {
this.lastEventId = e.id;
var obj = JSON.parse(e.data);
if (obj.response == "poke") {
var funcs = this.outstandingPokes.get(obj.id);
if (obj.hasOwnProperty("ok"))
funcs["success"]()
else
else if (obj.hasOwnProperty("err"))
funcs["fail"](obj.err)
else
console.log("Invalid poke response: ", obj);
this.outstandingPokes.delete(obj.id);
} else if (obj.response == "subscribe") {
// on a response to a subscribe, we only notify the caller on err
//
var funcs = this.outstandingSubscriptions.get(obj.id);
if (obj.hasOwnProperty("err")) {
funcs["err"](obj.err);
this.outstandingSubscriptions.delete(obj.id);
} else {
console.log("Subscription establisthed");
}
} else if (obj.response == "diff") {
console.log("Diff: ", obj);
var funcs = this.outstandingSubscriptions.get(obj.id);
funcs["event"](obj.json);
} else if (obj.response == "quit") {
var funcs = this.outstandingSubscriptions.get(obj.id);
funcs["quit"](obj.err);
this.outstandingSubscriptions.delete(obj.id);
} else {
console.log("Unrecognized response: ", e);
}
@ -911,6 +974,8 @@
::
++ on-cancel-request
^- [(list move) server-state]
::
~& [%channel-on-cancel-request duct]
:: lookup the session id by duct
::
?~ maybe-channel-id=(~(get by duct-to-key.channel-state.state) duct)
@ -1214,6 +1279,16 @@
==
::
(emit-event channel-id [(en-json:html json)]~)
::
%quit
=/ =json
=, enjs:format
%- pairs :~
['response' [%s 'quit']]
['id' (numb request-id)]
==
::
(emit-event channel-id [(en-json:html json)]~)
::
%reap
=/ =json