remove duplicate ids tool (#6052)

Signed-off-by: Denis Bykhov <bykhov.denis@gmail.com>
This commit is contained in:
Denis Bykhov 2024-07-12 12:04:29 +05:00 committed by GitHub
parent bda3404c50
commit f9962fe086
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 407 additions and 16 deletions

View File

@ -55,6 +55,7 @@
"@elastic/elasticsearch": "^7.14.0",
"@hcengineering/account": "^0.6.0",
"@hcengineering/attachment": "^0.6.14",
"@hcengineering/calendar": "^0.6.24",
"@hcengineering/chunter": "^0.6.20",
"@hcengineering/client": "^0.6.18",
"@hcengineering/activity": "^0.6.0",
@ -137,6 +138,7 @@
"@hcengineering/text": "^0.6.5",
"@hcengineering/telegram": "^0.6.21",
"@hcengineering/tracker": "^0.6.24",
"@hcengineering/collaboration": "^0.6.0",
"commander": "^8.1.0",
"csv-parse": "~5.1.0",
"email-addresses": "^5.0.0",

View File

@ -13,19 +13,33 @@
// limitations under the License.
//
import { ACCOUNT_DB, listWorkspacesRaw } from '@hcengineering/account'
import attachment from '@hcengineering/attachment'
import calendar from '@hcengineering/calendar'
import chunter, { type ChatMessage } from '@hcengineering/chunter'
import { loadCollaborativeDoc, saveCollaborativeDoc, yDocToBuffer } from '@hcengineering/collaboration'
import contact from '@hcengineering/contact'
import core, {
type ArrOf,
ClassifierKind,
type CollaborativeDoc,
DOMAIN_BENCHMARK,
DOMAIN_DOC_INDEX_STATE,
DOMAIN_MODEL,
DOMAIN_STATUS,
DOMAIN_TX,
type DocumentUpdate,
type Hierarchy,
type Markup,
type RefTo,
SortingOrder,
TxOperations,
TxProcessor,
generateId,
getObjectValue,
getWorkspaceId,
toIdMap,
updateAttribute,
type BackupClient,
type Class,
type Client as CoreClient,
@ -39,20 +53,24 @@ import core, {
type TxCreateDoc,
type TxMixin,
type TxUpdateDoc,
type WorkspaceId
type WorkspaceId,
DOMAIN_MIGRATION,
type MigrationState,
type RelatedDocument
} from '@hcengineering/core'
import { DOMAIN_ACTIVITY } from '@hcengineering/model-activity'
import activity, { DOMAIN_ACTIVITY } from '@hcengineering/model-activity'
import { DOMAIN_SPACE } from '@hcengineering/model-core'
import recruitModel, { defaultApplicantStatuses } from '@hcengineering/model-recruit'
import { getWorkspaceDB } from '@hcengineering/mongo'
import { getMongoClient, getWorkspaceDB } from '@hcengineering/mongo'
import recruit, { type Applicant, type Vacancy } from '@hcengineering/recruit'
import { type StorageAdapter } from '@hcengineering/server-core'
import { connect } from '@hcengineering/server-tool'
import tags, { type TagCategory, type TagElement, type TagReference } from '@hcengineering/tags'
import task, { type ProjectType, type Task, type TaskType } from '@hcengineering/task'
import { updateYDocContent } from '@hcengineering/text'
import tracker from '@hcengineering/tracker'
import { deepEqual } from 'fast-equals'
import { MongoClient } from 'mongodb'
import { type Db, MongoClient } from 'mongodb'
export async function cleanWorkspace (
ctx: MeasureContext,
@ -1030,3 +1048,325 @@ export async function restoreHrTaskTypesFromUpdates (
await connection.close()
}
}
/**
 * Make document ids globally unique across workspaces.
 *
 * Collects the ids of documents owned by the "init" workspaces (the canonical
 * owners of hardcoded/shared ids), then walks every other workspace and
 * regenerates any document that reuses one of those ids via `updateId`.
 * Progress is recorded per-workspace in DOMAIN_MIGRATION so a re-run skips
 * workspaces that were already processed.
 *
 * @param initWorkspacesStr - ';'-separated list of workspace ids whose
 *   documents keep their ids; all other workspaces are rewritten.
 */
export async function removeDuplicateIds (
  ctx: MeasureContext,
  mongodbUri: string,
  storageAdapter: StorageAdapter,
  productId: string,
  transactorUrl: string,
  initWorkspacesStr: string
): Promise<void> {
  const state = 'REMOVE_DUPLICATE_IDS'
  const mongoClient = getMongoClient(mongodbUri)
  const _client = await mongoClient.getClient()
  // disable spaces while changing hardcoded ids
  const skippedDomains: string[] = [DOMAIN_DOC_INDEX_STATE, DOMAIN_BENCHMARK, DOMAIN_TX, DOMAIN_SPACE]
  // keep only the first occurrence of each _id
  const dedupe = (arr: RelatedDocument[]): RelatedDocument[] =>
    arr.filter((it, idx, array) => array.findIndex((pt) => pt._id === it._id) === idx)
  try {
    const workspaces = await listWorkspacesRaw(_client.db(ACCOUNT_DB), productId)
    workspaces.sort((a, b) => b.lastVisit - a.lastVisit)
    const initWorkspaces = initWorkspacesStr.split(';')
    const initWS = workspaces.filter((p) => initWorkspaces.includes(p.workspace))
    const ids = new Map<string, RelatedDocument[]>()
    for (const workspace of initWS) {
      const workspaceId = getWorkspaceId(workspace.workspace, productId)
      const db = getWorkspaceDB(_client, workspaceId)
      const txex = await db.collection(DOMAIN_TX).find<TxCUD<Doc>>({}).toArray()
      // BUG FIX: accumulate across init workspaces instead of overwriting the
      // previous workspace's entries (consistent with the collections loop below)
      const txesArr = ids.get(DOMAIN_TX) ?? []
      for (const obj of txex) {
        if (obj.objectSpace === core.space.Model && !isPersonAccount(obj)) {
          continue
        }
        txesArr.push({ _id: obj._id, _class: obj._class })
      }
      // BUG FIX: Array.prototype.filter returns a new array; the original
      // discarded the result, so duplicate ids were never removed
      ids.set(DOMAIN_TX, dedupe(txesArr))
      const colls = await db.collections()
      for (const coll of colls) {
        if (skippedDomains.includes(coll.collectionName)) continue
        const arr = ids.get(coll.collectionName) ?? []
        const data = await coll.find<RelatedDocument>({}, { projection: { _id: 1, _class: 1 } }).toArray()
        for (const obj of data) {
          arr.push(obj)
        }
        ids.set(coll.collectionName, arr)
      }
      // model documents are derived from model txes
      const arr = ids.get(DOMAIN_MODEL) ?? []
      const data = await db
        .collection(DOMAIN_TX)
        .find<TxCUD<Doc>>(
          { objectSpace: core.space.Model },
          { projection: { objectId: 1, objectClass: 1, modifiedBy: 1 } }
        )
        .toArray()
      for (const obj of data) {
        if (
          (obj.modifiedBy === core.account.ConfigUser || obj.modifiedBy === core.account.System) &&
          !isPersonAccount(obj)
        ) {
          continue
        }
        if (obj.objectId === core.account.ConfigUser || obj.objectId === core.account.System) continue
        arr.push({ _id: obj.objectId, _class: obj.objectClass })
      }
      // BUG FIX: same discarded-filter issue as above — keep the deduped array
      ids.set(DOMAIN_MODEL, dedupe(arr))
    }
    for (let index = 0; index < workspaces.length; index++) {
      const workspace = workspaces[index]
      // we should skip init workspace first time, for case if something went wrong
      if (initWorkspaces.includes(workspace.workspace)) continue
      ctx.info(`Processing workspace ${workspace.workspaceName ?? workspace.workspace}`)
      const workspaceId = getWorkspaceId(workspace.workspace, productId)
      const db = getWorkspaceDB(_client, workspaceId)
      // skip workspaces already handled in a previous run
      const check = await db.collection(DOMAIN_MIGRATION).findOne({ state, plugin: workspace.workspace })
      if (check != null) continue
      const wsClient = (await connect(transactorUrl, workspaceId, undefined, {
        model: 'upgrade'
      })) as CoreClient & BackupClient
      for (const set of ids) {
        if (set[1].length === 0) continue
        for (const doc of set[1]) {
          await updateId(ctx, wsClient, db, storageAdapter, workspaceId, doc)
        }
      }
      await wsClient.sendForceClose()
      await wsClient.close()
      // mark this workspace as processed so a re-run skips it
      await db.collection<MigrationState>(DOMAIN_MIGRATION).insertOne({
        _id: generateId(),
        state,
        plugin: workspace.workspace,
        space: core.space.Configuration,
        modifiedOn: Date.now(),
        modifiedBy: core.account.System,
        _class: core.class.MigrationState
      })
      ctx.info(`Done ${index} / ${workspaces.length - initWorkspaces.length}`)
    }
  } catch (err: any) {
    console.trace(err)
  } finally {
    mongoClient.close()
  }
}
/**
 * Tell whether a transaction targets a PersonAccount document.
 */
function isPersonAccount (tx: TxCUD<Doc>): boolean {
  const { objectClass } = tx
  return objectClass === contact.class.PersonAccount
}
/**
 * Apply a raw update directly to the document's mongo collection, resetting
 * the '%hash%' field (presumably so sync/backup tooling notices the change —
 * verify against consumers of '%hash%').
 */
async function update<T extends Doc> (h: Hierarchy, db: Db, doc: T, update: DocumentUpdate<T>): Promise<void> {
  const domain = h.getDomain(doc._class)
  const $set = { ...update, '%hash%': null }
  await db.collection(domain).updateOne({ _id: doc._id }, { $set })
}
/**
 * Regenerate the id of a single document and rewrite every reference to the
 * old id: direct and nested txes, the derived calendar id for accounts,
 * activity references and the markup/collaborative content they point at,
 * RefTo/ArrOf attribute values, the docIndexState entry, and finally the raw
 * document itself (re-inserted under the new id for non-model domains).
 */
async function updateId (
  ctx: MeasureContext,
  client: CoreClient & BackupClient,
  db: Db,
  storage: StorageAdapter,
  workspaceId: WorkspaceId,
  docRef: RelatedDocument
): Promise<void> {
  const h = client.getHierarchy()
  const txop = new TxOperations(client, core.account.System)
  try {
    // check the doc exists
    const doc = await client.findOne(docRef._class, { _id: docRef._id })
    if (doc === undefined) return
    const domain = h.getDomain(doc._class)
    const newId = generateId()
    // update txes
    await db.collection(DOMAIN_TX).updateMany({ objectId: doc._id }, { $set: { objectId: newId, '%hash%': null } })
    // update nested txes
    await db
      .collection(DOMAIN_TX)
      .updateMany({ 'tx.objectId': doc._id }, { $set: { 'tx.objectId': newId, '%hash%': null } })
    // we have generated ids for calendar, let's update it
    if (h.isDerived(doc._class, core.class.Account)) {
      await updateId(ctx, client, db, storage, workspaceId, {
        _id: `${doc._id}_calendar` as Ref<Doc>,
        _class: calendar.class.Calendar
      })
    }
    // update backlinks
    const backlinks = await client.findAll(activity.class.ActivityReference, { attachedTo: doc._id })
    for (const backlink of backlinks) {
      const contentDoc = await client.findOne(backlink.attachedDocClass ?? backlink.srcDocClass, {
        // BUG FIX: the fallback previously used srcDocClass (a class ref) as
        // the _id, so the lookup could never match — use srcDocId instead
        _id: backlink.attachedDocId ?? backlink.srcDocId
      })
      if (contentDoc !== undefined) {
        const attrs = h.getAllAttributes(contentDoc._class)
        for (const [attrName, attr] of attrs) {
          if (attr.type._class === core.class.TypeMarkup) {
            const markup = (contentDoc as any)[attrName] as Markup
            const newMarkup = markup.replaceAll(doc._id, newId)
            await update(h, db, contentDoc, { [attrName]: newMarkup })
          } else if (attr.type._class === core.class.TypeCollaborativeMarkup) {
            const markup = (contentDoc as any)[attrName]
            const newMarkup = markup.replaceAll(doc._id, newId)
            await update(h, db, contentDoc, { [attrName]: newMarkup })
          } else if (attr.type._class === core.class.TypeCollaborativeDoc) {
            // attrName === attr.name: use the map key, consistent with the branches above
            const collaborativeDoc = (contentDoc as any)[attrName] as CollaborativeDoc
            await updateYDoc(ctx, collaborativeDoc, storage, workspaceId, contentDoc, newId, doc)
          }
        }
      }
      await update(h, db, backlink, { attachedTo: newId, message: backlink.message.replaceAll(doc._id, newId) })
    }
    // blobs
    await updateRefs(txop, newId, doc)
    await updateArrRefs(txop, newId, doc)
    // update docIndexState: recreate a fresh entry and mark the old one removed
    const docIndexState = await client.findOne(core.class.DocIndexState, { doc: doc._id })
    if (docIndexState !== undefined) {
      const { _id, space, modifiedBy, modifiedOn, createdBy, createdOn, _class, ...data } = docIndexState
      await txop.createDoc(docIndexState._class, docIndexState.space, {
        ...data,
        stages: {},
        removed: false
      })
      await txop.update(docIndexState, { removed: true })
    }
    if (domain !== DOMAIN_MODEL) {
      // re-insert the raw document under the new id, then drop the old one
      const raw = await db.collection(domain).findOne({ _id: doc._id })
      await db.collection(domain).insertOne({
        ...raw,
        _id: newId as any,
        '%hash%': null
      })
      await db.collection(domain).deleteOne({ _id: doc._id })
    }
  } catch (err: any) {
    // include the error itself, not only the id, so failures are diagnosable
    console.error('Error processing', docRef._id, err)
  }
}
/**
 * Rewrite occurrences of the old document id inside a collaborative Y.Doc,
 * replacing them with the new id and saving the result back to storage.
 * Missing content is logged; any other failure is ignored on purpose.
 */
async function updateYDoc (
  ctx: MeasureContext,
  _id: CollaborativeDoc,
  storage: StorageAdapter,
  workspaceId: WorkspaceId,
  contentDoc: Doc,
  newId: Ref<Doc>,
  doc: RelatedDocument
): Promise<void> {
  try {
    const ydoc = await loadCollaborativeDoc(storage, workspaceId, _id, ctx)
    if (ydoc === undefined) {
      ctx.error('document content not found', { document: contentDoc._id })
      return
    }
    const rewritten = updateYDocContent(yDocToBuffer(ydoc), (body: Record<string, any>) => {
      // serialize, substitute the id everywhere, then parse back
      return JSON.parse(JSON.stringify(body).replaceAll(doc._id, newId))
    })
    if (rewritten !== undefined) {
      await saveCollaborativeDoc(storage, workspaceId, _id, rewritten, ctx)
    }
  } catch {
    // do nothing, the collaborative doc does not seem to exist yet
  }
}
/**
 * Rewrite every RefTo attribute value that still points at the old document
 * id, replacing it with `newId`. Only attributes whose RefTo target is
 * compatible with the document's class hierarchy are considered.
 */
async function updateRefs (client: TxOperations, newId: Ref<Doc>, doc: RelatedDocument): Promise<void> {
  const h = client.getHierarchy()
  const ancestors = h.getAncestors(doc._class)
  // attributes of type RefTo whose target class matches the document
  const reftos = (await client.findAll(core.class.Attribute, { 'type._class': core.class.RefTo })).filter((it) => {
    const to = it.type as RefTo<Doc>
    return ancestors.includes(h.getBaseClass(to.to))
  })
  for (const attr of reftos) {
    if (attr.name === '_id') {
      continue
    }
    // every concrete class that may carry this attribute
    const descendants = h.getDescendants(attr.attributeOf)
    for (const d of descendants) {
      if (h.isDerived(d, core.class.BenchmarkDoc)) {
        continue
      }
      if (h.isDerived(d, core.class.Tx)) {
        continue
      }
      if (h.findDomain(d) !== undefined) {
        // page through matching documents until none reference the old id;
        // each committed batch removes itself from the next query's results
        while (true) {
          const values = await client.findAll(d, { [attr.name]: doc._id }, { limit: 100 })
          if (values.length === 0) {
            break
          }
          const builder = client.apply(doc._id)
          for (const v of values) {
            await updateAttribute(builder, v, d, { key: attr.name, attr }, newId, true)
          }
          // model-space txes are split out and sent directly; the remainder
          // goes through the apply builder
          const modelTxes = builder.txes.filter((p) => p.objectSpace === core.space.Model)
          builder.txes = builder.txes.filter((p) => p.objectSpace !== core.space.Model)
          for (const modelTx of modelTxes) {
            await client.tx(modelTx)
          }
          await builder.commit()
        }
      }
    }
  }
}
/**
 * Rewrite every ArrOf<RefTo> attribute value that still contains the old
 * document id, replacing it with `newId`. Mirrors `updateRefs` for
 * array-of-reference attributes.
 */
async function updateArrRefs (client: TxOperations, newId: Ref<Doc>, doc: RelatedDocument): Promise<void> {
  const h = client.getHierarchy()
  const ancestors = h.getAncestors(doc._class)
  const arrs = await client.findAll(core.class.Attribute, { 'type._class': core.class.ArrOf })
  for (const attr of arrs) {
    if (attr.name === '_id') {
      continue
    }
    // only arrays of references whose target matches the document's hierarchy
    const to = attr.type as ArrOf<Doc>
    if (to.of._class !== core.class.RefTo) continue
    const refto = to.of as RefTo<Doc>
    if (ancestors.includes(h.getBaseClass(refto.to))) {
      const descendants = h.getDescendants(attr.attributeOf)
      for (const d of descendants) {
        if (h.isDerived(d, core.class.BenchmarkDoc)) {
          continue
        }
        if (h.isDerived(d, core.class.Tx)) {
          continue
        }
        if (h.findDomain(d) !== undefined) {
          // page through matching documents until none reference the old id
          while (true) {
            // BUG FIX: query the descendant class `d` (as updateRefs does),
            // not attr.attributeOf — the original re-scanned the whole base
            // class for every descendant and mismatched the class passed to
            // updateAttribute below
            const values = await client.findAll(d, { [attr.name]: doc._id }, { limit: 100 })
            if (values.length === 0) {
              break
            }
            const builder = client.apply(doc._id)
            for (const v of values) {
              await updateAttribute(builder, v, d, { key: attr.name, attr }, newId, true)
            }
            // model-space txes are split out and sent directly; the remainder
            // goes through the apply builder
            const modelTxes = builder.txes.filter((p) => p.objectSpace === core.space.Model)
            builder.txes = builder.txes.filter((p) => p.objectSpace !== core.space.Model)
            for (const modelTx of modelTxes) {
              await client.tx(modelTx)
            }
            await builder.commit()
          }
        }
      }
    }
  }
}

View File

@ -82,6 +82,7 @@ import {
fixMinioBW,
fixSkills,
optimizeModel,
removeDuplicateIds,
restoreHrTaskTypesFromUpdates,
restoreRecruitingTaskTypes
} from './clean'
@ -1145,6 +1146,16 @@ export function devTool (
})
})
program
.command('remove-duplicates-ids <workspaces>')
.description('remove duplicates ids for futue migration')
.action(async (workspaces: string) => {
const { mongodbUri } = prepareTools()
await withStorage(mongodbUri, async (adapter) => {
await removeDuplicateIds(toolCtx, mongodbUri, adapter, productId, transactorUrl, workspaces)
})
})
extendProgram?.(program)
program.parse(process.argv)

View File

@ -208,7 +208,8 @@ export class TEmployee extends TPerson implements Employee {
@Model(contact.class.PersonAccount, core.class.Account)
export class TPersonAccount extends TAccount implements PersonAccount {
person!: Ref<Person>
@Prop(TypeRef(contact.class.Person), contact.string.Person)
person!: Ref<Person>
}
@Model(contact.class.ContactsTab, core.class.Doc, DOMAIN_MODEL)

View File

@ -103,7 +103,9 @@ export class TToDO extends TAttachedDoc implements ToDo {
priority!: ToDoPriority
visibility!: Visibility
attachedSpace?: Ref<Space> | undefined
@Prop(TypeRef(core.class.Space), core.string.Space)
attachedSpace?: Ref<Space> | undefined
@Prop(TypeString(), calendarPlugin.string.Title)
title!: string

View File

@ -537,11 +537,13 @@ export async function updateAttribute (
_class: Ref<Class<Doc>>,
attribute: { key: string, attr: AnyAttribute },
value: any,
modifyBy?: Ref<Account>
saveModified: boolean = false
): Promise<void> {
const doc = object
const attributeKey = attribute.key
if ((doc as any)[attributeKey] === value) return
const modifiedOn = saveModified ? doc.modifiedOn : Date.now()
const modifiedBy = attribute.key === 'modifiedBy' ? value : saveModified ? doc.modifiedBy : undefined
const attr = attribute.attr
if (client.getHierarchy().isMixin(attr.attributeOf)) {
await client.updateMixin(
@ -550,30 +552,30 @@ export async function updateAttribute (
doc.space,
attr.attributeOf,
{ [attributeKey]: value },
Date.now(),
modifyBy
modifiedOn,
modifiedBy
)
} else {
if (client.getHierarchy().isDerived(attribute.attr.type._class, core.class.ArrOf)) {
const oldValue: any[] = (object as any)[attributeKey] ?? []
const val: any[] = value
const val: any[] = Array.isArray(value) ? value : [value]
const toPull = oldValue.filter((it: any) => !val.includes(it))
const toPush = val.filter((it) => !oldValue.includes(it))
if (toPull.length > 0) {
await client.update(object, { $pull: { [attributeKey]: { $in: toPull } } }, false, Date.now(), modifyBy)
await client.update(object, { $pull: { [attributeKey]: { $in: toPull } } }, false, modifiedOn, modifiedBy)
}
if (toPush.length > 0) {
await client.update(
object,
{ $push: { [attributeKey]: { $each: toPush, $position: 0 } } },
false,
Date.now(),
modifyBy
modifiedOn,
modifiedBy
)
}
} else {
await client.update(object, { [attributeKey]: value }, false, Date.now(), modifyBy)
await client.update(object, { [attributeKey]: value }, false, modifiedOn, modifiedBy)
}
}
}

View File

@ -15,8 +15,8 @@
import { Extensions, getSchema } from '@tiptap/core'
import { Node, Schema } from 'prosemirror-model'
import { yDocToProsemirrorJSON } from 'y-prosemirror'
import { Doc, applyUpdate } from 'yjs'
import { prosemirrorJSONToYDoc, yDocToProsemirrorJSON } from 'y-prosemirror'
import { Doc, applyUpdate, encodeStateAsUpdate } from 'yjs'
import { defaultExtensions } from './extensions'
/**
@ -81,3 +81,36 @@ export function yDocContentToNodes (content: ArrayBuffer, schema?: Schema, exten
return nodes
}
/**
 * Update Y.Doc content
 *
 * Decodes each shared fragment of the encoded Y.Doc into prosemirror JSON,
 * passes it through `updateFn`, and assembles a new Y.Doc from the
 * transformed fragments. Returns undefined if decoding or encoding fails.
 *
 * @public
 */
export function updateYDocContent (
  content: ArrayBuffer,
  updateFn: (body: Record<string, any>) => Record<string, any>,
  schema?: Schema,
  extensions?: Extensions
): Doc | undefined {
  schema ??= getSchema(extensions ?? defaultExtensions)
  try {
    const source = new Doc()
    applyUpdate(source, new Uint8Array(content))
    // collect transformed fragments into a fresh doc (created with gc disabled)
    const result = new Doc({ gc: false })
    for (const field of source.share.keys()) {
      const transformed = updateFn(yDocToProsemirrorJSON(source, field))
      const fragmentDoc = prosemirrorJSONToYDoc(schema, transformed, field)
      applyUpdate(result, encodeStateAsUpdate(fragmentDoc))
    }
    return result
  } catch (err: any) {
    console.error(err)
  }
}