diff --git a/packages/common/infra/src/modules/workspace/entities/engine.ts b/packages/common/infra/src/modules/workspace/entities/engine.ts index dbd4b9cb6d..d13ec6356c 100644 --- a/packages/common/infra/src/modules/workspace/entities/engine.ts +++ b/packages/common/infra/src/modules/workspace/entities/engine.ts @@ -3,6 +3,7 @@ import type { Doc as YDoc } from 'yjs'; import { Entity } from '../../../framework'; import { AwarenessEngine, BlobEngine, DocEngine } from '../../../sync'; import { throwIfAborted } from '../../../utils'; +import { WorkspaceEngineBeforeStart } from '../events'; import type { WorkspaceEngineProvider } from '../providers/flavour'; import type { WorkspaceService } from '../services/workspace'; @@ -33,6 +34,7 @@ export class WorkspaceEngine extends Entity<{ } start() { + this.eventBus.emit(WorkspaceEngineBeforeStart, this); this.doc.start(); this.awareness.connect(this.workspaceService.workspace.awareness); this.blob.start(); diff --git a/packages/common/infra/src/modules/workspace/events/index.ts b/packages/common/infra/src/modules/workspace/events/index.ts new file mode 100644 index 0000000000..2cd9c2f230 --- /dev/null +++ b/packages/common/infra/src/modules/workspace/events/index.ts @@ -0,0 +1,6 @@ +import { createEvent } from '../../../framework'; +import type { WorkspaceEngine } from '../entities/engine'; + +export const WorkspaceEngineBeforeStart = createEvent( + 'WorkspaceEngineBeforeStart' +); diff --git a/packages/common/infra/src/modules/workspace/index.ts b/packages/common/infra/src/modules/workspace/index.ts index 6ca53b3396..7487b80271 100644 --- a/packages/common/infra/src/modules/workspace/index.ts +++ b/packages/common/infra/src/modules/workspace/index.ts @@ -1,5 +1,6 @@ export type { WorkspaceProfileInfo } from './entities/profile'; export { Workspace } from './entities/workspace'; +export { WorkspaceEngineBeforeStart } from './events'; export { globalBlockSuiteSchema } from './global-schema'; export type { WorkspaceMetadata } from './metadata'; export type { WorkspaceOpenOptions } from './open-options'; diff --git a/packages/common/infra/src/sync/doc/index.ts b/packages/common/infra/src/sync/doc/index.ts index c75106c6a6..435f76d30a 100644 --- a/packages/common/infra/src/sync/doc/index.ts +++ b/packages/common/infra/src/sync/doc/index.ts @@ -23,6 +23,7 @@ export { } from './storage'; export class DocEngine { + readonly clientId: string; localPart: DocEngineLocalPart; remotePart: DocEngineRemotePart | null; @@ -80,11 +81,11 @@ export class DocEngine { storage: DocStorage, private readonly server?: DocServer | null ) { - const clientId = nanoid(); + this.clientId = nanoid(); this.storage = new DocStorageInner(storage); - this.localPart = new DocEngineLocalPart(clientId, this.storage); + this.localPart = new DocEngineLocalPart(this.clientId, this.storage); this.remotePart = this.server - ? new DocEngineRemotePart(clientId, this.storage, this.server) + ? new DocEngineRemotePart(this.clientId, this.storage, this.server) : null; } diff --git a/packages/common/infra/src/sync/index.ts b/packages/common/infra/src/sync/index.ts index f55463ee6f..9f8f3bb7e1 100644 --- a/packages/common/infra/src/sync/index.ts +++ b/packages/common/infra/src/sync/index.ts @@ -4,3 +4,11 @@ export type { BlobStatus, BlobStorage } from './blob/blob'; export { BlobEngine, EmptyBlobStorage } from './blob/blob'; export { BlobStorageOverCapacity } from './blob/error'; export * from './doc'; +export * from './indexer'; +export { + IndexedDBIndex, + IndexedDBIndexStorage, +} from './indexer/impl/indexeddb'; +export { MemoryIndex, MemoryIndexStorage } from './indexer/impl/memory'; +export * from './job'; +export { IndexedDBJobQueue } from './job/impl/indexeddb'; diff --git a/packages/frontend/core/src/modules/docs-search/README.md b/packages/frontend/core/src/modules/docs-search/README.md new file mode 100644 index 0000000000..9f6a1f955e --- /dev/null +++ b/packages/frontend/core/src/modules/docs-search/README.md @@ -0,0 +1,3 @@ +# docs-search + +This module is responsible for subscribing to updates from the doc engine and writing the doc content into the indexer, providing search and aggregation capabilities. diff --git a/packages/frontend/core/src/modules/docs-search/entities/docs-indexer.ts b/packages/frontend/core/src/modules/docs-search/entities/docs-indexer.ts new file mode 100644 index 0000000000..48e4189d6e --- /dev/null +++ b/packages/frontend/core/src/modules/docs-search/entities/docs-indexer.ts @@ -0,0 +1,456 @@ +import { DebugLogger } from '@affine/debug'; +import type { AffineTextAttributes } from '@blocksuite/blocks'; +import type { DeltaInsert } from '@blocksuite/inline'; +import type { Job, JobQueue, WorkspaceService } from '@toeverything/infra'; +import { + Document, + Entity, + IndexedDBIndexStorage, + IndexedDBJobQueue, + JobRunner, + LiveData, +} from '@toeverything/infra'; +import { difference } from 'lodash-es'; +import { map } from 'rxjs'; +import { Array as YArray, Map as YMap, type Text as YText } from 'yjs'; +import { applyUpdate, Doc as YDoc } from 'yjs'; + +import { + type BlockIndexSchema, + blockIndexSchema, + docIndexSchema, +} from '../schema'; + +const logger = new DebugLogger('crawler'); + +interface IndexerJobPayload { + docId: string; +} + +export class DocsIndexer extends Entity { + private readonly jobQueue: JobQueue = + new IndexedDBJobQueue( + 'jq:' + this.workspaceService.workspace.id + ); + + private readonly runner = new JobRunner( + this.jobQueue, + (jobs, signal) => this.execJob(jobs, signal), + () => new Promise(resolve => requestIdleCallback(() => resolve())) + ); + + private readonly indexStorage = new IndexedDBIndexStorage( + 'idx:' + this.workspaceService.workspace.id + ); + + readonly docIndex = this.indexStorage.getIndex('doc', docIndexSchema); + + readonly blockIndex = this.indexStorage.getIndex('block', blockIndexSchema); + + private readonly workspaceEngine = this.workspaceService.workspace.engine; + + private readonly workspaceId = this.workspaceService.workspace.id; + + readonly status$ = LiveData.from<{ remaining?: number }>( + this.jobQueue.status$.pipe( + map(status => ({ + remaining: status.remaining, + })) + ), + {} + ); + + constructor(private readonly workspaceService: WorkspaceService) { + super(); + } + + setupListener() { + this.workspaceEngine.doc.storage.eventBus.on(event => { + if (event.clientId === this.workspaceEngine.doc.clientId) { + const docId = event.docId; + + this.jobQueue + .enqueue([ + { + batchKey: docId, + payload: { docId }, + }, + ]) + .catch(err => { + console.error('Error enqueueing job', err); + }); + } + }); + } + + async execJob(jobs: Job[], _signal: AbortSignal) { + if (jobs.length === 0) { + return; + } + + // jobs should have the same docId, so we just pick the first one + const docId = jobs[0].payload.docId; + + logger.debug('Start crawling job for docId:', docId); + + if (docId) { + if (docId === this.workspaceId) { + await this.crawlingRootDocData(); + } else { + await this.crawlingDocData(docId); + } + } + } + + startCrawling() { + this.runner.start(); + this.jobQueue + .enqueue([ + { + batchKey: this.workspaceId, + payload: { docId: this.workspaceId }, + }, + ]) + .catch(err => { + console.error('Error enqueueing job', err); + }); + } + + async crawlingDocData(docId: string) { + const rootDocBuffer = + await this.workspaceEngine.doc.storage.loadDocFromLocal(this.workspaceId); + + if (!rootDocBuffer) { + return; + } + + const yRootDoc = new YDoc(); + applyUpdate(yRootDoc, rootDocBuffer); + + const docStoragePossibleIds = Array.from(yRootDoc.getSubdocs()) + .map(doc => doc.guid) + .filter(id => id.endsWith(docId)); + + let docBuffer; + + for (const id of docStoragePossibleIds) { + docBuffer = await this.workspaceEngine.doc.storage.loadDocFromLocal(id); + + if (docBuffer) { + break; + } + } + + if (!docBuffer) { + return; + } + + const ydoc = new YDoc(); + + applyUpdate(ydoc, docBuffer); + + let docExists: boolean | null = null; + + ( + yRootDoc.getMap('meta').get('pages') as YArray> | undefined + )?.forEach(page => { + if (page.get('id') === docId) { + docExists = !(page.get('trash') ?? false); + } + }); + + if (!docExists) { + const indexWriter = await this.docIndex.write(); + indexWriter.delete(docId); + await indexWriter.commit(); + + const blockIndexWriter = await this.blockIndex.write(); + const oldBlocks = await blockIndexWriter.search( + { + type: 'match', + field: 'docId', + match: docId, + }, + { + pagination: { + limit: Number.MAX_SAFE_INTEGER, + }, + } + ); + for (const block of oldBlocks.nodes) { + blockIndexWriter.delete(block.id); + } + await blockIndexWriter.commit(); + } else { + const blocks = ydoc.getMap('blocks'); + + if (blocks.size === 0) { + return; + } + + let docTitle = ''; + + const blockDocuments: Document[] = []; + + for (const block of blocks.values()) { + const flavour = block.get('sys:flavour')?.toString(); + const blockId = block.get('sys:id')?.toString(); + + if (!flavour || !blockId) { + continue; + } + + if (flavour === 'affine:page') { + docTitle = block.get('prop:title').toString(); + blockDocuments.push( + Document.from(`${docId}:${blockId}`, { + docId, + flavour, + blockId, + content: docTitle, + }) + ); + } + + if ( + flavour === 'affine:paragraph' || + flavour === 'affine:list' || + flavour === 'affine:code' + ) { + const text = block.get('prop:text') as YText; + if (!text) { + continue; + } + + const deltas: DeltaInsert[] = text.toDelta(); + const ref = deltas + .map(delta => { + if ( + delta.attributes && + delta.attributes.reference && + delta.attributes.reference.pageId + ) { + return delta.attributes.reference.pageId; + } + return null; + }) + .filter((link): link is string => !!link); + + blockDocuments.push( + Document.from(`${docId}:${blockId}`, { + docId, + flavour, + blockId, + content: text.toString(), + ref, + }) + ); + } + + if ( + flavour === 'affine:embed-linked-doc' || + flavour === 'affine:embed-synced-doc' + ) { + const pageId = block.get('prop:pageId'); + if (typeof pageId === 'string') { + blockDocuments.push( + Document.from(`${docId}:${blockId}`, { + docId, + flavour, + blockId, + ref: pageId, + }) + ); + } + } + + if (flavour === 'affine:attachment' || flavour === 'affine:image') { + const blobId = block.get('prop:sourceId'); + if (typeof blobId === 'string') { + blockDocuments.push( + Document.from(`${docId}:${blockId}`, { + docId, + flavour, + blockId, + blob: [blobId], + }) + ); + } + } + + if (flavour === 'affine:surface') { + const texts = []; + + const elementsObj = block.get('prop:elements'); + if ( + !( + elementsObj instanceof YMap && + elementsObj.get('type') === '$blocksuite:internal:native$' + ) + ) { + continue; + } + const elements = elementsObj.get('value') as YMap; + if (!(elements instanceof YMap)) { + continue; + } + + for (const element of elements.values()) { + if (!(element instanceof YMap)) { + continue; + } + const text = element.get('text') as YText; + if (!text) { + continue; + } + + texts.push(text.toString()); + } + + blockDocuments.push( + Document.from(`${docId}:${blockId}`, { + docId, + flavour, + blockId, + content: texts, + }) + ); + } + + if (flavour === 'affine:database') { + const texts = []; + const columnsObj = block.get('prop:columns'); + if (!(columnsObj instanceof YArray)) { + continue; + } + for (const column of columnsObj) { + if (!(column instanceof YMap)) { + continue; + } + if (typeof column.get('name') === 'string') { + texts.push(column.get('name')); + } + + const data = column.get('data'); + if (!(data instanceof YMap)) { + continue; + } + const options = data.get('options'); + if (!(options instanceof YArray)) { + continue; + } + for (const option of options) { + if (!(option instanceof YMap)) { + continue; + } + const value = option.get('value'); + if (typeof value === 'string') { + texts.push(value); + } + } + } + + blockDocuments.push( + Document.from(`${docId}:${blockId}`, { + docId, + flavour, + blockId, + content: texts, + }) + ); + } + } + + const docIndexWriter = await this.docIndex.write(); + docIndexWriter.put( + Document.from(docId, { + title: docTitle, + }) + ); + await docIndexWriter.commit(); + + const blockIndexWriter = await this.blockIndex.write(); + const oldBlocks = await blockIndexWriter.search( + { + type: 'match', + field: 'docId', + match: docId, + }, + { + pagination: { + limit: Number.MAX_SAFE_INTEGER, + }, + } + ); + for (const block of oldBlocks.nodes) { + blockIndexWriter.delete(block.id); + } + for (const block of blockDocuments) { + blockIndexWriter.insert(block); + } + await blockIndexWriter.commit(); + } + } + + async crawlingRootDocData() { + const buffer = await this.workspaceEngine.doc.storage.loadDocFromLocal( + this.workspaceId + ); + if (!buffer) { + return; + } + + const ydoc = new YDoc(); + + applyUpdate(ydoc, buffer); + + const docs = ydoc.getMap('meta').get('pages') as + | YArray> + | undefined; + + if (!docs) { + return; + } + + const availableDocs = []; + + for (const page of docs) { + const docId = page.get('id'); + + if (typeof docId !== 'string') { + continue; + } + + const inTrash = page.get('trash') ?? false; + + if (!inTrash) { + availableDocs.push(docId); + } + } + + // a hack to get all docs in index + const allIndexedDocs = ( + await this.docIndex.search( + { + type: 'all', + }, + { + pagination: { + limit: Number.MAX_SAFE_INTEGER, + skip: 0, + }, + } + ) + ).nodes.map(n => n.id); + + const needDelete = difference(allIndexedDocs, availableDocs); + const needAdd = difference(availableDocs, allIndexedDocs); + + await this.jobQueue.enqueue( + [...needAdd, ...needDelete].map(docId => ({ + batchKey: docId, + payload: { docId }, + })) + ); + } +} diff --git a/packages/frontend/core/src/modules/docs-search/index.ts b/packages/frontend/core/src/modules/docs-search/index.ts new file mode 100644 index 0000000000..a202b34246 --- /dev/null +++ b/packages/frontend/core/src/modules/docs-search/index.ts @@ -0,0 +1,17 @@ +export { DocsSearchService } from './services/docs-search'; + +import { + type Framework, + WorkspaceScope, + WorkspaceService, +} from '@toeverything/infra'; + +import { DocsIndexer } from './entities/docs-indexer'; +import { DocsSearchService } from './services/docs-search'; + +export function configureDocsSearchModule(framework: Framework) { + framework + .scope(WorkspaceScope) + .service(DocsSearchService) + .entity(DocsIndexer, [WorkspaceService]); +} diff --git a/packages/frontend/core/src/modules/docs-search/schema.ts b/packages/frontend/core/src/modules/docs-search/schema.ts new file mode 100644 index 0000000000..6f9e426deb --- /dev/null +++ b/packages/frontend/core/src/modules/docs-search/schema.ts @@ -0,0 +1,18 @@ +import { defineSchema } from '@toeverything/infra'; + +export const docIndexSchema = defineSchema({ + title: 'FullText', +}); + +export type DocIndexSchema = typeof docIndexSchema; + +export const blockIndexSchema = defineSchema({ + docId: 'String', + blockId: 'String', + content: 'FullText', + flavour: 'String', + ref: 'String', + blob: 'String', +}); + +export type BlockIndexSchema = typeof blockIndexSchema; diff --git a/packages/frontend/core/src/modules/docs-search/services/docs-search.ts b/packages/frontend/core/src/modules/docs-search/services/docs-search.ts new file mode 100644 index 0000000000..6c093e8b17 --- /dev/null +++ b/packages/frontend/core/src/modules/docs-search/services/docs-search.ts @@ -0,0 +1,123 @@ +import { + OnEvent, + Service, + WorkspaceEngineBeforeStart, +} from '@toeverything/infra'; + +import { DocsIndexer } from '../entities/docs-indexer'; + +@OnEvent(WorkspaceEngineBeforeStart, s => s.handleWorkspaceEngineBeforeStart) +export class DocsSearchService extends Service { + private readonly indexer = this.framework.createEntity(DocsIndexer); + + handleWorkspaceEngineBeforeStart() { + this.indexer.setupListener(); + this.indexer.startCrawling(); + } + + async search(query: string): Promise< + { + docId: string; + title: string; + score: number; + blockId?: string; + blockContent?: string; + }[] + > { + const { buckets } = await this.indexer.blockIndex.aggregate( + { + type: 'boolean', + occur: 'must', + queries: [ + { + type: 'match', + field: 'content', + match: query, + }, + { + type: 'boolean', + occur: 'should', + queries: [ + { + type: 'all', + }, + { + type: 'boost', + boost: 100, + query: { + type: 'match', + field: 'flavour', + match: 'affine:page', + }, + }, + ], + }, + ], + }, + 'docId', + { + pagination: { + limit: 50, + skip: 0, + }, + hits: { + pagination: { + limit: 2, + skip: 0, + }, + fields: ['blockId', 'flavour'], + highlights: [ + { + field: 'content', + before: '', + end: '', + }, + ], + }, + } + ); + + const docData = await this.indexer.docIndex.getAll( + buckets.map(bucket => bucket.key) + ); + + const result = []; + + for (const bucket of buckets) { + const firstMatchFlavour = bucket.hits.nodes[0]?.fields.flavour; + if (firstMatchFlavour === 'affine:page') { + // is title match + const blockContent = bucket.hits.nodes[1]?.highlights.content[0]; // try to get block content + result.push({ + docId: bucket.key, + title: bucket.hits.nodes[0].highlights.content[0], + score: bucket.score, + blockContent, + }); + } else { + const title = + docData.find(doc => doc.id === bucket.key)?.get('title') ?? ''; + const matchedBlockId = bucket.hits.nodes[0]?.fields.blockId; + // is block match + result.push({ + docId: bucket.key, + title: typeof title === 'string' ? title : title[0], + blockId: + typeof matchedBlockId === 'string' + ? matchedBlockId + : matchedBlockId[0], + score: bucket.score, + blockContent: bucket.hits.nodes[0]?.highlights.content[0], + }); + } + } + + return result; + } + + async getDocTitle(docId: string) { + const doc = await this.indexer.docIndex.get(docId); + const title = doc?.get('title'); + return typeof title === 'string' ? title : title?.[0]; + } +}