Fix event queue and add end-to-end test of the channels, including retry logic.

This commit is contained in:
Elliot Glaysher 2019-01-16 14:58:07 -08:00
parent 831d7f25b3
commit 49d43ff11b
2 changed files with 251 additions and 5 deletions

View File

@ -301,14 +301,14 @@
?: =(~ q)
~
::
=^ head=[id=@ud lines=wall] q ~(get to q)
=/ next=[item=[id=@ud lines=wall] _q] ~(get to q)
:: if the head of the queue is newer than the acknowledged id, we're done
::
?: (gte id.head id)
?: (gth id.item.next id)
q
:: otherwise, check next item
::
$
$(q +:next)
:: +parse-channel-request: parses a list of channel-requests
::
:: Parses a json array into a list of +channel-request. If any of the items
@ -1176,7 +1176,6 @@
++ on-put-request
|= [channel-id=@t =http-request]
^- [(list move) server-state]
~& %on-put-request
:: error when there's no body
::
?~ body.http-request

View File

@ -1008,7 +1008,6 @@
results2
==
::
::
++ test-prune-events
=/ q=(qeu [id=@ud lines=wall]) ~
=. q (~(put to q) [0 ~])
@ -1021,6 +1020,254 @@
::
(expect-eq !>([~ [4 ~]]) !>(~(top to q)))
::
++ test-channel-sends-unacknowledged-events-on-reconnection
:: common initialization
::
=^ results1 light-gate (perform-init-start-channel light-gate *sley)
:: poke gets a success message
::
=^ results2 light-gate
%- light-take :*
light-gate
now=(add ~1111.1.2 ~m1)
scry=*sley
^= take-args
:* wire=/channel/poke/'0123456789abcdef'/'0' duct=~[/http-put-request]
^- (hypo sign:light-gate)
:- *type
[%g %unto %coup ~]
==
moves=~
==
:: subscription gets a success message
::
=^ results3 light-gate
%- light-take :*
light-gate
now=(add ~1111.1.2 ~m2)
scry=*sley
^= take-args
:* wire=/channel/subscription/'0123456789abcdef'/'1' duct=~[/http-put-request]
^- (hypo sign:light-gate)
:- *type
[%g %unto %reap ~]
==
moves=~
==
:: opens the http channel
::
=^ results4 light-gate
%- light-call :*
light-gate
now=(add ~1111.1.2 ~m3)
scry=*sley
^= call-args
:* duct=~[/http-get-open] ~
%inbound-request
%.n
[%ipv4 .192.168.1.1]
%'GET'
'/~/channel/0123456789abcdef'
['cookie' 'urbauth=0v3.q0p7t.mlkkq.cqtto.p0nvi.2ieea']~
~
==
^= expected-moves
^- (list move:light-gate)
:~ :* duct=~[/http-get-open]
%give
%http-response
%start
200
:~ ['content-type' 'text/event-stream']
['cache-control' 'no-cache']
['connection' 'keep-alive']
==
::
:- ~
%- as-octs:mimes:html
'''
id: 0
data: {"ok":"ok","id":0,"response":"poke"}
id: 1
data: {"ok":"ok","id":1,"response":"subscribe"}
'''
::
complete=%.n
==
:: opening the channel cancels the timeout timer
::
:* duct=~[/http-put-request] %pass
/channel/timeout/'0123456789abcdef'
[%b %rest :(add ~1111.1.2 ~h12)]
== == ==
:: first subscription result gets sent to the user
::
=^ results5 light-gate
%- light-take :*
light-gate
now=(add ~1111.1.2 ~m4)
scry=*sley
^= take-args
:* wire=/channel/subscription/'0123456789abcdef'/'1' duct=~[/http-put-request]
^- (hypo sign:light-gate)
:- *type
[%g %unto %diff %json !>(`json`[%a [%n '1'] ~])]
==
^= moves
^- (list move:light-gate)
:~ :* duct=~[/http-get-open]
%give
%http-response
%continue
:- ~
%- as-octs:mimes:html
'''
id: 2
data: {"json":[1],"id":1,"response":"diff"}
'''
complete=%.n
== == ==
:: the client now acknowledges up to event 1
::
:: send the channel a poke and a subscription request
::
=^ results6 light-gate
%- light-call-with-comparator :*
light-gate
now=(add ~1111.1.2 ~m5)
scry=*sley
^= call-args
:* duct=~[/http-put-request] ~
%inbound-request
%.n
[%ipv4 .192.168.1.1]
%'PUT'
'/~/channel/0123456789abcdef'
['cookie' 'urbauth=0v3.q0p7t.mlkkq.cqtto.p0nvi.2ieea']~
::
:- ~
%- as-octs:mimes:html
'''
[{"action": "ack",
"event-id": 1}
]
'''
==
^= comparator
|= moves=(list move:light-gate)
^- tang
::
?. ?=([^ ~] moves)
[%leaf "wrong number of moves: {<(lent moves)>}"]~
::
%+ expect-eq
!> [~[/http-put-request] %give %http-response %start 200 ~ ~ %.y]
!> i.moves
==
:: the client connection is detected to be broken
::
=^ results7 light-gate
%- light-call :*
light-gate
now=(add ~1111.1.2 ~m6)
scry=*sley
call-args=[duct=~[/http-get-open] ~ %cancel-inbound-request ~]
^= expected-moves
^- (list move:light-gate)
:: closing the channel restarts the timeout timer
::
:~ :* duct=~[/http-get-open] %pass
/channel/timeout/'0123456789abcdef'
%b %wait :(add ~1111.1.2 ~h12 ~m6)
== ==
==
:: another subscription result while the user is disconnected
::
=^ results8 light-gate
%- light-take :*
light-gate
now=(add ~1111.1.2 ~m7)
scry=*sley
^= take-args
:* wire=/channel/subscription/'0123456789abcdef'/'1' duct=~[/http-put-request]
^- (hypo sign:light-gate)
:- *type
[%g %unto %diff %json !>(`json`[%a [%n '2'] ~])]
==
moves=~
==
:: the client now retries to connect
::
:: Because the client has acknowledged up to event 1, we should start the connection by
:: resending events 2 and 3.
::
=^ results9 light-gate
%- light-call :*
light-gate
now=(add ~1111.1.2 ~m8)
scry=*sley
^= call-args
:* duct=~[/http-get-open] ~
%inbound-request
%.n
[%ipv4 .192.168.1.1]
%'GET'
'/~/channel/0123456789abcdef'
['cookie' 'urbauth=0v3.q0p7t.mlkkq.cqtto.p0nvi.2ieea']~
~
==
^= expected-moves
^- (list move:light-gate)
:~ :* duct=~[/http-get-open]
%give
%http-response
%start
200
:~ ['content-type' 'text/event-stream']
['cache-control' 'no-cache']
['connection' 'keep-alive']
==
::
:- ~
%- as-octs:mimes:html
'''
id: 2
data: {"json":[1],"id":1,"response":"diff"}
id: 3
data: {"json":[2],"id":1,"response":"diff"}
'''
::
complete=%.n
==
:: opening the channel cancels the timeout timer
::
:* duct=~[/http-get-open] %pass
/channel/timeout/'0123456789abcdef'
:: add ~m6 because that was the time of the last GET
::
[%b %rest :(add ~1111.1.2 ~m6 ~h12)]
== == ==
::
;: weld
results1
results2
results3
results4
results5
results6
results7
results8
results9
==
::
++ light-call
|= $: light-gate=_light-gate
now=@da