diff --git a/packages/common/nbstore/src/__tests__/frontend.spec.ts b/packages/common/nbstore/src/__tests__/frontend.spec.ts new file mode 100644 index 0000000000..941a13a4d8 --- /dev/null +++ b/packages/common/nbstore/src/__tests__/frontend.spec.ts @@ -0,0 +1,50 @@ +import 'fake-indexeddb/auto'; + +import { test, vitest } from 'vitest'; +import { Doc as YDoc } from 'yjs'; + +import { DocFrontend } from '../frontend/doc'; +import { IndexedDBDocStorage } from '../impls/idb'; +import { expectYjsEqual } from './utils'; + +test('doc', async () => { + const doc1 = new YDoc({ + guid: 'test-doc', + }); + doc1.getMap('test').set('hello', 'world'); + + const docStorage = new IndexedDBDocStorage({ + id: 'ws1', + peer: 'a', + type: 'workspace', + }); + + await docStorage.connect(); + + const frontend1 = new DocFrontend(docStorage, null); + frontend1.start(); + frontend1.addDoc(doc1); + await vitest.waitFor(async () => { + const doc = await docStorage.getDoc('test-doc'); + expectYjsEqual(doc!.bin, { + test: { + hello: 'world', + }, + }); + }); + + const doc2 = new YDoc({ + guid: 'test-doc', + }); + const frontend2 = new DocFrontend(docStorage, null); + frontend2.start(); + frontend2.addDoc(doc2); + + await vitest.waitFor(async () => { + expectYjsEqual(doc2, { + test: { + hello: 'world', + }, + }); + }); +}); diff --git a/packages/common/nbstore/src/__tests__/sync.spec.ts b/packages/common/nbstore/src/__tests__/sync.spec.ts index bbff7c3054..e6c70452f8 100644 --- a/packages/common/nbstore/src/__tests__/sync.spec.ts +++ b/packages/common/nbstore/src/__tests__/sync.spec.ts @@ -10,6 +10,7 @@ import { } from '../impls/idb'; import { SpaceStorage } from '../storage'; import { SyncEngine } from '../sync'; +import { expectYjsEqual } from './utils'; test('doc', async () => { const doc = new YDoc(); @@ -53,19 +54,24 @@ test('doc', async () => { }); const sync = new SyncEngine(peerA, [peerB, peerC]); - const abort = new AbortController(); - sync.run(abort.signal); + sync.start(); await new Promise(resolve => setTimeout(resolve, 1000)); { const b = await peerB.get('doc').getDoc('doc1'); - expect(b).not.toBeNull(); - expect(b?.bin).toEqual(update); + expectYjsEqual(b!.bin, { + test: { + hello: 'world', + }, + }); const c = await peerC.get('doc').getDoc('doc1'); - expect(c).not.toBeNull(); - expect(c?.bin).toEqual(update); + expectYjsEqual(c!.bin, { + test: { + hello: 'world', + }, + }); } doc.getMap('test').set('foo', 'bar'); @@ -79,12 +85,20 @@ test('doc', async () => { { const a = await peerA.get('doc').getDoc('doc1'); - expect(a).not.toBeNull(); - expect(a?.bin).toEqual(update2); + expectYjsEqual(a!.bin, { + test: { + hello: 'world', + foo: 'bar', + }, + }); const c = await peerC.get('doc').getDoc('doc1'); - expect(c).not.toBeNull(); - expect(c?.bin).toEqual(update2); + expectYjsEqual(c!.bin, { + test: { + hello: 'world', + foo: 'bar', + }, + }); } }); @@ -130,8 +144,7 @@ test('blob', async () => { await peerC.connect(); const sync = new SyncEngine(peerA, [peerB, peerC]); - const abort = new AbortController(); - sync.run(abort.signal); + sync.start(); await new Promise(resolve => setTimeout(resolve, 1000)); diff --git a/packages/common/nbstore/src/__tests__/utils.ts b/packages/common/nbstore/src/__tests__/utils.ts new file mode 100644 index 0000000000..5952b12f0a --- /dev/null +++ b/packages/common/nbstore/src/__tests__/utils.ts @@ -0,0 +1,30 @@ +import { expect } from 'vitest'; +import { applyUpdate, Doc as YDoc } from 'yjs'; + +export function expectYjsEqual( + doc: Uint8Array | YDoc, + match: Record +) { + let ydoc: YDoc; + if (doc instanceof Uint8Array) { + ydoc = new YDoc(); + applyUpdate(ydoc, doc); + } else { + ydoc = doc; + } + + for (const key in match) { + const value = match[key]; + if (Array.isArray(value)) { + const actual = ydoc.getArray(key).toJSON(); + expect(actual).toEqual(value); + } else if (typeof value === 'string') { + const actual = ydoc.getText(key).toJSON(); + expect(actual).toEqual(value); + } else { + const actual = ydoc.getMap(key).toJSON(); + expect(actual).toEqual(value); + } + } + return doc; +} diff --git a/packages/common/nbstore/src/frontend/doc.ts b/packages/common/nbstore/src/frontend/doc.ts new file mode 100644 index 0000000000..7a1bf7b06d --- /dev/null +++ b/packages/common/nbstore/src/frontend/doc.ts @@ -0,0 +1,318 @@ +import { groupBy } from 'lodash-es'; +import { nanoid } from 'nanoid'; +import { Subject } from 'rxjs'; +import { + applyUpdate, + type Doc as YDoc, + encodeStateAsUpdate, + mergeUpdates, +} from 'yjs'; + +import type { DocRecord, DocStorage } from '../storage'; +import type { DocSyncEngine } from '../sync/doc'; +import { AsyncPriorityQueue } from '../utils/async-priority-queue'; +import { isEmptyUpdate } from '../utils/is-empty-update'; +import { throwIfAborted } from '../utils/throw-if-aborted'; + +const NBSTORE_ORIGIN = 'nbstore-frontend'; + +type Job = + | { + type: 'load'; + docId: string; + } + | { + type: 'save'; + docId: string; + update: Uint8Array; + } + | { + type: 'apply'; + docId: string; + update: Uint8Array; + }; + +interface DocFrontendOptions { + mergeUpdates?: (updates: Uint8Array[]) => Promise | Uint8Array; +} + +export class DocFrontend { + private readonly uniqueId = `frontend:${this.storage.peer}:${nanoid()}`; + + private readonly prioritySettings = new Map(); + + private readonly status = { + docs: new Map(), + connectedDocs: new Set(), + readyDocs: new Set(), + jobDocQueue: new AsyncPriorityQueue(), + jobMap: new Map(), + currentJob: null as { docId: string; jobs: Job[] } | null, + }; + + private readonly statusUpdatedSubject$ = new Subject(); + + private readonly abort = new AbortController(); + + constructor( + private readonly storage: DocStorage, + private readonly sync: DocSyncEngine | null, + readonly options: DocFrontendOptions = {} + ) {} + + start() { + if (this.abort.signal.aborted) { + throw new Error('doc frontend can only start once'); + } + this.mainLoop(this.abort.signal).catch(error => { + console.error(error); + }); + } + + stop() { + this.abort.abort(); + } + + private async mainLoop(signal?: AbortSignal) { + const dispose = this.storage.subscribeDocUpdate((record, origin) => { + this.event.onStorageUpdate(record, origin); + }); + try { + // wait for storage to connect + await Promise.race([ + this.storage.connection.waitForConnected(signal), + new Promise((_, reject) => { + signal?.addEventListener('abort', reason => { + reject(reason); + }); + }), + ]); + + // eslint-disable-next-line no-constant-condition + while (true) { + throwIfAborted(signal); + const docId = await this.status.jobDocQueue.asyncPop(signal); + const jobs = this.status.jobMap.get(docId); + this.status.jobMap.delete(docId); + + if (!jobs) { + this.statusUpdatedSubject$.next(docId); + continue; + } + + this.status.currentJob = { docId, jobs }; + this.statusUpdatedSubject$.next(docId); + + const { apply, load, save } = groupBy(jobs, job => job.type) as { + [key in Job['type']]?: Job[]; + }; + + if (load?.length) { + await this.jobs.load(load[0] as any, signal); + } + + for (const applyJob of apply ?? []) { + await this.jobs.apply(applyJob as any, signal); + } + + if (save?.length) { + await this.jobs.save(docId, save as any, signal); + } + } + } finally { + dispose(); + } + } + + /** + * Add a doc to the frontend, the doc will sync with the doc storage. + * @param doc - The doc to add + * @param withSubDoc - Whether to add the subdocs of the doc + */ + addDoc(doc: YDoc, withSubDoc: boolean = false) { + this._addDoc(doc); + if (withSubDoc) { + doc.on('subdocs', ({ loaded }) => { + for (const subdoc of loaded) { + this._addDoc(subdoc); + } + }); + } + } + + readonly jobs = { + load: async (job: Job & { type: 'load' }, signal?: AbortSignal) => { + const doc = this.status.docs.get(job.docId); + if (!doc) { + return; + } + const existingData = encodeStateAsUpdate(doc); + + if (!isEmptyUpdate(existingData)) { + this.schedule({ + type: 'save', + docId: doc.guid, + update: existingData, + }); + } + + // mark doc as loaded + doc.emit('sync', [true, doc]); + + this.status.connectedDocs.add(job.docId); + this.statusUpdatedSubject$.next(job.docId); + + const docRecord = await this.storage.getDoc(job.docId); + throwIfAborted(signal); + + if (!docRecord || isEmptyUpdate(docRecord.bin)) { + return; + } + + this.applyUpdate(job.docId, docRecord.bin); + this.status.readyDocs.add(job.docId); + this.statusUpdatedSubject$.next(job.docId); + }, + save: async ( + docId: string, + jobs: (Job & { type: 'save' })[], + signal?: AbortSignal + ) => { + if (!this.status.docs.has(docId)) { + return; + } + if (this.status.connectedDocs.has(docId)) { + const merged = await this.mergeUpdates( + jobs.map(j => j.update).filter(update => !isEmptyUpdate(update)) + ); + throwIfAborted(signal); + await this.storage.pushDocUpdate( + { + docId, + bin: merged, + }, + this.uniqueId + ); + } + }, + apply: async (job: Job & { type: 'apply' }, signal?: AbortSignal) => { + throwIfAborted(signal); + if (!this.status.docs.has(job.docId)) { + return; + } + if (this.status.connectedDocs.has(job.docId)) { + this.applyUpdate(job.docId, job.update); + } + if (!isEmptyUpdate(job.update)) { + this.status.readyDocs.add(job.docId); + this.statusUpdatedSubject$.next(job.docId); + } + }, + }; + + event = { + onStorageUpdate: (update: DocRecord, origin?: string) => { + if (origin !== this.uniqueId) { + this.schedule({ + type: 'apply', + docId: update.docId, + update: update.bin, + }); + } + }, + }; + + /** + * Remove a doc from the frontend, the doc will stop syncing with the doc storage. + * It's not recommended to use this method directly, better to use `doc.destroy()`. + * + * @param doc - The doc to remove + */ + removeDoc(doc: YDoc) { + this.status.docs.delete(doc.guid); + this.status.connectedDocs.delete(doc.guid); + this.status.readyDocs.delete(doc.guid); + this.status.jobDocQueue.remove(doc.guid); + this.status.jobMap.delete(doc.guid); + this.statusUpdatedSubject$.next(doc.guid); + doc.off('update', this.handleDocUpdate); + } + + addPriority(id: string, priority: number) { + const undoSyncPriority = this.sync?.addPriority(id, priority); + const oldPriority = this.prioritySettings.get(id) ?? 0; + + this.prioritySettings.set(id, priority); + this.status.jobDocQueue.setPriority(id, oldPriority + priority); + + return () => { + const currentPriority = this.prioritySettings.get(id) ?? 0; + this.prioritySettings.set(id, currentPriority - priority); + this.status.jobDocQueue.setPriority(id, currentPriority - priority); + + undoSyncPriority?.(); + }; + } + + private _addDoc(doc: YDoc) { + this.schedule({ + type: 'load', + docId: doc.guid, + }); + + this.status.docs.set(doc.guid, doc); + this.statusUpdatedSubject$.next(doc.guid); + + doc.on('update', this.handleDocUpdate); + + doc.on('destroy', () => { + this.removeDoc(doc); + }); + } + + private schedule(job: Job) { + const priority = this.prioritySettings.get(job.docId) ?? 0; + this.status.jobDocQueue.push(job.docId, priority); + + const existingJobs = this.status.jobMap.get(job.docId) ?? []; + existingJobs.push(job); + this.status.jobMap.set(job.docId, existingJobs); + this.statusUpdatedSubject$.next(job.docId); + } + + applyUpdate(docId: string, update: Uint8Array) { + const doc = this.status.docs.get(docId); + if (doc && !isEmptyUpdate(update)) { + try { + applyUpdate(doc, update, NBSTORE_ORIGIN); + } catch (err) { + console.error('failed to apply update yjs doc', err); + } + } + } + + private readonly handleDocUpdate = ( + update: Uint8Array, + origin: any, + doc: YDoc + ) => { + if (origin === NBSTORE_ORIGIN) { + return; + } + if (!this.status.docs.has(doc.guid)) { + return; + } + + this.schedule({ + type: 'save', + docId: doc.guid, + update, + }); + }; + + protected mergeUpdates(updates: Uint8Array[]) { + const merge = this.options?.mergeUpdates ?? mergeUpdates; + + return merge(updates.filter(bin => !isEmptyUpdate(bin))); + } +} diff --git a/packages/common/nbstore/src/sync/blob/index.ts b/packages/common/nbstore/src/sync/blob/index.ts index 6e21d046c6..b54e1868a9 100644 --- a/packages/common/nbstore/src/sync/blob/index.ts +++ b/packages/common/nbstore/src/sync/blob/index.ts @@ -4,6 +4,8 @@ import type { BlobStorage } from '../../storage'; import { MANUALLY_STOP, throwIfAborted } from '../../utils/throw-if-aborted'; export class BlobSyncEngine { + private abort: AbortController | null = null; + constructor( readonly local: BlobStorage, readonly remotes: BlobStorage[] @@ -72,18 +74,24 @@ export class BlobSyncEngine { } } - async run(signal?: AbortSignal) { - if (signal?.aborted) { - return; + start() { + if (this.abort) { + this.abort.abort(); } - try { - await this.sync(signal); - } catch (error) { + const abort = new AbortController(); + this.abort = abort; + + this.sync(abort.signal).catch(error => { if (error === MANUALLY_STOP) { return; } console.error('sync blob error', error); - } + }); + } + + stop() { + this.abort?.abort(); + this.abort = null; } } diff --git a/packages/common/nbstore/src/sync/doc/index.ts b/packages/common/nbstore/src/sync/doc/index.ts index 1b46fc0caf..7b7fb63d4d 100644 --- a/packages/common/nbstore/src/sync/doc/index.ts +++ b/packages/common/nbstore/src/sync/doc/index.ts @@ -2,17 +2,37 @@ import type { DocStorage, SyncStorage } from '../../storage'; import { DocSyncPeer } from './peer'; export class DocSyncEngine { + private readonly peers: DocSyncPeer[]; + private abort: AbortController | null = null; + constructor( readonly local: DocStorage, readonly sync: SyncStorage, - readonly peers: DocStorage[] - ) {} + readonly remotes: DocStorage[] + ) { + this.peers = remotes.map(remote => new DocSyncPeer(local, sync, remote)); + } - async run(signal?: AbortSignal) { - await Promise.all( - this.peers.map(peer => - new DocSyncPeer(this.local, this.sync, peer).mainLoop(signal) - ) - ); + start() { + if (this.abort) { + this.abort.abort(); + } + const abort = new AbortController(); + this.abort = abort; + Promise.allSettled( + this.peers.map(peer => peer.mainLoop(abort.signal)) + ).catch(error => { + console.error(error); + }); + } + + stop() { + this.abort?.abort(); + this.abort = null; + } + + addPriority(id: string, priority: number) { + const undo = this.peers.map(peer => peer.addPriority(id, priority)); + return () => undo.forEach(fn => fn()); } } diff --git a/packages/common/nbstore/src/sync/doc/peer.ts b/packages/common/nbstore/src/sync/doc/peer.ts index 6526c52342..688026f28b 100644 --- a/packages/common/nbstore/src/sync/doc/peer.ts +++ b/packages/common/nbstore/src/sync/doc/peer.ts @@ -610,11 +610,6 @@ export class DocSyncPeer { this.statusUpdatedSubject$.next(job.docId); } - setPriority(docId: string, priority: number) { - this.prioritySettings.set(docId, priority); - return this.status.jobDocQueue.setPriority(docId, priority); - } - addPriority(id: string, priority: number) { const oldPriority = this.prioritySettings.get(id) ?? 0; this.prioritySettings.set(id, priority); diff --git a/packages/common/nbstore/src/sync/index.ts b/packages/common/nbstore/src/sync/index.ts index ca2de2e58b..22b640a6c1 100644 --- a/packages/common/nbstore/src/sync/index.ts +++ b/packages/common/nbstore/src/sync/index.ts @@ -3,37 +3,44 @@ import { BlobSyncEngine } from './blob'; import { DocSyncEngine } from './doc'; export class SyncEngine { + private readonly doc: DocSyncEngine | null; + private readonly blob: BlobSyncEngine | null; + constructor( readonly local: SpaceStorage, readonly peers: SpaceStorage[] - ) {} + ) { + const doc = local.tryGet('doc'); + const blob = local.tryGet('blob'); + const sync = local.tryGet('sync'); - async run(signal?: AbortSignal) { - const doc = this.local.tryGet('doc'); - const blob = this.local.tryGet('blob'); - const sync = this.local.tryGet('sync'); - - await Promise.allSettled([ - (async () => { - if (doc && sync) { - const peerDocs = this.peers - .map(peer => peer.tryGet('doc')) - .filter((v): v is DocStorage => !!v); - - const engine = new DocSyncEngine(doc, sync, peerDocs); - await engine.run(signal); - } - })(), - (async () => { - if (blob) { - const peerBlobs = this.peers + this.doc = + doc && sync + ? new DocSyncEngine( + doc, + sync, + peers + .map(peer => peer.tryGet('doc')) + .filter((v): v is DocStorage => !!v) + ) + : null; + this.blob = blob + ? new BlobSyncEngine( + blob, + peers .map(peer => peer.tryGet('blob')) - .filter((v): v is BlobStorage => !!v); + .filter((v): v is BlobStorage => !!v) + ) + : null; + } - const engine = new BlobSyncEngine(blob, peerBlobs); - await engine.run(signal); - } - })(), - ]); + start() { + this.doc?.start(); + this.blob?.start(); + } + + stop() { + this.doc?.stop(); + this.blob?.stop(); } }