diff --git a/packages/cli/src/Queue.ts b/packages/cli/src/Queue.ts index 9f82a9b2ad..b96143dafa 100644 --- a/packages/cli/src/Queue.ts +++ b/packages/cli/src/Queue.ts @@ -109,8 +109,11 @@ export class Queue { return await this.jobQueue.client.ping(); } - async pause(isLocal?: boolean): Promise { - return await this.jobQueue.pause(isLocal); + async pause({ + isLocal, + doNotWaitActive, + }: { isLocal?: boolean; doNotWaitActive?: boolean } = {}): Promise { + return await this.jobQueue.pause(isLocal, doNotWaitActive); } getBullObjectInstance(): JobQueue { diff --git a/packages/cli/src/commands/worker.ts b/packages/cli/src/commands/worker.ts index 8bb4fad610..7182e32850 100644 --- a/packages/cli/src/commands/worker.ts +++ b/packages/cli/src/commands/worker.ts @@ -63,23 +63,23 @@ export class Worker extends BaseCommand { async stopProcess() { this.logger.info('Stopping n8n...'); - // Stop accepting new jobs - await Worker.jobQueue.pause(true); + // Stop accepting new jobs, `doNotWaitActive` allows reporting progress + await Worker.jobQueue.pause({ isLocal: true, doNotWaitActive: true }); try { await this.externalHooks?.run('n8n.stop', []); - const hardStopTime = Date.now() + this.gracefulShutdownTimeoutInS; + const hardStopTimeMs = Date.now() + this.gracefulShutdownTimeoutInS * 1000; // Wait for active workflow executions to finish let count = 0; while (Object.keys(Worker.runningJobs).length !== 0) { if (count++ % 4 === 0) { - const waitLeft = Math.ceil((hardStopTime - Date.now()) / 1000); + const waitLeft = Math.ceil((hardStopTimeMs - Date.now()) / 1000); this.logger.info( `Waiting for ${ Object.keys(Worker.runningJobs).length - } active executions to finish... (wait ${waitLeft} more seconds)`, + } active executions to finish... (max wait ${waitLeft} more seconds)`, ); } @@ -483,6 +483,16 @@ export class Worker extends BaseCommand { await this.setupHealthMonitor(); } + if (process.stdout.isTTY) { + process.stdin.setRawMode(true); + process.stdin.resume(); + process.stdin.setEncoding('utf8'); + + process.stdin.on('data', (key: string) => { + if (key.charCodeAt(0) === 3) process.kill(process.pid, 'SIGINT'); // ctrl+c + }); + } + // Make sure that the process does not close await new Promise(() => {}); }