1
1
mirror of https://github.com/n8n-io/n8n.git synced 2024-10-06 01:27:49 +03:00

feat: Graceful termination of task runner

This commit is contained in:
Tomi Turtiainen 2024-09-29 21:19:21 +03:00
parent 27d83e0d91
commit 52caff34ba
No known key found for this signature in database
3 changed files with 64 additions and 7 deletions

View File

@ -8,8 +8,6 @@ import {
type IWorkflowExecuteAdditionalData,
WorkflowDataProxy,
type WorkflowParameters,
} from 'n8n-workflow';
import {
type IDataObject,
type IExecuteData,
type INodeExecutionData,

View File

@ -1,9 +1,10 @@
import * as a from 'node:assert/strict';
import { JsTaskRunner } from './code';
import { authenticate } from './authenticator';
import { JsTaskRunner } from './code';
let _runner: JsTaskRunner;
let _runner: JsTaskRunner | undefined;
let isShuttingDown = false;
type Config = {
n8nUri: string;
@ -20,6 +21,28 @@ function readAndParseConfig(): Config {
};
}
function createSignalHandler(signal: string) {
return async function onSignal() {
if (isShuttingDown) {
return;
}
console.log(`Received ${signal} signal, shutting down...`);
isShuttingDown = true;
try {
if (_runner) {
await _runner.stop();
_runner = undefined;
}
} catch (error) {
console.error(`Error stopping task runner: ${error}`);
} finally {
process.exit(0);
}
};
}
void (async function start() {
const config = readAndParseConfig();
@ -31,4 +54,7 @@ void (async function start() {
const wsUrl = `ws://${config.n8nUri}/rest/runners/_ws`;
_runner = new JsTaskRunner('javascript', wsUrl, grantToken, 5);
process.on('SIGINT', createSignalHandler('SIGINT'));
process.on('SIGTERM', createSignalHandler('SIGTERM'));
})();

View File

@ -1,5 +1,6 @@
import { URL } from 'node:url';
import { ApplicationError } from 'n8n-workflow';
import { nanoid } from 'nanoid';
import { URL } from 'node:url';
import { type MessageEvent, WebSocket } from 'ws';
import { ensureError } from 'n8n-workflow';
@ -267,7 +268,7 @@ export abstract class TaskRunner {
// eslint-disable-next-line @typescript-eslint/naming-convention
async executeTask(_task: Task): Promise<TaskResultData> {
throw new Error('Unimplemented');
throw new ApplicationError('Unimplemented');
}
async requestData<T = unknown>(
@ -354,9 +355,41 @@ export abstract class TaskRunner {
obj = obj[s];
return;
}
obj[s] = async (...args: unknown[]) => this.makeRpcCall(taskId, r, args);
obj[s] = async (...args: unknown[]) => await this.makeRpcCall(taskId, r, args);
});
}
return rpcObject;
}
/** Close the connection gracefully and wait until has been closed */
async stop() {
this.stopTaskOffers();
await this.waitUntilAllTasksAreDone();
await this.closeConnection();
}
private async closeConnection() {
// 1000 is the standard close code
// https://www.rfc-editor.org/rfc/rfc6455.html#section-7.1.5
this.ws.close(1000, 'Shutting down');
await new Promise((resolve) => {
this.ws.once('close', resolve);
});
}
private async waitUntilAllTasksAreDone(maxWaitTimeInMs = 30_000) {
// TODO: Make maxWaitTimeInMs configurable
const start = Date.now();
while (Object.values(this.runningTasks).length > 0) {
if (Date.now() - start > maxWaitTimeInMs) {
throw new ApplicationError('Timeout while waiting for tasks to finish');
}
await new Promise((resolve) => setTimeout(resolve, 100));
}
}
}