From 0096e604891218f7d68c74ba41c52c8aa2dc8367 Mon Sep 17 00:00:00 2001 From: Weiko Date: Mon, 5 Feb 2024 17:15:11 +0100 Subject: [PATCH] [messaging] add cronjob for workspaces messages partial sync (#3800) * [messaging] add cronjob for workspaces messages partial sync * run cron every 10 minutes * use logger --- packages/twenty-server/src/command.module.ts | 4 ++ .../integrations/message-queue/jobs.module.ts | 7 ++ .../twenty-server/src/queue-worker.module.ts | 20 +----- ...ch-all-workspaces-messages.cron.command.ts | 29 ++++++++ ...ch-all-workspaces-messages.cron.command.ts | 28 ++++++++ ...ch-all-workspaces-messages.cron.pattern.ts | 1 + .../fetch-all-workspaces-messages.job.ts | 66 +++++++++++++++++++ .../messaging/jobs/gmail-full-sync.job.ts | 16 ++--- .../messaging/jobs/gmail-partial-sync.job.ts | 12 ++-- .../services/gmail-full-sync.service.ts | 6 +- .../services/gmail-partial-sync.service.ts | 12 +++- 11 files changed, 163 insertions(+), 38 deletions(-) create mode 100644 packages/twenty-server/src/workspace/cron/fetch-all-workspaces-messages/commands/start-fetch-all-workspaces-messages.cron.command.ts create mode 100644 packages/twenty-server/src/workspace/cron/fetch-all-workspaces-messages/commands/stop-fetch-all-workspaces-messages.cron.command.ts create mode 100644 packages/twenty-server/src/workspace/cron/fetch-all-workspaces-messages/fetch-all-workspaces-messages.cron.pattern.ts create mode 100644 packages/twenty-server/src/workspace/cron/fetch-all-workspaces-messages/fetch-all-workspaces-messages.job.ts diff --git a/packages/twenty-server/src/command.module.ts b/packages/twenty-server/src/command.module.ts index a99241a8d6..7a05919a14 100644 --- a/packages/twenty-server/src/command.module.ts +++ b/packages/twenty-server/src/command.module.ts @@ -6,6 +6,8 @@ import { StartCleanInactiveWorkspacesCronCommand } from 'src/workspace/cron/clea import { StopCleanInactiveWorkspacesCronCommand } from 'src/workspace/cron/clean-inactive-workspaces/commands/stop-clean-inactive-workspaces.cron.command'; import { CleanInactiveWorkspacesCommand } from 'src/workspace/cron/clean-inactive-workspaces/commands/clean-inactive-workspaces.command'; import { WorkspaceHealthCommandModule } from 'src/workspace/workspace-health/commands/workspace-health-command.module'; +import { StartFetchAllWorkspacesMessagesCronCommand } from 'src/workspace/cron/fetch-all-workspaces-messages/commands/start-fetch-all-workspaces-messages.cron.command'; +import { StopFetchAllWorkspacesMessagesCronCommand } from 'src/workspace/cron/fetch-all-workspaces-messages/commands/stop-fetch-all-workspaces-messages.cron.command'; import { AppModule } from './app.module'; @@ -23,6 +25,8 @@ import { WorkspaceMigrationRunnerCommandsModule } from './workspace/workspace-mi CleanInactiveWorkspacesCommand, WorkspaceHealthCommandModule, WorkspaceMigrationRunnerCommandsModule, + StartFetchAllWorkspacesMessagesCronCommand, + StopFetchAllWorkspacesMessagesCronCommand, ], }) export class CommandModule {} diff --git a/packages/twenty-server/src/integrations/message-queue/jobs.module.ts b/packages/twenty-server/src/integrations/message-queue/jobs.module.ts index 3faab9030b..f4e3865174 100644 --- a/packages/twenty-server/src/integrations/message-queue/jobs.module.ts +++ b/packages/twenty-server/src/integrations/message-queue/jobs.module.ts @@ -17,6 +17,8 @@ import { EmailSenderJob } from 'src/integrations/email/email-sender.job'; import { UserModule } from 'src/core/user/user.module'; import { EnvironmentModule } from 'src/integrations/environment/environment.module'; import { FeatureFlagEntity } from 'src/core/feature-flag/feature-flag.entity'; +import { FetchAllWorkspacesMessagesJob } from 'src/workspace/cron/fetch-all-workspaces-messages/fetch-all-workspaces-messages.job'; +import { ConnectedAccountModule } from 'src/workspace/messaging/connected-account/connected-account.module'; @Module({ imports: [ @@ -30,6 +32,7 @@ import { FeatureFlagEntity } from 'src/core/feature-flag/feature-flag.entity'; EnvironmentModule, TypeORMModule, TypeOrmModule.forFeature([FeatureFlagEntity], 'core'), + ConnectedAccountModule, ], providers: [ { @@ -53,6 +56,10 @@ import { FeatureFlagEntity } from 'src/core/feature-flag/feature-flag.entity'; useClass: CleanInactiveWorkspaceJob, }, { provide: EmailSenderJob.name, useClass: EmailSenderJob }, + { + provide: FetchAllWorkspacesMessagesJob.name, + useClass: FetchAllWorkspacesMessagesJob, + }, ], }) export class JobsModule { diff --git a/packages/twenty-server/src/queue-worker.module.ts b/packages/twenty-server/src/queue-worker.module.ts index 06fa839ebd..50d93ea625 100644 --- a/packages/twenty-server/src/queue-worker.module.ts +++ b/packages/twenty-server/src/queue-worker.module.ts @@ -1,27 +1,9 @@ import { Module } from '@nestjs/common'; -import { EnvironmentModule } from 'src/integrations/environment/environment.module'; -import { EnvironmentService } from 'src/integrations/environment/environment.service'; -import { LoggerModule } from 'src/integrations/logger/logger.module'; -import { loggerModuleFactory } from 'src/integrations/logger/logger.module-factory'; import { JobsModule } from 'src/integrations/message-queue/jobs.module'; -import { MessageQueueModule } from 'src/integrations/message-queue/message-queue.module'; -import { messageQueueModuleFactory } from 'src/integrations/message-queue/message-queue.module-factory'; import { IntegrationsModule } from 'src/integrations/integrations.module'; @Module({ - imports: [ - EnvironmentModule.forRoot({}), - LoggerModule.forRootAsync({ - useFactory: loggerModuleFactory, - inject: [EnvironmentService], - }), - MessageQueueModule.forRoot({ - useFactory: messageQueueModuleFactory, - inject: [EnvironmentService], - }), - JobsModule, - IntegrationsModule, - ], + imports: [IntegrationsModule, JobsModule], }) export class QueueWorkerModule {} diff --git a/packages/twenty-server/src/workspace/cron/fetch-all-workspaces-messages/commands/start-fetch-all-workspaces-messages.cron.command.ts b/packages/twenty-server/src/workspace/cron/fetch-all-workspaces-messages/commands/start-fetch-all-workspaces-messages.cron.command.ts new file mode 100644 index 0000000000..1e7f6bbb56 --- /dev/null +++ b/packages/twenty-server/src/workspace/cron/fetch-all-workspaces-messages/commands/start-fetch-all-workspaces-messages.cron.command.ts @@ -0,0 +1,29 @@ +import { Inject } from '@nestjs/common'; + +import { Command, CommandRunner } from 'nest-commander'; + +import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; +import { fetchAllWorkspacesMessagesCronPattern } from 'src/workspace/cron/fetch-all-workspaces-messages/fetch-all-workspaces-messages.cron.pattern'; +import { FetchAllWorkspacesMessagesJob } from 'src/workspace/cron/fetch-all-workspaces-messages/fetch-all-workspaces-messages.job'; + +@Command({ + name: 'fetch-all-workspaces-messages:cron:start', + description: 'Starts a cron job to fetch all workspaces messages', +}) +export class StartFetchAllWorkspacesMessagesCronCommand extends CommandRunner { + constructor( + @Inject(MessageQueue.cronQueue) + private readonly messageQueueService: MessageQueueService, + ) { + super(); + } + + async run(): Promise { + await this.messageQueueService.addCron( + FetchAllWorkspacesMessagesJob.name, + undefined, + fetchAllWorkspacesMessagesCronPattern, + ); + } +} diff --git a/packages/twenty-server/src/workspace/cron/fetch-all-workspaces-messages/commands/stop-fetch-all-workspaces-messages.cron.command.ts b/packages/twenty-server/src/workspace/cron/fetch-all-workspaces-messages/commands/stop-fetch-all-workspaces-messages.cron.command.ts new file mode 100644 index 0000000000..c8589785c9 --- /dev/null +++ b/packages/twenty-server/src/workspace/cron/fetch-all-workspaces-messages/commands/stop-fetch-all-workspaces-messages.cron.command.ts @@ -0,0 +1,28 @@ +import { Inject } from '@nestjs/common'; + +import { Command, CommandRunner } from 'nest-commander'; + +import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; +import { fetchAllWorkspacesMessagesCronPattern } from 'src/workspace/cron/fetch-all-workspaces-messages/fetch-all-workspaces-messages.cron.pattern'; +import { FetchAllWorkspacesMessagesJob } from 'src/workspace/cron/fetch-all-workspaces-messages/fetch-all-workspaces-messages.job'; + +@Command({ + name: 'fetch-all-workspaces-messages:cron:stop', + description: 'Stops the fetch all workspaces messages cron job', +}) +export class StopFetchAllWorkspacesMessagesCronCommand extends CommandRunner { + constructor( + @Inject(MessageQueue.cronQueue) + private readonly messageQueueService: MessageQueueService, + ) { + super(); + } + + async run(): Promise { + await this.messageQueueService.removeCron( + FetchAllWorkspacesMessagesJob.name, + fetchAllWorkspacesMessagesCronPattern, + ); + } +} diff --git a/packages/twenty-server/src/workspace/cron/fetch-all-workspaces-messages/fetch-all-workspaces-messages.cron.pattern.ts b/packages/twenty-server/src/workspace/cron/fetch-all-workspaces-messages/fetch-all-workspaces-messages.cron.pattern.ts new file mode 100644 index 0000000000..99d9acaa7f --- /dev/null +++ b/packages/twenty-server/src/workspace/cron/fetch-all-workspaces-messages/fetch-all-workspaces-messages.cron.pattern.ts @@ -0,0 +1 @@ +export const fetchAllWorkspacesMessagesCronPattern = '*/10 * * * *'; diff --git a/packages/twenty-server/src/workspace/cron/fetch-all-workspaces-messages/fetch-all-workspaces-messages.job.ts b/packages/twenty-server/src/workspace/cron/fetch-all-workspaces-messages/fetch-all-workspaces-messages.job.ts new file mode 100644 index 0000000000..ec46c73513 --- /dev/null +++ b/packages/twenty-server/src/workspace/cron/fetch-all-workspaces-messages/fetch-all-workspaces-messages.job.ts @@ -0,0 +1,66 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; + +import { Repository } from 'typeorm'; + +import { MessageQueueJob } from 'src/integrations/message-queue/interfaces/message-queue-job.interface'; + +import { + FeatureFlagEntity, + FeatureFlagKeys, +} from 'src/core/feature-flag/feature-flag.entity'; +import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; +import { ConnectedAccountService } from 'src/workspace/messaging/connected-account/connected-account.service'; +import { + GmailPartialSyncJobData, + GmailPartialSyncJob, +} from 'src/workspace/messaging/jobs/gmail-partial-sync.job'; + +@Injectable() +export class FetchAllWorkspacesMessagesJob + implements MessageQueueJob +{ + constructor( + @InjectRepository(FeatureFlagEntity, 'core') + private readonly featureFlagRepository: Repository, + @Inject(MessageQueue.messagingQueue) + private readonly messageQueueService: MessageQueueService, + private readonly connectedAccountService: ConnectedAccountService, + ) {} + + async handle(): Promise { + const featureFlagsWithMessagingEnabled = + await this.featureFlagRepository.findBy({ + key: FeatureFlagKeys.IsMessagingEnabled, + value: true, + }); + + const workspaceIds = featureFlagsWithMessagingEnabled.map( + (featureFlag) => featureFlag.workspaceId, + ); + + for (const workspaceId of workspaceIds) { + await this.fetchWorkspaceMessages(workspaceId); + } + } + + private async fetchWorkspaceMessages(workspaceId: string): Promise { + const connectedAccounts = + await this.connectedAccountService.getAll(workspaceId); + + for (const connectedAccount of connectedAccounts) { + await this.messageQueueService.add( + GmailPartialSyncJob.name, + { + workspaceId, + connectedAccountId: connectedAccount.id, + }, + { + id: `${workspaceId}-${connectedAccount.id}`, + retryLimit: 2, + }, + ); + } + } +} diff --git a/packages/twenty-server/src/workspace/messaging/jobs/gmail-full-sync.job.ts b/packages/twenty-server/src/workspace/messaging/jobs/gmail-full-sync.job.ts index 4b565a2c1f..3ab9f5bb8e 100644 --- a/packages/twenty-server/src/workspace/messaging/jobs/gmail-full-sync.job.ts +++ b/packages/twenty-server/src/workspace/messaging/jobs/gmail-full-sync.job.ts @@ -1,8 +1,7 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { MessageQueueJob } from 'src/integrations/message-queue/interfaces/message-queue-job.interface'; -import { EnvironmentService } from 'src/integrations/environment/environment.service'; import { GmailRefreshAccessTokenService } from 'src/workspace/messaging/services/gmail-refresh-access-token.service'; import { GmailFullSyncService } from 'src/workspace/messaging/services/gmail-full-sync.service'; @@ -14,26 +13,25 @@ export type GmailFullSyncJobData = { @Injectable() export class GmailFullSyncJob implements MessageQueueJob { + private readonly logger = new Logger(GmailFullSyncJob.name); + constructor( - private readonly environmentService: EnvironmentService, private readonly gmailRefreshAccessTokenService: GmailRefreshAccessTokenService, - private readonly fetchWorkspaceMessagesService: GmailFullSyncService, + private readonly gmailFullSyncService: GmailFullSyncService, ) {} async handle(data: GmailFullSyncJobData): Promise { - console.log( + this.logger.log( `gmail full-sync for workspace ${data.workspaceId} and account ${ data.connectedAccountId - } ${ - data.nextPageToken ? `and ${data.nextPageToken} pageToken` : '' - } with ${this.environmentService.getMessageQueueDriverType()}`, + } ${data.nextPageToken ? `and ${data.nextPageToken} pageToken` : ''}`, ); await this.gmailRefreshAccessTokenService.refreshAndSaveAccessToken( data.workspaceId, data.connectedAccountId, ); - await this.fetchWorkspaceMessagesService.fetchConnectedAccountThreads( + await this.gmailFullSyncService.fetchConnectedAccountThreads( data.workspaceId, data.connectedAccountId, data.nextPageToken, diff --git a/packages/twenty-server/src/workspace/messaging/jobs/gmail-partial-sync.job.ts b/packages/twenty-server/src/workspace/messaging/jobs/gmail-partial-sync.job.ts index f552c7f1d4..f8169517f7 100644 --- a/packages/twenty-server/src/workspace/messaging/jobs/gmail-partial-sync.job.ts +++ b/packages/twenty-server/src/workspace/messaging/jobs/gmail-partial-sync.job.ts @@ -1,8 +1,7 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { MessageQueueJob } from 'src/integrations/message-queue/interfaces/message-queue-job.interface'; -import { EnvironmentService } from 'src/integrations/environment/environment.service'; import { GmailRefreshAccessTokenService } from 'src/workspace/messaging/services/gmail-refresh-access-token.service'; import { GmailPartialSyncService } from 'src/workspace/messaging/services/gmail-partial-sync.service'; @@ -15,17 +14,16 @@ export type GmailPartialSyncJobData = { export class GmailPartialSyncJob implements MessageQueueJob { + private readonly logger = new Logger(GmailPartialSyncJob.name); + constructor( - private readonly environmentService: EnvironmentService, private readonly gmailRefreshAccessTokenService: GmailRefreshAccessTokenService, private readonly gmailPartialSyncService: GmailPartialSyncService, ) {} async handle(data: GmailPartialSyncJobData): Promise { - console.log( - `gmail partial-sync for workspace ${data.workspaceId} and account ${ - data.connectedAccountId - } with ${this.environmentService.getMessageQueueDriverType()}`, + this.logger.log( + `gmail partial-sync for workspace ${data.workspaceId} and account ${data.connectedAccountId}`, ); await this.gmailRefreshAccessTokenService.refreshAndSaveAccessToken( data.workspaceId, diff --git a/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts b/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts index 41ed2598c0..019b963b39 100644 --- a/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts @@ -1,4 +1,4 @@ -import { Inject, Injectable } from '@nestjs/common'; +import { Inject, Injectable, Logger } from '@nestjs/common'; import { FetchMessagesByBatchesService } from 'src/workspace/messaging/services/fetch-messages-by-batches.service'; import { GmailClientProvider } from 'src/workspace/messaging/providers/gmail/gmail-client.provider'; @@ -16,6 +16,8 @@ import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/w @Injectable() export class GmailFullSyncService { + private readonly logger = new Logger(GmailFullSyncService.name); + constructor( private readonly gmailClientProvider: GmailClientProvider, private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService, @@ -135,7 +137,7 @@ export class GmailFullSyncService { workspaceId, ); - console.log( + this.logger.log( `gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId} ${ nextPageToken ? `and ${nextPageToken} pageToken` : '' }done.`, diff --git a/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts b/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts index 40a9b9bd98..aadc430b47 100644 --- a/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts @@ -1,4 +1,4 @@ -import { Inject, Injectable } from '@nestjs/common'; +import { Inject, Injectable, Logger } from '@nestjs/common'; import { gmail_v1 } from 'googleapis'; @@ -17,6 +17,8 @@ import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/w @Injectable() export class GmailPartialSyncService { + private readonly logger = new Logger(GmailPartialSyncService.name); + constructor( private readonly gmailClientProvider: GmailClientProvider, private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService, @@ -77,6 +79,10 @@ export class GmailPartialSyncService { } if (newHistoryId === lastSyncHistoryId) { + this.logger.log( + `gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to update.`, + ); + return; } @@ -127,6 +133,10 @@ export class GmailPartialSyncService { connectedAccount.id, workspaceId, ); + + this.logger.log( + `gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId} done.`, + ); } private async getMessageIdsFromHistory(