@urbit/http-api: fixup reconnect behaviour

This commit is contained in:
Liam Fitzgerald 2021-06-14 09:52:34 +10:00
parent fa79c3a34f
commit 3d8146b358
No known key found for this signature in database
GPG Key ID: D390E12C61D1CFFB
2 changed files with 227 additions and 71 deletions

View File

@ -1,30 +1,29 @@
import { isBrowser, isNode } from 'browser-or-node';
import { Action, Scry, Thread } from '@urbit/api';
import { fetchEventSource, EventSourceMessage, EventStreamContentType } from '@microsoft/fetch-event-source';
import { AuthenticationInterface, SubscriptionInterface, CustomEventHandler, PokeInterface, SubscriptionRequestInterface, headers, UrbitInterface, SSEOptions, PokeHandlers, Message } from './types';
import { Action, Scry, Thread, AuthenticationInterface, SubscriptionInterface, CustomEventHandler, PokeInterface, SubscriptionRequestInterface, headers, SSEOptions, PokeHandlers, Message } from './types';
import { uncamelize, hexString } from './utils';
/**
* A class for interacting with an urbit ship, given its URL and code
*/
export class Urbit implements UrbitInterface {
export class Urbit {
/**
* UID will be used for the channel: The current unix time plus a random hex string
*/
uid: string = `${Math.floor(Date.now() / 1000)}-${hexString(6)}`;
private uid: string = `${Math.floor(Date.now() / 1000)}-${hexString(6)}`;
/**
* Last Event ID is an auto-updated index of which events have been sent over this channel
*/
lastEventId: number = 0;
private lastEventId: number = 0;
lastAcknowledgedEventId: number = 0;
private lastAcknowledgedEventId: number = 0;
/**
* SSE Client is null for now; we don't want to start polling until it the channel exists
*/
sseClientInitialized: boolean = false;
private sseClientInitialized: boolean = false;
/**
* Cookie gets set when we log in.
@ -40,7 +39,7 @@ export class Urbit implements UrbitInterface {
* removed after calling the success or failure function.
*/
outstandingPokes: Map<number, PokeHandlers> = new Map();
private outstandingPokes: Map<number, PokeHandlers> = new Map();
/**
* A registry of requestId to subscription functions.
@ -51,8 +50,7 @@ export class Urbit implements UrbitInterface {
* subscription is available, which may be 0, 1, or many times. The
* disconnect function may be called exactly once.
*/
outstandingSubscriptions: Map<number, SubscriptionRequestInterface> = new Map();
private outstandingSubscriptions: Map<number, SubscriptionRequestInterface> = new Map();
/**
* Ship can be set, in which case we can do some magic stuff like send chats
@ -64,14 +62,23 @@ export class Urbit implements UrbitInterface {
*/
verbose?: boolean;
/**
* number of consecutive errors in connecting to the eventsource
*/
private errorCount = 0;
onError?: (error: any) => void = null;
onRetry?: () => void = null;
onOpen?: () => void = null;
/** This is basic interpolation to get the channel URL of an instantiated Urbit connection. */
get channelUrl(): string {
private get channelUrl(): string {
return `${this.url}/~/channel/${this.uid}`;
}
get fetchOptions(): any {
private get fetchOptions(): any {
const headers: headers = {
'Content-Type': 'application/json',
};
@ -88,7 +95,9 @@ export class Urbit implements UrbitInterface {
/**
* Constructs a new Urbit connection.
*
* @param url The URL (with protocol and port) of the ship to be accessed
* @param url The URL (with protocol and port) of the ship to be accessed. If
* the airlock is running in a webpage served by the ship, this should just
* be the empty string.
* @param code The access code for the ship at that address
*/
constructor(
@ -108,7 +117,6 @@ export class Urbit implements UrbitInterface {
* that is ready to go. It `|hi`s itself to create the channel,
* then opens the channel via EventSource.
*
* @param AuthenticationInterface
*/
static async authenticate({ ship, url, code, verbose = false }: AuthenticationInterface) {
const airlock = new Urbit(`http://${url}`, code);
@ -173,23 +181,15 @@ export class Urbit implements UrbitInterface {
console.log('Opened eventsource', response);
}
if (response.ok) {
this.errorCount = 0;
this.onOpen && this.onOpen();
resolve();
return; // everything's good
} else if (response.status >= 400 && response.status < 500 && response.status !== 429) {
reject();
if (this.onError) {
this.onError(response.statusText);
} else {
throw new Error();
}
} else {
reject();
if (this.onError) {
this.onError(response.statusText);
} else {
throw new Error();
}
}
this.onError && this.onError(response?.statusText || 'Unknown Error');
throw new Error();
}
},
onmessage: (event: EventSourceMessage) => {
if (this.verbose) {
@ -235,12 +235,18 @@ export class Urbit implements UrbitInterface {
}
},
onerror: (error) => {
if (this.onError) {
this.onError(error);
} else {
throw error;
if(this.errorCount++ < 5) {
console.log(this.errorCount);
this.onRetry && this.onRetry();
return Math.pow(2, this.errorCount - 1) * 750;
}
}
this.onError && this.onError(error);
throw error;
},
onclose: () => {
throw Error('Ship unexpectedly closed the connection');
},
});
this.sseClientInitialized = true;
}
@ -248,10 +254,23 @@ export class Urbit implements UrbitInterface {
});
}
/**
* Reset airlock, abandoning current subscriptions and wiping state
*
*/
reset() {
this.uid = `${Math.floor(Date.now() / 1000)}-${hexString(6)}`;
this.lastEventId = 0;
this.lastAcknowledgedEventId = 0;
this.outstandingSubscriptions = new Map();
this.outstandingPokes = new Map();
this.sseClientInitialized = false;
}
/**
* Autoincrements the next event ID for the appropriate channel.
*/
getEventId(): number {
private getEventId(): number {
this.lastEventId = Number(this.lastEventId) + 1;
return this.lastEventId;
}
@ -261,7 +280,7 @@ export class Urbit implements UrbitInterface {
*
* @param eventId The event to acknowledge.
*/
async ack(eventId: number): Promise<number | void> {
private async ack(eventId: number): Promise<number | void> {
const message: Message = {
action: 'ack',
'event-id': eventId
@ -291,18 +310,18 @@ export class Urbit implements UrbitInterface {
// return id;
// }
outstandingJSON: Message[] = [];
private outstandingJSON: Message[] = [];
debounceTimer: any = null;
debounceInterval = 500;
calm = true;
private debounceTimer: any = null;
private debounceInterval = 500;
private calm = true;
sendJSONtoChannel(json: Message): Promise<boolean | void> {
private sendJSONtoChannel(json: Message): Promise<boolean | void> {
this.outstandingJSON.push(json);
return this.processQueue();
}
processQueue(): Promise<boolean | void> {
private processQueue(): Promise<boolean | void> {
return new Promise(async (resolve, reject) => {
const process = async () => {
if (this.calm) {
@ -454,6 +473,7 @@ export class Urbit implements UrbitInterface {
/**
* Subscribes to a path on an app on a ship.
*
*
* @param app The app to subsribe to
* @param path The path to which to subscribe
* @param handlers Handlers to deal with various events of the subscription
@ -521,22 +541,36 @@ export class Urbit implements UrbitInterface {
}
/**
* Scry into an gall agent at a path
*
* @param app The app into which to scry
* @param path The path at which to scry
* @typeParam T - Type of the scry result
*
* @remarks
*
* Equivalent to
* ```hoon
* .^(T %gx /(scot %p our)/[app]/(scot %da now)/[path]/json)
* ```
* The returned cage must have a conversion to JSON for the scry to succeed
*
* @param params The scry request
* @returns The scry result
*/
async scry(params: Scry): Promise<void | any> {
async scry<T = any>(params: Scry): Promise<T> {
const { app, path } = params;
const response = await fetch(`${this.url}/~/scry/${app}${path}.json`, this.fetchOptions);
return await response.json();
}
/**
* Run a thread
*
*
* @param inputMark The mark of the data being sent
* @param outputMark The mark of the data being returned
* @param threadName The thread to run
* @param body The data to send to the thread
* @returns The return value of the thread
*/
async thread<R, T = any>(params: Thread<T>): Promise<R> {
const { inputMark, outputMark, threadName, body } = params;

View File

@ -1,4 +1,127 @@
import { Action, Poke, Scry, Thread } from '@urbit/api';
/**
* An urbit style path, rendered as a Javascript string
* @example
* `"/updates"`
*/
export type Path = string;
/**
* @p including leading sig, rendered as a string
*
* @example
* ```typescript
* "~sampel-palnet"
* ```
*
*/
export type Patp = string;
/**
* @p not including leading sig, rendered as a string
*
* @example
* ```typescript
* "sampel-palnet"
* ```
*
*/
export type PatpNoSig = string;
/**
* The name of a clay mark, as a string
*
* @example
* ```typescript
* "graph-update"
* ```
*/
export type Mark = string;
/**
* The name of a gall agent, as a string
*
* @example
*
* ```typescript
* "graph-store"
* ```
*/
export type GallAgent = string;
/**
* Description of an outgoing poke
*
* @typeParam Action - Typescript type of the data being poked
*/
export interface Poke<Action> {
/**
* Ship to poke. If left empty, the api lib will populate it with the ship that it is connected to.
*
* @remarks
*
* This should always be the ship that you are connected to
*
*/
ship?: PatpNoSig;
/**
*/
app: GallAgent;
/**
* Mark of the cage to be poked
*
*/
mark: Mark;
/**
* Vase of the cage of to be poked, as JSON
*/
json: Action;
}
/**
* Description of a scry request
*/
export interface Scry {
/** {@inheritDoc GallAgent} */
app: GallAgent;
/** {@inheritDoc Path} */
path: Path;
}
/**
* Description of a thread request
*
* @typeParam Action - Typescript type of the data being poked
*/
export interface Thread<Action> {
/**
* The mark of the input vase
*/
inputMark: Mark;
/**
* The mark of the output vase
*/
outputMark: Mark;
/**
* Name of the thread
*
* @example
* ```typescript
* "graph-add-nodes"
* ```
*/
threadName: string;
/**
* Data of the input vase
*/
body: Action;
}
export type Action = 'poke' | 'subscribe' | 'ack' | 'unsubscribe' | 'delete';
export interface PokeHandlers {
onSuccess?: () => void;
@ -14,17 +137,40 @@ export interface AuthenticationInterface {
verbose?: boolean;
}
/**
* Subscription event handlers
*
*/
export interface SubscriptionInterface {
/**
* Handle negative %watch-ack
*/
err?(error: any, id: string): void;
/**
* Handle %fact
*/
event?(data: any): void;
/**
* Handle %kick
*/
quit?(data: any): void;
}
export type OnceSubscriptionErr = 'quit' | 'nack' | 'timeout';
export type SubscriptionRequestInterface = SubscriptionInterface & {
app: string;
path: string;
export interface SubscriptionRequestInterface extends SubscriptionInterface {
/**
* The app to subscribe to
* @example
* `"graph-store"`
*/
app: GallAgent;
/**
* The path to which to subscribe
* @example
* `"/keys"`
*/
path: Path;
}
export interface headers {
@ -32,30 +178,6 @@ export interface headers {
Cookie?: string;
}
export interface UrbitInterface {
uid: string;
lastEventId: number;
lastAcknowledgedEventId: number;
sseClientInitialized: boolean;
cookie?: string | undefined;
outstandingPokes: Map<number, PokeHandlers>;
outstandingSubscriptions: Map<number, SubscriptionRequestInterface>;
verbose?: boolean;
ship?: string | null;
onError?: (error: any) => void;
connect(): void;
connect(): Promise<void>;
eventSource(): void;
getEventId(): number;
ack(eventId: number): Promise<void | number>;
// sendMessage(action: Action, data?: object): Promise<void | number>;
poke<T>(params: PokeInterface<T>): Promise<number>;
subscribe(params: SubscriptionRequestInterface): Promise<number>;
unsubscribe(subscription: number): Promise<boolean | void>;
delete(): void;
scry(params: Scry): Promise<void | any>;
thread<T>(params: Thread<T>): Promise<T>;
}
export interface CustomEventHandler {
(data: any, response: string): void;