From 25fce27fe32e96d7159b15f801eec1cd7b4b4367 Mon Sep 17 00:00:00 2001 From: Charles Bochet Date: Wed, 3 Jul 2024 22:54:39 +0200 Subject: [PATCH] Improve performance/robustness of worker --- .../integrations/message-queue/message-queue.explorer.ts | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/packages/twenty-server/src/engine/integrations/message-queue/message-queue.explorer.ts b/packages/twenty-server/src/engine/integrations/message-queue/message-queue.explorer.ts index 49730821b4..05f859917a 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/message-queue.explorer.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/message-queue.explorer.ts @@ -25,6 +25,7 @@ import { MessageQueueMetadataAccessor } from './message-queue-metadata.accessor' interface ProcessorGroup { instance: object; host: Module; + processorName: string; processMethodNames: string[]; isRequestScoped: boolean; } @@ -77,6 +78,7 @@ export class MessageQueueExplorer implements OnModuleInit { (acc, wrapper) => { const { instance, metatype } = wrapper; const methodNames = this.metadataScanner.getAllMethodNames(instance); + const processorName = wrapper.name; const { queueName } = this.metadataAccessor.getProcessorMetadata( instance.constructor || metatype, @@ -109,6 +111,7 @@ export class MessageQueueExplorer implements OnModuleInit { acc[queueName].push({ instance, host: wrapper.host, + processorName, processMethodNames, isRequestScoped: !wrapper.isDependencyTreeStatic(), }); @@ -137,7 +140,11 @@ export class MessageQueueExplorer implements OnModuleInit { ) { queue.work(async (job) => { for (const processorGroup of processorGroupCollection) { - await this.handleProcessor(processorGroup, job); + const { processorName } = processorGroup; + + if (job.name === processorName) { + await this.handleProcessor(processorGroup, job); + } } }, options); }