Speedup move files (#6849)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-10-09 14:46:19 +07:00 committed by GitHub
parent 9dc8ac3b4a
commit e9b3ef523e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 45 additions and 29 deletions

View File

@ -14,9 +14,9 @@
//
import { type Attachment } from '@hcengineering/attachment'
import { type Blob, type MeasureContext, type WorkspaceId, RateLimiter } from '@hcengineering/core'
import { type Blob, type MeasureContext, type Ref, type WorkspaceId, RateLimiter } from '@hcengineering/core'
import { DOMAIN_ATTACHMENT } from '@hcengineering/model-attachment'
import { type StorageAdapter, type StorageAdapterEx } from '@hcengineering/server-core'
import { type ListBlobResult, type StorageAdapter, type StorageAdapterEx } from '@hcengineering/server-core'
import { type Db } from 'mongodb'
import { PassThrough } from 'stream'
@ -143,6 +143,20 @@ async function processAdapter (
const iterator = await source.listStream(ctx, workspaceId)
const targetIterator = await target.listStream(ctx, workspaceId)
const targetBlobs = new Map<Ref<Blob>, ListBlobResult>()
while (true) {
const part = await targetIterator.next()
for (const p of part) {
targetBlobs.set(p._id, p)
}
if (part.length === 0) {
break
}
}
const toRemove: string[] = []
try {
while (true) {
@ -150,18 +164,18 @@ async function processAdapter (
if (dataBulk.length === 0) break
for (const data of dataBulk) {
let targetBlob = await target.stat(ctx, workspaceId, data._id)
const sourceBlob = await source.stat(ctx, workspaceId, data._id)
if (sourceBlob === undefined) {
console.error('blob not found', data._id)
continue
}
let targetBlob: Blob | ListBlobResult | undefined = targetBlobs.get(data._id)
if (targetBlob !== undefined) {
console.log('Target blob already exists', targetBlob._id, targetBlob.contentType)
console.log('Target blob already exists', targetBlob._id)
}
if (targetBlob === undefined) {
const sourceBlob = await source.stat(ctx, workspaceId, data._id)
if (sourceBlob === undefined) {
console.error('blob not found', data._id)
continue
}
await rateLimiter.exec(async () => {
try {
await retryOnFailure(
@ -170,7 +184,12 @@ async function processAdapter (
async () => {
await processFile(ctx, source, target, workspaceId, sourceBlob)
// We need to sync and update aggregator table for now.
await exAdapter.syncBlobFromStorage(ctx, workspaceId, sourceBlob._id, exAdapter.defaultAdapter)
targetBlob = await exAdapter.syncBlobFromStorage(
ctx,
workspaceId,
sourceBlob._id,
exAdapter.defaultAdapter
)
},
50
)
@ -181,23 +200,14 @@ async function processAdapter (
console.error('failed to process blob', data._id, err)
}
})
}
if (targetBlob === undefined) {
targetBlob = await target.stat(ctx, workspaceId, data._id)
if (targetBlob !== undefined && 'size' in targetBlob && (targetBlob as Blob).size === sourceBlob.size) {
// We could safely delete source blob
toRemove.push(sourceBlob._id)
}
processedBytes += sourceBlob.size
}
if (
targetBlob !== undefined &&
targetBlob.size === sourceBlob.size &&
targetBlob.contentType === sourceBlob.contentType
) {
// We could safely delete source blob
toRemove.push(sourceBlob._id)
}
processedCnt += 1
processedBytes += sourceBlob.size
if (processedCnt % 100 === 0) {
await rateLimiter.waitProcessing()

View File

@ -14,6 +14,7 @@
//
import { type Blob, type MeasureContext, type StorageIterator, type WorkspaceId } from '@hcengineering/core'
import { PlatformError, unknownError } from '@hcengineering/platform'
import { type Readable } from 'stream'
export type ListBlobResult = Omit<Blob, 'contentType' | 'version'>
@ -77,7 +78,7 @@ export interface StorageAdapterEx extends StorageAdapter {
workspaceId: WorkspaceId,
objectName: string,
provider?: string
) => Promise<void>
) => Promise<Blob>
find: (ctx: MeasureContext, workspaceId: WorkspaceId) => StorageIterator
}
@ -87,7 +88,9 @@ export interface StorageAdapterEx extends StorageAdapter {
*/
export class DummyStorageAdapter implements StorageAdapter, StorageAdapterEx {
defaultAdapter: string = ''
async syncBlobFromStorage (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<void> {}
async syncBlobFromStorage (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<Blob> {
throw new PlatformError(unknownError('Method not implemented'))
}
async initialize (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {}

View File

@ -299,7 +299,7 @@ export class S3Service implements StorageAdapter {
version: result.VersionId ?? null
}
} catch (err: any) {
ctx.error('no object found', { error: err, objectName, workspaceId: workspaceId.name })
ctx.warn('no object found', { error: err, objectName, workspaceId: workspaceId.name })
}
}

View File

@ -51,7 +51,7 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
workspaceId: WorkspaceId,
objectName: string,
providerId?: string
): Promise<void> {
): Promise<Blob> {
let current: Blob | undefined = (
await this.dbAdapter.find<Blob>(ctx, workspaceId, DOMAIN_BLOB, { _id: objectName as Ref<Blob> }, { limit: 1 })
).shift()
@ -74,6 +74,9 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
}
await this.dbAdapter.upload<Blob>(ctx, workspaceId, DOMAIN_BLOB, [stat])
// TODO: We need to send notification about Blob is changed.
return stat
} else {
throw new NoSuchKeyError('No such blob found')
}
}