Speed up move tool and fix backup error (#6987)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
Andrey Sobolev 2024-10-19 01:24:39 +07:00 committed by GitHub
parent 1192e23d18
commit 7bd0db43c4
3 changed files with 398 additions and 41 deletions

View File

@ -15,7 +15,6 @@
//
import accountPlugin, {
type AccountDB,
assignWorkspace,
confirmEmail,
createAcc,
@ -34,6 +33,7 @@ import accountPlugin, {
setAccountAdmin,
setRole,
updateWorkspace,
type AccountDB,
type Workspace
} from '@hcengineering/account'
import { setMetadata } from '@hcengineering/platform'
@ -41,13 +41,21 @@ import {
backup,
backupFind,
backupList,
backupRemoveLast,
backupSize,
checkBackupIntegrity,
compactBackup,
createFileBackupStorage,
createStorageBackupStorage,
restore
} from '@hcengineering/server-backup'
import serverClientPlugin, { BlobClient, createClient, getTransactorEndpoint } from '@hcengineering/server-client'
import serverClientPlugin, {
BlobClient,
createClient,
getTransactorEndpoint,
listAccountWorkspaces,
updateBackupInfo
} from '@hcengineering/server-client'
import { getServerPipeline, registerServerPlugins, registerStringLoaders } from '@hcengineering/server-pipeline'
import serverToken, { decodeToken, generateToken } from '@hcengineering/server-token'
import toolPlugin, { FileModelLogger } from '@hcengineering/server-tool'
@ -65,6 +73,7 @@ import core, {
getWorkspaceId,
MeasureMetricsContext,
metricsToString,
RateLimiter,
systemAccountEmail,
versionToString,
type Data,
@ -77,8 +86,8 @@ import core, {
} from '@hcengineering/core'
import { consoleModelLogger, type MigrateOperation } from '@hcengineering/model'
import contact from '@hcengineering/model-contact'
import { backupDownload } from '@hcengineering/server-backup/src/backup'
import { getMongoClient, getWorkspaceMongoDB, shutdown } from '@hcengineering/mongo'
import { backupDownload } from '@hcengineering/server-backup/src/backup'
import type { StorageAdapter, StorageAdapterEx } from '@hcengineering/server-core'
import { deepEqual } from 'fast-equals'
@ -104,7 +113,7 @@ import {
restoreRecruitingTaskTypes
} from './clean'
import { changeConfiguration } from './configuration'
import { moveFromMongoToPG, moveWorkspaceFromMongoToPG, moveAccountDbFromMongoToPG } from './db'
import { moveAccountDbFromMongoToPG, moveFromMongoToPG, moveWorkspaceFromMongoToPG } from './db'
import { fixJsonMarkup, migrateMarkup, restoreLostMarkup } from './markup'
import { fixMixinForeignAttributes, showMixinForeignAttributes } from './mixin'
import { fixAccountEmails, renameAccount } from './renameAccount'
@ -814,6 +823,13 @@ export function devTool (
const storage = await createFileBackupStorage(dirName)
await compactBackup(toolCtx, storage, cmd.force)
})
program
.command('backup-check <dirName>')
.description('Check backup integrity and prune broken entries from its digest')
.action(async (dirName: string, cmd: any) => {
const storage = await createFileBackupStorage(dirName)
await checkBackupIntegrity(toolCtx, storage)
})
program
.command('backup-restore <dirName> <workspace> [date]')
@ -864,6 +880,61 @@ export function devTool (
await backup(toolCtx, endpoint, wsid, storage)
})
})
program
.command('backup-s3-clean <bucketName> <days>')
.description('Remove backup snapshots newer than the given number of days from S3 backup storage and update account backup info')
.action(async (bucketName: string, days: string, cmd) => {
const backupStorageConfig = storageConfigFromEnv(process.env.STORAGE)
const storageAdapter = createStorageFromConfig(backupStorageConfig.storages[0])
const daysInterval = Date.now() - parseInt(days) * 24 * 60 * 60 * 1000
try {
const token = generateToken(systemAccountEmail, { name: 'any' })
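// Clean recent snapshots only for workspaces that have been backed up since the cutoff.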
const workspaces = (await listAccountWorkspaces(token)).filter((it) => {
const lastBackup = it.backupInfo?.lastBackup ?? 0
if (lastBackup > daysInterval) {
// Last backup is newer than the cutoff, include this workspace
return true
}
if (it.lastVisit == null) {
return false
}
return false
})
workspaces.sort((a, b) => {
return (b.backupInfo?.backupSize ?? 0) - (a.backupInfo?.backupSize ?? 0)
})
for (const ws of workspaces) {
const storage = await createStorageBackupStorage(
toolCtx,
storageAdapter,
getWorkspaceId(bucketName),
ws.workspace
)
await backupRemoveLast(storage, daysInterval)
await updateBackupInfo(generateToken(systemAccountEmail, { name: 'any' }), {
backups: ws.backupInfo?.backups ?? 0,
backupSize: ws.backupInfo?.backupSize ?? 0,
blobsSize: ws.backupInfo?.blobsSize ?? 0,
dataSize: ws.backupInfo?.dataSize ?? 0,
lastBackup: daysInterval
})
}
} finally {
await storageAdapter.close()
}
})
program
.command('backup-clean <dirName> <days>')
.description('Remove backup snapshots newer than the given number of days from a local backup')
.action(async (dirName: string, days: string, cmd) => {
const daysInterval = Date.now() - parseInt(days) * 24 * 60 * 60 * 1000
const storage = await createFileBackupStorage(dirName)
await backupRemoveLast(storage, daysInterval)
})
program
.command('backup-s3-compact <bucketName> <dirName>')
@ -880,6 +951,20 @@ export function devTool (
}
await storageAdapter.close()
})
program
.command('backup-s3-check <bucketName> <dirName>')
.description('Check integrity of a backup stored in S3')
.action(async (bucketName: string, dirName: string, cmd: any) => {
const backupStorageConfig = storageConfigFromEnv(process.env.STORAGE)
const storageAdapter = createStorageFromConfig(backupStorageConfig.storages[0])
try {
const storage = await createStorageBackupStorage(toolCtx, storageAdapter, getWorkspaceId(bucketName), dirName)
await checkBackupIntegrity(toolCtx, storage)
} catch (err: any) {
toolCtx.error('failed to check backup integrity', { err })
}
await storageAdapter.close()
})
program
.command('backup-s3-restore <bucketName> <dirName> <workspace> [date]')
@ -1100,7 +1185,7 @@ export function devTool (
.command('move-files')
.option('-w, --workspace <workspace>', 'Selected workspace only', '')
.option('-m, --move <move>', 'When set to true, the files will be moved, otherwise copied', 'false')
.option('-bl, --blobLimit <blobLimit>', 'A blob size limit in megabytes (default 50mb)', '50')
.option('-bl, --blobLimit <blobLimit>', 'A blob size limit in megabytes (default: unlimited)', '999999')
.option('-c, --concurrency <concurrency>', 'Number of files being processed concurrently', '10')
.option('--disabled', 'Include disabled workspaces', false)
.action(
@ -1125,6 +1210,7 @@ export function devTool (
const workspaces = await listWorkspacesPure(db)
workspaces.sort((a, b) => b.lastVisit - a.lastVisit)
const rateLimit = new RateLimiter(10)
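// Process up to 10 workspaces concurrently; waitProcessing below drains the remaining tasks.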
for (const workspace of workspaces) {
if (cmd.workspace !== '' && workspace.workspace !== cmd.workspace) {
continue
@ -1134,12 +1220,14 @@ export function devTool (
continue
}
await rateLimit.exec(async () => {
console.log('start', workspace.workspace, index, '/', workspaces.length)
await moveFiles(toolCtx, getWorkspaceId(workspace.workspace), exAdapter, params)
console.log('done', workspace.workspace)
index += 1
})
}
await rateLimit.waitProcessing()
} catch (err: any) {
console.error(err)
}

View File

@ -144,6 +144,23 @@ async function processAdapter (
let movedBytes = 0
let batchBytes = 0
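// Report cumulative progress plus the bytes moved since the previous report, then reset the per-batch counters.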
function printStats (): void {
const duration = Date.now() - time
console.log(
'...processed',
processedCnt,
Math.round(processedBytes / 1024 / 1024) + 'MB',
'moved',
movedCnt,
Math.round(movedBytes / 1024 / 1024) + 'MB',
'+' + Math.round(batchBytes / 1024 / 1024) + 'MB',
Math.round(duration / 1000) + 's'
)
batchBytes = 0
time = Date.now()
}
const rateLimiter = new RateLimiter(params.concurrency)
const iterator = await source.listStream(ctx, workspaceId)
@ -152,6 +169,17 @@ async function processAdapter (
const targetBlobs = new Map<Ref<Blob>, ListBlobResult>()
let targetFilled = false
const toRemove: string[] = []
try {
while (true) {
const dataBulk = await iterator.next()
if (dataBulk.length === 0) break
if (!targetFilled) {
// Only fill the target list if we have something to move.
targetFilled = true
while (true) {
const part = await targetIterator.next()
for (const p of part) {
@ -161,12 +189,7 @@ async function processAdapter (
break
}
}
const toRemove: string[] = []
try {
while (true) {
const dataBulk = await iterator.next()
if (dataBulk.length === 0) break
}
for (const data of dataBulk) {
let targetBlob: Blob | ListBlobResult | undefined = targetBlobs.get(data._id)
@ -219,22 +242,7 @@ async function processAdapter (
if (processedCnt % 100 === 0) {
await rateLimiter.waitProcessing()
const duration = Date.now() - time
console.log(
'...processed',
processedCnt,
Math.round(processedBytes / 1024 / 1024) + 'MB',
'moved',
movedCnt,
Math.round(movedBytes / 1024 / 1024) + 'MB',
'+' + Math.round(batchBytes / 1024 / 1024) + 'MB',
Math.round(duration / 1000) + 's'
)
batchBytes = 0
time = Date.now()
printStats()
}
}
}
@ -246,6 +254,7 @@ async function processAdapter (
await source.remove(ctx, workspaceId, part)
}
}
printStats()
} finally {
await iterator.close()
}

View File

@ -189,6 +189,190 @@ async function loadDigest (
ctx.end()
return result
}
async function verifyDigest (
ctx: MeasureContext,
storage: BackupStorage,
snapshots: BackupSnapshot[],
domain: Domain
): Promise<boolean> {
ctx = ctx.newChild('verify digest', { domain, count: snapshots.length })
ctx.info('verify-digest', { domain, count: snapshots.length })
let modified = false
for (const s of snapshots) {
const d = s.domains[domain]
if (d === undefined) {
continue
}
const storageToRemove = new Set<string>()
// We need to verify storage has all necessary resources
ctx.info('checking', { domain })
// Documents verified to be fully present in this storage.
const validDocs = new Set<Ref<Doc>>()
for (const sf of d.storage ?? []) {
const blobs = new Map<string, { doc: Doc | undefined, buffer: Buffer | undefined }>()
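// Each blob appears twice in the archive: a '<id>.json' document entry and a raw data entry.
// Pair them up; a blob counts as valid only when both parts are present and the recorded size matches.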
try {
ctx.info('checking storage', { sf })
const readStream = await storage.load(sf)
const ex = extract()
ex.on('entry', (headers, stream, next) => {
const name = headers.name ?? ''
// We found blob data
if (name.endsWith('.json')) {
const chunks: Buffer[] = []
const bname = name.substring(0, name.length - 5)
stream.on('data', (chunk) => {
chunks.push(chunk)
})
stream.on('end', () => {
const bf = Buffer.concat(chunks as any)
const doc = JSON.parse(bf.toString()) as Doc
if (doc._class === core.class.Blob || doc._class === 'core:class:BlobData') {
const data = migradeBlobData(doc as Blob, '')
const d = blobs.get(bname) ?? (data !== '' ? Buffer.from(data, 'base64') : undefined)
if (d === undefined) {
blobs.set(bname, { doc, buffer: undefined })
} else {
blobs.delete(bname)
const blob = doc as Blob
if (blob.size === bf.length) {
validDocs.add(name as Ref<Doc>)
}
}
} else {
validDocs.add(name as Ref<Doc>)
}
next()
})
} else {
const chunks: Buffer[] = []
stream.on('data', (chunk) => {
chunks.push(chunk)
})
stream.on('end', () => {
const bf = Buffer.concat(chunks as any)
const d = blobs.get(name)
if (d === undefined) {
blobs.set(name, { doc: undefined, buffer: bf })
} else {
blobs.delete(name)
const doc = d?.doc as Blob
let sz = doc.size
if (Number.isNaN(sz) || sz !== bf.length) {
sz = bf.length
}
// If the blob size matches the document size, mark the document as valid
if (sz === bf.length) {
validDocs.add(name as Ref<Doc>)
}
}
next()
})
}
stream.resume() // just auto drain the stream
})
const unzip = createGunzip({ level: defaultLevel })
const endPromise = new Promise((resolve) => {
ex.on('finish', () => {
resolve(null)
})
unzip.on('error', (err) => {
ctx.error('error during reading of', { sf, err })
modified = true
storageToRemove.add(sf)
resolve(null)
})
})
readStream.on('end', () => {
readStream.destroy()
})
readStream.pipe(unzip)
unzip.pipe(ex)
await endPromise
} catch (err: any) {
ctx.error('error during reading of', { sf, err })
// The archive is unreadable, so drop the broken storage file from the digest
modified = true
storageToRemove.add(sf)
}
}
if (storageToRemove.size > 0) {
modified = true
d.storage = (d.storage ?? []).filter((it) => !storageToRemove.has(it))
}
// if (d?.snapshot !== undefined) {
// Will not check old format
// }
const digestToRemove = new Set<string>()
for (const snapshot of d?.snapshots ?? []) {
try {
ctx.info('checking', { snapshot })
const changes: Snapshot = {
added: new Map(),
removed: [],
updated: new Map()
}
let lmodified = false
try {
const dataBlob = gunzipSync(await storage.loadFile(snapshot))
.toString()
.split('\n')
const addedCount = parseInt(dataBlob.shift() ?? '0')
const added = dataBlob.splice(0, addedCount)
for (const it of added) {
const [k, v] = it.split(';')
if (validDocs.has(k as any)) {
changes.added.set(k as Ref<Doc>, v)
} else {
lmodified = true
}
}
const updatedCount = parseInt(dataBlob.shift() ?? '0')
const updated = dataBlob.splice(0, updatedCount)
for (const it of updated) {
const [k, v] = it.split(';')
if (validDocs.has(k as any)) {
changes.updated.set(k as Ref<Doc>, v)
} else {
lmodified = true
}
}
const removedCount = parseInt(dataBlob.shift() ?? '0')
const removed = dataBlob.splice(0, removedCount)
changes.removed = removed as Ref<Doc>[]
} catch (err: any) {
ctx.warn('failed during processing of snapshot file, it will be skipped', { snapshot })
digestToRemove.add(snapshot)
modified = true
}
if (lmodified) {
modified = true
// Store changes without missing files
await writeChanges(storage, snapshot, changes)
}
} catch (err: any) {
digestToRemove.add(snapshot)
ctx.error('digest is broken, will do full backup for', { domain })
modified = true
}
}
d.snapshots = (d.snapshots ?? []).filter((it) => !digestToRemove.has(it))
}
ctx.end()
return modified
}
async function write (chunk: any, stream: Writable): Promise<void> {
let needDrain = false
@ -982,10 +1166,12 @@ export async function backup (
storageZip.pipe(sizePass)
_packClose = async () => {
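// Resolve only after the temporary pack file is fully written and closed, then tear down the gzip and tar streams.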
await new Promise<void>((resolve) => {
tempFile.on('close', () => {
resolve()
})
_pack?.finalize()
storageZip.destroy()
_pack?.destroy()
tempFile.destroy()
})
// We need to upload file to storage
ctx.info('Upload pack file', { storageFile, size: sz, workspace: workspaceId.name })
@ -1241,6 +1427,26 @@ export async function backupList (storage: BackupStorage): Promise<void> {
}
}
/**
* @public
*/
export async function backupRemoveLast (storage: BackupStorage, date: number): Promise<void> {
const infoFile = 'backup.json.gz'
if (!(await storage.exists(infoFile))) {
throw new Error(`${infoFile} should be present`)
}
const backupInfo: BackupInfo = JSON.parse(gunzipSync(await storage.loadFile(infoFile)).toString())
console.log('workspace:', backupInfo.workspace ?? '', backupInfo.version)
const old = backupInfo.snapshots.length
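// Keep only snapshots created before the given date; newer snapshots are dropped from the backup info.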
backupInfo.snapshots = backupInfo.snapshots.filter((it) => it.date < date)
if (old !== backupInfo.snapshots.length) {
console.log('removed snapshots:', old - backupInfo.snapshots.length)
await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }))
}
}
/**
* @public
*/
@ -1980,7 +2186,7 @@ export async function compactBackup (
chunks.push(chunk)
})
stream.on('end', () => {
const bf = Buffer.concat(chunks)
const bf = Buffer.concat(chunks as any)
const d = blobs.get(name)
if (d === undefined) {
blobs.set(name, { doc: undefined, buffer: bf })
@ -2031,12 +2237,16 @@ export async function compactBackup (
stream.resume() // just auto drain the stream
})
const unzip = createGunzip({ level: defaultLevel })
const endPromise = new Promise((resolve) => {
ex.on('finish', () => {
resolve(null)
})
unzip.on('error', (err) => {
ctx.error('error during processing', { snapshot, err })
resolve(null)
})
})
const unzip = createGunzip({ level: defaultLevel })
readStream.on('end', () => {
readStream.destroy()
@ -2111,3 +2321,53 @@ function migradeBlobData (blob: Blob, etag: string): string {
}
return ''
}
/**
* Checks backup integrity; when resources are missing or broken, the digest files are updated so the next backup will re-upload the missing parts.
* @public
*/
export async function checkBackupIntegrity (ctx: MeasureContext, storage: BackupStorage): Promise<void> {
console.log('starting backup integrity check')
try {
let backupInfo: BackupInfo
// Since version 0.6.2 the digest file format has changed
const infoFile = 'backup.json.gz'
if (await storage.exists(infoFile)) {
backupInfo = JSON.parse(gunzipSync(await storage.loadFile(infoFile)).toString())
} else {
console.log('No backup found')
return
}
if (backupInfo.version !== '0.6.2') {
console.log('Invalid backup version')
return
}
const domains: Domain[] = []
for (const sn of backupInfo.snapshots) {
for (const d of Object.keys(sn.domains)) {
if (!domains.includes(d as Domain)) {
domains.push(d as Domain)
}
}
}
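// Verify every domain found in any snapshot; verifyDigest prunes broken storage files and snapshot digests and reports whether anything was changed.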
let modified = false
for (const domain of domains) {
console.log('checking domain...', domain)
if (await verifyDigest(ctx, storage, backupInfo.snapshots, domain)) {
modified = true
}
}
if (modified) {
await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }))
}
} catch (err: any) {
console.error(err)
} finally {
console.log('end of integrity check')
}
}
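
A minimal usage sketch of the new integrity check outside the CLI (assumptions, not part of this change: the MeasureMetricsContext is constructed with just a name and empty params, and the backup directory already exists on disk):

import { MeasureMetricsContext } from '@hcengineering/core'
import { checkBackupIntegrity, createFileBackupStorage } from '@hcengineering/server-backup'

// Hypothetical helper for illustration only.
async function verifyLocalBackup (dirName: string): Promise<void> {
  const ctx = new MeasureMetricsContext('backup-check', {})
  const storage = await createFileBackupStorage(dirName)
  // Rewrites digest files in place when broken archives or snapshots are found,
  // so the next incremental backup re-uploads the affected documents.
  await checkBackupIntegrity(ctx, storage)
}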