diff --git a/common/config/rush/pnpm-lock.yaml b/common/config/rush/pnpm-lock.yaml index 44477bd948..82cc5ac829 100644 --- a/common/config/rush/pnpm-lock.yaml +++ b/common/config/rush/pnpm-lock.yaml @@ -10405,9 +10405,6 @@ packages: /ajv-formats@2.1.1: resolution: {integrity: sha512-Wx0Kx52hxE7C18hkMEggYlEifqWZtYaRgouJor+WMdPnQyEK13vgEWyVNup7SoeeoLMsr4kf5h6dOW11I15MUA==} - peerDependenciesMeta: - ajv: - optional: true dependencies: ajv: 8.12.0 dev: false @@ -34820,7 +34817,7 @@ packages: dev: false file:projects/tool.tgz(bufferutil@4.0.8)(utf-8-validate@6.0.4): - resolution: {integrity: sha512-sZH5yB7zg/kTpuIhLSqPYh0wFgw4aOpsriMq4wad8ZHRzlHASseyJAbEylIP8ltfPbFFN4Yy1nXaUOXS49anHg==, tarball: file:projects/tool.tgz} + resolution: {integrity: sha512-lYSbeq8uIDsDUr/awBsGP6yBmVJO+2iToNOy93OhjZ8meyneJvNUnOeoAOWj/vtmO+UIyEc5pKYHlHJW4NNsAw==, tarball: file:projects/tool.tgz} id: file:projects/tool.tgz name: '@rush-temp/tool' version: 0.0.0 @@ -34830,6 +34827,7 @@ packages: '@types/mime-types': 2.1.4 '@types/minio': 7.0.18 '@types/node': 20.11.19 + '@types/pg': 8.11.6 '@types/request': 2.48.12 '@types/ws': 8.5.11 '@typescript-eslint/eslint-plugin': 6.21.0(@typescript-eslint/parser@6.21.0)(eslint@8.56.0)(typescript@5.3.3) @@ -34850,6 +34848,7 @@ packages: libphonenumber-js: 1.10.56 mime-types: 2.1.35 mongodb: 6.8.0 + pg: 8.12.0 prettier: 3.2.5 ts-jest: 29.1.2(esbuild@0.20.1)(jest@29.7.0)(typescript@5.3.3) ts-node: 10.9.2(@types/node@20.11.19)(typescript@5.3.3) @@ -34869,6 +34868,7 @@ packages: - kerberos - mongodb-client-encryption - node-notifier + - pg-native - snappy - socks - supports-color diff --git a/dev/tool/package.json b/dev/tool/package.json index bd62efdc73..cbb03b0ce6 100644 --- a/dev/tool/package.json +++ b/dev/tool/package.json @@ -51,7 +51,8 @@ "@types/request": "~2.48.8", "jest": "^29.7.0", "ts-jest": "^29.1.1", - "@types/jest": "^29.5.5" + "@types/jest": "^29.5.5", + "@types/pg": "^8.11.6" }, "dependencies": { "@elastic/elasticsearch": "^7.14.0", @@ -155,6 +156,7 @@ "libphonenumber-js": "^1.9.46", "mime-types": "~2.1.34", "mongodb": "^6.8.0", + "pg": "8.12.0", "ws": "^8.18.0" } } diff --git a/dev/tool/src/db.ts b/dev/tool/src/db.ts index 51301c2203..59fb147536 100644 --- a/dev/tool/src/db.ts +++ b/dev/tool/src/db.ts @@ -1,11 +1,19 @@ -import { type Doc, type WorkspaceId } from '@hcengineering/core' +import { updateWorkspace, type Workspace } from '@hcengineering/account' +import { type BackupClient, type Client, getWorkspaceId, systemAccountEmail, type Doc } from '@hcengineering/core' import { getMongoClient, getWorkspaceDB } from '@hcengineering/mongo' import { convertDoc, createTable, getDBClient, retryTxn, translateDomain } from '@hcengineering/postgres' +import { getTransactorEndpoint } from '@hcengineering/server-client' +import { generateToken } from '@hcengineering/server-token' +import { connect } from '@hcengineering/server-tool' +import { type Db, type MongoClient } from 'mongodb' +import { type Pool } from 'pg' export async function moveFromMongoToPG ( + accountDb: Db, mongoUrl: string, dbUrl: string | undefined, - workspaces: WorkspaceId[] + workspaces: Workspace[], + region: string ): Promise { if (dbUrl === undefined) { throw new Error('dbUrl is required') @@ -18,51 +26,94 @@ export async function moveFromMongoToPG ( for (let index = 0; index < workspaces.length; index++) { const ws = workspaces[index] try { - const mongoDB = getWorkspaceDB(mongo, ws) - const collections = await mongoDB.collections() - await createTable( - pgClient, - collections.map((c) => c.collectionName) - ) - for (const collection of collections) { - const cursor = collection.find() - const domain = translateDomain(collection.collectionName) - while (true) { - const doc = (await cursor.next()) as Doc | null - if (doc === null) break - try { - const converted = convertDoc(doc, ws.name) - await retryTxn(pgClient, async (client) => { - await client.query( - `INSERT INTO ${domain} (_id, "workspaceId", _class, "createdBy", "modifiedBy", "modifiedOn", "createdOn", space, "attachedTo", data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, - [ - converted._id, - converted.workspaceId, - converted._class, - converted.createdBy, - converted.modifiedBy, - converted.modifiedOn, - converted.createdOn, - converted.space, - converted.attachedTo, - converted.data - ] - ) - }) - } catch (err) { - console.log('error when move doc', doc._id, doc._class, err) - continue - } - } - } - if (index % 100 === 0) { - console.log('Move workspace', index, workspaces.length) - } + await moveWorkspace(accountDb, mongo, pgClient, ws, region) + console.log('Move workspace', index, workspaces.length) } catch (err) { - console.log('Error when move workspace', ws.name, err) + console.log('Error when move workspace', ws.workspaceName ?? ws.workspace, err) throw err } } pg.close() client.close() } + +async function moveWorkspace ( + accountDb: Db, + mongo: MongoClient, + pgClient: Pool, + ws: Workspace, + region: string +): Promise { + try { + const wsId = getWorkspaceId(ws.workspace) + const mongoDB = getWorkspaceDB(mongo, wsId) + const collections = await mongoDB.collections() + await createTable( + pgClient, + collections.map((c) => c.collectionName) + ) + const token = generateToken(systemAccountEmail, wsId) + const endpoint = await getTransactorEndpoint(token, 'external') + const connection = (await connect(endpoint, wsId, undefined, { + model: 'upgrade' + })) as unknown as Client & BackupClient + for (const collection of collections) { + const cursor = collection.find() + const domain = translateDomain(collection.collectionName) + console.log('move domain', domain) + while (true) { + const doc = (await cursor.next()) as Doc | null + if (doc === null) break + try { + const converted = convertDoc(doc, ws.workspaceName ?? ws.workspace) + await retryTxn(pgClient, async (client) => { + await client.query( + `INSERT INTO ${domain} (_id, "workspaceId", _class, "createdBy", "modifiedBy", "modifiedOn", "createdOn", space, "attachedTo", data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`, + [ + converted._id, + converted.workspaceId, + converted._class, + converted.createdBy ?? converted.modifiedBy, + converted.modifiedBy, + converted.modifiedOn, + converted.createdOn ?? converted.modifiedOn, + converted.space, + converted.attachedTo, + converted.data + ] + ) + }) + } catch (err) { + console.log('error when move doc', doc._id, doc._class, err) + continue + } + } + } + await updateWorkspace(accountDb, ws, { region }) + await connection.sendForceClose() + await connection.close() + } catch (err) { + console.log('Error when move workspace', ws.workspaceName ?? ws.workspace, err) + throw err + } +} + +export async function moveWorkspaceFromMongoToPG ( + accountDb: Db, + mongoUrl: string, + dbUrl: string | undefined, + ws: Workspace, + region: string +): Promise { + if (dbUrl === undefined) { + throw new Error('dbUrl is required') + } + const client = getMongoClient(mongoUrl) + const mongo = await client.getClient() + const pg = getDBClient(dbUrl) + const pgClient = await pg.getClient() + + await moveWorkspace(accountDb, mongo, pgClient, ws, region) + pg.close() + client.close() +} diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index 2fbb945a10..655292ffb1 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -111,7 +111,7 @@ import { restoreRecruitingTaskTypes } from './clean' import { changeConfiguration } from './configuration' -import { moveFromMongoToPG } from './db' +import { moveFromMongoToPG, moveWorkspaceFromMongoToPG } from './db' import { fixJsonMarkup, migrateMarkup } from './markup' import { fixMixinForeignAttributes, showMixinForeignAttributes } from './mixin' import { importNotion } from './notion' @@ -1521,18 +1521,35 @@ export function devTool ( }) }) - program.command('move-to-pg').action(async () => { + program.command('move-to-pg ').action(async (region: string) => { const { mongodbUri, dbUrl } = prepareTools() await withDatabase(mongodbUri, async (db) => { const workspaces = await listWorkspacesRaw(db) + workspaces.sort((a, b) => b.lastVisit - a.lastVisit) await moveFromMongoToPG( + db, mongodbUri, dbUrl, - workspaces.map((it) => getWorkspaceId(it.workspace)) + workspaces.filter((p) => p.region !== region), + region ) }) }) + program.command('move-workspace-to-pg ').action(async (workspace: string, region: string) => { + const { mongodbUri, dbUrl } = prepareTools() + await withDatabase(mongodbUri, async (db) => { + const workspaceInfo = await getWorkspaceById(db, workspace) + if (workspaceInfo === null) { + throw new Error(`workspace ${workspace} not found`) + } + if (workspaceInfo.region === region) { + throw new Error(`workspace ${workspace} is already migrated`) + } + await moveWorkspaceFromMongoToPG(db, mongodbUri, dbUrl, workspaceInfo, region) + }) + }) + program .command('perfomance') .option('-p, --parallel', '', false)