From e44a8a4f7280cb15c4abed270efcbb42337e8373 Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Sat, 15 Jun 2024 22:41:52 +0700 Subject: [PATCH] UBERF-7286: Backup retry (#5830) Signed-off-by: Andrey Sobolev --- dev/tool/src/index.ts | 2 +- server/backup/src/backup.ts | 41 +++++++++++++++++++++++++++++-------- server/tool/src/connect.ts | 36 ++++++++++++++++++++++---------- 3 files changed, 58 insertions(+), 21 deletions(-) diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index cd599c4f01..27833716d5 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -862,7 +862,7 @@ export function devTool ( productId }) const buffer = readFileSync(local) - await blobClient.upload(remote, buffer.length, contentType, buffer) + await blobClient.upload(toolCtx, remote, buffer.length, contentType, buffer) }) }) diff --git a/server/backup/src/backup.ts b/server/backup/src/backup.ts index d7580cc5cc..8fffc1f1e2 100644 --- a/server/backup/src/backup.ts +++ b/server/backup/src/backup.ts @@ -359,7 +359,8 @@ export async function cloneWorkspace ( const blob = d as Blob const blobs: Buffer[] = [] try { - await blobClientSource.writeTo(new MeasureMetricsContext('upload', {}), blob._id, blob.size, { + const ctx = new MeasureMetricsContext('upload', {}) + await blobClientSource.writeTo(ctx, blob._id, blob.size, { write: (b, cb) => { blobs.push(b) cb() @@ -369,7 +370,7 @@ export async function cloneWorkspace ( } }) - await blobClientTarget.upload(blob._id, blob.size, blob.contentType, Buffer.concat(blobs)) + await blobClientTarget.upload(ctx, blob._id, blob.size, blob.contentType, Buffer.concat(blobs)) } catch (err: any) { console.error(err) } @@ -928,6 +929,22 @@ export async function restore ( domains.add(d) } + let uploadedMb = 0 + let uploaded = 0 + + const printUploaded = (msg: string, size: number): void => { + uploaded += size + const newDownloadedMb = Math.round(uploaded / (1024 * 1024)) + const newId = Math.round(newDownloadedMb / 10) + if (uploadedMb !== 
newId) { + uploadedMb = newId + ctx.info('Uploaded', { + msg, + written: newDownloadedMb + }) + } + } + async function processDomain (c: Domain): Promise { const changeset = await loadDigest(ctx, storage, snapshots, c, opt.date) // We need to load full changeset from server @@ -986,6 +1003,7 @@ export async function restore ( docs.push(doc) } sendSize = sendSize + len + if (sendSize > dataUploadSize || (doc === undefined && docs.length > 0)) { console.log('upload', docs.length, `send: ${totalSend} from ${docsToAdd.size + totalSend}`, 'size:', sendSize) totalSend += docs.length @@ -993,6 +1011,7 @@ export async function restore ( docs.length = 0 sendSize = 0 } + printUploaded('upload', len) } let processed = 0 @@ -1033,9 +1052,10 @@ export async function restore ( blobs.delete(name) const doc = d?.doc as Blob ;(doc as any)['%hash%'] = changeset.get(doc._id) - void blobClient.upload(doc._id, doc.size, doc.contentType, bf).then(() => { + void blobClient.upload(ctx, doc._id, doc.size, doc.contentType, bf).then(() => { void sendChunk(doc, bf.length).finally(() => { requiredDocs.delete(doc._id) + printUploaded('upload', bf.length) next() }) }) @@ -1059,13 +1079,16 @@ export async function restore ( } else { blobs.delete(bname) const blob = doc as Blob - void blobClient.upload(blob._id, blob.size, blob.contentType, d.buffer as Buffer).then(() => { - ;(doc as any)['%hash%'] = changeset.get(doc._id) - void sendChunk(doc, bf.length).finally(() => { - requiredDocs.delete(doc._id) - next() + void blobClient + .upload(ctx, blob._id, blob.size, blob.contentType, d.buffer as Buffer) + .then(() => { + ;(doc as any)['%hash%'] = changeset.get(doc._id) + void sendChunk(doc, bf.length).finally(() => { + requiredDocs.delete(doc._id) + next() + printUploaded('upload', bf.length) + }) }) - }) } } else { ;(doc as any)['%hash%'] = changeset.get(doc._id) diff --git a/server/tool/src/connect.ts b/server/tool/src/connect.ts index 46f6d19ac3..6a9fe6851e 100644 --- a/server/tool/src/connect.ts 
+++ b/server/tool/src/connect.ts @@ -218,17 +218,31 @@ export class BlobClient { }) } - async upload (name: string, size: number, contentType: string, buffer: Buffer): Promise { - await fetch( - this.transactorAPIUrl + `?name=${encodeURIComponent(name)}&contentType=${encodeURIComponent(contentType)}`, - { - method: 'PUT', - headers: { - Authorization: 'Bearer ' + this.token, - 'Content-Type': 'application/octet-stream' - }, - body: buffer + async upload (ctx: MeasureContext, name: string, size: number, contentType: string, buffer: Buffer): Promise { + // TODO: We need to improve this logic, to allow restore of huge blobs + for (let i = 0; i < 5; i++) { + try { + await fetch( + this.transactorAPIUrl + `?name=${encodeURIComponent(name)}&contentType=${encodeURIComponent(contentType)}`, + { + method: 'PUT', + headers: { + Authorization: 'Bearer ' + this.token, + 'Content-Type': 'application/octet-stream' + }, + body: buffer + } + ) + break + } catch (err: any) { + if (i === 4) { + ctx.error('failed to upload file', { name }) + throw err + } + await new Promise((resolve) => { + setTimeout(resolve, 500) + }) } - ) + } } }