diff --git a/packages/twenty-server/package.json b/packages/twenty-server/package.json index 491b3ca2e7..dc4af45f89 100644 --- a/packages/twenty-server/package.json +++ b/packages/twenty-server/package.json @@ -28,7 +28,8 @@ "database:seed:dev": "yarn build && yarn command workspace:seed:dev", "database:seed:demo": "yarn build && yarn command workspace:seed:demo", "database:reset": "yarn database:truncate && yarn database:init", - "command": "node dist/src/command" + "command": "node dist/src/command", + "queue:work": "node dist/src/queue-worker" }, "dependencies": { "@apollo/server": "^4.7.3", diff --git a/packages/twenty-server/src/command.module.ts b/packages/twenty-server/src/command.module.ts index c13c814c43..738e562e99 100644 --- a/packages/twenty-server/src/command.module.ts +++ b/packages/twenty-server/src/command.module.ts @@ -1,7 +1,7 @@ import { Module } from '@nestjs/common'; import { DatabaseCommandModule } from 'src/database/commands/database-command.module'; -import { FetchWorkspaceMessagesCommand } from 'src/workspace/messaging/commands/fetch-workspace-messages.command'; +import { FetchWorkspaceMessagesCommandsModule } from 'src/workspace/messaging/commands/fetch-workspace-messages-commands.module'; import { AppModule } from './app.module'; @@ -14,7 +14,7 @@ import { WorkspaceMigrationRunnerCommandsModule } from './workspace/workspace-mi WorkspaceMigrationRunnerCommandsModule, WorkspaceSyncMetadataCommandsModule, DatabaseCommandModule, - FetchWorkspaceMessagesCommand, + FetchWorkspaceMessagesCommandsModule, ], }) export class CommandModule {} diff --git a/packages/twenty-server/src/integrations/message-queue/drivers/bullmq.driver.ts b/packages/twenty-server/src/integrations/message-queue/drivers/bullmq.driver.ts index 4f619125cd..aecc2802e0 100644 --- a/packages/twenty-server/src/integrations/message-queue/drivers/bullmq.driver.ts +++ b/packages/twenty-server/src/integrations/message-queue/drivers/bullmq.driver.ts @@ -2,25 +2,25 @@ import { Queue, QueueOptions, Worker } from 'bullmq'; import { QueueJobOptions } from 'src/integrations/message-queue/drivers/interfaces/job-options.interface'; -import { MessageQueues } from 'src/integrations/message-queue/message-queue.constants'; +import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; import { MessageQueueDriver } from './interfaces/message-queue-driver.interface'; export type BullMQDriverOptions = QueueOptions; export class BullMQDriver implements MessageQueueDriver { - private queueMap: Record = {} as Record< - MessageQueues, + private queueMap: Record = {} as Record< + MessageQueue, Queue >; - private workerMap: Record = {} as Record< - MessageQueues, + private workerMap: Record = {} as Record< + MessageQueue, Worker >; constructor(private options: BullMQDriverOptions) {} - register(queueName: MessageQueues): void { + register(queueName: MessageQueue): void { this.queueMap[queueName] = new Queue(queueName, this.options); } @@ -35,7 +35,7 @@ export class BullMQDriver implements MessageQueueDriver { } async work( - queueName: MessageQueues, + queueName: MessageQueue, handler: ({ data, id }: { data: T; id: string }) => Promise, ) { const worker = new Worker(queueName, async (job) => { @@ -46,7 +46,8 @@ export class BullMQDriver implements MessageQueueDriver { } async add( - queueName: MessageQueues, + queueName: MessageQueue, + jobName: string, data: T, options?: QueueJobOptions, ): Promise { @@ -55,7 +56,8 @@ export class BullMQDriver implements MessageQueueDriver { `Queue ${queueName} is not registered, make sure you have added it as a queue provider`, ); } - await this.queueMap[queueName].add(options?.id || '', data, { + await this.queueMap[queueName].add(jobName, data, { + jobId: options?.id, priority: options?.priority, }); } diff --git a/packages/twenty-server/src/integrations/message-queue/drivers/interfaces/job-options.interface.ts b/packages/twenty-server/src/integrations/message-queue/drivers/interfaces/job-options.interface.ts index f2cd28a24e..304c0648ac 100644 --- a/packages/twenty-server/src/integrations/message-queue/drivers/interfaces/job-options.interface.ts +++ b/packages/twenty-server/src/integrations/message-queue/drivers/interfaces/job-options.interface.ts @@ -1,4 +1,5 @@ export interface QueueJobOptions { id?: string; priority?: number; + retryLimit?: number; } diff --git a/packages/twenty-server/src/integrations/message-queue/drivers/interfaces/message-queue-driver.interface.ts b/packages/twenty-server/src/integrations/message-queue/drivers/interfaces/message-queue-driver.interface.ts index a0ed9e6d5e..59c21c36d6 100644 --- a/packages/twenty-server/src/integrations/message-queue/drivers/interfaces/message-queue-driver.interface.ts +++ b/packages/twenty-server/src/integrations/message-queue/drivers/interfaces/message-queue-driver.interface.ts @@ -1,17 +1,18 @@ import { QueueJobOptions } from 'src/integrations/message-queue/drivers/interfaces/job-options.interface'; -import { MessageQueues } from 'src/integrations/message-queue/message-queue.constants'; +import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; export interface MessageQueueDriver { add( - queueName: MessageQueues, + queueName: MessageQueue, + jobName: string, data: T, options?: QueueJobOptions, ): Promise; work( - queueName: string, + queueName: MessageQueue, handler: ({ data, id }: { data: T; id: string }) => Promise | void, ); stop?(): Promise; - register?(queueName: MessageQueues): void; + register?(queueName: MessageQueue): void; } diff --git a/packages/twenty-server/src/integrations/message-queue/drivers/pg-boss.driver.ts b/packages/twenty-server/src/integrations/message-queue/drivers/pg-boss.driver.ts index 6755c2cb34..63bd312bb1 100644 --- a/packages/twenty-server/src/integrations/message-queue/drivers/pg-boss.driver.ts +++ b/packages/twenty-server/src/integrations/message-queue/drivers/pg-boss.driver.ts @@ -2,6 +2,8 @@ import PgBoss from 'pg-boss'; import { QueueJobOptions } from 'src/integrations/message-queue/drivers/interfaces/job-options.interface'; +import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; + import { MessageQueueDriver } from './interfaces/message-queue-driver.interface'; export type PgBossDriverOptions = PgBoss.ConstructorOptions; @@ -25,14 +27,19 @@ export class PgBossDriver implements MessageQueueDriver { queueName: string, handler: ({ data, id }: { data: T; id: string }) => Promise, ) { - return this.pgBoss.work(queueName, handler); + return this.pgBoss.work(`${queueName}.*`, handler); } async add( - queueName: string, + queueName: MessageQueue, + jobName: string, data: T, options?: QueueJobOptions, ): Promise { - await this.pgBoss.send(queueName, data as object, options ? options : {}); + await this.pgBoss.send( + `${queueName}.${jobName}`, + data as object, + options ?? {}, + ); } } diff --git a/packages/twenty-server/src/integrations/message-queue/interfaces/message-queue-job.interface.ts b/packages/twenty-server/src/integrations/message-queue/interfaces/message-queue-job.interface.ts new file mode 100644 index 0000000000..9328cb1bd7 --- /dev/null +++ b/packages/twenty-server/src/integrations/message-queue/interfaces/message-queue-job.interface.ts @@ -0,0 +1,7 @@ +export interface MessageQueueJob { + handle(data: T): Promise | void; +} + +export interface MessageQueueJobData { + [key: string]: any; +} diff --git a/packages/twenty-server/src/integrations/message-queue/message-queue.constants.ts b/packages/twenty-server/src/integrations/message-queue/message-queue.constants.ts index 849e402b42..33d82c604c 100644 --- a/packages/twenty-server/src/integrations/message-queue/message-queue.constants.ts +++ b/packages/twenty-server/src/integrations/message-queue/message-queue.constants.ts @@ -1,5 +1,6 @@ export const QUEUE_DRIVER = Symbol('QUEUE_DRIVER'); -export enum MessageQueues { +export enum MessageQueue { taskAssignedQueue = 'task-assigned-queue', + messagingQueue = 'messaging-queue', } diff --git a/packages/twenty-server/src/integrations/message-queue/message-queue.module.ts b/packages/twenty-server/src/integrations/message-queue/message-queue.module.ts index a0f4d5a720..5a7f9c74a9 100644 --- a/packages/twenty-server/src/integrations/message-queue/message-queue.module.ts +++ b/packages/twenty-server/src/integrations/message-queue/message-queue.module.ts @@ -8,7 +8,7 @@ import { } from 'src/integrations/message-queue/interfaces'; import { QUEUE_DRIVER, - MessageQueues, + MessageQueue, } from 'src/integrations/message-queue/message-queue.constants'; import { PgBossDriver } from 'src/integrations/message-queue/drivers/pg-boss.driver'; import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; @@ -18,16 +18,13 @@ import { BullMQDriver } from 'src/integrations/message-queue/drivers/bullmq.driv export class MessageQueueModule { static forRoot(options: MessageQueueModuleAsyncOptions): DynamicModule { const providers = [ - { - provide: MessageQueues.taskAssignedQueue, + ...Object.values(MessageQueue).map((queue) => ({ + provide: queue, useFactory: (driver: MessageQueueDriver) => { - return new MessageQueueService( - driver, - MessageQueues.taskAssignedQueue, - ); + return new MessageQueueService(driver, queue); }, inject: [QUEUE_DRIVER], - }, + })), { provide: QUEUE_DRIVER, useFactory: async (...args: any[]) => { @@ -51,7 +48,7 @@ export class MessageQueueModule { module: MessageQueueModule, imports: options.imports || [], providers, - exports: [MessageQueues.taskAssignedQueue], + exports: [MessageQueue.taskAssignedQueue, MessageQueue.messagingQueue], }; } } diff --git a/packages/twenty-server/src/integrations/message-queue/services/message-queue-task-assigned.service.spec.ts b/packages/twenty-server/src/integrations/message-queue/services/message-queue-task-assigned.service.spec.ts index 8244b94c31..03d7dfcb4c 100644 --- a/packages/twenty-server/src/integrations/message-queue/services/message-queue-task-assigned.service.spec.ts +++ b/packages/twenty-server/src/integrations/message-queue/services/message-queue-task-assigned.service.spec.ts @@ -4,7 +4,7 @@ import { MessageQueueDriver } from 'src/integrations/message-queue/drivers/inter import { QUEUE_DRIVER, - MessageQueues, + MessageQueue, } from 'src/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; @@ -15,11 +15,11 @@ describe('MessageQueueTaskAssigned queue', () => { const module: TestingModule = await Test.createTestingModule({ providers: [ { - provide: MessageQueues.taskAssignedQueue, + provide: MessageQueue.taskAssignedQueue, useFactory: (driver: MessageQueueDriver) => { return new MessageQueueService( driver, - MessageQueues.taskAssignedQueue, + MessageQueue.taskAssignedQueue, ); }, inject: [QUEUE_DRIVER], @@ -31,7 +31,7 @@ describe('MessageQueueTaskAssigned queue', () => { ], }).compile(); - service = module.get(MessageQueues.taskAssignedQueue); + service = module.get(MessageQueue.taskAssignedQueue); }); it('should be defined', () => { @@ -40,7 +40,7 @@ describe('MessageQueueTaskAssigned queue', () => { it('should contain the topic and driver', () => { expect(service).toEqual({ driver: {}, - queueName: MessageQueues.taskAssignedQueue, + queueName: MessageQueue.taskAssignedQueue, }); }); }); diff --git a/packages/twenty-server/src/integrations/message-queue/services/message-queue.service.ts b/packages/twenty-server/src/integrations/message-queue/services/message-queue.service.ts index 28248478ab..4c5402f5c2 100644 --- a/packages/twenty-server/src/integrations/message-queue/services/message-queue.service.ts +++ b/packages/twenty-server/src/integrations/message-queue/services/message-queue.service.ts @@ -2,9 +2,10 @@ import { Inject, Injectable, OnModuleDestroy } from '@nestjs/common'; import { QueueJobOptions } from 'src/integrations/message-queue/drivers/interfaces/job-options.interface'; import { MessageQueueDriver } from 'src/integrations/message-queue/drivers/interfaces/message-queue-driver.interface'; +import { MessageQueueJobData } from 'src/integrations/message-queue/interfaces/message-queue-job.interface'; import { - MessageQueues, + MessageQueue, QUEUE_DRIVER, } from 'src/integrations/message-queue/message-queue.constants'; @@ -12,7 +13,7 @@ import { export class MessageQueueService implements OnModuleDestroy { constructor( @Inject(QUEUE_DRIVER) protected driver: MessageQueueDriver, - protected queueName: MessageQueues, + protected queueName: MessageQueue, ) { if (typeof this.driver.register === 'function') { this.driver.register(queueName); @@ -25,11 +26,15 @@ export class MessageQueueService implements OnModuleDestroy { } } - add(data: T, options?: QueueJobOptions): Promise { - return this.driver.add(this.queueName, data, options); + add( + jobName: string, + data: T, + options?: QueueJobOptions, + ): Promise { + return this.driver.add(this.queueName, jobName, data, options); } - work( + work( handler: ({ data, id }: { data: T; id: string }) => Promise | void, ) { return this.driver.work(this.queueName, handler); diff --git a/packages/twenty-server/src/queue-worker.module.ts b/packages/twenty-server/src/queue-worker.module.ts new file mode 100644 index 0000000000..6c7018e48a --- /dev/null +++ b/packages/twenty-server/src/queue-worker.module.ts @@ -0,0 +1,30 @@ +import { Module } from '@nestjs/common'; + +import { EnvironmentModule } from 'src/integrations/environment/environment.module'; +import { EnvironmentService } from 'src/integrations/environment/environment.service'; +import { LoggerModule } from 'src/integrations/logger/logger.module'; +import { loggerModuleFactory } from 'src/integrations/logger/logger.module-factory'; +import { MessageQueueModule } from 'src/integrations/message-queue/message-queue.module'; +import { messageQueueModuleFactory } from 'src/integrations/message-queue/message-queue.module-factory'; +import { FetchMessagesJob } from 'src/workspace/messaging/jobs/fetch-messages.job'; + +@Module({ + imports: [ + EnvironmentModule.forRoot({}), + LoggerModule.forRootAsync({ + useFactory: loggerModuleFactory, + inject: [EnvironmentService], + }), + MessageQueueModule.forRoot({ + useFactory: messageQueueModuleFactory, + inject: [EnvironmentService], + }), + ], + providers: [ + { + provide: FetchMessagesJob.name, + useClass: FetchMessagesJob, + }, + ], +}) +export class QueueWorkerModule {} diff --git a/packages/twenty-server/src/queue-worker.ts b/packages/twenty-server/src/queue-worker.ts new file mode 100644 index 0000000000..50e1e69eab --- /dev/null +++ b/packages/twenty-server/src/queue-worker.ts @@ -0,0 +1,34 @@ +import { NestFactory } from '@nestjs/core'; + +import { + MessageQueueJob, + MessageQueueJobData, +} from 'src/integrations/message-queue/interfaces/message-queue-job.interface'; + +import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; +import { QueueWorkerModule } from 'src/queue-worker.module'; + +async function bootstrap() { + const app = await NestFactory.createApplicationContext(QueueWorkerModule); + + for (const queueName of Object.values(MessageQueue)) { + const messageQueueService: MessageQueueService = app.get(queueName); + + await messageQueueService.work(async (jobData: MessageQueueJobData) => { + const jobClassName = getJobClassName(jobData.name); + const job: MessageQueueJob = app + .select(QueueWorkerModule) + .get(jobClassName, { strict: true }); + + await job.handle(jobData.data); + }); + } +} +bootstrap(); + +function getJobClassName(name: string): string { + const [, jobName] = name.split('.') ?? []; + + return jobName ?? name; +} diff --git a/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages-commands.module.ts b/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages-commands.module.ts index ed7324f04b..5b17178c33 100644 --- a/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages-commands.module.ts +++ b/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages-commands.module.ts @@ -1,9 +1,10 @@ import { Module } from '@nestjs/common'; import { FetchWorkspaceMessagesCommand } from 'src/workspace/messaging/commands/fetch-workspace-messages.command'; +import { MessagingModule } from 'src/workspace/messaging/messaging.module'; @Module({ - imports: [], + imports: [MessagingModule], providers: [FetchWorkspaceMessagesCommand], }) export class FetchWorkspaceMessagesCommandsModule {} diff --git a/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages.command.ts b/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages.command.ts index 9a47745091..537d66b37c 100644 --- a/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages.command.ts +++ b/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages.command.ts @@ -1,5 +1,7 @@ import { Command, CommandRunner, Option } from 'nest-commander'; +import { MessagingProducer } from 'src/workspace/messaging/producers/messaging-producer'; + interface FetchWorkspaceMessagesOptions { workspaceId: string; } @@ -9,11 +11,18 @@ interface FetchWorkspaceMessagesOptions { description: 'Fetch messages of all workspaceMembers in a workspace.', }) export class FetchWorkspaceMessagesCommand extends CommandRunner { + constructor(private readonly messagingProducer: MessagingProducer) { + super(); + } + async run( _passedParam: string[], options: FetchWorkspaceMessagesOptions, ): Promise { - console.log('fetching messages for workspace', options.workspaceId); + await this.messagingProducer.enqueueFetchMessages( + { workspaceId: options.workspaceId }, + options.workspaceId, + ); return; } diff --git a/packages/twenty-server/src/workspace/messaging/jobs/fetch-messages.job.ts b/packages/twenty-server/src/workspace/messaging/jobs/fetch-messages.job.ts new file mode 100644 index 0000000000..9c25781493 --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/jobs/fetch-messages.job.ts @@ -0,0 +1,22 @@ +import { Injectable } from '@nestjs/common'; + +import { MessageQueueJob } from 'src/integrations/message-queue/interfaces/message-queue-job.interface'; + +import { EnvironmentService } from 'src/integrations/environment/environment.service'; + +export type FetchMessagesJobData = { + workspaceId: string; +}; + +@Injectable() +export class FetchMessagesJob implements MessageQueueJob { + constructor(private readonly environmentService: EnvironmentService) {} + + async handle(data: FetchMessagesJobData): Promise { + console.log( + `fetching messages for workspace ${ + data.workspaceId + } with ${this.environmentService.getMessageQueueDriverType()}`, + ); + } +} diff --git a/packages/twenty-server/src/workspace/messaging/messaging.module.ts b/packages/twenty-server/src/workspace/messaging/messaging.module.ts new file mode 100644 index 0000000000..b001aae359 --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/messaging.module.ts @@ -0,0 +1,10 @@ +import { Module } from '@nestjs/common'; + +import { MessagingProducer } from 'src/workspace/messaging/producers/messaging-producer'; + +@Module({ + imports: [], + providers: [MessagingProducer], + exports: [MessagingProducer], +}) +export class MessagingModule {} diff --git a/packages/twenty-server/src/workspace/messaging/producers/messaging-producer.ts b/packages/twenty-server/src/workspace/messaging/producers/messaging-producer.ts new file mode 100644 index 0000000000..e54546ee4f --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/producers/messaging-producer.ts @@ -0,0 +1,27 @@ +import { Inject, Injectable } from '@nestjs/common'; + +import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; +import { + FetchMessagesJob, + FetchMessagesJobData, +} from 'src/workspace/messaging/jobs/fetch-messages.job'; + +@Injectable() +export class MessagingProducer { + constructor( + @Inject(MessageQueue.messagingQueue) + private readonly messageQueueService: MessageQueueService, + ) {} + + async enqueueFetchMessages(data: FetchMessagesJobData, singletonKey: string) { + await this.messageQueueService.add( + FetchMessagesJob.name, + data, + { + id: singletonKey, + retryLimit: 2, + }, + ); + } +}