Merge pull request #3746 from urbit/m/eyre-memes

eyre: reduce memory usage of unacked channel events
This commit is contained in:
fang 2020-10-20 19:47:12 +02:00 committed by GitHub
commit 23d26ce64d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 199 additions and 99 deletions

View File

@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:77960d9e407294ee8d7fba51b330f9b846fd072501fa24fc2538b9fa9d75d13e
size 19008671
oid sha256:0f86fb4d082a41728b4b17a4c09dd0b718d3a83c2d20598fe2d5886742c330df
size 6309399

View File

@ -69,7 +69,7 @@
++ axle
$: :: date: date at which http-server's state was updated to this data structure
::
date=%~2020.9.30
date=%~2020.10.18
:: server-state: state of inbound requests
::
=server-state
@ -153,14 +153,14 @@
:: +prune-events: removes all items from the front of the queue up to :id
::
++ prune-events
|= [q=(qeu [id=@ud lines=wall]) id=@ud]
|= [q=(qeu [id=@ud @ud channel-event]) id=@ud]
^+ q
:: if the queue is now empty, that's fine
::
?: =(~ q)
~
::
=/ next=[item=[id=@ud lines=wall] _q] ~(get to q)
=/ next=[item=[id=@ud @ud channel-event] _q] ~(get to q)
:: if the head of the queue is newer than the acknowledged id, we're done
::
?: (gth id.item.next id)
@ -1213,7 +1213,10 @@
?: =(~ queue)
events
=^ head queue ~(get to queue)
$(events [lines.p.head events])
=, p.head
?~ sign=(channel-event-to-sign channel-event) $
?~ json=(sign-to-json request-id u.sign) $
$(events [(event-json-to-wall id u.json) events])
:: send the start event to the client
::
=^ http-moves state
@ -1359,7 +1362,7 @@
:^ duct %pass
(subscription-wire channel-id request-id ship app)
:* %g %deal [our ship] app
`task:agent:gall`[%watch-as %json path]
`task:agent:gall`[%watch path]
==
::
=. session.channel-state.state
@ -1410,7 +1413,7 @@
$(requests t.requests)
::
==
:: +on-gall-response: turns a gall response into an event
:: +on-gall-response: sanity-check a gall response, send as event
::
++ on-gall-response
|= [channel-id=@t request-id=@ud extra=wire =sign:agent:gall]
@ -1422,73 +1425,23 @@
:: until the source of that bug is discovered though, we keep this
:: in place to ensure a slightly tidier home.
::
?: ?& !(~(has by session.channel-state.state) channel-id)
?. ?& !(~(has by session.channel-state.state) channel-id)
?=(?(%fact %watch-ack) -.sign)
?=([@ @ ~] extra)
==
=/ =ship (slav %p i.extra)
=* app=term i.t.extra
=/ =tape
%+ weld "eyre: removing watch for "
"non-existent channel {(trip channel-id)} on {(trip app)}"
%- (slog leaf+tape ~)
:_ state
:_ ~
^- move
:^ duct %pass
(subscription-wire channel-id request-id ship app)
[%g %deal [our ship] app `task:agent:gall`[%leave ~]]
::
?- -.sign
%poke-ack
=/ =json
=, enjs:format
%- pairs :~
['response' [%s 'poke']]
['id' (numb request-id)]
?~ p.sign
['ok' [%s 'ok']]
['err' (wall (render-tang-to-wall 100 u.p.sign))]
==
::
(emit-event channel-id [(en-json:html json)]~)
::
%fact
=/ =json
=, enjs:format
%- pairs :~
['response' [%s 'diff']]
['id' (numb request-id)]
:- 'json'
?> =(%json p.cage.sign)
;;(json q.q.cage.sign)
==
::
(emit-event channel-id [(en-json:html json)]~)
::
%kick
=/ =json
=, enjs:format
%- pairs :~
['response' [%s 'quit']]
['id' (numb request-id)]
==
::
(emit-event channel-id [(en-json:html json)]~)
::
%watch-ack
=/ =json
=, enjs:format
%- pairs :~
['response' [%s 'subscribe']]
['id' (numb request-id)]
?~ p.sign
['ok' [%s 'ok']]
['err' (wall (render-tang-to-wall 100 u.p.sign))]
==
::
(emit-event channel-id [(en-json:html json)]~)
==
(emit-event channel-id request-id sign)
=/ =ship (slav %p i.extra)
=* app=term i.t.extra
=/ =tape
%+ weld "eyre: removing watch for "
"non-existent channel {(trip channel-id)} on {(trip app)}"
%- (slog leaf+tape ~)
:_ state
:_ ~
^- move
:^ duct %pass
(subscription-wire channel-id request-id ship app)
[%g %deal [our ship] app `task:agent:gall`[%leave ~]]
:: +emit-event: records an event occurred, possibly sending to client
::
:: When an event occurs, we need to record it, even if we immediately
@ -1501,7 +1454,7 @@
:: acknowledged by the client.
::
++ emit-event
|= [channel-id=@t json-text=wall]
|= [channel-id=@t request-id=@ud =sign:agent:gall]
^- [(list move) server-state]
::
=/ channel=(unit channel)
@ -1511,30 +1464,27 @@
[duct %pass /flog %d %flog %crud %eyre-no-channel >id=channel-id< ~]
::
=/ event-id next-id.u.channel
::
=/ event-stream-lines=wall
%- weld :_ [""]~
:- (weld "id: " (format-ud-as-integer event-id))
%+ turn json-text
|= =tape
(weld "data: " tape)
:: if a client is connected, send this event to them.
::
=? moves ?=([%| *] state.u.channel)
^- (list move)
?~ json=(sign-to-json request-id sign)
moves
:_ moves
:+ p.state.u.channel %give
^- gift:able
:* %response %continue
::
^= data
:- ~
%- as-octs:mimes:html
(crip (of-wall:format event-stream-lines))
%- wall-to-octs
(event-json-to-wall event-id u.json)
::
complete=%.n
==
::
=/ =channel-event
?. ?=(%fact -.sign) sign
[%fact [p q.q]:cage.sign]
:- moves
%_ state
session.channel-state
@ -1544,9 +1494,98 @@
::
%_ channel
next-id +(next-id.channel)
events (~(put to events.channel) [event-id event-stream-lines])
events (~(put to events.channel) [event-id request-id channel-event])
==
==
:: +channel-event-to-sign: attempt to recover a sign from a channel-event
::
++ channel-event-to-sign
|= event=channel-event
^- (unit sign:agent:gall)
?. ?=(%fact -.event) `event
:: rebuild vase for fact data
::
=* have=mark mark.event
=/ val=(unit (unit cage))
(scry [%141 %noun] ~ %cb [our %home da+now] /[have])
?. ?=([~ ~ *] val)
((slog leaf+"eyre: no mark {(trip have)}" ~) ~)
=+ !<(=dais:clay q.u.u.val)
=/ res (mule |.((vale:dais noun.event)))
?: ?=(%| -.res)
((slog leaf+"eyre: stale fact of mark {(trip have)}" ~) ~)
`[%fact have p.res]
:: +sign-to-json: render sign from request-id as json channel event
::
++ sign-to-json
|= [request-id=@ud =sign:agent:gall]
^- (unit json)
:: for facts, we try to convert the result to json
::
=/ jsyn=(unit sign:agent:gall)
?. ?=(%fact -.sign) `sign
?: ?=(%json p.cage.sign) `sign
:: find and use tube from fact mark to json
::
=* have=mark p.cage.sign
=* desc=tape "from {(trip have)} to json"
=/ tube=(unit tube:clay)
=/ tuc=(unit (unit cage))
(scry [%141 %noun] ~ %cc [our %home da+now] (flop /[have]/json))
?. ?=([~ ~ *] tuc) ~
`!<(tube:clay q.u.u.tuc)
?~ tube
((slog leaf+"eyre: no tube {desc}" ~) ~)
::
=/ res (mule |.((u.tube q.cage.sign)))
?: ?=(%& -.res)
`[%fact %json p.res]
((slog leaf+"eyre: failed tube {desc}" ~) ~)
::
?~ jsyn ~
%- some
=* sign u.jsyn
=, enjs:format
%- pairs
^- (list [@t json])
:- ['id' (numb request-id)]
?- -.sign
%poke-ack
:~ ['response' [%s 'poke']]
::
?~ p.sign
['ok' [%s 'ok']]
['err' (wall (render-tang-to-wall 100 u.p.sign))]
==
::
%fact
:~ ['response' [%s 'diff']]
::
:- 'json'
~| [%unexpected-fact-mark p.cage.sign]
?> =(%json p.cage.sign)
;;(json q.q.cage.sign)
==
::
%kick
['response' [%s 'quit']]~
::
%watch-ack
:~ ['response' [%s 'subscribe']]
::
?~ p.sign
['ok' [%s 'ok']]
['err' (wall (render-tang-to-wall 100 u.p.sign))]
==
==
::
++ event-json-to-wall
|= [event-id=@ud =json]
^- wall
:~ (weld "id: " (format-ud-as-integer event-id))
(weld "data: " (en-json:html json))
""
==
::
++ on-channel-heartbeat
|= channel-id=@t
@ -2339,6 +2378,34 @@
::
++ load
=> |%
+$ axle-2020-9-30
[date=%~2020.9.30 server-state=server-state-2020-9-30]
::
+$ server-state-2020-9-30
$: bindings=(list [=binding =duct =action])
=cors-registry
connections=(map duct outstanding-connection)
=authentication-state
channel-state=channel-state-2020-9-30
domains=(set turf)
=http-config
ports=[insecure=@ud secure=(unit @ud)]
outgoing-duct=duct
==
::
+$ channel-state-2020-9-30
$: session=(map @t channel-2020-9-30)
duct-to-key=(map duct @t)
==
::
+$ channel-2020-9-30
$: state=(each timer duct)
next-id=@ud
events=(qeu [id=@ud lines=wall])
subscriptions=(map wire [ship=@p app=term =path duc=duct])
heartbeat=(unit timer)
==
::
+$ axle-2020-5-29
[date=%~2020.5.29 server-state=server-state-2020-5-29]
::
@ -2346,7 +2413,7 @@
$: bindings=(list [=binding =duct =action])
connections=(map duct outstanding-connection)
=authentication-state
=channel-state
channel-state=channel-state-2020-9-30
domains=(set turf)
=http-config
ports=[insecure=@ud secure=(unit @ud)]
@ -2359,19 +2426,31 @@
$: bindings=(list [=binding =duct =action])
connections=(map duct outstanding-connection)
authentication-state=sessions=(map @uv @da)
=channel-state
channel-state=channel-state-2020-9-30
domains=(set turf)
=http-config
ports=[insecure=@ud secure=(unit @ud)]
outgoing-duct=duct
==
--
|= old=$%(axle axle-2019-10-6 axle-2020-5-29)
|= old=$%(axle axle-2019-10-6 axle-2020-5-29 axle-2020-9-30)
^+ ..^$
::
~! %loading
?- -.old
%~2020.9.30 ..^$(ax old)
%~2020.10.18 ..^$(ax old)
::
%~2020.9.30
%_ $
date.old %~2020.10.18
::
::NOTE soft-breaks the reconnect case, but is generally less disruptive
:: than wiping channels entirely.
session.channel-state.server-state.old
%- ~(run by session.channel-state.server-state.old)
|= =channel-2020-9-30
channel-2020-9-30(events *(qeu [@ud @ud channel-event]))
==
::
%~2020.5.29
%_ $

View File

@ -1358,6 +1358,14 @@
::
=duct
==
:: channel-event: unacknowledged channel event, vaseless sign
::
+$ channel-event
$% $>(%poke-ack sign:agent:gall)
$>(%watch-ack sign:agent:gall)
$>(%kick sign:agent:gall)
[%fact =mark =noun]
==
:: channel: connection to the browser
::
:: Channels are the main method where a webpage communicates with Gall
@ -1392,7 +1400,7 @@
:: channel, we send the event but we still add it to events because we
:: can't assume it got received until we get an acknowledgment.
::
events=(qeu [id=@ud lines=wall])
events=(qeu [id=@ud request-id=@ud =channel-event])
:: subscriptions: gall subscriptions
::
:: We maintain a list of subscriptions so if a channel times out, we

View File

@ -1308,7 +1308,7 @@
%+ expect-gall-deal
:* /channel/subscription/'0123456789abcdef'/'2'/~nul/two
[~nul ~nul] %two
%watch-as %json /one/two/three
%watch /one/two/three
==
card.i.moves
::
@ -1516,16 +1516,16 @@
==
::
++ test-prune-events
=/ q=(qeu [id=@ud lines=wall]) ~
=. q (~(put to q) [0 ~])
=. q (~(put to q) [1 ~])
=. q (~(put to q) [2 ~])
=. q (~(put to q) [3 ~])
=. q (~(put to q) [4 ~])
=/ q=(qeu [id=@ud @ud channel-event:eyre]) ~
=. q (~(put to q) [0 *@ud *channel-event:eyre])
=. q (~(put to q) [1 *@ud *channel-event:eyre])
=. q (~(put to q) [2 *@ud *channel-event:eyre])
=. q (~(put to q) [3 *@ud *channel-event:eyre])
=. q (~(put to q) [4 *@ud *channel-event:eyre])
::
=. q (prune-events:eyre-gate q 3)
::
(expect-eq !>([~ [4 ~]]) !>(~(top to q)))
(expect-eq !>([~ [4 *@ud *channel-event:eyre]]) !>(~(top to q)))
::
++ test-channel-sends-unacknowledged-events-on-reconnection
:: common initialization
@ -2027,7 +2027,7 @@
::
?: ?=([%watch *] deal.expected)
?. ?=([%watch *] r.note)
[%leaf "expected %watch-as, actual {<r.note>}"]~
[%leaf "expected %watch, actual {<r.note>}"]~
:: compare the path
::
(expect-eq !>(path.deal.expected) !>(path.r.note))
@ -2201,7 +2201,7 @@
%+ expect-gall-deal
:* /channel/subscription/'0123456789abcdef'/'1'/~nul/two
[~nul ~nul] %two
%watch-as %json /one/two/three
%watch /one/two/three
==
card.i.t.moves
::
@ -2226,6 +2226,19 @@
?: &(=(%ca term) =(/hoon/handler/gen s.beam))
:+ ~ ~
vase+!>(!>(|=(* |=(* [[%404 ~] ~]))))
?: &(=(%cb term) =(/json s.beam))
:^ ~ ~ %dais
!> ^- dais:clay
|_ sam=vase
++ bunt !!
++ diff !!
++ form !!
++ join !!
++ mash !!
++ pact !!
++ vale |=(=noun !>(;;(json noun)))
++ volt !!
--
::
?> =(%j term)
?> =(~nul p.beam)