diff --git a/packages/model/package.json b/packages/model/package.json index 0fffd805e8..718f46bd79 100644 --- a/packages/model/package.json +++ b/packages/model/package.json @@ -41,6 +41,7 @@ "@hcengineering/core": "^0.6.32", "@hcengineering/platform": "^0.6.11", "@hcengineering/storage": "^0.6.0", + "@hcengineering/analytics": "^0.6.0", "toposort": "^2.0.2", "fast-equals": "^5.0.1" }, diff --git a/packages/model/src/migration.ts b/packages/model/src/migration.ts index 7cc54caf18..070535147f 100644 --- a/packages/model/src/migration.ts +++ b/packages/model/src/migration.ts @@ -1,3 +1,4 @@ +import { Analytics } from '@hcengineering/analytics' import core, { Class, Client, @@ -154,7 +155,13 @@ export async function tryMigrate (client: MigrationClient, plugin: string, migra const states = client.migrateState.get(plugin) ?? new Set() for (const migration of migrations) { if (states.has(migration.state)) continue - await migration.func(client) + try { + await migration.func(client) + } catch (err: any) { + console.error(err) + Analytics.handleError(err) + continue + } const st: MigrationState = { plugin, state: migration.state, @@ -181,7 +188,13 @@ export async function tryUpgrade ( for (const migration of migrations) { if (states.has(migration.state)) continue const _client = await client() - await migration.func(_client) + try { + await migration.func(_client) + } catch (err: any) { + console.error(err) + Analytics.handleError(err) + continue + } const st: Data = { plugin, state: migration.state diff --git a/server/account/src/operations.ts b/server/account/src/operations.ts index 9f0358150f..2adc542461 100644 --- a/server/account/src/operations.ts +++ b/server/account/src/operations.ts @@ -968,13 +968,13 @@ export async function createWorkspace ( getWorkspaceId(workspaceInfo.workspace, productId), true, async (value) => { - await updateInfo({ createProgress: 20 + Math.round((Math.min(value, 100) / 100) * 30) }) + await updateInfo({ createProgress: 20 + Math.round((Math.min(value, 100) / 100) * 70) }) }, true, getStorageAdapter() ) const modelVersion = getModelVersion() - await updateInfo({ createProgress: 50 }) + await updateInfo({ createProgress: 90 }) // Skip tx update if version of init workspace are proper one. const skipTxUpdate = @@ -992,11 +992,11 @@ export async function createWorkspace ( ctxModellogger, skipTxUpdate, async (value) => { - await updateInfo({ createProgress: Math.round(50 + (Math.min(value, 100) / 100) * 40) }) + await updateInfo({ createProgress: Math.round(90 + (Math.min(value, 100) / 100) * 10) }) } ) ) - await updateInfo({ createProgress: 90 }) + await updateInfo({ createProgress: 99 }) } else { await childLogger.withLog('init-workspace', {}, async (ctx) => { await initModel(ctx, getTransactor(), wsId, txes, migrationOperation, ctxModellogger, async (value) => { diff --git a/server/backup/package.json b/server/backup/package.json index bcc197c704..ddb895f7c2 100644 --- a/server/backup/package.json +++ b/server/backup/package.json @@ -46,6 +46,7 @@ "@hcengineering/client-resources": "^0.6.27", "@hcengineering/client": "^0.6.18", "@hcengineering/model": "^0.6.11", + "@hcengineering/analytics": "^0.6.0", "tar-stream": "^2.2.0", "@hcengineering/server-tool": "^0.6.0", "@hcengineering/server-core": "^0.6.1" diff --git a/server/backup/src/backup.ts b/server/backup/src/backup.ts index 92ea016db6..ea256d4e0d 100644 --- a/server/backup/src/backup.ts +++ b/server/backup/src/backup.ts @@ -43,6 +43,7 @@ import { Writable } from 'stream' import { extract, Pack, pack } from 'tar-stream' import { createGunzip, gunzipSync, gzipSync } from 'zlib' import { BackupStorage } from './storage' +import { Analytics } from '@hcengineering/analytics' export * from './storage' const dataBlobSize = 50 * 1024 * 1024 @@ -231,7 +232,7 @@ export async function cloneWorkspace ( clearTime: boolean = true, progress: (value: number) => Promise, skipFullText: boolean, - storageAdapter?: StorageAdapter + storageAdapter: StorageAdapter ): Promise { await ctx.with( 'clone-workspace', @@ -255,10 +256,6 @@ export async function cloneWorkspace ( admin: 'true' })) as unknown as CoreClient & BackupClient ) - - const blobClientSource = new BlobClient(transactorUrl, sourceWorkspaceId) - const blobClientTarget = new BlobClient(transactorUrl, targetWorkspaceId) - try { const domains = sourceConnection .getHierarchy() @@ -290,6 +287,7 @@ export async function cloneWorkspace ( const needRetrieveChunks: Ref[][] = [] let processed = 0 + let domainProgress = 0 let st = Date.now() // Load all digest from collection. await ctx.with('retrieve-domain-info', { domain: c }, async (ctx) => { @@ -351,12 +349,12 @@ export async function cloneWorkspace ( if (clearTime) { docs = prepareClonedDocuments(docs, sourceConnection, skipFullText) } + const executor = new RateLimiter(10) for (const d of docs) { if (d._class === core.class.Blob) { const blob = d as Blob - const blobs: Buffer[] = [] - try { - if (storageAdapter !== undefined) { + await executor.exec(async () => { + try { ctx.info('clone blob', { name: blob._id, contentType: blob.contentType }) const readable = await storageAdapter.get(ctx, sourceWorkspaceId, blob._id) const passThrue = new PassThrough() @@ -369,29 +367,18 @@ export async function cloneWorkspace ( blob.contentType, blob.size ) - } else { - ctx.info('clone blob', { name: blob._id, contentType: blob.contentType }) - await ctx.with('download-blob', { contentType: blob.contentType }, async (ctx) => { - await blobClientSource.writeTo(ctx, blob._id, blob.size, { - write: (b, cb) => { - blobs.push(b) - cb() - }, - end: (cb) => { - cb() - } - }) - }) - await ctx.with('upload-blob', { contentType: blob.contentType }, async (ctx) => { - const buffer = Buffer.concat(blobs) - await blobClientTarget.upload(ctx, blob._id, buffer.length, blob.contentType, buffer) - }) + } catch (err: any) { + Analytics.handleError(err) + console.error(err) } - } catch (err: any) { - console.error(err) - } + domainProgress++ + await progress((100 / domains.length) * i + (100 / domains.length / processed) * domainProgress) + }) + } else { + domainProgress++ } } + await executor.waitProcessing() await ctx.with( 'upload-docs', {}, @@ -400,8 +387,10 @@ export async function cloneWorkspace ( }, { length: docs.length } ) + await progress((100 / domains.length) * i + (100 / domains.length / processed) * domainProgress) } catch (err: any) { console.log(err) + Analytics.handleError(err) // Put back. needRetrieveChunks.push(needRetrieve) continue @@ -414,6 +403,7 @@ export async function cloneWorkspace ( } } catch (err: any) { console.error(err) + Analytics.handleError(err) } finally { ctx.info('end clone') await ctx.with('close-source', {}, async (ctx) => { diff --git a/server/core/src/server/domainHelper.ts b/server/core/src/server/domainHelper.ts index 7f84c004f8..e5b6892a15 100644 --- a/server/core/src/server/domainHelper.ts +++ b/server/core/src/server/domainHelper.ts @@ -1,3 +1,4 @@ +import { Analytics } from '@hcengineering/analytics' import type { Doc, Domain, @@ -5,24 +6,32 @@ import type { FieldIndex, Hierarchy, MeasureContext, - ModelDb + ModelDb, + WorkspaceId } from '@hcengineering/core' import core, { DOMAIN_MODEL, IndexKind, IndexOrder } from '@hcengineering/core' import { deepEqual } from 'fast-equals' import type { DomainHelper, DomainHelperOperations } from '../adapter' -import { Analytics } from '@hcengineering/analytics' export class DomainIndexHelperImpl implements DomainHelper { domains = new Map>>() domainConfigurations: DomainIndexConfiguration[] = [] constructor ( + readonly ctx: MeasureContext, readonly hierarchy: Hierarchy, - readonly model: ModelDb + readonly model: ModelDb, + readonly workspaceId: WorkspaceId ) { const classes = model.findAllSync(core.class.Class, {}) - this.domainConfigurations = - model.findAllSync(core.class.DomainIndexConfiguration, {}) ?? [] + try { + this.domainConfigurations = + model.findAllSync(core.class.DomainIndexConfiguration, {}) ?? [] + } catch (err: any) { + this.domainConfigurations = [] + Analytics.handleError(err) + ctx.error('failed to find domain index configuration', { err }) + } this.domains = new Map>>() // Find all domains and indexed fields inside @@ -81,7 +90,7 @@ export class DomainIndexHelperImpl implements DomainHelper { if (forceCreate && !exists) { await operations.create(domain) - console.log('collection will be created', domain) + ctx.info('collection will be created', domain) exists = true } if (!exists) { diff --git a/server/core/src/server/index.ts b/server/core/src/server/index.ts index d5033da857..bddf5e2474 100644 --- a/server/core/src/server/index.ts +++ b/server/core/src/server/index.ts @@ -163,7 +163,7 @@ export async function createServerStorage ( ) } - const domainHelper = new DomainIndexHelperImpl(hierarchy, modelDb) + const domainHelper = new DomainIndexHelperImpl(metrics, hierarchy, modelDb, conf.workspace) return new TServerStorage( conf.domains, diff --git a/server/tool/src/index.ts b/server/tool/src/index.ts index 81d3d49173..66e41a2168 100644 --- a/server/tool/src/index.ts +++ b/server/tool/src/index.ts @@ -155,9 +155,16 @@ export async function initModel ( await progress(30) // Create update indexes - await createUpdateIndexes(ctx, connection, db, logger, async (value) => { - await progress(30 + (Math.min(value, 100) / 100) * 70) - }) + await createUpdateIndexes( + ctx, + connection, + db, + logger, + async (value) => { + await progress(30 + (Math.min(value, 100) / 100) * 70) + }, + workspaceId + ) await progress(100) } catch (e: any) { logger.error('error', { error: e }) @@ -403,9 +410,10 @@ async function createUpdateIndexes ( connection: CoreClient, db: Db, logger: ModelLogger, - progress: (value: number) => Promise + progress: (value: number) => Promise, + workspaceId: WorkspaceId ): Promise { - const domainHelper = new DomainIndexHelperImpl(connection.getHierarchy(), connection.getModel()) + const domainHelper = new DomainIndexHelperImpl(ctx, connection.getHierarchy(), connection.getModel(), workspaceId) const dbHelper = new DBCollectionHelper(db) await dbHelper.init() let completed = 0