perf: lazy doc provider factory (#3330)

Co-authored-by: Alex Yang <himself65@outlook.com>
This commit is contained in:
Peng Xiao 2023-07-21 13:23:18 +08:00 committed by GitHub
parent cff741e9ba
commit 869d98d019
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 609 additions and 0 deletions

View File

@ -0,0 +1,19 @@
export interface DatasourceDocAdapter {
// request diff update from other clients
queryDocState: (
guid: string,
options?: {
stateVector?: Uint8Array;
targetClientId?: number;
}
) => Promise<Uint8Array | false>;
// send update to the datasource
sendDocUpdate: (guid: string, update: Uint8Array) => Promise<void>;
// listen to update from the datasource. Returns a function to unsubscribe.
// this is optional because some datasource might not support it
onDocUpdate?(
callback: (guid: string, update: Uint8Array) => void
): () => void;
}

View File

@ -0,0 +1,148 @@
import type { PassiveDocProvider } from '@blocksuite/store';
import {
applyUpdate,
type Doc,
encodeStateAsUpdate,
encodeStateVectorFromUpdate,
} from 'yjs';
import type { DatasourceDocAdapter } from './datasource-doc-adapter';
const selfUpdateOrigin = 'lazy-provider-self-origin';
function getDoc(doc: Doc, guid: string): Doc | undefined {
if (doc.guid === guid) {
return doc;
}
for (const subdoc of doc.subdocs) {
const found = getDoc(subdoc, guid);
if (found) {
return found;
}
}
return undefined;
}
/**
* Creates a lazy provider that connects to a datasource and synchronizes a root document.
*/
export const createLazyProvider = (
rootDoc: Doc,
datasource: DatasourceDocAdapter
): Omit<PassiveDocProvider, 'flavour'> => {
let connected = false;
const pendingMap = new Map<string, Uint8Array[]>(); // guid -> pending-updates
const disposableMap = new Map<string, Set<() => void>>();
let datasourceUnsub: (() => void) | undefined;
async function syncDoc(doc: Doc) {
const guid = doc.guid;
// perf: optimize me
const currentUpdate = encodeStateAsUpdate(doc);
const remoteUpdate = await datasource.queryDocState(guid, {
stateVector: encodeStateVectorFromUpdate(currentUpdate),
});
const updates = [currentUpdate];
pendingMap.set(guid, []);
if (remoteUpdate) {
applyUpdate(doc, remoteUpdate, selfUpdateOrigin);
const newUpdate = encodeStateAsUpdate(
doc,
encodeStateVectorFromUpdate(remoteUpdate)
);
updates.push(newUpdate);
await datasource.sendDocUpdate(guid, newUpdate);
}
}
function setupDocListener(doc: Doc) {
const disposables = new Set<() => void>();
disposableMap.set(doc.guid, disposables);
const updateHandler = async (update: Uint8Array, origin: unknown) => {
if (origin === selfUpdateOrigin) {
return;
}
datasource.sendDocUpdate(doc.guid, update).catch(console.error);
};
const subdocLoadHandler = (event: { loaded: Set<Doc> }) => {
event.loaded.forEach(subdoc => {
connectDoc(subdoc).catch(console.error);
});
};
doc.on('update', updateHandler);
doc.on('subdocs', subdocLoadHandler);
// todo: handle destroy?
disposables.add(() => {
doc.off('update', updateHandler);
doc.off('subdocs', subdocLoadHandler);
});
}
function setupDatasourceListeners() {
datasourceUnsub = datasource.onDocUpdate?.((guid, update) => {
const doc = getDoc(rootDoc, guid);
if (doc) {
applyUpdate(doc, update);
//
if (pendingMap.has(guid)) {
pendingMap.get(guid)?.forEach(update => applyUpdate(doc, update));
pendingMap.delete(guid);
}
} else {
// This case happens when the father doc is not yet updated,
// so that the child doc is not yet created.
// We need to put it into cache so that it can be applied later.
console.warn('idb: doc not found', guid);
pendingMap.set(guid, (pendingMap.get(guid) ?? []).concat(update));
}
});
}
// when a subdoc is loaded, we need to sync it with the datasource and setup listeners
async function connectDoc(doc: Doc) {
setupDocListener(doc);
await syncDoc(doc);
await Promise.all(
[...doc.subdocs]
.filter(subdoc => subdoc.shouldLoad)
.map(subdoc => connectDoc(subdoc))
);
}
function disposeAll() {
disposableMap.forEach(disposables => {
disposables.forEach(dispose => dispose());
});
disposableMap.clear();
}
function connect() {
connected = true;
// root doc should be already loaded,
// but we want to populate the cache for later update events
connectDoc(rootDoc).catch(console.error);
setupDatasourceListeners();
}
async function disconnect() {
connected = false;
disposeAll();
datasourceUnsub?.();
datasourceUnsub = undefined;
}
return {
get connected() {
return connected;
},
passive: true,
connect,
disconnect,
};
};

View File

@ -0,0 +1,8 @@
# A set of provider utilities for Yjs
## createLazyProvider
A factory function to create a lazy provider. It will not download the document from the provider until the first time a document is loaded at the parent doc.
To use it, first define a `DatasourceDocAdapter`.
Then, create a `LazyProvider` with `createLazyProvider(rootDoc, datasource)`.

View File

@ -0,0 +1,17 @@
{
"name": "@affine/y-provider",
"type": "module",
"version": "0.7.0-canary.47",
"description": "Yjs provider utilities for AFFiNE",
"exports": {
".": "./src/index.ts"
},
"main": "./src/index.ts",
"module": "./src/index.ts",
"devDependencies": {
"@blocksuite/store": "0.0.0-20230719163314-76d863fc-nightly"
},
"peerDependencies": {
"yjs": "^13.5.51"
}
}

View File

@ -0,0 +1,181 @@
import { setTimeout } from 'node:timers/promises';
import { describe, expect, test } from 'vitest';
import { applyUpdate, Doc, encodeStateAsUpdate } from 'yjs';
import { createLazyProvider } from '../lazy-provider';
import type { DatasourceDocAdapter } from '../types';
import { getDoc } from '../utils';
const createMemoryDatasource = (rootDoc: Doc) => {
const selfUpdateOrigin = Symbol('self-origin');
const listeners = new Set<(guid: string, update: Uint8Array) => void>();
function trackDoc(doc: Doc) {
doc.on('update', (update, origin) => {
if (origin === selfUpdateOrigin) {
return;
}
for (const listener of listeners) {
listener(doc.guid, update);
}
});
doc.on('subdocs', () => {
for (const subdoc of rootDoc.subdocs) {
trackDoc(subdoc);
}
});
}
trackDoc(rootDoc);
const adapter = {
queryDocState: async (guid, options) => {
const subdoc = getDoc(rootDoc, guid);
if (!subdoc) {
return false;
}
return encodeStateAsUpdate(subdoc, options?.stateVector);
},
sendDocUpdate: async (guid, update) => {
const subdoc = getDoc(rootDoc, guid);
if (!subdoc) {
return;
}
applyUpdate(subdoc, update, selfUpdateOrigin);
},
onDocUpdate: callback => {
listeners.add(callback);
return () => {
listeners.delete(callback);
};
},
} satisfies DatasourceDocAdapter;
return {
rootDoc, // expose rootDoc for testing
...adapter,
};
};
describe('y-provider', () => {
test('should sync a subdoc if it is loaded after connect', async () => {
const remoteRootDoc = new Doc(); // this is the remote doc lives in remote
const datasource = createMemoryDatasource(remoteRootDoc);
const remotesubdoc = new Doc();
remotesubdoc.getText('text').insert(0, 'test-subdoc-value');
// populate remote doc with simple data
remoteRootDoc.getMap('map').set('test-0', 'test-0-value');
remoteRootDoc.getMap('map').set('subdoc', remotesubdoc);
const rootDoc = new Doc({ guid: remoteRootDoc.guid }); // this is the doc that we want to sync
const provider = createLazyProvider(rootDoc, datasource);
provider.connect();
await setTimeout(); // wait for the provider to sync
const subdoc = rootDoc.getMap('map').get('subdoc') as Doc;
expect(rootDoc.getMap('map').get('test-0')).toBe('test-0-value');
expect(subdoc.getText('text').toJSON()).toBe('');
// onload, the provider should sync the subdoc
subdoc.load();
await setTimeout();
expect(subdoc.getText('text').toJSON()).toBe('test-subdoc-value');
remotesubdoc.getText('text').insert(0, 'prefix-');
await setTimeout();
expect(subdoc.getText('text').toJSON()).toBe('prefix-test-subdoc-value');
});
test('should sync a shouldLoad=true subdoc on connect', async () => {
const remoteRootDoc = new Doc(); // this is the remote doc lives in remote
const datasource = createMemoryDatasource(remoteRootDoc);
const remotesubdoc = new Doc();
remotesubdoc.getText('text').insert(0, 'test-subdoc-value');
// populate remote doc with simple data
remoteRootDoc.getMap('map').set('test-0', 'test-0-value');
remoteRootDoc.getMap('map').set('subdoc', remotesubdoc);
const rootDoc = new Doc({ guid: remoteRootDoc.guid }); // this is the doc that we want to sync
applyUpdate(rootDoc, encodeStateAsUpdate(remoteRootDoc)); // sync rootDoc with remoteRootDoc
const subdoc = rootDoc.getMap('map').get('subdoc') as Doc;
expect(subdoc.getText('text').toJSON()).toBe('');
subdoc.load();
const provider = createLazyProvider(rootDoc, datasource);
provider.connect();
await setTimeout(); // wait for the provider to sync
expect(subdoc.getText('text').toJSON()).toBe('test-subdoc-value');
});
test('should send existing local update to remote on connect', async () => {
const remoteRootDoc = new Doc(); // this is the remote doc lives in remote
const datasource = createMemoryDatasource(remoteRootDoc);
const rootDoc = new Doc({ guid: remoteRootDoc.guid }); // this is the doc that we want to sync
applyUpdate(rootDoc, encodeStateAsUpdate(remoteRootDoc)); // sync rootDoc with remoteRootDoc
rootDoc.getText('text').insert(0, 'test-value');
const provider = createLazyProvider(rootDoc, datasource);
provider.connect();
await setTimeout(); // wait for the provider to sync
expect(remoteRootDoc.getText('text').toJSON()).toBe('test-value');
});
test('should send local update to remote for subdoc after connect', async () => {
const remoteRootDoc = new Doc(); // this is the remote doc lives in remote
const datasource = createMemoryDatasource(remoteRootDoc);
const rootDoc = new Doc({ guid: remoteRootDoc.guid }); // this is the doc that we want to sync
const provider = createLazyProvider(rootDoc, datasource);
provider.connect();
await setTimeout(); // wait for the provider to sync
const subdoc = new Doc();
rootDoc.getMap('map').set('subdoc', subdoc);
subdoc.getText('text').insert(0, 'test-subdoc-value');
await setTimeout(); // wait for the provider to sync
const remoteSubdoc = remoteRootDoc.getMap('map').get('subdoc') as Doc;
expect(remoteSubdoc.getText('text').toJSON()).toBe('test-subdoc-value');
});
test('should not send local update to remote for subdoc after disconnect', async () => {
const remoteRootDoc = new Doc(); // this is the remote doc lives in remote
const datasource = createMemoryDatasource(remoteRootDoc);
const rootDoc = new Doc({ guid: remoteRootDoc.guid }); // this is the doc that we want to sync
const provider = createLazyProvider(rootDoc, datasource);
provider.connect();
await setTimeout(); // wait for the provider to sync
const subdoc = new Doc();
rootDoc.getMap('map').set('subdoc', subdoc);
await setTimeout(); // wait for the provider to sync
const remoteSubdoc = remoteRootDoc.getMap('map').get('subdoc') as Doc;
expect(remoteSubdoc.getText('text').toJSON()).toBe('');
provider.disconnect();
subdoc.getText('text').insert(0, 'test-subdoc-value');
setTimeout();
expect(remoteSubdoc.getText('text').toJSON()).toBe('');
expect(provider.connected).toBe(false);
});
});

View File

@ -0,0 +1,2 @@
export * from './lazy-provider';
export * from './types';

View File

@ -0,0 +1,182 @@
import type { PassiveDocProvider } from '@blocksuite/store';
import {
applyUpdate,
type Doc,
encodeStateAsUpdate,
encodeStateVectorFromUpdate,
} from 'yjs';
import type { DatasourceDocAdapter } from './types';
const selfUpdateOrigin = 'lazy-provider-self-origin';
function getDoc(doc: Doc, guid: string): Doc | undefined {
if (doc.guid === guid) {
return doc;
}
for (const subdoc of doc.subdocs) {
const found = getDoc(subdoc, guid);
if (found) {
return found;
}
}
return undefined;
}
/**
* Creates a lazy provider that connects to a datasource and synchronizes a root document.
*/
export const createLazyProvider = (
rootDoc: Doc,
datasource: DatasourceDocAdapter
): Omit<PassiveDocProvider, 'flavour'> => {
let connected = false;
const pendingMap = new Map<string, Uint8Array[]>(); // guid -> pending-updates
const disposableMap = new Map<string, Set<() => void>>();
const connectedDocs = new Set();
let datasourceUnsub: (() => void) | undefined;
async function syncDoc(doc: Doc) {
const guid = doc.guid;
// perf: optimize me
const currentUpdate = encodeStateAsUpdate(doc);
const remoteUpdate = await datasource.queryDocState(guid, {
stateVector: encodeStateVectorFromUpdate(currentUpdate),
});
const updates = [currentUpdate];
pendingMap.set(guid, []);
if (remoteUpdate) {
applyUpdate(doc, remoteUpdate, selfUpdateOrigin);
const newUpdate = encodeStateAsUpdate(
doc,
encodeStateVectorFromUpdate(remoteUpdate)
);
updates.push(newUpdate);
await datasource.sendDocUpdate(guid, newUpdate);
}
}
/**
* Sets up event listeners for a Yjs document.
* @param doc - The Yjs document to set up listeners for.
*/
function setupDocListener(doc: Doc) {
const disposables = new Set<() => void>();
disposableMap.set(doc.guid, disposables);
const updateHandler = async (update: Uint8Array, origin: unknown) => {
if (origin === selfUpdateOrigin) {
return;
}
datasource.sendDocUpdate(doc.guid, update).catch(console.error);
};
const subdocLoadHandler = (event: {
loaded: Set<Doc>;
removed: Set<Doc>;
}) => {
event.loaded.forEach(subdoc => {
connectDoc(subdoc).catch(console.error);
});
event.removed.forEach(subdoc => {
disposeDoc(subdoc);
});
};
doc.on('update', updateHandler);
doc.on('subdocs', subdocLoadHandler);
// todo: handle destroy?
disposables.add(() => {
doc.off('update', updateHandler);
doc.off('subdocs', subdocLoadHandler);
});
}
/**
* Sets up event listeners for the datasource.
* Specifically, listens for updates to documents and applies them to the corresponding Yjs document.
*/
function setupDatasourceListeners() {
datasourceUnsub = datasource.onDocUpdate?.((guid, update) => {
const doc = getDoc(rootDoc, guid);
if (doc) {
applyUpdate(doc, update);
//
if (pendingMap.has(guid)) {
pendingMap.get(guid)?.forEach(update => applyUpdate(doc, update));
pendingMap.delete(guid);
}
} else {
// This case happens when the father doc is not yet updated,
// so that the child doc is not yet created.
// We need to put it into cache so that it can be applied later.
console.warn('idb: doc not found', guid);
pendingMap.set(guid, (pendingMap.get(guid) ?? []).concat(update));
}
});
}
// when a subdoc is loaded, we need to sync it with the datasource and setup listeners
async function connectDoc(doc: Doc) {
// skip if already connected
if (connectedDocs.has(doc.guid)) {
return;
}
connectedDocs.add(doc.guid);
setupDocListener(doc);
await syncDoc(doc);
await Promise.all(
[...doc.subdocs]
.filter(subdoc => subdoc.shouldLoad)
.map(subdoc => connectDoc(subdoc))
);
}
function disposeDoc(doc: Doc) {
connectedDocs.delete(doc.guid);
const disposables = disposableMap.get(doc.guid);
if (disposables) {
disposables.forEach(dispose => dispose());
disposableMap.delete(doc.guid);
}
// also dispose all subdocs
doc.subdocs.forEach(disposeDoc);
}
function disposeAll() {
disposableMap.forEach(disposables => {
disposables.forEach(dispose => dispose());
});
disposableMap.clear();
}
/**
* Connects to the datasource and sets up event listeners for document updates.
*/
function connect() {
connected = true;
// root doc should be already loaded,
// but we want to populate the cache for later update events
connectDoc(rootDoc).catch(console.error);
setupDatasourceListeners();
}
async function disconnect() {
connected = false;
disposeAll();
datasourceUnsub?.();
datasourceUnsub = undefined;
}
return {
get connected() {
return connected;
},
passive: true,
connect,
disconnect,
};
};

View File

@ -0,0 +1,19 @@
export interface DatasourceDocAdapter {
// request diff update from other clients
queryDocState: (
guid: string,
options?: {
stateVector?: Uint8Array;
targetClientId?: number;
}
) => Promise<Uint8Array | false>;
// send update to the datasource
sendDocUpdate: (guid: string, update: Uint8Array) => Promise<void>;
// listen to update from the datasource. Returns a function to unsubscribe.
// this is optional because some datasource might not support it
onDocUpdate?(
callback: (guid: string, update: Uint8Array) => void
): () => void;
}

View File

@ -0,0 +1,14 @@
import type { Doc } from 'yjs';
export function getDoc(doc: Doc, guid: string): Doc | undefined {
if (doc.guid === guid) {
return doc;
}
for (const subdoc of doc.subdocs) {
const found = getDoc(subdoc, guid);
if (found) {
return found;
}
}
return undefined;
}

View File

@ -0,0 +1,9 @@
{
"extends": "../../tsconfig.json",
"include": ["./src"],
"compilerOptions": {
"composite": true,
"noEmit": false,
"outDir": "lib"
}
}

View File

@ -636,6 +636,16 @@ __metadata:
languageName: unknown
linkType: soft
"@affine/y-provider@workspace:packages/y-provider":
version: 0.0.0-use.local
resolution: "@affine/y-provider@workspace:packages/y-provider"
dependencies:
"@blocksuite/store": 0.0.0-20230719163314-76d863fc-nightly
peerDependencies:
yjs: ^13.5.51
languageName: unknown
linkType: soft
"@alloc/quick-lru@npm:^5.2.0":
version: 5.2.0
resolution: "@alloc/quick-lru@npm:5.2.0"