diff --git a/dev/tool/src/storage.ts b/dev/tool/src/storage.ts index 7cd974d9a7..394cc4729f 100644 --- a/dev/tool/src/storage.ts +++ b/dev/tool/src/storage.ts @@ -14,9 +14,9 @@ // import { type Attachment } from '@hcengineering/attachment' -import { type Blob, type MeasureContext, type WorkspaceId, RateLimiter } from '@hcengineering/core' +import { type Blob, type MeasureContext, type Ref, type WorkspaceId, RateLimiter } from '@hcengineering/core' import { DOMAIN_ATTACHMENT } from '@hcengineering/model-attachment' -import { type StorageAdapter, type StorageAdapterEx } from '@hcengineering/server-core' +import { type ListBlobResult, type StorageAdapter, type StorageAdapterEx } from '@hcengineering/server-core' import { type Db } from 'mongodb' import { PassThrough } from 'stream' @@ -143,6 +143,20 @@ async function processAdapter ( const iterator = await source.listStream(ctx, workspaceId) + const targetIterator = await target.listStream(ctx, workspaceId) + + const targetBlobs = new Map, ListBlobResult>() + + while (true) { + const part = await targetIterator.next() + for (const p of part) { + targetBlobs.set(p._id, p) + } + if (part.length === 0) { + break + } + } + const toRemove: string[] = [] try { while (true) { @@ -150,18 +164,18 @@ async function processAdapter ( if (dataBulk.length === 0) break for (const data of dataBulk) { - let targetBlob = await target.stat(ctx, workspaceId, data._id) - const sourceBlob = await source.stat(ctx, workspaceId, data._id) - - if (sourceBlob === undefined) { - console.error('blob not found', data._id) - continue - } + let targetBlob: Blob | ListBlobResult | undefined = targetBlobs.get(data._id) if (targetBlob !== undefined) { - console.log('Target blob already exists', targetBlob._id, targetBlob.contentType) + console.log('Target blob already exists', targetBlob._id) } if (targetBlob === undefined) { + const sourceBlob = await source.stat(ctx, workspaceId, data._id) + + if (sourceBlob === undefined) { + console.error('blob not found', data._id) + continue + } await rateLimiter.exec(async () => { try { await retryOnFailure( @@ -170,7 +184,12 @@ async function processAdapter ( async () => { await processFile(ctx, source, target, workspaceId, sourceBlob) // We need to sync and update aggregator table for now. - await exAdapter.syncBlobFromStorage(ctx, workspaceId, sourceBlob._id, exAdapter.defaultAdapter) + targetBlob = await exAdapter.syncBlobFromStorage( + ctx, + workspaceId, + sourceBlob._id, + exAdapter.defaultAdapter + ) }, 50 ) @@ -181,23 +200,14 @@ async function processAdapter ( console.error('failed to process blob', data._id, err) } }) - } - if (targetBlob === undefined) { - targetBlob = await target.stat(ctx, workspaceId, data._id) + if (targetBlob !== undefined && 'size' in targetBlob && (targetBlob as Blob).size === sourceBlob.size) { + // We could safely delete source blob + toRemove.push(sourceBlob._id) + } + processedBytes += sourceBlob.size } - - if ( - targetBlob !== undefined && - targetBlob.size === sourceBlob.size && - targetBlob.contentType === sourceBlob.contentType - ) { - // We could safely delete source blob - toRemove.push(sourceBlob._id) - } - processedCnt += 1 - processedBytes += sourceBlob.size if (processedCnt % 100 === 0) { await rateLimiter.waitProcessing() diff --git a/packages/storage/src/index.ts b/packages/storage/src/index.ts index 5a1a850d42..4390e23f75 100644 --- a/packages/storage/src/index.ts +++ b/packages/storage/src/index.ts @@ -14,6 +14,7 @@ // import { type Blob, type MeasureContext, type StorageIterator, type WorkspaceId } from '@hcengineering/core' +import { PlatformError, unknownError } from '@hcengineering/platform' import { type Readable } from 'stream' export type ListBlobResult = Omit @@ -77,7 +78,7 @@ export interface StorageAdapterEx extends StorageAdapter { workspaceId: WorkspaceId, objectName: string, provider?: string - ) => Promise + ) => Promise find: (ctx: MeasureContext, workspaceId: WorkspaceId) => StorageIterator } @@ -87,7 +88,9 @@ export interface StorageAdapterEx extends StorageAdapter { */ export class DummyStorageAdapter implements StorageAdapter, StorageAdapterEx { defaultAdapter: string = '' - async syncBlobFromStorage (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise {} + async syncBlobFromStorage (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise { + throw new PlatformError(unknownError('Method not implemented')) + } async initialize (ctx: MeasureContext, workspaceId: WorkspaceId): Promise {} diff --git a/server/s3/src/index.ts b/server/s3/src/index.ts index 1314ebd98e..9eaacc2440 100644 --- a/server/s3/src/index.ts +++ b/server/s3/src/index.ts @@ -299,7 +299,7 @@ export class S3Service implements StorageAdapter { version: result.VersionId ?? null } } catch (err: any) { - ctx.error('no object found', { error: err, objectName, workspaceId: workspaceId.name }) + ctx.warn('no object found', { error: err, objectName, workspaceId: workspaceId.name }) } } diff --git a/server/server-storage/src/aggregator.ts b/server/server-storage/src/aggregator.ts index b17b63a43c..7ad1c60b0b 100644 --- a/server/server-storage/src/aggregator.ts +++ b/server/server-storage/src/aggregator.ts @@ -51,7 +51,7 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE workspaceId: WorkspaceId, objectName: string, providerId?: string - ): Promise { + ): Promise { let current: Blob | undefined = ( await this.dbAdapter.find(ctx, workspaceId, DOMAIN_BLOB, { _id: objectName as Ref }, { limit: 1 }) ).shift() @@ -74,6 +74,9 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE } await this.dbAdapter.upload(ctx, workspaceId, DOMAIN_BLOB, [stat]) // TODO: We need to send notification about Blob is changed. + return stat + } else { + throw new NoSuchKeyError('No such blob found') } }