UBERF-7286: Backup retry (#5830)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-06-15 22:41:52 +07:00 committed by GitHub
parent 11904d4138
commit e44a8a4f72
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 58 additions and 21 deletions

View File

@ -862,7 +862,7 @@ export function devTool (
productId
})
const buffer = readFileSync(local)
await blobClient.upload(remote, buffer.length, contentType, buffer)
await blobClient.upload(toolCtx, remote, buffer.length, contentType, buffer)
})
})

View File

@ -359,7 +359,8 @@ export async function cloneWorkspace (
const blob = d as Blob
const blobs: Buffer[] = []
try {
await blobClientSource.writeTo(new MeasureMetricsContext('upload', {}), blob._id, blob.size, {
const ctx = new MeasureMetricsContext('upload', {})
await blobClientSource.writeTo(ctx, blob._id, blob.size, {
write: (b, cb) => {
blobs.push(b)
cb()
@ -369,7 +370,7 @@ export async function cloneWorkspace (
}
})
await blobClientTarget.upload(blob._id, blob.size, blob.contentType, Buffer.concat(blobs))
await blobClientTarget.upload(ctx, blob._id, blob.size, blob.contentType, Buffer.concat(blobs))
} catch (err: any) {
console.error(err)
}
@ -928,6 +929,22 @@ export async function restore (
domains.add(d)
}
let uploadedMb = 0
let uploaded = 0
const printUploaded = (msg: string, size: number): void => {
uploaded += size
const newDownloadedMb = Math.round(uploaded / (1024 * 1024))
const newId = Math.round(newDownloadedMb / 10)
if (uploadedMb !== newId) {
uploadedMb = newId
ctx.info('Uploaded', {
msg,
written: newDownloadedMb
})
}
}
async function processDomain (c: Domain): Promise<void> {
const changeset = await loadDigest(ctx, storage, snapshots, c, opt.date)
// We need to load full changeset from server
@ -986,6 +1003,7 @@ export async function restore (
docs.push(doc)
}
sendSize = sendSize + len
if (sendSize > dataUploadSize || (doc === undefined && docs.length > 0)) {
console.log('upload', docs.length, `send: ${totalSend} from ${docsToAdd.size + totalSend}`, 'size:', sendSize)
totalSend += docs.length
@ -993,6 +1011,7 @@ export async function restore (
docs.length = 0
sendSize = 0
}
printUploaded('upload', len)
}
let processed = 0
@ -1033,9 +1052,10 @@ export async function restore (
blobs.delete(name)
const doc = d?.doc as Blob
;(doc as any)['%hash%'] = changeset.get(doc._id)
void blobClient.upload(doc._id, doc.size, doc.contentType, bf).then(() => {
void blobClient.upload(ctx, doc._id, doc.size, doc.contentType, bf).then(() => {
void sendChunk(doc, bf.length).finally(() => {
requiredDocs.delete(doc._id)
printUploaded('upload', bf.length)
next()
})
})
@ -1059,13 +1079,16 @@ export async function restore (
} else {
blobs.delete(bname)
const blob = doc as Blob
void blobClient.upload(blob._id, blob.size, blob.contentType, d.buffer as Buffer).then(() => {
;(doc as any)['%hash%'] = changeset.get(doc._id)
void sendChunk(doc, bf.length).finally(() => {
requiredDocs.delete(doc._id)
next()
void blobClient
.upload(ctx, blob._id, blob.size, blob.contentType, d.buffer as Buffer)
.then(() => {
;(doc as any)['%hash%'] = changeset.get(doc._id)
void sendChunk(doc, bf.length).finally(() => {
requiredDocs.delete(doc._id)
next()
printUploaded('upload', bf.length)
})
})
})
}
} else {
;(doc as any)['%hash%'] = changeset.get(doc._id)

View File

@ -218,17 +218,31 @@ export class BlobClient {
})
}
async upload (name: string, size: number, contentType: string, buffer: Buffer): Promise<void> {
await fetch(
this.transactorAPIUrl + `?name=${encodeURIComponent(name)}&contentType=${encodeURIComponent(contentType)}`,
{
method: 'PUT',
headers: {
Authorization: 'Bearer ' + this.token,
'Content-Type': 'application/octet-stream'
},
body: buffer
async upload (ctx: MeasureContext, name: string, size: number, contentType: string, buffer: Buffer): Promise<void> {
// TODO: We need to improve this logig, to allow restore of huge blobs
for (let i = 0; i < 5; i++) {
try {
await fetch(
this.transactorAPIUrl + `?name=${encodeURIComponent(name)}&contentType=${encodeURIComponent(contentType)}`,
{
method: 'PUT',
headers: {
Authorization: 'Bearer ' + this.token,
'Content-Type': 'application/octet-stream'
},
body: buffer
}
)
break
} catch (err: any) {
if (i === 4) {
ctx.error('failed to upload file', { name })
throw err
}
await new Promise<void>((resolve) => {
setTimeout(resolve, 500)
})
}
)
}
}
}