feat: jwt performance

This commit is contained in:
DarkSky 2022-07-26 17:11:01 +08:00
parent 3ec95e4377
commit c39814047a
9 changed files with 204 additions and 173 deletions

View File

@ -82,6 +82,7 @@ export abstract class ServiceBaseClass {
workspace: string,
blockId: string
): Promise<BlockImplInstance | undefined> {
if (!blockId) return undefined;
const db = await this.database.getDatabase(workspace);
const db_block = await db.get(blockId as 'block');
if (db_block.id !== blockId) {

View File

@ -19,7 +19,7 @@ const loading = new Set();
const waitLoading = async (key: string) => {
while (loading.has(key)) {
await sleep();
await sleep(50);
}
};
@ -53,7 +53,6 @@ async function _getBlockDatabase(
if (!workspaces[workspace]) {
loading.add(workspace);
workspaces[workspace] = await BlockClient.init(workspace, {
...options,
token: await _getCurrentToken(),

View File

@ -0,0 +1,77 @@
import * as Y from 'yjs';
import * as bc from 'lib0/broadcastchannel';
import * as encoding from 'lib0/encoding';
import * as syncProtocol from 'y-protocols/sync';
import * as awarenessProtocol from 'y-protocols/awareness';
import { Message } from './handler';
import { readMessage } from './processor';
import { WebsocketProvider } from './provider';
export const registerBroadcastSubscriber = (
provider: WebsocketProvider,
awareness: awarenessProtocol.Awareness,
document: Y.Doc
) => {
const channel = provider.broadcastChannel;
const subscriber = (data: ArrayBuffer, origin: any) => {
if (origin !== provider) {
const encoder = readMessage(provider, new Uint8Array(data), false);
if (encoding.length(encoder) > 1) {
bc.publish(channel, encoding.toUint8Array(encoder), provider);
}
}
};
bc.subscribe(channel, subscriber);
let connected = true;
// send sync step1 to bc
// write sync step 1
const encoderSync = encoding.createEncoder();
encoding.writeVarUint(encoderSync, Message.sync);
syncProtocol.writeSyncStep1(encoderSync, document);
bc.publish(channel, encoding.toUint8Array(encoderSync), this);
// broadcast local state
const encoderState = encoding.createEncoder();
encoding.writeVarUint(encoderState, Message.sync);
syncProtocol.writeSyncStep2(encoderState, document);
bc.publish(channel, encoding.toUint8Array(encoderState), this);
// write queryAwareness
const encoderAwarenessQuery = encoding.createEncoder();
encoding.writeVarUint(encoderAwarenessQuery, Message.queryAwareness);
bc.publish(channel, encoding.toUint8Array(encoderAwarenessQuery), this);
// broadcast local awareness state
const encoderAwarenessState = encoding.createEncoder();
encoding.writeVarUint(encoderAwarenessState, Message.awareness);
encoding.writeVarUint8Array(
encoderAwarenessState,
awarenessProtocol.encodeAwarenessUpdate(awareness, [document.clientID])
);
bc.publish(channel, encoding.toUint8Array(encoderAwarenessState), this);
const broadcastMessage = (buf: ArrayBuffer) => {
if (connected) bc.publish(channel, buf, provider);
};
const disconnect = () => {
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, Message.awareness);
encoding.writeVarUint8Array(
encoder,
awarenessProtocol.encodeAwarenessUpdate(
awareness,
[document.clientID],
new Map()
)
);
broadcastMessage(encoding.toUint8Array(encoder));
if (connected) {
bc.unsubscribe(channel, subscriber);
connected = false;
}
};
return { broadcastMessage, disconnect };
};

View File

@ -0,0 +1,80 @@
import * as Y from 'yjs';
import * as encoding from 'lib0/encoding';
import * as decoding from 'lib0/decoding';
import * as syncProtocol from 'y-protocols/sync';
import * as awarenessProtocol from 'y-protocols/awareness';
import { Message } from './handler';
import { WebsocketProvider } from './provider';
export const readMessage = (
provider: WebsocketProvider,
buf: Uint8Array,
emitSynced: boolean
): encoding.Encoder => {
const decoder = decoding.createDecoder(buf);
const encoder = encoding.createEncoder();
const messageType = decoding.readVarUint(decoder) as Message;
const messageHandler = provider.messageHandlers[messageType];
if (/** @type {any} */ messageHandler) {
messageHandler(encoder, decoder, provider, emitSynced, messageType);
} else {
console.error('Unable to compute message');
}
return encoder;
};
export const registerUpdateHandler = (
provider: WebsocketProvider,
awareness: awarenessProtocol.Awareness,
doc: Y.Doc,
broadcastMessage: (buf: ArrayBuffer) => void
) => {
const beforeUnloadHandler = () => {
awarenessProtocol.removeAwarenessStates(
awareness,
[doc.clientID],
'window unload'
);
};
const awarenessUpdateHandler = ({ added, updated, removed }: any) => {
const changedClients = added.concat(updated).concat(removed);
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, Message.awareness);
encoding.writeVarUint8Array(
encoder,
awarenessProtocol.encodeAwarenessUpdate(awareness, changedClients)
);
broadcastMessage(encoding.toUint8Array(encoder));
};
// Listens to Yjs updates and sends them to remote peers (ws and broadcastchannel)
const documentUpdateHandler = (update: Uint8Array, origin: any) => {
if (origin !== provider) {
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, Message.sync);
syncProtocol.writeUpdate(encoder, update);
broadcastMessage(encoding.toUint8Array(encoder));
}
};
if (typeof window !== 'undefined') {
window.addEventListener('beforeunload', beforeUnloadHandler);
} else if (typeof process !== 'undefined') {
process.on('exit', beforeUnloadHandler);
}
awareness.on('update', awarenessUpdateHandler);
doc.on('update', documentUpdateHandler);
return () => {
if (typeof window !== 'undefined') {
window.removeEventListener('beforeunload', beforeUnloadHandler);
} else if (typeof process !== 'undefined') {
process.off('exit', beforeUnloadHandler);
}
awareness.off('update', awarenessUpdateHandler);
doc.off('update', documentUpdateHandler);
};
};

View File

@ -6,11 +6,9 @@ import * as url from 'lib0/url';
import * as awarenessProtocol from 'y-protocols/awareness';
import { handler } from './handler';
import {
registerBroadcastSubscriber,
registerUpdateHandler,
registerWebsocket,
} from './connector';
import { registerBroadcastSubscriber } from './broadcast';
import { registerWebsocket } from './websocket';
import { registerUpdateHandler } from './processor';
/**
* Websocket Provider for Yjs. Creates a websocket connection to sync the shared document.

View File

@ -1,154 +1,12 @@
import * as Y from 'yjs';
import * as bc from 'lib0/broadcastchannel';
import * as time from 'lib0/time';
import * as encoding from 'lib0/encoding';
import * as decoding from 'lib0/decoding';
import * as syncProtocol from 'y-protocols/sync';
import * as awarenessProtocol from 'y-protocols/awareness';
import * as math from 'lib0/math';
import { WebsocketProvider } from './provider';
import { Message } from './handler';
export const readMessage = (
provider: WebsocketProvider,
buf: Uint8Array,
emitSynced: boolean
): encoding.Encoder => {
const decoder = decoding.createDecoder(buf);
const encoder = encoding.createEncoder();
const messageType = decoding.readVarUint(decoder) as Message;
const messageHandler = provider.messageHandlers[messageType];
if (/** @type {any} */ messageHandler) {
messageHandler(encoder, decoder, provider, emitSynced, messageType);
} else {
console.error('Unable to compute message');
}
return encoder;
};
export const registerBroadcastSubscriber = (
provider: WebsocketProvider,
awareness: awarenessProtocol.Awareness,
document: Y.Doc
) => {
const channel = provider.broadcastChannel;
const subscriber = (data: ArrayBuffer, origin: any) => {
if (origin !== provider) {
const encoder = readMessage(provider, new Uint8Array(data), false);
if (encoding.length(encoder) > 1) {
bc.publish(channel, encoding.toUint8Array(encoder), provider);
}
}
};
bc.subscribe(channel, subscriber);
let connected = true;
// send sync step1 to bc
// write sync step 1
const encoderSync = encoding.createEncoder();
encoding.writeVarUint(encoderSync, Message.sync);
syncProtocol.writeSyncStep1(encoderSync, document);
bc.publish(channel, encoding.toUint8Array(encoderSync), this);
// broadcast local state
const encoderState = encoding.createEncoder();
encoding.writeVarUint(encoderState, Message.sync);
syncProtocol.writeSyncStep2(encoderState, document);
bc.publish(channel, encoding.toUint8Array(encoderState), this);
// write queryAwareness
const encoderAwarenessQuery = encoding.createEncoder();
encoding.writeVarUint(encoderAwarenessQuery, Message.queryAwareness);
bc.publish(channel, encoding.toUint8Array(encoderAwarenessQuery), this);
// broadcast local awareness state
const encoderAwarenessState = encoding.createEncoder();
encoding.writeVarUint(encoderAwarenessState, Message.awareness);
encoding.writeVarUint8Array(
encoderAwarenessState,
awarenessProtocol.encodeAwarenessUpdate(awareness, [document.clientID])
);
bc.publish(channel, encoding.toUint8Array(encoderAwarenessState), this);
const broadcastMessage = (buf: ArrayBuffer) => {
if (connected) bc.publish(channel, buf, provider);
};
const disconnect = () => {
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, Message.awareness);
encoding.writeVarUint8Array(
encoder,
awarenessProtocol.encodeAwarenessUpdate(
awareness,
[document.clientID],
new Map()
)
);
broadcastMessage(encoding.toUint8Array(encoder));
if (connected) {
bc.unsubscribe(channel, subscriber);
connected = false;
}
};
return { broadcastMessage, disconnect };
};
export const registerUpdateHandler = (
provider: WebsocketProvider,
awareness: awarenessProtocol.Awareness,
doc: Y.Doc,
broadcastMessage: (buf: ArrayBuffer) => void
) => {
const beforeUnloadHandler = () => {
awarenessProtocol.removeAwarenessStates(
awareness,
[doc.clientID],
'window unload'
);
};
const awarenessUpdateHandler = ({ added, updated, removed }: any) => {
const changedClients = added.concat(updated).concat(removed);
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, Message.awareness);
encoding.writeVarUint8Array(
encoder,
awarenessProtocol.encodeAwarenessUpdate(awareness, changedClients)
);
broadcastMessage(encoding.toUint8Array(encoder));
};
// Listens to Yjs updates and sends them to remote peers (ws and broadcastchannel)
const documentUpdateHandler = (update: Uint8Array, origin: any) => {
if (origin !== provider) {
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, Message.sync);
syncProtocol.writeUpdate(encoder, update);
broadcastMessage(encoding.toUint8Array(encoder));
}
};
if (typeof window !== 'undefined') {
window.addEventListener('beforeunload', beforeUnloadHandler);
} else if (typeof process !== 'undefined') {
process.on('exit', beforeUnloadHandler);
}
awareness.on('update', awarenessUpdateHandler);
doc.on('update', documentUpdateHandler);
return () => {
if (typeof window !== 'undefined') {
window.removeEventListener('beforeunload', beforeUnloadHandler);
} else if (typeof process !== 'undefined') {
process.off('exit', beforeUnloadHandler);
}
awareness.off('update', awarenessUpdateHandler);
doc.off('update', documentUpdateHandler);
};
};
import { readMessage } from './processor';
import { WebsocketProvider } from './provider';
enum WebSocketState {
disconnected = 0,
@ -159,22 +17,41 @@ enum WebSocketState {
// @todo - this should depend on awareness.outdatedTime
const WEBSOCKET_RECONNECT = 30000;
const GET_TOKEN_BASELINE_TIMEOUT = 500;
const _getToken = async (
remote: string,
token: string,
existsProtocol?: string,
reconnect = 3
reconnect = 3,
timeout = 500
) => {
if (existsProtocol && reconnect > 0) {
return { protocol: existsProtocol };
}
const url = new URL(remote);
url.protocol = window.location.protocol;
return fetch(url, { method: 'POST', headers: { token } }).then(r =>
r.json()
const controller = new AbortController();
const id = setTimeout(
() => controller.abort(),
GET_TOKEN_BASELINE_TIMEOUT + timeout
);
const resp = await fetch(url, {
method: 'POST',
headers: { token },
signal: controller.signal,
});
clearTimeout(id);
return resp.json();
};
const _getTimeout = (provider: WebsocketProvider) =>
math.min(
math.pow(2, provider.wsUnsuccessfulReconnects) * 100,
provider.maxBackOffTime
);
export const registerWebsocket = (
provider: WebsocketProvider,
token: string,
@ -187,7 +64,13 @@ export const registerWebsocket = (
let websocket: WebSocket | undefined = undefined;
_getToken(provider.url, token, existsProtocol, reconnect)
_getToken(
provider.url,
token,
existsProtocol,
reconnect,
_getTimeout(provider)
)
.then(({ protocol }) => {
websocket = new WebSocket(provider.url, protocol);
websocket.binaryType = 'arraybuffer';
@ -233,17 +116,12 @@ export const registerWebsocket = (
} else {
provider.wsUnsuccessfulReconnects++;
}
if (reconnect <= 0) {
provider.emit('lost-connection', []);
}
if (reconnect <= 0) provider.emit('lost-connection', []);
// Start with no reconnect timeout and increase timeout by
// using exponential backoff starting with 100ms
setTimeout(
registerWebsocket,
math.min(
math.pow(2, provider.wsUnsuccessfulReconnects) * 100,
provider.maxBackOffTime
),
_getTimeout(provider),
provider,
token,
resyncInterval,
@ -284,16 +162,11 @@ export const registerWebsocket = (
provider.emit('status', [{ status: 'connecting' }]);
})
.catch(err => {
if (reconnect <= 0) {
provider.emit('lost-connection', []);
}
provider.emit('lost-connection', []);
provider.wsUnsuccessfulReconnects++;
setTimeout(
registerWebsocket,
math.min(
math.pow(2, provider.wsUnsuccessfulReconnects) * 100,
provider.maxBackOffTime
),
_getTimeout(provider),
provider,
token,
resyncInterval,

View File

@ -115,8 +115,8 @@ async function _initYjsDatabase(
const doc = new Doc({ autoLoad: true, shouldLoad: true });
const idb = await new IndexeddbPersistence(workspace, doc).whenSynced;
const [awareness, ws] = await _initWebsocketProvider(
const idbp = new IndexeddbPersistence(workspace, doc).whenSynced;
const wsp = _initWebsocketProvider(
backend,
workspace,
doc,
@ -124,6 +124,8 @@ async function _initYjsDatabase(
params
);
const [idb, [awareness, ws]] = await Promise.all([idbp, wsp]);
const binaries = new Doc({ autoLoad: true, shouldLoad: true });
const binariesIdb = await new IndexeddbPersistence(
`${workspace}_binaries`,
@ -410,6 +412,7 @@ export class YjsAdapter implements AsyncDatabaseAdapter<YjsContentOperation> {
binary?: ArrayBufferLike;
}
): Promise<YjsBlockInstance> {
console.trace('createBlock', options);
const uuid = options.uuid || `affine${nanoid(16)}`;
if (options.type === BlockTypes.binary) {
if (options.binary && options.binary instanceof ArrayBuffer) {

View File

@ -51,7 +51,7 @@ export function isBlock(obj: any) {
}
export function sleep() {
return new Promise(resolve => setTimeout(resolve, 500));
return new Promise(resolve => setTimeout(resolve, 100));
}
export { BlockEventBus } from './event-bus';

View File

@ -23,7 +23,7 @@ export function getPageId() {
return path ? path[2] : undefined;
}
export async function sleep(delay?: number) {
export async function sleep(delay = 100) {
return new Promise(res => {
window.setTimeout(() => {
res(true);