diff --git a/packages/cli/src/ActiveExecutions.ts b/packages/cli/src/ActiveExecutions.ts index df270fcad7..67616ee4b3 100644 --- a/packages/cli/src/ActiveExecutions.ts +++ b/packages/cli/src/ActiveExecutions.ts @@ -36,6 +36,10 @@ export class ActiveExecutions { private readonly concurrencyControl: ConcurrencyControlService, ) {} + has(executionId: string) { + return this.activeExecutions[executionId] !== undefined; + } + /** * Add a new active execution */ diff --git a/packages/cli/src/Queue.ts b/packages/cli/src/Queue.ts index fbb53b9180..fce59867b5 100644 --- a/packages/cli/src/Queue.ts +++ b/packages/cli/src/Queue.ts @@ -58,6 +58,12 @@ export class Queue { }); } + async findRunningJobBy({ executionId }: { executionId: string }) { + const activeOrWaitingJobs = await this.getJobs(['active', 'waiting']); + + return activeOrWaitingJobs.find(({ data }) => data.executionId === executionId) ?? null; + } + decodeWebhookResponse(response: IExecuteResponsePromiseData): IExecuteResponsePromiseData { if ( typeof response === 'object' && diff --git a/packages/cli/src/WaitTracker.ts b/packages/cli/src/WaitTracker.ts index 201c22b1f3..d3f3f780f9 100644 --- a/packages/cli/src/WaitTracker.ts +++ b/packages/cli/src/WaitTracker.ts @@ -1,10 +1,6 @@ -import { - ApplicationError, - ErrorReporterProxy as ErrorReporter, - WorkflowOperationError, -} from 'n8n-workflow'; +import { ApplicationError, ErrorReporterProxy as ErrorReporter } from 'n8n-workflow'; import { Service } from 'typedi'; -import type { ExecutionStopResult, IWorkflowExecutionDataProcess } from '@/Interfaces'; +import type { IWorkflowExecutionDataProcess } from '@/Interfaces'; import { WorkflowRunner } from '@/WorkflowRunner'; import { ExecutionRepository } from '@db/repositories/execution.repository'; import { OwnershipService } from '@/services/ownership.service'; @@ -30,6 +26,10 @@ export class WaitTracker { private readonly orchestrationService: OrchestrationService, ) {} + has(executionId: string) { + return this.waitingExecutions[executionId] !== undefined; + } + /** * @important Requires `OrchestrationService` to be initialized. */ @@ -101,53 +101,12 @@ export class WaitTracker { } } - async stopExecution(executionId: string): Promise { - if (this.waitingExecutions[executionId] !== undefined) { - // The waiting execution was already scheduled to execute. - // So stop timer and remove. - clearTimeout(this.waitingExecutions[executionId].timer); - delete this.waitingExecutions[executionId]; - } + async stopExecution(executionId: string) { + if (!this.waitingExecutions[executionId]) return; - // Also check in database - const fullExecutionData = await this.executionRepository.findSingleExecution(executionId, { - includeData: true, - unflattenData: true, - }); + clearTimeout(this.waitingExecutions[executionId].timer); - if (!fullExecutionData) { - throw new ApplicationError('Execution not found.', { - extra: { executionId }, - }); - } - - if (!['new', 'unknown', 'waiting', 'running'].includes(fullExecutionData.status)) { - throw new WorkflowOperationError( - `Only running or waiting executions can be stopped and ${executionId} is currently ${fullExecutionData.status}.`, - ); - } - // Set in execution in DB as failed and remove waitTill time - const error = new WorkflowOperationError('Workflow-Execution has been canceled!'); - - fullExecutionData.data.resultData.error = { - ...error, - message: error.message, - stack: error.stack, - }; - - fullExecutionData.stoppedAt = new Date(); - fullExecutionData.waitTill = null; - fullExecutionData.status = 'canceled'; - - await this.executionRepository.updateExistingExecution(executionId, fullExecutionData); - - return { - mode: fullExecutionData.mode, - startedAt: new Date(fullExecutionData.startedAt), - stoppedAt: fullExecutionData.stoppedAt ? new Date(fullExecutionData.stoppedAt) : undefined, - finished: fullExecutionData.finished, - status: fullExecutionData.status, - }; + delete this.waitingExecutions[executionId]; } startExecution(executionId: string) { diff --git a/packages/cli/src/concurrency/concurrency-control.service.ts b/packages/cli/src/concurrency/concurrency-control.service.ts index 4f42a7b45c..77310b7694 100644 --- a/packages/cli/src/concurrency/concurrency-control.service.ts +++ b/packages/cli/src/concurrency/concurrency-control.service.ts @@ -69,6 +69,15 @@ export class ConcurrencyControlService { }); } + /** + * Check whether an execution is in the production queue. + */ + has(executionId: string) { + if (!this.isEnabled) return false; + + return this.productionQueue.getAll().has(executionId); + } + /** * Block or let through an execution based on concurrency capacity. */ diff --git a/packages/cli/src/databases/repositories/execution.repository.ts b/packages/cli/src/databases/repositories/execution.repository.ts index 03a2a8ae82..4e9977c404 100644 --- a/packages/cli/src/databases/repositories/execution.repository.ts +++ b/packages/cli/src/databases/repositories/execution.repository.ts @@ -22,6 +22,7 @@ import type { import { parse, stringify } from 'flatted'; import { ApplicationError, + WorkflowOperationError, type ExecutionStatus, type ExecutionSummary, type IRunExecutionData, @@ -609,8 +610,34 @@ export class ExecutionRepository extends Repository { }); } - async cancel(executionId: string) { - await this.update({ id: executionId }, { status: 'canceled', stoppedAt: new Date() }); + async stopBeforeRun(execution: IExecutionResponse) { + execution.status = 'canceled'; + execution.stoppedAt = new Date(); + + await this.update( + { id: execution.id }, + { status: execution.status, stoppedAt: execution.stoppedAt }, + ); + + return execution; + } + + async stopDuringRun(execution: IExecutionResponse) { + const error = new WorkflowOperationError('Workflow-Execution has been canceled!'); + + execution.data.resultData.error = { + ...error, + message: error.message, + stack: error.stack, + }; + + execution.stoppedAt = new Date(); + execution.waitTill = null; + execution.status = 'canceled'; + + await this.updateExistingExecution(execution.id, execution); + + return execution; } async cancelMany(executionIds: string[]) { diff --git a/packages/cli/src/errors/missing-execution-stop.error.ts b/packages/cli/src/errors/missing-execution-stop.error.ts new file mode 100644 index 0000000000..3662cc4a72 --- /dev/null +++ b/packages/cli/src/errors/missing-execution-stop.error.ts @@ -0,0 +1,7 @@ +import { ApplicationError } from 'n8n-workflow'; + +export class MissingExecutionStopError extends ApplicationError { + constructor(executionId: string) { + super('Failed to find execution to stop', { extra: { executionId } }); + } +} diff --git a/packages/cli/src/executions/__tests__/execution.service.test.ts b/packages/cli/src/executions/__tests__/execution.service.test.ts new file mode 100644 index 0000000000..1d3ea301b0 --- /dev/null +++ b/packages/cli/src/executions/__tests__/execution.service.test.ts @@ -0,0 +1,269 @@ +import { mock } from 'jest-mock-extended'; +import { WorkflowOperationError } from 'n8n-workflow'; +import config from '@/config'; +import { ExecutionService } from '@/executions/execution.service'; +import { AbortedExecutionRetryError } from '@/errors/aborted-execution-retry.error'; +import { MissingExecutionStopError } from '@/errors/missing-execution-stop.error'; +import type { ActiveExecutions } from '@/ActiveExecutions'; +import type { IExecutionResponse } from '@/Interfaces'; +import type { Job, Queue } from '@/Queue'; +import type { WaitTracker } from '@/WaitTracker'; +import type { ExecutionRepository } from '@/databases/repositories/execution.repository'; +import type { ExecutionRequest } from '@/executions/execution.types'; +import type { ConcurrencyControlService } from '@/concurrency/concurrency-control.service'; + +describe('ExecutionService', () => { + const queue = mock(); + const activeExecutions = mock(); + const executionRepository = mock(); + const waitTracker = mock(); + const concurrencyControl = mock(); + + const executionService = new ExecutionService( + mock(), + queue, + activeExecutions, + executionRepository, + mock(), + mock(), + waitTracker, + mock(), + concurrencyControl, + mock(), + ); + + beforeEach(() => { + config.set('executions.mode', 'regular'); + jest.clearAllMocks(); + }); + + describe('retry', () => { + it('should error on retrying a execution that was aborted before starting', async () => { + /** + * Arrange + */ + executionRepository.findWithUnflattenedData.mockResolvedValue( + mock({ data: { executionData: undefined } }), + ); + const req = mock(); + + /** + * Act + */ + const retry = executionService.retry(req, []); + + /** + * Assert + */ + await expect(retry).rejects.toThrow(AbortedExecutionRetryError); + }); + }); + + describe('stop', () => { + it('should throw when stopping a missing execution', async () => { + /** + * Arrange + */ + executionRepository.findSingleExecution.mockResolvedValue(undefined); + + /** + * Act + */ + const stop = executionService.stop('inexistent-123'); + + /** + * Assert + */ + await expect(stop).rejects.toThrowError(MissingExecutionStopError); + }); + + it('should throw when stopping a not-in-progress execution', async () => { + /** + * Arrange + */ + const execution = mock({ id: '123', status: 'success' }); + executionRepository.findSingleExecution.mockResolvedValue(execution); + + /** + * Act + */ + const stop = executionService.stop(execution.id); + + /** + * Assert + */ + await expect(stop).rejects.toThrowError(WorkflowOperationError); + }); + + describe('regular mode', () => { + it('should stop a `running` execution in regular mode', async () => { + /** + * Arrange + */ + const execution = mock({ id: '123', status: 'running' }); + executionRepository.findSingleExecution.mockResolvedValue(execution); + concurrencyControl.has.mockReturnValue(false); + activeExecutions.has.mockReturnValue(true); + waitTracker.has.mockReturnValue(false); + executionRepository.stopDuringRun.mockResolvedValue(mock()); + + /** + * Act + */ + await executionService.stop(execution.id); + + /** + * Assert + */ + expect(concurrencyControl.remove).not.toHaveBeenCalled(); + expect(activeExecutions.stopExecution).toHaveBeenCalledWith(execution.id); + expect(waitTracker.stopExecution).not.toHaveBeenCalled(); + expect(executionRepository.stopDuringRun).toHaveBeenCalledWith(execution); + }); + + it('should stop a `waiting` execution in regular mode', async () => { + /** + * Arrange + */ + const execution = mock({ id: '123', status: 'waiting' }); + executionRepository.findSingleExecution.mockResolvedValue(execution); + concurrencyControl.has.mockReturnValue(false); + activeExecutions.has.mockReturnValue(true); + waitTracker.has.mockReturnValue(true); + executionRepository.stopDuringRun.mockResolvedValue(mock()); + + /** + * Act + */ + await executionService.stop(execution.id); + + /** + * Assert + */ + expect(concurrencyControl.remove).not.toHaveBeenCalled(); + expect(activeExecutions.stopExecution).toHaveBeenCalledWith(execution.id); + expect(waitTracker.stopExecution).toHaveBeenCalledWith(execution.id); + expect(executionRepository.stopDuringRun).toHaveBeenCalledWith(execution); + }); + + it('should stop a concurrency-controlled `new` execution in regular mode', async () => { + /** + * Arrange + */ + const execution = mock({ id: '123', status: 'new', mode: 'trigger' }); + executionRepository.findSingleExecution.mockResolvedValue(execution); + concurrencyControl.has.mockReturnValue(true); + activeExecutions.has.mockReturnValue(false); + waitTracker.has.mockReturnValue(false); + executionRepository.stopBeforeRun.mockResolvedValue(mock()); + + /** + * Act + */ + await executionService.stop(execution.id); + + /** + * Assert + */ + expect(concurrencyControl.remove).toHaveBeenCalledWith({ + mode: execution.mode, + executionId: execution.id, + }); + expect(activeExecutions.stopExecution).not.toHaveBeenCalled(); + expect(waitTracker.stopExecution).not.toHaveBeenCalled(); + expect(executionRepository.stopDuringRun).not.toHaveBeenCalled(); + }); + }); + + describe('scaling mode', () => { + describe('manual execution', () => { + it('should delegate to regular mode in scaling mode', async () => { + /** + * Arrange + */ + config.set('executions.mode', 'queue'); + const execution = mock({ + id: '123', + mode: 'manual', + status: 'running', + }); + executionRepository.findSingleExecution.mockResolvedValue(execution); + concurrencyControl.has.mockReturnValue(false); + activeExecutions.has.mockReturnValue(true); + waitTracker.has.mockReturnValue(false); + executionRepository.stopDuringRun.mockResolvedValue(mock()); + // @ts-expect-error Private method + const stopInRegularModeSpy = jest.spyOn(executionService, 'stopInRegularMode'); + + /** + * Act + */ + await executionService.stop(execution.id); + + /** + * Assert + */ + expect(stopInRegularModeSpy).toHaveBeenCalledWith(execution); + expect(activeExecutions.stopExecution).toHaveBeenCalledWith(execution.id); + expect(executionRepository.stopDuringRun).toHaveBeenCalledWith(execution); + + expect(concurrencyControl.remove).not.toHaveBeenCalled(); + expect(waitTracker.stopExecution).not.toHaveBeenCalled(); + expect(queue.stopJob).not.toHaveBeenCalled(); + }); + }); + + describe('production execution', () => { + it('should stop a `running` execution in scaling mode', async () => { + /** + * Arrange + */ + config.set('executions.mode', 'queue'); + const execution = mock({ id: '123', status: 'running' }); + executionRepository.findSingleExecution.mockResolvedValue(execution); + waitTracker.has.mockReturnValue(false); + queue.findRunningJobBy.mockResolvedValue(mock()); + executionRepository.stopDuringRun.mockResolvedValue(mock()); + + /** + * Act + */ + await executionService.stop(execution.id); + + /** + * Assert + */ + expect(waitTracker.stopExecution).not.toHaveBeenCalled(); + expect(queue.findRunningJobBy).toBeCalledWith({ executionId: execution.id }); + expect(queue.stopJob).toHaveBeenCalled(); + expect(executionRepository.stopDuringRun).toHaveBeenCalled(); + }); + + it('should stop a `waiting` execution in scaling mode', async () => { + /** + * Arrange + */ + config.set('executions.mode', 'queue'); + const execution = mock({ id: '123', status: 'waiting' }); + executionRepository.findSingleExecution.mockResolvedValue(execution); + waitTracker.has.mockReturnValue(true); + queue.findRunningJobBy.mockResolvedValue(mock()); + executionRepository.stopDuringRun.mockResolvedValue(mock()); + + /** + * Act + */ + await executionService.stop(execution.id); + + /** + * Assert + */ + expect(waitTracker.stopExecution).toHaveBeenCalledWith(execution.id); + expect(queue.findRunningJobBy).toBeCalledWith({ executionId: execution.id }); + expect(queue.stopJob).toHaveBeenCalled(); + expect(executionRepository.stopDuringRun).toHaveBeenCalled(); + }); + }); + }); + }); +}); diff --git a/packages/cli/src/executions/execution.service.ts b/packages/cli/src/executions/execution.service.ts index 6ff642acff..546443ec4a 100644 --- a/packages/cli/src/executions/execution.service.ts +++ b/packages/cli/src/executions/execution.service.ts @@ -24,7 +24,7 @@ import type { } from '@/Interfaces'; import { NodeTypes } from '@/NodeTypes'; import { Queue } from '@/Queue'; -import type { ExecutionRequest, ExecutionSummaries } from './execution.types'; +import type { ExecutionRequest, ExecutionSummaries, StopResult } from './execution.types'; import { WorkflowRunner } from '@/WorkflowRunner'; import type { IGetExecutionsQueryFilter } from '@db/repositories/execution.repository'; import { ExecutionRepository } from '@db/repositories/execution.repository'; @@ -34,7 +34,7 @@ import { InternalServerError } from '@/errors/response-errors/internal-server.er import { NotFoundError } from '@/errors/response-errors/not-found.error'; import config from '@/config'; import { WaitTracker } from '@/WaitTracker'; -import type { ExecutionEntity } from '@/databases/entities/ExecutionEntity'; +import { MissingExecutionStopError } from '@/errors/missing-execution-stop.error'; import { QueuedExecutionRetryError } from '@/errors/queued-execution-retry.error'; import { ConcurrencyControlService } from '@/concurrency/concurrency-control.service'; import { AbortedExecutionRetryError } from '@/errors/aborted-execution-retry.error'; @@ -328,8 +328,6 @@ export class ExecutionService { // new API // ---------------------------------- - private readonly isRegularMode = config.getEnv('executions.mode') === 'regular'; - /** * Find summaries of executions that satisfy a query. * @@ -392,59 +390,6 @@ export class ExecutionService { }; } - /** - * Stop an active execution. - */ - async stop(executionId: string) { - const execution = await this.executionRepository.findOneBy({ id: executionId }); - - if (!execution) throw new NotFoundError('Execution not found'); - - if (execution.status === 'new') { - this.concurrencyControl.remove({ mode: execution.mode, executionId }); - await this.executionRepository.cancel(executionId); - - return; - } - - const stopResult = await this.activeExecutions.stopExecution(execution.id); - - if (stopResult) return this.toExecutionStopResult(execution); - - if (this.isRegularMode) { - return await this.waitTracker.stopExecution(execution.id); - } - - // queue mode - - try { - return await this.waitTracker.stopExecution(execution.id); - } catch { - // @TODO: Why are we swallowing this error in queue mode? - } - - const activeJobs = await this.queue.getJobs(['active', 'waiting']); - const job = activeJobs.find(({ data }) => data.executionId === execution.id); - - if (job) { - await this.queue.stopJob(job); - } else { - this.logger.debug('Job to stop no longer in queue', { jobId: execution.id }); - } - - return this.toExecutionStopResult(execution); - } - - private toExecutionStopResult(execution: ExecutionEntity) { - return { - mode: execution.mode, - startedAt: new Date(execution.startedAt), - stoppedAt: execution.stoppedAt ? new Date(execution.stoppedAt) : undefined, - finished: execution.finished, - status: execution.status, - }; - } - async findAllEnqueuedExecutions() { return await this.executionRepository.findMultipleExecutions( { @@ -455,4 +400,76 @@ export class ExecutionService { { includeData: true, unflattenData: true }, ); } + + async stop(executionId: string): Promise { + const execution = await this.executionRepository.findSingleExecution(executionId, { + includeData: true, + unflattenData: true, + }); + + if (!execution) throw new MissingExecutionStopError(executionId); + + this.assertStoppable(execution); + + const { mode, startedAt, stoppedAt, finished, status } = + config.getEnv('executions.mode') === 'regular' + ? await this.stopInRegularMode(execution) + : await this.stopInScalingMode(execution); + + return { + mode, + startedAt: new Date(startedAt), + stoppedAt: stoppedAt ? new Date(stoppedAt) : undefined, + finished, + status, + }; + } + + private assertStoppable(execution: IExecutionResponse) { + const STOPPABLE_STATUSES: ExecutionStatus[] = ['new', 'unknown', 'waiting', 'running']; + + if (!STOPPABLE_STATUSES.includes(execution.status)) { + throw new WorkflowOperationError( + `Only running or waiting executions can be stopped and ${execution.id} is currently ${execution.status}`, + ); + } + } + + private async stopInRegularMode(execution: IExecutionResponse) { + if (this.concurrencyControl.has(execution.id)) { + this.concurrencyControl.remove({ mode: execution.mode, executionId: execution.id }); + return await this.executionRepository.stopBeforeRun(execution); + } + + if (this.activeExecutions.has(execution.id)) { + await this.activeExecutions.stopExecution(execution.id); + } + + if (this.waitTracker.has(execution.id)) { + await this.waitTracker.stopExecution(execution.id); + } + + return await this.executionRepository.stopDuringRun(execution); + } + + private async stopInScalingMode(execution: IExecutionResponse) { + if (execution.mode === 'manual') { + // manual executions in scaling mode are processed by main + return await this.stopInRegularMode(execution); + } + + if (this.waitTracker.has(execution.id)) { + await this.waitTracker.stopExecution(execution.id); + } + + const job = await this.queue.findRunningJobBy({ executionId: execution.id }); + + if (job) { + await this.queue.stopJob(job); + } else { + this.logger.debug('Job to stop not in queue', { executionId: execution.id }); + } + + return await this.executionRepository.stopDuringRun(execution); + } } diff --git a/packages/cli/src/executions/execution.types.ts b/packages/cli/src/executions/execution.types.ts index 95b9a3cdec..7e8872bf1b 100644 --- a/packages/cli/src/executions/execution.types.ts +++ b/packages/cli/src/executions/execution.types.ts @@ -1,6 +1,6 @@ import type { ExecutionEntity } from '@/databases/entities/ExecutionEntity'; import type { AuthenticatedRequest } from '@/requests'; -import type { ExecutionStatus, IDataObject } from 'n8n-workflow'; +import type { ExecutionStatus, IDataObject, WorkflowExecuteMode } from 'n8n-workflow'; export declare namespace ExecutionRequest { namespace QueryParams { @@ -101,3 +101,11 @@ export type QueueRecoverySettings = { */ waitMs: number; }; + +export type StopResult = { + mode: WorkflowExecuteMode; + startedAt: Date; + stoppedAt?: Date; + finished: boolean; + status: ExecutionStatus; +}; diff --git a/packages/cli/test/unit/services/execution.service.test.ts b/packages/cli/test/unit/services/execution.service.test.ts deleted file mode 100644 index fe05c76585..0000000000 --- a/packages/cli/test/unit/services/execution.service.test.ts +++ /dev/null @@ -1,32 +0,0 @@ -import type { IExecutionResponse } from '@/Interfaces'; -import type { ExecutionRepository } from '@/databases/repositories/execution.repository'; -import { AbortedExecutionRetryError } from '@/errors/aborted-execution-retry.error'; -import { ExecutionService } from '@/executions/execution.service'; -import type { ExecutionRequest } from '@/executions/execution.types'; -import { mock } from 'jest-mock-extended'; - -describe('ExecutionService', () => { - const executionRepository = mock(); - const executionService = new ExecutionService( - mock(), - mock(), - mock(), - executionRepository, - mock(), - mock(), - mock(), - mock(), - mock(), - mock(), - ); - - it('should error on retrying an aborted execution', async () => { - const abortedExecutionData = mock({ data: { executionData: undefined } }); - executionRepository.findWithUnflattenedData.mockResolvedValue(abortedExecutionData); - const req = mock(); - - const retry = executionService.retry(req, []); - - await expect(retry).rejects.toThrow(AbortedExecutionRetryError); - }); -});