From 3870801ebbe61e603916e99983d0e987103722a5 Mon Sep 17 00:00:00 2001 From: EYHN Date: Tue, 2 Jul 2024 09:17:39 +0000 Subject: [PATCH] feat(infra): job system (#7212) --- packages/common/infra/src/sync/job/README.md | 47 ++++ .../src/sync/job/__tests__/black-box.spec.ts | 231 +++++++++++++++++ .../src/sync/job/impl/indexeddb/index.ts | 242 ++++++++++++++++++ packages/common/infra/src/sync/job/index.ts | 2 + packages/common/infra/src/sync/job/queue.ts | 28 ++ packages/common/infra/src/sync/job/runner.ts | 60 +++++ scripts/setup/polyfill.ts | 12 + vitest.config.ts | 1 + 8 files changed, 623 insertions(+) create mode 100644 packages/common/infra/src/sync/job/README.md create mode 100644 packages/common/infra/src/sync/job/__tests__/black-box.spec.ts create mode 100644 packages/common/infra/src/sync/job/impl/indexeddb/index.ts create mode 100644 packages/common/infra/src/sync/job/index.ts create mode 100644 packages/common/infra/src/sync/job/queue.ts create mode 100644 packages/common/infra/src/sync/job/runner.ts create mode 100644 scripts/setup/polyfill.ts diff --git a/packages/common/infra/src/sync/job/README.md b/packages/common/infra/src/sync/job/README.md new file mode 100644 index 0000000000..30b8b37331 --- /dev/null +++ b/packages/common/infra/src/sync/job/README.md @@ -0,0 +1,47 @@ +# job + +Job system abstraction for AFFiNE. Currently, only `IndexedDBJobQueue` is implemented; more backends will be implemented in the future. + +Run background jobs in browser & distributed environment. `runners` can consume tasks simultaneously without additional communication. 
+
+# Basic Usage
+
+```ts
+const queue = new IndexedDBJobQueue('my-queue');
+
+await queue.enqueue([
+  {
+    batchKey: '1',
+    payload: { a: 'hello' },
+  },
+  {
+    batchKey: '2',
+    payload: { a: 'world' },
+  },
+]);
+
+const runner = new JobRunner(queue, async jobs => {
+  console.log(jobs);
+});
+
+runner.start();
+
+// Output:
+// [{ id: '1', batchKey: '1', payload: { a: 'hello' } }]
+// [{ id: '2', batchKey: '2', payload: { a: 'world' } }]
+```
+
+## `batchKey`
+
+Each job has a `batchKey`. All jobs with the same `batchKey` are handed to a single `runner` and executed together as one batch.
+While any job with a given `batchKey` is still running, other `runners` will not accept jobs with that `batchKey`, which gives each batch an exclusive lock on its resources.
+
+> In the future, `batchKey` will be used to implement priority.
+
+## `timeout`
+
+If a job has been running for more than 30 seconds, it is considered timed out and can be reassigned to another `runner`.
+
+## Error Handling
+
+If an error is thrown during job execution, the error is logged, but the job is still considered complete.
diff --git a/packages/common/infra/src/sync/job/__tests__/black-box.spec.ts b/packages/common/infra/src/sync/job/__tests__/black-box.spec.ts
new file mode 100644
index 0000000000..e9bb944578
--- /dev/null
+++ b/packages/common/infra/src/sync/job/__tests__/black-box.spec.ts
@@ -0,0 +1,231 @@
+/**
+ * @vitest-environment happy-dom
+ */
+import 'fake-indexeddb/auto';
+
+import { afterEach, beforeEach, describe, expect, test, vitest } from 'vitest';
+
+import { IndexedDBJobQueue } from '../impl/indexeddb';
+import type { JobQueue } from '../queue';
+
+let queue: JobQueue<{
+  a: string;
+}> = null!;
+
+describe.each([{ name: 'idb', backend: IndexedDBJobQueue }])(
+  'impl tests($name)',
+  ({ backend }) => {
+    beforeEach(async () => {
+      queue = new backend();
+
+      await queue.clear();
+
+      vitest.useFakeTimers({
+        toFake: ['Date'],
+      });
+    });
+
+    afterEach(() => {
+      vitest.useRealTimers();
+    });
+
+    test('basic', async () => {
+      await queue.enqueue([
+        {
+          batchKey: '1',
+          payload: { a: 'hello' },
+        },
+        {
+          batchKey: '2',
+          payload: { a: 'world' },
+        },
+      ]);
+      const job1 = await queue.accept();
+      const job2 = await queue.accept();
+
+      expect([job1!, job2!]).toEqual([
+        [
+          {
+            id: expect.any(String),
+            batchKey: '1',
+            payload: { a: 'hello' },
+          },
+        ],
+        [
+          {
+            id: expect.any(String),
+            batchKey: '2',
+            payload: { a: 'world' },
+          },
+        ],
+      ]);
+
+      const job3 = await queue.accept();
+      expect(job3).toBeNull();
+
+      await queue.return(job1!);
+      await queue.return(job2!);
+    });
+
+    test('batch', async () => {
+      await queue.enqueue([
+        {
+          batchKey: '1',
+          payload: { a: 'hello' },
+        },
+        {
+          batchKey: '1',
+          payload: { a: 'world' },
+        },
+      ]);
+      const job1 = await queue.accept();
+
+      expect(job1).toEqual(
+        expect.arrayContaining([
+          {
+            id: expect.any(String),
+            batchKey: '1',
+            payload: { a: 'hello' },
+          },
+          {
+            id: expect.any(String),
+            batchKey: '1',
+            payload: { a: 'world' },
+          },
+        ])
+      );
+    });
+
+    test('timeout', async () => {
+      await queue.enqueue([
+        {
+          batchKey: '1',
+          payload: { a: 'hello' },
+        },
+      ]);
+      {
+        const job = await queue.accept();
+
+        expect(job).toEqual([
+          {
+            id: expect.any(String),
+            batchKey: '1',
+            payload: { a: 'hello' },
+          },
+        ]);
+      }
+
+      {
+        const job = await queue.accept();
+
+        expect(job).toBeNull();
+      }
+
+      vitest.advanceTimersByTime(1000 * 60 * 60);
+
+      {
+        const job = await queue.accept();
+
+        expect(job).toEqual([
+          {
+            id: expect.any(String),
+            batchKey: '1',
+            payload: { a: 'hello' },
+          },
+        ]);
+      }
+    });
+
+    test('waitForAccept', async () => {
+      const abort = new AbortController();
+
+      let result = null as any;
+      queue.waitForAccept(abort.signal).then(jobs => (result = jobs));
+
+      await new Promise(resolve => setTimeout(resolve, 100));
+
+      expect(result).toBeNull();
+
+      await queue.enqueue([
+        {
+          batchKey: '1',
+          payload: { a: 'hello' },
+        },
+      ]);
+
+      await vitest.waitFor(() => {
+        expect(result).toEqual([
+          {
+            id: expect.any(String),
+            batchKey: '1',
+            payload: { a: 'hello' },
+          },
+        ]);
+      });
+    });
+
+    test('waitForAccept race', async () => {
+      const abort = new AbortController();
+
+      let result1 = null as any;
+      let result2 = null as any;
+      queue.waitForAccept(abort.signal).then(jobs => (result1 = jobs));
+      queue.waitForAccept(abort.signal).then(jobs => (result2 = jobs));
+
+      await new Promise(resolve => setTimeout(resolve, 100));
+
+      expect(result1).toBeNull();
+      expect(result2).toBeNull();
+
+      await queue.enqueue([
+        {
+          batchKey: '1',
+          payload: { a: 'hello' },
+        },
+      ]);
+
+      await new Promise(resolve => setTimeout(resolve, 100));
+
+      expect([result1, result2]).toEqual(
+        expect.arrayContaining([
+          [
+            {
+              id: expect.any(String),
+              batchKey: '1',
+              payload: { a: 'hello' },
+            },
+          ],
+          null,
+        ])
+      );
+
+      await queue.enqueue([
+        {
+          batchKey: '2',
+          payload: { a: 'world' },
+        },
+      ]);
+
+      await vitest.waitFor(() => {
+        expect([result1, result2]).toEqual(
+          expect.arrayContaining([
+            [
+              {
+                id: expect.any(String),
+                batchKey: '1',
+                payload: { a: 'hello' },
+              },
+            ],
+            [
+              {
+                id: expect.any(String),
+                batchKey: '2',
+                payload: { a: 'world' },
+              },
+            ],
+          ])
+        );
+      });
+    });
+  }
+);
diff --git a/packages/common/infra/src/sync/job/impl/indexeddb/index.ts b/packages/common/infra/src/sync/job/impl/indexeddb/index.ts
new file mode 100644
index 0000000000..d99ccfd55f
--- /dev/null
+++ b/packages/common/infra/src/sync/job/impl/indexeddb/index.ts
@@ -0,0 +1,282 @@
+import type { DBSchema, IDBPDatabase } from 'idb';
+import { openDB } from 'idb';
+import { merge, Observable, of, throttleTime } from 'rxjs';
+
+import { fromPromise } from '../../../../livedata';
+import { throwIfAborted } from '../../../../utils';
+import { exhaustMapWithTrailing } from '../../../../utils/exhaustmap-with-trailing';
+import type { Job, JobParams, JobQueue } from '../../';
+
+interface IndexDB extends DBSchema {
+  jobs: {
+    key: number;
+    value: JobRecord;
+    indexes: {
+      batchKey: string;
+    };
+  };
+}
+
+/** A job as persisted in the `jobs` object store. */
+interface JobRecord {
+  batchKey: string;
+  /** `Date.now()` at the last accept, or `null` while the job is unclaimed. */
+  startTime: number | null;
+  payload: any;
+}
+
+/** Converts a public job id back into the numeric object-store key. */
+function toKey(id: string | number): number {
+  return typeof id === 'string' ? Number.parseInt(id, 10) : id;
+}
+
+/**
+ * `JobQueue` backed by an IndexedDB object store.
+ *
+ * Enqueued jobs are announced on a `BroadcastChannel` named after the
+ * database; pending `waitForAccept` calls and `status$` listen on it.
+ */
+export class IndexedDBJobQueue<J> implements JobQueue<J> {
+  database: IDBPDatabase<IndexDB> = null as any;
+  broadcast: BroadcastChannel;
+
+  constructor(private readonly databaseName: string = 'jobs') {
+    // Created in the constructor body (not a field initializer) so that it
+    // never reads `databaseName` before the parameter property is assigned.
+    this.broadcast = new BroadcastChannel('idb-job-queue:' + databaseName);
+  }
+
+  /** Stores `jobs` as unclaimed records, then notifies listeners. */
+  async enqueue(jobs: JobParams<J>[]): Promise<void> {
+    await this.ensureInitialized();
+    const trx = this.database.transaction(['jobs'], 'readwrite');
+
+    for (const job of jobs) {
+      await trx.objectStore('jobs').add({
+        batchKey: job.batchKey,
+        payload: job.payload,
+        startTime: null,
+      });
+    }
+
+    trx.commit();
+    // surface a failed commit to the caller instead of announcing jobs that
+    // were never stored
+    await trx.done;
+
+    // send broadcast to notify new jobs
+    this.broadcast.postMessage('new-jobs');
+  }
+
+  /**
+   * Claims the first batch whose jobs are all unclaimed or timed out.
+   *
+   * @returns the claimed jobs, or `null` when no batch can be claimed.
+   */
+  async accept(): Promise<Job<J>[] | null> {
+    await this.ensureInitialized();
+    const trx = this.database.transaction(['jobs'], 'readwrite');
+    const records = trx.objectStore('jobs').index('batchKey').iterate();
+
+    let currentBatchKey: string | null = null;
+    let currentBatchJobs: { id: number; job: JobRecord }[] = [];
+    let skipCurrentBatch = false;
+
+    for await (const item of records) {
+      if (item.value.batchKey !== currentBatchKey) {
+        if (!skipCurrentBatch && currentBatchJobs.length > 0) {
+          // the previous batch is fully acceptable, take it
+          break;
+        }
+
+        currentBatchKey = item.value.batchKey;
+        currentBatchJobs = [];
+        skipCurrentBatch = false;
+      }
+      if (skipCurrentBatch) {
+        continue;
+      }
+      if (this.isAcceptable(item.value)) {
+        currentBatchJobs.push({
+          id: item.primaryKey,
+          job: item.value,
+        });
+      } else {
+        // one running job makes the whole batch unavailable
+        skipCurrentBatch = true;
+      }
+    }
+
+    if (skipCurrentBatch || currentBatchJobs.length === 0) {
+      return null;
+    }
+
+    const startTime = Date.now();
+    for (const { id, job } of currentBatchJobs) {
+      await trx.objectStore('jobs').put({ ...job, startTime }, id);
+    }
+
+    return currentBatchJobs.map(({ id, job }) => ({
+      id: id.toString(),
+      batchKey: job.batchKey,
+      payload: job.payload,
+    }));
+  }
+
+  /**
+   * Resolves with the next claimable batch, retrying `accept` after every
+   * broadcast and at the latest every 5 seconds.
+   *
+   * Rejects with `signal.reason` when `signal` aborts while waiting.
+   */
+  async waitForAccept(signal: AbortSignal): Promise<Job<J>[]> {
+    // a BroadcastChannel object does not receive the messages it posts
+    // itself, so a separate channel is needed to hear `this.broadcast`
+    const broadcast = new BroadcastChannel(
+      'idb-job-queue:' + this.databaseName
+    );
+
+    try {
+      let deferred = Promise.withResolvers<void>();
+
+      broadcast.onmessage = () => {
+        deferred.resolve();
+      };
+
+      while (throwIfAborted(signal)) {
+        const jobs = await this.accept();
+        if (jobs !== null) {
+          return jobs;
+        }
+
+        // wake up on a broadcast, after 5 seconds, or when aborted
+        const wakeUp = deferred;
+        const aborted = Promise.withResolvers<never>();
+        const onAbort = () => aborted.reject(signal?.reason);
+        const timer = setTimeout(() => wakeUp.resolve(), 5000);
+        signal?.addEventListener('abort', onAbort);
+        try {
+          if (signal?.aborted) {
+            // exit if manually stopped
+            onAbort();
+          }
+          await Promise.race([wakeUp.promise, aborted.promise]);
+        } finally {
+          // do not pile up timers and abort listeners across iterations
+          clearTimeout(timer);
+          signal?.removeEventListener('abort', onAbort);
+        }
+        deferred = Promise.withResolvers<void>();
+      }
+      return [];
+    } finally {
+      broadcast.close();
+    }
+  }
+
+  /** Deletes the given jobs from the store. */
+  async complete(jobs: Job<J>[]): Promise<void> {
+    await this.ensureInitialized();
+    const trx = this.database.transaction(['jobs'], 'readwrite');
+
+    for (const { id } of jobs) {
+      await trx.objectStore('jobs').delete(toKey(id));
+    }
+  }
+
+  /**
+   * Hands accepted jobs back to the queue.
+   *
+   * @param retry when `true` the jobs become claimable again (if they still
+   * exist); otherwise they are deleted.
+   */
+  async return(jobs: Job<J>[], retry: boolean = false): Promise<void> {
+    await this.ensureInitialized();
+    const trx = this.database.transaction(['jobs'], 'readwrite');
+    const store = trx.objectStore('jobs');
+
+    for (const { id } of jobs) {
+      const key = toKey(id);
+      if (!retry) {
+        await store.delete(key);
+        continue;
+      }
+      const job = await store.get(key);
+      if (job) {
+        await store.put({ ...job, startTime: null }, key);
+      }
+    }
+  }
+
+  /** Removes every job, claimed or not. */
+  async clear(): Promise<void> {
+    await this.ensureInitialized();
+    const trx = this.database.transaction(['jobs'], 'readwrite');
+    await trx.objectStore('jobs').clear();
+  }
+
+  private async ensureInitialized(): Promise<void> {
+    if (!this.database) {
+      await this.initialize();
+    }
+  }
+
+  /** Opens the database, creating the store and its index on first use. */
+  private async initialize(): Promise<void> {
+    if (this.database) {
+      return;
+    }
+    this.database = await openDB<IndexDB>(this.databaseName, 1, {
+      upgrade(database) {
+        const jobs = database.createObjectStore('jobs', {
+          autoIncrement: true,
+        });
+        jobs.createIndex('batchKey', 'batchKey');
+      },
+    });
+  }
+
+  /** A claimed job becomes claimable again after this many milliseconds. */
+  TIMEOUT = 1000 * 30 /* 30 seconds */;
+
+  private isTimeout(job: JobRecord) {
+    return job.startTime !== null && job.startTime + this.TIMEOUT < Date.now();
+  }
+
+  private isAcceptable(job: JobRecord) {
+    return job.startTime === null || this.isTimeout(job);
+  }
+
+  /**
+   * Emits `{ remaining }`, the number of stored jobs, on subscription and
+   * after broadcasts (throttled to 300ms).
+   */
+  get status$() {
+    return merge(
+      of(1),
+      new Observable<number>(subscriber => {
+        const broadcast = new BroadcastChannel(
+          'idb-job-queue:' + this.databaseName
+        );
+
+        broadcast.onmessage = () => {
+          subscriber.next(1);
+        };
+        return () => {
+          broadcast.close();
+        };
+      })
+    ).pipe(
+      throttleTime(300, undefined, { leading: true, trailing: true }),
+      exhaustMapWithTrailing(() =>
+        fromPromise(async () => {
+          // may be subscribed before any other method has opened the db
+          await this.ensureInitialized();
+          const trx = this.database.transaction(['jobs'], 'readonly');
+          const remaining = await trx.objectStore('jobs').count();
+          return { remaining };
+        })
+      )
+    );
+  }
+}
diff --git a/packages/common/infra/src/sync/job/index.ts b/packages/common/infra/src/sync/job/index.ts
new file mode 100644
index 0000000000..a4c109e47e
--- /dev/null
+++ b/packages/common/infra/src/sync/job/index.ts
@@ -0,0 +1,2 @@
+export * from './queue';
+export * from './runner';
diff --git a/packages/common/infra/src/sync/job/queue.ts b/packages/common/infra/src/sync/job/queue.ts
new file mode 100644
index 0000000000..588102a904
--- /dev/null
+++ b/packages/common/infra/src/sync/job/queue.ts
@@ -0,0 +1,39 @@
+import type { Observable } from 'rxjs';
+
+/** What a producer supplies when enqueueing a job. */
+export interface JobParams<Payload = any> {
+  /** Jobs sharing a `batchKey` are accepted together as one batch. */
+  batchKey: string;
+  payload: Payload;
+}
+
+/** An enqueued job, identified by a queue-assigned `id`. */
+export interface Job<Payload = any> extends JobParams<Payload> {
+  id: string;
+}
+
+export interface JobQueueStatus {
+  /** Number of jobs still stored in the queue. */
+  remaining: number;
+}
+
+export interface JobQueue<Payload> {
+  enqueue(jobs: JobParams<Payload>[]): Promise<void>;
+
+  /** Claims the next available batch, or resolves `null` if there is none. */
+  accept(): Promise<Job<Payload>[] | null>;
+
+  /** Like `accept`, but waits until a batch is available or `signal` aborts. */
+  waitForAccept(signal: AbortSignal): Promise<Job<Payload>[]>;
+
+  /**
+   * Gives accepted jobs back. With `retry` they are made available again,
+   * otherwise they are removed from the queue.
+   */
+  return(jobs: Job<Payload>[], retry?: boolean): Promise<void>;
+
+  /** Removes all jobs. */
+  clear(): Promise<void>;
+
+  status$: Observable<JobQueueStatus>;
+}
diff --git a/packages/common/infra/src/sync/job/runner.ts b/packages/common/infra/src/sync/job/runner.ts
new file mode 100644
index 0000000000..7bf04c62d7
--- /dev/null
+++ b/packages/common/infra/src/sync/job/runner.ts
@@ -0,0 +1,74 @@
+import { DebugLogger } from '@affine/debug';
+
+import { MANUALLY_STOP, throwIfAborted } from '../../utils';
+import type { Job, JobQueue } from './queue';
+
+const logger = new DebugLogger('job-runner');
+
+/**
+ * Repeatedly takes batches of jobs from a `JobQueue` and passes them to
+ * `worker`, until `stop()` is called.
+ */
+export class JobRunner<J> {
+  abort: AbortController | null = null;
+
+  /**
+   * @param queue where jobs are taken from and returned to
+   * @param worker processes one accepted batch; `signal` aborts on `stop()`
+   * @param interval awaited after every iteration of the loop
+   */
+  constructor(
+    private readonly queue: JobQueue<J>,
+    private readonly worker: (
+      jobs: Job<J>[],
+      signal: AbortSignal
+    ) => Promise<void>,
+    private readonly interval: () => Promise<void> = async () => {}
+  ) {}
+
+  /** (Re)starts the loop; a loop that is already running is stopped first. */
+  start() {
+    this.stop();
+    this.abort = new AbortController();
+    this.loop(this.abort.signal).catch(err => {
+      if (err === MANUALLY_STOP) {
+        return;
+      }
+      logger.error(err);
+    });
+  }
+
+  stop() {
+    // Abort with MANUALLY_STOP as the reason: `start` and `loop` compare
+    // errors against it to tell a deliberate stop from a failure.
+    this.abort?.abort(MANUALLY_STOP);
+    this.abort = null;
+  }
+
+  async loop(signal: AbortSignal) {
+    while (throwIfAborted(signal)) {
+      const jobs = await this.queue.waitForAccept(signal);
+
+      if (jobs === null || jobs.length === 0) {
+        // nothing was handed out, back off before asking again
+        await new Promise(resolve => setTimeout(resolve, 1000));
+      } else {
+        let retry = false;
+        try {
+          await this.worker(jobs, signal);
+        } catch (err) {
+          if (err === MANUALLY_STOP) {
+            // interrupted by `stop()`, make the jobs available again
+            retry = true;
+          } else {
+            // TODO: retry logic
+            logger.error('Error processing jobs', err);
+          }
+        }
+        await this.queue.return(jobs, retry);
+      }
+
+      await this.interval();
+    }
+  }
+}
diff --git a/scripts/setup/polyfill.ts b/scripts/setup/polyfill.ts
new file mode 100644
index 0000000000..d3fd472bce
--- /dev/null
+++ b/scripts/setup/polyfill.ts
@@ -0,0 +1,12 @@
+/* eslint-disable */
+// @ts-nocheck
+
+Promise.withResolvers ??= function withResolvers() {
+  var a,
+    b,
+    c = new this(function (resolve, reject) {
+      a = resolve;
+      b = reject;
+    });
+  return { resolve: a, reject: b, promise: c };
+};
diff --git a/vitest.config.ts b/vitest.config.ts
index 33dfd01849..d73a917473 100644
--- a/vitest.config.ts
+++ b/vitest.config.ts
@@ -47,6 +47,7 @@ export default defineConfig({
   },
   test: {
     setupFiles: [
+      resolve(rootDir, './scripts/setup/polyfill.ts'),
       resolve(rootDir, './scripts/setup/lit.ts'),
       resolve(rootDir, './scripts/setup/vi-mock.ts'),
       resolve(rootDir, './scripts/setup/global.ts'),