Make %light handle multiple subscriptions on the same path.

Some applications make multiple subscriptions to the same app on the
same path. Support this by changing the subscription interface to return
a numeric reference which needs to be passed in during the explicit
unsubscribe, and ensure that we send per-subscription instead of
per-path unsubscribe commands to Gall.
This commit is contained in:
Elliot Glaysher 2019-03-26 15:52:32 -07:00
parent 444d5276c0
commit 49dfb2d1bd
2 changed files with 320 additions and 35 deletions

View File

@ -242,7 +242,7 @@
:: We maintain a list of subscriptions so if a channel times out, we
:: can cancel all the subscriptions we've made.
::
subscriptions=(list [ship=@p app=term =path])
subscriptions=(map wire [ship=@p app=term =path])
==
:: channel-request: an action requested on a channel
::
@ -258,7 +258,7 @@
[%subscribe request-id=@ud ship=@p app=term =path]
:: %unsubscribe: unsubscribes from an application path
::
[%unsubscribe request-id=@ud ship=@p app=term =path]
[%unsubscribe request-id=@ud subscription-id=@ud]
==
:: channel-timeout: the delay before a channel should be reaped
::
@ -324,7 +324,7 @@
?: =('unsubscribe' u.maybe-key)
%. item
%+ pe %unsubscribe
(ot id+ni ship+(su fed:ag) app+so path+(su ;~(pfix fas (more fas urs:ab))) ~)
(ot id+ni subscription+ni ~)
:: if we reached this, we have an invalid action key. fail parsing.
::
~
@ -404,7 +404,7 @@
~
==
==
:: +error-page: 400 page, with an error string if logged in
:: +error-page: error page, with an error string if logged in
::
++ error-page
|= [code=@ud authorized=? url=@t t=tape]
@ -505,8 +505,10 @@
});
}
// subscribes to a path on an
// subscribes to a path on an specific app and ship.
//
// Returns a subscription id, which is the same as the same internal id
// passed to your Urbit.
subscribe(ship, app, path, connectionErrFunc, eventFunc, quitFunc) {
var id = this.nextId();
this.outstandingSubscriptions.set(
@ -519,6 +521,19 @@
"app": app,
"path": path
});
return id;
}
// unsubscribe to a specific subscription
//
unsubscribe(subscriptionId) {
var id = this.nextId();
this.sendJSONToChannel({
"id": id,
"action": "unsubscribe",
"subscription": subscriptionId
});
}
// sends a JSON command command to the server.
@ -1200,44 +1215,49 @@
::
%subscribe
::
=/ channel-wire=path
/channel/subscription/[channel-id]/(scot %ud request-id.i.requests)
::
=. gall-moves
:_ gall-moves
^- move
:^ duct %pass
/channel/subscription/[channel-id]/(scot %ud request-id.i.requests)
:^ duct %pass channel-wire
=, i.requests
[%g %deal [our ship] `cush:gall`[app %peel %json path]]
:: TODO: Check existence to prevent duplicates?
::
=. session.channel-state.state
%+ ~(jab by session.channel-state.state) channel-id
|= =channel
^+ channel
=, i.requests
channel(subscriptions [[ship app path] subscriptions.channel])
channel(subscriptions (~(put by subscriptions.channel) channel-wire [ship app path]))
::
$(requests t.requests)
::
%unsubscribe
=/ channel-wire=path
/channel/subscription/[channel-id]/(scot %ud subscription-id.i.requests)
::
=/ subscriptions
subscriptions:(~(got by session.channel-state.state) channel-id)
::
?~ maybe-subscription=(~(get by subscriptions) channel-wire)
:: the client sent us a weird request referring to a subscription
:: which isn't active.
::
~& [%missing-subscription-in-unsubscribe channel-wire]
$(requests t.requests)
::
=. gall-moves
:_ gall-moves
^- move
:^ duct %pass
/channel/subscription/[channel-id]/(scot %ud request-id.i.requests)
=, i.requests
:^ duct %pass channel-wire
=, u.maybe-subscription
[%g %deal [our ship] `cush:gall`[app %pull ~]]
:: TODO: Check existence to prevent duplicates?
::
=. session.channel-state.state
%+ ~(jab by session.channel-state.state) channel-id
|= =channel
^+ channel
=, i.requests
%_ channel
subscriptions
(skip subscriptions.channel |=(a=[@p term ^path] =(a [ship app path])))
==
channel(subscriptions (~(del by subscriptions.channel) channel-wire))
::
$(requests t.requests)
==
@ -1368,14 +1388,11 @@
==
:: produce a list of moves which cancels every gall subscription
::
%+ turn subscriptions.session
|= [ship=@p app=term =path]
%+ turn ~(tap by subscriptions.session)
|= [channel-wire=path ship=@p app=term =path]
^- move
:: todo: double check this; which duct should we be canceling on? does
:: gall strongly bind to a duct as a cause like ford does?
::
:^ duct %pass /channel/subscription/[channel-id]
[%g %deal [our ship] app %pull ~]
[duct %pass channel-wire [%g %deal [our ship] app %pull ~]]
--
:: +handle-ford-response: translates a ford response for the outside world
::

View File

@ -772,15 +772,13 @@
'''
::
%+ expect-eq
!> `[%unsubscribe 2 ~marlyt %thing /other]~
!> `[%unsubscribe 2 1]~
!> %- parse-channel-request:http-server-gate
%- need %- de-json:html
'''
[{"action": "unsubscribe",
"id": 2,
"ship": "marlyt",
"app": "thing",
"path": "/other"}]
"subscription": 1}]
'''
::
%+ expect-eq
@ -893,7 +891,7 @@
[%leaf "wrong number of moves: {<(lent moves)>}"]~
::
%+ expect-gall-deal
:* /channel/subscription/'0123456789abcdef'
:* /channel/subscription/'0123456789abcdef'/1
[~nul ~nul] %two %pull ~
==
card.i.moves
@ -1164,9 +1162,7 @@
'''
[{"action": "unsubscribe",
"id": 2,
"ship": "nul",
"app": "two",
"path": "/one/two/three"}
"subscription": 1}
]
'''
==
@ -1178,8 +1174,10 @@
[%leaf "wrong number of moves: {<(lent moves)>}"]~
::
;: weld
:: we want to cancel the subscription id on which we originally subscribed
::
%+ expect-gall-deal
:* /channel/subscription/'0123456789abcdef'/'2'
:* /channel/subscription/'0123456789abcdef'/'1'
[~nul ~nul] %two %pull ~
==
card.i.moves
@ -1210,6 +1208,276 @@
results4
==
::
++ test-channel-double-subscription-works
:: common initialization
::
=^ results1 http-server-gate (perform-init-start-channel http-server-gate *sley)
:: poke gets a success message
::
=^ results2 http-server-gate
%- http-server-take :*
http-server-gate
now=(add ~1111.1.2 ~m1)
scry=scry-provides-code
^= take-args
:* wire=/channel/poke/'0123456789abcdef'/'0' duct=~[/http-put-request]
^- (hypo sign:http-server-gate)
:- *type
[%g %unto %coup ~]
==
moves=~
==
:: subscription gets a success message
::
=^ results3 http-server-gate
%- http-server-take :*
http-server-gate
now=(add ~1111.1.2 ~m2)
scry=scry-provides-code
^= take-args
:* wire=/channel/subscription/'0123456789abcdef'/'1' duct=~[/http-put-request]
^- (hypo sign:http-server-gate)
:- *type
[%g %unto %reap ~]
==
moves=~
==
:: now make a second subscription from the client on the same path
::
=^ results3 http-server-gate
%- http-server-call-with-comparator :*
http-server-gate
now=(add ~1111.1.2 ~m3)
scry=scry-provides-code
^= call-args
:* duct=~[/http-put-request] ~
%request
%.n
[%ipv4 .192.168.1.1]
%'PUT'
'/~/channel/0123456789abcdef'
['cookie' 'urbauth=0v3.q0p7t.mlkkq.cqtto.p0nvi.2ieea']~
::
:- ~
%- as-octs:mimes:html
'''
[{"action": "subscribe",
"id": 2,
"ship": "nul",
"app": "two",
"path": "/one/two/three"}
]
'''
==
^= comparator
|= moves=(list move:http-server-gate)
^- tang
::
?. ?=([^ ^ ^ ^ ~] moves)
[%leaf "wrong number of moves: {<(lent moves)>}"]~
::
;: weld
%+ expect-gall-deal
:* /channel/subscription/'0123456789abcdef'/'2'
[~nul ~nul] %two
%peel %json /one/two/three
==
card.i.moves
::
%+ expect-eq
!> [~[/http-put-request] %give %response %start [200 ~] ~ %.y]
!> i.t.moves
::
%+ expect-eq
!> :* ~[/http-put-request] %pass
/channel/timeout/'0123456789abcdef'
%b %rest (add ~1111.1.2 ~h12)
==
!> i.t.t.moves
::
%+ expect-eq
!> :* ~[/http-put-request] %pass
/channel/timeout/'0123456789abcdef'
%b %wait :(add ~1111.1.2 ~h12 ~m3)
==
!> i.t.t.t.moves
== ==
:: subscription gets a result (on the id 1)
::
=^ results4 http-server-gate
%- http-server-take :*
http-server-gate
now=(add ~1111.1.2 ~m2)
scry=scry-provides-code
^= take-args
:* wire=/channel/subscription/'0123456789abcdef'/'1' duct=~[/http-put-request]
^- (hypo sign:http-server-gate)
:- *type
[%g %unto %diff %json !>(`json`[%a [%n '1'] [%n '2'] ~])]
==
moves=~
==
:: subscription gets a result (on the id 2)
::
=^ results5 http-server-gate
%- http-server-take :*
http-server-gate
now=(add ~1111.1.2 ~m2)
scry=scry-provides-code
^= take-args
:* wire=/channel/subscription/'0123456789abcdef'/'2' duct=~[/http-put-request]
^- (hypo sign:http-server-gate)
:- *type
[%g %unto %diff %json !>(`json`[%a [%n '1'] [%n '2'] ~])]
==
moves=~
==
:: open up the channel
::
=^ results6 http-server-gate
%- http-server-call :*
http-server-gate
now=(add ~1111.1.2 ~m3)
scry=scry-provides-code
^= call-args
:* duct=~[/http-get-open] ~
%request
%.n
[%ipv4 .192.168.1.1]
%'GET'
'/~/channel/0123456789abcdef'
['cookie' 'urbauth=0v3.q0p7t.mlkkq.cqtto.p0nvi.2ieea']~
~
==
^= expected-moves
^- (list move:http-server-gate)
:~ :* duct=~[/http-get-open]
%give
%response
%start
:- 200
:~ ['content-type' 'text/event-stream']
['cache-control' 'no-cache']
['connection' 'keep-alive']
==
::
:- ~
%- as-octs:mimes:html
'''
id: 0
data: {"ok":"ok","id":0,"response":"poke"}
id: 1
data: {"ok":"ok","id":1,"response":"subscribe"}
id: 2
data: {"json":[1,2],"id":1,"response":"diff"}
id: 3
data: {"json":[1,2],"id":2,"response":"diff"}
'''
::
complete=%.n
==
:: opening the channel cancels the timeout timer
::
:* duct=~[/http-put-request] %pass
/channel/timeout/'0123456789abcdef'
[%b %rest ~1111.1.2..12.03.00]
== == ==
:: we can close the first channel without closing the second
::
=^ results7 http-server-gate
%- http-server-call-with-comparator :*
http-server-gate
now=(add ~1111.1.2 ~m3)
scry=scry-provides-code
^= call-args
:* duct=~[/http-put-request] ~
%request
%.n
[%ipv4 .192.168.1.1]
%'PUT'
'/~/channel/0123456789abcdef'
['cookie' 'urbauth=0v3.q0p7t.mlkkq.cqtto.p0nvi.2ieea']~
::
:- ~
%- as-octs:mimes:html
'''
[{"action": "unsubscribe",
"id": 3,
"subscription": 1}
]
'''
==
^= comparator
|= moves=(list move:http-server-gate)
^- tang
::
?. ?=([^ ^ ~] moves)
[%leaf "wrong number of moves: {<(lent moves)>}"]~
::
;: weld
%+ expect-gall-deal
:* /channel/subscription/'0123456789abcdef'/'1'
[~nul ~nul] %two %pull ~
==
card.i.moves
::
%+ expect-eq
!> [~[/http-put-request] %give %response %start [200 ~] ~ %.y]
!> i.t.moves
== ==
:: gall responds on the second subscription.
::
:: This just tests that closing one of the two subscriptions doesn't
:: unsubscribe to the other.
::
=^ results8 http-server-gate
%- http-server-take-with-comparator :*
http-server-gate
now=(add ~1111.1.2 ~m2)
scry=scry-provides-code
^= take-args
:* wire=/channel/subscription/'0123456789abcdef'/'2' duct=~[/http-put-request]
^- (hypo sign:http-server-gate)
:- *type
[%g %unto %diff %json !>(`json`[%a [%n '1'] [%n '2'] ~])]
==
^= comparator
|= moves=(list move:http-server-gate)
^- tang
::
?. ?=([^ ~] moves)
[%leaf "wrong number of moves: {<(lent moves)>}"]~
%+ expect-eq
!> :* ~[/http-get-open] %give %response %continue
:- ~
%- as-octs:mimes:html
'''
id: 4
data: {"json":[1,2],"id":2,"response":"diff"}
'''
complete=%.n
==
!> i.moves
==
::
;: weld
results1
results2
results3
results4
results5
results6
results7
results8
==
::
++ test-prune-events
=/ q=(qeu [id=@ud lines=wall]) ~
=. q (~(put to q) [0 ~])