diff --git a/.gitignore b/.gitignore
index 3c21ad7f29..4def1db296 100644
--- a/.gitignore
+++ b/.gitignore
@@ -104,4 +104,5 @@ tests/profiles
 **/bundle/model.json
 .wrangler
 dump
-**/logs/**
\ No newline at end of file
+**/logs/**
+dev/tool/history.json
diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts
index bfd9cf5d72..47ea18eb54 100644
--- a/dev/tool/src/index.ts
+++ b/dev/tool/src/index.ts
@@ -947,25 +947,46 @@ export function devTool (
     .option('-c, --recheck', 'Force hash recheck on server', false)
     .option('-i, --include <include>', 'A list of ; separated domain names to include during backup', '*')
     .option('-s, --skip <skip>', 'A list of ; separated domain names to skip during backup', '')
+    .option('--use-storage <storage>', 'Use workspace storage adapter from env variable', '')
+    .option(
+      '--history-file <historyFile>',
+      'Store blob send info into a file. Will skip already sent documents.',
+      undefined
+    )
     .description('dump workspace transactions and minio resources')
     .action(
       async (
         dirName: string,
         workspace: string,
         date,
-        cmd: { merge: boolean, parallel: string, recheck: boolean, include: string, skip: string }
+        cmd: {
+          merge: boolean
+          parallel: string
+          recheck: boolean
+          include: string
+          skip: string
+          useStorage: string
+          historyFile: string
+        }
       ) => {
         const storage = await createFileBackupStorage(dirName)
         const wsid = getWorkspaceId(workspace)
         const endpoint = await getTransactorEndpoint(generateToken(systemAccountEmail, wsid), 'external')
+        const storageConfig = cmd.useStorage !== '' ? storageConfigFromEnv(process.env[cmd.useStorage]) : undefined
+
+        const workspaceStorage: StorageAdapter | undefined =
+          storageConfig !== undefined ? buildStorageFromConfig(storageConfig) : undefined
         await restore(toolCtx, endpoint, wsid, storage, {
           date: parseInt(date ?? '-1'),
           merge: cmd.merge,
           parallel: parseInt(cmd.parallel ?? '1'),
           recheck: cmd.recheck,
           include: cmd.include === '*' ? undefined : new Set(cmd.include.split(';')),
-          skip: new Set(cmd.skip.split(';'))
+          skip: new Set(cmd.skip.split(';')),
+          storageAdapter: workspaceStorage,
+          historyFile: cmd.historyFile
         })
+        await workspaceStorage?.close()
       }
     )
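The two new `restore` flags above are plain plumbing: `--use-storage` names an environment variable whose value describes the blob store to restore into, and `--history-file` points at a JSON checkpoint file (hence the new `dev/tool/history.json` entry in `.gitignore`). A minimal sketch of the env-variable indirection follows; `parseStorageEnv` and the `StorageConfig` shape are hypothetical stand-ins, not the actual `storageConfigFromEnv`/`buildStorageFromConfig` contract:

```ts
// Hypothetical sketch only: `StorageConfig` and `parseStorageEnv` are
// illustrative stand-ins for the real storage config helpers.
interface StorageConfig {
  kind: string // e.g. 'minio' or 's3' (assumed encoding)
  endpoint: string
}

function parseStorageEnv (envName: string): StorageConfig | undefined {
  if (envName === '') return undefined // flag not passed, keep default behavior
  const raw = process.env[envName]
  if (raw === undefined || raw === '') {
    throw new Error(`environment variable '${envName}' is not set`)
  }
  // Assume a 'kind|endpoint' encoding, e.g. 'minio|127.0.0.1:9000'.
  const [kind, endpoint] = raw.split('|')
  return { kind, endpoint }
}
```

Closing the adapter with `await workspaceStorage?.close()` after the restore matters because storage clients can hold open connections that would otherwise keep the CLI process alive. The server-side half of the change, in `server/backup/src/backup.ts`, follows.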
diff --git a/server/backup/src/backup.ts b/server/backup/src/backup.ts
index cdd487c105..b58b620679 100644
--- a/server/backup/src/backup.ts
+++ b/server/backup/src/backup.ts
@@ -47,7 +47,15 @@ import { estimateDocSize, type StorageAdapter } from '@hcengineering/server-core'
 import { generateToken } from '@hcengineering/server-token'
 import { connect } from '@hcengineering/server-tool'
 import { deepEqual } from 'fast-equals'
-import { createReadStream, createWriteStream, existsSync, mkdirSync, statSync } from 'node:fs'
+import {
+  createReadStream,
+  createWriteStream,
+  existsSync,
+  mkdirSync,
+  readFileSync,
+  statSync,
+  writeFileSync
+} from 'node:fs'
 import { rm } from 'node:fs/promises'
 import { basename, dirname } from 'node:path'
 import { PassThrough } from 'node:stream'
@@ -143,7 +151,7 @@ async function loadDigest (
   // Load old JSON snapshot
   if (d?.snapshot !== undefined) {
-    const dChanges: SnapshotV6 = JSON.parse(gunzipSync(await storage.loadFile(d.snapshot)).toString())
+    const dChanges: SnapshotV6 = JSON.parse(gunzipSync((await storage.loadFile(d.snapshot)) as any).toString())
     for (const [k, v] of Object.entries(dChanges.added)) {
       result.set(k as Ref<Doc>, v)
     }
@@ -156,7 +164,7 @@
   }
   for (const snapshot of d?.snapshots ?? []) {
     try {
-      const dataBlob = gunzipSync(await storage.loadFile(snapshot))
+      const dataBlob = gunzipSync((await storage.loadFile(snapshot)) as any)
        .toString()
        .split('\n')
      const addedCount = parseInt(dataBlob.shift() ?? '0')
@@ -1653,6 +1661,7 @@ export async function restore (
     token?: string
     progress?: (progress: number) => Promise<void>
     cleanIndexState?: boolean
+    historyFile?: string
   }
 ): Promise<void> {
   const infoFile = 'backup.json.gz'
@@ -1661,7 +1670,7 @@
     ctx.error('file not pressent', { file: infoFile })
     throw new Error(`${infoFile} should present to restore`)
   }
-  const backupInfo: BackupInfo = JSON.parse(gunzipSync(await storage.loadFile(infoFile)).toString())
+  const backupInfo: BackupInfo = JSON.parse(gunzipSync((await storage.loadFile(infoFile)) as any).toString())
   let snapshots = backupInfo.snapshots
   if (opt.date !== -1) {
     const bk = backupInfo.snapshots.findIndex((it) => it.date === opt.date)
@@ -1684,6 +1693,11 @@
   ctx.info('connecting:', { transactorUrl, workspace: workspaceId.name })

+  const historyFile: Record<string, string> =
+    opt.historyFile !== undefined && existsSync(opt.historyFile)
+      ? JSON.parse(readFileSync(opt.historyFile).toString())
+      : {}
+
   const token =
     opt.token ??
     generateToken(systemAccountEmail, workspaceId, {
@@ -1883,14 +1897,59 @@
       }
       printUploaded('upload', len)
     }
+    let processed = 0
+    const blobUploader = new RateLimiter(10)
+
     for (const s of rsnapshots) {
       const d = s.domains[c]
       if (d !== undefined && docsToAdd.size > 0) {
         const sDigest = await loadDigest(ctx, storage, [s], c)
         const requiredDocs = new Map(Array.from(sDigest.entries()).filter(([it]) => docsToAdd.has(it)))
+
+        let lastSendTime = Date.now()
+        async function sendBlob (blob: Blob, data: Buffer, next: () => void): Promise<void> {
+          await blobUploader.add(async () => {
+            next()
+            let needSend = true
+            if (opt.historyFile !== undefined) {
+              if (historyFile[blob._id] === blob.etag) {
+                needSend = false
+              }
+            }
+
+            if (needSend) {
+              try {
+                await blobClient.upload(ctx, blob._id, blob.size, blob.contentType, data)
+                if (opt.historyFile !== undefined) {
+                  historyFile[blob._id] = blob.etag
+                  if (totalSend % 1000 === 0) {
+                    writeFileSync(opt.historyFile, JSON.stringify(historyFile, undefined, 2))
+                  }
+                }
+              } catch (err: any) {
+                ctx.warn('failed to upload blob', { _id: blob._id, err, workspace: workspaceId.name })
+              }
+            }
+            docsToAdd.delete(blob._id)
+            requiredDocs.delete(blob._id)
+            printUploaded('upload:' + blobUploader.processingQueue.size, data.length)
+            totalSend++
+            if (lastSendTime < Date.now()) {
+              lastSendTime = Date.now() + 2500
+
+              ctx.info('upload ' + c, {
+                totalSend,
+                from: docsToAdd.size + totalSend,
+                sendSize,
+                workspace: workspaceId.name
+              })
+            }
+          })
+        }
+
         if (requiredDocs.size > 0) {
           ctx.info('updating', { domain: c, requiredDocs: requiredDocs.size, workspace: workspaceId.name })
           // We have required documents here.
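The core of the change is `sendBlob`: uploads go through a `RateLimiter` with a budget of 10 concurrent requests, and when `--history-file` is set, a blob whose etag already matches the history map is skipped entirely. The map is flushed to disk every 1000 uploads, so an interrupted restore re-sends at most the blobs since the last checkpoint. Below is a self-contained sketch of that bookkeeping; the `loadHistory`/`shouldUpload`/`markUploaded` helpers are hypothetical (the diff inlines this logic rather than factoring it out):

```ts
// Sketch of the history-file bookkeeping inlined in sendBlob above.
// Assumption: the file is a flat JSON object mapping blob id -> etag.
import { existsSync, readFileSync, writeFileSync } from 'node:fs'

type History = Record<string, string>

function loadHistory (path: string): History {
  // Mirrors restore(): start empty when no history file exists yet.
  return existsSync(path) ? JSON.parse(readFileSync(path).toString()) : {}
}

function shouldUpload (history: History, id: string, etag: string): boolean {
  // An etag match means the exact same content was already sent.
  return history[id] !== etag
}

function markUploaded (history: History, path: string, id: string, etag: string, totalSend: number): void {
  history[id] = etag
  // Checkpoint every 1000 uploads, matching the cadence in the diff;
  // blobs sent after the last checkpoint are simply re-sent next run.
  if (totalSend % 1000 === 0) {
    writeFileSync(path, JSON.stringify(history, undefined, 2))
  }
}

// Usage shape (mirroring restore()):
//   const history = loadHistory('dev/tool/history.json')
//   if (shouldUpload(history, blob._id, blob.etag)) { /* upload */ markUploaded(...) }
```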
@@ -1920,19 +1979,13 @@
                 next()
               } else {
                 blobs.delete(name)
-                const doc = d?.doc as Blob
-                ;(doc as any)['%hash%'] = changeset.get(doc._id)
-                let sz = doc.size
+                const blob = d?.doc as Blob
+                ;(blob as any)['%hash%'] = changeset.get(blob._id)
+                let sz = blob.size
                 if (Number.isNaN(sz) || sz !== bf.length) {
                   sz = bf.length
                 }
-                void blobClient.upload(ctx, doc._id, sz, doc.contentType, bf).then(() => {
-                  void sendChunk(doc, bf.length).finally(() => {
-                    requiredDocs.delete(doc._id)
-                    printUploaded('upload', bf.length)
-                    next()
-                  })
-                })
+                void sendBlob(blob, bf, next)
               }
             })
           } else if (name.endsWith('.json') && requiredDocs.has(name.substring(0, name.length - 5) as Ref<Doc>)) {
@@ -1953,22 +2006,7 @@
           } else {
             blobs.delete(bname)
             const blob = doc as Blob
-            void blobClient
-              .upload(
-                ctx,
-                blob._id,
-                blob.size,
-                blob.contentType,
-                d instanceof Buffer ? d : (d.buffer as Buffer)
-              )
-              .then(() => {
-                ;(doc as any)['%hash%'] = changeset.get(doc._id)
-                void sendChunk(doc, bf.length).finally(() => {
-                  requiredDocs.delete(doc._id)
-                  next()
-                  printUploaded('upload', bf.length)
-                })
-              })
+            void sendBlob(blob, d instanceof Buffer ? d : (d.buffer as Buffer), next)
           }
         } else {
           ;(doc as any)['%hash%'] = changeset.get(doc._id)
@@ -1984,6 +2022,8 @@
         stream.resume() // just auto drain the stream
       })

+      await blobUploader.waitProcessing()
+
       const endPromise = new Promise((resolve) => {
         ex.on('finish', () => {
           resolve(null)
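`await blobUploader.waitProcessing()` is the other half of the contract: the tar-entry callbacks hand blobs to `sendBlob` without awaiting the upload (calling `next()` early so the stream keeps flowing), so the restore must drain the limiter before treating the domain as done. The real `RateLimiter` ships with the platform core and its import is outside the shown hunks; the toy below only illustrates the contract the diff relies on (`add` applies back-pressure at the concurrency limit, `processingQueue` exposes in-flight work, `waitProcessing` resolves once everything settles):

```ts
// Toy stand-in for the platform RateLimiter; the method names match how the
// diff uses it, but these internals are assumptions for illustration.
class ToyRateLimiter {
  readonly processingQueue = new Map<number, Promise<void>>()
  private idx = 0
  private readonly waiters: Array<() => void> = []

  constructor (private readonly limit: number) {}

  async add (op: () => Promise<void>): Promise<void> {
    // Back-pressure: add() resolves only once a slot is free, so the
    // producer (the tar stream) cannot race arbitrarily far ahead of uploads.
    while (this.processingQueue.size >= this.limit) {
      await new Promise<void>((resolve) => this.waiters.push(resolve))
    }
    const id = this.idx++
    const task = op()
      .catch((err) => {
        // sendBlob catches upload errors itself; log anything unexpected.
        console.error('task failed', err)
      })
      .finally(() => {
        this.processingQueue.delete(id)
        this.waiters.shift()?.()
      })
    this.processingQueue.set(id, task)
  }

  async waitProcessing (): Promise<void> {
    // Drain all in-flight work, as restore() does before finishing a domain.
    while (this.processingQueue.size > 0) {
      await Promise.all(this.processingQueue.values())
    }
  }
}
```

With a budget of 10, blob uploads overlap network latency without flooding the transactor, and the final `waitProcessing()` guarantees no upload is still in flight when the domain's restore is reported complete.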