eyre: add channel \n heartbeat every 20 seconds

This commit is contained in:
pkova 2019-10-07 02:26:03 +03:00
parent 9e3b960194
commit a019c2079e
2 changed files with 118 additions and 10 deletions

View File

@ -83,7 +83,7 @@
++ axle ++ axle
$: :: date: date at which http-server's state was updated to this data structure $: :: date: date at which http-server's state was updated to this data structure
:: ::
date=%~2019.1.7 date=%~2019.10.6
:: server-state: state of inbound requests :: server-state: state of inbound requests
:: ::
=server-state =server-state
@ -216,8 +216,6 @@
:: 'Last-Event-Id: ' header to the server; the server then resends all :: 'Last-Event-Id: ' header to the server; the server then resends all
:: events since then. :: events since then.
:: ::
:: TODO: Send \n as a heartbeat every 20 seconds.
::
+$ channel +$ channel
$: :: channel-state: expiration time or the duct currently listening $: :: channel-state: expiration time or the duct currently listening
:: ::
@ -246,6 +244,9 @@
:: can cancel all the subscriptions we've made. :: can cancel all the subscriptions we've made.
:: ::
subscriptions=(map wire [ship=@p app=term =path duc=duct]) subscriptions=(map wire [ship=@p app=term =path duc=duct])
:: heartbeat: sse heartbeat timer
::
heartbeat=(unit timer)
== ==
:: channel-request: an action requested on a channel :: channel-request: an action requested on a channel
:: ::
@ -1182,7 +1183,7 @@
%_ ..update-timeout-timer-for %_ ..update-timeout-timer-for
session.channel-state.state session.channel-state.state
%+ ~(put by session.channel-state.state) channel-id %+ ~(put by session.channel-state.state) channel-id
[[%& expiration-time duct] 0 ~ ~] [[%& expiration-time duct] 0 ~ ~ ~]
:: ::
moves moves
[(set-timeout-move channel-id expiration-time) moves] [(set-timeout-move channel-id expiration-time) moves]
@ -1206,6 +1207,18 @@
== ==
== ==
:: ::
++ set-heartbeat-move
|= [channel-id=@t heartbeat-time=@da]
^- move
:^ duct %pass /channel/heartbeat/[channel-id]
[%b %wait heartbeat-time]
::
++ cancel-heartbeat-move
|= [channel-id=@t heartbeat-time=@da =^duct]
^- move
:^ duct %pass /channel/heartbeat/[channel-id]
[%b %rest heartbeat-time]
::
++ set-timeout-move ++ set-timeout-move
|= [channel-id=@t expiration-time=@da] |= [channel-id=@t expiration-time=@da]
^- move ^- move
@ -1278,14 +1291,19 @@
:: ::
=. duct-to-key.channel-state.state =. duct-to-key.channel-state.state
(~(put by duct-to-key.channel-state.state) duct channel-id) (~(put by duct-to-key.channel-state.state) duct channel-id)
:: clear the event queue and record the duct for future output :: initialize sse heartbeat
::
=/ heartbeat-time=@da (add now ~s20)
=/ heartbeat (set-heartbeat-move channel-id heartbeat-time)
:: clear the event queue, record the duct for future output and
:: record heartbeat-time for possible future cancel
:: ::
=. session.channel-state.state =. session.channel-state.state
%+ ~(jab by session.channel-state.state) channel-id %+ ~(jab by session.channel-state.state) channel-id
|= =channel |= =channel
channel(events ~, state [%| duct]) channel(events ~, state [%| duct], heartbeat (some [heartbeat-time duct]))
:: ::
[(weld http-moves moves) state] [[heartbeat (weld http-moves moves)] state]
:: +acknowledge-events: removes events before :last-event-id on :channel-id :: +acknowledge-events: removes events before :last-event-id on :channel-id
:: ::
++ acknowledge-events ++ acknowledge-events
@ -1464,6 +1482,14 @@
=. duct-to-key.channel-state.state =. duct-to-key.channel-state.state
(~(del by duct-to-key.channel-state.state) p.state.session) (~(del by duct-to-key.channel-state.state) p.state.session)
:: ::
?~ heartbeat.session $(requests t.requests)
=. gall-moves
%+ snoc gall-moves
%^ cancel-heartbeat-move
channel-id
date.u.heartbeat.session
duct.u.heartbeat.session
::
$(requests t.requests) $(requests t.requests)
:: ::
== ==
@ -1582,6 +1608,27 @@
events (~(put to events.channel) [event-id event-stream-lines]) events (~(put to events.channel) [event-id event-stream-lines])
== ==
== ==
::
++ on-channel-heartbeat
|= channel-id=@t
^- [(list move) server-state]
::
=/ res
%- handle-response
:* %continue
data=(some (as-octs:mimes:html '\0a'))
complete=%.n
==
=/ http-moves -.res
=/ new-state +.res
=/ heartbeat-time=@da (add now ~s20)
:_ %_ new-state
session.channel-state
%+ ~(jab by session.channel-state.state) channel-id
|= =channel
channel(heartbeat (some [heartbeat-time duct]))
==
(snoc http-moves (set-heartbeat-move channel-id heartbeat-time))
:: +on-channel-timeout: we received a wake to clear an old session :: +on-channel-timeout: we received a wake to clear an old session
:: ::
++ on-channel-timeout ++ on-channel-timeout
@ -2158,6 +2205,13 @@
=^ moves server-state.ax =^ moves server-state.ax
(on-channel-timeout i.t.t.wire) (on-channel-timeout i.t.t.wire)
[moves http-server-gate] [moves http-server-gate]
::
%heartbeat
=/ on-channel-heartbeat
on-channel-heartbeat:by-channel:(per-server-event event-args)
=^ moves server-state.ax
(on-channel-heartbeat i.t.t.wire)
[moves http-server-gate]
:: ::
?(%poke %subscription) ?(%poke %subscription)
?> ?=([%g %unto *] sign) ?> ?=([%g %unto *] sign)
@ -2187,11 +2241,45 @@
:: +load: migrate old state to new state (called on vane reload) :: +load: migrate old state to new state (called on vane reload)
:: ::
++ load ++ load
|= old=axle => |%
+$ channel-old
$: state=(each timer duct)
next-id=@ud
events=(qeu [id=@ud lines=wall])
subscriptions=(map wire [ship=@p app=term =path duc=duct])
==
+$ channel-state-old
$: session=(map @t channel-old)
duct-to-key=(map duct @t)
==
++ axle-old
%+ cork
axle
|= =axle
axle(date %~2019.1.7, channel-state.server-state (channel-state-old))
--
|= old=$%(axle axle-old)
^+ ..^$ ^+ ..^$
:: ::
~! %loading ~! %loading
..^$(ax old) ?- -.old
%~2019.1.7
=/ add-heartbeat
%- ~(run by session.channel-state.server-state.old)
|= [c=channel-old]
^- channel
[state.c next-id.c events.c subscriptions.c ~]
::
=/ new
%= old
date %~2019.10.6
session.channel-state.server-state add-heartbeat
==
$(old new)
::
%~2019.10.6 ..^$(ax old)
==
:: +stay: produce current state :: +stay: produce current state
:: ::
++ stay `axle`ax ++ stay `axle`ax

View File

@ -991,6 +991,11 @@
^= expected-moves ^= expected-moves
^- (list move:http-server-gate) ^- (list move:http-server-gate)
:~ :* duct=~[/http-get-open] :~ :* duct=~[/http-get-open]
%pass
/channel/heartbeat/'0123456789abcdef'
[%b %wait ~1111.1.2..00.03.20]
==
:* duct=~[/http-get-open]
%give %give
%response %response
%start %start
@ -1370,6 +1375,11 @@
^= expected-moves ^= expected-moves
^- (list move:http-server-gate) ^- (list move:http-server-gate)
:~ :* duct=~[/http-get-open] :~ :* duct=~[/http-get-open]
%pass
/channel/heartbeat/'0123456789abcdef'
[%b %wait ~1111.1.2..00.03.20]
==
:* duct=~[/http-get-open]
%give %give
%response %response
%start %start
@ -1562,6 +1572,11 @@
^= expected-moves ^= expected-moves
^- (list move:http-server-gate) ^- (list move:http-server-gate)
:~ :* duct=~[/http-get-open] :~ :* duct=~[/http-get-open]
%pass
/channel/heartbeat/'0123456789abcdef'
[%b %wait ~1111.1.2..00.03.20]
==
:* duct=~[/http-get-open]
%give %give
%response %response
%start %start
@ -1712,6 +1727,11 @@
^= expected-moves ^= expected-moves
^- (list move:http-server-gate) ^- (list move:http-server-gate)
:~ :* duct=~[/http-get-open] :~ :* duct=~[/http-get-open]
%pass
/channel/heartbeat/'0123456789abcdef'
[%b %wait ~1111.1.2..00.08.20]
==
:* duct=~[/http-get-open]
%give %give
%response %response
%start %start