First transimsission of text/event-stream from Eyre.

This commit is contained in:
Elliot Glaysher 2018-11-20 17:06:04 -08:00
parent 439e623550
commit 1a443599ae
2 changed files with 485 additions and 135 deletions

View File

@ -27,7 +27,8 @@
$: %b $: %b
:: ::
:: ::
$% [%wait p=@da] $% [%rest p=@da]
[%wait p=@da]
== == == ==
:: %f: to ford :: %f: to ford
:: ::
@ -226,12 +227,15 @@
:: events since then. :: events since then.
:: ::
+$ channel +$ channel
$: :: expiration-time: when this channel will expire $: :: channel-state: expiration time or the duct currently listening
:: ::
:: In case of an EventSource disconnect, we set a timer to reap the :: For each channel, there is at most one open EventSource
:: subscriptions. This timer shouldn't be too short because the :: connection. A 400 is issues on duplicate attempts to connect to the
:: same channel. When an EventSource isn't connected, we set a timer
:: to reap the subscriptions. This timer shouldn't be too short
:: because the
:: ::
expiration-time=(unit @da) state=(each @da duct)
:: next-id: next sequence number to use :: next-id: next sequence number to use
:: ::
next-id=@ud next-id=@ud
@ -243,36 +247,29 @@
:: channel, we send the event but we still add it to events because we :: channel, we send the event but we still add it to events because we
:: can't assume it got received until we get an acknowledgment. :: can't assume it got received until we get an acknowledgment.
:: ::
events=(qeu [id=@ud type=term data=wall]) events=(qeu [id=@ud lines=wall])
:: subscriptions: gall subscriptions :: subscriptions: gall subscriptions
:: ::
:: We maintain a list of subscriptions so if a channel times out, we :: We maintain a list of subscriptions so if a channel times out, we
:: can cancel all the subscriptions we've made. :: can cancel all the subscriptions we've made.
:: ::
subscriptions=(list [ship=@p app=term =path]) subscriptions=(list [ship=@p app=term =path])
:: duct: the open http sessions which we must %continue on new events.
::
:: For each channel, there is at most one open EventSource
:: connection. A 400 is issues on duplicate attempts to connect to the
:: same channel.
::
duct=(unit duct)
== ==
:: channel-request: an action requested on a channel :: channel-request: an action requested on a channel
:: ::
+$ channel-request +$ channel-request
$% :: %ack: acknowledges that the client has received events up to :id $% :: %ack: acknowledges that the client has received events up to :id
:: ::
[%ack id=@ud] [%ack event-id=@ud]
:: %poke: pokes an application, translating :json to :mark. :: %poke: pokes an application, translating :json to :mark.
:: ::
[%poke ship=@p app=term mark=@tas =json] [%poke request-id=@ud ship=@p app=term mark=@tas =json]
:: %subscribe: subscribes to an application path :: %subscribe: subscribes to an application path
:: ::
[%subscribe ship=@p app=term =path] [%subscribe request-id=@ud ship=@p app=term =path]
:: %unsubscribe: unsubscribes from an application path :: %unsubscribe: unsubscribes from an application path
:: ::
[%unsubscribe ship=@p app=term =path] [%unsubscribe request-id=@ud ship=@p app=term =path]
== ==
:: channel-timeout: the delay before a channel should be reaped :: channel-timeout: the delay before a channel should be reaped
:: ::
@ -301,17 +298,17 @@
?~ maybe-key=((ot action+so ~) item) ?~ maybe-key=((ot action+so ~) item)
~ ~
?: =('ack' u.maybe-key) ?: =('ack' u.maybe-key)
((pe %ack (ot id+ni ~)) item) ((pe %ack (ot event-id+ni ~)) item)
?: =('poke' u.maybe-key) ?: =('poke' u.maybe-key)
((pe %poke (ot ship+(su fed:ag) app+so mark+(su sym) json+some ~)) item) ((pe %poke (ot id+ni ship+(su fed:ag) app+so mark+(su sym) json+some ~)) item)
?: =('subscribe' u.maybe-key) ?: =('subscribe' u.maybe-key)
%. item %. item
%+ pe %subscribe %+ pe %subscribe
(ot ship+(su fed:ag) app+so path+(su ;~(pfix fas (more fas urs:ab))) ~) (ot id+ni ship+(su fed:ag) app+so path+(su ;~(pfix fas (more fas urs:ab))) ~)
?: =('unsubscribe' u.maybe-key) ?: =('unsubscribe' u.maybe-key)
%. item %. item
%+ pe %unsubscribe %+ pe %unsubscribe
(ot ship+(su fed:ag) app+so path+(su ;~(pfix fas (more fas urs:ab))) ~) (ot id+ni ship+(su fed:ag) app+so path+(su ;~(pfix fas (more fas urs:ab))) ~)
:: if we reached this, we have an invalid action key. fail parsing. :: if we reached this, we have an invalid action key. fail parsing.
:: ::
~ ~
@ -354,9 +351,9 @@
== ==
== ==
== ==
:: +render-tang: renders a tang and adds <br/> tags between each line :: +render-tang-to-marl: renders a tang and adds <br/> tags between each line
:: ::
++ render-tang ++ render-tang-to-marl
|= {wid/@u tan/tang} |= {wid/@u tan/tang}
^- marl ^- marl
=/ raw=(list tape) (zing (turn tan |=(a/tank (wash 0^wid a)))) =/ raw=(list tape) (zing (turn tan |=(a/tank (wash 0^wid a))))
@ -364,6 +361,28 @@
|- ^- marl |- ^- marl
?~ raw ~ ?~ raw ~
[;/(i.raw) ;br; $(raw t.raw)] [;/(i.raw) ;br; $(raw t.raw)]
:: +render-tang-to-wall: renders tang as text lines
::
++ render-tang-to-wall
|= {wid/@u tan/tang}
^- wall
(zing (turn tan |=(a=tank (wash 0^wid a))))
:: +wall-to-octs: text to binary output
::
++ wall-to-octs
|= =wall
^- (unit octs)
::
?: =(~ wall)
~
::
:- ~
%- as-octs:mimes:html
%- crip
%- zing
%+ turn wall
|= t=tape
"{t}\0a"
:: +internal-server-error: 500 page, with a tang :: +internal-server-error: 500 page, with a tang
:: ::
++ internal-server-error ++ internal-server-error
@ -381,7 +400,7 @@
;p:"There was an error while handling the request for {<(trip url)>}." ;p:"There was an error while handling the request for {<(trip url)>}."
;* ?: authorized ;* ?: authorized
;= ;=
;code:"*{(render-tang 80 t)}" ;code:"*{(render-tang-to-marl 80 t)}"
== ==
~ ~
== ==
@ -536,7 +555,7 @@
(handle-request:authentication secure address http-request) (handle-request:authentication secure address http-request)
:: ::
%channel %channel
(handle-request:channel secure authenticated address http-request) (handle-request:by-channel secure authenticated address http-request)
== ==
:: +cancel-request: handles a request being externally aborted :: +cancel-request: handles a request being externally aborted
:: ::
@ -729,7 +748,7 @@
:: Eyre offers a remote interface to your Urbit through channels, which :: Eyre offers a remote interface to your Urbit through channels, which
:: are persistent connections on the server which :: are persistent connections on the server which
:: ::
++ channel ++ by-channel
:: moves: the moves to be sent out at the end of this event, reversed :: moves: the moves to be sent out at the end of this event, reversed
:: ::
=| moves=(list move) =| moves=(list move)
@ -753,30 +772,126 @@
:: ::
=+ request-line=(parse-request-line url.http-request) =+ request-line=(parse-request-line url.http-request)
?. ?=([@t @t @t ~] site.request-line) ?. ?=([@t @t @t ~] site.request-line)
:: url is not of the form '/~/subscription/uid' :: url is not of the form '/~/subscription/'
:: ::
%^ return-static-data-on-duct 400 'text/html' %^ return-static-data-on-duct 400 'text/html'
(internal-server-error authenticated url.http-request ~) (internal-server-error authenticated url.http-request ~)
:: uid: unique channel id parsed out of url :: channel-id: unique channel id parsed out of url
:: ::
=+ uid=i.t.t.site.request-line =+ channel-id=i.t.t.site.request-line
:: ::
?: =('PUT' method.http-request) ?: =('PUT' method.http-request)
:: PUT methods starts/modifies a channel, and returns a result immediately :: PUT methods starts/modifies a channel, and returns a result immediately
:: ::
(on-put-request uid http-request) (on-put-request channel-id http-request)
::
?: =('GET' method.http-request)
(on-get-request channel-id http-request)
:: ::
~& %session-not-a-put ~& %session-not-a-put
[~ state] [~ state]
:: +handle-cancel: cancels an ongoing subscription :: +handle-cancel: cancels an ongoing subscription
:: ::
::++ handle-cancel ::++ handle-cancel
:: +on-get-request: handles a GET request
::
:: GET requests open a channel for the server to send events to the
:: client in text/event-stream format.
::
++ on-get-request
|= [channel-id=@t =http-request]
^- [(list move) server-state]
:: if there's no channel-id, we must 404
::
?~ maybe-channel=(~(get by session.channel-state.state) channel-id)
%^ return-static-data-on-duct 404 'text/html'
(internal-server-error %.y url.http-request ~)
:: if there's already a duct listening to this channel, we must 400
::
?: ?=([%| *] state.u.maybe-channel)
%^ return-static-data-on-duct 400 'text/html'
(internal-server-error %.y url.http-request ~)
:: when opening an event-stream, we must cancel our timeout timer
::
:: TODO: Need to cancel on the original duct!
::
=. moves
:_ moves
^- move
:^ duct %pass /channel/timeout/[channel-id]
[%b %rest p.state.u.maybe-channel]
:: the http-request may include a 'Last-Event-Id' header
::
=/ maybe-last-event-id=(unit @ud)
?~ maybe-raw-header=(get-header 'Last-Event-ID' header-list.http-request)
~
(rush u.maybe-raw-header dum:ag)
:: flush events older than the passed in 'Last-Event-ID'
::
=? state ?=(^ maybe-last-event-id)
(acknowledge-events channel-id u.maybe-last-event-id)
:: combine the remaining queued events to send to the client
::
=/ event-replay=wall
%- zing
%- flop
=/ queue events.u.maybe-channel
=| events=(list wall)
|-
^+ events
?: =(~ queue)
events
=^ head queue ~(get to queue)
$(events [lines.p.head events])
:: send the start event to the client
::
=. moves
:_ moves
:+ duct %give
:* %http-response %start 200
:~ ['content-type' 'text/event-stream']
['cache-control' 'no-cache']
['connection' 'keep-alive']
==
(wall-to-octs event-replay)
complete=%.n
==
:: clear the event queue and record the duct for future output
::
=. session.channel-state.state
%+ ~(jab by session.channel-state.state) channel-id
|= =channel
channel(events ~, state [%| duct])
::
[moves state]
:: +acknowledge-events: removes events before :last-event-id on :channel-id
::
++ acknowledge-events
|= [channel-id=@t last-event-id=@u]
^- server-state
%_ state
session.channel-state
%+ ~(jab by session.channel-state.state) channel-id
|= =channel
^+ channel
:: if the queue is empty, don't do anything else
::
?~ maybe-top=~(top to events.channel)
channel
:: if the oldest event is older than the event queue, pop it
::
?: (gte last-event-id id.u.maybe-top)
$(events.channel ~(nap to events.channel))
::
channel
==
:: +on-put-request: handles a PUT request :: +on-put-request: handles a PUT request
:: ::
:: :: PUT requests send commands from the client to the server. We receive
:: a set of commands in JSON format in the body of the message.
:: ::
++ on-put-request ++ on-put-request
|= [uid=@t =http-request] |= [channel-id=@t =http-request]
^- [(list move) server-state] ^- [(list move) server-state]
:: error when there's no body :: error when there's no body
:: ::
@ -798,7 +913,7 @@
?: =(~ u.maybe-requests) ?: =(~ u.maybe-requests)
%^ return-static-data-on-duct 400 'text/html' %^ return-static-data-on-duct 400 'text/html'
(internal-server-error %.y url.http-request ~) (internal-server-error %.y url.http-request ~)
:: check for the existence of the uid :: check for the existence of the channel-id
:: ::
:: if we have no session, create a new one set to expire in :: if we have no session, create a new one set to expire in
:: :channel-timeout from now. :: :channel-timeout from now.
@ -806,18 +921,18 @@
:: TODO: This is wrong. We always want to potentially update the :: TODO: This is wrong. We always want to potentially update the
:: expiration time if there's no eventsource attached. :: expiration time if there's no eventsource attached.
:: ::
=? ..on-put-request !(~(has by session.channel-state.state) uid) =? ..on-put-request !(~(has by session.channel-state.state) channel-id)
:: ::
=/ expiration-time=@da (add now channel-timeout) =/ expiration-time=@da (add now channel-timeout)
%_ ..on-put-request %_ ..on-put-request
session.channel-state.state session.channel-state.state
%+ ~(put by session.channel-state.state) uid %+ ~(put by session.channel-state.state) channel-id
[`expiration-time 0 ~ ~ ~] [[%& expiration-time] 0 ~ ~]
:: ::
moves moves
:_ moves :_ moves
^- move ^- move
[duct %pass /channel/timeout/[uid] %b %wait expiration-time] [duct %pass /channel/timeout/[channel-id] %b %wait expiration-time]
== ==
:: for each request, execute the action passed in :: for each request, execute the action passed in
:: ::
@ -855,7 +970,7 @@
=. gall-moves =. gall-moves
:_ gall-moves :_ gall-moves
^- move ^- move
:^ duct %pass /channel/poke/[uid] :^ duct %pass /channel/poke/[channel-id]/(scot %ud request-id.i.requests)
=, i.requests =, i.requests
[%g %deal `sock`[our ship] `cush:gall`[app %punk mark %json !>(json)]] [%g %deal `sock`[our ship] `cush:gall`[app %punk mark %json !>(json)]]
:: ::
@ -866,14 +981,15 @@
=. gall-moves =. gall-moves
:_ gall-moves :_ gall-moves
^- move ^- move
:^ duct %pass /channel/subscription/[uid] :^ duct %pass
/channel/subscription/[channel-id]/(scot %ud request-id.i.requests)
=, i.requests =, i.requests
[%g %deal [our ship] `cush:gall`[app %peel %json path]] [%g %deal [our ship] `cush:gall`[app %peel %json path]]
:: TODO: Check existence to prevent duplicates? :: TODO: Check existence to prevent duplicates?
:: ::
=. session.channel-state.state =. session.channel-state.state
%+ ~(jab by session.channel-state.state) uid %+ ~(jab by session.channel-state.state) channel-id
|= =^channel |= =channel
^+ channel ^+ channel
=, i.requests =, i.requests
channel(subscriptions [[ship app path] subscriptions.channel]) channel(subscriptions [[ship app path] subscriptions.channel])
@ -883,18 +999,117 @@
%unsubscribe %unsubscribe
!! !!
== ==
:: +on-gall-response: turns a gall response into an event
::
++ on-gall-response
|= [channel-id=@t request-id=@ud =cuft:gall]
^- [(list move) server-state]
::
?+ -.cuft ~|([%invalid-gall-response -.cuft] !!)
%coup
=/ =json
=, enjs:format
%- pairs :~
['response' [%s 'poke']]
['id' (numb request-id)]
?~ p.cuft
['ok' [%s 'ok']]
['err' (wall (render-tang-to-wall 100 u.p.cuft))]
==
::
(emit-event channel-id [(en-json:html json)]~)
::
%diff
=/ =json
=, enjs:format
%- pairs :~
['response' [%s 'diff']]
['id' (numb request-id)]
:- 'json'
?> =(%json p.p.cuft)
((hard json) q.q.p.cuft)
==
::
(emit-event channel-id [(en-json:html json)]~)
::
%reap
=/ =json
=, enjs:format
%- pairs :~
['response' [%s 'subscribe']]
['id' (numb request-id)]
?~ p.cuft
['ok' [%s 'ok']]
['err' (wall (render-tang-to-wall 100 u.p.cuft))]
==
::
(emit-event channel-id [(en-json:html json)]~)
==
:: +emit-event: records an event occurred, possibly sending to client
::
:: When an event occurs, we need to record it, even if we immediately
:: send it to a connected browser so in case of disconnection, we can
:: resend it.
::
:: This function is responsible for taking the raw json lines and
:: converting them into a text/event-stream. The :event-stream-lines
:: then may get sent, and are stored for later resending until
:: acknowledged by the client.
::
++ emit-event
|= [channel-id=@t json-text=wall]
^- [(list move) server-state]
::
=/ channel=channel
(~(got by session.channel-state.state) channel-id)
::
=/ event-id next-id.channel
::
=/ event-stream-lines=wall
%- weld :_ [""]~
:- "id: {<event-id>}"
%+ turn json-text
|= =tape
(weld "data: " tape)
:: if a client is connected, send this event to them.
::
=? moves ?=([%| *] state.channel)
:_ moves
:+ p.state.channel %give
:* %http-response %continue
::
^= data
:- ~
%- as-octs:mimes:html
(crip (of-wall:format event-stream-lines))
::
complete=%.n
==
::
:- moves
%_ state
channel-state
%+ ~(jab by session.channel-state.state) channel-id
|= =^channel
^+ channel
::
%_ channel
next-id +(next-id.channel)
events (~(put to events.channel) [event-id event-stream-lines])
==
==
:: +on-channel-timeout: we received a wake to clear an old session :: +on-channel-timeout: we received a wake to clear an old session
:: ::
++ on-channel-timeout ++ on-channel-timeout
|= uid=@t |= channel-id=@t
^- [(list move) server-state] ^- [(list move) server-state]
:: ::
=/ session =/ session
(~(got by session.channel-state.state) uid) (~(got by session.channel-state.state) channel-id)
:: ::
:_ %_ state :_ %_ state
session.channel-state session.channel-state
(~(del by session.channel-state.state) uid) (~(del by session.channel-state.state) channel-id)
== ==
:: produce a list of moves which cancels every gall subscription :: produce a list of moves which cancels every gall subscription
:: ::
@ -904,7 +1119,7 @@
:: todo: double check this; which duct should we be canceling on? does :: todo: double check this; which duct should we be canceling on? does
:: gall strongly bind to a duct as a cause like ford does? :: gall strongly bind to a duct as a cause like ford does?
:: ::
:^ duct %pass /channel/subscription/[uid] :^ duct %pass /channel/subscription/[channel-id]
[%g %deal [our ship] app %pull ~] [%g %deal [our ship] app %pull ~]
-- --
:: +handle-ford-response: translates a ford response for the outside world :: +handle-ford-response: translates a ford response for the outside world
@ -1329,11 +1544,20 @@
:: ::
%timeout %timeout
=/ on-channel-timeout =/ on-channel-timeout
on-channel-timeout:channel:(per-server-event event-args) on-channel-timeout:by-channel:(per-server-event event-args)
=^ moves server-state.ax =^ moves server-state.ax
(on-channel-timeout i.t.t.wire) (on-channel-timeout i.t.t.wire)
[moves light-gate] [moves light-gate]
:: ::
?(%poke %subscription)
?> ?=([%g %unto *] sign)
?> ?=([@ @ @t @ *] wire)
~& [%wire wire]
=/ on-gall-response
on-gall-response:by-channel:(per-server-event event-args)
=^ moves server-state.ax
(on-gall-response i.t.t.wire `@ud`(slav %ud i.t.t.t.wire) p.sign)
[moves light-gate]
== ==
-- --
:: ::

View File

@ -647,14 +647,15 @@
%+ expect-eq %+ expect-eq
!> `[%ack 5]~ !> `[%ack 5]~
!> %- parse-channel-request:light-gate !> %- parse-channel-request:light-gate
(need (de-json:html '[{"action": "ack", "id": 5}]')) (need (de-json:html '[{"action": "ack", "event-id": 5}]'))
:: ::
%+ expect-eq %+ expect-eq
!> `[%poke ~nec %app1 %app-type [%n '5']]~ !> `[%poke 0 ~nec %app1 %app-type [%n '5']]~
!> %- parse-channel-request:light-gate !> %- parse-channel-request:light-gate
%- need %- de-json:html %- need %- de-json:html
''' '''
[{"action": "poke", [{"action": "poke",
"id": 0,
"ship": "nec", "ship": "nec",
"app": "app1", "app": "app1",
"mark": "app-type", "mark": "app-type",
@ -662,22 +663,24 @@
''' '''
:: ::
%+ expect-eq %+ expect-eq
!> `[%subscribe ~sampyl-sipnym %hall /this/path]~ !> `[%subscribe 1 ~sampyl-sipnym %hall /this/path]~
!> %- parse-channel-request:light-gate !> %- parse-channel-request:light-gate
%- need %- de-json:html %- need %- de-json:html
''' '''
[{"action": "subscribe", [{"action": "subscribe",
"id": 1,
"ship": "sampyl-sipnym", "ship": "sampyl-sipnym",
"app": "hall", "app": "hall",
"path": "/this/path"}] "path": "/this/path"}]
''' '''
:: ::
%+ expect-eq %+ expect-eq
!> `[%unsubscribe ~marlyt %thing /other]~ !> `[%unsubscribe 2 ~marlyt %thing /other]~
!> %- parse-channel-request:light-gate !> %- parse-channel-request:light-gate
%- need %- de-json:html %- need %- de-json:html
''' '''
[{"action": "unsubscribe", [{"action": "unsubscribe",
"id": 2,
"ship": "marlyt", "ship": "marlyt",
"app": "thing", "app": "thing",
"path": "/other"}] "path": "/other"}]
@ -699,18 +702,19 @@
!> ~ !> ~
!> %- parse-channel-request:light-gate !> %- parse-channel-request:light-gate
%- need %- de-json:html %- need %- de-json:html
'[{"action": "ack", "id": 5}, {"action": "bad-action"}]' '[{"action": "ack", "event-id": 5}, {"action": "bad-action"}]'
:: ::
%+ expect-eq %+ expect-eq
!> :- ~ !> :- ~
:~ [%ack 9] :~ [%ack 9]
[%poke ~bud %wut %wut-type [%a [%n '2'] [%n '1'] ~]] [%poke 3 ~bud %wut %wut-type [%a [%n '2'] [%n '1'] ~]]
== ==
!> %- parse-channel-request:light-gate !> %- parse-channel-request:light-gate
%- need %- de-json:html %- need %- de-json:html
''' '''
[{"action": "ack", "id": 9}, [{"action": "ack", "event-id": 9},
{"action": "poke", {"action": "poke",
"id": 3,
"ship": "bud", "ship": "bud",
"app": "wut", "app": "wut",
"mark": "wut-type", "mark": "wut-type",
@ -766,90 +770,10 @@
== ==
:: ::
++ test-channel-open-never-used-expire ++ test-channel-open-never-used-expire
:: =^ results1 light-gate (perform-init-start-channel light-gate *sley)
=^ results1 light-gate
%- light-call :*
light-gate
now=~1111.1.1
scry=*sley
call-args=[duct=~[/init] ~ [%init ~nul]]
expected-moves=~
==
:: ensure there's an authenticated session
::
=^ results2 light-gate
%- perform-authentication :*
light-gate
now=~1111.1.2
scry=*sley
==
:: send the channel a poke and a subscription request
::
=^ results3 light-gate
%- light-call-with-comparator :*
light-gate
now=~1111.1.2
scry=*sley
^= call-args
:* duct=~[/http-blah] ~
%inbound-request
%.n
[%ipv4 .192.168.1.1]
%'PUT'
'/~/channel/0123456789abcdef'
['cookie' 'urbauth=0v3.q0p7t.mlkkq.cqtto.p0nvi.2ieea']~
::
:- ~
%- as-octs:mimes:html
'''
[{"action": "poke",
"ship": "nul",
"app": "one",
"mark": "a",
"json": 5},
{"action": "subscribe",
"ship": "nul",
"app": "two",
"path": "/one/two/three"}
]
'''
==
^= comparator
|= moves=(list move:light-gate)
^- tang
::
?. ?=([^ ^ ^ ^ ~] moves)
[%leaf "wrong number of moves: {<(lent moves)>}"]~
::
;: weld
%+ expect-gall-deal
:* /channel/poke/'0123456789abcdef'
[~nul ~nul] %one
%punk %a %json !>([%n '5'])
==
card.i.moves
::
%+ expect-gall-deal
:* /channel/subscription/'0123456789abcdef'
[~nul ~nul] %two
%peel %json /one/two/three
==
card.i.t.moves
::
%+ expect-eq
!> [~[/http-blah] %give %http-response %start 200 ~ ~ %.y]
!> i.t.t.moves
::
%+ expect-eq
!> :* ~[/http-blah] %pass
/channel/timeout/'0123456789abcdef'
%b %wait (add ~1111.1.2 ~h12)
==
!> i.t.t.t.moves
== ==
:: the behn timer wakes us up; we cancel our subscription :: the behn timer wakes us up; we cancel our subscription
:: ::
=^ results4 light-gate =^ results2 light-gate
%- light-take-with-comparator :* %- light-take-with-comparator :*
light-gate light-gate
now=(add ~1111.1.2 ~h12) now=(add ~1111.1.2 ~h12)
@ -874,11 +798,120 @@
card.i.moves card.i.moves
== ==
:: ::
;: weld
results1
results2
==
::
++ test-channel-results-before-open
:: common initialization
::
=^ results1 light-gate (perform-init-start-channel light-gate *sley)
:: poke gets a success message
::
=^ results2 light-gate
%- light-take :*
light-gate
now=(add ~1111.1.2 ~m1)
scry=*sley
^= take-args
:* wire=/channel/poke/'0123456789abcdef'/'0' duct=~[/http-blah]
^- (hypo sign:light-gate)
:- *type
[%g %unto %coup ~]
==
moves=~
==
:: subscription gets a success message
::
=^ results3 light-gate
%- light-take :*
light-gate
now=(add ~1111.1.2 ~m1)
scry=*sley
^= take-args
:* wire=/channel/subscription/'0123456789abcdef'/'1' duct=~[/http-blah]
^- (hypo sign:light-gate)
:- *type
[%g %unto %reap ~]
==
moves=~
==
:: subscription gets a result
::
=^ results4 light-gate
%- light-take :*
light-gate
now=(add ~1111.1.2 ~m2)
scry=*sley
^= take-args
:* wire=/channel/subscription/'0123456789abcdef'/'1' duct=~[/http-blah]
^- (hypo sign:light-gate)
:- *type
[%g %unto %diff %json !>(`json`[%a [%n '1'] [%n '2'] ~])]
==
moves=~
==
:: open up the channel
::
:: send the channel a poke and a subscription request
::
=^ results5 light-gate
%- light-call :*
light-gate
now=~1111.1.2
scry=*sley
^= call-args
:* duct=~[/http-get-open] ~
%inbound-request
%.n
[%ipv4 .192.168.1.1]
%'GET'
'/~/channel/0123456789abcdef'
['cookie' 'urbauth=0v3.q0p7t.mlkkq.cqtto.p0nvi.2ieea']~
~
==
^= expected-moves
^- (list move:light-gate)
:~ :* duct=~[/http-get-open]
%give
%http-response
%start
200
:~ ['content-type' 'text/event-stream']
['cache-control' 'no-cache']
['connection' 'keep-alive']
==
::
:- ~
%- as-octs:mimes:html
'''
id: 0
data: {"ok":"ok","response":"poke","id":0}
id: 1
data: {"ok":"ok","response":"subscribe","id":1}
id: 2
data: {"response":"diff","id":1,"json":[1,2]}
'''
::
complete=%.n
==
:: TODO: Need to cancel on the original duct!
::
:* duct=~[/http-get-open] %pass /channel/timeout/'0123456789abcdef'
[%b %rest ~1111.1.2..12.00.00]
== == ==
::
;: weld ;: weld
results1 results1
results2 results2
results3 results3
results4 results4
results5
== ==
:: ::
++ light-call ++ light-call
@ -1094,4 +1127,97 @@
:: ::
:_ light-gate :_ light-gate
(weld results1 results2) (weld results1 results2)
:: performs all initialization and an initial PUT.
::
++ perform-init-start-channel
|= $: light-gate=_light-gate
scry=sley
==
^- [tang _light-gate]
::
=^ results1 light-gate
%- light-call :*
light-gate
now=~1111.1.1
scry=*sley
call-args=[duct=~[/init] ~ [%init ~nul]]
expected-moves=~
==
:: ensure there's an authenticated session
::
=^ results2 light-gate
%- perform-authentication :*
light-gate
now=~1111.1.2
scry=*sley
==
:: send the channel a poke and a subscription request
::
=^ results3 light-gate
%- light-call-with-comparator :*
light-gate
now=~1111.1.2
scry=*sley
^= call-args
:* duct=~[/http-blah] ~
%inbound-request
%.n
[%ipv4 .192.168.1.1]
%'PUT'
'/~/channel/0123456789abcdef'
['cookie' 'urbauth=0v3.q0p7t.mlkkq.cqtto.p0nvi.2ieea']~
::
:- ~
%- as-octs:mimes:html
'''
[{"action": "poke",
"id": 0,
"ship": "nul",
"app": "one",
"mark": "a",
"json": 5},
{"action": "subscribe",
"id": 1,
"ship": "nul",
"app": "two",
"path": "/one/two/three"}
]
'''
==
^= comparator
|= moves=(list move:light-gate)
^- tang
::
?. ?=([^ ^ ^ ^ ~] moves)
[%leaf "wrong number of moves: {<(lent moves)>}"]~
::
;: weld
%+ expect-gall-deal
:* /channel/poke/'0123456789abcdef'/'0'
[~nul ~nul] %one
%punk %a %json !>([%n '5'])
==
card.i.moves
::
%+ expect-gall-deal
:* /channel/subscription/'0123456789abcdef'/'1'
[~nul ~nul] %two
%peel %json /one/two/three
==
card.i.t.moves
::
%+ expect-eq
!> [~[/http-blah] %give %http-response %start 200 ~ ~ %.y]
!> i.t.t.moves
::
%+ expect-eq
!> :* ~[/http-blah] %pass
/channel/timeout/'0123456789abcdef'
%b %wait (add ~1111.1.2 ~h12)
==
!> i.t.t.t.moves
== ==
::
:_ light-gate
:(weld results1 results2 results3)
-- --