Add pg-boss worker poc (#2991)

* Add pg-boss worker poc

* add Example job

* add retry limit

* rename MessageQueue
This commit is contained in:
Weiko 2023-12-14 18:57:25 +01:00 committed by GitHub
parent 468744298b
commit 36164ab59b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 196 additions and 41 deletions

View File

@ -28,7 +28,8 @@
"database:seed:dev": "yarn build && yarn command workspace:seed:dev", "database:seed:dev": "yarn build && yarn command workspace:seed:dev",
"database:seed:demo": "yarn build && yarn command workspace:seed:demo", "database:seed:demo": "yarn build && yarn command workspace:seed:demo",
"database:reset": "yarn database:truncate && yarn database:init", "database:reset": "yarn database:truncate && yarn database:init",
"command": "node dist/src/command" "command": "node dist/src/command",
"queue:work": "node dist/src/queue-worker"
}, },
"dependencies": { "dependencies": {
"@apollo/server": "^4.7.3", "@apollo/server": "^4.7.3",

View File

@ -1,7 +1,7 @@
import { Module } from '@nestjs/common'; import { Module } from '@nestjs/common';
import { DatabaseCommandModule } from 'src/database/commands/database-command.module'; import { DatabaseCommandModule } from 'src/database/commands/database-command.module';
import { FetchWorkspaceMessagesCommand } from 'src/workspace/messaging/commands/fetch-workspace-messages.command'; import { FetchWorkspaceMessagesCommandsModule } from 'src/workspace/messaging/commands/fetch-workspace-messages-commands.module';
import { AppModule } from './app.module'; import { AppModule } from './app.module';
@ -14,7 +14,7 @@ import { WorkspaceMigrationRunnerCommandsModule } from './workspace/workspace-mi
WorkspaceMigrationRunnerCommandsModule, WorkspaceMigrationRunnerCommandsModule,
WorkspaceSyncMetadataCommandsModule, WorkspaceSyncMetadataCommandsModule,
DatabaseCommandModule, DatabaseCommandModule,
FetchWorkspaceMessagesCommand, FetchWorkspaceMessagesCommandsModule,
], ],
}) })
export class CommandModule {} export class CommandModule {}

View File

@ -2,25 +2,25 @@ import { Queue, QueueOptions, Worker } from 'bullmq';
import { QueueJobOptions } from 'src/integrations/message-queue/drivers/interfaces/job-options.interface'; import { QueueJobOptions } from 'src/integrations/message-queue/drivers/interfaces/job-options.interface';
import { MessageQueues } from 'src/integrations/message-queue/message-queue.constants'; import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants';
import { MessageQueueDriver } from './interfaces/message-queue-driver.interface'; import { MessageQueueDriver } from './interfaces/message-queue-driver.interface';
export type BullMQDriverOptions = QueueOptions; export type BullMQDriverOptions = QueueOptions;
export class BullMQDriver implements MessageQueueDriver { export class BullMQDriver implements MessageQueueDriver {
private queueMap: Record<MessageQueues, Queue> = {} as Record< private queueMap: Record<MessageQueue, Queue> = {} as Record<
MessageQueues, MessageQueue,
Queue Queue
>; >;
private workerMap: Record<MessageQueues, Worker> = {} as Record< private workerMap: Record<MessageQueue, Worker> = {} as Record<
MessageQueues, MessageQueue,
Worker Worker
>; >;
constructor(private options: BullMQDriverOptions) {} constructor(private options: BullMQDriverOptions) {}
register(queueName: MessageQueues): void { register(queueName: MessageQueue): void {
this.queueMap[queueName] = new Queue(queueName, this.options); this.queueMap[queueName] = new Queue(queueName, this.options);
} }
@ -35,7 +35,7 @@ export class BullMQDriver implements MessageQueueDriver {
} }
async work<T>( async work<T>(
queueName: MessageQueues, queueName: MessageQueue,
handler: ({ data, id }: { data: T; id: string }) => Promise<void>, handler: ({ data, id }: { data: T; id: string }) => Promise<void>,
) { ) {
const worker = new Worker(queueName, async (job) => { const worker = new Worker(queueName, async (job) => {
@ -46,7 +46,8 @@ export class BullMQDriver implements MessageQueueDriver {
} }
async add<T>( async add<T>(
queueName: MessageQueues, queueName: MessageQueue,
jobName: string,
data: T, data: T,
options?: QueueJobOptions, options?: QueueJobOptions,
): Promise<void> { ): Promise<void> {
@ -55,7 +56,8 @@ export class BullMQDriver implements MessageQueueDriver {
`Queue ${queueName} is not registered, make sure you have added it as a queue provider`, `Queue ${queueName} is not registered, make sure you have added it as a queue provider`,
); );
} }
await this.queueMap[queueName].add(options?.id || '', data, { await this.queueMap[queueName].add(jobName, data, {
jobId: options?.id,
priority: options?.priority, priority: options?.priority,
}); });
} }

View File

@ -1,4 +1,5 @@
export interface QueueJobOptions { export interface QueueJobOptions {
id?: string; id?: string;
priority?: number; priority?: number;
retryLimit?: number;
} }

View File

@ -1,17 +1,18 @@
import { QueueJobOptions } from 'src/integrations/message-queue/drivers/interfaces/job-options.interface'; import { QueueJobOptions } from 'src/integrations/message-queue/drivers/interfaces/job-options.interface';
import { MessageQueues } from 'src/integrations/message-queue/message-queue.constants'; import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants';
export interface MessageQueueDriver { export interface MessageQueueDriver {
add<T>( add<T>(
queueName: MessageQueues, queueName: MessageQueue,
jobName: string,
data: T, data: T,
options?: QueueJobOptions, options?: QueueJobOptions,
): Promise<void>; ): Promise<void>;
work<T>( work<T>(
queueName: string, queueName: MessageQueue,
handler: ({ data, id }: { data: T; id: string }) => Promise<void> | void, handler: ({ data, id }: { data: T; id: string }) => Promise<void> | void,
); );
stop?(): Promise<void>; stop?(): Promise<void>;
register?(queueName: MessageQueues): void; register?(queueName: MessageQueue): void;
} }

View File

@ -2,6 +2,8 @@ import PgBoss from 'pg-boss';
import { QueueJobOptions } from 'src/integrations/message-queue/drivers/interfaces/job-options.interface'; import { QueueJobOptions } from 'src/integrations/message-queue/drivers/interfaces/job-options.interface';
import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants';
import { MessageQueueDriver } from './interfaces/message-queue-driver.interface'; import { MessageQueueDriver } from './interfaces/message-queue-driver.interface';
export type PgBossDriverOptions = PgBoss.ConstructorOptions; export type PgBossDriverOptions = PgBoss.ConstructorOptions;
@ -25,14 +27,19 @@ export class PgBossDriver implements MessageQueueDriver {
queueName: string, queueName: string,
handler: ({ data, id }: { data: T; id: string }) => Promise<void>, handler: ({ data, id }: { data: T; id: string }) => Promise<void>,
) { ) {
return this.pgBoss.work(queueName, handler); return this.pgBoss.work(`${queueName}.*`, handler);
} }
async add<T>( async add<T>(
queueName: string, queueName: MessageQueue,
jobName: string,
data: T, data: T,
options?: QueueJobOptions, options?: QueueJobOptions,
): Promise<void> { ): Promise<void> {
await this.pgBoss.send(queueName, data as object, options ? options : {}); await this.pgBoss.send(
`${queueName}.${jobName}`,
data as object,
options ?? {},
);
} }
} }

View File

@ -0,0 +1,7 @@
export interface MessageQueueJob<T extends MessageQueueJobData> {
handle(data: T): Promise<void> | void;
}
export interface MessageQueueJobData {
[key: string]: any;
}

View File

@ -1,5 +1,6 @@
export const QUEUE_DRIVER = Symbol('QUEUE_DRIVER'); export const QUEUE_DRIVER = Symbol('QUEUE_DRIVER');
export enum MessageQueues { export enum MessageQueue {
taskAssignedQueue = 'task-assigned-queue', taskAssignedQueue = 'task-assigned-queue',
messagingQueue = 'messaging-queue',
} }

View File

@ -8,7 +8,7 @@ import {
} from 'src/integrations/message-queue/interfaces'; } from 'src/integrations/message-queue/interfaces';
import { import {
QUEUE_DRIVER, QUEUE_DRIVER,
MessageQueues, MessageQueue,
} from 'src/integrations/message-queue/message-queue.constants'; } from 'src/integrations/message-queue/message-queue.constants';
import { PgBossDriver } from 'src/integrations/message-queue/drivers/pg-boss.driver'; import { PgBossDriver } from 'src/integrations/message-queue/drivers/pg-boss.driver';
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service';
@ -18,16 +18,13 @@ import { BullMQDriver } from 'src/integrations/message-queue/drivers/bullmq.driv
export class MessageQueueModule { export class MessageQueueModule {
static forRoot(options: MessageQueueModuleAsyncOptions): DynamicModule { static forRoot(options: MessageQueueModuleAsyncOptions): DynamicModule {
const providers = [ const providers = [
{ ...Object.values(MessageQueue).map((queue) => ({
provide: MessageQueues.taskAssignedQueue, provide: queue,
useFactory: (driver: MessageQueueDriver) => { useFactory: (driver: MessageQueueDriver) => {
return new MessageQueueService( return new MessageQueueService(driver, queue);
driver,
MessageQueues.taskAssignedQueue,
);
}, },
inject: [QUEUE_DRIVER], inject: [QUEUE_DRIVER],
}, })),
{ {
provide: QUEUE_DRIVER, provide: QUEUE_DRIVER,
useFactory: async (...args: any[]) => { useFactory: async (...args: any[]) => {
@ -51,7 +48,7 @@ export class MessageQueueModule {
module: MessageQueueModule, module: MessageQueueModule,
imports: options.imports || [], imports: options.imports || [],
providers, providers,
exports: [MessageQueues.taskAssignedQueue], exports: [MessageQueue.taskAssignedQueue, MessageQueue.messagingQueue],
}; };
} }
} }

View File

@ -4,7 +4,7 @@ import { MessageQueueDriver } from 'src/integrations/message-queue/drivers/inter
import { import {
QUEUE_DRIVER, QUEUE_DRIVER,
MessageQueues, MessageQueue,
} from 'src/integrations/message-queue/message-queue.constants'; } from 'src/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service';
@ -15,11 +15,11 @@ describe('MessageQueueTaskAssigned queue', () => {
const module: TestingModule = await Test.createTestingModule({ const module: TestingModule = await Test.createTestingModule({
providers: [ providers: [
{ {
provide: MessageQueues.taskAssignedQueue, provide: MessageQueue.taskAssignedQueue,
useFactory: (driver: MessageQueueDriver) => { useFactory: (driver: MessageQueueDriver) => {
return new MessageQueueService( return new MessageQueueService(
driver, driver,
MessageQueues.taskAssignedQueue, MessageQueue.taskAssignedQueue,
); );
}, },
inject: [QUEUE_DRIVER], inject: [QUEUE_DRIVER],
@ -31,7 +31,7 @@ describe('MessageQueueTaskAssigned queue', () => {
], ],
}).compile(); }).compile();
service = module.get<MessageQueueService>(MessageQueues.taskAssignedQueue); service = module.get<MessageQueueService>(MessageQueue.taskAssignedQueue);
}); });
it('should be defined', () => { it('should be defined', () => {
@ -40,7 +40,7 @@ describe('MessageQueueTaskAssigned queue', () => {
it('should contain the topic and driver', () => { it('should contain the topic and driver', () => {
expect(service).toEqual({ expect(service).toEqual({
driver: {}, driver: {},
queueName: MessageQueues.taskAssignedQueue, queueName: MessageQueue.taskAssignedQueue,
}); });
}); });
}); });

View File

@ -2,9 +2,10 @@ import { Inject, Injectable, OnModuleDestroy } from '@nestjs/common';
import { QueueJobOptions } from 'src/integrations/message-queue/drivers/interfaces/job-options.interface'; import { QueueJobOptions } from 'src/integrations/message-queue/drivers/interfaces/job-options.interface';
import { MessageQueueDriver } from 'src/integrations/message-queue/drivers/interfaces/message-queue-driver.interface'; import { MessageQueueDriver } from 'src/integrations/message-queue/drivers/interfaces/message-queue-driver.interface';
import { MessageQueueJobData } from 'src/integrations/message-queue/interfaces/message-queue-job.interface';
import { import {
MessageQueues, MessageQueue,
QUEUE_DRIVER, QUEUE_DRIVER,
} from 'src/integrations/message-queue/message-queue.constants'; } from 'src/integrations/message-queue/message-queue.constants';
@ -12,7 +13,7 @@ import {
export class MessageQueueService implements OnModuleDestroy { export class MessageQueueService implements OnModuleDestroy {
constructor( constructor(
@Inject(QUEUE_DRIVER) protected driver: MessageQueueDriver, @Inject(QUEUE_DRIVER) protected driver: MessageQueueDriver,
protected queueName: MessageQueues, protected queueName: MessageQueue,
) { ) {
if (typeof this.driver.register === 'function') { if (typeof this.driver.register === 'function') {
this.driver.register(queueName); this.driver.register(queueName);
@ -25,11 +26,15 @@ export class MessageQueueService implements OnModuleDestroy {
} }
} }
add<T>(data: T, options?: QueueJobOptions): Promise<void> { add<T extends MessageQueueJobData>(
return this.driver.add(this.queueName, data, options); jobName: string,
data: T,
options?: QueueJobOptions,
): Promise<void> {
return this.driver.add(this.queueName, jobName, data, options);
} }
work<T>( work<T extends MessageQueueJobData>(
handler: ({ data, id }: { data: T; id: string }) => Promise<void> | void, handler: ({ data, id }: { data: T; id: string }) => Promise<void> | void,
) { ) {
return this.driver.work(this.queueName, handler); return this.driver.work(this.queueName, handler);

View File

@ -0,0 +1,30 @@
import { Module } from '@nestjs/common';
import { EnvironmentModule } from 'src/integrations/environment/environment.module';
import { EnvironmentService } from 'src/integrations/environment/environment.service';
import { LoggerModule } from 'src/integrations/logger/logger.module';
import { loggerModuleFactory } from 'src/integrations/logger/logger.module-factory';
import { MessageQueueModule } from 'src/integrations/message-queue/message-queue.module';
import { messageQueueModuleFactory } from 'src/integrations/message-queue/message-queue.module-factory';
import { FetchMessagesJob } from 'src/workspace/messaging/jobs/fetch-messages.job';
@Module({
imports: [
EnvironmentModule.forRoot({}),
LoggerModule.forRootAsync({
useFactory: loggerModuleFactory,
inject: [EnvironmentService],
}),
MessageQueueModule.forRoot({
useFactory: messageQueueModuleFactory,
inject: [EnvironmentService],
}),
],
providers: [
{
provide: FetchMessagesJob.name,
useClass: FetchMessagesJob,
},
],
})
export class QueueWorkerModule {}

View File

@ -0,0 +1,34 @@
import { NestFactory } from '@nestjs/core';
import {
MessageQueueJob,
MessageQueueJobData,
} from 'src/integrations/message-queue/interfaces/message-queue-job.interface';
import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service';
import { QueueWorkerModule } from 'src/queue-worker.module';
async function bootstrap() {
const app = await NestFactory.createApplicationContext(QueueWorkerModule);
for (const queueName of Object.values(MessageQueue)) {
const messageQueueService: MessageQueueService = app.get(queueName);
await messageQueueService.work(async (jobData: MessageQueueJobData) => {
const jobClassName = getJobClassName(jobData.name);
const job: MessageQueueJob<MessageQueueJobData> = app
.select(QueueWorkerModule)
.get(jobClassName, { strict: true });
await job.handle(jobData.data);
});
}
}
bootstrap();
function getJobClassName(name: string): string {
const [, jobName] = name.split('.') ?? [];
return jobName ?? name;
}

View File

@ -1,9 +1,10 @@
import { Module } from '@nestjs/common'; import { Module } from '@nestjs/common';
import { FetchWorkspaceMessagesCommand } from 'src/workspace/messaging/commands/fetch-workspace-messages.command'; import { FetchWorkspaceMessagesCommand } from 'src/workspace/messaging/commands/fetch-workspace-messages.command';
import { MessagingModule } from 'src/workspace/messaging/messaging.module';
@Module({ @Module({
imports: [], imports: [MessagingModule],
providers: [FetchWorkspaceMessagesCommand], providers: [FetchWorkspaceMessagesCommand],
}) })
export class FetchWorkspaceMessagesCommandsModule {} export class FetchWorkspaceMessagesCommandsModule {}

View File

@ -1,5 +1,7 @@
import { Command, CommandRunner, Option } from 'nest-commander'; import { Command, CommandRunner, Option } from 'nest-commander';
import { MessagingProducer } from 'src/workspace/messaging/producers/messaging-producer';
interface FetchWorkspaceMessagesOptions { interface FetchWorkspaceMessagesOptions {
workspaceId: string; workspaceId: string;
} }
@ -9,11 +11,18 @@ interface FetchWorkspaceMessagesOptions {
description: 'Fetch messages of all workspaceMembers in a workspace.', description: 'Fetch messages of all workspaceMembers in a workspace.',
}) })
export class FetchWorkspaceMessagesCommand extends CommandRunner { export class FetchWorkspaceMessagesCommand extends CommandRunner {
constructor(private readonly messagingProducer: MessagingProducer) {
super();
}
async run( async run(
_passedParam: string[], _passedParam: string[],
options: FetchWorkspaceMessagesOptions, options: FetchWorkspaceMessagesOptions,
): Promise<void> { ): Promise<void> {
console.log('fetching messages for workspace', options.workspaceId); await this.messagingProducer.enqueueFetchMessages(
{ workspaceId: options.workspaceId },
options.workspaceId,
);
return; return;
} }

View File

@ -0,0 +1,22 @@
import { Injectable } from '@nestjs/common';
import { MessageQueueJob } from 'src/integrations/message-queue/interfaces/message-queue-job.interface';
import { EnvironmentService } from 'src/integrations/environment/environment.service';
export type FetchMessagesJobData = {
workspaceId: string;
};
@Injectable()
export class FetchMessagesJob implements MessageQueueJob<FetchMessagesJobData> {
constructor(private readonly environmentService: EnvironmentService) {}
async handle(data: FetchMessagesJobData): Promise<void> {
console.log(
`fetching messages for workspace ${
data.workspaceId
} with ${this.environmentService.getMessageQueueDriverType()}`,
);
}
}

View File

@ -0,0 +1,10 @@
import { Module } from '@nestjs/common';
import { MessagingProducer } from 'src/workspace/messaging/producers/messaging-producer';
@Module({
imports: [],
providers: [MessagingProducer],
exports: [MessagingProducer],
})
export class MessagingModule {}

View File

@ -0,0 +1,27 @@
import { Inject, Injectable } from '@nestjs/common';
import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service';
import {
FetchMessagesJob,
FetchMessagesJobData,
} from 'src/workspace/messaging/jobs/fetch-messages.job';
@Injectable()
export class MessagingProducer {
constructor(
@Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
) {}
async enqueueFetchMessages(data: FetchMessagesJobData, singletonKey: string) {
await this.messageQueueService.add<FetchMessagesJobData>(
FetchMessagesJob.name,
data,
{
id: singletonKey,
retryLimit: 2,
},
);
}
}