diff --git a/packages/twenty-server/src/engine/integrations/message-queue/message-queue.explorer.ts b/packages/twenty-server/src/engine/integrations/message-queue/message-queue.explorer.ts index 49730821b4..05f859917a 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/message-queue.explorer.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/message-queue.explorer.ts @@ -25,6 +25,7 @@ import { MessageQueueMetadataAccessor } from './message-queue-metadata.accessor' interface ProcessorGroup { instance: object; host: Module; + processorName: string; processMethodNames: string[]; isRequestScoped: boolean; } @@ -77,6 +78,7 @@ export class MessageQueueExplorer implements OnModuleInit { (acc, wrapper) => { const { instance, metatype } = wrapper; const methodNames = this.metadataScanner.getAllMethodNames(instance); + const processorName = wrapper.name; const { queueName } = this.metadataAccessor.getProcessorMetadata( instance.constructor || metatype, @@ -109,6 +111,7 @@ export class MessageQueueExplorer implements OnModuleInit { acc[queueName].push({ instance, host: wrapper.host, + processorName, processMethodNames, isRequestScoped: !wrapper.isDependencyTreeStatic(), }); @@ -137,7 +140,11 @@ export class MessageQueueExplorer implements OnModuleInit { ) { queue.work(async (job) => { for (const processorGroup of processorGroupCollection) { - await this.handleProcessor(processorGroup, job); + const { processorName } = processorGroup; + + if (job.name === processorName) { + await this.handleProcessor(processorGroup, job); + } } }, options); }