From 9096ac29601b8079ba258a908cfef729fa660ca1 Mon Sep 17 00:00:00 2001 From: Himself65 Date: Wed, 3 May 2023 18:16:22 -0500 Subject: [PATCH] refactor: workspace provider (#2218) --- apps/web/src/atoms/__tests__/atom.spec.ts | 71 ++++++++ apps/web/src/atoms/root.ts | 51 +++++- .../src/blocksuite/__tests__/index.spec.ts | 30 ---- apps/web/src/blocksuite/index.ts | 10 +- .../src/blocksuite/providers/affine/index.ts | 8 + apps/web/src/blocksuite/providers/index.ts | 48 ------ .../use-router-with-workspace-id-defense.ts | 1 + ...e-sync-router-with-current-workspace-id.ts | 6 +- apps/web/src/layouts/workspace-layout.tsx | 109 +++++------- apps/web/src/plugins/local/index.tsx | 7 +- packages/workspace/src/atom.ts | 2 +- .../src/providers/broad-cast-channel/index.ts | 13 +- packages/workspace/src/providers/index.ts | 158 ++++++++++-------- packages/workspace/src/type.ts | 62 +++++-- packages/workspace/src/utils.ts | 35 ++++ packages/y-indexeddb/README.md | 12 +- .../y-indexeddb/src/__tests__/index.spec.ts | 5 +- packages/y-indexeddb/src/utils.ts | 4 +- 18 files changed, 377 insertions(+), 255 deletions(-) create mode 100644 apps/web/src/atoms/__tests__/atom.spec.ts delete mode 100644 apps/web/src/blocksuite/__tests__/index.spec.ts delete mode 100644 apps/web/src/blocksuite/providers/index.ts diff --git a/apps/web/src/atoms/__tests__/atom.spec.ts b/apps/web/src/atoms/__tests__/atom.spec.ts new file mode 100644 index 0000000000..c7f24f1112 --- /dev/null +++ b/apps/web/src/atoms/__tests__/atom.spec.ts @@ -0,0 +1,71 @@ +/** + * @vitest-environment happy-dom + */ +import 'fake-indexeddb/auto'; + +import { initPage } from '@affine/env/blocksuite'; +import { + rootCurrentWorkspaceIdAtom, + rootWorkspacesMetadataAtom, +} from '@affine/workspace/atom'; +import { createIndexedDBDownloadProvider } from '@affine/workspace/providers'; +import { WorkspaceFlavour } from '@affine/workspace/type'; +import { + _cleanupBlockSuiteWorkspaceCache, + createEmptyBlockSuiteWorkspace, +} from '@affine/workspace/utils'; +import type { ParagraphBlockModel } from '@blocksuite/blocks/models'; +import type { Page } from '@blocksuite/store'; +import { createStore } from 'jotai'; +import { describe, expect, test } from 'vitest'; + +import { WorkspacePlugins } from '../../plugins'; +import { rootCurrentWorkspaceAtom } from '../root'; + +describe('currentWorkspace atom', () => { + test('should be defined', async () => { + const store = createStore(); + let id: string; + { + const workspace = createEmptyBlockSuiteWorkspace( + 'test', + WorkspaceFlavour.LOCAL + ); + const page = workspace.createPage('page0'); + initPage(page); + const frameId = page.getBlockByFlavour('affine:frame').at(0) + ?.id as string; + id = page.addBlock( + 'affine:paragraph', + { + text: new page.Text('test 1'), + }, + frameId + ); + const provider = createIndexedDBDownloadProvider(workspace); + provider.sync(); + await provider.whenReady; + const workspaceId = await WorkspacePlugins[ + WorkspaceFlavour.LOCAL + ].CRUD.create(workspace); + store.set(rootWorkspacesMetadataAtom, [ + { + id: workspaceId, + flavour: WorkspaceFlavour.LOCAL, + }, + ]); + _cleanupBlockSuiteWorkspaceCache(); + } + store.set( + rootCurrentWorkspaceIdAtom, + store.get(rootWorkspacesMetadataAtom)[0].id + ); + const workspace = await store.get(rootCurrentWorkspaceAtom); + expect(workspace).toBeDefined(); + const page = workspace.blockSuiteWorkspace.getPage('page0') as Page; + expect(page).not.toBeNull(); + const paragraphBlock = page.getBlockById(id) as ParagraphBlockModel; + expect(paragraphBlock).not.toBeNull(); + expect(paragraphBlock.text.toString()).toBe('test 1'); + }); +}); diff --git a/apps/web/src/atoms/root.ts b/apps/web/src/atoms/root.ts index 5123975cd1..e14f8ed864 100644 --- a/apps/web/src/atoms/root.ts +++ b/apps/web/src/atoms/root.ts @@ -5,6 +5,10 @@ import { rootCurrentWorkspaceIdAtom, rootWorkspacesMetadataAtom, } from '@affine/workspace/atom'; +import type { + NecessaryProvider, + WorkspaceRegistry, +} from '@affine/workspace/type'; import { WorkspaceFlavour } from '@affine/workspace/type'; import { assertExists } from '@blocksuite/store'; import { atom } from 'jotai'; @@ -37,18 +41,38 @@ export const workspacesAtom = atom>(async get => { WorkspacePlugins[workspace.flavour as keyof typeof WorkspacePlugins]; assertExists(plugin); const { CRUD } = plugin; - return CRUD.get(workspace.id); + return CRUD.get(workspace.id).then(workspace => { + if (workspace === null) { + console.warn( + 'workspace is null. this should not happen. If you see this error, please report it to the developer.' + ); + } + return workspace; + }); }) + ).then(workspaces => + workspaces.filter( + (workspace): workspace is WorkspaceRegistry['affine' | 'local'] => + workspace !== null + ) ); - logger.info('workspaces', workspaces); - workspaces.forEach(workspace => { - if (workspace === null) { - console.warn( - 'workspace is null. this should not happen. If you see this error, please report it to the developer.' - ); + const workspaceProviders = workspaces.map(workspace => + workspace.providers.filter( + (provider): provider is NecessaryProvider => + 'necessary' in provider && provider.necessary + ) + ); + const promises: Promise[] = []; + for (const providers of workspaceProviders) { + for (const provider of providers) { + provider.sync(); + promises.push(provider.whenReady); } - }); - return workspaces.filter(workspace => workspace !== null) as AllWorkspace[]; + } + // we will wait for all the necessary providers to be ready + await Promise.all(promises); + logger.info('workspaces', workspaces); + return workspaces; }); /** @@ -77,6 +101,15 @@ export const rootCurrentWorkspaceAtom = atom>( `cannot find the workspace with id ${targetId} in the plugin ${targetWorkspace.flavour}.` ); } + const providers = workspace.providers.filter( + (provider): provider is NecessaryProvider => + 'necessary' in provider && provider.necessary === true + ); + for (const provider of providers) { + provider.sync(); + // we will wait for the necessary providers to be ready + await provider.whenReady; + } return workspace; } ); diff --git a/apps/web/src/blocksuite/__tests__/index.spec.ts b/apps/web/src/blocksuite/__tests__/index.spec.ts deleted file mode 100644 index ca32cbcd86..0000000000 --- a/apps/web/src/blocksuite/__tests__/index.spec.ts +++ /dev/null @@ -1,30 +0,0 @@ -/** - * @vitest-environment happy-dom - */ -import 'fake-indexeddb/auto'; - -import { beforeEach, describe, expect, test } from 'vitest'; - -import { BlockSuiteWorkspace } from '../../shared'; -import { createAffineProviders, createLocalProviders } from '..'; - -let blockSuiteWorkspace: BlockSuiteWorkspace; - -beforeEach(() => { - blockSuiteWorkspace = new BlockSuiteWorkspace({ id: 'test' }); -}); - -describe('blocksuite providers', () => { - test('should be valid provider', () => { - [createLocalProviders, createAffineProviders].forEach(createProviders => { - createProviders(blockSuiteWorkspace).forEach(provider => { - expect(provider).toBeTypeOf('object'); - expect(provider).toHaveProperty('flavour'); - expect(provider).toHaveProperty('connect'); - expect(provider.connect).toBeTypeOf('function'); - expect(provider).toHaveProperty('disconnect'); - expect(provider.disconnect).toBeTypeOf('function'); - }); - }); - }); -}); diff --git a/apps/web/src/blocksuite/index.ts b/apps/web/src/blocksuite/index.ts index 5e4e33a890..eae4335c64 100644 --- a/apps/web/src/blocksuite/index.ts +++ b/apps/web/src/blocksuite/index.ts @@ -1,13 +1,15 @@ import { config } from '@affine/env'; import { - createIndexedDBProvider, + createIndexedDBDownloadProvider, createLocalProviders, } from '@affine/workspace/providers'; -import { createBroadCastChannelProvider } from '@affine/workspace/providers'; +import { + createAffineWebSocketProvider, + createBroadCastChannelProvider, +} from '@affine/workspace/providers'; import type { Provider } from '@affine/workspace/type'; import type { BlockSuiteWorkspace } from '../shared'; -import { createAffineWebSocketProvider } from './providers'; import { createAffineDownloadProvider } from './providers/affine'; export const createAffineProviders = ( @@ -19,7 +21,7 @@ export const createAffineProviders = ( createAffineWebSocketProvider(blockSuiteWorkspace), config.enableBroadCastChannelProvider && createBroadCastChannelProvider(blockSuiteWorkspace), - createIndexedDBProvider(blockSuiteWorkspace), + createIndexedDBDownloadProvider(blockSuiteWorkspace), ] as any[] ).filter(v => Boolean(v)); }; diff --git a/apps/web/src/blocksuite/providers/affine/index.ts b/apps/web/src/blocksuite/providers/affine/index.ts index f536248d1a..7c87ef6164 100644 --- a/apps/web/src/blocksuite/providers/affine/index.ts +++ b/apps/web/src/blocksuite/providers/affine/index.ts @@ -12,9 +12,15 @@ export const createAffineDownloadProvider = ( ): AffineDownloadProvider => { assertExists(blockSuiteWorkspace.id); const id = blockSuiteWorkspace.id; + let connected = false; + const callbacks = new Set<() => void>(); return { flavour: 'affine-download', background: true, + get connected() { + return connected; + }, + callbacks, connect: () => { providerLogger.info('connect download provider', id); if (hashMap.has(id)) { @@ -23,6 +29,7 @@ export const createAffineDownloadProvider = ( blockSuiteWorkspace.doc, new Uint8Array(hashMap.get(id) as ArrayBuffer) ); + connected = true; return; } affineApis @@ -41,6 +48,7 @@ export const createAffineDownloadProvider = ( }, disconnect: () => { providerLogger.info('disconnect download provider', id); + connected = false; }, cleanup: () => { hashMap.delete(id); diff --git a/apps/web/src/blocksuite/providers/index.ts b/apps/web/src/blocksuite/providers/index.ts deleted file mode 100644 index 1ce6628b02..0000000000 --- a/apps/web/src/blocksuite/providers/index.ts +++ /dev/null @@ -1,48 +0,0 @@ -import { websocketPrefixUrl } from '@affine/env'; -import { KeckProvider } from '@affine/workspace/affine/keck'; -import { getLoginStorage } from '@affine/workspace/affine/login'; -import type { AffineWebSocketProvider } from '@affine/workspace/type'; -import { assertExists } from '@blocksuite/store'; - -import type { BlockSuiteWorkspace } from '../../shared'; -import { providerLogger } from '../logger'; - -const createAffineWebSocketProvider = ( - blockSuiteWorkspace: BlockSuiteWorkspace -): AffineWebSocketProvider => { - let webSocketProvider: KeckProvider | null = null; - return { - flavour: 'affine-websocket', - background: false, - cleanup: () => { - assertExists(webSocketProvider); - webSocketProvider.destroy(); - webSocketProvider = null; - }, - connect: () => { - webSocketProvider = new KeckProvider( - websocketPrefixUrl + '/api/sync/', - blockSuiteWorkspace.id, - blockSuiteWorkspace.doc, - { - params: { token: getLoginStorage()?.token ?? '' }, - awareness: blockSuiteWorkspace.awarenessStore.awareness, - // we maintain broadcast channel by ourselves - // @ts-expect-error - disableBc: true, - connect: false, - } - ); - providerLogger.info('connect', webSocketProvider.url); - webSocketProvider.connect(); - }, - disconnect: () => { - assertExists(webSocketProvider); - providerLogger.info('disconnect', webSocketProvider.url); - webSocketProvider.destroy(); - webSocketProvider = null; - }, - }; -}; - -export { createAffineWebSocketProvider }; diff --git a/apps/web/src/hooks/use-router-with-workspace-id-defense.ts b/apps/web/src/hooks/use-router-with-workspace-id-defense.ts index d66ea3b324..caee3a78bf 100644 --- a/apps/web/src/hooks/use-router-with-workspace-id-defense.ts +++ b/apps/web/src/hooks/use-router-with-workspace-id-defense.ts @@ -22,6 +22,7 @@ export function useRouterWithWorkspaceIdDefense(router: NextRouter) { } const exist = metadata.find(m => m.id === currentWorkspaceId); if (!exist) { + console.warn('workspace not exist, redirect to first one'); // clean up setCurrentWorkspaceId(null); setCurrentPageId(null); diff --git a/apps/web/src/hooks/use-sync-router-with-current-workspace-id.ts b/apps/web/src/hooks/use-sync-router-with-current-workspace-id.ts index bb3acf8884..c71f9077f0 100644 --- a/apps/web/src/hooks/use-sync-router-with-current-workspace-id.ts +++ b/apps/web/src/hooks/use-sync-router-with-current-workspace-id.ts @@ -45,8 +45,9 @@ export function useSyncRouterWithCurrentWorkspaceId(router: NextRouter) { window.apis?.onWorkspaceChange(targetWorkspace.id); } void router.push({ - pathname: '/workspace/[workspaceId]/all', + pathname: router.pathname, query: { + ...router.query, workspaceId: targetWorkspace.id, }, }); @@ -56,8 +57,9 @@ export function useSyncRouterWithCurrentWorkspaceId(router: NextRouter) { console.log('set workspace id', workspaceId); setCurrentWorkspaceId(targetWorkspace.id); void router.push({ - pathname: '/workspace/[workspaceId]/all', + pathname: router.pathname, query: { + ...router.query, workspaceId: targetWorkspace.id, }, }); diff --git a/apps/web/src/layouts/workspace-layout.tsx b/apps/web/src/layouts/workspace-layout.tsx index 0c4e8c37d6..0ac26545d8 100644 --- a/apps/web/src/layouts/workspace-layout.tsx +++ b/apps/web/src/layouts/workspace-layout.tsx @@ -1,6 +1,6 @@ import { DebugLogger } from '@affine/debug'; import { DEFAULT_HELLO_WORLD_PAGE_ID } from '@affine/env'; -import { ensureRootPinboard, initPage } from '@affine/env/blocksuite'; +import { initPage } from '@affine/env/blocksuite'; import { setUpLanguage, useTranslation } from '@affine/i18n'; import { createAffineGlobalChannel } from '@affine/workspace/affine/sync'; import { @@ -9,7 +9,7 @@ import { rootStore, rootWorkspacesMetadataAtom, } from '@affine/workspace/atom'; -import type { LocalIndexedDBProvider } from '@affine/workspace/type'; +import type { BackgroundProvider } from '@affine/workspace/type'; import { WorkspaceFlavour } from '@affine/workspace/type'; import { assertEquals, assertExists, nanoid } from '@blocksuite/store'; import { useBlockSuiteWorkspaceHelper } from '@toeverything/hooks/use-block-suite-workspace-helper'; @@ -17,14 +17,7 @@ import { useAtom, useAtomValue, useSetAtom } from 'jotai'; import Head from 'next/head'; import { useRouter } from 'next/router'; import type { FC, PropsWithChildren, ReactElement } from 'react'; -import { - lazy, - Suspense, - useCallback, - useEffect, - useMemo, - useState, -} from 'react'; +import { lazy, Suspense, useCallback, useEffect, useMemo } from 'react'; import { openQuickSearchModalAtom, openWorkspacesModalAtom } from '../atoms'; import { @@ -127,7 +120,10 @@ export const AllWorkspaceContext = ({ // ignore current workspace .filter(workspace => workspace.id !== currentWorkspaceId) .flatMap(workspace => - workspace.providers.filter(provider => provider.background) + workspace.providers.filter( + (provider): provider is BackgroundProvider => + 'background' in provider && provider.background + ) ); providers.forEach(provider => { provider.connect(); @@ -260,69 +256,48 @@ export const WorkspaceLayoutInner: FC = ({ children }) => { const currentPageId = useAtomValue(rootCurrentPageIdAtom); const router = useRouter(); const { jumpToPage } = useRouterHelper(router); - const [isLoading, setIsLoading] = useState(true); const { t } = useTranslation(); useEffect(() => { logger.info('currentWorkspace: ', currentWorkspace); + globalThis.currentWorkspace = currentWorkspace; }, [currentWorkspace]); - useEffect(() => { - if (currentWorkspace) { - globalThis.currentWorkspace = currentWorkspace; + //#region init workspace + if (currentWorkspace.blockSuiteWorkspace.isEmpty) { + // this is a new workspace, so we should redirect to the new page + const pageId = nanoid(); + const page = currentWorkspace.blockSuiteWorkspace.createPage(pageId); + assertEquals(page.id, pageId); + currentWorkspace.blockSuiteWorkspace.setPageMeta(page.id, { + init: true, + }); + initPage(page); + if (!router.query.pageId) { + setCurrentPageId(pageId); + void jumpToPage(currentWorkspace.id, pageId); } - }, [currentWorkspace]); + } + + // fixme: pinboard has been removed, + // the related code should be removed in the future. + // no matter the workspace is empty, ensure the root pinboard exists + // ensureRootPinboard(currentWorkspace.blockSuiteWorkspace); + //#endregion useEffect(() => { - if (currentWorkspace) { - currentWorkspace.providers.forEach(provider => { - provider.connect(); - }); - return () => { - currentWorkspace.providers.forEach(provider => { - provider.disconnect(); - }); - }; - } - }, [currentWorkspace]); - - useEffect(() => { - if (!router.isReady) { - return; - } - if (!currentWorkspace) { - return; - } - const localProvider = currentWorkspace.providers.find( - provider => provider.flavour === 'local-indexeddb' + const backgroundProviders = currentWorkspace.providers.filter( + (provider): provider is BackgroundProvider => 'background' in provider ); - if (localProvider && localProvider.flavour === 'local-indexeddb') { - const provider = localProvider as LocalIndexedDBProvider; - const callback = () => { - setIsLoading(false); - if (currentWorkspace.blockSuiteWorkspace.isEmpty) { - // this is a new workspace, so we should redirect to the new page - const pageId = nanoid(); - const page = currentWorkspace.blockSuiteWorkspace.createPage(pageId); - assertEquals(page.id, pageId); - currentWorkspace.blockSuiteWorkspace.setPageMeta(page.id, { - init: true, - }); - initPage(page); - if (!router.query.pageId) { - setCurrentPageId(pageId); - void jumpToPage(currentWorkspace.id, pageId); - } - } - // no matter the workspace is empty, ensure the root pinboard exists - ensureRootPinboard(currentWorkspace.blockSuiteWorkspace); - }; - provider.callbacks.add(callback); - return () => { - provider.callbacks.delete(callback); - }; - } - }, [currentWorkspace, jumpToPage, router, setCurrentPageId]); + backgroundProviders.forEach(provider => { + provider.connect(); + }); + return () => { + backgroundProviders.forEach(provider => { + provider.disconnect(); + }); + }; + }, [currentWorkspace]); useEffect(() => { if (!currentWorkspace) { @@ -395,11 +370,7 @@ export const WorkspaceLayoutInner: FC = ({ children }) => { }> - {isLoading ? ( - - ) : ( - children - )} + {children} {/* fixme(himself65): remove this */} diff --git a/apps/web/src/plugins/local/index.tsx b/apps/web/src/plugins/local/index.tsx index bcfc30623e..f41e2d5187 100644 --- a/apps/web/src/plugins/local/index.tsx +++ b/apps/web/src/plugins/local/index.tsx @@ -3,12 +3,12 @@ import { DEFAULT_HELLO_WORLD_PAGE_ID, DEFAULT_WORKSPACE_NAME, } from '@affine/env'; -import { ensureRootPinboard, initPage } from '@affine/env/blocksuite'; +import { initPage } from '@affine/env/blocksuite'; import { CRUD, saveWorkspaceToLocalStorage, } from '@affine/workspace/local/crud'; -import { createIndexedDBProvider } from '@affine/workspace/providers'; +import { createIndexedDBBackgroundProvider } from '@affine/workspace/providers'; import { LoadPriority, WorkspaceFlavour } from '@affine/workspace/type'; import { createEmptyBlockSuiteWorkspace } from '@affine/workspace/utils'; import { nanoid } from '@blocksuite/store'; @@ -40,12 +40,11 @@ export const LocalPlugin: WorkspacePlugin = { blockSuiteWorkspace.setPageMeta(page.id, { jumpOnce: true, }); - const provider = createIndexedDBProvider(blockSuiteWorkspace); + const provider = createIndexedDBBackgroundProvider(blockSuiteWorkspace); provider.connect(); provider.callbacks.add(() => { provider.disconnect(); }); - ensureRootPinboard(blockSuiteWorkspace); saveWorkspaceToLocalStorage(blockSuiteWorkspace.id); logger.debug('create first workspace'); return [blockSuiteWorkspace.id]; diff --git a/packages/workspace/src/atom.ts b/packages/workspace/src/atom.ts index ca4acf818c..1e6c201a10 100644 --- a/packages/workspace/src/atom.ts +++ b/packages/workspace/src/atom.ts @@ -17,7 +17,7 @@ export type RootWorkspaceMetadata = { /** * root workspaces atom * this atom stores the metadata of all workspaces, - * which is `id` and `flavour`, that is enough to load the real workspace data + * which is `id` and `flavor`, that is enough to load the real workspace data */ export const rootWorkspacesMetadataAtom = atomWithSyncStorage< RootWorkspaceMetadata[] diff --git a/packages/workspace/src/providers/broad-cast-channel/index.ts b/packages/workspace/src/providers/broad-cast-channel/index.ts index 69d31c0c24..5ec51bd8e9 100644 --- a/packages/workspace/src/providers/broad-cast-channel/index.ts +++ b/packages/workspace/src/providers/broad-cast-channel/index.ts @@ -1,3 +1,4 @@ +import { CallbackSet } from '@affine/workspace/utils'; import { Workspace as BlockSuiteWorkspace } from '@blocksuite/store'; import { assertExists } from '@blocksuite/store'; import type { Awareness } from 'y-protocols/awareness'; @@ -23,6 +24,7 @@ export const createBroadCastChannelProvider = ( const awareness = blockSuiteWorkspace.awarenessStore .awareness as unknown as Awareness; let broadcastChannel: TypedBroadcastChannel | null = null; + const callbacks = new CallbackSet(); const handleBroadcastChannelMessage = ( event: BroadcastChannelMessageEvent ) => { @@ -56,6 +58,9 @@ export const createBroadCastChannelProvider = ( break; } } + if (callbacks.ready) { + callbacks.forEach(cb => cb()); + } }; const handleDocUpdate = (updateV1: Uint8Array, origin: any) => { if (origin === broadcastChannel) { @@ -77,7 +82,11 @@ export const createBroadCastChannelProvider = ( }; return { flavour: 'broadcast-channel', - background: false, + background: true, + get connected() { + return callbacks.ready; + }, + callbacks, connect: () => { assertExists(blockSuiteWorkspace.id); broadcastChannel = Object.assign( @@ -101,6 +110,7 @@ export const createBroadCastChannelProvider = ( broadcastChannel.postMessage(['awareness:update', awarenessUpdate]); doc.on('update', handleDocUpdate); awareness.on('update', handleAwarenessUpdate); + callbacks.ready = true; }, disconnect: () => { assertExists(broadcastChannel); @@ -111,6 +121,7 @@ export const createBroadCastChannelProvider = ( doc.off('update', handleDocUpdate); awareness.off('update', handleAwarenessUpdate); broadcastChannel.close(); + callbacks.ready = false; }, cleanup: () => { assertExists(broadcastChannel); diff --git a/packages/workspace/src/providers/index.ts b/packages/workspace/src/providers/index.ts index f45ea49c0d..5395dc4673 100644 --- a/packages/workspace/src/providers/index.ts +++ b/packages/workspace/src/providers/index.ts @@ -4,16 +4,22 @@ import { getLoginStorage, storageChangeSlot, } from '@affine/workspace/affine/login'; -import type { Provider, SQLiteProvider } from '@affine/workspace/type'; import type { AffineWebSocketProvider, - LocalIndexedDBProvider, + LocalIndexedDBBackgroundProvider, + LocalIndexedDBDownloadProvider, + Provider, + SQLiteProvider, } from '@affine/workspace/type'; +import { CallbackSet } from '@affine/workspace/utils'; import type { BlobManager, Disposable } from '@blocksuite/store'; -import { Workspace as BlockSuiteWorkspace } from '@blocksuite/store'; -import { assertExists } from '@blocksuite/store'; +import { + assertExists, + Workspace as BlockSuiteWorkspace, +} from '@blocksuite/store'; import { createIndexedDBProvider as create, + downloadBinary, EarlyDisconnectError, } from '@toeverything/y-indexeddb'; @@ -27,9 +33,15 @@ const createAffineWebSocketProvider = ( ): AffineWebSocketProvider => { let webSocketProvider: KeckProvider | null = null; let dispose: Disposable | undefined = undefined; + const callbacks = new CallbackSet(); + const cb = () => callbacks.forEach(cb => cb()); const apis: AffineWebSocketProvider = { flavour: 'affine-websocket', - background: false, + background: true, + get connected() { + return callbacks.ready; + }, + callbacks, cleanup: () => { assertExists(webSocketProvider); webSocketProvider.destroy(); @@ -48,20 +60,19 @@ const createAffineWebSocketProvider = ( { params: { token: getLoginStorage()?.token ?? '' }, awareness: blockSuiteWorkspace.awarenessStore.awareness, - // we maintain broadcast channel by ourselves - // @ts-expect-error - disableBc: true, + // we maintain a broadcast channel by ourselves connect: false, } ); logger.info('connect', webSocketProvider.url); + webSocketProvider.on('synced', cb); webSocketProvider.connect(); }, disconnect: () => { assertExists(webSocketProvider); logger.info('disconnect', webSocketProvider.url); - webSocketProvider.destroy(); - webSocketProvider = null; + webSocketProvider.disconnect(); + webSocketProvider.off('synced', cb); dispose?.dispose(); }, }; @@ -69,52 +80,21 @@ const createAffineWebSocketProvider = ( return apis; }; -class CallbackSet extends Set<() => void> { - #ready = false; - - get ready(): boolean { - return this.#ready; - } - - set ready(v: boolean) { - this.#ready = v; - } - - add(cb: () => void) { - if (this.ready) { - cb(); - return this; - } - if (this.has(cb)) { - return this; - } - return super.add(cb); - } - - delete(cb: () => void) { - if (this.has(cb)) { - return super.delete(cb); - } - return false; - } -} - -const createIndexedDBProvider = ( +const createIndexedDBBackgroundProvider = ( blockSuiteWorkspace: BlockSuiteWorkspace -): LocalIndexedDBProvider => { +): LocalIndexedDBBackgroundProvider => { const indexeddbProvider = create( blockSuiteWorkspace.id, blockSuiteWorkspace.doc ); const callbacks = new CallbackSet(); return { - flavour: 'local-indexeddb', - // fixme: remove callbacks - callbacks, - // fixme: remove whenSynced - whenSynced: indexeddbProvider.whenSynced, - // fixme: remove background long polling + flavour: 'local-indexeddb-background', background: true, + get connected() { + return callbacks.ready; + }, + callbacks, cleanup: () => { // todo: cleanup data }, @@ -127,6 +107,7 @@ const createIndexedDBProvider = ( callbacks.forEach(cb => cb()); }) .catch(error => { + callbacks.ready = false; if (error instanceof EarlyDisconnectError) { return; } else { @@ -143,6 +124,40 @@ const createIndexedDBProvider = ( }; }; +const createIndexedDBDownloadProvider = ( + blockSuiteWorkspace: BlockSuiteWorkspace +): LocalIndexedDBDownloadProvider => { + let _resolve: () => void; + let _reject: (error: unknown) => void; + const promise = new Promise((resolve, reject) => { + _resolve = resolve; + _reject = reject; + }); + return { + flavour: 'local-indexeddb', + necessary: true, + get whenReady() { + return promise; + }, + cleanup: () => { + // todo: cleanup data + }, + sync: () => { + logger.info('connect indexeddb provider', blockSuiteWorkspace.id); + downloadBinary(blockSuiteWorkspace.id) + .then(binary => { + if (binary !== false) { + Y.applyUpdate(blockSuiteWorkspace.doc, binary); + } + _resolve(); + }) + .catch(error => { + _reject(error); + }); + }, + }; +}; + const createSQLiteProvider = ( blockSuiteWorkspace: BlockSuiteWorkspace ): SQLiteProvider => { @@ -166,18 +181,20 @@ const createSQLiteProvider = ( const keysToPersist = allKeys.filter(k => !persistedKeys.includes(k)); logger.info('persisting blobs', keysToPersist, 'to sqlite'); - keysToPersist.forEach(async k => { - const blob = await bs.get(k); - if (!blob) { - logger.warn('blob not found for', k); - return; - } - window.apis.db.addBlob( - blockSuiteWorkspace.id, - k, - new Uint8Array(await blob.arrayBuffer()) - ); - }); + return Promise.all( + keysToPersist.map(async k => { + const blob = await bs.get(k); + if (!blob) { + logger.warn('blob not found for', k); + return; + } + return window.apis.db.addBlob( + blockSuiteWorkspace.id, + k, + new Uint8Array(await blob.arrayBuffer()) + ); + }) + ); } async function syncUpdates() { @@ -202,16 +219,23 @@ const createSQLiteProvider = ( } let unsubscribe = () => {}; + let connected = false; + const callbacks = new CallbackSet(); - const provider = { + return { flavour: 'sqlite', background: true, + callbacks, + get connected(): boolean { + return connected; + }, cleanup: () => { throw new Error('Method not implemented.'); }, connect: async () => { logger.info('connecting sqlite provider', blockSuiteWorkspace.id); await syncUpdates(); + connected = true; blockSuiteWorkspace.doc.on('update', handleUpdate); @@ -223,6 +247,7 @@ const createSQLiteProvider = ( if (timer) { clearTimeout(timer); } + // @ts-expect-error ignore the type timer = setTimeout(() => { syncUpdates(); @@ -237,16 +262,16 @@ const createSQLiteProvider = ( disconnect: () => { unsubscribe(); blockSuiteWorkspace.doc.off('update', handleUpdate); + connected = false; }, - } satisfies SQLiteProvider; - - return provider; + }; }; export { createAffineWebSocketProvider, createBroadCastChannelProvider, - createIndexedDBProvider, + createIndexedDBBackgroundProvider, + createIndexedDBDownloadProvider, createSQLiteProvider, }; @@ -257,7 +282,8 @@ export const createLocalProviders = ( [ config.enableBroadCastChannelProvider && createBroadCastChannelProvider(blockSuiteWorkspace), - createIndexedDBProvider(blockSuiteWorkspace), + createIndexedDBBackgroundProvider(blockSuiteWorkspace), + createIndexedDBDownloadProvider(blockSuiteWorkspace), environment.isDesktop && createSQLiteProvider(blockSuiteWorkspace), ] as any[] ).filter(v => Boolean(v)); diff --git a/packages/workspace/src/type.ts b/packages/workspace/src/type.ts index 56efa845f5..ab1176b4b5 100644 --- a/packages/workspace/src/type.ts +++ b/packages/workspace/src/type.ts @@ -9,44 +9,76 @@ export type JotaiStore = ReturnType; export type BaseProvider = { flavour: string; - // if this is true, we will connect the provider on the background - background: boolean; - connect: () => void; - disconnect: () => void; + // cleanup data when workspace is removed cleanup: () => void; }; +/** + * @description + * If a provider is marked as a background provider, + * we will connect it in the `useEffect` in React.js. + * + * This means that the data might be stale when you use it. + */ export interface BackgroundProvider extends BaseProvider { + // if this is true, + // we will connect the provider on the background background: true; + get connected(): boolean; + connect(): void; + disconnect(): void; callbacks: Set<() => void>; } -export interface AffineDownloadProvider extends BaseProvider { +/** + * @description + * If a provider is marked as a necessary provider, + * we will connect it once you read the workspace. + * + * This means that the data will be fresh when you use it. + * + * Currently, there is only on necessary provider: `local-indexeddb`. + */ +export interface NecessaryProvider extends Omit { + // if this is true, + // we will ensure that the provider is connected before you can use it + necessary: true; + sync(): void; + get whenReady(): Promise; +} + +export interface AffineDownloadProvider extends BackgroundProvider { flavour: 'affine-download'; } -export interface BroadCastChannelProvider extends BaseProvider { +/** + * Download the first binary from local indexeddb + */ +export interface BroadCastChannelProvider extends BackgroundProvider { flavour: 'broadcast-channel'; } -export interface LocalIndexedDBProvider extends BackgroundProvider { - flavour: 'local-indexeddb'; - whenSynced: Promise; +/** + * Long polling provider with local indexeddb + */ +export interface LocalIndexedDBBackgroundProvider extends BackgroundProvider { + flavour: 'local-indexeddb-background'; } -export interface SQLiteProvider extends BaseProvider { +export interface SQLiteProvider extends BackgroundProvider { flavour: 'sqlite'; } -export interface AffineWebSocketProvider extends BaseProvider { +export interface LocalIndexedDBDownloadProvider extends NecessaryProvider { + flavour: 'local-indexeddb'; +} + +export interface AffineWebSocketProvider extends BackgroundProvider { flavour: 'affine-websocket'; } -export type Provider = - | LocalIndexedDBProvider - | AffineWebSocketProvider - | BroadCastChannelProvider; +export type Provider = BackgroundProvider | NecessaryProvider; export interface AffineWorkspace extends RemoteWorkspace { flavour: WorkspaceFlavour.AFFINE; diff --git a/packages/workspace/src/utils.ts b/packages/workspace/src/utils.ts index fdbc0c4333..b653744823 100644 --- a/packages/workspace/src/utils.ts +++ b/packages/workspace/src/utils.ts @@ -16,6 +16,11 @@ export function cleanupWorkspace(flavour: WorkspaceFlavour) { const hashMap = new Map(); +/** + * @internal test only + */ +export const _cleanupBlockSuiteWorkspaceCache = () => hashMap.clear(); + export function createEmptyBlockSuiteWorkspace( id: string, flavour: WorkspaceFlavour.AFFINE, @@ -83,3 +88,33 @@ export function createEmptyBlockSuiteWorkspace( hashMap.set(cacheKey, workspace); return workspace; } + +export class CallbackSet extends Set<() => void> { + #ready = false; + + get ready(): boolean { + return this.#ready; + } + + set ready(v: boolean) { + this.#ready = v; + } + + add(cb: () => void) { + if (this.ready) { + cb(); + return this; + } + if (this.has(cb)) { + return this; + } + return super.add(cb); + } + + delete(cb: () => void) { + if (this.has(cb)) { + return super.delete(cb); + } + return false; + } +} diff --git a/packages/y-indexeddb/README.md b/packages/y-indexeddb/README.md index f8350381cb..312172b818 100644 --- a/packages/y-indexeddb/README.md +++ b/packages/y-indexeddb/README.md @@ -1,18 +1,24 @@ # @toeverything/y-indexeddb -> This package haven't been published yet. - ## Usage ```ts -import { createIndexedDBProvider } from '@toeverything/y-indexeddb'; +import { createIndexedDBProvider, downloadBinary } from '@toeverything/y-indexeddb'; import * as Y from 'yjs'; const yDoc = new Y.Doc(); +// sync yDoc with indexedDB const provider = createIndexedDBProvider('docName', yDoc); provider.connect(); await provider.whenSynced.then(() => { console.log('synced'); provider.disconnect(); }); + +// dowload binary data from indexedDB for once +downloadBinary('docName').then(blob => { + if (blob !== false) { + Y.applyUpdate(yDoc, blob); + } +}); ``` diff --git a/packages/y-indexeddb/src/__tests__/index.spec.ts b/packages/y-indexeddb/src/__tests__/index.spec.ts index 50bccf1031..c99247a393 100644 --- a/packages/y-indexeddb/src/__tests__/index.spec.ts +++ b/packages/y-indexeddb/src/__tests__/index.spec.ts @@ -340,7 +340,10 @@ describe('utils', () => { provider.connect(); await provider.whenSynced; provider.disconnect(); - const update = await downloadBinary(workspace.id, rootDBName); + const update = (await downloadBinary( + workspace.id, + rootDBName + )) as Uint8Array; expect(update).toBeInstanceOf(Uint8Array); const newWorkspace = new Workspace({ id, diff --git a/packages/y-indexeddb/src/utils.ts b/packages/y-indexeddb/src/utils.ts index 8c54aded7c..79300b75cc 100644 --- a/packages/y-indexeddb/src/utils.ts +++ b/packages/y-indexeddb/src/utils.ts @@ -129,7 +129,7 @@ export async function tryMigrate( export async function downloadBinary( id: string, dbName = DEFAULT_DB_NAME -): Promise { +): Promise { const dbPromise = openDB(dbName, dbVersion, { upgrade: upgradeDB, }); @@ -137,7 +137,7 @@ export async function downloadBinary( const t = db.transaction('workspace', 'readonly').objectStore('workspace'); const doc = await t.get(id); if (!doc) { - return new Uint8Array(0); + return false; } else { return mergeUpdates(doc.updates.map(({ update }) => update)); }