1
1
mirror of https://github.com/n8n-io/n8n.git synced 2024-09-19 17:07:18 +03:00

feat(core): Remove own execution-process mode (#8490)

This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2024-01-30 12:51:40 +01:00 committed by GitHub
parent 79c9763122
commit 121a55b691
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 14 additions and 784 deletions

View File

@ -1,5 +1,4 @@
import { Service } from 'typedi';
import type { ChildProcess } from 'child_process';
import type PCancelable from 'p-cancelable';
import type {
IDeferredPromise,
@ -34,11 +33,7 @@ export class ActiveExecutions {
/**
* Add a new active execution
*/
async add(
executionData: IWorkflowExecutionDataProcess,
process?: ChildProcess,
executionId?: string,
): Promise<string> {
async add(executionData: IWorkflowExecutionDataProcess, executionId?: string): Promise<string> {
let executionStatus: ExecutionStatus = executionId ? 'running' : 'new';
if (executionId === undefined) {
// Is a new execution so save in DB
@ -82,7 +77,6 @@ export class ActiveExecutions {
this.activeExecutions[executionId] = {
executionData,
process,
startedAt: new Date(),
postExecutePromises: [],
status: executionStatus,
@ -152,33 +146,15 @@ export class ActiveExecutions {
/**
* Forces an execution to stop
*
* @param {string} executionId The id of the execution to stop
* @param {string} timeout String 'timeout' given if stop due to timeout
*/
async stopExecution(executionId: string, timeout?: string): Promise<IRun | undefined> {
async stopExecution(executionId: string): Promise<IRun | undefined> {
const execution = this.activeExecutions[executionId];
if (execution === undefined) {
// There is no execution running with that id
return;
}
// In case something goes wrong make sure that promise gets first
// returned that it gets then also resolved correctly.
if (execution.process !== undefined) {
// Workflow is running in subprocess
if (execution.process.connected) {
setTimeout(() => {
// execute on next event loop tick;
execution.process!.send({
type: timeout || 'stopExecution',
});
}, 1);
}
} else {
// Workflow is running in current process
execution.workflowExecution!.cancel();
}
execution.workflowExecution!.cancel();
return await this.getPostExecutePromise(executionId);
}

View File

@ -31,8 +31,6 @@ import type { WorkflowExecute } from 'n8n-core';
import type PCancelable from 'p-cancelable';
import type { ChildProcess } from 'child_process';
import type { DatabaseType } from '@db/types';
import type { AuthProviderType } from '@db/entities/AuthIdentity';
import type { SharedCredentials } from '@db/entities/SharedCredentials';
@ -192,7 +190,6 @@ export interface IExecutionsCurrentSummary {
export interface IExecutingWorkflowData {
executionData: IWorkflowExecutionDataProcess;
process?: ChildProcess;
startedAt: Date;
postExecutePromises: Array<IDeferredPromise<IRun | undefined>>;
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>;
@ -558,11 +555,6 @@ export interface IWorkflowErrorData {
};
}
export interface IProcessMessageDataHook {
hook: string;
parameters: any[];
}
export interface IWorkflowExecutionDataProcess {
destinationNode?: string;
restartExecutionId?: string;
@ -577,11 +569,6 @@ export interface IWorkflowExecutionDataProcess {
userId: string;
}
export interface IWorkflowExecutionDataProcessWithExecution extends IWorkflowExecutionDataProcess {
executionId: string;
userId: string;
}
export interface IWorkflowExecuteProcess {
startedAt: Date;
workflow: Workflow;

View File

@ -156,7 +156,6 @@ export class Server extends AbstractServer {
},
},
executionVariables: {
executions_process: config.getEnv('executions.process'),
executions_mode: config.getEnv('executions.mode'),
executions_timeout: config.getEnv('executions.timeout'),
executions_timeout_max: config.getEnv('executions.maxTimeout'),

View File

@ -3,7 +3,6 @@
/* eslint-disable @typescript-eslint/no-shadow */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { Container, Service } from 'typedi';
import type { IProcessMessage } from 'n8n-core';
import { WorkflowExecute } from 'n8n-core';
import type {
@ -22,8 +21,6 @@ import {
} from 'n8n-workflow';
import PCancelable from 'p-cancelable';
import { join as pathJoin } from 'path';
import { fork } from 'child_process';
import { ActiveExecutions } from '@/ActiveExecutions';
import config from '@/config';
@ -31,16 +28,10 @@ import { ExecutionRepository } from '@db/repositories/execution.repository';
import { MessageEventBus } from '@/eventbus';
import { ExecutionDataRecoveryService } from '@/eventbus/executionDataRecovery.service';
import { ExternalHooks } from '@/ExternalHooks';
import type {
IExecutionResponse,
IProcessMessageDataHook,
IWorkflowExecutionDataProcess,
IWorkflowExecutionDataProcessWithExecution,
} from '@/Interfaces';
import type { IExecutionResponse, IWorkflowExecutionDataProcess } from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes';
import type { Job, JobData, JobResponse } from '@/Queue';
import { Queue } from '@/Queue';
import { decodeWebhookResponse } from '@/helpers/decodeWebhookResponse';
import * as WorkflowHelpers from '@/WorkflowHelpers';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
@ -56,8 +47,6 @@ export class WorkflowRunner {
private executionsMode = config.getEnv('executions.mode');
private executionsProcess = config.getEnv('executions.process');
constructor(
private readonly logger: Logger,
private readonly activeExecutions: ActiveExecutions,
@ -68,14 +57,6 @@ export class WorkflowRunner {
private readonly permissionChecker: PermissionChecker,
) {}
/** The process did send a hook message so execute the appropriate hook */
private async processHookMessage(
workflowHooks: WorkflowHooks,
hookData: IProcessMessageDataHook,
) {
await workflowHooks.executeHookFunctions(hookData.hook, hookData.parameters);
}
/** The process did error */
async processError(
error: ExecutionError,
@ -189,11 +170,7 @@ export class WorkflowRunner {
responsePromise,
);
} else {
if (this.executionsProcess === 'main') {
executionId = await this.runMainProcess(data, loadStaticData, executionId, responsePromise);
} else {
executionId = await this.runSubprocess(data, loadStaticData, executionId, responsePromise);
}
executionId = await this.runMainProcess(data, loadStaticData, executionId, responsePromise);
void Container.get(InternalHooks).onWorkflowBeforeExecute(executionId, data);
}
@ -283,7 +260,7 @@ export class WorkflowRunner {
additionalData.restartExecutionId = restartExecutionId;
// Register the active execution
const executionId = await this.activeExecutions.add(data, undefined, restartExecutionId);
const executionId = await this.activeExecutions.add(data, restartExecutionId);
additionalData.executionId = executionId;
this.logger.verbose(
@ -380,7 +357,7 @@ export class WorkflowRunner {
if (workflowTimeout > 0) {
const timeout = Math.min(workflowTimeout, config.getEnv('executions.maxTimeout')) * 1000; // as seconds
executionTimeout = setTimeout(() => {
void this.activeExecutions.stopExecution(executionId, 'timeout');
void this.activeExecutions.stopExecution(executionId);
}, timeout);
}
@ -428,7 +405,7 @@ export class WorkflowRunner {
// TODO: If "loadStaticData" is set to true it has to load data new on worker
// Register the active execution
const executionId = await this.activeExecutions.add(data, undefined, restartExecutionId);
const executionId = await this.activeExecutions.add(data, restartExecutionId);
if (responsePromise) {
this.activeExecutions.attachResponsePromise(executionId, responsePromise);
}
@ -626,188 +603,4 @@ export class WorkflowRunner {
this.activeExecutions.attachWorkflowExecution(executionId, workflowExecution);
return executionId;
}
/** Run the workflow in a child-process */
private async runSubprocess(
data: IWorkflowExecutionDataProcess,
loadStaticData?: boolean,
restartExecutionId?: string,
responsePromise?: IDeferredPromise<IExecuteResponsePromiseData>,
): Promise<string> {
const workflowId = data.workflowData.id;
let startedAt = new Date();
const subprocess = fork(pathJoin(__dirname, 'WorkflowRunnerProcess.js'));
if (loadStaticData === true && workflowId) {
data.workflowData.staticData =
await this.workflowStaticDataService.getStaticDataById(workflowId);
}
data.restartExecutionId = restartExecutionId;
// Register the active execution
const executionId = await this.activeExecutions.add(data, subprocess, restartExecutionId);
(data as unknown as IWorkflowExecutionDataProcessWithExecution).executionId = executionId;
await this.executionRepository.updateStatus(executionId, 'running');
const workflowHooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(data, executionId);
try {
// Send all data to subprocess it needs to run the workflow
subprocess.send({ type: 'startWorkflow', data } as IProcessMessage);
} catch (error) {
await this.processError(error, new Date(), data.executionMode, executionId, workflowHooks);
return executionId;
}
// Start timeout for the execution
let executionTimeout: NodeJS.Timeout;
const workflowSettings = data.workflowData.settings ?? {};
let workflowTimeout = workflowSettings.executionTimeout ?? config.getEnv('executions.timeout'); // initialize with default
const processTimeoutFunction = (timeout: number) => {
void this.activeExecutions.stopExecution(executionId, 'timeout');
executionTimeout = setTimeout(() => subprocess.kill(), Math.max(timeout * 0.2, 5000)); // minimum 5 seconds
};
if (workflowTimeout > 0) {
workflowTimeout = Math.min(workflowTimeout, config.getEnv('executions.maxTimeout')) * 1000; // as seconds
// Start timeout already now but give process at least 5 seconds to start.
// Without it could would it be possible that the workflow executions times out before it even got started if
// the timeout time is very short as the process start time can be quite long.
executionTimeout = setTimeout(
processTimeoutFunction,
Math.max(5000, workflowTimeout),
workflowTimeout,
);
}
// Create a list of child spawned executions
// If after the child process exits we have
// outstanding executions, we remove them
const childExecutionIds: string[] = [];
// Listen to data from the subprocess
subprocess.on('message', async (message: IProcessMessage) => {
this.logger.debug(
`Received child process message of type ${message.type} for execution ID ${executionId}.`,
{ executionId },
);
if (message.type === 'start') {
// Now that the execution actually started set the timeout again so that does not time out to early.
startedAt = new Date();
if (workflowTimeout > 0) {
clearTimeout(executionTimeout);
executionTimeout = setTimeout(processTimeoutFunction, workflowTimeout, workflowTimeout);
}
} else if (message.type === 'end') {
clearTimeout(executionTimeout);
this.activeExecutions.remove(executionId, message.data.runData);
} else if (message.type === 'sendResponse') {
if (responsePromise) {
responsePromise.resolve(decodeWebhookResponse(message.data.response));
}
} else if (message.type === 'sendDataToUI') {
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
WorkflowExecuteAdditionalData.sendDataToUI.bind({ sessionId: data.sessionId })(
message.data.type,
message.data.data,
);
} else if (message.type === 'processError') {
clearTimeout(executionTimeout);
const executionError = message.data.executionError as ExecutionError;
await this.processError(
executionError,
startedAt,
data.executionMode,
executionId,
workflowHooks,
);
} else if (message.type === 'processHook') {
await this.processHookMessage(workflowHooks, message.data as IProcessMessageDataHook);
} else if (message.type === 'timeout') {
// Execution timed out and its process has been terminated
const timeoutError = new WorkflowOperationError('Workflow execution timed out!');
// No need to add hook here as the subprocess takes care of calling the hooks
await this.processError(timeoutError, startedAt, data.executionMode, executionId);
} else if (message.type === 'startExecution') {
const executionId = await this.activeExecutions.add(message.data.runData);
childExecutionIds.push(executionId);
subprocess.send({ type: 'executionId', data: { executionId } } as IProcessMessage);
} else if (message.type === 'finishExecution') {
const executionIdIndex = childExecutionIds.indexOf(message.data.executionId);
if (executionIdIndex !== -1) {
childExecutionIds.splice(executionIdIndex, 1);
}
if (message.data.result === undefined) {
const noDataError = new WorkflowOperationError('Workflow finished with no result data');
const subWorkflowHooks = WorkflowExecuteAdditionalData.getWorkflowHooksMain(
data,
message.data.executionId,
);
await this.processError(
noDataError,
startedAt,
data.executionMode,
message.data?.executionId,
subWorkflowHooks,
);
} else {
this.activeExecutions.remove(message.data.executionId, message.data.result);
}
}
});
// Also get informed when the processes does exit especially when it did crash or timed out
subprocess.on('exit', async (code, signal) => {
if (signal === 'SIGTERM') {
this.logger.debug(`Subprocess for execution ID ${executionId} timed out.`, { executionId });
// Execution timed out and its process has been terminated
const timeoutError = new WorkflowOperationError('Workflow execution timed out!');
await this.processError(
timeoutError,
startedAt,
data.executionMode,
executionId,
workflowHooks,
);
} else if (code !== 0) {
this.logger.debug(
`Subprocess for execution ID ${executionId} finished with error code ${code}.`,
{ executionId },
);
// Process did exit with error code, so something went wrong.
const executionError = new WorkflowOperationError(
'Workflow execution process crashed for an unknown reason!',
);
await this.processError(
executionError,
startedAt,
data.executionMode,
executionId,
workflowHooks,
);
}
for (const executionId of childExecutionIds) {
// When the child process exits, if we still have
// pending child executions, we mark them as finished
// They will display as unknown to the user
// Instead of pending forever as executing when it
// actually isn't anymore.
this.activeExecutions.remove(executionId);
}
clearTimeout(executionTimeout);
});
return executionId;
}
}

View File

@ -1,506 +0,0 @@
/* eslint-disable @typescript-eslint/no-unsafe-argument */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
/* eslint-disable @typescript-eslint/no-shadow */
/* eslint-disable @typescript-eslint/no-use-before-define */
/* eslint-disable @typescript-eslint/unbound-method */
import 'source-map-support/register';
import 'reflect-metadata';
import { setDefaultResultOrder } from 'dns';
import { Container } from 'typedi';
import type { IProcessMessage } from 'n8n-core';
import { BinaryDataService, WorkflowExecute } from 'n8n-core';
import type {
ExecutionError,
IDataObject,
IExecuteResponsePromiseData,
IExecuteWorkflowInfo,
INode,
INodeExecutionData,
IRun,
ITaskData,
IWorkflowExecuteAdditionalData,
IWorkflowExecuteHooks,
IWorkflowSettings,
NodeOperationError,
WorkflowExecuteMode,
} from 'n8n-workflow';
import {
ErrorReporterProxy as ErrorReporter,
Workflow,
WorkflowHooks,
WorkflowOperationError,
} from 'n8n-workflow';
import * as Db from '@/Db';
import { ExternalHooks } from '@/ExternalHooks';
import type {
IWorkflowExecuteProcess,
IWorkflowExecutionDataProcessWithExecution,
} from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import * as WebhookHelpers from '@/WebhookHelpers';
import * as WorkflowHelpers from '@/WorkflowHelpers';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { Logger } from '@/Logger';
import config from '@/config';
import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
import { initErrorHandling } from '@/ErrorReporting';
import { PermissionChecker } from '@/UserManagement/PermissionChecker';
import { License } from '@/License';
import { InternalHooks } from '@/InternalHooks';
import { PostHogClient } from '@/posthog';
if (process.env.NODEJS_PREFER_IPV4 === 'true') {
setDefaultResultOrder('ipv4first');
}
class WorkflowRunnerProcess {
data: IWorkflowExecutionDataProcessWithExecution | undefined;
logger: Logger;
startedAt = new Date();
workflow: Workflow | undefined;
workflowExecute: WorkflowExecute | undefined;
// eslint-disable-next-line @typescript-eslint/no-invalid-void-type
executionIdCallback: (executionId: string) => void | undefined;
childExecutions: {
[key: string]: IWorkflowExecuteProcess;
} = {};
static async stopProcess() {
setTimeout(() => {
// Attempt a graceful shutdown, giving executions 30 seconds to finish
process.exit(0);
}, 30000);
}
constructor() {
this.logger = Container.get(Logger);
}
async runWorkflow(inputData: IWorkflowExecutionDataProcessWithExecution): Promise<IRun> {
process.once('SIGTERM', WorkflowRunnerProcess.stopProcess);
process.once('SIGINT', WorkflowRunnerProcess.stopProcess);
await initErrorHandling();
this.data = inputData;
const { userId } = inputData;
this.logger.verbose('Initializing n8n sub-process', {
pid: process.pid,
workflowId: this.data.workflowData.id,
});
this.startedAt = new Date();
// Init db since we need to read the license.
await Db.init();
const nodeTypes = Container.get(NodeTypes);
await Container.get(LoadNodesAndCredentials).init();
// Load all external hooks
const externalHooks = Container.get(ExternalHooks);
await externalHooks.init();
await Container.get(PostHogClient).init();
await Container.get(InternalHooks).init();
const binaryDataConfig = config.getEnv('binaryDataManager');
await Container.get(BinaryDataService).init(binaryDataConfig);
const license = Container.get(License);
await license.init();
const workflowSettings = this.data.workflowData.settings ?? {};
// Start timeout for the execution
let workflowTimeout = workflowSettings.executionTimeout ?? config.getEnv('executions.timeout'); // initialize with default
if (workflowTimeout > 0) {
workflowTimeout = Math.min(workflowTimeout, config.getEnv('executions.maxTimeout'));
}
this.workflow = new Workflow({
id: this.data.workflowData.id,
name: this.data.workflowData.name,
nodes: this.data.workflowData.nodes,
connections: this.data.workflowData.connections,
active: this.data.workflowData.active,
nodeTypes,
staticData: this.data.workflowData.staticData,
settings: this.data.workflowData.settings,
pinData: this.data.pinData,
});
try {
await Container.get(PermissionChecker).check(this.workflow, userId);
} catch (error) {
const caughtError = error as NodeOperationError;
const failedExecutionData = generateFailedExecutionFromError(
this.data.executionMode,
caughtError,
caughtError.node,
);
// Force the `workflowExecuteAfter` hook to run since
// it's the one responsible for saving the execution
await this.sendHookToParentProcess('workflowExecuteAfter', [failedExecutionData]);
// Interrupt the workflow execution since we don't have all necessary creds.
return failedExecutionData;
}
const additionalData = await WorkflowExecuteAdditionalData.getBase(
userId,
undefined,
workflowTimeout <= 0 ? undefined : Date.now() + workflowTimeout * 1000,
);
additionalData.restartExecutionId = this.data.restartExecutionId;
additionalData.hooks = this.getProcessForwardHooks();
additionalData.hooks.hookFunctions.sendResponse = [
async (response: IExecuteResponsePromiseData): Promise<void> => {
await sendToParentProcess('sendResponse', {
response: WebhookHelpers.encodeWebhookResponse(response),
});
},
];
additionalData.executionId = inputData.executionId;
additionalData.setExecutionStatus = WorkflowExecuteAdditionalData.setExecutionStatus.bind({
executionId: inputData.executionId,
});
additionalData.sendDataToUI = async (type: string, data: IDataObject | IDataObject[]) => {
if (workflowRunner.data!.executionMode !== 'manual') {
return;
}
try {
await sendToParentProcess('sendDataToUI', { type, data });
} catch (error) {
ErrorReporter.error(error);
this.logger.error(
`There was a problem sending UI data to parent process: "${error.message}"`,
);
}
};
const executeWorkflowFunction = additionalData.executeWorkflow;
additionalData.executeWorkflow = async (
workflowInfo: IExecuteWorkflowInfo,
additionalData: IWorkflowExecuteAdditionalData,
options: {
parentWorkflowId: string;
inputData?: INodeExecutionData[];
parentWorkflowSettings?: IWorkflowSettings;
},
): Promise<Array<INodeExecutionData[] | null> | IRun> => {
const workflowData = await WorkflowExecuteAdditionalData.getWorkflowData(
workflowInfo,
options.parentWorkflowId,
options.parentWorkflowSettings,
);
const runData = await WorkflowExecuteAdditionalData.getRunData(
workflowData,
additionalData.userId,
options?.inputData,
);
await sendToParentProcess('startExecution', { runData });
const executionId: string = await new Promise((resolve) => {
this.executionIdCallback = (executionId: string) => {
resolve(executionId);
};
});
void Container.get(InternalHooks).onWorkflowBeforeExecute(executionId || '', runData);
let result: IRun;
try {
const executeWorkflowFunctionOutput = (await executeWorkflowFunction(
workflowInfo,
additionalData,
{
parentWorkflowId: options?.parentWorkflowId,
inputData: options?.inputData,
parentExecutionId: executionId,
loadedWorkflowData: workflowData,
loadedRunData: runData,
parentWorkflowSettings: options?.parentWorkflowSettings,
},
)) as { workflowExecute: WorkflowExecute; workflow: Workflow } as IWorkflowExecuteProcess;
const { workflowExecute } = executeWorkflowFunctionOutput;
this.childExecutions[executionId] = executeWorkflowFunctionOutput;
const { workflow } = executeWorkflowFunctionOutput;
result = await workflowExecute.processRunExecutionData(workflow);
await externalHooks.run('workflow.postExecute', [result, workflowData, executionId]);
void Container.get(InternalHooks).onWorkflowPostExecute(
executionId,
workflowData,
result,
additionalData.userId,
);
await sendToParentProcess('finishExecution', { executionId, result });
delete this.childExecutions[executionId];
} catch (e) {
await sendToParentProcess('finishExecution', { executionId });
delete this.childExecutions[executionId];
// Throw same error we had
throw e;
}
await sendToParentProcess('finishExecution', { executionId, result });
const returnData = WorkflowHelpers.getDataLastExecutedNodeData(result);
if (returnData!.error) {
const error = new Error(returnData!.error.message);
error.stack = returnData!.error.stack;
throw error;
}
return returnData!.data!.main;
};
if (this.data.executionData !== undefined) {
this.workflowExecute = new WorkflowExecute(
additionalData,
this.data.executionMode,
this.data.executionData,
);
return await this.workflowExecute.processRunExecutionData(this.workflow);
}
if (
this.data.runData === undefined ||
this.data.startNodes === undefined ||
this.data.startNodes.length === 0
) {
// Execute all nodes
const startNode = WorkflowHelpers.getExecutionStartNode(this.data, this.workflow);
// Can execute without webhook so go on
this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode);
return await this.workflowExecute.run(
this.workflow,
startNode,
this.data.destinationNode,
this.data.pinData,
);
}
// Execute only the nodes between start and destination nodes
this.workflowExecute = new WorkflowExecute(additionalData, this.data.executionMode);
return await this.workflowExecute.runPartialWorkflow(
this.workflow,
this.data.runData,
this.data.startNodes,
this.data.destinationNode,
this.data.pinData,
);
}
/**
* Sends hook data to the parent process that it executes them
*/
async sendHookToParentProcess(hook: string, parameters: any[]) {
try {
await sendToParentProcess('processHook', {
hook,
parameters,
});
} catch (error) {
ErrorReporter.error(error);
this.logger.error(`There was a problem sending hook: "${hook}"`, { parameters, error });
}
}
/**
* Create a wrapper for hooks which simply forwards the data to
* the parent process where they then can be executed with access
* to database and to PushService
*
*/
getProcessForwardHooks(): WorkflowHooks {
const hookFunctions: IWorkflowExecuteHooks = {
nodeExecuteBefore: [
async (nodeName: string): Promise<void> => {
await this.sendHookToParentProcess('nodeExecuteBefore', [nodeName]);
},
],
nodeExecuteAfter: [
async (nodeName: string, data: ITaskData): Promise<void> => {
await this.sendHookToParentProcess('nodeExecuteAfter', [nodeName, data]);
},
],
workflowExecuteBefore: [
async (): Promise<void> => {
await this.sendHookToParentProcess('workflowExecuteBefore', []);
},
],
workflowExecuteAfter: [
async (fullRunData: IRun, newStaticData?: IDataObject): Promise<void> => {
await this.sendHookToParentProcess('workflowExecuteAfter', [fullRunData, newStaticData]);
},
],
nodeFetchedData: [
async (workflowId: string, node: INode) => {
await this.sendHookToParentProcess('nodeFetchedData', [workflowId, node]);
},
],
};
const preExecuteFunctions = WorkflowExecuteAdditionalData.hookFunctionsPreExecute();
for (const key of Object.keys(preExecuteFunctions)) {
if (hookFunctions[key] === undefined) {
hookFunctions[key] = [];
}
hookFunctions[key]!.push.apply(hookFunctions[key], preExecuteFunctions[key]);
}
return new WorkflowHooks(
hookFunctions,
this.data!.executionMode,
this.data!.executionId,
this.data!.workflowData,
{ sessionId: this.data!.sessionId, retryOf: this.data!.retryOf as string },
);
}
}
/**
* Sends data to parent process
*
* @param {string} type The type of data to send
* @param {*} data The data
*/
async function sendToParentProcess(type: string, data: any): Promise<void> {
return await new Promise((resolve, reject) => {
process.send!(
{
type,
data,
},
(error: Error) => {
if (error) {
return reject(error);
}
resolve();
},
);
});
}
const workflowRunner = new WorkflowRunnerProcess();
// Listen to messages from parent process which send the data of
// the workflow to process
process.on('message', async (message: IProcessMessage) => {
try {
if (message.type === 'startWorkflow') {
await sendToParentProcess('start', {});
const runData = await workflowRunner.runWorkflow(message.data);
await sendToParentProcess('end', {
runData,
});
// Once the workflow got executed make sure the process gets killed again
process.exit();
} else if (message.type === 'stopExecution' || message.type === 'timeout') {
// The workflow execution should be stopped
let runData: IRun;
if (workflowRunner.workflowExecute !== undefined) {
const executionIds = Object.keys(workflowRunner.childExecutions);
for (const executionId of executionIds) {
const childWorkflowExecute = workflowRunner.childExecutions[executionId];
runData = childWorkflowExecute.workflowExecute.getFullRunData(
workflowRunner.childExecutions[executionId].startedAt,
);
const timeOutError =
message.type === 'timeout'
? new WorkflowOperationError('Workflow execution timed out!')
: new WorkflowOperationError('Workflow-Execution has been canceled!');
// If there is any data send it to parent process, if execution timedout add the error
await childWorkflowExecute.workflowExecute.processSuccessExecution(
workflowRunner.childExecutions[executionId].startedAt,
childWorkflowExecute.workflow,
timeOutError,
);
}
// Workflow started already executing
runData = workflowRunner.workflowExecute.getFullRunData(workflowRunner.startedAt);
const timeOutError =
message.type === 'timeout'
? new WorkflowOperationError('Workflow execution timed out!')
: new WorkflowOperationError('Workflow-Execution has been canceled!');
runData.status = message.type === 'timeout' ? 'failed' : 'canceled';
// If there is any data send it to parent process, if execution timedout add the error
await workflowRunner.workflowExecute.processSuccessExecution(
workflowRunner.startedAt,
workflowRunner.workflow!,
timeOutError,
);
} else {
// Workflow did not get started yet
runData = {
data: {
resultData: {
runData: {},
},
},
finished: false,
mode: workflowRunner.data
? workflowRunner.data.executionMode
: ('own' as WorkflowExecuteMode),
startedAt: workflowRunner.startedAt,
stoppedAt: new Date(),
status: 'canceled',
};
await workflowRunner.sendHookToParentProcess('workflowExecuteAfter', [runData]);
}
await sendToParentProcess(message.type === 'timeout' ? message.type : 'end', {
runData,
});
// Stop process
process.exit();
} else if (message.type === 'executionId') {
workflowRunner.executionIdCallback(message.data.executionId);
}
} catch (error) {
workflowRunner.logger.error(error.message);
// Catch all uncaught errors and forward them to parent process
const executionError = {
...error,
name: error.name || 'Error',
message: error.message,
stack: error.stack,
} as ExecutionError;
await sendToParentProcess('processError', {
executionError,
});
process.exit();
}
});

View File

@ -84,8 +84,8 @@ export abstract class BaseCommand extends Command {
);
}
if (process.env.EXECUTIONS_PROCESS === 'own') {
this.logger.warn(
'Own mode has been deprecated and will be removed in a future version of n8n. If you need the isolation and performance gains, please consider using queue mode.',
throw new ApplicationError(
'Own mode has been removed. If you need the isolation and performance gains, please consider using queue mode.',
);
}

View File

@ -6,7 +6,6 @@ import { inTest, inE2ETests } from '@/constants';
if (inE2ETests) {
// Skip loading config from env variables in end-to-end tests
process.env.EXECUTIONS_PROCESS = 'main';
process.env.N8N_DIAGNOSTICS_ENABLED = 'false';
process.env.N8N_PUBLIC_API_DISABLED = 'true';
process.env.EXTERNAL_FRONTEND_HOOKS_URLS = '';

View File

@ -234,15 +234,6 @@ export const schema = {
},
executions: {
// By default workflows get always executed in the main process.
// TODO: remove this and all usage of `executions.process` when `own` mode is deleted
process: {
doc: 'In what process workflows should be executed.',
format: ['main', 'own'] as const,
default: 'main',
env: 'EXECUTIONS_PROCESS',
},
mode: {
doc: 'If it should run executions directly or via queue',
format: ['regular', 'queue'] as const,

View File

@ -45,11 +45,7 @@ describe('ActiveExecutions', () => {
test('Should update execution if add is called with execution ID', async () => {
const newExecution = mockExecutionData();
const executionId = await activeExecutions.add(
newExecution,
undefined,
FAKE_SECOND_EXECUTION_ID,
);
const executionId = await activeExecutions.add(newExecution, FAKE_SECOND_EXECUTION_ID);
expect(executionId).toBe(FAKE_SECOND_EXECUTION_ID);
expect(activeExecutions.getActiveExecutions().length).toBe(1);
@ -67,7 +63,7 @@ describe('ActiveExecutions', () => {
test('Should successfully attach execution to valid executionId', async () => {
const newExecution = mockExecutionData();
await activeExecutions.add(newExecution, undefined, FAKE_EXECUTION_ID);
await activeExecutions.add(newExecution, FAKE_EXECUTION_ID);
const deferredPromise = mockCancelablePromise();
expect(() =>
@ -77,7 +73,7 @@ describe('ActiveExecutions', () => {
test('Should attach and resolve response promise to existing execution', async () => {
const newExecution = mockExecutionData();
await activeExecutions.add(newExecution, undefined, FAKE_EXECUTION_ID);
await activeExecutions.add(newExecution, FAKE_EXECUTION_ID);
const deferredPromise = await mockDeferredPromise();
activeExecutions.attachResponsePromise(FAKE_EXECUTION_ID, deferredPromise);
const fakeResponse = { data: { resultData: { runData: {} } } };
@ -129,6 +125,7 @@ function mockExecutionData(): IWorkflowExecutionDataProcess {
return {
executionMode: 'manual',
workflowData: {
id: '123',
name: 'Test workflow 1',
active: false,
createdAt: new Date(),

View File

@ -7,12 +7,6 @@ import type {
export type Class<T = object, A extends unknown[] = unknown[]> = new (...args: A) => T;
export interface IProcessMessage {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
data?: any;
type: string;
}
export interface IResponseError extends Error {
statusCode?: number;
}