diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts
index f7aebcb3d3..56671c74f9 100644
--- a/dev/tool/src/index.ts
+++ b/dev/tool/src/index.ts
@@ -90,7 +90,7 @@ import { restoreRecruitingTaskTypes } from './clean'
 import { changeConfiguration } from './configuration'
-import { fixJsonMarkup } from './markup'
+import { fixJsonMarkup, migrateMarkup } from './markup'
 import { fixMixinForeignAttributes, showMixinForeignAttributes } from './mixin'
 import { fixAccountEmails, renameAccount } from './renameAccount'
 import { moveFiles } from './storage'
@@ -1290,6 +1290,34 @@ export function devTool (
     })
   })

+  program
+    .command('migrate-markup')
+    .description('migrates collaborative markup to storage')
+    .option('-w, --workspace <workspace>', 'Selected workspace only', '')
+    .option('-c, --concurrency <concurrency>', 'Number of documents being processed concurrently', '10')
+    .action(async (cmd: { workspace: string, concurrency: string }) => {
+      const { mongodbUri } = prepareTools()
+      await withDatabase(mongodbUri, async (db, client) => {
+        await withStorage(mongodbUri, async (adapter) => {
+          const workspaces = await listWorkspacesPure(db)
+          for (const workspace of workspaces) {
+            if (cmd.workspace !== '' && workspace.workspace !== cmd.workspace) {
+              continue
+            }
+
+            const wsId = getWorkspaceId(workspace.workspace)
+            const endpoint = await getTransactorEndpoint(generateToken(systemAccountEmail, wsId), 'external')
+
+            console.log('processing workspace', workspace.workspace)
+
+            await migrateMarkup(toolCtx, adapter, wsId, client, endpoint, parseInt(cmd.concurrency))
+
+            console.log('...done', workspace.workspace)
+          }
+        })
+      })
+    })
+
   program
     .command('remove-duplicates-ids <workspaces>')
     .description('remove duplicate ids for future migration')
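The new `migrate-markup` command follows the same commander pattern as the other tool commands: string-valued options with defaults, parsed inside the action. A minimal standalone sketch of that pattern, assuming only the `commander` package (the helpers above, `prepareTools`, `withDatabase`, `withStorage`, are the tool's own and are not reproduced here):

```ts
import { program } from 'commander'

program
  .command('migrate-markup')
  .description('migrates collaborative markup to storage')
  .option('-w, --workspace <workspace>', 'Selected workspace only', '')
  .option('-c, --concurrency <concurrency>', 'Number of documents being processed concurrently', '10')
  .action(async (cmd: { workspace: string, concurrency: string }) => {
    // commander delivers option values as strings; guard the numeric parse
    const parsed = parseInt(cmd.concurrency, 10)
    const concurrency = Number.isNaN(parsed) ? 10 : parsed
    console.log('workspace filter:', cmd.workspace === '' ? '(all)' : cmd.workspace)
    console.log('concurrency:', concurrency)
  })

program.parse()
```

Invoked via the dev tool entry point as, e.g., `migrate-markup --workspace my-workspace --concurrency 20`; with no `--workspace` it walks every workspace.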
diff --git a/dev/tool/src/markup.ts b/dev/tool/src/markup.ts
index da3cdb9d75..7c194924ff 100644
--- a/dev/tool/src/markup.ts
+++ b/dev/tool/src/markup.ts
@@ -1,19 +1,23 @@
+import { saveCollaborativeDoc } from '@hcengineering/collaboration'
 import core, {
   type AnyAttribute,
   type Class,
   type Client as CoreClient,
   type Doc,
   type Domain,
+  type Hierarchy,
   type MeasureContext,
   type Ref,
   type WorkspaceId,
+  RateLimiter,
+  collaborativeDocParse,
   makeCollaborativeDoc
 } from '@hcengineering/core'
 import { getMongoClient, getWorkspaceDB } from '@hcengineering/mongo'
 import { type StorageAdapter } from '@hcengineering/server-core'
 import { connect } from '@hcengineering/server-tool'
-import { jsonToText } from '@hcengineering/text'
-import { type Db } from 'mongodb'
+import { jsonToText, markupToYDoc } from '@hcengineering/text'
+import { type Db, type FindCursor, type MongoClient } from 'mongodb'

 export async function fixJsonMarkup (
   ctx: MeasureContext,
@@ -110,3 +114,105 @@ async function processFixJsonMarkupFor (

   console.log('...processed', docs.length)
 }
+
+export async function migrateMarkup (
+  ctx: MeasureContext,
+  storageAdapter: StorageAdapter,
+  workspaceId: WorkspaceId,
+  client: MongoClient,
+  transactorUrl: string,
+  concurrency: number
+): Promise<void> {
+  const connection = (await connect(transactorUrl, workspaceId, undefined, {
+    mode: 'backup'
+  })) as unknown as CoreClient
+
+  const hierarchy = connection.getHierarchy()
+
+  const workspaceDb = client.db(workspaceId.name)
+
+  try {
+    const classes = hierarchy.getDescendants(core.class.Doc)
+    for (const _class of classes) {
+      const domain = hierarchy.findDomain(_class)
+      if (domain === undefined) continue
+
+      const allAttributes = hierarchy.getAllAttributes(_class)
+      const attributes = Array.from(allAttributes.values()).filter((attribute) => {
+        return hierarchy.isDerived(attribute.type._class, 'core:class:TypeCollaborativeMarkup' as Ref<Class<Doc>>)
+      })
+
+      if (attributes.length === 0) continue
+      if (hierarchy.isMixin(_class) && attributes.every((p) => p.attributeOf !== _class)) continue
+
+      const collection = workspaceDb.collection<Doc>(domain)
+
+      const filter = hierarchy.isMixin(_class) ? { [_class]: { $exists: true } } : { _class }
+
+      const count = await collection.countDocuments(filter)
+      const iterator = collection.find<Doc>(filter)
+
+      try {
+        console.log('processing', _class, '->', count)
+        await processMigrateMarkupFor(ctx, hierarchy, storageAdapter, workspaceId, attributes, iterator, concurrency)
+      } finally {
+        await iterator.close()
+      }
+    }
+  } finally {
+    await connection.close()
+  }
+}
+
+async function processMigrateMarkupFor (
+  ctx: MeasureContext,
+  hierarchy: Hierarchy,
+  storageAdapter: StorageAdapter,
+  workspaceId: WorkspaceId,
+  attributes: AnyAttribute[],
+  iterator: FindCursor<Doc>,
+  concurrency: number
+): Promise<void> {
+  const rateLimiter = new RateLimiter(concurrency)
+
+  let processed = 0
+
+  while (true) {
+    const doc = await iterator.next()
+    if (doc === null) break
+
+    const timestamp = Date.now()
+    const revisionId = `${timestamp}`
+
+    await rateLimiter.exec(async () => {
+      for (const attribute of attributes) {
+        const collaborativeDoc = makeCollaborativeDoc(doc._id, attribute.name, revisionId)
+        const { documentId } = collaborativeDocParse(collaborativeDoc)
+
+        const value = hierarchy.isMixin(attribute.attributeOf)
+          ? ((doc as any)[attribute.attributeOf]?.[attribute.name] as string)
+          : ((doc as any)[attribute.name] as string)
+
+        if (value != null && value.startsWith('{')) {
+          const blob = await storageAdapter.stat(ctx, workspaceId, documentId)
+          // only for documents not in storage
+          if (blob === undefined) {
+            const ydoc = markupToYDoc(value, attribute.name)
+            await saveCollaborativeDoc(storageAdapter, workspaceId, collaborativeDoc, ydoc, ctx)
+          }
+        }
+      }
+    })
+
+    processed += 1
+
+    if (processed % 100 === 0) {
+      await rateLimiter.waitProcessing()
+      console.log('...processing', processed)
+    }
+  }
+
+  await rateLimiter.waitProcessing()
+
+  console.log('processed', processed)
+}
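`migrateMarkup` decides per attribute whether a value still needs migration: legacy values hold serialized ProseMirror JSON (hence the `value.startsWith('{')` heuristic), and mixin attributes live nested under the mixin class id rather than at the top level of the document. A reduced sketch of that lookup, with illustrative names only (the real documents come straight from MongoDB):

```ts
// Illustrative raw document shape, not an actual workspace record.
type RawDoc = Record<string, any>

// For a mixin attribute the value is stored under the mixin id:
//   doc['recruit:mixin:Candidate']['description']
// For a plain attribute it is stored directly:
//   doc['description']
function readMarkupValue (doc: RawDoc, attributeOf: string, name: string, isMixin: boolean): string | undefined {
  return isMixin ? doc[attributeOf]?.[name] : doc[name]
}

// Cheap "needs migration" check mirroring the diff: JSON markup starts
// with '{'; an already-migrated collaborative doc reference does not.
function needsMigration (value: string | undefined): boolean {
  return value != null && value.startsWith('{')
}
```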
diff --git a/dev/tool/src/storage.ts b/dev/tool/src/storage.ts
index 47387298de..27f7e23792 100644
--- a/dev/tool/src/storage.ts
+++ b/dev/tool/src/storage.ts
@@ -14,22 +14,22 @@
 //

 import { type Blob, type MeasureContext, type WorkspaceId, RateLimiter } from '@hcengineering/core'
-import { type StorageAdapterEx } from '@hcengineering/server-core'
+import { type StorageAdapter, type StorageAdapterEx } from '@hcengineering/server-core'
 import { PassThrough } from 'stream'

+export interface MoveFilesParams {
+  blobSizeLimitMb: number
+  concurrency: number
+}
+
 export async function moveFiles (
   ctx: MeasureContext,
   workspaceId: WorkspaceId,
   exAdapter: StorageAdapterEx,
-  params: {
-    blobSizeLimitMb: number
-    concurrency: number
-  }
+  params: MoveFilesParams
 ): Promise<void> {
   if (exAdapter.adapters === undefined) return

-  let count = 0
-
   console.log('start', workspaceId.name)

   // We assume that the adapter moves all new files to the default adapter
@@ -38,74 +38,125 @@ export async function moveFiles (
   for (const [name, adapter] of exAdapter.adapters.entries()) {
     if (name === target) continue

+    console.log('moving from', name, 'limit', params.blobSizeLimitMb, 'concurrency', params.concurrency)

-    let time = Date.now()
+    // we attempt to retry the whole process in case of failure
+    // files that were already moved will be skipped
+    await retryOnFailure(ctx, 5, async () => {
+      await processAdapter(ctx, exAdapter, adapter, workspaceId, params)
+    })
+  }

-    const rateLimiter = new RateLimiter(params.concurrency)
+  console.log('...done', workspaceId.name)
+}

-    const iterator = await adapter.listStream(ctx, workspaceId)
+async function processAdapter (
+  ctx: MeasureContext,
+  exAdapter: StorageAdapterEx,
+  adapter: StorageAdapter,
+  workspaceId: WorkspaceId,
+  params: MoveFilesParams
+): Promise<void> {
+  const target = exAdapter.defaultAdapter
+
+  let time = Date.now()
+  let processedCnt = 0
+  let processedBytes = 0
+  let skippedCnt = 0
+  let movedCnt = 0
+  let movedBytes = 0
+  let batchBytes = 0
+
+  const rateLimiter = new RateLimiter(params.concurrency)
+
+  const iterator = await adapter.listStream(ctx, workspaceId)
+  try {
     while (true) {
       const data = await iterator.next()
       if (data === undefined) break

-      const blob = await exAdapter.stat(ctx, workspaceId, data._id)
-      if (blob === undefined) continue
-      if (blob.provider === target) continue
+      const blob =
+        (await exAdapter.stat(ctx, workspaceId, data._id)) ?? (await adapter.stat(ctx, workspaceId, data._id))

-      if (blob.size > params.blobSizeLimitMb * 1024 * 1024) {
-        console.log('skipping large blob', name, data._id, Math.round(blob.size / 1024 / 1024))
+      if (blob === undefined) {
+        console.error('blob not found', data._id)
         continue
       }

-      await rateLimiter.exec(async () => {
-        try {
-          await retryOnFailure(
-            ctx,
-            5,
-            async () => {
-              await moveFile(ctx, exAdapter, workspaceId, blob)
-            },
-            50
-          )
-        } catch (err) {
-          console.error('failed to process blob', name, data._id, err)
+      if (blob.provider !== target) {
+        if (blob.size <= params.blobSizeLimitMb * 1024 * 1024) {
+          await rateLimiter.exec(async () => {
+            try {
+              await retryOnFailure(
+                ctx,
+                5,
+                async () => {
+                  await processFile(ctx, exAdapter, adapter, workspaceId, blob)
+                },
+                50
+              )
+              movedCnt += 1
+              movedBytes += blob.size
+              batchBytes += blob.size
+            } catch (err) {
+              console.error('failed to process blob', data._id, err)
+            }
+          })
+        } else {
+          skippedCnt += 1
+          console.log('skipping large blob', data._id, Math.round(blob.size / 1024 / 1024))
         }
-      })
+      }

-      count += 1
-      if (count % 100 === 0) {
+      processedCnt += 1
+      processedBytes += blob.size
+
+      if (processedCnt % 100 === 0) {
         await rateLimiter.waitProcessing()
+        const duration = Date.now() - time
+
+        console.log(
+          '...processed',
+          processedCnt,
+          Math.round(processedBytes / 1024 / 1024) + 'MB',
+          'moved',
+          movedCnt,
+          Math.round(movedBytes / 1024 / 1024) + 'MB',
+          '+' + Math.round(batchBytes / 1024 / 1024) + 'MB',
+          'skipped',
+          skippedCnt,
+          Math.round(duration / 1000) + 's'
+        )
+
+        batchBytes = 0
         time = Date.now()
-        console.log('...moved: ', count, Math.round(duration / 1000))
       }
     }

     await rateLimiter.waitProcessing()
-
+  } finally {
     await iterator.close()
   }
-
-  console.log('...done', workspaceId.name, count)
 }

-async function moveFile (
+async function processFile (
   ctx: MeasureContext,
   exAdapter: StorageAdapterEx,
+  adapter: StorageAdapter,
   workspaceId: WorkspaceId,
   blob: Blob
 ): Promise<void> {
-  const readable = await exAdapter.get(ctx, workspaceId, blob._id)
+  const readable = await adapter.get(ctx, workspaceId, blob._id)
   try {
     readable.on('end', () => {
       readable.destroy()
     })
     const stream = readable.pipe(new PassThrough())
     await exAdapter.put(ctx, workspaceId, blob._id, stream, blob.contentType, blob.size)
-  } catch (err) {
+  } finally {
     readable.destroy()
-    throw err
   }
 }

@@ -115,18 +166,19 @@ async function retryOnFailure<T> (
   op: () => Promise<T>,
   delay: number = 0
 ): Promise<T> {
-  let error: any
+  let lastError: any
   while (retries > 0) {
     retries--
     try {
       return await op()
     } catch (err: any) {
-      error = err
+      console.error(err)
+      lastError = err
       ctx.error('error', { err, retries })
       if (retries !== 0 && delay > 0) {
         await new Promise((resolve) => setTimeout(resolve, delay))
       }
     }
   }
-  throw error
+  throw lastError
 }
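The move is now retried at the adapter level rather than only per file: blobs whose `provider` already equals the default adapter are skipped, so `processAdapter` is idempotent and safe to re-run after a partial failure. A self-contained sketch of the same retry pattern, assuming nothing beyond standard TypeScript (the diff's version additionally logs through `MeasureContext`):

```ts
// Retry an async operation up to `retries` times with a fixed delay between
// attempts; the last error is rethrown if every attempt fails.
async function retryOnFailure<T> (retries: number, op: () => Promise<T>, delayMs: number = 0): Promise<T> {
  let lastError: unknown
  while (retries > 0) {
    retries--
    try {
      return await op()
    } catch (err) {
      lastError = err
      // wait before the next attempt, but not after the last one
      if (retries !== 0 && delayMs > 0) {
        await new Promise((resolve) => setTimeout(resolve, delayMs))
      }
    }
  }
  throw lastError
}

// Usage mirroring the diff: retry a whole pass and rely on idempotency.
// await retryOnFailure(5, async () => { await processAdapter(...) })
```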
diff --git a/models/core/src/migration.ts b/models/core/src/migration.ts
index 6efe17ca4c..7282d9ae27 100644
--- a/models/core/src/migration.ts
+++ b/models/core/src/migration.ts
@@ -165,16 +165,20 @@ async function migrateCollaborativeContentToStorage (client: MigrationClient): Promise<void> {
     const domain = hierarchy.findDomain(_class)
     if (domain === undefined) continue

-    const attributes = hierarchy.getAllAttributes(_class)
-    const filtered = Array.from(attributes.values()).filter((attribute) => {
+    const allAttributes = hierarchy.getAllAttributes(_class)
+    const attributes = Array.from(allAttributes.values()).filter((attribute) => {
       return hierarchy.isDerived(attribute.type._class, core.class.TypeCollaborativeDoc)
     })

-    if (filtered.length === 0) continue
-    const iterator = await client.traverse(domain, { _class })
+    if (attributes.length === 0) continue
+    if (hierarchy.isMixin(_class) && attributes.every((p) => p.attributeOf !== _class)) continue
+
+    const query = hierarchy.isMixin(_class) ? { [_class]: { $exists: true } } : { _class }
+
+    const iterator = await client.traverse(domain, query)

     try {
       console.log('processing', _class)
-      await processMigrateContentFor(ctx, domain, filtered, client, storageAdapter, iterator)
+      await processMigrateContentFor(ctx, domain, attributes, client, storageAdapter, iterator)
     } finally {
       await iterator.close()
     }
@@ -189,6 +193,8 @@ async function processMigrateContentFor (
   storageAdapter: StorageAdapter,
   iterator: MigrationIterator<Doc>
 ): Promise<void> {
+  const hierarchy = client.hierarchy
+
   const rateLimiter = new RateLimiter(10)

   let processed = 0
@@ -211,7 +217,14 @@
       for (const attribute of attributes) {
         const collaborativeDoc = makeCollaborativeDoc(doc._id, attribute.name, revisionId)

-        const value = (doc as any)[attribute.name] as string
+        const value = hierarchy.isMixin(attribute.attributeOf)
+          ? ((doc as any)[attribute.attributeOf]?.[attribute.name] as string)
+          : ((doc as any)[attribute.name] as string)
+
+        const attributeName = hierarchy.isMixin(attribute.attributeOf)
+          ? `${attribute.attributeOf}.${attribute.name}`
+          : attribute.name
+
         if (value != null && value.startsWith('{')) {
           const { documentId } = collaborativeDocParse(collaborativeDoc)
           const blob = await storageAdapter.stat(ctx, client.workspaceId, documentId)
@@ -221,9 +234,9 @@
             await saveCollaborativeDoc(storageAdapter, client.workspaceId, collaborativeDoc, ydoc, ctx)
           }

-          update[attribute.name] = collaborativeDoc
-        } else if (value == null) {
-          update[attribute.name] = makeCollaborativeDoc(doc._id, attribute.name, revisionId)
+          update[attributeName] = collaborativeDoc
+        } else if (value == null || value === '') {
+          update[attributeName] = collaborativeDoc
         }
       }
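The mixin handling in `processMigrateContentFor` hinges on the update key: mixin values are nested under the mixin id, so the update must address them with a dotted path. A condensed sketch of that key construction (the example ids are illustrative, not from the migration):

```ts
// Build the update key for a migrated attribute.
//   plain attribute -> 'description'
//   mixin attribute -> 'recruit:mixin:Candidate.description'
function updateKeyFor (attributeOf: string, name: string, isMixin: boolean): string {
  return isMixin ? `${attributeOf}.${name}` : name
}
```

The `value == null` branch also changes: it now treats empty strings as unset and reuses the `collaborativeDoc` computed above instead of calling `makeCollaborativeDoc` a second time, so every attribute of a document converges on the same revision id.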