mirror of
https://github.com/hcengineering/platform.git
synced 2024-12-22 11:01:54 +03:00
UBERF-8499: Optimize indexer operation (#6959)
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
parent
51679bf82c
commit
f89df5921a
@ -955,7 +955,8 @@ export function createModel (builder: Builder): void {
|
||||
)
|
||||
|
||||
// Enable fuzzy search for mixins
|
||||
builder.mixin(contact.class.Contact, core.class.Class, core.mixin.FullTextSearchContext, {
|
||||
builder.createDoc(core.class.FullTextSearchContext, core.space.Model, {
|
||||
toClass: contact.class.Contact,
|
||||
fullTextSummary: true
|
||||
})
|
||||
|
||||
|
@ -549,7 +549,8 @@ export function createModel (builder: Builder): void {
|
||||
func: documents.function.GetAllDocumentStates
|
||||
})
|
||||
|
||||
builder.mixin(documents.class.Document, core.class.Class, core.mixin.FullTextSearchContext, {
|
||||
builder.createDoc(core.class.FullTextSearchContext, core.space.Model, {
|
||||
toClass: documents.class.Document,
|
||||
fullTextSummary: true,
|
||||
childProcessingAllowed: true
|
||||
})
|
||||
@ -886,11 +887,13 @@ export function defineNotifications (builder: Builder): void {
|
||||
}
|
||||
|
||||
export function defineSearch (builder: Builder): void {
|
||||
builder.mixin(documents.class.Document, core.class.Class, core.mixin.FullTextSearchContext, {
|
||||
builder.createDoc(core.class.FullTextSearchContext, core.space.Model, {
|
||||
toClass: documents.class.Document,
|
||||
parentPropagate: true
|
||||
})
|
||||
|
||||
builder.mixin(documents.class.DocumentMeta, core.class.Class, core.mixin.FullTextSearchContext, {
|
||||
builder.createDoc(core.class.FullTextSearchContext, core.space.Model, {
|
||||
toClass: documents.class.DocumentMeta,
|
||||
fullTextSummary: true,
|
||||
childProcessingAllowed: true,
|
||||
propagate: []
|
||||
|
@ -361,8 +361,10 @@ export class TDocIndexState extends TDoc implements DocIndexState {
|
||||
generationId?: string
|
||||
}
|
||||
|
||||
@MMixin(core.mixin.FullTextSearchContext, core.class.Class)
|
||||
export class TFullTextSearchContext extends TClass implements FullTextSearchContext {}
|
||||
@Model(core.class.FullTextSearchContext, core.class.Doc, DOMAIN_MODEL)
|
||||
export class TFullTextSearchContext extends TDoc implements FullTextSearchContext {
|
||||
toClass!: Ref<Class<Doc<Space>>>
|
||||
}
|
||||
|
||||
@MMixin(core.mixin.ConfigurationElement, core.class.Class)
|
||||
export class TConfigurationElement extends TClass implements ConfigurationElement {
|
||||
|
@ -309,7 +309,8 @@ export function createModel (builder: Builder): void {
|
||||
]
|
||||
})
|
||||
|
||||
builder.mixin(core.class.Space, core.class.Class, core.mixin.FullTextSearchContext, {
|
||||
builder.createDoc(core.class.FullTextSearchContext, core.space.Model, {
|
||||
toClass: core.class.Space,
|
||||
childProcessingAllowed: false
|
||||
})
|
||||
|
||||
|
@ -262,7 +262,11 @@ export const coreOperation: MigrateOperation = {
|
||||
async migrate (client: MigrationClient): Promise<void> {
|
||||
// We need to delete all documents in doc index state for missing classes
|
||||
const allClasses = client.hierarchy.getDescendants(core.class.Doc)
|
||||
const allIndexed = allClasses.filter((it) => isClassIndexable(client.hierarchy, it))
|
||||
const contexts = new Map(
|
||||
client.model.findAllSync(core.class.FullTextSearchContext, {}).map((it) => [it.toClass, it])
|
||||
)
|
||||
|
||||
const allIndexed = allClasses.filter((it) => isClassIndexable(client.hierarchy, it, contexts))
|
||||
|
||||
// Next remove all non indexed classes and missing classes as well.
|
||||
await client.update(
|
||||
|
@ -214,7 +214,8 @@ export function createModel (builder: Builder): void {
|
||||
gmail.action.WriteEmail
|
||||
)
|
||||
|
||||
builder.mixin(gmail.class.Message, core.class.Class, core.mixin.FullTextSearchContext, {
|
||||
builder.createDoc(core.class.FullTextSearchContext, core.space.Model, {
|
||||
toClass: gmail.class.Message,
|
||||
parentPropagate: false
|
||||
})
|
||||
|
||||
|
@ -1438,13 +1438,15 @@ export function createModel (builder: Builder): void {
|
||||
)
|
||||
|
||||
// Enable fuzzy search for mixins
|
||||
builder.mixin(recruit.class.Vacancy, core.class.Class, core.mixin.FullTextSearchContext, {
|
||||
builder.createDoc(core.class.FullTextSearchContext, core.space.Model, {
|
||||
toClass: recruit.class.Vacancy,
|
||||
fullTextSummary: true,
|
||||
childProcessingAllowed: true,
|
||||
propagate: []
|
||||
})
|
||||
|
||||
builder.mixin(recruit.mixin.Candidate, core.class.Class, core.mixin.FullTextSearchContext, {
|
||||
builder.createDoc(core.class.FullTextSearchContext, core.space.Model, {
|
||||
toClass: recruit.mixin.Candidate,
|
||||
fullTextSummary: true,
|
||||
propagate: [recruit.class.Applicant],
|
||||
childProcessingAllowed: true,
|
||||
@ -1457,7 +1459,8 @@ export function createModel (builder: Builder): void {
|
||||
})
|
||||
|
||||
// Enable fuzzy search for mixins
|
||||
builder.mixin(recruit.class.Applicant, core.class.Class, core.mixin.FullTextSearchContext, {
|
||||
builder.createDoc(core.class.FullTextSearchContext, core.space.Model, {
|
||||
toClass: recruit.class.Applicant,
|
||||
fullTextSummary: true,
|
||||
forceIndex: true,
|
||||
childProcessingAllowed: true,
|
||||
|
@ -179,7 +179,8 @@ export function createModel (builder: Builder): void {
|
||||
telegram.ids.TelegramMessageSharedActivityViewlet
|
||||
)
|
||||
|
||||
builder.mixin(telegram.class.Message, core.class.Class, core.mixin.FullTextSearchContext, {
|
||||
builder.createDoc(core.class.FullTextSearchContext, core.space.Model, {
|
||||
toClass: telegram.class.Message,
|
||||
parentPropagate: false,
|
||||
childProcessingAllowed: true
|
||||
})
|
||||
|
@ -578,7 +578,8 @@ export interface BlobLookup extends Blob {
|
||||
*
|
||||
* If defined for a class, that class will be enabled for embedding search (e.g. OpenAI).
|
||||
*/
|
||||
export interface FullTextSearchContext extends Class<Doc> {
|
||||
export interface FullTextSearchContext extends Doc {
|
||||
toClass: Ref<Class<Doc>>
|
||||
fullTextSummary?: boolean
|
||||
forceIndex?: boolean
|
||||
|
||||
|
@ -142,10 +142,10 @@ export default plugin(coreId, {
|
||||
StatusCategory: '' as Ref<Class<StatusCategory>>,
|
||||
MigrationState: '' as Ref<Class<MigrationState>>,
|
||||
|
||||
BenchmarkDoc: '' as Ref<Class<BenchmarkDoc>>
|
||||
BenchmarkDoc: '' as Ref<Class<BenchmarkDoc>>,
|
||||
FullTextSearchContext: '' as Ref<Mixin<FullTextSearchContext>>
|
||||
},
|
||||
mixin: {
|
||||
FullTextSearchContext: '' as Ref<Mixin<FullTextSearchContext>>,
|
||||
ConfigurationElement: '' as Ref<Mixin<ConfigurationElement>>,
|
||||
IndexConfiguration: '' as Ref<Mixin<IndexingConfiguration<Doc>>>,
|
||||
SpacesTypeData: '' as Ref<Mixin<Space>>
|
||||
|
@ -46,9 +46,9 @@ import core from './component'
|
||||
import { Hierarchy } from './hierarchy'
|
||||
import { TxOperations } from './operations'
|
||||
import { isPredicate } from './predicate'
|
||||
import { Branding, BrandingMap } from './server'
|
||||
import { DocumentQuery, FindResult } from './storage'
|
||||
import { DOMAIN_TX } from './tx'
|
||||
import { Branding, BrandingMap } from './server'
|
||||
|
||||
function toHex (value: number, chars: number): string {
|
||||
const result = value.toString(16)
|
||||
@ -686,39 +686,48 @@ export function getFullTextIndexableAttributes (
|
||||
return result
|
||||
}
|
||||
|
||||
const ctxKey = 'indexer_ftc'
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export function getFullTextContext (
|
||||
hierarchy: Hierarchy,
|
||||
objectClass: Ref<Class<Doc>>
|
||||
objectClass: Ref<Class<Doc>>,
|
||||
contexts: Map<Ref<Class<Doc>>, FullTextSearchContext>
|
||||
): Omit<FullTextSearchContext, keyof Class<Doc>> {
|
||||
let objClass = hierarchy.getClass(objectClass)
|
||||
|
||||
while (true) {
|
||||
if (hierarchy.hasMixin(objClass, core.mixin.FullTextSearchContext)) {
|
||||
const ctx = hierarchy.as<Class<Doc>, FullTextSearchContext>(objClass, core.mixin.FullTextSearchContext)
|
||||
let ctx: Omit<FullTextSearchContext, keyof Class<Doc>> | undefined = hierarchy.getClassifierProp(objectClass, ctxKey)
|
||||
if (ctx !== undefined) {
|
||||
return ctx
|
||||
}
|
||||
if (typeof ctx !== 'string') {
|
||||
const anc = hierarchy.getAncestors(objectClass)
|
||||
for (const oc of anc) {
|
||||
const ctx = contexts.get(oc)
|
||||
if (ctx !== undefined) {
|
||||
hierarchy.setClassifierProp(objectClass, ctxKey, ctx)
|
||||
return ctx
|
||||
}
|
||||
if (objClass.extends === undefined) {
|
||||
break
|
||||
}
|
||||
objClass = hierarchy.getClass(objClass.extends)
|
||||
}
|
||||
return {
|
||||
ctx = {
|
||||
toClass: objectClass,
|
||||
fullTextSummary: false,
|
||||
forceIndex: false,
|
||||
propagate: [],
|
||||
childProcessingAllowed: true
|
||||
}
|
||||
hierarchy.setClassifierProp(objectClass, ctxKey, ctx)
|
||||
return ctx
|
||||
}
|
||||
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export function isClassIndexable (hierarchy: Hierarchy, c: Ref<Class<Doc>>): boolean {
|
||||
export function isClassIndexable (
|
||||
hierarchy: Hierarchy,
|
||||
c: Ref<Class<Doc>>,
|
||||
contexts: Map<Ref<Class<Doc>>, FullTextSearchContext>
|
||||
): boolean {
|
||||
const indexed = hierarchy.getClassifierProp(c, 'class_indexed')
|
||||
if (indexed !== undefined) {
|
||||
return indexed as boolean
|
||||
@ -756,13 +765,13 @@ export function isClassIndexable (hierarchy: Hierarchy, c: Ref<Class<Doc>>): boo
|
||||
|
||||
let result = true
|
||||
|
||||
if (attrs.length === 0 && !(getFullTextContext(hierarchy, c)?.forceIndex ?? false)) {
|
||||
if (attrs.length === 0 && !(getFullTextContext(hierarchy, c, contexts)?.forceIndex ?? false)) {
|
||||
result = false
|
||||
// We need to check whether the document has collections with indexable fields.
|
||||
const attrs = hierarchy.getAllAttributes(c).values()
|
||||
for (const attr of attrs) {
|
||||
if (attr.type._class === core.class.Collection) {
|
||||
if (isClassIndexable(hierarchy, (attr.type as Collection<AttachedDoc>).of)) {
|
||||
if (isClassIndexable(hierarchy, (attr.type as Collection<AttachedDoc>).of, contexts)) {
|
||||
result = true
|
||||
break
|
||||
}
|
||||
|
@ -8,7 +8,7 @@
|
||||
"template": "@hcengineering/node-package",
|
||||
"license": "EPL-2.0",
|
||||
"scripts": {
|
||||
"start": "rush bundle --to @hcengineering/pod-server && cross-env NODE_ENV=production ELASTIC_INDEX_NAME=local_storage_index MODEL_VERSION=$(node ../../common/scripts/show_version.js) ACCOUNTS_URL=http://localhost:3000 REKONI_URL=http://localhost:4004 MONGO_URL=mongodb://localhost:27017 ELASTIC_URL=http://localhost:9200 FRONT_URL=http://localhost:8087 UPLOAD_URL=/upload MINIO_ENDPOINT=localhost MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin METRICS_CONSOLE=true SERVER_SECRET=secret OPERATION_PROFILING=false MODEL_JSON=../../models/all/bundle/model.json node bundle/bundle.js",
|
||||
"start": "rush bundle --to @hcengineering/pod-server && cross-env NODE_ENV=production ELASTIC_INDEX_NAME=local_storage_index MODEL_VERSION=$(node ../../common/scripts/show_version.js) ACCOUNTS_URL=http://localhost:3000 REKONI_URL=http://localhost:4004 MONGO_URL=mongodb://localhost:27017 DB_URL=mongodb://localhost:27017 ELASTIC_URL=http://localhost:9200 FRONT_URL=http://localhost:8087 UPLOAD_URL=/upload MINIO_ENDPOINT=localhost MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin METRICS_CONSOLE=true SERVER_SECRET=secret OPERATION_PROFILING=false MODEL_JSON=../../models/all/bundle/model.json node --inspect bundle/bundle.js",
|
||||
"start-u": "rush bundle --to @hcengineering/pod-server && ./bundle/ && cross-env NODE_ENV=production SERVER_PROVIDER=uweb ELASTIC_INDEX_NAME=local_storage_index MODEL_VERSION=$(node ../../common/scripts/show_version.js) ACCOUNTS_URL=http://localhost:3000 REKONI_URL=http://localhost:4004 MONGO_URL=mongodb://localhost:27017 ELASTIC_URL=http://localhost:9200 FRONT_URL=http://localhost:8087 UPLOAD_URL=/upload MINIO_ENDPOINT=localhost MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin METRICS_CONSOLE=true SERVER_SECRET=secret MODEL_JSON=../../models/all/bundle/model.json node bundle/bundle.js",
|
||||
"start-flame": "rush bundle --to @hcengineering/pod-server && cross-env NODE_ENV=production ELASTIC_INDEX_NAME=local_storage_index MODEL_VERSION=$(node ../../common/scripts/show_version.js) ACCOUNTS_URL=http://localhost:3000 REKONI_URL=http://localhost:4004 MONGO_URL=mongodb://localhost:27017 ELASTIC_URL=http://localhost:9200 FRONT_URL=http://localhost:8087 UPLOAD_URL=/upload MINIO_ENDPOINT=localhost MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin METRICS_CONSOLE=true SERVER_SECRET=secret MODEL_JSON=../../models/all/bundle/model.json clinic flame --dest ./out -- node --nolazy -r ts-node/register --enable-source-maps src/__start.ts",
|
||||
"build": "compile",
|
||||
|
@ -522,7 +522,7 @@ class ElasticAdapter implements FullTextAdapter {
|
||||
async updateMany (docs: IndexedDoc[]): Promise<TxResult[]> {
|
||||
const parts = Array.from(docs)
|
||||
while (parts.length > 0) {
|
||||
const part = parts.splice(0, 1000)
|
||||
const part = parts.splice(0, 500)
|
||||
|
||||
const operations = part.flatMap((doc) => {
|
||||
const wsDoc = { workspaceId: this.workspaceString, ...doc }
|
||||
|
@ -45,9 +45,9 @@ import core, {
|
||||
toFindResult
|
||||
} from '@hcengineering/core'
|
||||
import type { FullTextAdapter, IndexedDoc, SessionFindAll, StorageAdapter, WithFind } from '@hcengineering/server-core'
|
||||
import { getScoringConfig, mapSearchResultDoc } from './mapper'
|
||||
import { type FullTextIndexPipeline } from './indexer'
|
||||
import { createStateDoc } from './indexer/utils'
|
||||
import { getScoringConfig, mapSearchResultDoc } from './mapper'
|
||||
|
||||
/**
|
||||
* @public
|
||||
@ -90,7 +90,7 @@ export class FullTextIndex implements WithFind {
|
||||
if (TxProcessor.isExtendsCUD(tx._class)) {
|
||||
const cud = tx as TxCUD<Doc>
|
||||
|
||||
if (!isClassIndexable(this.hierarchy, cud.objectClass)) {
|
||||
if (!isClassIndexable(this.hierarchy, cud.objectClass, this.indexer.contexts)) {
|
||||
// No need, since no indexable fields or attachments.
|
||||
continue
|
||||
}
|
||||
|
@ -50,6 +50,7 @@ export class IndexedFieldStage implements FullTextPipelineStage {
|
||||
updateFields: DocUpdateHandler[] = []
|
||||
|
||||
enabled = true
|
||||
|
||||
constructor (private readonly dbStorageFindAll: SessionFindAll) {}
|
||||
|
||||
async initialize (ctx: MeasureContext, storage: DbAdapter, pipeline: FullTextPipeline): Promise<void> {}
|
||||
@ -150,7 +151,7 @@ export class IndexedFieldStage implements FullTextPipelineStage {
|
||||
}
|
||||
|
||||
if (docState.attachedTo != null && changes > 0) {
|
||||
const ctx = getFullTextContext(pipeline.hierarchy, objClass)
|
||||
const ctx = getFullTextContext(pipeline.hierarchy, objClass, pipeline.contexts)
|
||||
if (ctx.parentPropagate ?? true) {
|
||||
// We need to clear the field stage from the parent, so it will be re-indexed.
|
||||
await pipeline.update(docState.attachedTo as Ref<DocIndexState>, false, {})
|
||||
@ -173,17 +174,13 @@ export class IndexedFieldStage implements FullTextPipelineStage {
|
||||
{
|
||||
attachedTo: ids.length === 1 ? ids[0] : { $in: ids }
|
||||
},
|
||||
{ limit: ids.length }
|
||||
{ limit: ids.length, skipSpace: true, skipClass: true }
|
||||
)
|
||||
)
|
||||
}
|
||||
const childs = allChildDocs.filter((it) => it.attachedTo === docState._id)
|
||||
for (const u of childs) {
|
||||
if (propagate.some((it) => pipeline.hierarchy.isDerived(u.objectClass, it))) {
|
||||
pipeline.add(u)
|
||||
await pipeline.update(u._id, false, {})
|
||||
}
|
||||
}
|
||||
// Mark children to be indexed on the next step
|
||||
await pipeline.queue(metrics, new Map(childs.map((it) => [it._id, { updated: true, removed: false }])))
|
||||
}
|
||||
|
||||
await pipeline.update(docState._id, true, docUpdate)
|
||||
|
@ -30,12 +30,13 @@ import core, {
|
||||
type MeasureContext,
|
||||
RateLimiter,
|
||||
type Ref,
|
||||
SortingOrder,
|
||||
toIdMap,
|
||||
type WorkspaceId
|
||||
} from '@hcengineering/core'
|
||||
import { type DbAdapter, type FullTextAdapter, type IndexedDoc, type SessionFindAll } from '@hcengineering/server-core'
|
||||
import { updateDocWithPresenter } from '../mapper'
|
||||
import { jsonToText, markupToJSON } from '@hcengineering/text'
|
||||
import { updateDocWithPresenter } from '../mapper'
|
||||
import {
|
||||
contentStageId,
|
||||
type DocUpdateHandler,
|
||||
@ -118,7 +119,7 @@ export class FullTextPushStage implements FullTextPipelineStage {
|
||||
|
||||
const childIds = toIndexPart
|
||||
.filter((it) => {
|
||||
const fctx = getFullTextContext(pipeline.hierarchy, it.objectClass)
|
||||
const fctx = getFullTextContext(pipeline.hierarchy, it.objectClass, pipeline.contexts)
|
||||
return fctx.childProcessingAllowed ?? true
|
||||
})
|
||||
.map((it) => it._id)
|
||||
@ -127,9 +128,17 @@ export class FullTextPushStage implements FullTextPipelineStage {
|
||||
'find-child',
|
||||
{},
|
||||
async (ctx) =>
|
||||
await this.dbStorageFindAll(ctx, core.class.DocIndexState, {
|
||||
await this.dbStorageFindAll(
|
||||
ctx,
|
||||
core.class.DocIndexState,
|
||||
{
|
||||
attachedTo: childIds.length === 1 ? childIds[0] : { $in: childIds }
|
||||
})
|
||||
},
|
||||
{
|
||||
skipClass: true,
|
||||
skipSpace: true
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
// spaces
|
||||
@ -138,14 +147,22 @@ export class FullTextPushStage implements FullTextPipelineStage {
|
||||
'find-spaces',
|
||||
{},
|
||||
async (ctx) =>
|
||||
await this.dbStorageFindAll(ctx, core.class.DocIndexState, {
|
||||
await this.dbStorageFindAll(
|
||||
ctx,
|
||||
core.class.DocIndexState,
|
||||
{
|
||||
_id: {
|
||||
$in: toIndexPart.map(
|
||||
(doc) =>
|
||||
(doc.attributes[docKey('space', { _class: doc.objectClass })] ?? doc.space) as Ref<DocIndexState>
|
||||
)
|
||||
}
|
||||
})
|
||||
},
|
||||
{
|
||||
skipClass: true,
|
||||
skipSpace: true
|
||||
}
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
@ -163,7 +180,7 @@ export class FullTextPushStage implements FullTextPipelineStage {
|
||||
const childDocs = allChildDocs.filter((it) => it.attachedTo === doc._id)
|
||||
if (childDocs.length > 0) {
|
||||
for (const c of childDocs) {
|
||||
const fctx = getFullTextContext(pipeline.hierarchy, c.objectClass)
|
||||
const fctx = getFullTextContext(pipeline.hierarchy, c.objectClass, pipeline.contexts)
|
||||
if (fctx.parentPropagate ?? true) {
|
||||
ctx.withSync('updateDoc2Elastic', {}, (ctx) => {
|
||||
updateDoc2Elastic(
|
||||
@ -195,7 +212,11 @@ export class FullTextPushStage implements FullTextPipelineStage {
|
||||
{
|
||||
_id: doc.attachedTo as Ref<DocIndexState>
|
||||
},
|
||||
{ limit: 1 }
|
||||
{
|
||||
limit: 1,
|
||||
skipClass: true,
|
||||
skipSpace: true
|
||||
}
|
||||
)
|
||||
).shift()
|
||||
))
|
||||
@ -217,10 +238,29 @@ export class FullTextPushStage implements FullTextPipelineStage {
|
||||
|
||||
const collectClasses = collectPropagateClasses(pipeline, parentDoc.objectClass)
|
||||
if (collectClasses.length > 0) {
|
||||
const collections = await this.dbStorageFindAll<DocIndexState>(ctx, core.class.DocIndexState, {
|
||||
let last: number = 0
|
||||
while (true) {
|
||||
const collections = await this.dbStorageFindAll<DocIndexState>(
|
||||
ctx,
|
||||
core.class.DocIndexState,
|
||||
{
|
||||
attachedTo: parentDoc._id,
|
||||
objectClass: { $in: collectClasses }
|
||||
})
|
||||
objectClass: { $in: collectClasses },
|
||||
modifiedOn: { $gt: last }
|
||||
},
|
||||
{
|
||||
sort: {
|
||||
modifiedOn: SortingOrder.Ascending
|
||||
},
|
||||
skipClass: true,
|
||||
skipSpace: true,
|
||||
limit: 500
|
||||
}
|
||||
)
|
||||
if (collections.length === 0) {
|
||||
break
|
||||
}
|
||||
last = collections[collections.length - 1].modifiedOn
|
||||
for (const c of collections) {
|
||||
ctx.withSync('updateDoc2Elastic', {}, (ctx) => {
|
||||
updateDoc2Elastic(
|
||||
@ -239,6 +279,7 @@ export class FullTextPushStage implements FullTextPipelineStage {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
const spaceDoc = spaceDocs.get(
|
||||
(doc.attributes[docKey('space', { _class: doc.objectClass })] ?? doc.space) as Ref<DocIndexState>
|
||||
)
|
||||
@ -262,7 +303,7 @@ export class FullTextPushStage implements FullTextPipelineStage {
|
||||
}
|
||||
// Perform bulk update to elastic
|
||||
|
||||
void pushQueue.add(async () => {
|
||||
await pushQueue.exec(async () => {
|
||||
try {
|
||||
try {
|
||||
await ctx.with('push-elastic', {}, async () => {
|
||||
|
@ -21,6 +21,7 @@ import core, {
|
||||
type DocIndexState,
|
||||
type DocumentQuery,
|
||||
type DocumentUpdate,
|
||||
type FullTextSearchContext,
|
||||
type Hierarchy,
|
||||
type MeasureContext,
|
||||
type ModelDb,
|
||||
@ -79,6 +80,10 @@ export class FullTextIndexPipeline implements FullTextPipeline {
|
||||
|
||||
uploadOps: DocIndexState[] = []
|
||||
|
||||
contexts: Map<Ref<Class<Doc>>, FullTextSearchContext>
|
||||
propogage = new Map<Ref<Class<Doc>>, Ref<Class<Doc>>[]>()
|
||||
propogageClasses = new Map<Ref<Class<Doc>>, Ref<Class<Doc>>[]>()
|
||||
|
||||
constructor (
|
||||
private readonly storage: DbAdapter,
|
||||
private readonly stages: FullTextPipelineStage[],
|
||||
@ -90,6 +95,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
|
||||
) {
|
||||
this.readyStages = stages.map((it) => it.stageId)
|
||||
this.readyStages.sort()
|
||||
this.contexts = new Map(model.findAllSync(core.class.FullTextSearchContext, {}).map((it) => [it.toClass, it]))
|
||||
}
|
||||
|
||||
async cancel (): Promise<void> {
|
||||
@ -386,8 +392,6 @@ export class FullTextIndexPipeline implements FullTextPipeline {
|
||||
)
|
||||
|
||||
// Also update doc index state queries.
|
||||
_classes.push(core.class.DocIndexState)
|
||||
|
||||
_classes.forEach((it) => this.broadcastClasses.add(it))
|
||||
|
||||
if (this.triggerCounts > 0) {
|
||||
@ -410,10 +414,16 @@ export class FullTextIndexPipeline implements FullTextPipeline {
|
||||
}
|
||||
}, 5000)
|
||||
|
||||
let notified = false
|
||||
await new Promise((resolve) => {
|
||||
this.triggerIndexing = () => {
|
||||
this.triggerCounts++
|
||||
if (!notified) {
|
||||
notified = true
|
||||
setTimeout(() => {
|
||||
resolve(null)
|
||||
}, 500) // Start indexing only after cooldown
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -435,14 +445,18 @@ export class FullTextIndexPipeline implements FullTextPipeline {
|
||||
})
|
||||
|
||||
let result: DocIndexState[] | undefined = await ctx.with('get-indexable', {}, async () => {
|
||||
const q: DocumentQuery<DocIndexState> = {
|
||||
return await this.storage.findAll(
|
||||
ctx,
|
||||
core.class.DocIndexState,
|
||||
{
|
||||
needIndex: true
|
||||
}
|
||||
return await this.storage.findAll(ctx, core.class.DocIndexState, q, {
|
||||
},
|
||||
{
|
||||
limit: globalIndexer.processingSize,
|
||||
skipClass: true,
|
||||
skipSpace: true
|
||||
})
|
||||
}
|
||||
)
|
||||
})
|
||||
if (result === undefined) {
|
||||
// No more results
|
||||
|
@ -114,7 +114,7 @@ export class FullSummaryStage implements FullTextPipelineStage {
|
||||
const childDocs = allChildDocs.filter((it) => it.attachedTo === doc._id)
|
||||
if (childDocs.length > 0) {
|
||||
for (const c of childDocs) {
|
||||
const ctx = getFullTextContext(pipeline.hierarchy, c.objectClass)
|
||||
const ctx = getFullTextContext(pipeline.hierarchy, c.objectClass, pipeline.contexts)
|
||||
if (ctx.parentPropagate ?? true) {
|
||||
if (embeddingText.length > this.summaryLimit) {
|
||||
break
|
||||
@ -137,15 +137,35 @@ export class FullSummaryStage implements FullTextPipelineStage {
|
||||
metrics,
|
||||
core.class.DocIndexState,
|
||||
{ _id: doc.attachedTo as Ref<DocIndexState> },
|
||||
{ limit: 1 }
|
||||
{
|
||||
limit: 1,
|
||||
skipSpace: true,
|
||||
skipClass: true
|
||||
}
|
||||
)
|
||||
if (parentDoc !== undefined) {
|
||||
const ctx = collectPropagateClasses(pipeline, parentDoc.objectClass)
|
||||
if (ctx.length > 0) {
|
||||
const collections = await this.dbStorageFindAll(metrics, core.class.DocIndexState, {
|
||||
let last = 0
|
||||
while (true) {
|
||||
const collections = await this.dbStorageFindAll(
|
||||
metrics,
|
||||
core.class.DocIndexState,
|
||||
{
|
||||
attachedTo: parentDoc._id,
|
||||
objectClass: ctx.length === 1 ? ctx[0] : { $in: ctx }
|
||||
})
|
||||
objectClass: ctx.length === 1 ? ctx[0] : { $in: ctx },
|
||||
modifiedOn: { $gt: last }
|
||||
},
|
||||
{
|
||||
limit: 250,
|
||||
skipClass: true,
|
||||
skipSpace: true
|
||||
}
|
||||
)
|
||||
if (collections.length === 0) {
|
||||
break
|
||||
}
|
||||
last = collections[collections.length - 1].modifiedOn
|
||||
for (const c of collections) {
|
||||
embeddingText +=
|
||||
'\n' +
|
||||
@ -155,6 +175,7 @@ export class FullSummaryStage implements FullTextPipelineStage {
|
||||
}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (embeddingText.length > this.summaryLimit) {
|
||||
break
|
||||
@ -188,7 +209,7 @@ export class FullSummaryStage implements FullTextPipelineStage {
|
||||
* @public
|
||||
*/
|
||||
export function isIndexingRequired (pipeline: FullTextPipeline, doc: DocIndexState): boolean {
|
||||
return getFullTextContext(pipeline.hierarchy, doc.objectClass).fullTextSummary ?? false
|
||||
return getFullTextContext(pipeline.hierarchy, doc.objectClass, pipeline.contexts).fullTextSummary ?? false
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -19,6 +19,7 @@ import {
|
||||
type DocIndexState,
|
||||
type DocumentQuery,
|
||||
type DocumentUpdate,
|
||||
type FullTextSearchContext,
|
||||
type Hierarchy,
|
||||
type MeasureContext,
|
||||
type ModelDb,
|
||||
@ -32,6 +33,12 @@ import type { DbAdapter, IndexedDoc } from '@hcengineering/server-core'
|
||||
export interface FullTextPipeline {
|
||||
hierarchy: Hierarchy
|
||||
model: ModelDb
|
||||
|
||||
contexts: Map<Ref<Class<Doc>>, FullTextSearchContext>
|
||||
|
||||
propogage: Map<Ref<Class<Doc>>, Ref<Class<Doc>>[]>
|
||||
propogageClasses: Map<Ref<Class<Doc>>, Ref<Class<Doc>>[]>
|
||||
|
||||
update: (
|
||||
docId: Ref<DocIndexState>,
|
||||
mark: boolean,
|
||||
@ -49,6 +56,11 @@ export interface FullTextPipeline {
|
||||
from?: number
|
||||
) => Promise<{ docs: IndexedDoc[], pass: boolean }>
|
||||
|
||||
queue: (
|
||||
ctx: MeasureContext,
|
||||
updates: Map<Ref<DocIndexState>, { create?: DocIndexState, updated: boolean, removed: boolean }>
|
||||
) => Promise<void>
|
||||
|
||||
cancelling: boolean
|
||||
}
|
||||
|
||||
|
@ -82,58 +82,55 @@ export function traverseFullTextContexts (
|
||||
pipeline: FullTextPipeline,
|
||||
objectClass: Ref<Class<Doc>>,
|
||||
op: (ftc: Omit<FullTextSearchContext, keyof Class<Doc>>) => void
|
||||
): Ref<Class<Doc>>[] {
|
||||
const desc = new Set(pipeline.hierarchy.getDescendants(objectClass))
|
||||
const propagate = new Set<Ref<Class<Doc>>>()
|
||||
|
||||
const ftContext = getFullTextContext(pipeline.hierarchy, objectClass)
|
||||
): void {
|
||||
const cl = pipeline.hierarchy.getBaseClass(objectClass)
|
||||
const ftContext = getFullTextContext(pipeline.hierarchy, cl, pipeline.contexts)
|
||||
if (ftContext !== undefined) {
|
||||
op(ftContext)
|
||||
}
|
||||
|
||||
// Add all parent mixins as well
|
||||
for (const a of pipeline.hierarchy.getAncestors(objectClass)) {
|
||||
const ftContext = getFullTextContext(pipeline.hierarchy, a)
|
||||
if (ftContext !== undefined) {
|
||||
op(ftContext)
|
||||
}
|
||||
const dsca = pipeline.hierarchy.getDescendants(a)
|
||||
const dsca = pipeline.hierarchy.getDescendants(cl)
|
||||
for (const dd of dsca) {
|
||||
if (pipeline.hierarchy.isMixin(dd)) {
|
||||
desc.add(dd)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (const d of desc) {
|
||||
if (pipeline.hierarchy.isMixin(d)) {
|
||||
const mContext = getFullTextContext(pipeline.hierarchy, d)
|
||||
const mContext = getFullTextContext(pipeline.hierarchy, dd, pipeline.contexts)
|
||||
if (mContext !== undefined) {
|
||||
op(mContext)
|
||||
}
|
||||
}
|
||||
}
|
||||
return Array.from(propagate.values())
|
||||
}
|
||||
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export function collectPropagate (pipeline: FullTextPipeline, objectClass: Ref<Class<Doc>>): Ref<Class<Doc>>[] {
|
||||
const propagate = new Set<Ref<Class<Doc>>>()
|
||||
traverseFullTextContexts(pipeline, objectClass, (fts) => fts?.propagate?.forEach((it) => propagate.add(it)))
|
||||
let propagate = pipeline.propogage.get(objectClass)
|
||||
if (propagate !== undefined) {
|
||||
return propagate
|
||||
}
|
||||
const set = new Set<Ref<Class<Doc>>>()
|
||||
traverseFullTextContexts(pipeline, objectClass, (fts) => {
|
||||
fts?.propagate?.forEach((it) => {
|
||||
set.add(it)
|
||||
})
|
||||
})
|
||||
|
||||
return Array.from(propagate.values())
|
||||
propagate = Array.from(set.values())
|
||||
pipeline.propogage.set(objectClass, propagate)
|
||||
return propagate
|
||||
}
|
||||
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export function collectPropagateClasses (pipeline: FullTextPipeline, objectClass: Ref<Class<Doc>>): Ref<Class<Doc>>[] {
|
||||
const propagate = new Set<Ref<Class<Doc>>>()
|
||||
traverseFullTextContexts(pipeline, objectClass, (fts) => fts?.propagateClasses?.forEach((it) => propagate.add(it)))
|
||||
let propagate = pipeline.propogageClasses.get(objectClass)
|
||||
if (propagate !== undefined) {
|
||||
return propagate
|
||||
}
|
||||
const set = new Set<Ref<Class<Doc>>>()
|
||||
traverseFullTextContexts(pipeline, objectClass, (fts) => fts?.propagateClasses?.forEach((it) => set.add(it)))
|
||||
|
||||
return Array.from(propagate.values())
|
||||
propagate = Array.from(set.values())
|
||||
pipeline.propogageClasses.set(objectClass, propagate)
|
||||
return propagate
|
||||
}
|
||||
|
||||
const CUSTOM_ATTR_KEY = 'customAttributes'
|
||||
|
Loading…
Reference in New Issue
Block a user