6687 change messaging import cron job to run every minute (#6704)

Closes #6687
This commit is contained in:
Raphaël Bosi 2024-08-22 17:51:08 +02:00 committed by GitHub
parent 9898ca3e53
commit 579c2ebcea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 15 additions and 19 deletions

View File

@ -5,6 +5,8 @@ import { MessageQueue } from 'src/engine/integrations/message-queue/message-queu
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { MessagingMessagesImportCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job'; import { MessagingMessagesImportCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job';
const MESSAGING_MESSAGES_IMPORT_CRON_PATTERN = '*/1 * * * *';
@Command({ @Command({
name: 'cron:messaging:messages-import', name: 'cron:messaging:messages-import',
description: 'Starts a cron job to fetch all messages from cache', description: 'Starts a cron job to fetch all messages from cache',
@ -23,7 +25,7 @@ export class MessagingMessagesImportCronCommand extends CommandRunner {
undefined, undefined,
{ {
repeat: { repeat: {
every: 30000, pattern: MESSAGING_MESSAGES_IMPORT_CRON_PATTERN,
}, },
}, },
); );

View File

@ -1,4 +1,3 @@
import { Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm'; import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm'; import { Repository } from 'typeorm';
@ -24,8 +23,6 @@ import {
@Processor(MessageQueue.cronQueue) @Processor(MessageQueue.cronQueue)
export class MessagingMessagesImportCronJob { export class MessagingMessagesImportCronJob {
private readonly logger = new Logger(MessagingMessagesImportCronJob.name);
constructor( constructor(
@InjectRepository(Workspace, 'core') @InjectRepository(Workspace, 'core')
private readonly workspaceRepository: Repository<Workspace>, private readonly workspaceRepository: Repository<Workspace>,
@ -51,15 +48,13 @@ export class MessagingMessagesImportCronJob {
); );
const messageChannels = await messageChannelRepository.find({ const messageChannels = await messageChannelRepository.find({
select: ['id', 'isSyncEnabled', 'syncStage'], where: {
isSyncEnabled: true,
syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_PENDING,
},
}); });
for (const messageChannel of messageChannels) { for (const messageChannel of messageChannels) {
if (
messageChannel.isSyncEnabled &&
messageChannel.syncStage ===
MessageChannelSyncStage.MESSAGES_IMPORT_PENDING
) {
await this.messageQueueService.add<MessagingMessagesImportJobData>( await this.messageQueueService.add<MessagingMessagesImportJobData>(
MessagingMessagesImportJob.name, MessagingMessagesImportJob.name,
{ {
@ -69,7 +64,6 @@ export class MessagingMessagesImportCronJob {
); );
} }
} }
}
console.timeEnd('MessagingMessagesImportCronJob time'); console.timeEnd('MessagingMessagesImportCronJob time');
} }

View File

@ -1 +1 @@
export const MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE = 100; export const MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE = 200;