Delete message when no more association (#3701)

* Delete message when no more association

* remove unused injections

* rename methods

* fix after review
This commit is contained in:
Weiko 2024-01-30 17:58:36 +01:00 committed by GitHub
parent 8b9d62e425
commit 64b2ef3dc2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 501 additions and 201 deletions

View File

@ -6,18 +6,15 @@ import { TypeORMModule } from 'src/database/typeorm/typeorm.module';
import { DataSourceModule } from 'src/metadata/data-source/data-source.module'; import { DataSourceModule } from 'src/metadata/data-source/data-source.module';
import { GmailFullSyncCommand } from 'src/workspace/messaging/commands/gmail-full-sync.command'; import { GmailFullSyncCommand } from 'src/workspace/messaging/commands/gmail-full-sync.command';
import { GmailPartialSyncCommand } from 'src/workspace/messaging/commands/gmail-partial-sync.command'; import { GmailPartialSyncCommand } from 'src/workspace/messaging/commands/gmail-partial-sync.command';
import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service'; import { ConnectedAccountModule } from 'src/workspace/messaging/connected-account/connected-account.module';
@Module({ @Module({
imports: [ imports: [
DataSourceModule, DataSourceModule,
TypeORMModule, TypeORMModule,
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'), TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
ConnectedAccountModule,
], ],
providers: [ providers: [GmailFullSyncCommand, GmailPartialSyncCommand],
GmailFullSyncCommand,
GmailPartialSyncCommand,
MessagingUtilsService,
],
}) })
export class FetchWorkspaceMessagesCommandsModule {} export class FetchWorkspaceMessagesCommandsModule {}

View File

@ -8,13 +8,13 @@ import {
FeatureFlagEntity, FeatureFlagEntity,
FeatureFlagKeys, FeatureFlagKeys,
} from 'src/core/feature-flag/feature-flag.entity'; } from 'src/core/feature-flag/feature-flag.entity';
import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service';
import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service';
import { import {
GmailFullSyncJobData, GmailFullSyncJobData,
GmailFullSyncJob, GmailFullSyncJob,
} from 'src/workspace/messaging/jobs/gmail-full-sync.job'; } from 'src/workspace/messaging/jobs/gmail-full-sync.job';
import { ConnectedAccountService } from 'src/workspace/messaging/connected-account/connected-account.service';
interface GmailFullSyncOptions { interface GmailFullSyncOptions {
workspaceId: string; workspaceId: string;
@ -26,11 +26,11 @@ interface GmailFullSyncOptions {
}) })
export class GmailFullSyncCommand extends CommandRunner { export class GmailFullSyncCommand extends CommandRunner {
constructor( constructor(
private readonly utils: MessagingUtilsService,
@InjectRepository(FeatureFlagEntity, 'core') @InjectRepository(FeatureFlagEntity, 'core')
private readonly featureFlagRepository: Repository<FeatureFlagEntity>, private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
@Inject(MessageQueue.messagingQueue) @Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService, private readonly messageQueueService: MessageQueueService,
private readonly connectedAccountService: ConnectedAccountService,
) { ) {
super(); super();
} }
@ -64,13 +64,8 @@ export class GmailFullSyncCommand extends CommandRunner {
} }
private async fetchWorkspaceMessages(workspaceId: string): Promise<void> { private async fetchWorkspaceMessages(workspaceId: string): Promise<void> {
const { workspaceDataSource, dataSourceMetadata } = const connectedAccounts =
await this.utils.getDataSourceMetadataWorkspaceMetadata(workspaceId); await this.connectedAccountService.getAll(workspaceId);
const connectedAccounts = await this.utils.getConnectedAccounts(
dataSourceMetadata,
workspaceDataSource,
);
for (const connectedAccount of connectedAccounts) { for (const connectedAccount of connectedAccounts) {
await this.messageQueueService.add<GmailFullSyncJobData>( await this.messageQueueService.add<GmailFullSyncJobData>(

View File

@ -8,13 +8,13 @@ import {
FeatureFlagEntity, FeatureFlagEntity,
FeatureFlagKeys, FeatureFlagKeys,
} from 'src/core/feature-flag/feature-flag.entity'; } from 'src/core/feature-flag/feature-flag.entity';
import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service';
import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service';
import { import {
GmailPartialSyncJob, GmailPartialSyncJob,
GmailPartialSyncJobData, GmailPartialSyncJobData,
} from 'src/workspace/messaging/jobs/gmail-partial-sync.job'; } from 'src/workspace/messaging/jobs/gmail-partial-sync.job';
import { ConnectedAccountService } from 'src/workspace/messaging/connected-account/connected-account.service';
interface GmailPartialSyncOptions { interface GmailPartialSyncOptions {
workspaceId: string; workspaceId: string;
@ -26,11 +26,11 @@ interface GmailPartialSyncOptions {
}) })
export class GmailPartialSyncCommand extends CommandRunner { export class GmailPartialSyncCommand extends CommandRunner {
constructor( constructor(
private readonly utils: MessagingUtilsService,
@InjectRepository(FeatureFlagEntity, 'core') @InjectRepository(FeatureFlagEntity, 'core')
private readonly featureFlagRepository: Repository<FeatureFlagEntity>, private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
@Inject(MessageQueue.messagingQueue) @Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService, private readonly messageQueueService: MessageQueueService,
private readonly connectedAccountService: ConnectedAccountService,
) { ) {
super(); super();
} }
@ -64,13 +64,8 @@ export class GmailPartialSyncCommand extends CommandRunner {
} }
private async fetchWorkspaceMessages(workspaceId: string): Promise<void> { private async fetchWorkspaceMessages(workspaceId: string): Promise<void> {
const { workspaceDataSource, dataSourceMetadata } = const connectedAccounts =
await this.utils.getDataSourceMetadataWorkspaceMetadata(workspaceId); await this.connectedAccountService.getAll(workspaceId);
const connectedAccounts = await this.utils.getConnectedAccounts(
dataSourceMetadata,
workspaceDataSource,
);
for (const connectedAccount of connectedAccounts) { for (const connectedAccount of connectedAccounts) {
await this.messageQueueService.add<GmailPartialSyncJobData>( await this.messageQueueService.add<GmailPartialSyncJobData>(

View File

@ -0,0 +1,11 @@
import { Module } from '@nestjs/common';
import { ConnectedAccountService } from 'src/workspace/messaging/connected-account/connected-account.service';
import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module';
@Module({
imports: [WorkspaceDataSourceModule],
providers: [ConnectedAccountService],
exports: [ConnectedAccountService],
})
export class ConnectedAccountModule {}

View File

@ -0,0 +1,61 @@
import { Injectable } from '@nestjs/common';
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
import { ConnectedAccountObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/connected-account.object-metadata';
import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record';
@Injectable()
export class ConnectedAccountService {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {}
public async getAll(
workspaceId: string,
): Promise<ObjectRecord<ConnectedAccountObjectMetadata>[]> {
const { dataSource: workspaceDataSource, dataSourceMetadata } =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata(
workspaceId,
);
return await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'google'`,
);
}
public async getByIdOrFail(
connectedAccountId: string,
workspaceId: string,
): Promise<ObjectRecord<ConnectedAccountObjectMetadata>> {
const { dataSource: workspaceDataSource, dataSourceMetadata } =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata(
workspaceId,
);
const connectedAccounts = await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "id" = $1 LIMIT 1`,
[connectedAccountId],
);
if (!connectedAccounts || connectedAccounts.length === 0) {
throw new Error('No connected account found');
}
return connectedAccounts[0];
}
public async saveLastSyncHistoryId(
historyId: string,
connectedAccountId: string,
workspaceId: string,
) {
const { dataSource: workspaceDataSource, dataSourceMetadata } =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata(
workspaceId,
);
await workspaceDataSource?.query(
`UPDATE ${dataSourceMetadata.schema}."connectedAccount" SET "lastSyncHistoryId" = $1 WHERE "id" = $2`,
[historyId, connectedAccountId],
);
}
}

View File

@ -0,0 +1,11 @@
import { Module } from '@nestjs/common';
import { MessageChannelMessageAssociationService } from 'src/workspace/messaging/message-channel-message-association/message-channel-message-association.service';
import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module';
@Module({
imports: [WorkspaceDataSourceModule],
providers: [MessageChannelMessageAssociationService],
exports: [MessageChannelMessageAssociationService],
})
export class MessageChannelMessageAssociationModule {}

View File

@ -0,0 +1,116 @@
import { Injectable } from '@nestjs/common';
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
import { MessageChannelMessageAssociationObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/message-channel-message-association.object-metadata';
import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record';
@Injectable()
export class MessageChannelMessageAssociationService {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {}
public async getByMessageExternalIdsAndMessageChannelId(
messageExternalIds: string[],
messageChannelId: string,
workspaceId: string,
): Promise<ObjectRecord<MessageChannelMessageAssociationObjectMetadata>[]> {
const { dataSource: workspaceDataSource, dataSourceMetadata } =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata(
workspaceId,
);
return await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation"
WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`,
[messageExternalIds, messageChannelId],
);
}
public async countByMessageExternalIdsAndMessageChannelId(
messageExternalIds: string[],
messageChannelId: string,
workspaceId: string,
): Promise<number> {
const { dataSource: workspaceDataSource, dataSourceMetadata } =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata(
workspaceId,
);
const result = await workspaceDataSource?.query(
`SELECT COUNT(*) FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation"
WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`,
[messageExternalIds, messageChannelId],
);
return result[0]?.count;
}
public async deleteByMessageExternalIdsAndMessageChannelId(
messageExternalIds: string[],
messageChannelId: string,
workspaceId: string,
) {
const { dataSource: workspaceDataSource, dataSourceMetadata } =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata(
workspaceId,
);
await workspaceDataSource?.query(
`DELETE FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation" WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`,
[messageExternalIds, messageChannelId],
);
}
public async getByMessageThreadExternalIds(
messageThreadExternalIds: string[],
workspaceId: string,
): Promise<ObjectRecord<MessageChannelMessageAssociationObjectMetadata>[]> {
const { dataSource: workspaceDataSource, dataSourceMetadata } =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata(
workspaceId,
);
return await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation"
WHERE "messageThreadExternalId" = ANY($1)`,
[messageThreadExternalIds],
);
}
public async getFirstByMessageThreadExternalId(
messageThreadExternalId: string,
workspaceId: string,
): Promise<ObjectRecord<MessageChannelMessageAssociationObjectMetadata> | null> {
const existingMessageChannelMessageAssociations =
await this.getByMessageThreadExternalIds(
[messageThreadExternalId],
workspaceId,
);
if (
!existingMessageChannelMessageAssociations ||
existingMessageChannelMessageAssociations.length === 0
) {
return null;
}
return existingMessageChannelMessageAssociations[0];
}
public async getByMessageIds(
messageIds: string[],
workspaceId: string,
): Promise<ObjectRecord<MessageChannelMessageAssociationObjectMetadata>[]> {
const { dataSource: workspaceDataSource, dataSourceMetadata } =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata(
workspaceId,
);
return await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation"
WHERE "messageId" = ANY($1)`,
[messageIds],
);
}
}

View File

@ -0,0 +1,11 @@
import { Module } from '@nestjs/common';
import { MessageChannelService } from 'src/workspace/messaging/message-channel/message-channel.service';
import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module';
@Module({
imports: [WorkspaceDataSourceModule],
providers: [MessageChannelService],
exports: [MessageChannelService],
})
export class MessageChannelModule {}

View File

@ -0,0 +1,43 @@
import { Injectable } from '@nestjs/common';
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
import { MessageChannelObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/message-channel.object-metadata';
import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record';
@Injectable()
export class MessageChannelService {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {}
public async getByConnectedAccountId(
workspaceId: string,
connectedAccountId: string,
): Promise<ObjectRecord<MessageChannelObjectMetadata>[]> {
const { dataSource: workspaceDataSource, dataSourceMetadata } =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata(
workspaceId,
);
return await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."messageChannel" WHERE "connectedAccountId" = $1 AND "type" = 'email' LIMIT 1`,
[connectedAccountId],
);
}
public async getFirstByConnectedAccountIdOrFail(
workspaceId: string,
connectedAccountId: string,
): Promise<ObjectRecord<MessageChannelObjectMetadata>> {
const messageChannels = await this.getByConnectedAccountId(
workspaceId,
connectedAccountId,
);
if (!messageChannels || messageChannels.length === 0) {
throw new Error('No message channel found');
}
return messageChannels[0];
}
}

View File

@ -0,0 +1,11 @@
import { Module } from '@nestjs/common';
import { MessageService } from 'src/workspace/messaging/message/message.service';
import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module';
@Module({
imports: [WorkspaceDataSourceModule],
providers: [MessageService],
exports: [MessageService],
})
export class MessageModule {}

View File

@ -0,0 +1,63 @@
import { Injectable } from '@nestjs/common';
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
import { MessageObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/message.object-metadata';
import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record';
@Injectable()
export class MessageService {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {}
public async getFirstByHeaderMessageId(
workspaceId: string,
headerMessageId: string,
): Promise<ObjectRecord<MessageObjectMetadata> | null> {
const { dataSource: workspaceDataSource, dataSourceMetadata } =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata(
workspaceId,
);
const messages = await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."message" WHERE "headerMessageId" = $1 LIMIT 1`,
[headerMessageId],
);
if (!messages || messages.length === 0) {
return null;
}
return messages[0];
}
public async getByIds(
workspaceId: string,
messageIds: string[],
): Promise<ObjectRecord<MessageObjectMetadata>[]> {
const { dataSource: workspaceDataSource, dataSourceMetadata } =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata(
workspaceId,
);
return await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."message" WHERE "id" = ANY($1)`,
[messageIds],
);
}
public async deleteByIds(
workspaceId: string,
messageIds: string[],
): Promise<void> {
const { dataSource: workspaceDataSource, dataSourceMetadata } =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata(
workspaceId,
);
await workspaceDataSource?.query(
`DELETE FROM ${dataSourceMetadata.schema}."message" WHERE "id" = ANY($1)`,
[messageIds],
);
}
}

View File

@ -7,9 +7,9 @@ import planer from 'planer';
import { import {
GmailMessage, GmailMessage,
Participant, Participant,
} from 'src/workspace/messaging/types/gmailMessage'; } from 'src/workspace/messaging/types/gmail-message';
import { MessageQuery } from 'src/workspace/messaging/types/messageOrThreadQuery'; import { MessageQuery } from 'src/workspace/messaging/types/message-or-thread-query';
import { GmailMessageParsedResponse } from 'src/workspace/messaging/types/gmailMessageParsedResponse'; import { GmailMessageParsedResponse } from 'src/workspace/messaging/types/gmail-message-parsed-response';
@Injectable() @Injectable()
export class FetchMessagesByBatchesService { export class FetchMessagesByBatchesService {

View File

@ -1,17 +1,27 @@
import { Module } from '@nestjs/common'; import { Module } from '@nestjs/common';
import { TypeORMModule } from 'src/database/typeorm/typeorm.module';
import { EnvironmentModule } from 'src/integrations/environment/environment.module'; import { EnvironmentModule } from 'src/integrations/environment/environment.module';
import { DataSourceModule } from 'src/metadata/data-source/data-source.module'; import { ConnectedAccountModule } from 'src/workspace/messaging/connected-account/connected-account.module';
import { MessageChannelMessageAssociationModule } from 'src/workspace/messaging/message-channel-message-association/message-channel-message-assocation.module';
import { MessageChannelModule } from 'src/workspace/messaging/message-channel/message-channel.module';
import { MessageModule } from 'src/workspace/messaging/message/message.module';
import { GmailClientProvider } from 'src/workspace/messaging/providers/gmail/gmail-client.provider'; import { GmailClientProvider } from 'src/workspace/messaging/providers/gmail/gmail-client.provider';
import { FetchMessagesByBatchesService } from 'src/workspace/messaging/services/fetch-messages-by-batches.service'; import { FetchMessagesByBatchesService } from 'src/workspace/messaging/services/fetch-messages-by-batches.service';
import { GmailFullSyncService } from 'src/workspace/messaging/services/gmail-full-sync.service'; import { GmailFullSyncService } from 'src/workspace/messaging/services/gmail-full-sync.service';
import { GmailPartialSyncService } from 'src/workspace/messaging/services/gmail-partial-sync.service'; import { GmailPartialSyncService } from 'src/workspace/messaging/services/gmail-partial-sync.service';
import { GmailRefreshAccessTokenService } from 'src/workspace/messaging/services/gmail-refresh-access-token.service'; import { GmailRefreshAccessTokenService } from 'src/workspace/messaging/services/gmail-refresh-access-token.service';
import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service'; import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service';
import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module';
@Module({ @Module({
imports: [TypeORMModule, DataSourceModule, EnvironmentModule], imports: [
EnvironmentModule,
WorkspaceDataSourceModule,
ConnectedAccountModule,
MessageChannelModule,
MessageChannelMessageAssociationModule,
MessageModule,
],
providers: [ providers: [
GmailFullSyncService, GmailFullSyncService,
GmailPartialSyncService, GmailPartialSyncService,

View File

@ -9,6 +9,10 @@ import {
GmailFullSyncJobData, GmailFullSyncJobData,
GmailFullSyncJob, GmailFullSyncJob,
} from 'src/workspace/messaging/jobs/gmail-full-sync.job'; } from 'src/workspace/messaging/jobs/gmail-full-sync.job';
import { ConnectedAccountService } from 'src/workspace/messaging/connected-account/connected-account.service';
import { MessageChannelService } from 'src/workspace/messaging/message-channel/message-channel.service';
import { MessageChannelMessageAssociationService } from 'src/workspace/messaging/message-channel-message-association/message-channel-message-association.service';
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
@Injectable() @Injectable()
export class GmailFullSyncService { export class GmailFullSyncService {
@ -18,6 +22,10 @@ export class GmailFullSyncService {
private readonly utils: MessagingUtilsService, private readonly utils: MessagingUtilsService,
@Inject(MessageQueue.messagingQueue) @Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService, private readonly messageQueueService: MessageQueueService,
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
private readonly connectedAccountService: ConnectedAccountService,
private readonly messageChannelService: MessageChannelService,
private readonly messageChannelMessageAssociationService: MessageChannelMessageAssociationService,
) {} ) {}
public async fetchConnectedAccountThreads( public async fetchConnectedAccountThreads(
@ -25,13 +33,14 @@ export class GmailFullSyncService {
connectedAccountId: string, connectedAccountId: string,
nextPageToken?: string, nextPageToken?: string,
): Promise<void> { ): Promise<void> {
const { workspaceDataSource, dataSourceMetadata } = const { dataSource: workspaceDataSource, dataSourceMetadata } =
await this.utils.getDataSourceMetadataWorkspaceMetadata(workspaceId); await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata(
workspaceId,
);
const connectedAccount = await this.utils.getConnectedAcountByIdOrFail( const connectedAccount = await this.connectedAccountService.getByIdOrFail(
connectedAccountId, connectedAccountId,
dataSourceMetadata, workspaceId,
workspaceDataSource,
); );
const accessToken = connectedAccount.accessToken; const accessToken = connectedAccount.accessToken;
@ -41,18 +50,13 @@ export class GmailFullSyncService {
throw new Error('No refresh token found'); throw new Error('No refresh token found');
} }
const gmailMessageChannel = await workspaceDataSource?.query( const gmailMessageChannel =
`SELECT * FROM ${dataSourceMetadata.schema}."messageChannel" WHERE "connectedAccountId" = $1 AND "type" = 'email' LIMIT 1`, await this.messageChannelService.getFirstByConnectedAccountIdOrFail(
[connectedAccountId], workspaceId,
); connectedAccountId,
if (!gmailMessageChannel.length) {
throw new Error(
`No gmail message channel found for connected account ${connectedAccountId}`,
); );
}
const gmailMessageChannelId = gmailMessageChannel[0].id; const gmailMessageChannelId = gmailMessageChannel.id;
const gmailClient = const gmailClient =
await this.gmailClientProvider.getGmailClient(refreshToken); await this.gmailClientProvider.getGmailClient(refreshToken);
@ -74,11 +78,10 @@ export class GmailFullSyncService {
} }
const existingMessageChannelMessageAssociations = const existingMessageChannelMessageAssociations =
await this.utils.getMessageChannelMessageAssociations( await this.messageChannelMessageAssociationService.getByMessageExternalIdsAndMessageChannelId(
messageExternalIds, messageExternalIds,
gmailMessageChannelId, gmailMessageChannelId,
dataSourceMetadata, workspaceId,
workspaceDataSource,
); );
const existingMessageChannelMessageAssociationsExternalIds = const existingMessageChannelMessageAssociationsExternalIds =
@ -113,6 +116,7 @@ export class GmailFullSyncService {
workspaceDataSource, workspaceDataSource,
connectedAccount, connectedAccount,
gmailMessageChannelId, gmailMessageChannelId,
workspaceId,
); );
if (errors.length) throw new Error('Error fetching messages'); if (errors.length) throw new Error('Error fetching messages');
@ -125,11 +129,10 @@ export class GmailFullSyncService {
if (!historyId) throw new Error('No history id found'); if (!historyId) throw new Error('No history id found');
await this.utils.saveLastSyncHistoryId( await this.connectedAccountService.saveLastSyncHistoryId(
historyId, historyId,
connectedAccount.id, connectedAccount.id,
dataSourceMetadata, workspaceId,
workspaceDataSource,
); );
if (messages.data.nextPageToken) { if (messages.data.nextPageToken) {

View File

@ -11,6 +11,9 @@ import {
GmailFullSyncJob, GmailFullSyncJob,
GmailFullSyncJobData, GmailFullSyncJobData,
} from 'src/workspace/messaging/jobs/gmail-full-sync.job'; } from 'src/workspace/messaging/jobs/gmail-full-sync.job';
import { ConnectedAccountService } from 'src/workspace/messaging/connected-account/connected-account.service';
import { MessageChannelService } from 'src/workspace/messaging/message-channel/message-channel.service';
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
@Injectable() @Injectable()
export class GmailPartialSyncService { export class GmailPartialSyncService {
@ -20,21 +23,20 @@ export class GmailPartialSyncService {
private readonly utils: MessagingUtilsService, private readonly utils: MessagingUtilsService,
@Inject(MessageQueue.messagingQueue) @Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService, private readonly messageQueueService: MessageQueueService,
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
private readonly connectedAccountService: ConnectedAccountService,
private readonly messageChannelService: MessageChannelService,
) {} ) {}
private async getHistory( private async getHistoryFromGmail(
workspaceId: string, workspaceId: string,
connectedAccountId: string, connectedAccountId: string,
lastSyncHistoryId: string, lastSyncHistoryId: string,
maxResults: number, maxResults: number,
) { ) {
const { workspaceDataSource, dataSourceMetadata } = const connectedAccount = await this.connectedAccountService.getByIdOrFail(
await this.utils.getDataSourceMetadataWorkspaceMetadata(workspaceId);
const connectedAccount = await this.utils.getConnectedAcountByIdOrFail(
connectedAccountId, connectedAccountId,
dataSourceMetadata, workspaceId,
workspaceDataSource,
); );
const gmailClient = await this.gmailClientProvider.getGmailClient( const gmailClient = await this.gmailClientProvider.getGmailClient(
@ -56,13 +58,14 @@ export class GmailPartialSyncService {
connectedAccountId: string, connectedAccountId: string,
maxResults = 500, maxResults = 500,
): Promise<void> { ): Promise<void> {
const { workspaceDataSource, dataSourceMetadata } = const { dataSource: workspaceDataSource, dataSourceMetadata } =
await this.utils.getDataSourceMetadataWorkspaceMetadata(workspaceId); await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata(
workspaceId,
);
const connectedAccount = await this.utils.getConnectedAcountByIdOrFail( const connectedAccount = await this.connectedAccountService.getByIdOrFail(
connectedAccountId, connectedAccountId,
dataSourceMetadata, workspaceId,
workspaceDataSource,
); );
const lastSyncHistoryId = connectedAccount.lastSyncHistoryId; const lastSyncHistoryId = connectedAccount.lastSyncHistoryId;
@ -88,7 +91,7 @@ export class GmailPartialSyncService {
throw new Error('No refresh token found'); throw new Error('No refresh token found');
} }
const history = await this.getHistory( const history = await this.getHistoryFromGmail(
workspaceId, workspaceId,
connectedAccountId, connectedAccountId,
lastSyncHistoryId, lastSyncHistoryId,
@ -106,28 +109,22 @@ export class GmailPartialSyncService {
} }
if (!history.history) { if (!history.history) {
await this.utils.saveLastSyncHistoryId( await this.connectedAccountService.saveLastSyncHistoryId(
historyId, historyId,
connectedAccountId, connectedAccountId,
dataSourceMetadata, workspaceId,
workspaceDataSource,
); );
return; return;
} }
const gmailMessageChannel = await workspaceDataSource?.query( const gmailMessageChannel =
`SELECT * FROM ${dataSourceMetadata.schema}."messageChannel" WHERE "connectedAccountId" = $1 AND "type" = 'email' LIMIT 1`, await this.messageChannelService.getFirstByConnectedAccountIdOrFail(
[connectedAccountId], workspaceId,
); connectedAccountId,
if (!gmailMessageChannel.length) {
throw new Error(
`No gmail message channel found for connected account ${connectedAccountId}`,
); );
}
const gmailMessageChannelId = gmailMessageChannel[0].id; const gmailMessageChannelId = gmailMessageChannel.id;
const { messagesAdded, messagesDeleted } = const { messagesAdded, messagesDeleted } =
await this.getMessageIdsFromHistory(history); await this.getMessageIdsFromHistory(history);
@ -147,22 +144,21 @@ export class GmailPartialSyncService {
workspaceDataSource, workspaceDataSource,
connectedAccount, connectedAccount,
gmailMessageChannelId, gmailMessageChannelId,
workspaceId,
); );
await this.utils.deleteMessageChannelMessageAssociations( await this.utils.deleteMessages(
messagesDeleted, messagesDeleted,
gmailMessageChannelId, gmailMessageChannelId,
dataSourceMetadata, workspaceId,
workspaceDataSource,
); );
if (errors.length) throw new Error('Error fetching messages'); if (errors.length) throw new Error('Error fetching messages');
await this.utils.saveLastSyncHistoryId( await this.connectedAccountService.saveLastSyncHistoryId(
historyId, historyId,
connectedAccount.id, connectedAccount.id,
dataSourceMetadata, workspaceId,
workspaceDataSource,
); );
} }

View File

@ -2,30 +2,25 @@ import { Injectable } from '@nestjs/common';
import axios from 'axios'; import axios from 'axios';
import { TypeORMService } from 'src/database/typeorm/typeorm.service';
import { EnvironmentService } from 'src/integrations/environment/environment.service'; import { EnvironmentService } from 'src/integrations/environment/environment.service';
import { DataSourceService } from 'src/metadata/data-source/data-source.service'; import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
@Injectable() @Injectable()
export class GmailRefreshAccessTokenService { export class GmailRefreshAccessTokenService {
constructor( constructor(
private readonly environmentService: EnvironmentService, private readonly environmentService: EnvironmentService,
private readonly dataSourceService: DataSourceService, private readonly workspaceDataSourceService: WorkspaceDataSourceService,
private readonly typeORMService: TypeORMService,
) {} ) {}
async refreshAndSaveAccessToken( async refreshAndSaveAccessToken(
workspaceId: string, workspaceId: string,
connectedAccountId: string, connectedAccountId: string,
): Promise<void> { ): Promise<void> {
const dataSourceMetadata = const { dataSource: workspaceDataSource, dataSourceMetadata } =
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail( await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata(
workspaceId, workspaceId,
); );
const workspaceDataSource =
await this.typeORMService.connectToDataSource(dataSourceMetadata);
if (!workspaceDataSource) { if (!workspaceDataSource) {
throw new Error('No workspace data source found'); throw new Error('No workspace data source found');
} }

View File

@ -3,20 +3,20 @@ import { Injectable } from '@nestjs/common';
import { EntityManager, DataSource } from 'typeorm'; import { EntityManager, DataSource } from 'typeorm';
import { v4 } from 'uuid'; import { v4 } from 'uuid';
import { TypeORMService } from 'src/database/typeorm/typeorm.service';
import { DataSourceService } from 'src/metadata/data-source/data-source.service';
import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity'; import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity';
import { import {
GmailMessage, GmailMessage,
Participant, Participant,
} from 'src/workspace/messaging/types/gmailMessage'; } from 'src/workspace/messaging/types/gmail-message';
import { MessageQuery } from 'src/workspace/messaging/types/messageOrThreadQuery'; import { MessageQuery } from 'src/workspace/messaging/types/message-or-thread-query';
import { MessageChannelMessageAssociationService } from 'src/workspace/messaging/message-channel-message-association/message-channel-message-association.service';
import { MessageService } from 'src/workspace/messaging/message/message.service';
@Injectable() @Injectable()
export class MessagingUtilsService { export class MessagingUtilsService {
constructor( constructor(
private readonly dataSourceService: DataSourceService, private readonly messageChannelMessageAssociationService: MessageChannelMessageAssociationService,
private readonly typeORMService: TypeORMService, private readonly messageService: MessageService,
) {} ) {}
public createQueriesFromMessageIds( public createQueriesFromMessageIds(
@ -33,16 +33,18 @@ export class MessagingUtilsService {
workspaceDataSource: DataSource, workspaceDataSource: DataSource,
connectedAccount, connectedAccount,
gmailMessageChannelId: string, gmailMessageChannelId: string,
workspaceId: string,
) { ) {
for (const message of messages) { for (const message of messages) {
await workspaceDataSource?.transaction(async (manager) => { await workspaceDataSource?.transaction(async (manager) => {
const existingMessageChannelMessageAssociations = await manager.query( const existingMessageChannelMessageAssociationsCount =
`SELECT COUNT(*) FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation" await this.messageChannelMessageAssociationService.countByMessageExternalIdsAndMessageChannelId(
WHERE "messageExternalId" = $1 AND "messageChannelId" = $2`, [message.externalId],
[message.externalId, gmailMessageChannelId], gmailMessageChannelId,
); workspaceId,
);
if (existingMessageChannelMessageAssociations[0]?.count > 0) { if (existingMessageChannelMessageAssociationsCount > 0) {
return; return;
} }
@ -50,7 +52,8 @@ export class MessagingUtilsService {
await this.saveMessageThreadOrReturnExistingMessageThread( await this.saveMessageThreadOrReturnExistingMessageThread(
message.messageThreadExternalId, message.messageThreadExternalId,
dataSourceMetadata, dataSourceMetadata,
workspaceDataSource, manager,
workspaceId,
); );
const savedOrExistingMessageId = const savedOrExistingMessageId =
@ -60,6 +63,7 @@ export class MessagingUtilsService {
connectedAccount, connectedAccount,
dataSourceMetadata, dataSourceMetadata,
manager, manager,
workspaceId,
); );
await manager.query( await manager.query(
@ -82,12 +86,13 @@ export class MessagingUtilsService {
connectedAccount, connectedAccount,
dataSourceMetadata: DataSourceEntity, dataSourceMetadata: DataSourceEntity,
manager: EntityManager, manager: EntityManager,
workspaceId: string,
): Promise<string> { ): Promise<string> {
const existingMessages = await manager.query( const existingMessage = await this.messageService.getFirstByHeaderMessageId(
`SELECT "message"."id" FROM ${dataSourceMetadata.schema}."message" WHERE ${dataSourceMetadata.schema}."message"."headerMessageId" = $1 LIMIT 1`, workspaceId,
[message.headerMessageId], message.headerMessageId,
); );
const existingMessageId: string = existingMessages[0]?.id; const existingMessageId = existingMessage?.id;
if (existingMessageId) { if (existingMessageId) {
return Promise.resolve(existingMessageId); return Promise.resolve(existingMessageId);
@ -127,14 +132,17 @@ export class MessagingUtilsService {
private async saveMessageThreadOrReturnExistingMessageThread( private async saveMessageThreadOrReturnExistingMessageThread(
messageThreadExternalId: string, messageThreadExternalId: string,
dataSourceMetadata: DataSourceEntity, dataSourceMetadata: DataSourceEntity,
workspaceDataSource: DataSource, manager: EntityManager,
workspaceId: string,
) { ) {
const existingMessageThreads = await workspaceDataSource?.query( const existingMessageChannelMessageAssociationByMessageThreadExternalId =
`SELECT "messageChannelMessageAssociation"."messageThreadId" FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation" WHERE "messageThreadExternalId" = $1 LIMIT 1`, await this.messageChannelMessageAssociationService.getFirstByMessageThreadExternalId(
[messageThreadExternalId], messageThreadExternalId,
); workspaceId,
);
const existingMessageThread = existingMessageThreads[0]?.messageThreadId; const existingMessageThread =
existingMessageChannelMessageAssociationByMessageThreadExternalId?.messageThreadId;
if (existingMessageThread) { if (existingMessageThread) {
return Promise.resolve(existingMessageThread); return Promise.resolve(existingMessageThread);
@ -142,7 +150,7 @@ export class MessagingUtilsService {
const newMessageThreadId = v4(); const newMessageThreadId = v4();
await workspaceDataSource?.query( await manager.query(
`INSERT INTO ${dataSourceMetadata.schema}."messageThread" ("id") VALUES ($1)`, `INSERT INTO ${dataSourceMetadata.schema}."messageThread" ("id") VALUES ($1)`,
[newMessageThreadId], [newMessageThreadId],
); );
@ -190,95 +198,50 @@ export class MessagingUtilsService {
} }
} }
public async deleteMessageChannelMessageAssociations( public async deleteMessages(
messageExternalIds: string[], messagesDeleted: string[],
connectedAccountId: string, gmailMessageChannelId: string,
dataSourceMetadata: DataSourceEntity,
workspaceDataSource: DataSource,
) {
await workspaceDataSource?.query(
`DELETE FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation" WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`,
[messageExternalIds, connectedAccountId],
);
}
public async getConnectedAccounts(
dataSourceMetadata: DataSourceEntity,
workspaceDataSource: DataSource,
): Promise<any[]> {
const connectedAccounts = await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'google'`,
);
return connectedAccounts;
}
public async getConnectedAcountByIdOrFail(
connectedAccountId: string,
dataSourceMetadata: DataSourceEntity,
workspaceDataSource: DataSource,
): Promise<any> {
const connectedAccounts = await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "id" = $1`,
[connectedAccountId],
);
if (!connectedAccounts || connectedAccounts.length === 0) {
throw new Error('No connected account found');
}
return connectedAccounts[0];
}
public async getDataSourceMetadataWorkspaceMetadata(
workspaceId: string, workspaceId: string,
): Promise<{ ) {
dataSourceMetadata: DataSourceEntity; const messageChannelMessageAssociationsToDelete =
workspaceDataSource: DataSource; await this.messageChannelMessageAssociationService.getByMessageExternalIdsAndMessageChannelId(
}> { messagesDeleted,
const dataSourceMetadata = gmailMessageChannelId,
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail(
workspaceId, workspaceId,
); );
const workspaceDataSource = const messageIdsFromMessageChannelMessageAssociationsToDelete =
await this.typeORMService.connectToDataSource(dataSourceMetadata); messageChannelMessageAssociationsToDelete.map(
(messageChannelMessageAssociationToDelete) =>
if (!workspaceDataSource) { messageChannelMessageAssociationToDelete.messageId,
throw new Error('No workspace data source found');
}
return {
dataSourceMetadata,
workspaceDataSource,
};
}
public async saveLastSyncHistoryId(
historyId: string,
connectedAccountId: string,
dataSourceMetadata: DataSourceEntity,
workspaceDataSource: DataSource,
) {
await workspaceDataSource?.query(
`UPDATE ${dataSourceMetadata.schema}."connectedAccount" SET "lastSyncHistoryId" = $1 WHERE "id" = $2`,
[historyId, connectedAccountId],
);
}
public async getMessageChannelMessageAssociations(
messageExternalIds: string[],
gmailMessageChannelId: string,
dataSourceMetadata: DataSourceEntity,
workspaceDataSource: DataSource,
) {
const existingMessageChannelMessageAssociation =
await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation"
WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`,
[messageExternalIds, gmailMessageChannelId],
); );
return existingMessageChannelMessageAssociation; await this.messageChannelMessageAssociationService.deleteByMessageExternalIdsAndMessageChannelId(
messagesDeleted,
gmailMessageChannelId,
workspaceId,
);
const messageChannelMessageAssociationByMessageIds =
await this.messageChannelMessageAssociationService.getByMessageIds(
messageIdsFromMessageChannelMessageAssociationsToDelete,
workspaceId,
);
const messageIdsFromMessageChannelMessageAssociationByMessageIds =
messageChannelMessageAssociationByMessageIds.map(
(messageChannelMessageAssociation) =>
messageChannelMessageAssociation.messageId,
);
const messageIdsToDelete =
messageIdsFromMessageChannelMessageAssociationsToDelete.filter(
(messageId) =>
!messageIdsFromMessageChannelMessageAssociationByMessageIds.includes(
messageId,
),
);
await this.messageService.deleteByIds(workspaceId, messageIdsToDelete);
} }
} }

View File

@ -22,6 +22,15 @@ export class WorkspaceDataSourceService {
public async connectToWorkspaceDataSource( public async connectToWorkspaceDataSource(
workspaceId: string, workspaceId: string,
): Promise<DataSource> { ): Promise<DataSource> {
const { dataSource } =
await this.connectedToWorkspaceDataSourceAndReturnMetadata(workspaceId);
return dataSource;
}
public async connectedToWorkspaceDataSourceAndReturnMetadata(
workspaceId: string,
) {
const dataSourceMetadata = const dataSourceMetadata =
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail( await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail(
workspaceId, workspaceId,
@ -36,7 +45,7 @@ export class WorkspaceDataSourceService {
); );
} }
return dataSource; return { dataSource, dataSourceMetadata };
} }
/** /**

View File

@ -0,0 +1,10 @@
import { BaseObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/base.object-metadata';
// Note: This is actually not enterely correct, id field should only be added if the relation is MANY_TO_ONE or ONE_TO_ONE
export type ObjectRecord<T extends BaseObjectMetadata> = {
[K in keyof T as T[K] extends BaseObjectMetadata
? `${Extract<K, string>}Id`
: K]: T[K] extends BaseObjectMetadata ? string : T[K];
} & {
[K in keyof T]: T[K] extends BaseObjectMetadata ? ObjectRecord<T[K]> : T[K];
};