UBERF-7501: Copy few blobs in parallel (#5995)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
Andrey Sobolev 2024-07-03 21:29:59 +07:00 committed by GitHub
parent 6756c43e4e
commit b836039903
8 changed files with 68 additions and 46 deletions

View File

@@ -41,6 +41,7 @@
"@hcengineering/core": "^0.6.32",
"@hcengineering/platform": "^0.6.11",
"@hcengineering/storage": "^0.6.0",
"@hcengineering/analytics": "^0.6.0",
"toposort": "^2.0.2",
"fast-equals": "^5.0.1"
},

View File

@@ -1,3 +1,4 @@
import { Analytics } from '@hcengineering/analytics'
import core, {
Class,
Client,
@@ -154,7 +155,13 @@ export async function tryMigrate (client: MigrationClient, plugin: string, migra
const states = client.migrateState.get(plugin) ?? new Set()
for (const migration of migrations) {
if (states.has(migration.state)) continue
await migration.func(client)
try {
await migration.func(client)
} catch (err: any) {
console.error(err)
Analytics.handleError(err)
continue
}
const st: MigrationState = {
plugin,
state: migration.state,
@@ -181,7 +188,13 @@ export async function tryUpgrade (
for (const migration of migrations) {
if (states.has(migration.state)) continue
const _client = await client()
await migration.func(_client)
try {
await migration.func(_client)
} catch (err: any) {
console.error(err)
Analytics.handleError(err)
continue
}
const st: Data<MigrationState> = {
plugin,
state: migration.state
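
Both tryMigrate and tryUpgrade now wrap each migration step in its own try/catch: a failing step is logged, reported via Analytics.handleError, and skipped, so one broken migration no longer aborts the whole run, and the completion state is recorded only when the step succeeds. A minimal sketch of that error-isolation pattern, using illustrative names (MigrationStep, runSteps) rather than the actual platform API:

// Sketch only: names are illustrative, not the platform API.
interface MigrationStep {
  state: string
  func: () => Promise<void>
}

async function runSteps (
  steps: MigrationStep[],
  done: Set<string>,
  report: (err: any) => void // e.g. Analytics.handleError
): Promise<void> {
  for (const step of steps) {
    if (done.has(step.state)) continue // already applied earlier
    try {
      await step.func()
    } catch (err: any) {
      console.error(err)
      report(err)
      continue // skip recording, try the remaining steps
    }
    done.add(step.state) // recorded only after the step succeeded
  }
}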

View File

@@ -968,13 +968,13 @@ export async function createWorkspace (
getWorkspaceId(workspaceInfo.workspace, productId),
true,
async (value) => {
await updateInfo({ createProgress: 20 + Math.round((Math.min(value, 100) / 100) * 30) })
await updateInfo({ createProgress: 20 + Math.round((Math.min(value, 100) / 100) * 70) })
},
true,
getStorageAdapter()
)
const modelVersion = getModelVersion()
await updateInfo({ createProgress: 50 })
await updateInfo({ createProgress: 90 })
// Skip tx update if the version of the init workspace is the proper one.
const skipTxUpdate =
@@ -992,11 +992,11 @@ export async function createWorkspace (
ctxModellogger,
skipTxUpdate,
async (value) => {
await updateInfo({ createProgress: Math.round(50 + (Math.min(value, 100) / 100) * 40) })
await updateInfo({ createProgress: Math.round(90 + (Math.min(value, 100) / 100) * 10) })
}
)
)
await updateInfo({ createProgress: 90 })
await updateInfo({ createProgress: 99 })
} else {
await childLogger.withLog('init-workspace', {}, async (ctx) => {
await initModel(ctx, getTransactor(), wsId, txes, migrationOperation, ctxModellogger, async (value) => {
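
These edits rebalance createProgress so the clone step now occupies the 20 to 90 range of the overall bar (previously 20 to 50) and the model/tx update occupies 90 to 100, topping out at 99 until creation fully completes. A small sketch of the underlying mapping; mapProgress is a hypothetical helper, the code above inlines the same arithmetic:

// Hypothetical helper: map a sub-task's 0-100 progress into a [from, to] slice
// of the overall progress bar, mirroring the arithmetic in the diff above.
function mapProgress (value: number, from: number, to: number): number {
  const clamped = Math.min(Math.max(value, 0), 100)
  return Math.round(from + (clamped / 100) * (to - from))
}

// Clone phase occupies 20..90:    mapProgress(50, 20, 90) === 55
// Upgrade phase occupies 90..100: mapProgress(50, 90, 100) === 95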

View File

@@ -46,6 +46,7 @@
"@hcengineering/client-resources": "^0.6.27",
"@hcengineering/client": "^0.6.18",
"@hcengineering/model": "^0.6.11",
"@hcengineering/analytics": "^0.6.0",
"tar-stream": "^2.2.0",
"@hcengineering/server-tool": "^0.6.0",
"@hcengineering/server-core": "^0.6.1"

View File

@@ -43,6 +43,7 @@ import { Writable } from 'stream'
import { extract, Pack, pack } from 'tar-stream'
import { createGunzip, gunzipSync, gzipSync } from 'zlib'
import { BackupStorage } from './storage'
import { Analytics } from '@hcengineering/analytics'
export * from './storage'
const dataBlobSize = 50 * 1024 * 1024
@@ -231,7 +232,7 @@ export async function cloneWorkspace (
clearTime: boolean = true,
progress: (value: number) => Promise<void>,
skipFullText: boolean,
storageAdapter?: StorageAdapter
storageAdapter: StorageAdapter
): Promise<void> {
await ctx.with(
'clone-workspace',
@@ -255,10 +256,6 @@ export async function cloneWorkspace (
admin: 'true'
})) as unknown as CoreClient & BackupClient
)
const blobClientSource = new BlobClient(transactorUrl, sourceWorkspaceId)
const blobClientTarget = new BlobClient(transactorUrl, targetWorkspaceId)
try {
const domains = sourceConnection
.getHierarchy()
@@ -290,6 +287,7 @@ export async function cloneWorkspace (
const needRetrieveChunks: Ref<Doc>[][] = []
let processed = 0
let domainProgress = 0
let st = Date.now()
// Load all digest from collection.
await ctx.with('retrieve-domain-info', { domain: c }, async (ctx) => {
@@ -351,12 +349,12 @@ export async function cloneWorkspace (
if (clearTime) {
docs = prepareClonedDocuments(docs, sourceConnection, skipFullText)
}
const executor = new RateLimiter(10)
for (const d of docs) {
if (d._class === core.class.Blob) {
const blob = d as Blob
const blobs: Buffer[] = []
try {
if (storageAdapter !== undefined) {
await executor.exec(async () => {
try {
ctx.info('clone blob', { name: blob._id, contentType: blob.contentType })
const readable = await storageAdapter.get(ctx, sourceWorkspaceId, blob._id)
const passThrue = new PassThrough()
@@ -369,29 +367,18 @@ export async function cloneWorkspace (
blob.contentType,
blob.size
)
} else {
ctx.info('clone blob', { name: blob._id, contentType: blob.contentType })
await ctx.with('download-blob', { contentType: blob.contentType }, async (ctx) => {
await blobClientSource.writeTo(ctx, blob._id, blob.size, {
write: (b, cb) => {
blobs.push(b)
cb()
},
end: (cb) => {
cb()
}
})
})
await ctx.with('upload-blob', { contentType: blob.contentType }, async (ctx) => {
const buffer = Buffer.concat(blobs)
await blobClientTarget.upload(ctx, blob._id, buffer.length, blob.contentType, buffer)
})
} catch (err: any) {
Analytics.handleError(err)
console.error(err)
}
} catch (err: any) {
console.error(err)
}
domainProgress++
await progress((100 / domains.length) * i + (100 / domains.length / processed) * domainProgress)
})
} else {
domainProgress++
}
}
await executor.waitProcessing()
await ctx.with(
'upload-docs',
{},
@@ -400,8 +387,10 @@ export async function cloneWorkspace (
},
{ length: docs.length }
)
await progress((100 / domains.length) * i + (100 / domains.length / processed) * domainProgress)
} catch (err: any) {
console.log(err)
Analytics.handleError(err)
// Put back.
needRetrieveChunks.push(needRetrieve)
continue
@@ -414,6 +403,7 @@ export async function cloneWorkspace (
}
} catch (err: any) {
console.error(err)
Analytics.handleError(err)
} finally {
ctx.info('end clone')
await ctx.with('close-source', {}, async (ctx) => {
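
This is the core of the commit: cloneWorkspace now requires a storage adapter, drops the per-blob BlobClient download/upload path, and streams each blob from the source workspace's storage into the target workspace, with a RateLimiter of 10 (executor.exec / executor.waitProcessing) keeping several copies in flight at once. A simplified, self-contained sketch of that concurrency pattern; the Limiter class below is a stand-in for illustration, not the platform's actual RateLimiter:

// Stand-in limiter for illustration only.
class Limiter {
  private running = 0
  private readonly waiters: Array<() => void> = []
  private readonly pending = new Set<Promise<void>>()

  constructor (private readonly limit: number) {}

  // Resolves once the task has been admitted (a slot is free), so the calling
  // loop can keep submitting work while up to `limit` tasks run concurrently.
  async exec (op: () => Promise<void>): Promise<void> {
    while (this.running >= this.limit) {
      await new Promise<void>((resolve) => this.waiters.push(resolve))
    }
    this.running++
    const p = op()
      .catch((err) => {
        console.error(err) // a failed copy is logged, not fatal
      })
      .finally(() => {
        this.running--
        this.pending.delete(p)
        this.waiters.shift()?.()
      })
    this.pending.add(p)
  }

  // Waits for every admitted task to finish.
  async waitProcessing (): Promise<void> {
    await Promise.all([...this.pending])
  }
}

// Usage mirroring the clone loop: up to 10 blob copies run in parallel.
async function copyBlobs (ids: string[], copyOne: (id: string) => Promise<void>): Promise<void> {
  const limiter = new Limiter(10)
  for (const id of ids) {
    // exec blocks only while the limiter is saturated
    await limiter.exec(async () => {
      await copyOne(id) // e.g. storageAdapter.get(...) streamed into the target workspace
    })
  }
  await limiter.waitProcessing()
}

In the real diff above, the per-blob callback additionally reports failures to Analytics.handleError and advances domainProgress, and the per-document progress update moved out of the blob branch so it is reported once per retrieved chunk.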

View File

@@ -1,3 +1,4 @@
import { Analytics } from '@hcengineering/analytics'
import type {
Doc,
Domain,
@@ -5,24 +6,32 @@ import type {
FieldIndex,
Hierarchy,
MeasureContext,
ModelDb
ModelDb,
WorkspaceId
} from '@hcengineering/core'
import core, { DOMAIN_MODEL, IndexKind, IndexOrder } from '@hcengineering/core'
import { deepEqual } from 'fast-equals'
import type { DomainHelper, DomainHelperOperations } from '../adapter'
import { Analytics } from '@hcengineering/analytics'
export class DomainIndexHelperImpl implements DomainHelper {
domains = new Map<Domain, Set<string | FieldIndex<Doc>>>()
domainConfigurations: DomainIndexConfiguration[] = []
constructor (
readonly ctx: MeasureContext,
readonly hierarchy: Hierarchy,
readonly model: ModelDb
readonly model: ModelDb,
readonly workspaceId: WorkspaceId
) {
const classes = model.findAllSync(core.class.Class, {})
this.domainConfigurations =
model.findAllSync<DomainIndexConfiguration>(core.class.DomainIndexConfiguration, {}) ?? []
try {
this.domainConfigurations =
model.findAllSync<DomainIndexConfiguration>(core.class.DomainIndexConfiguration, {}) ?? []
} catch (err: any) {
this.domainConfigurations = []
Analytics.handleError(err)
ctx.error('failed to find domain index configuration', { err })
}
this.domains = new Map<Domain, Set<string | FieldIndex<Doc>>>()
// Find all domains and indexed fields inside
@@ -81,7 +90,7 @@ export class DomainIndexHelperImpl implements DomainHelper {
if (forceCreate && !exists) {
await operations.create(domain)
console.log('collection will be created', domain)
ctx.info('collection will be created', domain)
exists = true
}
if (!exists) {
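
DomainIndexHelperImpl now takes the MeasureContext and WorkspaceId in its constructor (the call sites in the following files pass them through), and the model query for DomainIndexConfiguration is guarded so a throwing lookup degrades to an empty configuration instead of failing construction. A minimal sketch of that guarded-lookup pattern, with simplified types rather than the real platform interfaces:

// Sketch only: ErrorReporter and loadConfigurations are illustrative names.
interface ErrorReporter {
  error: (msg: string, details: Record<string, unknown>) => void
}

function loadConfigurations<T> (query: () => T[] | undefined, ctx: ErrorReporter): T[] {
  try {
    return query() ?? []
  } catch (err: any) {
    ctx.error('failed to find domain index configuration', { err })
    return [] // the helper keeps working, just without custom index configuration
  }
}

// Hypothetical usage inside the constructor:
// this.domainConfigurations = loadConfigurations(() => model.findAllSync(core.class.DomainIndexConfiguration, {}), ctx)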

View File

@@ -163,7 +163,7 @@ export async function createServerStorage (
)
}
const domainHelper = new DomainIndexHelperImpl(hierarchy, modelDb)
const domainHelper = new DomainIndexHelperImpl(metrics, hierarchy, modelDb, conf.workspace)
return new TServerStorage(
conf.domains,

View File

@@ -155,9 +155,16 @@ export async function initModel (
await progress(30)
// Create update indexes
await createUpdateIndexes(ctx, connection, db, logger, async (value) => {
await progress(30 + (Math.min(value, 100) / 100) * 70)
})
await createUpdateIndexes(
ctx,
connection,
db,
logger,
async (value) => {
await progress(30 + (Math.min(value, 100) / 100) * 70)
},
workspaceId
)
await progress(100)
} catch (e: any) {
logger.error('error', { error: e })
@@ -403,9 +410,10 @@ async function createUpdateIndexes (
connection: CoreClient,
db: Db,
logger: ModelLogger,
progress: (value: number) => Promise<void>
progress: (value: number) => Promise<void>,
workspaceId: WorkspaceId
): Promise<void> {
const domainHelper = new DomainIndexHelperImpl(connection.getHierarchy(), connection.getModel())
const domainHelper = new DomainIndexHelperImpl(ctx, connection.getHierarchy(), connection.getModel(), workspaceId)
const dbHelper = new DBCollectionHelper(db)
await dbHelper.init()
let completed = 0
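
The last change threads workspaceId from initModel into createUpdateIndexes and on into DomainIndexHelperImpl, and reshapes the call so index creation drives the 30 to 100 span of initModel's progress bar. A compact sketch of that step, using simplified stand-in names (buildIndexes for createUpdateIndexes, logError for the ModelLogger call):

// Sketch of the index-creation step shown above; buildIndexes stands in for
// createUpdateIndexes, which now also receives the workspace id.
async function runIndexStep (
  buildIndexes: (onProgress: (value: number) => Promise<void>) => Promise<void>,
  progress: (value: number) => Promise<void>,
  logError: (msg: string, details: Record<string, unknown>) => void
): Promise<void> {
  try {
    await progress(30)
    // index creation reports 0-100; map it into the remaining 30-100 span
    await buildIndexes(async (value) => {
      await progress(30 + (Math.min(value, 100) / 100) * 70)
    })
    await progress(100)
  } catch (e: any) {
    logError('error', { error: e })
  }
}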