UBER-1178: rework indexing fields (#4261)

Signed-off-by: Vyacheslav Tumanov <me@slavatumanov.me>
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
Co-authored-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Vyacheslav Tumanov 2023-12-27 12:29:30 +05:00 committed by GitHub
parent 66368621a0
commit 689e2d4caa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 56 additions and 39 deletions

View File

@@ -132,16 +132,7 @@ export function docUpdKey (name: string, opt?: IndexKeyOptions): string {
*/
// Builds the index key for attribute `name`, optionally namespaced by class
// (`class%name`) and '#'-joined extra segments.
// NOTE(review): this span is a rendered diff overlay — the statements from
// `let key =` through the first `return key` are the PRE-change implementation
// (docId/refAttribute/relative prefixing), and the final single-line `return`
// is the POST-change replacement. As pasted, the second return is unreachable;
// in the real post-commit file only the last return exists — confirm against
// the repository before reusing this text as source.
export function docKey (name: string, opt?: IndexKeyOptions): string {
// '#'-prefixed, '#'-joined suffix from opt.extra, or '' when absent/empty.
const extra = opt?.extra !== undefined && opt?.extra?.length > 0 ? `#${opt.extra?.join('#') ?? ''}` : ''
let key =
(opt?.docId !== undefined ? opt.docId.split('.').join('_') + '|' : '') +
(opt?._class === undefined ? name : `${opt?._class}%${name}${extra}`)
if (opt?.refAttribute !== undefined) {
key = `${opt?.refAttribute}->${key}`
}
if (opt?.refAttribute !== undefined || (opt?.relative !== undefined && opt?.relative)) {
key = '|' + key
}
return key
// Post-change behavior: plain `name`, or `class%name#extra` when a class is given.
return opt?._class === undefined ? name : `${opt?._class}%${name}${extra}`
}
/**

View File

@@ -302,7 +302,7 @@ function updateDoc2Elastic (
const docIdAttr = docKey(attr, docKeyOpts)
if (vv !== null) {
// Since we replace array of values, we could ignore null
doc[docIdAttr] = [...(doc[docIdAttr] ?? [])]
doc[docIdAttr] = typeof doc[docIdAttr] === 'string' ? [doc[docIdAttr]] : [...(doc[docIdAttr] ?? [])]
if (vv !== '') {
if (typeof vv !== 'object') {
doc[docIdAttr].push(vv)

View File

@@ -438,8 +438,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
`Full text: Indexing ${this.indexId} ${st.stageId}`,
Object.entries(this.currentStages)
.map((it) => `${it[0]}:${it[1]}`)
.join(' '),
result.total
.join(' ')
)
} else {
// Nothing to index, check on next cycle.
@@ -569,9 +568,10 @@ export class FullTextIndexPipeline implements FullTextPipeline {
return toIndex
}
// TODO: Move to migration
async checkIndexConsistency (dbStorage: ServerStorage): Promise<void> {
await rateLimitter.exec(async () => {
if (process.env.MODEL_VERSION !== undefined) {
if (process.env.MODEL_VERSION !== undefined && process.env.MODEL_VERSION !== '') {
const modelVersion = (await this.model.findAll(core.class.Version, {}))[0]
if (modelVersion !== undefined) {
const modelVersionString = versionToString(modelVersion)
@@ -607,23 +607,29 @@ export class FullTextIndexPipeline implements FullTextPipeline {
if (this.cancelling) {
return
}
const newDocs: DocIndexState[] = (
await dbStorage.findAll<Doc>(
this.metrics,
c,
{ _class: c, _id: { $nin: states } },
{ limit: 1000, projection: { _id: 1, attachedTo: 1, attachedToClass: 1 } as any }
)
).map((it) => {
return createStateDoc(it._id, c, {
stages: {},
attributes: {},
removed: false,
space: it.space,
attachedTo: (it as AttachedDoc)?.attachedTo ?? undefined,
attachedToClass: (it as AttachedDoc)?.attachedToClass ?? undefined
let newDocs: DocIndexState[] = []
try {
newDocs = (
await dbStorage.findAll<Doc>(
this.metrics,
c,
{ _class: c, _id: { $nin: states } },
{ limit: 500, projection: { _id: 1, attachedTo: 1, attachedToClass: 1 } as any }
)
).map((it) => {
return createStateDoc(it._id, c, {
stages: {},
attributes: {},
removed: false,
space: it.space,
attachedTo: (it as AttachedDoc)?.attachedTo ?? undefined,
attachedToClass: (it as AttachedDoc)?.attachedToClass ?? undefined
})
})
})
} catch (e) {
console.error(e)
break
}
states.push(...newDocs.map((it) => it._id))
@@ -647,15 +653,21 @@ export class FullTextIndexPipeline implements FullTextPipeline {
// Clean for non existing classes
const unknownClasses = (
await this.storage.findAll(
while (true) {
const docRefs = await this.storage.findAll(
core.class.DocIndexState,
{ objectClass: { $nin: allClasses } },
{ projection: { _id: 1 } }
{ projection: { _id: 1, objectClass: 1 }, limit: 10000 }
)
).map((it) => it._id)
if (unknownClasses.length > 0) {
await this.storage.clean(DOMAIN_DOC_INDEX_STATE, unknownClasses)
const unknownClasses = docRefs.map((it) => it._id)
console.log('cleaning', docRefs.length, Array.from(new Set(docRefs.map((it) => it.objectClass))).join(', '))
if (unknownClasses.length > 0) {
await this.storage.clean(DOMAIN_DOC_INDEX_STATE, unknownClasses)
} else {
break
}
}
})
}

View File

@@ -102,9 +102,9 @@ export const contentStageId = 'cnt-v2b'
/**
* @public
*/
// NOTE(review): rendered diff pair — 'fld-v10' is the pre-change value and
// 'fld-v11' the post-change value; only one declaration exists in the real
// file (duplicate `const` declarations would not compile).
export const fieldStateId = 'fld-v10'
export const fieldStateId = 'fld-v11'
/**
* @public
*/
// NOTE(review): same diff pairing — 'fts-v6' pre-change, 'fts-v7' post-change.
export const fullTextPushStageId = 'fts-v6'
export const fullTextPushStageId = 'fts-v7'

View File

@@ -9,6 +9,7 @@ interface IndexedReader {
getDoc: (attribute: string) => IndexedReader | undefined
}
// TODO: Rework to use mongo
function createIndexedReader (
_class: Ref<Class<Doc>>,
hierarchy: Hierarchy,

View File

@@ -71,7 +71,20 @@ class ElasticAdapter implements FullTextAdapter {
if (field !== undefined) {
console.log('Mapping', mappings.body)
}
const wsMappings = mappings.body[toWorkspaceString(this.workspaceId)]
let wsMappings = mappings.body[toWorkspaceString(this.workspaceId)]
if (Object.keys(wsMappings?.mappings?.properties ?? {}).some((k) => k.includes('->'))) {
await this.client.indices.delete({
index: toWorkspaceString(this.workspaceId)
})
const createIndex = await this.client.indices.create({
index: toWorkspaceString(this.workspaceId)
})
console.log('recreate index', createIndex)
const mappings = await this.client.indices.getMapping({
index: toWorkspaceString(this.workspaceId)
})
wsMappings = mappings.body[toWorkspaceString(this.workspaceId)]
}
// Collect old values.
for (const [k, v] of Object.entries(wsMappings?.mappings?.properties ?? {})) {