mirror of
https://github.com/hcengineering/platform.git
synced 2024-11-22 11:42:30 +03:00
TSK-1201: Fix bitrix migration and too to clean removed transactions (#2995)
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
parent
26b2a634fa
commit
359c1a81bc
@ -125,3 +125,40 @@ export async function cleanWorkspace (
|
||||
await connection.close()
|
||||
}
|
||||
}
|
||||
|
||||
export async function cleanRemovedTransactions (workspaceId: WorkspaceId, transactorUrl: string): Promise<void> {
|
||||
const connection = (await connect(transactorUrl, workspaceId, undefined, {
|
||||
mode: 'backup'
|
||||
})) as unknown as CoreClient & BackupClient
|
||||
try {
|
||||
let count = 0
|
||||
while (true) {
|
||||
const removedDocs = await connection.findAll(
|
||||
core.class.TxCollectionCUD,
|
||||
{ 'tx._class': core.class.TxRemoveDoc },
|
||||
{ limit: 1000 }
|
||||
)
|
||||
if (removedDocs.length === 0) {
|
||||
break
|
||||
}
|
||||
|
||||
const toRemove = await connection.findAll(core.class.TxCollectionCUD, {
|
||||
'tx._class': { $in: [core.class.TxCreateDoc, core.class.TxRemoveDoc, core.class.TxUpdateDoc] },
|
||||
'tx.objectId': { $in: removedDocs.map((it) => it.tx.objectId) }
|
||||
})
|
||||
await connection.clean(
|
||||
DOMAIN_TX,
|
||||
toRemove.map((it) => it._id)
|
||||
)
|
||||
|
||||
count += toRemove.length
|
||||
console.log('processed', count, removedDocs.total)
|
||||
}
|
||||
|
||||
console.log('total docs with remove', count)
|
||||
} catch (err: any) {
|
||||
console.trace(err)
|
||||
} finally {
|
||||
await connection.close()
|
||||
}
|
||||
}
|
||||
|
@ -51,7 +51,7 @@ import { MigrateOperation } from '@hcengineering/model'
|
||||
import { openAIConfigDefaults } from '@hcengineering/openai'
|
||||
import { rebuildElastic } from './elastic'
|
||||
import { openAIConfig } from './openai'
|
||||
import { cleanWorkspace } from './clean'
|
||||
import { cleanRemovedTransactions, cleanWorkspace } from './clean'
|
||||
|
||||
/**
|
||||
* @public
|
||||
@ -456,5 +456,12 @@ export function devTool (
|
||||
})
|
||||
})
|
||||
|
||||
program
|
||||
.command('clean-removed-transactions <workspace>')
|
||||
.description('set user role')
|
||||
.action(async (workspace: string, cmd: any) => {
|
||||
await cleanRemovedTransactions(getWorkspaceId(workspace, productId), transactorUrl)
|
||||
})
|
||||
|
||||
program.parse(process.argv)
|
||||
}
|
||||
|
@ -101,10 +101,11 @@ export async function syncDocument (
|
||||
resultDoc: ConvertResult,
|
||||
info: LoginInfo,
|
||||
frontUrl: string,
|
||||
syncAttachments: boolean,
|
||||
ops: SyncOptions,
|
||||
extraDocs: Map<Ref<Class<Doc>>, Doc[]>,
|
||||
monitor?: (doc: ConvertResult) => void
|
||||
): Promise<void> {
|
||||
const st = Date.now()
|
||||
const hierarchy = client.getHierarchy()
|
||||
|
||||
try {
|
||||
@ -117,7 +118,7 @@ export async function syncDocument (
|
||||
|
||||
// Operations could add more change instructions
|
||||
for (const op of resultDoc.postOperations) {
|
||||
await op(resultDoc, extraDocs, existing)
|
||||
await op(resultDoc, extraDocs, ops, existing)
|
||||
}
|
||||
|
||||
// const newDoc = existing === undefined
|
||||
@ -154,7 +155,7 @@ export async function syncDocument (
|
||||
await syncClass(applyOp, cl, vals, idMapping, resultDoc.document._id)
|
||||
}
|
||||
|
||||
if (syncAttachments) {
|
||||
if (ops.syncAttachments ?? true) {
|
||||
// Sync gmail documents
|
||||
const emailAccount = resultDoc.extraSync.find(
|
||||
(it) =>
|
||||
@ -197,7 +198,11 @@ export async function syncDocument (
|
||||
|
||||
let updated = false
|
||||
for (const existingObj of existingBlobs) {
|
||||
if (existingObj.name === ed.name && existingObj.size === ed.size && existingObj.type === ed.type) {
|
||||
if (
|
||||
existingObj.name === ed.name &&
|
||||
existingObj.size === ed.size &&
|
||||
(existingObj.type ?? null) === (ed.type ?? null)
|
||||
) {
|
||||
if (!updated) {
|
||||
await updateAttachedDoc(existingObj, applyOp, ed)
|
||||
updated = true
|
||||
@ -232,7 +237,10 @@ export async function syncDocument (
|
||||
}
|
||||
}
|
||||
}
|
||||
console.log('Syncronized before commit', resultDoc.document._class, resultDoc.document.bitrixId, Date.now() - st)
|
||||
await applyOp.commit()
|
||||
const ed = Date.now()
|
||||
console.log('Syncronized', resultDoc.document._class, resultDoc.document.bitrixId, ed - st)
|
||||
} catch (err: any) {
|
||||
console.error(err)
|
||||
}
|
||||
@ -260,7 +268,7 @@ export async function syncDocument (
|
||||
}
|
||||
const existingIdx = existingByClass.findIndex((it) => {
|
||||
const bdoc = hierarchy.as<Doc, BitrixSyncDoc>(it, bitrix.mixin.BitrixSyncDoc)
|
||||
return bdoc.bitrixId === valValue.bitrixId && bdoc.type === valValue.type
|
||||
return bdoc.bitrixId === valValue.bitrixId && (bdoc.type ?? null) === (valValue.type ?? null)
|
||||
})
|
||||
let existing: Doc | undefined
|
||||
if (existingIdx >= 0) {
|
||||
@ -443,6 +451,7 @@ export interface SyncOptions {
|
||||
space: Ref<Space> | undefined
|
||||
mapping: WithLookup<BitrixEntityMapping>
|
||||
limit: number
|
||||
skip?: number
|
||||
direction: 'ASC' | 'DSC'
|
||||
frontUrl: string
|
||||
loginInfo: LoginInfo
|
||||
@ -455,6 +464,7 @@ export interface SyncOptions {
|
||||
syncComments?: boolean
|
||||
syncEmails?: boolean
|
||||
syncAttachments?: boolean
|
||||
syncVacancy?: boolean
|
||||
}
|
||||
interface SyncOptionsExtra {
|
||||
ownerTypeValues: BitrixOwnerType[]
|
||||
@ -512,7 +522,7 @@ async function doPerformSync (ops: SyncOptions & SyncOptionsExtra): Promise<Bitr
|
||||
if (ops.space === undefined || ops.mapping.$lookup?.fields === undefined) {
|
||||
return []
|
||||
}
|
||||
let processed = 0
|
||||
let processed = ops.skip ?? 0
|
||||
|
||||
let added = 0
|
||||
|
||||
@ -594,7 +604,7 @@ async function doPerformSync (ops: SyncOptions & SyncOptionsExtra): Promise<Bitr
|
||||
|
||||
if (res.syncRequests.length > 0) {
|
||||
for (const r of res.syncRequests) {
|
||||
const m = ops.allMappings.find((it) => it.type === r.type)
|
||||
const m = ops.allMappings.find((it) => (it.type ?? null) === (r.type ?? null))
|
||||
if (m !== undefined) {
|
||||
const [d] = await doPerformSync({
|
||||
...ops,
|
||||
@ -617,7 +627,7 @@ async function doPerformSync (ops: SyncOptions & SyncOptionsExtra): Promise<Bitr
|
||||
res,
|
||||
ops.loginInfo,
|
||||
ops.frontUrl,
|
||||
ops.mapping.attachments && (ops.syncAttachments ?? true),
|
||||
{ ...ops, syncAttachments: ops.mapping.attachments && (ops.syncAttachments ?? true) },
|
||||
extraDocs,
|
||||
() => {
|
||||
ops.monitor?.(total)
|
||||
|
@ -32,7 +32,8 @@ import bitrix, {
|
||||
CreateTagOperation,
|
||||
DownloadAttachmentOperation,
|
||||
FindReferenceOperation,
|
||||
MappingOperation
|
||||
MappingOperation,
|
||||
SyncOptions
|
||||
} from '.'
|
||||
import { createApplication, createVacancy } from './hr'
|
||||
|
||||
@ -74,6 +75,7 @@ export interface BitrixSyncRequest {
|
||||
export type PostOperation = (
|
||||
doc: ConvertResult,
|
||||
extraDocs: Map<Ref<Class<Doc>>, Doc[]>,
|
||||
ops: SyncOptions,
|
||||
existing?: Doc
|
||||
) => Promise<void>
|
||||
|
||||
@ -406,8 +408,11 @@ export async function convert (
|
||||
const getCreateAttachedValue = async (attr: AnyAttribute, operation: CreateHRApplication): Promise<void> => {
|
||||
const vacancyName = extractValue(operation.vacancyField)
|
||||
const sourceStatusName = extractValue(operation.stateField)
|
||||
postOperations.push(async (doc, extraDocs, existingDoc) => {
|
||||
postOperations.push(async (doc, extraDocs, ops, existingDoc) => {
|
||||
let vacancyId: Ref<Vacancy> | undefined
|
||||
if (ops.syncVacancy === false) {
|
||||
return
|
||||
}
|
||||
|
||||
const vacancies = (extraDocs.get(recruit.class.Vacancy) ?? []) as Vacancy[]
|
||||
const applications = (extraDocs.get(recruit.class.Applicant) ?? []) as Applicant[]
|
||||
|
@ -273,6 +273,7 @@ export async function backup (transactorUrl: string, workspaceId: WorkspaceId, s
|
||||
const needRetrieveChunks: Ref<Doc>[][] = []
|
||||
|
||||
let processed = 0
|
||||
let st = Date.now()
|
||||
// Load all digest from collection.
|
||||
while (true) {
|
||||
try {
|
||||
@ -284,7 +285,8 @@ export async function backup (transactorUrl: string, workspaceId: WorkspaceId, s
|
||||
for (const [k, v] of Object.entries(it.docs)) {
|
||||
processed++
|
||||
if (processed % 10000 === 0) {
|
||||
console.log('processed', processed, digest.size)
|
||||
console.log('processed', processed, digest.size, Date.now() - st)
|
||||
st = Date.now()
|
||||
}
|
||||
const kHash = digest.get(k as Ref<Doc>)
|
||||
if (kHash !== undefined) {
|
||||
@ -314,6 +316,7 @@ export async function backup (transactorUrl: string, workspaceId: WorkspaceId, s
|
||||
}
|
||||
// Try again
|
||||
idx = undefined
|
||||
processed = 0
|
||||
}
|
||||
}
|
||||
while (needRetrieveChunks.length > 0) {
|
||||
|
@ -69,9 +69,9 @@ class ElasticDataAdapter implements DbAdapter {
|
||||
const q = {
|
||||
index: toWorkspaceString(this.workspaceId),
|
||||
type: '_doc',
|
||||
scroll: '1s',
|
||||
scroll: '23h',
|
||||
// search_type: 'scan', //if I use search_type then it requires size otherwise it shows 0 result
|
||||
size: 500,
|
||||
size: 2500,
|
||||
body: {
|
||||
query: {
|
||||
match_all: {}
|
||||
@ -92,9 +92,9 @@ class ElasticDataAdapter implements DbAdapter {
|
||||
if (pos === buffer.length && !finished) {
|
||||
const params = {
|
||||
scrollId: resp.body._scroll_id as string,
|
||||
scroll: '1s'
|
||||
scroll: '23h'
|
||||
}
|
||||
resp = await this.client.scroll(params)
|
||||
resp = await this.client.scroll(params, { maxRetries: 5 })
|
||||
if (resp.statusCode !== 200) {
|
||||
console.error('failed elastic query scroll', params, resp)
|
||||
throw new PlatformError(unknownStatus(`failed to elastic query ${JSON.stringify(resp)}`))
|
||||
|
@ -542,7 +542,7 @@ abstract class MongoAdapterBase implements DbAdapter {
|
||||
|
||||
find (domain: Domain): StorageIterator {
|
||||
const coll = this.db.collection<Doc>(domain)
|
||||
const iterator = coll.find({}, { sort: { _id: 1 } }).batchSize(100)
|
||||
const iterator = coll.find({}, { sort: { _id: -1 } })
|
||||
|
||||
return {
|
||||
next: async () => {
|
||||
|
Loading…
Reference in New Issue
Block a user