From 55b31827999bc0e8b350813f3741ba965e7fd4d6 Mon Sep 17 00:00:00 2001 From: Alex Yang Date: Thu, 31 Aug 2023 00:40:34 -0500 Subject: [PATCH] feat(core): support syncing workspaces and blobs in the background (#4057) --- apps/core/src/adapters/workspace.ts | 5 +- packages/env/src/workspace.ts | 2 + packages/workspace/package.json | 2 + .../workspace/src/affine}/crud.ts | 5 +- packages/workspace/src/affine/sync.ts | 93 +++++++++++++++++++ packages/workspace/src/affine/worker.ts | 0 packages/workspace/src/atom.ts | 2 + .../workspace/src/providers/cloud/index.ts | 13 ++- packages/y-indexeddb/src/utils.ts | 30 ++++++ 9 files changed, 146 insertions(+), 6 deletions(-) rename {apps/core/src/adapters/cloud => packages/workspace/src/affine}/crud.ts (97%) create mode 100644 packages/workspace/src/affine/sync.ts create mode 100644 packages/workspace/src/affine/worker.ts diff --git a/apps/core/src/adapters/workspace.ts b/apps/core/src/adapters/workspace.ts index 77bd0f2c8..fc5dfbf6e 100644 --- a/apps/core/src/adapters/workspace.ts +++ b/apps/core/src/adapters/workspace.ts @@ -9,8 +9,9 @@ import { ReleaseType, WorkspaceFlavour, } from '@affine/env/workspace'; +import { CRUD as CloudCRUD } from '@affine/workspace/affine/crud'; +import { startSync, stopSync } from '@affine/workspace/affine/sync'; -import { CRUD as CloudCRUD } from './cloud/crud'; import { UI as CloudUI } from './cloud/ui'; import { LocalAdapter } from './local'; import { UI as PublicCloudUI } from './public-cloud/ui'; @@ -40,6 +41,8 @@ export const WorkspaceAdapters = { return false; } }, + 'service:start': startSync, + 'service:stop': stopSync, } as Partial, CRUD: CloudCRUD, UI: CloudUI, diff --git a/packages/env/src/workspace.ts b/packages/env/src/workspace.ts index a43142cd3..bbe79aa81 100644 --- a/packages/env/src/workspace.ts +++ b/packages/env/src/workspace.ts @@ -190,6 +190,8 @@ export interface AppEvents { 'app:init': () => string[]; // event if you have access to workspace adapter 'app:access': () => Promise; + 'service:start': () => void; + 'service:stop': () => void; } export interface WorkspaceAdapter { diff --git a/packages/workspace/package.json b/packages/workspace/package.json index 997528ea4..7c3f2a642 100644 --- a/packages/workspace/package.json +++ b/packages/workspace/package.json @@ -7,7 +7,9 @@ "./type": "./src/type.ts", "./migration": "./src/migration/index.ts", "./local/crud": "./src/local/crud.ts", + "./affine/crud": "./src/affine/crud.ts", "./affine/gql": "./src/affine/gql.ts", + "./affine/sync": "./src/affine/sync.ts", "./providers": "./src/providers/index.ts" }, "peerDependencies": { diff --git a/apps/core/src/adapters/cloud/crud.ts b/packages/workspace/src/affine/crud.ts similarity index 97% rename from apps/core/src/adapters/cloud/crud.ts rename to packages/workspace/src/affine/crud.ts index e0bbf520f..576f38a07 100644 --- a/apps/core/src/adapters/cloud/crud.ts +++ b/packages/workspace/src/affine/crud.ts @@ -9,8 +9,6 @@ import { getWorkspaceQuery, getWorkspacesQuery, } from '@affine/graphql'; -import { fetcher } from '@affine/workspace/affine/gql'; -import { getOrCreateWorkspace } from '@affine/workspace/manager'; import { createIndexeddbStorage, Workspace } from '@blocksuite/store'; import { migrateLocalBlobStorage } from '@toeverything/infra/blocksuite'; import { @@ -20,6 +18,9 @@ import { import { getSession } from 'next-auth/react'; import { proxy } from 'valtio/vanilla'; +import { getOrCreateWorkspace } from '../manager'; +import { fetcher } from './gql'; + const Y = Workspace.Y; async function deleteLocalBlobStorage(id: string) { diff --git a/packages/workspace/src/affine/sync.ts b/packages/workspace/src/affine/sync.ts new file mode 100644 index 000000000..1e47bde0d --- /dev/null +++ b/packages/workspace/src/affine/sync.ts @@ -0,0 +1,93 @@ +import { createIndexeddbStorage } from '@blocksuite/store'; +import { pushBinary } from '@toeverything/y-indexeddb'; +import type { Doc } from 'yjs'; +import { applyUpdate } from 'yjs'; + +import { createCloudBlobStorage } from '../blob/cloud-blob-storage'; +import { downloadBinaryFromCloud } from '../providers/cloud'; +import { CRUD } from './crud'; + +let abortController: AbortController | undefined; + +const downloadRecursive = async ( + rootGuid: string, + doc: Doc, + signal: AbortSignal +): Promise => { + if (signal.aborted) { + return; + } + const binary = await downloadBinaryFromCloud(rootGuid, doc.guid); + if (typeof binary !== 'boolean') { + const update = new Uint8Array(binary); + if (rootGuid === doc.guid) { + // only apply the root doc + applyUpdate(doc, update, 'affine-cloud-service'); + } else { + await pushBinary(doc.guid, update); + } + } + return Promise.all( + [...doc.subdocs.values()].map(subdoc => + downloadRecursive(rootGuid, subdoc, signal) + ) + ).then(); +}; + +export async function startSync() { + abortController = new AbortController(); + const signal = abortController.signal; + const workspaces = await CRUD.list(); + const downloadCloudPromises = workspaces.map(workspace => + downloadRecursive(workspace.id, workspace.blockSuiteWorkspace.doc, signal) + ); + const syncBlobPromises = workspaces.map(async workspace => { + const cloudBlobStorage = createCloudBlobStorage(workspace.id); + const indexeddbBlobStorage = createIndexeddbStorage(workspace.id); + return Promise.all([ + cloudBlobStorage.crud.list(), + indexeddbBlobStorage.crud.list(), + ]).then(([cloudKeys, indexeddbKeys]) => { + if (signal.aborted) { + return; + } + const cloudKeysSet = new Set(cloudKeys); + const indexeddbKeysSet = new Set(indexeddbKeys); + // missing in indexeddb + const missingLocalKeys = cloudKeys.filter( + key => !indexeddbKeysSet.has(key) + ); + // missing in cloud + const missingCloudKeys = indexeddbKeys.filter( + key => !cloudKeysSet.has(key) + ); + return Promise.all([ + ...missingLocalKeys.map(key => + cloudBlobStorage.crud.get(key).then(async value => { + if (signal.aborted) { + return; + } + if (value) { + await indexeddbBlobStorage.crud.set(key, value); + } + }) + ), + ...missingCloudKeys.map(key => + indexeddbBlobStorage.crud.get(key).then(async value => { + if (signal.aborted) { + return; + } + if (value) { + await cloudBlobStorage.crud.set(key, value); + } + }) + ), + ]); + }); + }); + await Promise.all([...downloadCloudPromises, ...syncBlobPromises]); +} + +export async function stopSync() { + abortController?.abort(); +} diff --git a/packages/workspace/src/affine/worker.ts b/packages/workspace/src/affine/worker.ts new file mode 100644 index 000000000..e69de29bb diff --git a/packages/workspace/src/atom.ts b/packages/workspace/src/atom.ts index ace6f9fc5..67d7d9346 100644 --- a/packages/workspace/src/atom.ts +++ b/packages/workspace/src/atom.ts @@ -143,6 +143,7 @@ const fetchMetadata: FetchMetadata = async (get, { signal }) => { removed.forEach(meta => { metadata.splice(metadata.indexOf(meta), 1); }); + Adapter.Events['service:stop']?.(); continue; } try { @@ -178,6 +179,7 @@ const fetchMetadata: FetchMetadata = async (get, { signal }) => { } catch (e) { console.error('list data error:', e); } + Adapter.Events['service:start']?.(); } } const metadataMap = new Map(metadata.map(x => [x.id, x])); diff --git a/packages/workspace/src/providers/cloud/index.ts b/packages/workspace/src/providers/cloud/index.ts index d4895a9bf..d2b1a502f 100644 --- a/packages/workspace/src/providers/cloud/index.ts +++ b/packages/workspace/src/providers/cloud/index.ts @@ -8,23 +8,30 @@ const Y = Workspace.Y; const logger = new DebugLogger('affine:cloud'); +const hashMap = new Map(); + export async function downloadBinaryFromCloud( rootGuid: string, pageGuid: string -) { +): Promise { + if (hashMap.has(`${rootGuid}/${pageGuid}`)) { + return true; + } const response = await fetchWithReport( runtimeConfig.serverUrlPrefix + `/api/workspaces/${rootGuid}/docs/${pageGuid}` ); if (response.ok) { - return response.arrayBuffer(); + const arrayBuffer = await response.arrayBuffer(); + hashMap.set(`${rootGuid}/${pageGuid}`, arrayBuffer); + return arrayBuffer; } return false; } async function downloadBinary(rootGuid: string, doc: Doc) { const buffer = await downloadBinaryFromCloud(rootGuid, doc.guid); - if (buffer) { + if (typeof buffer !== 'boolean') { Y.applyUpdate(doc, new Uint8Array(buffer), 'affine-cloud'); } } diff --git a/packages/y-indexeddb/src/utils.ts b/packages/y-indexeddb/src/utils.ts index 30a1a1df3..543afea0a 100644 --- a/packages/y-indexeddb/src/utils.ts +++ b/packages/y-indexeddb/src/utils.ts @@ -173,3 +173,33 @@ export async function overwriteBinary( ], }); } + +export async function pushBinary( + guid: string, + update: UpdateMessage['update'], + dbName = DEFAULT_DB_NAME +) { + const dbPromise = openDB(dbName, dbVersion, { + upgrade: upgradeDB, + }); + const db = await dbPromise; + const t = db.transaction('workspace', 'readwrite').objectStore('workspace'); + const doc = await t.get(guid); + if (!doc) { + await t.put({ + id: guid, + updates: [ + { + timestamp: Date.now(), + update, + }, + ], + }); + } else { + doc.updates.push({ + timestamp: Date.now(), + update, + }); + await t.put(doc); + } +}