UBERF-8957: Allow to use storage in backup-restore via tool (#7491)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-12-17 16:46:38 +07:00 committed by GitHub
parent 898395d83d
commit 0f4d461689
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 95 additions and 33 deletions

3
.gitignore vendored
View File

@ -104,4 +104,5 @@ tests/profiles
**/bundle/model.json
.wrangler
dump
**/logs/**
**/logs/**
dev/tool/history.json

View File

@ -947,25 +947,46 @@ export function devTool (
.option('-c, --recheck', 'Force hash recheck on server', false)
.option('-i, --include <include>', 'A list of ; separated domain names to include during backup', '*')
.option('-s, --skip <skip>', 'A list of ; separated domain names to skip during backup', '')
.option('--use-storage <useStorage>', 'Use workspace storage adapter from env variable', '')
.option(
'--history-file <historyFile>',
'Store blob send info into file. Will skip already send documents.',
undefined
)
.description('dump workspace transactions and minio resources')
.action(
async (
dirName: string,
workspace: string,
date,
cmd: { merge: boolean, parallel: string, recheck: boolean, include: string, skip: string }
cmd: {
merge: boolean
parallel: string
recheck: boolean
include: string
skip: string
useStorage: string
historyFile: string
}
) => {
const storage = await createFileBackupStorage(dirName)
const wsid = getWorkspaceId(workspace)
const endpoint = await getTransactorEndpoint(generateToken(systemAccountEmail, wsid), 'external')
const storageConfig = cmd.useStorage !== '' ? storageConfigFromEnv(process.env[cmd.useStorage]) : undefined
const workspaceStorage: StorageAdapter | undefined =
storageConfig !== undefined ? buildStorageFromConfig(storageConfig) : undefined
await restore(toolCtx, endpoint, wsid, storage, {
date: parseInt(date ?? '-1'),
merge: cmd.merge,
parallel: parseInt(cmd.parallel ?? '1'),
recheck: cmd.recheck,
include: cmd.include === '*' ? undefined : new Set(cmd.include.split(';')),
skip: new Set(cmd.skip.split(';'))
skip: new Set(cmd.skip.split(';')),
storageAdapter: workspaceStorage,
historyFile: cmd.historyFile
})
await workspaceStorage?.close()
}
)

View File

@ -47,7 +47,15 @@ import { estimateDocSize, type StorageAdapter } from '@hcengineering/server-core
import { generateToken } from '@hcengineering/server-token'
import { connect } from '@hcengineering/server-tool'
import { deepEqual } from 'fast-equals'
import { createReadStream, createWriteStream, existsSync, mkdirSync, statSync } from 'node:fs'
import {
createReadStream,
createWriteStream,
existsSync,
mkdirSync,
readFileSync,
statSync,
writeFileSync
} from 'node:fs'
import { rm } from 'node:fs/promises'
import { basename, dirname } from 'node:path'
import { PassThrough } from 'node:stream'
@ -143,7 +151,7 @@ async function loadDigest (
// Load old JSON snapshot
if (d?.snapshot !== undefined) {
const dChanges: SnapshotV6 = JSON.parse(gunzipSync(await storage.loadFile(d.snapshot)).toString())
const dChanges: SnapshotV6 = JSON.parse(gunzipSync((await storage.loadFile(d.snapshot)) as any).toString())
for (const [k, v] of Object.entries(dChanges.added)) {
result.set(k as Ref<Doc>, v)
}
@ -156,7 +164,7 @@ async function loadDigest (
}
for (const snapshot of d?.snapshots ?? []) {
try {
const dataBlob = gunzipSync(await storage.loadFile(snapshot))
const dataBlob = gunzipSync((await storage.loadFile(snapshot)) as any)
.toString()
.split('\n')
const addedCount = parseInt(dataBlob.shift() ?? '0')
@ -1653,6 +1661,7 @@ export async function restore (
token?: string
progress?: (progress: number) => Promise<void>
cleanIndexState?: boolean
historyFile?: string
}
): Promise<boolean> {
const infoFile = 'backup.json.gz'
@ -1661,7 +1670,7 @@ export async function restore (
ctx.error('file not pressent', { file: infoFile })
throw new Error(`${infoFile} should present to restore`)
}
const backupInfo: BackupInfo = JSON.parse(gunzipSync(await storage.loadFile(infoFile)).toString())
const backupInfo: BackupInfo = JSON.parse(gunzipSync((await storage.loadFile(infoFile)) as any).toString())
let snapshots = backupInfo.snapshots
if (opt.date !== -1) {
const bk = backupInfo.snapshots.findIndex((it) => it.date === opt.date)
@ -1684,6 +1693,11 @@ export async function restore (
ctx.info('connecting:', { transactorUrl, workspace: workspaceId.name })
const historyFile: Record<string, string> =
opt.historyFile !== undefined && existsSync(opt.historyFile)
? JSON.parse(readFileSync(opt.historyFile).toString())
: {}
const token =
opt.token ??
generateToken(systemAccountEmail, workspaceId, {
@ -1883,14 +1897,59 @@ export async function restore (
}
printUploaded('upload', len)
}
let processed = 0
const blobUploader = new RateLimiter(10)
for (const s of rsnapshots) {
const d = s.domains[c]
if (d !== undefined && docsToAdd.size > 0) {
const sDigest = await loadDigest(ctx, storage, [s], c)
const requiredDocs = new Map(Array.from(sDigest.entries()).filter(([it]) => docsToAdd.has(it)))
let lastSendTime = Date.now()
// Queue one blob for upload through the shared rate limiter.
// `next()` is invoked as soon as a limiter slot is acquired (not after the
// upload finishes) so the surrounding stream can keep producing entries while
// uploads run concurrently.
// Closure state used (declared in the enclosing restore scope):
//   blobUploader  — RateLimiter bounding concurrent uploads
//   historyFile   — id→etag map of blobs already sent (when opt.historyFile is set)
//   docsToAdd / requiredDocs — bookkeeping sets pruned as each blob completes
//   totalSend / lastSendTime — progress counters for throttled logging
async function sendBlob (blob: Blob, data: Buffer, next: () => void): Promise<void> {
await blobUploader.add(async () => {
// Release the producer immediately; the upload itself proceeds inside the limiter slot.
next()
let needSend = true
if (opt.historyFile !== undefined) {
// Skip blobs whose etag matches what the history file says was already sent.
if (historyFile[blob._id] === blob.etag) {
needSend = false
}
}
if (needSend) {
try {
await blobClient.upload(ctx, blob._id, blob.size, blob.contentType, data)
if (opt.historyFile !== undefined) {
// Record the successful send; persist the history map once per 1000 sends
// (checked against totalSend before it is incremented below) to limit disk writes.
historyFile[blob._id] = blob.etag
if (totalSend % 1000 === 0) {
writeFileSync(opt.historyFile, JSON.stringify(historyFile, undefined, 2))
}
}
} catch (err: any) {
// Best-effort: a failed blob upload is logged and skipped, not fatal to the restore.
ctx.warn('failed to upload blob', { _id: blob._id, err, workspace: workspaceId.name })
}
}
// Mark this blob as handled whether it was sent or skipped via history.
docsToAdd.delete(blob._id)
requiredDocs.delete(blob._id)
printUploaded('upload:' + blobUploader.processingQueue.size, data.length)
totalSend++
// Throttle progress logging to at most once per 2.5 seconds.
if (lastSendTime < Date.now()) {
lastSendTime = Date.now() + 2500
ctx.info('upload ' + c, {
totalSend,
from: docsToAdd.size + totalSend,
sendSize,
workspace: workspaceId.name
})
}
})
}
if (requiredDocs.size > 0) {
ctx.info('updating', { domain: c, requiredDocs: requiredDocs.size, workspace: workspaceId.name })
// We have required documents here.
@ -1920,19 +1979,13 @@ export async function restore (
next()
} else {
blobs.delete(name)
const doc = d?.doc as Blob
;(doc as any)['%hash%'] = changeset.get(doc._id)
let sz = doc.size
const blob = d?.doc as Blob
;(blob as any)['%hash%'] = changeset.get(blob._id)
let sz = blob.size
if (Number.isNaN(sz) || sz !== bf.length) {
sz = bf.length
}
void blobClient.upload(ctx, doc._id, sz, doc.contentType, bf).then(() => {
void sendChunk(doc, bf.length).finally(() => {
requiredDocs.delete(doc._id)
printUploaded('upload', bf.length)
next()
})
})
void sendBlob(blob, bf, next)
}
})
} else if (name.endsWith('.json') && requiredDocs.has(name.substring(0, name.length - 5) as Ref<Doc>)) {
@ -1953,22 +2006,7 @@ export async function restore (
} else {
blobs.delete(bname)
const blob = doc as Blob
void blobClient
.upload(
ctx,
blob._id,
blob.size,
blob.contentType,
d instanceof Buffer ? d : (d.buffer as Buffer)
)
.then(() => {
;(doc as any)['%hash%'] = changeset.get(doc._id)
void sendChunk(doc, bf.length).finally(() => {
requiredDocs.delete(doc._id)
next()
printUploaded('upload', bf.length)
})
})
void sendBlob(blob, d instanceof Buffer ? d : (d.buffer as Buffer), next)
}
} else {
;(doc as any)['%hash%'] = changeset.get(doc._id)
@ -1984,6 +2022,8 @@ export async function restore (
stream.resume() // just auto drain the stream
})
await blobUploader.waitProcessing()
const endPromise = new Promise((resolve) => {
ex.on('finish', () => {
resolve(null)