From f9962fe08664e3c62866420d388419b2ae970729 Mon Sep 17 00:00:00 2001
From: Denis Bykhov
Date: Fri, 12 Jul 2024 12:04:29 +0500
Subject: [PATCH] remove duplicate ids tool (#6052)

Signed-off-by: Denis Bykhov
---
 dev/tool/package.json           |   2 +
 dev/tool/src/clean.ts           | 348 +++++++++++++++++++++++++++++++-
 dev/tool/src/index.ts           |  11 +
 models/contact/src/index.ts     |   3 +-
 models/time/src/index.ts        |   4 +-
 packages/core/src/operations.ts |  18 +-
 packages/text/src/ydoc.ts       |  37 +++-
 7 files changed, 407 insertions(+), 16 deletions(-)

diff --git a/dev/tool/package.json b/dev/tool/package.json
index be0414b5b6..e48fd7ea0a 100644
--- a/dev/tool/package.json
+++ b/dev/tool/package.json
@@ -55,6 +55,7 @@
     "@elastic/elasticsearch": "^7.14.0",
     "@hcengineering/account": "^0.6.0",
     "@hcengineering/attachment": "^0.6.14",
+    "@hcengineering/calendar": "^0.6.24",
     "@hcengineering/chunter": "^0.6.20",
     "@hcengineering/client": "^0.6.18",
     "@hcengineering/activity": "^0.6.0",
@@ -137,6 +138,7 @@
     "@hcengineering/text": "^0.6.5",
     "@hcengineering/telegram": "^0.6.21",
     "@hcengineering/tracker": "^0.6.24",
+    "@hcengineering/collaboration": "^0.6.0",
     "commander": "^8.1.0",
     "csv-parse": "~5.1.0",
     "email-addresses": "^5.0.0",
diff --git a/dev/tool/src/clean.ts b/dev/tool/src/clean.ts
index 61189a225e..001ae64196 100644
--- a/dev/tool/src/clean.ts
+++ b/dev/tool/src/clean.ts
@@ -13,19 +13,33 @@
 // limitations under the License.
 //
 
+import { ACCOUNT_DB, listWorkspacesRaw } from '@hcengineering/account'
 import attachment from '@hcengineering/attachment'
+import calendar from '@hcengineering/calendar'
 import chunter, { type ChatMessage } from '@hcengineering/chunter'
+import { loadCollaborativeDoc, saveCollaborativeDoc, yDocToBuffer } from '@hcengineering/collaboration'
 import contact from '@hcengineering/contact'
 import core, {
+  type ArrOf,
   ClassifierKind,
+  type CollaborativeDoc,
+  DOMAIN_BENCHMARK,
+  DOMAIN_DOC_INDEX_STATE,
+  DOMAIN_MODEL,
   DOMAIN_STATUS,
   DOMAIN_TX,
+  type DocumentUpdate,
+  type Hierarchy,
+  type Markup,
+  type RefTo,
   SortingOrder,
   TxOperations,
   TxProcessor,
   generateId,
   getObjectValue,
+  getWorkspaceId,
   toIdMap,
+  updateAttribute,
   type BackupClient,
   type Class,
   type Client as CoreClient,
@@ -39,20 +53,24 @@
   type TxCreateDoc,
   type TxMixin,
   type TxUpdateDoc,
-  type WorkspaceId
+  type WorkspaceId,
+  DOMAIN_MIGRATION,
+  type MigrationState,
+  type RelatedDocument
 } from '@hcengineering/core'
-import { DOMAIN_ACTIVITY } from '@hcengineering/model-activity'
+import activity, { DOMAIN_ACTIVITY } from '@hcengineering/model-activity'
 import { DOMAIN_SPACE } from '@hcengineering/model-core'
 import recruitModel, { defaultApplicantStatuses } from '@hcengineering/model-recruit'
-import { getWorkspaceDB } from '@hcengineering/mongo'
+import { getMongoClient, getWorkspaceDB } from '@hcengineering/mongo'
 import recruit, { type Applicant, type Vacancy } from '@hcengineering/recruit'
 import { type StorageAdapter } from '@hcengineering/server-core'
 import { connect } from '@hcengineering/server-tool'
 import tags, { type TagCategory, type TagElement, type TagReference } from '@hcengineering/tags'
 import task, { type ProjectType, type Task, type TaskType } from '@hcengineering/task'
+import { updateYDocContent } from '@hcengineering/text'
 import tracker from '@hcengineering/tracker'
 import { deepEqual } from 'fast-equals'
-import { MongoClient } from 'mongodb'
+import { type Db, MongoClient } from 'mongodb'
 
 export async function cleanWorkspace (
   ctx: MeasureContext,
@@ -1030,3 +1048,325 @@ export async function restoreHrTaskTypesFromUpdates (
     await connection.close()
   }
 }
+
+export async function removeDuplicateIds (
+  ctx: MeasureContext,
+  mongodbUri: string,
+  storageAdapter: StorageAdapter,
+  productId: string,
+  transactorUrl: string,
+  initWorkspacesStr: string
+): Promise<void> {
+  const state = 'REMOVE_DUPLICATE_IDS'
+  const mongoClient = getMongoClient(mongodbUri)
+  const _client = await mongoClient.getClient()
+  // disable spaces while changing hardcoded ids
+  const skippedDomains: string[] = [DOMAIN_DOC_INDEX_STATE, DOMAIN_BENCHMARK, DOMAIN_TX, DOMAIN_SPACE]
+  try {
+    const workspaces = await listWorkspacesRaw(_client.db(ACCOUNT_DB), productId)
+    workspaces.sort((a, b) => b.lastVisit - a.lastVisit)
+    const initWorkspaces = initWorkspacesStr.split(';')
+    const initWS = workspaces.filter((p) => initWorkspaces.includes(p.workspace))
+    const ids = new Map<string, RelatedDocument[]>()
+    for (const workspace of initWS) {
+      const workspaceId = getWorkspaceId(workspace.workspace, productId)
+      const db = getWorkspaceDB(_client, workspaceId)
+
+      const txex = await db.collection(DOMAIN_TX).find<TxCUD<Doc>>({}).toArray()
+      const txesArr = []
+      for (const obj of txex) {
+        if (obj.objectSpace === core.space.Model && !isPersonAccount(obj)) {
+          continue
+        }
+        txesArr.push({ _id: obj._id, _class: obj._class })
+      }
+      txesArr.filter((it, idx, array) => array.findIndex((pt) => pt._id === it._id) === idx)
+      ids.set(DOMAIN_TX, txesArr)
+
+      const colls = await db.collections()
+      for (const coll of colls) {
+        if (skippedDomains.includes(coll.collectionName)) continue
+        const arr = ids.get(coll.collectionName) ?? []
+        const data = await coll.find({}, { projection: { _id: 1, _class: 1 } }).toArray()
+        for (const obj of data) {
+          arr.push(obj)
+        }
+        ids.set(coll.collectionName, arr)
+      }
+
+      const arr = ids.get(DOMAIN_MODEL) ?? []
+      const data = await db
+        .collection(DOMAIN_TX)
+        .find<TxCUD<Doc>>(
+          { objectSpace: core.space.Model },
+          { projection: { objectId: 1, objectClass: 1, modifiedBy: 1 } }
+        )
+        .toArray()
+      for (const obj of data) {
+        if (
+          (obj.modifiedBy === core.account.ConfigUser || obj.modifiedBy === core.account.System) &&
+          !isPersonAccount(obj)
+        ) {
+          continue
+        }
+        if (obj.objectId === core.account.ConfigUser || obj.objectId === core.account.System) continue
+        arr.push({ _id: obj.objectId, _class: obj.objectClass })
+      }
+      arr.filter((it, idx, array) => array.findIndex((pt) => pt._id === it._id) === idx)
+      ids.set(DOMAIN_MODEL, arr)
+    }
+
+    for (let index = 0; index < workspaces.length; index++) {
+      const workspace = workspaces[index]
+      // skip the init workspaces on the first pass, in case something goes wrong
+      if (initWorkspaces.includes(workspace.workspace)) continue
+
+      ctx.info(`Processing workspace ${workspace.workspaceName ?? workspace.workspace}`)
+      const workspaceId = getWorkspaceId(workspace.workspace, productId)
+      const db = getWorkspaceDB(_client, workspaceId)
+      const check = await db.collection(DOMAIN_MIGRATION).findOne({ state, plugin: workspace.workspace })
+      if (check != null) continue
+
+      const wsClient = (await connect(transactorUrl, workspaceId, undefined, {
+        model: 'upgrade'
+      })) as CoreClient & BackupClient
+      for (const set of ids) {
+        if (set[1].length === 0) continue
+        for (const doc of set[1]) {
+          await updateId(ctx, wsClient, db, storageAdapter, workspaceId, doc)
+        }
+      }
+      await wsClient.sendForceClose()
+      await wsClient.close()
+      await db.collection<MigrationState>(DOMAIN_MIGRATION).insertOne({
+        _id: generateId(),
+        state,
+        plugin: workspace.workspace,
+        space: core.space.Configuration,
+        modifiedOn: Date.now(),
+        modifiedBy: core.account.System,
+        _class: core.class.MigrationState
+      })
+      ctx.info(`Done ${index} / ${workspaces.length - initWorkspaces.length}`)
+    }
+  } catch (err: any) {
+    console.trace(err)
+  } finally {
+    mongoClient.close()
+  }
+}
+
+function isPersonAccount (tx: TxCUD<Doc>): boolean {
+  return tx.objectClass === contact.class.PersonAccount
+}
+
+async function update<T extends Doc> (h: Hierarchy, db: Db, doc: T, update: DocumentUpdate<T>): Promise<void> {
+  await db.collection(h.getDomain(doc._class)).updateOne({ _id: doc._id }, { $set: { ...update, '%hash%': null } })
+}
+
+async function updateId (
+  ctx: MeasureContext,
+  client: CoreClient & BackupClient,
+  db: Db,
+  storage: StorageAdapter,
+  workspaceId: WorkspaceId,
+  docRef: RelatedDocument
+): Promise<void> {
+  const h = client.getHierarchy()
+  const txop = new TxOperations(client, core.account.System)
+  try {
+    // check the doc exists
+    const doc = await client.findOne(docRef._class, { _id: docRef._id })
+    if (doc === undefined) return
+    const domain = h.getDomain(doc._class)
+    const newId = generateId()
+
+    // update txes
+    await db.collection(DOMAIN_TX).updateMany({ objectId: doc._id }, { $set: { objectId: newId, '%hash%': null } })
+
+    // update nested txes
+    await db
+      .collection(DOMAIN_TX)
+      .updateMany({ 'tx.objectId': doc._id }, { $set: { 'tx.objectId': newId, '%hash%': null } })
+
+    // we have generated ids for calendars, so update the related calendar too
+    if (h.isDerived(doc._class, core.class.Account)) {
+      await updateId(ctx, client, db, storage, workspaceId, {
+        _id: `${doc._id}_calendar` as Ref<Doc>,
+        _class: calendar.class.Calendar
+      })
+    }
+
+    // update backlinks
+    const backlinks = await client.findAll(activity.class.ActivityReference, { attachedTo: doc._id })
+    for (const backlink of backlinks) {
+      const contentDoc = await client.findOne(backlink.attachedDocClass ?? backlink.srcDocClass, {
+        _id: backlink.attachedDocId ?? backlink.srcDocId
+      })
+      if (contentDoc !== undefined) {
+        const attrs = h.getAllAttributes(contentDoc._class)
+        for (const [attrName, attr] of attrs) {
+          if (attr.type._class === core.class.TypeMarkup) {
+            const markup = (contentDoc as any)[attrName] as Markup
+            const newMarkup = markup.replaceAll(doc._id, newId)
+            await update(h, db, contentDoc, { [attrName]: newMarkup })
+          } else if (attr.type._class === core.class.TypeCollaborativeMarkup) {
+            const markup = (contentDoc as any)[attrName]
+            const newMarkup = markup.replaceAll(doc._id, newId)
+            await update(h, db, contentDoc, { [attrName]: newMarkup })
+          } else if (attr.type._class === core.class.TypeCollaborativeDoc) {
+            const collaborativeDoc = (contentDoc as any)[attr.name] as CollaborativeDoc
+            await updateYDoc(ctx, collaborativeDoc, storage, workspaceId, contentDoc, newId, doc)
+          }
+        }
+      }
+      await update(h, db, backlink, { attachedTo: newId, message: backlink.message.replaceAll(doc._id, newId) })
+    }
+
+    // blobs
+
+    await updateRefs(txop, newId, doc)
+
+    await updateArrRefs(txop, newId, doc)
+
+    // update docIndexState
+    const docIndexState = await client.findOne(core.class.DocIndexState, { doc: doc._id })
+    if (docIndexState !== undefined) {
+      const { _id, space, modifiedBy, modifiedOn, createdBy, createdOn, _class, ...data } = docIndexState
+      await txop.createDoc(docIndexState._class, docIndexState.space, {
+        ...data,
+        stages: {},
+        removed: false
+      })
+      await txop.update(docIndexState, { removed: true })
+    }
+
+    if (domain !== DOMAIN_MODEL) {
+      const raw = await db.collection(domain).findOne({ _id: doc._id })
+      await db.collection(domain).insertOne({
+        ...raw,
+        _id: newId as any,
+        '%hash%': null
+      })
+      await db.collection(domain).deleteOne({ _id: doc._id })
+    }
+  } catch (err: any) {
+    console.error('Error processing', docRef._id)
+  }
+}
+
+async function updateYDoc (
+  ctx: MeasureContext,
+  _id: CollaborativeDoc,
+  storage: StorageAdapter,
+  workspaceId: WorkspaceId,
+  contentDoc: Doc,
+  newId: Ref<Doc>,
+  doc: RelatedDocument
+): Promise<void> {
+  try {
+    const ydoc = await loadCollaborativeDoc(storage, workspaceId, _id, ctx)
+    if (ydoc === undefined) {
+      ctx.error('document content not found', { document: contentDoc._id })
+      return
+    }
+    const buffer = yDocToBuffer(ydoc)
+
+    const updatedYDoc = updateYDocContent(buffer, (body: Record<string, any>) => {
+      const str = JSON.stringify(body)
+      const updated = str.replaceAll(doc._id, newId)
+      return JSON.parse(updated)
+    })
+
+    if (updatedYDoc !== undefined) {
+      await saveCollaborativeDoc(storage, workspaceId, _id, updatedYDoc, ctx)
+    }
+  } catch {
+    // do nothing, the collaborative doc does not seem to exist yet
+  }
+}
+
+async function updateRefs (client: TxOperations, newId: Ref<Doc>, doc: RelatedDocument): Promise<void> {
+  const h = client.getHierarchy()
+  const ancestors = h.getAncestors(doc._class)
+  const reftos = (await client.findAll(core.class.Attribute, { 'type._class': core.class.RefTo })).filter((it) => {
+    const to = it.type as RefTo<Doc>
+    return ancestors.includes(h.getBaseClass(to.to))
+  })
+  for (const attr of reftos) {
+    if (attr.name === '_id') {
+      continue
+    }
+    const descendants = h.getDescendants(attr.attributeOf)
+    for (const d of descendants) {
+      if (h.isDerived(d, core.class.BenchmarkDoc)) {
+        continue
+      }
+      if (h.isDerived(d, core.class.Tx)) {
+        continue
+      }
+      if (h.findDomain(d) !== undefined) {
+        while (true) {
+          const values = await client.findAll(d, { [attr.name]: doc._id }, { limit: 100 })
+          if (values.length === 0) {
+            break
+          }
+
+          const builder = client.apply(doc._id)
+          for (const v of values) {
+            await updateAttribute(builder, v, d, { key: attr.name, attr }, newId, true)
+          }
+          const modelTxes = builder.txes.filter((p) => p.objectSpace === core.space.Model)
+          builder.txes = builder.txes.filter((p) => p.objectSpace !== core.space.Model)
+          for (const modelTx of modelTxes) {
+            await client.tx(modelTx)
+          }
+          await builder.commit()
+        }
+      }
+    }
+  }
+}
+
+async function updateArrRefs (client: TxOperations, newId: Ref<Doc>, doc: RelatedDocument): Promise<void> {
+  const h = client.getHierarchy()
+  const ancestors = h.getAncestors(doc._class)
+  const arrs = await client.findAll(core.class.Attribute, { 'type._class': core.class.ArrOf })
+  for (const attr of arrs) {
+    if (attr.name === '_id') {
+      continue
+    }
+    const to = attr.type as ArrOf<Ref<Doc>>
+    if (to.of._class !== core.class.RefTo) continue
+    const refto = to.of as RefTo<Doc>
+    if (ancestors.includes(h.getBaseClass(refto.to))) {
+      const descendants = h.getDescendants(attr.attributeOf)
+      for (const d of descendants) {
+        if (h.isDerived(d, core.class.BenchmarkDoc)) {
+          continue
+        }
+        if (h.isDerived(d, core.class.Tx)) {
+          continue
+        }
+        if (h.findDomain(d) !== undefined) {
+          while (true) {
+            const values = await client.findAll(attr.attributeOf, { [attr.name]: doc._id }, { limit: 100 })
+            if (values.length === 0) {
+              break
+            }
+            const builder = client.apply(doc._id)
+            for (const v of values) {
+              await updateAttribute(builder, v, d, { key: attr.name, attr }, newId, true)
+            }
+            const modelTxes = builder.txes.filter((p) => p.objectSpace === core.space.Model)
+            builder.txes = builder.txes.filter((p) => p.objectSpace !== core.space.Model)
+            for (const modelTx of modelTxes) {
+              await client.tx(modelTx)
+            }
+            await builder.commit()
+          }
+        }
+      }
+    }
+  }
+}
diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts
index eb316e8ff6..d178e14d55 100644
--- a/dev/tool/src/index.ts
+++ b/dev/tool/src/index.ts
@@ -82,6 +82,7 @@ import {
   fixMinioBW,
   fixSkills,
   optimizeModel,
+  removeDuplicateIds,
   restoreHrTaskTypesFromUpdates,
   restoreRecruitingTaskTypes
 } from './clean'
@@ -1145,6 +1146,16 @@ export function devTool (
       })
     })
 
+  program
+    .command('remove-duplicates-ids <workspaces>')
+    .description('remove duplicate ids for future migration')
+    .action(async (workspaces: string) => {
+      const { mongodbUri } = prepareTools()
+      await withStorage(mongodbUri, async (adapter) => {
+        await removeDuplicateIds(toolCtx, mongodbUri, adapter, productId, transactorUrl, workspaces)
+      })
+    })
+
   extendProgram?.(program)
 
   program.parse(process.argv)
diff --git a/models/contact/src/index.ts b/models/contact/src/index.ts
index 7f354789a8..9bc7c59686 100644
--- a/models/contact/src/index.ts
+++ b/models/contact/src/index.ts
@@ -208,7 +208,8 @@ export class TEmployee extends TPerson implements Employee {
 
 @Model(contact.class.PersonAccount, core.class.Account)
 export class TPersonAccount extends TAccount implements PersonAccount {
-  person!: Ref<Person>
+  @Prop(TypeRef(contact.class.Person), contact.string.Person)
+    person!: Ref<Person>
 }
 
 @Model(contact.class.ContactsTab, core.class.Doc, DOMAIN_MODEL)
diff --git a/models/time/src/index.ts b/models/time/src/index.ts
index 33be2f0f91..39b6b4258b 100644
--- a/models/time/src/index.ts
+++ b/models/time/src/index.ts
@@ -103,7 +103,9 @@ export class TToDO extends TAttachedDoc implements ToDo {
   priority!: ToDoPriority
 
   visibility!: Visibility
-  attachedSpace?: Ref<Space> | undefined
+
+  @Prop(TypeRef(core.class.Space), core.string.Space)
+    attachedSpace?: Ref<Space> | undefined
 
   @Prop(TypeString(), calendarPlugin.string.Title)
     title!: string
diff --git a/packages/core/src/operations.ts b/packages/core/src/operations.ts
index f6a0b7e258..d3271ecc3a 100644
--- a/packages/core/src/operations.ts
+++ b/packages/core/src/operations.ts
@@ -537,11 +537,13 @@ export async function updateAttribute (
   _class: Ref<Class<Doc>>,
   attribute: { key: string, attr: AnyAttribute },
   value: any,
-  modifyBy?: Ref<Account>
+  saveModified: boolean = false
 ): Promise<void> {
   const doc = object
   const attributeKey = attribute.key
   if ((doc as any)[attributeKey] === value) return
+  const modifiedOn = saveModified ? doc.modifiedOn : Date.now()
+  const modifiedBy = attribute.key === 'modifiedBy' ? value : saveModified ? doc.modifiedBy : undefined
   const attr = attribute.attr
   if (client.getHierarchy().isMixin(attr.attributeOf)) {
     await client.updateMixin(
@@ -550,30 +552,30 @@ export async function updateAttribute (
       doc.space,
       attr.attributeOf,
       { [attributeKey]: value },
-      Date.now(),
-      modifyBy
+      modifiedOn,
+      modifiedBy
     )
   } else {
     if (client.getHierarchy().isDerived(attribute.attr.type._class, core.class.ArrOf)) {
       const oldValue: any[] = (object as any)[attributeKey] ?? []
-      const val: any[] = value
+      const val: any[] = Array.isArray(value) ? value : [value]
       const toPull = oldValue.filter((it: any) => !val.includes(it))
       const toPush = val.filter((it) => !oldValue.includes(it))
 
       if (toPull.length > 0) {
-        await client.update(object, { $pull: { [attributeKey]: { $in: toPull } } }, false, Date.now(), modifyBy)
+        await client.update(object, { $pull: { [attributeKey]: { $in: toPull } } }, false, modifiedOn, modifiedBy)
       }
       if (toPush.length > 0) {
         await client.update(
           object,
           { $push: { [attributeKey]: { $each: toPush, $position: 0 } } },
           false,
-          Date.now(),
-          modifyBy
+          modifiedOn,
+          modifiedBy
         )
       }
     } else {
-      await client.update(object, { [attributeKey]: value }, false, Date.now(), modifyBy)
+      await client.update(object, { [attributeKey]: value }, false, modifiedOn, modifiedBy)
     }
   }
 }
diff --git a/packages/text/src/ydoc.ts b/packages/text/src/ydoc.ts
index 5ae1384f91..a266224732 100644
--- a/packages/text/src/ydoc.ts
+++ b/packages/text/src/ydoc.ts
@@ -15,8 +15,8 @@
 
 import { Extensions, getSchema } from '@tiptap/core'
 import { Node, Schema } from 'prosemirror-model'
-import { yDocToProsemirrorJSON } from 'y-prosemirror'
-import { Doc, applyUpdate } from 'yjs'
+import { prosemirrorJSONToYDoc, yDocToProsemirrorJSON } from 'y-prosemirror'
+import { Doc, applyUpdate, encodeStateAsUpdate } from 'yjs'
 import { defaultExtensions } from './extensions'
 
 /**
@@ -81,3 +81,36 @@ export function yDocContentToNodes (content: ArrayBuffer, schema?: Schema, exten
 
   return nodes
 }
+
+/**
+ * Update Y.Doc content
+ *
+ * @public
+ */
+export function updateYDocContent (
+  content: ArrayBuffer,
+  updateFn: (body: Record<string, any>) => Record<string, any>,
+  schema?: Schema,
+  extensions?: Extensions
+): Doc | undefined {
+  schema ??= getSchema(extensions ?? defaultExtensions)
+
+  try {
+    const ydoc = new Doc()
+    const res = new Doc({ gc: false })
+    const uint8arr = new Uint8Array(content)
+    applyUpdate(ydoc, uint8arr)
+
+    for (const field of ydoc.share.keys()) {
+      const body = yDocToProsemirrorJSON(ydoc, field)
+      const updated = updateFn(body)
+      const yDoc = prosemirrorJSONToYDoc(schema, updated, field)
+      const update = encodeStateAsUpdate(yDoc)
+      applyUpdate(res, update)
+    }
+
+    return res
+  } catch (err: any) {
+    console.error(err)
+  }
+}
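For context on how the pieces above fit together for collaborative content: the tool loads each collaborative attribute through @hcengineering/collaboration, feeds the encoded update into the new updateYDocContent helper from packages/text, and writes the rebuilt Y.Doc back. The sketch below restates that flow for a single attribute; it mirrors updateYDoc() in clean.ts, and the function name replaceIdInCollaborativeDoc is only an illustrative label, not part of the patch.

import { type CollaborativeDoc, type Doc, type MeasureContext, type Ref, type WorkspaceId } from '@hcengineering/core'
import { loadCollaborativeDoc, saveCollaborativeDoc, yDocToBuffer } from '@hcengineering/collaboration'
import { type StorageAdapter } from '@hcengineering/server-core'
import { updateYDocContent } from '@hcengineering/text'

async function replaceIdInCollaborativeDoc (
  ctx: MeasureContext,
  storage: StorageAdapter,
  workspaceId: WorkspaceId,
  collabDoc: CollaborativeDoc,
  oldId: Ref<Doc>,
  newId: Ref<Doc>
): Promise<void> {
  const ydoc = await loadCollaborativeDoc(storage, workspaceId, collabDoc, ctx)
  if (ydoc === undefined) return
  // updateYDocContent converts every shared fragment to ProseMirror JSON,
  // lets the callback rewrite it, and rebuilds a fresh Y.Doc from the result
  const updated = updateYDocContent(yDocToBuffer(ydoc), (body) =>
    JSON.parse(JSON.stringify(body).replaceAll(oldId, newId))
  )
  if (updated !== undefined) {
    await saveCollaborativeDoc(storage, workspaceId, collabDoc, updated, ctx)
  }
}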
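The updateAttribute change in packages/core is what keeps these rewrites from looking like fresh user edits: with the new saveModified flag set to true, the document's original modifiedOn/modifiedBy are reused, while the previous default behaviour (stamping Date.now()) is kept for existing callers. A minimal sketch of a call site, with placeholder names for the target document and attribute:

import { type AnyAttribute, type Doc, type Ref, TxOperations, updateAttribute } from '@hcengineering/core'

// Rewrites a single reference attribute without touching the audit fields.
// `target` and `attr` stand in for whatever document/attribute is being fixed.
async function rewriteRefQuietly (client: TxOperations, target: Doc, attr: AnyAttribute, newId: Ref<Doc>): Promise<void> {
  // last argument saveModified = true -> keep target.modifiedOn / target.modifiedBy
  await updateAttribute(client, target, target._class, { key: attr.name, attr }, newId, true)
}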
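Wiring-wise, the new remove-duplicates-ids command takes a single argument: a ';'-separated list of "init" workspaces whose hardcoded ids are collected first; those workspaces are only scanned, and every other workspace is then rewritten. A rough sketch of calling the same routine from another script; the context values come from the tool's usual setup (prepareTools / withStorage in dev/tool/src/index.ts), and the workspace names are placeholders:

import { type MeasureContext } from '@hcengineering/core'
import { type StorageAdapter } from '@hcengineering/server-core'
import { removeDuplicateIds } from './clean'

async function run (
  ctx: MeasureContext,
  mongodbUri: string,
  adapter: StorageAdapter,
  productId: string,
  transactorUrl: string
): Promise<void> {
  // last argument: ';'-separated list of init workspaces scanned for hardcoded ids
  await removeDuplicateIds(ctx, mongodbUri, adapter, productId, transactorUrl, 'init-ws-one;init-ws-two')
}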