@urbit/http-api: ensure acks are sent if timers are throttled

Strips the debounce mechanic, instead preferring to send acks when 20 or
more events have not been acked
This commit is contained in:
Liam Fitzgerald 2021-07-20 12:19:31 +10:00
parent 22d0d9557f
commit 86c3d156dd
No known key found for this signature in database
GPG Key ID: D390E12C61D1CFFB

View File

@ -164,14 +164,18 @@ export class Urbit {
/**
* Initializes the SSE pipe for the appropriate channel.
*/
eventSource(): Promise<void> {
async eventSource(): Promise<void> {
if(this.sseClientInitialized) {
return Promise.resolve();
}
this.sseClientInitialized = true;
if(this.lastEventId === 0) {
// Can't receive events until the channel is open
this.skipDebounce = true;
return this.poke({ app: 'hood', mark: 'helm-hi', json: 'Opening API channel' }).then(() => {});
// Can't receive events until the channel is open,
// so poke and open then
await this.poke({ app: 'hood', mark: 'helm-hi', json: 'Opening API channel' });
return;
}
return new Promise((resolve, reject) => {
if (!this.sseClientInitialized) {
const sseOptions: SSEOptions = {
headers: {}
};
@ -201,15 +205,14 @@ export class Urbit {
console.log('Received SSE: ', event);
}
if (!event.id) return;
this.ack(Number(event.id));
if (event.data && JSON.parse(event.data)) {
const data: any = JSON.parse(event.data);
if (data.response === 'diff') {
this.clearQueue();
this.lastEventId = parseInt(event.id, 10);
if((this.lastEventId - this.lastAcknowledgedEventId) > 20) {
this.ack(this.lastEventId);
}
if (event.data && JSON.parse(event.data)) {
const data: any = JSON.parse(event.data);
if (data.response === 'poke' && this.outstandingPokes.has(data.id)) {
const funcs = this.outstandingPokes.get(data.id);
if (data.hasOwnProperty('ok')) {
@ -221,8 +224,8 @@ export class Urbit {
console.error('Invalid poke response', data);
}
this.outstandingPokes.delete(data.id);
} else if (data.response === 'subscribe' ||
(data.response === 'poke' && this.outstandingSubscriptions.has(data.id))) {
} else if (data.response === 'subscribe'
&& this.outstandingSubscriptions.has(data.id)) {
const funcs = this.outstandingSubscriptions.get(data.id);
if (data.hasOwnProperty('err')) {
console.error(data.err);
@ -231,7 +234,11 @@ export class Urbit {
}
} else if (data.response === 'diff' && this.outstandingSubscriptions.has(data.id)) {
const funcs = this.outstandingSubscriptions.get(data.id);
try {
funcs.event(data.json);
} catch (e) {
console.error('Failed to call subscription event callback', e);
}
} else if (data.response === 'quit' && this.outstandingSubscriptions.has(data.id)) {
const funcs = this.outstandingSubscriptions.get(data.id);
funcs.quit(data);
@ -257,10 +264,7 @@ export class Urbit {
},
});
this.sseClientInitialized = true;
}
resolve();
});
})
}
/**
@ -272,12 +276,6 @@ export class Urbit {
this.abort.abort();
this.abort = new AbortController();
this.uid = `${Math.floor(Date.now() / 1000)}-${hexString(6)}`;
if(this.debounceTimer) {
clearTimeout(this.debounceTimer);
this.debounceTimer = null;
}
this.calm = true;
this.outstandingJSON = [];
this.lastEventId = 0;
this.lastAcknowledgedEventId = 0;
this.outstandingSubscriptions = new Map();
@ -299,6 +297,7 @@ export class Urbit {
* @param eventId The event to acknowledge.
*/
private async ack(eventId: number): Promise<number | void> {
this.lastAcknowledgedEventId = eventId;
const message: Message = {
action: 'ack',
'event-id': eventId
@ -307,91 +306,19 @@ export class Urbit {
return eventId;
}
/**
* This is a wrapper method that can be used to send any action with data.
*
* Every message sent has some common parameters, like method, headers, and data
* structure, so this method exists to prevent duplication.
*
* @param action The action to send
* @param data The data to send with the action
*
* @returns void | number If successful, returns the number of the message that was sent
*/
// async sendMessage(action: Action, data?: object): Promise<number | void> {
// const id = this.getEventId();
// if (this.verbose) {
// console.log(`Sending message ${id}:`, action, data,);
// }
// const message: Message = { id, action, ...data };
// await this.sendJSONtoChannel(message);
// return id;
// }
private outstandingJSON: Message[] = [];
private debounceTimer: NodeJS.Timeout = null;
private debounceInterval = 10;
private skipDebounce = false;
private calm = true;
private sendJSONtoChannel(json: Message): Promise<boolean | void> {
this.outstandingJSON.push(json);
return this.processQueue();
}
private processQueue(): Promise<boolean | void> {
return new Promise(async (resolve, reject) => {
const process = async () => {
if (this.calm) {
if (this.outstandingJSON.length === 0) resolve(true);
this.calm = false; // We are now occupied
const json = this.outstandingJSON;
const body = JSON.stringify(json);
this.outstandingJSON = [];
if (body === '[]') {
this.calm = true;
return resolve(false);
}
try {
private async sendJSONtoChannel(...json: Message[]): Promise<void> {
const response = await fetch(this.channelUrl, {
...this.fetchOptions,
method: 'PUT',
body
body: JSON.stringify(json)
});
if(!response.ok) {
throw new Error('failed to PUT');
throw new Error('Failed to PUT channel');
}
} catch (error) {
console.log(error);
json.forEach(failed => this.outstandingJSON.push(failed));
if (this.onError) {
this.onError(error);
} else {
throw error;
if(!this.sseClientInitialized) {
await this.eventSource();
}
}
this.calm = true;
if (!this.sseClientInitialized) {
this.eventSource().then(resolve); // We can open the channel for subscriptions once we've sent data over it
}
resolve(true);
} else {
clearTimeout(this.debounceTimer);
this.debounceTimer = setTimeout(process, this.debounceInterval);
resolve(false);
}
}
if(this.skipDebounce) {
process();
this.skipDebounce = false;
}
this.debounceTimer = setTimeout(process, this.debounceInterval);
});
}
/**
* Creates a subscription, waits for a fact and then unsubscribes
@ -433,24 +360,6 @@ export class Urbit {
});
}
// resetDebounceTimer() {
// if (this.debounceTimer) {
// clearTimeout(this.debounceTimer);
// this.debounceTimer = null;
// }
// this.calm = false;
// this.debounceTimer = setTimeout(() => {
// this.calm = true;
// }, this.debounceInterval);
// }
clearQueue() {
clearTimeout(this.debounceTimer);
this.debounceTimer = null;
}
/**
* Pokes a ship with data.
*