Update foreign table to distant table schema (#5508)

Closes #5069 back-end part

And:
- do not display schemaPendingUpdates status on remote server lists as
this call will become too costly if there are dozens of servers
- (refacto) create foreignTableService

After this is merged we will be able to delete remoteTable's
availableTables column
This commit is contained in:
Marie 2024-05-21 21:25:38 +02:00 committed by GitHub
parent 29c27800fb
commit 3deda2f29a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 618 additions and 296 deletions

View File

@ -338,6 +338,8 @@ export enum FileFolder {
export type FindManyRemoteTablesInput = {
/** The id of the remote server. */
id: Scalars['ID']['input'];
/** Indicates if pending schema updates status should be computed. */
shouldFetchPendingSchemaUpdates?: InputMaybe<Scalars['Boolean']['input']>;
};
export type FullName = {

View File

@ -10,11 +10,13 @@ import {
type UseGetDatabaseConnectionTablesParams = {
connectionId: string;
skip?: boolean;
shouldFetchPendingSchemaUpdates?: boolean;
};
export const useGetDatabaseConnectionTables = ({
connectionId,
skip,
shouldFetchPendingSchemaUpdates,
}: UseGetDatabaseConnectionTablesParams) => {
const apolloMetadataClient = useApolloMetadataClient();
@ -27,6 +29,7 @@ export const useGetDatabaseConnectionTables = ({
variables: {
input: {
id: connectionId,
shouldFetchPendingSchemaUpdates,
},
},
});

View File

@ -53,6 +53,7 @@ export const SettingsIntegrationDatabaseConnectionSummaryCard = ({
<>
<SettingsIntegrationDatabaseConnectionSyncStatus
connectionId={connectionId}
shouldFetchPendingSchemaUpdates
/>
<Dropdown
dropdownId={dropdownId}

View File

@ -6,15 +6,18 @@ import { isDefined } from '~/utils/isDefined';
type SettingsIntegrationDatabaseConnectionSyncStatusProps = {
connectionId: string;
skip?: boolean;
shouldFetchPendingSchemaUpdates?: boolean;
};
export const SettingsIntegrationDatabaseConnectionSyncStatus = ({
connectionId,
skip,
shouldFetchPendingSchemaUpdates,
}: SettingsIntegrationDatabaseConnectionSyncStatusProps) => {
const { tables, error } = useGetDatabaseConnectionTables({
connectionId,
skip,
shouldFetchPendingSchemaUpdates,
});
if (isDefined(error)) {

View File

@ -42,6 +42,7 @@ export const useDatabaseConnection = () => {
const { tables } = useGetDatabaseConnectionTables({
connectionId,
skip: !connection,
shouldFetchPendingSchemaUpdates: true,
});
return { connection, integration, databaseKey, tables };

View File

@ -0,0 +1,17 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
export class RemoveAvailableTables1716310822694 implements MigrationInterface {
name = 'RemoveAvailableTables1716310822694';
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`ALTER TABLE "metadata"."remoteServer" DROP COLUMN "availableTables"`,
);
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`ALTER TABLE "metadata"."remoteServer" ADD "availableTables" jsonb`,
);
}
}

View File

@ -11,7 +11,6 @@ import {
import { RemoteTableEntity } from 'src/engine/metadata-modules/remote-server/remote-table/remote-table.entity';
import { UserMappingOptions } from 'src/engine/metadata-modules/remote-server/types/user-mapping-options';
import { DistantTables } from 'src/engine/metadata-modules/remote-server/remote-table/distant-table/types/distant-table';
export enum RemoteServerType {
POSTGRES_FDW = 'postgres_fdw',
@ -59,9 +58,6 @@ export class RemoteServerEntity<T extends RemoteServerType> {
@Column({ nullable: false, type: 'uuid' })
workspaceId: string;
@Column({ type: 'jsonb', nullable: true })
availableTables: DistantTables;
@OneToMany(() => RemoteTableEntity, (table) => table.server, {
cascade: true,
})

View File

@ -23,43 +23,40 @@ export class DistantTableService {
>,
) {}
public getDistantTableColumns(
remoteServer: RemoteServerEntity<RemoteServerType>,
tableName: string,
): PostgresTableSchemaColumn[] {
if (!remoteServer.availableTables) {
throw new BadRequestException(
'Remote server available tables are not defined',
);
}
return remoteServer.availableTables[tableName];
}
public async fetchDistantTables(
remoteServer: RemoteServerEntity<RemoteServerType>,
workspaceId: string,
): Promise<DistantTables> {
return this.createAvailableTables(remoteServer, workspaceId);
}
private async createAvailableTables(
remoteServer: RemoteServerEntity<RemoteServerType>,
workspaceId: string,
tableName?: string,
): Promise<DistantTables> {
if (remoteServer.schema) {
return this.createAvailableTablesFromDynamicSchema(
return this.getDistantTablesFromDynamicSchema(
remoteServer,
workspaceId,
tableName,
);
}
return this.createAvailableTablesFromStaticSchema(remoteServer);
return this.getDistantTablesFromStaticSchema(remoteServer);
}
private async createAvailableTablesFromDynamicSchema(
public async getDistantTableColumns(
remoteServer: RemoteServerEntity<RemoteServerType>,
workspaceId: string,
tableName: string,
): Promise<PostgresTableSchemaColumn[]> {
const distantTables = await this.fetchDistantTables(
remoteServer,
workspaceId,
tableName,
);
return distantTables[tableName] || [];
}
private async getDistantTablesFromDynamicSchema(
remoteServer: RemoteServerEntity<RemoteServerType>,
workspaceId: string,
tableName?: string,
): Promise<DistantTables> {
if (!remoteServer.schema) {
throw new BadRequestException('Remote server schema is not defined');
@ -73,12 +70,16 @@ export class DistantTableService {
workspaceId,
);
const availableTables = await workspaceDataSource.transaction(
const distantTables = await workspaceDataSource.transaction(
async (entityManager: EntityManager) => {
await entityManager.query(`CREATE SCHEMA "${tmpSchemaName}"`);
const tableLimitationsOptions = tableName
? ` LIMIT TO (${tableName})`
: '';
await entityManager.query(
`IMPORT FOREIGN SCHEMA "${remoteServer.schema}" FROM SERVER "${remoteServer.foreignDataWrapperId}" INTO "${tmpSchemaName}"`,
`IMPORT FOREIGN SCHEMA "${remoteServer.schema}"${tableLimitationsOptions} FROM SERVER "${remoteServer.foreignDataWrapperId}" INTO "${tmpSchemaName}"`,
);
const createdForeignTableNames = await entityManager.query(
@ -106,22 +107,14 @@ export class DistantTableService {
},
);
await this.remoteServerRepository.update(remoteServer.id, {
availableTables,
});
return availableTables;
return distantTables;
}
private async createAvailableTablesFromStaticSchema(
private async getDistantTablesFromStaticSchema(
remoteServer: RemoteServerEntity<RemoteServerType>,
): Promise<DistantTables> {
switch (remoteServer.foreignDataWrapperType) {
case RemoteServerType.STRIPE_FDW:
this.remoteServerRepository.update(remoteServer.id, {
availableTables: STRIPE_DISTANT_TABLES,
});
return STRIPE_DISTANT_TABLES;
default:
throw new BadRequestException(

View File

@ -1,5 +1,5 @@
import { PostgresTableSchemaColumn } from 'src/engine/metadata-modules/remote-server/types/postgres-table-schema-column';
export type DistantTables = {
[tableName: string]: PostgresTableSchemaColumn[];
[distantTableName: string]: PostgresTableSchemaColumn[];
};

View File

@ -1,9 +1,18 @@
import { InputType, ID } from '@nestjs/graphql';
import { InputType, ID, Field } from '@nestjs/graphql';
import { IDField } from '@ptc-org/nestjs-query-graphql';
import { IsOptional } from 'class-validator';
@InputType()
export class FindManyRemoteTablesInput {
@IDField(() => ID, { description: 'The id of the remote server.' })
id!: string;
@IsOptional()
@Field(() => Boolean, {
description:
'Indicates if pending schema updates status should be computed.',
nullable: true,
})
shouldFetchPendingSchemaUpdates?: boolean;
}

View File

@ -0,0 +1,19 @@
import { Module } from '@nestjs/common';
import { ForeignTableService } from 'src/engine/metadata-modules/remote-server/remote-table/foreign-table/foreign-table.service';
import { WorkspaceCacheVersionModule } from 'src/engine/metadata-modules/workspace-cache-version/workspace-cache-version.module';
import { WorkspaceMigrationModule } from 'src/engine/metadata-modules/workspace-migration/workspace-migration.module';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
import { WorkspaceMigrationRunnerModule } from 'src/engine/workspace-manager/workspace-migration-runner/workspace-migration-runner.module';
@Module({
imports: [
WorkspaceMigrationModule,
WorkspaceMigrationRunnerModule,
WorkspaceDataSourceModule,
WorkspaceCacheVersionModule,
],
providers: [ForeignTableService],
exports: [ForeignTableService],
})
export class ForeignTableModule {}

View File

@ -0,0 +1,173 @@
import { BadRequestException, Injectable } from '@nestjs/common';
import {
RemoteServerEntity,
RemoteServerType,
} from 'src/engine/metadata-modules/remote-server/remote-server.entity';
import { RemoteTableStatus } from 'src/engine/metadata-modules/remote-server/remote-table/dtos/remote-table.dto';
import { getForeignTableColumnName } from 'src/engine/metadata-modules/remote-server/remote-table/foreign-table/utils/get-foreign-table-column-name.util';
import { PostgresTableSchemaColumn } from 'src/engine/metadata-modules/remote-server/types/postgres-table-schema-column';
import { WorkspaceCacheVersionService } from 'src/engine/metadata-modules/workspace-cache-version/workspace-cache-version.service';
import { generateMigrationName } from 'src/engine/metadata-modules/workspace-migration/utils/generate-migration-name.util';
import {
ReferencedTable,
WorkspaceMigrationTableActionType,
WorkspaceMigrationForeignColumnDefinition,
WorkspaceMigrationForeignTable,
WorkspaceMigrationColumnAction,
} from 'src/engine/metadata-modules/workspace-migration/workspace-migration.entity';
import { WorkspaceMigrationService } from 'src/engine/metadata-modules/workspace-migration/workspace-migration.service';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { WorkspaceMigrationRunnerService } from 'src/engine/workspace-manager/workspace-migration-runner/workspace-migration-runner.service';
@Injectable()
export class ForeignTableService {
constructor(
private readonly workspaceMigrationService: WorkspaceMigrationService,
private readonly workspaceMigrationRunnerService: WorkspaceMigrationRunnerService,
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
private readonly workspaceCacheVersionService: WorkspaceCacheVersionService,
) {}
public async fetchForeignTableNamesWithinWorkspace(
workspaceId: string,
foreignDataWrapperId: string,
): Promise<string[]> {
const workspaceDataSource =
await this.workspaceDataSourceService.connectToWorkspaceDataSource(
workspaceId,
);
return (
await workspaceDataSource.query(
`SELECT foreign_table_name, foreign_server_name FROM information_schema.foreign_tables WHERE foreign_server_name = $1`,
[foreignDataWrapperId],
)
).map((foreignTable) => foreignTable.foreign_table_name);
}
public async createForeignTable(
workspaceId: string,
localTableName: string,
remoteServer: RemoteServerEntity<RemoteServerType>,
distantTableName: string,
distantTableColumns: PostgresTableSchemaColumn[],
) {
const referencedTable: ReferencedTable = this.buildReferencedTable(
remoteServer,
distantTableName,
);
const workspaceMigration =
await this.workspaceMigrationService.createCustomMigration(
generateMigrationName(`create-foreign-table-${localTableName}`),
workspaceId,
[
{
name: localTableName,
action: WorkspaceMigrationTableActionType.CREATE_FOREIGN_TABLE,
foreignTable: {
columns: distantTableColumns.map(
(column) =>
({
columnName: getForeignTableColumnName(column.columnName),
columnType: column.dataType,
distantColumnName: column.columnName,
}) satisfies WorkspaceMigrationForeignColumnDefinition,
),
referencedTable,
foreignDataWrapperId: remoteServer.foreignDataWrapperId,
} satisfies WorkspaceMigrationForeignTable,
},
],
);
// TODO: This should be done in a transaction. Waiting for a global refactoring of transaction management.
try {
await this.workspaceMigrationRunnerService.executeMigrationFromPendingMigrations(
workspaceId,
);
} catch (exception) {
this.workspaceMigrationService.deleteById(workspaceMigration.id);
throw new BadRequestException(
'Could not create foreign table. The table may already exists or a column type may not be supported.',
);
}
}
public async updateForeignTable(
foreignTableName: string,
workspaceId: string,
columnsUpdates?: WorkspaceMigrationColumnAction[],
) {
const workspaceMigration =
await this.workspaceMigrationService.createCustomMigration(
generateMigrationName(`alter-foreign-table-${foreignTableName}`),
workspaceId,
[
{
name: foreignTableName,
action: WorkspaceMigrationTableActionType.ALTER_FOREIGN_TABLE,
columns: columnsUpdates,
},
],
);
// TODO: This should be done in a transaction. Waiting for a global refactoring of transaction management.
try {
await this.workspaceMigrationRunnerService.executeMigrationFromPendingMigrations(
workspaceId,
);
await this.workspaceCacheVersionService.incrementVersion(workspaceId);
return {
name: foreignTableName,
status: RemoteTableStatus.SYNCED,
schemaPendingUpdates: [],
};
} catch (exception) {
this.workspaceMigrationService.deleteById(workspaceMigration.id);
throw new BadRequestException('Could not alter foreign table.');
}
}
public async deleteForeignTable(
foreignTableName: string,
workspaceId: string,
) {
await this.workspaceMigrationService.createCustomMigration(
generateMigrationName(`drop-foreign-table-${foreignTableName}`),
workspaceId,
[
{
name: foreignTableName,
action: WorkspaceMigrationTableActionType.DROP_FOREIGN_TABLE,
},
],
);
return this.workspaceMigrationRunnerService.executeMigrationFromPendingMigrations(
workspaceId,
);
}
private buildReferencedTable(
remoteServer: RemoteServerEntity<RemoteServerType>,
distantTableName: string,
): ReferencedTable {
switch (remoteServer.foreignDataWrapperType) {
case RemoteServerType.POSTGRES_FDW:
return {
table_name: distantTableName,
schema_name: remoteServer.schema,
};
case RemoteServerType.STRIPE_FDW:
return { object: distantTableName };
default:
throw new BadRequestException('Foreign data wrapper not supported');
}
}
}

View File

@ -0,0 +1,11 @@
import { Module } from '@nestjs/common';
import { RemoteTableSchemaUpdateService } from 'src/engine/metadata-modules/remote-server/remote-table/remote-table-schema-update/remote-table-schema-update.service';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
@Module({
imports: [WorkspaceDataSourceModule],
providers: [RemoteTableSchemaUpdateService],
exports: [RemoteTableSchemaUpdateService],
})
export class RemoteTableSchemaUpdateModule {}

View File

@ -0,0 +1,176 @@
import { Injectable } from '@nestjs/common';
import { getForeignTableColumnName as convertToForeignTableColumnName } from 'src/engine/metadata-modules/remote-server/remote-table/foreign-table/utils/get-foreign-table-column-name.util';
import { DistantTables } from 'src/engine/metadata-modules/remote-server/remote-table/distant-table/types/distant-table';
import {
RemoteTableStatus,
DistantTableUpdate,
} from 'src/engine/metadata-modules/remote-server/remote-table/dtos/remote-table.dto';
import { RemoteTableEntity } from 'src/engine/metadata-modules/remote-server/remote-table/remote-table.entity';
import { fetchTableColumns } from 'src/engine/metadata-modules/remote-server/remote-table/utils/fetch-table-columns.util';
import { PostgresTableSchemaColumn } from 'src/engine/metadata-modules/remote-server/types/postgres-table-schema-column';
import {
WorkspaceMigrationColumnAction,
WorkspaceMigrationColumnCreate,
WorkspaceMigrationColumnActionType,
WorkspaceMigrationColumnDrop,
} from 'src/engine/metadata-modules/workspace-migration/workspace-migration.entity';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
@Injectable()
export class RemoteTableSchemaUpdateService {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {}
public async getDistantTablesWithUpdates({
remoteServerSchema,
workspaceId,
remoteTables,
distantTables,
}: {
remoteServerSchema: string;
workspaceId: string;
remoteTables: RemoteTableEntity[];
distantTables: DistantTables;
}) {
const schemaPendingUpdates =
await this.getSchemaUpdatesBetweenForeignAndDistantTables({
workspaceId,
remoteTables,
distantTables,
});
const remoteTablesDistantNames = new Set(
remoteTables.map((remoteTable) => remoteTable.distantTableName),
);
const distantTablesWithUpdates = Object.keys(distantTables).map(
(tableName) => ({
name: tableName,
schema: remoteServerSchema,
status: remoteTablesDistantNames.has(tableName)
? RemoteTableStatus.SYNCED
: RemoteTableStatus.NOT_SYNCED,
schemaPendingUpdates: schemaPendingUpdates[tableName] || [],
}),
);
const deletedTables = Object.entries(schemaPendingUpdates)
.filter(([_tableName, updates]) =>
updates.includes(DistantTableUpdate.TABLE_DELETED),
)
.map(([tableName, updates]) => ({
name: tableName,
schema: remoteServerSchema,
status: RemoteTableStatus.SYNCED,
schemaPendingUpdates: updates,
}));
return [...distantTablesWithUpdates, ...deletedTables];
}
public computeForeignTableColumnsUpdates = (
foreignTableColumns: PostgresTableSchemaColumn[],
distantTableColumns: PostgresTableSchemaColumn[],
): WorkspaceMigrationColumnAction[] => {
const { columnsAdded, columnsDeleted } = this.compareForeignTableColumns(
foreignTableColumns,
distantTableColumns,
);
const columnsAddedUpdates: WorkspaceMigrationColumnCreate[] =
columnsAdded.map((columnAdded) => ({
action: WorkspaceMigrationColumnActionType.CREATE,
columnName: columnAdded.name,
columnType: columnAdded.type,
}));
const columnsDeletedUpdates: WorkspaceMigrationColumnDrop[] =
columnsDeleted.map((columnDeleted) => ({
action: WorkspaceMigrationColumnActionType.DROP,
columnName: columnDeleted,
}));
return [...columnsAddedUpdates, ...columnsDeletedUpdates];
};
private async getSchemaUpdatesBetweenForeignAndDistantTables({
workspaceId,
remoteTables,
distantTables,
}: {
workspaceId: string;
remoteTables: RemoteTableEntity[];
distantTables: DistantTables;
}): Promise<{ [tablename: string]: DistantTableUpdate[] }> {
const updates = {};
for (const remoteTable of remoteTables) {
const distantTable = distantTables[remoteTable.distantTableName];
const tableName = remoteTable.distantTableName;
if (!distantTable) {
updates[tableName] = [DistantTableUpdate.TABLE_DELETED];
continue;
}
const foreignTable = await fetchTableColumns(
this.workspaceDataSourceService,
workspaceId,
remoteTable.localTableName,
);
const { columnsAdded, columnsDeleted } = this.compareForeignTableColumns(
foreignTable,
distantTable,
);
if (columnsAdded.length > 0) {
updates[tableName] = [
...(updates[tableName] || []),
DistantTableUpdate.COLUMNS_ADDED,
];
}
if (columnsDeleted.length > 0) {
updates[tableName] = [
...(updates[tableName] || []),
DistantTableUpdate.COLUMNS_DELETED,
];
}
}
return updates;
}
private compareForeignTableColumns = (
foreignTableColumns: PostgresTableSchemaColumn[],
distantTableColumns: PostgresTableSchemaColumn[],
) => {
const foreignTableColumnNames = new Set(
foreignTableColumns.map((column) => column.columnName),
);
const distantTableColumnsWithConvertedName = distantTableColumns.map(
(column) => {
return {
name: convertToForeignTableColumnName(column.columnName),
type: column.dataType,
};
},
);
const columnsAdded = distantTableColumnsWithConvertedName.filter(
(column) => !foreignTableColumnNames.has(column.name),
);
const columnsDeleted = Array.from(foreignTableColumnNames).filter(
(columnName) =>
!distantTableColumnsWithConvertedName
.map((column) => column.name)
.includes(columnName),
);
return {
columnsAdded,
columnsDeleted,
};
};
}

View File

@ -6,13 +6,13 @@ import { FieldMetadataModule } from 'src/engine/metadata-modules/field-metadata/
import { ObjectMetadataModule } from 'src/engine/metadata-modules/object-metadata/object-metadata.module';
import { RemoteServerEntity } from 'src/engine/metadata-modules/remote-server/remote-server.entity';
import { DistantTableModule } from 'src/engine/metadata-modules/remote-server/remote-table/distant-table/distant-table.module';
import { ForeignTableModule } from 'src/engine/metadata-modules/remote-server/remote-table/foreign-table/foreign-table.module';
import { RemoteTableSchemaUpdateModule } from 'src/engine/metadata-modules/remote-server/remote-table/remote-table-schema-update/remote-table-schema-update.module';
import { RemoteTableEntity } from 'src/engine/metadata-modules/remote-server/remote-table/remote-table.entity';
import { RemoteTableResolver } from 'src/engine/metadata-modules/remote-server/remote-table/remote-table.resolver';
import { RemoteTableService } from 'src/engine/metadata-modules/remote-server/remote-table/remote-table.service';
import { WorkspaceCacheVersionModule } from 'src/engine/metadata-modules/workspace-cache-version/workspace-cache-version.module';
import { WorkspaceMigrationModule } from 'src/engine/metadata-modules/workspace-migration/workspace-migration.module';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
import { WorkspaceMigrationRunnerModule } from 'src/engine/workspace-manager/workspace-migration-runner/workspace-migration-runner.module';
@Module({
imports: [
@ -25,9 +25,9 @@ import { WorkspaceMigrationRunnerModule } from 'src/engine/workspace-manager/wor
ObjectMetadataModule,
FieldMetadataModule,
WorkspaceCacheVersionModule,
WorkspaceMigrationModule,
WorkspaceMigrationRunnerModule,
WorkspaceDataSourceModule,
ForeignTableModule,
RemoteTableSchemaUpdateModule,
],
providers: [RemoteTableService, RemoteTableResolver],
exports: [RemoteTableService],

View File

@ -19,9 +19,10 @@ export class RemoteTableResolver {
@Args('input') input: FindManyRemoteTablesInput,
@AuthWorkspace() { id: workspaceId }: Workspace,
) {
return this.remoteTableService.findDistantTablesByServerId(
return this.remoteTableService.findDistantTablesWithStatusByServerId(
input.id,
workspaceId,
input.shouldFetchPendingSchemaUpdates,
);
}
@ -40,4 +41,15 @@ export class RemoteTableResolver {
) {
return this.remoteTableService.unsyncRemoteTable(input, workspaceId);
}
@Mutation(() => RemoteTableDTO)
async syncRemoteTableSchemaChanges(
@Args('input') input: RemoteTableInput,
@AuthWorkspace() { id: workspaceId }: Workspace,
) {
return this.remoteTableService.syncRemoteTableSchemaChanges(
input,
workspaceId,
);
}
}

View File

@ -3,15 +3,13 @@ import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { plural } from 'pluralize';
import isEmpty from 'lodash.isempty';
import {
RemoteServerType,
RemoteServerEntity,
} from 'src/engine/metadata-modules/remote-server/remote-server.entity';
import {
RemoteTableStatus,
DistantTableUpdate,
} from 'src/engine/metadata-modules/remote-server/remote-table/dtos/remote-table.dto';
import { RemoteTableStatus } from 'src/engine/metadata-modules/remote-server/remote-table/dtos/remote-table.dto';
import {
mapUdtNameToFieldType,
mapUdtNameToFieldSettings,
@ -25,22 +23,14 @@ import { CreateFieldInput } from 'src/engine/metadata-modules/field-metadata/dto
import { WorkspaceCacheVersionService } from 'src/engine/metadata-modules/workspace-cache-version/workspace-cache-version.service';
import { camelCase } from 'src/utils/camel-case';
import { camelToTitleCase } from 'src/utils/camel-to-title-case';
import { WorkspaceMigrationService } from 'src/engine/metadata-modules/workspace-migration/workspace-migration.service';
import { WorkspaceMigrationRunnerService } from 'src/engine/workspace-manager/workspace-migration-runner/workspace-migration-runner.service';
import { generateMigrationName } from 'src/engine/metadata-modules/workspace-migration/utils/generate-migration-name.util';
import {
ReferencedTable,
WorkspaceMigrationForeignColumnDefinition,
WorkspaceMigrationForeignTable,
WorkspaceMigrationTableActionType,
} from 'src/engine/metadata-modules/workspace-migration/workspace-migration.entity';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { RemoteTableEntity } from 'src/engine/metadata-modules/remote-server/remote-table/remote-table.entity';
import { getRemoteTableLocalName } from 'src/engine/metadata-modules/remote-server/remote-table/utils/get-remote-table-local-name.util';
import { DistantTableService } from 'src/engine/metadata-modules/remote-server/remote-table/distant-table/distant-table.service';
import { DistantTables } from 'src/engine/metadata-modules/remote-server/remote-table/distant-table/types/distant-table';
import { getForeignTableColumnName } from 'src/engine/metadata-modules/remote-server/remote-table/utils/get-foreign-table-column-name.util';
import { PostgresTableSchemaColumn } from 'src/engine/metadata-modules/remote-server/types/postgres-table-schema-column';
import { fetchTableColumns } from 'src/engine/metadata-modules/remote-server/remote-table/utils/fetch-table-columns.util';
import { ForeignTableService } from 'src/engine/metadata-modules/remote-server/remote-table/foreign-table/foreign-table.service';
import { RemoteTableSchemaUpdateService } from 'src/engine/metadata-modules/remote-server/remote-table/remote-table-schema-update/remote-table-schema-update.service';
export class RemoteTableService {
private readonly logger = new Logger(RemoteTableService.name);
@ -57,12 +47,16 @@ export class RemoteTableService {
private readonly objectMetadataService: ObjectMetadataService,
private readonly fieldMetadataService: FieldMetadataService,
private readonly distantTableService: DistantTableService,
private readonly workspaceMigrationService: WorkspaceMigrationService,
private readonly workspaceMigrationRunnerService: WorkspaceMigrationRunnerService,
private readonly foreignTableService: ForeignTableService,
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
private readonly remoteTableSchemaUpdateService: RemoteTableSchemaUpdateService,
) {}
public async findDistantTablesByServerId(id: string, workspaceId: string) {
public async findDistantTablesWithStatusByServerId(
id: string,
workspaceId: string,
shouldFetchPendingSchemaUpdates?: boolean,
) {
const remoteServer = await this.remoteServerRepository.findOne({
where: {
id,
@ -88,7 +82,7 @@ export class RemoteTableService {
workspaceId,
);
if (currentRemoteTables.length === 0) {
if (currentRemoteTables.length === 0 || !shouldFetchPendingSchemaUpdates) {
const distantTablesWithStatus = Object.keys(distantTables).map(
(tableName) => ({
name: tableName,
@ -102,7 +96,7 @@ export class RemoteTableService {
return distantTablesWithStatus;
}
return this.getDistantTablesWithUpdates({
return this.remoteTableSchemaUpdateService.getDistantTablesWithUpdates({
remoteServerSchema: remoteServer.schema,
workspaceId,
remoteTables: currentRemoteTables,
@ -110,109 +104,6 @@ export class RemoteTableService {
});
}
private async getDistantTablesWithUpdates({
remoteServerSchema,
workspaceId,
remoteTables,
distantTables,
}: {
remoteServerSchema: string;
workspaceId: string;
remoteTables: RemoteTableEntity[];
distantTables: DistantTables;
}) {
const schemaPendingUpdates =
await this.getSchemaUpdatesBetweenForeignAndDistantTables({
workspaceId,
remoteTables,
distantTables,
});
const remoteTablesDistantNames = remoteTables.map(
(remoteTable) => remoteTable.distantTableName,
);
const distantTablesWithUpdates = Object.keys(distantTables).map(
(tableName) => ({
name: tableName,
schema: remoteServerSchema,
status: remoteTablesDistantNames.includes(tableName)
? RemoteTableStatus.SYNCED
: RemoteTableStatus.NOT_SYNCED,
schemaPendingUpdates: schemaPendingUpdates[tableName],
}),
);
const deletedTables = Object.entries(schemaPendingUpdates)
.filter(([_tableName, updates]) =>
updates.includes(DistantTableUpdate.TABLE_DELETED),
)
.map(([tableName, updates]) => ({
name: tableName,
schema: remoteServerSchema,
status: RemoteTableStatus.SYNCED,
schemaPendingUpdates: updates,
}));
return distantTablesWithUpdates.concat(deletedTables);
}
private async getSchemaUpdatesBetweenForeignAndDistantTables({
workspaceId,
remoteTables,
distantTables,
}: {
workspaceId: string;
remoteTables: RemoteTableEntity[];
distantTables: DistantTables;
}): Promise<{ [tablename: string]: DistantTableUpdate[] }> {
const updates = {};
for (const remoteTable of remoteTables) {
const distantTable = distantTables[remoteTable.distantTableName];
const tableName = remoteTable.distantTableName;
if (!distantTable) {
updates[tableName] = [DistantTableUpdate.TABLE_DELETED];
continue;
}
const distantTableColumnNames = new Set(
distantTable.map((column) =>
getForeignTableColumnName(column.columnName),
),
);
const foreignTableColumnNames = new Set(
(
await this.fetchTableColumns(workspaceId, remoteTable.localTableName)
).map((column) => column.columnName),
);
const columnsAdded = [...distantTableColumnNames].filter(
(columnName) => !foreignTableColumnNames.has(columnName),
);
const columnsDeleted = [...foreignTableColumnNames].filter(
(columnName) => !distantTableColumnNames.has(columnName),
);
if (columnsAdded.length > 0) {
updates[tableName] = [
...(updates[tableName] || []),
DistantTableUpdate.COLUMNS_ADDED,
];
}
if (columnsDeleted.length > 0) {
updates[tableName] = [
...(updates[tableName] || []),
DistantTableUpdate.COLUMNS_DELETED,
];
}
}
return updates;
}
public async findRemoteTablesByServerId({
remoteServerId,
workspaceId,
@ -281,8 +172,10 @@ export class RemoteTableService {
remoteServerId: remoteServer.id,
});
const distantTableColumns = this.distantTableService.getDistantTableColumns(
const distantTableColumns =
await this.distantTableService.getDistantTableColumns(
remoteServer,
workspaceId,
input.name,
);
@ -295,11 +188,11 @@ export class RemoteTableService {
throw new BadRequestException('Remote table must have an id column');
}
await this.createForeignTable(
await this.foreignTableService.createForeignTable(
workspaceId,
localTableName,
input,
remoteServer,
input.name,
distantTableColumns,
);
@ -373,13 +266,86 @@ export class RemoteTableService {
}
}
public async syncRemoteTableSchemaChanges(
input: RemoteTableInput,
workspaceId: string,
) {
const remoteServer = await this.remoteServerRepository.findOne({
where: {
id: input.remoteServerId,
workspaceId,
},
});
if (!remoteServer) {
throw new NotFoundException('Remote server does not exist');
}
const remoteTable = await this.remoteTableRepository.findOne({
where: {
distantTableName: input.name,
remoteServerId: remoteServer.id,
workspaceId,
},
});
if (!remoteTable) {
throw new NotFoundException('Remote table does not exist');
}
const distantTableColumns =
await this.distantTableService.getDistantTableColumns(
remoteServer,
workspaceId,
remoteTable.distantTableName,
);
if (isEmpty(distantTableColumns)) {
await this.unsyncOne(workspaceId, remoteTable, remoteServer);
return {};
}
const foreignTableColumns = await fetchTableColumns(
this.workspaceDataSourceService,
workspaceId,
remoteTable.localTableName,
);
const columnsUpdates =
this.remoteTableSchemaUpdateService.computeForeignTableColumnsUpdates(
foreignTableColumns,
distantTableColumns,
);
if (isEmpty(columnsUpdates)) {
this.logger.log(
`No update to perform on table "${remoteTable.localTableName}" for workspace ${workspaceId}`,
);
return {
name: remoteTable.localTableName,
status: RemoteTableStatus.SYNCED,
schemaPendingUpdates: [],
};
}
const updatedTable = await this.foreignTableService.updateForeignTable(
remoteTable.localTableName,
workspaceId,
columnsUpdates,
);
return updatedTable;
}
private async unsyncOne(
workspaceId: string,
remoteTable: RemoteTableEntity,
remoteServer: RemoteServerEntity<RemoteServerType>,
) {
const currentForeignTableNames =
await this.fetchForeignTableNamesWithinWorkspace(
await this.foreignTableService.fetchForeignTableNamesWithinWorkspace(
workspaceId,
remoteServer.foreignDataWrapperId,
);
@ -400,18 +366,8 @@ export class RemoteTableService {
);
}
await this.workspaceMigrationService.createCustomMigration(
generateMigrationName(`drop-foreign-table-${remoteTable.localTableName}`),
workspaceId,
[
{
name: remoteTable.localTableName,
action: WorkspaceMigrationTableActionType.DROP_FOREIGN_TABLE,
},
],
);
await this.workspaceMigrationRunnerService.executeMigrationFromPendingMigrations(
await this.foreignTableService.deleteForeignTable(
remoteTable.localTableName,
workspaceId,
);
@ -420,97 +376,6 @@ export class RemoteTableService {
await this.workspaceCacheVersionService.incrementVersion(workspaceId);
}
private async fetchForeignTableNamesWithinWorkspace(
workspaceId: string,
foreignDataWrapperId: string,
): Promise<string[]> {
const workspaceDataSource =
await this.workspaceDataSourceService.connectToWorkspaceDataSource(
workspaceId,
);
return (
await workspaceDataSource.query(
`SELECT foreign_table_name, foreign_server_name FROM information_schema.foreign_tables WHERE foreign_server_name = '${foreignDataWrapperId}'`,
)
).map((foreignTable) => foreignTable.foreign_table_name);
}
private async fetchTableColumns(
workspaceId: string,
tableName: string,
): Promise<PostgresTableSchemaColumn[]> {
const workspaceDataSource =
await this.workspaceDataSourceService.connectToWorkspaceDataSource(
workspaceId,
);
const schemaName =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const res = await workspaceDataSource.query(
`SELECT column_name, data_type, udt_name
FROM information_schema.columns
WHERE table_schema = '${schemaName}' AND table_name = '${tableName}'`,
);
return res.map((column) => ({
columnName: column.column_name,
dataType: column.data_type,
udtName: column.udt_name,
}));
}
private async createForeignTable(
workspaceId: string,
localTableName: string,
remoteTableInput: RemoteTableInput,
remoteServer: RemoteServerEntity<RemoteServerType>,
distantTableColumns: PostgresTableSchemaColumn[],
) {
const referencedTable: ReferencedTable = this.buildReferencedTable(
remoteServer,
remoteTableInput,
);
const workspaceMigration =
await this.workspaceMigrationService.createCustomMigration(
generateMigrationName(`create-foreign-table-${localTableName}`),
workspaceId,
[
{
name: localTableName,
action: WorkspaceMigrationTableActionType.CREATE_FOREIGN_TABLE,
foreignTable: {
columns: distantTableColumns.map(
(column) =>
({
columnName: getForeignTableColumnName(column.columnName),
columnType: column.dataType,
distantColumnName: column.columnName,
}) satisfies WorkspaceMigrationForeignColumnDefinition,
),
referencedTable,
foreignDataWrapperId: remoteServer.foreignDataWrapperId,
} satisfies WorkspaceMigrationForeignTable,
},
],
);
// TODO: This should be done in a transaction. Waiting for a global refactoring of transaction management.
try {
await this.workspaceMigrationRunnerService.executeMigrationFromPendingMigrations(
workspaceId,
);
} catch (exception) {
this.workspaceMigrationService.deleteById(workspaceMigration.id);
throw new BadRequestException(
'Could not create foreign table. The table may already exists or a column type may not be supported.',
);
}
}
private async createRemoteTableMetadata(
workspaceId: string,
localTableBaseName: string,
@ -573,21 +438,4 @@ export class RemoteTableService {
}
}
}
private buildReferencedTable(
remoteServer: RemoteServerEntity<RemoteServerType>,
remoteTableInput: RemoteTableInput,
): ReferencedTable {
switch (remoteServer.foreignDataWrapperType) {
case RemoteServerType.POSTGRES_FDW:
return {
table_name: remoteTableInput.name,
schema_name: remoteServer.schema,
};
case RemoteServerType.STRIPE_FDW:
return { object: remoteTableInput.name };
default:
throw new BadRequestException('Foreign data wrapper not supported');
}
}
}

View File

@ -0,0 +1,24 @@
import { PostgresTableSchemaColumn } from 'src/engine/metadata-modules/remote-server/types/postgres-table-schema-column';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
export const fetchTableColumns = async (
workspaceDataSourceService: WorkspaceDataSourceService,
workspaceId: string,
tableName: string,
): Promise<PostgresTableSchemaColumn[]> => {
const schemaName = workspaceDataSourceService.getSchemaName(workspaceId);
const res = await workspaceDataSourceService.executeRawQuery(
`SELECT column_name, data_type, udt_name
FROM information_schema.columns
WHERE table_schema = $1 AND table_name = $2`,
[schemaName, tableName],
workspaceId,
);
return res.map((column) => ({
columnName: column.column_name,
dataType: column.data_type,
udtName: column.udt_name,
}));
};

View File

@ -104,6 +104,7 @@ export enum WorkspaceMigrationTableActionType {
DROP = 'drop',
CREATE_FOREIGN_TABLE = 'create_foreign_table',
DROP_FOREIGN_TABLE = 'drop_foreign_table',
ALTER_FOREIGN_TABLE = 'alter_foreign_table',
}
export type WorkspaceMigrationTableAction = {

View File

@ -155,6 +155,14 @@ export class WorkspaceMigrationRunnerService {
`DROP FOREIGN TABLE ${schemaName}."${tableMigration.name}"`,
);
break;
case WorkspaceMigrationTableActionType.ALTER_FOREIGN_TABLE:
await this.alterForeignTable(
queryRunner,
schemaName,
tableMigration.name,
tableMigration.columns,
);
break;
default:
throw new Error(
`Migration table action ${tableMigration.action} not supported`,
@ -507,4 +515,29 @@ export class WorkspaceMigrationRunnerService {
COMMENT ON FOREIGN TABLE "${schemaName}"."${name}" IS '@graphql({"primary_key_columns": ["id"], "totalCount": {"enabled": true}})';
`);
}
private async alterForeignTable(
queryRunner: QueryRunner,
schemaName: string,
name: string,
columns: WorkspaceMigrationColumnAction[] | undefined,
) {
const columnUpdatesQuery = columns
?.map((column) => {
switch (column.action) {
case WorkspaceMigrationColumnActionType.DROP:
return `DROP COLUMN "${column.columnName}"`;
case WorkspaceMigrationColumnActionType.CREATE:
return `ADD COLUMN "${column.columnName}" ${column.columnType}`;
default:
return '';
}
})
.filter(Boolean)
.join(', ');
await queryRunner.query(
`ALTER FOREIGN TABLE ${schemaName}."${name}" ${columnUpdatesQuery};`,
);
}
}