Import full distant schema and store in remote server (#5211)

We should not depend on the foreign data wrapper type to manage distant
table. The remote server should be enough to handle the table creation.

Here is the new flow to fetch available tables:
- check if the remote server have available tables already stored
- if not, import full schema in a temporary schema
- copy the tables into the available tables field 
- delete the schema

Left todo:
- update remote server input for postgres so we receive the schema

---------

Co-authored-by: Thomas Trompette <thomast@twenty.com>
This commit is contained in:
Thomas Trompette 2024-04-30 14:18:33 +02:00 committed by GitHub
parent 907f0a1ea6
commit 3a61c922f1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 356 additions and 332 deletions

View File

@ -811,7 +811,6 @@ export type RemoteTable = {
export type RemoteTableInput = {
name: Scalars['String']['input'];
remoteServerId: Scalars['ID']['input'];
schema: Scalars['String']['input'];
};
/** Status of the table */

View File

@ -124,6 +124,8 @@ export const RecordShowContainer = ({
: 'inlineFieldMetadataItems',
);
const isReadOnly = objectMetadataItem.isRemote;
return (
<RecoilScope CustomRecoilScopeContext={ShowPageRecoilScopeContext}>
<ShowPageContainer>
@ -162,7 +164,7 @@ export const RecordShowContainer = ({
hotkeyScope: InlineCellHotkeyScope.InlineCell,
}}
>
<RecordInlineCell />
<RecordInlineCell readonly={isReadOnly} />
</FieldContext.Provider>
}
avatarType={recordIdentifier?.avatarType ?? 'rounded'}
@ -191,7 +193,7 @@ export const RecordShowContainer = ({
hotkeyScope: InlineCellHotkeyScope.InlineCell,
}}
>
<RecordInlineCell />
<RecordInlineCell readonly={isReadOnly} />
</FieldContext.Provider>
))}
</PropertyBox>

View File

@ -10,6 +10,7 @@ export const settingsIntegrationPostgreSQLConnectionFormSchema = z.object({
port: z.preprocess((val) => parseInt(val as string), z.number().positive()),
username: z.string().min(1),
password: z.string().min(1),
schema: z.string().min(1),
});
type SettingsIntegrationPostgreSQLConnectionFormValues = z.infer<
@ -42,6 +43,7 @@ export const SettingsIntegrationPostgreSQLConnectionForm = () => {
{ name: 'port' as const, label: 'Port' },
{ name: 'username' as const, label: 'Username' },
{ name: 'password' as const, label: 'Password', type: 'password' },
{ name: 'schema' as const, label: 'Schema' },
].map(({ name, label, type }) => (
<Controller
key={name}

View File

@ -53,13 +53,11 @@ export const SettingsIntegrationDatabaseTablesListCard = ({
await syncRemoteTable({
remoteServerId: connectionId,
name: tableName,
schema: table.schema,
});
} else {
await unsyncRemoteTable({
remoteServerId: connectionId,
name: tableName,
schema: table.schema,
});
}
},

View File

@ -43,6 +43,7 @@ const createRemoteServerInputSchema = newConnectionSchema
password: values.password,
username: values.username,
},
schema: values.schema,
}));
type SettingsIntegrationNewConnectionFormValues = z.infer<

View File

@ -0,0 +1,25 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
export class AddSchemaAndAvailableTablesToServer1714382420165
implements MigrationInterface
{
name = 'AddSchemaAndAvailableTablesToServer1714382420165';
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`ALTER TABLE "metadata"."remoteServer" ADD "schema" text`,
);
await queryRunner.query(
`ALTER TABLE "metadata"."remoteServer" ADD "availableTables" jsonb`,
);
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`ALTER TABLE "metadata"."remoteServer" DROP COLUMN "availableTables"`,
);
await queryRunner.query(
`ALTER TABLE "metadata"."remoteServer" DROP COLUMN "schema"`,
);
}
}

View File

@ -59,7 +59,7 @@ import { computeColumnName } from 'src/engine/metadata-modules/field-metadata/ut
import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
import { validateObjectMetadataInput } from 'src/engine/metadata-modules/object-metadata/utils/validate-object-metadata-input.util';
import { mapUdtNameToFieldType } from 'src/engine/metadata-modules/remote-server/remote-table/remote-postgres-table/utils/remote-postgres-table.util';
import { mapUdtNameToFieldType } from 'src/engine/metadata-modules/remote-server/remote-table/utils/udt-name-mapper.util';
import { ObjectMetadataEntity } from './object-metadata.entity';

View File

@ -21,4 +21,8 @@ export class CreateRemoteServerInput<T extends RemoteServerType> {
@IsOptional()
@Field(() => UserMappingOptionsInput, { nullable: true })
userMappingOptions?: UserMappingOptions;
@IsOptional()
@Field(() => String, { nullable: true })
schema?: string;
}

View File

@ -10,6 +10,7 @@ import {
} from 'typeorm';
import { RemoteTableEntity } from 'src/engine/metadata-modules/remote-server/remote-table/remote-table.entity';
import { DistantTables } from 'src/engine/metadata-modules/remote-server/remote-table/distant-table/types/distant-table';
export enum RemoteServerType {
POSTGRES_FDW = 'postgres_fdw',
@ -49,13 +50,19 @@ export class RemoteServerEntity<T extends RemoteServerType> {
@Column({ nullable: true, type: 'jsonb' })
userMappingOptions: UserMappingOptions;
@Column({ type: 'text', nullable: true })
schema: string;
@Column({ nullable: false, type: 'uuid' })
workspaceId: string;
@Column({ type: 'jsonb', nullable: true })
availableTables: DistantTables;
@OneToMany(() => RemoteTableEntity, (table) => table.server, {
cascade: true,
})
tables: Relation<RemoteTableEntity[]>;
syncedTables: Relation<RemoteTableEntity[]>;
@CreateDateColumn({ type: 'timestamptz' })
createdAt: Date;

View File

@ -2,6 +2,7 @@ import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { ForeignDataWrapperQueryFactory } from 'src/engine/api/graphql/workspace-query-builder/factories/foreign-data-wrapper-query.factory';
import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { RemoteServerEntity } from 'src/engine/metadata-modules/remote-server/remote-server.entity';
import { RemoteServerResolver } from 'src/engine/metadata-modules/remote-server/remote-server.resolver';
import { RemoteServerService } from 'src/engine/metadata-modules/remote-server/remote-server.service';
@ -13,6 +14,7 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works
TypeOrmModule.forFeature([RemoteServerEntity], 'metadata'),
RemoteTableModule,
WorkspaceDataSourceModule,
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
],
providers: [
RemoteServerService,

View File

@ -24,6 +24,8 @@ import { RemoteTableService } from 'src/engine/metadata-modules/remote-server/re
import { UpdateRemoteServerInput } from 'src/engine/metadata-modules/remote-server/dtos/update-remote-server.input';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { updateRemoteServerRawQuery } from 'src/engine/metadata-modules/remote-server/utils/build-update-remote-server-raw-query.utils';
import { validateRemoteServerType } from 'src/engine/metadata-modules/remote-server/utils/validate-remote-server-type.util';
import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity';
@Injectable()
export class RemoteServerService<T extends RemoteServerType> {
@ -38,14 +40,22 @@ export class RemoteServerService<T extends RemoteServerType> {
private readonly foreignDataWrapperQueryFactory: ForeignDataWrapperQueryFactory,
private readonly remoteTableService: RemoteTableService,
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
@InjectRepository(FeatureFlagEntity, 'core')
private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
) {}
async createOneRemoteServer(
public async createOneRemoteServer(
remoteServerInput: CreateRemoteServerInput<T>,
workspaceId: string,
): Promise<RemoteServerEntity<RemoteServerType>> {
this.validateRemoteServerInputAgainstInjections(remoteServerInput);
validateRemoteServerType(
remoteServerInput.foreignDataWrapperType,
this.featureFlagRepository,
workspaceId,
);
const foreignDataWrapperId = v4();
let remoteServerToCreate = {
@ -99,7 +109,7 @@ export class RemoteServerService<T extends RemoteServerType> {
);
}
async updateOneRemoteServer(
public async updateOneRemoteServer(
remoteServerInput: UpdateRemoteServerInput<T>,
workspaceId: string,
): Promise<RemoteServerEntity<RemoteServerType>> {
@ -178,21 +188,7 @@ export class RemoteServerService<T extends RemoteServerType> {
);
}
private validateRemoteServerInputAgainstInjections(
remoteServerInput: CreateRemoteServerInput<T> | UpdateRemoteServerInput<T>,
) {
if (remoteServerInput.foreignDataWrapperOptions) {
validateObjectAgainstInjections(
remoteServerInput.foreignDataWrapperOptions,
);
}
if (remoteServerInput.userMappingOptions) {
validateObjectAgainstInjections(remoteServerInput.userMappingOptions);
}
}
async deleteOneRemoteServer(
public async deleteOneRemoteServer(
id: string,
workspaceId: string,
): Promise<RemoteServerEntity<RemoteServerType>> {
@ -265,4 +261,18 @@ export class RemoteServerService<T extends RemoteServerType> {
return updateResult[0][0];
}
private validateRemoteServerInputAgainstInjections(
remoteServerInput: CreateRemoteServerInput<T> | UpdateRemoteServerInput<T>,
) {
if (remoteServerInput.foreignDataWrapperOptions) {
validateObjectAgainstInjections(
remoteServerInput.foreignDataWrapperOptions,
);
}
if (remoteServerInput.userMappingOptions) {
validateObjectAgainstInjections(remoteServerInput.userMappingOptions);
}
}
}

View File

@ -0,0 +1,16 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { RemoteServerEntity } from 'src/engine/metadata-modules/remote-server/remote-server.entity';
import { DistantTableService } from 'src/engine/metadata-modules/remote-server/remote-table/distant-table/distant-table.service';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
@Module({
imports: [
WorkspaceDataSourceModule,
TypeOrmModule.forFeature([RemoteServerEntity], 'metadata'),
],
providers: [DistantTableService],
exports: [DistantTableService],
})
export class DistantTableModule {}

View File

@ -0,0 +1,102 @@
import { BadRequestException, Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { EntityManager, Repository } from 'typeorm';
import { v4 } from 'uuid';
import {
RemoteServerEntity,
RemoteServerType,
} from 'src/engine/metadata-modules/remote-server/remote-server.entity';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { DistantTableColumn } from 'src/engine/metadata-modules/remote-server/remote-table/distant-table/types/distant-table-column';
import { DistantTables } from 'src/engine/metadata-modules/remote-server/remote-table/distant-table/types/distant-table';
@Injectable()
export class DistantTableService {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
@InjectRepository(RemoteServerEntity, 'metadata')
private readonly remoteServerRepository: Repository<
RemoteServerEntity<RemoteServerType>
>,
) {}
public async fetchDistantTableColumns(
remoteServer: RemoteServerEntity<RemoteServerType>,
tableName: string,
): Promise<DistantTableColumn[]> {
if (!remoteServer.availableTables) {
throw new Error('Remote server available tables are not defined');
}
return remoteServer.availableTables[tableName];
}
public async fetchDistantTableNames(
remoteServer: RemoteServerEntity<RemoteServerType>,
workspaceId: string,
): Promise<string[]> {
const availableTables =
remoteServer.availableTables ??
(await this.createAvailableTables(remoteServer, workspaceId));
return Object.keys(availableTables);
}
private async createAvailableTables(
remoteServer: RemoteServerEntity<RemoteServerType>,
workspaceId: string,
): Promise<DistantTables> {
if (!remoteServer.schema) {
throw new BadRequestException('Remote server schema is not defined');
}
const tmpSchemaId = v4();
const tmpSchemaName = `${workspaceId}_${remoteServer.id}_${tmpSchemaId}`;
const workspaceDataSource =
await this.workspaceDataSourceService.connectToWorkspaceDataSource(
workspaceId,
);
const availableTables = await workspaceDataSource.transaction(
async (entityManager: EntityManager) => {
await entityManager.query(`CREATE SCHEMA "${tmpSchemaName}"`);
await entityManager.query(
`IMPORT FOREIGN SCHEMA "${remoteServer.schema}" FROM SERVER "${remoteServer.foreignDataWrapperId}" INTO "${tmpSchemaName}"`,
);
const createdForeignTableNames = await entityManager.query(
`SELECT table_name, column_name, data_type, udt_name FROM information_schema.columns WHERE table_schema = '${tmpSchemaName}'`,
);
await entityManager.query(`DROP SCHEMA "${tmpSchemaName}" CASCADE`);
return createdForeignTableNames.reduce(
(acc, { table_name, column_name, data_type, udt_name }) => {
if (!acc[table_name]) {
acc[table_name] = [];
}
acc[table_name].push({
columnName: column_name,
dataType: data_type,
udtName: udt_name,
});
return acc;
},
{},
);
},
);
await this.remoteServerRepository.update(remoteServer.id, {
availableTables,
});
return availableTables;
}
}

View File

@ -1,5 +1,5 @@
// Type will evolve as we add more remote table types
export type RemoteTableColumn = {
export type DistantTableColumn = {
columnName: string;
dataType: string;
udtName: string;

View File

@ -0,0 +1,5 @@
import { DistantTableColumn } from 'src/engine/metadata-modules/remote-server/remote-table/distant-table/types/distant-table-column';
export type DistantTables = {
[tableName: string]: DistantTableColumn[];
};

View File

@ -7,7 +7,4 @@ export class RemoteTableInput {
@Field(() => String)
name: string;
@Field(() => String)
schema?: string;
}

View File

@ -1,9 +0,0 @@
import { Module } from '@nestjs/common';
import { RemotePostgresTableService } from 'src/engine/metadata-modules/remote-server/remote-table/remote-postgres-table/remote-postgres-table.service';
@Module({
providers: [RemotePostgresTableService],
exports: [RemotePostgresTableService],
})
export class RemotePostgresTableModule {}

View File

@ -1,83 +0,0 @@
import { Injectable } from '@nestjs/common';
import { DataSource } from 'typeorm';
import {
RemoteServerEntity,
RemoteServerType,
} from 'src/engine/metadata-modules/remote-server/remote-server.entity';
import {
buildPostgresUrl,
EXCLUDED_POSTGRES_SCHEMAS,
} from 'src/engine/metadata-modules/remote-server/remote-table/remote-postgres-table/utils/remote-postgres-table.util';
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
import { RemoteTableColumn } from 'src/engine/metadata-modules/remote-server/remote-table/types/remote-table-column';
import { RemoteTable } from 'src/engine/metadata-modules/remote-server/remote-table/types/remote-table';
@Injectable()
export class RemotePostgresTableService {
constructor(private readonly environmentService: EnvironmentService) {}
public async fetchPostgresTableColumnsSchema(
remoteServer: RemoteServerEntity<RemoteServerType>,
tableName: string,
tableSchema: string,
): Promise<RemoteTableColumn[]> {
const dataSource = new DataSource({
url: buildPostgresUrl(
this.environmentService.get('LOGIN_TOKEN_SECRET'),
remoteServer,
),
type: 'postgres',
logging: true,
});
await dataSource.initialize();
const columns = await dataSource.query(
`SELECT column_name, data_type, udt_name FROM information_schema.columns WHERE table_name = '${tableName}' AND table_schema = '${tableSchema}'`,
);
await dataSource.destroy();
return columns.map((column) => ({
columnName: column.column_name,
dataType: column.data_type,
udtName: column.udt_name,
}));
}
public async fetchTablesFromRemotePostgresSchema(
remoteServer: RemoteServerEntity<RemoteServerType>,
): Promise<RemoteTable[]> {
const dataSource = new DataSource({
url: buildPostgresUrl(
this.environmentService.get('LOGIN_TOKEN_SECRET'),
remoteServer,
),
type: 'postgres',
logging: true,
});
await dataSource.initialize();
const schemaNames = await dataSource.query(
`SELECT schema_name FROM information_schema.schemata where schema_name not in ( ${EXCLUDED_POSTGRES_SCHEMAS.map(
(schema) => `'${schema}'`,
).join(', ')} ) order by schema_name limit 1`,
);
const remotePostgresTables = await dataSource.query(
`SELECT table_name, table_schema FROM information_schema.tables WHERE table_schema IN (${schemaNames
.map((schemaName) => `'${schemaName.schema_name}'`)
.join(', ')})`,
);
await dataSource.destroy();
return remotePostgresTables.map((table) => ({
tableName: table.table_name,
tableSchema: table.table_schema,
}));
}
}

View File

@ -1,88 +0,0 @@
import { Repository } from 'typeorm/repository/Repository';
import { FieldMetadataSettings } from 'src/engine/metadata-modules/field-metadata/interfaces/field-metadata-settings.interface';
import { decryptText } from 'src/engine/core-modules/auth/auth.util';
import {
FeatureFlagEntity,
FeatureFlagKeys,
} from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { FieldMetadataType } from 'src/engine/metadata-modules/field-metadata/field-metadata.entity';
import {
RemoteServerEntity,
RemoteServerType,
} from 'src/engine/metadata-modules/remote-server/remote-server.entity';
export const EXCLUDED_POSTGRES_SCHEMAS = [
'information_schema',
'pg_catalog',
'pg_toast',
];
export const buildPostgresUrl = (
secretKey: string,
remoteServer: RemoteServerEntity<RemoteServerType>,
): string => {
const foreignDataWrapperOptions = remoteServer.foreignDataWrapperOptions;
const userMappingOptions = remoteServer.userMappingOptions;
const password = decryptText(userMappingOptions.password, secretKey);
const url = `postgres://${userMappingOptions.username}:${password}@${foreignDataWrapperOptions.host}:${foreignDataWrapperOptions.port}/${foreignDataWrapperOptions.dbname}`;
return url;
};
export const mapUdtNameToFieldType = (udtName: string): FieldMetadataType => {
switch (udtName) {
case 'uuid':
return FieldMetadataType.UUID;
case 'varchar':
return FieldMetadataType.TEXT;
case 'bool':
return FieldMetadataType.BOOLEAN;
case 'timestamp':
case 'timestamptz':
return FieldMetadataType.DATE_TIME;
case 'integer':
case 'int2':
case 'int4':
case 'int8':
return FieldMetadataType.NUMBER;
default:
return FieldMetadataType.TEXT;
}
};
export const mapUdtNameToSettings = (
udtName: string,
): FieldMetadataSettings<FieldMetadataType> | undefined => {
switch (udtName) {
case 'integer':
case 'int2':
case 'int4':
case 'int8':
return {
precision: 0,
} satisfies FieldMetadataSettings<FieldMetadataType.NUMBER>;
default:
return undefined;
}
};
export const isPostgreSQLIntegrationEnabled = async (
featureFlagRepository: Repository<FeatureFlagEntity>,
workspaceId: string,
) => {
const featureFlag = await featureFlagRepository.findOneBy({
workspaceId,
key: FeatureFlagKeys.IsPostgreSQLIntegrationEnabled,
value: true,
});
const featureFlagEnabled = featureFlag && featureFlag.value;
if (!featureFlagEnabled) {
throw new Error('PostgreSQL integration is not enabled');
}
};

View File

@ -31,7 +31,7 @@ export class RemoteTableEntity {
@Column({ nullable: false, type: 'uuid' })
remoteServerId: string;
@ManyToOne(() => RemoteServerEntity, (server) => server.tables, {
@ManyToOne(() => RemoteServerEntity, (server) => server.syncedTables, {
onDelete: 'CASCADE',
})
@JoinColumn({ name: 'remoteServerId' })

View File

@ -1,12 +1,11 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { DataSourceModule } from 'src/engine/metadata-modules/data-source/data-source.module';
import { FieldMetadataModule } from 'src/engine/metadata-modules/field-metadata/field-metadata.module';
import { ObjectMetadataModule } from 'src/engine/metadata-modules/object-metadata/object-metadata.module';
import { RemoteServerEntity } from 'src/engine/metadata-modules/remote-server/remote-server.entity';
import { RemotePostgresTableModule } from 'src/engine/metadata-modules/remote-server/remote-table/remote-postgres-table/remote-postgres-table.module';
import { DistantTableModule } from 'src/engine/metadata-modules/remote-server/remote-table/distant-table/distant-table.module';
import { RemoteTableEntity } from 'src/engine/metadata-modules/remote-server/remote-table/remote-table.entity';
import { RemoteTableResolver } from 'src/engine/metadata-modules/remote-server/remote-table/remote-table.resolver';
import { RemoteTableService } from 'src/engine/metadata-modules/remote-server/remote-table/remote-table.service';
@ -17,15 +16,14 @@ import { WorkspaceMigrationRunnerModule } from 'src/engine/workspace-manager/wor
@Module({
imports: [
DistantTableModule,
TypeOrmModule.forFeature(
[RemoteServerEntity, RemoteTableEntity],
'metadata',
),
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
DataSourceModule,
ObjectMetadataModule,
FieldMetadataModule,
RemotePostgresTableModule,
WorkspaceCacheVersionModule,
WorkspaceMigrationModule,
WorkspaceMigrationRunnerModule,

View File

@ -1,4 +1,4 @@
import { BadRequestException, NotFoundException } from '@nestjs/common';
import { BadRequestException, Logger, NotFoundException } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
@ -10,18 +10,15 @@ import {
} from 'src/engine/metadata-modules/remote-server/remote-server.entity';
import { RemoteTableStatus } from 'src/engine/metadata-modules/remote-server/remote-table/dtos/remote-table.dto';
import {
isPostgreSQLIntegrationEnabled,
mapUdtNameToFieldType,
mapUdtNameToSettings,
} from 'src/engine/metadata-modules/remote-server/remote-table/remote-postgres-table/utils/remote-postgres-table.util';
mapUdtNameToFieldSettings,
} from 'src/engine/metadata-modules/remote-server/remote-table/utils/udt-name-mapper.util';
import { RemoteTableInput } from 'src/engine/metadata-modules/remote-server/remote-table/dtos/remote-table-input';
import { DataSourceService } from 'src/engine/metadata-modules/data-source/data-source.service';
import { ObjectMetadataService } from 'src/engine/metadata-modules/object-metadata/object-metadata.service';
import { CreateObjectInput } from 'src/engine/metadata-modules/object-metadata/dtos/create-object.input';
import { FieldMetadataService } from 'src/engine/metadata-modules/field-metadata/field-metadata.service';
import { CreateFieldInput } from 'src/engine/metadata-modules/field-metadata/dtos/create-field.input';
import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { RemotePostgresTableService } from 'src/engine/metadata-modules/remote-server/remote-table/remote-postgres-table/remote-postgres-table.service';
import { WorkspaceCacheVersionService } from 'src/engine/metadata-modules/workspace-cache-version/workspace-cache-version.service';
import { camelCase } from 'src/utils/camel-case';
import { camelToTitleCase } from 'src/utils/camel-to-title-case';
@ -29,17 +26,19 @@ import { WorkspaceMigrationService } from 'src/engine/metadata-modules/workspace
import { WorkspaceMigrationRunnerService } from 'src/engine/workspace-manager/workspace-migration-runner/workspace-migration-runner.service';
import { generateMigrationName } from 'src/engine/metadata-modules/workspace-migration/utils/generate-migration-name.util';
import {
WorkspaceMigrationColumnDefinition,
WorkspaceMigrationForeignColumnDefinition,
WorkspaceMigrationForeignTable,
WorkspaceMigrationTableActionType,
} from 'src/engine/metadata-modules/workspace-migration/workspace-migration.entity';
import { RemoteTableColumn } from 'src/engine/metadata-modules/remote-server/remote-table/types/remote-table-column';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { RemoteTable } from 'src/engine/metadata-modules/remote-server/remote-table/types/remote-table';
import { RemoteTableEntity } from 'src/engine/metadata-modules/remote-server/remote-table/remote-table.entity';
import { getRemoteTableLocalName } from 'src/engine/metadata-modules/remote-server/remote-table/utils/get-remote-table-local-name.util';
import { DistantTableService } from 'src/engine/metadata-modules/remote-server/remote-table/distant-table/distant-table.service';
import { DistantTableColumn } from 'src/engine/metadata-modules/remote-server/remote-table/distant-table/types/distant-table-column';
export class RemoteTableService {
private readonly logger = new Logger(RemoteTableService.name);
constructor(
@InjectRepository(RemoteTableEntity, 'metadata')
private readonly remoteTableRepository: Repository<RemoteTableEntity>,
@ -47,13 +46,11 @@ export class RemoteTableService {
private readonly remoteServerRepository: Repository<
RemoteServerEntity<RemoteServerType>
>,
@InjectRepository(FeatureFlagEntity, 'core')
private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
private readonly workspaceCacheVersionService: WorkspaceCacheVersionService,
private readonly dataSourceService: DataSourceService,
private readonly objectMetadataService: ObjectMetadataService,
private readonly fieldMetadataService: FieldMetadataService,
private readonly remotePostgresTableService: RemotePostgresTableService,
private readonly distantTableService: DistantTableService,
private readonly workspaceMigrationService: WorkspaceMigrationService,
private readonly workspaceMigrationRunnerService: WorkspaceMigrationRunnerService,
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
@ -83,13 +80,16 @@ export class RemoteTableService {
(remoteTable) => remoteTable.distantTableName,
);
const tablesInRemoteSchema =
await this.fetchTablesFromRemoteSchema(remoteServer);
const distantTableNames =
await this.distantTableService.fetchDistantTableNames(
remoteServer,
workspaceId,
);
return tablesInRemoteSchema.map((remoteTable) => ({
name: remoteTable.tableName,
schema: remoteTable.tableSchema,
status: currentRemoteTableDistantNames.includes(remoteTable.tableName)
return distantTableNames.map((tableName) => ({
name: tableName,
schema: remoteServer.schema,
status: currentRemoteTableDistantNames.includes(tableName)
? RemoteTableStatus.SYNCED
: RemoteTableStatus.NOT_SYNCED,
}));
@ -111,12 +111,6 @@ export class RemoteTableService {
}
public async syncRemoteTable(input: RemoteTableInput, workspaceId: string) {
if (!input.schema) {
throw new BadRequestException(
'Schema is required for syncing remote table',
);
}
const remoteServer = await this.remoteServerRepository.findOne({
where: {
id: input.remoteServerId,
@ -161,18 +155,18 @@ export class RemoteTableService {
remoteServerId: remoteServer.id,
});
const remoteTableColumns = await this.fetchTableColumnsSchema(
remoteServer,
input.name,
input.schema,
);
const distantTableColumns =
await this.distantTableService.fetchDistantTableColumns(
remoteServer,
input.name,
);
// We only support remote tables with an id column for now.
const remoteTableIdColumn = remoteTableColumns.find(
const distantTableIdColumn = distantTableColumns.find(
(column) => column.columnName === 'id',
);
if (!remoteTableIdColumn) {
if (!distantTableIdColumn) {
throw new BadRequestException('Remote table must have an id column');
}
@ -181,14 +175,14 @@ export class RemoteTableService {
localTableName,
input,
remoteServer,
remoteTableColumns,
distantTableColumns,
);
await this.createRemoteTableMetadata(
workspaceId,
localTableName,
remoteTableColumns,
remoteTableIdColumn,
distantTableColumns,
distantTableIdColumn,
dataSourceMetatada.id,
);
@ -199,7 +193,7 @@ export class RemoteTableService {
return {
id: remoteTableEntity.id,
name: input.name,
schema: input.schema,
schema: remoteServer.schema,
status: RemoteTableStatus.SYNCED,
};
}
@ -232,7 +226,7 @@ export class RemoteTableService {
return {
name: input.name,
schema: input.schema,
schema: remoteServer.schema,
status: RemoteTableStatus.NOT_SYNCED,
};
}
@ -300,46 +294,6 @@ export class RemoteTableService {
await this.workspaceCacheVersionService.incrementVersion(workspaceId);
}
private async fetchTableColumnsSchema(
remoteServer: RemoteServerEntity<RemoteServerType>,
tableName: string,
tableSchema: string,
): Promise<RemoteTableColumn[]> {
switch (remoteServer.foreignDataWrapperType) {
case RemoteServerType.POSTGRES_FDW:
await isPostgreSQLIntegrationEnabled(
this.featureFlagRepository,
remoteServer.workspaceId,
);
return this.remotePostgresTableService.fetchPostgresTableColumnsSchema(
remoteServer,
tableName,
tableSchema,
);
default:
throw new BadRequestException('Unsupported foreign data wrapper type');
}
}
private async fetchTablesFromRemoteSchema(
remoteServer: RemoteServerEntity<RemoteServerType>,
): Promise<RemoteTable[]> {
switch (remoteServer.foreignDataWrapperType) {
case RemoteServerType.POSTGRES_FDW:
await isPostgreSQLIntegrationEnabled(
this.featureFlagRepository,
remoteServer.workspaceId,
);
return this.remotePostgresTableService.fetchTablesFromRemotePostgresSchema(
remoteServer,
);
default:
throw new BadRequestException('Unsupported foreign data wrapper type');
}
}
private async validateTableNameDoesNotExists(
tableName: string,
workspaceId: string,
@ -382,14 +336,8 @@ export class RemoteTableService {
localTableName: string,
remoteTableInput: RemoteTableInput,
remoteServer: RemoteServerEntity<RemoteServerType>,
remoteTableColumns: RemoteTableColumn[],
distantTableColumns: DistantTableColumn[],
) {
if (!remoteTableInput.schema) {
throw new BadRequestException(
'Schema is required for creating foreign table',
);
}
const workspaceMigration =
await this.workspaceMigrationService.createCustomMigration(
generateMigrationName(`create-foreign-table-${localTableName}`),
@ -399,15 +347,16 @@ export class RemoteTableService {
name: localTableName,
action: WorkspaceMigrationTableActionType.CREATE_FOREIGN_TABLE,
foreignTable: {
columns: remoteTableColumns.map(
columns: distantTableColumns.map(
(column) =>
({
columnName: column.columnName,
columnName: camelCase(column.columnName),
columnType: column.dataType,
}) satisfies WorkspaceMigrationColumnDefinition,
distantColumnName: column.columnName,
}) satisfies WorkspaceMigrationForeignColumnDefinition,
),
referencedTableName: remoteTableInput.name,
referencedTableSchema: remoteTableInput.schema,
referencedTableSchema: remoteServer.schema,
foreignDataWrapperId: remoteServer.foreignDataWrapperId,
} satisfies WorkspaceMigrationForeignTable,
},
@ -431,8 +380,8 @@ export class RemoteTableService {
private async createRemoteTableMetadata(
workspaceId: string,
localTableName: string,
remoteTableColumns: RemoteTableColumn[],
remoteTableIdColumn: RemoteTableColumn,
distantTableColumns: DistantTableColumn[],
distantTableIdColumn: DistantTableColumn,
dataSourceMetadataId: string,
) {
const objectMetadata = await this.objectMetadataService.createOne({
@ -445,33 +394,39 @@ export class RemoteTableService {
workspaceId: workspaceId,
icon: 'IconPlug',
isRemote: true,
primaryKeyColumnType: remoteTableIdColumn.udtName,
// TODO: function should work for other types than Postgres
primaryKeyFieldMetadataSettings: mapUdtNameToSettings(
remoteTableIdColumn.udtName,
primaryKeyColumnType: distantTableIdColumn.udtName,
primaryKeyFieldMetadataSettings: mapUdtNameToFieldSettings(
distantTableIdColumn.udtName,
),
} satisfies CreateObjectInput);
for (const column of remoteTableColumns) {
const field = await this.fieldMetadataService.createOne({
name: column.columnName,
label: camelToTitleCase(camelCase(column.columnName)),
description: 'Field of remote',
// TODO: function should work for other types than Postgres
type: mapUdtNameToFieldType(column.udtName),
workspaceId: workspaceId,
objectMetadataId: objectMetadata.id,
isRemoteCreation: true,
isNullable: true,
icon: 'IconPlug',
// TODO: function should work for other types than Postgres
settings: mapUdtNameToSettings(column.udtName),
} satisfies CreateFieldInput);
for (const column of distantTableColumns) {
const columnName = camelCase(column.columnName);
if (column.columnName === 'id') {
await this.objectMetadataService.updateOne(objectMetadata.id, {
labelIdentifierFieldMetadataId: field.id,
});
// TODO: return error to the user when a column cannot be managed
try {
const field = await this.fieldMetadataService.createOne({
name: columnName,
label: camelToTitleCase(columnName),
description: 'Field of remote',
type: mapUdtNameToFieldType(column.udtName),
workspaceId: workspaceId,
objectMetadataId: objectMetadata.id,
isRemoteCreation: true,
isNullable: true,
icon: 'IconPlug',
settings: mapUdtNameToFieldSettings(column.udtName),
} satisfies CreateFieldInput);
if (columnName === 'id') {
await this.objectMetadataService.updateOne(objectMetadata.id, {
labelIdentifierFieldMetadataId: field.id,
});
}
} catch (error) {
this.logger.error(
`Could not create field ${columnName} for remote table ${localTableName}: ${error}`,
);
}
}
}

View File

@ -1,5 +0,0 @@
// Type will evolve as we add more remote table types
export type RemoteTable = {
tableName: string;
tableSchema: string;
};

View File

@ -0,0 +1,38 @@
import { FieldMetadataSettings } from 'src/engine/metadata-modules/field-metadata/interfaces/field-metadata-settings.interface';
import { FieldMetadataType } from 'src/engine/metadata-modules/field-metadata/field-metadata.entity';
export const mapUdtNameToFieldType = (udtName: string): FieldMetadataType => {
switch (udtName) {
case 'uuid':
return FieldMetadataType.UUID;
case 'varchar':
return FieldMetadataType.TEXT;
case 'bool':
return FieldMetadataType.BOOLEAN;
case 'timestamp':
case 'timestamptz':
return FieldMetadataType.DATE_TIME;
case 'integer':
case 'int2':
case 'int4':
return FieldMetadataType.NUMBER;
default:
return FieldMetadataType.TEXT;
}
};
export const mapUdtNameToFieldSettings = (
udtName: string,
): FieldMetadataSettings<FieldMetadataType> | undefined => {
switch (udtName) {
case 'integer':
case 'int2':
case 'int4':
return {
precision: 0,
} satisfies FieldMetadataSettings<FieldMetadataType.NUMBER>;
default:
return undefined;
}
};

View File

@ -0,0 +1,40 @@
import { BadRequestException } from '@nestjs/common';
import { Repository } from 'typeorm';
import {
FeatureFlagEntity,
FeatureFlagKeys,
} from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { RemoteServerType } from 'src/engine/metadata-modules/remote-server/remote-server.entity';
export const validateRemoteServerType = async (
remoteServerType: RemoteServerType,
featureFlagRepository: Repository<FeatureFlagEntity>,
workspaceId: string,
) => {
const featureFlagKey = getFeatureFlagKey(remoteServerType);
const featureFlag = await featureFlagRepository.findOneBy({
workspaceId,
key: featureFlagKey,
value: true,
});
const featureFlagEnabled = featureFlag && featureFlag.value;
if (!featureFlagEnabled) {
throw new BadRequestException(`Type ${remoteServerType} is not supported.`);
}
};
const getFeatureFlagKey = (remoteServerType: RemoteServerType) => {
switch (remoteServerType) {
case RemoteServerType.POSTGRES_FDW:
return FeatureFlagKeys.IsPostgreSQLIntegrationEnabled;
default:
throw new BadRequestException(
`Type ${remoteServerType} is not supported.`,
);
}
};

View File

@ -62,8 +62,13 @@ export type WorkspaceMigrationCreateComment = {
comment: string;
};
export type WorkspaceMigrationForeignColumnDefinition =
WorkspaceMigrationColumnDefinition & {
distantColumnName: string;
};
export type WorkspaceMigrationForeignTable = {
columns: WorkspaceMigrationColumnDefinition[];
columns: WorkspaceMigrationForeignColumnDefinition[];
referencedTableName: string;
referencedTableSchema: string;
foreignDataWrapperId: string;

View File

@ -489,7 +489,10 @@ export class WorkspaceMigrationRunnerService {
}
const foreignTableColumns = foreignTable.columns
.map((column) => `"${column.columnName}" ${column.columnType}`)
.map(
(column) =>
`"${column.columnName}" ${column.columnType} OPTIONS (column_name '${column.distantColumnName}')`,
)
.join(', ');
await queryRunner.query(