Merge pull request #5121 from urbit/lf/channel-refactor

@urbit/http-api: ensure acks are sent if timers are throttled
This commit is contained in:
matildepark 2021-07-20 22:14:51 -04:00 committed by GitHub
commit 3cdf3a0762
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 70 additions and 155 deletions

View File

@ -164,14 +164,18 @@ export class Urbit {
/** /**
* Initializes the SSE pipe for the appropriate channel. * Initializes the SSE pipe for the appropriate channel.
*/ */
eventSource(): Promise<void> { async eventSource(): Promise<void> {
if(this.lastEventId === 0) { if(this.sseClientInitialized) {
// Can't receive events until the channel is open return Promise.resolve();
this.skipDebounce = true;
return this.poke({ app: 'hood', mark: 'helm-hi', json: 'Opening API channel' }).then(() => {});
} }
if(this.lastEventId === 0) {
// Can't receive events until the channel is open,
// so poke and open then
await this.poke({ app: 'hood', mark: 'helm-hi', json: 'Opening API channel' });
return;
}
this.sseClientInitialized = true;
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
if (!this.sseClientInitialized) {
const sseOptions: SSEOptions = { const sseOptions: SSEOptions = {
headers: {} headers: {}
}; };
@ -193,6 +197,7 @@ export class Urbit {
resolve(); resolve();
return; // everything's good return; // everything's good
} else { } else {
this.onError && this.onError(new Error('bad response'));
reject(); reject();
} }
}, },
@ -201,15 +206,14 @@ export class Urbit {
console.log('Received SSE: ', event); console.log('Received SSE: ', event);
} }
if (!event.id) return; if (!event.id) return;
this.ack(Number(event.id)); this.lastEventId = parseInt(event.id, 10);
if (event.data && JSON.parse(event.data)) { if((this.lastEventId - this.lastAcknowledgedEventId) > 20) {
this.ack(this.lastEventId);
const data: any = JSON.parse(event.data);
if (data.response === 'diff') {
this.clearQueue();
} }
if (event.data && JSON.parse(event.data)) {
const data: any = JSON.parse(event.data);
if (data.response === 'poke' && this.outstandingPokes.has(data.id)) { if (data.response === 'poke' && this.outstandingPokes.has(data.id)) {
const funcs = this.outstandingPokes.get(data.id); const funcs = this.outstandingPokes.get(data.id);
if (data.hasOwnProperty('ok')) { if (data.hasOwnProperty('ok')) {
@ -221,8 +225,8 @@ export class Urbit {
console.error('Invalid poke response', data); console.error('Invalid poke response', data);
} }
this.outstandingPokes.delete(data.id); this.outstandingPokes.delete(data.id);
} else if (data.response === 'subscribe' || } else if (data.response === 'subscribe'
(data.response === 'poke' && this.outstandingSubscriptions.has(data.id))) { && this.outstandingSubscriptions.has(data.id)) {
const funcs = this.outstandingSubscriptions.get(data.id); const funcs = this.outstandingSubscriptions.get(data.id);
if (data.hasOwnProperty('err')) { if (data.hasOwnProperty('err')) {
console.error(data.err); console.error(data.err);
@ -231,7 +235,11 @@ export class Urbit {
} }
} else if (data.response === 'diff' && this.outstandingSubscriptions.has(data.id)) { } else if (data.response === 'diff' && this.outstandingSubscriptions.has(data.id)) {
const funcs = this.outstandingSubscriptions.get(data.id); const funcs = this.outstandingSubscriptions.get(data.id);
try {
funcs.event(data.json); funcs.event(data.json);
} catch (e) {
console.error('Failed to call subscription event callback', e);
}
} else if (data.response === 'quit' && this.outstandingSubscriptions.has(data.id)) { } else if (data.response === 'quit' && this.outstandingSubscriptions.has(data.id)) {
const funcs = this.outstandingSubscriptions.get(data.id); const funcs = this.outstandingSubscriptions.get(data.id);
funcs.quit(data); funcs.quit(data);
@ -257,10 +265,7 @@ export class Urbit {
}, },
}); });
this.sseClientInitialized = true; })
}
resolve();
});
} }
/** /**
@ -272,12 +277,6 @@ export class Urbit {
this.abort.abort(); this.abort.abort();
this.abort = new AbortController(); this.abort = new AbortController();
this.uid = `${Math.floor(Date.now() / 1000)}-${hexString(6)}`; this.uid = `${Math.floor(Date.now() / 1000)}-${hexString(6)}`;
if(this.debounceTimer) {
clearTimeout(this.debounceTimer);
this.debounceTimer = null;
}
this.calm = true;
this.outstandingJSON = [];
this.lastEventId = 0; this.lastEventId = 0;
this.lastAcknowledgedEventId = 0; this.lastAcknowledgedEventId = 0;
this.outstandingSubscriptions = new Map(); this.outstandingSubscriptions = new Map();
@ -299,6 +298,7 @@ export class Urbit {
* @param eventId The event to acknowledge. * @param eventId The event to acknowledge.
*/ */
private async ack(eventId: number): Promise<number | void> { private async ack(eventId: number): Promise<number | void> {
this.lastAcknowledgedEventId = eventId;
const message: Message = { const message: Message = {
action: 'ack', action: 'ack',
'event-id': eventId 'event-id': eventId
@ -307,91 +307,19 @@ export class Urbit {
return eventId; return eventId;
} }
/** private async sendJSONtoChannel(...json: Message[]): Promise<void> {
* This is a wrapper method that can be used to send any action with data.
*
* Every message sent has some common parameters, like method, headers, and data
* structure, so this method exists to prevent duplication.
*
* @param action The action to send
* @param data The data to send with the action
*
* @returns void | number If successful, returns the number of the message that was sent
*/
// async sendMessage(action: Action, data?: object): Promise<number | void> {
// const id = this.getEventId();
// if (this.verbose) {
// console.log(`Sending message ${id}:`, action, data,);
// }
// const message: Message = { id, action, ...data };
// await this.sendJSONtoChannel(message);
// return id;
// }
private outstandingJSON: Message[] = [];
private debounceTimer: NodeJS.Timeout = null;
private debounceInterval = 10;
private skipDebounce = false;
private calm = true;
private sendJSONtoChannel(json: Message): Promise<boolean | void> {
this.outstandingJSON.push(json);
return this.processQueue();
}
private processQueue(): Promise<boolean | void> {
return new Promise(async (resolve, reject) => {
const process = async () => {
if (this.calm) {
if (this.outstandingJSON.length === 0) resolve(true);
this.calm = false; // We are now occupied
const json = this.outstandingJSON;
const body = JSON.stringify(json);
this.outstandingJSON = [];
if (body === '[]') {
this.calm = true;
return resolve(false);
}
try {
const response = await fetch(this.channelUrl, { const response = await fetch(this.channelUrl, {
...this.fetchOptions, ...this.fetchOptions,
method: 'PUT', method: 'PUT',
body body: JSON.stringify(json)
}); });
if(!response.ok) { if(!response.ok) {
throw new Error('failed to PUT'); throw new Error('Failed to PUT channel');
} }
} catch (error) { if(!this.sseClientInitialized) {
console.log(error); await this.eventSource();
json.forEach(failed => this.outstandingJSON.push(failed));
if (this.onError) {
this.onError(error);
} else {
throw error;
} }
} }
this.calm = true;
if (!this.sseClientInitialized) {
this.eventSource().then(resolve); // We can open the channel for subscriptions once we've sent data over it
}
resolve(true);
} else {
clearTimeout(this.debounceTimer);
this.debounceTimer = setTimeout(process, this.debounceInterval);
resolve(false);
}
}
if(this.skipDebounce) {
process();
this.skipDebounce = false;
}
this.debounceTimer = setTimeout(process, this.debounceInterval);
});
}
/** /**
* Creates a subscription, waits for a fact and then unsubscribes * Creates a subscription, waits for a fact and then unsubscribes
@ -433,24 +361,6 @@ export class Urbit {
}); });
} }
// resetDebounceTimer() {
// if (this.debounceTimer) {
// clearTimeout(this.debounceTimer);
// this.debounceTimer = null;
// }
// this.calm = false;
// this.debounceTimer = setTimeout(() => {
// this.calm = true;
// }, this.debounceInterval);
// }
clearQueue() {
clearTimeout(this.debounceTimer);
this.debounceTimer = null;
}
/** /**
* Pokes a ship with data. * Pokes a ship with data.
* *
@ -458,7 +368,7 @@ export class Urbit {
* @param mark The mark of the data being sent * @param mark The mark of the data being sent
* @param json The data to send * @param json The data to send
*/ */
poke<T>(params: PokeInterface<T>): Promise<number> { async poke<T>(params: PokeInterface<T>): Promise<number> {
const { const {
app, app,
mark, mark,
@ -472,7 +382,6 @@ export class Urbit {
ship: this.ship, ship: this.ship,
...params ...params
}; };
return new Promise((resolve, reject) => {
const message: Message = { const message: Message = {
id: this.getEventId(), id: this.getEventId(),
action: 'poke', action: 'poke',
@ -481,6 +390,9 @@ export class Urbit {
mark, mark,
json json
}; };
const [send, result] = await Promise.all([
this.sendJSONtoChannel(message),
new Promise<number>((resolve, reject) => {
this.outstandingPokes.set(message.id, { this.outstandingPokes.set(message.id, {
onSuccess: () => { onSuccess: () => {
onSuccess(); onSuccess();
@ -491,10 +403,9 @@ export class Urbit {
reject(event.err); reject(event.err);
} }
}); });
this.sendJSONtoChannel(message).then(() => { })
resolve(message.id); ]);
}); return result;
});
} }
/** /**

View File

@ -1,5 +1,7 @@
import Urbit from '../src'; import Urbit from '../src';
import { Readable } from 'streams'; import { Readable } from 'streams';
import 'jest';
function fakeSSE(messages = [], timeout = 0) { function fakeSSE(messages = [], timeout = 0) {
const ourMessages = [...messages]; const ourMessages = [...messages];
@ -60,7 +62,6 @@ describe('Initialisation', () => {
let fetchSpy; let fetchSpy;
beforeEach(() => { beforeEach(() => {
airlock = new Urbit('', '+code'); airlock = new Urbit('', '+code');
airlock.debounceInterval = 10;
}); });
afterEach(() => { afterEach(() => {
fetchSpy.mockReset(); fetchSpy.mockReset();
@ -73,7 +74,7 @@ describe('Initialisation', () => {
Promise.resolve({ ok: true, body: fakeSSE() }) Promise.resolve({ ok: true, body: fakeSSE() })
) )
.mockImplementationOnce(() => .mockImplementationOnce(() =>
Promise.resolve({ ok: true, body: fakeSSE() }) Promise.resolve({ ok: true, body: fakeSSE([ack(1)]) })
); );
await airlock.eventSource(); await airlock.eventSource();
@ -82,13 +83,16 @@ describe('Initialisation', () => {
it('should handle failures', async () => { it('should handle failures', async () => {
fetchSpy = jest.spyOn(window, 'fetch'); fetchSpy = jest.spyOn(window, 'fetch');
fetchSpy fetchSpy
.mockImplementation(() => .mockImplementationOnce(() =>
Promise.resolve({ ok: true, body: fakeSSE() })
)
.mockImplementationOnce(() =>
Promise.resolve({ ok: false, body: fakeSSE() }) Promise.resolve({ ok: false, body: fakeSSE() })
) )
airlock.onError = jest.fn(); airlock.onError = jest.fn();
try { try {
await airlock.eventSource(); await airlock.eventSource();
wait(100);
} catch (e) { } catch (e) {
expect(airlock.onError).toHaveBeenCalled(); expect(airlock.onError).toHaveBeenCalled();
} }