From a019c2079e4a6547e7e9fffab6a0859c5205724a Mon Sep 17 00:00:00 2001 From: pkova Date: Mon, 7 Oct 2019 02:26:03 +0300 Subject: [PATCH] eyre: add channel \n heartbeat every 20 seconds --- pkg/arvo/sys/vane/eyre.hoon | 108 +++++++++++++++++++++++++++--- pkg/arvo/tests/sys/vane/eyre.hoon | 20 ++++++ 2 files changed, 118 insertions(+), 10 deletions(-) diff --git a/pkg/arvo/sys/vane/eyre.hoon b/pkg/arvo/sys/vane/eyre.hoon index 5ae8577af..e4c1cd8b4 100755 --- a/pkg/arvo/sys/vane/eyre.hoon +++ b/pkg/arvo/sys/vane/eyre.hoon @@ -83,7 +83,7 @@ ++ axle $: :: date: date at which http-server's state was updated to this data structure :: - date=%~2019.1.7 + date=%~2019.10.6 :: server-state: state of inbound requests :: =server-state @@ -216,8 +216,6 @@ :: 'Last-Event-Id: ' header to the server; the server then resends all :: events since then. :: -:: TODO: Send \n as a heartbeat every 20 seconds. -:: +$ channel $: :: channel-state: expiration time or the duct currently listening :: @@ -246,6 +244,9 @@ :: can cancel all the subscriptions we've made. :: subscriptions=(map wire [ship=@p app=term =path duc=duct]) + :: heartbeat: sse heartbeat timer + :: + heartbeat=(unit timer) == :: channel-request: an action requested on a channel :: @@ -1182,7 +1183,7 @@ %_ ..update-timeout-timer-for session.channel-state.state %+ ~(put by session.channel-state.state) channel-id - [[%& expiration-time duct] 0 ~ ~] + [[%& expiration-time duct] 0 ~ ~ ~] :: moves [(set-timeout-move channel-id expiration-time) moves] @@ -1206,6 +1207,18 @@ == == :: + ++ set-heartbeat-move + |= [channel-id=@t heartbeat-time=@da] + ^- move + :^ duct %pass /channel/heartbeat/[channel-id] + [%b %wait heartbeat-time] + :: + ++ cancel-heartbeat-move + |= [channel-id=@t heartbeat-time=@da =^duct] + ^- move + :^ duct %pass /channel/heartbeat/[channel-id] + [%b %rest heartbeat-time] + :: ++ set-timeout-move |= [channel-id=@t expiration-time=@da] ^- move @@ -1278,14 +1291,19 @@ :: =. duct-to-key.channel-state.state (~(put by duct-to-key.channel-state.state) duct channel-id) - :: clear the event queue and record the duct for future output + :: initialize sse heartbeat + :: + =/ heartbeat-time=@da (add now ~s20) + =/ heartbeat (set-heartbeat-move channel-id heartbeat-time) + :: clear the event queue, record the duct for future output and + :: record heartbeat-time for possible future cancel :: =. session.channel-state.state %+ ~(jab by session.channel-state.state) channel-id |= =channel - channel(events ~, state [%| duct]) + channel(events ~, state [%| duct], heartbeat (some [heartbeat-time duct])) :: - [(weld http-moves moves) state] + [[heartbeat (weld http-moves moves)] state] :: +acknowledge-events: removes events before :last-event-id on :channel-id :: ++ acknowledge-events @@ -1464,6 +1482,14 @@ =. duct-to-key.channel-state.state (~(del by duct-to-key.channel-state.state) p.state.session) :: + ?~ heartbeat.session $(requests t.requests) + =. gall-moves + %+ snoc gall-moves + %^ cancel-heartbeat-move + channel-id + date.u.heartbeat.session + duct.u.heartbeat.session + :: $(requests t.requests) :: == @@ -1582,7 +1608,28 @@ events (~(put to events.channel) [event-id event-stream-lines]) == == - :: +on-channel-timeout: we received a wake to clear an old session + :: + ++ on-channel-heartbeat + |= channel-id=@t + ^- [(list move) server-state] + :: + =/ res + %- handle-response + :* %continue + data=(some (as-octs:mimes:html '\0a')) + complete=%.n + == + =/ http-moves -.res + =/ new-state +.res + =/ heartbeat-time=@da (add now ~s20) + :_ %_ new-state + session.channel-state + %+ ~(jab by session.channel-state.state) channel-id + |= =channel + channel(heartbeat (some [heartbeat-time duct])) + == + (snoc http-moves (set-heartbeat-move channel-id heartbeat-time)) + :: +on-channel-timeout: we received a wake to clear an old session :: ++ on-channel-timeout |= channel-id=@t @@ -2158,6 +2205,13 @@ =^ moves server-state.ax (on-channel-timeout i.t.t.wire) [moves http-server-gate] + :: + %heartbeat + =/ on-channel-heartbeat + on-channel-heartbeat:by-channel:(per-server-event event-args) + =^ moves server-state.ax + (on-channel-heartbeat i.t.t.wire) + [moves http-server-gate] :: ?(%poke %subscription) ?> ?=([%g %unto *] sign) @@ -2187,11 +2241,45 @@ :: +load: migrate old state to new state (called on vane reload) :: ++ load - |= old=axle + => |% + +$ channel-old + $: state=(each timer duct) + next-id=@ud + events=(qeu [id=@ud lines=wall]) + subscriptions=(map wire [ship=@p app=term =path duc=duct]) + == + +$ channel-state-old + $: session=(map @t channel-old) + duct-to-key=(map duct @t) + == + ++ axle-old + %+ cork + axle + |= =axle + axle(date %~2019.1.7, channel-state.server-state (channel-state-old)) + -- + |= old=$%(axle axle-old) ^+ ..^$ :: ~! %loading - ..^$(ax old) + ?- -.old + %~2019.1.7 + =/ add-heartbeat + %- ~(run by session.channel-state.server-state.old) + |= [c=channel-old] + ^- channel + [state.c next-id.c events.c subscriptions.c ~] + :: + =/ new + %= old + date %~2019.10.6 + session.channel-state.server-state add-heartbeat + == + $(old new) + :: + %~2019.10.6 ..^$(ax old) + == + :: +stay: produce current state :: ++ stay `axle`ax diff --git a/pkg/arvo/tests/sys/vane/eyre.hoon b/pkg/arvo/tests/sys/vane/eyre.hoon index aee11eba7..0e61f2325 100644 --- a/pkg/arvo/tests/sys/vane/eyre.hoon +++ b/pkg/arvo/tests/sys/vane/eyre.hoon @@ -991,6 +991,11 @@ ^= expected-moves ^- (list move:http-server-gate) :~ :* duct=~[/http-get-open] + %pass + /channel/heartbeat/'0123456789abcdef' + [%b %wait ~1111.1.2..00.03.20] + == + :* duct=~[/http-get-open] %give %response %start @@ -1370,6 +1375,11 @@ ^= expected-moves ^- (list move:http-server-gate) :~ :* duct=~[/http-get-open] + %pass + /channel/heartbeat/'0123456789abcdef' + [%b %wait ~1111.1.2..00.03.20] + == + :* duct=~[/http-get-open] %give %response %start @@ -1562,6 +1572,11 @@ ^= expected-moves ^- (list move:http-server-gate) :~ :* duct=~[/http-get-open] + %pass + /channel/heartbeat/'0123456789abcdef' + [%b %wait ~1111.1.2..00.03.20] + == + :* duct=~[/http-get-open] %give %response %start @@ -1712,6 +1727,11 @@ ^= expected-moves ^- (list move:http-server-gate) :~ :* duct=~[/http-get-open] + %pass + /channel/heartbeat/'0123456789abcdef' + [%b %wait ~1111.1.2..00.08.20] + == + :* duct=~[/http-get-open] %give %response %start