diff --git a/pkg/npm/http-api/src/Urbit.ts b/pkg/npm/http-api/src/Urbit.ts index 2017df3cb8..82dec26ea6 100644 --- a/pkg/npm/http-api/src/Urbit.ts +++ b/pkg/npm/http-api/src/Urbit.ts @@ -164,14 +164,18 @@ export class Urbit { /** * Initializes the SSE pipe for the appropriate channel. */ - eventSource(): Promise { - if(this.lastEventId === 0) { - // Can't receive events until the channel is open - this.skipDebounce = true; - return this.poke({ app: 'hood', mark: 'helm-hi', json: 'Opening API channel' }).then(() => {}); + async eventSource(): Promise { + if(this.sseClientInitialized) { + return Promise.resolve(); } + if(this.lastEventId === 0) { + // Can't receive events until the channel is open, + // so poke and open then + await this.poke({ app: 'hood', mark: 'helm-hi', json: 'Opening API channel' }); + return; + } + this.sseClientInitialized = true; return new Promise((resolve, reject) => { - if (!this.sseClientInitialized) { const sseOptions: SSEOptions = { headers: {} }; @@ -193,6 +197,7 @@ export class Urbit { resolve(); return; // everything's good } else { + this.onError && this.onError(new Error('bad response')); reject(); } }, @@ -201,14 +206,13 @@ export class Urbit { console.log('Received SSE: ', event); } if (!event.id) return; - this.ack(Number(event.id)); - if (event.data && JSON.parse(event.data)) { - - const data: any = JSON.parse(event.data); + this.lastEventId = parseInt(event.id, 10); + if((this.lastEventId - this.lastAcknowledgedEventId) > 20) { + this.ack(this.lastEventId); + } - if (data.response === 'diff') { - this.clearQueue(); - } + if (event.data && JSON.parse(event.data)) { + const data: any = JSON.parse(event.data); if (data.response === 'poke' && this.outstandingPokes.has(data.id)) { const funcs = this.outstandingPokes.get(data.id); @@ -221,8 +225,8 @@ export class Urbit { console.error('Invalid poke response', data); } this.outstandingPokes.delete(data.id); - } else if (data.response === 'subscribe' || - (data.response === 'poke' && this.outstandingSubscriptions.has(data.id))) { + } else if (data.response === 'subscribe' + && this.outstandingSubscriptions.has(data.id)) { const funcs = this.outstandingSubscriptions.get(data.id); if (data.hasOwnProperty('err')) { console.error(data.err); @@ -231,7 +235,11 @@ export class Urbit { } } else if (data.response === 'diff' && this.outstandingSubscriptions.has(data.id)) { const funcs = this.outstandingSubscriptions.get(data.id); - funcs.event(data.json); + try { + funcs.event(data.json); + } catch (e) { + console.error('Failed to call subscription event callback', e); + } } else if (data.response === 'quit' && this.outstandingSubscriptions.has(data.id)) { const funcs = this.outstandingSubscriptions.get(data.id); funcs.quit(data); @@ -257,10 +265,7 @@ export class Urbit { }, }); - this.sseClientInitialized = true; - } - resolve(); - }); + }) } /** @@ -272,12 +277,6 @@ export class Urbit { this.abort.abort(); this.abort = new AbortController(); this.uid = `${Math.floor(Date.now() / 1000)}-${hexString(6)}`; - if(this.debounceTimer) { - clearTimeout(this.debounceTimer); - this.debounceTimer = null; - } - this.calm = true; - this.outstandingJSON = []; this.lastEventId = 0; this.lastAcknowledgedEventId = 0; this.outstandingSubscriptions = new Map(); @@ -299,6 +298,7 @@ export class Urbit { * @param eventId The event to acknowledge. */ private async ack(eventId: number): Promise { + this.lastAcknowledgedEventId = eventId; const message: Message = { action: 'ack', 'event-id': eventId @@ -307,90 +307,18 @@ export class Urbit { return eventId; } - /** - * This is a wrapper method that can be used to send any action with data. - * - * Every message sent has some common parameters, like method, headers, and data - * structure, so this method exists to prevent duplication. - * - * @param action The action to send - * @param data The data to send with the action - * - * @returns void | number If successful, returns the number of the message that was sent - */ - // async sendMessage(action: Action, data?: object): Promise { - // const id = this.getEventId(); - // if (this.verbose) { - // console.log(`Sending message ${id}:`, action, data,); - // } - // const message: Message = { id, action, ...data }; - // await this.sendJSONtoChannel(message); - // return id; - // } - - private outstandingJSON: Message[] = []; - - private debounceTimer: NodeJS.Timeout = null; - private debounceInterval = 10; - private skipDebounce = false; - private calm = true; - - private sendJSONtoChannel(json: Message): Promise { - this.outstandingJSON.push(json); - return this.processQueue(); - } - - private processQueue(): Promise { - return new Promise(async (resolve, reject) => { - const process = async () => { - if (this.calm) { - if (this.outstandingJSON.length === 0) resolve(true); - this.calm = false; // We are now occupied - const json = this.outstandingJSON; - const body = JSON.stringify(json); - this.outstandingJSON = []; - if (body === '[]') { - this.calm = true; - return resolve(false); - } - try { - const response = await fetch(this.channelUrl, { - ...this.fetchOptions, - method: 'PUT', - body - }); - if(!response.ok) { - throw new Error('failed to PUT'); - } - } catch (error) { - console.log(error); - json.forEach(failed => this.outstandingJSON.push(failed)); - if (this.onError) { - this.onError(error); - } else { - throw error; - } - } - this.calm = true; - if (!this.sseClientInitialized) { - this.eventSource().then(resolve); // We can open the channel for subscriptions once we've sent data over it - } - resolve(true); - } else { - clearTimeout(this.debounceTimer); - this.debounceTimer = setTimeout(process, this.debounceInterval); - resolve(false); - } - } - if(this.skipDebounce) { - process(); - this.skipDebounce = false; - } - - this.debounceTimer = setTimeout(process, this.debounceInterval); - - + private async sendJSONtoChannel(...json: Message[]): Promise { + const response = await fetch(this.channelUrl, { + ...this.fetchOptions, + method: 'PUT', + body: JSON.stringify(json) }); + if(!response.ok) { + throw new Error('Failed to PUT channel'); + } + if(!this.sseClientInitialized) { + await this.eventSource(); + } } /** @@ -433,24 +361,6 @@ export class Urbit { }); } - - - // resetDebounceTimer() { - // if (this.debounceTimer) { - // clearTimeout(this.debounceTimer); - // this.debounceTimer = null; - // } - // this.calm = false; - // this.debounceTimer = setTimeout(() => { - // this.calm = true; - // }, this.debounceInterval); - // } - - clearQueue() { - clearTimeout(this.debounceTimer); - this.debounceTimer = null; - } - /** * Pokes a ship with data. * @@ -458,7 +368,7 @@ export class Urbit { * @param mark The mark of the data being sent * @param json The data to send */ - poke(params: PokeInterface): Promise { + async poke(params: PokeInterface): Promise { const { app, mark, @@ -472,29 +382,30 @@ export class Urbit { ship: this.ship, ...params }; - return new Promise((resolve, reject) => { - const message: Message = { - id: this.getEventId(), - action: 'poke', - ship, - app, - mark, - json - }; - this.outstandingPokes.set(message.id, { - onSuccess: () => { - onSuccess(); - resolve(message.id); - }, - onError: (event) => { - onError(event); - reject(event.err); - } - }); - this.sendJSONtoChannel(message).then(() => { - resolve(message.id); - }); - }); + const message: Message = { + id: this.getEventId(), + action: 'poke', + ship, + app, + mark, + json + }; + const [send, result] = await Promise.all([ + this.sendJSONtoChannel(message), + new Promise((resolve, reject) => { + this.outstandingPokes.set(message.id, { + onSuccess: () => { + onSuccess(); + resolve(message.id); + }, + onError: (event) => { + onError(event); + reject(event.err); + } + }); + }) + ]); + return result; } /** diff --git a/pkg/npm/http-api/test/default.test.ts b/pkg/npm/http-api/test/default.test.ts index 3d13d1f5f7..5a5e05b9d8 100644 --- a/pkg/npm/http-api/test/default.test.ts +++ b/pkg/npm/http-api/test/default.test.ts @@ -1,5 +1,7 @@ + import Urbit from '../src'; import { Readable } from 'streams'; +import 'jest'; function fakeSSE(messages = [], timeout = 0) { const ourMessages = [...messages]; @@ -60,7 +62,6 @@ describe('Initialisation', () => { let fetchSpy; beforeEach(() => { airlock = new Urbit('', '+code'); - airlock.debounceInterval = 10; }); afterEach(() => { fetchSpy.mockReset(); @@ -73,7 +74,7 @@ describe('Initialisation', () => { Promise.resolve({ ok: true, body: fakeSSE() }) ) .mockImplementationOnce(() => - Promise.resolve({ ok: true, body: fakeSSE() }) + Promise.resolve({ ok: true, body: fakeSSE([ack(1)]) }) ); await airlock.eventSource(); @@ -82,13 +83,16 @@ describe('Initialisation', () => { it('should handle failures', async () => { fetchSpy = jest.spyOn(window, 'fetch'); fetchSpy - .mockImplementation(() => + .mockImplementationOnce(() => + Promise.resolve({ ok: true, body: fakeSSE() }) + ) + .mockImplementationOnce(() => Promise.resolve({ ok: false, body: fakeSSE() }) ) + airlock.onError = jest.fn(); try { await airlock.eventSource(); - wait(100); } catch (e) { expect(airlock.onError).toHaveBeenCalled(); }