From 05452bb297346726da741cce6393fc31b3268c60 Mon Sep 17 00:00:00 2001 From: Peng Xiao Date: Tue, 27 Jun 2023 15:40:37 +0800 Subject: [PATCH] feat: sqlite subdocument (#2816) Co-authored-by: Alex Yang --- apps/electron/playwright.config.ts | 1 + .../db/__tests__/workspace-db-adapter.spec.ts | 44 +++- .../electron/src/helper/db/base-db-adapter.ts | 21 +- apps/electron/src/helper/db/index.ts | 22 +- apps/electron/src/helper/db/secondary-db.ts | 116 ++++++--- apps/electron/src/helper/db/subjects.ts | 6 +- .../src/helper/db/workspace-db-adapter.ts | 126 ++++++++-- apps/electron/src/main/protocol.ts | 1 - .../pure/workspace-list-modal/index.tsx | 224 +++++++++--------- apps/web/src/hooks/__tests__/index.spec.tsx | 2 +- .../[workspaceId]/[pageId].tsx | 31 ++- nx.json | 3 +- .../image-preview-modal/hooks/use-zoom.tsx | 1 - .../image-preview-modal/index.css.ts | 1 - .../src/components/page-list/all-page.tsx | 28 +-- packages/component/src/ui/scrollbar/index.ts | 2 +- .../component/src/ui/scrollbar/scrollbar.tsx | 50 ++-- packages/hooks/src/__tests__/index.spec.ts | 2 +- .../src/use-block-suite-workspace-page.ts | 3 +- packages/infra/src/handler.ts | 11 +- packages/native/build.rs | 12 +- packages/native/index.d.ts | 15 +- packages/native/schema/src/lib.rs | 3 +- packages/native/src/sqlite/mod.rs | 138 ++++++++++- .../src/affine/__tests__/sync.spec.ts | 4 +- .../__tests__/sqlite-provider.spec.ts | 17 +- packages/workspace/src/providers/index.ts | 151 +----------- .../src/providers/sqlite-providers.ts | 212 +++++++++++++++++ .../y-indexeddb/src/__tests__/index.spec.ts | 15 +- tests/parallels/all-page.spec.ts | 6 +- 30 files changed, 842 insertions(+), 426 deletions(-) create mode 100644 packages/workspace/src/providers/sqlite-providers.ts diff --git a/apps/electron/playwright.config.ts b/apps/electron/playwright.config.ts index 4e91b35964..d3f67bbca7 100644 --- a/apps/electron/playwright.config.ts +++ b/apps/electron/playwright.config.ts @@ -12,6 +12,7 @@ import type { PlaywrightTestConfig } from '@playwright/test'; */ const config: PlaywrightTestConfig = { testDir: './tests', + testIgnore: '**/lib/**', fullyParallel: true, timeout: process.env.CI ? 50_000 : 30_000, use: { diff --git a/apps/electron/src/helper/db/__tests__/workspace-db-adapter.spec.ts b/apps/electron/src/helper/db/__tests__/workspace-db-adapter.spec.ts index 348d6c9d5a..329d509d09 100644 --- a/apps/electron/src/helper/db/__tests__/workspace-db-adapter.spec.ts +++ b/apps/electron/src/helper/db/__tests__/workspace-db-adapter.spec.ts @@ -20,14 +20,31 @@ afterEach(async () => { await fs.remove(tmpDir); }); +let testYDoc: Y.Doc; +let testYSubDoc: Y.Doc; + function getTestUpdates() { - const testYDoc = new Y.Doc(); + testYDoc = new Y.Doc(); const yText = testYDoc.getText('test'); yText.insert(0, 'hello'); + + testYSubDoc = new Y.Doc(); + testYDoc.getMap('subdocs').set('test-subdoc', testYSubDoc); + const updates = Y.encodeStateAsUpdate(testYDoc); return updates; } + +function getTestSubDocUpdates() { + const yText = testYSubDoc.getText('test'); + yText.insert(0, 'hello'); + + const updates = Y.encodeStateAsUpdate(testYSubDoc); + + return updates; +} + test('can create new db file if not exists', async () => { const { openWorkspaceDatabase } = await import('../workspace-db-adapter'); const workspaceId = v4(); @@ -68,6 +85,31 @@ test('on applyUpdate (from renderer), will trigger update', async () => { await db.destroy(); }); +test('on applyUpdate (from renderer, subdoc), will trigger update', async () => { + const { openWorkspaceDatabase } = await import('../workspace-db-adapter'); + const workspaceId = v4(); + const onUpdate = vi.fn(); + const insertUpdates = vi.fn(); + + const db = await openWorkspaceDatabase(workspaceId); + db.applyUpdate(getTestUpdates(), 'renderer'); + + db.db!.insertUpdates = insertUpdates; + db.update$.subscribe(onUpdate); + + const subdocUpdates = getTestSubDocUpdates(); + db.applyUpdate(subdocUpdates, 'renderer', testYSubDoc.guid); + + expect(onUpdate).toHaveBeenCalled(); + expect(insertUpdates).toHaveBeenCalledWith([ + { + docId: testYSubDoc.guid, + data: subdocUpdates, + }, + ]); + await db.destroy(); +}); + test('on applyUpdate (from external), will trigger update & send external update event', async () => { const { openWorkspaceDatabase } = await import('../workspace-db-adapter'); const workspaceId = v4(); diff --git a/apps/electron/src/helper/db/base-db-adapter.ts b/apps/electron/src/helper/db/base-db-adapter.ts index f147c85109..adece889b0 100644 --- a/apps/electron/src/helper/db/base-db-adapter.ts +++ b/apps/electron/src/helper/db/base-db-adapter.ts @@ -1,4 +1,4 @@ -import { SqliteConnection } from '@affine/native'; +import { type InsertRow, SqliteConnection } from '@affine/native'; import { logger } from '../logger'; @@ -79,21 +79,34 @@ export abstract class BaseSQLiteAdapter { } } - async getUpdates() { + async getUpdates(docId?: string) { try { if (!this.db) { logger.warn(`${this.path} is not connected`); return []; } - return await this.db.getUpdates(); + return await this.db.getUpdates(docId); } catch (error) { logger.error('getUpdates', error); return []; } } + async getAllUpdates() { + try { + if (!this.db) { + logger.warn(`${this.path} is not connected`); + return []; + } + return await this.db.getAllUpdates(); + } catch (error) { + logger.error('getAllUpdates', error); + return []; + } + } + // add a single update to SQLite - async addUpdateToSQLite(updates: Uint8Array[]) { + async addUpdateToSQLite(updates: InsertRow[]) { // batch write instead write per key stroke? try { if (!this.db) { diff --git a/apps/electron/src/helper/db/index.ts b/apps/electron/src/helper/db/index.ts index 05a6fd27c0..4f3f5c43b7 100644 --- a/apps/electron/src/helper/db/index.ts +++ b/apps/electron/src/helper/db/index.ts @@ -7,13 +7,17 @@ export * from './ensure-db'; export * from './subjects'; export const dbHandlers = { - getDocAsUpdates: async (id: string) => { - const workspaceDB = await ensureSQLiteDB(id); - return workspaceDB.getDocAsUpdates(); + getDocAsUpdates: async (workspaceId: string, subdocId?: string) => { + const workspaceDB = await ensureSQLiteDB(workspaceId); + return workspaceDB.getDocAsUpdates(subdocId); }, - applyDocUpdate: async (id: string, update: Uint8Array) => { - const workspaceDB = await ensureSQLiteDB(id); - return workspaceDB.applyUpdate(update); + applyDocUpdate: async ( + workspaceId: string, + update: Uint8Array, + subdocId?: string + ) => { + const workspaceDB = await ensureSQLiteDB(workspaceId); + return workspaceDB.applyUpdate(update, 'renderer', subdocId); }, addBlob: async (workspaceId: string, key: string, data: Uint8Array) => { const workspaceDB = await ensureSQLiteDB(workspaceId); @@ -38,7 +42,11 @@ export const dbHandlers = { export const dbEvents = { onExternalUpdate: ( - fn: (update: { workspaceId: string; update: Uint8Array }) => void + fn: (update: { + workspaceId: string; + update: Uint8Array; + docId?: string; + }) => void ) => { const sub = dbSubjects.externalUpdate.subscribe(fn); return () => { diff --git a/apps/electron/src/helper/db/secondary-db.ts b/apps/electron/src/helper/db/secondary-db.ts index e3687136d5..753acc93ec 100644 --- a/apps/electron/src/helper/db/secondary-db.ts +++ b/apps/electron/src/helper/db/secondary-db.ts @@ -1,6 +1,6 @@ import assert from 'node:assert'; -import type { SqliteConnection } from '@affine/native'; +import type { InsertRow } from '@affine/native'; import { debounce } from 'lodash-es'; import * as Y from 'yjs'; @@ -8,19 +8,19 @@ import { logger } from '../logger'; import type { YOrigin } from '../type'; import { getWorkspaceMeta } from '../workspace'; import { BaseSQLiteAdapter } from './base-db-adapter'; -import { mergeUpdate } from './merge-update'; import type { WorkspaceSQLiteDB } from './workspace-db-adapter'; const FLUSH_WAIT_TIME = 5000; const FLUSH_MAX_WAIT_TIME = 10000; +// todo: trim db when it is too big export class SecondaryWorkspaceSQLiteDB extends BaseSQLiteAdapter { role = 'secondary'; yDoc = new Y.Doc(); firstConnected = false; destroyed = false; - updateQueue: Uint8Array[] = []; + updateQueue: { data: Uint8Array; docId?: string }[] = []; unsubscribers = new Set<() => void>(); @@ -29,10 +29,23 @@ export class SecondaryWorkspaceSQLiteDB extends BaseSQLiteAdapter { public upstream: WorkspaceSQLiteDB ) { super(path); - this.setupAndListen(); + this.init(); logger.debug('[SecondaryWorkspaceSQLiteDB] created', this.workspaceId); } + getDoc(docId?: string) { + if (!docId) { + return this.yDoc; + } + // this should be pretty fast and we don't need to cache it + for (const subdoc of this.yDoc.subdocs) { + if (subdoc.guid === docId) { + return subdoc; + } + } + return null; + } + override async destroy() { await this.flushUpdateQueue(); this.unsubscribers.forEach(unsub => unsub()); @@ -47,7 +60,7 @@ export class SecondaryWorkspaceSQLiteDB extends BaseSQLiteAdapter { // do not update db immediately, instead, push to a queue // and flush the queue in a future time - async addUpdateToUpdateQueue(db: SqliteConnection, update: Uint8Array) { + async addUpdateToUpdateQueue(update: InsertRow) { this.updateQueue.push(update); await this.debouncedFlush(); } @@ -101,55 +114,82 @@ export class SecondaryWorkspaceSQLiteDB extends BaseSQLiteAdapter { } } - setupAndListen() { - if (this.firstConnected) { + setupListener(docId?: string) { + const doc = this.getDoc(docId); + if (!doc) { return; } - this.firstConnected = true; const onUpstreamUpdate = (update: Uint8Array, origin: YOrigin) => { if (origin === 'renderer') { // update to upstream yDoc should be replicated to self yDoc - this.applyUpdate(update, 'upstream'); + this.applyUpdate(update, 'upstream', docId); } }; const onSelfUpdate = async (update: Uint8Array, origin: YOrigin) => { // for self update from upstream, we need to push it to external DB - if (origin === 'upstream' && this.db) { - await this.addUpdateToUpdateQueue(this.db, update); + if (origin === 'upstream') { + await this.addUpdateToUpdateQueue({ + data: update, + docId, + }); } if (origin === 'self') { - this.upstream.applyUpdate(update, 'external'); + this.upstream.applyUpdate(update, 'external', docId); } }; + const onSubdocs = ({ added }: { added: Set }) => { + added.forEach(subdoc => { + this.setupListener(subdoc.guid); + }); + }; + // listen to upstream update this.upstream.yDoc.on('update', onUpstreamUpdate); this.yDoc.on('update', onSelfUpdate); + this.yDoc.on('subdocs', onSubdocs); this.unsubscribers.add(() => { this.upstream.yDoc.off('update', onUpstreamUpdate); this.yDoc.off('update', onSelfUpdate); + this.yDoc.off('subdocs', onSubdocs); }); - - this.run(() => { - // apply all updates from upstream - const upstreamUpdate = this.upstream.getDocAsUpdates(); - // to initialize the yDoc, we need to apply all updates from the db - this.applyUpdate(upstreamUpdate, 'upstream'); - }) - .then(() => { - logger.debug('run success'); - }) - .catch(err => { - logger.error('run error', err); - }); } - applyUpdate = (data: Uint8Array, origin: YOrigin = 'upstream') => { - Y.applyUpdate(this.yDoc, data, origin); + init() { + if (this.firstConnected) { + return; + } + this.firstConnected = true; + this.setupListener(); + // apply all updates from upstream + // we assume here that the upstream ydoc is already sync'ed + const syncUpstreamDoc = (docId?: string) => { + const update = this.upstream.getDocAsUpdates(docId); + if (update) { + this.applyUpdate(update, 'upstream'); + } + }; + syncUpstreamDoc(); + this.upstream.yDoc.subdocs.forEach(subdoc => { + syncUpstreamDoc(subdoc.guid); + }); + } + + applyUpdate = ( + data: Uint8Array, + origin: YOrigin = 'upstream', + docId?: string + ) => { + const doc = this.getDoc(docId); + if (doc) { + Y.applyUpdate(this.yDoc, data, origin); + } else { + logger.warn('applyUpdate: doc not found', docId); + } }; // TODO: have a better solution to handle blobs @@ -186,23 +226,33 @@ export class SecondaryWorkspaceSQLiteDB extends BaseSQLiteAdapter { async pull() { const start = performance.now(); assert(this.upstream.db, 'upstream db should be connected'); - const updates = await this.run(async () => { + const rows = await this.run(async () => { // TODO: no need to get all updates, just get the latest ones (using a cursor, etc)? await this.syncBlobs(); - return (await this.getUpdates()).map(update => update.data); + return await this.getAllUpdates(); }); - if (!updates || this.destroyed) { + if (!rows || this.destroyed) { return; } - const merged = mergeUpdate(updates); - this.applyUpdate(merged, 'self'); + // apply root doc first + rows.forEach(row => { + if (!row.docId) { + this.applyUpdate(row.data, 'self'); + } + }); + + rows.forEach(row => { + if (row.docId) { + this.applyUpdate(row.data, 'self', row.docId); + } + }); logger.debug( 'pull external updates', this.path, - updates.length, + rows.length, (performance.now() - start).toFixed(2), 'ms' ); diff --git a/apps/electron/src/helper/db/subjects.ts b/apps/electron/src/helper/db/subjects.ts index 05943e2331..263d68cc91 100644 --- a/apps/electron/src/helper/db/subjects.ts +++ b/apps/electron/src/helper/db/subjects.ts @@ -1,5 +1,9 @@ import { Subject } from 'rxjs'; export const dbSubjects = { - externalUpdate: new Subject<{ workspaceId: string; update: Uint8Array }>(), + externalUpdate: new Subject<{ + workspaceId: string; + update: Uint8Array; + docId?: string; + }>(), }; diff --git a/apps/electron/src/helper/db/workspace-db-adapter.ts b/apps/electron/src/helper/db/workspace-db-adapter.ts index abf663e16b..cda955be33 100644 --- a/apps/electron/src/helper/db/workspace-db-adapter.ts +++ b/apps/electron/src/helper/db/workspace-db-adapter.ts @@ -1,3 +1,5 @@ +import type { InsertRow } from '@affine/native'; +import { debounce } from 'lodash-es'; import { Subject } from 'rxjs'; import * as Y from 'yjs'; @@ -5,9 +7,10 @@ import { logger } from '../logger'; import type { YOrigin } from '../type'; import { getWorkspaceMeta } from '../workspace'; import { BaseSQLiteAdapter } from './base-db-adapter'; -import { mergeUpdate } from './merge-update'; import { dbSubjects } from './subjects'; +const TRIM_SIZE = 500; + export class WorkspaceSQLiteDB extends BaseSQLiteAdapter { role = 'primary'; yDoc = new Y.Doc(); @@ -28,33 +31,76 @@ export class WorkspaceSQLiteDB extends BaseSQLiteAdapter { this.firstConnected = false; } + getDoc(docId?: string) { + if (!docId) { + return this.yDoc; + } + // this should be pretty fast and we don't need to cache it + for (const subdoc of this.yDoc.subdocs) { + if (subdoc.guid === docId) { + return subdoc; + } + } + return null; + } + getWorkspaceName = () => { - return this.yDoc.getMap('space:meta').get('name') as string; + return this.yDoc.getMap('meta').get('name') as string; }; + setupListener(docId?: string) { + const doc = this.getDoc(docId); + if (doc) { + const onUpdate = async (update: Uint8Array, origin: YOrigin) => { + const insertRows = [{ data: update, docId }]; + if (origin === 'renderer') { + await this.addUpdateToSQLite(insertRows); + } else if (origin === 'external') { + dbSubjects.externalUpdate.next({ + workspaceId: this.workspaceId, + update, + docId, + }); + await this.addUpdateToSQLite(insertRows); + logger.debug('external update', this.workspaceId); + } + }; + const onSubdocs = ({ added }: { added: Set }) => { + added.forEach(subdoc => { + this.setupListener(subdoc.guid); + }); + }; + + doc.on('update', onUpdate); + doc.on('subdocs', onSubdocs); + } else { + logger.error('setupListener: doc not found', docId); + } + } + async init() { const db = await super.connectIfNeeded(); if (!this.firstConnected) { - this.yDoc.on('update', async (update: Uint8Array, origin: YOrigin) => { - if (origin === 'renderer') { - await this.addUpdateToSQLite([update]); - } else if (origin === 'external') { - dbSubjects.externalUpdate.next({ - workspaceId: this.workspaceId, - update, - }); - await this.addUpdateToSQLite([update]); - logger.debug('external update', this.workspaceId); - } - }); + this.setupListener(); } - const updates = await this.getUpdates(); - const merged = mergeUpdate(updates.map(update => update.data)); + const updates = await this.getAllUpdates(); - // to initialize the yDoc, we need to apply all updates from the db - this.applyUpdate(merged, 'self'); + // apply root first (without ID). + // subdoc will be available after root is applied + updates.forEach(update => { + if (!update.docId) { + this.applyUpdate(update.data, 'self'); + } + }); + + // then, for all subdocs, apply the updates + updates.forEach(update => { + if (update.docId) { + this.applyUpdate(update.data, 'self', update.docId); + } + }); this.firstConnected = true; this.update$.next(); @@ -62,18 +108,32 @@ export class WorkspaceSQLiteDB extends BaseSQLiteAdapter { return db; } - getDocAsUpdates = () => { - return Y.encodeStateAsUpdate(this.yDoc); + // unlike getUpdates, this will return updates in yDoc + getDocAsUpdates = (docId?: string) => { + const doc = docId ? this.getDoc(docId) : this.yDoc; + if (doc) { + return Y.encodeStateAsUpdate(doc); + } + return null; }; // non-blocking and use yDoc to validate the update // after that, the update is added to the db - applyUpdate = (data: Uint8Array, origin: YOrigin = 'renderer') => { + applyUpdate = ( + data: Uint8Array, + origin: YOrigin = 'renderer', + docId?: string + ) => { // todo: trim the updates when the number of records is too large // 1. store the current ydoc state in the db // 2. then delete the old updates // yjs-idb will always trim the db for the first time after DB is loaded - Y.applyUpdate(this.yDoc, data, origin); + const doc = this.getDoc(docId); + if (doc) { + Y.applyUpdate(doc, data, origin); + } else { + logger.warn('applyUpdate: doc not found', docId); + } }; override async addBlob(key: string, value: Uint8Array) { @@ -87,10 +147,30 @@ export class WorkspaceSQLiteDB extends BaseSQLiteAdapter { await super.deleteBlob(key); } - override async addUpdateToSQLite(data: Uint8Array[]) { + override async addUpdateToSQLite(data: InsertRow[]) { this.update$.next(); + data.forEach(row => { + this.trimWhenNecessary(row.docId)?.catch(err => { + logger.error('trimWhenNecessary failed', err); + }); + }); await super.addUpdateToSQLite(data); } + + trimWhenNecessary = debounce(async (docId?: string) => { + if (this.firstConnected) { + const count = (await this.db?.getUpdatesCount(docId)) ?? 0; + if (count > TRIM_SIZE) { + logger.debug(`trim ${this.workspaceId}:${docId} ${count}`); + const update = this.getDocAsUpdates(docId); + if (update) { + const insertRows = [{ data: update, docId }]; + await this.db?.replaceUpdates(docId, insertRows); + logger.debug(`trim ${this.workspaceId}:${docId} successfully`); + } + } + } + }, 1000); } export async function openWorkspaceDatabase(workspaceId: string) { diff --git a/apps/electron/src/main/protocol.ts b/apps/electron/src/main/protocol.ts index 4dfd18f9b3..2a9df42178 100644 --- a/apps/electron/src/main/protocol.ts +++ b/apps/electron/src/main/protocol.ts @@ -34,7 +34,6 @@ export function registerProtocol() { const url = request.url.replace(/^file:\/\//, ''); const realpath = toAbsolutePath(url); callback(realpath); - console.log('interceptFileProtocol realpath', request.url, realpath); return true; }); diff --git a/apps/web/src/components/pure/workspace-list-modal/index.tsx b/apps/web/src/components/pure/workspace-list-modal/index.tsx index dab3c8462a..7ef3c40cc6 100644 --- a/apps/web/src/components/pure/workspace-list-modal/index.tsx +++ b/apps/web/src/components/pure/workspace-list-modal/index.tsx @@ -107,120 +107,120 @@ export const WorkspaceListModal = ({ - - flavour !== WorkspaceFlavour.PUBLIC - ) as (AffineLegacyCloudWorkspace | LocalWorkspace)[] - } - currentWorkspaceId={currentWorkspaceId} - onClick={onClickWorkspace} - onSettingClick={onClickWorkspaceSetting} - onDragEnd={useCallback( - (event: DragEndEvent) => { - const { active, over } = event; - if (active.id !== over?.id) { - onMoveWorkspace(active.id as string, over?.id as string); - } - }, - [onMoveWorkspace] - )} - /> - {!environment.isDesktop && ( -
- - - - - - - - {t['New Workspace']()} - -

{t['Create Or Import']()}

-
-
-
- )} - - {environment.isDesktop && ( - - - - -
-

{t['New Workspace']()}

- -

{t['Create your own workspace']()}

-
-
- - - -
-
-
- - - -
-

{t['Add Workspace']()}

- -

{t['Add Workspace Hint']()}

-
-
- - - -
-
-
- + + flavour !== WorkspaceFlavour.PUBLIC + ) as (AffineLegacyCloudWorkspace | LocalWorkspace)[] } - > - - - - + currentWorkspaceId={currentWorkspaceId} + onClick={onClickWorkspace} + onSettingClick={onClickWorkspaceSetting} + onDragEnd={useCallback( + (event: DragEndEvent) => { + const { active, over } = event; + if (active.id !== over?.id) { + onMoveWorkspace(active.id as string, over?.id as string); + } + }, + [onMoveWorkspace] + )} + /> + {!environment.isDesktop && ( +
+ + + + - - - {t['New Workspace']()} - -

{t['Create Or Import']()}

-
-
-
- )} -
+ + + {t['New Workspace']()} + +

{t['Create Or Import']()}

+
+ + + )} + + {environment.isDesktop && ( + + + + +
+

{t['New Workspace']()}

+ +

{t['Create your own workspace']()}

+
+
+ + + +
+
+
+ + + +
+

{t['Add Workspace']()}

+ +

{t['Add Workspace Hint']()}

+
+
+ + + +
+
+
+ + } + > + + + + + + + + {t['New Workspace']()} + +

{t['Create Or Import']()}

+
+
+
+ )} +