fix: add limit and retries to move files tool (#6368)

Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
This commit is contained in:
Alexander Onnikov 2024-08-22 18:04:00 +07:00 committed by GitHub
parent df2a9b2708
commit 56ac8eb4f1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 60 additions and 10 deletions

View File

@ -1044,7 +1044,8 @@ export function devTool (
program
.command('move-files')
.option('-w, --workspace <workspace>', 'Selected workspace only', '')
.action(async (cmd: { workspace: string }) => {
.option('-bl, --blobLimit <blobLimit>', 'A blob size limit in megabytes (default 50mb)', '50')
.action(async (cmd: { workspace: string, blobLimit: string }) => {
const { mongodbUri } = prepareTools()
await withDatabase(mongodbUri, async (db, client) => {
await withStorage(mongodbUri, async (adapter) => {
@ -1063,7 +1064,7 @@ export function devTool (
}
const wsId = getWorkspaceId(workspace.workspace, productId)
await moveFiles(toolCtx, wsId, exAdapter)
await moveFiles(toolCtx, wsId, exAdapter, parseInt(cmd.blobLimit))
}
} catch (err: any) {
console.error(err)

View File

@ -13,14 +13,15 @@
// limitations under the License.
//
import { type MeasureContext, type WorkspaceId } from '@hcengineering/core'
import { type Blob, type MeasureContext, type WorkspaceId } from '@hcengineering/core'
import { type StorageAdapterEx } from '@hcengineering/server-core'
import { PassThrough } from 'stream'
export async function moveFiles (
ctx: MeasureContext,
workspaceId: WorkspaceId,
exAdapter: StorageAdapterEx
exAdapter: StorageAdapterEx,
blobSizeLimitMb: number
): Promise<void> {
if (exAdapter.adapters === undefined) return
@ -45,13 +46,20 @@ export async function moveFiles (
if (blob === undefined) continue
if (blob.provider === target) continue
if (blob.size > blobSizeLimitMb * 1024 * 1024) {
console.log('skipping large blob', name, data._id, Math.round(blob.size / 1024 / 1024))
continue
}
try {
const readable = await exAdapter.get(ctx, workspaceId, data._id)
readable.on('end', () => {
readable.destroy()
})
const stream = readable.pipe(new PassThrough())
await exAdapter.put(ctx, workspaceId, data._id, stream, blob.contentType, blob.size)
await retryOnFailure(
ctx,
5,
async () => {
await moveFile(ctx, exAdapter, workspaceId, blob)
},
50
)
} catch (err) {
console.error('failed to process blob', name, data._id, err)
}
@ -66,3 +74,44 @@ export async function moveFiles (
console.log('...done', workspaceId.name, count)
}
async function moveFile (
ctx: MeasureContext,
exAdapter: StorageAdapterEx,
workspaceId: WorkspaceId,
blob: Blob
): Promise<void> {
const readable = await exAdapter.get(ctx, workspaceId, blob._id)
try {
readable.on('end', () => {
readable.destroy()
})
const stream = readable.pipe(new PassThrough())
await exAdapter.put(ctx, workspaceId, blob._id, stream, blob.contentType, blob.size)
} catch (err) {
readable.destroy()
throw err
}
}
async function retryOnFailure<T> (
ctx: MeasureContext,
retries: number,
op: () => Promise<T>,
delay: number = 0
): Promise<T> {
let error: any
while (retries > 0) {
retries--
try {
return await op()
} catch (err: any) {
error = err
ctx.error('error', { err, retries })
if (retries !== 0 && delay > 0) {
await new Promise((resolve) => setTimeout(resolve, delay))
}
}
}
throw error
}