diff --git a/packages/twenty-server/src/engine/core-modules/workflow/workflow-trigger.resolver.ts b/packages/twenty-server/src/engine/core-modules/workflow/workflow-trigger.resolver.ts index 82aa30498f..630832a18e 100644 --- a/packages/twenty-server/src/engine/core-modules/workflow/workflow-trigger.resolver.ts +++ b/packages/twenty-server/src/engine/core-modules/workflow/workflow-trigger.resolver.ts @@ -19,19 +19,19 @@ export class WorkflowTriggerResolver { ) {} @Mutation(() => Boolean) - async enableWorkflowTrigger( + async activateWorkflowVersion( @Args('workflowVersionId') workflowVersionId: string, ) { - return await this.workflowTriggerWorkspaceService.enableWorkflowTrigger( + return await this.workflowTriggerWorkspaceService.activateWorkflowVersion( workflowVersionId, ); } @Mutation(() => Boolean) - async disableWorkflowTrigger( + async deactivateWorkflowVersion( @Args('workflowVersionId') workflowVersionId: string, ) { - return await this.workflowTriggerWorkspaceService.disableWorkflowTrigger( + return await this.workflowTriggerWorkspaceService.deactivateWorkflowVersion( workflowVersionId, ); } diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-version.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-version.workspace-entity.ts index 5324aaf632..8149cfcc39 100644 --- a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-version.workspace-entity.ts +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-version.workspace-entity.ts @@ -28,9 +28,10 @@ export enum WorkflowVersionStatus { DRAFT = 'DRAFT', ACTIVE = 'ACTIVE', DEACTIVATED = 'DEACTIVATED', + ARCHIVED = 'ARCHIVED', } -export const WorkflowVersionStatusOptions = [ +const WorkflowVersionStatusOptions = [ { value: WorkflowVersionStatus.DRAFT, label: 'Draft', @@ -47,7 +48,13 @@ export const WorkflowVersionStatusOptions = [ value: WorkflowVersionStatus.DEACTIVATED, label: 'Deactivated', position: 2, - color: 'gray', + color: 'red', + }, + { + value: WorkflowVersionStatus.ARCHIVED, + label: 'Archived', + position: 3, + color: 'grey', }, ]; diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow.workspace-entity.ts index 455a7e02ba..98dd40417e 100644 --- a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow.workspace-entity.ts +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow.workspace-entity.ts @@ -18,11 +18,34 @@ import { STANDARD_OBJECT_IDS } from 'src/engine/workspace-manager/workspace-sync import { FavoriteWorkspaceEntity } from 'src/modules/favorite/standard-objects/favorite.workspace-entity'; import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity'; import { WorkflowRunWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; -import { - WorkflowVersionStatus, - WorkflowVersionStatusOptions, - WorkflowVersionWorkspaceEntity, -} from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; +import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; + +export enum WorkflowStatus { + DRAFT = 'DRAFT', + ACTIVE = 'ACTIVE', + DEACTIVATED = 'DEACTIVATED', +} + +const WorkflowStatusOptions = [ + { + value: WorkflowStatus.DRAFT, + label: 'Draft', + position: 0, + color: 'yellow', + }, + { + value: WorkflowStatus.ACTIVE, + label: 'Active', + position: 1, + color: 'green', + }, + { + value: WorkflowStatus.DEACTIVATED, + label: 'Deactivated', + position: 2, + color: 'grey', + }, +]; @WorkspaceEntity({ standardId: STANDARD_OBJECT_IDS.workflow, @@ -61,10 +84,10 @@ export class WorkflowWorkspaceEntity extends BaseWorkspaceEntity { type: FieldMetadataType.MULTI_SELECT, label: 'Statuses', description: 'The current statuses of the workflow versions', - options: WorkflowVersionStatusOptions, + options: WorkflowStatusOptions, }) @WorkspaceIsNullable() - statuses: WorkflowVersionStatus[] | null; + statuses: WorkflowStatus[] | null; @WorkspaceField({ standardId: WORKFLOW_STANDARD_FIELD_IDS.position, diff --git a/packages/twenty-server/src/modules/workflow/common/workflow-common.workspace-service.ts b/packages/twenty-server/src/modules/workflow/common/workflow-common.workspace-service.ts index cfb44ea261..9d325f01bc 100644 --- a/packages/twenty-server/src/modules/workflow/common/workflow-common.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/common/workflow-common.workspace-service.ts @@ -28,6 +28,16 @@ export class WorkflowCommonWorkspaceService { }, }); + return this.getValidWorkflowVersionOrFail(workflowVersion); + } + + async getValidWorkflowVersionOrFail( + workflowVersion: WorkflowVersionWorkspaceEntity | null, + ): Promise< + Omit & { + trigger: WorkflowTrigger; + } + > { if (!workflowVersion) { throw new WorkflowTriggerException( 'Workflow version not found', diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/jobs/__tests__/workflow-statuses-update.job.spec.ts b/packages/twenty-server/src/modules/workflow/workflow-status/jobs/__tests__/workflow-statuses-update.job.spec.ts new file mode 100644 index 0000000000..bb370fa853 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-status/jobs/__tests__/workflow-statuses-update.job.spec.ts @@ -0,0 +1,268 @@ +import { Test, TestingModule } from '@nestjs/testing'; + +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { WorkflowVersionStatus } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; +import { WorkflowStatus } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; +import { + WorkflowStatusesUpdateJob, + WorkflowVersionBatchEvent, + WorkflowVersionEventType, +} from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job'; + +describe('WorkflowStatusesUpdate', () => { + let job: WorkflowStatusesUpdateJob; + + const mockWorkflowRepository = { + findOneOrFail: jest.fn(), + update: jest.fn(), + }; + + const mockTwentyORMManager = { + getRepository: jest.fn().mockResolvedValue(mockWorkflowRepository), + }; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [ + WorkflowStatusesUpdateJob, + { + provide: TwentyORMManager, + useValue: mockTwentyORMManager, + }, + ], + }).compile(); + + job = await module.resolve( + WorkflowStatusesUpdateJob, + ); + }); + + it('should be defined', () => { + expect(job).toBeDefined(); + }); + + describe('handle', () => { + describe('when event type is CREATE', () => { + it('when already a draft, do not change anything', async () => { + const event: WorkflowVersionBatchEvent = { + workspaceId: '1', + type: WorkflowVersionEventType.CREATE, + workflowIds: ['1'], + }; + + const mockWorkflow = { + statuses: [WorkflowStatus.DRAFT], + }; + + mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow); + + await job.handle(event); + + expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1); + expect(mockWorkflowRepository.update).toHaveBeenCalledTimes(0); + }); + + it('when no draft yet, update statuses', async () => { + const event: WorkflowVersionBatchEvent = { + workspaceId: '1', + type: WorkflowVersionEventType.CREATE, + workflowIds: ['1'], + }; + + const mockWorkflow = { + id: '1', + statuses: [WorkflowStatus.ACTIVE], + }; + + mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow); + + await job.handle(event); + + expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1); + expect(mockWorkflowRepository.update).toHaveBeenCalledWith( + { id: '1' }, + { statuses: [WorkflowStatus.ACTIVE, WorkflowStatus.DRAFT] }, + ); + }); + }); + + describe('when event type is STATUS_UPDATE', () => { + test('when status is the same, should not do anything', async () => { + const event: WorkflowVersionBatchEvent = { + workspaceId: '1', + type: WorkflowVersionEventType.STATUS_UPDATE, + statusUpdates: [ + { + workflowId: '1', + previousStatus: WorkflowVersionStatus.ACTIVE, + newStatus: WorkflowVersionStatus.ACTIVE, + }, + ], + }; + + const mockWorkflow = { + statuses: [WorkflowStatus.ACTIVE], + }; + + mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow); + + await job.handle(event); + + expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1); + expect(mockWorkflowRepository.update).toHaveBeenCalledTimes(0); + }); + + test('when update that should be impossible, do not do anything', async () => { + const event: WorkflowVersionBatchEvent = { + workspaceId: '1', + type: WorkflowVersionEventType.STATUS_UPDATE, + statusUpdates: [ + { + workflowId: '1', + previousStatus: WorkflowVersionStatus.ACTIVE, + newStatus: WorkflowVersionStatus.DRAFT, + }, + ], + }; + + const mockWorkflow = { + statuses: [WorkflowStatus.ACTIVE], + }; + + mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow); + + await job.handle(event); + + expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1); + expect(mockWorkflowRepository.update).toHaveBeenCalledTimes(0); + }); + + test('when WorkflowVersionStatus.DEACTIVATED to WorkflowVersionStatus.ACTIVE, should activate', async () => { + const event: WorkflowVersionBatchEvent = { + workspaceId: '1', + type: WorkflowVersionEventType.STATUS_UPDATE, + statusUpdates: [ + { + workflowId: '1', + previousStatus: WorkflowVersionStatus.DEACTIVATED, + newStatus: WorkflowVersionStatus.ACTIVE, + }, + ], + }; + + const mockWorkflow = { + statuses: [WorkflowStatus.DEACTIVATED], + }; + + mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow); + + await job.handle(event); + + expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1); + expect(mockWorkflowRepository.update).toHaveBeenCalledWith( + { id: '1' }, + { statuses: [WorkflowStatus.ACTIVE] }, + ); + }); + + test('when WorkflowVersionStatus.ACTIVE to WorkflowVersionStatus.DEACTIVATED, should deactivate', async () => { + const event: WorkflowVersionBatchEvent = { + workspaceId: '1', + type: WorkflowVersionEventType.STATUS_UPDATE, + statusUpdates: [ + { + workflowId: '1', + previousStatus: WorkflowVersionStatus.ACTIVE, + newStatus: WorkflowVersionStatus.DEACTIVATED, + }, + ], + }; + + const mockWorkflow = { + statuses: [WorkflowStatus.ACTIVE], + }; + + mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow); + + await job.handle(event); + + expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1); + expect(mockWorkflowRepository.update).toHaveBeenCalledWith( + { id: '1' }, + { statuses: [WorkflowStatus.DEACTIVATED] }, + ); + }); + + test('when WorkflowVersionStatus.DRAFT to WorkflowVersionStatus.ACTIVE, should activate', async () => { + const event: WorkflowVersionBatchEvent = { + workspaceId: '1', + type: WorkflowVersionEventType.STATUS_UPDATE, + statusUpdates: [ + { + workflowId: '1', + previousStatus: WorkflowVersionStatus.DRAFT, + newStatus: WorkflowVersionStatus.ACTIVE, + }, + ], + }; + + const mockWorkflow = { + statuses: [WorkflowStatus.DRAFT], + }; + + mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow); + + await job.handle(event); + + expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1); + expect(mockWorkflowRepository.update).toHaveBeenCalledWith( + { id: '1' }, + { statuses: [WorkflowStatus.ACTIVE] }, + ); + }); + }); + + describe('when event type is DELETE', () => { + test('when status is not draft, should not do anything', async () => { + const event: WorkflowVersionBatchEvent = { + workspaceId: '1', + type: WorkflowVersionEventType.DELETE, + workflowIds: ['1'], + }; + + const mockWorkflow = { + statuses: [WorkflowStatus.ACTIVE], + }; + + mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow); + + await job.handle(event); + + expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1); + expect(mockWorkflowRepository.update).toHaveBeenCalledTimes(0); + }); + + test('when status is draft, should delete', async () => { + const event: WorkflowVersionBatchEvent = { + workspaceId: '1', + type: WorkflowVersionEventType.DELETE, + workflowIds: ['1'], + }; + + const mockWorkflow = { + statuses: [WorkflowStatus.DRAFT], + }; + + mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow); + + await job.handle(event); + + expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1); + expect(mockWorkflowRepository.update).toHaveBeenCalledWith( + { id: '1' }, + { statuses: [] }, + ); + }); + }); + }); +}); diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts b/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts new file mode 100644 index 0000000000..9cf1e7a399 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts @@ -0,0 +1,201 @@ +import { Scope } from '@nestjs/common'; + +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { WorkflowVersionStatus } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; +import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; +import { getStatusCombinationFromArray } from 'src/modules/workflow/workflow-status/utils/get-status-combination-from-array.util'; +import { getStatusCombinationFromUpdate } from 'src/modules/workflow/workflow-status/utils/get-status-combination-from-update.util'; +import { getWorkflowStatusesFromCombination } from 'src/modules/workflow/workflow-status/utils/get-statuses-from-combination.util'; + +export enum WorkflowVersionEventType { + CREATE = 'CREATE', + STATUS_UPDATE = 'STATUS_UPDATE', + DELETE = 'DELETE', +} + +export type WorkflowVersionBatchEvent = { + workspaceId: string; +} & ( + | WorkflowVersionBatchCreateEvent + | WorkflowVersionBatchStatusUpdate + | WorkflowVersionBatchDelete +); + +export type WorkflowVersionBatchCreateEvent = { + type: WorkflowVersionEventType.CREATE; +} & { + workflowIds: string[]; +}; + +export type WorkflowVersionStatusUpdate = { + workflowId: string; + previousStatus: WorkflowVersionStatus; + newStatus: WorkflowVersionStatus; +}; + +export type WorkflowVersionBatchStatusUpdate = { + type: WorkflowVersionEventType.STATUS_UPDATE; +} & { + statusUpdates: WorkflowVersionStatusUpdate[]; +}; + +export type WorkflowVersionBatchDelete = { + type: WorkflowVersionEventType.DELETE; +} & { workflowIds: string[] }; + +@Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST }) +export class WorkflowStatusesUpdateJob { + constructor(private readonly twentyORMManager: TwentyORMManager) {} + + @Process(WorkflowStatusesUpdateJob.name) + async handle(event: WorkflowVersionBatchEvent): Promise { + switch (event.type) { + case WorkflowVersionEventType.CREATE: + await Promise.all( + event.workflowIds.map((workflowId) => + this.handleWorkflowVersionCreated(workflowId), + ), + ); + break; + case WorkflowVersionEventType.STATUS_UPDATE: + await Promise.all( + event.statusUpdates.map((statusUpdate) => + this.handleWorkflowVersionStatusUpdated(statusUpdate), + ), + ); + break; + case WorkflowVersionEventType.DELETE: + await Promise.all( + event.workflowIds.map((workflowId) => + this.handleWorkflowVersionDeleted(workflowId), + ), + ); + break; + default: + break; + } + } + + private async handleWorkflowVersionCreated( + workflowId: string, + ): Promise { + const workflowRepository = + await this.twentyORMManager.getRepository( + 'workflow', + ); + + const workflow = await workflowRepository.findOneOrFail({ + where: { + id: workflowId, + }, + }); + + const currentWorkflowStatusCombination = getStatusCombinationFromArray( + workflow.statuses || [], + ); + + const newWorkflowStatusCombination = getStatusCombinationFromUpdate( + currentWorkflowStatusCombination, + undefined, + WorkflowVersionStatus.DRAFT, + ); + + if (newWorkflowStatusCombination === currentWorkflowStatusCombination) { + return; + } + + await workflowRepository.update( + { + id: workflow.id, + }, + { + statuses: getWorkflowStatusesFromCombination( + newWorkflowStatusCombination, + ), + }, + ); + } + + private async handleWorkflowVersionStatusUpdated( + statusUpdate: WorkflowVersionStatusUpdate, + ): Promise { + const workflowRepository = + await this.twentyORMManager.getRepository( + 'workflow', + ); + + const workflow = await workflowRepository.findOneOrFail({ + where: { + id: statusUpdate.workflowId, + }, + }); + + const currentWorkflowStatusCombination = getStatusCombinationFromArray( + workflow.statuses || [], + ); + + const newWorkflowStatusCombination = getStatusCombinationFromUpdate( + currentWorkflowStatusCombination, + statusUpdate.previousStatus, + statusUpdate.newStatus, + ); + + if (newWorkflowStatusCombination === currentWorkflowStatusCombination) { + return; + } + + await workflowRepository.update( + { + id: statusUpdate.workflowId, + }, + { + statuses: getWorkflowStatusesFromCombination( + newWorkflowStatusCombination, + ), + }, + ); + } + + private async handleWorkflowVersionDeleted( + workflowId: string, + ): Promise { + const workflowRepository = + await this.twentyORMManager.getRepository( + 'workflow', + ); + + const workflow = await workflowRepository.findOneOrFail({ + where: { + id: workflowId, + }, + }); + + const currentWorkflowStatusCombination = getStatusCombinationFromArray( + workflow.statuses || [], + ); + + const newWorkflowStatusCombination = getStatusCombinationFromUpdate( + currentWorkflowStatusCombination, + WorkflowVersionStatus.DRAFT, + undefined, + ); + + if (newWorkflowStatusCombination === currentWorkflowStatusCombination) { + return; + } + + await workflowRepository.update( + { + id: workflowId, + }, + { + statuses: getWorkflowStatusesFromCombination( + newWorkflowStatusCombination, + ), + }, + ); + } +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/listeners/workflow-version-status.listener.ts b/packages/twenty-server/src/modules/workflow/workflow-status/listeners/workflow-version-status.listener.ts new file mode 100644 index 0000000000..f9fe6aa1b5 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-status/listeners/workflow-version-status.listener.ts @@ -0,0 +1,96 @@ +import { Injectable } from '@nestjs/common'; +import { OnEvent } from '@nestjs/event-emitter'; + +import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event'; +import { ObjectRecordDeleteEvent } from 'src/engine/integrations/event-emitter/types/object-record-delete.event'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; +import { + WorkflowVersionStatus, + WorkflowVersionWorkspaceEntity, +} from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; +import { + WorkflowStatusesUpdateJob, + WorkflowVersionBatchEvent, + WorkflowVersionEventType, + WorkflowVersionStatusUpdate, +} from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job'; + +@Injectable() +export class WorkflowVersionStatusListener { + constructor( + @InjectMessageQueue(MessageQueue.workflowQueue) + private readonly messageQueueService: MessageQueueService, + ) {} + + @OnEvent('workflowVersion.created') + async handleWorkflowVersionCreated( + payload: WorkspaceEventBatch< + ObjectRecordCreateEvent + >, + ): Promise { + const workflowIds = payload.events + .filter( + (event) => + !event.properties.after.status || + event.properties.after.status === WorkflowVersionStatus.DRAFT, + ) + .map((event) => event.properties.after.workflowId); + + if (workflowIds.length === 0) { + return; + } + + await this.messageQueueService.add( + WorkflowStatusesUpdateJob.name, + { + type: WorkflowVersionEventType.CREATE, + workspaceId: payload.workspaceId, + workflowIds, + }, + ); + } + + @OnEvent('workflowVersion.statusUpdated') + async handleWorkflowVersionUpdated( + payload: WorkspaceEventBatch, + ): Promise { + await this.messageQueueService.add( + WorkflowStatusesUpdateJob.name, + { + type: WorkflowVersionEventType.STATUS_UPDATE, + workspaceId: payload.workspaceId, + statusUpdates: payload.events, + }, + ); + } + + @OnEvent('workflowVersion.deleted') + async handleWorkflowVersionDeleted( + payload: WorkspaceEventBatch< + ObjectRecordDeleteEvent + >, + ): Promise { + const workflowIds = payload.events + .filter( + (event) => + event.properties.before.status === WorkflowVersionStatus.DRAFT, + ) + .map((event) => event.properties.before.workflowId); + + if (workflowIds.length === 0) { + return; + } + + await this.messageQueueService.add( + WorkflowStatusesUpdateJob.name, + { + type: WorkflowVersionEventType.DELETE, + workspaceId: payload.workspaceId, + workflowIds, + }, + ); + } +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/utils/get-status-combination-from-array.util.ts b/packages/twenty-server/src/modules/workflow/workflow-status/utils/get-status-combination-from-array.util.ts new file mode 100644 index 0000000000..1df699db43 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-status/utils/get-status-combination-from-array.util.ts @@ -0,0 +1,37 @@ +import isEqual from 'lodash.isequal'; + +import { WorkflowStatus } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; +import { + ACTIVE_AND_DRAFT_STATUSES, + ACTIVE_STATUSES, + DEACTIVATED_AND_DRAFT_STATUSES, + DEACTIVATED_STATUSES, + DRAFT_STATUSES, +} from 'src/modules/workflow/workflow-status/workflow-status.constants'; +import { WorkflowStatusCombination } from 'src/modules/workflow/workflow-status/workflow-status.enums'; + +export const getStatusCombinationFromArray = ( + statuses: WorkflowStatus[], +): WorkflowStatusCombination => { + if (isEqual(statuses, ACTIVE_AND_DRAFT_STATUSES)) { + return WorkflowStatusCombination.ACTIVE_AND_DRAFT; + } + + if (isEqual(statuses, ACTIVE_STATUSES)) { + return WorkflowStatusCombination.ACTIVE; + } + + if (isEqual(statuses, DEACTIVATED_AND_DRAFT_STATUSES)) { + return WorkflowStatusCombination.DEACTIVATED_AND_DRAFT; + } + + if (isEqual(statuses, DEACTIVATED_STATUSES)) { + return WorkflowStatusCombination.DEACTIVATED; + } + + if (isEqual(statuses, DRAFT_STATUSES)) { + return WorkflowStatusCombination.DRAFT; + } + + return WorkflowStatusCombination.NO_STATUSES; +}; diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/utils/get-status-combination-from-update.util.ts b/packages/twenty-server/src/modules/workflow/workflow-status/utils/get-status-combination-from-update.util.ts new file mode 100644 index 0000000000..ecf0fe2f9e --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-status/utils/get-status-combination-from-update.util.ts @@ -0,0 +1,75 @@ +import { WorkflowVersionStatus } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; +import { WorkflowStatusCombination } from 'src/modules/workflow/workflow-status/workflow-status.enums'; + +export const getStatusCombinationFromUpdate = ( + previousCombination: WorkflowStatusCombination, + statusToRemove?: WorkflowVersionStatus, + statusToAdd?: WorkflowVersionStatus, +): WorkflowStatusCombination => { + switch (previousCombination) { + case WorkflowStatusCombination.ACTIVE_AND_DRAFT: + if ( + statusToAdd === WorkflowVersionStatus.ACTIVE && + statusToRemove === WorkflowVersionStatus.DRAFT + ) { + return WorkflowStatusCombination.ACTIVE; + } + if (statusToRemove === WorkflowVersionStatus.DRAFT) { + return WorkflowStatusCombination.ACTIVE; + } + break; + case WorkflowStatusCombination.DEACTIVATED_AND_DRAFT: + if ( + statusToRemove === WorkflowVersionStatus.DRAFT && + statusToAdd === WorkflowVersionStatus.ACTIVE + ) { + return WorkflowStatusCombination.ACTIVE; + } + if (statusToRemove === WorkflowVersionStatus.DRAFT) { + return WorkflowStatusCombination.DEACTIVATED; + } + break; + case WorkflowStatusCombination.ACTIVE: + if ( + statusToRemove === WorkflowVersionStatus.ACTIVE && + statusToAdd === WorkflowVersionStatus.DEACTIVATED + ) { + return WorkflowStatusCombination.DEACTIVATED; + } + if (!statusToRemove && statusToAdd === WorkflowVersionStatus.DRAFT) { + return WorkflowStatusCombination.ACTIVE_AND_DRAFT; + } + break; + case WorkflowStatusCombination.DEACTIVATED: + if ( + statusToRemove === WorkflowVersionStatus.DEACTIVATED && + statusToAdd === WorkflowVersionStatus.ACTIVE + ) { + return WorkflowStatusCombination.ACTIVE; + } + if (!statusToRemove && statusToAdd === WorkflowVersionStatus.DRAFT) { + return WorkflowStatusCombination.DEACTIVATED_AND_DRAFT; + } + break; + case WorkflowStatusCombination.DRAFT: + if ( + statusToRemove === WorkflowVersionStatus.DRAFT && + statusToAdd === WorkflowVersionStatus.ACTIVE + ) { + return WorkflowStatusCombination.ACTIVE; + } + if (statusToRemove === WorkflowVersionStatus.DRAFT) { + return WorkflowStatusCombination.NO_STATUSES; + } + break; + case WorkflowStatusCombination.NO_STATUSES: + if (statusToAdd === WorkflowVersionStatus.DRAFT) { + return WorkflowStatusCombination.DRAFT; + } + break; + default: + break; + } + + return previousCombination; +}; diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/utils/get-statuses-from-combination.util.ts b/packages/twenty-server/src/modules/workflow/workflow-status/utils/get-statuses-from-combination.util.ts new file mode 100644 index 0000000000..e8f65beae5 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-status/utils/get-statuses-from-combination.util.ts @@ -0,0 +1,29 @@ +import { WorkflowStatus } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; +import { + ACTIVE_AND_DRAFT_STATUSES, + ACTIVE_STATUSES, + DEACTIVATED_AND_DRAFT_STATUSES, + DEACTIVATED_STATUSES, + DRAFT_STATUSES, + NO_STATUSES, +} from 'src/modules/workflow/workflow-status/workflow-status.constants'; +import { WorkflowStatusCombination } from 'src/modules/workflow/workflow-status/workflow-status.enums'; + +export const getWorkflowStatusesFromCombination = ( + combination: WorkflowStatusCombination, +): WorkflowStatus[] => { + switch (combination) { + case WorkflowStatusCombination.ACTIVE: + return ACTIVE_STATUSES; + case WorkflowStatusCombination.DRAFT: + return DRAFT_STATUSES; + case WorkflowStatusCombination.DEACTIVATED: + return DEACTIVATED_STATUSES; + case WorkflowStatusCombination.ACTIVE_AND_DRAFT: + return ACTIVE_AND_DRAFT_STATUSES; + case WorkflowStatusCombination.DEACTIVATED_AND_DRAFT: + return DEACTIVATED_AND_DRAFT_STATUSES; + case WorkflowStatusCombination.NO_STATUSES: + return NO_STATUSES; + } +}; diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.constants.ts b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.constants.ts new file mode 100644 index 0000000000..48feac7a65 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.constants.ts @@ -0,0 +1,19 @@ +import { WorkflowStatus } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; + +export const ACTIVE_AND_DRAFT_STATUSES = [ + WorkflowStatus.ACTIVE, + WorkflowStatus.DRAFT, +]; + +export const DEACTIVATED_AND_DRAFT_STATUSES = [ + WorkflowStatus.DEACTIVATED, + WorkflowStatus.DRAFT, +]; + +export const ACTIVE_STATUSES = [WorkflowStatus.ACTIVE]; + +export const DEACTIVATED_STATUSES = [WorkflowStatus.DEACTIVATED]; + +export const DRAFT_STATUSES = [WorkflowStatus.DRAFT]; + +export const NO_STATUSES = []; diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.enums.ts b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.enums.ts new file mode 100644 index 0000000000..58e9d8e99b --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.enums.ts @@ -0,0 +1,8 @@ +export enum WorkflowStatusCombination { + ACTIVE = 'ACTIVE', + DRAFT = 'DRAFT', + DEACTIVATED = 'DEACTIVATED', + ACTIVE_AND_DRAFT = 'ACTIVE_AND_DRAFT', + DEACTIVATED_AND_DRAFT = 'DEACTIVATED_AND_DRAFT', + NO_STATUSES = 'NO_STATUSES', +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts new file mode 100644 index 0000000000..57c69f530e --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts @@ -0,0 +1,9 @@ +import { Module } from '@nestjs/common'; + +import { WorkflowStatusesUpdateJob } from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job'; +import { WorkflowVersionStatusListener } from 'src/modules/workflow/workflow-status/listeners/workflow-version-status.listener'; + +@Module({ + providers: [WorkflowStatusesUpdateJob, WorkflowVersionStatusListener], +}) +export class WorkflowStatusModule {} diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.workspace-service.ts index 6e225bdcde..8142bc2545 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.workspace-service.ts @@ -5,15 +5,21 @@ import { EntityManager } from 'typeorm'; import { buildCreatedByFromWorkspaceMember } from 'src/engine/core-modules/actor/utils/build-created-by-from-workspace-member.util'; import { User } from 'src/engine/core-modules/user/user.entity'; import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; +import { WorkspaceRepository } from 'src/engine/twenty-orm/repository/workspace.repository'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; import { WorkflowVersionStatus, WorkflowVersionWorkspaceEntity, } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; -import { WorkflowTriggerType } from 'src/modules/workflow/common/types/workflow-trigger.type'; +import { + WorkflowTrigger, + WorkflowTriggerType, +} from 'src/modules/workflow/common/types/workflow-trigger.type'; import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workflow-common.workspace-service'; import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-runner.workspace-service'; +import { WorkflowVersionStatusUpdate } from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job'; import { DatabaseEventTriggerService } from 'src/modules/workflow/workflow-trigger/database-event-trigger/database-event-trigger.service'; import { assertVersionCanBeActivated } from 'src/modules/workflow/workflow-trigger/utils/assert-version-can-be-activated.util'; import { @@ -29,6 +35,7 @@ export class WorkflowTriggerWorkspaceService { private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, private readonly workflowRunnerWorkspaceService: WorkflowRunnerWorkspaceService, private readonly databaseEventTriggerService: DatabaseEventTriggerService, + private readonly workspaceEventEmitter: WorkspaceEventEmitter, ) {} async runWorkflowVersion( @@ -58,10 +65,19 @@ export class WorkflowTriggerWorkspaceService { ); } - async enableWorkflowTrigger(workflowVersionId: string) { + async activateWorkflowVersion(workflowVersionId: string) { + const workflowVersionRepository = + await this.twentyORMManager.getRepository( + 'workflowVersion', + ); + + const workflowVersionNullable = await workflowVersionRepository.findOne({ + where: { id: workflowVersionId }, + }); + const workflowVersion = - await this.workflowCommonWorkspaceService.getWorkflowVersionOrFail( - workflowVersionId, + await this.workflowCommonWorkspaceService.getValidWorkflowVersionOrFail( + workflowVersionNullable, ); const workflowRepository = @@ -91,38 +107,13 @@ export class WorkflowTriggerWorkspaceService { const manager = queryRunner.manager; try { - if ( - workflow.lastPublishedVersionId && - workflowVersionId !== workflow.lastPublishedVersionId - ) { - await this.disableWorkflowTriggerWithManager( - workflow.lastPublishedVersionId, - manager, - ); - } - - await this.activateWorkflowVersion( - workflowVersion.workflowId, - workflowVersionId, + await this.performActivationSteps( + workflow, + workflowVersion, + workflowRepository, + workflowVersionRepository, manager, ); - await workflowRepository.update( - { id: workflow.id }, - { lastPublishedVersionId: workflowVersionId }, - manager, - ); - - switch (workflowVersion.trigger.type) { - case WorkflowTriggerType.DATABASE_EVENT: - await this.databaseEventTriggerService.createEventListener( - workflowVersion.workflowId, - workflowVersion.trigger, - manager, - ); - break; - default: - break; - } await queryRunner.commitTransaction(); @@ -135,7 +126,7 @@ export class WorkflowTriggerWorkspaceService { } } - async disableWorkflowTrigger(workflowVersionId: string) { + async deactivateWorkflowVersion(workflowVersionId: string) { const workspaceDataSource = await this.twentyORMManager.getDatasource(); const queryRunner = workspaceDataSource.createQueryRunner(); @@ -143,8 +134,14 @@ export class WorkflowTriggerWorkspaceService { await queryRunner.startTransaction(); try { - await this.disableWorkflowTriggerWithManager( + const workflowVersionRepository = + await this.twentyORMManager.getRepository( + 'workflowVersion', + ); + + await this.performDeactivationSteps( workflowVersionId, + workflowVersionRepository, queryRunner.manager, ); @@ -159,64 +156,79 @@ export class WorkflowTriggerWorkspaceService { } } - private async disableWorkflowTriggerWithManager( - workflowVersionId: string, + private async performActivationSteps( + workflow: WorkflowWorkspaceEntity, + workflowVersion: Omit & { + trigger: WorkflowTrigger; + }, + workflowRepository: WorkspaceRepository, + workflowVersionRepository: WorkspaceRepository, manager: EntityManager, ) { - const workflowVersionRepository = - await this.twentyORMManager.getRepository( - 'workflowVersion', - ); - - const workflowVersion = await workflowVersionRepository.findOne({ - where: { id: workflowVersionId }, - }); - - if (!workflowVersion) { - throw new WorkflowTriggerException( - 'No workflow version found', - WorkflowTriggerExceptionCode.INVALID_INPUT, + if ( + workflow.lastPublishedVersionId && + workflowVersion.id !== workflow.lastPublishedVersionId + ) { + await this.performDeactivationSteps( + workflow.lastPublishedVersionId, + workflowVersionRepository, + manager, ); } - if (workflowVersion.status !== WorkflowVersionStatus.ACTIVE) { - throw new WorkflowTriggerException( - 'Cannot disable non-active workflow version', - WorkflowTriggerExceptionCode.INVALID_INPUT, - ); - } - - await workflowVersionRepository.update( - { id: workflowVersionId }, - { status: WorkflowVersionStatus.DEACTIVATED }, + await this.upgradeWorkflowVersion( + workflow, + workflowVersion.id, + workflowRepository, + workflowVersionRepository, manager, ); - switch (workflowVersion?.trigger?.type) { - case WorkflowTriggerType.DATABASE_EVENT: - await this.databaseEventTriggerService.deleteEventListener( - workflowVersion.workflowId, - manager, - ); - break; - default: - break; - } + await this.setActiveVersionStatus( + workflowVersion, + workflowVersionRepository, + manager, + ); + + await this.enableTrigger(workflowVersion, manager); } - private async activateWorkflowVersion( - workflowId: string, + private async performDeactivationSteps( workflowVersionId: string, + workflowVersionRepository: WorkspaceRepository, manager: EntityManager, ) { - const workflowVersionRepository = - await this.twentyORMManager.getRepository( - 'workflowVersion', + const workflowVersionNullable = await workflowVersionRepository.findOne({ + where: { id: workflowVersionId }, + }); + + const workflowVersion = + await this.workflowCommonWorkspaceService.getValidWorkflowVersionOrFail( + workflowVersionNullable, ); + await this.setDeactivatedVersionStatus( + workflowVersion, + workflowVersionRepository, + manager, + ); + + await this.disableTrigger(workflowVersion, manager); + } + + private async setActiveVersionStatus( + workflowVersion: Omit & { + trigger: WorkflowTrigger; + }, + workflowVersionRepository: WorkspaceRepository, + manager: EntityManager, + ) { const activeWorkflowVersions = await workflowVersionRepository.find( { - where: { workflowId, status: WorkflowVersionStatus.ACTIVE }, + where: { + workflowId: workflowVersion.workflowId, + status: WorkflowVersionStatus.ACTIVE, + }, }, manager, ); @@ -229,9 +241,132 @@ export class WorkflowTriggerWorkspaceService { } await workflowVersionRepository.update( - { id: workflowVersionId }, + { id: workflowVersion.id }, { status: WorkflowVersionStatus.ACTIVE }, manager, ); + + this.emitStatusUpdateEventOrThrow( + workflowVersion.workflowId, + workflowVersion.status, + WorkflowVersionStatus.ACTIVE, + ); + } + + private async setDeactivatedVersionStatus( + workflowVersion: Omit & { + trigger: WorkflowTrigger; + }, + workflowVersionRepository: WorkspaceRepository, + manager: EntityManager, + ) { + if (workflowVersion.status !== WorkflowVersionStatus.ACTIVE) { + throw new WorkflowTriggerException( + 'Cannot disable non-active workflow version', + WorkflowTriggerExceptionCode.FORBIDDEN, + ); + } + + await workflowVersionRepository.update( + { id: workflowVersion.id }, + { status: WorkflowVersionStatus.DEACTIVATED }, + manager, + ); + + this.emitStatusUpdateEventOrThrow( + workflowVersion.workflowId, + workflowVersion.status, + WorkflowVersionStatus.DEACTIVATED, + ); + } + + private async upgradeWorkflowVersion( + workflow: WorkflowWorkspaceEntity, + newPublishedVersionId: string, + workflowRepository: WorkspaceRepository, + workflowVersionRepository: WorkspaceRepository, + manager: EntityManager, + ) { + if (workflow.lastPublishedVersionId === newPublishedVersionId) { + return; + } + + if (workflow.lastPublishedVersionId) { + await workflowVersionRepository.update( + { id: workflow.lastPublishedVersionId }, + { status: WorkflowVersionStatus.ARCHIVED }, + manager, + ); + } + + await workflowRepository.update( + { id: workflow.id }, + { lastPublishedVersionId: newPublishedVersionId }, + manager, + ); + } + + private async enableTrigger( + workflowVersion: Omit & { + trigger: WorkflowTrigger; + }, + manager: EntityManager, + ) { + switch (workflowVersion.trigger.type) { + case WorkflowTriggerType.DATABASE_EVENT: + await this.databaseEventTriggerService.createEventListener( + workflowVersion.workflowId, + workflowVersion.trigger, + manager, + ); + break; + default: + break; + } + } + + private async disableTrigger( + workflowVersion: Omit & { + trigger: WorkflowTrigger; + }, + manager: EntityManager, + ) { + switch (workflowVersion.trigger.type) { + case WorkflowTriggerType.DATABASE_EVENT: + await this.databaseEventTriggerService.deleteEventListener( + workflowVersion.workflowId, + manager, + ); + break; + default: + break; + } + } + + private emitStatusUpdateEventOrThrow( + workflowId: string, + previousStatus: WorkflowVersionStatus, + newStatus: WorkflowVersionStatus, + ) { + const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId; + + if (!workspaceId) { + throw new WorkflowTriggerException( + 'No workspace id found', + WorkflowTriggerExceptionCode.INTERNAL_ERROR, + ); + } + + this.workspaceEventEmitter.emit( + 'workflowVersion.statusUpdated', + [ + { + workflowId, + previousStatus, + newStatus, + } satisfies WorkflowVersionStatusUpdate, + ], + workspaceId, + ); } } diff --git a/packages/twenty-server/src/modules/workflow/workflow.module.ts b/packages/twenty-server/src/modules/workflow/workflow.module.ts index 5f794b972a..08456972b3 100644 --- a/packages/twenty-server/src/modules/workflow/workflow.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow.module.ts @@ -1,8 +1,9 @@ import { Module } from '@nestjs/common'; +import { WorkflowStatusModule } from 'src/modules/workflow/workflow-status/workflow-status.module'; import { WorkflowTriggerModule } from 'src/modules/workflow/workflow-trigger/workflow-trigger.module'; @Module({ - imports: [WorkflowTriggerModule], + imports: [WorkflowTriggerModule, WorkflowStatusModule], }) export class WorkflowModule {}