diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index f0bb2ba9dd..7069995c1f 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -1044,7 +1044,8 @@ export function devTool ( program .command('move-files') .option('-w, --workspace ', 'Selected workspace only', '') - .action(async (cmd: { workspace: string }) => { + .option('-bl, --blobLimit ', 'A blob size limit in megabytes (default 50mb)', '50') + .action(async (cmd: { workspace: string, blobLimit: string }) => { const { mongodbUri } = prepareTools() await withDatabase(mongodbUri, async (db, client) => { await withStorage(mongodbUri, async (adapter) => { @@ -1063,7 +1064,7 @@ export function devTool ( } const wsId = getWorkspaceId(workspace.workspace, productId) - await moveFiles(toolCtx, wsId, exAdapter) + await moveFiles(toolCtx, wsId, exAdapter, parseInt(cmd.blobLimit)) } } catch (err: any) { console.error(err) diff --git a/dev/tool/src/storage.ts b/dev/tool/src/storage.ts index 6231c8ca36..f091549a37 100644 --- a/dev/tool/src/storage.ts +++ b/dev/tool/src/storage.ts @@ -13,14 +13,15 @@ // limitations under the License. // -import { type MeasureContext, type WorkspaceId } from '@hcengineering/core' +import { type Blob, type MeasureContext, type WorkspaceId } from '@hcengineering/core' import { type StorageAdapterEx } from '@hcengineering/server-core' import { PassThrough } from 'stream' export async function moveFiles ( ctx: MeasureContext, workspaceId: WorkspaceId, - exAdapter: StorageAdapterEx + exAdapter: StorageAdapterEx, + blobSizeLimitMb: number ): Promise { if (exAdapter.adapters === undefined) return @@ -45,13 +46,20 @@ export async function moveFiles ( if (blob === undefined) continue if (blob.provider === target) continue + if (blob.size > blobSizeLimitMb * 1024 * 1024) { + console.log('skipping large blob', name, data._id, Math.round(blob.size / 1024 / 1024)) + continue + } + try { - const readable = await exAdapter.get(ctx, workspaceId, data._id) - readable.on('end', () => { - readable.destroy() - }) - const stream = readable.pipe(new PassThrough()) - await exAdapter.put(ctx, workspaceId, data._id, stream, blob.contentType, blob.size) + await retryOnFailure( + ctx, + 5, + async () => { + await moveFile(ctx, exAdapter, workspaceId, blob) + }, + 50 + ) } catch (err) { console.error('failed to process blob', name, data._id, err) } @@ -66,3 +74,44 @@ export async function moveFiles ( console.log('...done', workspaceId.name, count) } + +async function moveFile ( + ctx: MeasureContext, + exAdapter: StorageAdapterEx, + workspaceId: WorkspaceId, + blob: Blob +): Promise { + const readable = await exAdapter.get(ctx, workspaceId, blob._id) + try { + readable.on('end', () => { + readable.destroy() + }) + const stream = readable.pipe(new PassThrough()) + await exAdapter.put(ctx, workspaceId, blob._id, stream, blob.contentType, blob.size) + } catch (err) { + readable.destroy() + throw err + } +} + +async function retryOnFailure ( + ctx: MeasureContext, + retries: number, + op: () => Promise, + delay: number = 0 +): Promise { + let error: any + while (retries > 0) { + retries-- + try { + return await op() + } catch (err: any) { + error = err + ctx.error('error', { err, retries }) + if (retries !== 0 && delay > 0) { + await new Promise((resolve) => setTimeout(resolve, delay)) + } + } + } + throw error +}