UBERF-4287: Fix Indexer peak memory usage (#3993)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2023-11-15 11:48:49 +07:00 committed by GitHub
parent 64ae21db07
commit 5d8c1a6b76
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -559,91 +559,93 @@ export class FullTextIndexPipeline implements FullTextPipeline {
}
async checkIndexConsistency (dbStorage: ServerStorage): Promise<void> {
if (process.env.MODEL_VERSION !== undefined) {
const modelVersion = (await this.model.findAll(core.class.Version, {}))[0]
if (modelVersion !== undefined) {
const modelVersionString = versionToString(modelVersion)
if (modelVersionString !== process.env.MODEL_VERSION) {
console.error(
`Indexer: Model version mismatch model: ${modelVersionString} env: ${process.env.MODEL_VERSION}`
)
return
await rateLimitter.exec(async () => {
if (process.env.MODEL_VERSION !== undefined) {
const modelVersion = (await this.model.findAll(core.class.Version, {}))[0]
if (modelVersion !== undefined) {
const modelVersionString = versionToString(modelVersion)
if (modelVersionString !== process.env.MODEL_VERSION) {
console.error(
`Indexer: Model version mismatch model: ${modelVersionString} env: ${process.env.MODEL_VERSION}`
)
return
}
}
}
}
this.hierarchy.domains()
const allClasses = this.hierarchy.getDescendants(core.class.Doc)
for (const c of allClasses) {
if (this.cancelling) {
return
}
if (!isClassIndexable(this.hierarchy, c)) {
// No need, since no indexable fields or attachments.
continue
}
console.log(this.workspace.name, 'checking index', c)
// All saved state documents
const states = (
await this.storage.findAll(core.class.DocIndexState, { objectClass: c }, { projection: { _id: 1 } })
).map((it) => it._id)
while (true) {
this.hierarchy.domains()
const allClasses = this.hierarchy.getDescendants(core.class.Doc)
for (const c of allClasses) {
if (this.cancelling) {
return
}
const newDocs: DocIndexState[] = (
await dbStorage.findAll<Doc>(
this.metrics,
c,
{ _class: c, _id: { $nin: states } },
{ limit: 1000, projection: { _id: 1, attachedTo: 1, attachedToClass: 1 } as any }
)
).map((it) => {
return createStateDoc(it._id, c, {
stages: {},
attributes: {},
removed: false,
space: it.space,
attachedTo: (it as AttachedDoc)?.attachedTo ?? undefined,
attachedToClass: (it as AttachedDoc)?.attachedToClass ?? undefined
if (!isClassIndexable(this.hierarchy, c)) {
// No need, since no indexable fields or attachments.
continue
}
console.log(this.workspace.name, 'checking index', c)
// All saved state documents
const states = (
await this.storage.findAll(core.class.DocIndexState, { objectClass: c }, { projection: { _id: 1 } })
).map((it) => it._id)
while (true) {
if (this.cancelling) {
return
}
const newDocs: DocIndexState[] = (
await dbStorage.findAll<Doc>(
this.metrics,
c,
{ _class: c, _id: { $nin: states } },
{ limit: 1000, projection: { _id: 1, attachedTo: 1, attachedToClass: 1 } as any }
)
).map((it) => {
return createStateDoc(it._id, c, {
stages: {},
attributes: {},
removed: false,
space: it.space,
attachedTo: (it as AttachedDoc)?.attachedTo ?? undefined,
attachedToClass: (it as AttachedDoc)?.attachedToClass ?? undefined
})
})
})
states.push(...newDocs.map((it) => it._id))
states.push(...newDocs.map((it) => it._id))
if (newDocs.length === 0) {
// All updated for this class
break
}
try {
await this.storage.upload(DOMAIN_DOC_INDEX_STATE, newDocs)
} catch (err: any) {
console.error(err)
if (newDocs.length === 0) {
// All updated for this class
break
}
try {
await this.storage.upload(DOMAIN_DOC_INDEX_STATE, newDocs)
} catch (err: any) {
console.error(err)
}
}
const statesSet = new Set(states)
const docIds = (await dbStorage.findAll<Doc>(this.metrics, c, { _class: c }, { projection: { _id: 1 } }))
.filter((it) => !statesSet.has(it._id as Ref<DocIndexState>))
.map((it) => it._id)
await this.storage.clean(DOMAIN_DOC_INDEX_STATE, docIds)
}
const statesSet = new Set(states)
const docIds = (await dbStorage.findAll<Doc>(this.metrics, c, { _class: c }, { projection: { _id: 1 } }))
.filter((it) => !statesSet.has(it._id as Ref<DocIndexState>))
.map((it) => it._id)
await this.storage.clean(DOMAIN_DOC_INDEX_STATE, docIds)
}
// Clean for non existing classes
// Clean for non existing classes
const unknownClasses = (
await this.storage.findAll(
core.class.DocIndexState,
{ objectClass: { $nin: allClasses } },
{ projection: { _id: 1 } }
)
).map((it) => it._id)
if (unknownClasses.length > 0) {
await this.storage.clean(DOMAIN_DOC_INDEX_STATE, unknownClasses)
}
const unknownClasses = (
await this.storage.findAll(
core.class.DocIndexState,
{ objectClass: { $nin: allClasses } },
{ projection: { _id: 1 } }
)
).map((it) => it._id)
if (unknownClasses.length > 0) {
await this.storage.clean(DOMAIN_DOC_INDEX_STATE, unknownClasses)
}
})
}
}