diff --git a/pkg/npm/http-api/src/Urbit.ts b/pkg/npm/http-api/src/Urbit.ts index 2017df3cb..20f6bd25e 100644 --- a/pkg/npm/http-api/src/Urbit.ts +++ b/pkg/npm/http-api/src/Urbit.ts @@ -164,14 +164,18 @@ export class Urbit { /** * Initializes the SSE pipe for the appropriate channel. */ - eventSource(): Promise { + async eventSource(): Promise { + if(this.sseClientInitialized) { + return Promise.resolve(); + } + this.sseClientInitialized = true; if(this.lastEventId === 0) { - // Can't receive events until the channel is open - this.skipDebounce = true; - return this.poke({ app: 'hood', mark: 'helm-hi', json: 'Opening API channel' }).then(() => {}); + // Can't receive events until the channel is open, + // so poke and open then + await this.poke({ app: 'hood', mark: 'helm-hi', json: 'Opening API channel' }); + return; } return new Promise((resolve, reject) => { - if (!this.sseClientInitialized) { const sseOptions: SSEOptions = { headers: {} }; @@ -201,14 +205,13 @@ export class Urbit { console.log('Received SSE: ', event); } if (!event.id) return; - this.ack(Number(event.id)); - if (event.data && JSON.parse(event.data)) { - - const data: any = JSON.parse(event.data); + this.lastEventId = parseInt(event.id, 10); + if((this.lastEventId - this.lastAcknowledgedEventId) > 20) { + this.ack(this.lastEventId); + } - if (data.response === 'diff') { - this.clearQueue(); - } + if (event.data && JSON.parse(event.data)) { + const data: any = JSON.parse(event.data); if (data.response === 'poke' && this.outstandingPokes.has(data.id)) { const funcs = this.outstandingPokes.get(data.id); @@ -221,8 +224,8 @@ export class Urbit { console.error('Invalid poke response', data); } this.outstandingPokes.delete(data.id); - } else if (data.response === 'subscribe' || - (data.response === 'poke' && this.outstandingSubscriptions.has(data.id))) { + } else if (data.response === 'subscribe' + && this.outstandingSubscriptions.has(data.id)) { const funcs = this.outstandingSubscriptions.get(data.id); if (data.hasOwnProperty('err')) { console.error(data.err); @@ -231,7 +234,11 @@ export class Urbit { } } else if (data.response === 'diff' && this.outstandingSubscriptions.has(data.id)) { const funcs = this.outstandingSubscriptions.get(data.id); - funcs.event(data.json); + try { + funcs.event(data.json); + } catch (e) { + console.error('Failed to call subscription event callback', e); + } } else if (data.response === 'quit' && this.outstandingSubscriptions.has(data.id)) { const funcs = this.outstandingSubscriptions.get(data.id); funcs.quit(data); @@ -257,10 +264,7 @@ export class Urbit { }, }); - this.sseClientInitialized = true; - } - resolve(); - }); + }) } /** @@ -272,12 +276,6 @@ export class Urbit { this.abort.abort(); this.abort = new AbortController(); this.uid = `${Math.floor(Date.now() / 1000)}-${hexString(6)}`; - if(this.debounceTimer) { - clearTimeout(this.debounceTimer); - this.debounceTimer = null; - } - this.calm = true; - this.outstandingJSON = []; this.lastEventId = 0; this.lastAcknowledgedEventId = 0; this.outstandingSubscriptions = new Map(); @@ -299,6 +297,7 @@ export class Urbit { * @param eventId The event to acknowledge. */ private async ack(eventId: number): Promise { + this.lastAcknowledgedEventId = eventId; const message: Message = { action: 'ack', 'event-id': eventId @@ -307,90 +306,18 @@ export class Urbit { return eventId; } - /** - * This is a wrapper method that can be used to send any action with data. - * - * Every message sent has some common parameters, like method, headers, and data - * structure, so this method exists to prevent duplication. - * - * @param action The action to send - * @param data The data to send with the action - * - * @returns void | number If successful, returns the number of the message that was sent - */ - // async sendMessage(action: Action, data?: object): Promise { - // const id = this.getEventId(); - // if (this.verbose) { - // console.log(`Sending message ${id}:`, action, data,); - // } - // const message: Message = { id, action, ...data }; - // await this.sendJSONtoChannel(message); - // return id; - // } - - private outstandingJSON: Message[] = []; - - private debounceTimer: NodeJS.Timeout = null; - private debounceInterval = 10; - private skipDebounce = false; - private calm = true; - - private sendJSONtoChannel(json: Message): Promise { - this.outstandingJSON.push(json); - return this.processQueue(); - } - - private processQueue(): Promise { - return new Promise(async (resolve, reject) => { - const process = async () => { - if (this.calm) { - if (this.outstandingJSON.length === 0) resolve(true); - this.calm = false; // We are now occupied - const json = this.outstandingJSON; - const body = JSON.stringify(json); - this.outstandingJSON = []; - if (body === '[]') { - this.calm = true; - return resolve(false); - } - try { - const response = await fetch(this.channelUrl, { - ...this.fetchOptions, - method: 'PUT', - body - }); - if(!response.ok) { - throw new Error('failed to PUT'); - } - } catch (error) { - console.log(error); - json.forEach(failed => this.outstandingJSON.push(failed)); - if (this.onError) { - this.onError(error); - } else { - throw error; - } - } - this.calm = true; - if (!this.sseClientInitialized) { - this.eventSource().then(resolve); // We can open the channel for subscriptions once we've sent data over it - } - resolve(true); - } else { - clearTimeout(this.debounceTimer); - this.debounceTimer = setTimeout(process, this.debounceInterval); - resolve(false); - } - } - if(this.skipDebounce) { - process(); - this.skipDebounce = false; - } - - this.debounceTimer = setTimeout(process, this.debounceInterval); - - + private async sendJSONtoChannel(...json: Message[]): Promise { + const response = await fetch(this.channelUrl, { + ...this.fetchOptions, + method: 'PUT', + body: JSON.stringify(json) }); + if(!response.ok) { + throw new Error('Failed to PUT channel'); + } + if(!this.sseClientInitialized) { + await this.eventSource(); + } } /** @@ -433,24 +360,6 @@ export class Urbit { }); } - - - // resetDebounceTimer() { - // if (this.debounceTimer) { - // clearTimeout(this.debounceTimer); - // this.debounceTimer = null; - // } - // this.calm = false; - // this.debounceTimer = setTimeout(() => { - // this.calm = true; - // }, this.debounceInterval); - // } - - clearQueue() { - clearTimeout(this.debounceTimer); - this.debounceTimer = null; - } - /** * Pokes a ship with data. *