Trigger workflow on database event (#6480)

- Add global listener on database event
- Fetch event listener associated
- Trigger associated workflow

Also updated the runner so it expects the input to be in the payload
rather than the trigger
This commit is contained in:
Thomas Trompette 2024-08-01 11:57:44 +02:00 committed by GitHub
parent ae423f5e75
commit 8c8f192765
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 271 additions and 39 deletions

View File

@ -11,7 +11,7 @@ import { TimelineMessagingModule } from 'src/engine/core-modules/messaging/timel
import { OpenApiModule } from 'src/engine/core-modules/open-api/open-api.module';
import { PostgresCredentialsModule } from 'src/engine/core-modules/postgres-credentials/postgres-credentials.module';
import { UserModule } from 'src/engine/core-modules/user/user.module';
import { WorkflowTriggerModule } from 'src/engine/core-modules/workflow/workflow-trigger.module';
import { WorkflowTriggerCoreModule } from 'src/engine/core-modules/workflow/core-workflow-trigger.module';
import { WorkspaceModule } from 'src/engine/core-modules/workspace/workspace.module';
import { AnalyticsModule } from './analytics/analytics.module';
@ -35,7 +35,7 @@ import { FileModule } from './file/file.module';
WorkspaceModule,
AISQLQueryModule,
PostgresCredentialsModule,
WorkflowTriggerModule,
WorkflowTriggerCoreModule,
],
exports: [
AnalyticsModule,

View File

@ -2,11 +2,11 @@ import { Module } from '@nestjs/common';
import { WorkflowTriggerResolver } from 'src/engine/core-modules/workflow/workflow-trigger.resolver';
import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module';
import { WorkflowTriggerService } from 'src/modules/workflow/workflow-trigger/workflow-trigger.service';
import { WorkflowRunnerModule } from 'src/modules/workflow/workflow-runner/workflow-runner.module';
import { WorkflowTriggerService } from 'src/modules/workflow/workflow-trigger/workflow-trigger.service';
@Module({
imports: [WorkflowCommonModule, WorkflowRunnerModule],
providers: [WorkflowTriggerService, WorkflowTriggerResolver],
})
export class WorkflowTriggerModule {}
export class WorkflowTriggerCoreModule {}

View File

@ -0,0 +1,18 @@
import { Field, InputType } from '@nestjs/graphql';
import graphqlTypeJson from 'graphql-type-json';
@InputType()
export class RunWorkflowVersionInput {
@Field(() => String, {
description: 'Workflow version ID',
nullable: false,
})
workflowVersionId: string;
@Field(() => graphqlTypeJson, {
description: 'Execution result in JSON format',
nullable: true,
})
payload?: JSON;
}

View File

@ -1,12 +1,13 @@
import { UseGuards } from '@nestjs/common';
import { Args, Mutation, Resolver } from '@nestjs/graphql';
import { RunWorkflowVersionInput } from 'src/engine/core-modules/workflow/dtos/run-workflow-version-input.dto';
import { WorkflowTriggerResultDTO } from 'src/engine/core-modules/workflow/dtos/workflow-trigger-result.dto';
import { workflowTriggerGraphqlApiExceptionHandler } from 'src/engine/core-modules/workflow/utils/workflow-trigger-graphql-api-exception-handler.util';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { AuthWorkspace } from 'src/engine/decorators/auth/auth-workspace.decorator';
import { JwtAuthGuard } from 'src/engine/guards/jwt.auth.guard';
import { WorkflowTriggerService } from 'src/modules/workflow/workflow-trigger/workflow-trigger.service';
import { WorkflowTriggerResultDTO } from 'src/engine/core-modules/workflow/dtos/workflow-trigger-result.dto';
@UseGuards(JwtAuthGuard)
@Resolver()
@ -31,15 +32,16 @@ export class WorkflowTriggerResolver {
}
@Mutation(() => WorkflowTriggerResultDTO)
async triggerWorkflow(
async runWorkflowVersion(
@AuthWorkspace() { id: workspaceId }: Workspace,
@Args('workflowVersionId') workflowVersionId: string,
@Args('input') { workflowVersionId, payload }: RunWorkflowVersionInput,
) {
try {
return {
result: await this.workflowTriggerService.runWorkflow(
result: await this.workflowTriggerService.runWorkflowVersion(
workspaceId,
workflowVersionId,
payload ?? {},
),
};
} catch (error) {

View File

@ -23,6 +23,7 @@ import { AutoCompaniesAndContactsCreationJobModule } from 'src/modules/contact-c
import { MessagingModule } from 'src/modules/messaging/messaging.module';
import { TimelineJobModule } from 'src/modules/timeline/jobs/timeline-job.module';
import { TimelineActivityModule } from 'src/modules/timeline/timeline-activity.module';
import { WorkflowModule } from 'src/modules/workflow/workflow.module';
@Module({
imports: [
@ -43,6 +44,7 @@ import { TimelineActivityModule } from 'src/modules/timeline/timeline-activity.m
WorkspaceQueryRunnerJobModule,
AutoCompaniesAndContactsCreationJobModule,
TimelineJobModule,
WorkflowModule,
],
providers: [
CleanInactiveWorkspaceJob,

View File

@ -2,7 +2,6 @@ import { WorkflowAction } from 'src/modules/workflow/common/types/workflow-actio
export enum WorkflowTriggerType {
DATABASE_EVENT = 'DATABASE_EVENT',
MANUAL = 'MANUAL',
}
type BaseTrigger = {
@ -19,10 +18,4 @@ export type WorkflowDatabaseEventTrigger = BaseTrigger & {
};
};
type WorkflowManualTrigger = BaseTrigger & {
type: WorkflowTriggerType.MANUAL;
};
export type WorkflowTrigger =
| WorkflowDatabaseEventTrigger
| WorkflowManualTrigger;
export type WorkflowTrigger = WorkflowDatabaseEventTrigger;

View File

@ -2,11 +2,11 @@ import { Injectable } from '@nestjs/common';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
import { WorkflowTrigger } from 'src/modules/workflow/common/types/workflow-trigger.type';
import {
WorkflowTriggerException,
WorkflowTriggerExceptionCode,
} from 'src/modules/workflow/workflow-trigger/workflow-trigger.exception';
import { WorkflowTrigger } from 'src/modules/workflow/common/types/workflow-trigger.type';
@Injectable()
export class WorkflowCommonService {

View File

@ -1,32 +1,54 @@
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
import { WorkflowCommonService } from 'src/modules/workflow/common/workflow-common.services';
import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service';
type RunWorkflowJobData = { workspaceId: string; workflowVersionId: string };
export type RunWorkflowJobData = {
workspaceId: string;
workflowId: string;
payload: object;
};
@Processor(MessageQueue.workflowQueue)
export class WorkflowRunnerJob {
constructor(
private readonly workflowCommonService: WorkflowCommonService,
private readonly workflowRunnerService: WorkflowRunnerService,
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
) {}
@Process(WorkflowRunnerJob.name)
async handle({
workspaceId,
workflowVersionId,
workflowId,
payload,
}: RunWorkflowJobData): Promise<void> {
const workflowRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowWorkspaceEntity>(
workspaceId,
'workflow',
);
const workflow = await workflowRepository.findOneByOrFail({
id: workflowId,
});
if (!workflow.publishedVersionId) {
throw new Error('Workflow has no published version');
}
const workflowVersion = await this.workflowCommonService.getWorkflowVersion(
workspaceId,
workflowVersionId,
workflow.publishedVersionId,
);
await this.workflowRunnerService.run({
action: workflowVersion.trigger.nextAction,
workspaceId,
payload: workflowVersion.trigger.input,
payload,
});
}
}

View File

@ -0,0 +1,36 @@
import { Logger } from '@nestjs/common';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import {
RunWorkflowJobData,
WorkflowRunnerJob,
} from 'src/modules/workflow/workflow-runner/workflow-runner.job';
export type WorkflowEventTriggerJobData = {
workspaceId: string;
workflowId: string;
payload: object;
};
@Processor(MessageQueue.workflowQueue)
export class WorkflowEventTriggerJob {
private readonly logger = new Logger(WorkflowEventTriggerJob.name);
constructor(
@InjectMessageQueue(MessageQueue.workflowQueue)
private readonly messageQueueService: MessageQueueService,
) {}
@Process(WorkflowEventTriggerJob.name)
async handle(data: WorkflowEventTriggerJobData): Promise<void> {
this.messageQueueService.add<RunWorkflowJobData>(WorkflowRunnerJob.name, {
workspaceId: data.workspaceId,
workflowId: data.workflowId,
payload: data.payload,
});
}
}

View File

@ -0,0 +1,10 @@
import { Module } from '@nestjs/common';
import { WorkflowRunnerModule } from 'src/modules/workflow/workflow-runner/workflow-runner.module';
import { WorkflowEventTriggerJob } from 'src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job';
@Module({
imports: [WorkflowRunnerModule],
providers: [WorkflowEventTriggerJob],
})
export class WorkflowTriggerJobModule {}

View File

@ -0,0 +1,98 @@
import { Injectable, Logger } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { FeatureFlagKey } from 'src/engine/core-modules/feature-flag/enums/feature-flag-key.enum';
import { IsFeatureEnabledService } from 'src/engine/core-modules/feature-flag/services/is-feature-enabled.service';
import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event';
import { ObjectRecordDeleteEvent } from 'src/engine/integrations/event-emitter/types/object-record-delete.event';
import { ObjectRecordUpdateEvent } from 'src/engine/integrations/event-emitter/types/object-record-update.event';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity';
import {
WorkflowEventTriggerJob,
WorkflowEventTriggerJobData,
} from 'src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job';
@Injectable()
export class DatabaseEventTriggerListener {
private readonly logger = new Logger('DatabaseEventTriggerListener');
constructor(
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
@InjectMessageQueue(MessageQueue.workflowQueue)
private readonly messageQueueService: MessageQueueService,
private readonly isFeatureFlagEnabledService: IsFeatureEnabledService,
) {}
@OnEvent('*.created')
async handleObjectRecordCreateEvent(payload: ObjectRecordCreateEvent<any>) {
await this.handleEvent(payload);
}
@OnEvent('*.updated')
async handleObjectRecordUpdateEvent(payload: ObjectRecordUpdateEvent<any>) {
await this.handleEvent(payload);
}
@OnEvent('*.deleted')
async handleObjectRecordDeleteEvent(payload: ObjectRecordDeleteEvent<any>) {
await this.handleEvent(payload);
}
private async handleEvent(
payload:
| ObjectRecordCreateEvent<any>
| ObjectRecordUpdateEvent<any>
| ObjectRecordDeleteEvent<any>,
) {
const workspaceId = payload.workspaceId;
const eventName = payload.name;
if (!workspaceId || !eventName) {
this.logger.error(
`Missing workspaceId or eventName in payload ${JSON.stringify(
payload,
)}`,
);
return;
}
const isWorkflowEnabled =
await this.isFeatureFlagEnabledService.isFeatureEnabled(
FeatureFlagKey.IsWorkflowEnabled,
workspaceId,
);
if (!isWorkflowEnabled) {
return;
}
const workflowEventListenerRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowEventListenerWorkspaceEntity>(
workspaceId,
'workflowEventListener',
);
const eventListeners = await workflowEventListenerRepository.find({
where: {
eventName,
},
});
for (const eventListener of eventListeners) {
this.messageQueueService.add<WorkflowEventTriggerJobData>(
WorkflowEventTriggerJob.name,
{
workspaceId,
workflowId: eventListener.workflowId,
payload,
},
{ retryLimit: 3 },
);
}
}
}

View File

@ -0,0 +1,10 @@
import { Module } from '@nestjs/common';
import { FeatureFlagModule } from 'src/engine/core-modules/feature-flag/feature-flag.module';
import { DatabaseEventTriggerListener } from 'src/modules/workflow/workflow-trigger/listeners/database-event-trigger.listener';
@Module({
imports: [FeatureFlagModule],
providers: [DatabaseEventTriggerListener],
})
export class WorkflowTriggerListenerModule {}

View File

@ -0,0 +1,9 @@
import { Module } from '@nestjs/common';
import { WorkflowTriggerJobModule } from 'src/modules/workflow/workflow-trigger/jobs/workflow-trigger-job.module';
import { WorkflowTriggerListenerModule } from 'src/modules/workflow/workflow-trigger/listeners/workflow-trigger-listener.module';
@Module({
imports: [WorkflowTriggerJobModule, WorkflowTriggerListenerModule],
})
export class WorkflowTriggerModule {}

View File

@ -2,16 +2,17 @@ import { Injectable } from '@nestjs/common';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity';
import {
WorkflowTriggerException,
WorkflowTriggerExceptionCode,
} from 'src/modules/workflow/workflow-trigger/workflow-trigger.exception';
import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
import {
WorkflowDatabaseEventTrigger,
WorkflowTriggerType,
} from 'src/modules/workflow/common/types/workflow-trigger.type';
import { WorkflowCommonService } from 'src/modules/workflow/common/workflow-common.services';
import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service';
import {
WorkflowTriggerException,
WorkflowTriggerExceptionCode,
} from 'src/modules/workflow/workflow-trigger/workflow-trigger.exception';
@Injectable()
export class WorkflowTriggerService {
@ -21,7 +22,11 @@ export class WorkflowTriggerService {
private readonly workflowRunnerService: WorkflowRunnerService,
) {}
async runWorkflow(workspaceId: string, workflowVersionId: string) {
async runWorkflowVersion(
workspaceId: string,
workflowVersionId: string,
payload: object,
) {
const workflowVersion = await this.workflowCommonService.getWorkflowVersion(
workspaceId,
workflowVersionId,
@ -30,7 +35,7 @@ export class WorkflowTriggerService {
return await this.workflowRunnerService.run({
action: workflowVersion.trigger.nextAction,
workspaceId,
payload: workflowVersion.trigger.input,
payload,
});
}
@ -42,9 +47,10 @@ export class WorkflowTriggerService {
switch (workflowVersion.trigger.type) {
case WorkflowTriggerType.DATABASE_EVENT:
await this.upsertWorkflowEventListener(
await this.upsertEventListenerAndPublishVersion(
workspaceId,
workflowVersion.workflowId,
workflowVersionId,
workflowVersion.trigger,
);
break;
@ -55,9 +61,10 @@ export class WorkflowTriggerService {
return true;
}
private async upsertWorkflowEventListener(
private async upsertEventListenerAndPublishVersion(
workspaceId: string,
workflowId: string,
workflowVersionId: string,
trigger: WorkflowDatabaseEventTrigger,
) {
const eventName = trigger?.settings?.eventName;
@ -75,17 +82,41 @@ export class WorkflowTriggerService {
'workflowEventListener',
);
// TODO: Use upsert when available for workspace entities
await workflowEventListenerRepository.delete({
workflowId,
eventName,
});
const workflowEventListener = await workflowEventListenerRepository.create({
workflowId,
eventName,
});
await workflowEventListenerRepository.save(workflowEventListener);
const workspaceDataSource =
await this.twentyORMGlobalManager.getDataSourceForWorkspace(workspaceId);
const workflowRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowWorkspaceEntity>(
workspaceId,
'workflow',
);
await workspaceDataSource?.transaction(async (transactionManager) => {
// TODO: Use upsert when available for workspace entities
await workflowEventListenerRepository.delete(
{
workflowId,
eventName,
},
transactionManager,
);
await workflowEventListenerRepository.save(
workflowEventListener,
{},
transactionManager,
);
await workflowRepository.update(
{ id: workflowId },
{ publishedVersionId: workflowVersionId },
transactionManager,
);
});
}
}

View File

@ -1,8 +1,9 @@
import { Module } from '@nestjs/common';
import { WorkflowRunnerModule } from 'src/modules/workflow/workflow-runner/workflow-runner.module';
import { WorkflowTriggerModule } from 'src/modules/workflow/workflow-trigger/workflow-trigger.module';
@Module({
imports: [WorkflowRunnerModule],
imports: [WorkflowRunnerModule, WorkflowTriggerModule],
})
export class WorkflowModule {}