feat(server): add data migration system

This commit is contained in:
forehalo 2023-10-27 17:31:52 +08:00
parent ae6376edee
commit 98d0ac3c90
No known key found for this signature in database
11 changed files with 466 additions and 67 deletions

View File

@ -16,7 +16,7 @@ spec:
containers:
- name: {{ .Chart.Name }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
command: ["yarn", "prisma", "migrate", "deploy"]
command: ["yarn", "predeploy"]
env:
- name: NODE_ENV
value: "{{ .Values.env }}"

View File

@ -0,0 +1,9 @@
-- CreateTable
CREATE TABLE "_data_migrations" (
"id" VARCHAR(36) NOT NULL,
"name" VARCHAR NOT NULL,
"started_at" TIMESTAMPTZ(6) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"finished_at" TIMESTAMPTZ(6),
CONSTRAINT "_data_migrations_pkey" PRIMARY KEY ("id")
);

View File

@ -13,7 +13,9 @@
"dev": "nodemon ./src/index.ts",
"test": "ava --concurrency 1 --serial",
"test:coverage": "c8 ava --concurrency 1 --serial",
"postinstall": "prisma generate"
"postinstall": "prisma generate",
"data-migration": "node --loader ts-node/esm.mjs --es-module-specifier-resolution node ./src/data/app.ts",
"predeploy": "yarn prisma migrate deploy && node --es-module-specifier-resolution node ./dist/data/app.js run"
},
"dependencies": {
"@apollo/server": "^4.9.4",
@ -59,6 +61,7 @@
"keyv": "^4.5.4",
"lodash-es": "^4.17.21",
"nanoid": "^5.0.1",
"nest-commander": "^3.12.0",
"nestjs-throttler-storage-redis": "^0.4.1",
"next-auth": "^4.23.2",
"nodemailer": "^6.9.6",

View File

@ -164,3 +164,12 @@ model NewFeaturesWaitingList {
@@map("new_features_waiting_list")
}
model DataMigration {
id String @id @default(uuid()) @db.VarChar(36)
name String @db.VarChar
startedAt DateTime @default(now()) @map("started_at") @db.Timestamptz(6)
finishedAt DateTime? @map("finished_at") @db.Timestamptz(6)
@@map("_data_migrations")
}

View File

@ -0,0 +1,18 @@
import { Logger, Module } from '@nestjs/common';
import { CommandFactory } from 'nest-commander';
import { PrismaModule } from '../prisma';
import { CreateCommand, NameQuestion } from './commands/create';
import { RevertCommand, RunCommand } from './commands/run';
@Module({
imports: [PrismaModule],
providers: [NameQuestion, CreateCommand, RunCommand, RevertCommand],
})
class AppModule {}
async function bootstrap() {
await CommandFactory.run(AppModule, new Logger());
}
await bootstrap();

View File

@ -0,0 +1,73 @@
import { writeFileSync } from 'node:fs';
import { join } from 'node:path';
import { fileURLToPath } from 'node:url';
import { Logger } from '@nestjs/common';
import { camelCase, snakeCase, upperFirst } from 'lodash-es';
import {
Command,
CommandRunner,
InquirerService,
Question,
QuestionSet,
} from 'nest-commander';
@QuestionSet({ name: 'name-questions' })
export class NameQuestion {
@Question({
name: 'name',
message: 'Name of the data migration script:',
})
parseName(val: string) {
return val.trim();
}
}
@Command({
name: 'create',
arguments: '[name]',
description: 'create a data migration script',
})
export class CreateCommand extends CommandRunner {
logger = new Logger(CreateCommand.name);
constructor(private readonly inquirer: InquirerService) {
super();
}
override async run(inputs: string[]): Promise<void> {
let name = inputs[0];
if (!name) {
name = (
await this.inquirer.ask<{ name: string }>('name-questions', undefined)
).name;
}
const timestamp = Date.now();
const content = this.createScript(upperFirst(camelCase(name)) + timestamp);
const fileName = `${timestamp}-${snakeCase(name)}.ts`;
const filePath = join(
fileURLToPath(import.meta.url),
'../../migrations',
fileName
);
this.logger.log(`Creating ${fileName}...`);
writeFileSync(filePath, content);
this.logger.log('Done');
}
private createScript(name: string) {
const contents = ["import { PrismaService } from '../../prisma';", ''];
contents.push(`export class ${name} {`);
contents.push(' // do the migration');
contents.push(' static async up(db: PrismaService) {}');
contents.push('');
contents.push(' // revert the migration');
contents.push(' static async down(db: PrismaService) {}');
contents.push('}');
return contents.join('\n');
}
}

View File

@ -0,0 +1,149 @@
import { readdirSync } from 'node:fs';
import { join } from 'node:path';
import { fileURLToPath } from 'node:url';
import { Logger } from '@nestjs/common';
import { Command, CommandRunner } from 'nest-commander';
import { PrismaService } from '../../prisma';
interface Migration {
file: string;
name: string;
up: (db: PrismaService) => Promise<void>;
down: (db: PrismaService) => Promise<void>;
}
async function collectMigrations(): Promise<Migration[]> {
const folder = join(fileURLToPath(import.meta.url), '../../migrations');
const migrationFiles = readdirSync(folder)
.filter(desc => desc.endsWith('.ts') && desc !== 'index.ts')
.map(desc => join(folder, desc));
const migrations: Migration[] = await Promise.all(
migrationFiles.map(async file => {
return import(file).then(mod => {
const migration = mod[Object.keys(mod)[0]];
return {
file,
name: migration.name,
up: migration.up,
down: migration.down,
};
});
})
);
return migrations;
}
@Command({
name: 'run',
description: 'Run all pending data migrations',
})
export class RunCommand extends CommandRunner {
logger = new Logger(RunCommand.name);
constructor(private readonly db: PrismaService) {
super();
}
override async run(): Promise<void> {
const migrations = await collectMigrations();
const done: Migration[] = [];
for (const migration of migrations) {
const exists = await this.db.dataMigration.count({
where: {
name: migration.name,
},
});
if (exists) {
continue;
}
try {
this.logger.log(`Running ${migration.name}...`);
const record = await this.db.dataMigration.create({
data: {
name: migration.name,
startedAt: new Date(),
},
});
await migration.up(this.db);
await this.db.dataMigration.update({
where: {
id: record.id,
},
data: {
finishedAt: new Date(),
},
});
done.push(migration);
} catch (e) {
this.logger.error('Failed to run data migration', e);
}
}
this.logger.log(`Done ${done.length} migrations`);
done.forEach(migration => {
this.logger.log(`${migration.name}`);
});
}
}
@Command({
name: 'revert',
arguments: '[name]',
description: 'Revert one data migration with given name',
})
export class RevertCommand extends CommandRunner {
logger = new Logger(RevertCommand.name);
constructor(private readonly db: PrismaService) {
super();
}
override async run(inputs: string[]): Promise<void> {
const name = inputs[0];
if (!name) {
throw new Error('A migration name is required');
}
const migrations = await collectMigrations();
const migration = migrations.find(m => m.name === name);
if (!migration) {
this.logger.error('Available migration names:');
migrations.forEach(m => {
this.logger.error(` - ${m.name}`);
});
throw new Error(`Unknown migration name: ${name}.`);
}
const record = await this.db.dataMigration.findFirst({
where: {
name: migration.name,
},
});
if (!record) {
throw new Error(`Migration ${name} has not been executed.`);
}
try {
this.logger.log(`Reverting ${name}...`);
await migration.down(this.db);
this.logger.log('Done reverting');
} catch (e) {
this.logger.error(`Failed to revert data migration ${name}`, e);
}
await this.db.dataMigration.delete({
where: {
id: record.id,
},
});
}
}

View File

@ -0,0 +1,117 @@
import { applyUpdate, Doc, encodeStateAsUpdate } from 'yjs';
import { PrismaService } from '../../prisma';
import { DocID } from '../../utils/doc';
export class Guid1698398506533 {
// do the migration
static async up(db: PrismaService) {
let turn = 0;
let lastTurnCount = 100;
while (lastTurnCount === 100) {
const docs = await db.snapshot.findMany({
select: {
workspaceId: true,
id: true,
},
skip: turn * 100,
take: 100,
orderBy: {
createdAt: 'asc',
},
});
lastTurnCount = docs.length;
for (const doc of docs) {
const docId = new DocID(doc.id, doc.workspaceId);
// NOTE:
// `doc.id` could be 'space:xxx' or 'xxx'
// `docId.guid` is always 'xxx'
// what we want achieve is:
// if both 'space:xxx' and 'xxx' exist, merge 'space:xxx' to 'xxx' and delete it
// else just modify 'space:xxx' to 'xxx'
if (docId && !docId.isWorkspace && docId.guid !== doc.id) {
const existingUpdate = await db.snapshot.findFirst({
where: {
id: docId.guid,
workspaceId: doc.workspaceId,
},
select: {
blob: true,
},
});
// we have missing update with wrong id used before and need to be recovered
if (existingUpdate) {
const toBeMergeUpdate = await db.snapshot.findFirst({
// id 'space:xxx'
where: {
id: doc.id,
workspaceId: doc.workspaceId,
},
select: {
blob: true,
},
});
// no conflict
// actually unreachable path
if (!toBeMergeUpdate) {
continue;
}
// recover
const yDoc = new Doc();
applyUpdate(yDoc, toBeMergeUpdate.blob);
applyUpdate(yDoc, existingUpdate.blob);
const update = encodeStateAsUpdate(yDoc);
await db.$transaction([
// we already have 'xxx', delete 'space:xxx'
db.snapshot.deleteMany({
where: {
id: doc.id,
workspaceId: doc.workspaceId,
},
}),
db.snapshot.update({
where: {
id_workspaceId: {
id: docId.guid,
workspaceId: doc.workspaceId,
},
},
data: {
blob: Buffer.from(update),
},
}),
]);
} else {
// there is no updates need to be merged
// just modify the id the required one
await db.snapshot.update({
where: {
id_workspaceId: {
id: doc.id,
workspaceId: doc.workspaceId,
},
},
data: {
id: docId.guid,
},
});
}
}
}
turn++;
}
}
// revert the migration
static async down() {
//
}
}

View File

@ -30,7 +30,7 @@ export class ExceptionLogger implements ExceptionFilter {
new Error(
`${requestId ? `requestId-${requestId}: ` : ''}${exception.message}${
shouldVerboseLog ? '\n' + exception.stack : ''
}}`,
}`,
{ cause: exception }
)
);

View File

@ -2,7 +2,6 @@ import {
Inject,
Injectable,
Logger,
OnApplicationBootstrap,
OnModuleDestroy,
OnModuleInit,
} from '@nestjs/common';
@ -14,7 +13,6 @@ import { Config } from '../../config';
import { Metrics } from '../../metrics/metrics';
import { PrismaService } from '../../prisma';
import { mergeUpdatesInApplyWay as jwstMergeUpdates } from '../../storage';
import { DocID } from '../../utils/doc';
function compare(yBinary: Buffer, jwstBinary: Buffer, strict = false): boolean {
if (yBinary.equals(jwstBinary)) {
@ -44,9 +42,7 @@ const MAX_SEQ_NUM = 0x3fffffff; // u31
* along side all the updates that have not been applies to that snapshot(timestamp).
*/
@Injectable()
export class DocManager
implements OnModuleInit, OnModuleDestroy, OnApplicationBootstrap
{
export class DocManager implements OnModuleInit, OnModuleDestroy {
protected logger = new Logger(DocManager.name);
private job: NodeJS.Timeout | null = null;
private seqMap = new Map<string, number>();
@ -60,12 +56,6 @@ export class DocManager
protected readonly metrics: Metrics
) {}
async onApplicationBootstrap() {
if (!this.config.node.test) {
await this.refreshDocGuid();
}
}
onModuleInit() {
if (this.automation) {
this.logger.log('Use Database');
@ -421,56 +411,4 @@ export class DocManager
return last + 1;
}
}
/**
* deal with old records that has wrong guid format
* correct guid with `${non-wsId}:${variant}:${subId}` to `${subId}`
*
* @TODO delete in next release
* @deprecated
*/
private async refreshDocGuid() {
let turn = 0;
let lastTurnCount = 100;
while (lastTurnCount === 100) {
const docs = await this.db.snapshot.findMany({
select: {
workspaceId: true,
id: true,
},
skip: turn * 100,
take: 100,
orderBy: {
createdAt: 'asc',
},
});
lastTurnCount = docs.length;
for (const doc of docs) {
const docId = new DocID(doc.id, doc.workspaceId);
if (docId && !docId.isWorkspace && docId.guid !== doc.id) {
await this.db.snapshot.deleteMany({
where: {
id: docId.guid,
workspaceId: doc.workspaceId,
},
});
await this.db.snapshot.update({
where: {
id_workspaceId: {
id: doc.id,
workspaceId: doc.workspaceId,
},
},
data: {
id: docId.guid,
},
});
}
}
turn++;
}
}
}

View File

@ -722,6 +722,7 @@ __metadata:
keyv: "npm:^4.5.4"
lodash-es: "npm:^4.17.21"
nanoid: "npm:^5.0.1"
nest-commander: "npm:^3.12.0"
nestjs-throttler-storage-redis: "npm:^0.4.1"
next-auth: "npm:^4.23.2"
nodemailer: "npm:^6.9.6"
@ -5273,6 +5274,17 @@ __metadata:
languageName: node
linkType: hard
"@fig/complete-commander@npm:^2.0.1":
version: 2.0.1
resolution: "@fig/complete-commander@npm:2.0.1"
dependencies:
prettier: "npm:^2.3.2"
peerDependencies:
commander: ^8.0.0
checksum: 6acd599c77e4c6630304401cc37d2551383d7ac1b2c2c6da743840184363a3900213f28004186c4f39f6bf7395c9dc5fd90d61622b140230fad772765a15da40
languageName: node
linkType: hard
"@floating-ui/core@npm:^1.4.2":
version: 1.5.0
resolution: "@floating-ui/core@npm:1.5.0"
@ -5337,6 +5349,18 @@ __metadata:
languageName: node
linkType: hard
"@golevelup/nestjs-discovery@npm:4.0.0":
version: 4.0.0
resolution: "@golevelup/nestjs-discovery@npm:4.0.0"
dependencies:
lodash: "npm:^4.17.21"
peerDependencies:
"@nestjs/common": ^10.x
"@nestjs/core": ^10.x
checksum: d74cd6075623ffb26c84f21ee4217db04fb79c4de09021896f675892c598efef7be1a841fa815eea6ec145251fd9222380399af6d395f521f7efd9da92dece88
languageName: node
linkType: hard
"@google-cloud/opentelemetry-cloud-monitoring-exporter@npm:^0.17.0":
version: 0.17.0
resolution: "@google-cloud/opentelemetry-cloud-monitoring-exporter@npm:0.17.0"
@ -17266,6 +17290,13 @@ __metadata:
languageName: node
linkType: hard
"commander@npm:11.0.0":
version: 11.0.0
resolution: "commander@npm:11.0.0"
checksum: 71cf453771c15d4e94afdd76a1e9bb31597dbc5f33130a1d399a4a7bc14eac765ebca7f0e077f347e5119087f6faa0017fd5e3cb6e4fc5c453853334c26162bc
languageName: node
linkType: hard
"commander@npm:11.1.0":
version: 11.1.0
resolution: "commander@npm:11.1.0"
@ -17681,6 +17712,18 @@ __metadata:
languageName: node
linkType: hard
"cosmiconfig@npm:8.2.0":
version: 8.2.0
resolution: "cosmiconfig@npm:8.2.0"
dependencies:
import-fresh: "npm:^3.2.1"
js-yaml: "npm:^4.1.0"
parse-json: "npm:^5.0.0"
path-type: "npm:^4.0.0"
checksum: e0b188f9a672ee7135851bf9d9fc8f0ba00f9769c95fda5af0ebc274804f6aeb713b753e04e706f595e1fbd0fa67c5073840666019068c0296a06057560ab39d
languageName: node
linkType: hard
"cosmiconfig@npm:^6.0.0":
version: 6.0.0
resolution: "cosmiconfig@npm:6.0.0"
@ -22888,6 +22931,29 @@ __metadata:
languageName: node
linkType: hard
"inquirer@npm:8.2.5":
version: 8.2.5
resolution: "inquirer@npm:8.2.5"
dependencies:
ansi-escapes: "npm:^4.2.1"
chalk: "npm:^4.1.1"
cli-cursor: "npm:^3.1.0"
cli-width: "npm:^3.0.0"
external-editor: "npm:^3.0.3"
figures: "npm:^3.0.0"
lodash: "npm:^4.17.21"
mute-stream: "npm:0.0.8"
ora: "npm:^5.4.1"
run-async: "npm:^2.4.0"
rxjs: "npm:^7.5.5"
string-width: "npm:^4.1.0"
strip-ansi: "npm:^6.0.0"
through: "npm:^2.3.6"
wrap-ansi: "npm:^7.0.0"
checksum: 50a240dfeaca37a14e6a6d11d7d6f7da947be3a9fe1e34ac41db6a49fc27022e7b3875ebe8ccd739497359808694488f3509792cc986f9ac48c43135f4e14172
languageName: node
linkType: hard
"inquirer@npm:^8.0.0, inquirer@npm:^8.2.0":
version: 8.2.6
resolution: "inquirer@npm:8.2.6"
@ -26962,6 +27028,23 @@ __metadata:
languageName: node
linkType: hard
"nest-commander@npm:^3.12.0":
version: 3.12.0
resolution: "nest-commander@npm:3.12.0"
dependencies:
"@fig/complete-commander": "npm:^2.0.1"
"@golevelup/nestjs-discovery": "npm:4.0.0"
commander: "npm:11.0.0"
cosmiconfig: "npm:8.2.0"
inquirer: "npm:8.2.5"
peerDependencies:
"@nestjs/common": ^8.0.0 || ^9.0.0 || ^10.0.0
"@nestjs/core": ^8.0.0 || ^9.0.0 || ^10.0.0
"@types/inquirer": ^8.1.3
checksum: 29554f1cef113e0b0fa05a958c81b63613a046c682a2da7f7856b3f03f1e0b4792d76637fa1e47cdac4db98862dba34bbae46b964db71ad72f5bb9fff72ef530
languageName: node
linkType: hard
"nestjs-throttler-storage-redis@npm:^0.4.1":
version: 0.4.1
resolution: "nestjs-throttler-storage-redis@npm:0.4.1"
@ -29154,7 +29237,7 @@ __metadata:
languageName: node
linkType: hard
"prettier@npm:^2.8.0":
"prettier@npm:^2.3.2, prettier@npm:^2.8.0":
version: 2.8.8
resolution: "prettier@npm:2.8.8"
bin: