diff --git a/pkg/arvo/app/dbug.hoon b/pkg/arvo/app/dbug.hoon index 29be53e07e..dd4372c3f2 100644 --- a/pkg/arvo/app/dbug.hoon +++ b/pkg/arvo/app/dbug.hoon @@ -380,6 +380,7 @@ 'connected'^b+!-.state 'expiry'^?-(-.state %& (time date.p.state), %| ~) 'next-id'^(numb next-id) + 'last-ack'^(time last-ack) 'unacked'^a+(turn (sort (turn ~(tap in events) head) dor) numb) :: :- 'subscriptions' @@ -391,6 +392,7 @@ 'ship'^(^ship ship) 'app'^s+app 'path'^(^path path) + 'unacked'^(numb (~(gut by unacked) id 0)) == == == diff --git a/pkg/arvo/sys/vane/eyre.hoon b/pkg/arvo/sys/vane/eyre.hoon index d2f8e50620..3ed6750101 100644 --- a/pkg/arvo/sys/vane/eyre.hoon +++ b/pkg/arvo/sys/vane/eyre.hoon @@ -131,6 +131,12 @@ :: [%delete ~] == +:: clog-timeout: the delay between acks after which clog-threshold kicks in +:: +++ clog-timeout ~s30 +:: clog-threshold: maximum per-subscription event buildup, after clog-timeout +:: +++ clog-threshold 50 :: channel-timeout: the delay before a channel should be reaped :: ++ channel-timeout ~h12 @@ -152,22 +158,45 @@ (can 3 a) :: +prune-events: removes all items from the front of the queue up to :id :: +:: also produces, per request-id, the amount of events that have got acked, +:: for use with +subtract-acked-events. +:: ++ prune-events + =| acked=(map @ud @ud) |= [q=(qeu [id=@ud @ud channel-event]) id=@ud] - ^+ q + ^+ [acked q] :: if the queue is now empty, that's fine :: ?: =(~ q) - ~ + [acked ~] :: - =/ next=[item=[id=@ud @ud channel-event] _q] ~(get to q) + =/ next=[item=[id=@ud request-id=@ud channel-event] _q] ~(get to q) :: if the head of the queue is newer than the acknowledged id, we're done :: ?: (gth id.item.next id) - q - :: otherwise, check next item + [acked q] + :: otherwise, note the ack, and check next item :: - $(q +:next) + %_ $ + q +:next + :: + acked + =, item.next + %+ ~(put by acked) request-id + +((~(gut by acked) request-id 0)) + == +:: +subtract-acked-events: update the subscription map's pending ack counts +:: +++ subtract-acked-events + |= [acked=(map @ud @ud) unacked=(map @ud @ud)] + ^+ unacked + %+ roll ~(tap by acked) + |= [[rid=@ud ack=@ud] unacked=_unacked] + ?~ sus=(~(get by unacked) rid) + unacked + %+ ~(put by unacked) rid + ?: (lte u.sus ack) 0 + (sub u.sus ack) :: +parse-channel-request: parses a list of channel-requests :: :: Parses a json array into a list of +channel-request. If any of the items @@ -1123,7 +1152,7 @@ %_ ..update-timeout-timer-for session.channel-state.state %+ ~(put by session.channel-state.state) channel-id - [[%& expiration-time duct] 0 ~ ~ ~] + [[%& expiration-time duct] 0 now ~ ~ ~ ~] :: moves [(set-timeout-move channel-id expiration-time) moves] @@ -1267,7 +1296,11 @@ %+ ~(jab by session.channel-state.state) channel-id |= =channel ^+ channel - channel(events (prune-events events.channel last-event-id)) + =^ acked events.channel + (prune-events events.channel last-event-id) + =. unacked.channel + (subtract-acked-events acked unacked.channel) + channel(last-ack now) == :: +on-put-request: handles a PUT request :: @@ -1401,9 +1434,10 @@ =. session.channel-state.state %+ ~(jab by session.channel-state.state) channel-id |= =channel - =- channel(subscriptions -) - %- ~(del by subscriptions.channel) - subscription-id + %_ channel + subscriptions (~(del by subscriptions.channel) subscription-id) + unacked (~(del by unacked.channel) subscription-id) + == :: $(requests t.requests) :: @@ -1450,10 +1484,9 @@ :: send it to a connected browser so in case of disconnection, we can :: resend it. :: - :: This function is responsible for taking the raw json lines and - :: converting them into a text/event-stream. The :event-stream-lines - :: then may get sent, and are stored for later resending until - :: acknowledged by the client. + :: This function is responsible for taking the event sign and converting + :: it into a text/event-stream. The :sign then may get sent, and is + :: stored for later resending until acknowledged by the client. :: ++ emit-event |= [channel-id=@t request-id=@ud =sign:agent:gall] @@ -1466,6 +1499,11 @@ [duct %pass /flog %d %flog %crud %eyre-no-channel >id=channel-id< ~] :: =/ event-id next-id.u.channel + :: store the event as unacked + :: + =. events.u.channel + %- ~(put to events.u.channel) + [event-id request-id (sign-to-channel-event sign)] :: if a client is connected, send this event to them. :: =? moves ?=([%| *] state.u.channel) @@ -1483,22 +1521,65 @@ :: complete=%.n == + :: update channel's unacked counts, find out if clogged + :: + =^ clogged unacked.u.channel + :: poke-acks are one-offs, don't apply clog logic to them + :: + ?: ?=(%poke-ack -.sign) [| unacked.u.channel] + =/ num=@ud + (~(gut by unacked.u.channel) request-id 0) + :_ (~(put by unacked.u.channel) request-id +(num)) + ?& (gte num clog-threshold) + (lth (add last-ack.u.channel clog-timeout) now) + == + :: + ~? clogged [%e %clogged channel-id request-id] + :: if we're clogged, end this gall subscription + :: + =? moves clogged + :_ moves + =+ (~(got by subscriptions.u.channel) request-id) + :^ duct %pass + (subscription-wire channel-id request-id ship app) + [%g %deal [our ship] app %leave ~] + =? event-id clogged +(event-id) + =? u.channel clogged + %_ u.channel + subscriptions (~(del by subscriptions.u.channel) request-id) + unacked (~(del by unacked.u.channel) request-id) + events %- ~(put to events.u.channel) + [event-id request-id (sign-to-channel-event %kick ~)] + == + :: if a client is connected, send the kick event to them + :: + =? moves &(clogged ?=([%| *] state.u.channel)) + :_ moves + :+ p.state.u.channel %give + ^- gift:able + :* %response %continue + :: + ^= data + %- wall-to-octs + %+ event-json-to-wall event-id + (need (sign-to-json request-id %kick ~)) + :: + complete=%.n + == :: - =/ =channel-event - ?. ?=(%fact -.sign) sign - [%fact [p q.q]:cage.sign] :- moves %_ state session.channel-state - %+ ~(jab by session.channel-state.state) channel-id - |= =^channel - ^+ channel - :: - %_ channel - next-id +(next-id.channel) - events (~(put to events.channel) [event-id request-id channel-event]) - == + %+ ~(put by session.channel-state.state) channel-id + u.channel(next-id +(event-id)) == + :: +sign-to-channel-event: strip the vase from a sign:agent:gall + :: + ++ sign-to-channel-event + |= =sign:agent:gall + ^- channel-event + ?. ?=(%fact -.sign) sign + [%fact [p q.q]:cage.sign] :: +channel-event-to-sign: attempt to recover a sign from a channel-event :: ++ channel-event-to-sign @@ -2450,16 +2531,17 @@ :: than wiping channels entirely. session.channel-state.server-state.old %- ~(run by session.channel-state.server-state.old) - |= old-channel=channel-2020-9-30 + |= channel-2020-9-30 ^- channel - %= old-channel - events *(qeu [@ud @ud channel-event]) - :: - subscriptions + =/ subscriptions %- ~(gas by *(map @ud [@p term path duct])) - %+ turn ~(tap by subscriptions.old-channel) + %+ turn ~(tap by subscriptions) |= [=wire rest=[@p term path duct]] [(slav %ud (snag 3 wire)) rest] + :* state next-id now + *(qeu [@ud @ud channel-event]) + *(map @ud @ud) + subscriptions heartbeat == == :: diff --git a/pkg/arvo/sys/zuse.hoon b/pkg/arvo/sys/zuse.hoon index ee129aeb13..4dc733ca8f 100644 --- a/pkg/arvo/sys/zuse.hoon +++ b/pkg/arvo/sys/zuse.hoon @@ -1392,6 +1392,11 @@ :: next-id: next sequence number to use :: next-id=@ud + :: last-ack: time of last client ack + :: + :: used for clog calculations, in combination with :unacked + :: + last-ack=@da :: events: unacknowledged events :: :: We keep track of all events where we haven't received a @@ -1401,6 +1406,11 @@ :: can't assume it got received until we get an acknowledgment. :: events=(qeu [id=@ud request-id=@ud =channel-event]) + :: unacked: unacknowledged event counts by request-id + :: + :: used for clog calculations, in combination with :last-ack + :: + unacked=(map @ud @ud) :: subscriptions: gall subscriptions by request-id :: :: We maintain a list of subscriptions so if a channel times out, we diff --git a/pkg/arvo/tests/sys/vane/eyre.hoon b/pkg/arvo/tests/sys/vane/eyre.hoon index b5ca0af1a7..b5323038fe 100644 --- a/pkg/arvo/tests/sys/vane/eyre.hoon +++ b/pkg/arvo/tests/sys/vane/eyre.hoon @@ -1517,15 +1517,26 @@ :: ++ test-prune-events =/ q=(qeu [id=@ud @ud channel-event:eyre]) ~ - =. q (~(put to q) [0 *@ud *channel-event:eyre]) - =. q (~(put to q) [1 *@ud *channel-event:eyre]) - =. q (~(put to q) [2 *@ud *channel-event:eyre]) - =. q (~(put to q) [3 *@ud *channel-event:eyre]) - =. q (~(put to q) [4 *@ud *channel-event:eyre]) + =. q (~(put to q) [0 0 *channel-event:eyre]) + =. q (~(put to q) [1 0 *channel-event:eyre]) + =. q (~(put to q) [2 0 *channel-event:eyre]) + =. q (~(put to q) [3 1 *channel-event:eyre]) + =. q (~(put to q) [4 1 *channel-event:eyre]) :: - =. q (prune-events:eyre-gate q 3) + =^ a q (prune-events:eyre-gate q 3) :: - (expect-eq !>([~ [4 *@ud *channel-event:eyre]]) !>(~(top to q))) + %+ expect-eq + !> + :- (~(gas by *(map @ud @ud)) ~[0^3 1^1]) + [~ [4 1 *channel-event:eyre]] + !>([a ~(top to q)]) +:: +++ test-subtract-acked-events + =/ a (~(gas by *(map @ud @ud)) ~[0^3 1^1]) + =/ u (~(gas by *(map @ud @ud)) ~[0^4 2^1]) + =/ e (~(gas by *(map @ud @ud)) ~[0^1 2^1]) + =/ r (subtract-acked-events:eyre-gate a u) + (expect-eq !>(e) !>(r)) :: ++ test-channel-sends-unacknowledged-events-on-reconnection :: common initialization @@ -1796,6 +1807,123 @@ results9 == :: +++ test-channel-subscription-clogged + :: common initialization + :: + =^ tested-elsewhere eyre-gate + (perform-init-start-channel eyre-gate *sley) + :: + =/ now=@da :(add ~1111.1.2 clog-timeout:eyre-gate ~s1) + :: subscription gets a success message + :: + =^ tested-elsewhere eyre-gate + %: eyre-take + eyre-gate + now + scry=scry-provides-code + ^= take-args + :* wire=/channel/subscription/'0123456789abcdef'/'1'/~nul/two + duct=~[/http-put-request] + ^- (hypo sign:eyre-gate) + :- *type + [%g %unto %watch-ack ~] + == + moves=~ + == + :: opens the http channel + :: + =^ tested-elsewhere eyre-gate + %: eyre-call + eyre-gate + now + scry=scry-provides-code + ^= call-args + ^- [duct * (hobo task:able:eyre-gate)] + :* duct=~[/http-get-open] ~ + %request + %.n + [%ipv4 .192.168.1.1] + %'GET' + '/~/channel/0123456789abcdef' + ['cookie' cookie-value]~ + ~ + == + ^= expected-moves + ~ ::NOTE tested elsewher + == + :: user gets sent multiple subscription results + :: + =/ max=@ud (dec clog-threshold:eyre-gate) + =/ cur=@ud 0 + |- =* loop-fact $ + ?. =(cur max) + =^ tested-elsewhere eyre-gate + %: eyre-take + eyre-gate + now + scry=scry-provides-code + ^= take-args + :* wire=/channel/subscription/'0123456789abcdef'/'1'/~nul/two + duct=~[/http-put-request] + ^- (hypo sign:eyre-gate) + :- *type + [%g %unto %fact %json !>(`json`[%a [%n '1'] ~])] + == + ^= moves + ~ ::NOTE tested elsewhere + == + loop-fact(cur +(cur)) + :: the next subscription result should trigger a clog + :: + =^ results eyre-gate + %: eyre-take + eyre-gate + now + scry=scry-provides-code + ^= take-args + :* wire=/channel/subscription/'0123456789abcdef'/'1'/~nul/two + duct=~[/http-put-request] + ^- (hypo sign:eyre-gate) + :- *type + [%g %unto %fact %json !>(`json`[%a [%n '1'] ~])] + == + ^= moves + :~ :* duct=~[/http-get-open] + %give + %response + %continue + :- ~ + %- as-octt:mimes:html + """ + id: {((d-co:co 1) +(clog-threshold:eyre-gate))} + data: \{"id":1,"response":"quit"} + + + """ + complete=%.n + == + :* duct=~[/http-put-request] %pass + /channel/subscription/'0123456789abcdef'/'1'/~nul/two + %g %deal [~nul ~nul] %two %leave ~ + == + :* duct=~[/http-get-open] + %give + %response + %continue + :- ~ + %- as-octt:mimes:html + """ + id: {((d-co:co 1) clog-threshold:eyre-gate)} + data: \{"json":[1],"id":1,"response":"diff"} + + + """ + complete=%.n + == + == + == + results +:: ++ test-born-sends-pending-cancels :: =^ results1 eyre-gate diff --git a/pkg/interface/dbug/src/js/views/eyre.js b/pkg/interface/dbug/src/js/views/eyre.js index 45807fa6de..2fd4670990 100644 --- a/pkg/interface/dbug/src/js/views/eyre.js +++ b/pkg/interface/dbug/src/js/views/eyre.js @@ -109,6 +109,10 @@ export class Eyre extends Component { next-id {c['next-id']} + + last-ack + {c['last-ack']} + unacked {c.unacked.reduce((a, b) => a + b + ', ', '')} @@ -119,25 +123,28 @@ export class Eyre extends Component { //NOTE jsx sorta copied from /components/subscriptions return {key: `${s.id} ${s.app} ${s.ship} ${s.path}`, jsx: (
-
+
{s.id}
~{s.ship}
-
+
{s.app}
{s.path}
+
+ {s.unacked} +
)}; }); return {key: c.session, jsx: ( )} />