diff --git a/packages/twenty-server/src/workspace/messaging/messaging.module.ts b/packages/twenty-server/src/workspace/messaging/messaging.module.ts index fa82fcb3e4..f019dcf259 100644 --- a/packages/twenty-server/src/workspace/messaging/messaging.module.ts +++ b/packages/twenty-server/src/workspace/messaging/messaging.module.ts @@ -25,6 +25,7 @@ import { FeatureFlagEntity } from 'src/core/feature-flag/feature-flag.entity'; import { CreateCompaniesAndContactsModule } from 'src/workspace/messaging/services/create-companies-and-contacts/create-companies-and-contacts.module'; import { CompanyModule } from 'src/workspace/messaging/repositories/company/company.module'; import { PersonModule } from 'src/workspace/messaging/repositories/person/person.module'; +import { SaveMessagesAndCreateContactsService } from 'src/workspace/messaging/services/save-messages-and-create-contacts.service'; import { MessagingConnectedAccountListener } from 'src/workspace/messaging/listeners/messaging-connected-account.listener'; @Module({ imports: [ @@ -54,6 +55,7 @@ import { MessagingConnectedAccountListener } from 'src/workspace/messaging/liste MessagingWorkspaceMemberListener, MessagingMessageChannelListener, MessageService, + SaveMessagesAndCreateContactsService, MessagingConnectedAccountListener, ], exports: [ diff --git a/packages/twenty-server/src/workspace/messaging/repositories/connected-account/connected-account.service.ts b/packages/twenty-server/src/workspace/messaging/repositories/connected-account/connected-account.service.ts index 830f986975..5137d4bea1 100644 --- a/packages/twenty-server/src/workspace/messaging/repositories/connected-account/connected-account.service.ts +++ b/packages/twenty-server/src/workspace/messaging/repositories/connected-account/connected-account.service.ts @@ -83,6 +83,41 @@ export class ConnectedAccountService { ); } + public async updateLastSyncHistoryIdIfHigher( + historyId: string, + connectedAccountId: string, + workspaceId: string, + transactionManager?: EntityManager, + ) { + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); + + await this.workspaceDataSourceService.executeRawQuery( + `UPDATE ${dataSourceSchema}."connectedAccount" SET "lastSyncHistoryId" = $1 + WHERE "id" = $2 + AND ("lastSyncHistoryId" < $1 OR "lastSyncHistoryId" = '')`, + [historyId, connectedAccountId], + workspaceId, + transactionManager, + ); + } + + public async deleteHistoryId( + connectedAccountId: string, + workspaceId: string, + transactionManager?: EntityManager, + ) { + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); + + await this.workspaceDataSourceService.executeRawQuery( + `UPDATE ${dataSourceSchema}."connectedAccount" SET "lastSyncHistoryId" = '' WHERE "id" = $1`, + [connectedAccountId], + workspaceId, + transactionManager, + ); + } + public async updateAccessToken( accessToken: string, connectedAccountId: string, diff --git a/packages/twenty-server/src/workspace/messaging/repositories/message-participant/message-participant.service.ts b/packages/twenty-server/src/workspace/messaging/repositories/message-participant/message-participant.service.ts index 3cb8ccc65a..aebbf3a3a4 100644 --- a/packages/twenty-server/src/workspace/messaging/repositories/message-participant/message-participant.service.ts +++ b/packages/twenty-server/src/workspace/messaging/repositories/message-participant/message-participant.service.ts @@ -7,7 +7,7 @@ import { MessageParticipantObjectMetadata } from 'src/workspace/workspace-sync-m import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record'; import { ParticipantWithId, - Participant, + ParticipantWithMessageId, } from 'src/workspace/messaging/types/gmail-message'; import { PersonService } from 'src/workspace/messaging/repositories/person/person.service'; @@ -138,8 +138,7 @@ export class MessageParticipantService { } public async saveMessageParticipants( - participants: Participant[], - messageId: string, + participants: ParticipantWithMessageId[], workspaceId: string, transactionManager?: EntityManager, ): Promise { @@ -169,7 +168,7 @@ export class MessageParticipantService { ); const messageParticipantsToSave = participants.map((participant) => [ - messageId, + participant.messageId, participant.role, participant.handle, participant.displayName, diff --git a/packages/twenty-server/src/workspace/messaging/repositories/message/message.service.ts b/packages/twenty-server/src/workspace/messaging/repositories/message/message.service.ts index c13d977a36..0d81c11d95 100644 --- a/packages/twenty-server/src/workspace/messaging/repositories/message/message.service.ts +++ b/packages/twenty-server/src/workspace/messaging/repositories/message/message.service.ts @@ -122,7 +122,9 @@ export class MessageService { connectedAccount: ObjectRecord, gmailMessageChannelId: string, workspaceId: string, - ) { + ): Promise> { + const messageExternalIdsAndIdsMap = new Map(); + for (const message of messages) { if (this.shouldSkipImport(message)) { continue; @@ -159,6 +161,11 @@ export class MessageService { manager, ); + messageExternalIdsAndIdsMap.set( + message.externalId, + savedOrExistingMessageId, + ); + await manager.query( `INSERT INTO ${dataSourceMetadata.schema}."messageChannelMessageAssociation" ("messageChannelId", "messageId", "messageExternalId", "messageThreadId", "messageThreadExternalId") VALUES ($1, $2, $3, $4, $5)`, [ @@ -171,6 +178,8 @@ export class MessageService { ); }); } + + return messageExternalIdsAndIdsMap; } private shouldSkipImport(message: GmailMessage): boolean { @@ -216,53 +225,19 @@ export class MessageService { ], ); - const isContactAutoCreationEnabled = - await this.messageChannelService.getIsContactAutoCreationEnabledByConnectedAccountIdOrFail( - connectedAccount.id, - workspaceId, - ); - - if (isContactAutoCreationEnabled && messageDirection === 'outgoing') { - await this.createCompaniesAndContactsService.createCompaniesAndContacts( - connectedAccount.handle, - message.participants, - workspaceId, - manager, - ); - - const handles = message.participants.map( - (participant) => participant.handle, - ); - - const messageParticipantsWithoutPersonIdAndWorkspaceMemberId = - await this.messageParticipantService.getByHandlesWithoutPersonIdAndWorkspaceMemberId( - handles, - workspaceId, - ); - - await this.messageParticipantService.updateMessageParticipantsAfterPeopleCreation( - messageParticipantsWithoutPersonIdAndWorkspaceMemberId, - workspaceId, - manager, - ); - } - - await this.messageParticipantService.saveMessageParticipants( - message.participants, - newMessageId, - workspaceId, - manager, - ); - return Promise.resolve(newMessageId); } public async deleteMessages( - workspaceDataSource: DataSource, messagesDeletedMessageExternalIds: string[], gmailMessageChannelId: string, workspaceId: string, ) { + const workspaceDataSource = + await this.workspaceDataSourceService.connectToWorkspaceDataSource( + workspaceId, + ); + await workspaceDataSource?.transaction(async (manager: EntityManager) => { const messageChannelMessageAssociationsToDelete = await this.messageChannelMessageAssociationService.getByMessageExternalIdsAndMessageChannelId( diff --git a/packages/twenty-server/src/workspace/messaging/services/fetch-messages-by-batches.service.ts b/packages/twenty-server/src/workspace/messaging/services/fetch-messages-by-batches.service.ts index 62c2a68348..7684654ad6 100644 --- a/packages/twenty-server/src/workspace/messaging/services/fetch-messages-by-batches.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/fetch-messages-by-batches.service.ts @@ -1,4 +1,4 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import axios, { AxiosInstance, AxiosResponse } from 'axios'; import { simpleParser, AddressObject } from 'mailparser'; @@ -14,6 +14,7 @@ import { GmailMessageParsedResponse } from 'src/workspace/messaging/types/gmail- @Injectable() export class FetchMessagesByBatchesService { private readonly httpService: AxiosInstance; + private readonly logger = new Logger(FetchMessagesByBatchesService.name); constructor() { this.httpService = axios.create({ @@ -24,14 +25,38 @@ export class FetchMessagesByBatchesService { async fetchAllMessages( queries: MessageQuery[], accessToken: string, + jobName?: string, + workspaceId?: string, + connectedAccountId?: string, ): Promise<{ messages: GmailMessage[]; errors: any[] }> { + let startTime = Date.now(); const batchResponses = await this.fetchAllByBatches( queries, accessToken, 'batch_gmail_messages', ); + let endTime = Date.now(); - return this.formatBatchResponsesAsGmailMessages(batchResponses); + this.logger.log( + `${jobName} for workspace ${workspaceId} and account ${connectedAccountId} fetching ${ + queries.length + } messages in ${endTime - startTime}ms`, + ); + + startTime = Date.now(); + + const formattedResponse = + await this.formatBatchResponsesAsGmailMessages(batchResponses); + + endTime = Date.now(); + + this.logger.log( + `${jobName} for workspace ${workspaceId} and account ${connectedAccountId} formatting ${ + queries.length + } messages in ${endTime - startTime}ms`, + ); + + return formattedResponse; } async fetchAllByBatches( @@ -39,7 +64,7 @@ export class FetchMessagesByBatchesService { accessToken: string, boundary: string, ): Promise[]> { - const batchLimit = 100; + const batchLimit = 50; let batchOffset = 0; diff --git a/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts b/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts index fa1654193f..509b997c36 100644 --- a/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts @@ -11,9 +11,8 @@ import { import { ConnectedAccountService } from 'src/workspace/messaging/repositories/connected-account/connected-account.service'; import { MessageChannelService } from 'src/workspace/messaging/repositories/message-channel/message-channel.service'; import { MessageChannelMessageAssociationService } from 'src/workspace/messaging/repositories/message-channel-message-association/message-channel-message-association.service'; -import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; -import { MessageService } from 'src/workspace/messaging/repositories/message/message.service'; import { createQueriesFromMessageIds } from 'src/workspace/messaging/utils/create-queries-from-message-ids.util'; +import { SaveMessagesAndCreateContactsService } from 'src/workspace/messaging/services/save-messages-and-create-contacts.service'; @Injectable() export class GmailFullSyncService { @@ -24,11 +23,10 @@ export class GmailFullSyncService { private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService, @Inject(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, - private readonly workspaceDataSourceService: WorkspaceDataSourceService, private readonly connectedAccountService: ConnectedAccountService, private readonly messageChannelService: MessageChannelService, private readonly messageChannelMessageAssociationService: MessageChannelMessageAssociationService, - private readonly messageService: MessageService, + private readonly saveMessagesAndCreateContactsService: SaveMessagesAndCreateContactsService, ) {} public async fetchConnectedAccountThreads( @@ -36,11 +34,6 @@ export class GmailFullSyncService { connectedAccountId: string, nextPageToken?: string, ): Promise { - const { dataSource: workspaceDataSource, dataSourceMetadata } = - await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( - workspaceId, - ); - const connectedAccount = await this.connectedAccountService.getByIdOrFail( connectedAccountId, workspaceId, @@ -64,12 +57,22 @@ export class GmailFullSyncService { const gmailClient = await this.gmailClientProvider.getGmailClient(refreshToken); + let startTime = Date.now(); + const messages = await gmailClient.users.messages.list({ userId: 'me', maxResults: 500, pageToken: nextPageToken, }); + let endTime = Date.now(); + + this.logger.log( + `gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId} getting messages list in ${ + endTime - startTime + }ms.`, + ); + const messagesData = messages.data.messages; const messageExternalIds = messagesData @@ -80,6 +83,8 @@ export class GmailFullSyncService { return; } + startTime = Date.now(); + const existingMessageChannelMessageAssociations = await this.messageChannelMessageAssociationService.getByMessageExternalIdsAndMessageChannelId( messageExternalIds, @@ -87,6 +92,14 @@ export class GmailFullSyncService { workspaceId, ); + endTime = Date.now(); + + this.logger.log( + `gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId}: getting existing message channel message associations in ${ + endTime - startTime + }ms.`, + ); + const existingMessageChannelMessageAssociationsExternalIds = existingMessageChannelMessageAssociations.map( (messageChannelMessageAssociation) => @@ -102,13 +115,28 @@ export class GmailFullSyncService { const messageQueries = createQueriesFromMessageIds(messagesToFetch); + startTime = Date.now(); + const { messages: messagesToSave, errors } = await this.fetchMessagesByBatchesService.fetchAllMessages( messageQueries, accessToken, + 'gmail full-sync', + workspaceId, + connectedAccountId, ); + endTime = Date.now(); + + this.logger.log( + `gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId}: fetching all messages in ${ + endTime - startTime + }ms.`, + ); + if (messagesToSave.length === 0) { + if (errors.length) throw new Error('Error fetching messages'); + this.logger.log( `gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to import.`, ); @@ -116,13 +144,12 @@ export class GmailFullSyncService { return; } - await this.messageService.saveMessages( + this.saveMessagesAndCreateContactsService.saveMessagesAndCreateContacts( messagesToSave, - dataSourceMetadata, - workspaceDataSource, connectedAccount, - gmailMessageChannelId, workspaceId, + gmailMessageChannelId, + 'gmail full-sync', ); if (errors.length) throw new Error('Error fetching messages'); @@ -135,12 +162,22 @@ export class GmailFullSyncService { if (!historyId) throw new Error('No history id found'); - await this.connectedAccountService.updateLastSyncHistoryId( + startTime = Date.now(); + + await this.connectedAccountService.updateLastSyncHistoryIdIfHigher( historyId, connectedAccount.id, workspaceId, ); + endTime = Date.now(); + + this.logger.log( + `gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId}: updating last sync history id in ${ + endTime - startTime + }ms.`, + ); + this.logger.log( `gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId} ${ nextPageToken ? `and ${nextPageToken} pageToken` : '' diff --git a/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts b/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts index d5d04f6974..d62db8d2f0 100644 --- a/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts @@ -15,6 +15,7 @@ import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/w import { MessageChannelService } from 'src/workspace/messaging/repositories/message-channel/message-channel.service'; import { MessageService } from 'src/workspace/messaging/repositories/message/message.service'; import { createQueriesFromMessageIds } from 'src/workspace/messaging/utils/create-queries-from-message-ids.util'; +import { SaveMessagesAndCreateContactsService } from 'src/workspace/messaging/services/save-messages-and-create-contacts.service'; @Injectable() export class GmailPartialSyncService { @@ -29,6 +30,7 @@ export class GmailPartialSyncService { private readonly connectedAccountService: ConnectedAccountService, private readonly messageChannelService: MessageChannelService, private readonly messageService: MessageService, + private readonly saveMessagesAndCreateContactsService: SaveMessagesAndCreateContactsService, ) {} public async fetchConnectedAccountThreads( @@ -36,11 +38,6 @@ export class GmailPartialSyncService { connectedAccountId: string, maxResults = 500, ): Promise { - const { dataSource: workspaceDataSource, dataSourceMetadata } = - await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( - workspaceId, - ); - const connectedAccount = await this.connectedAccountService.getByIdOrFail( connectedAccountId, workspaceId, @@ -68,6 +65,11 @@ export class GmailPartialSyncService { ); if (error && error.code === 404) { + await this.connectedAccountService.deleteHistoryId( + connectedAccountId, + workspaceId, + ); + await this.fallbackToFullSync(workspaceId, connectedAccountId); return; @@ -104,22 +106,23 @@ export class GmailPartialSyncService { await this.fetchMessagesByBatchesService.fetchAllMessages( messageQueries, accessToken, + 'gmail full-sync', + workspaceId, + connectedAccountId, ); if (messagesToSave.length !== 0) { - await this.messageService.saveMessages( + await this.saveMessagesAndCreateContactsService.saveMessagesAndCreateContacts( messagesToSave, - dataSourceMetadata, - workspaceDataSource, connectedAccount, - gmailMessageChannelId, workspaceId, + gmailMessageChannelId, + 'gmail partial-sync', ); } if (messagesDeleted.length !== 0) { await this.messageService.deleteMessages( - workspaceDataSource, messagesDeleted, gmailMessageChannelId, workspaceId, diff --git a/packages/twenty-server/src/workspace/messaging/services/save-messages-and-create-contacts.service.ts b/packages/twenty-server/src/workspace/messaging/services/save-messages-and-create-contacts.service.ts new file mode 100644 index 0000000000..1bc86e4db5 --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/services/save-messages-and-create-contacts.service.ts @@ -0,0 +1,163 @@ +import { Injectable, Logger } from '@nestjs/common'; + +import { MessageChannelService } from 'src/workspace/messaging/repositories/message-channel/message-channel.service'; +import { MessageParticipantService } from 'src/workspace/messaging/repositories/message-participant/message-participant.service'; +import { MessageService } from 'src/workspace/messaging/repositories/message/message.service'; +import { CreateCompaniesAndContactsService } from 'src/workspace/messaging/services/create-companies-and-contacts/create-companies-and-contacts.service'; +import { + GmailMessage, + ParticipantWithMessageId, +} from 'src/workspace/messaging/types/gmail-message'; +import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; +import { ConnectedAccountObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/connected-account.object-metadata'; +import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record'; + +@Injectable() +export class SaveMessagesAndCreateContactsService { + private readonly logger = new Logger( + SaveMessagesAndCreateContactsService.name, + ); + + constructor( + private readonly messageService: MessageService, + private readonly messageChannelService: MessageChannelService, + private readonly createCompaniesAndContactsService: CreateCompaniesAndContactsService, + private readonly messageParticipantService: MessageParticipantService, + private readonly workspaceDataSourceService: WorkspaceDataSourceService, + ) {} + + async saveMessagesAndCreateContacts( + messagesToSave: GmailMessage[], + connectedAccount: ObjectRecord, + workspaceId: string, + gmailMessageChannelId: string, + jobName?: string, + ) { + const { dataSource: workspaceDataSource, dataSourceMetadata } = + await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( + workspaceId, + ); + + let startTime = Date.now(); + + const messageExternalIdsAndIdsMap = await this.messageService.saveMessages( + messagesToSave, + dataSourceMetadata, + workspaceDataSource, + connectedAccount, + gmailMessageChannelId, + workspaceId, + ); + + let endTime = Date.now(); + + this.logger.log( + `${jobName} saving messages for workspace ${workspaceId} and account ${ + connectedAccount.id + } in ${endTime - startTime}ms`, + ); + + const isContactAutoCreationEnabled = + await this.messageChannelService.getIsContactAutoCreationEnabledByConnectedAccountIdOrFail( + connectedAccount.id, + workspaceId, + ); + + const participantsWithMessageId: ParticipantWithMessageId[] = + messagesToSave.flatMap((message) => { + const messageId = messageExternalIdsAndIdsMap.get(message.externalId); + + return messageId + ? message.participants.map((participant) => ({ + ...participant, + messageId, + })) + : []; + }); + + const contactsToCreate = messagesToSave + .filter((message) => connectedAccount.handle === message.fromHandle) + .flatMap((message) => message.participants); + + if (isContactAutoCreationEnabled) { + startTime = Date.now(); + + await this.createCompaniesAndContactsService.createCompaniesAndContacts( + connectedAccount.handle, + contactsToCreate, + workspaceId, + ); + + const handles = participantsWithMessageId.map( + (participant) => participant.handle, + ); + + const messageParticipantsWithoutPersonIdAndWorkspaceMemberId = + await this.messageParticipantService.getByHandlesWithoutPersonIdAndWorkspaceMemberId( + handles, + workspaceId, + ); + + await this.messageParticipantService.updateMessageParticipantsAfterPeopleCreation( + messageParticipantsWithoutPersonIdAndWorkspaceMemberId, + workspaceId, + ); + + endTime = Date.now(); + + this.logger.log( + `${jobName} creating companies and contacts for workspace ${workspaceId} and account ${ + connectedAccount.id + } in ${endTime - startTime}ms`, + ); + } + + startTime = Date.now(); + + await this.tryToSaveMessageParticipantsOrDeleteMessagesIfError( + participantsWithMessageId, + gmailMessageChannelId, + workspaceId, + connectedAccount, + jobName, + ); + + endTime = Date.now(); + + this.logger.log( + `${jobName} saving message participants for workspace ${workspaceId} and account in ${ + connectedAccount.id + } ${endTime - startTime}ms`, + ); + } + + private async tryToSaveMessageParticipantsOrDeleteMessagesIfError( + participantsWithMessageId: ParticipantWithMessageId[], + gmailMessageChannelId: string, + workspaceId: string, + connectedAccount: ObjectRecord, + jobName?: string, + ) { + try { + await this.messageParticipantService.saveMessageParticipants( + participantsWithMessageId, + workspaceId, + ); + } catch (error) { + this.logger.error( + `${jobName} error saving message participants for workspace ${workspaceId} and account ${connectedAccount.id}`, + error, + ); + + const messagesToDelete = participantsWithMessageId.map( + (participant) => participant.messageId, + ); + + await this.messageService.deleteMessages( + messagesToDelete, + gmailMessageChannelId, + workspaceId, + ); + } + } +} diff --git a/packages/twenty-server/src/workspace/messaging/types/gmail-message.ts b/packages/twenty-server/src/workspace/messaging/types/gmail-message.ts index 1bf268e7fc..8976ab033f 100644 --- a/packages/twenty-server/src/workspace/messaging/types/gmail-message.ts +++ b/packages/twenty-server/src/workspace/messaging/types/gmail-message.ts @@ -21,6 +21,8 @@ export type Participant = { displayName: string; }; +export type ParticipantWithMessageId = Participant & { messageId: string }; + export type ParticipantWithId = Participant & { id: string; };