mirror of
https://github.com/hcengineering/platform.git
synced 2024-11-25 19:58:30 +03:00
UBERF-6296: Fix elastic queries (#5155)
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
parent
0075f86bec
commit
1e4978cfb8
@ -14,6 +14,7 @@
|
||||
//
|
||||
|
||||
import core, {
|
||||
Blob,
|
||||
Class,
|
||||
CollaborativeDoc,
|
||||
Doc,
|
||||
@ -106,7 +107,7 @@ export class CollaborativeContentRetrievalStage implements FullTextPipelineStage
|
||||
if (collaborativeDoc !== undefined && collaborativeDoc !== '') {
|
||||
const { documentId } = parseCollaborativeDoc(collaborativeDoc)
|
||||
|
||||
const docInfo: any | undefined = await this.storageAdapter?.stat(this.metrics, this.workspace, documentId)
|
||||
const docInfo: Blob | undefined = await this.storageAdapter?.stat(this.metrics, this.workspace, documentId)
|
||||
|
||||
if (docInfo !== undefined) {
|
||||
const digest = docInfo.etag
|
||||
@ -114,7 +115,7 @@ export class CollaborativeContentRetrievalStage implements FullTextPipelineStage
|
||||
if (doc.attributes[digestKey] !== digest) {
|
||||
;(update as any)[docUpdKey(digestKey)] = digest
|
||||
|
||||
const contentType = ((docInfo.metaData['content-type'] as string) ?? '').split(';')[0]
|
||||
const contentType = (docInfo.contentType ?? '').split(';')[0]
|
||||
const readable = await this.storageAdapter?.get(this.metrics, this.workspace, documentId)
|
||||
|
||||
if (readable !== undefined) {
|
||||
|
@ -65,12 +65,7 @@ export class FullTextIndex implements WithFind {
|
||||
) {
|
||||
if (!upgrade) {
|
||||
// Schedule indexing after consistency check
|
||||
this.consistency = this.indexer.checkIndexConsistency(dbStorage)
|
||||
|
||||
// Schedule indexing after consistency check
|
||||
void this.consistency.then(() => {
|
||||
void this.indexer.startIndexing()
|
||||
})
|
||||
void this.indexer.startIndexing()
|
||||
}
|
||||
}
|
||||
|
||||
@ -116,13 +111,14 @@ export class FullTextIndex implements WithFind {
|
||||
// Object created and deleted, skip index
|
||||
stDocs.delete(cud.objectId as Ref<DocIndexState>)
|
||||
continue
|
||||
} else if (old !== undefined) {
|
||||
} else {
|
||||
// Create and update
|
||||
if (old.removed) continue
|
||||
if (old?.removed === true) continue
|
||||
else {
|
||||
stDocs.set(cud.objectId as Ref<DocIndexState>, {
|
||||
...old,
|
||||
updated: cud._class !== core.class.TxRemoveDoc,
|
||||
create: cud._class !== core.class.TxRemoveDoc ? old?.create : undefined,
|
||||
updated: cud._class !== core.class.TxRemoveDoc && old?.create === undefined,
|
||||
removed: cud._class === core.class.TxRemoveDoc
|
||||
})
|
||||
}
|
||||
|
@ -14,6 +14,7 @@
|
||||
//
|
||||
|
||||
import core, {
|
||||
type Blob,
|
||||
type Class,
|
||||
type Doc,
|
||||
type DocIndexState,
|
||||
@ -100,10 +101,10 @@ export class ContentRetrievalStage implements FullTextPipelineStage {
|
||||
// We need retrieve value of attached document content.
|
||||
const ref = doc.attributes[docKey(val.name, { _class: val.attributeOf })] as Ref<Doc>
|
||||
if (ref !== undefined && ref !== '') {
|
||||
const docInfo: any | undefined = await this.storageAdapter?.stat(this.metrics, this.workspace, ref)
|
||||
const docInfo: Blob | undefined = await this.storageAdapter?.stat(this.metrics, this.workspace, ref)
|
||||
if (docInfo !== undefined && docInfo.size < 30 * 1024 * 1024) {
|
||||
// We have blob, we need to decode it to string.
|
||||
const contentType = ((docInfo.metaData['content-type'] as string) ?? '').split(';')[0]
|
||||
const contentType = (docInfo.contentType ?? '').split(';')[0]
|
||||
|
||||
if (!contentType.includes('image')) {
|
||||
const digest = docInfo.etag
|
||||
|
@ -258,6 +258,9 @@ function updateDoc2Elastic (
|
||||
}
|
||||
|
||||
let vv: any = v
|
||||
if (vv != null && extra.includes('base64')) {
|
||||
vv = Buffer.from(v, 'base64').toString()
|
||||
}
|
||||
try {
|
||||
const attribute = hierarchy?.getAttribute(_class ?? doc._class[0], attr)
|
||||
if (attribute !== undefined && vv != null) {
|
||||
@ -278,10 +281,6 @@ function updateDoc2Elastic (
|
||||
}
|
||||
} catch (e) {}
|
||||
|
||||
if (vv != null && extra.includes('base64')) {
|
||||
vv = Buffer.from(v, 'base64').toString()
|
||||
}
|
||||
|
||||
docId = docIdOverride ?? docId
|
||||
if (docId === undefined) {
|
||||
if (typeof vv !== 'object' || isCustomAttr(k)) {
|
||||
|
@ -14,10 +14,8 @@
|
||||
//
|
||||
|
||||
import core, {
|
||||
type AttachedDoc,
|
||||
type Class,
|
||||
DOMAIN_DOC_INDEX_STATE,
|
||||
DOMAIN_FULLTEXT_BLOB,
|
||||
type Doc,
|
||||
type DocIndexState,
|
||||
type DocumentQuery,
|
||||
@ -26,22 +24,18 @@ import core, {
|
||||
type MeasureContext,
|
||||
type ModelDb,
|
||||
type Ref,
|
||||
type ServerStorage,
|
||||
SortingOrder,
|
||||
TxFactory,
|
||||
type WorkspaceId,
|
||||
_getOperator,
|
||||
docKey,
|
||||
generateId,
|
||||
setObjectValue,
|
||||
toFindResult,
|
||||
versionToString
|
||||
toFindResult
|
||||
} from '@hcengineering/core'
|
||||
import { type DbAdapter } from '../adapter'
|
||||
import { RateLimiter } from '../limitter'
|
||||
import type { IndexedDoc } from '../types'
|
||||
import { type FullTextPipeline, type FullTextPipelineStage } from './types'
|
||||
import { createStateDoc, isClassIndexable } from './utils'
|
||||
|
||||
export * from './content'
|
||||
export * from './field'
|
||||
@ -612,162 +606,4 @@ export class FullTextIndexPipeline implements FullTextPipeline {
|
||||
}
|
||||
return toIndex
|
||||
}
|
||||
|
||||
// TODO: Move to migration
|
||||
async checkIndexConsistency (dbStorage: ServerStorage): Promise<void> {
|
||||
await rateLimiter.exec(async () => {
|
||||
await this.metrics.with('check-index-consistency', {}, async (ctx) => {
|
||||
if (process.env.MODEL_VERSION !== undefined && process.env.MODEL_VERSION !== '') {
|
||||
const modelVersion = (await this.model.findAll(core.class.Version, {}))[0]
|
||||
if (modelVersion !== undefined) {
|
||||
const modelVersionString = versionToString(modelVersion)
|
||||
if (modelVersionString !== process.env.MODEL_VERSION) {
|
||||
console.error(
|
||||
`Indexer: Model version mismatch model: ${modelVersionString} env: ${process.env.MODEL_VERSION}`
|
||||
)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
this.hierarchy.domains()
|
||||
const allClasses = this.hierarchy.getDescendants(core.class.Doc)
|
||||
for (const c of allClasses) {
|
||||
if (this.cancelling) {
|
||||
return
|
||||
}
|
||||
|
||||
if (!isClassIndexable(this.hierarchy, c)) {
|
||||
// No need, since no indexable fields or attachments.
|
||||
continue
|
||||
}
|
||||
|
||||
console.log(this.workspace.name, 'checking index', c)
|
||||
|
||||
const generationId = generateId()
|
||||
|
||||
let lastId = ''
|
||||
|
||||
while (true) {
|
||||
if (this.cancelling) {
|
||||
return
|
||||
}
|
||||
|
||||
let newDocs: DocIndexState[] = []
|
||||
let updates = new Map<Ref<DocIndexState>, DocumentUpdate<DocIndexState>>()
|
||||
|
||||
try {
|
||||
const docs = await dbStorage.findAll<Doc>(
|
||||
ctx,
|
||||
c,
|
||||
{ _class: c, _id: { $gt: lastId as any } },
|
||||
{
|
||||
limit: 10000,
|
||||
sort: { _id: 1 },
|
||||
projection: { _id: 1, attachedTo: 1, attachedToClass: 1 } as any,
|
||||
prefix: 'indexer'
|
||||
}
|
||||
)
|
||||
|
||||
if (docs.length === 0) {
|
||||
// All updated for this class
|
||||
break
|
||||
}
|
||||
|
||||
lastId = docs[docs.length - 1]._id
|
||||
|
||||
const states = (
|
||||
await this.storage.findAll(
|
||||
ctx,
|
||||
core.class.DocIndexState,
|
||||
{
|
||||
objectClass: c,
|
||||
_id: {
|
||||
$gte: docs[0]._id as any,
|
||||
$lte: docs[docs.length - 1]._id as any
|
||||
}
|
||||
},
|
||||
{ projection: { _id: 1 } }
|
||||
)
|
||||
).map((it) => it._id)
|
||||
const statesSet = new Set(states)
|
||||
|
||||
// create missing index states
|
||||
newDocs = docs
|
||||
.filter((it) => !statesSet.has(it._id as Ref<DocIndexState>))
|
||||
.map((it) => {
|
||||
return createStateDoc(it._id, c, {
|
||||
generationId,
|
||||
stages: {},
|
||||
attributes: {},
|
||||
removed: false,
|
||||
space: it.space,
|
||||
attachedTo: (it as AttachedDoc)?.attachedTo ?? undefined,
|
||||
attachedToClass: (it as AttachedDoc)?.attachedToClass ?? undefined
|
||||
})
|
||||
})
|
||||
|
||||
// update generationId for existing index states
|
||||
updates = new Map()
|
||||
docs
|
||||
.filter((it) => statesSet.has(it._id as Ref<DocIndexState>))
|
||||
.forEach((it) => {
|
||||
updates.set(it._id as Ref<DocIndexState>, { generationId })
|
||||
})
|
||||
} catch (e) {
|
||||
console.error(e)
|
||||
break
|
||||
}
|
||||
|
||||
try {
|
||||
await this.storage.update(DOMAIN_DOC_INDEX_STATE, updates)
|
||||
} catch (err: any) {
|
||||
console.error(err)
|
||||
}
|
||||
|
||||
try {
|
||||
await this.storage.upload(DOMAIN_DOC_INDEX_STATE, newDocs)
|
||||
} catch (err: any) {
|
||||
console.error(err)
|
||||
}
|
||||
}
|
||||
|
||||
// remove index states for documents that do not exist
|
||||
const toRemove = (
|
||||
await this.storage.findAll(
|
||||
ctx,
|
||||
core.class.DocIndexState,
|
||||
{ objectClass: c, generationId: { $ne: generationId } },
|
||||
{ projection: { _id: 1 } }
|
||||
)
|
||||
).map((it) => it._id)
|
||||
|
||||
if (toRemove.length > 0) {
|
||||
await this.storage.clean(DOMAIN_DOC_INDEX_STATE, toRemove)
|
||||
await this.storage.clean(DOMAIN_FULLTEXT_BLOB, toRemove)
|
||||
}
|
||||
}
|
||||
|
||||
// Clean for non existing classes
|
||||
|
||||
while (true) {
|
||||
const docRefs = await this.storage.findAll(
|
||||
ctx,
|
||||
core.class.DocIndexState,
|
||||
{ objectClass: { $nin: allClasses } },
|
||||
{ projection: { _id: 1, objectClass: 1 }, limit: 10000 }
|
||||
)
|
||||
const unknownClasses = docRefs.map((it) => it._id)
|
||||
|
||||
console.log('cleaning', docRefs.length, Array.from(new Set(docRefs.map((it) => it.objectClass))).join(', '))
|
||||
|
||||
if (unknownClasses.length > 0) {
|
||||
await this.storage.clean(DOMAIN_DOC_INDEX_STATE, unknownClasses)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -98,7 +98,7 @@ export interface FullTextPipelineStage {
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export const contentStageId = 'cnt-v2b'
|
||||
export const contentStageId = 'cnt-v3'
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
@ -107,4 +107,4 @@ export const fieldStateId = 'fld-v13b'
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export const fullTextPushStageId = 'fts-v11b'
|
||||
export const fullTextPushStageId = 'fts-v12'
|
||||
|
@ -194,7 +194,7 @@ class ElasticAdapter implements FullTextAdapter {
|
||||
},
|
||||
{
|
||||
match: {
|
||||
workspaceId: toWorkspaceString(this.workspaceId)
|
||||
workspaceId: { query: toWorkspaceString(this.workspaceId), operator: 'and' }
|
||||
}
|
||||
}
|
||||
]
|
||||
@ -299,7 +299,7 @@ class ElasticAdapter implements FullTextAdapter {
|
||||
},
|
||||
{
|
||||
match: {
|
||||
workspaceId: toWorkspaceString(this.workspaceId)
|
||||
workspaceId: { query: toWorkspaceString(this.workspaceId), operator: 'and' }
|
||||
}
|
||||
}
|
||||
],
|
||||
@ -407,7 +407,7 @@ class ElasticAdapter implements FullTextAdapter {
|
||||
],
|
||||
must: {
|
||||
match: {
|
||||
workspaceId: toWorkspaceString(this.workspaceId)
|
||||
workspaceId: { query: toWorkspaceString(this.workspaceId), operator: 'and' }
|
||||
}
|
||||
},
|
||||
filter: [
|
||||
@ -526,7 +526,7 @@ class ElasticAdapter implements FullTextAdapter {
|
||||
},
|
||||
{
|
||||
match: {
|
||||
workspaceId: toWorkspaceString(this.workspaceId)
|
||||
workspaceId: { query: toWorkspaceString(this.workspaceId), operator: 'and' }
|
||||
}
|
||||
}
|
||||
]
|
||||
@ -562,7 +562,7 @@ class ElasticAdapter implements FullTextAdapter {
|
||||
},
|
||||
{
|
||||
match: {
|
||||
workspaceId: toWorkspaceString(this.workspaceId)
|
||||
workspaceId: { query: toWorkspaceString(this.workspaceId), operator: 'and' }
|
||||
}
|
||||
}
|
||||
]
|
||||
|
@ -111,20 +111,24 @@ export class MinioService implements StorageAdapter {
|
||||
}
|
||||
}
|
||||
|
||||
async stat (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<Blob> {
|
||||
const result = await this.client.statObject(getBucketId(workspaceId), objectName)
|
||||
return {
|
||||
provider: '',
|
||||
_class: core.class.Blob,
|
||||
_id: objectName as Ref<Blob>,
|
||||
storageId: objectName,
|
||||
contentType: result.metaData['content-type'],
|
||||
size: result.size,
|
||||
etag: result.etag,
|
||||
space: core.space.Configuration,
|
||||
modifiedBy: core.account.System,
|
||||
modifiedOn: result.lastModified.getTime(),
|
||||
version: result.versionId ?? null
|
||||
async stat (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<Blob | undefined> {
|
||||
try {
|
||||
const result = await this.client.statObject(getBucketId(workspaceId), objectName)
|
||||
return {
|
||||
provider: '',
|
||||
_class: core.class.Blob,
|
||||
_id: objectName as Ref<Blob>,
|
||||
storageId: objectName,
|
||||
contentType: result.metaData['content-type'],
|
||||
size: result.size,
|
||||
etag: result.etag,
|
||||
space: core.space.Configuration,
|
||||
modifiedBy: core.account.System,
|
||||
modifiedOn: result.lastModified.getTime(),
|
||||
version: result.versionId ?? null
|
||||
}
|
||||
} catch (err: any) {
|
||||
await ctx.error('no object found', err)
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user