feat(core): run indexer in worker (#7418)

This commit is contained in:
liuyi 2024-07-04 15:37:26 +08:00 committed by GitHub
parent 5c1f78afd4
commit 555f203be6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 717 additions and 362 deletions

View File

@ -74,4 +74,11 @@ spec:
name: affine-web
port:
number: {{ .Values.web.service.port }}
- path: /js/worker.(.+).js
pathType: ImplementationSpecific
backend:
service:
name: affine-web
port:
number: {{ .Values.web.service.port }}
{{- end }}

View File

@ -2,6 +2,7 @@
"name": "@toeverything/infra",
"type": "module",
"private": true,
"sideEffects": false,
"exports": {
"./blocksuite": "./src/blocksuite/index.ts",
"./storage": "./src/storage/index.ts",

View File

@ -18,6 +18,7 @@ import {
timer,
} from 'rxjs';
import { MANUALLY_STOP } from '../utils';
import type { LiveData } from './livedata';
/**
@ -107,7 +108,8 @@ export function fromPromise<T>(
.catch(error => {
subscriber.error(error);
});
return () => abortController.abort('Aborted');
return () => abortController.abort(MANUALLY_STOP);
});
}

View File

@ -4,6 +4,7 @@ import { difference } from 'lodash-es';
import { LiveData } from '../../livedata';
import type { Memento } from '../../storage';
import { MANUALLY_STOP } from '../../utils';
import { BlobStorageOverCapacity } from './error';
const logger = new DebugLogger('affine:blob-engine');
@ -70,7 +71,7 @@ export class BlobEngine {
}
stop() {
this.abort?.abort();
this.abort?.abort(MANUALLY_STOP);
this.abort = null;
}

View File

@ -0,0 +1,24 @@
AFFiNE currently has a lot of data stored using the old ID format. Here, we record the usage of IDs to avoid forgetting.
## Old ID Format
The format is:
- `{workspace-id}:space:{nanoid}` Common
- `{workspace-id}:space:page:{nanoid}`
> Note: sometimes the `workspace-id` is not same with current workspace id.
## Usage
- Local Storage
- indexeddb: Both new and old IDs coexist
- sqlite: Both new and old IDs coexist
- server-clock: Only new IDs are stored
- sync-metadata: Both new and old IDs coexist
- Server Storage
- Only stores new IDs but accepts writes using old IDs
- Protocols
- When the client submits an update, both new and old IDs are used.
- When the server broadcasts updates sent by other clients, both new and old IDs are used.
- When the server responds to `client-pre-sync` (listing all updated docids), only new IDs are used.

View File

@ -20,6 +20,9 @@ export interface IndexWriter<S extends Schema>
delete(id: string): void;
// TODO(@eyhn)
// deleteByQuery(query: Query<S>): void;
commit(): Promise<void>;
rollback(): void;

View File

@ -155,6 +155,9 @@ export class IndexedDBJobQueue<J> implements JobQueue<J> {
.objectStore('jobs')
.delete(typeof id === 'string' ? parseInt(id) : id);
}
trx.commit();
this.broadcast.postMessage('job-completed');
}
async return(jobs: Job[], retry: boolean = false): Promise<void> {
@ -174,6 +177,10 @@ export class IndexedDBJobQueue<J> implements JobQueue<J> {
.delete(typeof id === 'string' ? parseInt(id) : id);
}
}
trx.commit();
this.broadcast.postMessage('job-completed');
}
async clear(): Promise<void> {

View File

@ -29,7 +29,7 @@ export class JobRunner<J> {
}
stop() {
this.abort?.abort();
this.abort?.abort(MANUALLY_STOP);
this.abort = null;
}

View File

@ -41,8 +41,8 @@ export class FetchService extends Service {
throw externalSignal.reason;
}
const abortController = new AbortController();
externalSignal?.addEventListener('abort', () => {
abortController.abort();
externalSignal?.addEventListener('abort', reason => {
abortController.abort(reason);
});
const timeout = init?.timeout ?? 15000;

View File

@ -1,32 +1,32 @@
import { DebugLogger } from '@affine/debug';
import type { AffineTextAttributes } from '@blocksuite/blocks';
import type { DeltaInsert } from '@blocksuite/inline';
import type { Job, JobQueue, WorkspaceService } from '@toeverything/infra';
import {
Document,
Entity,
IndexedDBIndexStorage,
IndexedDBJobQueue,
JobRunner,
LiveData,
} from '@toeverything/infra';
import { difference } from 'lodash-es';
import { map } from 'rxjs';
import { Array as YArray, Map as YMap, type Text as YText } from 'yjs';
import { applyUpdate, Doc as YDoc } from 'yjs';
import {
type BlockIndexSchema,
blockIndexSchema,
docIndexSchema,
} from '../schema';
import { blockIndexSchema, docIndexSchema } from '../schema';
import { createWorker, type IndexerWorker } from '../worker/out-worker';
export function isEmptyUpdate(binary: Uint8Array) {
return (
binary.byteLength === 0 ||
(binary.byteLength === 2 && binary[0] === 0 && binary[1] === 0)
);
}
const logger = new DebugLogger('crawler');
interface IndexerJobPayload {
docId: string;
storageDocId: string;
}
// TODO(@eyhn): simplify this, it's too complex
export class DocsIndexer extends Entity {
private readonly jobQueue: JobQueue<IndexerJobPayload> =
new IndexedDBJobQueue<IndexerJobPayload>(
@ -51,6 +51,8 @@ export class DocsIndexer extends Entity {
private readonly workspaceId = this.workspaceService.workspace.id;
private worker: IndexerWorker | null = null;
readonly status$ = LiveData.from<{ remaining?: number }>(
this.jobQueue.status$.pipe(
map(status => ({
@ -67,13 +69,13 @@ export class DocsIndexer extends Entity {
setupListener() {
this.workspaceEngine.doc.storage.eventBus.on(event => {
if (event.clientId === this.workspaceEngine.doc.clientId) {
const docId = event.docId;
const docId = normalizeDocId(event.docId);
this.jobQueue
.enqueue([
{
batchKey: docId,
payload: { docId },
payload: { docId, storageDocId: event.docId },
},
])
.catch(err => {
@ -83,358 +85,32 @@ export class DocsIndexer extends Entity {
});
}
async execJob(jobs: Job<IndexerJobPayload>[], _signal: AbortSignal) {
async execJob(jobs: Job<IndexerJobPayload>[], signal: AbortSignal) {
if (jobs.length === 0) {
return;
}
// jobs should have the same docId, so we just pick the first one
const docId = jobs[0].payload.docId;
const storageDocId = jobs[0].payload.storageDocId;
const worker = await this.ensureWorker(signal);
const startTime = performance.now();
logger.debug('Start crawling job for docId:', docId);
if (docId) {
let workerOutput;
if (docId === this.workspaceId) {
await this.crawlingRootDocData();
} else {
await this.crawlingDocData(docId);
}
}
const duration = performance.now() - startTime;
logger.debug(
'Finish crawling job for docId:' + docId + ' in ' + duration + 'ms '
);
}
startCrawling() {
this.runner.start();
this.jobQueue
.enqueue([
{
batchKey: this.workspaceId,
payload: { docId: this.workspaceId },
},
])
.catch(err => {
console.error('Error enqueueing job', err);
});
}
async crawlingDocData(docId: string) {
const rootDocBuffer =
await this.workspaceEngine.doc.storage.loadDocFromLocal(this.workspaceId);
await this.workspaceEngine.doc.storage.loadDocFromLocal(
this.workspaceId
);
if (!rootDocBuffer) {
return;
}
const yRootDoc = new YDoc();
applyUpdate(yRootDoc, rootDocBuffer);
const docStoragePossibleIds = Array.from(yRootDoc.getSubdocs())
.map(doc => doc.guid)
.filter(id => id.endsWith(docId));
let docBuffer;
for (const id of docStoragePossibleIds) {
docBuffer = await this.workspaceEngine.doc.storage.loadDocFromLocal(id);
if (docBuffer) {
break;
}
}
if (!docBuffer) {
return;
}
const ydoc = new YDoc();
applyUpdate(ydoc, docBuffer);
let docExists: boolean | null = null;
(
yRootDoc.getMap('meta').get('pages') as YArray<YMap<any>> | undefined
)?.forEach(page => {
if (page.get('id') === docId) {
docExists = !(page.get('trash') ?? false);
}
});
if (!docExists) {
const indexWriter = await this.docIndex.write();
indexWriter.delete(docId);
await indexWriter.commit();
const blockIndexWriter = await this.blockIndex.write();
const oldBlocks = await blockIndexWriter.search(
{
type: 'match',
field: 'docId',
match: docId,
},
{
pagination: {
limit: Number.MAX_SAFE_INTEGER,
},
}
);
for (const block of oldBlocks.nodes) {
blockIndexWriter.delete(block.id);
}
await blockIndexWriter.commit();
} else {
const blocks = ydoc.getMap<any>('blocks');
if (blocks.size === 0) {
return;
}
let docTitle = '';
const blockDocuments: Document<typeof blockIndexSchema>[] = [];
for (const block of blocks.values()) {
const flavour = block.get('sys:flavour')?.toString();
const blockId = block.get('sys:id')?.toString();
if (!flavour || !blockId) {
continue;
}
if (flavour === 'affine:page') {
docTitle = block.get('prop:title').toString();
blockDocuments.push(
Document.from(`${docId}:${blockId}`, {
docId,
flavour,
blockId,
content: docTitle,
})
);
}
if (
flavour === 'affine:paragraph' ||
flavour === 'affine:list' ||
flavour === 'affine:code'
) {
const text = block.get('prop:text') as YText;
if (!text) {
continue;
}
const deltas: DeltaInsert<AffineTextAttributes>[] = text.toDelta();
const ref = deltas
.map(delta => {
if (
delta.attributes &&
delta.attributes.reference &&
delta.attributes.reference.pageId
) {
return delta.attributes.reference.pageId;
}
return null;
})
.filter((link): link is string => !!link);
blockDocuments.push(
Document.from<BlockIndexSchema>(`${docId}:${blockId}`, {
docId,
flavour,
blockId,
content: text.toString(),
ref,
})
);
}
if (
flavour === 'affine:embed-linked-doc' ||
flavour === 'affine:embed-synced-doc'
) {
const pageId = block.get('prop:pageId');
if (typeof pageId === 'string') {
blockDocuments.push(
Document.from<BlockIndexSchema>(`${docId}:${blockId}`, {
docId,
flavour,
blockId,
ref: pageId,
})
);
}
}
if (flavour === 'affine:attachment' || flavour === 'affine:image') {
const blobId = block.get('prop:sourceId');
if (typeof blobId === 'string') {
blockDocuments.push(
Document.from<BlockIndexSchema>(`${docId}:${blockId}`, {
docId,
flavour,
blockId,
blob: [blobId],
})
);
}
}
if (flavour === 'affine:surface') {
const texts = [];
const elementsObj = block.get('prop:elements');
if (
!(
elementsObj instanceof YMap &&
elementsObj.get('type') === '$blocksuite:internal:native$'
)
) {
continue;
}
const elements = elementsObj.get('value') as YMap<any>;
if (!(elements instanceof YMap)) {
continue;
}
for (const element of elements.values()) {
if (!(element instanceof YMap)) {
continue;
}
const text = element.get('text') as YText;
if (!text) {
continue;
}
texts.push(text.toString());
}
blockDocuments.push(
Document.from<BlockIndexSchema>(`${docId}:${blockId}`, {
docId,
flavour,
blockId,
content: texts,
})
);
}
if (flavour === 'affine:database') {
const texts = [];
const columnsObj = block.get('prop:columns');
if (!(columnsObj instanceof YArray)) {
continue;
}
for (const column of columnsObj) {
if (!(column instanceof YMap)) {
continue;
}
if (typeof column.get('name') === 'string') {
texts.push(column.get('name'));
}
const data = column.get('data');
if (!(data instanceof YMap)) {
continue;
}
const options = data.get('options');
if (!(options instanceof YArray)) {
continue;
}
for (const option of options) {
if (!(option instanceof YMap)) {
continue;
}
const value = option.get('value');
if (typeof value === 'string') {
texts.push(value);
}
}
}
blockDocuments.push(
Document.from<BlockIndexSchema>(`${docId}:${blockId}`, {
docId,
flavour,
blockId,
content: texts,
})
);
}
}
const docIndexWriter = await this.docIndex.write();
docIndexWriter.put(
Document.from<typeof docIndexSchema>(docId, {
title: docTitle,
})
);
await docIndexWriter.commit();
const blockIndexWriter = await this.blockIndex.write();
const oldBlocks = await blockIndexWriter.search(
{
type: 'match',
field: 'docId',
match: docId,
},
{
pagination: {
limit: Number.MAX_SAFE_INTEGER,
},
}
);
for (const block of oldBlocks.nodes) {
blockIndexWriter.delete(block.id);
}
for (const block of blockDocuments) {
blockIndexWriter.insert(block);
}
await blockIndexWriter.commit();
}
}
async crawlingRootDocData() {
const buffer = await this.workspaceEngine.doc.storage.loadDocFromLocal(
this.workspaceId
);
if (!buffer) {
return;
}
const ydoc = new YDoc();
applyUpdate(ydoc, buffer);
const docs = ydoc.getMap('meta').get('pages') as
| YArray<YMap<any>>
| undefined;
if (!docs) {
return;
}
const availableDocs = [];
for (const page of docs) {
const docId = page.get('id');
if (typeof docId !== 'string') {
continue;
}
const inTrash = page.get('trash') ?? false;
if (!inTrash) {
availableDocs.push(docId);
}
}
// a hack to get all docs in index
const allIndexedDocs = (
await this.docIndex.search(
{
@ -449,14 +125,174 @@ export class DocsIndexer extends Entity {
)
).nodes.map(n => n.id);
const needDelete = difference(allIndexedDocs, availableDocs);
const needAdd = difference(availableDocs, allIndexedDocs);
workerOutput = await worker.run({
type: 'rootDoc',
allIndexedDocs,
rootDocBuffer,
});
} else {
const rootDocBuffer =
await this.workspaceEngine.doc.storage.loadDocFromLocal(
this.workspaceId
);
const docBuffer =
(await this.workspaceEngine.doc.storage.loadDocFromLocal(
storageDocId
)) ?? new Uint8Array(0);
if (!rootDocBuffer) {
return;
}
workerOutput = await worker.run({
type: 'doc',
docBuffer,
docId,
rootDocBuffer,
});
}
if (workerOutput.deletedDoc || workerOutput.addedDoc) {
if (workerOutput.deletedDoc) {
const docIndexWriter = await this.docIndex.write();
for (const docId of workerOutput.deletedDoc) {
docIndexWriter.delete(docId);
}
await docIndexWriter.commit();
const blockIndexWriter = await this.blockIndex.write();
for (const docId of workerOutput.deletedDoc) {
const oldBlocks = await blockIndexWriter.search(
{
type: 'match',
field: 'docId',
match: docId,
},
{
pagination: {
limit: Number.MAX_SAFE_INTEGER,
},
}
);
for (const block of oldBlocks.nodes) {
docIndexWriter.delete(block.id);
}
}
await blockIndexWriter.commit();
}
if (workerOutput.addedDoc) {
const docIndexWriter = await this.docIndex.write();
for (const { doc } of workerOutput.addedDoc) {
docIndexWriter.put(doc);
}
await docIndexWriter.commit();
const blockIndexWriter = await this.blockIndex.write();
for (const { blocks } of workerOutput.addedDoc) {
// delete old blocks
const oldBlocks = await blockIndexWriter.search(
{
type: 'match',
field: 'docId',
match: docId,
},
{
pagination: {
limit: Number.MAX_SAFE_INTEGER,
},
}
);
for (const block of oldBlocks.nodes) {
blockIndexWriter.delete(block.id);
}
for (const block of blocks) {
blockIndexWriter.insert(block);
}
}
await blockIndexWriter.commit();
}
}
if (workerOutput.reindexDoc) {
await this.jobQueue.enqueue(
[...needAdd, ...needDelete].map(docId => ({
workerOutput.reindexDoc.map(({ docId, storageDocId }) => ({
batchKey: docId,
payload: { docId },
payload: { docId, storageDocId },
}))
);
}
const duration = performance.now() - startTime;
logger.debug(
'Finish crawling job for docId:' + docId + ' in ' + duration + 'ms '
);
}
startCrawling() {
this.runner.start();
this.jobQueue
.enqueue([
{
batchKey: this.workspaceId,
payload: { docId: this.workspaceId, storageDocId: this.workspaceId },
},
])
.catch(err => {
console.error('Error enqueueing job', err);
});
}
async ensureWorker(signal: AbortSignal): Promise<IndexerWorker> {
if (!this.worker) {
this.worker = await createWorker(signal);
}
return this.worker;
}
override dispose(): void {
this.runner.stop();
}
}
function normalizeDocId(raw: string) {
enum DocVariant {
Workspace = 'workspace',
Page = 'page',
Space = 'space',
Settings = 'settings',
Unknown = 'unknown',
}
try {
if (!raw.length) {
throw new Error('Invalid Empty Doc ID');
}
let parts = raw.split(':');
if (parts.length > 3) {
// special adapt case `wsId:space:page:pageId`
if (parts[1] === DocVariant.Space && parts[2] === DocVariant.Page) {
parts = [parts[0], DocVariant.Space, parts[3]];
} else {
throw new Error(`Invalid format of Doc ID: ${raw}`);
}
} else if (parts.length === 2) {
// `${variant}:${guid}`
throw new Error('not supported');
} else if (parts.length === 1) {
// ${ws} or ${pageId}
parts = ['', DocVariant.Unknown, parts[0]];
}
const docId = parts.at(2);
if (!docId) {
throw new Error('ID is required');
}
return docId;
} catch (err) {
logger.error('Error on normalize docId ' + raw, err);
return raw;
}
}

View File

@ -419,4 +419,8 @@ export class DocsSearchService extends Service {
const title = doc?.get('title');
return typeof title === 'string' ? title : title?.[0];
}
override dispose(): void {
this.indexer.dispose();
}
}

View File

@ -0,0 +1,313 @@
import type { AffineTextAttributes } from '@blocksuite/blocks';
import type { DeltaInsert } from '@blocksuite/inline';
import { Document } from '@toeverything/infra';
import { difference } from 'lodash-es';
import {
applyUpdate,
Array as YArray,
Doc as YDoc,
Map as YMap,
type Text as YText,
} from 'yjs';
import type { BlockIndexSchema, docIndexSchema } from '../schema';
import type {
WorkerIngoingMessage,
WorkerInput,
WorkerOutgoingMessage,
WorkerOutput,
} from './types';
function crawlingDocData({
docBuffer,
docId,
rootDocBuffer,
}: WorkerInput & { type: 'doc' }): WorkerOutput {
const yRootDoc = new YDoc();
applyUpdate(yRootDoc, rootDocBuffer);
const ydoc = new YDoc();
applyUpdate(ydoc, docBuffer);
let docExists: boolean | null = null;
(
yRootDoc.getMap('meta').get('pages') as YArray<YMap<any>> | undefined
)?.forEach(page => {
if (page.get('id') === docId) {
docExists = !(page.get('trash') ?? false);
}
});
if (!docExists) {
return {
deletedDoc: [docId],
};
} else {
const blocks = ydoc.getMap<any>('blocks');
if (blocks.size === 0) {
return {};
}
let docTitle = '';
const blockDocuments: Document<BlockIndexSchema>[] = [];
for (const block of blocks.values()) {
const flavour = block.get('sys:flavour')?.toString();
const blockId = block.get('sys:id')?.toString();
if (!flavour || !blockId) {
continue;
}
if (flavour === 'affine:page') {
docTitle = block.get('prop:title').toString();
blockDocuments.push(
Document.from(`${docId}:${blockId}`, {
docId,
flavour,
blockId,
content: docTitle,
})
);
}
if (
flavour === 'affine:paragraph' ||
flavour === 'affine:list' ||
flavour === 'affine:code'
) {
const text = block.get('prop:text') as YText;
if (!text) {
continue;
}
const deltas: DeltaInsert<AffineTextAttributes>[] = text.toDelta();
const ref = deltas
.map(delta => {
if (
delta.attributes &&
delta.attributes.reference &&
delta.attributes.reference.pageId
) {
return delta.attributes.reference.pageId;
}
return null;
})
.filter((link): link is string => !!link);
blockDocuments.push(
Document.from<BlockIndexSchema>(`${docId}:${blockId}`, {
docId,
flavour,
blockId,
content: text.toString(),
ref,
})
);
}
if (
flavour === 'affine:embed-linked-doc' ||
flavour === 'affine:embed-synced-doc'
) {
const pageId = block.get('prop:pageId');
if (typeof pageId === 'string') {
blockDocuments.push(
Document.from<BlockIndexSchema>(`${docId}:${blockId}`, {
docId,
flavour,
blockId,
ref: pageId,
})
);
}
}
if (flavour === 'affine:attachment' || flavour === 'affine:image') {
const blobId = block.get('prop:sourceId');
if (typeof blobId === 'string') {
blockDocuments.push(
Document.from<BlockIndexSchema>(`${docId}:${blockId}`, {
docId,
flavour,
blockId,
blob: [blobId],
})
);
}
}
if (flavour === 'affine:surface') {
const texts = [];
const elementsObj = block.get('prop:elements');
if (
!(
elementsObj instanceof YMap &&
elementsObj.get('type') === '$blocksuite:internal:native$'
)
) {
continue;
}
const elements = elementsObj.get('value') as YMap<any>;
if (!(elements instanceof YMap)) {
continue;
}
for (const element of elements.values()) {
if (!(element instanceof YMap)) {
continue;
}
const text = element.get('text') as YText;
if (!text) {
continue;
}
texts.push(text.toString());
}
blockDocuments.push(
Document.from<BlockIndexSchema>(`${docId}:${blockId}`, {
docId,
flavour,
blockId,
content: texts,
})
);
}
if (flavour === 'affine:database') {
const texts = [];
const columnsObj = block.get('prop:columns');
if (!(columnsObj instanceof YArray)) {
continue;
}
for (const column of columnsObj) {
if (!(column instanceof YMap)) {
continue;
}
if (typeof column.get('name') === 'string') {
texts.push(column.get('name'));
}
const data = column.get('data');
if (!(data instanceof YMap)) {
continue;
}
const options = data.get('options');
if (!(options instanceof YArray)) {
continue;
}
for (const option of options) {
if (!(option instanceof YMap)) {
continue;
}
const value = option.get('value');
if (typeof value === 'string') {
texts.push(value);
}
}
}
blockDocuments.push(
Document.from<BlockIndexSchema>(`${docId}:${blockId}`, {
docId,
flavour,
blockId,
content: texts,
})
);
}
}
return {
addedDoc: [
{
id: docId,
doc: Document.from<typeof docIndexSchema>(docId, {
title: docTitle,
}),
blocks: blockDocuments,
},
],
};
}
}
function crawlingRootDocData({
allIndexedDocs,
rootDocBuffer,
}: WorkerInput & {
type: 'rootDoc';
}): WorkerOutput {
const ydoc = new YDoc();
applyUpdate(ydoc, rootDocBuffer);
const docs = ydoc.getMap('meta').get('pages') as
| YArray<YMap<any>>
| undefined;
if (!docs) {
return {};
}
const availableDocs = [];
for (const page of docs) {
const docId = page.get('id');
if (typeof docId !== 'string') {
continue;
}
const inTrash = page.get('trash') ?? false;
if (!inTrash) {
availableDocs.push(docId);
}
}
const needDelete = difference(allIndexedDocs, availableDocs);
const needAdd = difference(availableDocs, allIndexedDocs);
return {
reindexDoc: [...needAdd, ...needDelete].map(docId => ({
docId,
storageDocId: ydoc.getMap<YDoc>('spaces').get(docId)?.guid ?? docId,
})),
};
}
globalThis.onmessage = (event: MessageEvent<WorkerIngoingMessage>) => {
const message = event.data;
if (message.type === 'init') {
postMessage({ type: 'init', msgId: message.msgId });
return;
}
if (message.type === 'run') {
const { input } = message;
try {
let data;
if (input.type === 'rootDoc') {
data = crawlingRootDocData(input);
} else {
data = crawlingDocData(input);
}
postMessage({ type: 'done', msgId: message.msgId, output: data });
} catch (error) {
postMessage({
type: 'failed',
msgId: message.msgId,
error: error instanceof Error ? error.message : error + '',
});
}
}
};
declare function postMessage(message: WorkerOutgoingMessage): void;

View File

@ -0,0 +1,104 @@
import { DebugLogger } from '@affine/debug';
import { MANUALLY_STOP, throwIfAborted } from '@toeverything/infra';
import type {
WorkerIngoingMessage,
WorkerInput,
WorkerOutgoingMessage,
WorkerOutput,
} from './types';
const logger = new DebugLogger('affine:indexer-worker');
export async function createWorker(abort: AbortSignal) {
let worker: Worker | null = null;
while (throwIfAborted(abort)) {
try {
worker = await new Promise<Worker>((resolve, reject) => {
// @TODO(@forehalo): need to make a general worker
const worker = new Worker(
/* webpackChunkName: "worker" */ new URL(
'./in-worker.ts',
import.meta.url
)
);
worker.addEventListener('error', reject);
worker.addEventListener('message', event => {
if (event.data.type === 'init') {
resolve(worker);
}
});
worker.postMessage({ type: 'init', msgId: 0 } as WorkerIngoingMessage);
setTimeout(() => {
reject('timeout');
}, 1000 * 30 /* 30 sec */);
});
} catch (err) {
logger.debug(
`Indexer worker init failed, ${err}, will retry in 5 seconds.`
);
await new Promise(resolve => setTimeout(resolve, 5000));
}
if (worker) {
break;
}
}
if (!worker) {
// never reach here
throw new Error('Worker is not created');
}
const terminateAbort = new AbortController();
let msgId = 1;
return {
run: async (input: WorkerInput) => {
const dispose: (() => void)[] = [];
return new Promise<WorkerOutput>((resolve, reject) => {
const currentMsgId = msgId++;
const msgHandler = (event: MessageEvent<WorkerOutgoingMessage>) => {
if (event.data.msgId === currentMsgId) {
if (event.data.type === 'done') {
resolve(event.data.output);
} else if (event.data.type === 'failed') {
reject(new Error(event.data.error));
} else {
reject(new Error('Unknown message type'));
}
}
};
const abortHandler = (reason: any) => {
reject(reason);
};
worker.addEventListener('message', msgHandler);
dispose.push(() => {
worker?.removeEventListener('message', msgHandler);
});
terminateAbort.signal.addEventListener('abort', abortHandler);
dispose.push(() => {
terminateAbort.signal.removeEventListener('abort', abortHandler);
});
worker.postMessage({
type: 'run',
msgId: currentMsgId,
input,
} as WorkerIngoingMessage);
}).finally(() => {
for (const d of dispose) {
d();
}
});
},
dispose: () => {
worker.terminate();
terminateAbort.abort(MANUALLY_STOP);
},
};
}
export type IndexerWorker = Awaited<ReturnType<typeof createWorker>>;

View File

@ -0,0 +1,50 @@
import type { Document } from '@toeverything/infra';
import type { BlockIndexSchema, DocIndexSchema } from '../schema';
export type WorkerIngoingMessage = (
| {
type: 'init';
}
| {
type: 'run';
input: WorkerInput;
}
) & { msgId: number };
export type WorkerOutgoingMessage = (
| {
type: 'init';
}
| {
type: 'done';
output: WorkerOutput;
}
| {
type: 'failed';
error: string;
}
) & { msgId: number };
export type WorkerInput =
| {
type: 'rootDoc';
rootDocBuffer: Uint8Array;
allIndexedDocs: string[];
}
| {
type: 'doc';
docId: string;
rootDocBuffer: Uint8Array;
docBuffer: Uint8Array;
};
export interface WorkerOutput {
reindexDoc?: { docId: string; storageDocId: string }[];
addedDoc?: {
id: string;
blocks: Document<BlockIndexSchema>[];
doc: Document<DocIndexSchema>;
}[];
deletedDoc?: string[];
}

View File

@ -112,8 +112,10 @@ export const createConfiguration: (
? 'js/[name]-[contenthash:8].js'
: 'js/[name].js',
// In some cases webpack will emit files starts with "_" which is reserved in web extension.
chunkFilename:
buildFlags.mode === 'production'
chunkFilename: pathData =>
pathData.chunk?.name === 'worker'
? 'js/worker.[contenthash:8].js'
: buildFlags.mode === 'production'
? 'js/chunk.[name]-[contenthash:8].js'
: 'js/chunk.[name].js',
assetModuleFilename:
@ -127,6 +129,7 @@ export const createConfiguration: (
clean: buildFlags.mode === 'production',
globalObject: 'globalThis',
publicPath: getPublicPath(buildFlags),
workerPublicPath: '/',
},
target: ['web', 'es2022'],