eyre: kick busy subscriptions if client not acking

In order to curb event queue growth when a client for whatever reason
isn't acking the events we send out, we implement a mechanism for
detecting such "clogging", and proactively kick subscriptions which are
adding too many events to the queue.

If the client hasn't sent an ack for ~s30, any subscription that accrues
more than 50 unacked %facts gets closed to prevent further buildup.

Upon reconnecting, the client will see %kick for the relevant
subscriptions and can open a new subscription as appropriate.

Includes a simple test for this behavior, and updates /app/dbug to be
able to display the newly tracked statistics.
This commit is contained in:
fang 2020-10-19 15:56:05 +02:00
parent 63b4fb3e19
commit 1d4ee5a7b5
No known key found for this signature in database
GPG Key ID: EB035760C1BBA972
5 changed files with 271 additions and 42 deletions

View File

@ -380,6 +380,7 @@
'connected'^b+!-.state
'expiry'^?-(-.state %& (time date.p.state), %| ~)
'next-id'^(numb next-id)
'last-ack'^(time last-ack)
'unacked'^a+(turn (sort (turn ~(tap in events) head) dor) numb)
::
:- 'subscriptions'
@ -391,6 +392,7 @@
'ship'^(^ship ship)
'app'^s+app
'path'^(^path path)
'unacked'^(numb (~(gut by unacked) id 0))
==
==
==

View File

@ -131,6 +131,12 @@
::
[%delete ~]
==
:: clog-timeout: the delay between acks after which clog-threshold kicks in
::
++ clog-timeout ~s30
:: clog-threshold: maximum per-subscription event buildup, after clog-timeout
::
++ clog-threshold 50
:: channel-timeout: the delay before a channel should be reaped
::
++ channel-timeout ~h12
@ -152,22 +158,45 @@
(can 3 a)
:: +prune-events: removes all items from the front of the queue up to :id
::
:: also produces, per request-id, the amount of events that have got acked,
:: for use with +subtract-acked-events.
::
++ prune-events
=| acked=(map @ud @ud)
|= [q=(qeu [id=@ud @ud channel-event]) id=@ud]
^+ q
^+ [acked q]
:: if the queue is now empty, that's fine
::
?: =(~ q)
~
[acked ~]
::
=/ next=[item=[id=@ud @ud channel-event] _q] ~(get to q)
=/ next=[item=[id=@ud request-id=@ud channel-event] _q] ~(get to q)
:: if the head of the queue is newer than the acknowledged id, we're done
::
?: (gth id.item.next id)
q
:: otherwise, check next item
[acked q]
:: otherwise, note the ack, and check next item
::
$(q +:next)
%_ $
q +:next
::
acked
=, item.next
%+ ~(put by acked) request-id
+((~(gut by acked) request-id 0))
==
:: +subtract-acked-events: update the subscription map's pending ack counts
::
++ subtract-acked-events
|= [acked=(map @ud @ud) unacked=(map @ud @ud)]
^+ unacked
%+ roll ~(tap by acked)
|= [[rid=@ud ack=@ud] unacked=_unacked]
?~ sus=(~(get by unacked) rid)
unacked
%+ ~(put by unacked) rid
?: (lte u.sus ack) 0
(sub u.sus ack)
:: +parse-channel-request: parses a list of channel-requests
::
:: Parses a json array into a list of +channel-request. If any of the items
@ -1123,7 +1152,7 @@
%_ ..update-timeout-timer-for
session.channel-state.state
%+ ~(put by session.channel-state.state) channel-id
[[%& expiration-time duct] 0 ~ ~ ~]
[[%& expiration-time duct] 0 now ~ ~ ~ ~]
::
moves
[(set-timeout-move channel-id expiration-time) moves]
@ -1267,7 +1296,11 @@
%+ ~(jab by session.channel-state.state) channel-id
|= =channel
^+ channel
channel(events (prune-events events.channel last-event-id))
=^ acked events.channel
(prune-events events.channel last-event-id)
=. unacked.channel
(subtract-acked-events acked unacked.channel)
channel(last-ack now)
==
:: +on-put-request: handles a PUT request
::
@ -1401,9 +1434,10 @@
=. session.channel-state.state
%+ ~(jab by session.channel-state.state) channel-id
|= =channel
=- channel(subscriptions -)
%- ~(del by subscriptions.channel)
subscription-id
%_ channel
subscriptions (~(del by subscriptions.channel) subscription-id)
unacked (~(del by unacked.channel) subscription-id)
==
::
$(requests t.requests)
::
@ -1450,10 +1484,9 @@
:: send it to a connected browser so in case of disconnection, we can
:: resend it.
::
:: This function is responsible for taking the raw json lines and
:: converting them into a text/event-stream. The :event-stream-lines
:: then may get sent, and are stored for later resending until
:: acknowledged by the client.
:: This function is responsible for taking the event sign and converting
:: it into a text/event-stream. The :sign then may get sent, and is
:: stored for later resending until acknowledged by the client.
::
++ emit-event
|= [channel-id=@t request-id=@ud =sign:agent:gall]
@ -1466,6 +1499,11 @@
[duct %pass /flog %d %flog %crud %eyre-no-channel >id=channel-id< ~]
::
=/ event-id next-id.u.channel
:: store the event as unacked
::
=. events.u.channel
%- ~(put to events.u.channel)
[event-id request-id (sign-to-channel-event sign)]
:: if a client is connected, send this event to them.
::
=? moves ?=([%| *] state.u.channel)
@ -1483,22 +1521,65 @@
::
complete=%.n
==
:: update channel's unacked counts, find out if clogged
::
=^ clogged unacked.u.channel
:: poke-acks are one-offs, don't apply clog logic to them
::
?: ?=(%poke-ack -.sign) [| unacked.u.channel]
=/ num=@ud
(~(gut by unacked.u.channel) request-id 0)
:_ (~(put by unacked.u.channel) request-id +(num))
?& (gte num clog-threshold)
(lth (add last-ack.u.channel clog-timeout) now)
==
::
~? clogged [%e %clogged channel-id request-id]
:: if we're clogged, end this gall subscription
::
=? moves clogged
:_ moves
=+ (~(got by subscriptions.u.channel) request-id)
:^ duct %pass
(subscription-wire channel-id request-id ship app)
[%g %deal [our ship] app %leave ~]
=? event-id clogged +(event-id)
=? u.channel clogged
%_ u.channel
subscriptions (~(del by subscriptions.u.channel) request-id)
unacked (~(del by unacked.u.channel) request-id)
events %- ~(put to events.u.channel)
[event-id request-id (sign-to-channel-event %kick ~)]
==
:: if a client is connected, send the kick event to them
::
=? moves &(clogged ?=([%| *] state.u.channel))
:_ moves
:+ p.state.u.channel %give
^- gift:able
:* %response %continue
::
^= data
%- wall-to-octs
%+ event-json-to-wall event-id
(need (sign-to-json request-id %kick ~))
::
complete=%.n
==
::
=/ =channel-event
?. ?=(%fact -.sign) sign
[%fact [p q.q]:cage.sign]
:- moves
%_ state
session.channel-state
%+ ~(jab by session.channel-state.state) channel-id
|= =^channel
^+ channel
::
%_ channel
next-id +(next-id.channel)
events (~(put to events.channel) [event-id request-id channel-event])
==
%+ ~(put by session.channel-state.state) channel-id
u.channel(next-id +(event-id))
==
:: +sign-to-channel-event: strip the vase from a sign:agent:gall
::
++ sign-to-channel-event
|= =sign:agent:gall
^- channel-event
?. ?=(%fact -.sign) sign
[%fact [p q.q]:cage.sign]
:: +channel-event-to-sign: attempt to recover a sign from a channel-event
::
++ channel-event-to-sign
@ -2450,16 +2531,17 @@
:: than wiping channels entirely.
session.channel-state.server-state.old
%- ~(run by session.channel-state.server-state.old)
|= old-channel=channel-2020-9-30
|= channel-2020-9-30
^- channel
%= old-channel
events *(qeu [@ud @ud channel-event])
::
subscriptions
=/ subscriptions
%- ~(gas by *(map @ud [@p term path duct]))
%+ turn ~(tap by subscriptions.old-channel)
%+ turn ~(tap by subscriptions)
|= [=wire rest=[@p term path duct]]
[(slav %ud (snag 3 wire)) rest]
:* state next-id now
*(qeu [@ud @ud channel-event])
*(map @ud @ud)
subscriptions heartbeat
==
==
::

View File

@ -1392,6 +1392,11 @@
:: next-id: next sequence number to use
::
next-id=@ud
:: last-ack: time of last client ack
::
:: used for clog calculations, in combination with :unacked
::
last-ack=@da
:: events: unacknowledged events
::
:: We keep track of all events where we haven't received a
@ -1401,6 +1406,11 @@
:: can't assume it got received until we get an acknowledgment.
::
events=(qeu [id=@ud request-id=@ud =channel-event])
:: unacked: unacknowledged event counts by request-id
::
:: used for clog calculations, in combination with :last-ack
::
unacked=(map @ud @ud)
:: subscriptions: gall subscriptions by request-id
::
:: We maintain a list of subscriptions so if a channel times out, we

View File

@ -1517,15 +1517,26 @@
::
++ test-prune-events
=/ q=(qeu [id=@ud @ud channel-event:eyre]) ~
=. q (~(put to q) [0 *@ud *channel-event:eyre])
=. q (~(put to q) [1 *@ud *channel-event:eyre])
=. q (~(put to q) [2 *@ud *channel-event:eyre])
=. q (~(put to q) [3 *@ud *channel-event:eyre])
=. q (~(put to q) [4 *@ud *channel-event:eyre])
=. q (~(put to q) [0 0 *channel-event:eyre])
=. q (~(put to q) [1 0 *channel-event:eyre])
=. q (~(put to q) [2 0 *channel-event:eyre])
=. q (~(put to q) [3 1 *channel-event:eyre])
=. q (~(put to q) [4 1 *channel-event:eyre])
::
=. q (prune-events:eyre-gate q 3)
=^ a q (prune-events:eyre-gate q 3)
::
(expect-eq !>([~ [4 *@ud *channel-event:eyre]]) !>(~(top to q)))
%+ expect-eq
!>
:- (~(gas by *(map @ud @ud)) ~[0^3 1^1])
[~ [4 1 *channel-event:eyre]]
!>([a ~(top to q)])
::
++ test-subtract-acked-events
=/ a (~(gas by *(map @ud @ud)) ~[0^3 1^1])
=/ u (~(gas by *(map @ud @ud)) ~[0^4 2^1])
=/ e (~(gas by *(map @ud @ud)) ~[0^1 2^1])
=/ r (subtract-acked-events:eyre-gate a u)
(expect-eq !>(e) !>(r))
::
++ test-channel-sends-unacknowledged-events-on-reconnection
:: common initialization
@ -1796,6 +1807,123 @@
results9
==
::
++ test-channel-subscription-clogged
:: common initialization
::
=^ tested-elsewhere eyre-gate
(perform-init-start-channel eyre-gate *sley)
::
=/ now=@da :(add ~1111.1.2 clog-timeout:eyre-gate ~s1)
:: subscription gets a success message
::
=^ tested-elsewhere eyre-gate
%: eyre-take
eyre-gate
now
scry=scry-provides-code
^= take-args
:* wire=/channel/subscription/'0123456789abcdef'/'1'/~nul/two
duct=~[/http-put-request]
^- (hypo sign:eyre-gate)
:- *type
[%g %unto %watch-ack ~]
==
moves=~
==
:: opens the http channel
::
=^ tested-elsewhere eyre-gate
%: eyre-call
eyre-gate
now
scry=scry-provides-code
^= call-args
^- [duct * (hobo task:able:eyre-gate)]
:* duct=~[/http-get-open] ~
%request
%.n
[%ipv4 .192.168.1.1]
%'GET'
'/~/channel/0123456789abcdef'
['cookie' cookie-value]~
~
==
^= expected-moves
~ ::NOTE tested elsewher
==
:: user gets sent multiple subscription results
::
=/ max=@ud (dec clog-threshold:eyre-gate)
=/ cur=@ud 0
|- =* loop-fact $
?. =(cur max)
=^ tested-elsewhere eyre-gate
%: eyre-take
eyre-gate
now
scry=scry-provides-code
^= take-args
:* wire=/channel/subscription/'0123456789abcdef'/'1'/~nul/two
duct=~[/http-put-request]
^- (hypo sign:eyre-gate)
:- *type
[%g %unto %fact %json !>(`json`[%a [%n '1'] ~])]
==
^= moves
~ ::NOTE tested elsewhere
==
loop-fact(cur +(cur))
:: the next subscription result should trigger a clog
::
=^ results eyre-gate
%: eyre-take
eyre-gate
now
scry=scry-provides-code
^= take-args
:* wire=/channel/subscription/'0123456789abcdef'/'1'/~nul/two
duct=~[/http-put-request]
^- (hypo sign:eyre-gate)
:- *type
[%g %unto %fact %json !>(`json`[%a [%n '1'] ~])]
==
^= moves
:~ :* duct=~[/http-get-open]
%give
%response
%continue
:- ~
%- as-octt:mimes:html
"""
id: {((d-co:co 1) +(clog-threshold:eyre-gate))}
data: \{"id":1,"response":"quit"}
"""
complete=%.n
==
:* duct=~[/http-put-request] %pass
/channel/subscription/'0123456789abcdef'/'1'/~nul/two
%g %deal [~nul ~nul] %two %leave ~
==
:* duct=~[/http-get-open]
%give
%response
%continue
:- ~
%- as-octt:mimes:html
"""
id: {((d-co:co 1) clog-threshold:eyre-gate)}
data: \{"json":[1],"id":1,"response":"diff"}
"""
complete=%.n
==
==
==
results
::
++ test-born-sends-pending-cancels
::
=^ results1 eyre-gate

View File

@ -109,6 +109,10 @@ export class Eyre extends Component {
<td class="inter">next-id</td>
<td>{c['next-id']}</td>
</tr>
<tr>
<td class="inter">last-ack</td>
<td>{c['last-ack']}</td>
</tr>
<tr>
<td class="inter">unacked</td>
<td>{c.unacked.reduce((a, b) => a + b + ', ', '')}</td>
@ -119,25 +123,28 @@ export class Eyre extends Component {
//NOTE jsx sorta copied from /components/subscriptions
return {key: `${s.id} ${s.app} ${s.ship} ${s.path}`, jsx: (
<div class="flex">
<div class="flex-auto" style={{maxWidth: '35%'}}>
<div class="flex-auto" style={{maxWidth: '15%'}}>
{s.id}
</div>
<div class="flex-auto" style={{maxWidth: '15%'}}>
~{s.ship}
</div>
<div class="flex-auto" style={{maxWidth: '15%'}}>
<div class="flex-auto" style={{maxWidth: '20%'}}>
{s.app}
</div>
<div class="flex-auto" style={{maxWidth: '35%'}}>
{s.path}
</div>
<div class="flex-auto" style={{maxWidth: '15%'}}>
{s.unacked}
</div>
</div>
)};
});
return {key: c.session, jsx: (
<Summary summary={summary} details={(
<SearchableList
placeholder="wire, app, ship, path"
placeholder="id, app, ship, path"
items={subscriptionItems}
/>
)} />