feat(infra): job system (#7212)

EYHN 2024-07-02 09:17:39 +00:00
parent 0957c30e74
commit 3870801ebb
8 changed files with 623 additions and 0 deletions

@@ -0,0 +1,47 @@
# job
Job system abstraction for AFFiNE. Currently only `IndexedDBJobQueue` is implemented; more backends will be added in the future.
Runs background jobs in the browser and in distributed environments. Multiple `runners` can consume jobs concurrently without any additional coordination.
# Basic Usage
```ts
const queue = new IndexedDBJobQueue('my-queue');

await queue.enqueue([
  {
    batchKey: '1',
    payload: { a: 'hello' },
  },
  {
    batchKey: '2',
    payload: { a: 'world' },
  },
]);

const runner = new JobRunner(queue, async jobs => {
  console.log(jobs);
});

runner.start();

// Output (each accepted batch is an array of jobs):
// [{ id: ..., batchKey: '1', payload: { a: 'hello' } }]
// [{ id: ..., batchKey: '2', payload: { a: 'world' } }]
```
## `batchKey`
Each job has a `batchKey`, and jobs with the same `batchKey` are handed to a single `runner` and executed together as one batch.
Additionally, while jobs with a given `batchKey` are being executed, other `runners` will not accept jobs with that `batchKey`, which effectively gives the running batch an exclusive lock on its resource (see the sketch below).
> In the future, `batchKey` will be used to implement priority.
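For example, a minimal sketch (the `batchKey` and payload values here are illustrative): enqueue two jobs with the same `batchKey`, and a single runner receives both of them in one call.

```ts
await queue.enqueue([
  { batchKey: 'doc-1', payload: { a: 'hello' } },
  { batchKey: 'doc-1', payload: { a: 'world' } },
]);

const runner = new JobRunner(queue, async jobs => {
  // both jobs arrive together because they share the same batchKey
  console.log(jobs.length); // 2
});

runner.start();
```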
## `timeout`
If a job is not completed within 30 seconds of being accepted, it is considered timed out and becomes available to be picked up again by another `runner`.
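At the queue level this looks roughly like the sketch below; the 30-second window is a constant of the IndexedDB implementation and is not configurable yet.

```ts
const jobs = await queue.accept();  // the batch is now marked as in flight
// ...more than 30 seconds pass without queue.return(jobs)...
const again = await queue.accept(); // the same batch is handed out again
```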
## Error Handling
If an error is thrown during job execution, the error is logged, but the job is still treated as complete and removed from the queue.
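For example, with the current `JobRunner`, a worker that throws does not cause a retry; the failure is only logged (a minimal sketch):

```ts
const runner = new JobRunner(queue, async () => {
  throw new Error('boom'); // logged by the runner as an error
});

runner.start();
// The accepted jobs are still removed from the queue; there is no automatic retry yet.
```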

@@ -0,0 +1,231 @@
/**
* @vitest-environment happy-dom
*/
import 'fake-indexeddb/auto';
import { afterEach, beforeEach, describe, expect, test, vitest } from 'vitest';
import { IndexedDBJobQueue } from '../impl/indexeddb';
import type { JobQueue } from '../queue';
let queue: JobQueue<{
a: string;
}> = null!;
describe.each([{ name: 'idb', backend: IndexedDBJobQueue }])(
'impl tests($name)',
({ backend }) => {
beforeEach(async () => {
queue = new backend();
await queue.clear();
vitest.useFakeTimers({
toFake: ['Date'],
});
});
afterEach(() => {
vitest.useRealTimers();
});
test('basic', async () => {
await queue.enqueue([
{
batchKey: '1',
payload: { a: 'hello' },
},
{
batchKey: '2',
payload: { a: 'world' },
},
]);
const job1 = await queue.accept();
const job2 = await queue.accept();
expect([job1!, job2!]).toEqual([
[
{
id: expect.any(String),
batchKey: '1',
payload: { a: 'hello' },
},
],
[
{
id: expect.any(String),
batchKey: '2',
payload: { a: 'world' },
},
],
]);
const job3 = await queue.accept();
expect(job3).toBeNull();
await queue.return(job1!);
await queue.return(job2!);
});
test('batch', async () => {
await queue.enqueue([
{
batchKey: '1',
payload: { a: 'hello' },
},
{
batchKey: '1',
payload: { a: 'world' },
},
]);
const job1 = await queue.accept();
expect(job1).toEqual(
expect.arrayContaining([
{
id: expect.any(String),
batchKey: '1',
payload: { a: 'hello' },
},
{
id: expect.any(String),
batchKey: '1',
payload: { a: 'world' },
},
])
);
});
test('timeout', async () => {
await queue.enqueue([
{
batchKey: '1',
payload: { a: 'hello' },
},
]);
{
const job = await queue.accept();
expect(job).toEqual([
{
id: expect.any(String),
batchKey: '1',
payload: { a: 'hello' },
},
]);
}
{
const job = await queue.accept();
expect(job).toBeNull();
}
vitest.advanceTimersByTime(1000 * 60 * 60);
{
const job = await queue.accept();
expect(job).toEqual([
{
id: expect.any(String),
batchKey: '1',
payload: { a: 'hello' },
},
]);
}
});
test('waitForAccept', async () => {
const abort = new AbortController();
let result = null as any;
queue.waitForAccept(abort.signal).then(jobs => (result = jobs));
await new Promise(resolve => setTimeout(resolve, 100));
expect(result).toBeNull();
await queue.enqueue([
{
batchKey: '1',
payload: { a: 'hello' },
},
]);
await vitest.waitFor(() => {
expect(result).toEqual([
{
id: expect.any(String),
batchKey: '1',
payload: { a: 'hello' },
},
]);
});
});
test('waitForAccept race', async () => {
const abort = new AbortController();
let result1 = null as any;
let result2 = null as any;
queue.waitForAccept(abort.signal).then(jobs => (result1 = jobs));
queue.waitForAccept(abort.signal).then(jobs => (result2 = jobs));
await new Promise(resolve => setTimeout(resolve, 100));
expect(result1).toBeNull();
expect(result2).toBeNull();
await queue.enqueue([
{
batchKey: '1',
payload: { a: 'hello' },
},
]);
await new Promise(resolve => setTimeout(resolve, 100));
expect([result1, result2]).toEqual(
expect.arrayContaining([
[
{
id: expect.any(String),
batchKey: '1',
payload: { a: 'hello' },
},
],
null,
])
);
await queue.enqueue([
{
batchKey: '2',
payload: { a: 'world' },
},
]);
await vitest.waitFor(() => {
expect([result1, result2]).toEqual(
expect.arrayContaining([
[
{
id: expect.any(String),
batchKey: '1',
payload: { a: 'hello' },
},
],
[
{
id: expect.any(String),
batchKey: '2',
payload: { a: 'world' },
},
],
])
);
});
});
}
);

@@ -0,0 +1,242 @@
import type { DBSchema, IDBPDatabase } from 'idb';
import { openDB } from 'idb';
import { merge, Observable, of, throttleTime } from 'rxjs';
import { fromPromise } from '../../../../livedata';
import { throwIfAborted } from '../../../../utils';
import { exhaustMapWithTrailing } from '../../../../utils/exhaustmap-with-trailing';
import type { Job, JobParams, JobQueue } from '../../';
interface IndexDB extends DBSchema {
jobs: {
key: number;
value: JobRecord;
indexes: {
batchKey: string;
};
};
}
interface JobRecord {
batchKey: string;
startTime: number | null;
payload: any;
}
export class IndexedDBJobQueue<J> implements JobQueue<J> {
database: IDBPDatabase<IndexDB> = null as any;
broadcast = new BroadcastChannel('idb-job-queue:' + this.databaseName);
constructor(private readonly databaseName: string = 'jobs') {}
async enqueue(jobs: JobParams[]): Promise<void> {
await this.ensureInitialized();
const trx = this.database.transaction(['jobs'], 'readwrite');
for (const job of jobs) {
await trx.objectStore('jobs').add({
batchKey: job.batchKey,
payload: job.payload,
startTime: null,
});
}
trx.commit();
// send broadcast to notify new jobs
this.broadcast.postMessage('new-jobs');
}
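// Accept the next batch: scan the `batchKey` index and return the first batch whose
// jobs are all acceptable (never started, or timed out); accepted jobs get a startTime.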
async accept(): Promise<Job[] | null> {
await this.ensureInitialized();
const jobs = [];
const trx = this.database.transaction(['jobs'], 'readwrite');
// job priorities are not implemented yet, so this always scans for the next acceptable batch
if (jobs.length === 0) {
const batchKeys = trx.objectStore('jobs').index('batchKey').iterate();
let currentBatchKey: string = null as any;
let currentBatchJobs = [];
let skipCurrentBatch = false;
for await (const item of batchKeys) {
if (item.value.batchKey !== currentBatchKey) {
if (!skipCurrentBatch && currentBatchJobs.length > 0) {
break;
}
currentBatchKey = item.value.batchKey;
currentBatchJobs = [];
skipCurrentBatch = false;
}
if (skipCurrentBatch) {
continue;
}
if (this.isAcceptable(item.value)) {
currentBatchJobs.push({
id: item.primaryKey,
job: item.value,
});
} else {
skipCurrentBatch = true;
}
}
if (skipCurrentBatch === false && currentBatchJobs.length > 0) {
jobs.push(...currentBatchJobs);
}
}
for (const { id, job } of jobs) {
const startTime = Date.now();
await trx.objectStore('jobs').put({ ...job, startTime }, id);
}
if (jobs.length === 0) {
return null;
}
return jobs.map(({ id, job }) => ({
id: id.toString(),
batchKey: job.batchKey,
payload: job.payload,
}));
}
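// Wait until a batch can be accepted: re-check whenever another tab or worker
// announces new jobs over the BroadcastChannel, and poll every 5 seconds as a fallback.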
async waitForAccept(signal: AbortSignal): Promise<Job<J>[]> {
const broadcast = new BroadcastChannel(
'idb-job-queue:' + this.databaseName
);
try {
let deferred = Promise.withResolvers<void>();
broadcast.onmessage = () => {
deferred.resolve();
};
while (throwIfAborted(signal)) {
const jobs = await this.accept();
if (jobs !== null) {
return jobs;
}
await Promise.race([
deferred.promise,
new Promise(resolve => {
setTimeout(resolve, 5000);
}),
new Promise((_, reject) => {
// exit if manually stopped
if (signal?.aborted) {
reject(signal.reason);
}
signal?.addEventListener('abort', () => {
reject(signal.reason);
});
}),
]);
deferred = Promise.withResolvers<void>();
}
return [];
} finally {
broadcast.close();
}
}
async complete(jobs: Job[]): Promise<void> {
await this.ensureInitialized();
const trx = this.database.transaction(['jobs'], 'readwrite');
for (const { id } of jobs) {
await trx
.objectStore('jobs')
.delete(typeof id === 'string' ? parseInt(id) : id);
}
}
async return(jobs: Job[], retry: boolean = false): Promise<void> {
await this.ensureInitialized();
const trx = this.database.transaction(['jobs'], 'readwrite');
for (const { id } of jobs) {
if (retry) {
const nid = typeof id === 'string' ? parseInt(id) : id;
const job = await trx.objectStore('jobs').get(nid);
if (job) {
await trx.objectStore('jobs').put({ ...job, startTime: null }, nid);
}
} else {
await trx
.objectStore('jobs')
.delete(typeof id === 'string' ? parseInt(id) : id);
}
}
}
async clear(): Promise<void> {
await this.ensureInitialized();
const trx = this.database.transaction(['jobs'], 'readwrite');
await trx.objectStore('jobs').clear();
}
private async ensureInitialized(): Promise<void> {
if (!this.database) {
await this.initialize();
}
}
private async initialize(): Promise<void> {
if (this.database) {
return;
}
this.database = await openDB(this.databaseName, 1, {
upgrade(database) {
const jobs = database.createObjectStore('jobs', {
autoIncrement: true,
});
jobs.createIndex('batchKey', 'batchKey');
},
});
}
TIMEOUT = 1000 * 30 /* 30 seconds */;
private isTimeout(job: JobRecord) {
return job.startTime !== null && job.startTime + this.TIMEOUT < Date.now();
}
private isAcceptable(job: JobRecord) {
return job.startTime === null || this.isTimeout(job);
}
get status$() {
return merge(
of(1),
new Observable(subscriber => {
const broadcast = new BroadcastChannel(
'idb-job-queue:' + this.databaseName
);
broadcast.onmessage = () => {
subscriber.next(1);
};
return () => {
broadcast.close();
};
})
).pipe(
throttleTime(300, undefined, { leading: true, trailing: true }),
exhaustMapWithTrailing(() =>
fromPromise(async () => {
// make sure the database is open before counting the remaining jobs
await this.ensureInitialized();
const trx = this.database.transaction(['jobs'], 'readonly');
const remaining = await trx.objectStore('jobs').count();
return { remaining };
})
)
);
}
}

@@ -0,0 +1,2 @@
export * from './queue';
export * from './runner';

@@ -0,0 +1,28 @@
import type { Observable } from 'rxjs';
export interface JobParams<Payload = any> {
batchKey: string;
payload: Payload;
}
export interface Job<Payload = any> extends JobParams<Payload> {
id: string;
}
export interface JobQueueStatus {
remaining: number;
}
export interface JobQueue<Payload> {
/** Add jobs to the queue. */
enqueue(jobs: JobParams<Payload>[]): Promise<void>;
/** Take the next acceptable batch of jobs, or null if none is available right now. */
accept(): Promise<Job<Payload>[] | null>;
/** Like `accept`, but waits until a batch becomes available. */
waitForAccept(signal: AbortSignal): Promise<Job<Payload>[]>;
/** Hand accepted jobs back: with `retry` they become acceptable again, otherwise they are removed. */
return(jobs: Job<Payload>[], retry?: boolean): Promise<void>;
/** Remove all jobs from the queue. */
clear(): Promise<void>;
/** The number of jobs remaining in the queue. */
status$: Observable<JobQueueStatus>;
}

@@ -0,0 +1,60 @@
import { DebugLogger } from '@affine/debug';
import { MANUALLY_STOP, throwIfAborted } from '../../utils';
import type { Job, JobQueue } from './queue';
const logger = new DebugLogger('job-runner');
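/**
 * Pulls batches of jobs from a JobQueue and passes them to the given worker.
 * The optional `interval` callback can be used to throttle how quickly the
 * runner moves on to the next batch.
 */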
export class JobRunner<J> {
abort: AbortController | null = null;
constructor(
private readonly queue: JobQueue<J>,
private readonly worker: (
jobs: Job<J>[],
signal: AbortSignal
) => Promise<void>,
private readonly interval: () => Promise<void> = async () => {}
) {}
start() {
this.stop();
this.abort = new AbortController();
this.loop(this.abort.signal).catch(err => {
if (err === MANUALLY_STOP) {
return;
}
logger.error(err);
});
}
stop() {
this.abort?.abort();
this.abort = null;
}
async loop(signal: AbortSignal) {
while (throwIfAborted(signal)) {
const jobs = await this.queue.waitForAccept(signal);
if (jobs !== null) {
try {
await this.worker(jobs, signal);
await this.queue.return(jobs);
} catch (err) {
if (err === MANUALLY_STOP) {
await this.queue.return(jobs, true);
} else {
// TODO: retry logic
await this.queue.return(jobs);
}
logger.error('Error processing jobs', err);
}
} else {
await new Promise(resolve => setTimeout(resolve, 1000));
}
await this.interval();
}
}
}

scripts/setup/polyfill.ts

@@ -0,0 +1,12 @@
/* eslint-disable */
// @ts-nocheck
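// `Promise.withResolvers` is used by the job queue implementation but may not be
// available in the test runtime yet, so install a minimal polyfill for the tests.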
Promise.withResolvers ??= function withResolvers() {
var a,
b,
c = new this(function (resolve, reject) {
a = resolve;
b = reject;
});
return { resolve: a, reject: b, promise: c };
};

@@ -47,6 +47,7 @@ export default defineConfig({
},
test: {
setupFiles: [
resolve(rootDir, './scripts/setup/polyfill.ts'),
resolve(rootDir, './scripts/setup/lit.ts'),
resolve(rootDir, './scripts/setup/vi-mock.ts'),
resolve(rootDir, './scripts/setup/global.ts'),