mirror of
https://github.com/twentyhq/twenty.git
synced 2025-01-06 03:45:15 +03:00
Add backfill position job by workspace (#5725)
- Removing existing listener that was backfilling created records without position - Switch to a job that backfill all objects within workspace - Adapting `FIND_BY_POSITION` so it can fetch objects without position. Currently we needed to input a number
This commit is contained in:
parent
b1f12d7257
commit
25f4e44aec
@ -19,14 +19,14 @@ describe('RecordPositionQueryFactory', () => {
|
||||
it('should return query and params for FIND_BY_POSITION', async () => {
|
||||
const positionValue = 1;
|
||||
const queryType = RecordPositionQueryType.FIND_BY_POSITION;
|
||||
const [query, params] = await factory.create(
|
||||
const [query, params] = factory.create(
|
||||
{ positionValue, recordPositionQueryType: queryType },
|
||||
objectMetadataItem,
|
||||
dataSourceSchema,
|
||||
);
|
||||
|
||||
expect(query).toEqual(
|
||||
`SELECT position FROM ${dataSourceSchema}."${objectMetadataItem.nameSingular}"
|
||||
`SELECT id, position FROM ${dataSourceSchema}."${objectMetadataItem.nameSingular}"
|
||||
WHERE "position" = $1`,
|
||||
);
|
||||
expect(params).toEqual([positionValue]);
|
||||
@ -34,7 +34,7 @@ describe('RecordPositionQueryFactory', () => {
|
||||
|
||||
it('should return query and params for FIND_MIN_POSITION', async () => {
|
||||
const queryType = RecordPositionQueryType.FIND_MIN_POSITION;
|
||||
const [query, params] = await factory.create(
|
||||
const [query, params] = factory.create(
|
||||
{ recordPositionQueryType: queryType },
|
||||
objectMetadataItem,
|
||||
dataSourceSchema,
|
||||
@ -48,7 +48,7 @@ describe('RecordPositionQueryFactory', () => {
|
||||
|
||||
it('should return query and params for FIND_MAX_POSITION', async () => {
|
||||
const queryType = RecordPositionQueryType.FIND_MAX_POSITION;
|
||||
const [query, params] = await factory.create(
|
||||
const [query, params] = factory.create(
|
||||
{ recordPositionQueryType: queryType },
|
||||
objectMetadataItem,
|
||||
dataSourceSchema,
|
||||
@ -64,7 +64,7 @@ describe('RecordPositionQueryFactory', () => {
|
||||
const positionValue = 1;
|
||||
const recordId = '1';
|
||||
const queryType = RecordPositionQueryType.UPDATE_POSITION;
|
||||
const [query, params] = await factory.create(
|
||||
const [query, params] = factory.create(
|
||||
{ positionValue, recordId, recordPositionQueryType: queryType },
|
||||
objectMetadataItem,
|
||||
dataSourceSchema,
|
||||
|
@ -10,7 +10,7 @@ export enum RecordPositionQueryType {
|
||||
}
|
||||
|
||||
type FindByPositionQueryArgs = {
|
||||
positionValue: number;
|
||||
positionValue: number | null;
|
||||
recordPositionQueryType: RecordPositionQueryType.FIND_BY_POSITION;
|
||||
};
|
||||
|
||||
@ -77,10 +77,12 @@ export class RecordPositionQueryFactory {
|
||||
name: string,
|
||||
dataSourceSchema: string,
|
||||
): [RecordPositionQuery, RecordPositionQueryParams] {
|
||||
const positionStringParam = positionValue ? '= $1' : 'IS NULL';
|
||||
|
||||
return [
|
||||
`SELECT position FROM ${dataSourceSchema}."${name}"
|
||||
WHERE "position" = $1`,
|
||||
[positionValue],
|
||||
`SELECT id, position FROM ${dataSourceSchema}."${name}"
|
||||
WHERE "position" ${positionStringParam}`,
|
||||
positionValue ? [positionValue] : [],
|
||||
];
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,57 @@
|
||||
import { Inject } from '@nestjs/common';
|
||||
|
||||
import { Command, CommandRunner, Option } from 'nest-commander';
|
||||
|
||||
import {
|
||||
RecordPositionBackfillJob,
|
||||
RecordPositionBackfillJobData,
|
||||
} from 'src/engine/api/graphql/workspace-query-runner/jobs/record-position-backfill.job';
|
||||
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
|
||||
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
|
||||
|
||||
export type RecordPositionBackfillCommandOptions = {
|
||||
workspaceId: string;
|
||||
dryRun?: boolean;
|
||||
};
|
||||
|
||||
@Command({
|
||||
name: 'migrate-0.20:backfill-record-position',
|
||||
description: 'Backfill record position',
|
||||
})
|
||||
export class RecordPositionBackfillCommand extends CommandRunner {
|
||||
constructor(
|
||||
@Inject(MessageQueue.recordPositionBackfillQueue)
|
||||
private readonly messageQueueService: MessageQueueService,
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
||||
@Option({
|
||||
flags: '-w, --workspace-id [workspace_id]',
|
||||
description: 'workspace id',
|
||||
required: true,
|
||||
})
|
||||
parseWorkspaceId(value: string): string {
|
||||
return value;
|
||||
}
|
||||
|
||||
@Option({
|
||||
flags: '-d, --dry-run [dry run]',
|
||||
description: 'Dry run: Log backfill actions.',
|
||||
required: false,
|
||||
})
|
||||
dryRun(value: string): boolean {
|
||||
return Boolean(value);
|
||||
}
|
||||
|
||||
async run(
|
||||
_passedParam: string[],
|
||||
options: RecordPositionBackfillCommandOptions,
|
||||
): Promise<void> {
|
||||
this.messageQueueService.add<RecordPositionBackfillJobData>(
|
||||
RecordPositionBackfillJob.name,
|
||||
{ workspaceId: options.workspaceId, dryRun: options.dryRun ?? false },
|
||||
{ retryLimit: 3 },
|
||||
);
|
||||
}
|
||||
}
|
@ -6,8 +6,7 @@ import { RecordPositionBackfillService } from 'src/engine/api/graphql/workspace-
|
||||
|
||||
export type RecordPositionBackfillJobData = {
|
||||
workspaceId: string;
|
||||
objectMetadata: { nameSingular: string; isCustom: boolean };
|
||||
recordId: string;
|
||||
dryRun: boolean;
|
||||
};
|
||||
|
||||
@Injectable()
|
||||
@ -19,10 +18,6 @@ export class RecordPositionBackfillJob
|
||||
) {}
|
||||
|
||||
async handle(data: RecordPositionBackfillJobData): Promise<void> {
|
||||
this.recordPositionBackfillService.backfill(
|
||||
data.workspaceId,
|
||||
data.objectMetadata,
|
||||
data.recordId,
|
||||
);
|
||||
this.recordPositionBackfillService.backfill(data.workspaceId, data.dryRun);
|
||||
}
|
||||
}
|
||||
|
@ -1,59 +0,0 @@
|
||||
import { Inject, Injectable } from '@nestjs/common';
|
||||
import { OnEvent } from '@nestjs/event-emitter';
|
||||
|
||||
import { ObjectMetadataInterface } from 'src/engine/metadata-modules/field-metadata/interfaces/object-metadata.interface';
|
||||
|
||||
import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event';
|
||||
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
|
||||
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
|
||||
import {
|
||||
RecordPositionBackfillJob,
|
||||
RecordPositionBackfillJobData,
|
||||
} from 'src/engine/api/graphql/workspace-query-runner/jobs/record-position-backfill.job';
|
||||
|
||||
@Injectable()
|
||||
export class RecordPositionListener {
|
||||
constructor(
|
||||
@Inject(MessageQueue.recordPositionBackfillQueue)
|
||||
private readonly messageQueueService: MessageQueueService,
|
||||
) {}
|
||||
|
||||
@OnEvent('*.created')
|
||||
async handleAllCreate(payload: ObjectRecordCreateEvent<any>) {
|
||||
if (!hasPositionField(payload.objectMetadata)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (hasPositionSet(payload.properties.after)) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.messageQueueService.add<RecordPositionBackfillJobData>(
|
||||
RecordPositionBackfillJob.name,
|
||||
{
|
||||
workspaceId: payload.workspaceId,
|
||||
recordId: payload.recordId,
|
||||
objectMetadata: {
|
||||
nameSingular: payload.objectMetadata.nameSingular,
|
||||
isCustom: payload.objectMetadata.isCustom,
|
||||
},
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: use objectMetadata instead of hardcoded standard objects name
|
||||
const hasPositionField = (
|
||||
createdObjectMetadata: ObjectMetadataInterface,
|
||||
): boolean => {
|
||||
return (
|
||||
createdObjectMetadata.isCustom ||
|
||||
['opportunity', 'company', 'people'].includes(
|
||||
createdObjectMetadata.nameSingular,
|
||||
)
|
||||
);
|
||||
};
|
||||
|
||||
const hasPositionSet = (createdRecord: any): boolean => {
|
||||
return !!createdRecord?.position;
|
||||
};
|
@ -0,0 +1,139 @@
|
||||
import { TestingModule, Test } from '@nestjs/testing';
|
||||
|
||||
import { RecordPositionQueryFactory } from 'src/engine/api/graphql/workspace-query-builder/factories/record-position-query.factory';
|
||||
import { RecordPositionFactory } from 'src/engine/api/graphql/workspace-query-runner/factories/record-position.factory';
|
||||
import { RecordPositionBackfillService } from 'src/engine/api/graphql/workspace-query-runner/services/record-position-backfill-service';
|
||||
import { ObjectMetadataService } from 'src/engine/metadata-modules/object-metadata/object-metadata.service';
|
||||
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
|
||||
|
||||
describe('RecordPositionBackfillService', () => {
|
||||
let recordPositionQueryFactory;
|
||||
let recordPositionFactory;
|
||||
let objectMetadataService;
|
||||
let workspaceDataSourceService;
|
||||
|
||||
let service: RecordPositionBackfillService;
|
||||
|
||||
beforeEach(async () => {
|
||||
recordPositionQueryFactory = {
|
||||
create: jest.fn().mockReturnValue(['query', []]),
|
||||
};
|
||||
|
||||
recordPositionFactory = {
|
||||
create: jest.fn().mockResolvedValue([
|
||||
{
|
||||
position: 1,
|
||||
},
|
||||
]),
|
||||
};
|
||||
|
||||
objectMetadataService = {
|
||||
findManyWithinWorkspace: jest.fn().mockReturnValue([]),
|
||||
};
|
||||
|
||||
workspaceDataSourceService = {
|
||||
getSchemaName: jest.fn().mockReturnValue('schemaName'),
|
||||
executeRawQuery: jest.fn().mockResolvedValue([]),
|
||||
};
|
||||
const module: TestingModule = await Test.createTestingModule({
|
||||
providers: [
|
||||
RecordPositionBackfillService,
|
||||
{
|
||||
provide: RecordPositionQueryFactory,
|
||||
useValue: recordPositionQueryFactory,
|
||||
},
|
||||
{
|
||||
provide: RecordPositionFactory,
|
||||
useValue: recordPositionFactory,
|
||||
},
|
||||
{
|
||||
provide: WorkspaceDataSourceService,
|
||||
useValue: workspaceDataSourceService,
|
||||
},
|
||||
{
|
||||
provide: ObjectMetadataService,
|
||||
useValue: objectMetadataService,
|
||||
},
|
||||
],
|
||||
}).compile();
|
||||
|
||||
service = module.get<RecordPositionBackfillService>(
|
||||
RecordPositionBackfillService,
|
||||
);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
jest.clearAllMocks();
|
||||
});
|
||||
|
||||
it('should be defined', () => {
|
||||
expect(service).toBeDefined();
|
||||
});
|
||||
|
||||
it('when no object metadata found, should do nothing', async () => {
|
||||
await service.backfill('workspaceId', false);
|
||||
expect(workspaceDataSourceService.executeRawQuery).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('when objectMetadata without position, should do nothing', async () => {
|
||||
objectMetadataService.findManyWithinWorkspace.mockReturnValue([
|
||||
{
|
||||
id: '1',
|
||||
nameSingular: 'name',
|
||||
fields: [],
|
||||
},
|
||||
]);
|
||||
await service.backfill('workspaceId', false);
|
||||
expect(workspaceDataSourceService.executeRawQuery).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('when objectMetadata but all record with position, should create and run query once', async () => {
|
||||
objectMetadataService.findManyWithinWorkspace.mockReturnValue([
|
||||
{
|
||||
id: '1',
|
||||
nameSingular: 'company',
|
||||
fields: [],
|
||||
},
|
||||
]);
|
||||
await service.backfill('workspaceId', false);
|
||||
expect(workspaceDataSourceService.executeRawQuery).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('when record without position, should create and run query twice', async () => {
|
||||
objectMetadataService.findManyWithinWorkspace.mockReturnValue([
|
||||
{
|
||||
id: '1',
|
||||
nameSingular: 'company',
|
||||
fields: [],
|
||||
},
|
||||
]);
|
||||
workspaceDataSourceService.executeRawQuery.mockResolvedValueOnce([
|
||||
{
|
||||
id: '1',
|
||||
},
|
||||
]);
|
||||
await service.backfill('workspaceId', false);
|
||||
expect(workspaceDataSourceService.executeRawQuery).toHaveBeenCalledTimes(2);
|
||||
expect(recordPositionFactory.create).toHaveBeenCalledTimes(1);
|
||||
expect(recordPositionQueryFactory.create).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it('when dryRun is true, should not update position', async () => {
|
||||
objectMetadataService.findManyWithinWorkspace.mockReturnValue([
|
||||
{
|
||||
id: '1',
|
||||
nameSingular: 'company',
|
||||
fields: [],
|
||||
},
|
||||
]);
|
||||
workspaceDataSourceService.executeRawQuery.mockResolvedValueOnce([
|
||||
{
|
||||
id: '1',
|
||||
},
|
||||
]);
|
||||
await service.backfill('workspaceId', true);
|
||||
expect(workspaceDataSourceService.executeRawQuery).toHaveBeenCalledTimes(1);
|
||||
expect(recordPositionFactory.create).toHaveBeenCalledTimes(1);
|
||||
expect(recordPositionQueryFactory.create).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
@ -4,9 +4,10 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works
|
||||
import { RecordPositionQueryFactory } from 'src/engine/api/graphql/workspace-query-builder/factories/record-position-query.factory';
|
||||
import { RecordPositionFactory } from 'src/engine/api/graphql/workspace-query-runner/factories/record-position.factory';
|
||||
import { RecordPositionBackfillService } from 'src/engine/api/graphql/workspace-query-runner/services/record-position-backfill-service';
|
||||
import { ObjectMetadataModule } from 'src/engine/metadata-modules/object-metadata/object-metadata.module';
|
||||
|
||||
@Module({
|
||||
imports: [WorkspaceDataSourceModule],
|
||||
imports: [WorkspaceDataSourceModule, ObjectMetadataModule],
|
||||
providers: [
|
||||
RecordPositionFactory,
|
||||
RecordPositionQueryFactory,
|
||||
|
@ -1,6 +1,6 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
|
||||
import { ObjectMetadataInterface } from 'src/engine/metadata-modules/field-metadata/interfaces/object-metadata.interface';
|
||||
import { isDefined } from 'class-validator';
|
||||
|
||||
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
|
||||
import {
|
||||
@ -8,44 +8,112 @@ import {
|
||||
RecordPositionQueryType,
|
||||
} from 'src/engine/api/graphql/workspace-query-builder/factories/record-position-query.factory';
|
||||
import { RecordPositionFactory } from 'src/engine/api/graphql/workspace-query-runner/factories/record-position.factory';
|
||||
import { ObjectMetadataService } from 'src/engine/metadata-modules/object-metadata/object-metadata.service';
|
||||
import { hasPositionField } from 'src/engine/metadata-modules/object-metadata/utils/has-position-field.util';
|
||||
|
||||
@Injectable()
|
||||
export class RecordPositionBackfillService {
|
||||
private readonly logger = new Logger(RecordPositionBackfillService.name);
|
||||
constructor(
|
||||
private readonly objectMetadataService: ObjectMetadataService,
|
||||
private readonly recordPositionFactory: RecordPositionFactory,
|
||||
private readonly recordPositionQueryFactory: RecordPositionQueryFactory,
|
||||
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
|
||||
) {}
|
||||
|
||||
async backfill(
|
||||
workspaceId: string,
|
||||
objectMetadata: { nameSingular: string; isCustom: boolean },
|
||||
recordId: string,
|
||||
) {
|
||||
const position = await this.recordPositionFactory.create(
|
||||
'last',
|
||||
objectMetadata as ObjectMetadataInterface,
|
||||
workspaceId,
|
||||
async backfill(workspaceId: string, dryRun: boolean) {
|
||||
this.logger.log(
|
||||
`Starting backfilling record positions for workspace ${workspaceId}`,
|
||||
);
|
||||
|
||||
const dataSourceSchema =
|
||||
this.workspaceDataSourceService.getSchemaName(workspaceId);
|
||||
|
||||
const [query, params] = await this.recordPositionQueryFactory.create(
|
||||
{
|
||||
recordPositionQueryType: RecordPositionQueryType.UPDATE_POSITION,
|
||||
recordId,
|
||||
positionValue: position,
|
||||
},
|
||||
objectMetadata as ObjectMetadataInterface,
|
||||
dataSourceSchema,
|
||||
);
|
||||
const objectMetadataEntities =
|
||||
await this.objectMetadataService.findManyWithinWorkspace(workspaceId, {
|
||||
where: { isSystem: false },
|
||||
});
|
||||
|
||||
this.workspaceDataSourceService.executeRawQuery(
|
||||
query,
|
||||
params,
|
||||
workspaceId,
|
||||
undefined,
|
||||
);
|
||||
const objectMetadataWithPosition =
|
||||
objectMetadataEntities.filter(hasPositionField);
|
||||
|
||||
for (const objectMetadata of objectMetadataWithPosition) {
|
||||
const [recordsWithoutPositionQuery, recordsWithoutPositionQueryParams] =
|
||||
this.recordPositionQueryFactory.create(
|
||||
{
|
||||
recordPositionQueryType: RecordPositionQueryType.FIND_BY_POSITION,
|
||||
positionValue: null,
|
||||
},
|
||||
objectMetadata,
|
||||
dataSourceSchema,
|
||||
);
|
||||
|
||||
const recordsWithoutPosition =
|
||||
await this.workspaceDataSourceService.executeRawQuery(
|
||||
recordsWithoutPositionQuery,
|
||||
recordsWithoutPositionQueryParams,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
if (
|
||||
!isDefined(recordsWithoutPosition) ||
|
||||
recordsWithoutPosition?.length === 0
|
||||
) {
|
||||
this.logger.log(
|
||||
`No records without position for ${objectMetadata.nameSingular}`,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
const position = await this.recordPositionFactory.create(
|
||||
'last',
|
||||
{
|
||||
isCustom: objectMetadata.isCustom,
|
||||
nameSingular: objectMetadata.nameSingular,
|
||||
},
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
for (
|
||||
let recordIndex = 0;
|
||||
recordIndex < recordsWithoutPosition.length;
|
||||
recordIndex++
|
||||
) {
|
||||
const recordId = recordsWithoutPosition[recordIndex].id;
|
||||
|
||||
if (!recordId) {
|
||||
this.logger.log(
|
||||
`Fetched record without id for ${objectMetadata.nameSingular}`,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
const backfilledPosition = position + recordIndex;
|
||||
|
||||
this.logger.log(
|
||||
`Backfilling position ${backfilledPosition} for ${objectMetadata.nameSingular} ${recordId}`,
|
||||
);
|
||||
|
||||
if (dryRun) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const [query, params] = this.recordPositionQueryFactory.create(
|
||||
{
|
||||
recordPositionQueryType: RecordPositionQueryType.UPDATE_POSITION,
|
||||
recordId: recordsWithoutPosition[recordIndex].id,
|
||||
positionValue: position + recordIndex,
|
||||
},
|
||||
objectMetadata,
|
||||
dataSourceSchema,
|
||||
);
|
||||
|
||||
await this.workspaceDataSourceService.executeRawQuery(
|
||||
query,
|
||||
params,
|
||||
workspaceId,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -5,7 +5,6 @@ import { WorkspaceQueryBuilderModule } from 'src/engine/api/graphql/workspace-qu
|
||||
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
|
||||
import { WorkspacePreQueryHookModule } from 'src/engine/api/graphql/workspace-query-runner/workspace-pre-query-hook/workspace-pre-query-hook.module';
|
||||
import { workspaceQueryRunnerFactories } from 'src/engine/api/graphql/workspace-query-runner/factories';
|
||||
import { RecordPositionListener } from 'src/engine/api/graphql/workspace-query-runner/listeners/record-position.listener';
|
||||
import { AuthModule } from 'src/engine/core-modules/auth/auth.module';
|
||||
import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity';
|
||||
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
|
||||
@ -13,6 +12,7 @@ import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/sta
|
||||
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
|
||||
import { TelemetryListener } from 'src/engine/api/graphql/workspace-query-runner/listeners/telemetry.listener';
|
||||
import { AnalyticsModule } from 'src/engine/core-modules/analytics/analytics.module';
|
||||
import { RecordPositionBackfillCommand } from 'src/engine/api/graphql/workspace-query-runner/commands/0-20-record-position-backfill.command';
|
||||
|
||||
import { WorkspaceQueryRunnerService } from './workspace-query-runner.service';
|
||||
|
||||
@ -31,9 +31,9 @@ import { EntityEventsToDbListener } from './listeners/entity-events-to-db.listen
|
||||
providers: [
|
||||
WorkspaceQueryRunnerService,
|
||||
...workspaceQueryRunnerFactories,
|
||||
RecordPositionListener,
|
||||
EntityEventsToDbListener,
|
||||
TelemetryListener,
|
||||
RecordPositionBackfillCommand,
|
||||
],
|
||||
exports: [WorkspaceQueryRunnerService],
|
||||
})
|
||||
|
Loading…
Reference in New Issue
Block a user