3202 fetch emails by threads (#3214)

* change fetchAllByBatches and fetchBatch to allow messages and threads to be fetched by batches

* wip

* format threads batches

* command is working

* command is working

* fix typing

* updates
This commit is contained in:
bosiraphael 2024-01-03 15:01:22 +01:00 committed by GitHub
parent ea06f04350
commit 67fca68480
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 198 additions and 108 deletions

View File

@ -4,8 +4,10 @@ import axios, { AxiosInstance, AxiosResponse } from 'axios';
import { simpleParser } from 'mailparser'; import { simpleParser } from 'mailparser';
import { GmailMessage } from 'src/workspace/messaging/types/gmailMessage'; import { GmailMessage } from 'src/workspace/messaging/types/gmailMessage';
import { MessageQuery } from 'src/workspace/messaging/types/messageQuery'; import { MessageOrThreadQuery } from 'src/workspace/messaging/types/messageOrThreadQuery';
import { GmailParsedResponse } from 'src/workspace/messaging/types/gmailParsedResponse'; import { GmailMessageParsedResponse } from 'src/workspace/messaging/types/gmailMessageParsedResponse';
import { GmailThreadParsedResponse } from 'src/workspace/messaging/types/gmailThreadParsedResponse';
import { GmailThread } from 'src/workspace/messaging/types/gmailThread';
@Injectable() @Injectable()
export class FetchBatchMessagesService { export class FetchBatchMessagesService {
@ -17,60 +19,95 @@ export class FetchBatchMessagesService {
}); });
} }
async fetchAllByBatches( async fetchAllMessages(
messageQueries: MessageQuery[], queries: MessageOrThreadQuery[],
accessToken: string, accessToken: string,
): Promise<GmailMessage[]> { ): Promise<GmailMessage[]> {
const batchLimit = 100; const batchResponses = await this.fetchAllByBatches(
queries,
accessToken,
'batch_gmail_messages',
);
let batchOffset = 0; const messages = await this.formatBatchResponsesAsGmailMessages(
batchResponses,
let messages: GmailMessage[] = []; );
while (batchOffset < messageQueries.length) {
const batchResponse = await this.fetchBatch(
messageQueries,
accessToken,
batchOffset,
batchLimit,
);
messages = messages.concat(batchResponse);
batchOffset += batchLimit;
}
return messages; return messages;
} }
async fetchAllThreads(
queries: MessageOrThreadQuery[],
accessToken: string,
): Promise<GmailThread[]> {
const batchResponses = await this.fetchAllByBatches(
queries,
accessToken,
'batch_gmail_threads',
);
const threads = await this.formatBatchResponsesAsGmailThreads(
batchResponses,
);
return threads;
}
async fetchAllByBatches(
queries: MessageOrThreadQuery[],
accessToken: string,
boundary: string,
): Promise<AxiosResponse<any, any>[]> {
const batchLimit = 100;
let batchOffset = 0;
let batchResponses: AxiosResponse<any, any>[] = [];
while (batchOffset < queries.length) {
const batchResponse = await this.fetchBatch(
queries,
accessToken,
batchOffset,
batchLimit,
boundary,
);
batchResponses = batchResponses.concat(batchResponse);
batchOffset += batchLimit;
}
return batchResponses;
}
async fetchBatch( async fetchBatch(
messageQueries: MessageQuery[], queries: MessageOrThreadQuery[],
accessToken: string, accessToken: string,
batchOffset: number, batchOffset: number,
batchLimit: number, batchLimit: number,
): Promise<GmailMessage[]> { boundary: string,
const limitedMessageQueries = messageQueries.slice( ): Promise<AxiosResponse<any, any>> {
batchOffset, const limitedQueries = queries.slice(batchOffset, batchOffset + batchLimit);
batchOffset + batchLimit,
);
const response = await this.httpService.post( const response = await this.httpService.post(
'/', '/',
this.createBatchBody(limitedMessageQueries, 'batch_gmail_messages'), this.createBatchBody(limitedQueries, boundary),
{ {
headers: { headers: {
'Content-Type': 'multipart/mixed; boundary=batch_gmail_messages', 'Content-Type': 'multipart/mixed; boundary=' + boundary,
Authorization: 'Bearer ' + accessToken, Authorization: 'Bearer ' + accessToken,
}, },
}, },
); );
const formattedResponse = await this.formatBatchResponse(response); return response;
return formattedResponse;
} }
createBatchBody(messageQueries: MessageQuery[], boundary: string): string { createBatchBody(
messageQueries: MessageOrThreadQuery[],
boundary: string,
): string {
let batchBody: string[] = []; let batchBody: string[] = [];
messageQueries.forEach(function (call) { messageQueries.forEach(function (call) {
@ -96,8 +133,10 @@ export class FetchBatchMessagesService {
parseBatch( parseBatch(
responseCollection: AxiosResponse<any, any>, responseCollection: AxiosResponse<any, any>,
): GmailParsedResponse[] { ): GmailMessageParsedResponse[] | GmailThreadParsedResponse[] {
const responseItems: GmailParsedResponse[] = []; const responseItems:
| GmailMessageParsedResponse[]
| GmailThreadParsedResponse[] = [];
const boundary = this.getBatchSeparator(responseCollection); const boundary = this.getBatchSeparator(responseCollection);
@ -121,8 +160,8 @@ export class FetchBatchMessagesService {
return responseItems; return responseItems;
} }
getBatchSeparator(response: AxiosResponse<any, any>): string { getBatchSeparator(responseCollection: AxiosResponse<any, any>): string {
const headers = response.headers; const headers = responseCollection.headers;
const contentType: string = headers['content-type']; const contentType: string = headers['content-type'];
@ -135,25 +174,27 @@ export class FetchBatchMessagesService {
return boundary?.replace('boundary=', '').trim() || ''; return boundary?.replace('boundary=', '').trim() || '';
} }
async formatBatchResponse( async formatBatchResponseAsGmailMessage(
response: AxiosResponse<any, any>, responseCollection: AxiosResponse<any, any>,
): Promise<GmailMessage[]> { ): Promise<GmailMessage[]> {
const parsedResponses = this.parseBatch(response); const parsedResponses = this.parseBatch(
responseCollection,
) as GmailMessageParsedResponse[];
const formattedResponse = Promise.all( const formattedResponse = Promise.all(
parsedResponses.map(async (item) => { parsedResponses.map(async (message: GmailMessageParsedResponse) => {
if (item.error) { if (message.error) {
console.log('Error', item.error); console.log('Error', message.error);
return; return;
} }
const { id, threadId, internalDate, raw } = item; const { id, threadId, internalDate, raw } = message;
const message = atob(raw?.replace(/-/g, '+').replace(/_/g, '/')); const body = atob(raw?.replace(/-/g, '+').replace(/_/g, '/'));
try { try {
const parsed = await simpleParser(message); const parsed = await simpleParser(body);
const { const {
subject, subject,
@ -190,9 +231,75 @@ export class FetchBatchMessagesService {
); );
const filteredResponse = (await formattedResponse).filter( const filteredResponse = (await formattedResponse).filter(
(item) => item, (message) => message,
) as GmailMessage[]; ) as GmailMessage[];
return filteredResponse; return filteredResponse;
} }
async formatBatchResponsesAsGmailMessages(
batchResponses: AxiosResponse<any, any>[],
): Promise<GmailMessage[]> {
const formattedResponses = await Promise.all(
batchResponses.map(async (response) => {
const formattedResponse = await this.formatBatchResponseAsGmailMessage(
response,
);
return formattedResponse;
}),
);
return formattedResponses.flat();
}
async formatBatchResponseAsGmailThread(
responseCollection: AxiosResponse<any, any>,
): Promise<GmailThread[]> {
const parsedResponses = this.parseBatch(
responseCollection,
) as GmailThreadParsedResponse[];
const formattedResponse = Promise.all(
parsedResponses.map(async (thread: GmailThreadParsedResponse) => {
if (thread.error) {
console.log('Error', thread.error);
return;
}
try {
const { id, messages } = thread;
return {
id,
messageIds: messages.map((message) => message.id) || [],
};
} catch (error) {
console.log('Error', error);
}
}),
);
const filteredResponse = (await formattedResponse).filter(
(item) => item,
) as GmailThread[];
return filteredResponse;
}
async formatBatchResponsesAsGmailThreads(
batchResponses: AxiosResponse<any, any>[],
): Promise<GmailThread[]> {
const formattedResponses = await Promise.all(
batchResponses.map(async (response) => {
const formattedResponse = await this.formatBatchResponseAsGmailThread(
response,
);
return formattedResponse;
}),
);
return formattedResponses.flat();
}
} }

View File

@ -9,7 +9,7 @@ import { EnvironmentService } from 'src/integrations/environment/environment.ser
import { DataSourceService } from 'src/metadata/data-source/data-source.service'; import { DataSourceService } from 'src/metadata/data-source/data-source.service';
import { FetchBatchMessagesService } from 'src/workspace/messaging/services/fetch-batch-messages.service'; import { FetchBatchMessagesService } from 'src/workspace/messaging/services/fetch-batch-messages.service';
import { GmailMessage } from 'src/workspace/messaging/types/gmailMessage'; import { GmailMessage } from 'src/workspace/messaging/types/gmailMessage';
import { MessageQuery } from 'src/workspace/messaging/types/messageQuery'; import { MessageOrThreadQuery } from 'src/workspace/messaging/types/messageOrThreadQuery';
import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity'; import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity';
@Injectable() @Injectable()
@ -26,10 +26,6 @@ export class FetchWorkspaceMessagesService {
workspaceId, workspaceId,
'20202020-0687-4c41-b707-ed1bfca972a7', '20202020-0687-4c41-b707-ed1bfca972a7',
); );
await this.fetchWorkspaceMemberMessages(
workspaceId,
'20202020-0687-4c41-b707-ed1bfca972a7',
);
} }
async fetchWorkspaceMemberThreads( async fetchWorkspaceMemberThreads(
@ -59,6 +55,7 @@ export class FetchWorkspaceMessagesService {
throw new Error('No connected account found'); throw new Error('No connected account found');
} }
const accessToken = connectedAccounts[0]?.accessToken;
const refreshToken = connectedAccounts[0]?.refreshToken; const refreshToken = connectedAccounts[0]?.refreshToken;
if (!refreshToken) { if (!refreshToken) {
@ -74,7 +71,7 @@ export class FetchWorkspaceMessagesService {
const threadsData = threads.data.threads; const threadsData = threads.data.threads;
if (!threadsData) { if (!threadsData || threadsData?.length === 0) {
return; return;
} }
@ -84,61 +81,29 @@ export class FetchWorkspaceMessagesService {
workspaceDataSource, workspaceDataSource,
connectedAccounts[0].id, connectedAccounts[0].id,
); );
}
async fetchWorkspaceMemberMessages( const threadQueries: MessageOrThreadQuery[] = threadsData.map((thread) => ({
workspaceId: string, uri: '/gmail/v1/users/me/threads/' + thread.id + '?format=minimal',
workspaceMemberId: string,
maxResults = 500,
): Promise<void> {
const dataSourceMetadata =
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail(
workspaceId,
);
const workspaceDataSource = await this.typeORMService.connectToDataSource(
dataSourceMetadata,
);
if (!workspaceDataSource) {
throw new Error('No workspace data source found');
}
const connectedAccounts = await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'gmail' AND "accountOwnerId" = $1`,
[workspaceMemberId],
);
if (!connectedAccounts || connectedAccounts.length === 0) {
throw new Error('No connected account found');
}
const accessToken = connectedAccounts[0]?.accessToken;
const refreshToken = connectedAccounts[0]?.refreshToken;
if (!accessToken || !refreshToken) {
throw new Error('No access token or refresh token found');
}
const gmailClient = await this.getGmailClient(refreshToken);
const messages = await gmailClient.users.messages.list({
userId: 'me',
maxResults,
});
const messagesData = messages.data.messages;
if (!messagesData || messagesData?.length === 0) {
return;
}
const messageQueries: MessageQuery[] = messagesData.map((message) => ({
uri: '/gmail/v1/users/me/messages/' + message.id + '?format=RAW',
})); }));
const threadsWithMessageIds =
await this.fetchBatchMessagesService.fetchAllThreads(
threadQueries,
accessToken,
);
const messageIds = threadsWithMessageIds
.map((thread) => thread.messageIds)
.flat();
const messageQueries: MessageOrThreadQuery[] = messageIds.map(
(messageId) => ({
uri: '/gmail/v1/users/me/messages/' + messageId + '?format=RAW',
}),
);
const messagesResponse = const messagesResponse =
await this.fetchBatchMessagesService.fetchAllByBatches( await this.fetchBatchMessagesService.fetchAllMessages(
messageQueries, messageQueries,
accessToken, accessToken,
); );

View File

@ -1,4 +1,4 @@
export type GmailParsedResponse = { export type GmailMessageParsedResponse = {
id: string; id: string;
threadId: string; threadId: string;
labelIds: string[]; labelIds: string[];

View File

@ -0,0 +1,4 @@
export type GmailThread = {
id: string;
messageIds: string[];
};

View File

@ -0,0 +1,14 @@
type Message = {
id: string;
labels: string[];
};
export type GmailThreadParsedResponse = {
id: string;
messages: Message[];
error?: {
code: number;
message: string;
status: string;
};
};

View File

@ -0,0 +1,3 @@
export type MessageOrThreadQuery = {
uri: string;
};

View File

@ -1,3 +0,0 @@
export type MessageQuery = {
uri: string;
};