From 6256f7d92bbd3df34060b5c40473f3631bad4f14 Mon Sep 17 00:00:00 2001 From: Liam Fitzgerald Date: Mon, 12 Jul 2021 12:07:56 +1000 Subject: [PATCH] @urbit/http-api: add tests, fix uncovered bugs --- pkg/npm/http-api/src/Urbit.ts | 24 ++-- pkg/npm/http-api/test/default.test.ts | 167 +++++++++++++++++++++++++- 2 files changed, 180 insertions(+), 11 deletions(-) diff --git a/pkg/npm/http-api/src/Urbit.ts b/pkg/npm/http-api/src/Urbit.ts index 90da37113..2017df3cb 100644 --- a/pkg/npm/http-api/src/Urbit.ts +++ b/pkg/npm/http-api/src/Urbit.ts @@ -165,6 +165,11 @@ export class Urbit { * Initializes the SSE pipe for the appropriate channel. */ eventSource(): Promise { + if(this.lastEventId === 0) { + // Can't receive events until the channel is open + this.skipDebounce = true; + return this.poke({ app: 'hood', mark: 'helm-hi', json: 'Opening API channel' }).then(() => {}); + } return new Promise((resolve, reject) => { if (!this.sseClientInitialized) { const sseOptions: SSEOptions = { @@ -175,10 +180,6 @@ export class Urbit { } else if (isNode) { sseOptions.headers.Cookie = this.cookie; } - if (this.lastEventId === 0) { - // Can't receive events until the channel is open - this.poke({ app: 'hood', mark: 'helm-hi', json: 'Opening API channel' }); - } fetchEventSource(this.channelUrl, { ...this.fetchOptions, openWhenHidden: true, @@ -236,6 +237,7 @@ export class Urbit { funcs.quit(data); this.outstandingSubscriptions.delete(data.id); } else { + console.log([...this.outstandingSubscriptions.keys()]); console.log('Unrecognized response', data); } } @@ -329,7 +331,8 @@ export class Urbit { private outstandingJSON: Message[] = []; private debounceTimer: NodeJS.Timeout = null; - private debounceInterval = 500; + private debounceInterval = 10; + private skipDebounce = false; private calm = true; private sendJSONtoChannel(json: Message): Promise { @@ -351,11 +354,14 @@ export class Urbit { return resolve(false); } try { - await fetch(this.channelUrl, { + const response = await fetch(this.channelUrl, { ...this.fetchOptions, method: 'PUT', body }); + if(!response.ok) { + throw new Error('failed to PUT'); + } } catch (error) { console.log(error); json.forEach(failed => this.outstandingJSON.push(failed)); @@ -367,7 +373,7 @@ export class Urbit { } this.calm = true; if (!this.sseClientInitialized) { - this.eventSource(); // We can open the channel for subscriptions once we've sent data over it + this.eventSource().then(resolve); // We can open the channel for subscriptions once we've sent data over it } resolve(true); } else { @@ -376,6 +382,10 @@ export class Urbit { resolve(false); } } + if(this.skipDebounce) { + process(); + this.skipDebounce = false; + } this.debounceTimer = setTimeout(process, this.debounceInterval); diff --git a/pkg/npm/http-api/test/default.test.ts b/pkg/npm/http-api/test/default.test.ts index 719f10efb..3d13d1f5f 100644 --- a/pkg/npm/http-api/test/default.test.ts +++ b/pkg/npm/http-api/test/default.test.ts @@ -1,8 +1,167 @@ import Urbit from '../src'; +import { Readable } from 'streams'; -describe('blah', () => { - it('works', () => { - const connection = new Urbit('~sampel-palnet', '+code'); - expect(connection).toEqual(2); +function fakeSSE(messages = [], timeout = 0) { + const ourMessages = [...messages]; + const enc = new TextEncoder(); + return new ReadableStream({ + start(controller) { + const interval = setInterval(() => { + let message = ':\n'; + if (ourMessages.length > 0) { + message = ourMessages.shift(); + } + + controller.enqueue(enc.encode(message)); + }, 50); + + if (timeout > 0) { + setTimeout(() => { + controller.close(); + interval; + }, timeout); + } + }, + }); +} + +const ship = '~sampel-palnet'; +let eventId = 0; +function event(data: any) { + return `id:${eventId++}\ndata:${JSON.stringify(data)}\n\n`; +} + +function fact(id: number, data: any) { + return event({ + response: 'diff', + id, + json: data, + }); +} + +function ack(id: number, err = false) { + const res = err ? { err: 'Error' } : { ok: true }; + return event({ id, response: 'poke', ...res }); +} +const fakeFetch = (body) => () => + Promise.resolve({ + ok: true, + body: body(), + }); + +const wait = (ms: number) => new Promise((res) => setTimeout(res, ms)); + +process.on('unhandledRejection', () => { + console.error(error); +}); + +describe('Initialisation', () => { + let airlock: Urbit; + let fetchSpy; + beforeEach(() => { + airlock = new Urbit('', '+code'); + airlock.debounceInterval = 10; + }); + afterEach(() => { + fetchSpy.mockReset(); + }); + it('should poke & connect upon a 200', async () => { + airlock.onOpen = jest.fn(); + fetchSpy = jest.spyOn(window, 'fetch'); + fetchSpy + .mockImplementationOnce(() => + Promise.resolve({ ok: true, body: fakeSSE() }) + ) + .mockImplementationOnce(() => + Promise.resolve({ ok: true, body: fakeSSE() }) + ); + await airlock.eventSource(); + + expect(airlock.onOpen).toHaveBeenCalled(); + }, 500); + it('should handle failures', async () => { + fetchSpy = jest.spyOn(window, 'fetch'); + fetchSpy + .mockImplementation(() => + Promise.resolve({ ok: false, body: fakeSSE() }) + ) + airlock.onError = jest.fn(); + try { + await airlock.eventSource(); + wait(100); + } catch (e) { + expect(airlock.onError).toHaveBeenCalled(); + } + }, 200); +}); + +describe('subscription', () => { + let airlock: Urbit; + let fetchSpy: jest.SpyInstance; + beforeEach(() => { + eventId = 1; + }); + afterEach(() => { + fetchSpy.mockReset(); + }); + + it('should subscribe', async () => { + fetchSpy = jest.spyOn(window, 'fetch'); + airlock = new Urbit('', '+code'); + airlock.onOpen = jest.fn(); + const params = { + app: 'app', + path: '/path', + err: jest.fn(), + event: jest.fn(), + quit: jest.fn(), + }; + const firstEv = 'one'; + const secondEv = 'two'; + const events = (id) => [fact(id, firstEv), fact(id, secondEv)]; + fetchSpy.mockImplementation(fakeFetch(() => fakeSSE(events(1)))); + + await airlock.subscribe(params); + await wait(600); + + expect(airlock.onOpen).toBeCalled(); + expect(params.event).toHaveBeenNthCalledWith(1, firstEv); + expect(params.event).toHaveBeenNthCalledWith(2, secondEv); + }, 800); + it('should poke', async () => { + fetchSpy = jest.spyOn(window, 'fetch'); + airlock = new Urbit('', '+code'); + airlock.onOpen = jest.fn(); + fetchSpy.mockImplementation(fakeFetch(() => fakeSSE([ack(1)]))); + const params = { + app: 'app', + mark: 'mark', + json: { poke: 1 }, + onSuccess: jest.fn(), + onError: jest.fn(), + }; + await airlock.poke(params); + await wait(300); + expect(params.onSuccess).toHaveBeenCalled(); + }, 800); + + it('should nack poke', async () => { + fetchSpy = jest.spyOn(window, 'fetch'); + airlock = new Urbit('', '+code'); + airlock.onOpen = jest.fn(); + fetchSpy.mockImplementation(fakeFetch(() => fakeSSE([ack(1, true)]))); + const params = { + app: 'app', + mark: 'mark', + json: { poke: 1 }, + onSuccess: jest.fn(), + onError: jest.fn(), + }; + try { + await airlock.poke(params); + await wait(300); + } catch (e) { + expect(params.onError).toHaveBeenCalled(); + } }); });