4017 improve queries on messages write (#4207)

* modify code to reduce nested loops and improve performance

* is working

* fix lastSyncHistoryId

* create new service to share it between partial sync and full sync

* update partial sync

* update batch limit

* renaming

* adding logs

* update logs

* update logs

* update logs

* delete messages if error while saving the participants

* refactoring

* improving logs

* update logs

* delete historyId if outdated
This commit is contained in:
bosiraphael 2024-02-27 16:06:19 +01:00 committed by GitHub
parent 16fe79b044
commit a19de71fad
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 312 additions and 71 deletions

View File

@ -25,6 +25,7 @@ import { FeatureFlagEntity } from 'src/core/feature-flag/feature-flag.entity';
import { CreateCompaniesAndContactsModule } from 'src/workspace/messaging/services/create-companies-and-contacts/create-companies-and-contacts.module';
import { CompanyModule } from 'src/workspace/messaging/repositories/company/company.module';
import { PersonModule } from 'src/workspace/messaging/repositories/person/person.module';
import { SaveMessagesAndCreateContactsService } from 'src/workspace/messaging/services/save-messages-and-create-contacts.service';
import { MessagingConnectedAccountListener } from 'src/workspace/messaging/listeners/messaging-connected-account.listener';
@Module({
imports: [
@ -54,6 +55,7 @@ import { MessagingConnectedAccountListener } from 'src/workspace/messaging/liste
MessagingWorkspaceMemberListener,
MessagingMessageChannelListener,
MessageService,
SaveMessagesAndCreateContactsService,
MessagingConnectedAccountListener,
],
exports: [

View File

@ -83,6 +83,41 @@ export class ConnectedAccountService {
);
}
/**
 * Advances the stored `lastSyncHistoryId` of a connected account, but only
 * when the candidate is higher than the stored value or when nothing is
 * stored yet (empty string).
 *
 * The column is compared against `''`, so it holds text. The ordering check
 * is therefore done on numeric casts: a plain text comparison is
 * lexicographic (`'999'` sorts after `'1000'`) and would reject a valid,
 * higher history id as soon as the id gains a digit.
 *
 * NOTE(review): assumes both the stored and the incoming history id are
 * decimal digit strings; a non-numeric value would make the cast fail.
 * Confirm against the provider's id format.
 *
 * @param historyId - Candidate history id to store.
 * @param connectedAccountId - Id of the `connectedAccount` row to update.
 * @param workspaceId - Workspace whose schema contains the row.
 * @param transactionManager - Optional entity manager forwarded to
 *   `executeRawQuery`.
 */
public async updateLastSyncHistoryIdIfHigher(
  historyId: string,
  connectedAccountId: string,
  workspaceId: string,
  transactionManager?: EntityManager,
): Promise<void> {
  const dataSourceSchema =
    this.workspaceDataSourceService.getSchemaName(workspaceId);

  // NULLIF turns the empty string into NULL before the cast, so the numeric
  // comparison can never fail on '' regardless of how the planner orders the
  // two OR branches. `$1::text` pins the parameter type so the same
  // placeholder can be assigned to the text column and cast for comparison.
  await this.workspaceDataSourceService.executeRawQuery(
    `UPDATE ${dataSourceSchema}."connectedAccount" SET "lastSyncHistoryId" = $1
    WHERE "id" = $2
    AND ("lastSyncHistoryId" = '' OR NULLIF("lastSyncHistoryId", '')::numeric < ($1::text)::numeric)`,
    [historyId, connectedAccountId],
    workspaceId,
    transactionManager,
  );
}
/**
 * Clears the stored `lastSyncHistoryId` of a connected account by setting it
 * to an empty string.
 *
 * @param connectedAccountId - Id of the `connectedAccount` row to reset.
 * @param workspaceId - Workspace whose schema contains the row.
 * @param transactionManager - Optional entity manager forwarded to
 *   `executeRawQuery`.
 */
public async deleteHistoryId(
  connectedAccountId: string,
  workspaceId: string,
  transactionManager?: EntityManager,
) {
  const schemaName = this.workspaceDataSourceService.getSchemaName(workspaceId);
  const resetHistoryIdQuery = `UPDATE ${schemaName}."connectedAccount" SET "lastSyncHistoryId" = '' WHERE "id" = $1`;
  const queryParameters = [connectedAccountId];

  await this.workspaceDataSourceService.executeRawQuery(
    resetHistoryIdQuery,
    queryParameters,
    workspaceId,
    transactionManager,
  );
}
public async updateAccessToken(
accessToken: string,
connectedAccountId: string,

View File

@ -7,7 +7,7 @@ import { MessageParticipantObjectMetadata } from 'src/workspace/workspace-sync-m
import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record';
import {
ParticipantWithId,
Participant,
ParticipantWithMessageId,
} from 'src/workspace/messaging/types/gmail-message';
import { PersonService } from 'src/workspace/messaging/repositories/person/person.service';
@ -138,8 +138,7 @@ export class MessageParticipantService {
}
public async saveMessageParticipants(
participants: Participant[],
messageId: string,
participants: ParticipantWithMessageId[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
@ -169,7 +168,7 @@ export class MessageParticipantService {
);
const messageParticipantsToSave = participants.map((participant) => [
messageId,
participant.messageId,
participant.role,
participant.handle,
participant.displayName,

View File

@ -122,7 +122,9 @@ export class MessageService {
connectedAccount: ObjectRecord<ConnectedAccountObjectMetadata>,
gmailMessageChannelId: string,
workspaceId: string,
) {
): Promise<Map<string, string>> {
const messageExternalIdsAndIdsMap = new Map<string, string>();
for (const message of messages) {
if (this.shouldSkipImport(message)) {
continue;
@ -159,6 +161,11 @@ export class MessageService {
manager,
);
messageExternalIdsAndIdsMap.set(
message.externalId,
savedOrExistingMessageId,
);
await manager.query(
`INSERT INTO ${dataSourceMetadata.schema}."messageChannelMessageAssociation" ("messageChannelId", "messageId", "messageExternalId", "messageThreadId", "messageThreadExternalId") VALUES ($1, $2, $3, $4, $5)`,
[
@ -171,6 +178,8 @@ export class MessageService {
);
});
}
return messageExternalIdsAndIdsMap;
}
private shouldSkipImport(message: GmailMessage): boolean {
@ -216,53 +225,19 @@ export class MessageService {
],
);
const isContactAutoCreationEnabled =
await this.messageChannelService.getIsContactAutoCreationEnabledByConnectedAccountIdOrFail(
connectedAccount.id,
workspaceId,
);
if (isContactAutoCreationEnabled && messageDirection === 'outgoing') {
await this.createCompaniesAndContactsService.createCompaniesAndContacts(
connectedAccount.handle,
message.participants,
workspaceId,
manager,
);
const handles = message.participants.map(
(participant) => participant.handle,
);
const messageParticipantsWithoutPersonIdAndWorkspaceMemberId =
await this.messageParticipantService.getByHandlesWithoutPersonIdAndWorkspaceMemberId(
handles,
workspaceId,
);
await this.messageParticipantService.updateMessageParticipantsAfterPeopleCreation(
messageParticipantsWithoutPersonIdAndWorkspaceMemberId,
workspaceId,
manager,
);
}
await this.messageParticipantService.saveMessageParticipants(
message.participants,
newMessageId,
workspaceId,
manager,
);
return Promise.resolve(newMessageId);
}
public async deleteMessages(
workspaceDataSource: DataSource,
messagesDeletedMessageExternalIds: string[],
gmailMessageChannelId: string,
workspaceId: string,
) {
const workspaceDataSource =
await this.workspaceDataSourceService.connectToWorkspaceDataSource(
workspaceId,
);
await workspaceDataSource?.transaction(async (manager: EntityManager) => {
const messageChannelMessageAssociationsToDelete =
await this.messageChannelMessageAssociationService.getByMessageExternalIdsAndMessageChannelId(

View File

@ -1,4 +1,4 @@
import { Injectable } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import axios, { AxiosInstance, AxiosResponse } from 'axios';
import { simpleParser, AddressObject } from 'mailparser';
@ -14,6 +14,7 @@ import { GmailMessageParsedResponse } from 'src/workspace/messaging/types/gmail-
@Injectable()
export class FetchMessagesByBatchesService {
private readonly httpService: AxiosInstance;
private readonly logger = new Logger(FetchMessagesByBatchesService.name);
constructor() {
this.httpService = axios.create({
@ -24,14 +25,38 @@ export class FetchMessagesByBatchesService {
async fetchAllMessages(
queries: MessageQuery[],
accessToken: string,
jobName?: string,
workspaceId?: string,
connectedAccountId?: string,
): Promise<{ messages: GmailMessage[]; errors: any[] }> {
let startTime = Date.now();
const batchResponses = await this.fetchAllByBatches(
queries,
accessToken,
'batch_gmail_messages',
);
let endTime = Date.now();
return this.formatBatchResponsesAsGmailMessages(batchResponses);
this.logger.log(
`${jobName} for workspace ${workspaceId} and account ${connectedAccountId} fetching ${
queries.length
} messages in ${endTime - startTime}ms`,
);
startTime = Date.now();
const formattedResponse =
await this.formatBatchResponsesAsGmailMessages(batchResponses);
endTime = Date.now();
this.logger.log(
`${jobName} for workspace ${workspaceId} and account ${connectedAccountId} formatting ${
queries.length
} messages in ${endTime - startTime}ms`,
);
return formattedResponse;
}
async fetchAllByBatches(
@ -39,7 +64,7 @@ export class FetchMessagesByBatchesService {
accessToken: string,
boundary: string,
): Promise<AxiosResponse<any, any>[]> {
const batchLimit = 100;
const batchLimit = 50;
let batchOffset = 0;

View File

@ -11,9 +11,8 @@ import {
import { ConnectedAccountService } from 'src/workspace/messaging/repositories/connected-account/connected-account.service';
import { MessageChannelService } from 'src/workspace/messaging/repositories/message-channel/message-channel.service';
import { MessageChannelMessageAssociationService } from 'src/workspace/messaging/repositories/message-channel-message-association/message-channel-message-association.service';
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
import { MessageService } from 'src/workspace/messaging/repositories/message/message.service';
import { createQueriesFromMessageIds } from 'src/workspace/messaging/utils/create-queries-from-message-ids.util';
import { SaveMessagesAndCreateContactsService } from 'src/workspace/messaging/services/save-messages-and-create-contacts.service';
@Injectable()
export class GmailFullSyncService {
@ -24,11 +23,10 @@ export class GmailFullSyncService {
private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService,
@Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
private readonly connectedAccountService: ConnectedAccountService,
private readonly messageChannelService: MessageChannelService,
private readonly messageChannelMessageAssociationService: MessageChannelMessageAssociationService,
private readonly messageService: MessageService,
private readonly saveMessagesAndCreateContactsService: SaveMessagesAndCreateContactsService,
) {}
public async fetchConnectedAccountThreads(
@ -36,11 +34,6 @@ export class GmailFullSyncService {
connectedAccountId: string,
nextPageToken?: string,
): Promise<void> {
const { dataSource: workspaceDataSource, dataSourceMetadata } =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata(
workspaceId,
);
const connectedAccount = await this.connectedAccountService.getByIdOrFail(
connectedAccountId,
workspaceId,
@ -64,12 +57,22 @@ export class GmailFullSyncService {
const gmailClient =
await this.gmailClientProvider.getGmailClient(refreshToken);
let startTime = Date.now();
const messages = await gmailClient.users.messages.list({
userId: 'me',
maxResults: 500,
pageToken: nextPageToken,
});
let endTime = Date.now();
this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId} getting messages list in ${
endTime - startTime
}ms.`,
);
const messagesData = messages.data.messages;
const messageExternalIds = messagesData
@ -80,6 +83,8 @@ export class GmailFullSyncService {
return;
}
startTime = Date.now();
const existingMessageChannelMessageAssociations =
await this.messageChannelMessageAssociationService.getByMessageExternalIdsAndMessageChannelId(
messageExternalIds,
@ -87,6 +92,14 @@ export class GmailFullSyncService {
workspaceId,
);
endTime = Date.now();
this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId}: getting existing message channel message associations in ${
endTime - startTime
}ms.`,
);
const existingMessageChannelMessageAssociationsExternalIds =
existingMessageChannelMessageAssociations.map(
(messageChannelMessageAssociation) =>
@ -102,13 +115,28 @@ export class GmailFullSyncService {
const messageQueries = createQueriesFromMessageIds(messagesToFetch);
startTime = Date.now();
const { messages: messagesToSave, errors } =
await this.fetchMessagesByBatchesService.fetchAllMessages(
messageQueries,
accessToken,
'gmail full-sync',
workspaceId,
connectedAccountId,
);
endTime = Date.now();
this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId}: fetching all messages in ${
endTime - startTime
}ms.`,
);
if (messagesToSave.length === 0) {
if (errors.length) throw new Error('Error fetching messages');
this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to import.`,
);
@ -116,13 +144,12 @@ export class GmailFullSyncService {
return;
}
await this.messageService.saveMessages(
this.saveMessagesAndCreateContactsService.saveMessagesAndCreateContacts(
messagesToSave,
dataSourceMetadata,
workspaceDataSource,
connectedAccount,
gmailMessageChannelId,
workspaceId,
gmailMessageChannelId,
'gmail full-sync',
);
if (errors.length) throw new Error('Error fetching messages');
@ -135,12 +162,22 @@ export class GmailFullSyncService {
if (!historyId) throw new Error('No history id found');
await this.connectedAccountService.updateLastSyncHistoryId(
startTime = Date.now();
await this.connectedAccountService.updateLastSyncHistoryIdIfHigher(
historyId,
connectedAccount.id,
workspaceId,
);
endTime = Date.now();
this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId}: updating last sync history id in ${
endTime - startTime
}ms.`,
);
this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId} ${
nextPageToken ? `and ${nextPageToken} pageToken` : ''

View File

@ -15,6 +15,7 @@ import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/w
import { MessageChannelService } from 'src/workspace/messaging/repositories/message-channel/message-channel.service';
import { MessageService } from 'src/workspace/messaging/repositories/message/message.service';
import { createQueriesFromMessageIds } from 'src/workspace/messaging/utils/create-queries-from-message-ids.util';
import { SaveMessagesAndCreateContactsService } from 'src/workspace/messaging/services/save-messages-and-create-contacts.service';
@Injectable()
export class GmailPartialSyncService {
@ -29,6 +30,7 @@ export class GmailPartialSyncService {
private readonly connectedAccountService: ConnectedAccountService,
private readonly messageChannelService: MessageChannelService,
private readonly messageService: MessageService,
private readonly saveMessagesAndCreateContactsService: SaveMessagesAndCreateContactsService,
) {}
public async fetchConnectedAccountThreads(
@ -36,11 +38,6 @@ export class GmailPartialSyncService {
connectedAccountId: string,
maxResults = 500,
): Promise<void> {
const { dataSource: workspaceDataSource, dataSourceMetadata } =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata(
workspaceId,
);
const connectedAccount = await this.connectedAccountService.getByIdOrFail(
connectedAccountId,
workspaceId,
@ -68,6 +65,11 @@ export class GmailPartialSyncService {
);
if (error && error.code === 404) {
await this.connectedAccountService.deleteHistoryId(
connectedAccountId,
workspaceId,
);
await this.fallbackToFullSync(workspaceId, connectedAccountId);
return;
@ -104,22 +106,23 @@ export class GmailPartialSyncService {
await this.fetchMessagesByBatchesService.fetchAllMessages(
messageQueries,
accessToken,
'gmail full-sync',
workspaceId,
connectedAccountId,
);
if (messagesToSave.length !== 0) {
await this.messageService.saveMessages(
await this.saveMessagesAndCreateContactsService.saveMessagesAndCreateContacts(
messagesToSave,
dataSourceMetadata,
workspaceDataSource,
connectedAccount,
gmailMessageChannelId,
workspaceId,
gmailMessageChannelId,
'gmail partial-sync',
);
}
if (messagesDeleted.length !== 0) {
await this.messageService.deleteMessages(
workspaceDataSource,
messagesDeleted,
gmailMessageChannelId,
workspaceId,

View File

@ -0,0 +1,163 @@
import { Injectable, Logger } from '@nestjs/common';
import { MessageChannelService } from 'src/workspace/messaging/repositories/message-channel/message-channel.service';
import { MessageParticipantService } from 'src/workspace/messaging/repositories/message-participant/message-participant.service';
import { MessageService } from 'src/workspace/messaging/repositories/message/message.service';
import { CreateCompaniesAndContactsService } from 'src/workspace/messaging/services/create-companies-and-contacts/create-companies-and-contacts.service';
import {
GmailMessage,
ParticipantWithMessageId,
} from 'src/workspace/messaging/types/gmail-message';
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
import { ConnectedAccountObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/connected-account.object-metadata';
import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record';
/**
 * Saves a batch of Gmail messages for a connected account, optionally
 * creates companies and contacts from the messages the account sent, and
 * then saves the message participants in one call.
 *
 * Used by callers that pass a `jobName` which is only used as a log prefix.
 */
@Injectable()
export class SaveMessagesAndCreateContactsService {
  private readonly logger = new Logger(
    SaveMessagesAndCreateContactsService.name,
  );

  constructor(
    private readonly messageService: MessageService,
    private readonly messageChannelService: MessageChannelService,
    private readonly createCompaniesAndContactsService: CreateCompaniesAndContactsService,
    private readonly messageParticipantService: MessageParticipantService,
    private readonly workspaceDataSourceService: WorkspaceDataSourceService,
  ) {}

  /**
   * Saves the messages, creates contacts when auto-creation is enabled for
   * the account, and saves the participants of every message that was saved.
   *
   * @param messagesToSave - Messages to persist.
   * @param connectedAccount - Account the messages were fetched for; its
   *   `handle` decides which messages count as sent by the account.
   * @param workspaceId - Workspace the data belongs to.
   * @param gmailMessageChannelId - Message channel the messages are
   *   associated with.
   * @param jobName - Optional prefix for the timing and error log lines.
   */
  async saveMessagesAndCreateContacts(
    messagesToSave: GmailMessage[],
    connectedAccount: ObjectRecord<ConnectedAccountObjectMetadata>,
    workspaceId: string,
    gmailMessageChannelId: string,
    jobName?: string,
  ) {
    const { dataSource: workspaceDataSource, dataSourceMetadata } =
      await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata(
        workspaceId,
      );

    let startTime = Date.now();

    const messageExternalIdsAndIdsMap = await this.messageService.saveMessages(
      messagesToSave,
      dataSourceMetadata,
      workspaceDataSource,
      connectedAccount,
      gmailMessageChannelId,
      workspaceId,
    );

    let endTime = Date.now();

    this.logger.log(
      `${jobName} saving messages for workspace ${workspaceId} and account ${
        connectedAccount.id
      } in ${endTime - startTime}ms`,
    );

    const isContactAutoCreationEnabled =
      await this.messageChannelService.getIsContactAutoCreationEnabledByConnectedAccountIdOrFail(
        connectedAccount.id,
        workspaceId,
      );

    // Attach the saved message id to each participant. Messages missing from
    // the map returned by saveMessages contribute no participants.
    const participantsWithMessageId: ParticipantWithMessageId[] =
      messagesToSave.flatMap((message) => {
        const messageId = messageExternalIdsAndIdsMap.get(message.externalId);

        return messageId
          ? message.participants.map((participant) => ({
              ...participant,
              messageId,
            }))
          : [];
      });

    if (isContactAutoCreationEnabled) {
      // Only messages sent by the connected account feed contact creation.
      // Computed here so the work is skipped when auto-creation is disabled.
      const contactsToCreate = messagesToSave
        .filter((message) => connectedAccount.handle === message.fromHandle)
        .flatMap((message) => message.participants);

      startTime = Date.now();

      await this.createCompaniesAndContactsService.createCompaniesAndContacts(
        connectedAccount.handle,
        contactsToCreate,
        workspaceId,
      );

      const handles = participantsWithMessageId.map(
        (participant) => participant.handle,
      );

      const messageParticipantsWithoutPersonIdAndWorkspaceMemberId =
        await this.messageParticipantService.getByHandlesWithoutPersonIdAndWorkspaceMemberId(
          handles,
          workspaceId,
        );

      await this.messageParticipantService.updateMessageParticipantsAfterPeopleCreation(
        messageParticipantsWithoutPersonIdAndWorkspaceMemberId,
        workspaceId,
      );

      endTime = Date.now();

      this.logger.log(
        `${jobName} creating companies and contacts for workspace ${workspaceId} and account ${
          connectedAccount.id
        } in ${endTime - startTime}ms`,
      );
    }

    // External ids of the saved messages that contributed at least one
    // participant: the same set of messages the participants refer to, but
    // keyed the way the delete call expects, and without duplicates.
    const messageExternalIdsToDeleteOnError = Array.from(
      new Set(
        messagesToSave
          .filter(
            (message) =>
              message.participants.length > 0 &&
              Boolean(messageExternalIdsAndIdsMap.get(message.externalId)),
          )
          .map((message) => message.externalId),
      ),
    );

    startTime = Date.now();

    await this.tryToSaveMessageParticipantsOrDeleteMessagesIfError(
      participantsWithMessageId,
      messageExternalIdsToDeleteOnError,
      gmailMessageChannelId,
      workspaceId,
      connectedAccount,
      jobName,
    );

    endTime = Date.now();

    this.logger.log(
      `${jobName} saving message participants for workspace ${workspaceId} and account ${
        connectedAccount.id
      } in ${endTime - startTime}ms`,
    );
  }

  /**
   * Saves the participants; if that fails, logs the error and deletes the
   * affected messages so no message is left without its participants.
   *
   * The save error is logged and not rethrown. An error thrown by the
   * cleanup itself does propagate to the caller.
   *
   * @param participantsWithMessageId - Participants to save, each carrying
   *   the id of its message.
   * @param messageExternalIdsToDeleteOnError - External ids of the messages
   *   to delete when saving fails. NOTE(review): external ids are passed
   *   because the delete method's parameter is named
   *   `messagesDeletedMessageExternalIds`; confirm against its full body.
   * @param gmailMessageChannelId - Message channel the messages belong to.
   * @param workspaceId - Workspace the data belongs to.
   * @param connectedAccount - Account used in the error log line.
   * @param jobName - Optional prefix for the error log line.
   */
  private async tryToSaveMessageParticipantsOrDeleteMessagesIfError(
    participantsWithMessageId: ParticipantWithMessageId[],
    messageExternalIdsToDeleteOnError: string[],
    gmailMessageChannelId: string,
    workspaceId: string,
    connectedAccount: ObjectRecord<ConnectedAccountObjectMetadata>,
    jobName?: string,
  ) {
    try {
      await this.messageParticipantService.saveMessageParticipants(
        participantsWithMessageId,
        workspaceId,
      );
    } catch (error) {
      this.logger.error(
        `${jobName} error saving message participants for workspace ${workspaceId} and account ${connectedAccount.id}`,
        error,
      );

      await this.messageService.deleteMessages(
        messageExternalIdsToDeleteOnError,
        gmailMessageChannelId,
        workspaceId,
      );
    }
  }
}

View File

@ -21,6 +21,8 @@ export type Participant = {
displayName: string;
};
/** A participant together with the id of the message it is attached to. */
export type ParticipantWithMessageId = Participant & {
  messageId: string;
};
/** A participant extended with an `id` field. */
export type ParticipantWithId = Participant & { id: string };