From e6a55f270bac9838e77b92d06e96f1eba3420bf8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Bosi?= <71827178+bosiraphael@users.noreply.github.com> Date: Tue, 27 Aug 2024 19:13:15 +0200 Subject: [PATCH] 5617 Create CalendarOngoingStaleCron Job (#6748) Closes #5617 --------- Co-authored-by: Charles Bochet --- .../calendar-event-import-manager.module.ts | 6 ++ ...ar-import-ongoing-sync-timeout.constant.ts | 1 + .../calendar-ongoing-stale.cron.command.ts | 32 ++++++++ .../jobs/calendar-ongoing-stale.cron.job.ts | 55 +++++++++++++ .../jobs/calendar-ongoing-stale.job.ts | 78 +++++++++++++++++++ .../calendar-channel-sync-status.service.ts | 11 +++ .../utils/is-sync-stale.util.ts | 13 ++++ .../message-channel-sync-status.service.ts | 14 ++++ .../jobs/messaging-ongoing-stale.job.ts | 4 + 9 files changed, 214 insertions(+) create mode 100644 packages/twenty-server/src/modules/calendar/calendar-event-import-manager/constants/calendar-import-ongoing-sync-timeout.constant.ts create mode 100644 packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-ongoing-stale.cron.command.ts create mode 100644 packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-ongoing-stale.cron.job.ts create mode 100644 packages/twenty-server/src/modules/calendar/calendar-event-import-manager/jobs/calendar-ongoing-stale.job.ts create mode 100644 packages/twenty-server/src/modules/calendar/calendar-event-import-manager/utils/is-sync-stale.util.ts diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module.ts index a4b0fc43a5..1e2ff015a3 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module.ts @@ -10,9 +10,12 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity'; import { CalendarEventCleanerModule } from 'src/modules/calendar/calendar-event-cleaner/calendar-event-cleaner.module'; import { CalendarEventListFetchCronCommand } from 'src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-event-list-fetch.cron.command'; +import { CalendarOngoingStaleCronCommand } from 'src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-ongoing-stale.cron.command'; import { CalendarEventListFetchCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job'; +import { CalendarOngoingStaleCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-ongoing-stale.cron.job'; import { GoogleCalendarDriverModule } from 'src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/google-calendar-driver.module'; import { CalendarEventListFetchJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-event-list-fetch.job'; +import { CalendarOngoingStaleJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-ongoing-stale.job'; import { CalendarChannelSyncStatusService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-channel-sync-status.service'; import { CalendarEventImportErrorHandlerService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-exception-handler.service'; import { CalendarEventsImportService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service'; @@ -51,6 +54,9 @@ import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/sta CalendarEventListFetchCronJob, CalendarEventListFetchCronCommand, CalendarEventListFetchJob, + CalendarOngoingStaleCronJob, + CalendarOngoingStaleCronCommand, + CalendarOngoingStaleJob, ], exports: [CalendarEventsImportService], }) diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/constants/calendar-import-ongoing-sync-timeout.constant.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/constants/calendar-import-ongoing-sync-timeout.constant.ts new file mode 100644 index 0000000000..c99aa82535 --- /dev/null +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/constants/calendar-import-ongoing-sync-timeout.constant.ts @@ -0,0 +1 @@ +export const CALENDAR_IMPORT_ONGOING_SYNC_TIMEOUT = 1000 * 60 * 60; // 1 hour diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-ongoing-stale.cron.command.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-ongoing-stale.cron.command.ts new file mode 100644 index 0000000000..3164e0a760 --- /dev/null +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-ongoing-stale.cron.command.ts @@ -0,0 +1,32 @@ +import { Command, CommandRunner } from 'nest-commander'; + +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { CalendarOngoingStaleCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-ongoing-stale.cron.job'; + +const CALENDAR_ONGOING_STALE_CRON_PATTERN = '0 * * * *'; + +@Command({ + name: 'cron:calendar:ongoing-stale', + description: + 'Starts a cron job to check for stale ongoing calendar event imports and put them back to pending', +}) +export class CalendarOngoingStaleCronCommand extends CommandRunner { + constructor( + @InjectMessageQueue(MessageQueue.cronQueue) + private readonly messageQueueService: MessageQueueService, + ) { + super(); + } + + async run(): Promise { + await this.messageQueueService.addCron( + CalendarOngoingStaleCronJob.name, + undefined, + { + repeat: { pattern: CALENDAR_ONGOING_STALE_CRON_PATTERN }, + }, + ); + } +} diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-ongoing-stale.cron.job.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-ongoing-stale.cron.job.ts new file mode 100644 index 0000000000..d3e5370b52 --- /dev/null +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-ongoing-stale.cron.job.ts @@ -0,0 +1,55 @@ +import { InjectRepository } from '@nestjs/typeorm'; + +import { Repository } from 'typeorm'; + +import { + Workspace, + WorkspaceActivationStatus, +} from 'src/engine/core-modules/workspace/workspace.entity'; +import { ExceptionHandlerService } from 'src/engine/integrations/exception-handler/exception-handler.service'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { + CalendarOngoingStaleJob, + CalendarOngoingStaleJobData, +} from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-ongoing-stale.job'; + +@Processor(MessageQueue.cronQueue) +export class CalendarOngoingStaleCronJob { + constructor( + @InjectRepository(Workspace, 'core') + private readonly workspaceRepository: Repository, + @InjectMessageQueue(MessageQueue.calendarQueue) + private readonly messageQueueService: MessageQueueService, + private readonly exceptionHandlerService: ExceptionHandlerService, + ) {} + + @Process(CalendarOngoingStaleCronJob.name) + async handle(): Promise { + const activeWorkspaces = await this.workspaceRepository.find({ + where: { + activationStatus: WorkspaceActivationStatus.ACTIVE, + }, + }); + + for (const activeWorkspace of activeWorkspaces) { + try { + await this.messageQueueService.add( + CalendarOngoingStaleJob.name, + { + workspaceId: activeWorkspace.id, + }, + ); + } catch (error) { + this.exceptionHandlerService.captureExceptions([error], { + user: { + workspaceId: activeWorkspace.id, + }, + }); + } + } + } +} diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/jobs/calendar-ongoing-stale.job.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/jobs/calendar-ongoing-stale.job.ts new file mode 100644 index 0000000000..bcf5805d14 --- /dev/null +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/jobs/calendar-ongoing-stale.job.ts @@ -0,0 +1,78 @@ +import { Logger, Scope } from '@nestjs/common'; + +import { In } from 'typeorm'; + +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { CalendarChannelSyncStatusService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-channel-sync-status.service'; +import { isSyncStale } from 'src/modules/calendar/calendar-event-import-manager/utils/is-sync-stale.util'; +import { + CalendarChannelSyncStage, + CalendarChannelWorkspaceEntity, +} from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity'; + +export type CalendarOngoingStaleJobData = { + workspaceId: string; +}; + +@Processor({ + queueName: MessageQueue.messagingQueue, + scope: Scope.REQUEST, +}) +export class CalendarOngoingStaleJob { + private readonly logger = new Logger(CalendarOngoingStaleJob.name); + constructor( + private readonly twentyORMManager: TwentyORMManager, + private readonly calendarChannelSyncStatusService: CalendarChannelSyncStatusService, + ) {} + + @Process(CalendarOngoingStaleJob.name) + async handle(data: CalendarOngoingStaleJobData): Promise { + const { workspaceId } = data; + + const calendarChannelRepository = + await this.twentyORMManager.getRepository( + 'calendarChannel', + ); + + const calendarChannels = await calendarChannelRepository.find({ + where: { + syncStage: In([ + CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_ONGOING, + CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_ONGOING, + ]), + }, + }); + + for (const calendarChannel of calendarChannels) { + if ( + calendarChannel.syncStageStartedAt && + isSyncStale(calendarChannel.syncStageStartedAt) + ) { + this.logger.log( + `Sync for calendar channel ${calendarChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to pending`, + ); + await this.calendarChannelSyncStatusService.resetSyncStageStartedAt( + calendarChannel.id, + ); + + switch (calendarChannel.syncStage) { + case CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_ONGOING: + await this.calendarChannelSyncStatusService.schedulePartialCalendarEventListFetch( + calendarChannel.id, + ); + break; + case CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_ONGOING: + await this.calendarChannelSyncStatusService.scheduleCalendarEventsImport( + calendarChannel.id, + ); + break; + default: + break; + } + } + } + } +} diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-channel-sync-status.service.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-channel-sync-status.service.ts index 0355dde325..7bcc8a99d1 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-channel-sync-status.service.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-channel-sync-status.service.ts @@ -87,6 +87,17 @@ export class CalendarChannelSyncStatusService { await this.scheduleFullCalendarEventListFetch(calendarChannelId); } + public async resetSyncStageStartedAt(calendarChannelId: string) { + const calendarChannelRepository = + await this.twentyORMManager.getRepository( + 'calendarChannel', + ); + + await calendarChannelRepository.update(calendarChannelId, { + syncStageStartedAt: null, + }); + } + public async scheduleCalendarEventsImport(calendarChannelId: string) { const calendarChannelRepository = await this.twentyORMManager.getRepository( diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/utils/is-sync-stale.util.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/utils/is-sync-stale.util.ts new file mode 100644 index 0000000000..a204ae83e2 --- /dev/null +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/utils/is-sync-stale.util.ts @@ -0,0 +1,13 @@ +import { CALENDAR_IMPORT_ONGOING_SYNC_TIMEOUT } from 'src/modules/calendar/calendar-event-import-manager/constants/calendar-import-ongoing-sync-timeout.constant'; + +export const isSyncStale = (syncStageStartedAt: string): boolean => { + const syncStageStartedTime = new Date(syncStageStartedAt).getTime(); + + if (isNaN(syncStageStartedTime)) { + throw new Error('Invalid date format'); + } + + return ( + Date.now() - syncStageStartedTime > CALENDAR_IMPORT_ONGOING_SYNC_TIMEOUT + ); +}; diff --git a/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts b/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts index 9c6eeb188c..6be3ad36a6 100644 --- a/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts +++ b/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts @@ -93,6 +93,20 @@ export class MessageChannelSyncStatusService { await this.scheduleFullMessageListFetch(messageChannelId); } + public async resetSyncStageStartedAt(messageChannelId: string) { + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + + await messageChannelRepository.update( + { id: messageChannelId }, + { + syncStageStartedAt: null, + }, + ); + } + public async markAsMessagesListFetchOngoing(messageChannelId: string) { const messageChannelRepository = await this.twentyORMManager.getRepository( diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts index 0388005c96..a2d47cc97e 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts @@ -55,6 +55,10 @@ export class MessagingOngoingStaleJob { `Sync for message channel ${messageChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to MESSAGES_IMPORT_PENDING`, ); + await this.messagingChannelSyncStatusService.resetSyncStageStartedAt( + messageChannel.id, + ); + switch (messageChannel.syncStage) { case MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING: await this.messageChannelSyncStatusService.schedulePartialMessageListFetch(