Channel cancelation works now.

This commit is contained in:
Elliot Glaysher 2018-11-21 13:37:26 -08:00
parent ed3d7c2a0d
commit 2a1b801bad
2 changed files with 189 additions and 30 deletions

View File

@ -212,6 +212,9 @@
$: :: session: mapping between an arbitrary key to a channel
::
session=(map @t channel)
:: by-duct: mapping from ducts to session key
::
duct-to-key=(map duct @t)
==
:: +timer: a reference to a timer so we can cancel or update it.
::
@ -574,8 +577,6 @@
++ cancel-request
^- [(list move) server-state]
::
~& [%cancel-request duct]
::
?~ connection=(~(get by connections.state) duct)
:: nothing has handled this connection
::
@ -610,9 +611,7 @@
[~ state]
::
%channel
:: todo: this part actually matters.
::
[~ state]
on-cancel-request:by-channel
==
:: +return-static-data-on-duct: returns one piece of data all at once
::
@ -802,9 +801,88 @@
::
~& %session-not-a-put
[~ state]
:: +handle-cancel: cancels an ongoing subscription
:: +on-cancel-request: cancels an ongoing subscription
::
::++ handle-cancel
:: One of our long lived sessions just got closed. We put the associated
:: session back into the waiting state.
::
++ on-cancel-request
^- [(list move) server-state]
:: lookup the session id by duct
::
?~ maybe-channel-id=(~(get by duct-to-key.channel-state.state) duct)
~& [%canceling-nonexistant-channel duct]
[~ state]
::
=/ expiration-time=@da (add now channel-timeout)
::
:- [(set-timeout-move u.maybe-channel-id expiration-time) moves]
%_ state
session.channel-state
%+ ~(jab by session.channel-state.state) u.maybe-channel-id
|= =channel
:: if we are canceling a known channel, it should have a listener
::
?> ?=([%| *] state.channel)
channel(state [%& [expiration-time duct]])
::
duct-to-key.channel-state
(~(del by duct-to-key.channel-state.state) duct)
==
:: +set-timeout-timer-for: sets a timeout timer on a channel
::
:: This creates a channel if it doesn't exist, cancels existing timers
:: if they're already set (we cannot have duplicate timers), and (if
:: necessary) moves channels from the listening state to the expiration
:: state.
::
++ update-timeout-timer-for
|= channel-id=@t
^+ ..update-timeout-timer-for
:: when our callback should fire
::
=/ expiration-time=@da (add now channel-timeout)
:: if the channel doesn't exist, create it and set a timer
::
?~ maybe-channel=(~(get by session.channel-state.state) channel-id)
::
%_ ..update-timeout-timer-for
session.channel-state.state
%+ ~(put by session.channel-state.state) channel-id
[[%& expiration-time duct] 0 ~ ~]
::
moves
[(set-timeout-move channel-id expiration-time) moves]
==
:: if the channel has an active listener, we aren't setting any timers
::
?: ?=([%| *] state.u.maybe-channel)
..update-timeout-timer-for
:: we have a previous timer; cancel the old one and set the new one
::
%_ ..update-timeout-timer-for
session.channel-state.state
%+ ~(jab by session.channel-state.state) channel-id
|= =channel
channel(state [%& [expiration-time duct]])
::
moves
:* (cancel-timeout-move channel-id p.state.u.maybe-channel)
(set-timeout-move channel-id expiration-time)
moves
==
==
::
++ set-timeout-move
|= [channel-id=@t expiration-time=@da]
^- move
[duct %pass /channel/timeout/[channel-id] %b %wait expiration-time]
::
++ cancel-timeout-move
|= [channel-id=@t expiration-time=@da =^duct]
^- move
:^ duct %pass /channel/timeout/[channel-id]
[%b %rest expiration-time]
:: +on-get-request: handles a GET request
::
:: GET requests open a channel for the server to send events to the
@ -826,10 +904,7 @@
:: when opening an event-stream, we must cancel our timeout timer
::
=. moves
:_ moves
^- move
:^ duct.p.state.u.maybe-channel %pass /channel/timeout/[channel-id]
[%b %rest date.p.state.u.maybe-channel]
[(cancel-timeout-move channel-id p.state.u.maybe-channel) moves]
:: the http-request may include a 'Last-Event-Id' header
::
=/ maybe-last-event-id=(unit @ud)
@ -866,6 +941,10 @@
(wall-to-octs event-replay)
complete=%.n
==
:: associate this duct with this session key
::
=. duct-to-key.channel-state.state
(~(put by duct-to-key.channel-state.state) duct channel-id)
:: clear the event queue and record the duct for future output
::
=. session.channel-state.state
@ -926,24 +1005,10 @@
:: check for the existence of the channel-id
::
:: if we have no session, create a new one set to expire in
:: :channel-timeout from now.
:: :channel-timeout from now. if we have one which has a timer, update
:: that timer.
::
:: TODO: This is wrong. We always want to potentially update the
:: expiration time if there's no eventsource attached.
::
=? ..on-put-request !(~(has by session.channel-state.state) channel-id)
::
=/ expiration-time=@da (add now channel-timeout)
%_ ..on-put-request
session.channel-state.state
%+ ~(put by session.channel-state.state) channel-id
[[%& expiration-time duct] 0 ~ ~]
::
moves
:_ moves
^- move
[duct %pass /channel/timeout/[channel-id] %b %wait expiration-time]
==
=. ..on-put-request (update-timeout-timer-for channel-id)
:: for each request, execute the action passed in
::
=+ requests=u.maybe-requests
@ -1098,7 +1163,7 @@
::
:- moves
%_ state
channel-state
session.channel-state
%+ ~(jab by session.channel-state.state) channel-id
|= =^channel
^+ channel

View File

@ -859,7 +859,7 @@
=^ results5 light-gate
%- light-call :*
light-gate
now=~1111.1.2
now=(add ~1111.1.2 ~m3)
scry=*sley
^= call-args
:* duct=~[/http-get-open] ~
@ -900,11 +900,29 @@
::
complete=%.n
==
:: opening the channel cancels the timeout timer
::
:* duct=~[/http-put-request] %pass
/channel/timeout/'0123456789abcdef'
[%b %rest ~1111.1.2..12.00.00]
== == ==
:: we get a cancel when we notice the client has disconnected
::
=^ results6 light-gate
%- light-call :*
light-gate
now=(add ~1111.1.2 ~m4)
scry=*sley
call-args=[duct=~[/http-get-open] ~ %cancel-inbound-request ~]
^= expected-moves
^- (list move:light-gate)
:: closing the channel restarts the timeout timer
::
:~ :* duct=~[/http-get-open] %pass
/channel/timeout/'0123456789abcdef'
%b %wait :(add ~1111.1.2 ~h12 ~m4)
== ==
==
::
;: weld
results1
@ -912,6 +930,82 @@
results3
results4
results5
results6
==
::
::
++ test-channel-second-get-updates-timer
:: common initialization
::
=^ results1 light-gate (perform-init-start-channel light-gate *sley)
:: perform another poke to a different app
::
:: Since we haven't connected with a GET, the old timer should be canceled
:: and a new one should be set.
:: send the channel a poke and a subscription request
::
=^ results2 light-gate
%- light-call-with-comparator :*
light-gate
now=(add ~1111.1.2 ~m1)
scry=*sley
^= call-args
:* duct=~[/http-put-request] ~
%inbound-request
%.n
[%ipv4 .192.168.1.1]
%'PUT'
'/~/channel/0123456789abcdef'
['cookie' 'urbauth=0v3.q0p7t.mlkkq.cqtto.p0nvi.2ieea']~
::
:- ~
%- as-octs:mimes:html
'''
[{"action": "poke",
"id": 2,
"ship": "nul",
"app": "eight",
"mark": "a",
"json": 9}]
'''
==
^= comparator
|= moves=(list move:light-gate)
^- tang
::
?. ?=([^ ^ ^ ^ ~] moves)
[%leaf "wrong number of moves: {<(lent moves)>}"]~
::
;: weld
%+ expect-gall-deal
:* /channel/poke/'0123456789abcdef'/'2'
[~nul ~nul] %eight
%punk %a %json !>([%n '9'])
==
card.i.moves
::
%+ expect-eq
!> [~[/http-put-request] %give %http-response %start 200 ~ ~ %.y]
!> i.t.moves
::
%+ expect-eq
!> :* ~[/http-put-request] %pass
/channel/timeout/'0123456789abcdef'
%b %rest (add ~1111.1.2 ~h12)
==
!> i.t.t.moves
::
%+ expect-eq
!> :* ~[/http-put-request] %pass
/channel/timeout/'0123456789abcdef'
%b %wait :(add ~1111.1.2 ~h12 ~m1)
==
!> i.t.t.t.moves
== ==
::
;: weld
results1
results2
==
::
++ light-call