Merge branch 'pkova/master' (#1687)

* pkova-master:
  pills: update solid (#1)
  eyre: add channel \n heartbeat every 20 seconds

Signed-off-by: Jared Tobin <jared@tlon.io>
This commit is contained in:
Jared Tobin 2019-10-07 21:44:00 +04:00
commit cd9cd68fe7
No known key found for this signature in database
GPG Key ID: 0E4647D58F8A69E4
3 changed files with 120 additions and 12 deletions

View File

@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:32b97380977dffc356523d3fc769d14f86c972e195e9ae1feae4707f0a5dd037
size 16381088
oid sha256:00546a20f2e1a2f0cc51f6b11f6ac429b6f29ea54270b806649659a4bc33cad4
size 16465119

View File

@ -83,7 +83,7 @@
++ axle
$: :: date: date at which http-server's state was updated to this data structure
::
date=%~2019.1.7
date=%~2019.10.6
:: server-state: state of inbound requests
::
=server-state
@ -216,8 +216,6 @@
:: 'Last-Event-Id: ' header to the server; the server then resends all
:: events since then.
::
:: TODO: Send \n as a heartbeat every 20 seconds.
::
+$ channel
$: :: channel-state: expiration time or the duct currently listening
::
@ -246,6 +244,9 @@
:: can cancel all the subscriptions we've made.
::
subscriptions=(map wire [ship=@p app=term =path duc=duct])
:: heartbeat: sse heartbeat timer
::
heartbeat=(unit timer)
==
:: channel-request: an action requested on a channel
::
@ -1182,7 +1183,7 @@
%_ ..update-timeout-timer-for
session.channel-state.state
%+ ~(put by session.channel-state.state) channel-id
[[%& expiration-time duct] 0 ~ ~]
[[%& expiration-time duct] 0 ~ ~ ~]
::
moves
[(set-timeout-move channel-id expiration-time) moves]
@ -1206,6 +1207,18 @@
==
==
::
++ set-heartbeat-move
|= [channel-id=@t heartbeat-time=@da]
^- move
:^ duct %pass /channel/heartbeat/[channel-id]
[%b %wait heartbeat-time]
::
++ cancel-heartbeat-move
|= [channel-id=@t heartbeat-time=@da =^duct]
^- move
:^ duct %pass /channel/heartbeat/[channel-id]
[%b %rest heartbeat-time]
::
++ set-timeout-move
|= [channel-id=@t expiration-time=@da]
^- move
@ -1278,14 +1291,19 @@
::
=. duct-to-key.channel-state.state
(~(put by duct-to-key.channel-state.state) duct channel-id)
:: clear the event queue and record the duct for future output
:: initialize sse heartbeat
::
=/ heartbeat-time=@da (add now ~s20)
=/ heartbeat (set-heartbeat-move channel-id heartbeat-time)
:: clear the event queue, record the duct for future output and
:: record heartbeat-time for possible future cancel
::
=. session.channel-state.state
%+ ~(jab by session.channel-state.state) channel-id
|= =channel
channel(events ~, state [%| duct])
channel(events ~, state [%| duct], heartbeat (some [heartbeat-time duct]))
::
[(weld http-moves moves) state]
[[heartbeat (weld http-moves moves)] state]
:: +acknowledge-events: removes events before :last-event-id on :channel-id
::
++ acknowledge-events
@ -1464,6 +1482,14 @@
=. duct-to-key.channel-state.state
(~(del by duct-to-key.channel-state.state) p.state.session)
::
?~ heartbeat.session $(requests t.requests)
=. gall-moves
%+ snoc gall-moves
%^ cancel-heartbeat-move
channel-id
date.u.heartbeat.session
duct.u.heartbeat.session
::
$(requests t.requests)
::
==
@ -1582,7 +1608,28 @@
events (~(put to events.channel) [event-id event-stream-lines])
==
==
:: +on-channel-timeout: we received a wake to clear an old session
::
++ on-channel-heartbeat
|= channel-id=@t
^- [(list move) server-state]
::
=/ res
%- handle-response
:* %continue
data=(some (as-octs:mimes:html '\0a'))
complete=%.n
==
=/ http-moves -.res
=/ new-state +.res
=/ heartbeat-time=@da (add now ~s20)
:_ %_ new-state
session.channel-state
%+ ~(jab by session.channel-state.state) channel-id
|= =channel
channel(heartbeat (some [heartbeat-time duct]))
==
(snoc http-moves (set-heartbeat-move channel-id heartbeat-time))
:: +on-channel-timeout: we received a wake to clear an old session
::
++ on-channel-timeout
|= channel-id=@t
@ -2158,6 +2205,13 @@
=^ moves server-state.ax
(on-channel-timeout i.t.t.wire)
[moves http-server-gate]
::
%heartbeat
=/ on-channel-heartbeat
on-channel-heartbeat:by-channel:(per-server-event event-args)
=^ moves server-state.ax
(on-channel-heartbeat i.t.t.wire)
[moves http-server-gate]
::
?(%poke %subscription)
?> ?=([%g %unto *] sign)
@ -2187,11 +2241,45 @@
:: +load: migrate old state to new state (called on vane reload)
::
++ load
|= old=axle
=> |%
+$ channel-old
$: state=(each timer duct)
next-id=@ud
events=(qeu [id=@ud lines=wall])
subscriptions=(map wire [ship=@p app=term =path duc=duct])
==
+$ channel-state-old
$: session=(map @t channel-old)
duct-to-key=(map duct @t)
==
++ axle-old
%+ cork
axle
|= =axle
axle(date %~2019.1.7, channel-state.server-state (channel-state-old))
--
|= old=$%(axle axle-old)
^+ ..^$
::
~! %loading
..^$(ax old)
?- -.old
%~2019.1.7
=/ add-heartbeat
%- ~(run by session.channel-state.server-state.old)
|= [c=channel-old]
^- channel
[state.c next-id.c events.c subscriptions.c ~]
::
=/ new
%= old
date %~2019.10.6
session.channel-state.server-state add-heartbeat
==
$(old new)
::
%~2019.10.6 ..^$(ax old)
==
:: +stay: produce current state
::
++ stay `axle`ax

View File

@ -991,6 +991,11 @@
^= expected-moves
^- (list move:http-server-gate)
:~ :* duct=~[/http-get-open]
%pass
/channel/heartbeat/'0123456789abcdef'
[%b %wait ~1111.1.2..00.03.20]
==
:* duct=~[/http-get-open]
%give
%response
%start
@ -1370,6 +1375,11 @@
^= expected-moves
^- (list move:http-server-gate)
:~ :* duct=~[/http-get-open]
%pass
/channel/heartbeat/'0123456789abcdef'
[%b %wait ~1111.1.2..00.03.20]
==
:* duct=~[/http-get-open]
%give
%response
%start
@ -1562,6 +1572,11 @@
^= expected-moves
^- (list move:http-server-gate)
:~ :* duct=~[/http-get-open]
%pass
/channel/heartbeat/'0123456789abcdef'
[%b %wait ~1111.1.2..00.03.20]
==
:* duct=~[/http-get-open]
%give
%response
%start
@ -1712,6 +1727,11 @@
^= expected-moves
^- (list move:http-server-gate)
:~ :* duct=~[/http-get-open]
%pass
/channel/heartbeat/'0123456789abcdef'
[%b %wait ~1111.1.2..00.08.20]
==
:* duct=~[/http-get-open]
%give
%response
%start