From 5a61e34f4cdb3ebfdb2a0e6cac0dd3bf1bd313e5 Mon Sep 17 00:00:00 2001 From: bosiraphael <71827178+bosiraphael@users.noreply.github.com> Date: Fri, 12 Jan 2024 17:46:55 +0100 Subject: [PATCH] 3239 create a command to do a partial sync with the gmail api using the historyid (#3405) * create utils service * getLastSyncHistoryId * getHistory * add historyTypes messageAdded and messageDeleted * getMessageIdsAndThreadIdsNotInDatabase * wip * fix messageThreadId null * no need to fetch threads anymore * get messagesAdded in partial sync * adding errors * save lastSyncHistoryId * improve * renaming * create partial sync job * improve partial sync * adding messages with partial sync is working * now adding messages with partial sync is working * deleting messages and empty threads is working * wip * wip * fix bug to delete threads * update partial sync to cover edge cases * renaming * modify ambiguous naming * renaming --- .../auth/services/google-gmail.service.ts | 12 +- .../integrations/message-queue/jobs.module.ts | 11 +- ...etch-workspace-messages-commands.module.ts | 10 +- ....command.ts => gmail-full-sync.command.ts} | 37 +-- .../commands/gmail-partial-sync.command.ts | 68 +++++ ...-account.job.ts => gmail-full-sync.job.ts} | 20 +- .../messaging/jobs/gmail-partial-sync.job.ts | 40 +++ .../messaging/producers/messaging-producer.ts | 29 +- ...s => fetch-messages-by-batches.service.ts} | 127 ++------- .../fetch-workspace-messages.module.ts | 27 +- .../services/gmail-full-sync.service.ts | 113 ++++++++ .../services/gmail-partial-sync.service.ts | 235 +++++++++++++++++ ... => gmail-refresh-access-token.service.ts} | 2 +- ....service.ts => messaging-utils.service.ts} | 249 ++++++++++-------- .../workspace/messaging/types/gmailMessage.ts | 3 +- .../workspace/messaging/types/gmailThread.ts | 2 +- .../types/gmailThreadParsedResponse.ts | 14 - .../messaging/types/messageOrThreadQuery.ts | 2 +- 18 files changed, 705 insertions(+), 296 deletions(-) rename packages/twenty-server/src/workspace/messaging/commands/{fetch-workspace-messages.command.ts => gmail-full-sync.command.ts} (56%) create mode 100644 packages/twenty-server/src/workspace/messaging/commands/gmail-partial-sync.command.ts rename packages/twenty-server/src/workspace/messaging/jobs/{fetch-all-messages-from-connected-account.job.ts => gmail-full-sync.job.ts} (52%) create mode 100644 packages/twenty-server/src/workspace/messaging/jobs/gmail-partial-sync.job.ts rename packages/twenty-server/src/workspace/messaging/services/{fetch-batch-messages.service.ts => fetch-messages-by-batches.service.ts} (65%) create mode 100644 packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts create mode 100644 packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts rename packages/twenty-server/src/workspace/messaging/services/{refresh-access-token.service.ts => gmail-refresh-access-token.service.ts} (98%) rename packages/twenty-server/src/workspace/messaging/services/{fetch-workspace-messages.service.ts => messaging-utils.service.ts} (60%) delete mode 100644 packages/twenty-server/src/workspace/messaging/types/gmailThreadParsedResponse.ts diff --git a/packages/twenty-server/src/core/auth/services/google-gmail.service.ts b/packages/twenty-server/src/core/auth/services/google-gmail.service.ts index 5b035131e9..cb73dab967 100644 --- a/packages/twenty-server/src/core/auth/services/google-gmail.service.ts +++ b/packages/twenty-server/src/core/auth/services/google-gmail.service.ts @@ -5,12 +5,12 @@ import { v4 } from 'uuid'; import { DataSourceService } from 'src/metadata/data-source/data-source.service'; import { TypeORMService } from 'src/database/typeorm/typeorm.service'; import { SaveConnectedAccountInput } from 'src/core/auth/dto/save-connected-account'; -import { - FetchAllMessagesFromConnectedAccountJobData, - FetchAllMessagesFromConnectedAccountJob, -} from 'src/workspace/messaging/jobs/fetch-all-messages-from-connected-account.job'; import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; +import { + GmailFullSyncJob, + GmailFullSyncJobData, +} from 'src/workspace/messaging/jobs/gmail-full-sync.job'; @Injectable() export class GoogleGmailService { @@ -73,8 +73,8 @@ export class GoogleGmailService { ); }); - await this.messageQueueService.add( - FetchAllMessagesFromConnectedAccountJob.name, + await this.messageQueueService.add( + GmailFullSyncJob.name, { workspaceId, connectedAccountId, diff --git a/packages/twenty-server/src/integrations/message-queue/jobs.module.ts b/packages/twenty-server/src/integrations/message-queue/jobs.module.ts index 1104718464..dc7c345f79 100644 --- a/packages/twenty-server/src/integrations/message-queue/jobs.module.ts +++ b/packages/twenty-server/src/integrations/message-queue/jobs.module.ts @@ -2,7 +2,7 @@ import { Module } from '@nestjs/common'; import { ModuleRef } from '@nestjs/core'; import { HttpModule } from '@nestjs/axios'; -import { FetchAllMessagesFromConnectedAccountJob } from 'src/workspace/messaging/jobs/fetch-all-messages-from-connected-account.job'; +import { GmailFullSyncJob } from 'src/workspace/messaging/jobs/gmail-full-sync.job'; import { CallWebhookJobsJob } from 'src/workspace/workspace-query-runner/jobs/call-webhook-jobs.job'; import { CallWebhookJob } from 'src/workspace/workspace-query-runner/jobs/call-webhook.job'; import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module'; @@ -10,6 +10,7 @@ import { ObjectMetadataModule } from 'src/metadata/object-metadata/object-metada import { DataSourceModule } from 'src/metadata/data-source/data-source.module'; import { TypeORMModule } from 'src/database/typeorm/typeorm.module'; import { FetchWorkspaceMessagesModule } from 'src/workspace/messaging/services/fetch-workspace-messages.module'; +import { GmailPartialSyncJob } from 'src/workspace/messaging/jobs/gmail-partial-sync.job'; import { EmailSenderJob } from 'src/integrations/email/email-sender.job'; @Module({ @@ -23,8 +24,12 @@ import { EmailSenderJob } from 'src/integrations/email/email-sender.job'; ], providers: [ { - provide: FetchAllMessagesFromConnectedAccountJob.name, - useClass: FetchAllMessagesFromConnectedAccountJob, + provide: GmailFullSyncJob.name, + useClass: GmailFullSyncJob, + }, + { + provide: GmailPartialSyncJob.name, + useClass: GmailPartialSyncJob, }, { provide: CallWebhookJobsJob.name, diff --git a/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages-commands.module.ts b/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages-commands.module.ts index 85531150d2..64ff68d0e8 100644 --- a/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages-commands.module.ts +++ b/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages-commands.module.ts @@ -4,8 +4,10 @@ import { TypeOrmModule } from '@nestjs/typeorm'; import { FeatureFlagEntity } from 'src/core/feature-flag/feature-flag.entity'; import { TypeORMModule } from 'src/database/typeorm/typeorm.module'; import { DataSourceModule } from 'src/metadata/data-source/data-source.module'; -import { FetchWorkspaceMessagesCommand } from 'src/workspace/messaging/commands/fetch-workspace-messages.command'; +import { GmailFullSyncCommand } from 'src/workspace/messaging/commands/gmail-full-sync.command'; +import { GmailPartialSyncCommand } from 'src/workspace/messaging/commands/gmail-partial-sync.command'; import { MessagingModule } from 'src/workspace/messaging/messaging.module'; +import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service'; @Module({ imports: [ @@ -14,6 +16,10 @@ import { MessagingModule } from 'src/workspace/messaging/messaging.module'; TypeORMModule, TypeOrmModule.forFeature([FeatureFlagEntity], 'core'), ], - providers: [FetchWorkspaceMessagesCommand], + providers: [ + GmailFullSyncCommand, + GmailPartialSyncCommand, + MessagingUtilsService, + ], }) export class FetchWorkspaceMessagesCommandsModule {} diff --git a/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages.command.ts b/packages/twenty-server/src/workspace/messaging/commands/gmail-full-sync.command.ts similarity index 56% rename from packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages.command.ts rename to packages/twenty-server/src/workspace/messaging/commands/gmail-full-sync.command.ts index 08bc0855ee..dc8b4a0c39 100644 --- a/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages.command.ts +++ b/packages/twenty-server/src/workspace/messaging/commands/gmail-full-sync.command.ts @@ -4,23 +4,21 @@ import { Command, CommandRunner, Option } from 'nest-commander'; import { Repository } from 'typeorm'; import { FeatureFlagEntity } from 'src/core/feature-flag/feature-flag.entity'; -import { TypeORMService } from 'src/database/typeorm/typeorm.service'; -import { DataSourceService } from 'src/metadata/data-source/data-source.service'; import { MessagingProducer } from 'src/workspace/messaging/producers/messaging-producer'; +import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service'; -interface FetchWorkspaceMessagesOptions { +interface GmailFullSyncOptions { workspaceId: string; } @Command({ - name: 'workspace:fetch-messages', + name: 'workspace:gmail-full-sync', description: 'Fetch messages of all workspaceMembers in a workspace.', }) -export class FetchWorkspaceMessagesCommand extends CommandRunner { +export class GmailFullSyncCommand extends CommandRunner { constructor( - private readonly dataSourceService: DataSourceService, - private readonly typeORMService: TypeORMService, private readonly messagingProducer: MessagingProducer, + private readonly utils: MessagingUtilsService, @InjectRepository(FeatureFlagEntity, 'core') private readonly featureFlagRepository: Repository, @@ -30,7 +28,7 @@ export class FetchWorkspaceMessagesCommand extends CommandRunner { async run( _passedParam: string[], - options: FetchWorkspaceMessagesOptions, + options: GmailFullSyncOptions, ): Promise { const isMessagingEnabled = await this.featureFlagRepository.findOneBy({ workspaceId: options.workspaceId, @@ -57,28 +55,11 @@ export class FetchWorkspaceMessagesCommand extends CommandRunner { } private async fetchWorkspaceMessages(workspaceId: string): Promise { - const dataSourceMetadata = - await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail( - workspaceId, - ); - - const workspaceDataSource = - await this.typeORMService.connectToDataSource(dataSourceMetadata); - - if (!workspaceDataSource) { - throw new Error('No workspace data source found'); - } - - const connectedAccounts = await workspaceDataSource?.query( - `SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'gmail'`, - ); - - if (!connectedAccounts || connectedAccounts.length === 0) { - throw new Error('No connected account found'); - } + const connectedAccounts = + await this.utils.getConnectedAccountsFromWorkspaceId(workspaceId); for (const connectedAccount of connectedAccounts) { - await this.messagingProducer.enqueueFetchAllMessagesFromConnectedAccount( + await this.messagingProducer.enqueueGmailFullSync( { workspaceId, connectedAccountId: connectedAccount.id }, `${workspaceId}-${connectedAccount.id}`, ); diff --git a/packages/twenty-server/src/workspace/messaging/commands/gmail-partial-sync.command.ts b/packages/twenty-server/src/workspace/messaging/commands/gmail-partial-sync.command.ts new file mode 100644 index 0000000000..ce7950cd64 --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/commands/gmail-partial-sync.command.ts @@ -0,0 +1,68 @@ +import { InjectRepository } from '@nestjs/typeorm'; + +import { Command, CommandRunner, Option } from 'nest-commander'; +import { Repository } from 'typeorm'; + +import { FeatureFlagEntity } from 'src/core/feature-flag/feature-flag.entity'; +import { MessagingProducer } from 'src/workspace/messaging/producers/messaging-producer'; +import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service'; + +interface GmailPartialSyncOptions { + workspaceId: string; +} + +@Command({ + name: 'workspace:gmail-partial-sync', + description: 'Fetch messages of all workspaceMembers in a workspace.', +}) +export class GmailPartialSyncCommand extends CommandRunner { + constructor( + private readonly messagingProducer: MessagingProducer, + private readonly utils: MessagingUtilsService, + + @InjectRepository(FeatureFlagEntity, 'core') + private readonly featureFlagRepository: Repository, + ) { + super(); + } + + async run( + _passedParam: string[], + options: GmailPartialSyncOptions, + ): Promise { + const isMessagingEnabled = await this.featureFlagRepository.findOneBy({ + workspaceId: options.workspaceId, + key: 'IS_MESSAGING_ENABLED', + value: true, + }); + + if (!isMessagingEnabled) { + throw new Error('Messaging is not enabled for this workspace'); + } + + await this.fetchWorkspaceMessages(options.workspaceId); + + return; + } + + @Option({ + flags: '-w, --workspace-id [workspace_id]', + description: 'workspace id', + required: true, + }) + parseWorkspaceId(value: string): string { + return value; + } + + private async fetchWorkspaceMessages(workspaceId: string): Promise { + const connectedAccounts = + await this.utils.getConnectedAccountsFromWorkspaceId(workspaceId); + + for (const connectedAccount of connectedAccounts) { + await this.messagingProducer.enqueueGmailPartialSync( + { workspaceId, connectedAccountId: connectedAccount.id }, + `${workspaceId}-${connectedAccount.id}`, + ); + } + } +} diff --git a/packages/twenty-server/src/workspace/messaging/jobs/fetch-all-messages-from-connected-account.job.ts b/packages/twenty-server/src/workspace/messaging/jobs/gmail-full-sync.job.ts similarity index 52% rename from packages/twenty-server/src/workspace/messaging/jobs/fetch-all-messages-from-connected-account.job.ts rename to packages/twenty-server/src/workspace/messaging/jobs/gmail-full-sync.job.ts index bd77ba685d..5320f8ed42 100644 --- a/packages/twenty-server/src/workspace/messaging/jobs/fetch-all-messages-from-connected-account.job.ts +++ b/packages/twenty-server/src/workspace/messaging/jobs/gmail-full-sync.job.ts @@ -3,33 +3,29 @@ import { Injectable } from '@nestjs/common'; import { MessageQueueJob } from 'src/integrations/message-queue/interfaces/message-queue-job.interface'; import { EnvironmentService } from 'src/integrations/environment/environment.service'; -import { RefreshAccessTokenService } from 'src/workspace/messaging/services/refresh-access-token.service'; -import { FetchWorkspaceMessagesService } from 'src/workspace/messaging/services/fetch-workspace-messages.service'; +import { GmailRefreshAccessTokenService } from 'src/workspace/messaging/services/gmail-refresh-access-token.service'; +import { GmailFullSyncService } from 'src/workspace/messaging/services/gmail-full-sync.service'; -export type FetchAllMessagesFromConnectedAccountJobData = { +export type GmailFullSyncJobData = { workspaceId: string; connectedAccountId: string; }; @Injectable() -export class FetchAllMessagesFromConnectedAccountJob - implements MessageQueueJob -{ +export class GmailFullSyncJob implements MessageQueueJob { constructor( private readonly environmentService: EnvironmentService, - private readonly refreshAccessTokenService: RefreshAccessTokenService, - private readonly fetchWorkspaceMessagesService: FetchWorkspaceMessagesService, + private readonly gmailRefreshAccessTokenService: GmailRefreshAccessTokenService, + private readonly fetchWorkspaceMessagesService: GmailFullSyncService, ) {} - async handle( - data: FetchAllMessagesFromConnectedAccountJobData, - ): Promise { + async handle(data: GmailFullSyncJobData): Promise { console.log( `fetching messages for workspace ${data.workspaceId} and account ${ data.connectedAccountId } with ${this.environmentService.getMessageQueueDriverType()}`, ); - await this.refreshAccessTokenService.refreshAndSaveAccessToken( + await this.gmailRefreshAccessTokenService.refreshAndSaveAccessToken( data.workspaceId, data.connectedAccountId, ); diff --git a/packages/twenty-server/src/workspace/messaging/jobs/gmail-partial-sync.job.ts b/packages/twenty-server/src/workspace/messaging/jobs/gmail-partial-sync.job.ts new file mode 100644 index 0000000000..1c0951f8d7 --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/jobs/gmail-partial-sync.job.ts @@ -0,0 +1,40 @@ +import { Injectable } from '@nestjs/common'; + +import { MessageQueueJob } from 'src/integrations/message-queue/interfaces/message-queue-job.interface'; + +import { EnvironmentService } from 'src/integrations/environment/environment.service'; +import { GmailRefreshAccessTokenService } from 'src/workspace/messaging/services/gmail-refresh-access-token.service'; +import { GmailPartialSyncService } from 'src/workspace/messaging/services/gmail-partial-sync.service'; + +export type GmailPartialSyncJobData = { + workspaceId: string; + connectedAccountId: string; +}; + +@Injectable() +export class GmailPartialSyncJob + implements MessageQueueJob +{ + constructor( + private readonly environmentService: EnvironmentService, + private readonly gmailRefreshAccessTokenService: GmailRefreshAccessTokenService, + private readonly gmailPartialSyncService: GmailPartialSyncService, + ) {} + + async handle(data: GmailPartialSyncJobData): Promise { + console.log( + `fetching messages for workspace ${data.workspaceId} and account ${ + data.connectedAccountId + } with ${this.environmentService.getMessageQueueDriverType()}`, + ); + await this.gmailRefreshAccessTokenService.refreshAndSaveAccessToken( + data.workspaceId, + data.connectedAccountId, + ); + + await this.gmailPartialSyncService.fetchConnectedAccountThreads( + data.workspaceId, + data.connectedAccountId, + ); + } +} diff --git a/packages/twenty-server/src/workspace/messaging/producers/messaging-producer.ts b/packages/twenty-server/src/workspace/messaging/producers/messaging-producer.ts index d6b28f1311..79b015c451 100644 --- a/packages/twenty-server/src/workspace/messaging/producers/messaging-producer.ts +++ b/packages/twenty-server/src/workspace/messaging/producers/messaging-producer.ts @@ -3,9 +3,13 @@ import { Inject, Injectable } from '@nestjs/common'; import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; import { - FetchAllMessagesFromConnectedAccountJob, - FetchAllMessagesFromConnectedAccountJobData, -} from 'src/workspace/messaging/jobs/fetch-all-messages-from-connected-account.job'; + GmailFullSyncJob, + GmailFullSyncJobData, +} from 'src/workspace/messaging/jobs/gmail-full-sync.job'; +import { + GmailPartialSyncJob, + GmailPartialSyncJobData, +} from 'src/workspace/messaging/jobs/gmail-partial-sync.job'; @Injectable() export class MessagingProducer { @@ -14,12 +18,23 @@ export class MessagingProducer { private readonly messageQueueService: MessageQueueService, ) {} - async enqueueFetchAllMessagesFromConnectedAccount( - data: FetchAllMessagesFromConnectedAccountJobData, + async enqueueGmailFullSync(data: GmailFullSyncJobData, singletonKey: string) { + await this.messageQueueService.add( + GmailFullSyncJob.name, + data, + { + id: singletonKey, + retryLimit: 2, + }, + ); + } + + async enqueueGmailPartialSync( + data: GmailPartialSyncJobData, singletonKey: string, ) { - await this.messageQueueService.add( - FetchAllMessagesFromConnectedAccountJob.name, + await this.messageQueueService.add( + GmailPartialSyncJob.name, data, { id: singletonKey, diff --git a/packages/twenty-server/src/workspace/messaging/services/fetch-batch-messages.service.ts b/packages/twenty-server/src/workspace/messaging/services/fetch-messages-by-batches.service.ts similarity index 65% rename from packages/twenty-server/src/workspace/messaging/services/fetch-batch-messages.service.ts rename to packages/twenty-server/src/workspace/messaging/services/fetch-messages-by-batches.service.ts index e2e17946e3..75bd56ce60 100644 --- a/packages/twenty-server/src/workspace/messaging/services/fetch-batch-messages.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/fetch-messages-by-batches.service.ts @@ -7,16 +7,12 @@ import { GmailMessage, Recipient, } from 'src/workspace/messaging/types/gmailMessage'; -import { MessageOrThreadQuery } from 'src/workspace/messaging/types/messageOrThreadQuery'; +import { MessageQuery } from 'src/workspace/messaging/types/messageOrThreadQuery'; import { GmailMessageParsedResponse } from 'src/workspace/messaging/types/gmailMessageParsedResponse'; -import { GmailThreadParsedResponse } from 'src/workspace/messaging/types/gmailThreadParsedResponse'; -import { GmailThread } from 'src/workspace/messaging/types/gmailThread'; -import { GmailClientProvider } from 'src/workspace/messaging/providers/gmail/gmail-client.provider'; @Injectable() -export class FetchBatchMessagesService { +export class FetchMessagesByBatchesService { private readonly httpService: AxiosInstance; - private readonly gmailClientProvider: GmailClientProvider; constructor() { this.httpService = axios.create({ @@ -25,39 +21,20 @@ export class FetchBatchMessagesService { } async fetchAllMessages( - queries: MessageOrThreadQuery[], + queries: MessageQuery[], accessToken: string, - ): Promise { + ): Promise<{ messages: GmailMessage[]; errors: any[] }> { const batchResponses = await this.fetchAllByBatches( queries, accessToken, 'batch_gmail_messages', ); - const messages = - await this.formatBatchResponsesAsGmailMessages(batchResponses); - - return messages; - } - - async fetchAllThreads( - queries: MessageOrThreadQuery[], - accessToken: string, - ): Promise { - const batchResponses = await this.fetchAllByBatches( - queries, - accessToken, - 'batch_gmail_threads', - ); - - const threads = - await this.formatBatchResponsesAsGmailThreads(batchResponses); - - return threads; + return this.formatBatchResponsesAsGmailMessages(batchResponses); } async fetchAllByBatches( - queries: MessageOrThreadQuery[], + queries: MessageQuery[], accessToken: string, boundary: string, ): Promise[]> { @@ -85,7 +62,7 @@ export class FetchBatchMessagesService { } async fetchBatch( - queries: MessageOrThreadQuery[], + queries: MessageQuery[], accessToken: string, batchOffset: number, batchLimit: number, @@ -107,13 +84,10 @@ export class FetchBatchMessagesService { return response; } - createBatchBody( - messageQueries: MessageOrThreadQuery[], - boundary: string, - ): string { + createBatchBody(queries: MessageQuery[], boundary: string): string { let batchBody: string[] = []; - messageQueries.forEach(function (call) { + queries.forEach(function (call) { const method = 'GET'; const uri = call.uri; @@ -136,10 +110,8 @@ export class FetchBatchMessagesService { parseBatch( responseCollection: AxiosResponse, - ): GmailMessageParsedResponse[] | GmailThreadParsedResponse[] { - const responseItems: - | GmailMessageParsedResponse[] - | GmailThreadParsedResponse[] = []; + ): GmailMessageParsedResponse[] { + const responseItems: GmailMessageParsedResponse[] = []; const boundary = this.getBatchSeparator(responseCollection); @@ -179,20 +151,24 @@ export class FetchBatchMessagesService { async formatBatchResponseAsGmailMessage( responseCollection: AxiosResponse, - ): Promise { + ): Promise<{ messages: GmailMessage[]; errors: any[] }> { const parsedResponses = this.parseBatch( responseCollection, ) as GmailMessageParsedResponse[]; + const errors: any = []; + const formattedResponse = Promise.all( parsedResponses.map(async (message: GmailMessageParsedResponse) => { if (message.error) { console.log('Error', message.error); + errors.push(message.error); + return; } - const { id, threadId, internalDate, raw } = message; + const { historyId, id, threadId, internalDate, raw } = message; const body = atob(raw?.replace(/-/g, '+').replace(/_/g, '/')); @@ -222,10 +198,11 @@ export class FetchBatchMessagesService { ]; const messageFromGmail: GmailMessage = { + historyId, externalId: id, headerMessageId: messageId || '', subject: subject || '', - messageThreadId: threadId, + messageThreadExternalId: threadId, internalDate, fromHandle: from.value[0].address || '', fromDisplayName: from.value[0].name || '', @@ -238,15 +215,17 @@ export class FetchBatchMessagesService { return messageFromGmail; } catch (error) { console.log('Error', error); + + errors.push(error); } }), ); - const filteredResponse = (await formattedResponse).filter( + const filteredMessages = (await formattedResponse).filter( (message) => message, ) as GmailMessage[]; - return filteredResponse; + return { messages: filteredMessages, errors }; } formatAddressObjectAsArray( @@ -281,65 +260,17 @@ export class FetchBatchMessagesService { async formatBatchResponsesAsGmailMessages( batchResponses: AxiosResponse[], - ): Promise { - const formattedResponses = await Promise.all( + ): Promise<{ messages: GmailMessage[]; errors: any[] }> { + const messagesAndErrors = await Promise.all( batchResponses.map(async (response) => { - const formattedResponse = - await this.formatBatchResponseAsGmailMessage(response); - - return formattedResponse; + return this.formatBatchResponseAsGmailMessage(response); }), ); - return formattedResponses.flat(); - } + const messages = messagesAndErrors.map((item) => item.messages).flat(); - async formatBatchResponseAsGmailThread( - responseCollection: AxiosResponse, - ): Promise { - const parsedResponses = this.parseBatch( - responseCollection, - ) as GmailThreadParsedResponse[]; + const errors = messagesAndErrors.map((item) => item.errors).flat(); - const formattedResponse = Promise.all( - parsedResponses.map(async (thread: GmailThreadParsedResponse) => { - if (thread.error) { - console.log('Error', thread.error); - - return; - } - try { - const { id, messages } = thread; - - return { - id, - messageIds: messages.map((message) => message.id) || [], - }; - } catch (error) { - console.log('Error', error); - } - }), - ); - - const filteredResponse = (await formattedResponse).filter( - (item) => item, - ) as GmailThread[]; - - return filteredResponse; - } - - async formatBatchResponsesAsGmailThreads( - batchResponses: AxiosResponse[], - ): Promise { - const formattedResponses = await Promise.all( - batchResponses.map(async (response) => { - const formattedResponse = - await this.formatBatchResponseAsGmailThread(response); - - return formattedResponse; - }), - ); - - return formattedResponses.flat(); + return { messages, errors }; } } diff --git a/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.module.ts b/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.module.ts index 68825cebec..eb1e1ff8eb 100644 --- a/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.module.ts +++ b/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.module.ts @@ -4,10 +4,12 @@ import { TypeORMModule } from 'src/database/typeorm/typeorm.module'; import { EnvironmentModule } from 'src/integrations/environment/environment.module'; import { DataSourceModule } from 'src/metadata/data-source/data-source.module'; import { MessagingModule } from 'src/workspace/messaging/messaging.module'; -import { MessagingProvidersModule } from 'src/workspace/messaging/providers/messaging-providers.module'; -import { FetchBatchMessagesService } from 'src/workspace/messaging/services/fetch-batch-messages.service'; -import { FetchWorkspaceMessagesService } from 'src/workspace/messaging/services/fetch-workspace-messages.service'; -import { RefreshAccessTokenService } from 'src/workspace/messaging/services/refresh-access-token.service'; +import { GmailClientProvider } from 'src/workspace/messaging/providers/gmail/gmail-client.provider'; +import { FetchMessagesByBatchesService } from 'src/workspace/messaging/services/fetch-messages-by-batches.service'; +import { GmailFullSyncService } from 'src/workspace/messaging/services/gmail-full-sync.service'; +import { GmailPartialSyncService } from 'src/workspace/messaging/services/gmail-partial-sync.service'; +import { GmailRefreshAccessTokenService } from 'src/workspace/messaging/services/gmail-refresh-access-token.service'; +import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service'; @Module({ imports: [ @@ -15,13 +17,20 @@ import { RefreshAccessTokenService } from 'src/workspace/messaging/services/refr TypeORMModule, DataSourceModule, EnvironmentModule, - MessagingProvidersModule, ], providers: [ - FetchWorkspaceMessagesService, - FetchBatchMessagesService, - RefreshAccessTokenService, + GmailFullSyncService, + GmailPartialSyncService, + FetchMessagesByBatchesService, + GmailRefreshAccessTokenService, + MessagingUtilsService, + GmailClientProvider, + ], + exports: [ + GmailPartialSyncService, + GmailFullSyncService, + GmailRefreshAccessTokenService, + MessagingUtilsService, ], - exports: [FetchWorkspaceMessagesService, RefreshAccessTokenService], }) export class FetchWorkspaceMessagesModule {} diff --git a/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts b/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts new file mode 100644 index 0000000000..1fa1f632ef --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts @@ -0,0 +1,113 @@ +import { Injectable } from '@nestjs/common'; + +import { FetchMessagesByBatchesService } from 'src/workspace/messaging/services/fetch-messages-by-batches.service'; +import { GmailClientProvider } from 'src/workspace/messaging/providers/gmail/gmail-client.provider'; +import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service'; + +@Injectable() +export class GmailFullSyncService { + constructor( + private readonly gmailClientProvider: GmailClientProvider, + private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService, + private readonly utils: MessagingUtilsService, + ) {} + + public async fetchConnectedAccountThreads( + workspaceId: string, + connectedAccountId: string, + maxResults = 500, + ): Promise { + const { workspaceDataSource, dataSourceMetadata, connectedAccount } = + await this.utils.getDataSourceMetadataWorkspaceMetadataAndConnectedAccount( + workspaceId, + connectedAccountId, + ); + + const accessToken = connectedAccount.accessToken; + const refreshToken = connectedAccount.refreshToken; + + if (!refreshToken) { + throw new Error('No refresh token found'); + } + + const gmailClient = + await this.gmailClientProvider.getGmailClient(refreshToken); + + const messages = await gmailClient.users.messages.list({ + userId: 'me', + maxResults, + }); + + const messagesData = messages.data.messages; + + const messageExternalIds = messagesData + ? messagesData.map((message) => message.id || '') + : []; + + if (!messagesData || messagesData?.length === 0) { + return; + } + + const { savedMessageIds, savedThreadIds } = + await this.utils.getSavedMessageIdsAndThreadIds( + messageExternalIds, + connectedAccountId, + dataSourceMetadata, + workspaceDataSource, + ); + + const messageIdsToSave = messageExternalIds.filter( + (messageId) => !savedMessageIds.includes(messageId), + ); + + const messageQueries = + this.utils.createQueriesFromMessageIds(messageIdsToSave); + + const { messages: messagesToSave, errors } = + await this.fetchMessagesByBatchesService.fetchAllMessages( + messageQueries, + accessToken, + ); + + const threads = this.utils.getThreadsFromMessages(messagesToSave); + + const threadsToSave = threads.filter( + (threadId) => !savedThreadIds.includes(threadId.id), + ); + + await this.utils.saveMessageThreads( + threadsToSave, + dataSourceMetadata, + workspaceDataSource, + connectedAccount.id, + ); + + await this.utils.saveMessages( + messagesToSave, + dataSourceMetadata, + workspaceDataSource, + connectedAccount, + ); + + if (errors.length) throw new Error('Error fetching messages'); + + if (messagesToSave.length === 0) { + return; + } + + const lastModifiedMessageId = messagesData[0].id; + + const historyId = messagesToSave.find( + (message) => message.externalId === lastModifiedMessageId, + )?.historyId; + + if (!historyId) throw new Error('No history id found'); + + await this.utils.saveLastSyncHistoryId( + historyId, + connectedAccount.id, + dataSourceMetadata, + workspaceDataSource, + ); + } +} diff --git a/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts b/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts new file mode 100644 index 0000000000..63022acade --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts @@ -0,0 +1,235 @@ +import { Inject, Injectable } from '@nestjs/common'; + +import { gmail_v1 } from 'googleapis'; + +import { FetchMessagesByBatchesService } from 'src/workspace/messaging/services/fetch-messages-by-batches.service'; +import { GmailClientProvider } from 'src/workspace/messaging/providers/gmail/gmail-client.provider'; +import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service'; +import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; +import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; +import { + GmailFullSyncJob, + GmailFullSyncJobData, +} from 'src/workspace/messaging/jobs/gmail-full-sync.job'; + +@Injectable() +export class GmailPartialSyncService { + constructor( + private readonly gmailClientProvider: GmailClientProvider, + private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService, + private readonly utils: MessagingUtilsService, + @Inject(MessageQueue.messagingQueue) + private readonly messageQueueService: MessageQueueService, + ) {} + + private async getHistory( + workspaceId: string, + connectedAccountId: string, + lastSyncHistoryId: string, + maxResults: number, + ) { + const { connectedAccount } = + await this.utils.getDataSourceMetadataWorkspaceMetadataAndConnectedAccount( + workspaceId, + connectedAccountId, + ); + + const gmailClient = await this.gmailClientProvider.getGmailClient( + connectedAccount.refreshToken, + ); + + const history = await gmailClient.users.history.list({ + userId: 'me', + startHistoryId: lastSyncHistoryId, + historyTypes: ['messageAdded', 'messageDeleted'], + maxResults, + }); + + return history.data; + } + + public async fetchConnectedAccountThreads( + workspaceId: string, + connectedAccountId: string, + maxResults = 500, + ): Promise { + const { workspaceDataSource, dataSourceMetadata, connectedAccount } = + await this.utils.getDataSourceMetadataWorkspaceMetadataAndConnectedAccount( + workspaceId, + connectedAccountId, + ); + + const lastSyncHistoryId = connectedAccount.lastSyncHistoryId; + + if (!lastSyncHistoryId) { + // Fall back to full sync + + await this.messageQueueService.add( + GmailFullSyncJob.name, + { workspaceId, connectedAccountId }, + { + id: `${workspaceId}-${connectedAccount.id}`, + retryLimit: 2, + }, + ); + + return; + } + + const accessToken = connectedAccount.accessToken; + const refreshToken = connectedAccount.refreshToken; + + if (!refreshToken) { + throw new Error('No refresh token found'); + } + + const history = await this.getHistory( + workspaceId, + connectedAccountId, + lastSyncHistoryId, + maxResults, + ); + + const historyId = history.historyId; + + if (!historyId) { + throw new Error('No history id found'); + } + + if (historyId === lastSyncHistoryId) { + return; + } + + if (!history.history) { + await this.utils.saveLastSyncHistoryId( + historyId, + connectedAccountId, + dataSourceMetadata, + workspaceDataSource, + ); + + return; + } + + const { messagesAdded, messagesDeleted } = + await this.getMessageIdsAndThreadIdsFromHistory(history); + + const { + savedMessageIds: messagesAddedAlreadySaved, + savedThreadIds: threadsAddedAlreadySaved, + } = await this.utils.getSavedMessageIdsAndThreadIds( + messagesAdded, + connectedAccountId, + dataSourceMetadata, + workspaceDataSource, + ); + + const messageExternalIdsToSave = messagesAdded.filter( + (messageId) => + !messagesAddedAlreadySaved.includes(messageId) && + !messagesDeleted.includes(messageId), + ); + + const { savedMessageIds: messagesDeletedAlreadySaved } = + await this.utils.getSavedMessageIdsAndThreadIds( + messagesDeleted, + connectedAccountId, + dataSourceMetadata, + workspaceDataSource, + ); + + const messageExternalIdsToDelete = messagesDeleted.filter((messageId) => + messagesDeletedAlreadySaved.includes(messageId), + ); + + const messageQueries = this.utils.createQueriesFromMessageIds( + messageExternalIdsToSave, + ); + + const { messages: messagesToSave, errors } = + await this.fetchMessagesByBatchesService.fetchAllMessages( + messageQueries, + accessToken, + ); + + const threads = this.utils.getThreadsFromMessages(messagesToSave); + + const threadsToSave = threads.filter( + (thread) => !threadsAddedAlreadySaved.includes(thread.id), + ); + + await this.utils.saveMessageThreads( + threadsToSave, + dataSourceMetadata, + workspaceDataSource, + connectedAccount.id, + ); + + await this.utils.saveMessages( + messagesToSave, + dataSourceMetadata, + workspaceDataSource, + connectedAccount, + ); + + await this.utils.deleteMessages( + messageExternalIdsToDelete, + dataSourceMetadata, + workspaceDataSource, + ); + + await this.utils.deleteEmptyThreads( + messagesDeleted, + connectedAccountId, + dataSourceMetadata, + workspaceDataSource, + ); + + if (errors.length) throw new Error('Error fetching messages'); + + await this.utils.saveLastSyncHistoryId( + historyId, + connectedAccount.id, + dataSourceMetadata, + workspaceDataSource, + ); + } + + private async getMessageIdsAndThreadIdsFromHistory( + history: gmail_v1.Schema$ListHistoryResponse, + ): Promise<{ + messagesAdded: string[]; + messagesDeleted: string[]; + }> { + if (!history.history) throw new Error('No history found'); + + const { messagesAdded, messagesDeleted } = history.history.reduce( + ( + acc: { + messagesAdded: string[]; + messagesDeleted: string[]; + }, + history, + ) => { + const messagesAdded = history.messagesAdded?.map( + (messageAdded) => messageAdded.message?.id || '', + ); + + const messagesDeleted = history.messagesDeleted?.map( + (messageDeleted) => messageDeleted.message?.id || '', + ); + + if (messagesAdded) acc.messagesAdded.push(...messagesAdded); + if (messagesDeleted) acc.messagesDeleted.push(...messagesDeleted); + + return acc; + }, + { messagesAdded: [], messagesDeleted: [] }, + ); + + return { + messagesAdded, + messagesDeleted, + }; + } +} diff --git a/packages/twenty-server/src/workspace/messaging/services/refresh-access-token.service.ts b/packages/twenty-server/src/workspace/messaging/services/gmail-refresh-access-token.service.ts similarity index 98% rename from packages/twenty-server/src/workspace/messaging/services/refresh-access-token.service.ts rename to packages/twenty-server/src/workspace/messaging/services/gmail-refresh-access-token.service.ts index 887b6c7afb..fcfbb45c6d 100644 --- a/packages/twenty-server/src/workspace/messaging/services/refresh-access-token.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/gmail-refresh-access-token.service.ts @@ -7,7 +7,7 @@ import { EnvironmentService } from 'src/integrations/environment/environment.ser import { DataSourceService } from 'src/metadata/data-source/data-source.service'; @Injectable() -export class RefreshAccessTokenService { +export class GmailRefreshAccessTokenService { constructor( private readonly environmentService: EnvironmentService, private readonly dataSourceService: DataSourceService, diff --git a/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.service.ts b/packages/twenty-server/src/workspace/messaging/services/messaging-utils.service.ts similarity index 60% rename from packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.service.ts rename to packages/twenty-server/src/workspace/messaging/services/messaging-utils.service.ts index 7363c9e805..7bb0b5c3ca 100644 --- a/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/messaging-utils.service.ts @@ -1,119 +1,48 @@ import { Injectable } from '@nestjs/common'; -import { gmail_v1 } from 'googleapis'; +import { EntityManager, DataSource } from 'typeorm'; import { v4 } from 'uuid'; -import { DataSource, EntityManager } from 'typeorm'; import { TypeORMService } from 'src/database/typeorm/typeorm.service'; import { DataSourceService } from 'src/metadata/data-source/data-source.service'; -import { FetchBatchMessagesService } from 'src/workspace/messaging/services/fetch-batch-messages.service'; +import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity'; import { GmailMessage, Recipient, } from 'src/workspace/messaging/types/gmailMessage'; -import { MessageOrThreadQuery } from 'src/workspace/messaging/types/messageOrThreadQuery'; -import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity'; -import { GmailClientProvider } from 'src/workspace/messaging/providers/gmail/gmail-client.provider'; +import { GmailThread } from 'src/workspace/messaging/types/gmailThread'; +import { MessageQuery } from 'src/workspace/messaging/types/messageOrThreadQuery'; @Injectable() -export class FetchWorkspaceMessagesService { +export class MessagingUtilsService { constructor( - private readonly gmailClientProvider: GmailClientProvider, private readonly dataSourceService: DataSourceService, private readonly typeORMService: TypeORMService, - private readonly fetchBatchMessagesService: FetchBatchMessagesService, ) {} - public async fetchConnectedAccountThreads( - workspaceId: string, - connectedAccountId: string, - maxResults = 500, - ): Promise { - const { workspaceDataSource, dataSourceMetadata, connectedAccount } = - await this.getDataSourceMetadataWorkspaceMetadataAndConnectedAccount( - workspaceId, - connectedAccountId, - ); - - const accessToken = connectedAccount.accessToken; - const refreshToken = connectedAccount.refreshToken; - - if (!refreshToken) { - throw new Error('No refresh token found'); - } - - const gmailClient = - await this.gmailClientProvider.getGmailClient(refreshToken); - - const threads = await gmailClient.users.threads.list({ - userId: 'me', - maxResults, - }); - - const threadsData = threads.data.threads; - - if (!threadsData || threadsData?.length === 0) { - return; - } - - const { savedMessageIds, savedThreadIds } = - await this.getAllSavedMessagesIdsAndMessageThreadsIdsForConnectedAccount( - dataSourceMetadata, - workspaceDataSource, - connectedAccount.id, - ); - - const threadsToSave = threadsData.filter( - (thread) => thread.id && !savedThreadIds.includes(thread.id), - ); - - await this.saveMessageThreads( - threadsToSave, - dataSourceMetadata, - workspaceDataSource, - connectedAccount.id, - ); - - const threadQueries: MessageOrThreadQuery[] = threadsData.map((thread) => ({ - uri: '/gmail/v1/users/me/threads/' + thread.id + '?format=minimal', + public createQueriesFromMessageIds( + messageExternalIds: string[], + ): MessageQuery[] { + return messageExternalIds.map((messageId) => ({ + uri: '/gmail/v1/users/me/messages/' + messageId + '?format=RAW', })); - - const threadsWithMessageIds = - await this.fetchBatchMessagesService.fetchAllThreads( - threadQueries, - accessToken, - ); - - const messageIds = threadsWithMessageIds - .map((thread) => thread.messageIds) - .flat(); - - const messageIdsToSave = messageIds.filter( - (messageId) => !savedMessageIds.includes(messageId), - ); - - const messageQueries: MessageOrThreadQuery[] = messageIdsToSave.map( - (messageId) => ({ - uri: '/gmail/v1/users/me/messages/' + messageId + '?format=RAW', - }), - ); - - const messagesResponse = - await this.fetchBatchMessagesService.fetchAllMessages( - messageQueries, - accessToken, - ); - - await this.saveMessages( - messagesResponse, - dataSourceMetadata, - workspaceDataSource, - connectedAccount, - ); } - private async saveMessageThreads( - threads: gmail_v1.Schema$Thread[], + public getThreadsFromMessages(messages: GmailMessage[]): GmailThread[] { + return messages.reduce((acc, message) => { + if (message.externalId === message.messageThreadExternalId) { + acc.push({ + id: message.messageThreadExternalId, + subject: message.subject, + }); + } + + return acc; + }, [] as GmailThread[]); + } + + public async saveMessageThreads( + threads: GmailThread[], dataSourceMetadata: DataSourceEntity, workspaceDataSource: DataSource, connectedAccountId: string, @@ -130,12 +59,12 @@ export class FetchWorkspaceMessagesService { for (const thread of threads) { await workspaceDataSource?.query( `INSERT INTO ${dataSourceMetadata.schema}."messageThread" ("externalId", "subject", "messageChannelId", "visibility") VALUES ($1, $2, $3, $4)`, - [thread.id, thread.snippet, messageChannel[0].id, 'default'], + [thread.id, thread.subject, messageChannel[0].id, 'default'], ); } } - private async saveMessages( + public async saveMessages( messages: GmailMessage[], dataSourceMetadata: DataSourceEntity, workspaceDataSource: DataSource, @@ -146,7 +75,7 @@ export class FetchWorkspaceMessagesService { externalId, headerMessageId, subject, - messageThreadId, + messageThreadExternalId, internalDate, fromHandle, fromDisplayName, @@ -158,7 +87,7 @@ export class FetchWorkspaceMessagesService { const messageThread = await workspaceDataSource?.query( `SELECT * FROM ${dataSourceMetadata.schema}."messageThread" WHERE "externalId" = $1`, - [messageThreadId], + [messageThreadExternalId], ); const messageId = v4(); @@ -219,7 +148,7 @@ export class FetchWorkspaceMessagesService { } } - async saveMessageRecipients( + public async saveMessageRecipients( recipients: Recipient[], dataSourceMetadata: DataSourceEntity, messageId: string, @@ -258,34 +187,70 @@ export class FetchWorkspaceMessagesService { } } - private async getAllSavedMessagesIdsAndMessageThreadsIdsForConnectedAccount( + public async getSavedMessageIdsAndThreadIds( + messageEternalIds: string[], + connectedAccountId: string, dataSourceMetadata: DataSourceEntity, workspaceDataSource: DataSource, - connectedAccountId: string, ): Promise<{ savedMessageIds: string[]; savedThreadIds: string[]; }> { - const messageIds: { messageId: string; messageThreadId: string }[] = - await workspaceDataSource?.query( - `SELECT message."externalId" AS "messageId", - "messageThread"."externalId" AS "messageThreadId" + const messageIdsInDatabase: { + messageExternalId: string; + messageThreadExternalId: string; + }[] = await workspaceDataSource?.query( + `SELECT message."externalId" AS "messageExternalId", + "messageThread"."externalId" AS "messageThreadExternalId" FROM ${dataSourceMetadata.schema}."message" message LEFT JOIN ${dataSourceMetadata.schema}."messageThread" "messageThread" ON message."messageThreadId" = "messageThread"."id" LEFT JOIN ${dataSourceMetadata.schema}."messageChannel" ON "messageThread"."messageChannelId" = ${dataSourceMetadata.schema}."messageChannel"."id" - WHERE ${dataSourceMetadata.schema}."messageChannel"."connectedAccountId" = $1`, - [connectedAccountId], - ); + WHERE ${dataSourceMetadata.schema}."messageChannel"."connectedAccountId" = $1 + AND message."externalId" = ANY($2)`, + [connectedAccountId, messageEternalIds], + ); return { - savedMessageIds: messageIds.map((message) => message.messageId), + savedMessageIds: messageIdsInDatabase.map( + (message) => message.messageExternalId, + ), savedThreadIds: [ - ...new Set(messageIds.map((message) => message.messageThreadId)), + ...new Set( + messageIdsInDatabase.map( + (message) => message.messageThreadExternalId, + ), + ), ], }; } - private async getDataSourceMetadataWorkspaceMetadataAndConnectedAccount( + public async getConnectedAccountsFromWorkspaceId( + workspaceId: string, + ): Promise { + const dataSourceMetadata = + await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail( + workspaceId, + ); + + const workspaceDataSource = + await this.typeORMService.connectToDataSource(dataSourceMetadata); + + if (!workspaceDataSource) { + throw new Error('No workspace data source found'); + } + + const connectedAccounts = await workspaceDataSource?.query( + `SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'gmail'`, + ); + + if (!connectedAccounts || connectedAccounts.length === 0) { + throw new Error('No connected account found'); + } + + return connectedAccounts; + } + + public async getDataSourceMetadataWorkspaceMetadataAndConnectedAccount( workspaceId: string, connectedAccountId: string, ): Promise<{ @@ -320,4 +285,62 @@ export class FetchWorkspaceMessagesService { connectedAccount: connectedAccounts[0], }; } + + public async saveLastSyncHistoryId( + historyId: string, + connectedAccountId: string, + dataSourceMetadata: DataSourceEntity, + workspaceDataSource: DataSource, + ) { + await workspaceDataSource?.query( + `UPDATE ${dataSourceMetadata.schema}."connectedAccount" SET "lastSyncHistoryId" = $1 WHERE "id" = $2`, + [historyId, connectedAccountId], + ); + } + + public async deleteMessages( + messageIds: string[], + dataSourceMetadata: DataSourceEntity, + workspaceDataSource: DataSource, + ) { + if (!messageIds || messageIds.length === 0) { + return; + } + + await workspaceDataSource?.query( + `DELETE FROM ${dataSourceMetadata.schema}."message" WHERE "externalId" = ANY($1)`, + [messageIds], + ); + } + + public async deleteEmptyThreads( + messageIds: string[], + connectedAccountId: string, + dataSourceMetadata: DataSourceEntity, + workspaceDataSource: DataSource, + ) { + const messageThreadsToDelete = await workspaceDataSource?.query( + `SELECT "messageThread"."id" FROM ${dataSourceMetadata.schema}."messageThread" "messageThread" + LEFT JOIN ${dataSourceMetadata.schema}."message" message ON "messageThread"."id" = message."messageThreadId" + LEFT JOIN ${dataSourceMetadata.schema}."messageChannel" ON "messageThread"."messageChannelId" = ${dataSourceMetadata.schema}."messageChannel"."id" + WHERE "messageThread"."externalId" = ANY($1) + AND ${dataSourceMetadata.schema}."messageChannel"."connectedAccountId" = $2 + GROUP BY "messageThread"."id" + HAVING COUNT(message."id") = 0`, + [messageIds, connectedAccountId], + ); + + if (!messageThreadsToDelete || messageThreadsToDelete.length === 0) { + return; + } + + const messageThreadIdsToDelete = messageThreadsToDelete.map( + (messageThread) => messageThread.id, + ); + + await workspaceDataSource?.query( + `DELETE FROM ${dataSourceMetadata.schema}."messageThread" WHERE "id" = ANY($1)`, + [messageThreadIdsToDelete], + ); + } } diff --git a/packages/twenty-server/src/workspace/messaging/types/gmailMessage.ts b/packages/twenty-server/src/workspace/messaging/types/gmailMessage.ts index ae12483cff..2eeaf66f53 100644 --- a/packages/twenty-server/src/workspace/messaging/types/gmailMessage.ts +++ b/packages/twenty-server/src/workspace/messaging/types/gmailMessage.ts @@ -1,10 +1,11 @@ import { Attachment } from 'mailparser'; export type GmailMessage = { + historyId: string; externalId: string; headerMessageId: string; subject: string; - messageThreadId: string; + messageThreadExternalId: string; internalDate: string; fromHandle: string; fromDisplayName: string; diff --git a/packages/twenty-server/src/workspace/messaging/types/gmailThread.ts b/packages/twenty-server/src/workspace/messaging/types/gmailThread.ts index b1e847f4dd..c42acb3721 100644 --- a/packages/twenty-server/src/workspace/messaging/types/gmailThread.ts +++ b/packages/twenty-server/src/workspace/messaging/types/gmailThread.ts @@ -1,4 +1,4 @@ export type GmailThread = { id: string; - messageIds: string[]; + subject: string; }; diff --git a/packages/twenty-server/src/workspace/messaging/types/gmailThreadParsedResponse.ts b/packages/twenty-server/src/workspace/messaging/types/gmailThreadParsedResponse.ts deleted file mode 100644 index 169f223737..0000000000 --- a/packages/twenty-server/src/workspace/messaging/types/gmailThreadParsedResponse.ts +++ /dev/null @@ -1,14 +0,0 @@ -type Message = { - id: string; - labels: string[]; -}; - -export type GmailThreadParsedResponse = { - id: string; - messages: Message[]; - error?: { - code: number; - message: string; - status: string; - }; -}; diff --git a/packages/twenty-server/src/workspace/messaging/types/messageOrThreadQuery.ts b/packages/twenty-server/src/workspace/messaging/types/messageOrThreadQuery.ts index 93368b56cc..b5ddedb90f 100644 --- a/packages/twenty-server/src/workspace/messaging/types/messageOrThreadQuery.ts +++ b/packages/twenty-server/src/workspace/messaging/types/messageOrThreadQuery.ts @@ -1,3 +1,3 @@ -export type MessageOrThreadQuery = { +export type MessageQuery = { uri: string; };