diff --git a/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages-commands.module.ts b/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages-commands.module.ts index b8b7fce8a2..bad683a35e 100644 --- a/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages-commands.module.ts +++ b/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages-commands.module.ts @@ -6,18 +6,15 @@ import { TypeORMModule } from 'src/database/typeorm/typeorm.module'; import { DataSourceModule } from 'src/metadata/data-source/data-source.module'; import { GmailFullSyncCommand } from 'src/workspace/messaging/commands/gmail-full-sync.command'; import { GmailPartialSyncCommand } from 'src/workspace/messaging/commands/gmail-partial-sync.command'; -import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service'; +import { ConnectedAccountModule } from 'src/workspace/messaging/connected-account/connected-account.module'; @Module({ imports: [ DataSourceModule, TypeORMModule, TypeOrmModule.forFeature([FeatureFlagEntity], 'core'), + ConnectedAccountModule, ], - providers: [ - GmailFullSyncCommand, - GmailPartialSyncCommand, - MessagingUtilsService, - ], + providers: [GmailFullSyncCommand, GmailPartialSyncCommand], }) export class FetchWorkspaceMessagesCommandsModule {} diff --git a/packages/twenty-server/src/workspace/messaging/commands/gmail-full-sync.command.ts b/packages/twenty-server/src/workspace/messaging/commands/gmail-full-sync.command.ts index 24a5c131cf..81837964cc 100644 --- a/packages/twenty-server/src/workspace/messaging/commands/gmail-full-sync.command.ts +++ b/packages/twenty-server/src/workspace/messaging/commands/gmail-full-sync.command.ts @@ -8,13 +8,13 @@ import { FeatureFlagEntity, FeatureFlagKeys, } from 'src/core/feature-flag/feature-flag.entity'; -import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service'; import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; import { GmailFullSyncJobData, GmailFullSyncJob, } from 'src/workspace/messaging/jobs/gmail-full-sync.job'; +import { ConnectedAccountService } from 'src/workspace/messaging/connected-account/connected-account.service'; interface GmailFullSyncOptions { workspaceId: string; @@ -26,11 +26,11 @@ interface GmailFullSyncOptions { }) export class GmailFullSyncCommand extends CommandRunner { constructor( - private readonly utils: MessagingUtilsService, @InjectRepository(FeatureFlagEntity, 'core') private readonly featureFlagRepository: Repository, @Inject(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, + private readonly connectedAccountService: ConnectedAccountService, ) { super(); } @@ -64,13 +64,8 @@ export class GmailFullSyncCommand extends CommandRunner { } private async fetchWorkspaceMessages(workspaceId: string): Promise { - const { workspaceDataSource, dataSourceMetadata } = - await this.utils.getDataSourceMetadataWorkspaceMetadata(workspaceId); - - const connectedAccounts = await this.utils.getConnectedAccounts( - dataSourceMetadata, - workspaceDataSource, - ); + const connectedAccounts = + await this.connectedAccountService.getAll(workspaceId); for (const connectedAccount of connectedAccounts) { await this.messageQueueService.add( diff --git a/packages/twenty-server/src/workspace/messaging/commands/gmail-partial-sync.command.ts b/packages/twenty-server/src/workspace/messaging/commands/gmail-partial-sync.command.ts index 1119d263b9..ddadd8c254 100644 --- a/packages/twenty-server/src/workspace/messaging/commands/gmail-partial-sync.command.ts +++ b/packages/twenty-server/src/workspace/messaging/commands/gmail-partial-sync.command.ts @@ -8,13 +8,13 @@ import { FeatureFlagEntity, FeatureFlagKeys, } from 'src/core/feature-flag/feature-flag.entity'; -import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service'; import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; import { GmailPartialSyncJob, GmailPartialSyncJobData, } from 'src/workspace/messaging/jobs/gmail-partial-sync.job'; +import { ConnectedAccountService } from 'src/workspace/messaging/connected-account/connected-account.service'; interface GmailPartialSyncOptions { workspaceId: string; @@ -26,11 +26,11 @@ interface GmailPartialSyncOptions { }) export class GmailPartialSyncCommand extends CommandRunner { constructor( - private readonly utils: MessagingUtilsService, @InjectRepository(FeatureFlagEntity, 'core') private readonly featureFlagRepository: Repository, @Inject(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, + private readonly connectedAccountService: ConnectedAccountService, ) { super(); } @@ -64,13 +64,8 @@ export class GmailPartialSyncCommand extends CommandRunner { } private async fetchWorkspaceMessages(workspaceId: string): Promise { - const { workspaceDataSource, dataSourceMetadata } = - await this.utils.getDataSourceMetadataWorkspaceMetadata(workspaceId); - - const connectedAccounts = await this.utils.getConnectedAccounts( - dataSourceMetadata, - workspaceDataSource, - ); + const connectedAccounts = + await this.connectedAccountService.getAll(workspaceId); for (const connectedAccount of connectedAccounts) { await this.messageQueueService.add( diff --git a/packages/twenty-server/src/workspace/messaging/connected-account/connected-account.module.ts b/packages/twenty-server/src/workspace/messaging/connected-account/connected-account.module.ts new file mode 100644 index 0000000000..376c7c6fed --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/connected-account/connected-account.module.ts @@ -0,0 +1,11 @@ +import { Module } from '@nestjs/common'; + +import { ConnectedAccountService } from 'src/workspace/messaging/connected-account/connected-account.service'; +import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module'; + +@Module({ + imports: [WorkspaceDataSourceModule], + providers: [ConnectedAccountService], + exports: [ConnectedAccountService], +}) +export class ConnectedAccountModule {} diff --git a/packages/twenty-server/src/workspace/messaging/connected-account/connected-account.service.ts b/packages/twenty-server/src/workspace/messaging/connected-account/connected-account.service.ts new file mode 100644 index 0000000000..0a7ef2278f --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/connected-account/connected-account.service.ts @@ -0,0 +1,61 @@ +import { Injectable } from '@nestjs/common'; + +import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; +import { ConnectedAccountObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/connected-account.object-metadata'; +import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record'; + +@Injectable() +export class ConnectedAccountService { + constructor( + private readonly workspaceDataSourceService: WorkspaceDataSourceService, + ) {} + + public async getAll( + workspaceId: string, + ): Promise[]> { + const { dataSource: workspaceDataSource, dataSourceMetadata } = + await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( + workspaceId, + ); + + return await workspaceDataSource?.query( + `SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'google'`, + ); + } + + public async getByIdOrFail( + connectedAccountId: string, + workspaceId: string, + ): Promise> { + const { dataSource: workspaceDataSource, dataSourceMetadata } = + await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( + workspaceId, + ); + const connectedAccounts = await workspaceDataSource?.query( + `SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "id" = $1 LIMIT 1`, + [connectedAccountId], + ); + + if (!connectedAccounts || connectedAccounts.length === 0) { + throw new Error('No connected account found'); + } + + return connectedAccounts[0]; + } + + public async saveLastSyncHistoryId( + historyId: string, + connectedAccountId: string, + workspaceId: string, + ) { + const { dataSource: workspaceDataSource, dataSourceMetadata } = + await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( + workspaceId, + ); + + await workspaceDataSource?.query( + `UPDATE ${dataSourceMetadata.schema}."connectedAccount" SET "lastSyncHistoryId" = $1 WHERE "id" = $2`, + [historyId, connectedAccountId], + ); + } +} diff --git a/packages/twenty-server/src/workspace/messaging/message-channel-message-association/message-channel-message-assocation.module.ts b/packages/twenty-server/src/workspace/messaging/message-channel-message-association/message-channel-message-assocation.module.ts new file mode 100644 index 0000000000..6a6b22d973 --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/message-channel-message-association/message-channel-message-assocation.module.ts @@ -0,0 +1,11 @@ +import { Module } from '@nestjs/common'; + +import { MessageChannelMessageAssociationService } from 'src/workspace/messaging/message-channel-message-association/message-channel-message-association.service'; +import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module'; + +@Module({ + imports: [WorkspaceDataSourceModule], + providers: [MessageChannelMessageAssociationService], + exports: [MessageChannelMessageAssociationService], +}) +export class MessageChannelMessageAssociationModule {} diff --git a/packages/twenty-server/src/workspace/messaging/message-channel-message-association/message-channel-message-association.service.ts b/packages/twenty-server/src/workspace/messaging/message-channel-message-association/message-channel-message-association.service.ts new file mode 100644 index 0000000000..96168d6682 --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/message-channel-message-association/message-channel-message-association.service.ts @@ -0,0 +1,116 @@ +import { Injectable } from '@nestjs/common'; + +import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; +import { MessageChannelMessageAssociationObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/message-channel-message-association.object-metadata'; +import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record'; + +@Injectable() +export class MessageChannelMessageAssociationService { + constructor( + private readonly workspaceDataSourceService: WorkspaceDataSourceService, + ) {} + + public async getByMessageExternalIdsAndMessageChannelId( + messageExternalIds: string[], + messageChannelId: string, + workspaceId: string, + ): Promise[]> { + const { dataSource: workspaceDataSource, dataSourceMetadata } = + await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( + workspaceId, + ); + + return await workspaceDataSource?.query( + `SELECT * FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation" + WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`, + [messageExternalIds, messageChannelId], + ); + } + + public async countByMessageExternalIdsAndMessageChannelId( + messageExternalIds: string[], + messageChannelId: string, + workspaceId: string, + ): Promise { + const { dataSource: workspaceDataSource, dataSourceMetadata } = + await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( + workspaceId, + ); + + const result = await workspaceDataSource?.query( + `SELECT COUNT(*) FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation" + WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`, + [messageExternalIds, messageChannelId], + ); + + return result[0]?.count; + } + + public async deleteByMessageExternalIdsAndMessageChannelId( + messageExternalIds: string[], + messageChannelId: string, + workspaceId: string, + ) { + const { dataSource: workspaceDataSource, dataSourceMetadata } = + await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( + workspaceId, + ); + + await workspaceDataSource?.query( + `DELETE FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation" WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`, + [messageExternalIds, messageChannelId], + ); + } + + public async getByMessageThreadExternalIds( + messageThreadExternalIds: string[], + workspaceId: string, + ): Promise[]> { + const { dataSource: workspaceDataSource, dataSourceMetadata } = + await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( + workspaceId, + ); + + return await workspaceDataSource?.query( + `SELECT * FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation" + WHERE "messageThreadExternalId" = ANY($1)`, + [messageThreadExternalIds], + ); + } + + public async getFirstByMessageThreadExternalId( + messageThreadExternalId: string, + workspaceId: string, + ): Promise | null> { + const existingMessageChannelMessageAssociations = + await this.getByMessageThreadExternalIds( + [messageThreadExternalId], + workspaceId, + ); + + if ( + !existingMessageChannelMessageAssociations || + existingMessageChannelMessageAssociations.length === 0 + ) { + return null; + } + + return existingMessageChannelMessageAssociations[0]; + } + + public async getByMessageIds( + messageIds: string[], + workspaceId: string, + ): Promise[]> { + const { dataSource: workspaceDataSource, dataSourceMetadata } = + await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( + workspaceId, + ); + + return await workspaceDataSource?.query( + `SELECT * FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation" + WHERE "messageId" = ANY($1)`, + [messageIds], + ); + } +} diff --git a/packages/twenty-server/src/workspace/messaging/message-channel/message-channel.module.ts b/packages/twenty-server/src/workspace/messaging/message-channel/message-channel.module.ts new file mode 100644 index 0000000000..ef8d77674b --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/message-channel/message-channel.module.ts @@ -0,0 +1,11 @@ +import { Module } from '@nestjs/common'; + +import { MessageChannelService } from 'src/workspace/messaging/message-channel/message-channel.service'; +import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module'; + +@Module({ + imports: [WorkspaceDataSourceModule], + providers: [MessageChannelService], + exports: [MessageChannelService], +}) +export class MessageChannelModule {} diff --git a/packages/twenty-server/src/workspace/messaging/message-channel/message-channel.service.ts b/packages/twenty-server/src/workspace/messaging/message-channel/message-channel.service.ts new file mode 100644 index 0000000000..0749f1ce62 --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/message-channel/message-channel.service.ts @@ -0,0 +1,43 @@ +import { Injectable } from '@nestjs/common'; + +import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; +import { MessageChannelObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/message-channel.object-metadata'; +import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record'; + +@Injectable() +export class MessageChannelService { + constructor( + private readonly workspaceDataSourceService: WorkspaceDataSourceService, + ) {} + + public async getByConnectedAccountId( + workspaceId: string, + connectedAccountId: string, + ): Promise[]> { + const { dataSource: workspaceDataSource, dataSourceMetadata } = + await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( + workspaceId, + ); + + return await workspaceDataSource?.query( + `SELECT * FROM ${dataSourceMetadata.schema}."messageChannel" WHERE "connectedAccountId" = $1 AND "type" = 'email' LIMIT 1`, + [connectedAccountId], + ); + } + + public async getFirstByConnectedAccountIdOrFail( + workspaceId: string, + connectedAccountId: string, + ): Promise> { + const messageChannels = await this.getByConnectedAccountId( + workspaceId, + connectedAccountId, + ); + + if (!messageChannels || messageChannels.length === 0) { + throw new Error('No message channel found'); + } + + return messageChannels[0]; + } +} diff --git a/packages/twenty-server/src/workspace/messaging/message/message.module.ts b/packages/twenty-server/src/workspace/messaging/message/message.module.ts new file mode 100644 index 0000000000..b9380587f6 --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/message/message.module.ts @@ -0,0 +1,11 @@ +import { Module } from '@nestjs/common'; + +import { MessageService } from 'src/workspace/messaging/message/message.service'; +import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module'; + +@Module({ + imports: [WorkspaceDataSourceModule], + providers: [MessageService], + exports: [MessageService], +}) +export class MessageModule {} diff --git a/packages/twenty-server/src/workspace/messaging/message/message.service.ts b/packages/twenty-server/src/workspace/messaging/message/message.service.ts new file mode 100644 index 0000000000..d5a444d268 --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/message/message.service.ts @@ -0,0 +1,63 @@ +import { Injectable } from '@nestjs/common'; + +import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; +import { MessageObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/message.object-metadata'; +import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record'; + +@Injectable() +export class MessageService { + constructor( + private readonly workspaceDataSourceService: WorkspaceDataSourceService, + ) {} + + public async getFirstByHeaderMessageId( + workspaceId: string, + headerMessageId: string, + ): Promise | null> { + const { dataSource: workspaceDataSource, dataSourceMetadata } = + await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( + workspaceId, + ); + + const messages = await workspaceDataSource?.query( + `SELECT * FROM ${dataSourceMetadata.schema}."message" WHERE "headerMessageId" = $1 LIMIT 1`, + [headerMessageId], + ); + + if (!messages || messages.length === 0) { + return null; + } + + return messages[0]; + } + + public async getByIds( + workspaceId: string, + messageIds: string[], + ): Promise[]> { + const { dataSource: workspaceDataSource, dataSourceMetadata } = + await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( + workspaceId, + ); + + return await workspaceDataSource?.query( + `SELECT * FROM ${dataSourceMetadata.schema}."message" WHERE "id" = ANY($1)`, + [messageIds], + ); + } + + public async deleteByIds( + workspaceId: string, + messageIds: string[], + ): Promise { + const { dataSource: workspaceDataSource, dataSourceMetadata } = + await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( + workspaceId, + ); + + await workspaceDataSource?.query( + `DELETE FROM ${dataSourceMetadata.schema}."message" WHERE "id" = ANY($1)`, + [messageIds], + ); + } +} diff --git a/packages/twenty-server/src/workspace/messaging/services/fetch-messages-by-batches.service.ts b/packages/twenty-server/src/workspace/messaging/services/fetch-messages-by-batches.service.ts index c776868260..66006a906d 100644 --- a/packages/twenty-server/src/workspace/messaging/services/fetch-messages-by-batches.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/fetch-messages-by-batches.service.ts @@ -7,9 +7,9 @@ import planer from 'planer'; import { GmailMessage, Participant, -} from 'src/workspace/messaging/types/gmailMessage'; -import { MessageQuery } from 'src/workspace/messaging/types/messageOrThreadQuery'; -import { GmailMessageParsedResponse } from 'src/workspace/messaging/types/gmailMessageParsedResponse'; +} from 'src/workspace/messaging/types/gmail-message'; +import { MessageQuery } from 'src/workspace/messaging/types/message-or-thread-query'; +import { GmailMessageParsedResponse } from 'src/workspace/messaging/types/gmail-message-parsed-response'; @Injectable() export class FetchMessagesByBatchesService { diff --git a/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.module.ts b/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.module.ts index 7238712fa4..92994bd4af 100644 --- a/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.module.ts +++ b/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.module.ts @@ -1,17 +1,27 @@ import { Module } from '@nestjs/common'; -import { TypeORMModule } from 'src/database/typeorm/typeorm.module'; import { EnvironmentModule } from 'src/integrations/environment/environment.module'; -import { DataSourceModule } from 'src/metadata/data-source/data-source.module'; +import { ConnectedAccountModule } from 'src/workspace/messaging/connected-account/connected-account.module'; +import { MessageChannelMessageAssociationModule } from 'src/workspace/messaging/message-channel-message-association/message-channel-message-assocation.module'; +import { MessageChannelModule } from 'src/workspace/messaging/message-channel/message-channel.module'; +import { MessageModule } from 'src/workspace/messaging/message/message.module'; import { GmailClientProvider } from 'src/workspace/messaging/providers/gmail/gmail-client.provider'; import { FetchMessagesByBatchesService } from 'src/workspace/messaging/services/fetch-messages-by-batches.service'; import { GmailFullSyncService } from 'src/workspace/messaging/services/gmail-full-sync.service'; import { GmailPartialSyncService } from 'src/workspace/messaging/services/gmail-partial-sync.service'; import { GmailRefreshAccessTokenService } from 'src/workspace/messaging/services/gmail-refresh-access-token.service'; import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service'; +import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module'; @Module({ - imports: [TypeORMModule, DataSourceModule, EnvironmentModule], + imports: [ + EnvironmentModule, + WorkspaceDataSourceModule, + ConnectedAccountModule, + MessageChannelModule, + MessageChannelMessageAssociationModule, + MessageModule, + ], providers: [ GmailFullSyncService, GmailPartialSyncService, diff --git a/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts b/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts index bfdf62f22f..816c5fec72 100644 --- a/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts @@ -9,6 +9,10 @@ import { GmailFullSyncJobData, GmailFullSyncJob, } from 'src/workspace/messaging/jobs/gmail-full-sync.job'; +import { ConnectedAccountService } from 'src/workspace/messaging/connected-account/connected-account.service'; +import { MessageChannelService } from 'src/workspace/messaging/message-channel/message-channel.service'; +import { MessageChannelMessageAssociationService } from 'src/workspace/messaging/message-channel-message-association/message-channel-message-association.service'; +import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; @Injectable() export class GmailFullSyncService { @@ -18,6 +22,10 @@ export class GmailFullSyncService { private readonly utils: MessagingUtilsService, @Inject(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, + private readonly workspaceDataSourceService: WorkspaceDataSourceService, + private readonly connectedAccountService: ConnectedAccountService, + private readonly messageChannelService: MessageChannelService, + private readonly messageChannelMessageAssociationService: MessageChannelMessageAssociationService, ) {} public async fetchConnectedAccountThreads( @@ -25,13 +33,14 @@ export class GmailFullSyncService { connectedAccountId: string, nextPageToken?: string, ): Promise { - const { workspaceDataSource, dataSourceMetadata } = - await this.utils.getDataSourceMetadataWorkspaceMetadata(workspaceId); + const { dataSource: workspaceDataSource, dataSourceMetadata } = + await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( + workspaceId, + ); - const connectedAccount = await this.utils.getConnectedAcountByIdOrFail( + const connectedAccount = await this.connectedAccountService.getByIdOrFail( connectedAccountId, - dataSourceMetadata, - workspaceDataSource, + workspaceId, ); const accessToken = connectedAccount.accessToken; @@ -41,18 +50,13 @@ export class GmailFullSyncService { throw new Error('No refresh token found'); } - const gmailMessageChannel = await workspaceDataSource?.query( - `SELECT * FROM ${dataSourceMetadata.schema}."messageChannel" WHERE "connectedAccountId" = $1 AND "type" = 'email' LIMIT 1`, - [connectedAccountId], - ); - - if (!gmailMessageChannel.length) { - throw new Error( - `No gmail message channel found for connected account ${connectedAccountId}`, + const gmailMessageChannel = + await this.messageChannelService.getFirstByConnectedAccountIdOrFail( + workspaceId, + connectedAccountId, ); - } - const gmailMessageChannelId = gmailMessageChannel[0].id; + const gmailMessageChannelId = gmailMessageChannel.id; const gmailClient = await this.gmailClientProvider.getGmailClient(refreshToken); @@ -74,11 +78,10 @@ export class GmailFullSyncService { } const existingMessageChannelMessageAssociations = - await this.utils.getMessageChannelMessageAssociations( + await this.messageChannelMessageAssociationService.getByMessageExternalIdsAndMessageChannelId( messageExternalIds, gmailMessageChannelId, - dataSourceMetadata, - workspaceDataSource, + workspaceId, ); const existingMessageChannelMessageAssociationsExternalIds = @@ -113,6 +116,7 @@ export class GmailFullSyncService { workspaceDataSource, connectedAccount, gmailMessageChannelId, + workspaceId, ); if (errors.length) throw new Error('Error fetching messages'); @@ -125,11 +129,10 @@ export class GmailFullSyncService { if (!historyId) throw new Error('No history id found'); - await this.utils.saveLastSyncHistoryId( + await this.connectedAccountService.saveLastSyncHistoryId( historyId, connectedAccount.id, - dataSourceMetadata, - workspaceDataSource, + workspaceId, ); if (messages.data.nextPageToken) { diff --git a/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts b/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts index 1f1c1f29fc..f816c0e10b 100644 --- a/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts @@ -11,6 +11,9 @@ import { GmailFullSyncJob, GmailFullSyncJobData, } from 'src/workspace/messaging/jobs/gmail-full-sync.job'; +import { ConnectedAccountService } from 'src/workspace/messaging/connected-account/connected-account.service'; +import { MessageChannelService } from 'src/workspace/messaging/message-channel/message-channel.service'; +import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; @Injectable() export class GmailPartialSyncService { @@ -20,21 +23,20 @@ export class GmailPartialSyncService { private readonly utils: MessagingUtilsService, @Inject(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, + private readonly workspaceDataSourceService: WorkspaceDataSourceService, + private readonly connectedAccountService: ConnectedAccountService, + private readonly messageChannelService: MessageChannelService, ) {} - private async getHistory( + private async getHistoryFromGmail( workspaceId: string, connectedAccountId: string, lastSyncHistoryId: string, maxResults: number, ) { - const { workspaceDataSource, dataSourceMetadata } = - await this.utils.getDataSourceMetadataWorkspaceMetadata(workspaceId); - - const connectedAccount = await this.utils.getConnectedAcountByIdOrFail( + const connectedAccount = await this.connectedAccountService.getByIdOrFail( connectedAccountId, - dataSourceMetadata, - workspaceDataSource, + workspaceId, ); const gmailClient = await this.gmailClientProvider.getGmailClient( @@ -56,13 +58,14 @@ export class GmailPartialSyncService { connectedAccountId: string, maxResults = 500, ): Promise { - const { workspaceDataSource, dataSourceMetadata } = - await this.utils.getDataSourceMetadataWorkspaceMetadata(workspaceId); + const { dataSource: workspaceDataSource, dataSourceMetadata } = + await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( + workspaceId, + ); - const connectedAccount = await this.utils.getConnectedAcountByIdOrFail( + const connectedAccount = await this.connectedAccountService.getByIdOrFail( connectedAccountId, - dataSourceMetadata, - workspaceDataSource, + workspaceId, ); const lastSyncHistoryId = connectedAccount.lastSyncHistoryId; @@ -88,7 +91,7 @@ export class GmailPartialSyncService { throw new Error('No refresh token found'); } - const history = await this.getHistory( + const history = await this.getHistoryFromGmail( workspaceId, connectedAccountId, lastSyncHistoryId, @@ -106,28 +109,22 @@ export class GmailPartialSyncService { } if (!history.history) { - await this.utils.saveLastSyncHistoryId( + await this.connectedAccountService.saveLastSyncHistoryId( historyId, connectedAccountId, - dataSourceMetadata, - workspaceDataSource, + workspaceId, ); return; } - const gmailMessageChannel = await workspaceDataSource?.query( - `SELECT * FROM ${dataSourceMetadata.schema}."messageChannel" WHERE "connectedAccountId" = $1 AND "type" = 'email' LIMIT 1`, - [connectedAccountId], - ); - - if (!gmailMessageChannel.length) { - throw new Error( - `No gmail message channel found for connected account ${connectedAccountId}`, + const gmailMessageChannel = + await this.messageChannelService.getFirstByConnectedAccountIdOrFail( + workspaceId, + connectedAccountId, ); - } - const gmailMessageChannelId = gmailMessageChannel[0].id; + const gmailMessageChannelId = gmailMessageChannel.id; const { messagesAdded, messagesDeleted } = await this.getMessageIdsFromHistory(history); @@ -147,22 +144,21 @@ export class GmailPartialSyncService { workspaceDataSource, connectedAccount, gmailMessageChannelId, + workspaceId, ); - await this.utils.deleteMessageChannelMessageAssociations( + await this.utils.deleteMessages( messagesDeleted, gmailMessageChannelId, - dataSourceMetadata, - workspaceDataSource, + workspaceId, ); if (errors.length) throw new Error('Error fetching messages'); - await this.utils.saveLastSyncHistoryId( + await this.connectedAccountService.saveLastSyncHistoryId( historyId, connectedAccount.id, - dataSourceMetadata, - workspaceDataSource, + workspaceId, ); } diff --git a/packages/twenty-server/src/workspace/messaging/services/gmail-refresh-access-token.service.ts b/packages/twenty-server/src/workspace/messaging/services/gmail-refresh-access-token.service.ts index cd08d00cbc..faca5245d5 100644 --- a/packages/twenty-server/src/workspace/messaging/services/gmail-refresh-access-token.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/gmail-refresh-access-token.service.ts @@ -2,30 +2,25 @@ import { Injectable } from '@nestjs/common'; import axios from 'axios'; -import { TypeORMService } from 'src/database/typeorm/typeorm.service'; import { EnvironmentService } from 'src/integrations/environment/environment.service'; -import { DataSourceService } from 'src/metadata/data-source/data-source.service'; +import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; @Injectable() export class GmailRefreshAccessTokenService { constructor( private readonly environmentService: EnvironmentService, - private readonly dataSourceService: DataSourceService, - private readonly typeORMService: TypeORMService, + private readonly workspaceDataSourceService: WorkspaceDataSourceService, ) {} async refreshAndSaveAccessToken( workspaceId: string, connectedAccountId: string, ): Promise { - const dataSourceMetadata = - await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail( + const { dataSource: workspaceDataSource, dataSourceMetadata } = + await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( workspaceId, ); - const workspaceDataSource = - await this.typeORMService.connectToDataSource(dataSourceMetadata); - if (!workspaceDataSource) { throw new Error('No workspace data source found'); } diff --git a/packages/twenty-server/src/workspace/messaging/services/messaging-utils.service.ts b/packages/twenty-server/src/workspace/messaging/services/messaging-utils.service.ts index 722414a14b..bc92959b10 100644 --- a/packages/twenty-server/src/workspace/messaging/services/messaging-utils.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/messaging-utils.service.ts @@ -3,20 +3,20 @@ import { Injectable } from '@nestjs/common'; import { EntityManager, DataSource } from 'typeorm'; import { v4 } from 'uuid'; -import { TypeORMService } from 'src/database/typeorm/typeorm.service'; -import { DataSourceService } from 'src/metadata/data-source/data-source.service'; import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity'; import { GmailMessage, Participant, -} from 'src/workspace/messaging/types/gmailMessage'; -import { MessageQuery } from 'src/workspace/messaging/types/messageOrThreadQuery'; +} from 'src/workspace/messaging/types/gmail-message'; +import { MessageQuery } from 'src/workspace/messaging/types/message-or-thread-query'; +import { MessageChannelMessageAssociationService } from 'src/workspace/messaging/message-channel-message-association/message-channel-message-association.service'; +import { MessageService } from 'src/workspace/messaging/message/message.service'; @Injectable() export class MessagingUtilsService { constructor( - private readonly dataSourceService: DataSourceService, - private readonly typeORMService: TypeORMService, + private readonly messageChannelMessageAssociationService: MessageChannelMessageAssociationService, + private readonly messageService: MessageService, ) {} public createQueriesFromMessageIds( @@ -33,16 +33,18 @@ export class MessagingUtilsService { workspaceDataSource: DataSource, connectedAccount, gmailMessageChannelId: string, + workspaceId: string, ) { for (const message of messages) { await workspaceDataSource?.transaction(async (manager) => { - const existingMessageChannelMessageAssociations = await manager.query( - `SELECT COUNT(*) FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation" - WHERE "messageExternalId" = $1 AND "messageChannelId" = $2`, - [message.externalId, gmailMessageChannelId], - ); + const existingMessageChannelMessageAssociationsCount = + await this.messageChannelMessageAssociationService.countByMessageExternalIdsAndMessageChannelId( + [message.externalId], + gmailMessageChannelId, + workspaceId, + ); - if (existingMessageChannelMessageAssociations[0]?.count > 0) { + if (existingMessageChannelMessageAssociationsCount > 0) { return; } @@ -50,7 +52,8 @@ export class MessagingUtilsService { await this.saveMessageThreadOrReturnExistingMessageThread( message.messageThreadExternalId, dataSourceMetadata, - workspaceDataSource, + manager, + workspaceId, ); const savedOrExistingMessageId = @@ -60,6 +63,7 @@ export class MessagingUtilsService { connectedAccount, dataSourceMetadata, manager, + workspaceId, ); await manager.query( @@ -82,12 +86,13 @@ export class MessagingUtilsService { connectedAccount, dataSourceMetadata: DataSourceEntity, manager: EntityManager, + workspaceId: string, ): Promise { - const existingMessages = await manager.query( - `SELECT "message"."id" FROM ${dataSourceMetadata.schema}."message" WHERE ${dataSourceMetadata.schema}."message"."headerMessageId" = $1 LIMIT 1`, - [message.headerMessageId], + const existingMessage = await this.messageService.getFirstByHeaderMessageId( + workspaceId, + message.headerMessageId, ); - const existingMessageId: string = existingMessages[0]?.id; + const existingMessageId = existingMessage?.id; if (existingMessageId) { return Promise.resolve(existingMessageId); @@ -127,14 +132,17 @@ export class MessagingUtilsService { private async saveMessageThreadOrReturnExistingMessageThread( messageThreadExternalId: string, dataSourceMetadata: DataSourceEntity, - workspaceDataSource: DataSource, + manager: EntityManager, + workspaceId: string, ) { - const existingMessageThreads = await workspaceDataSource?.query( - `SELECT "messageChannelMessageAssociation"."messageThreadId" FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation" WHERE "messageThreadExternalId" = $1 LIMIT 1`, - [messageThreadExternalId], - ); + const existingMessageChannelMessageAssociationByMessageThreadExternalId = + await this.messageChannelMessageAssociationService.getFirstByMessageThreadExternalId( + messageThreadExternalId, + workspaceId, + ); - const existingMessageThread = existingMessageThreads[0]?.messageThreadId; + const existingMessageThread = + existingMessageChannelMessageAssociationByMessageThreadExternalId?.messageThreadId; if (existingMessageThread) { return Promise.resolve(existingMessageThread); @@ -142,7 +150,7 @@ export class MessagingUtilsService { const newMessageThreadId = v4(); - await workspaceDataSource?.query( + await manager.query( `INSERT INTO ${dataSourceMetadata.schema}."messageThread" ("id") VALUES ($1)`, [newMessageThreadId], ); @@ -190,95 +198,50 @@ export class MessagingUtilsService { } } - public async deleteMessageChannelMessageAssociations( - messageExternalIds: string[], - connectedAccountId: string, - dataSourceMetadata: DataSourceEntity, - workspaceDataSource: DataSource, - ) { - await workspaceDataSource?.query( - `DELETE FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation" WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`, - [messageExternalIds, connectedAccountId], - ); - } - - public async getConnectedAccounts( - dataSourceMetadata: DataSourceEntity, - workspaceDataSource: DataSource, - ): Promise { - const connectedAccounts = await workspaceDataSource?.query( - `SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'google'`, - ); - - return connectedAccounts; - } - - public async getConnectedAcountByIdOrFail( - connectedAccountId: string, - dataSourceMetadata: DataSourceEntity, - workspaceDataSource: DataSource, - ): Promise { - const connectedAccounts = await workspaceDataSource?.query( - `SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "id" = $1`, - [connectedAccountId], - ); - - if (!connectedAccounts || connectedAccounts.length === 0) { - throw new Error('No connected account found'); - } - - return connectedAccounts[0]; - } - - public async getDataSourceMetadataWorkspaceMetadata( + public async deleteMessages( + messagesDeleted: string[], + gmailMessageChannelId: string, workspaceId: string, - ): Promise<{ - dataSourceMetadata: DataSourceEntity; - workspaceDataSource: DataSource; - }> { - const dataSourceMetadata = - await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail( + ) { + const messageChannelMessageAssociationsToDelete = + await this.messageChannelMessageAssociationService.getByMessageExternalIdsAndMessageChannelId( + messagesDeleted, + gmailMessageChannelId, workspaceId, ); - const workspaceDataSource = - await this.typeORMService.connectToDataSource(dataSourceMetadata); - - if (!workspaceDataSource) { - throw new Error('No workspace data source found'); - } - - return { - dataSourceMetadata, - workspaceDataSource, - }; - } - - public async saveLastSyncHistoryId( - historyId: string, - connectedAccountId: string, - dataSourceMetadata: DataSourceEntity, - workspaceDataSource: DataSource, - ) { - await workspaceDataSource?.query( - `UPDATE ${dataSourceMetadata.schema}."connectedAccount" SET "lastSyncHistoryId" = $1 WHERE "id" = $2`, - [historyId, connectedAccountId], - ); - } - - public async getMessageChannelMessageAssociations( - messageExternalIds: string[], - gmailMessageChannelId: string, - dataSourceMetadata: DataSourceEntity, - workspaceDataSource: DataSource, - ) { - const existingMessageChannelMessageAssociation = - await workspaceDataSource?.query( - `SELECT * FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation" - WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`, - [messageExternalIds, gmailMessageChannelId], + const messageIdsFromMessageChannelMessageAssociationsToDelete = + messageChannelMessageAssociationsToDelete.map( + (messageChannelMessageAssociationToDelete) => + messageChannelMessageAssociationToDelete.messageId, ); - return existingMessageChannelMessageAssociation; + await this.messageChannelMessageAssociationService.deleteByMessageExternalIdsAndMessageChannelId( + messagesDeleted, + gmailMessageChannelId, + workspaceId, + ); + + const messageChannelMessageAssociationByMessageIds = + await this.messageChannelMessageAssociationService.getByMessageIds( + messageIdsFromMessageChannelMessageAssociationsToDelete, + workspaceId, + ); + + const messageIdsFromMessageChannelMessageAssociationByMessageIds = + messageChannelMessageAssociationByMessageIds.map( + (messageChannelMessageAssociation) => + messageChannelMessageAssociation.messageId, + ); + + const messageIdsToDelete = + messageIdsFromMessageChannelMessageAssociationsToDelete.filter( + (messageId) => + !messageIdsFromMessageChannelMessageAssociationByMessageIds.includes( + messageId, + ), + ); + + await this.messageService.deleteByIds(workspaceId, messageIdsToDelete); } } diff --git a/packages/twenty-server/src/workspace/messaging/types/gmailMessageParsedResponse.ts b/packages/twenty-server/src/workspace/messaging/types/gmail-message-parsed-response.ts similarity index 100% rename from packages/twenty-server/src/workspace/messaging/types/gmailMessageParsedResponse.ts rename to packages/twenty-server/src/workspace/messaging/types/gmail-message-parsed-response.ts diff --git a/packages/twenty-server/src/workspace/messaging/types/gmailMessage.ts b/packages/twenty-server/src/workspace/messaging/types/gmail-message.ts similarity index 100% rename from packages/twenty-server/src/workspace/messaging/types/gmailMessage.ts rename to packages/twenty-server/src/workspace/messaging/types/gmail-message.ts diff --git a/packages/twenty-server/src/workspace/messaging/types/gmailThread.ts b/packages/twenty-server/src/workspace/messaging/types/gmail-thread.ts similarity index 100% rename from packages/twenty-server/src/workspace/messaging/types/gmailThread.ts rename to packages/twenty-server/src/workspace/messaging/types/gmail-thread.ts diff --git a/packages/twenty-server/src/workspace/messaging/types/messageOrThreadQuery.ts b/packages/twenty-server/src/workspace/messaging/types/message-or-thread-query.ts similarity index 100% rename from packages/twenty-server/src/workspace/messaging/types/messageOrThreadQuery.ts rename to packages/twenty-server/src/workspace/messaging/types/message-or-thread-query.ts diff --git a/packages/twenty-server/src/workspace/workspace-datasource/workspace-datasource.service.ts b/packages/twenty-server/src/workspace/workspace-datasource/workspace-datasource.service.ts index 8d78c5bdf4..5bc48263f9 100644 --- a/packages/twenty-server/src/workspace/workspace-datasource/workspace-datasource.service.ts +++ b/packages/twenty-server/src/workspace/workspace-datasource/workspace-datasource.service.ts @@ -22,6 +22,15 @@ export class WorkspaceDataSourceService { public async connectToWorkspaceDataSource( workspaceId: string, ): Promise { + const { dataSource } = + await this.connectedToWorkspaceDataSourceAndReturnMetadata(workspaceId); + + return dataSource; + } + + public async connectedToWorkspaceDataSourceAndReturnMetadata( + workspaceId: string, + ) { const dataSourceMetadata = await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail( workspaceId, @@ -36,7 +45,7 @@ export class WorkspaceDataSourceService { ); } - return dataSource; + return { dataSource, dataSourceMetadata }; } /** diff --git a/packages/twenty-server/src/workspace/workspace-sync-metadata/types/object-record.ts b/packages/twenty-server/src/workspace/workspace-sync-metadata/types/object-record.ts new file mode 100644 index 0000000000..df388ea032 --- /dev/null +++ b/packages/twenty-server/src/workspace/workspace-sync-metadata/types/object-record.ts @@ -0,0 +1,10 @@ +import { BaseObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/base.object-metadata'; + +// Note: This is actually not enterely correct, id field should only be added if the relation is MANY_TO_ONE or ONE_TO_ONE +export type ObjectRecord = { + [K in keyof T as T[K] extends BaseObjectMetadata + ? `${Extract}Id` + : K]: T[K] extends BaseObjectMetadata ? string : T[K]; +} & { + [K in keyof T]: T[K] extends BaseObjectMetadata ? ObjectRecord : T[K]; +};