Fix redis connection (#7956)

## Context
bull-mq connection was not working as intended, the connection parameter
was ignored and was falling back to localhost.
This PR should fix the issue by instantiating a IORedis client following
bullmq documentation https://docs.bullmq.io/guide/connections
I also changed cache-storage module to use IORedis client as well to be
more consistent even though it was not necessary there. We could move
that instantiation to a factory class in the future.

## Test
start server + worker with correct port and wrong port with
cache-storage-type memory/redis and queue-type sync/bull-mq
This commit is contained in:
Weiko 2024-10-22 16:40:18 +02:00 committed by GitHub
parent 18cfe79b80
commit 02c34d547f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 58 additions and 23 deletions

View File

@ -4,9 +4,11 @@ import { redisStore } from 'cache-manager-redis-yet';
import { CacheStorageType } from 'src/engine/core-modules/cache-storage/types/cache-storage-type.enum';
import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service';
import { RedisClientService } from 'src/engine/core-modules/redis-client/redis-client.service';
export const cacheStorageModuleFactory = (
environmentService: EnvironmentService,
redisClientService: RedisClientService,
): CacheModuleOptions => {
const cacheStorageType = environmentService.get('CACHE_STORAGE_TYPE');
const cacheStorageTtl = environmentService.get('CACHE_STORAGE_TTL');
@ -20,18 +22,10 @@ export const cacheStorageModuleFactory = (
return cacheModuleOptions;
}
case CacheStorageType.Redis: {
const connectionString = environmentService.get('REDIS_URL');
if (!connectionString) {
throw new Error(
`${cacheStorageType} cache storage requires REDIS_URL to be defined, check your .env file`,
);
}
return {
...cacheModuleOptions,
store: redisStore,
url: connectionString,
client: redisClientService.getClient(),
};
}
default:

View File

@ -7,6 +7,7 @@ import { FlushCacheCommand } from 'src/engine/core-modules/cache-storage/command
import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service';
import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum';
import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service';
import { RedisClientService } from 'src/engine/core-modules/redis-client/redis-client.service';
@Global()
@Module({
@ -15,7 +16,7 @@ import { EnvironmentService } from 'src/engine/core-modules/environment/environm
isGlobal: true,
imports: [ConfigModule],
useFactory: cacheStorageModuleFactory,
inject: [EnvironmentService],
inject: [EnvironmentService, RedisClientService],
}),
],
providers: [

View File

@ -32,15 +32,17 @@ import { messageQueueModuleFactory } from 'src/engine/core-modules/message-queue
import { TimelineMessagingModule } from 'src/engine/core-modules/messaging/timeline-messaging.module';
import { OpenApiModule } from 'src/engine/core-modules/open-api/open-api.module';
import { PostgresCredentialsModule } from 'src/engine/core-modules/postgres-credentials/postgres-credentials.module';
import { RedisClientModule } from 'src/engine/core-modules/redis-client/redis-client.module';
import { RedisClientService } from 'src/engine/core-modules/redis-client/redis-client.service';
import { serverlessModuleFactory } from 'src/engine/core-modules/serverless/serverless-module.factory';
import { ServerlessModule } from 'src/engine/core-modules/serverless/serverless.module';
import { WorkspaceSSOModule } from 'src/engine/core-modules/sso/sso.module';
import { TelemetryModule } from 'src/engine/core-modules/telemetry/telemetry.module';
import { UserModule } from 'src/engine/core-modules/user/user.module';
import { WorkflowTriggerApiModule } from 'src/engine/core-modules/workflow/workflow-trigger-api.module';
import { WorkspaceInvitationModule } from 'src/engine/core-modules/workspace-invitation/workspace-invitation.module';
import { WorkspaceModule } from 'src/engine/core-modules/workspace/workspace.module';
import { WorkspaceEventEmitterModule } from 'src/engine/workspace-event-emitter/workspace-event-emitter.module';
import { WorkspaceSSOModule } from 'src/engine/core-modules/sso/sso.module';
import { AnalyticsModule } from './analytics/analytics.module';
import { ClientConfigModule } from './client-config/client-config.module';
@ -69,6 +71,7 @@ import { FileModule } from './file/file.module';
ActorModule,
TelemetryModule,
EnvironmentModule.forRoot({}),
RedisClientModule,
FileStorageModule.forRootAsync({
useFactory: fileStorageModuleFactory,
inject: [EnvironmentService],
@ -79,7 +82,7 @@ import { FileModule } from './file/file.module';
}),
MessageQueueModule.registerAsync({
useFactory: messageQueueModuleFactory,
inject: [EnvironmentService],
inject: [EnvironmentService, RedisClientService],
}),
ExceptionHandlerModule.forRootAsync({
useFactory: exceptionHandlerModuleFactory,

View File

@ -1,5 +1,3 @@
import { ConnectionOptions } from 'tls';
import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service';
import {
BullMQDriverFactoryOptions,
@ -8,6 +6,7 @@ import {
PgBossDriverFactoryOptions,
SyncDriverFactoryOptions,
} from 'src/engine/core-modules/message-queue/interfaces';
import { RedisClientService } from 'src/engine/core-modules/redis-client/redis-client.service';
/**
* MessageQueue Module factory
@ -16,6 +15,7 @@ import {
*/
export const messageQueueModuleFactory = async (
environmentService: EnvironmentService,
redisClientService: RedisClientService,
): Promise<MessageQueueModuleOptions> => {
const driverType = environmentService.get('MESSAGE_QUEUE_TYPE');
@ -37,18 +37,10 @@ export const messageQueueModuleFactory = async (
} satisfies PgBossDriverFactoryOptions;
}
case MessageQueueDriverType.BullMQ: {
const connectionString = environmentService.get('REDIS_URL');
if (!connectionString) {
throw new Error(
`${MessageQueueDriverType.BullMQ} message queue requires REDIS_URL to be defined, check your .env file`,
);
}
return {
type: MessageQueueDriverType.BullMQ,
options: {
connection: connectionString as ConnectionOptions,
connection: redisClientService.getClient(),
},
} satisfies BullMQDriverFactoryOptions;
}

View File

@ -0,0 +1,12 @@
import { Global, Module } from '@nestjs/common';
import { EnvironmentModule } from 'src/engine/core-modules/environment/environment.module';
import { RedisClientService } from 'src/engine/core-modules/redis-client/redis-client.service';
@Global()
@Module({
imports: [EnvironmentModule],
providers: [RedisClientService],
exports: [RedisClientService],
})
export class RedisClientModule {}

View File

@ -0,0 +1,33 @@
import { Injectable, OnModuleDestroy } from '@nestjs/common';
import IORedis from 'ioredis';
import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service';
@Injectable()
export class RedisClientService implements OnModuleDestroy {
private redisClient: IORedis | null = null;
constructor(private readonly environmentService: EnvironmentService) {}
getClient() {
if (!this.redisClient) {
const redisUrl = this.environmentService.get('REDIS_URL');
if (!redisUrl) {
throw new Error('REDIS_URL must be defined');
}
this.redisClient = new IORedis(redisUrl);
}
return this.redisClient;
}
async onModuleDestroy() {
if (this.redisClient) {
await this.redisClient.quit();
this.redisClient = null;
}
}
}