mirror of
https://github.com/urbit/shrub.git
synced 2024-12-14 20:02:51 +03:00
Merge pull request #1124 from urbit/rver-handle-multiple-subscriptions
Make %light handle multiple subscriptions on the same path.
This commit is contained in:
commit
801f298eb9
@ -242,7 +242,7 @@
|
||||
:: We maintain a list of subscriptions so if a channel times out, we
|
||||
:: can cancel all the subscriptions we've made.
|
||||
::
|
||||
subscriptions=(list [ship=@p app=term =path])
|
||||
subscriptions=(map wire [ship=@p app=term =path])
|
||||
==
|
||||
:: channel-request: an action requested on a channel
|
||||
::
|
||||
@ -258,7 +258,7 @@
|
||||
[%subscribe request-id=@ud ship=@p app=term =path]
|
||||
:: %unsubscribe: unsubscribes from an application path
|
||||
::
|
||||
[%unsubscribe request-id=@ud ship=@p app=term =path]
|
||||
[%unsubscribe request-id=@ud subscription-id=@ud]
|
||||
==
|
||||
:: channel-timeout: the delay before a channel should be reaped
|
||||
::
|
||||
@ -324,7 +324,7 @@
|
||||
?: =('unsubscribe' u.maybe-key)
|
||||
%. item
|
||||
%+ pe %unsubscribe
|
||||
(ot id+ni ship+(su fed:ag) app+so path+(su ;~(pfix fas (more fas urs:ab))) ~)
|
||||
(ot id+ni subscription+ni ~)
|
||||
:: if we reached this, we have an invalid action key. fail parsing.
|
||||
::
|
||||
~
|
||||
@ -404,7 +404,7 @@
|
||||
~
|
||||
==
|
||||
==
|
||||
:: +error-page: 400 page, with an error string if logged in
|
||||
:: +error-page: error page, with an error string if logged in
|
||||
::
|
||||
++ error-page
|
||||
|= [code=@ud authorized=? url=@t t=tape]
|
||||
@ -505,8 +505,10 @@
|
||||
});
|
||||
}
|
||||
|
||||
// subscribes to a path on an
|
||||
// subscribes to a path on an specific app and ship.
|
||||
//
|
||||
// Returns a subscription id, which is the same as the same internal id
|
||||
// passed to your Urbit.
|
||||
subscribe(ship, app, path, connectionErrFunc, eventFunc, quitFunc) {
|
||||
var id = this.nextId();
|
||||
this.outstandingSubscriptions.set(
|
||||
@ -519,6 +521,19 @@
|
||||
"app": app,
|
||||
"path": path
|
||||
});
|
||||
|
||||
return id;
|
||||
}
|
||||
|
||||
// unsubscribe to a specific subscription
|
||||
//
|
||||
unsubscribe(subscriptionId) {
|
||||
var id = this.nextId();
|
||||
this.sendJSONToChannel({
|
||||
"id": id,
|
||||
"action": "unsubscribe",
|
||||
"subscription": subscriptionId
|
||||
});
|
||||
}
|
||||
|
||||
// sends a JSON command command to the server.
|
||||
@ -1200,44 +1215,49 @@
|
||||
::
|
||||
%subscribe
|
||||
::
|
||||
=/ channel-wire=path
|
||||
/channel/subscription/[channel-id]/(scot %ud request-id.i.requests)
|
||||
::
|
||||
=. gall-moves
|
||||
:_ gall-moves
|
||||
^- move
|
||||
:^ duct %pass
|
||||
/channel/subscription/[channel-id]/(scot %ud request-id.i.requests)
|
||||
:^ duct %pass channel-wire
|
||||
=, i.requests
|
||||
[%g %deal [our ship] `cush:gall`[app %peel %json path]]
|
||||
:: TODO: Check existence to prevent duplicates?
|
||||
::
|
||||
=. session.channel-state.state
|
||||
%+ ~(jab by session.channel-state.state) channel-id
|
||||
|= =channel
|
||||
^+ channel
|
||||
=, i.requests
|
||||
channel(subscriptions [[ship app path] subscriptions.channel])
|
||||
channel(subscriptions (~(put by subscriptions.channel) channel-wire [ship app path]))
|
||||
::
|
||||
$(requests t.requests)
|
||||
::
|
||||
%unsubscribe
|
||||
=/ channel-wire=path
|
||||
/channel/subscription/[channel-id]/(scot %ud subscription-id.i.requests)
|
||||
::
|
||||
=/ subscriptions
|
||||
subscriptions:(~(got by session.channel-state.state) channel-id)
|
||||
::
|
||||
?~ maybe-subscription=(~(get by subscriptions) channel-wire)
|
||||
:: the client sent us a weird request referring to a subscription
|
||||
:: which isn't active.
|
||||
::
|
||||
~& [%missing-subscription-in-unsubscribe channel-wire]
|
||||
$(requests t.requests)
|
||||
::
|
||||
=. gall-moves
|
||||
:_ gall-moves
|
||||
^- move
|
||||
:^ duct %pass
|
||||
/channel/subscription/[channel-id]/(scot %ud request-id.i.requests)
|
||||
=, i.requests
|
||||
:^ duct %pass channel-wire
|
||||
=, u.maybe-subscription
|
||||
[%g %deal [our ship] `cush:gall`[app %pull ~]]
|
||||
:: TODO: Check existence to prevent duplicates?
|
||||
::
|
||||
=. session.channel-state.state
|
||||
%+ ~(jab by session.channel-state.state) channel-id
|
||||
|= =channel
|
||||
^+ channel
|
||||
=, i.requests
|
||||
%_ channel
|
||||
subscriptions
|
||||
(skip subscriptions.channel |=(a=[@p term ^path] =(a [ship app path])))
|
||||
==
|
||||
channel(subscriptions (~(del by subscriptions.channel) channel-wire))
|
||||
::
|
||||
$(requests t.requests)
|
||||
==
|
||||
@ -1368,14 +1388,11 @@
|
||||
==
|
||||
:: produce a list of moves which cancels every gall subscription
|
||||
::
|
||||
%+ turn subscriptions.session
|
||||
|= [ship=@p app=term =path]
|
||||
%+ turn ~(tap by subscriptions.session)
|
||||
|= [channel-wire=path ship=@p app=term =path]
|
||||
^- move
|
||||
:: todo: double check this; which duct should we be canceling on? does
|
||||
:: gall strongly bind to a duct as a cause like ford does?
|
||||
::
|
||||
:^ duct %pass /channel/subscription/[channel-id]
|
||||
[%g %deal [our ship] app %pull ~]
|
||||
[duct %pass channel-wire [%g %deal [our ship] app %pull ~]]
|
||||
--
|
||||
:: +handle-ford-response: translates a ford response for the outside world
|
||||
::
|
||||
|
@ -772,15 +772,13 @@
|
||||
'''
|
||||
::
|
||||
%+ expect-eq
|
||||
!> `[%unsubscribe 2 ~marlyt %thing /other]~
|
||||
!> `[%unsubscribe 2 1]~
|
||||
!> %- parse-channel-request:http-server-gate
|
||||
%- need %- de-json:html
|
||||
'''
|
||||
[{"action": "unsubscribe",
|
||||
"id": 2,
|
||||
"ship": "marlyt",
|
||||
"app": "thing",
|
||||
"path": "/other"}]
|
||||
"subscription": 1}]
|
||||
'''
|
||||
::
|
||||
%+ expect-eq
|
||||
@ -893,7 +891,7 @@
|
||||
[%leaf "wrong number of moves: {<(lent moves)>}"]~
|
||||
::
|
||||
%+ expect-gall-deal
|
||||
:* /channel/subscription/'0123456789abcdef'
|
||||
:* /channel/subscription/'0123456789abcdef'/1
|
||||
[~nul ~nul] %two %pull ~
|
||||
==
|
||||
card.i.moves
|
||||
@ -1164,9 +1162,7 @@
|
||||
'''
|
||||
[{"action": "unsubscribe",
|
||||
"id": 2,
|
||||
"ship": "nul",
|
||||
"app": "two",
|
||||
"path": "/one/two/three"}
|
||||
"subscription": 1}
|
||||
]
|
||||
'''
|
||||
==
|
||||
@ -1178,8 +1174,10 @@
|
||||
[%leaf "wrong number of moves: {<(lent moves)>}"]~
|
||||
::
|
||||
;: weld
|
||||
:: we want to cancel the subscription id on which we originally subscribed
|
||||
::
|
||||
%+ expect-gall-deal
|
||||
:* /channel/subscription/'0123456789abcdef'/'2'
|
||||
:* /channel/subscription/'0123456789abcdef'/'1'
|
||||
[~nul ~nul] %two %pull ~
|
||||
==
|
||||
card.i.moves
|
||||
@ -1210,6 +1208,276 @@
|
||||
results4
|
||||
==
|
||||
::
|
||||
++ test-channel-double-subscription-works
|
||||
:: common initialization
|
||||
::
|
||||
=^ results1 http-server-gate (perform-init-start-channel http-server-gate *sley)
|
||||
:: poke gets a success message
|
||||
::
|
||||
=^ results2 http-server-gate
|
||||
%- http-server-take :*
|
||||
http-server-gate
|
||||
now=(add ~1111.1.2 ~m1)
|
||||
scry=scry-provides-code
|
||||
^= take-args
|
||||
:* wire=/channel/poke/'0123456789abcdef'/'0' duct=~[/http-put-request]
|
||||
^- (hypo sign:http-server-gate)
|
||||
:- *type
|
||||
[%g %unto %coup ~]
|
||||
==
|
||||
moves=~
|
||||
==
|
||||
:: subscription gets a success message
|
||||
::
|
||||
=^ results3 http-server-gate
|
||||
%- http-server-take :*
|
||||
http-server-gate
|
||||
now=(add ~1111.1.2 ~m2)
|
||||
scry=scry-provides-code
|
||||
^= take-args
|
||||
:* wire=/channel/subscription/'0123456789abcdef'/'1' duct=~[/http-put-request]
|
||||
^- (hypo sign:http-server-gate)
|
||||
:- *type
|
||||
[%g %unto %reap ~]
|
||||
==
|
||||
moves=~
|
||||
==
|
||||
:: now make a second subscription from the client on the same path
|
||||
::
|
||||
=^ results3 http-server-gate
|
||||
%- http-server-call-with-comparator :*
|
||||
http-server-gate
|
||||
now=(add ~1111.1.2 ~m3)
|
||||
scry=scry-provides-code
|
||||
^= call-args
|
||||
:* duct=~[/http-put-request] ~
|
||||
%request
|
||||
%.n
|
||||
[%ipv4 .192.168.1.1]
|
||||
%'PUT'
|
||||
'/~/channel/0123456789abcdef'
|
||||
['cookie' 'urbauth=0v3.q0p7t.mlkkq.cqtto.p0nvi.2ieea']~
|
||||
::
|
||||
:- ~
|
||||
%- as-octs:mimes:html
|
||||
'''
|
||||
[{"action": "subscribe",
|
||||
"id": 2,
|
||||
"ship": "nul",
|
||||
"app": "two",
|
||||
"path": "/one/two/three"}
|
||||
]
|
||||
'''
|
||||
==
|
||||
^= comparator
|
||||
|= moves=(list move:http-server-gate)
|
||||
^- tang
|
||||
::
|
||||
?. ?=([^ ^ ^ ^ ~] moves)
|
||||
[%leaf "wrong number of moves: {<(lent moves)>}"]~
|
||||
::
|
||||
;: weld
|
||||
%+ expect-gall-deal
|
||||
:* /channel/subscription/'0123456789abcdef'/'2'
|
||||
[~nul ~nul] %two
|
||||
%peel %json /one/two/three
|
||||
==
|
||||
card.i.moves
|
||||
::
|
||||
%+ expect-eq
|
||||
!> [~[/http-put-request] %give %response %start [200 ~] ~ %.y]
|
||||
!> i.t.moves
|
||||
::
|
||||
%+ expect-eq
|
||||
!> :* ~[/http-put-request] %pass
|
||||
/channel/timeout/'0123456789abcdef'
|
||||
%b %rest (add ~1111.1.2 ~h12)
|
||||
==
|
||||
!> i.t.t.moves
|
||||
::
|
||||
%+ expect-eq
|
||||
!> :* ~[/http-put-request] %pass
|
||||
/channel/timeout/'0123456789abcdef'
|
||||
%b %wait :(add ~1111.1.2 ~h12 ~m3)
|
||||
==
|
||||
!> i.t.t.t.moves
|
||||
== ==
|
||||
:: subscription gets a result (on the id 1)
|
||||
::
|
||||
=^ results4 http-server-gate
|
||||
%- http-server-take :*
|
||||
http-server-gate
|
||||
now=(add ~1111.1.2 ~m2)
|
||||
scry=scry-provides-code
|
||||
^= take-args
|
||||
:* wire=/channel/subscription/'0123456789abcdef'/'1' duct=~[/http-put-request]
|
||||
^- (hypo sign:http-server-gate)
|
||||
:- *type
|
||||
[%g %unto %diff %json !>(`json`[%a [%n '1'] [%n '2'] ~])]
|
||||
==
|
||||
moves=~
|
||||
==
|
||||
:: subscription gets a result (on the id 2)
|
||||
::
|
||||
=^ results5 http-server-gate
|
||||
%- http-server-take :*
|
||||
http-server-gate
|
||||
now=(add ~1111.1.2 ~m2)
|
||||
scry=scry-provides-code
|
||||
^= take-args
|
||||
:* wire=/channel/subscription/'0123456789abcdef'/'2' duct=~[/http-put-request]
|
||||
^- (hypo sign:http-server-gate)
|
||||
:- *type
|
||||
[%g %unto %diff %json !>(`json`[%a [%n '1'] [%n '2'] ~])]
|
||||
==
|
||||
moves=~
|
||||
==
|
||||
:: open up the channel
|
||||
::
|
||||
=^ results6 http-server-gate
|
||||
%- http-server-call :*
|
||||
http-server-gate
|
||||
now=(add ~1111.1.2 ~m3)
|
||||
scry=scry-provides-code
|
||||
^= call-args
|
||||
:* duct=~[/http-get-open] ~
|
||||
%request
|
||||
%.n
|
||||
[%ipv4 .192.168.1.1]
|
||||
%'GET'
|
||||
'/~/channel/0123456789abcdef'
|
||||
['cookie' 'urbauth=0v3.q0p7t.mlkkq.cqtto.p0nvi.2ieea']~
|
||||
~
|
||||
==
|
||||
^= expected-moves
|
||||
^- (list move:http-server-gate)
|
||||
:~ :* duct=~[/http-get-open]
|
||||
%give
|
||||
%response
|
||||
%start
|
||||
:- 200
|
||||
:~ ['content-type' 'text/event-stream']
|
||||
['cache-control' 'no-cache']
|
||||
['connection' 'keep-alive']
|
||||
==
|
||||
::
|
||||
:- ~
|
||||
%- as-octs:mimes:html
|
||||
'''
|
||||
id: 0
|
||||
data: {"ok":"ok","id":0,"response":"poke"}
|
||||
|
||||
id: 1
|
||||
data: {"ok":"ok","id":1,"response":"subscribe"}
|
||||
|
||||
id: 2
|
||||
data: {"json":[1,2],"id":1,"response":"diff"}
|
||||
|
||||
id: 3
|
||||
data: {"json":[1,2],"id":2,"response":"diff"}
|
||||
|
||||
|
||||
'''
|
||||
::
|
||||
complete=%.n
|
||||
==
|
||||
:: opening the channel cancels the timeout timer
|
||||
::
|
||||
:* duct=~[/http-put-request] %pass
|
||||
/channel/timeout/'0123456789abcdef'
|
||||
[%b %rest ~1111.1.2..12.03.00]
|
||||
== == ==
|
||||
:: we can close the first channel without closing the second
|
||||
::
|
||||
=^ results7 http-server-gate
|
||||
%- http-server-call-with-comparator :*
|
||||
http-server-gate
|
||||
now=(add ~1111.1.2 ~m3)
|
||||
scry=scry-provides-code
|
||||
^= call-args
|
||||
:* duct=~[/http-put-request] ~
|
||||
%request
|
||||
%.n
|
||||
[%ipv4 .192.168.1.1]
|
||||
%'PUT'
|
||||
'/~/channel/0123456789abcdef'
|
||||
['cookie' 'urbauth=0v3.q0p7t.mlkkq.cqtto.p0nvi.2ieea']~
|
||||
::
|
||||
:- ~
|
||||
%- as-octs:mimes:html
|
||||
'''
|
||||
[{"action": "unsubscribe",
|
||||
"id": 3,
|
||||
"subscription": 1}
|
||||
]
|
||||
'''
|
||||
==
|
||||
^= comparator
|
||||
|= moves=(list move:http-server-gate)
|
||||
^- tang
|
||||
::
|
||||
?. ?=([^ ^ ~] moves)
|
||||
[%leaf "wrong number of moves: {<(lent moves)>}"]~
|
||||
::
|
||||
;: weld
|
||||
%+ expect-gall-deal
|
||||
:* /channel/subscription/'0123456789abcdef'/'1'
|
||||
[~nul ~nul] %two %pull ~
|
||||
==
|
||||
card.i.moves
|
||||
::
|
||||
%+ expect-eq
|
||||
!> [~[/http-put-request] %give %response %start [200 ~] ~ %.y]
|
||||
!> i.t.moves
|
||||
== ==
|
||||
:: gall responds on the second subscription.
|
||||
::
|
||||
:: This just tests that closing one of the two subscriptions doesn't
|
||||
:: unsubscribe to the other.
|
||||
::
|
||||
=^ results8 http-server-gate
|
||||
%- http-server-take-with-comparator :*
|
||||
http-server-gate
|
||||
now=(add ~1111.1.2 ~m2)
|
||||
scry=scry-provides-code
|
||||
^= take-args
|
||||
:* wire=/channel/subscription/'0123456789abcdef'/'2' duct=~[/http-put-request]
|
||||
^- (hypo sign:http-server-gate)
|
||||
:- *type
|
||||
[%g %unto %diff %json !>(`json`[%a [%n '1'] [%n '2'] ~])]
|
||||
==
|
||||
^= comparator
|
||||
|= moves=(list move:http-server-gate)
|
||||
^- tang
|
||||
::
|
||||
?. ?=([^ ~] moves)
|
||||
[%leaf "wrong number of moves: {<(lent moves)>}"]~
|
||||
%+ expect-eq
|
||||
!> :* ~[/http-get-open] %give %response %continue
|
||||
:- ~
|
||||
%- as-octs:mimes:html
|
||||
'''
|
||||
id: 4
|
||||
data: {"json":[1,2],"id":2,"response":"diff"}
|
||||
|
||||
|
||||
'''
|
||||
complete=%.n
|
||||
==
|
||||
!> i.moves
|
||||
==
|
||||
::
|
||||
;: weld
|
||||
results1
|
||||
results2
|
||||
results3
|
||||
results4
|
||||
results5
|
||||
results6
|
||||
results7
|
||||
results8
|
||||
==
|
||||
::
|
||||
++ test-prune-events
|
||||
=/ q=(qeu [id=@ud lines=wall]) ~
|
||||
=. q (~(put to q) [0 ~])
|
||||
|
Loading…
Reference in New Issue
Block a user