From 8728b63aeb46d0ca1d42e046fff522df4ec148eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Mon, 12 Aug 2024 11:03:37 +0200 Subject: [PATCH] refactor(core): Port scaling mode config (no-changelog) (#10321) --- .../config/src/configs/scaling-mode.config.ts | 96 +++++++++++++++ packages/@n8n/config/src/index.ts | 4 + packages/@n8n/config/test/config.test.ts | 27 +++++ packages/cli/src/WorkflowRunner.ts | 3 +- packages/cli/src/commands/worker.ts | 6 +- packages/cli/src/config/schema.ts | 113 ------------------ packages/cli/src/scaling/scaling.service.ts | 8 +- .../services/redis/redis-client.service.ts | 16 +-- 8 files changed, 146 insertions(+), 127 deletions(-) create mode 100644 packages/@n8n/config/src/configs/scaling-mode.config.ts diff --git a/packages/@n8n/config/src/configs/scaling-mode.config.ts b/packages/@n8n/config/src/configs/scaling-mode.config.ts new file mode 100644 index 0000000000..6ff331eedd --- /dev/null +++ b/packages/@n8n/config/src/configs/scaling-mode.config.ts @@ -0,0 +1,96 @@ +import { Config, Env, Nested } from '../decorators'; + +@Config +class HealthConfig { + /** Whether to enable the worker health check endpoint `/healthz`. */ + @Env('QUEUE_HEALTH_CHECK_ACTIVE') + active = false; + + /** Port for worker to respond to health checks requests on, if enabled. */ + @Env('QUEUE_HEALTH_CHECK_PORT') + port = 5678; +} + +@Config +class RedisConfig { + /** Redis database for Bull queue. */ + @Env('QUEUE_BULL_REDIS_DB') + db = 0; + + /** Redis host for Bull queue. */ + @Env('QUEUE_BULL_REDIS_HOST') + host = 'localhost'; + + /** Password to authenticate with Redis. */ + @Env('QUEUE_BULL_REDIS_PASSWORD') + password = ''; + + /** Port for Redis to listen on. */ + @Env('QUEUE_BULL_REDIS_PORT') + port = 6379; + + /** Max cumulative timeout (in milliseconds) of connection retries before process exit. */ + @Env('QUEUE_BULL_REDIS_TIMEOUT_THRESHOLD') + timeoutThreshold = 10_000; + + /** Redis username. Redis 6.0 or higher required. */ + @Env('QUEUE_BULL_REDIS_USERNAME') + username = ''; + + /** Redis cluster startup nodes, as comma-separated list of `{host}:{port}` pairs. @example 'redis-1:6379,redis-2:6379' */ + @Env('QUEUE_BULL_REDIS_CLUSTER_NODES') + clusterNodes = ''; + + /** Whether to enable TLS on Redis connections. */ + @Env('QUEUE_BULL_REDIS_TLS') + tls = false; +} + +@Config +class SettingsConfig { + /** How long (in milliseconds) is the lease period for a worker processing a job. */ + @Env('QUEUE_WORKER_LOCK_DURATION') + lockDuration = 30_000; + + /** How often (in milliseconds) a worker must renew the lease. */ + @Env('QUEUE_WORKER_LOCK_RENEW_TIME') + lockRenewTime = 15_000; + + /** How often (in milliseconds) Bull must check for stalled jobs. `0` to disable. */ + @Env('QUEUE_WORKER_STALLED_INTERVAL') + stalledInterval = 30_000; + + /** Max number of times a stalled job will be re-processed. See Bull's [documentation](https://docs.bullmq.io/guide/workers/stalled-jobs). */ + @Env('QUEUE_WORKER_MAX_STALLED_COUNT') + maxStalledCount = 1; +} + +@Config +class BullConfig { + /** Prefix for Bull keys on Redis. @example 'bull:jobs:23' */ + @Env('QUEUE_BULL_PREFIX') + prefix = 'bull'; + + @Nested + redis: RedisConfig; + + /** How often (in seconds) to poll the Bull queue to identify executions finished during a Redis crash. `0` to disable. May increase Redis traffic significantly. */ + @Env('QUEUE_RECOVERY_INTERVAL') + queueRecoveryInterval = 60; // watchdog interval + + /** @deprecated How long (in seconds) a worker must wait for active executions to finish before exiting. Use `N8N_GRACEFUL_SHUTDOWN_TIMEOUT` instead */ + @Env('QUEUE_WORKER_TIMEOUT') + gracefulShutdownTimeout = 30; + + @Nested + settings: SettingsConfig; +} + +@Config +export class ScalingModeConfig { + @Nested + health: HealthConfig; + + @Nested + bull: BullConfig; +} diff --git a/packages/@n8n/config/src/index.ts b/packages/@n8n/config/src/index.ts index 33d3e67655..88e6fb0117 100644 --- a/packages/@n8n/config/src/index.ts +++ b/packages/@n8n/config/src/index.ts @@ -12,6 +12,7 @@ import { ExternalStorageConfig } from './configs/external-storage'; import { WorkflowsConfig } from './configs/workflows'; import { EndpointsConfig } from './configs/endpoints'; import { CacheConfig } from './configs/cache'; +import { ScalingModeConfig } from './configs/scaling-mode.config'; @Config class UserManagementConfig { @@ -79,4 +80,7 @@ export class GlobalConfig { @Nested readonly cache: CacheConfig; + + @Nested + queue: ScalingModeConfig; } diff --git a/packages/@n8n/config/test/config.test.ts b/packages/@n8n/config/test/config.test.ts index e077e233b7..83edcc25af 100644 --- a/packages/@n8n/config/test/config.test.ts +++ b/packages/@n8n/config/test/config.test.ts @@ -184,6 +184,33 @@ describe('GlobalConfig', () => { ttl: 3600000, }, }, + queue: { + health: { + active: false, + port: 5678, + }, + bull: { + redis: { + db: 0, + host: 'localhost', + password: '', + port: 6379, + timeoutThreshold: 10_000, + username: '', + clusterNodes: '', + tls: false, + }, + queueRecoveryInterval: 60, + gracefulShutdownTimeout: 30, + prefix: 'bull', + settings: { + lockDuration: 30_000, + lockRenewTime: 15_000, + stalledInterval: 30_000, + maxStalledCount: 1, + }, + }, + }, }; it('should use all default values when no env variables are defined', () => { diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index 1be878c165..9ff1de8f35 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -37,6 +37,7 @@ import { PermissionChecker } from '@/UserManagement/PermissionChecker'; import { Logger } from '@/Logger'; import { WorkflowStaticDataService } from '@/workflows/workflowStaticData.service'; import { EventService } from './events/event.service'; +import { GlobalConfig } from '@n8n/config'; @Service() export class WorkflowRunner { @@ -424,7 +425,7 @@ export class WorkflowRunner { const jobData: Promise = job.finished(); - const queueRecoveryInterval = config.getEnv('queue.bull.queueRecoveryInterval'); + const { queueRecoveryInterval } = Container.get(GlobalConfig).queue.bull; const racingPromises: Array> = [jobData]; diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index cadf4f708a..1473ed13ff 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -102,7 +102,7 @@ export class Worker extends BaseCommand { const { QUEUE_WORKER_TIMEOUT } = process.env; if (QUEUE_WORKER_TIMEOUT) { this.gracefulShutdownTimeoutInS = - parseInt(QUEUE_WORKER_TIMEOUT, 10) || config.default('queue.bull.gracefulShutdownTimeout'); + parseInt(QUEUE_WORKER_TIMEOUT, 10) || this.globalConfig.queue.bull.gracefulShutdownTimeout; this.logger.warn( 'QUEUE_WORKER_TIMEOUT has been deprecated. Rename it to N8N_GRACEFUL_SHUTDOWN_TIMEOUT.', ); @@ -182,7 +182,7 @@ export class Worker extends BaseCommand { } async setupHealthMonitor() { - const port = config.getEnv('queue.health.port'); + const { port } = this.globalConfig.queue.health; const app = express(); app.disable('x-powered-by'); @@ -285,7 +285,7 @@ export class Worker extends BaseCommand { this.logger.info(` * Concurrency: ${this.concurrency}`); this.logger.info(''); - if (config.getEnv('queue.health.active')) { + if (this.globalConfig.queue.health.active) { await this.setupHealthMonitor(); } diff --git a/packages/cli/src/config/schema.ts b/packages/cli/src/config/schema.ts index 90043ee20e..a0db9a4c36 100644 --- a/packages/cli/src/config/schema.ts +++ b/packages/cli/src/config/schema.ts @@ -162,119 +162,6 @@ export const schema = { }, }, - queue: { - health: { - active: { - doc: 'If health checks should be enabled', - format: Boolean, - default: false, - env: 'QUEUE_HEALTH_CHECK_ACTIVE', - }, - port: { - doc: 'Port to serve health check on if activated', - format: Number, - default: 5678, - env: 'QUEUE_HEALTH_CHECK_PORT', - }, - }, - bull: { - prefix: { - doc: 'Prefix for all bull queue keys', - format: String, - default: 'bull', - env: 'QUEUE_BULL_PREFIX', - }, - redis: { - db: { - doc: 'Redis DB', - format: Number, - default: 0, - env: 'QUEUE_BULL_REDIS_DB', - }, - host: { - doc: 'Redis Host', - format: String, - default: 'localhost', - env: 'QUEUE_BULL_REDIS_HOST', - }, - password: { - doc: 'Redis Password', - format: String, - default: '', - env: 'QUEUE_BULL_REDIS_PASSWORD', - }, - port: { - doc: 'Redis Port', - format: Number, - default: 6379, - env: 'QUEUE_BULL_REDIS_PORT', - }, - timeoutThreshold: { - doc: 'Max cumulative timeout (in milliseconds) of connection retries before process exit', - format: Number, - default: 10000, - env: 'QUEUE_BULL_REDIS_TIMEOUT_THRESHOLD', - }, - username: { - doc: 'Redis Username (needs Redis >= 6)', - format: String, - default: '', - env: 'QUEUE_BULL_REDIS_USERNAME', - }, - clusterNodes: { - doc: 'Redis Cluster startup nodes (comma separated list of host:port pairs)', - format: String, - default: '', - env: 'QUEUE_BULL_REDIS_CLUSTER_NODES', - }, - tls: { - format: Boolean, - default: false, - env: 'QUEUE_BULL_REDIS_TLS', - doc: 'Enable TLS on Redis connections. Default: false', - }, - }, - queueRecoveryInterval: { - doc: 'If > 0 enables an active polling to the queue that can recover for Redis crashes. Given in seconds; 0 is disabled. May increase Redis traffic significantly.', - format: Number, - default: 60, - env: 'QUEUE_RECOVERY_INTERVAL', - }, - gracefulShutdownTimeout: { - doc: '[DEPRECATED] (Use N8N_GRACEFUL_SHUTDOWN_TIMEOUT instead) How long should n8n wait for running executions before exiting worker process (seconds)', - format: Number, - default: 30, - env: 'QUEUE_WORKER_TIMEOUT', - }, - settings: { - lockDuration: { - doc: 'How long (ms) is the lease period for a worker to work on a message', - format: Number, - default: 30000, - env: 'QUEUE_WORKER_LOCK_DURATION', - }, - lockRenewTime: { - doc: 'How frequently (ms) should a worker renew the lease time', - format: Number, - default: 15000, - env: 'QUEUE_WORKER_LOCK_RENEW_TIME', - }, - stalledInterval: { - doc: 'How often check for stalled jobs (use 0 for never checking)', - format: Number, - default: 30000, - env: 'QUEUE_WORKER_STALLED_INTERVAL', - }, - maxStalledCount: { - doc: 'Max amount of times a stalled job will be re-processed', - format: Number, - default: 1, - env: 'QUEUE_WORKER_MAX_STALLED_COUNT', - }, - }, - }, - }, - generic: { // The timezone to use. Is important for nodes like "Cron" which start the // workflow automatically at a specified time. This setting can also be diff --git a/packages/cli/src/scaling/scaling.service.ts b/packages/cli/src/scaling/scaling.service.ts index 21d05fceff..73b072af6c 100644 --- a/packages/cli/src/scaling/scaling.service.ts +++ b/packages/cli/src/scaling/scaling.service.ts @@ -10,6 +10,7 @@ import { JOB_TYPE_NAME, QUEUE_NAME } from './constants'; import { JobProcessor } from './job-processor'; import type { JobQueue, Job, JobData, JobOptions, JobMessage, JobStatus, JobId } from './types'; import type { IExecuteResponsePromiseData } from 'n8n-workflow'; +import { GlobalConfig } from '@n8n/config'; @Service() export class ScalingService { @@ -21,6 +22,7 @@ export class ScalingService { private readonly logger: Logger, private readonly activeExecutions: ActiveExecutions, private readonly jobProcessor: JobProcessor, + private readonly globalConfig: GlobalConfig, ) {} // #region Lifecycle @@ -30,12 +32,12 @@ export class ScalingService { const { RedisClientService } = await import('@/services/redis/redis-client.service'); const service = Container.get(RedisClientService); - const bullPrefix = config.getEnv('queue.bull.prefix'); + const bullPrefix = this.globalConfig.queue.bull.prefix; const prefix = service.toValidPrefix(bullPrefix); this.queue = new BullQueue(QUEUE_NAME, { prefix, - settings: config.get('queue.bull.settings'), + settings: this.globalConfig.queue.bull.settings, createClient: (type) => service.createClient({ type: `${type}(bull)` }), }); @@ -133,7 +135,7 @@ export class ScalingService { let latestAttemptTs = 0; let cumulativeTimeoutMs = 0; - const MAX_TIMEOUT_MS = config.getEnv('queue.bull.redis.timeoutThreshold'); + const MAX_TIMEOUT_MS = this.globalConfig.queue.bull.redis.timeoutThreshold; const RESET_LENGTH_MS = 30_000; this.queue.on('error', (error: Error) => { diff --git a/packages/cli/src/services/redis/redis-client.service.ts b/packages/cli/src/services/redis/redis-client.service.ts index 7363a9c9b7..b5c86523e0 100644 --- a/packages/cli/src/services/redis/redis-client.service.ts +++ b/packages/cli/src/services/redis/redis-client.service.ts @@ -1,17 +1,20 @@ import { Service } from 'typedi'; -import config from '@/config'; import { Logger } from '@/Logger'; import ioRedis from 'ioredis'; import type { Cluster, RedisOptions } from 'ioredis'; import type { RedisClientType } from './RedisServiceBaseClasses'; import { OnShutdown } from '@/decorators/OnShutdown'; import { LOWEST_SHUTDOWN_PRIORITY } from '@/constants'; +import { GlobalConfig } from '@n8n/config'; @Service() export class RedisClientService { private readonly clients = new Set(); - constructor(private readonly logger: Logger) {} + constructor( + private readonly logger: Logger, + private readonly globalConfig: GlobalConfig, + ) {} createClient(arg: { type: RedisClientType; extraOptions?: RedisOptions }) { const client = @@ -57,7 +60,7 @@ export class RedisClientService { }) { const options = this.getOptions({ extraOptions }); - const { host, port } = config.getEnv('queue.bull.redis'); + const { host, port } = this.globalConfig.queue.bull.redis; options.host = host; options.port = port; @@ -87,7 +90,7 @@ export class RedisClientService { } private getOptions({ extraOptions }: { extraOptions?: RedisOptions }) { - const { username, password, db, tls } = config.getEnv('queue.bull.redis'); + const { username, password, db, tls } = this.globalConfig.queue.bull.redis; /** * Disabling ready check allows quick reconnection to Redis if Redis becomes @@ -124,7 +127,7 @@ export class RedisClientService { private retryStrategy() { const RETRY_INTERVAL = 500; // ms const RESET_LENGTH = 30_000; // ms - const MAX_TIMEOUT = config.getEnv('queue.bull.redis.timeoutThreshold'); + const MAX_TIMEOUT = this.globalConfig.queue.bull.redis.timeoutThreshold; let lastAttemptTs = 0; let cumulativeTimeout = 0; @@ -152,8 +155,7 @@ export class RedisClientService { } private clusterNodes() { - return config - .getEnv('queue.bull.redis.clusterNodes') + return this.globalConfig.queue.bull.redis.clusterNodes .split(',') .filter((pair) => pair.trim().length > 0) .map((pair) => {