Add cron mechanism (#3318)

* Add cron to message queue interfaces

* Add command to launch cron job

* Add command to stop cron job

* Update clean inactive workspaces job

* Isolate cron mechanism

* Code review returns

* Remove useless object.assign

* Add MessageQueuCronJobData interface

* Rename cron job utils

* Fix typing
This commit is contained in:
martmull 2024-01-09 12:23:45 +01:00 committed by GitHub
parent ed06cc0310
commit 361446d79c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 119 additions and 11 deletions

View File

@ -49,6 +49,39 @@ export class BullMQDriver implements MessageQueueDriver {
this.workerMap[queueName] = worker;
}
async addCron<T>(
queueName: MessageQueue,
jobName: string,
data: T,
pattern: string,
options?: QueueJobOptions,
): Promise<void> {
if (!this.queueMap[queueName]) {
throw new Error(
`Queue ${queueName} is not registered, make sure you have added it as a queue provider`,
);
}
const queueOptions = {
jobId: options?.id,
priority: options?.priority,
repeat: {
pattern,
},
};
await this.queueMap[queueName].add(jobName, data, queueOptions);
}
async removeCron(
queueName: MessageQueue,
jobName: string,
pattern: string,
): Promise<void> {
await this.queueMap[queueName].removeRepeatable(jobName, {
pattern,
});
}
async add<T>(
queueName: MessageQueue,
jobName: string,
@ -60,9 +93,8 @@ export class BullMQDriver implements MessageQueueDriver {
`Queue ${queueName} is not registered, make sure you have added it as a queue provider`,
);
}
await this.queueMap[queueName].add(jobName, data, {
jobId: options?.id,
priority: options?.priority,
});
const queueOptions = { jobId: options?.id, priority: options?.priority };
await this.queueMap[queueName].add(jobName, data, queueOptions);
}
}

View File

@ -14,6 +14,14 @@ export interface MessageQueueDriver {
queueName: MessageQueue,
handler: ({ data, id }: { data: T; id: string }) => Promise<void> | void,
);
addCron<T extends MessageQueueJobData | undefined>(
queueName: MessageQueue,
jobName: string,
data: T,
pattern: string,
options?: QueueJobOptions,
);
removeCron(queueName: MessageQueue, jobName: string, pattern?: string);
stop?(): Promise<void>;
register?(queueName: MessageQueue): void;
}

View File

@ -30,6 +30,30 @@ export class PgBossDriver implements MessageQueueDriver {
return this.pgBoss.work(`${queueName}.*`, handler);
}
async addCron<T>(
queueName: MessageQueue,
jobName: string,
data: T,
pattern: string,
options?: QueueJobOptions,
): Promise<void> {
await this.pgBoss.schedule(
`${queueName}.${jobName}`,
pattern,
data as object,
options
? {
...options,
singletonKey: options?.id,
}
: {},
);
}
async removeCron(queueName: MessageQueue, jobName: string): Promise<void> {
await this.pgBoss.unschedule(`${queueName}.${jobName}`);
}
async add<T>(
queueName: MessageQueue,
jobName: string,

View File

@ -1,7 +1,9 @@
import { ModuleRef } from '@nestjs/core';
import { Logger } from '@nestjs/common';
import { MessageQueueDriver } from 'src/integrations/message-queue/drivers/interfaces/message-queue-driver.interface';
import {
MessageQueueCronJobData,
MessageQueueJob,
MessageQueueJobData,
} from 'src/integrations/message-queue/interfaces/message-queue-job.interface';
@ -10,6 +12,7 @@ import { MessageQueue } from 'src/integrations/message-queue/message-queue.const
import { getJobClassName } from 'src/integrations/message-queue/utils/get-job-class-name.util';
export class SyncDriver implements MessageQueueDriver {
private readonly logger = new Logger(SyncDriver.name);
constructor(private readonly jobsModuleRef: ModuleRef) {}
async add<T extends MessageQueueJobData>(
@ -23,7 +26,30 @@ export class SyncDriver implements MessageQueueDriver {
{ strict: true },
);
return await job.handle(data);
await job.handle(data);
}
async addCron<T extends MessageQueueJobData | undefined>(
_queueName: MessageQueue,
jobName: string,
data: T,
pattern: string,
): Promise<void> {
this.logger.log(`Running '${pattern}' cron job with SyncDriver`);
const jobClassName = getJobClassName(jobName);
const job: MessageQueueCronJobData<MessageQueueJobData | undefined> =
this.jobsModuleRef.get(jobClassName, {
strict: true,
});
await job.handle(data);
}
async removeCron(_queueName: MessageQueue, jobName: string) {
this.logger.log(`Removing '${jobName}' cron job with SyncDriver`);
return;
}
work() {

View File

@ -2,6 +2,12 @@ export interface MessageQueueJob<T extends MessageQueueJobData> {
handle(data: T): Promise<void> | void;
}
export interface MessageQueueCronJobData<
T extends MessageQueueJobData | undefined,
> {
handle(data: T): Promise<void> | void;
}
export interface MessageQueueJobData {
[key: string]: any;
}

View File

@ -8,6 +8,7 @@ import { CallWebhookJob } from 'src/workspace/workspace-query-runner/jobs/call-w
import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module';
import { ObjectMetadataModule } from 'src/metadata/object-metadata/object-metadata.module';
import { DataSourceModule } from 'src/metadata/data-source/data-source.module';
import { TypeORMModule } from 'src/database/typeorm/typeorm.module';
import { FetchWorkspaceMessagesModule } from 'src/workspace/messaging/services/fetch-workspace-messages.module';
@Module({
@ -16,6 +17,7 @@ import { FetchWorkspaceMessagesModule } from 'src/workspace/messaging/services/f
ObjectMetadataModule,
DataSourceModule,
HttpModule,
TypeORMModule,
FetchWorkspaceMessagesModule,
],
providers: [

View File

@ -4,4 +4,5 @@ export enum MessageQueue {
taskAssignedQueue = 'task-assigned-queue',
messagingQueue = 'messaging-queue',
webhookQueue = 'webhook-queue',
cronQueue = 'cron-queue',
}

View File

@ -7,8 +7,8 @@ import {
MessageQueueModuleAsyncOptions,
} from 'src/integrations/message-queue/interfaces';
import {
QUEUE_DRIVER,
MessageQueue,
QUEUE_DRIVER,
} from 'src/integrations/message-queue/message-queue.constants';
import { PgBossDriver } from 'src/integrations/message-queue/drivers/pg-boss.driver';
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service';
@ -55,11 +55,7 @@ export class MessageQueueModule {
module: MessageQueueModule,
imports: [JobsModule, ...(options.imports || [])],
providers,
exports: [
MessageQueue.taskAssignedQueue,
MessageQueue.messagingQueue,
MessageQueue.webhookQueue,
],
exports: Object.values(MessageQueue),
};
}
}

View File

@ -34,6 +34,19 @@ export class MessageQueueService implements OnModuleDestroy {
return this.driver.add(this.queueName, jobName, data, options);
}
addCron<T extends MessageQueueJobData | undefined>(
jobName: string,
data: T,
pattern: string,
options?: QueueJobOptions,
): Promise<void> {
return this.driver.addCron(this.queueName, jobName, data, pattern, options);
}
removeCron(jobName: string, pattern: string): Promise<void> {
return this.driver.removeCron(this.queueName, jobName, pattern);
}
work<T extends MessageQueueJobData>(
handler: ({ data, id }: { data: T; id: string }) => Promise<void> | void,
) {