Improve performance/robustness of worker

This commit is contained in:
Charles Bochet 2024-07-03 22:54:39 +02:00
parent 14cd6f8b6b
commit 25fce27fe3

View File

@ -25,6 +25,7 @@ import { MessageQueueMetadataAccessor } from './message-queue-metadata.accessor'
interface ProcessorGroup { interface ProcessorGroup {
instance: object; instance: object;
host: Module; host: Module;
processorName: string;
processMethodNames: string[]; processMethodNames: string[];
isRequestScoped: boolean; isRequestScoped: boolean;
} }
@ -77,6 +78,7 @@ export class MessageQueueExplorer implements OnModuleInit {
(acc, wrapper) => { (acc, wrapper) => {
const { instance, metatype } = wrapper; const { instance, metatype } = wrapper;
const methodNames = this.metadataScanner.getAllMethodNames(instance); const methodNames = this.metadataScanner.getAllMethodNames(instance);
const processorName = wrapper.name;
const { queueName } = const { queueName } =
this.metadataAccessor.getProcessorMetadata( this.metadataAccessor.getProcessorMetadata(
instance.constructor || metatype, instance.constructor || metatype,
@ -109,6 +111,7 @@ export class MessageQueueExplorer implements OnModuleInit {
acc[queueName].push({ acc[queueName].push({
instance, instance,
host: wrapper.host, host: wrapper.host,
processorName,
processMethodNames, processMethodNames,
isRequestScoped: !wrapper.isDependencyTreeStatic(), isRequestScoped: !wrapper.isDependencyTreeStatic(),
}); });
@ -137,7 +140,11 @@ export class MessageQueueExplorer implements OnModuleInit {
) { ) {
queue.work(async (job) => { queue.work(async (job) => {
for (const processorGroup of processorGroupCollection) { for (const processorGroup of processorGroupCollection) {
await this.handleProcessor(processorGroup, job); const { processorName } = processorGroup;
if (job.name === processorName) {
await this.handleProcessor(processorGroup, job);
}
} }
}, options); }, options);
} }