[messaging] remove partial sync retry and fix missing datasource error (#4371)

* [messaging] remove partial sync retry and fix missing datasource error

* revert

* fix

* add 429

* fix

* fix

* fix

* remove duplicate log

* fix cron pattern
This commit is contained in:
Weiko 2024-03-08 14:06:21 +01:00 committed by GitHub
parent d2e2e50d8a
commit 250bb6134e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 35 additions and 12 deletions

View File

@ -33,6 +33,7 @@ import { UserWorkspaceModule } from 'src/core/user-workspace/user-workspace.modu
import { StripeModule } from 'src/core/billing/stripe/stripe.module'; import { StripeModule } from 'src/core/billing/stripe/stripe.module';
import { Workspace } from 'src/core/workspace/workspace.entity'; import { Workspace } from 'src/core/workspace/workspace.entity';
import { FeatureFlagEntity } from 'src/core/feature-flag/feature-flag.entity'; import { FeatureFlagEntity } from 'src/core/feature-flag/feature-flag.entity';
import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity';
@Module({ @Module({
imports: [ imports: [
@ -51,6 +52,7 @@ import { FeatureFlagEntity } from 'src/core/feature-flag/feature-flag.entity';
ThreadCleanerModule, ThreadCleanerModule,
TypeORMModule, TypeORMModule,
TypeOrmModule.forFeature([Workspace, FeatureFlagEntity], 'core'), TypeOrmModule.forFeature([Workspace, FeatureFlagEntity], 'core'),
TypeOrmModule.forFeature([DataSourceEntity], 'metadata'),
UserModule, UserModule,
UserWorkspaceModule, UserWorkspaceModule,
WorkspaceDataSourceModule, WorkspaceDataSourceModule,

View File

@ -1,7 +1,7 @@
import { Inject, Injectable, Logger } from '@nestjs/common'; import { Inject, Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm'; import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm'; import { Repository, In } from 'typeorm';
import { MessageQueueJob } from 'src/integrations/message-queue/interfaces/message-queue-job.interface'; import { MessageQueueJob } from 'src/integrations/message-queue/interfaces/message-queue-job.interface';
@ -13,6 +13,7 @@ import {
GmailPartialSyncJobData, GmailPartialSyncJobData,
GmailPartialSyncJob, GmailPartialSyncJob,
} from 'src/workspace/messaging/jobs/gmail-partial-sync.job'; } from 'src/workspace/messaging/jobs/gmail-partial-sync.job';
import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity';
@Injectable() @Injectable()
export class FetchAllWorkspacesMessagesJob export class FetchAllWorkspacesMessagesJob
@ -23,6 +24,8 @@ export class FetchAllWorkspacesMessagesJob
constructor( constructor(
@InjectRepository(Workspace, 'core') @InjectRepository(Workspace, 'core')
private readonly workspaceRepository: Repository<Workspace>, private readonly workspaceRepository: Repository<Workspace>,
@InjectRepository(DataSourceEntity, 'metadata')
private readonly dataSourceRepository: Repository<DataSourceEntity>,
@Inject(MessageQueue.messagingQueue) @Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService, private readonly messageQueueService: MessageQueueService,
private readonly connectedAccountService: ConnectedAccountService, private readonly connectedAccountService: ConnectedAccountService,
@ -38,7 +41,17 @@ export class FetchAllWorkspacesMessagesJob
}) })
).map((workspace) => workspace.id); ).map((workspace) => workspace.id);
for (const workspaceId of workspaceIds) { const dataSources = await this.dataSourceRepository.find({
where: {
workspaceId: In(workspaceIds),
},
});
const workspaceIdsWithDataSources = new Set(
dataSources.map((dataSource) => dataSource.workspaceId),
);
for (const workspaceId of workspaceIdsWithDataSources) {
await this.fetchWorkspaceMessages(workspaceId); await this.fetchWorkspaceMessages(workspaceId);
} }
} }
@ -55,9 +68,6 @@ export class FetchAllWorkspacesMessagesJob
workspaceId, workspaceId,
connectedAccountId: connectedAccount.id, connectedAccountId: connectedAccount.id,
}, },
{
retryLimit: 2,
},
); );
} }
} catch (error) { } catch (error) {

View File

@ -56,9 +56,6 @@ export class GmailPartialSyncCommand extends CommandRunner {
workspaceId, workspaceId,
connectedAccountId: connectedAccount.id, connectedAccountId: connectedAccount.id,
}, },
{
retryLimit: 2,
},
); );
} }
} }

View File

@ -187,8 +187,6 @@ export class FetchMessagesByBatchesService {
const formattedResponse = Promise.all( const formattedResponse = Promise.all(
parsedResponses.map(async (message: GmailMessageParsedResponse) => { parsedResponses.map(async (message: GmailMessageParsedResponse) => {
if (message.error) { if (message.error) {
console.log('Error', message.error);
errors.push(message.error); errors.push(message.error);
return; return;

View File

@ -213,10 +213,26 @@ export class GmailPartialSyncService {
} }
if (errors.length) { if (errors.length) {
this.logger.error(
`Error fetching messages for ${connectedAccountId} in workspace ${workspaceId} during partial-sync: ${JSON.stringify(
errors,
null,
2,
)}`,
);
const errorsCanBeIgnored = errors.every((error) => error.code === 404);
const errorsShouldBeRetried = errors.some((error) => error.code === 429);
if (errorsShouldBeRetried) {
return;
}
if (!errorsCanBeIgnored) {
throw new Error( throw new Error(
`Error fetching messages for ${connectedAccountId} in workspace ${workspaceId} during partial-sync`, `Error fetching messages for ${connectedAccountId} in workspace ${workspaceId} during partial-sync`,
); );
} }
}
startTime = Date.now(); startTime = Date.now();
await this.connectedAccountService.updateLastSyncHistoryId( await this.connectedAccountService.updateLastSyncHistoryId(