Improve cockroach migrator (#6590)

Signed-off-by: Denis Bykhov <bykhov.denis@gmail.com>
This commit is contained in:
Denis Bykhov 2024-09-17 14:49:01 +05:00 committed by GitHub
parent c51122ac8d
commit 434773d794
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 121 additions and 51 deletions

View File

@ -10405,9 +10405,6 @@ packages:
/ajv-formats@2.1.1:
resolution: {integrity: sha512-Wx0Kx52hxE7C18hkMEggYlEifqWZtYaRgouJor+WMdPnQyEK13vgEWyVNup7SoeeoLMsr4kf5h6dOW11I15MUA==}
peerDependenciesMeta:
ajv:
optional: true
dependencies:
ajv: 8.12.0
dev: false
@ -34820,7 +34817,7 @@ packages:
dev: false
file:projects/tool.tgz(bufferutil@4.0.8)(utf-8-validate@6.0.4):
resolution: {integrity: sha512-sZH5yB7zg/kTpuIhLSqPYh0wFgw4aOpsriMq4wad8ZHRzlHASseyJAbEylIP8ltfPbFFN4Yy1nXaUOXS49anHg==, tarball: file:projects/tool.tgz}
resolution: {integrity: sha512-lYSbeq8uIDsDUr/awBsGP6yBmVJO+2iToNOy93OhjZ8meyneJvNUnOeoAOWj/vtmO+UIyEc5pKYHlHJW4NNsAw==, tarball: file:projects/tool.tgz}
id: file:projects/tool.tgz
name: '@rush-temp/tool'
version: 0.0.0
@ -34830,6 +34827,7 @@ packages:
'@types/mime-types': 2.1.4
'@types/minio': 7.0.18
'@types/node': 20.11.19
'@types/pg': 8.11.6
'@types/request': 2.48.12
'@types/ws': 8.5.11
'@typescript-eslint/eslint-plugin': 6.21.0(@typescript-eslint/parser@6.21.0)(eslint@8.56.0)(typescript@5.3.3)
@ -34850,6 +34848,7 @@ packages:
libphonenumber-js: 1.10.56
mime-types: 2.1.35
mongodb: 6.8.0
pg: 8.12.0
prettier: 3.2.5
ts-jest: 29.1.2(esbuild@0.20.1)(jest@29.7.0)(typescript@5.3.3)
ts-node: 10.9.2(@types/node@20.11.19)(typescript@5.3.3)
@ -34869,6 +34868,7 @@ packages:
- kerberos
- mongodb-client-encryption
- node-notifier
- pg-native
- snappy
- socks
- supports-color

View File

@ -51,7 +51,8 @@
"@types/request": "~2.48.8",
"jest": "^29.7.0",
"ts-jest": "^29.1.1",
"@types/jest": "^29.5.5"
"@types/jest": "^29.5.5",
"@types/pg": "^8.11.6"
},
"dependencies": {
"@elastic/elasticsearch": "^7.14.0",
@ -155,6 +156,7 @@
"libphonenumber-js": "^1.9.46",
"mime-types": "~2.1.34",
"mongodb": "^6.8.0",
"pg": "8.12.0",
"ws": "^8.18.0"
}
}

View File

@ -1,11 +1,19 @@
import { type Doc, type WorkspaceId } from '@hcengineering/core'
import { updateWorkspace, type Workspace } from '@hcengineering/account'
import { type BackupClient, type Client, getWorkspaceId, systemAccountEmail, type Doc } from '@hcengineering/core'
import { getMongoClient, getWorkspaceDB } from '@hcengineering/mongo'
import { convertDoc, createTable, getDBClient, retryTxn, translateDomain } from '@hcengineering/postgres'
import { getTransactorEndpoint } from '@hcengineering/server-client'
import { generateToken } from '@hcengineering/server-token'
import { connect } from '@hcengineering/server-tool'
import { type Db, type MongoClient } from 'mongodb'
import { type Pool } from 'pg'
export async function moveFromMongoToPG (
accountDb: Db,
mongoUrl: string,
dbUrl: string | undefined,
workspaces: WorkspaceId[]
workspaces: Workspace[],
region: string
): Promise<void> {
if (dbUrl === undefined) {
throw new Error('dbUrl is required')
@ -18,51 +26,94 @@ export async function moveFromMongoToPG (
for (let index = 0; index < workspaces.length; index++) {
const ws = workspaces[index]
try {
const mongoDB = getWorkspaceDB(mongo, ws)
const collections = await mongoDB.collections()
await createTable(
pgClient,
collections.map((c) => c.collectionName)
)
for (const collection of collections) {
const cursor = collection.find()
const domain = translateDomain(collection.collectionName)
while (true) {
const doc = (await cursor.next()) as Doc | null
if (doc === null) break
try {
const converted = convertDoc(doc, ws.name)
await retryTxn(pgClient, async (client) => {
await client.query(
`INSERT INTO ${domain} (_id, "workspaceId", _class, "createdBy", "modifiedBy", "modifiedOn", "createdOn", space, "attachedTo", data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`,
[
converted._id,
converted.workspaceId,
converted._class,
converted.createdBy,
converted.modifiedBy,
converted.modifiedOn,
converted.createdOn,
converted.space,
converted.attachedTo,
converted.data
]
)
})
} catch (err) {
console.log('error when move doc', doc._id, doc._class, err)
continue
}
}
}
if (index % 100 === 0) {
console.log('Move workspace', index, workspaces.length)
}
await moveWorkspace(accountDb, mongo, pgClient, ws, region)
console.log('Move workspace', index, workspaces.length)
} catch (err) {
console.log('Error when move workspace', ws.name, err)
console.log('Error when move workspace', ws.workspaceName ?? ws.workspace, err)
throw err
}
}
pg.close()
client.close()
}
/**
 * Copies all collections of a single workspace from MongoDB into the SQL
 * (Postgres/Cockroach) tables, then records the target region on the account
 * record and asks the transactor to force-close the workspace.
 *
 * Documents that fail to convert or insert are logged and skipped (best effort);
 * any other failure is logged and rethrown.
 *
 * @param accountDb - account database holding the workspace record to update
 * @param mongo - connected Mongo client used to read the workspace collections
 * @param pgClient - pg pool used for table creation and inserts
 * @param ws - workspace record being migrated
 * @param region - region value stored on the workspace after a successful copy
 * @throws rethrows any error raised outside the per-document insert
 */
async function moveWorkspace (
  accountDb: Db,
  mongo: MongoClient,
  pgClient: Pool,
  ws: Workspace,
  region: string
): Promise<void> {
  try {
    const wsId = getWorkspaceId(ws.workspace)
    const mongoDB = getWorkspaceDB(mongo, wsId)
    const collections = await mongoDB.collections()
    await createTable(
      pgClient,
      collections.map((c) => c.collectionName)
    )
    const token = generateToken(systemAccountEmail, wsId)
    const endpoint = await getTransactorEndpoint(token, 'external')
    const connection = (await connect(endpoint, wsId, undefined, {
      model: 'upgrade'
    })) as unknown as Client & BackupClient
    try {
      for (const collection of collections) {
        const cursor = collection.find()
        const domain = translateDomain(collection.collectionName)
        console.log('move domain', domain)
        try {
          while (true) {
            const doc = (await cursor.next()) as Doc | null
            if (doc === null) break
            try {
              // Key rows by the workspace identifier (the same id used to open the
              // Mongo database and to issue the token), not by the display name:
              // `workspaceName` is a human-readable label and would not match the
              // "workspaceId" column for lookups made with this WorkspaceId.
              const converted = convertDoc(doc, wsId.name)
              await retryTxn(pgClient, async (client) => {
                await client.query(
                  `INSERT INTO ${domain} (_id, "workspaceId", _class, "createdBy", "modifiedBy", "modifiedOn", "createdOn", space, "attachedTo", data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`,
                  [
                    converted._id,
                    converted.workspaceId,
                    converted._class,
                    // Fall back to the modification fields when creation fields are absent.
                    converted.createdBy ?? converted.modifiedBy,
                    converted.modifiedBy,
                    converted.modifiedOn,
                    converted.createdOn ?? converted.modifiedOn,
                    converted.space,
                    converted.attachedTo,
                    converted.data
                  ]
                )
              })
            } catch (err) {
              // Best effort: a single bad document must not abort the whole workspace.
              console.log('error when move doc', doc._id, doc._class, err)
              continue
            }
          }
        } finally {
          // Release the server-side cursor even when iteration aborts with an error.
          await cursor.close()
        }
      }
      await updateWorkspace(accountDb, ws, { region })
      // Only force-close after the region is recorded — presumably so the workspace
      // is reopened against the new storage; confirm against the transactor.
      await connection.sendForceClose()
    } finally {
      // Always release the transactor connection, including on failure paths.
      await connection.close()
    }
  } catch (err) {
    console.log('Error when move workspace', ws.workspaceName ?? ws.workspace, err)
    throw err
  }
}
/**
 * Migrates one workspace from MongoDB to the SQL database and marks it with the
 * given region.
 *
 * Both database handles are released in `finally` blocks, so they are closed
 * whether the migration succeeds or throws.
 *
 * @param accountDb - account database holding the workspace record
 * @param mongoUrl - MongoDB connection string
 * @param dbUrl - SQL database connection string; required
 * @param ws - workspace record to migrate
 * @param region - region value stored on the workspace after a successful copy
 * @throws Error when `dbUrl` is undefined; rethrows any migration failure
 */
export async function moveWorkspaceFromMongoToPG (
  accountDb: Db,
  mongoUrl: string,
  dbUrl: string | undefined,
  ws: Workspace,
  region: string
): Promise<void> {
  if (dbUrl === undefined) {
    throw new Error('dbUrl is required')
  }
  const client = getMongoClient(mongoUrl)
  try {
    const mongo = await client.getClient()
    const pg = getDBClient(dbUrl)
    try {
      const pgClient = await pg.getClient()
      await moveWorkspace(accountDb, mongo, pgClient, ws, region)
    } finally {
      pg.close()
    }
  } finally {
    client.close()
  }
}

View File

@ -111,7 +111,7 @@ import {
restoreRecruitingTaskTypes
} from './clean'
import { changeConfiguration } from './configuration'
import { moveFromMongoToPG } from './db'
import { moveFromMongoToPG, moveWorkspaceFromMongoToPG } from './db'
import { fixJsonMarkup, migrateMarkup } from './markup'
import { fixMixinForeignAttributes, showMixinForeignAttributes } from './mixin'
import { importNotion } from './notion'
@ -1521,18 +1521,35 @@ export function devTool (
})
})
program.command('move-to-pg').action(async () => {
program.command('move-to-pg <region>').action(async (region: string) => {
const { mongodbUri, dbUrl } = prepareTools()
await withDatabase(mongodbUri, async (db) => {
const workspaces = await listWorkspacesRaw(db)
workspaces.sort((a, b) => b.lastVisit - a.lastVisit)
await moveFromMongoToPG(
db,
mongodbUri,
dbUrl,
workspaces.map((it) => getWorkspaceId(it.workspace))
workspaces.filter((p) => p.region !== region),
region
)
})
})
// CLI: move-workspace-to-pg <workspace> <region>
// Migrates a single workspace via moveWorkspaceFromMongoToPG and tags it with <region>.
program.command('move-workspace-to-pg <workspace> <region>').action(async (workspace: string, region: string) => {
const { mongodbUri, dbUrl } = prepareTools()
await withDatabase(mongodbUri, async (db) => {
const workspaceInfo = await getWorkspaceById(db, workspace)
// Fail fast when the workspace record cannot be found.
if (workspaceInfo === null) {
throw new Error(`workspace ${workspace} not found`)
}
// A workspace whose region already equals the target is treated as migrated.
if (workspaceInfo.region === region) {
throw new Error(`workspace ${workspace} is already migrated`)
}
await moveWorkspaceFromMongoToPG(db, mongodbUri, dbUrl, workspaceInfo, region)
})
})
program
.command('perfomance')
.option('-p, --parallel', '', false)