From 2e71c980cf7c76a5097ab1fd8513bff5217e32e8 Mon Sep 17 00:00:00 2001 From: EYHN Date: Tue, 30 Jan 2024 06:31:23 +0000 Subject: [PATCH] feat(infra): new workspace infra (#5617) This PR copying @affine/workspace into common/infra, and adding definitions for services and unit tests. --- packages/common/infra/package.json | 1 + packages/common/infra/src/index.ts | 9 +- .../src/utils/__tests__/async-queue.spec.ts | 45 ++ .../utils/__tests__/throw-if-aborted.spec.ts | 13 + .../common/infra/src/utils/async-queue.ts | 101 ++++ packages/common/infra/src/utils/index.ts | 5 + .../common/infra/src/utils/merge-updates.ts | 17 + .../common/infra/src/utils/object-pool.ts | 96 ++++ .../infra/src/utils/throw-if-aborted.ts | 9 + .../src/workspace/__tests__/workspace.spec.ts | 38 ++ .../common/infra/src/workspace/context.ts | 76 +++ .../infra/src/workspace/engine/awareness.ts | 25 + .../common/infra/src/workspace/engine/blob.ts | 242 ++++++++++ .../infra/src/workspace/engine/error.ts | 5 + .../infra/src/workspace/engine/index.ts | 81 ++++ .../engine/sync/__tests__/engine.spec.ts | 152 ++++++ .../engine/sync/__tests__/peer.spec.ts | 100 ++++ .../engine/sync/__tests__/test-storage.ts | 42 ++ .../infra/src/workspace/engine/sync/consts.ts | 15 + .../infra/src/workspace/engine/sync/engine.ts | 285 +++++++++++ .../infra/src/workspace/engine/sync/index.ts | 20 + .../infra/src/workspace/engine/sync/peer.ts | 444 ++++++++++++++++++ .../src/workspace/engine/sync/storage.ts | 25 + .../common/infra/src/workspace/factory.ts | 15 + .../infra/src/workspace/global-schema.ts | 6 + packages/common/infra/src/workspace/index.ts | 81 ++++ .../common/infra/src/workspace/list/cache.ts | 25 + .../common/infra/src/workspace/list/index.ts | 301 ++++++++++++ .../infra/src/workspace/list/information.ts | 92 ++++ .../common/infra/src/workspace/manager.ts | 190 ++++++++ .../common/infra/src/workspace/metadata.ts | 3 + .../infra/src/workspace/service-scope.ts | 3 + .../common/infra/src/workspace/testing.ts | 244 ++++++++++ .../common/infra/src/workspace/upgrade.ts | 142 ++++++ .../common/infra/src/workspace/workspace.ts | 131 ++++++ packages/frontend/electron/package.json | 2 +- yarn.lock | 5 +- 37 files changed, 3082 insertions(+), 4 deletions(-) create mode 100644 packages/common/infra/src/utils/__tests__/async-queue.spec.ts create mode 100644 packages/common/infra/src/utils/__tests__/throw-if-aborted.spec.ts create mode 100644 packages/common/infra/src/utils/async-queue.ts create mode 100644 packages/common/infra/src/utils/index.ts create mode 100644 packages/common/infra/src/utils/merge-updates.ts create mode 100644 packages/common/infra/src/utils/object-pool.ts create mode 100644 packages/common/infra/src/utils/throw-if-aborted.ts create mode 100644 packages/common/infra/src/workspace/__tests__/workspace.spec.ts create mode 100644 packages/common/infra/src/workspace/context.ts create mode 100644 packages/common/infra/src/workspace/engine/awareness.ts create mode 100644 packages/common/infra/src/workspace/engine/blob.ts create mode 100644 packages/common/infra/src/workspace/engine/error.ts create mode 100644 packages/common/infra/src/workspace/engine/index.ts create mode 100644 packages/common/infra/src/workspace/engine/sync/__tests__/engine.spec.ts create mode 100644 packages/common/infra/src/workspace/engine/sync/__tests__/peer.spec.ts create mode 100644 packages/common/infra/src/workspace/engine/sync/__tests__/test-storage.ts create mode 100644 packages/common/infra/src/workspace/engine/sync/consts.ts create mode 100644 packages/common/infra/src/workspace/engine/sync/engine.ts create mode 100644 packages/common/infra/src/workspace/engine/sync/index.ts create mode 100644 packages/common/infra/src/workspace/engine/sync/peer.ts create mode 100644 packages/common/infra/src/workspace/engine/sync/storage.ts create mode 100644 packages/common/infra/src/workspace/factory.ts create mode 100644 packages/common/infra/src/workspace/global-schema.ts create mode 100644 packages/common/infra/src/workspace/index.ts create mode 100644 packages/common/infra/src/workspace/list/cache.ts create mode 100644 packages/common/infra/src/workspace/list/index.ts create mode 100644 packages/common/infra/src/workspace/list/information.ts create mode 100644 packages/common/infra/src/workspace/manager.ts create mode 100644 packages/common/infra/src/workspace/metadata.ts create mode 100644 packages/common/infra/src/workspace/service-scope.ts create mode 100644 packages/common/infra/src/workspace/testing.ts create mode 100644 packages/common/infra/src/workspace/upgrade.ts create mode 100644 packages/common/infra/src/workspace/workspace.ts diff --git a/packages/common/infra/package.json b/packages/common/infra/package.json index c7e991d445..071a12996b 100644 --- a/packages/common/infra/package.json +++ b/packages/common/infra/package.json @@ -23,6 +23,7 @@ "foxact": "^0.2.20", "jotai": "^2.5.1", "jotai-effect": "^0.2.3", + "lodash-es": "^4.17.21", "nanoid": "^5.0.3", "react": "18.2.0", "tinykeys": "patch:tinykeys@npm%3A2.1.0#~/.yarn/patches/tinykeys-npm-2.1.0-819feeaed0.patch", diff --git a/packages/common/infra/src/index.ts b/packages/common/infra/src/index.ts index d228f9670c..d4d5815909 100644 --- a/packages/common/infra/src/index.ts +++ b/packages/common/infra/src/index.ts @@ -5,17 +5,24 @@ export * from './command'; export * from './di'; export * from './livedata'; export * from './storage'; +export * from './utils'; +export * from './workspace'; import type { ServiceCollection } from './di'; import { CleanupService } from './lifecycle'; import { GlobalCache, GlobalState, MemoryMemento } from './storage'; +import { + configureTestingWorkspaceServices, + configureWorkspaceServices, +} from './workspace'; export function configureInfraServices(services: ServiceCollection) { services.add(CleanupService); + configureWorkspaceServices(services); } export function configureTestingInfraServices(services: ServiceCollection) { - configureInfraServices(services); + configureTestingWorkspaceServices(services); services.addImpl(GlobalCache, MemoryMemento); services.addImpl(GlobalState, MemoryMemento); } diff --git a/packages/common/infra/src/utils/__tests__/async-queue.spec.ts b/packages/common/infra/src/utils/__tests__/async-queue.spec.ts new file mode 100644 index 0000000000..017401ec84 --- /dev/null +++ b/packages/common/infra/src/utils/__tests__/async-queue.spec.ts @@ -0,0 +1,45 @@ +import { describe, expect, test, vi } from 'vitest'; + +import { AsyncQueue } from '../async-queue'; + +describe('async-queue', () => { + test('push & pop', async () => { + const queue = new AsyncQueue(); + queue.push(1, 2, 3); + expect(queue.length).toBe(3); + expect(await queue.next()).toBe(1); + expect(await queue.next()).toBe(2); + expect(await queue.next()).toBe(3); + expect(queue.length).toBe(0); + }); + + test('await', async () => { + const queue = new AsyncQueue(); + queue.push(1, 2); + expect(await queue.next()).toBe(1); + expect(await queue.next()).toBe(2); + + let v = -1; + + // setup 2 pop tasks + queue.next().then(next => { + v = next; + }); + queue.next().then(next => { + v = next; + }); + + // Wait for 100ms + await new Promise(resolve => setTimeout(resolve, 100)); + // v should not be changed + expect(v).toBe(-1); + + // push 3, should trigger the first pop task + queue.push(3); + await vi.waitFor(() => v === 3); + + // push 4, should trigger the second pop task + queue.push(4); + await vi.waitFor(() => v === 4); + }); +}); diff --git a/packages/common/infra/src/utils/__tests__/throw-if-aborted.spec.ts b/packages/common/infra/src/utils/__tests__/throw-if-aborted.spec.ts new file mode 100644 index 0000000000..137f748a6b --- /dev/null +++ b/packages/common/infra/src/utils/__tests__/throw-if-aborted.spec.ts @@ -0,0 +1,13 @@ +import { describe, expect, test } from 'vitest'; + +import { throwIfAborted } from '../throw-if-aborted'; + +describe('throw-if-aborted', () => { + test('basic', async () => { + const abortController = new AbortController(); + const abortSignal = abortController.signal; + expect(throwIfAborted(abortSignal)).toBe(true); + abortController.abort('TEST_ABORT'); + expect(() => throwIfAborted(abortSignal)).toThrowError('TEST_ABORT'); + }); +}); diff --git a/packages/common/infra/src/utils/async-queue.ts b/packages/common/infra/src/utils/async-queue.ts new file mode 100644 index 0000000000..e7f994a39b --- /dev/null +++ b/packages/common/infra/src/utils/async-queue.ts @@ -0,0 +1,101 @@ +export class AsyncQueue { + private _queue: T[]; + + private _resolveUpdate: (() => void) | null = null; + private _waitForUpdate: Promise | null = null; + + constructor(init: T[] = []) { + this._queue = init; + } + + get length() { + return this._queue.length; + } + + async next( + abort?: AbortSignal, + dequeue: (arr: T[]) => T | undefined = a => a.shift() + ): Promise { + const update = dequeue(this._queue); + if (update) { + return update; + } else { + if (!this._waitForUpdate) { + this._waitForUpdate = new Promise(resolve => { + this._resolveUpdate = resolve; + }); + } + + await Promise.race([ + this._waitForUpdate, + new Promise((_, reject) => { + if (abort?.aborted) { + reject(abort?.reason); + } + abort?.addEventListener('abort', () => { + reject(abort.reason); + }); + }), + ]); + + return this.next(abort, dequeue); + } + } + + push(...updates: T[]) { + this._queue.push(...updates); + if (this._resolveUpdate) { + const resolve = this._resolveUpdate; + this._resolveUpdate = null; + this._waitForUpdate = null; + resolve(); + } + } + + remove(predicate: (update: T) => boolean) { + const index = this._queue.findIndex(predicate); + if (index !== -1) { + this._queue.splice(index, 1); + } + } + + find(predicate: (update: T) => boolean) { + return this._queue.find(predicate); + } + + clear() { + this._queue = []; + } +} + +export class PriorityAsyncQueue< + T extends { id: string }, +> extends AsyncQueue { + constructor( + init: T[] = [], + public readonly priorityTarget: SharedPriorityTarget = new SharedPriorityTarget() + ) { + super(init); + } + + override next(abort?: AbortSignal | undefined): Promise { + return super.next(abort, arr => { + if (this.priorityTarget.priorityRule !== null) { + const index = arr.findIndex( + update => this.priorityTarget.priorityRule?.(update.id) + ); + if (index !== -1) { + return arr.splice(index, 1)[0]; + } + } + return arr.shift(); + }); + } +} + +/** + * Shared priority target can be shared by multiple queues. + */ +export class SharedPriorityTarget { + public priorityRule: ((id: string) => boolean) | null = null; +} diff --git a/packages/common/infra/src/utils/index.ts b/packages/common/infra/src/utils/index.ts new file mode 100644 index 0000000000..08fe1ee8ca --- /dev/null +++ b/packages/common/infra/src/utils/index.ts @@ -0,0 +1,5 @@ +export * from './async-queue'; +export * from './merge-updates'; +export * from './object-pool'; +export * from './stable-hash'; +export * from './throw-if-aborted'; diff --git a/packages/common/infra/src/utils/merge-updates.ts b/packages/common/infra/src/utils/merge-updates.ts new file mode 100644 index 0000000000..e3c8a4a06a --- /dev/null +++ b/packages/common/infra/src/utils/merge-updates.ts @@ -0,0 +1,17 @@ +import { applyUpdate, Doc, encodeStateAsUpdate } from 'yjs'; + +export function mergeUpdates(updates: Uint8Array[]) { + if (updates.length === 0) { + return new Uint8Array(); + } + if (updates.length === 1) { + return updates[0]; + } + const doc = new Doc(); + doc.transact(() => { + updates.forEach(update => { + applyUpdate(doc, update); + }); + }); + return encodeStateAsUpdate(doc); +} diff --git a/packages/common/infra/src/utils/object-pool.ts b/packages/common/infra/src/utils/object-pool.ts new file mode 100644 index 0000000000..a8569ee3e7 --- /dev/null +++ b/packages/common/infra/src/utils/object-pool.ts @@ -0,0 +1,96 @@ +import { Unreachable } from '@affine/env/constant'; + +export interface RcRef { + obj: T; + release: () => void; +} + +export class ObjectPool { + objects = new Map(); + timeoutToGc: NodeJS.Timeout | null = null; + + constructor( + private readonly options: { + onDelete?: (obj: T) => void; + onDangling?: (obj: T) => boolean; + } = {} + ) {} + + get(key: Key): RcRef | null { + const exist = this.objects.get(key); + if (exist) { + exist.rc++; + let released = false; + return { + obj: exist.obj, + release: () => { + // avoid double release + if (released) { + return; + } + released = true; + exist.rc--; + this.requestGc(); + }, + }; + } + return null; + } + + put(key: Key, obj: T) { + const ref = { obj, rc: 0 }; + this.objects.set(key, ref); + + const r = this.get(key); + if (!r) { + throw new Unreachable(); + } + + return r; + } + + private requestGc() { + if (this.timeoutToGc) { + clearInterval(this.timeoutToGc); + } + + // do gc every 1s + this.timeoutToGc = setInterval(() => { + this.gc(); + }, 1000); + } + + private gc() { + for (const [key, { obj, rc }] of new Map( + this.objects /* clone the map, because the origin will be modified during iteration */ + )) { + if ( + rc === 0 && + (!this.options.onDangling || this.options.onDangling(obj)) + ) { + this.options.onDelete?.(obj); + + this.objects.delete(key); + } + } + + for (const [_, { rc }] of this.objects) { + if (rc === 0) { + return; + } + } + + // if all object has referrer, stop gc + if (this.timeoutToGc) { + clearInterval(this.timeoutToGc); + } + } + + clear() { + for (const { obj } of this.objects.values()) { + this.options.onDelete?.(obj); + } + + this.objects.clear(); + } +} diff --git a/packages/common/infra/src/utils/throw-if-aborted.ts b/packages/common/infra/src/utils/throw-if-aborted.ts new file mode 100644 index 0000000000..54e2c81ac9 --- /dev/null +++ b/packages/common/infra/src/utils/throw-if-aborted.ts @@ -0,0 +1,9 @@ +// because AbortSignal.throwIfAborted is not available in abortcontroller-polyfill +export function throwIfAborted(abort?: AbortSignal) { + if (abort?.aborted) { + throw new Error(abort.reason); + } + return true; +} + +export const MANUALLY_STOP = 'manually-stop'; diff --git a/packages/common/infra/src/workspace/__tests__/workspace.spec.ts b/packages/common/infra/src/workspace/__tests__/workspace.spec.ts new file mode 100644 index 0000000000..8d15d1196c --- /dev/null +++ b/packages/common/infra/src/workspace/__tests__/workspace.spec.ts @@ -0,0 +1,38 @@ +import { WorkspaceFlavour } from '@affine/env/workspace'; +import { describe, expect, test } from 'vitest'; + +import { configureInfraServices, configureTestingInfraServices } from '../..'; +import { ServiceCollection } from '../../di'; +import { WorkspaceListService, WorkspaceManager } from '../'; + +describe('Workspace System', () => { + test('create workspace', async () => { + const services = new ServiceCollection(); + configureInfraServices(services); + configureTestingInfraServices(services); + + const provider = services.provider(); + const workspaceManager = provider.get(WorkspaceManager); + const workspaceListService = provider.get(WorkspaceListService); + expect(workspaceListService.workspaceList.value.length).toBe(0); + + const { workspace } = workspaceManager.open( + await workspaceManager.createWorkspace(WorkspaceFlavour.LOCAL) + ); + + expect(workspaceListService.workspaceList.value.length).toBe(1); + + const page = workspace.blockSuiteWorkspace.createPage({ + id: 'page0', + }); + await page.load(); + page.addBlock('affine:page', { + title: new page.Text('test-page'), + }); + + expect(workspace.blockSuiteWorkspace.pages.size).toBe(1); + expect( + (page!.getBlockByFlavour('affine:page')[0] as any).title.toString() + ).toBe('test-page'); + }); +}); diff --git a/packages/common/infra/src/workspace/context.ts b/packages/common/infra/src/workspace/context.ts new file mode 100644 index 0000000000..41e2f8d248 --- /dev/null +++ b/packages/common/infra/src/workspace/context.ts @@ -0,0 +1,76 @@ +/** + * This module contains the context of the workspace scope. + * You can use those context when declare workspace service. + * + * Is helpful when implement workspace low level providers, like `SyncEngine`, + * which need to access workspace low level components. + * + * Normally, business service should depend on `Workspace` service, not workspace context. + * + * @example + * ```ts + * import { declareWorkspaceService } from '@toeverything/infra'; + * declareWorkspaceService(XXXService, { + * factory: declareFactory( + * [BlockSuiteWorkspaceContext, RootYDocContext], // <== inject workspace context + * (bs, rootDoc) => new XXXService(bs.value, rootDoc.value) + * ), + * }) + */ + +import { Workspace as BlockSuiteWorkspace } from '@blocksuite/store'; +import { nanoid } from 'nanoid'; +import type { Awareness } from 'y-protocols/awareness.js'; +import type { Doc as YDoc } from 'yjs'; + +import { createIdentifier, type ServiceCollection } from '../di'; +import { BlobEngine } from './engine/blob'; +import { globalBlockSuiteSchema } from './global-schema'; +import type { WorkspaceMetadata } from './metadata'; +import { WorkspaceScope } from './service-scope'; + +export const BlockSuiteWorkspaceContext = createIdentifier( + 'BlockSuiteWorkspaceContext' +); + +export const RootYDocContext = createIdentifier('RootYDocContext'); + +export const AwarenessContext = createIdentifier('AwarenessContext'); + +export const WorkspaceMetadataContext = createIdentifier( + 'WorkspaceMetadataContext' +); + +export const WorkspaceIdContext = + createIdentifier('WorkspaceIdContext'); + +export function configureWorkspaceContext( + services: ServiceCollection, + workspaceMetadata: WorkspaceMetadata +) { + services + .scope(WorkspaceScope) + .addImpl(WorkspaceMetadataContext, workspaceMetadata) + .addImpl(WorkspaceIdContext, workspaceMetadata.id) + .addImpl(BlockSuiteWorkspaceContext, provider => { + return new BlockSuiteWorkspace({ + id: workspaceMetadata.id, + blobStorages: [ + () => ({ + crud: provider.get(BlobEngine), + }), + ], + idGenerator: () => nanoid(), + schema: globalBlockSuiteSchema, + }); + }) + .addImpl( + AwarenessContext, + provider => + provider.get(BlockSuiteWorkspaceContext).awarenessStore.awareness + ) + .addImpl( + RootYDocContext, + provider => provider.get(BlockSuiteWorkspaceContext).doc + ); +} diff --git a/packages/common/infra/src/workspace/engine/awareness.ts b/packages/common/infra/src/workspace/engine/awareness.ts new file mode 100644 index 0000000000..4964b264f3 --- /dev/null +++ b/packages/common/infra/src/workspace/engine/awareness.ts @@ -0,0 +1,25 @@ +import { createIdentifier } from '../../di'; + +export interface AwarenessProvider { + connect(): void; + disconnect(): void; +} + +export const AwarenessProvider = + createIdentifier('AwarenessProvider'); + +export class AwarenessEngine { + constructor(public readonly providers: AwarenessProvider[]) {} + + static get EMPTY() { + return new AwarenessEngine([]); + } + + connect() { + this.providers.forEach(provider => provider.connect()); + } + + disconnect() { + this.providers.forEach(provider => provider.disconnect()); + } +} diff --git a/packages/common/infra/src/workspace/engine/blob.ts b/packages/common/infra/src/workspace/engine/blob.ts new file mode 100644 index 0000000000..4bdb888480 --- /dev/null +++ b/packages/common/infra/src/workspace/engine/blob.ts @@ -0,0 +1,242 @@ +import { DebugLogger } from '@affine/debug'; +import { Slot } from '@blocksuite/global/utils'; +import { difference } from 'lodash-es'; + +import { createIdentifier } from '../../di'; +import { BlobStorageOverCapacity } from './error'; + +const logger = new DebugLogger('affine:blob-engine'); + +export interface BlobStorage { + name: string; + readonly: boolean; + get: (key: string) => Promise; + set: (key: string, value: Blob) => Promise; + delete: (key: string) => Promise; + list: () => Promise; +} + +export const LocalBlobStorage = + createIdentifier('LocalBlobStorage'); + +export const RemoteBlobStorage = + createIdentifier('RemoteBlobStorage'); + +export interface BlobStatus { + isStorageOverCapacity: boolean; +} + +/** + * # BlobEngine + * + * sync blobs between storages in background. + * + * all operations priority use local, then use remote. + */ +export class BlobEngine { + private abort: AbortController | null = null; + private _status: BlobStatus = { isStorageOverCapacity: false }; + onStatusChange = new Slot(); + singleBlobSizeLimit: number = 100 * 1024 * 1024; + onAbortLargeBlob = new Slot(); + + private set status(s: BlobStatus) { + logger.debug('status change', s); + this._status = s; + this.onStatusChange.emit(s); + } + get status() { + return this._status; + } + + constructor( + private readonly local: BlobStorage, + private readonly remotes: BlobStorage[] + ) {} + + static get EMPTY() { + return new BlobEngine(createEmptyBlobStorage(), []); + } + + start() { + if (this.abort || this._status.isStorageOverCapacity) { + return; + } + this.abort = new AbortController(); + const abortSignal = this.abort.signal; + + const sync = () => { + if (abortSignal.aborted) { + return; + } + + this.sync() + .catch(error => { + logger.error('sync blob error', error); + }) + .finally(() => { + // sync every 1 minute + setTimeout(sync, 60000); + }); + }; + + sync(); + } + + stop() { + this.abort?.abort(); + this.abort = null; + } + + get storages() { + return [this.local, ...this.remotes]; + } + + async sync() { + if (this.local.readonly) { + return; + } + logger.debug('start syncing blob...'); + for (const remote of this.remotes) { + let localList: string[] = []; + let remoteList: string[] = []; + + if (!remote.readonly) { + try { + localList = await this.local.list(); + remoteList = await remote.list(); + } catch (err) { + logger.error(`error when sync`, err); + continue; + } + + const needUpload = difference(localList, remoteList); + for (const key of needUpload) { + try { + const data = await this.local.get(key); + if (data) { + await remote.set(key, data); + } + } catch (err) { + logger.error( + `error when sync ${key} from [${this.local.name}] to [${remote.name}]`, + err + ); + } + } + } + + const needDownload = difference(remoteList, localList); + + for (const key of needDownload) { + try { + const data = await remote.get(key); + if (data) { + await this.local.set(key, data); + } + } catch (err) { + if (err instanceof BlobStorageOverCapacity) { + this.status = { + isStorageOverCapacity: true, + }; + } + logger.error( + `error when sync ${key} from [${remote.name}] to [${this.local.name}]`, + err + ); + } + } + } + + logger.debug('finish syncing blob'); + } + + async get(key: string) { + logger.debug('get blob', key); + for (const storage of this.storages) { + const data = await storage.get(key); + if (data) { + return data; + } + } + return null; + } + + async set(key: string, value: Blob) { + if (this.local.readonly) { + throw new Error('local peer is readonly'); + } + + if (value.size > this.singleBlobSizeLimit) { + this.onAbortLargeBlob.emit(value); + logger.error('blob over limit, abort set'); + return key; + } + + // await upload to the local peer + await this.local.set(key, value); + + // uploads to other peers in the background + Promise.allSettled( + this.remotes + .filter(r => !r.readonly) + .map(peer => + peer.set(key, value).catch(err => { + logger.error('Error when uploading to peer', err); + }) + ) + ) + .then(result => { + if (result.some(({ status }) => status === 'rejected')) { + logger.error( + `blob ${key} update finish, but some peers failed to update` + ); + } else { + logger.debug(`blob ${key} update finish`); + } + }) + .catch(() => { + // Promise.allSettled never reject + }); + + return key; + } + + async delete(_key: string) { + // not supported + } + + async list() { + const blobList = new Set(); + + for (const peer of this.storages) { + const list = await peer.list(); + if (list) { + for (const blob of list) { + blobList.add(blob); + } + } + } + + return Array.from(blobList); + } +} + +export function createEmptyBlobStorage() { + return { + name: 'empty', + readonly: true, + async get(_key: string) { + return null; + }, + async set(_key: string, _value: Blob) { + throw new Error('not supported'); + }, + async delete(_key: string) { + throw new Error('not supported'); + }, + async list() { + return []; + }, + } satisfies BlobStorage; +} diff --git a/packages/common/infra/src/workspace/engine/error.ts b/packages/common/infra/src/workspace/engine/error.ts new file mode 100644 index 0000000000..db74dc86f6 --- /dev/null +++ b/packages/common/infra/src/workspace/engine/error.ts @@ -0,0 +1,5 @@ +export class BlobStorageOverCapacity extends Error { + constructor(public originError?: any) { + super('Blob storage over capacity.'); + } +} diff --git a/packages/common/infra/src/workspace/engine/index.ts b/packages/common/infra/src/workspace/engine/index.ts new file mode 100644 index 0000000000..402d2a7739 --- /dev/null +++ b/packages/common/infra/src/workspace/engine/index.ts @@ -0,0 +1,81 @@ +import { Slot } from '@blocksuite/global/utils'; + +import { throwIfAborted } from '../../utils/throw-if-aborted'; +import type { AwarenessEngine } from './awareness'; +import type { BlobEngine, BlobStatus } from './blob'; +import type { SyncEngine } from './sync'; +import { type SyncEngineStatus } from './sync'; + +export interface WorkspaceEngineStatus { + sync: SyncEngineStatus; + blob: BlobStatus; +} + +/** + * # WorkspaceEngine + * + * sync ydoc, blob, awareness together + */ +export class WorkspaceEngine { + _status: WorkspaceEngineStatus; + onStatusChange = new Slot(); + + get status() { + return this._status; + } + + set status(status: WorkspaceEngineStatus) { + this._status = status; + this.onStatusChange.emit(status); + } + + constructor( + public blob: BlobEngine, + public sync: SyncEngine, + public awareness: AwarenessEngine + ) { + this._status = { + sync: sync.status, + blob: blob.status, + }; + sync.onStatusChange.on(status => { + this.status = { + sync: status, + blob: blob.status, + }; + }); + blob.onStatusChange.on(status => { + this.status = { + sync: sync.status, + blob: status, + }; + }); + } + + start() { + this.sync.start(); + this.awareness.connect(); + this.blob.start(); + } + + canGracefulStop() { + return this.sync.canGracefulStop(); + } + + async waitForGracefulStop(abort?: AbortSignal) { + await this.sync.waitForGracefulStop(abort); + throwIfAborted(abort); + this.forceStop(); + } + + forceStop() { + this.sync.forceStop(); + this.awareness.disconnect(); + this.blob.stop(); + } +} + +export * from './awareness'; +export * from './blob'; +export * from './error'; +export * from './sync'; diff --git a/packages/common/infra/src/workspace/engine/sync/__tests__/engine.spec.ts b/packages/common/infra/src/workspace/engine/sync/__tests__/engine.spec.ts new file mode 100644 index 0000000000..f29fac9122 --- /dev/null +++ b/packages/common/infra/src/workspace/engine/sync/__tests__/engine.spec.ts @@ -0,0 +1,152 @@ +import { WorkspaceFlavour } from '@affine/env/workspace'; +import { Workspace } from '@blocksuite/store'; +import { beforeEach, describe, expect, test, vi } from 'vitest'; +import { Doc } from 'yjs'; + +import { MemoryMemento } from '../../../../storage'; +import { globalBlockSuiteSchema } from '../../../global-schema'; +import { TestingSyncStorage } from '../../../testing'; +import { SyncEngineStep, SyncPeerStep } from '../consts'; +import { SyncEngine } from '../engine'; +import { createTestStorage } from './test-storage'; + +beforeEach(() => { + vi.useFakeTimers({ toFake: ['requestIdleCallback'] }); +}); + +const testMeta = { + id: 'test', + flavour: WorkspaceFlavour.LOCAL, +}; + +describe('SyncEngine', () => { + test('basic - indexeddb', async () => { + const storage = new MemoryMemento(); + const storage1 = new MemoryMemento(); + const storage2 = new MemoryMemento(); + let prev: any; + { + const workspace = new Workspace({ + id: 'test', + + schema: globalBlockSuiteSchema, + }); + + const syncEngine = new SyncEngine( + workspace.doc, + new TestingSyncStorage(testMeta, storage), + [ + new TestingSyncStorage(testMeta, storage1), + new TestingSyncStorage(testMeta, storage2), + ] + ); + syncEngine.start(); + + const page = workspace.createPage({ + id: 'page0', + }); + await page.load(); + const pageBlockId = page.addBlock('affine:page', { + title: new page.Text(''), + }); + page.addBlock('affine:surface', {}, pageBlockId); + const frameId = page.addBlock('affine:note', {}, pageBlockId); + page.addBlock('affine:paragraph', {}, frameId); + await syncEngine.waitForSynced(); + syncEngine.forceStop(); + prev = workspace.doc.toJSON(); + } + + for (const current of [storage, storage1, storage2]) { + const workspace = new Workspace({ + id: 'test', + + schema: globalBlockSuiteSchema, + }); + const syncEngine = new SyncEngine( + workspace.doc, + new TestingSyncStorage(testMeta, current), + [] + ); + syncEngine.start(); + await syncEngine.waitForSynced(); + expect(workspace.doc.toJSON()).toEqual({ + ...prev, + }); + syncEngine.forceStop(); + } + }); + + test('status', async () => { + const ydoc = new Doc({ guid: 'test' }); + + const storage1 = new MemoryMemento(); + const storage2 = new MemoryMemento(); + + const localStorage = createTestStorage( + new TestingSyncStorage(testMeta, storage1) + ); + const remoteStorage = createTestStorage( + new TestingSyncStorage(testMeta, storage2) + ); + + localStorage.pausePull(); + localStorage.pausePush(); + remoteStorage.pausePull(); + remoteStorage.pausePush(); + + const syncEngine = new SyncEngine(ydoc, localStorage, [remoteStorage]); + expect(syncEngine.status.step).toEqual(SyncEngineStep.Stopped); + + syncEngine.start(); + + await vi.waitFor(() => { + expect(syncEngine.status.step).toEqual(SyncEngineStep.Syncing); + expect(syncEngine.status.local?.step).toEqual( + SyncPeerStep.LoadingRootDoc + ); + }); + + localStorage.resumePull(); + + await vi.waitFor(() => { + expect(syncEngine.status.step).toEqual(SyncEngineStep.Syncing); + expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.Synced); + expect(syncEngine.status.remotes[0]?.step).toEqual( + SyncPeerStep.LoadingRootDoc + ); + }); + + remoteStorage.resumePull(); + + await vi.waitFor(() => { + expect(syncEngine.status.step).toEqual(SyncEngineStep.Synced); + expect(syncEngine.status.remotes[0]?.step).toEqual(SyncPeerStep.Synced); + expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.Synced); + }); + + ydoc.getArray('test').insert(0, [1, 2, 3]); + + await vi.waitFor(() => { + expect(syncEngine.status.step).toEqual(SyncEngineStep.Syncing); + expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.Syncing); + expect(syncEngine.status.remotes[0]?.step).toEqual(SyncPeerStep.Syncing); + }); + + localStorage.resumePush(); + + await vi.waitFor(() => { + expect(syncEngine.status.step).toEqual(SyncEngineStep.Syncing); + expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.Synced); + expect(syncEngine.status.remotes[0]?.step).toEqual(SyncPeerStep.Syncing); + }); + + remoteStorage.resumePush(); + + await vi.waitFor(() => { + expect(syncEngine.status.step).toEqual(SyncEngineStep.Synced); + expect(syncEngine.status.local?.step).toEqual(SyncPeerStep.Synced); + expect(syncEngine.status.remotes[0]?.step).toEqual(SyncPeerStep.Synced); + }); + }); +}); diff --git a/packages/common/infra/src/workspace/engine/sync/__tests__/peer.spec.ts b/packages/common/infra/src/workspace/engine/sync/__tests__/peer.spec.ts new file mode 100644 index 0000000000..b9c53e444f --- /dev/null +++ b/packages/common/infra/src/workspace/engine/sync/__tests__/peer.spec.ts @@ -0,0 +1,100 @@ +import { WorkspaceFlavour } from '@affine/env/workspace'; +import { Workspace } from '@blocksuite/store'; +import { beforeEach, describe, expect, test, vi } from 'vitest'; + +import { MemoryMemento } from '../../../../storage'; +import { globalBlockSuiteSchema } from '../../../global-schema'; +import { TestingSyncStorage } from '../../../testing'; +import { SyncPeerStep } from '../consts'; +import { SyncPeer } from '../peer'; + +beforeEach(() => { + vi.useFakeTimers({ toFake: ['requestIdleCallback'] }); +}); + +const testMeta = { + id: 'test', + flavour: WorkspaceFlavour.LOCAL, +}; + +describe('SyncPeer', () => { + test('basic - indexeddb', async () => { + const storage = new MemoryMemento(); + + let prev: any; + { + const workspace = new Workspace({ + id: 'test', + + schema: globalBlockSuiteSchema, + }); + + const syncPeer = new SyncPeer( + workspace.doc, + new TestingSyncStorage(testMeta, storage) + ); + await syncPeer.waitForLoaded(); + + const page = workspace.createPage({ + id: 'page0', + }); + await page.load(); + const pageBlockId = page.addBlock('affine:page', { + title: new page.Text(''), + }); + page.addBlock('affine:surface', {}, pageBlockId); + const frameId = page.addBlock('affine:note', {}, pageBlockId); + page.addBlock('affine:paragraph', {}, frameId); + await syncPeer.waitForSynced(); + syncPeer.stop(); + prev = workspace.doc.toJSON(); + } + + { + const workspace = new Workspace({ + id: 'test', + + schema: globalBlockSuiteSchema, + }); + const syncPeer = new SyncPeer( + workspace.doc, + new TestingSyncStorage(testMeta, storage) + ); + await syncPeer.waitForSynced(); + expect(workspace.doc.toJSON()).toEqual({ + ...prev, + }); + syncPeer.stop(); + } + }); + + test('status', async () => { + const storage = new MemoryMemento(); + + const workspace = new Workspace({ + id: 'test', + + schema: globalBlockSuiteSchema, + }); + + const syncPeer = new SyncPeer( + workspace.doc, + new TestingSyncStorage(testMeta, storage) + ); + expect(syncPeer.status.step).toBe(SyncPeerStep.LoadingRootDoc); + await syncPeer.waitForSynced(); + expect(syncPeer.status.step).toBe(SyncPeerStep.Synced); + + const page = workspace.createPage({ + id: 'page0', + }); + expect(syncPeer.status.step).toBe(SyncPeerStep.LoadingSubDoc); + await page.load(); + await syncPeer.waitForSynced(); + page.addBlock('affine:page', { + title: new page.Text(''), + }); + expect(syncPeer.status.step).toBe(SyncPeerStep.Syncing); + syncPeer.stop(); + }); +}); diff --git a/packages/common/infra/src/workspace/engine/sync/__tests__/test-storage.ts b/packages/common/infra/src/workspace/engine/sync/__tests__/test-storage.ts new file mode 100644 index 0000000000..733cd3ee1c --- /dev/null +++ b/packages/common/infra/src/workspace/engine/sync/__tests__/test-storage.ts @@ -0,0 +1,42 @@ +import type { SyncStorage } from '../storage'; + +export function createTestStorage(origin: SyncStorage) { + const controler = { + pausedPull: Promise.resolve(), + resumePull: () => {}, + pausedPush: Promise.resolve(), + resumePush: () => {}, + }; + + return { + name: `${origin.name}(testing)`, + pull(docId: string, state: Uint8Array) { + return controler.pausedPull.then(() => origin.pull(docId, state)); + }, + push(docId: string, data: Uint8Array) { + return controler.pausedPush.then(() => origin.push(docId, data)); + }, + subscribe( + cb: (docId: string, data: Uint8Array) => void, + disconnect: (reason: string) => void + ) { + return origin.subscribe(cb, disconnect); + }, + pausePull() { + controler.pausedPull = new Promise(resolve => { + controler.resumePull = resolve; + }); + }, + resumePull() { + controler.resumePull?.(); + }, + pausePush() { + controler.pausedPush = new Promise(resolve => { + controler.resumePush = resolve; + }); + }, + resumePush() { + controler.resumePush?.(); + }, + }; +} diff --git a/packages/common/infra/src/workspace/engine/sync/consts.ts b/packages/common/infra/src/workspace/engine/sync/consts.ts new file mode 100644 index 0000000000..e5fd2e8718 --- /dev/null +++ b/packages/common/infra/src/workspace/engine/sync/consts.ts @@ -0,0 +1,15 @@ +export enum SyncEngineStep { + Stopped = 0, + Syncing = 1, + Synced = 2, +} + +export enum SyncPeerStep { + Stopped = 0, + Retrying = 1, + LoadingRootDoc = 2, + LoadingSubDoc = 3, + Loaded = 4.5, + Syncing = 5, + Synced = 6, +} diff --git a/packages/common/infra/src/workspace/engine/sync/engine.ts b/packages/common/infra/src/workspace/engine/sync/engine.ts new file mode 100644 index 0000000000..996d422a91 --- /dev/null +++ b/packages/common/infra/src/workspace/engine/sync/engine.ts @@ -0,0 +1,285 @@ +import { DebugLogger } from '@affine/debug'; +import { Slot } from '@blocksuite/global/utils'; +import type { Doc } from 'yjs'; + +import { createIdentifier } from '../../../di'; +import { SharedPriorityTarget } from '../../../utils/async-queue'; +import { MANUALLY_STOP, throwIfAborted } from '../../../utils/throw-if-aborted'; +import { SyncEngineStep, SyncPeerStep } from './consts'; +import { SyncPeer, type SyncPeerStatus } from './peer'; +import { type SyncStorage } from './storage'; + +export interface SyncEngineStatus { + step: SyncEngineStep; + local: SyncPeerStatus | null; + remotes: (SyncPeerStatus | null)[]; + retrying: boolean; +} + +export const LocalSyncStorage = + createIdentifier('LocalSyncStorage'); + +export const RemoteSyncStorage = + createIdentifier('RemoteSyncStorage'); + +/** + * # SyncEngine + * + * ``` + * ┌────────────┐ + * │ SyncEngine │ + * └─────┬──────┘ + * │ + * ▼ + * ┌────────────┐ + * │ SyncPeer │ + * ┌─────────┤ local ├─────────┐ + * │ └─────┬──────┘ │ + * │ │ │ + * ▼ ▼ ▼ + * ┌────────────┐ ┌────────────┐ ┌────────────┐ + * │ SyncPeer │ │ SyncPeer │ │ SyncPeer │ + * │ Remote │ │ Remote │ │ Remote │ + * └────────────┘ └────────────┘ └────────────┘ + * ``` + * + * Sync engine manage sync peers + * + * Sync steps: + * 1. start local sync + * 2. wait for local sync complete + * 3. start remote sync + * 4. continuously sync local and remote + */ +export class SyncEngine { + get rootDocId() { + return this.rootDoc.guid; + } + + logger = new DebugLogger('affine:sync-engine:' + this.rootDocId); + private _status: SyncEngineStatus; + onStatusChange = new Slot(); + private set status(s: SyncEngineStatus) { + this.logger.debug('status change', s); + this._status = s; + this.onStatusChange.emit(s); + } + + priorityTarget = new SharedPriorityTarget(); + + get status() { + return this._status; + } + + private abort = new AbortController(); + + constructor( + private readonly rootDoc: Doc, + private readonly local: SyncStorage, + private readonly remotes: SyncStorage[] + ) { + this._status = { + step: SyncEngineStep.Stopped, + local: null, + remotes: remotes.map(() => null), + retrying: false, + }; + } + + start() { + if (this.status.step !== SyncEngineStep.Stopped) { + this.forceStop(); + } + this.abort = new AbortController(); + + this.sync(this.abort.signal).catch(err => { + // should never reach here + this.logger.error(err); + }); + } + + canGracefulStop() { + return !!this.status.local && this.status.local.pendingPushUpdates === 0; + } + + async waitForGracefulStop(abort?: AbortSignal) { + await Promise.race([ + new Promise((_, reject) => { + if (abort?.aborted) { + reject(abort?.reason); + } + abort?.addEventListener('abort', () => { + reject(abort.reason); + }); + }), + new Promise(resolve => { + this.onStatusChange.on(() => { + if (this.canGracefulStop()) { + resolve(); + } + }); + }), + ]); + throwIfAborted(abort); + this.forceStop(); + } + + forceStop() { + this.abort.abort(MANUALLY_STOP); + this._status = { + step: SyncEngineStep.Stopped, + local: null, + remotes: this.remotes.map(() => null), + retrying: false, + }; + } + + // main sync process, should never return until abort + async sync(signal: AbortSignal) { + const state: { + localPeer: SyncPeer | null; + remotePeers: (SyncPeer | null)[]; + } = { + localPeer: null, + remotePeers: this.remotes.map(() => null), + }; + + const cleanUp: (() => void)[] = []; + try { + // Step 1: start local sync peer + state.localPeer = new SyncPeer( + this.rootDoc, + this.local, + this.priorityTarget + ); + + cleanUp.push( + state.localPeer.onStatusChange.on(() => { + if (!signal.aborted) + this.updateSyncingState(state.localPeer, state.remotePeers); + }).dispose + ); + + this.updateSyncingState(state.localPeer, state.remotePeers); + + // Step 2: wait for local sync complete + await state.localPeer.waitForLoaded(signal); + + // Step 3: start remote sync peer + state.remotePeers = this.remotes.map(remote => { + const peer = new SyncPeer(this.rootDoc, remote, this.priorityTarget); + cleanUp.push( + peer.onStatusChange.on(() => { + if (!signal.aborted) + this.updateSyncingState(state.localPeer, state.remotePeers); + }).dispose + ); + return peer; + }); + + this.updateSyncingState(state.localPeer, state.remotePeers); + + // Step 4: continuously sync local and remote + + // wait for abort + await new Promise((_, reject) => { + if (signal.aborted) { + reject(signal.reason); + } + signal.addEventListener('abort', () => { + reject(signal.reason); + }); + }); + } catch (error) { + if (error === MANUALLY_STOP || signal.aborted) { + return; + } + throw error; + } finally { + // stop peers + state.localPeer?.stop(); + for (const remotePeer of state.remotePeers) { + remotePeer?.stop(); + } + for (const clean of cleanUp) { + clean(); + } + } + } + + updateSyncingState(local: SyncPeer | null, remotes: (SyncPeer | null)[]) { + let step = SyncEngineStep.Synced; + const allPeer = [local, ...remotes]; + for (const peer of allPeer) { + if (!peer || peer.status.step !== SyncPeerStep.Synced) { + step = SyncEngineStep.Syncing; + break; + } + } + this.status = { + step, + local: local?.status ?? null, + remotes: remotes.map(peer => peer?.status ?? null), + retrying: allPeer.some( + peer => peer?.status.step === SyncPeerStep.Retrying + ), + }; + } + + async waitForSynced(abort?: AbortSignal) { + if (this.status.step === SyncEngineStep.Synced) { + return; + } else { + return Promise.race([ + new Promise(resolve => { + this.onStatusChange.on(status => { + if (status.step === SyncEngineStep.Synced) { + resolve(); + } + }); + }), + new Promise((_, reject) => { + if (abort?.aborted) { + reject(abort?.reason); + } + abort?.addEventListener('abort', () => { + reject(abort.reason); + }); + }), + ]); + } + } + + async waitForLoadedRootDoc(abort?: AbortSignal) { + function isLoadedRootDoc(status: SyncEngineStatus) { + return ![status.local, ...status.remotes].some( + peer => !peer || peer.step <= SyncPeerStep.LoadingRootDoc + ); + } + if (isLoadedRootDoc(this.status)) { + return; + } else { + return Promise.race([ + new Promise(resolve => { + this.onStatusChange.on(status => { + if (isLoadedRootDoc(status)) { + resolve(); + } + }); + }), + new Promise((_, reject) => { + if (abort?.aborted) { + reject(abort?.reason); + } + abort?.addEventListener('abort', () => { + reject(abort.reason); + }); + }), + ]); + } + } + + setPriorityRule(target: ((id: string) => boolean) | null) { + this.priorityTarget.priorityRule = target; + } +} diff --git a/packages/common/infra/src/workspace/engine/sync/index.ts b/packages/common/infra/src/workspace/engine/sync/index.ts new file mode 100644 index 0000000000..0e3d766d79 --- /dev/null +++ b/packages/common/infra/src/workspace/engine/sync/index.ts @@ -0,0 +1,20 @@ +/** + * + * **SyncEngine** + * + * Manages one local storage and multiple remote storages. + * + * Responsible for creating SyncPeers for synchronization, following the local-first strategy. + * + * **SyncPeer** + * + * Responsible for synchronizing a single storage with Y.Doc. + * + * Carries the main synchronization logic. + * + */ + +export * from './consts'; +export * from './engine'; +export * from './peer'; +export * from './storage'; diff --git a/packages/common/infra/src/workspace/engine/sync/peer.ts b/packages/common/infra/src/workspace/engine/sync/peer.ts new file mode 100644 index 0000000000..fd465d7728 --- /dev/null +++ b/packages/common/infra/src/workspace/engine/sync/peer.ts @@ -0,0 +1,444 @@ +import { DebugLogger } from '@affine/debug'; +import { Slot } from '@blocksuite/global/utils'; +import { isEqual } from '@blocksuite/global/utils'; +import type { Doc } from 'yjs'; +import { applyUpdate, encodeStateAsUpdate, encodeStateVector } from 'yjs'; + +import { + PriorityAsyncQueue, + SharedPriorityTarget, +} from '../../../utils/async-queue'; +import { mergeUpdates } from '../../../utils/merge-updates'; +import { MANUALLY_STOP, throwIfAborted } from '../../../utils/throw-if-aborted'; +import { SyncPeerStep } from './consts'; +import type { SyncStorage } from './storage'; + +export interface SyncPeerStatus { + step: SyncPeerStep; + totalDocs: number; + loadedDocs: number; + pendingPullUpdates: number; + pendingPushUpdates: number; +} + +/** + * # SyncPeer + * A SyncPeer is responsible for syncing one Storage with one Y.Doc and its subdocs. + * + * ``` + * ┌─────┐ + * │Start│ + * └──┬──┘ + * │ + * ┌──────┐ ┌─────▼──────┐ ┌────┐ + * │listen◄─────┤pull rootdoc│ │peer│ + * └──┬───┘ └─────┬──────┘ └──┬─┘ + * │ │ onLoad() │ + * ┌──▼───┐ ┌─────▼──────┐ ┌────▼────┐ + * │listen◄─────┤pull subdocs│ │subscribe│ + * └──┬───┘ └─────┬──────┘ └────┬────┘ + * │ │ onReady() │ + * ┌──▼──┐ ┌─────▼───────┐ ┌──▼──┐ + * │queue├──────►apply updates◄───────┤queue│ + * └─────┘ └─────────────┘ └─────┘ + * ``` + * + * listen: listen for updates from ydoc, typically from user modifications. + * subscribe: listen for updates from storage, typically from other users. + * + */ +export class SyncPeer { + private _status: SyncPeerStatus = { + step: SyncPeerStep.LoadingRootDoc, + totalDocs: 1, + loadedDocs: 0, + pendingPullUpdates: 0, + pendingPushUpdates: 0, + }; + onStatusChange = new Slot(); + readonly abort = new AbortController(); + get name() { + return this.storage.name; + } + logger = new DebugLogger('affine:sync-peer:' + this.name); + + constructor( + private readonly rootDoc: Doc, + private readonly storage: SyncStorage, + private readonly priorityTarget = new SharedPriorityTarget() + ) { + this.logger.debug('peer start'); + + this.syncRetryLoop(this.abort.signal).catch(err => { + // should not reach here + console.error(err); + }); + } + + private set status(s: SyncPeerStatus) { + if (!isEqual(s, this._status)) { + this.logger.debug('status change', s); + this._status = s; + this.onStatusChange.emit(s); + } + } + + get status() { + return this._status; + } + + /** + * stop sync + * + * SyncPeer is one-time use, this peer should be discarded after call stop(). + */ + stop() { + this.logger.debug('peer stop'); + this.abort.abort(MANUALLY_STOP); + } + + /** + * auto retry after 5 seconds if sync failed + */ + async syncRetryLoop(abort: AbortSignal) { + while (abort.aborted === false) { + try { + await this.sync(abort); + } catch (err) { + if (err === MANUALLY_STOP || abort.aborted) { + return; + } + + this.logger.error('sync error', err); + } + try { + this.logger.error('retry after 5 seconds'); + this.status = { + step: SyncPeerStep.Retrying, + totalDocs: 1, + loadedDocs: 0, + pendingPullUpdates: 0, + pendingPushUpdates: 0, + }; + await Promise.race([ + new Promise(resolve => { + setTimeout(resolve, 5 * 1000); + }), + new Promise((_, reject) => { + // exit if manually stopped + if (abort.aborted) { + reject(abort.reason); + } + abort.addEventListener('abort', () => { + reject(abort.reason); + }); + }), + ]); + } catch (err) { + if (err === MANUALLY_STOP || abort.aborted) { + return; + } + + // should never reach here + throw err; + } + } + } + + private readonly state: { + connectedDocs: Map; + pushUpdatesQueue: PriorityAsyncQueue<{ + id: string; + data: Uint8Array[]; + }>; + pushingUpdate: boolean; + pullUpdatesQueue: PriorityAsyncQueue<{ + id: string; + data: Uint8Array; + }>; + subdocLoading: boolean; + subdocsLoadQueue: PriorityAsyncQueue<{ id: string; doc: Doc }>; + } = { + connectedDocs: new Map(), + pushUpdatesQueue: new PriorityAsyncQueue([], this.priorityTarget), + pushingUpdate: false, + pullUpdatesQueue: new PriorityAsyncQueue([], this.priorityTarget), + subdocLoading: false, + subdocsLoadQueue: new PriorityAsyncQueue([], this.priorityTarget), + }; + + initState() { + this.state.connectedDocs.clear(); + this.state.pushUpdatesQueue.clear(); + this.state.pullUpdatesQueue.clear(); + this.state.subdocsLoadQueue.clear(); + this.state.pushingUpdate = false; + this.state.subdocLoading = false; + } + + /** + * main synchronization logic + */ + async sync(abortOuter: AbortSignal) { + this.initState(); + const abortInner = new AbortController(); + + abortOuter.addEventListener('abort', reason => { + abortInner.abort(reason); + }); + + let dispose: (() => void) | null = null; + try { + this.reportSyncStatus(); + + // start listen storage updates + dispose = await this.storage.subscribe( + this.handleStorageUpdates, + reason => { + // abort if storage disconnect, should trigger retry loop + abortInner.abort('subscribe disconnect:' + reason); + } + ); + throwIfAborted(abortInner.signal); + + // Step 1: load root doc + await this.connectDoc(this.rootDoc, abortInner.signal); + + // Step 2: load subdocs + this.state.subdocsLoadQueue.push( + ...Array.from(this.rootDoc.getSubdocs()).map(doc => ({ + id: doc.guid, + doc, + })) + ); + this.reportSyncStatus(); + + this.rootDoc.on('subdocs', this.handleSubdocsUpdate); + + // Finally: start sync + await Promise.all([ + // load subdocs + (async () => { + while (throwIfAborted(abortInner.signal)) { + const subdoc = await this.state.subdocsLoadQueue.next( + abortInner.signal + ); + this.state.subdocLoading = true; + this.reportSyncStatus(); + await this.connectDoc(subdoc.doc, abortInner.signal); + this.state.subdocLoading = false; + this.reportSyncStatus(); + } + })(), + // pull updates + (async () => { + while (throwIfAborted(abortInner.signal)) { + const { id, data } = await this.state.pullUpdatesQueue.next( + abortInner.signal + ); + // don't apply empty data or Uint8Array([0, 0]) + if ( + !( + data.byteLength === 0 || + (data.byteLength === 2 && data[0] === 0 && data[1] === 0) + ) + ) { + const subdoc = this.state.connectedDocs.get(id); + if (subdoc) { + applyUpdate(subdoc, data, this.name); + } + } + this.reportSyncStatus(); + } + })(), + // push updates + (async () => { + while (throwIfAborted(abortInner.signal)) { + const { id, data } = await this.state.pushUpdatesQueue.next( + abortInner.signal + ); + this.state.pushingUpdate = true; + this.reportSyncStatus(); + + const merged = mergeUpdates(data); + + // don't push empty data or Uint8Array([0, 0]) + if ( + !( + merged.byteLength === 0 || + (merged.byteLength === 2 && merged[0] === 0 && merged[1] === 0) + ) + ) { + await this.storage.push(id, merged); + } + + this.state.pushingUpdate = false; + this.reportSyncStatus(); + } + })(), + ]); + } finally { + dispose?.(); + for (const docs of this.state.connectedDocs.values()) { + this.disconnectDoc(docs); + } + this.rootDoc.off('subdocs', this.handleSubdocsUpdate); + } + } + + async connectDoc(doc: Doc, abort: AbortSignal) { + const { data: docData, state: inStorageState } = + (await this.storage.pull(doc.guid, encodeStateVector(doc))) ?? {}; + throwIfAborted(abort); + + if (docData) { + applyUpdate(doc, docData, 'load'); + } + + // diff root doc and in-storage, save updates to pendingUpdates + this.state.pushUpdatesQueue.push({ + id: doc.guid, + data: [encodeStateAsUpdate(doc, inStorageState)], + }); + + this.state.connectedDocs.set(doc.guid, doc); + + // start listen root doc changes + doc.on('update', this.handleYDocUpdates); + + // mark rootDoc as loaded + doc.emit('sync', [true]); + + this.reportSyncStatus(); + } + + disconnectDoc(doc: Doc) { + doc.off('update', this.handleYDocUpdates); + this.state.connectedDocs.delete(doc.guid); + this.reportSyncStatus(); + } + + // handle updates from ydoc + handleYDocUpdates = (update: Uint8Array, origin: string, doc: Doc) => { + // don't push updates from storage + if (origin === this.name) { + return; + } + + const exist = this.state.pushUpdatesQueue.find(({ id }) => id === doc.guid); + if (exist) { + exist.data.push(update); + } else { + this.state.pushUpdatesQueue.push({ + id: doc.guid, + data: [update], + }); + } + + this.reportSyncStatus(); + }; + + // handle subdocs changes, append new subdocs to queue, remove subdocs from queue + handleSubdocsUpdate = ({ + added, + removed, + }: { + added: Set; + removed: Set; + }) => { + for (const subdoc of added) { + this.state.subdocsLoadQueue.push({ id: subdoc.guid, doc: subdoc }); + } + + for (const subdoc of removed) { + this.disconnectDoc(subdoc); + this.state.subdocsLoadQueue.remove(doc => doc.doc === subdoc); + } + this.reportSyncStatus(); + }; + + // handle updates from storage + handleStorageUpdates = (id: string, data: Uint8Array) => { + this.state.pullUpdatesQueue.push({ + id, + data, + }); + this.reportSyncStatus(); + }; + + reportSyncStatus() { + let step; + if (this.state.connectedDocs.size === 0) { + step = SyncPeerStep.LoadingRootDoc; + } else if (this.state.subdocsLoadQueue.length || this.state.subdocLoading) { + step = SyncPeerStep.LoadingSubDoc; + } else if ( + this.state.pullUpdatesQueue.length || + this.state.pushUpdatesQueue.length || + this.state.pushingUpdate + ) { + step = SyncPeerStep.Syncing; + } else { + step = SyncPeerStep.Synced; + } + + this.status = { + step: step, + totalDocs: + this.state.connectedDocs.size + this.state.subdocsLoadQueue.length, + loadedDocs: this.state.connectedDocs.size, + pendingPullUpdates: + this.state.pullUpdatesQueue.length + (this.state.subdocLoading ? 1 : 0), + pendingPushUpdates: + this.state.pushUpdatesQueue.length + (this.state.pushingUpdate ? 1 : 0), + }; + } + + async waitForSynced(abort?: AbortSignal) { + if (this.status.step >= SyncPeerStep.Synced) { + return; + } else { + return Promise.race([ + new Promise(resolve => { + this.onStatusChange.on(status => { + if (status.step >= SyncPeerStep.Synced) { + resolve(); + } + }); + }), + new Promise((_, reject) => { + if (abort?.aborted) { + reject(abort?.reason); + } + abort?.addEventListener('abort', () => { + reject(abort.reason); + }); + }), + ]); + } + } + + async waitForLoaded(abort?: AbortSignal) { + if (this.status.step > SyncPeerStep.Loaded) { + return; + } else { + return Promise.race([ + new Promise(resolve => { + this.onStatusChange.on(status => { + if (status.step > SyncPeerStep.Loaded) { + resolve(); + } + }); + }), + new Promise((_, reject) => { + if (abort?.aborted) { + reject(abort?.reason); + } + abort?.addEventListener('abort', () => { + reject(abort.reason); + }); + }), + ]); + } + } +} diff --git a/packages/common/infra/src/workspace/engine/sync/storage.ts b/packages/common/infra/src/workspace/engine/sync/storage.ts new file mode 100644 index 0000000000..34784f1d40 --- /dev/null +++ b/packages/common/infra/src/workspace/engine/sync/storage.ts @@ -0,0 +1,25 @@ +export interface SyncStorage { + /** + * for debug + */ + name: string; + + pull( + docId: string, + state: Uint8Array + ): Promise<{ data: Uint8Array; state?: Uint8Array } | null>; + push(docId: string, data: Uint8Array): Promise; + + /** + * Subscribe to updates from peer + * + * @param cb callback to handle updates + * @param disconnect callback to handle disconnect, reason can be something like 'network-error' + * + * @returns unsubscribe function + */ + subscribe( + cb: (docId: string, data: Uint8Array) => void, + disconnect: (reason: string) => void + ): Promise<() => void>; +} diff --git a/packages/common/infra/src/workspace/factory.ts b/packages/common/infra/src/workspace/factory.ts new file mode 100644 index 0000000000..fd3d85d13f --- /dev/null +++ b/packages/common/infra/src/workspace/factory.ts @@ -0,0 +1,15 @@ +import { createIdentifier, type ServiceCollection } from '../di'; + +export interface WorkspaceFactory { + name: string; + + configureWorkspace(services: ServiceCollection): void; + + /** + * get blob without open workspace + */ + getWorkspaceBlob(id: string, blobKey: string): Promise; +} + +export const WorkspaceFactory = + createIdentifier('WorkspaceFactory'); diff --git a/packages/common/infra/src/workspace/global-schema.ts b/packages/common/infra/src/workspace/global-schema.ts new file mode 100644 index 0000000000..e03dc9a7c2 --- /dev/null +++ b/packages/common/infra/src/workspace/global-schema.ts @@ -0,0 +1,6 @@ +import { __unstableSchemas, AffineSchemas } from '@blocksuite/blocks/models'; +import { Schema } from '@blocksuite/store'; + +export const globalBlockSuiteSchema = new Schema(); + +globalBlockSuiteSchema.register(AffineSchemas).register(__unstableSchemas); diff --git a/packages/common/infra/src/workspace/index.ts b/packages/common/infra/src/workspace/index.ts new file mode 100644 index 0000000000..74629684f7 --- /dev/null +++ b/packages/common/infra/src/workspace/index.ts @@ -0,0 +1,81 @@ +export * from './context'; +export * from './engine'; +export * from './factory'; +export * from './global-schema'; +export * from './list'; +export * from './manager'; +export * from './metadata'; +export * from './service-scope'; +export * from './testing'; +export * from './upgrade'; +export * from './workspace'; + +import { type ServiceCollection, ServiceProvider } from '../di'; +import { CleanupService } from '../lifecycle'; +import { GlobalCache, GlobalState } from '../storage'; +import { + BlockSuiteWorkspaceContext, + RootYDocContext, + WorkspaceMetadataContext, +} from './context'; +import { + AwarenessEngine, + AwarenessProvider, + BlobEngine, + LocalBlobStorage, + LocalSyncStorage, + RemoteBlobStorage, + RemoteSyncStorage, + SyncEngine, + WorkspaceEngine, +} from './engine'; +import { WorkspaceFactory } from './factory'; +import { WorkspaceListProvider, WorkspaceListService } from './list'; +import { WorkspaceManager } from './manager'; +import { WorkspaceScope } from './service-scope'; +import { + TestingLocalWorkspaceFactory, + TestingLocalWorkspaceListProvider, +} from './testing'; +import { WorkspaceUpgradeController } from './upgrade'; +import { Workspace } from './workspace'; + +export function configureWorkspaceServices(services: ServiceCollection) { + // global scope + services + .add(WorkspaceManager, [ + WorkspaceListService, + [WorkspaceFactory], + ServiceProvider, + ]) + .add(WorkspaceListService, [[WorkspaceListProvider], GlobalCache]); + + // workspace scope + services + .scope(WorkspaceScope) + .add(CleanupService) + .add(Workspace, [ + WorkspaceMetadataContext, + WorkspaceEngine, + BlockSuiteWorkspaceContext, + WorkspaceUpgradeController, + ServiceProvider, + ]) + .add(WorkspaceEngine, [BlobEngine, SyncEngine, AwarenessEngine]) + .add(AwarenessEngine, [[AwarenessProvider]]) + .add(BlobEngine, [LocalBlobStorage, [RemoteBlobStorage]]) + .add(SyncEngine, [RootYDocContext, LocalSyncStorage, [RemoteSyncStorage]]) + .add(WorkspaceUpgradeController, [ + BlockSuiteWorkspaceContext, + SyncEngine, + WorkspaceMetadataContext, + ]); +} + +export function configureTestingWorkspaceServices(services: ServiceCollection) { + services + .addImpl(WorkspaceListProvider, TestingLocalWorkspaceListProvider, [ + GlobalState, + ]) + .addImpl(WorkspaceFactory, TestingLocalWorkspaceFactory, [GlobalState]); +} diff --git a/packages/common/infra/src/workspace/list/cache.ts b/packages/common/infra/src/workspace/list/cache.ts new file mode 100644 index 0000000000..ecf1005037 --- /dev/null +++ b/packages/common/infra/src/workspace/list/cache.ts @@ -0,0 +1,25 @@ +import type { GlobalCache } from '../../storage'; +import { type WorkspaceMetadata } from '../metadata'; + +const CACHE_STORAGE_KEY = 'jotai-workspaces'; + +export function readWorkspaceListCache(cache: GlobalCache) { + const metadata = cache.get(CACHE_STORAGE_KEY); + if (metadata) { + try { + const items = metadata as WorkspaceMetadata[]; + return [...items]; + } catch (e) { + console.error('cannot parse worksapce', e); + } + return []; + } + return []; +} + +export function writeWorkspaceListCache( + cache: GlobalCache, + metadata: WorkspaceMetadata[] +) { + cache.set(CACHE_STORAGE_KEY, metadata); +} diff --git a/packages/common/infra/src/workspace/list/index.ts b/packages/common/infra/src/workspace/list/index.ts new file mode 100644 index 0000000000..ff414bc1d6 --- /dev/null +++ b/packages/common/infra/src/workspace/list/index.ts @@ -0,0 +1,301 @@ +import { DebugLogger } from '@affine/debug'; +import type { WorkspaceFlavour } from '@affine/env/workspace'; +import type { Workspace as BlockSuiteWorkspace } from '@blocksuite/store'; +import { differenceWith } from 'lodash-es'; + +import { createIdentifier } from '../../di'; +import { LiveData } from '../../livedata'; +import type { GlobalCache } from '../../storage'; +import type { BlobStorage } from '../engine'; +import type { WorkspaceMetadata } from '../metadata'; +import { readWorkspaceListCache, writeWorkspaceListCache } from './cache'; +import { type WorkspaceInfo, WorkspaceInformation } from './information'; + +export * from './information'; + +const logger = new DebugLogger('affine:workspace:list'); + +export interface WorkspaceListProvider { + name: WorkspaceFlavour; + + /** + * get workspaces list + */ + getList(): Promise; + + /** + * delete workspace by id + */ + delete(workspaceId: string): Promise; + + /** + * create workspace + * @param initial callback to put initial data to workspace + */ + create( + initial: ( + workspace: BlockSuiteWorkspace, + blobStorage: BlobStorage + ) => Promise + ): Promise; + + /** + * Start subscribe workspaces list + * + * @returns unsubscribe function + */ + subscribe( + callback: (changed: { + added?: WorkspaceMetadata[]; + deleted?: WorkspaceMetadata[]; + }) => void + ): () => void; + + /** + * get workspace avatar and name by id + * + * @param id workspace id + */ + getInformation(id: string): Promise; +} + +export const WorkspaceListProvider = createIdentifier( + 'WorkspaceListProvider' +); + +export interface WorkspaceListStatus { + /** + * is workspace list doing first loading. + * if false, UI can display workspace not found page. + */ + loading: boolean; + workspaceList: WorkspaceMetadata[]; +} + +/** + * # WorkspaceList + * + * manage multiple workspace metadata list providers. + * provide a __cache-first__ and __offline useable__ workspace list. + */ +export class WorkspaceListService { + private readonly abortController = new AbortController(); + + private readonly workspaceInformationList = new Map< + string, + WorkspaceInformation + >(); + + status = new LiveData({ + loading: true, + workspaceList: [], + }); + + setStatus(status: WorkspaceListStatus) { + this.status.next(status); + // update cache + writeWorkspaceListCache(this.cache, status.workspaceList); + } + + workspaceList = this.status.map(x => x.workspaceList); + + constructor( + private readonly providers: WorkspaceListProvider[], + private readonly cache: GlobalCache + ) { + // initialize workspace list from cache + const cached = readWorkspaceListCache(cache); + const workspaceList = cached; + this.status.next({ + ...this.status.value, + workspaceList, + }); + + // start first load + this.startLoad(); + } + + /** + * create workspace + * @param flavour workspace flavour + * @param initial callback to put initial data to workspace + * @returns workspace id + */ + async create( + flavour: WorkspaceFlavour, + initial: ( + workspace: BlockSuiteWorkspace, + blobStorage: BlobStorage + ) => Promise = () => Promise.resolve() + ) { + const provider = this.providers.find(x => x.name === flavour); + if (!provider) { + throw new Error(`Unknown workspace flavour: ${flavour}`); + } + const metadata = await provider.create(initial); + // update workspace list + this.setStatus(this.addWorkspace(this.status.value, metadata)); + return metadata; + } + + /** + * delete workspace + * @param workspaceMetadata + */ + async delete(workspaceMetadata: WorkspaceMetadata) { + logger.info( + `delete workspace [${workspaceMetadata.flavour}] ${workspaceMetadata.id}` + ); + const provider = this.providers.find( + x => x.name === workspaceMetadata.flavour + ); + if (!provider) { + throw new Error( + `Unknown workspace flavour: ${workspaceMetadata.flavour}` + ); + } + await provider.delete(workspaceMetadata.id); + + // delete workspace from list + this.setStatus(this.deleteWorkspace(this.status.value, workspaceMetadata)); + } + + /** + * add workspace to list + */ + private addWorkspace( + status: WorkspaceListStatus, + workspaceMetadata: WorkspaceMetadata + ) { + if (status.workspaceList.some(x => x.id === workspaceMetadata.id)) { + return status; + } + return { + ...status, + workspaceList: status.workspaceList.concat(workspaceMetadata), + }; + } + + /** + * delete workspace from list + */ + private deleteWorkspace( + status: WorkspaceListStatus, + workspaceMetadata: WorkspaceMetadata + ) { + if (!status.workspaceList.some(x => x.id === workspaceMetadata.id)) { + return status; + } + return { + ...status, + workspaceList: status.workspaceList.filter( + x => x.id !== workspaceMetadata.id + ), + }; + } + + /** + * callback for subscribe workspaces list + */ + private handleWorkspaceChange(changed: { + added?: WorkspaceMetadata[]; + deleted?: WorkspaceMetadata[]; + }) { + let status = this.status.value; + + for (const added of changed.added ?? []) { + status = this.addWorkspace(status, added); + } + for (const deleted of changed.deleted ?? []) { + status = this.deleteWorkspace(status, deleted); + } + + this.setStatus(status); + } + + /** + * start first load workspace list + */ + private startLoad() { + for (const provider of this.providers) { + // subscribe workspace list change + const unsubscribe = provider.subscribe(changed => { + this.handleWorkspaceChange(changed); + }); + + // unsubscribe when abort + if (this.abortController.signal.aborted) { + unsubscribe(); + return; + } + this.abortController.signal.addEventListener('abort', () => { + unsubscribe(); + }); + } + + this.revalidate() + .catch(error => { + logger.error('load workspace list error: ' + error); + }) + .finally(() => { + this.setStatus({ + ...this.status.value, + loading: false, + }); + }); + } + + async revalidate() { + await Promise.allSettled( + this.providers.map(async provider => { + try { + const list = await provider.getList(); + const oldList = this.workspaceList.value.filter( + w => w.flavour === provider.name + ); + this.handleWorkspaceChange({ + added: differenceWith(list, oldList, (a, b) => a.id === b.id), + deleted: differenceWith(oldList, list, (a, b) => a.id === b.id), + }); + } catch (error) { + logger.error('load workspace list error: ' + error); + } + }) + ); + } + + /** + * get workspace information, if not exists, create it. + */ + getInformation(meta: WorkspaceMetadata) { + const exists = this.workspaceInformationList.get(meta.id); + if (exists) { + return exists; + } + + return this.createInformation(meta); + } + + private createInformation(workspaceMetadata: WorkspaceMetadata) { + const provider = this.providers.find( + x => x.name === workspaceMetadata.flavour + ); + if (!provider) { + throw new Error( + `Unknown workspace flavour: ${workspaceMetadata.flavour}` + ); + } + const information = new WorkspaceInformation( + workspaceMetadata, + provider, + this.cache + ); + information.fetch(); + this.workspaceInformationList.set(workspaceMetadata.id, information); + return information; + } + + dispose() { + this.abortController.abort(); + } +} diff --git a/packages/common/infra/src/workspace/list/information.ts b/packages/common/infra/src/workspace/list/information.ts new file mode 100644 index 0000000000..e143d72e50 --- /dev/null +++ b/packages/common/infra/src/workspace/list/information.ts @@ -0,0 +1,92 @@ +import { DebugLogger } from '@affine/debug'; +import { Slot } from '@blocksuite/global/utils'; + +import type { Memento } from '../..'; +import type { WorkspaceMetadata } from '../metadata'; +import type { Workspace } from '../workspace'; +import type { WorkspaceListProvider } from '.'; + +const logger = new DebugLogger('affine:workspace:list:information'); + +const WORKSPACE_INFORMATION_CACHE_KEY = 'workspace-information:'; + +export interface WorkspaceInfo { + avatar?: string; + name?: string; +} + +/** + * # WorkspaceInformation + * + * This class take care of workspace avatar and name + * + * The class will try to get from 3 places: + * - local cache + * - fetch from `WorkspaceListProvider`, which will fetch from database or server + * - sync with active workspace + */ +export class WorkspaceInformation { + private _info: WorkspaceInfo = {}; + + public set info(info: WorkspaceInfo) { + if (info.avatar !== this._info.avatar || info.name !== this._info.name) { + this.cache.set(WORKSPACE_INFORMATION_CACHE_KEY + this.meta.id, info); + this._info = info; + this.onUpdated.emit(info); + } + } + + public get info() { + return this._info; + } + + public onUpdated = new Slot(); + + constructor( + public meta: WorkspaceMetadata, + public provider: WorkspaceListProvider, + public cache: Memento + ) { + const cached = this.getCachedInformation(); + // init with cached information + this.info = { ...cached }; + } + + /** + * sync information with workspace + */ + syncWithWorkspace(workspace: Workspace) { + this.info = { + avatar: workspace.blockSuiteWorkspace.meta.avatar ?? this.info.avatar, + name: workspace.blockSuiteWorkspace.meta.name ?? this.info.name, + }; + workspace.blockSuiteWorkspace.meta.commonFieldsUpdated.on(() => { + this.info = { + avatar: workspace.blockSuiteWorkspace.meta.avatar ?? this.info.avatar, + name: workspace.blockSuiteWorkspace.meta.name ?? this.info.name, + }; + }); + } + + getCachedInformation() { + return this.cache.get( + WORKSPACE_INFORMATION_CACHE_KEY + this.meta.id + ); + } + + /** + * fetch information from provider + */ + fetch() { + this.provider + .getInformation(this.meta.id) + .then(info => { + if (info) { + this.info = info; + } + }) + .catch(err => { + logger.warn('get workspace information error: ' + err); + }); + } +} diff --git a/packages/common/infra/src/workspace/manager.ts b/packages/common/infra/src/workspace/manager.ts new file mode 100644 index 0000000000..29570ef5b0 --- /dev/null +++ b/packages/common/infra/src/workspace/manager.ts @@ -0,0 +1,190 @@ +import { DebugLogger } from '@affine/debug'; +import { WorkspaceFlavour } from '@affine/env/workspace'; +import { assertEquals } from '@blocksuite/global/utils'; +import type { Workspace as BlockSuiteWorkspace } from '@blocksuite/store'; +import { applyUpdate, encodeStateAsUpdate } from 'yjs'; + +import { fixWorkspaceVersion } from '../blocksuite'; +import type { ServiceProvider } from '../di'; +import { ObjectPool } from '../utils/object-pool'; +import { configureWorkspaceContext } from './context'; +import type { BlobStorage } from './engine'; +import type { WorkspaceFactory } from './factory'; +import type { WorkspaceListService } from './list'; +import type { WorkspaceMetadata } from './metadata'; +import { WorkspaceScope } from './service-scope'; +import { Workspace } from './workspace'; + +const logger = new DebugLogger('affine:workspace-manager'); + +/** + * # `WorkspaceManager` + * + * This class acts as the central hub for managing various aspects of workspaces. + * It is structured as follows: + * + * ``` + * ┌───────────┐ + * │ Workspace │ + * │ Manager │ + * └─────┬─────┘ + * ┌─────────────┼─────────────┐ + * ┌───┴───┐ ┌───┴───┐ ┌─────┴─────┐ + * │ List │ │ Pool │ │ Factories │ + * └───────┘ └───────┘ └───────────┘ + * ``` + * + * Manage every about workspace + * + * # List + * + * The `WorkspaceList` component stores metadata for all workspaces, also include workspace avatar and custom name. + * + * # Factories + * + * This class contains a collection of `WorkspaceFactory`, + * We utilize `metadata.flavour` to identify the appropriate factory for opening a workspace. + * Once opened, workspaces are stored in the `WorkspacePool`. + * + * # Pool + * + * The `WorkspacePool` use reference counting to manage active workspaces. + * Calling `use()` to create a reference to the workspace. Calling `release()` to release the reference. + * When the reference count is 0, it will close the workspace. + * + */ +export class WorkspaceManager { + pool = new ObjectPool({ + onDelete(workspace) { + workspace.forceStop(); + }, + onDangling(workspace) { + return workspace.canGracefulStop(); + }, + }); + + constructor( + public readonly list: WorkspaceListService, + public readonly factories: WorkspaceFactory[], + private readonly serviceProvider: ServiceProvider + ) {} + + /** + * get workspace reference by metadata. + * + * You basically don't need to call this function directly, use the react hook `useWorkspace(metadata)` instead. + * + * @returns the workspace reference and a release function, don't forget to call release function when you don't + * need the workspace anymore. + */ + open(metadata: WorkspaceMetadata): { + workspace: Workspace; + release: () => void; + } { + const exist = this.pool.get(metadata.id); + if (exist) { + return { + workspace: exist.obj, + release: exist.release, + }; + } + + const workspace = this.instantiate(metadata); + const ref = this.pool.put(workspace.meta.id, workspace); + + return { + workspace: ref.obj, + release: ref.release, + }; + } + + createWorkspace( + flavour: WorkspaceFlavour, + initial?: ( + workspace: BlockSuiteWorkspace, + blobStorage: BlobStorage + ) => Promise + ): Promise { + logger.info(`create workspace [${flavour}]`); + return this.list.create(flavour, initial); + } + + /** + * delete workspace by metadata, same as `WorkspaceList.deleteWorkspace` + */ + async deleteWorkspace(metadata: WorkspaceMetadata) { + await this.list.delete(metadata); + } + + /** + * helper function to transform local workspace to cloud workspace + */ + async transformLocalToCloud(local: Workspace): Promise { + assertEquals(local.flavour, WorkspaceFlavour.LOCAL); + + await local.engine.sync.waitForSynced(); + + const newId = await this.list.create( + WorkspaceFlavour.AFFINE_CLOUD, + async (ws, bs) => { + applyUpdate(ws.doc, encodeStateAsUpdate(local.blockSuiteWorkspace.doc)); + + for (const subdoc of local.blockSuiteWorkspace.doc.getSubdocs()) { + for (const newSubdoc of ws.doc.getSubdocs()) { + if (newSubdoc.guid === subdoc.guid) { + applyUpdate(newSubdoc, encodeStateAsUpdate(subdoc)); + } + } + } + + const blobList = await local.engine.blob.list(); + + for (const blobKey of blobList) { + const blob = await local.engine.blob.get(blobKey); + if (blob) { + await bs.set(blobKey, blob); + } + } + } + ); + + await this.list.delete(local.meta); + + return newId; + } + + /** + * helper function to get blob without open workspace, its be used for download workspace avatars. + */ + getWorkspaceBlob(metadata: WorkspaceMetadata, blobKey: string) { + const factory = this.factories.find(x => x.name === metadata.flavour); + if (!factory) { + throw new Error(`Unknown workspace flavour: ${metadata.flavour}`); + } + return factory.getWorkspaceBlob(metadata.id, blobKey); + } + + private instantiate(metadata: WorkspaceMetadata) { + logger.info(`open workspace [${metadata.flavour}] ${metadata.id} `); + const factory = this.factories.find(x => x.name === metadata.flavour); + if (!factory) { + throw new Error(`Unknown workspace flavour: ${metadata.flavour}`); + } + const serviceCollection = this.serviceProvider.collection.clone(); + factory.configureWorkspace(serviceCollection); + configureWorkspaceContext(serviceCollection, metadata); + const provider = serviceCollection.provider( + WorkspaceScope, + this.serviceProvider + ); + const workspace = provider.get(Workspace); + + // sync information with workspace list, when workspace's avatar and name changed, information will be updated + this.list.getInformation(metadata).syncWithWorkspace(workspace); + + // apply compatibility fix + fixWorkspaceVersion(workspace.blockSuiteWorkspace.doc); + + return workspace; + } +} diff --git a/packages/common/infra/src/workspace/metadata.ts b/packages/common/infra/src/workspace/metadata.ts new file mode 100644 index 0000000000..d73b79f8a6 --- /dev/null +++ b/packages/common/infra/src/workspace/metadata.ts @@ -0,0 +1,3 @@ +import type { WorkspaceFlavour } from '@affine/env/workspace'; + +export type WorkspaceMetadata = { id: string; flavour: WorkspaceFlavour }; diff --git a/packages/common/infra/src/workspace/service-scope.ts b/packages/common/infra/src/workspace/service-scope.ts new file mode 100644 index 0000000000..4212cf9ed7 --- /dev/null +++ b/packages/common/infra/src/workspace/service-scope.ts @@ -0,0 +1,3 @@ +import { createScope } from '../di'; + +export const WorkspaceScope = createScope('workspace'); diff --git a/packages/common/infra/src/workspace/testing.ts b/packages/common/infra/src/workspace/testing.ts new file mode 100644 index 0000000000..6f2b364b0f --- /dev/null +++ b/packages/common/infra/src/workspace/testing.ts @@ -0,0 +1,244 @@ +import { WorkspaceFlavour } from '@affine/env/workspace'; +import { Workspace as BlockSuiteWorkspace } from '@blocksuite/store'; +import { differenceBy } from 'lodash-es'; +import { nanoid } from 'nanoid'; +import { applyUpdate, encodeStateAsUpdate } from 'yjs'; + +import { type ServiceCollection } from '../di'; +import { GlobalState, type Memento } from '../storage'; +import { mergeUpdates } from '../utils/merge-updates'; +import { WorkspaceMetadataContext } from './context'; +import { + AwarenessProvider, + type BlobStorage, + LocalBlobStorage, + LocalSyncStorage, + type SyncStorage, +} from './engine'; +import type { WorkspaceFactory } from './factory'; +import { globalBlockSuiteSchema } from './global-schema'; +import type { WorkspaceListProvider } from './list'; +import { type WorkspaceInfo } from './list'; +import { type WorkspaceMetadata } from './metadata'; +import { WorkspaceScope } from './service-scope'; + +const LIST_STORE_KEY = 'testing-workspace-list'; + +export class TestingLocalWorkspaceListProvider + implements WorkspaceListProvider +{ + name = WorkspaceFlavour.LOCAL; + + constructor(private readonly state: Memento) {} + + getList(): Promise { + const list = this.state.get(LIST_STORE_KEY); + return Promise.resolve(list ?? []); + } + delete(workspaceId: string): Promise { + const list = this.state.get(LIST_STORE_KEY) ?? []; + const newList = list.filter(meta => meta.id !== workspaceId); + this.state.set(LIST_STORE_KEY, newList); + return Promise.resolve(); + } + async create( + initial: ( + workspace: BlockSuiteWorkspace, + blobStorage: BlobStorage + ) => Promise + ): Promise { + const id = nanoid(); + const meta = { id, flavour: WorkspaceFlavour.LOCAL }; + + const blobStorage = new TestingBlobStorage(meta, this.state); + const syncStorage = new TestingSyncStorage(meta, this.state); + + const workspace = new BlockSuiteWorkspace({ + id: id, + idGenerator: () => nanoid(), + schema: globalBlockSuiteSchema, + }); + + // apply initial state + await initial(workspace, blobStorage); + + // save workspace to storage + await syncStorage.push(id, encodeStateAsUpdate(workspace.doc)); + for (const subdocs of workspace.doc.getSubdocs()) { + await syncStorage.push(subdocs.guid, encodeStateAsUpdate(subdocs)); + } + + const list = this.state.get(LIST_STORE_KEY) ?? []; + this.state.set(LIST_STORE_KEY, [...list, meta]); + + return { id, flavour: WorkspaceFlavour.LOCAL }; + } + subscribe( + callback: (changed: { + added?: WorkspaceMetadata[] | undefined; + deleted?: WorkspaceMetadata[] | undefined; + }) => void + ): () => void { + let lastWorkspaces: WorkspaceMetadata[] = + this.state.get(LIST_STORE_KEY) ?? []; + + const sub = this.state + .watch(LIST_STORE_KEY) + .subscribe(allWorkspaces => { + if (allWorkspaces) { + const added = differenceBy(allWorkspaces, lastWorkspaces, v => v.id); + const deleted = differenceBy( + lastWorkspaces, + allWorkspaces, + v => v.id + ); + lastWorkspaces = allWorkspaces; + if (added.length || deleted.length) { + callback({ added, deleted }); + } + } + }); + return () => { + sub.unsubscribe(); + }; + } + async getInformation(id: string): Promise { + // get information from root doc + const storage = new TestingSyncStorage( + { + flavour: WorkspaceFlavour.LOCAL, + id, + }, + this.state + ); + const data = await storage.pull(id, new Uint8Array([])); + + if (!data) { + return; + } + + const bs = new BlockSuiteWorkspace({ + id, + schema: globalBlockSuiteSchema, + }); + + applyUpdate(bs.doc, data.data); + + return { + name: bs.meta.name, + avatar: bs.meta.avatar, + }; + } +} + +export class TestingLocalWorkspaceFactory implements WorkspaceFactory { + constructor(private readonly state: Memento) {} + + name = WorkspaceFlavour.LOCAL; + + configureWorkspace(services: ServiceCollection): void { + services + .scope(WorkspaceScope) + .addImpl(LocalBlobStorage, TestingBlobStorage, [ + WorkspaceMetadataContext, + GlobalState, + ]) + .addImpl(LocalSyncStorage, TestingSyncStorage, [ + WorkspaceMetadataContext, + GlobalState, + ]) + .addImpl(AwarenessProvider, TestingAwarenessProvider); + } + + getWorkspaceBlob(id: string, blobKey: string): Promise { + return new TestingBlobStorage( + { + flavour: WorkspaceFlavour.LOCAL, + id, + }, + this.state + ).get(blobKey); + } +} + +export class TestingSyncStorage implements SyncStorage { + constructor( + private readonly metadata: WorkspaceMetadata, + private readonly state: Memento + ) {} + name: string = 'testing'; + async pull( + docId: string, + _: Uint8Array + ): Promise<{ data: Uint8Array; state?: Uint8Array | undefined } | null> { + const key = 'testing-sync/' + this.metadata.id + '/' + docId; + const data = this.state.get(key); + if (data) { + return { data }; + } else { + return null; + } + } + async push(docId: string, data: Uint8Array): Promise { + const key = 'testing-sync/' + this.metadata.id + '/' + docId; + const oldData = this.state.get(key); + const update = mergeUpdates(oldData ? [oldData, data] : [data]); + this.state.set(key, update); + } + async subscribe( + _cb: (docId: string, data: Uint8Array) => void, + _disconnect: (reason: string) => void + ): Promise<() => void> { + return () => {}; + } +} + +export class TestingBlobStorage implements BlobStorage { + name = 'testing'; + readonly = false; + + constructor( + private readonly metadata: WorkspaceMetadata, + private readonly state: Memento + ) {} + + get(key: string) { + const storeKey = 'testing-blob/' + this.metadata.id + '/' + key; + return Promise.resolve(this.state.get(storeKey) ?? null); + } + set(key: string, value: Blob) { + const storeKey = 'testing-blob/' + this.metadata.id + '/' + key; + this.state.set(storeKey, value); + + const listKey = 'testing-blob-list/' + this.metadata.id; + const list = this.state.get>(listKey) ?? new Set(); + list.add(key); + this.state.set(listKey, list); + + return Promise.resolve(key); + } + delete(key: string) { + this.state.set(key, null); + + const listKey = 'testing-blob-list/' + this.metadata.id; + const list = this.state.get>(listKey) ?? new Set(); + list.delete(key); + this.state.set(listKey, list); + + return Promise.resolve(); + } + list() { + const listKey = 'testing-blob-list/' + this.metadata.id; + const list = this.state.get>(listKey); + return Promise.resolve(list ? Array.from(list) : []); + } +} + +export class TestingAwarenessProvider implements AwarenessProvider { + connect(): void { + /* do nothing */ + } + disconnect(): void { + /* do nothing */ + } +} diff --git a/packages/common/infra/src/workspace/upgrade.ts b/packages/common/infra/src/workspace/upgrade.ts new file mode 100644 index 0000000000..2b57dafce4 --- /dev/null +++ b/packages/common/infra/src/workspace/upgrade.ts @@ -0,0 +1,142 @@ +import { Unreachable } from '@affine/env/constant'; +import { WorkspaceFlavour } from '@affine/env/workspace'; +import { Slot } from '@blocksuite/global/utils'; +import type { Workspace as BlockSuiteWorkspace } from '@blocksuite/store'; +import { applyUpdate, Doc as YDoc, encodeStateAsUpdate } from 'yjs'; + +import { checkWorkspaceCompatibility, MigrationPoint } from '../blocksuite'; +import { forceUpgradePages, upgradeV1ToV2 } from '../blocksuite'; +import { migrateGuidCompatibility } from '../blocksuite'; +import type { SyncEngine } from './engine/sync'; +import type { WorkspaceManager } from './manager'; +import { type WorkspaceMetadata } from './metadata'; + +export interface WorkspaceUpgradeStatus { + needUpgrade: boolean; + upgrading: boolean; +} + +export class WorkspaceUpgradeController { + _status: Readonly = { + needUpgrade: false, + upgrading: false, + }; + readonly onStatusChange = new Slot(); + + get status() { + return this._status; + } + + set status(value) { + if ( + value.needUpgrade !== this._status.needUpgrade || + value.upgrading !== this._status.upgrading + ) { + this._status = value; + this.onStatusChange.emit(value); + } + } + + constructor( + private readonly blockSuiteWorkspace: BlockSuiteWorkspace, + private readonly sync: SyncEngine, + private readonly workspaceMetadata: WorkspaceMetadata + ) { + blockSuiteWorkspace.doc.on('update', () => { + this.checkIfNeedUpgrade(); + }); + } + + checkIfNeedUpgrade() { + const needUpgrade = !!checkWorkspaceCompatibility( + this.blockSuiteWorkspace, + this.workspaceMetadata.flavour === WorkspaceFlavour.AFFINE_CLOUD + ); + this.status = { + ...this.status, + needUpgrade, + }; + return needUpgrade; + } + + async upgrade( + workspaceManager: WorkspaceManager + ): Promise { + if (this.status.upgrading) { + return null; + } + + this.status = { ...this.status, upgrading: true }; + + try { + await this.sync.waitForSynced(); + + const step = checkWorkspaceCompatibility( + this.blockSuiteWorkspace, + this.workspaceMetadata.flavour === WorkspaceFlavour.AFFINE_CLOUD + ); + + if (!step) { + return null; + } + + // Clone a new doc to prevent change events. + const clonedDoc = new YDoc({ + guid: this.blockSuiteWorkspace.doc.guid, + }); + applyDoc(clonedDoc, this.blockSuiteWorkspace.doc); + + if (step === MigrationPoint.SubDoc) { + const newWorkspace = await workspaceManager.createWorkspace( + WorkspaceFlavour.LOCAL, + async (workspace, blobStorage) => { + await upgradeV1ToV2(clonedDoc, workspace.doc); + migrateGuidCompatibility(clonedDoc); + await forceUpgradePages( + workspace.doc, + this.blockSuiteWorkspace.schema + ); + const blobList = await this.blockSuiteWorkspace.blob.list(); + + for (const blobKey of blobList) { + const blob = await this.blockSuiteWorkspace.blob.get(blobKey); + if (blob) { + await blobStorage.set(blobKey, blob); + } + } + } + ); + await workspaceManager.deleteWorkspace(this.workspaceMetadata); + return newWorkspace; + } else if (step === MigrationPoint.GuidFix) { + migrateGuidCompatibility(clonedDoc); + await forceUpgradePages(clonedDoc, this.blockSuiteWorkspace.schema); + applyDoc(this.blockSuiteWorkspace.doc, clonedDoc); + await this.sync.waitForSynced(); + return null; + } else if (step === MigrationPoint.BlockVersion) { + await forceUpgradePages(clonedDoc, this.blockSuiteWorkspace.schema); + applyDoc(this.blockSuiteWorkspace.doc, clonedDoc); + await this.sync.waitForSynced(); + return null; + } else { + throw new Unreachable(); + } + } finally { + this.status = { ...this.status, upgrading: false }; + } + } +} + +function applyDoc(target: YDoc, result: YDoc) { + applyUpdate(target, encodeStateAsUpdate(result)); + for (const targetSubDoc of target.subdocs.values()) { + const resultSubDocs = Array.from(result.subdocs.values()); + const resultSubDoc = resultSubDocs.find( + item => item.guid === targetSubDoc.guid + ); + if (resultSubDoc) { + applyDoc(targetSubDoc, resultSubDoc); + } + } +} diff --git a/packages/common/infra/src/workspace/workspace.ts b/packages/common/infra/src/workspace/workspace.ts new file mode 100644 index 0000000000..ffca6bf491 --- /dev/null +++ b/packages/common/infra/src/workspace/workspace.ts @@ -0,0 +1,131 @@ +import { DebugLogger } from '@affine/debug'; +import { Slot } from '@blocksuite/global/utils'; +import type { Workspace as BlockSuiteWorkspace } from '@blocksuite/store'; + +import type { ServiceProvider } from '../di'; +import { CleanupService } from '../lifecycle'; +import type { WorkspaceEngine } from './engine'; +import { type WorkspaceEngineStatus } from './engine'; +import { type WorkspaceMetadata } from './metadata'; +import type { WorkspaceUpgradeController } from './upgrade'; +import { type WorkspaceUpgradeStatus } from './upgrade'; + +const logger = new DebugLogger('affine:workspace'); + +export type WorkspaceStatus = { + mode: 'ready' | 'closed'; + engine: WorkspaceEngineStatus; + upgrade: WorkspaceUpgradeStatus; +}; + +/** + * # Workspace + * + * ``` + * ┌───────────┐ + * │ Workspace │ + * └─────┬─────┘ + * │ + * │ + * ┌──────────────┼─────────────┐ + * │ │ │ + * ┌───┴─────┐ ┌──────┴─────┐ ┌───┴────┐ + * │ Upgrade │ │ blocksuite │ │ Engine │ + * └─────────┘ └────────────┘ └───┬────┘ + * │ + * ┌──────┼─────────┐ + * │ │ │ + * ┌──┴─┐ ┌──┴─┐ ┌─────┴───┐ + * │sync│ │blob│ │awareness│ + * └────┘ └────┘ └─────────┘ + * ``` + * + * This class contains all the components needed to run a workspace. + */ +export class Workspace { + get id() { + return this.meta.id; + } + get flavour() { + return this.meta.flavour; + } + + private _status: WorkspaceStatus; + + onStatusChange = new Slot(); + get status() { + return this._status; + } + + set status(status: WorkspaceStatus) { + this._status = status; + this.onStatusChange.emit(status); + } + + constructor( + public meta: WorkspaceMetadata, + public engine: WorkspaceEngine, + public blockSuiteWorkspace: BlockSuiteWorkspace, + public upgrade: WorkspaceUpgradeController, + public services: ServiceProvider + ) { + this._status = { + mode: 'closed', + engine: engine.status, + upgrade: this.upgrade.status, + }; + this.engine.onStatusChange.on(status => { + this.status = { + ...this.status, + engine: status, + }; + }); + this.upgrade.onStatusChange.on(status => { + this.status = { + ...this.status, + upgrade: status, + }; + }); + + this.start(); + } + + /** + * workspace start when create and workspace is one-time use + */ + private start() { + if (this.status.mode === 'ready') { + return; + } + logger.info('start workspace', this.id); + this.engine.start(); + this.status = { + ...this.status, + mode: 'ready', + engine: this.engine.status, + }; + } + + canGracefulStop() { + return this.engine.canGracefulStop() && !this.status.upgrade.upgrading; + } + + forceStop() { + if (this.status.mode === 'closed') { + return; + } + logger.info('stop workspace', this.id); + this.engine.forceStop(); + this.status = { + ...this.status, + mode: 'closed', + engine: this.engine.status, + }; + this.services.get(CleanupService).cleanup(); + } + + // same as `WorkspaceEngine.sync.setPriorityRule` + setPriorityRule(target: ((id: string) => boolean) | null) { + this.engine.sync.setPriorityRule(target); + } +} diff --git a/packages/frontend/electron/package.json b/packages/frontend/electron/package.json index 48a2697cd6..862d38ffbe 100644 --- a/packages/frontend/electron/package.json +++ b/packages/frontend/electron/package.json @@ -55,7 +55,7 @@ "lodash-es": "^4.17.21", "rxjs": "^7.8.1", "semver": "^7.5.4", - "tinykeys": "^2.1.0", + "tinykeys": "patch:tinykeys@npm%3A2.1.0#~/.yarn/patches/tinykeys-npm-2.1.0-819feeaed0.patch", "tree-kill": "^1.2.2", "ts-node": "^10.9.1", "undici": "^6.0.0", diff --git a/yarn.lock b/yarn.lock index bdc0345e69..34e906d216 100644 --- a/yarn.lock +++ b/yarn.lock @@ -466,7 +466,7 @@ __metadata: nanoid: "npm:^5.0.3" rxjs: "npm:^7.8.1" semver: "npm:^7.5.4" - tinykeys: "npm:^2.1.0" + tinykeys: "patch:tinykeys@npm%3A2.1.0#~/.yarn/patches/tinykeys-npm-2.1.0-819feeaed0.patch" tree-kill: "npm:^1.2.2" ts-node: "npm:^10.9.1" undici: "npm:^6.0.0" @@ -13054,6 +13054,7 @@ __metadata: foxact: "npm:^0.2.20" jotai: "npm:^2.5.1" jotai-effect: "npm:^0.2.3" + lodash-es: "npm:^4.17.21" nanoid: "npm:^5.0.3" react: "npm:^18.2.0" rxjs: "npm:^7.8.1" @@ -33232,7 +33233,7 @@ __metadata: languageName: node linkType: hard -"tinykeys@npm:2.1.0, tinykeys@npm:^2.1.0": +"tinykeys@npm:2.1.0": version: 2.1.0 resolution: "tinykeys@npm:2.1.0" checksum: 64d222e08472d1a55e42fb1f20f8c4587f7fab633cba0a23754eea3bf477044ae3160f203fd1061435352ed3df900fb49ecf83e829c090e847d7fa45f54491ad