diff --git a/docker/images/n8n/n8n-task-runners.json b/docker/images/n8n/n8n-task-runners.json index d9575997c0..c64d0ecdd0 100644 --- a/docker/images/n8n/n8n-task-runners.json +++ b/docker/images/n8n/n8n-task-runners.json @@ -12,6 +12,7 @@ "N8N_RUNNERS_TASK_BROKER_URI", "N8N_RUNNERS_MAX_PAYLOAD", "N8N_RUNNERS_MAX_CONCURRENCY", + "N8N_RUNNERS_TASK_TIMEOUT", "N8N_RUNNERS_HEALTH_CHECK_SERVER_ENABLED", "N8N_RUNNERS_HEALTH_CHECK_SERVER_HOST", "N8N_RUNNERS_HEALTH_CHECK_SERVER_PORT", diff --git a/packages/@n8n/config/src/configs/runners.config.ts b/packages/@n8n/config/src/configs/runners.config.ts index 06e262fe49..733e724408 100644 --- a/packages/@n8n/config/src/configs/runners.config.ts +++ b/packages/@n8n/config/src/configs/runners.config.ts @@ -43,11 +43,11 @@ export class TaskRunnersConfig { @Env('N8N_RUNNERS_MAX_CONCURRENCY') maxConcurrency: number = 5; - /** How long (in seconds) a task is allowed to take for completion, else the task will be aborted and the runner restarted. Must be greater than 0. */ + /** How long (in seconds) a task is allowed to take for completion, else the task will be aborted. (In internal mode, the runner will also be restarted.) Must be greater than 0. */ @Env('N8N_RUNNERS_TASK_TIMEOUT') taskTimeout: number = 60; - /** How often (in seconds) the runner must send a heartbeat to the broker, else the task will be aborted and the runner restarted. Must be greater than 0. */ + /** How often (in seconds) the runner must send a heartbeat to the broker, else the task will be aborted. (In internal mode, the runner will also be restarted.) Must be greater than 0. */ @Env('N8N_RUNNERS_HEARTBEAT_INTERVAL') heartbeatInterval: number = 30; } diff --git a/packages/@n8n/task-runner/src/config/base-runner-config.ts b/packages/@n8n/task-runner/src/config/base-runner-config.ts index a1059adf4b..d08056c5ae 100644 --- a/packages/@n8n/task-runner/src/config/base-runner-config.ts +++ b/packages/@n8n/task-runner/src/config/base-runner-config.ts @@ -37,6 +37,9 @@ export class BaseRunnerConfig { @Env('GENERIC_TIMEZONE') timezone: string = 'America/New_York'; + @Env('N8N_RUNNERS_TASK_TIMEOUT') + taskTimeout: number = 60; + @Nested healthcheckServer!: HealthcheckServerConfig; } diff --git a/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts b/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts index 439de19eac..e53d246fcb 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts @@ -1,3 +1,4 @@ +import { mock } from 'jest-mock-extended'; import { DateTime } from 'luxon'; import { setGlobalState, type CodeExecutionMode, type IDataObject } from 'n8n-workflow'; import fs from 'node:fs'; @@ -61,7 +62,7 @@ describe('JsTaskRunner', () => { runner?: JsTaskRunner; }) => { jest.spyOn(runner, 'requestData').mockResolvedValue(taskData); - return await runner.executeTask(task); + return await runner.executeTask(task, mock()); }; afterEach(() => { diff --git a/packages/@n8n/task-runner/src/js-task-runner/__tests__/task-runner.test.ts b/packages/@n8n/task-runner/src/js-task-runner/__tests__/task-runner.test.ts index c633e95688..af3e727409 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/__tests__/task-runner.test.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/__tests__/task-runner.test.ts @@ -21,6 +21,7 @@ describe('TestRunner', () => { maxPayloadSize: 1024, taskBrokerUri: 'http://localhost:8080', timezone: 'America/New_York', + taskTimeout: 60, healthcheckServer: { enabled: false, host: 'localhost', @@ -48,6 +49,7 @@ describe('TestRunner', () => { maxPayloadSize: 1024, taskBrokerUri: 'https://example.com:3000/path', timezone: 'America/New_York', + taskTimeout: 60, healthcheckServer: { enabled: false, host: 'localhost', @@ -77,6 +79,7 @@ describe('TestRunner', () => { maxPayloadSize: 1024, taskBrokerUri: 'not-a-valid-uri', timezone: 'America/New_York', + taskTimeout: 60, healthcheckServer: { enabled: false, host: 'localhost', @@ -86,4 +89,65 @@ describe('TestRunner', () => { ).toThrowError(/Invalid URL/); }); }); + + describe('taskCancelled', () => { + it('should reject pending requests when task is cancelled', () => { + const runner = new TestRunner({ + taskType: 'test-task', + maxConcurrency: 5, + idleTimeout: 60, + grantToken: 'test-token', + maxPayloadSize: 1024, + taskBrokerUri: 'http://localhost:8080', + timezone: 'America/New_York', + taskTimeout: 60, + healthcheckServer: { + enabled: false, + host: 'localhost', + port: 8081, + }, + }); + + const taskId = 'test-task'; + runner.runningTasks.set(taskId, { + taskId, + active: false, + cancelled: false, + }); + + const dataRequestReject = jest.fn(); + const nodeTypesRequestReject = jest.fn(); + + runner.dataRequests.set('data-req', { + taskId, + requestId: 'data-req', + resolve: jest.fn(), + reject: dataRequestReject, + }); + + runner.nodeTypesRequests.set('node-req', { + taskId, + requestId: 'node-req', + resolve: jest.fn(), + reject: nodeTypesRequestReject, + }); + + runner.taskCancelled(taskId, 'test-reason'); + + expect(dataRequestReject).toHaveBeenCalledWith( + expect.objectContaining({ + message: 'Task cancelled: test-reason', + }), + ); + + expect(nodeTypesRequestReject).toHaveBeenCalledWith( + expect.objectContaining({ + message: 'Task cancelled: test-reason', + }), + ); + + expect(runner.dataRequests.size).toBe(0); + expect(runner.nodeTypesRequests.size).toBe(0); + }); + }); }); diff --git a/packages/@n8n/task-runner/src/js-task-runner/errors/task-cancelled-error.ts b/packages/@n8n/task-runner/src/js-task-runner/errors/task-cancelled-error.ts new file mode 100644 index 0000000000..1970c11fcd --- /dev/null +++ b/packages/@n8n/task-runner/src/js-task-runner/errors/task-cancelled-error.ts @@ -0,0 +1,7 @@ +import { ApplicationError } from 'n8n-workflow'; + +export class TaskCancelledError extends ApplicationError { + constructor(reason: string) { + super(`Task cancelled: ${reason}`, { level: 'warning' }); + } +} diff --git a/packages/@n8n/task-runner/src/js-task-runner/errors/timeout-error.ts b/packages/@n8n/task-runner/src/js-task-runner/errors/timeout-error.ts new file mode 100644 index 0000000000..ef3cc89751 --- /dev/null +++ b/packages/@n8n/task-runner/src/js-task-runner/errors/timeout-error.ts @@ -0,0 +1,30 @@ +import { ApplicationError } from 'n8n-workflow'; + +export class TimeoutError extends ApplicationError { + description: string; + + constructor(taskTimeout: number) { + super( + `Task execution timed out after ${taskTimeout} ${taskTimeout === 1 ? 'second' : 'seconds'}`, + ); + + const subtitle = 'The task runner was taking too long on this task, so the task was aborted.'; + + const fixes = { + optimizeScript: + 'Optimize your script to prevent long-running tasks, e.g. by processing data in smaller batches.', + ensureTermination: + 'Ensure that all paths in your script are able to terminate, i.e. no infinite loops.', + }; + + const suggestions = [fixes.optimizeScript, fixes.ensureTermination]; + + const suggestionsText = suggestions + .map((suggestion, index) => `${index + 1}. ${suggestion}`) + .join('
'); + + const description = `${subtitle} You can try the following:

${suggestionsText}`; + + this.description = description; + } +} diff --git a/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts b/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts index 005267862e..cbc1b4d832 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts @@ -32,6 +32,7 @@ import { BuiltInsParserState } from './built-ins-parser/built-ins-parser-state'; import { isErrorLike } from './errors/error-like'; import { ExecutionError } from './errors/execution-error'; import { makeSerializable } from './errors/serializable-error'; +import { TimeoutError } from './errors/timeout-error'; import type { RequireResolver } from './require-resolver'; import { createRequireResolver } from './require-resolver'; import { validateRunForAllItemsOutput, validateRunForEachItemOutput } from './result-validation'; @@ -94,7 +95,7 @@ export class JsTaskRunner extends TaskRunner { }); } - async executeTask(task: Task): Promise { + async executeTask(task: Task, signal: AbortSignal): Promise { const settings = task.settings; a.ok(settings, 'JS Code not sent to runner'); @@ -133,8 +134,8 @@ export class JsTaskRunner extends TaskRunner { const result = settings.nodeMode === 'runOnceForAllItems' - ? await this.runForAllItems(task.taskId, settings, data, workflow, customConsole) - : await this.runForEachItem(task.taskId, settings, data, workflow, customConsole); + ? await this.runForAllItems(task.taskId, settings, data, workflow, customConsole, signal) + : await this.runForEachItem(task.taskId, settings, data, workflow, customConsole, signal); return { result, @@ -183,6 +184,7 @@ export class JsTaskRunner extends TaskRunner { data: JsTaskData, workflow: Workflow, customConsole: CustomConsole, + signal: AbortSignal, ): Promise { const dataProxy = this.createDataProxy(data, workflow, data.itemIndex); const inputItems = data.connectionInputData; @@ -199,10 +201,26 @@ export class JsTaskRunner extends TaskRunner { }; try { - const result = (await runInNewContext( - `globalThis.global = globalThis; module.exports = async function VmCodeWrapper() {${settings.code}\n}()`, - context, - )) as TaskResultData['result']; + const result = await new Promise((resolve, reject) => { + const abortHandler = () => { + reject(new TimeoutError(this.taskTimeout)); + }; + + signal.addEventListener('abort', abortHandler, { once: true }); + + const taskResult = runInNewContext( + `globalThis.global = globalThis; module.exports = async function VmCodeWrapper() {${settings.code}\n}()`, + context, + { timeout: this.taskTimeout * 1000 }, + ) as Promise; + + void taskResult + .then(resolve) + .catch(reject) + .finally(() => { + signal.removeEventListener('abort', abortHandler); + }); + }); if (result === null) { return []; @@ -230,6 +248,7 @@ export class JsTaskRunner extends TaskRunner { data: JsTaskData, workflow: Workflow, customConsole: CustomConsole, + signal: AbortSignal, ): Promise { const inputItems = data.connectionInputData; const returnData: INodeExecutionData[] = []; @@ -255,10 +274,26 @@ export class JsTaskRunner extends TaskRunner { }; try { - let result = (await runInNewContext( - `module.exports = async function VmCodeWrapper() {${settings.code}\n}()`, - context, - )) as INodeExecutionData | undefined; + let result = await new Promise((resolve, reject) => { + const abortHandler = () => { + reject(new TimeoutError(this.taskTimeout)); + }; + + signal.addEventListener('abort', abortHandler); + + const taskResult = runInNewContext( + `module.exports = async function VmCodeWrapper() {${settings.code}\n}()`, + context, + { timeout: this.taskTimeout * 1000 }, + ) as Promise; + + void taskResult + .then(resolve) + .catch(reject) + .finally(() => { + signal.removeEventListener('abort', abortHandler); + }); + }); // Filter out null values if (result === null) { diff --git a/packages/@n8n/task-runner/src/task-runner.ts b/packages/@n8n/task-runner/src/task-runner.ts index f0af115b5a..e8ee605ef5 100644 --- a/packages/@n8n/task-runner/src/task-runner.ts +++ b/packages/@n8n/task-runner/src/task-runner.ts @@ -8,6 +8,8 @@ import type { BrokerMessage, RunnerMessage } from '@/message-types'; import { TaskRunnerNodeTypes } from '@/node-types'; import { RPC_ALLOW_LIST, type TaskResultData } from '@/runner-types'; +import { TaskCancelledError } from './js-task-runner/errors/task-cancelled-error'; + export interface Task { taskId: string; settings?: T; @@ -21,12 +23,14 @@ export interface TaskOffer { } interface DataRequest { + taskId: string; requestId: string; resolve: (data: unknown) => void; reject: (error: unknown) => void; } interface NodeTypesRequest { + taskId: string; requestId: string; resolve: (data: unknown) => void; reject: (error: unknown) => void; @@ -82,14 +86,20 @@ export abstract class TaskRunner extends EventEmitter { private idleTimer: NodeJS.Timeout | undefined; + /** How long (in seconds) a task is allowed to take for completion, else the task will be aborted. */ + protected readonly taskTimeout: number; + /** How long (in seconds) a runner may be idle for before exit. */ private readonly idleTimeout: number; + protected taskCancellations = new Map(); + constructor(opts: TaskRunnerOpts) { super(); this.taskType = opts.taskType; this.name = opts.name ?? 'Node.js Task Runner SDK'; this.maxConcurrency = opts.maxConcurrency; + this.taskTimeout = opts.taskTimeout; this.idleTimeout = opts.idleTimeout; const { host: taskBrokerHost } = new URL(opts.taskBrokerUri); @@ -210,7 +220,7 @@ export abstract class TaskRunner extends EventEmitter { this.offerAccepted(message.offerId, message.taskId); break; case 'broker:taskcancel': - this.taskCancelled(message.taskId); + this.taskCancelled(message.taskId, message.reason); break; case 'broker:tasksettings': void this.receivedSettings(message.taskId, message.settings); @@ -285,17 +295,35 @@ export abstract class TaskRunner extends EventEmitter { }); } - taskCancelled(taskId: string) { + taskCancelled(taskId: string, reason: string) { const task = this.runningTasks.get(taskId); if (!task) { return; } task.cancelled = true; - if (task.active) { - // TODO - } else { - this.runningTasks.delete(taskId); + + for (const [requestId, request] of this.dataRequests.entries()) { + if (request.taskId === taskId) { + request.reject(new TaskCancelledError(reason)); + this.dataRequests.delete(requestId); + } } + + for (const [requestId, request] of this.nodeTypesRequests.entries()) { + if (request.taskId === taskId) { + request.reject(new TaskCancelledError(reason)); + this.nodeTypesRequests.delete(requestId); + } + } + + const controller = this.taskCancellations.get(taskId); + if (controller) { + controller.abort(); + this.taskCancellations.delete(taskId); + } + + if (!task.active) this.runningTasks.delete(taskId); + this.sendOffers(); } @@ -328,20 +356,33 @@ export abstract class TaskRunner extends EventEmitter { this.runningTasks.delete(taskId); return; } + + const controller = new AbortController(); + this.taskCancellations.set(taskId, controller); + + const taskTimeout = setTimeout(() => { + if (!task.cancelled) { + controller.abort(); + this.taskCancellations.delete(taskId); + } + }, this.taskTimeout * 1_000); + task.settings = settings; task.active = true; try { - const data = await this.executeTask(task); + const data = await this.executeTask(task, controller.signal); this.taskDone(taskId, data); } catch (error) { - this.taskErrored(taskId, error); + if (!task.cancelled) this.taskErrored(taskId, error); } finally { + clearTimeout(taskTimeout); + this.taskCancellations.delete(taskId); this.resetIdleTimer(); } } // eslint-disable-next-line @typescript-eslint/naming-convention - async executeTask(_task: Task): Promise { + async executeTask(_task: Task, _signal: AbortSignal): Promise { throw new ApplicationError('Unimplemented'); } @@ -354,6 +395,7 @@ export abstract class TaskRunner extends EventEmitter { const nodeTypesPromise = new Promise((resolve, reject) => { this.nodeTypesRequests.set(requestId, { requestId, + taskId, resolve: resolve as (data: unknown) => void, reject, }); @@ -382,6 +424,7 @@ export abstract class TaskRunner extends EventEmitter { const p = new Promise((resolve, reject) => { this.dataRequests.set(requestId, { requestId, + taskId, resolve: resolve as (data: unknown) => void, reject, }); diff --git a/packages/cli/src/runners/__tests__/task-broker.test.ts b/packages/cli/src/runners/__tests__/task-broker.test.ts index 8e86f189e8..1f5030ada8 100644 --- a/packages/cli/src/runners/__tests__/task-broker.test.ts +++ b/packages/cli/src/runners/__tests__/task-broker.test.ts @@ -6,6 +6,7 @@ import { ApplicationError, type INodeTypeBaseDescription } from 'n8n-workflow'; import { Time } from '@/constants'; import { TaskRejectError } from '../errors'; +import { TaskRunnerTimeoutError } from '../errors/task-runner-timeout.error'; import type { RunnerLifecycleEvents } from '../runner-lifecycle-events'; import { TaskBroker } from '../task-broker.service'; import type { TaskOffer, TaskRequest, TaskRunner } from '../task-broker.service'; @@ -721,7 +722,7 @@ describe('TaskBroker', () => { beforeAll(() => { jest.useFakeTimers(); - config = mock({ taskTimeout: 30 }); + config = mock({ taskTimeout: 30, mode: 'internal' }); taskBroker = new TaskBroker(mock(), config, runnerLifecycleEvents); }); @@ -800,7 +801,7 @@ describe('TaskBroker', () => { expect(taskBroker.getTasks().get(taskId)).toBeUndefined(); }); - it('on timeout, we should emit `runner:timed-out-during-task` event and send error to requester', async () => { + it('[internal mode] on timeout, we should emit `runner:timed-out-during-task` event and send error to requester', async () => { jest.spyOn(global, 'clearTimeout'); const taskId = 'task1'; @@ -839,5 +840,50 @@ describe('TaskBroker', () => { expect(taskBroker.getTasks().get(taskId)).toBeUndefined(); }); + + it('[external mode] on timeout, we should instruct the runner to cancel and send error to requester', async () => { + const config = mock({ taskTimeout: 30, mode: 'external' }); + taskBroker = new TaskBroker(mock(), config, runnerLifecycleEvents); + + jest.spyOn(global, 'clearTimeout'); + + const taskId = 'task1'; + const runnerId = 'runner1'; + const requesterId = 'requester1'; + const runner = mock({ id: runnerId }); + const runnerCallback = jest.fn(); + const requesterCallback = jest.fn(); + + taskBroker.registerRunner(runner, runnerCallback); + taskBroker.registerRequester(requesterId, requesterCallback); + + taskBroker.setTasks({ + [taskId]: { id: taskId, runnerId, requesterId, taskType: 'test' }, + }); + + await taskBroker.sendTaskSettings(taskId, {}); + runnerCallback.mockClear(); + + jest.runAllTimers(); + + await Promise.resolve(); // for timeout callback + await Promise.resolve(); // for sending messages to runner and requester + await Promise.resolve(); // for task cleanup and removal + + expect(runnerCallback).toHaveBeenLastCalledWith({ + type: 'broker:taskcancel', + taskId, + reason: 'Task execution timed out', + }); + + expect(requesterCallback).toHaveBeenCalledWith({ + type: 'broker:taskerror', + taskId, + error: expect.any(TaskRunnerTimeoutError), + }); + + expect(clearTimeout).toHaveBeenCalled(); + expect(taskBroker.getTasks().get(taskId)).toBeUndefined(); + }); }); }); diff --git a/packages/cli/src/runners/errors/task-runner-timeout.error.ts b/packages/cli/src/runners/errors/task-runner-timeout.error.ts index 88f3533028..1d9d463e3a 100644 --- a/packages/cli/src/runners/errors/task-runner-timeout.error.ts +++ b/packages/cli/src/runners/errors/task-runner-timeout.error.ts @@ -1,15 +1,23 @@ +import type { TaskRunnerMode } from '@n8n/config/src/configs/runners.config'; import { ApplicationError } from 'n8n-workflow'; export class TaskRunnerTimeoutError extends ApplicationError { description: string; - constructor(taskTimeout: number, isSelfHosted: boolean) { + constructor({ + taskTimeout, + isSelfHosted, + mode, + }: { taskTimeout: number; isSelfHosted: boolean; mode: TaskRunnerMode }) { super( `Task execution timed out after ${taskTimeout} ${taskTimeout === 1 ? 'second' : 'seconds'}`, ); - const subtitle = - 'The task runner was taking too long on this task, so it was suspected of being unresponsive and restarted, and the task was aborted. You can try the following:'; + const subtitles = { + internal: + 'The task runner was taking too long on this task, so it was suspected of being unresponsive and restarted, and the task was aborted.', + external: 'The task runner was taking too long on this task, so the task was aborted.', + }; const fixes = { optimizeScript: @@ -27,7 +35,7 @@ export class TaskRunnerTimeoutError extends ApplicationError { .map((suggestion, index) => `${index + 1}. ${suggestion}`) .join('
'); - const description = `${subtitle}

${suggestionsText}`; + const description = `${mode === 'internal' ? subtitles.internal : subtitles.external} You can try the following:

${suggestionsText}`; this.description = description; } diff --git a/packages/cli/src/runners/task-broker.service.ts b/packages/cli/src/runners/task-broker.service.ts index 80e918b47a..e52992d38e 100644 --- a/packages/cli/src/runners/task-broker.service.ts +++ b/packages/cli/src/runners/task-broker.service.ts @@ -459,14 +459,25 @@ export class TaskBroker { const task = this.tasks.get(taskId); if (!task) return; - this.runnerLifecycleEvents.emit('runner:timed-out-during-task'); + if (this.taskRunnersConfig.mode === 'internal') { + this.runnerLifecycleEvents.emit('runner:timed-out-during-task'); + } else if (this.taskRunnersConfig.mode === 'external') { + await this.messageRunner(task.runnerId, { + type: 'broker:taskcancel', + taskId, + reason: 'Task execution timed out', + }); + } + + const { taskTimeout, mode } = this.taskRunnersConfig; await this.taskErrorHandler( taskId, - new TaskRunnerTimeoutError( - this.taskRunnersConfig.taskTimeout, - config.getEnv('deployment.type') !== 'cloud', - ), + new TaskRunnerTimeoutError({ + taskTimeout, + isSelfHosted: config.getEnv('deployment.type') !== 'cloud', + mode, + }), ); }