twenty/packages/twenty-server/src/app.module.ts
Jérémy M d99b9d1d6b
feat: Enhancements to MessageQueue Module with Decorators (#5657)
### Overview

This PR introduces significant enhancements to the MessageQueue module
by integrating `@Processor`, `@Process`, and `@InjectMessageQueue`
decorators. These changes streamline the process of defining and
managing queue processors and job handlers, and also allow for
request-scoped handlers, improving compatibility with services that rely
on scoped providers like TwentyORM repositories.

### Key Features

1. **Decorator-based Job Handling**: Use `@Processor` and `@Process`
decorators to define job handlers declaratively.
2. **Request Scope Support**: Job handlers can be scoped per request,
enhancing integration with request-scoped services.

### Usage

#### Defining Processors and Job Handlers

The `@Processor` decorator is used to define a class that processes jobs
for a specific queue. The `@Process` decorator is applied to methods
within this class to define specific job handlers.

##### Example 1: Specific Job Handlers

```typescript
import { Processor, Process, InjectMessageQueue } from 'src/engine/integrations/message-queue';

@Processor('taskQueue')
export class TaskProcessor {

  @Process('taskA')
  async handleTaskA(job: { id: string, data: any }) {
    console.log(`Handling task A with data:`, job.data);
    // Logic for task A
  }

  @Process('taskB')
  async handleTaskB(job: { id: string, data: any }) {
    console.log(`Handling task B with data:`, job.data);
    // Logic for task B
  }
}
```

In the example above, `TaskProcessor` is responsible for processing jobs
in the `taskQueue`. The `handleTaskA` method will only be called for
jobs with the name `taskA`, while `handleTaskB` will be called for
`taskB` jobs.

##### Example 2: General Job Handler

```typescript
import { Processor, Process, InjectMessageQueue } from 'src/engine/integrations/message-queue';

@Processor('generalQueue')
export class GeneralProcessor {

  @Process()
  async handleAnyJob(job: { id: string, name: string, data: any }) {
    console.log(`Handling job ${job.name} with data:`, job.data);
    // Logic for any job
  }
}
```

In this example, `GeneralProcessor` handles all jobs in the
`generalQueue`, regardless of the job name. The `handleAnyJob` method
will be invoked for every job added to the `generalQueue`.

#### Adding Jobs to a Queue

You can use the `@InjectMessageQueue` decorator to inject a queue into a
service and add jobs to it.

##### Example:

```typescript
import { Injectable } from '@nestjs/common';
import { InjectMessageQueue, MessageQueue } from 'src/engine/integrations/message-queue';

@Injectable()
export class TaskService {
  constructor(
    @InjectMessageQueue('taskQueue') private readonly taskQueue: MessageQueue,
  ) {}

  async addTaskA(data: any) {
    await this.taskQueue.add('taskA', data);
  }

  async addTaskB(data: any) {
    await this.taskQueue.add('taskB', data);
  }
}
```

In this example, `TaskService` adds jobs to the `taskQueue`. The
`addTaskA` and `addTaskB` methods add jobs named `taskA` and `taskB`,
respectively, to the queue.

#### Using Scoped Job Handlers

To utilize request-scoped job handlers, specify the scope in the
`@Processor` decorator. This is particularly useful for services that
use scoped repositories like those in TwentyORM.

##### Example:

```typescript
import { Processor, Process, InjectMessageQueue, Scope } from 'src/engine/integrations/message-queue';

@Processor({ name: 'scopedQueue', scope: Scope.REQUEST })
export class ScopedTaskProcessor {

  @Process('scopedTask')
  async handleScopedTask(job: { id: string, data: any }) {
    console.log(`Handling scoped task with data:`, job.data);
    // Logic for scoped task, which might use request-scoped services
  }
}
```

Here, the `ScopedTaskProcessor` is associated with `scopedQueue` and
operates with request scope. This setup is essential when the job
handler relies on services that need to be instantiated per request,
such as scoped repositories.

### Migration Notes

- **Decorators**: Refactor job handlers to use `@Processor` and
`@Process` decorators.
- **Request Scope**: Utilize the scope option in `@Processor` if your
job handlers depend on request-scoped services.

Fix #5628

---------

Co-authored-by: Weiko <corentin@twenty.com>
2024-06-17 09:49:37 +02:00

97 lines
3.6 KiB
TypeScript

import {
DynamicModule,
MiddlewareConsumer,
Module,
RequestMethod,
} from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { ServeStaticModule } from '@nestjs/serve-static';
import { GraphQLModule } from '@nestjs/graphql';
import { existsSync } from 'fs';
import { join } from 'path';
import { YogaDriverConfig, YogaDriver } from '@graphql-yoga/nestjs';
import { RestApiModule } from 'src/engine/api/rest/rest-api.module';
import { ModulesModule } from 'src/modules/modules.module';
import { CoreGraphQLApiModule } from 'src/engine/api/graphql/core-graphql-api.module';
import { MetadataGraphQLApiModule } from 'src/engine/api/graphql/metadata-graphql-api.module';
import { GraphQLConfigModule } from 'src/engine/api/graphql/graphql-config/graphql-config.module';
import { GraphQLConfigService } from 'src/engine/api/graphql/graphql-config/graphql-config.service';
import { WorkspaceCacheVersionModule } from 'src/engine/metadata-modules/workspace-cache-version/workspace-cache-version.module';
import { GraphQLHydrateRequestFromTokenMiddleware } from 'src/engine/middlewares/graphql-hydrate-request-from-token.middleware';
import { MessageQueueModule } from 'src/engine/integrations/message-queue/message-queue.module';
import { MessageQueueDriverType } from 'src/engine/integrations/message-queue/interfaces';
import { IntegrationsModule } from './engine/integrations/integrations.module';
import { CoreEngineModule } from './engine/core-modules/core-engine.module';
@Module({
imports: [
// Nest.js devtools, use devtools.nestjs.com to debug
// DevtoolsModule.registerAsync({
// useFactory: (environmentService: EnvironmentService) => ({
// http: environmentService.get('DEBUG_MODE'),
// port: environmentService.get('DEBUG_PORT'),
// }),
// inject: [EnvironmentService],
// }),
ConfigModule.forRoot({
isGlobal: true,
}),
GraphQLModule.forRootAsync<YogaDriverConfig>({
driver: YogaDriver,
imports: [CoreEngineModule, GraphQLConfigModule],
useClass: GraphQLConfigService,
}),
// Integrations module, contains all the integrations with other services
IntegrationsModule,
// Core engine module, contains all the core modules
CoreEngineModule,
// Modules module, contains all business logic modules
ModulesModule,
// Needed for the user workspace middleware
WorkspaceCacheVersionModule,
// Api modules
CoreGraphQLApiModule,
MetadataGraphQLApiModule,
RestApiModule,
// Conditional modules
...AppModule.getConditionalModules(),
],
})
export class AppModule {
private static getConditionalModules(): DynamicModule[] {
const modules: DynamicModule[] = [];
const frontPath = join(__dirname, '..', 'front');
if (existsSync(frontPath)) {
modules.push(
ServeStaticModule.forRoot({
rootPath: frontPath,
}),
);
}
// Messaque Queue explorer only for sync driver
// Maybe we don't need to conditionaly register the explorer, because we're creating a jobs module
// that will expose classes that are only used in the queue worker
if (process.env.MESSAGE_QUEUE_TYPE === MessageQueueDriverType.Sync) {
modules.push(MessageQueueModule.registerExplorer());
}
return modules;
}
configure(consumer: MiddlewareConsumer) {
consumer
.apply(GraphQLHydrateRequestFromTokenMiddleware)
.forRoutes({ path: 'graphql', method: RequestMethod.ALL });
consumer
.apply(GraphQLHydrateRequestFromTokenMiddleware)
.forRoutes({ path: 'metadata', method: RequestMethod.ALL });
}
}