mirror of
https://github.com/toeverything/AFFiNE.git
synced 2024-11-24 00:32:44 +03:00
refactor: use keck from OctoBase (#1761)
This commit is contained in:
parent
c9bd4e34b3
commit
3fa7d17dca
@ -1,5 +1,5 @@
|
||||
import { KeckProvider } from '@affine/workspace/affine/keck';
|
||||
import { getLoginStorage } from '@affine/workspace/affine/login';
|
||||
import { WebsocketProvider } from '@affine/workspace/affine/sync';
|
||||
import { assertExists } from '@blocksuite/store';
|
||||
import { IndexeddbPersistence } from 'y-indexeddb';
|
||||
|
||||
@ -14,7 +14,7 @@ import { createBroadCastChannelProvider } from './broad-cast-channel';
|
||||
const createAffineWebSocketProvider = (
|
||||
blockSuiteWorkspace: BlockSuiteWorkspace
|
||||
): AffineWebSocketProvider => {
|
||||
let webSocketProvider: WebsocketProvider | null = null;
|
||||
let webSocketProvider: KeckProvider | null = null;
|
||||
return {
|
||||
flavour: 'affine-websocket',
|
||||
background: false,
|
||||
@ -27,7 +27,7 @@ const createAffineWebSocketProvider = (
|
||||
const wsUrl = `${
|
||||
window.location.protocol === 'https:' ? 'wss' : 'ws'
|
||||
}://${window.location.host}/api/sync/`;
|
||||
webSocketProvider = new WebsocketProvider(
|
||||
webSocketProvider = new KeckProvider(
|
||||
wsUrl,
|
||||
blockSuiteWorkspace.id,
|
||||
blockSuiteWorkspace.doc,
|
||||
@ -40,12 +40,12 @@ const createAffineWebSocketProvider = (
|
||||
connect: false,
|
||||
}
|
||||
);
|
||||
providerLogger.info('connect', webSocketProvider.roomname);
|
||||
providerLogger.info('connect', webSocketProvider.url);
|
||||
webSocketProvider.connect();
|
||||
},
|
||||
disconnect: () => {
|
||||
assertExists(webSocketProvider);
|
||||
providerLogger.info('disconnect', webSocketProvider.roomname);
|
||||
providerLogger.info('disconnect', webSocketProvider.url);
|
||||
webSocketProvider.destroy();
|
||||
webSocketProvider = null;
|
||||
},
|
||||
|
@ -6,7 +6,8 @@
|
||||
"./type": "./src/type.ts",
|
||||
"./affine/*": "./src/affine/*.ts",
|
||||
"./affine/api": "./src/affine/api/index.ts",
|
||||
"./affine/sync": "./src/affine/sync.js"
|
||||
"./affine/sync": "./src/affine/sync.js",
|
||||
"./affine/keck": "./src/affine/keck/index.ts"
|
||||
},
|
||||
"dependencies": {
|
||||
"@affine-test/fixtures": "workspace:*",
|
||||
@ -22,6 +23,8 @@
|
||||
"lib0": "^0.2.73",
|
||||
"react": "^18.2.0",
|
||||
"react-dom": "^18.2.0",
|
||||
"y-protocols": "^1.0.5",
|
||||
"yjs": "^13.5.51",
|
||||
"zod": "^3.21.4"
|
||||
},
|
||||
"devDependencies": {
|
||||
|
@ -3,9 +3,9 @@ import {
|
||||
createWorkspaceApis,
|
||||
PermissionType,
|
||||
} from '@affine/workspace/affine/api';
|
||||
import { KeckProvider } from '@affine/workspace/affine/keck';
|
||||
import type { LoginResponse } from '@affine/workspace/affine/login';
|
||||
import { loginResponseSchema } from '@affine/workspace/affine/login';
|
||||
import { WebsocketProvider } from '@affine/workspace/affine/sync';
|
||||
import { createEmptyBlockSuiteWorkspace } from '@affine/workspace/utils';
|
||||
import user1 from '@affine-test/fixtures/built-in-user1.json';
|
||||
import user2 from '@affine-test/fixtures/built-in-user2.json';
|
||||
@ -13,7 +13,10 @@ import type { ParagraphBlockModel } from '@blocksuite/blocks/models';
|
||||
import type { Page, Text } from '@blocksuite/store';
|
||||
import { uuidv4, Workspace as BlockSuiteWorkspace } from '@blocksuite/store';
|
||||
import { beforeEach, describe, expect, test, vi } from 'vitest';
|
||||
import WebSocket from 'ws';
|
||||
import { WebSocket } from 'ws';
|
||||
|
||||
// @ts-expect-error
|
||||
globalThis.WebSocket = WebSocket;
|
||||
|
||||
const currentTokenRef = {
|
||||
current: null as LoginResponse | null,
|
||||
@ -85,40 +88,26 @@ describe('ydoc sync', () => {
|
||||
);
|
||||
BlockSuiteWorkspace.Y.applyUpdate(workspace1.doc, new Uint8Array(binary));
|
||||
BlockSuiteWorkspace.Y.applyUpdate(workspace2.doc, new Uint8Array(binary));
|
||||
const provider1 = new WebsocketProvider(
|
||||
wsUrl,
|
||||
workspace1.id,
|
||||
workspace1.doc,
|
||||
{
|
||||
// @ts-expect-error ignore the error
|
||||
WebSocketPolyfill: WebSocket,
|
||||
params: { token: user1Token.refresh },
|
||||
// @ts-expect-error ignore the type
|
||||
awareness: workspace1.awarenessStore.awareness,
|
||||
disableBc: true,
|
||||
connect: false,
|
||||
}
|
||||
);
|
||||
const provider1 = new KeckProvider(wsUrl, workspace1.id, workspace1.doc, {
|
||||
params: { token: user1Token.refresh },
|
||||
// @ts-expect-error ignore the type
|
||||
awareness: workspace1.awarenessStore.awareness,
|
||||
disableBc: true,
|
||||
connect: false,
|
||||
});
|
||||
|
||||
const provider2 = new WebsocketProvider(
|
||||
wsUrl,
|
||||
workspace2.id,
|
||||
workspace2.doc,
|
||||
{
|
||||
// @ts-expect-error ignore the error
|
||||
WebSocketPolyfill: WebSocket,
|
||||
params: { token: user2Token.refresh },
|
||||
// @ts-expect-error ignore the type
|
||||
awareness: workspace2.awarenessStore.awareness,
|
||||
disableBc: true,
|
||||
connect: false,
|
||||
}
|
||||
);
|
||||
const provider2 = new KeckProvider(wsUrl, workspace2.id, workspace2.doc, {
|
||||
params: { token: user2Token.refresh },
|
||||
// @ts-expect-error ignore the type
|
||||
awareness: workspace2.awarenessStore.awareness,
|
||||
disableBc: true,
|
||||
connect: false,
|
||||
});
|
||||
|
||||
provider1.connect();
|
||||
provider2.connect();
|
||||
|
||||
function waitForConnected(provider: WebsocketProvider) {
|
||||
function waitForConnected(provider: KeckProvider) {
|
||||
return new Promise<void>(resolve => {
|
||||
provider.once('status', ({ status }: any) => {
|
||||
expect(status).toBe('connected');
|
||||
|
5
packages/workspace/src/affine/keck/README.md
Normal file
5
packages/workspace/src/affine/keck/README.md
Normal file
@ -0,0 +1,5 @@
|
||||
# Keck
|
||||
|
||||
> This directory will be removed in the future once we publish the jwt library to npm.
|
||||
|
||||
The latest Keck code of AFFiNE is at https://github.com/toeverything/OctoBase/tree/master/libs/jwt
|
56
packages/workspace/src/affine/keck/handler.ts
Normal file
56
packages/workspace/src/affine/keck/handler.ts
Normal file
@ -0,0 +1,56 @@
|
||||
import * as decoding from 'lib0/decoding';
|
||||
import * as encoding from 'lib0/encoding';
|
||||
import * as awarenessProtocol from 'y-protocols/awareness';
|
||||
import * as syncProtocol from 'y-protocols/sync';
|
||||
|
||||
import type { KeckProvider } from '.';
|
||||
|
||||
export enum Message {
|
||||
sync = 0,
|
||||
awareness = 1,
|
||||
queryAwareness = 3,
|
||||
}
|
||||
|
||||
export type MessageCallback = (
|
||||
encoder: encoding.Encoder,
|
||||
decoder: decoding.Decoder,
|
||||
provider: KeckProvider,
|
||||
emitSynced: boolean,
|
||||
messageType: number
|
||||
) => void;
|
||||
|
||||
export const handler: Record<Message, MessageCallback> = {
|
||||
[Message.sync]: (encoder, decoder, provider, emitSynced) => {
|
||||
encoding.writeVarUint(encoder, Message.sync);
|
||||
const syncMessageType = syncProtocol.readSyncMessage(
|
||||
decoder,
|
||||
encoder,
|
||||
provider.doc,
|
||||
provider
|
||||
);
|
||||
if (
|
||||
emitSynced &&
|
||||
syncMessageType === syncProtocol.messageYjsSyncStep2 &&
|
||||
!provider.synced
|
||||
) {
|
||||
provider.synced = true;
|
||||
}
|
||||
},
|
||||
[Message.awareness]: (_encoder, decoder, provider) => {
|
||||
awarenessProtocol.applyAwarenessUpdate(
|
||||
provider.awareness,
|
||||
decoding.readVarUint8Array(decoder),
|
||||
provider
|
||||
);
|
||||
},
|
||||
[Message.queryAwareness]: (encoder, _decoder, provider) => {
|
||||
encoding.writeVarUint(encoder, Message.awareness);
|
||||
encoding.writeVarUint8Array(
|
||||
encoder,
|
||||
awarenessProtocol.encodeAwarenessUpdate(
|
||||
provider.awareness,
|
||||
Array.from(provider.awareness.getStates().keys())
|
||||
)
|
||||
);
|
||||
},
|
||||
};
|
286
packages/workspace/src/affine/keck/index.ts
Normal file
286
packages/workspace/src/affine/keck/index.ts
Normal file
@ -0,0 +1,286 @@
|
||||
import * as encoding from 'lib0/encoding';
|
||||
import * as math from 'lib0/math';
|
||||
import { Observable } from 'lib0/observable';
|
||||
import * as url from 'lib0/url';
|
||||
import * as awarenessProtocol from 'y-protocols/awareness';
|
||||
import * as syncProtocol from 'y-protocols/sync';
|
||||
import type * as Y from 'yjs';
|
||||
|
||||
import { handler, Message } from './handler';
|
||||
import { readMessage } from './processor';
|
||||
|
||||
// @todo - this should depend on awareness.outdatedTime
|
||||
const messageReconnectTimeout = 30000;
|
||||
|
||||
const setupWS = (provider: KeckProvider) => {
|
||||
if (provider.shouldConnect && provider.ws === null) {
|
||||
const websocket = new WebSocket(provider.url);
|
||||
websocket.binaryType = 'arraybuffer';
|
||||
provider.ws = websocket;
|
||||
provider.wsconnecting = true;
|
||||
provider.wsconnected = false;
|
||||
provider.synced = false;
|
||||
|
||||
websocket.onmessage = (event: any) => {
|
||||
provider.wsLastMessageReceived = Date.now();
|
||||
const encoder = readMessage(provider, new Uint8Array(event.data), true);
|
||||
if (encoding.length(encoder) > 1) {
|
||||
websocket.send(encoding.toUint8Array(encoder));
|
||||
}
|
||||
};
|
||||
websocket.onerror = (event: any) => {
|
||||
provider.emit('connection-error', [event, provider]);
|
||||
};
|
||||
websocket.onclose = (event: any) => {
|
||||
provider.emit('connection-close', [event, provider]);
|
||||
provider.ws = null;
|
||||
provider.wsconnecting = false;
|
||||
if (provider.wsconnected) {
|
||||
provider.wsconnected = false;
|
||||
provider.synced = false;
|
||||
// update awareness (all users except local left)
|
||||
awarenessProtocol.removeAwarenessStates(
|
||||
provider.awareness,
|
||||
Array.from(provider.awareness.getStates().keys()).filter(
|
||||
client => client !== provider.doc.clientID
|
||||
),
|
||||
provider
|
||||
);
|
||||
provider.emit('status', [
|
||||
{
|
||||
status: 'disconnected',
|
||||
},
|
||||
]);
|
||||
} else {
|
||||
provider.wsUnsuccessfulReconnects++;
|
||||
}
|
||||
// Start with no reconnect timeout and increase timeout by
|
||||
// using exponential backoff starting with 100ms
|
||||
setTimeout(
|
||||
setupWS,
|
||||
math.min(
|
||||
math.pow(2, provider.wsUnsuccessfulReconnects) * 100,
|
||||
provider.maxBackOffTime
|
||||
) + provider.extraToleranceTime,
|
||||
provider
|
||||
);
|
||||
};
|
||||
websocket.onopen = () => {
|
||||
provider.wsLastMessageReceived = Date.now();
|
||||
provider.wsconnecting = false;
|
||||
provider.wsconnected = true;
|
||||
provider.wsUnsuccessfulReconnects = 0;
|
||||
provider.emit('status', [
|
||||
{
|
||||
status: 'connected',
|
||||
},
|
||||
]);
|
||||
// always send sync step 1 when connected
|
||||
const encoder = encoding.createEncoder();
|
||||
encoding.writeVarUint(encoder, Message.sync);
|
||||
syncProtocol.writeSyncStep1(encoder, provider.doc);
|
||||
websocket.send(encoding.toUint8Array(encoder));
|
||||
// broadcast local awareness state
|
||||
if (provider.awareness.getLocalState() !== null) {
|
||||
const encoderAwarenessState = encoding.createEncoder();
|
||||
encoding.writeVarUint(encoderAwarenessState, Message.awareness);
|
||||
encoding.writeVarUint8Array(
|
||||
encoderAwarenessState,
|
||||
awarenessProtocol.encodeAwarenessUpdate(provider.awareness, [
|
||||
provider.doc.clientID,
|
||||
])
|
||||
);
|
||||
websocket.send(encoding.toUint8Array(encoderAwarenessState));
|
||||
}
|
||||
};
|
||||
provider.emit('status', [
|
||||
{
|
||||
status: 'connecting',
|
||||
},
|
||||
]);
|
||||
}
|
||||
};
|
||||
|
||||
const broadcastMessage = (provider: KeckProvider, buf: ArrayBuffer) => {
|
||||
const ws = provider.ws;
|
||||
if (provider.wsconnected && ws && ws.readyState === ws.OPEN) {
|
||||
ws.send(buf);
|
||||
}
|
||||
};
|
||||
|
||||
export class KeckProvider extends Observable<string> {
|
||||
doc: Y.Doc;
|
||||
awareness: awarenessProtocol.Awareness;
|
||||
url: any;
|
||||
messageHandlers: typeof handler;
|
||||
shouldConnect: boolean;
|
||||
ws: any;
|
||||
wsconnecting: boolean;
|
||||
wsconnected: boolean;
|
||||
wsLastMessageReceived: number;
|
||||
wsUnsuccessfulReconnects: any;
|
||||
maxBackOffTime: number;
|
||||
roomName: string;
|
||||
_synced: boolean;
|
||||
_resyncInterval: any;
|
||||
extraToleranceTime: number;
|
||||
_updateHandler: (update: Uint8Array, origin: any) => void;
|
||||
_awarenessUpdateHandler: ({ added, updated, removed }: any) => void;
|
||||
_unloadHandler: () => void;
|
||||
_checkInterval: NodeJS.Timer;
|
||||
|
||||
constructor(
|
||||
serverUrl: string,
|
||||
roomName: string,
|
||||
doc: Y.Doc,
|
||||
{
|
||||
connect = true,
|
||||
awareness = new awarenessProtocol.Awareness(doc),
|
||||
params = {},
|
||||
resyncInterval = -1,
|
||||
maxBackOffTime = 2500,
|
||||
extraToleranceTime = 0,
|
||||
} = {}
|
||||
) {
|
||||
super();
|
||||
// ensure that url is always ends with /
|
||||
while (serverUrl[serverUrl.length - 1] === '/') {
|
||||
serverUrl = serverUrl.slice(0, serverUrl.length - 1);
|
||||
}
|
||||
const encodedParams = url.encodeQueryParams(params);
|
||||
this.maxBackOffTime = maxBackOffTime;
|
||||
this.extraToleranceTime = extraToleranceTime;
|
||||
this.url =
|
||||
serverUrl +
|
||||
'/' +
|
||||
roomName +
|
||||
(encodedParams.length === 0 ? '' : '?' + encodedParams);
|
||||
this.roomName = roomName;
|
||||
this.doc = doc;
|
||||
this.awareness = awareness;
|
||||
this.wsconnected = false;
|
||||
this.wsconnecting = false;
|
||||
this.wsUnsuccessfulReconnects = 0;
|
||||
this.messageHandlers = handler;
|
||||
/**
|
||||
* @type {boolean}
|
||||
*/
|
||||
this._synced = false;
|
||||
/**
|
||||
* @type {WebSocket?}
|
||||
*/
|
||||
this.ws = null;
|
||||
this.wsLastMessageReceived = 0;
|
||||
/**
|
||||
* Whether to connect to other peers or not
|
||||
* @type {boolean}
|
||||
*/
|
||||
this.shouldConnect = connect;
|
||||
|
||||
this._resyncInterval = 0;
|
||||
if (resyncInterval > 0) {
|
||||
this._resyncInterval = /** @type {any} */ setInterval(() => {
|
||||
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
|
||||
// resend sync step 1
|
||||
const encoder = encoding.createEncoder();
|
||||
encoding.writeVarUint(encoder, Message.sync);
|
||||
syncProtocol.writeSyncStep1(encoder, doc);
|
||||
this.ws.send(encoding.toUint8Array(encoder));
|
||||
}
|
||||
}, resyncInterval);
|
||||
}
|
||||
|
||||
this._updateHandler = (update: Uint8Array, origin: any) => {
|
||||
if (origin !== this) {
|
||||
const encoder = encoding.createEncoder();
|
||||
encoding.writeVarUint(encoder, Message.sync);
|
||||
syncProtocol.writeUpdate(encoder, update);
|
||||
broadcastMessage(this, encoding.toUint8Array(encoder));
|
||||
}
|
||||
};
|
||||
this.doc.on('update', this._updateHandler);
|
||||
|
||||
this._awarenessUpdateHandler = ({ added, updated, removed }: any) => {
|
||||
const changedClients = added.concat(updated).concat(removed);
|
||||
const encoder = encoding.createEncoder();
|
||||
encoding.writeVarUint(encoder, Message.awareness);
|
||||
encoding.writeVarUint8Array(
|
||||
encoder,
|
||||
awarenessProtocol.encodeAwarenessUpdate(awareness, changedClients)
|
||||
);
|
||||
broadcastMessage(this, encoding.toUint8Array(encoder));
|
||||
};
|
||||
this._unloadHandler = () => {
|
||||
awarenessProtocol.removeAwarenessStates(
|
||||
this.awareness,
|
||||
[doc.clientID],
|
||||
'window unload'
|
||||
);
|
||||
};
|
||||
if (typeof window !== 'undefined') {
|
||||
window.addEventListener('unload', this._unloadHandler);
|
||||
} else if (typeof process !== 'undefined') {
|
||||
process.on('exit', this._unloadHandler);
|
||||
}
|
||||
awareness.on('update', this._awarenessUpdateHandler);
|
||||
this._checkInterval = /** @type {any} */ setInterval(() => {
|
||||
if (
|
||||
this.wsconnected &&
|
||||
messageReconnectTimeout < Date.now() - this.wsLastMessageReceived
|
||||
) {
|
||||
// no message received in a long time - not even your own awareness
|
||||
// updates (which are updated every 15 seconds)
|
||||
/** @type {WebSocket} */ this.ws.close();
|
||||
}
|
||||
}, messageReconnectTimeout / 10);
|
||||
if (connect) {
|
||||
this.connect();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @type {boolean}
|
||||
*/
|
||||
get synced() {
|
||||
return this._synced;
|
||||
}
|
||||
|
||||
set synced(state) {
|
||||
if (this._synced !== state) {
|
||||
this._synced = state;
|
||||
this.emit('synced', [state]);
|
||||
this.emit('sync', [state]);
|
||||
}
|
||||
}
|
||||
|
||||
override destroy() {
|
||||
if (this._resyncInterval !== 0) {
|
||||
clearInterval(this._resyncInterval);
|
||||
}
|
||||
clearInterval(this._checkInterval);
|
||||
this.disconnect();
|
||||
if (typeof window !== 'undefined') {
|
||||
window.removeEventListener('unload', this._unloadHandler);
|
||||
} else if (typeof process !== 'undefined') {
|
||||
process.off('exit', this._unloadHandler);
|
||||
}
|
||||
this.awareness.off('update', this._awarenessUpdateHandler);
|
||||
this.doc.off('update', this._updateHandler);
|
||||
super.destroy();
|
||||
}
|
||||
|
||||
disconnect() {
|
||||
this.shouldConnect = false;
|
||||
|
||||
if (this.ws !== null) {
|
||||
this.ws.close();
|
||||
}
|
||||
}
|
||||
|
||||
connect() {
|
||||
this.shouldConnect = true;
|
||||
if (!this.wsconnected && this.ws === null) {
|
||||
setupWS(this);
|
||||
}
|
||||
}
|
||||
}
|
22
packages/workspace/src/affine/keck/processor.ts
Normal file
22
packages/workspace/src/affine/keck/processor.ts
Normal file
@ -0,0 +1,22 @@
|
||||
import * as decoding from 'lib0/decoding';
|
||||
import * as encoding from 'lib0/encoding';
|
||||
|
||||
import type { KeckProvider } from '.';
|
||||
import type { Message } from './handler.js';
|
||||
|
||||
export const readMessage = (
|
||||
provider: KeckProvider,
|
||||
buf: Uint8Array,
|
||||
emitSynced: boolean
|
||||
): encoding.Encoder => {
|
||||
const decoder = decoding.createDecoder(buf);
|
||||
const encoder = encoding.createEncoder();
|
||||
const messageType = decoding.readVarUint(decoder) as Message;
|
||||
const messageHandler = provider.messageHandlers[messageType];
|
||||
if (messageHandler) {
|
||||
messageHandler(encoder, decoder, provider, emitSynced, messageType);
|
||||
} else {
|
||||
console.error('Unable to compute message');
|
||||
}
|
||||
return encoder;
|
||||
};
|
@ -1,514 +0,0 @@
|
||||
/* eslint-disable no-undef */
|
||||
/**
|
||||
* @module provider/websocket
|
||||
*/
|
||||
|
||||
/* eslint-env browser */
|
||||
|
||||
// import * as Y from 'yjs'; // eslint-disable-line
|
||||
import { toast } from '@affine/component';
|
||||
import * as bc from 'lib0/broadcastchannel';
|
||||
import * as decoding from 'lib0/decoding';
|
||||
import * as encoding from 'lib0/encoding';
|
||||
import * as math from 'lib0/math';
|
||||
import { Observable } from 'lib0/observable';
|
||||
import * as time from 'lib0/time';
|
||||
import * as url from 'lib0/url';
|
||||
import * as authProtocol from 'y-protocols/auth';
|
||||
import * as awarenessProtocol from 'y-protocols/awareness';
|
||||
import * as syncProtocol from 'y-protocols/sync';
|
||||
|
||||
export const messageSync = 0;
|
||||
export const messageQueryAwareness = 3;
|
||||
export const messageAwareness = 1;
|
||||
export const messageAuth = 2;
|
||||
|
||||
/**
|
||||
* encoder, decoder, provider, emitSynced, messageType
|
||||
* @type {Array<function(encoding.Encoder, decoding.Decoder, WebsocketProvider, boolean, number):void>}
|
||||
*/
|
||||
const messageHandlers = [];
|
||||
|
||||
messageHandlers[messageSync] = (
|
||||
encoder,
|
||||
decoder,
|
||||
provider,
|
||||
emitSynced,
|
||||
_messageType
|
||||
) => {
|
||||
encoding.writeVarUint(encoder, messageSync);
|
||||
const syncMessageType = syncProtocol.readSyncMessage(
|
||||
decoder,
|
||||
encoder,
|
||||
provider.doc,
|
||||
provider
|
||||
);
|
||||
if (
|
||||
emitSynced &&
|
||||
syncMessageType === syncProtocol.messageYjsSyncStep2 &&
|
||||
!provider.synced
|
||||
) {
|
||||
provider.synced = true;
|
||||
}
|
||||
};
|
||||
|
||||
messageHandlers[messageQueryAwareness] = (
|
||||
encoder,
|
||||
_decoder,
|
||||
provider,
|
||||
_emitSynced,
|
||||
_messageType
|
||||
) => {
|
||||
encoding.writeVarUint(encoder, messageAwareness);
|
||||
encoding.writeVarUint8Array(
|
||||
encoder,
|
||||
awarenessProtocol.encodeAwarenessUpdate(
|
||||
provider.awareness,
|
||||
Array.from(provider.awareness.getStates().keys())
|
||||
)
|
||||
);
|
||||
};
|
||||
|
||||
messageHandlers[messageAwareness] = (
|
||||
_encoder,
|
||||
decoder,
|
||||
provider,
|
||||
_emitSynced,
|
||||
_messageType
|
||||
) => {
|
||||
awarenessProtocol.applyAwarenessUpdate(
|
||||
provider.awareness,
|
||||
decoding.readVarUint8Array(decoder),
|
||||
provider
|
||||
);
|
||||
};
|
||||
|
||||
messageHandlers[messageAuth] = (
|
||||
_encoder,
|
||||
decoder,
|
||||
provider,
|
||||
_emitSynced,
|
||||
_messageType
|
||||
) => {
|
||||
authProtocol.readAuthMessage(decoder, provider.doc, (_ydoc, reason) =>
|
||||
permissionDeniedHandler(provider, reason)
|
||||
);
|
||||
};
|
||||
|
||||
// @todo - this should depend on awareness.outdatedTime
|
||||
const messageReconnectTimeout = 30000;
|
||||
|
||||
/**
|
||||
* @param {WebsocketProvider} provider
|
||||
* @param {string} reason
|
||||
*/
|
||||
const permissionDeniedHandler = (provider, reason) =>
|
||||
console.warn(`Permission denied to access ${provider.url}.\n${reason}`);
|
||||
|
||||
/**
|
||||
* @param {WebsocketProvider} provider
|
||||
* @param {Uint8Array} buf
|
||||
* @param {boolean} emitSynced
|
||||
* @return {encoding.Encoder}
|
||||
*/
|
||||
const readMessage = (provider, buf, emitSynced) => {
|
||||
const decoder = decoding.createDecoder(buf);
|
||||
const encoder = encoding.createEncoder();
|
||||
const messageType = decoding.readVarUint(decoder);
|
||||
const messageHandler = provider.messageHandlers[messageType];
|
||||
if (/** @type {any} */ (messageHandler)) {
|
||||
messageHandler(encoder, decoder, provider, emitSynced, messageType);
|
||||
} else {
|
||||
console.error('Unable to compute message');
|
||||
}
|
||||
return encoder;
|
||||
};
|
||||
|
||||
/**
|
||||
* @param {WebsocketProvider} provider
|
||||
*/
|
||||
const setupWS = provider => {
|
||||
// failed too many times to reconnect
|
||||
if (provider.wsUnsuccessfulReconnects >= 5) {
|
||||
toast('Unable to connect to server. Please try again later.');
|
||||
provider.shouldConnect = false;
|
||||
}
|
||||
if (provider.shouldConnect && provider.ws === null) {
|
||||
const websocket = new provider._WS(provider.url);
|
||||
websocket.binaryType = 'arraybuffer';
|
||||
provider.ws = websocket;
|
||||
provider.wsconnecting = true;
|
||||
provider.wsconnected = false;
|
||||
provider.synced = false;
|
||||
|
||||
websocket.onmessage = event => {
|
||||
provider.wsLastMessageReceived = time.getUnixTime();
|
||||
const encoder = readMessage(provider, new Uint8Array(event.data), true);
|
||||
if (encoding.length(encoder) > 1) {
|
||||
websocket.send(encoding.toUint8Array(encoder));
|
||||
}
|
||||
};
|
||||
websocket.onerror = event => {
|
||||
provider.emit('connection-error', [event, provider]);
|
||||
};
|
||||
websocket.onclose = event => {
|
||||
provider.emit('connection-close', [event, provider]);
|
||||
provider.ws = null;
|
||||
provider.wsconnecting = false;
|
||||
if (provider.wsconnected) {
|
||||
provider.wsconnected = false;
|
||||
provider.synced = false;
|
||||
// update awareness (all users except local left)
|
||||
awarenessProtocol.removeAwarenessStates(
|
||||
provider.awareness,
|
||||
Array.from(provider.awareness.getStates().keys()).filter(
|
||||
client => client !== provider.doc.clientID
|
||||
),
|
||||
provider
|
||||
);
|
||||
provider.emit('status', [
|
||||
{
|
||||
status: 'disconnected',
|
||||
},
|
||||
]);
|
||||
} else {
|
||||
provider.wsUnsuccessfulReconnects++;
|
||||
}
|
||||
// Start with no reconnect timeout and increase timeout by
|
||||
// using exponential backoff starting with 100ms
|
||||
setTimeout(
|
||||
setupWS,
|
||||
math.min(
|
||||
math.pow(2, provider.wsUnsuccessfulReconnects) * 100,
|
||||
provider.maxBackoffTime
|
||||
),
|
||||
provider
|
||||
);
|
||||
};
|
||||
websocket.onopen = () => {
|
||||
provider.wsLastMessageReceived = time.getUnixTime();
|
||||
provider.wsconnecting = false;
|
||||
provider.wsconnected = true;
|
||||
provider.wsUnsuccessfulReconnects = 0;
|
||||
provider.emit('status', [
|
||||
{
|
||||
status: 'connected',
|
||||
},
|
||||
]);
|
||||
// always send sync step 1 when connected
|
||||
const encoder = encoding.createEncoder();
|
||||
encoding.writeVarUint(encoder, messageSync);
|
||||
syncProtocol.writeSyncStep1(encoder, provider.doc);
|
||||
websocket.send(encoding.toUint8Array(encoder));
|
||||
// broadcast local awareness state
|
||||
if (provider.awareness.getLocalState() !== null) {
|
||||
const encoderAwarenessState = encoding.createEncoder();
|
||||
encoding.writeVarUint(encoderAwarenessState, messageAwareness);
|
||||
encoding.writeVarUint8Array(
|
||||
encoderAwarenessState,
|
||||
awarenessProtocol.encodeAwarenessUpdate(provider.awareness, [
|
||||
provider.doc.clientID,
|
||||
])
|
||||
);
|
||||
websocket.send(encoding.toUint8Array(encoderAwarenessState));
|
||||
}
|
||||
};
|
||||
|
||||
provider.emit('status', [
|
||||
{
|
||||
status: 'connecting',
|
||||
},
|
||||
]);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* @param {WebsocketProvider} provider
|
||||
* @param {ArrayBuffer} buf
|
||||
*/
|
||||
const broadcastMessage = (provider, buf) => {
|
||||
if (provider.wsconnected) {
|
||||
/** @type {WebSocket} */ (provider.ws).send(buf);
|
||||
}
|
||||
if (provider.bcconnected) {
|
||||
bc.publish(provider.bcChannel, buf, provider);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Websocket Provider for Yjs. Creates a websocket connection to sync the shared document.
|
||||
* The document name is attached to the provided url. I.e. the following example
|
||||
* creates a websocket connection to http://localhost:1234/my-document-name
|
||||
*
|
||||
* @example
|
||||
* import * as Y from 'yjs'
|
||||
* import { WebsocketProvider } from 'y-websocket'
|
||||
* const doc = new Y.Doc()
|
||||
* const provider = new WebsocketProvider('http://localhost:1234', 'my-document-name', doc)
|
||||
*
|
||||
* @extends {Observable<string>}
|
||||
*/
|
||||
export class WebsocketProvider extends Observable {
|
||||
/**
|
||||
* @param {string} serverUrl
|
||||
* @param {string} roomname
|
||||
* @param {Y.Doc} doc
|
||||
* @param {object} [opts]
|
||||
* @param {boolean} [opts.connect]
|
||||
* @param {awarenessProtocol.Awareness} [opts.awareness]
|
||||
* @param {Object<string,string>} [opts.params]
|
||||
* @param {typeof WebSocket} [opts.WebSocketPolyfill] Optionall provide a WebSocket polyfill
|
||||
* @param {number} [opts.resyncInterval] Request server state every `resyncInterval` milliseconds
|
||||
* @param {number} [opts.maxBackoffTime] Maximum amount of time to wait before trying to reconnect (we try to reconnect using exponential backoff)
|
||||
* @param {boolean} [opts.disableBc] Disable cross-tab BroadcastChannel communication
|
||||
*/
|
||||
constructor(
|
||||
serverUrl,
|
||||
roomname,
|
||||
doc,
|
||||
{
|
||||
connect = true,
|
||||
awareness = new awarenessProtocol.Awareness(doc),
|
||||
params = {},
|
||||
WebSocketPolyfill = WebSocket,
|
||||
resyncInterval = -1,
|
||||
maxBackoffTime = 2500,
|
||||
disableBc = false,
|
||||
} = {}
|
||||
) {
|
||||
super();
|
||||
// ensure that url is always ends with /
|
||||
while (serverUrl[serverUrl.length - 1] === '/') {
|
||||
serverUrl = serverUrl.slice(0, serverUrl.length - 1);
|
||||
}
|
||||
const encodedParams = url.encodeQueryParams(params);
|
||||
this.maxBackoffTime = maxBackoffTime;
|
||||
this.bcChannel = serverUrl + '/' + roomname;
|
||||
this.url =
|
||||
serverUrl +
|
||||
'/' +
|
||||
roomname +
|
||||
(encodedParams.length === 0 ? '' : '?' + encodedParams);
|
||||
this.roomname = roomname;
|
||||
this.doc = doc;
|
||||
this._WS = WebSocketPolyfill;
|
||||
this.awareness = awareness;
|
||||
this.wsconnected = false;
|
||||
this.wsconnecting = false;
|
||||
this.bcconnected = false;
|
||||
this.disableBc = disableBc;
|
||||
this.wsUnsuccessfulReconnects = 0;
|
||||
this.messageHandlers = messageHandlers.slice();
|
||||
/**
|
||||
* @type {boolean}
|
||||
*/
|
||||
this._synced = false;
|
||||
/**
|
||||
* @type {WebSocket?}
|
||||
*/
|
||||
this.ws = null;
|
||||
this.wsLastMessageReceived = 0;
|
||||
/**
|
||||
* Whether to connect to other peers or not
|
||||
* @type {boolean}
|
||||
*/
|
||||
this.shouldConnect = connect;
|
||||
|
||||
/**
|
||||
* @type {number}
|
||||
*/
|
||||
this._resyncInterval = 0;
|
||||
if (resyncInterval > 0) {
|
||||
this._resyncInterval = /** @type {any} */ (
|
||||
setInterval(() => {
|
||||
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
|
||||
// resend sync step 1
|
||||
const encoder = encoding.createEncoder();
|
||||
encoding.writeVarUint(encoder, messageSync);
|
||||
syncProtocol.writeSyncStep1(encoder, doc);
|
||||
this.ws.send(encoding.toUint8Array(encoder));
|
||||
}
|
||||
}, resyncInterval)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {ArrayBuffer} data
|
||||
* @param {any} origin
|
||||
*/
|
||||
this._bcSubscriber = (data, origin) => {
|
||||
if (origin !== this) {
|
||||
const encoder = readMessage(this, new Uint8Array(data), false);
|
||||
if (encoding.length(encoder) > 1) {
|
||||
bc.publish(this.bcChannel, encoding.toUint8Array(encoder), this);
|
||||
}
|
||||
}
|
||||
};
|
||||
/**
|
||||
* Listens to Yjs updates and sends them to remote peers (ws and broadcastchannel)
|
||||
* @param {Uint8Array} update
|
||||
* @param {any} origin
|
||||
*/
|
||||
this._updateHandler = (update, origin) => {
|
||||
if (origin !== this) {
|
||||
const encoder = encoding.createEncoder();
|
||||
encoding.writeVarUint(encoder, messageSync);
|
||||
syncProtocol.writeUpdate(encoder, update);
|
||||
broadcastMessage(this, encoding.toUint8Array(encoder));
|
||||
}
|
||||
};
|
||||
this.doc.on('update', this._updateHandler);
|
||||
/**
|
||||
* @param {any} changed
|
||||
* @param {any} _origin
|
||||
*/
|
||||
this._awarenessUpdateHandler = ({ added, updated, removed }, _origin) => {
|
||||
const changedClients = added.concat(updated).concat(removed);
|
||||
const encoder = encoding.createEncoder();
|
||||
encoding.writeVarUint(encoder, messageAwareness);
|
||||
encoding.writeVarUint8Array(
|
||||
encoder,
|
||||
awarenessProtocol.encodeAwarenessUpdate(awareness, changedClients)
|
||||
);
|
||||
broadcastMessage(this, encoding.toUint8Array(encoder));
|
||||
};
|
||||
this._unloadHandler = () => {
|
||||
awarenessProtocol.removeAwarenessStates(
|
||||
this.awareness,
|
||||
[doc.clientID],
|
||||
'window unload'
|
||||
);
|
||||
};
|
||||
if (typeof window !== 'undefined') {
|
||||
window.addEventListener('unload', this._unloadHandler);
|
||||
} else if (typeof process !== 'undefined') {
|
||||
process.on('exit', this._unloadHandler);
|
||||
}
|
||||
awareness.on('update', this._awarenessUpdateHandler);
|
||||
this._checkInterval = /** @type {any} */ (
|
||||
setInterval(() => {
|
||||
if (
|
||||
this.wsconnected &&
|
||||
messageReconnectTimeout <
|
||||
time.getUnixTime() - this.wsLastMessageReceived
|
||||
) {
|
||||
// no message received in a long time - not even your own awareness
|
||||
// updates (which are updated every 15 seconds)
|
||||
/** @type {WebSocket} */ (this.ws).close();
|
||||
}
|
||||
}, messageReconnectTimeout / 10)
|
||||
);
|
||||
if (connect) {
|
||||
this.connect();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @type {boolean}
|
||||
*/
|
||||
get synced() {
|
||||
return this._synced;
|
||||
}
|
||||
|
||||
set synced(state) {
|
||||
if (this._synced !== state) {
|
||||
this._synced = state;
|
||||
this.emit('synced', [state]);
|
||||
this.emit('sync', [state]);
|
||||
}
|
||||
}
|
||||
|
||||
destroy() {
|
||||
if (this._resyncInterval !== 0) {
|
||||
clearInterval(this._resyncInterval);
|
||||
}
|
||||
clearInterval(this._checkInterval);
|
||||
this.disconnect();
|
||||
if (typeof window !== 'undefined') {
|
||||
window.removeEventListener('unload', this._unloadHandler);
|
||||
} else if (typeof process !== 'undefined') {
|
||||
process.off('exit', this._unloadHandler);
|
||||
}
|
||||
this.awareness.off('update', this._awarenessUpdateHandler);
|
||||
this.doc.off('update', this._updateHandler);
|
||||
super.destroy();
|
||||
}
|
||||
|
||||
connectBc() {
|
||||
if (this.disableBc) {
|
||||
return;
|
||||
}
|
||||
if (!this.bcconnected) {
|
||||
bc.subscribe(this.bcChannel, this._bcSubscriber);
|
||||
this.bcconnected = true;
|
||||
}
|
||||
// send sync step1 to bc
|
||||
// write sync step 1
|
||||
const encoderSync = encoding.createEncoder();
|
||||
encoding.writeVarUint(encoderSync, messageSync);
|
||||
syncProtocol.writeSyncStep1(encoderSync, this.doc);
|
||||
bc.publish(this.bcChannel, encoding.toUint8Array(encoderSync), this);
|
||||
// broadcast local state
|
||||
const encoderState = encoding.createEncoder();
|
||||
encoding.writeVarUint(encoderState, messageSync);
|
||||
syncProtocol.writeSyncStep2(encoderState, this.doc);
|
||||
bc.publish(this.bcChannel, encoding.toUint8Array(encoderState), this);
|
||||
// write queryAwareness
|
||||
const encoderAwarenessQuery = encoding.createEncoder();
|
||||
encoding.writeVarUint(encoderAwarenessQuery, messageQueryAwareness);
|
||||
bc.publish(
|
||||
this.bcChannel,
|
||||
encoding.toUint8Array(encoderAwarenessQuery),
|
||||
this
|
||||
);
|
||||
// broadcast local awareness state
|
||||
const encoderAwarenessState = encoding.createEncoder();
|
||||
encoding.writeVarUint(encoderAwarenessState, messageAwareness);
|
||||
encoding.writeVarUint8Array(
|
||||
encoderAwarenessState,
|
||||
awarenessProtocol.encodeAwarenessUpdate(this.awareness, [
|
||||
this.doc.clientID,
|
||||
])
|
||||
);
|
||||
bc.publish(
|
||||
this.bcChannel,
|
||||
encoding.toUint8Array(encoderAwarenessState),
|
||||
this
|
||||
);
|
||||
}
|
||||
|
||||
disconnectBc() {
|
||||
// broadcast message with local awareness state set to null (indicating disconnect)
|
||||
const encoder = encoding.createEncoder();
|
||||
encoding.writeVarUint(encoder, messageAwareness);
|
||||
encoding.writeVarUint8Array(
|
||||
encoder,
|
||||
awarenessProtocol.encodeAwarenessUpdate(
|
||||
this.awareness,
|
||||
[this.doc.clientID],
|
||||
new Map()
|
||||
)
|
||||
);
|
||||
broadcastMessage(this, encoding.toUint8Array(encoder));
|
||||
if (this.bcconnected) {
|
||||
bc.unsubscribe(this.bcChannel, this._bcSubscriber);
|
||||
this.bcconnected = false;
|
||||
}
|
||||
}
|
||||
|
||||
disconnect() {
|
||||
this.shouldConnect = false;
|
||||
this.disconnectBc();
|
||||
if (this.ws !== null) {
|
||||
this.ws.close();
|
||||
}
|
||||
}
|
||||
|
||||
connect() {
|
||||
this.shouldConnect = true;
|
||||
if (!this.wsconnected && this.ws === null) {
|
||||
setupWS(this);
|
||||
this.connectBc();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user