feat: sync with keck

This commit is contained in:
DarkSky 2022-09-14 17:32:08 +08:00
parent 914f5ad436
commit 3ff0dbe781
11 changed files with 350 additions and 35 deletions

139
apps/keck/src/utils.ts.bak Normal file
View File

@ -0,0 +1,139 @@
import WebSocket = require('ws');
import http = require('http');
import Y = require('yjs');
import lib0 = require('lib0');
import syncProtocol = require('y-protocols/sync');
const { encoding, decoding, map } = lib0;
const wsReadyStateConnecting = 0;
const wsReadyStateOpen = 1;
// disable gc when using snapshots!
const gcEnabled = process.env.GC !== 'false' && process.env.GC !== '0';
const docs: Map<string, WSSharedDoc> = new Map();
const messageSync = 0;
const updateHandler = (update: Uint8Array, origin: any, doc: WSSharedDoc) => {
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, messageSync);
syncProtocol.writeUpdate(encoder, update);
const message = encoding.toUint8Array(encoder);
doc.conns.forEach((_, conn) => send(doc, conn, message));
};
export class WSSharedDoc extends Y.Doc {
name: string;
conns: Map<any, any>;
constructor(name: string) {
super({ gc: gcEnabled });
this.name = name;
// Maps from conn to set of controlled user ids. Delete all user ids from awareness when this conn is closed
this.conns = new Map();
this.on('update', updateHandler);
}
}
// Gets a Y.Doc by name, whether in memory or on disk
const getYDoc = (docname: string, gc = true): WSSharedDoc =>
map.setIfUndefined(docs, docname, () => {
const doc = new WSSharedDoc(docname);
doc.gc = gc;
docs.set(docname, doc);
return doc;
});
const closeConn = (doc: WSSharedDoc, conn: any) => {
if (doc.conns.has(conn)) {
doc.conns.delete(conn);
}
conn.close();
};
const send = (doc: WSSharedDoc, conn: any, m: Uint8Array) => {
if (
conn.readyState !== wsReadyStateConnecting &&
conn.readyState !== wsReadyStateOpen
) {
closeConn(doc, conn);
}
try {
conn.send(m, (/** @param {any} err */ err: any) => {
err != null && closeConn(doc, conn);
});
} catch (e) {
closeConn(doc, conn);
}
};
export const handleConnection = (
socket: WebSocket.WebSocket,
request: http.IncomingMessage,
docName: string
) => {
const gc = true;
socket.binaryType = 'arraybuffer';
// get doc, initialize if it does not exist yet
const doc = getYDoc(docName, gc);
doc.conns.set(socket, new Set());
// listen and reply to events
socket.on('message', (message: ArrayBuffer) => {
try {
const encoder = encoding.createEncoder();
const decoder = decoding.createDecoder(new Uint8Array(message));
const messageType = decoding.readVarUint(decoder);
switch (messageType) {
case messageSync:
encoding.writeVarUint(encoder, messageSync);
syncProtocol.readSyncMessage(decoder, encoder, doc, null);
if (encoding.length(encoder) > 1) {
send(doc, socket, encoding.toUint8Array(encoder));
}
break;
}
} catch (err) {
console.error(err);
doc.emit('error', [err]);
}
});
// Check if connection is still alive
let pongReceived = true;
const pingInterval = setInterval(() => {
if (!pongReceived) {
if (doc.conns.has(socket)) {
closeConn(doc, socket);
}
clearInterval(pingInterval);
} else if (doc.conns.has(socket)) {
pongReceived = false;
try {
socket.ping();
} catch (e) {
closeConn(doc, socket);
clearInterval(pingInterval);
}
}
}, 30 * 1000);
socket.on('close', () => {
closeConn(doc, socket);
clearInterval(pingInterval);
});
socket.on('pong', () => {
pongReceived = true;
});
// put the following in a variables in a block so the interval handlers don't keep in in
// scope
{
// send sync step 1
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, messageSync);
console.log('sync step 0', encoding.toUint8Array(encoder));
syncProtocol.writeSyncStep1(encoder, doc);
send(doc, socket, encoding.toUint8Array(encoder));
console.log('sync step 1 sent', encoding.toUint8Array(encoder));
}
};

View File

@ -1,11 +1,6 @@
{
"/api": {
"target": "https://nightly.affine.pro/",
"secure": false,
"changeOrigin": true
},
"/collaboration": {
"target": "https://canary.affine.pro",
"target": "http://127.0.0.1:3000/",
"ws": true,
"changeOrigin": true,
"secure": false

View File

@ -33,13 +33,15 @@ async function _getCurrentToken() {
if (user) resolve(user.getIdToken());
});
});
} else if (process.env['NX_KECK']) {
return 'AFFiNE';
}
return undefined;
}
const _enabled = {
demo: [],
AFFiNE: process.env['NX_KECK'] ? ['idb'] : ['idb', 'ws'],
AFFiNE: process.env['NX_KECK'] ? ['idb', 'ws'] : ['idb'],
} as any;
async function _getBlockDatabase(

View File

@ -0,0 +1,115 @@
import * as Y from 'yjs';
import { Observable } from 'lib0/observable';
import * as url from 'lib0/url';
import { handler } from './handler';
import { registerKeckUpdateHandler } from './processor';
import { registerWebsocket } from './websocket';
/**
* Websocket Provider for Yjs. Creates a websocket connection to sync the shared document.
* The document name is attached to the provided url. I.e. the following example
* creates a websocket connection to http://localhost:3000/my-document-name
*
* @example
* import * as Y from 'yjs'
* import { KeckProvider } from 'jwt-rpc'
* const doc = new Y.Doc()
* const provider = new KeckProvider('http://localhost:3000', 'my-document-name', doc)
*/
export class KeckProvider extends Observable<string> {
maxBackOffTime: number;
url: string;
roomName: string;
doc: Y.Doc;
wsUnsuccessfulReconnects: number;
private _synced: boolean;
broadcastChannel: string;
private _broadcast?: {
broadcastMessage: (buf: ArrayBuffer) => void;
disconnect: () => void;
};
private _websocket?: {
broadcastMessage: (buf: ArrayBuffer) => void;
disconnect: () => void;
};
private _updateHandlerDestroy: () => void;
constructor(
token: string,
serverUrl: string,
roomName: string,
doc: Y.Doc,
{ params = {}, resyncInterval = -1, maxBackOffTime = 2500 } = {}
) {
super();
this.roomName = roomName;
// ensure that url is always ends with /
while (serverUrl[serverUrl.length - 1] === '/') {
serverUrl = serverUrl.slice(0, serverUrl.length - 1);
}
this.broadcastChannel = serverUrl + '/' + roomName + '/';
const encodedParams = url.encodeQueryParams(params);
this.url =
this.broadcastChannel +
(encodedParams.length === 0 ? '' : '?' + encodedParams);
this.doc = doc;
this.maxBackOffTime = maxBackOffTime;
this.wsUnsuccessfulReconnects = 0;
this._synced = false;
this._websocket = registerWebsocket(this, token, resyncInterval);
this._updateHandlerDestroy = registerKeckUpdateHandler(
this,
doc,
buf => {
this._websocket?.broadcastMessage(buf);
this._broadcast?.broadcastMessage(buf);
}
);
}
get messageHandlers() {
return handler;
}
get synced() {
return this._synced;
}
set synced(state) {
if (this._synced !== state) {
this._synced = state;
this.emit('synced', [state]);
this.emit('sync', [state]);
}
}
override destroy() {
if (this._broadcast) {
const disconnect = this._broadcast.disconnect;
this._broadcast = undefined;
disconnect();
}
if (this._websocket) {
const disconnect = this._websocket.disconnect;
this._websocket = undefined;
disconnect();
}
this._updateHandlerDestroy?.();
super.destroy();
}
}

View File

@ -5,10 +5,11 @@ import * as syncProtocol from 'y-protocols/sync';
import * as Y from 'yjs';
import { Message } from './handler';
import { WebsocketProvider } from './provider';
import { KeckProvider } from './keckprovider';
import { WebsocketProvider } from './wsprovider';
export const readMessage = (
provider: WebsocketProvider,
provider: WebsocketProvider | KeckProvider,
buf: Uint8Array,
emitSynced: boolean
): encoding.Encoder => {
@ -24,7 +25,7 @@ export const readMessage = (
return encoder;
};
export const registerUpdateHandler = (
export const registerWsUpdateHandler = (
provider: WebsocketProvider,
awareness: awarenessProtocol.Awareness,
doc: Y.Doc,
@ -78,3 +79,24 @@ export const registerUpdateHandler = (
doc.off('update', documentUpdateHandler);
};
};
export const registerKeckUpdateHandler = (
provider: KeckProvider,
doc: Y.Doc,
broadcastMessage: (buf: ArrayBuffer) => void
) => {
// Listens to Yjs updates and sends them to remote peers (ws and broadcastchannel)
const documentUpdateHandler = (update: Uint8Array, origin: any) => {
if (origin !== provider) {
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, Message.sync);
syncProtocol.writeUpdate(encoder, update);
broadcastMessage(encoding.toUint8Array(encoder));
}
};
doc.on('update', documentUpdateHandler);
return () => {
doc.off('update', documentUpdateHandler);
};
};

View File

@ -5,8 +5,9 @@ import * as awarenessProtocol from 'y-protocols/awareness';
import * as syncProtocol from 'y-protocols/sync';
import { Message } from './handler';
import { KeckProvider } from './keckprovider';
import { readMessage } from './processor';
import { WebsocketProvider } from './provider';
import { WebsocketProvider } from './wsprovider';
enum WebSocketState {
disconnected = 0,
@ -46,14 +47,14 @@ const _getToken = async (
return resp.json();
};
const _getTimeout = (provider: WebsocketProvider) =>
const _getTimeout = (provider: WebsocketProvider | KeckProvider) =>
math.min(
math.pow(2, provider.wsUnsuccessfulReconnects) * 100,
provider.maxBackOffTime
);
export const registerWebsocket = (
provider: WebsocketProvider,
provider: WebsocketProvider | KeckProvider,
token: string,
resync = -1,
reconnect = 3,
@ -105,13 +106,19 @@ export const registerWebsocket = (
state = WebSocketState.disconnected;
provider.synced = false;
// update awareness (all users except local left)
awarenessProtocol.removeAwarenessStates(
provider.awareness,
Array.from(
provider.awareness.getStates().keys()
).filter(client => client !== provider.doc.clientID),
provider
);
const awareness = (provider as any)['awareness'];
if (awareness) {
awarenessProtocol.removeAwarenessStates(
awareness,
Array.from(awareness.getStates().keys()).filter(
(client): client is number =>
client !== provider.doc.clientID
),
provider
);
}
provider.emit('status', [{ status: 'disconnected' }]);
} else {
provider.wsUnsuccessfulReconnects++;
@ -139,8 +146,10 @@ export const registerWebsocket = (
encoding.writeVarUint(encoder, Message.sync);
syncProtocol.writeSyncStep1(encoder, provider.doc);
websocket?.send(encoding.toUint8Array(encoder));
const awareness = (provider as any)['awareness'];
// broadcast local awareness state
if (provider.awareness.getLocalState() !== null) {
if (awareness && awareness.getLocalState() !== null) {
const encoderAwarenessState = encoding.createEncoder();
encoding.writeVarUint(
encoderAwarenessState,
@ -148,10 +157,9 @@ export const registerWebsocket = (
);
encoding.writeVarUint8Array(
encoderAwarenessState,
awarenessProtocol.encodeAwarenessUpdate(
provider.awareness,
[provider.doc.clientID]
)
awarenessProtocol.encodeAwarenessUpdate(awareness, [
provider.doc.clientID,
])
);
websocket?.send(
encoding.toUint8Array(encoderAwarenessState)

View File

@ -6,7 +6,7 @@ import * as url from 'lib0/url';
import * as awarenessProtocol from 'y-protocols/awareness';
import { handler } from './handler';
import { registerUpdateHandler } from './processor';
import { registerWsUpdateHandler } from './processor';
import { registerWebsocket } from './websocket';
/**
@ -85,7 +85,7 @@ export class WebsocketProvider extends Observable<string> {
// this.doc
// );
this._updateHandlerDestroy = registerUpdateHandler(
this._updateHandlerDestroy = registerWsUpdateHandler(
this,
awareness,
doc,

View File

@ -17,7 +17,7 @@ function getCollaborationPoint() {
const { protocol, host } = getLocation();
const ws = protocol.startsWith('https') ? 'wss' : 'ws';
const isOnline = host.endsWith('affine.pro');
const site = isOnline ? host : 'localhost:4200';
const site = isOnline ? host : 'localhost:3000';
return `${ws}://${site}/collaboration/`;
}

View File

@ -120,13 +120,13 @@ async function _initYjsDatabase(
[name]: p,
};
}),
p({
awareness,
doc: binaries,
token,
workspace: `${workspace}_binaries`,
emitState,
}).then(p => ({ [`${name}_binaries`]: p })),
// p({
// awareness,
// doc: binaries,
// token,
// workspace: `${workspace}_binaries`,
// emitState,
// }).then(p => ({ [`${name}_binaries`]: p })),
])
);
}

View File

@ -79,6 +79,39 @@ export const getYjsProviders = (
}
) as any; // TODO: type is erased after cascading references
// Wait for ws synchronization to complete, otherwise the data will be modified in reverse, which can be optimized later
return new Promise<void>((resolve, reject) => {
// TODO: synced will also be triggered on reconnection after losing sync
// There needs to be an event mechanism to emit the synchronization state to the upper layer
ws.once('synced', () => resolve());
ws.once('lost-connection', () => resolve());
ws.once('connection-error', () => reject());
ws.on('synced', () => instances.emitState('connected'));
ws.on('lost-connection', () =>
instances.emitState('retry')
);
ws.on('connection-error', () =>
instances.emitState('retry')
);
});
} else {
return;
}
}
},
keck: async (instances: YjsDefaultInstances) => {
if (options.enabled.includes('ws')) {
if (instances.token) {
const ws = new WebsocketProvider(
instances.token,
options.backend,
instances.workspace,
instances.doc,
{
params: options.params,
}
) as any; // TODO: type is erased after cascading references
// Wait for ws synchronization to complete, otherwise the data will be modified in reverse, which can be optimized later
return new Promise<void>((resolve, reject) => {
// TODO: synced will also be triggered on reconnection after losing sync

View File

@ -9,6 +9,7 @@
"start": "env-cmd -f .github/env/.env.local-dev nx serve ligo-virgo",
"start:affine": "nx serve ligo-virgo",
"start:keck": "nx serve keck",
"start:keck-dev": "env-cmd -f .github/env/.env.local-keck nx serve ligo-virgo",
"start:venus": "nx serve venus",
"build": "nx build ligo-virgo",
"build:local": "env-cmd -f .github/env/.env.local-dev nx build ligo-virgo",