mirror of
https://github.com/n8n-io/n8n.git
synced 2024-11-10 12:35:46 +03:00
fix(core): Add check that queue is defined and remove cyclic dependency (#7404)
In a rare edge case an undefined queue could be returned - this should not happen and now an error is thrown. Also using the opportunity to remove a cyclic dependency from the Queue.
This commit is contained in:
parent
609f0837cf
commit
45f2ef373e
@ -2,7 +2,8 @@ import type Bull from 'bull';
|
||||
import { Service } from 'typedi';
|
||||
import type { ExecutionError, IExecuteResponsePromiseData } from 'n8n-workflow';
|
||||
import { ActiveExecutions } from '@/ActiveExecutions';
|
||||
import * as WebhookHelpers from '@/WebhookHelpers';
|
||||
import { decodeWebhookResponse } from '@/helpers/decodeWebhookResponse';
|
||||
|
||||
import {
|
||||
getRedisClusterClient,
|
||||
getRedisClusterNodes,
|
||||
@ -62,7 +63,7 @@ export class Queue {
|
||||
this.jobQueue.on('global:progress', (jobId, progress: WebhookResponse) => {
|
||||
this.activeExecutions.resolveResponsePromise(
|
||||
progress.executionId,
|
||||
WebhookHelpers.decodeWebhookResponse(progress.response),
|
||||
decodeWebhookResponse(progress.response),
|
||||
);
|
||||
});
|
||||
}
|
||||
@ -79,7 +80,23 @@ export class Queue {
|
||||
return this.jobQueue.getJobs(jobTypes);
|
||||
}
|
||||
|
||||
async process(concurrency: number, fn: Bull.ProcessCallbackFunction<JobData>): Promise<void> {
|
||||
return this.jobQueue.process(concurrency, fn);
|
||||
}
|
||||
|
||||
async ping(): Promise<string> {
|
||||
return this.jobQueue.client.ping();
|
||||
}
|
||||
|
||||
async pause(isLocal?: boolean): Promise<void> {
|
||||
return this.jobQueue.pause(isLocal);
|
||||
}
|
||||
|
||||
getBullObjectInstance(): JobQueue {
|
||||
if (this.jobQueue === undefined) {
|
||||
// if queue is not initialized yet throw an error, since we do not want to hand around an undefined queue
|
||||
throw new Error('Queue is not initialized yet!');
|
||||
}
|
||||
return this.jobQueue;
|
||||
}
|
||||
|
||||
|
@ -160,23 +160,6 @@ export function getWorkflowWebhooks(
|
||||
return returnData;
|
||||
}
|
||||
|
||||
export function decodeWebhookResponse(
|
||||
response: IExecuteResponsePromiseData,
|
||||
): IExecuteResponsePromiseData {
|
||||
if (
|
||||
typeof response === 'object' &&
|
||||
typeof response.body === 'object' &&
|
||||
(response.body as IDataObject)['__@N8nEncodedBuffer@__']
|
||||
) {
|
||||
response.body = Buffer.from(
|
||||
(response.body as IDataObject)['__@N8nEncodedBuffer@__'] as string,
|
||||
BINARY_ENCODING,
|
||||
);
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
export function encodeWebhookResponse(
|
||||
response: IExecuteResponsePromiseData,
|
||||
): IExecuteResponsePromiseData {
|
||||
|
@ -37,10 +37,10 @@ import type {
|
||||
IWorkflowExecutionDataProcessWithExecution,
|
||||
} from '@/Interfaces';
|
||||
import { NodeTypes } from '@/NodeTypes';
|
||||
import type { Job, JobData, JobQueue, JobResponse } from '@/Queue';
|
||||
import type { Job, JobData, JobResponse } from '@/Queue';
|
||||
// eslint-disable-next-line import/no-cycle
|
||||
import { Queue } from '@/Queue';
|
||||
import * as WebhookHelpers from '@/WebhookHelpers';
|
||||
import { decodeWebhookResponse } from '@/helpers/decodeWebhookResponse';
|
||||
// eslint-disable-next-line import/no-cycle
|
||||
import * as WorkflowHelpers from '@/WorkflowHelpers';
|
||||
// eslint-disable-next-line import/no-cycle
|
||||
@ -60,7 +60,7 @@ export class WorkflowRunner {
|
||||
|
||||
push: Push;
|
||||
|
||||
jobQueue: JobQueue;
|
||||
jobQueue: Queue;
|
||||
|
||||
constructor() {
|
||||
this.push = Container.get(Push);
|
||||
@ -172,8 +172,7 @@ export class WorkflowRunner {
|
||||
await initErrorHandling();
|
||||
|
||||
if (executionsMode === 'queue') {
|
||||
const queue = Container.get(Queue);
|
||||
this.jobQueue = queue.getBullObjectInstance();
|
||||
this.jobQueue = Container.get(Queue);
|
||||
}
|
||||
|
||||
if (executionsMode === 'queue' && data.executionMode !== 'manual') {
|
||||
@ -733,7 +732,7 @@ export class WorkflowRunner {
|
||||
this.activeExecutions.remove(executionId, message.data.runData);
|
||||
} else if (message.type === 'sendResponse') {
|
||||
if (responsePromise) {
|
||||
responsePromise.resolve(WebhookHelpers.decodeWebhookResponse(message.data.response));
|
||||
responsePromise.resolve(decodeWebhookResponse(message.data.response));
|
||||
}
|
||||
} else if (message.type === 'sendDataToUI') {
|
||||
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
|
||||
|
@ -22,7 +22,7 @@ import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'
|
||||
import { PermissionChecker } from '@/UserManagement/PermissionChecker';
|
||||
|
||||
import config from '@/config';
|
||||
import type { Job, JobId, JobQueue, JobResponse, WebhookResponse } from '@/Queue';
|
||||
import type { Job, JobId, JobResponse, WebhookResponse } from '@/Queue';
|
||||
import { Queue } from '@/Queue';
|
||||
import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
|
||||
import { N8N_VERSION } from '@/constants';
|
||||
@ -61,7 +61,7 @@ export class Worker extends BaseCommand {
|
||||
[jobId: string]: WorkerJobStatusSummary;
|
||||
} = {};
|
||||
|
||||
static jobQueue: JobQueue;
|
||||
static jobQueue: Queue;
|
||||
|
||||
redisSubscriber: RedisServicePubSubSubscriber;
|
||||
|
||||
@ -335,15 +335,14 @@ export class Worker extends BaseCommand {
|
||||
`Opening Redis connection to listen to messages with timeout ${redisConnectionTimeoutLimit}`,
|
||||
);
|
||||
|
||||
const queue = Container.get(Queue);
|
||||
await queue.init();
|
||||
Worker.jobQueue = Container.get(Queue);
|
||||
await Worker.jobQueue.init();
|
||||
this.logger.debug('Queue singleton ready');
|
||||
Worker.jobQueue = queue.getBullObjectInstance();
|
||||
void Worker.jobQueue.process(flags.concurrency, async (job) =>
|
||||
this.runJob(job, this.nodeTypes),
|
||||
);
|
||||
|
||||
Worker.jobQueue.on('global:progress', (jobId: JobId, progress) => {
|
||||
Worker.jobQueue.getBullObjectInstance().on('global:progress', (jobId: JobId, progress) => {
|
||||
// Progress of a job got updated which does get used
|
||||
// to communicate that a job got canceled.
|
||||
|
||||
@ -359,7 +358,7 @@ export class Worker extends BaseCommand {
|
||||
|
||||
let lastTimer = 0;
|
||||
let cumulativeTimeout = 0;
|
||||
Worker.jobQueue.on('error', (error: Error) => {
|
||||
Worker.jobQueue.getBullObjectInstance().on('error', (error: Error) => {
|
||||
if (error.toString().includes('ECONNREFUSED')) {
|
||||
const now = Date.now();
|
||||
if (now - lastTimer > 30000) {
|
||||
@ -423,7 +422,7 @@ export class Worker extends BaseCommand {
|
||||
// if it loses the connection to redis
|
||||
try {
|
||||
// Redis ping
|
||||
await Worker.jobQueue.client.ping();
|
||||
await Worker.jobQueue.ping();
|
||||
} catch (e) {
|
||||
LoggerProxy.error('No Redis connection!', e as Error);
|
||||
const error = new ResponseHelper.ServiceUnavailableError('No Redis connection!');
|
||||
|
18
packages/cli/src/helpers/decodeWebhookResponse.ts
Normal file
18
packages/cli/src/helpers/decodeWebhookResponse.ts
Normal file
@ -0,0 +1,18 @@
|
||||
import { BINARY_ENCODING, type IDataObject, type IExecuteResponsePromiseData } from 'n8n-workflow';
|
||||
|
||||
export function decodeWebhookResponse(
|
||||
response: IExecuteResponsePromiseData,
|
||||
): IExecuteResponsePromiseData {
|
||||
if (
|
||||
typeof response === 'object' &&
|
||||
typeof response.body === 'object' &&
|
||||
(response.body as IDataObject)['__@N8nEncodedBuffer@__']
|
||||
) {
|
||||
response.body = Buffer.from(
|
||||
(response.body as IDataObject)['__@N8nEncodedBuffer@__'] as string,
|
||||
BINARY_ENCODING,
|
||||
);
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
Loading…
Reference in New Issue
Block a user