refactor(workspace): blob sync (#5037)

This pr implements a blob engine.
It exposes a single `BlobStorage` to the `blocksuite`, and in it we sync blobs between multiple storages.

The implement still have few issues, but we can merge this pr first and fix them in future.

* BlobEngine currently **do nothing when delete**, because synchronization logic conflicts with deletion logic.
* BlobEngine sync between storages by querying the blob list at regular intervals. This will **cause many queries**, we can avoid this in the future by subscribing to remote changes.
This commit is contained in:
EYHN 2023-11-23 07:56:19 +00:00
parent 1740e7efa1
commit 23e0137ed8
No known key found for this signature in database
GPG Key ID: 46C9E26A75AB276C
13 changed files with 423 additions and 174 deletions

View File

@ -14,6 +14,7 @@ import {
} from '@affine/component/workspace';
import { useAFFiNEI18N } from '@affine/i18n/hooks';
import { rootWorkspacesMetadataAtom } from '@affine/workspace/atom';
import { getBlobEngine } from '@affine/workspace/manager';
import { assertExists } from '@blocksuite/global/utils';
import type { DragEndEvent } from '@dnd-kit/core';
import {
@ -116,10 +117,44 @@ type WorkspaceLayoutProps = {
migration?: MigrationPoint;
};
const useSyncWorkspaceBlob = () => {
// temporary solution for sync blob
const [currentWorkspace] = useCurrentWorkspace();
useEffect(() => {
const blobEngine = getBlobEngine(currentWorkspace.blockSuiteWorkspace);
let stopped = false;
function sync() {
if (stopped) {
return;
}
blobEngine
?.sync()
.catch(error => {
console.error('sync blob error', error);
})
.finally(() => {
// sync every 1 minute
setTimeout(sync, 60000);
});
}
// after currentWorkspace changed, wait 1 second to start sync
setTimeout(sync, 1000);
return () => {
stopped = true;
};
}, [currentWorkspace]);
};
export const WorkspaceLayout = function WorkspacesSuspense({
children,
migration,
}: PropsWithChildren<WorkspaceLayoutProps>) {
useSyncWorkspaceBlob();
return (
<AdapterProviderWrapper>
<CurrentWorkspaceContext>

View File

@ -22,6 +22,7 @@
"@toeverything/y-indexeddb": "workspace:*",
"async-call-rpc": "^6.3.1",
"idb": "^7.1.1",
"idb-keyval": "^6.2.1",
"is-svg": "^5.0.0",
"jotai": "^2.5.1",
"js-base64": "^3.7.5",

View File

@ -1,79 +0,0 @@
import {
checkBlobSizesQuery,
deleteBlobMutation,
fetchWithTraceReport,
listBlobsQuery,
setBlobMutation,
} from '@affine/graphql';
import { fetcher } from '@affine/workspace/affine/gql';
import type { BlobStorage } from '@blocksuite/store';
import { predefinedStaticFiles } from './local-static-storage';
import { bufferToBlob } from './util';
export const createCloudBlobStorage = (workspaceId: string): BlobStorage => {
return {
crud: {
get: async key => {
const suffix = key.startsWith('/')
? key
: predefinedStaticFiles.includes(key)
? `/static/${key}`
: `/api/workspaces/${workspaceId}/blobs/${key}`;
return fetchWithTraceReport(
runtimeConfig.serverUrlPrefix + suffix
).then(async res => {
if (!res.ok) {
// status not in the range 200-299
return null;
}
return bufferToBlob(await res.arrayBuffer());
});
},
set: async (key, value) => {
const {
checkBlobSize: { size },
} = await fetcher({
query: checkBlobSizesQuery,
variables: {
workspaceId,
size: value.size,
},
});
if (size <= 0) {
throw new Error('Blob size limit exceeded');
}
const result = await fetcher({
query: setBlobMutation,
variables: {
workspaceId,
blob: new File([value], key),
},
});
console.assert(result.setBlob === key, 'Blob hash mismatch');
return key;
},
list: async () => {
const result = await fetcher({
query: listBlobsQuery,
variables: {
workspaceId,
},
});
return result.listBlobs;
},
delete: async (key: string) => {
await fetcher({
query: deleteBlobMutation,
variables: {
workspaceId,
hash: key,
},
});
},
},
};
};

View File

@ -0,0 +1,139 @@
import { DebugLogger } from '@affine/debug';
import { difference } from 'lodash-es';
const logger = new DebugLogger('affine:blob-engine');
export class BlobEngine {
constructor(
private local: BlobStorage,
private remotes: BlobStorage[]
) {}
get storages() {
return [this.local, ...this.remotes];
}
async sync() {
if (this.local.readonly) {
return;
}
logger.debug('start syncing blob...');
for (const remote of this.remotes) {
let localList;
let remoteList;
try {
localList = await this.local.list();
remoteList = await remote.list();
} catch (err) {
logger.error(`error when sync`, err);
continue;
}
if (!remote.readonly) {
const needUpload = difference(localList, remoteList);
for (const key of needUpload) {
try {
const data = await this.local.get(key);
if (data) {
await remote.set(key, data);
}
} catch (err) {
logger.error(
`error when sync ${key} from [${this.local.name}] to [${remote.name}]`,
err
);
}
}
}
const needDownload = difference(remoteList, localList);
for (const key of needDownload) {
try {
const data = await remote.get(key);
if (data) {
await this.local.set(key, data);
}
} catch (err) {
logger.error(
`error when sync ${key} from [${remote.name}] to [${this.local.name}]`,
err
);
}
}
}
logger.debug('finish syncing blob');
}
async get(key: string) {
logger.debug('get blob', key);
for (const storage of this.storages) {
const data = await storage.get(key);
if (data) {
return data;
}
}
return undefined;
}
async set(key: string, value: Blob) {
if (this.local.readonly) {
throw new Error('local peer is readonly');
}
// await upload to the local peer
await this.local.set(key, value);
// uploads to other peers in the background
Promise.allSettled(
this.remotes
.filter(r => !r.readonly)
.map(peer =>
peer.set(key, value).catch(err => {
logger.error('error when upload to peer', err);
})
)
)
.then(result => {
if (result.some(({ status }) => status === 'rejected')) {
logger.error(
`blob ${key} update finish, but some peers failed to update`
);
} else {
logger.debug(`blob ${key} update finish`);
}
})
.catch(() => {
// Promise.allSettled never reject
});
}
async delete(_key: string) {
// not supported
}
async list() {
const blobList = new Set<string>();
for (const peer of this.storages) {
const list = await peer.list();
if (list) {
for (const blob of list) {
blobList.add(blob);
}
}
}
return Array.from(blobList);
}
}
export interface BlobStorage {
name: string;
readonly: boolean;
get: (key: string) => Promise<Blob | undefined>;
set: (key: string, value: Blob) => Promise<void>;
delete: (key: string) => Promise<void>;
list: () => Promise<string[]>;
}

View File

@ -0,0 +1,37 @@
import { BlobEngine } from './engine';
import {
createAffineCloudBlobStorage,
createIndexeddbBlobStorage,
createSQLiteBlobStorage,
createStaticBlobStorage,
} from './storage';
export * from './engine';
export * from './storage';
export function createLocalBlobStorage(workspaceId: string) {
if (environment.isDesktop) {
return createSQLiteBlobStorage(workspaceId);
} else {
return createIndexeddbBlobStorage(workspaceId);
}
}
export function createLocalBlobEngine(workspaceId: string) {
return new BlobEngine(createLocalBlobStorage(workspaceId), [
createStaticBlobStorage(),
]);
}
export function createAffineCloudBlobEngine(workspaceId: string) {
return new BlobEngine(createLocalBlobStorage(workspaceId), [
createStaticBlobStorage(),
createAffineCloudBlobStorage(workspaceId),
]);
}
export function createAffinePublicBlobEngine(workspaceId: string) {
return new BlobEngine(createAffineCloudBlobStorage(workspaceId), [
createStaticBlobStorage(),
]);
}

View File

@ -1,34 +0,0 @@
import { assertExists } from '@blocksuite/global/utils';
import type { BlobStorage } from '@blocksuite/store';
import { bufferToBlob } from './util';
export const createSQLiteStorage = (workspaceId: string): BlobStorage => {
const apis = window.apis;
assertExists(apis);
return {
crud: {
get: async (key: string) => {
const buffer = await apis.db.getBlob(workspaceId, key);
if (buffer) {
return bufferToBlob(buffer);
}
return null;
},
set: async (key: string, value: Blob) => {
await apis.db.addBlob(
workspaceId,
key,
new Uint8Array(await value.arrayBuffer())
);
return key;
},
delete: async (key: string) => {
return apis.db.deleteBlob(workspaceId, key);
},
list: async () => {
return apis.db.getBlobKeys(workspaceId);
},
},
};
};

View File

@ -0,0 +1,76 @@
import {
checkBlobSizesQuery,
deleteBlobMutation,
fetchWithTraceReport,
listBlobsQuery,
setBlobMutation,
} from '@affine/graphql';
import { fetcher } from '@affine/workspace/affine/gql';
import type { BlobStorage } from '../engine';
export const createAffineCloudBlobStorage = (
workspaceId: string
): BlobStorage => {
return {
name: 'affine-cloud',
readonly: false,
get: async key => {
const suffix = key.startsWith('/')
? key
: `/api/workspaces/${workspaceId}/blobs/${key}`;
return fetchWithTraceReport(runtimeConfig.serverUrlPrefix + suffix).then(
async res => {
if (!res.ok) {
// status not in the range 200-299
return undefined;
}
return await res.blob();
}
);
},
set: async (key, value) => {
const {
checkBlobSize: { size },
} = await fetcher({
query: checkBlobSizesQuery,
variables: {
workspaceId,
size: value.size,
},
});
if (size <= 0) {
throw new Error('Blob size limit exceeded');
}
const result = await fetcher({
query: setBlobMutation,
variables: {
workspaceId,
blob: new File([value], key),
},
});
console.assert(result.setBlob === key, 'Blob hash mismatch');
},
list: async () => {
const result = await fetcher({
query: listBlobsQuery,
variables: {
workspaceId,
},
});
return result.listBlobs;
},
delete: async (key: string) => {
await fetcher({
query: deleteBlobMutation,
variables: {
workspaceId,
hash: key,
},
});
},
};
};

View File

@ -0,0 +1,4 @@
export * from './affine-cloud';
export * from './indexeddb';
export * from './sqlite';
export * from './static';

View File

@ -0,0 +1,32 @@
import { createStore, del, get, keys, set } from 'idb-keyval';
import type { BlobStorage } from '../engine';
export const createIndexeddbBlobStorage = (
workspaceId: string
): BlobStorage => {
const db = createStore(`${workspaceId}_blob`, 'blob');
const mimeTypeDb = createStore(`${workspaceId}_blob_mime`, 'blob_mime');
return {
name: 'indexeddb',
readonly: false,
get: async (key: string) => {
const res = await get<ArrayBuffer>(key, db);
if (res) {
return new Blob([res], { type: await get(key, mimeTypeDb) });
}
return undefined;
},
set: async (key: string, value: Blob) => {
await set(key, await value.arrayBuffer(), db);
await set(key, value.type, mimeTypeDb);
},
delete: async (key: string) => {
await del(key, db);
await del(key, mimeTypeDb);
},
list: async () => {
return keys<string>(db);
},
};
};

View File

@ -0,0 +1,33 @@
import { assertExists } from '@blocksuite/global/utils';
import type { BlobStorage } from '../engine';
import { bufferToBlob } from '../util';
export const createSQLiteBlobStorage = (workspaceId: string): BlobStorage => {
const apis = window.apis;
assertExists(apis);
return {
name: 'sqlite',
readonly: false,
get: async (key: string) => {
const buffer = await apis.db.getBlob(workspaceId, key);
if (buffer) {
return bufferToBlob(buffer);
}
return undefined;
},
set: async (key: string, value: Blob) => {
await apis.db.addBlob(
workspaceId,
key,
new Uint8Array(await value.arrayBuffer())
);
},
delete: async (key: string) => {
return apis.db.deleteBlob(workspaceId, key);
},
list: async () => {
return apis.db.getBlobKeys(workspaceId);
},
};
};

View File

@ -1,6 +1,4 @@
import type { BlobStorage } from '@blocksuite/store';
import { bufferToBlob } from './util';
import type { BlobStorage } from '../engine';
export const predefinedStaticFiles = [
'029uztLz2CzJezK7UUhrbGiWUdZ0J7NVs_qR6RDsvb8=',
@ -38,38 +36,36 @@ export const predefinedStaticFiles = [
'v2yF7lY2L5rtorTtTmYFsoMb9dBPKs5M1y9cUKxcI1M=',
];
export const createStaticStorage = (): BlobStorage => {
export const createStaticBlobStorage = (): BlobStorage => {
return {
crud: {
get: async (key: string) => {
const isStaticResource =
predefinedStaticFiles.includes(key) || key.startsWith('/static/');
name: 'static',
readonly: true,
get: async (key: string) => {
const isStaticResource =
predefinedStaticFiles.includes(key) || key.startsWith('/static/');
if (!isStaticResource) {
return null;
}
if (!isStaticResource) {
return undefined;
}
const path = key.startsWith('/static/') ? key : `/static/${key}`;
const response = await fetch(path);
const path = key.startsWith('/static/') ? key : `/static/${key}`;
const response = await fetch(path);
if (response.ok) {
const buffer = await response.arrayBuffer();
return bufferToBlob(buffer);
}
if (response.ok) {
return await response.blob();
}
return null;
},
set: async (key: string) => {
// ignore
return key;
},
delete: async () => {
// ignore
},
list: async () => {
// ignore
return [];
},
return undefined;
},
set: async () => {
// ignore
},
delete: async () => {
// ignore
},
list: async () => {
// ignore
return [];
},
};
};

View File

@ -3,16 +3,19 @@ import type { BlockSuiteFeatureFlags } from '@affine/env/global';
import { WorkspaceFlavour } from '@affine/env/workspace';
import { createAffinePublicProviders } from '@affine/workspace/providers';
import { __unstableSchemas, AffineSchemas } from '@blocksuite/blocks/models';
import type { DocProviderCreator, StoreOptions } from '@blocksuite/store';
import { createIndexeddbStorage, Schema, Workspace } from '@blocksuite/store';
import type { DocProviderCreator } from '@blocksuite/store';
import { Schema, Workspace } from '@blocksuite/store';
import { INTERNAL_BLOCKSUITE_HASH_MAP } from '@toeverything/infra/__internal__/workspace';
import { nanoid } from 'nanoid';
import type { Doc } from 'yjs';
import type { Transaction } from 'yjs';
import { createCloudBlobStorage } from '../blob/cloud-blob-storage';
import { createStaticStorage } from '../blob/local-static-storage';
import { createSQLiteStorage } from '../blob/sqlite-blob-storage';
import type { BlobEngine } from '../blob';
import {
createAffineCloudBlobEngine,
createAffinePublicBlobEngine,
createLocalBlobEngine,
} from '../blob';
import { createAffineProviders, createLocalProviders } from '../providers';
function setEditorFlags(workspace: Workspace) {
@ -81,6 +84,12 @@ const createMonitor = (doc: Doc) => {
});
};
const workspaceBlobEngineWeakMap = new WeakMap<Workspace, BlobEngine>();
export function getBlobEngine(workspace: Workspace) {
// temporary solution to get blob engine from workspace
return workspaceBlobEngineWeakMap.get(workspace);
}
// if not exist, create a new workspace
export function getOrCreateWorkspace(
id: string,
@ -91,48 +100,47 @@ export function getOrCreateWorkspace(
return INTERNAL_BLOCKSUITE_HASH_MAP.get(id) as Workspace;
}
const blobStorages: StoreOptions['blobStorages'] = [];
let blobEngine: BlobEngine;
if (flavour === WorkspaceFlavour.AFFINE_CLOUD) {
if (isBrowser) {
blobStorages.push(createIndexeddbStorage);
blobStorages.push(createCloudBlobStorage);
if (environment.isDesktop && runtimeConfig.enableSQLiteProvider) {
blobStorages.push(createSQLiteStorage);
}
providerCreators.push(...createAffineProviders());
// todo(JimmFly): add support for cloud storage
}
blobEngine = createAffineCloudBlobEngine(id);
providerCreators.push(...createAffineProviders());
} else if (flavour === WorkspaceFlavour.LOCAL) {
if (isBrowser) {
blobStorages.push(createIndexeddbStorage);
if (environment.isDesktop && runtimeConfig.enableSQLiteProvider) {
blobStorages.push(createSQLiteStorage);
}
}
blobEngine = createLocalBlobEngine(id);
providerCreators.push(...createLocalProviders());
} else if (flavour === WorkspaceFlavour.AFFINE_PUBLIC) {
if (isBrowser) {
blobStorages.push(createIndexeddbStorage);
if (environment.isDesktop && runtimeConfig.enableSQLiteProvider) {
blobStorages.push(createSQLiteStorage);
}
}
blobStorages.push(createCloudBlobStorage);
blobEngine = createAffinePublicBlobEngine(id);
providerCreators.push(...createAffinePublicProviders());
} else {
throw new Error('unsupported flavour');
}
blobStorages.push(createStaticStorage);
const workspace = new Workspace({
id,
isSSR: !isBrowser,
providerCreators: typeof window === 'undefined' ? [] : providerCreators,
blobStorages: blobStorages,
blobStorages: [
() => ({
crud: {
async get(key) {
return (await blobEngine.get(key)) ?? null;
},
async set(key, value) {
await blobEngine.set(key, value);
return key;
},
async delete(key) {
return blobEngine.delete(key);
},
async list() {
return blobEngine.list();
},
},
}),
],
idGenerator: () => nanoid(),
schema: globalBlockSuiteSchema,
});
workspaceBlobEngineWeakMap.set(workspace, blobEngine);
createMonitor(workspace.doc);
setEditorFlags(workspace);
INTERNAL_BLOCKSUITE_HASH_MAP.set(id, workspace);

View File

@ -892,6 +892,7 @@ __metadata:
async-call-rpc: "npm:^6.3.1"
fake-indexeddb: "npm:^5.0.0"
idb: "npm:^7.1.1"
idb-keyval: "npm:^6.2.1"
is-svg: "npm:^5.0.0"
jotai: "npm:^2.5.1"
js-base64: "npm:^3.7.5"