From 8c8f1927653823f9db3a720131c6a0dbcc84eadc Mon Sep 17 00:00:00 2001 From: Thomas Trompette Date: Thu, 1 Aug 2024 11:57:44 +0200 Subject: [PATCH] Trigger workflow on database event (#6480) - Add global listener on database event - Fetch event listener associated - Trigger associated workflow Also updated the runner so it expects the input to be in the payload rather than the trigger --- .../engine/core-modules/core-engine.module.ts | 4 +- ...ule.ts => core-workflow-trigger.module.ts} | 4 +- .../dtos/run-workflow-version-input.dto.ts | 18 ++++ .../workflow/workflow-trigger.resolver.ts | 10 +- .../integrations/message-queue/jobs.module.ts | 2 + .../common/types/workflow-trigger.type.ts | 9 +- .../common/workflow-common.services.ts | 2 +- .../workflow-runner/workflow-runner.job.ts | 34 +++++-- .../jobs/workflow-event-trigger.job.ts | 36 +++++++ .../jobs/workflow-trigger-job.module.ts | 10 ++ .../database-event-trigger.listener.ts | 98 +++++++++++++++++++ .../workflow-trigger-listener.module.ts | 10 ++ .../workflow-trigger.module.ts | 9 ++ .../workflow-trigger.service.ts | 61 +++++++++--- .../src/modules/workflow/workflow.module.ts | 3 +- 15 files changed, 271 insertions(+), 39 deletions(-) rename packages/twenty-server/src/engine/core-modules/workflow/{workflow-trigger.module.ts => core-workflow-trigger.module.ts} (93%) create mode 100644 packages/twenty-server/src/engine/core-modules/workflow/dtos/run-workflow-version-input.dto.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-trigger-job.module.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-trigger/listeners/database-event-trigger.listener.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-trigger/listeners/workflow-trigger-listener.module.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.module.ts diff --git a/packages/twenty-server/src/engine/core-modules/core-engine.module.ts b/packages/twenty-server/src/engine/core-modules/core-engine.module.ts index 91d7c3e20f..11e07c91da 100644 --- a/packages/twenty-server/src/engine/core-modules/core-engine.module.ts +++ b/packages/twenty-server/src/engine/core-modules/core-engine.module.ts @@ -11,7 +11,7 @@ import { TimelineMessagingModule } from 'src/engine/core-modules/messaging/timel import { OpenApiModule } from 'src/engine/core-modules/open-api/open-api.module'; import { PostgresCredentialsModule } from 'src/engine/core-modules/postgres-credentials/postgres-credentials.module'; import { UserModule } from 'src/engine/core-modules/user/user.module'; -import { WorkflowTriggerModule } from 'src/engine/core-modules/workflow/workflow-trigger.module'; +import { WorkflowTriggerCoreModule } from 'src/engine/core-modules/workflow/core-workflow-trigger.module'; import { WorkspaceModule } from 'src/engine/core-modules/workspace/workspace.module'; import { AnalyticsModule } from './analytics/analytics.module'; @@ -35,7 +35,7 @@ import { FileModule } from './file/file.module'; WorkspaceModule, AISQLQueryModule, PostgresCredentialsModule, - WorkflowTriggerModule, + WorkflowTriggerCoreModule, ], exports: [ AnalyticsModule, diff --git a/packages/twenty-server/src/engine/core-modules/workflow/workflow-trigger.module.ts b/packages/twenty-server/src/engine/core-modules/workflow/core-workflow-trigger.module.ts similarity index 93% rename from packages/twenty-server/src/engine/core-modules/workflow/workflow-trigger.module.ts rename to packages/twenty-server/src/engine/core-modules/workflow/core-workflow-trigger.module.ts index 4b4d66018c..821b31d9d4 100644 --- a/packages/twenty-server/src/engine/core-modules/workflow/workflow-trigger.module.ts +++ b/packages/twenty-server/src/engine/core-modules/workflow/core-workflow-trigger.module.ts @@ -2,11 +2,11 @@ import { Module } from '@nestjs/common'; import { WorkflowTriggerResolver } from 'src/engine/core-modules/workflow/workflow-trigger.resolver'; import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module'; -import { WorkflowTriggerService } from 'src/modules/workflow/workflow-trigger/workflow-trigger.service'; import { WorkflowRunnerModule } from 'src/modules/workflow/workflow-runner/workflow-runner.module'; +import { WorkflowTriggerService } from 'src/modules/workflow/workflow-trigger/workflow-trigger.service'; @Module({ imports: [WorkflowCommonModule, WorkflowRunnerModule], providers: [WorkflowTriggerService, WorkflowTriggerResolver], }) -export class WorkflowTriggerModule {} +export class WorkflowTriggerCoreModule {} diff --git a/packages/twenty-server/src/engine/core-modules/workflow/dtos/run-workflow-version-input.dto.ts b/packages/twenty-server/src/engine/core-modules/workflow/dtos/run-workflow-version-input.dto.ts new file mode 100644 index 0000000000..a8e78b3534 --- /dev/null +++ b/packages/twenty-server/src/engine/core-modules/workflow/dtos/run-workflow-version-input.dto.ts @@ -0,0 +1,18 @@ +import { Field, InputType } from '@nestjs/graphql'; + +import graphqlTypeJson from 'graphql-type-json'; + +@InputType() +export class RunWorkflowVersionInput { + @Field(() => String, { + description: 'Workflow version ID', + nullable: false, + }) + workflowVersionId: string; + + @Field(() => graphqlTypeJson, { + description: 'Execution result in JSON format', + nullable: true, + }) + payload?: JSON; +} diff --git a/packages/twenty-server/src/engine/core-modules/workflow/workflow-trigger.resolver.ts b/packages/twenty-server/src/engine/core-modules/workflow/workflow-trigger.resolver.ts index 5c39f4605d..078ac0d7e3 100644 --- a/packages/twenty-server/src/engine/core-modules/workflow/workflow-trigger.resolver.ts +++ b/packages/twenty-server/src/engine/core-modules/workflow/workflow-trigger.resolver.ts @@ -1,12 +1,13 @@ import { UseGuards } from '@nestjs/common'; import { Args, Mutation, Resolver } from '@nestjs/graphql'; +import { RunWorkflowVersionInput } from 'src/engine/core-modules/workflow/dtos/run-workflow-version-input.dto'; +import { WorkflowTriggerResultDTO } from 'src/engine/core-modules/workflow/dtos/workflow-trigger-result.dto'; import { workflowTriggerGraphqlApiExceptionHandler } from 'src/engine/core-modules/workflow/utils/workflow-trigger-graphql-api-exception-handler.util'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { AuthWorkspace } from 'src/engine/decorators/auth/auth-workspace.decorator'; import { JwtAuthGuard } from 'src/engine/guards/jwt.auth.guard'; import { WorkflowTriggerService } from 'src/modules/workflow/workflow-trigger/workflow-trigger.service'; -import { WorkflowTriggerResultDTO } from 'src/engine/core-modules/workflow/dtos/workflow-trigger-result.dto'; @UseGuards(JwtAuthGuard) @Resolver() @@ -31,15 +32,16 @@ export class WorkflowTriggerResolver { } @Mutation(() => WorkflowTriggerResultDTO) - async triggerWorkflow( + async runWorkflowVersion( @AuthWorkspace() { id: workspaceId }: Workspace, - @Args('workflowVersionId') workflowVersionId: string, + @Args('input') { workflowVersionId, payload }: RunWorkflowVersionInput, ) { try { return { - result: await this.workflowTriggerService.runWorkflow( + result: await this.workflowTriggerService.runWorkflowVersion( workspaceId, workflowVersionId, + payload ?? {}, ), }; } catch (error) { diff --git a/packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts b/packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts index b8eee0bbbb..6dc82d5cb9 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts @@ -23,6 +23,7 @@ import { AutoCompaniesAndContactsCreationJobModule } from 'src/modules/contact-c import { MessagingModule } from 'src/modules/messaging/messaging.module'; import { TimelineJobModule } from 'src/modules/timeline/jobs/timeline-job.module'; import { TimelineActivityModule } from 'src/modules/timeline/timeline-activity.module'; +import { WorkflowModule } from 'src/modules/workflow/workflow.module'; @Module({ imports: [ @@ -43,6 +44,7 @@ import { TimelineActivityModule } from 'src/modules/timeline/timeline-activity.m WorkspaceQueryRunnerJobModule, AutoCompaniesAndContactsCreationJobModule, TimelineJobModule, + WorkflowModule, ], providers: [ CleanInactiveWorkspaceJob, diff --git a/packages/twenty-server/src/modules/workflow/common/types/workflow-trigger.type.ts b/packages/twenty-server/src/modules/workflow/common/types/workflow-trigger.type.ts index ffd25f4df9..66f981cf80 100644 --- a/packages/twenty-server/src/modules/workflow/common/types/workflow-trigger.type.ts +++ b/packages/twenty-server/src/modules/workflow/common/types/workflow-trigger.type.ts @@ -2,7 +2,6 @@ import { WorkflowAction } from 'src/modules/workflow/common/types/workflow-actio export enum WorkflowTriggerType { DATABASE_EVENT = 'DATABASE_EVENT', - MANUAL = 'MANUAL', } type BaseTrigger = { @@ -19,10 +18,4 @@ export type WorkflowDatabaseEventTrigger = BaseTrigger & { }; }; -type WorkflowManualTrigger = BaseTrigger & { - type: WorkflowTriggerType.MANUAL; -}; - -export type WorkflowTrigger = - | WorkflowDatabaseEventTrigger - | WorkflowManualTrigger; +export type WorkflowTrigger = WorkflowDatabaseEventTrigger; diff --git a/packages/twenty-server/src/modules/workflow/common/workflow-common.services.ts b/packages/twenty-server/src/modules/workflow/common/workflow-common.services.ts index 7012de0865..baa23b8e1a 100644 --- a/packages/twenty-server/src/modules/workflow/common/workflow-common.services.ts +++ b/packages/twenty-server/src/modules/workflow/common/workflow-common.services.ts @@ -2,11 +2,11 @@ import { Injectable } from '@nestjs/common'; import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; +import { WorkflowTrigger } from 'src/modules/workflow/common/types/workflow-trigger.type'; import { WorkflowTriggerException, WorkflowTriggerExceptionCode, } from 'src/modules/workflow/workflow-trigger/workflow-trigger.exception'; -import { WorkflowTrigger } from 'src/modules/workflow/common/types/workflow-trigger.type'; @Injectable() export class WorkflowCommonService { diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts index 46a742e089..b2937ffd17 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.job.ts @@ -1,32 +1,54 @@ +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; -import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; -import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service'; +import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; +import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; import { WorkflowCommonService } from 'src/modules/workflow/common/workflow-common.services'; +import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service'; -type RunWorkflowJobData = { workspaceId: string; workflowVersionId: string }; +export type RunWorkflowJobData = { + workspaceId: string; + workflowId: string; + payload: object; +}; @Processor(MessageQueue.workflowQueue) export class WorkflowRunnerJob { constructor( private readonly workflowCommonService: WorkflowCommonService, private readonly workflowRunnerService: WorkflowRunnerService, + private readonly twentyORMGlobalManager: TwentyORMGlobalManager, ) {} @Process(WorkflowRunnerJob.name) async handle({ workspaceId, - workflowVersionId, + workflowId, + payload, }: RunWorkflowJobData): Promise { + const workflowRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + workspaceId, + 'workflow', + ); + + const workflow = await workflowRepository.findOneByOrFail({ + id: workflowId, + }); + + if (!workflow.publishedVersionId) { + throw new Error('Workflow has no published version'); + } + const workflowVersion = await this.workflowCommonService.getWorkflowVersion( workspaceId, - workflowVersionId, + workflow.publishedVersionId, ); await this.workflowRunnerService.run({ action: workflowVersion.trigger.nextAction, workspaceId, - payload: workflowVersion.trigger.input, + payload, }); } } diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job.ts new file mode 100644 index 0000000000..5370d04c1e --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job.ts @@ -0,0 +1,36 @@ +import { Logger } from '@nestjs/common'; + +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { + RunWorkflowJobData, + WorkflowRunnerJob, +} from 'src/modules/workflow/workflow-runner/workflow-runner.job'; + +export type WorkflowEventTriggerJobData = { + workspaceId: string; + workflowId: string; + payload: object; +}; + +@Processor(MessageQueue.workflowQueue) +export class WorkflowEventTriggerJob { + private readonly logger = new Logger(WorkflowEventTriggerJob.name); + + constructor( + @InjectMessageQueue(MessageQueue.workflowQueue) + private readonly messageQueueService: MessageQueueService, + ) {} + + @Process(WorkflowEventTriggerJob.name) + async handle(data: WorkflowEventTriggerJobData): Promise { + this.messageQueueService.add(WorkflowRunnerJob.name, { + workspaceId: data.workspaceId, + workflowId: data.workflowId, + payload: data.payload, + }); + } +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-trigger-job.module.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-trigger-job.module.ts new file mode 100644 index 0000000000..8d11ad4cb7 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/jobs/workflow-trigger-job.module.ts @@ -0,0 +1,10 @@ +import { Module } from '@nestjs/common'; + +import { WorkflowRunnerModule } from 'src/modules/workflow/workflow-runner/workflow-runner.module'; +import { WorkflowEventTriggerJob } from 'src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job'; + +@Module({ + imports: [WorkflowRunnerModule], + providers: [WorkflowEventTriggerJob], +}) +export class WorkflowTriggerJobModule {} diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/listeners/database-event-trigger.listener.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/listeners/database-event-trigger.listener.ts new file mode 100644 index 0000000000..3bf1eaa7e4 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/listeners/database-event-trigger.listener.ts @@ -0,0 +1,98 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { OnEvent } from '@nestjs/event-emitter'; + +import { FeatureFlagKey } from 'src/engine/core-modules/feature-flag/enums/feature-flag-key.enum'; +import { IsFeatureEnabledService } from 'src/engine/core-modules/feature-flag/services/is-feature-enabled.service'; +import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event'; +import { ObjectRecordDeleteEvent } from 'src/engine/integrations/event-emitter/types/object-record-delete.event'; +import { ObjectRecordUpdateEvent } from 'src/engine/integrations/event-emitter/types/object-record-update.event'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; +import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity'; +import { + WorkflowEventTriggerJob, + WorkflowEventTriggerJobData, +} from 'src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job'; + +@Injectable() +export class DatabaseEventTriggerListener { + private readonly logger = new Logger('DatabaseEventTriggerListener'); + + constructor( + private readonly twentyORMGlobalManager: TwentyORMGlobalManager, + @InjectMessageQueue(MessageQueue.workflowQueue) + private readonly messageQueueService: MessageQueueService, + private readonly isFeatureFlagEnabledService: IsFeatureEnabledService, + ) {} + + @OnEvent('*.created') + async handleObjectRecordCreateEvent(payload: ObjectRecordCreateEvent) { + await this.handleEvent(payload); + } + + @OnEvent('*.updated') + async handleObjectRecordUpdateEvent(payload: ObjectRecordUpdateEvent) { + await this.handleEvent(payload); + } + + @OnEvent('*.deleted') + async handleObjectRecordDeleteEvent(payload: ObjectRecordDeleteEvent) { + await this.handleEvent(payload); + } + + private async handleEvent( + payload: + | ObjectRecordCreateEvent + | ObjectRecordUpdateEvent + | ObjectRecordDeleteEvent, + ) { + const workspaceId = payload.workspaceId; + const eventName = payload.name; + + if (!workspaceId || !eventName) { + this.logger.error( + `Missing workspaceId or eventName in payload ${JSON.stringify( + payload, + )}`, + ); + + return; + } + + const isWorkflowEnabled = + await this.isFeatureFlagEnabledService.isFeatureEnabled( + FeatureFlagKey.IsWorkflowEnabled, + workspaceId, + ); + + if (!isWorkflowEnabled) { + return; + } + + const workflowEventListenerRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + workspaceId, + 'workflowEventListener', + ); + + const eventListeners = await workflowEventListenerRepository.find({ + where: { + eventName, + }, + }); + + for (const eventListener of eventListeners) { + this.messageQueueService.add( + WorkflowEventTriggerJob.name, + { + workspaceId, + workflowId: eventListener.workflowId, + payload, + }, + { retryLimit: 3 }, + ); + } + } +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/listeners/workflow-trigger-listener.module.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/listeners/workflow-trigger-listener.module.ts new file mode 100644 index 0000000000..0c8b8afec9 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/listeners/workflow-trigger-listener.module.ts @@ -0,0 +1,10 @@ +import { Module } from '@nestjs/common'; + +import { FeatureFlagModule } from 'src/engine/core-modules/feature-flag/feature-flag.module'; +import { DatabaseEventTriggerListener } from 'src/modules/workflow/workflow-trigger/listeners/database-event-trigger.listener'; + +@Module({ + imports: [FeatureFlagModule], + providers: [DatabaseEventTriggerListener], +}) +export class WorkflowTriggerListenerModule {} diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.module.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.module.ts new file mode 100644 index 0000000000..6ffdefb0e1 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.module.ts @@ -0,0 +1,9 @@ +import { Module } from '@nestjs/common'; + +import { WorkflowTriggerJobModule } from 'src/modules/workflow/workflow-trigger/jobs/workflow-trigger-job.module'; +import { WorkflowTriggerListenerModule } from 'src/modules/workflow/workflow-trigger/listeners/workflow-trigger-listener.module'; + +@Module({ + imports: [WorkflowTriggerJobModule, WorkflowTriggerListenerModule], +}) +export class WorkflowTriggerModule {} diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.service.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.service.ts index 7f23bffb56..94b1720884 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.service.ts @@ -2,16 +2,17 @@ import { Injectable } from '@nestjs/common'; import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity'; -import { - WorkflowTriggerException, - WorkflowTriggerExceptionCode, -} from 'src/modules/workflow/workflow-trigger/workflow-trigger.exception'; +import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; import { WorkflowDatabaseEventTrigger, WorkflowTriggerType, } from 'src/modules/workflow/common/types/workflow-trigger.type'; import { WorkflowCommonService } from 'src/modules/workflow/common/workflow-common.services'; import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service'; +import { + WorkflowTriggerException, + WorkflowTriggerExceptionCode, +} from 'src/modules/workflow/workflow-trigger/workflow-trigger.exception'; @Injectable() export class WorkflowTriggerService { @@ -21,7 +22,11 @@ export class WorkflowTriggerService { private readonly workflowRunnerService: WorkflowRunnerService, ) {} - async runWorkflow(workspaceId: string, workflowVersionId: string) { + async runWorkflowVersion( + workspaceId: string, + workflowVersionId: string, + payload: object, + ) { const workflowVersion = await this.workflowCommonService.getWorkflowVersion( workspaceId, workflowVersionId, @@ -30,7 +35,7 @@ export class WorkflowTriggerService { return await this.workflowRunnerService.run({ action: workflowVersion.trigger.nextAction, workspaceId, - payload: workflowVersion.trigger.input, + payload, }); } @@ -42,9 +47,10 @@ export class WorkflowTriggerService { switch (workflowVersion.trigger.type) { case WorkflowTriggerType.DATABASE_EVENT: - await this.upsertWorkflowEventListener( + await this.upsertEventListenerAndPublishVersion( workspaceId, workflowVersion.workflowId, + workflowVersionId, workflowVersion.trigger, ); break; @@ -55,9 +61,10 @@ export class WorkflowTriggerService { return true; } - private async upsertWorkflowEventListener( + private async upsertEventListenerAndPublishVersion( workspaceId: string, workflowId: string, + workflowVersionId: string, trigger: WorkflowDatabaseEventTrigger, ) { const eventName = trigger?.settings?.eventName; @@ -75,17 +82,41 @@ export class WorkflowTriggerService { 'workflowEventListener', ); - // TODO: Use upsert when available for workspace entities - await workflowEventListenerRepository.delete({ - workflowId, - eventName, - }); - const workflowEventListener = await workflowEventListenerRepository.create({ workflowId, eventName, }); - await workflowEventListenerRepository.save(workflowEventListener); + const workspaceDataSource = + await this.twentyORMGlobalManager.getDataSourceForWorkspace(workspaceId); + + const workflowRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + workspaceId, + 'workflow', + ); + + await workspaceDataSource?.transaction(async (transactionManager) => { + // TODO: Use upsert when available for workspace entities + await workflowEventListenerRepository.delete( + { + workflowId, + eventName, + }, + transactionManager, + ); + + await workflowEventListenerRepository.save( + workflowEventListener, + {}, + transactionManager, + ); + + await workflowRepository.update( + { id: workflowId }, + { publishedVersionId: workflowVersionId }, + transactionManager, + ); + }); } } diff --git a/packages/twenty-server/src/modules/workflow/workflow.module.ts b/packages/twenty-server/src/modules/workflow/workflow.module.ts index 88b9529f95..97536cf904 100644 --- a/packages/twenty-server/src/modules/workflow/workflow.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow.module.ts @@ -1,8 +1,9 @@ import { Module } from '@nestjs/common'; import { WorkflowRunnerModule } from 'src/modules/workflow/workflow-runner/workflow-runner.module'; +import { WorkflowTriggerModule } from 'src/modules/workflow/workflow-trigger/workflow-trigger.module'; @Module({ - imports: [WorkflowRunnerModule], + imports: [WorkflowRunnerModule, WorkflowTriggerModule], }) export class WorkflowModule {}