From bdf6f3e80d0f6ade8701ebe4e62427b155180a0e Mon Sep 17 00:00:00 2001 From: Vyacheslav Tumanov Date: Tue, 20 Feb 2024 20:17:11 +0500 Subject: [PATCH] Indexer fix (#4729) Signed-off-by: Vyacheslav Tumanov --- dev/tool/src/index.ts | 4 +- packages/core/src/utils.ts | 14 +- packages/kanban/src/components/Kanban.svelte | 4 +- .../kanban/src/components/KanbanRow.svelte | 4 +- .../src/components/list/List.svelte | 4 +- .../src/components/list/ListCategories.svelte | 4 +- .../src/components/list/ListCategory.svelte | 4 +- server-plugins/openai/src/openai.ts | 8 +- server-plugins/openai/src/types.ts | 4 +- server/core/src/indexer/indexer.ts | 234 +++++++++--------- server/core/src/limitter.ts | 2 +- 11 files changed, 143 insertions(+), 143 deletions(-) diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index eb13b75212..cac9b21f19 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -48,7 +48,7 @@ import { type Db, MongoClient } from 'mongodb' import { clearTelegramHistory } from './telegram' import { diffWorkspace, updateField } from './workspace' -import { type Data, getWorkspaceId, RateLimitter, type Tx, type Version } from '@hcengineering/core' +import { type Data, getWorkspaceId, RateLimiter, type Tx, type Version } from '@hcengineering/core' import { type MinioService } from '@hcengineering/minio' import { consoleModelLogger, type MigrateOperation } from '@hcengineering/model' import { openAIConfigDefaults } from '@hcengineering/openai' @@ -306,7 +306,7 @@ export function devTool ( } if (cmd.parallel !== '0') { const parallel = parseInt(cmd.parallel) ?? 1 - const rateLimit = new RateLimitter(() => ({ rate: parallel })) + const rateLimit = new RateLimiter(parallel) console.log('parallel upgrade', parallel, cmd.parallel) for (const ws of workspaces) { await rateLimit.exec(() => { diff --git a/packages/core/src/utils.ts b/packages/core/src/utils.ts index c7800f3640..ec9cb5e98f 100644 --- a/packages/core/src/utils.ts +++ b/packages/core/src/utils.ts @@ -305,20 +305,22 @@ export class DocManager { * @public */ -export class RateLimitter { +export class RateLimiter { idCounter: number = 0 processingQueue = new Map>() last: number = 0 + rate: number queue: (() => Promise)[] = [] - constructor (readonly config: () => { rate: number, perSecond?: number }) {} + constructor (rate: number) { + this.rate = rate + } async exec = any>(op: (args?: B) => Promise, args?: B): Promise { const processingId = `${this.idCounter++}` - const cfg = this.config() - while (this.processingQueue.size > cfg.rate) { + while (this.processingQueue.size > this.rate) { await Promise.race(this.processingQueue.values()) } try { @@ -331,9 +333,7 @@ export class RateLimitter { } async add = any>(op: (args?: B) => Promise, args?: B): Promise { - const cfg = this.config() - - if (this.processingQueue.size < cfg.rate) { + if (this.processingQueue.size < this.rate) { void this.exec(op, args) } else { await this.exec(op, args) diff --git a/packages/kanban/src/components/Kanban.svelte b/packages/kanban/src/components/Kanban.svelte index 6a19f658c1..635622f47d 100644 --- a/packages/kanban/src/components/Kanban.svelte +++ b/packages/kanban/src/components/Kanban.svelte @@ -20,7 +20,7 @@ DocumentQuery, DocumentUpdate, FindOptions, - RateLimitter, + RateLimiter, Ref, Space } from '@hcengineering/core' @@ -57,7 +57,7 @@ const dispatch = createEventDispatcher() - const limiter = new RateLimitter(() => ({ rate: 10 })) + const limiter = new RateLimiter(10) async function move (state: CategoryType): Promise { if (dragCard === undefined) { diff --git a/packages/kanban/src/components/KanbanRow.svelte b/packages/kanban/src/components/KanbanRow.svelte index 70d0febf5d..3dfabe60a4 100644 --- a/packages/kanban/src/components/KanbanRow.svelte +++ b/packages/kanban/src/components/KanbanRow.svelte @@ -20,7 +20,7 @@ DocumentQuery, FindOptions, IdMap, - RateLimitter, + RateLimiter, Ref, Space, toIdMap @@ -44,7 +44,7 @@ export let query: DocumentQuery = {} export let options: FindOptions | undefined = undefined export let groupByKey: any - export let limiter: RateLimitter + export let limiter: RateLimiter export let cardDragOver: (evt: CardDragEvent, object: Item) => void export let cardDrop: (evt: CardDragEvent, object: Item) => void diff --git a/plugins/view-resources/src/components/list/List.svelte b/plugins/view-resources/src/components/list/List.svelte index 864341c12c..a439d817dc 100644 --- a/plugins/view-resources/src/components/list/List.svelte +++ b/plugins/view-resources/src/components/list/List.svelte @@ -20,7 +20,7 @@ FindOptions, Ref, Space, - RateLimitter, + RateLimiter, mergeQueries } from '@hcengineering/core' import { IntlString, getResource } from '@hcengineering/platform' @@ -52,7 +52,7 @@ export let compactMode: boolean = false export let listProvider: SelectionFocusProvider - const limiter = new RateLimitter(() => ({ rate: 10 })) + const limiter = new RateLimiter(10) let docs: Doc[] = [] let fastDocs: Doc[] = [] diff --git a/plugins/view-resources/src/components/list/ListCategories.svelte b/plugins/view-resources/src/components/list/ListCategories.svelte index ab5a71ae64..f82971916b 100644 --- a/plugins/view-resources/src/components/list/ListCategories.svelte +++ b/plugins/view-resources/src/components/list/ListCategories.svelte @@ -21,7 +21,7 @@ FindOptions, generateId, Lookup, - RateLimitter, + RateLimiter, Ref, Space } from '@hcengineering/core' @@ -83,7 +83,7 @@ export let resultQuery: DocumentQuery export let resultOptions: FindOptions - export let limiter: RateLimitter + export let limiter: RateLimiter export let listProvider: SelectionFocusProvider $: groupByKey = viewOptions.groupBy[level] ?? noCategory diff --git a/plugins/view-resources/src/components/list/ListCategory.svelte b/plugins/view-resources/src/components/list/ListCategory.svelte index 6237566d5d..a703994c79 100644 --- a/plugins/view-resources/src/components/list/ListCategory.svelte +++ b/plugins/view-resources/src/components/list/ListCategory.svelte @@ -23,7 +23,7 @@ Hierarchy, Lookup, PrimitiveType, - RateLimitter, + RateLimiter, Ref, Space } from '@hcengineering/core' @@ -85,7 +85,7 @@ export let resultQuery: DocumentQuery export let resultOptions: FindOptions export let parentCategories: number = 0 - export let limiter: RateLimitter + export let limiter: RateLimiter export let listProvider: SelectionFocusProvider $: lastLevel = level + 1 >= viewOptions.groupBy.length diff --git a/server-plugins/openai/src/openai.ts b/server-plugins/openai/src/openai.ts index e760340731..6b111f8141 100644 --- a/server-plugins/openai/src/openai.ts +++ b/server-plugins/openai/src/openai.ts @@ -38,7 +38,7 @@ import { IndexedDoc, isIndexingRequired, loadIndexStageStage, - RateLimitter + RateLimiter } from '@hcengineering/server-core' import got from 'got' @@ -94,7 +94,7 @@ export class OpenAIEmbeddingsStage implements FullTextPipelineStage { stageValue: boolean | string = true - limitter = new RateLimitter(() => ({ rate: this.rate })) + limiter = new RateLimiter(this.rate) indexState?: IndexStageState @@ -280,9 +280,9 @@ export class OpenAIEmbeddingsStage implements FullTextPipelineStage { if (pipeline.cancelling) { return } - await this.limitter.add(() => this.collectDoc(doc, pipeline, metrics)) + await this.limiter.add(() => this.collectDoc(doc, pipeline, metrics)) } - await this.limitter.waitProcessing() + await this.limiter.waitProcessing() } async collectDoc (doc: DocIndexState, pipeline: FullTextPipeline, metrics: MeasureContext): Promise { diff --git a/server-plugins/openai/src/types.ts b/server-plugins/openai/src/types.ts index 4c3f304a19..2db8d0b9a3 100644 --- a/server-plugins/openai/src/types.ts +++ b/server-plugins/openai/src/types.ts @@ -1,5 +1,5 @@ import { Configuration } from '@hcengineering/core' -import { RateLimitter } from '@hcengineering/server-core' +import { RateLimiter } from '@hcengineering/server-core' /** * @public @@ -30,4 +30,4 @@ export const openAIConfigDefaults: { * @public * */ -export const openAIRatelimitter = new RateLimitter(() => ({ rate: 3 })) +export const openAIRatelimitter = new RateLimiter(3) diff --git a/server/core/src/indexer/indexer.ts b/server/core/src/indexer/indexer.ts index 27bfee29f4..69d3119999 100644 --- a/server/core/src/indexer/indexer.ts +++ b/server/core/src/indexer/indexer.ts @@ -37,7 +37,7 @@ import core, { versionToString } from '@hcengineering/core' import { DbAdapter } from '../adapter' -import { RateLimitter } from '../limitter' +import { RateLimiter } from '../limitter' import type { IndexedDoc } from '../types' import { FullTextPipeline, FullTextPipelineStage } from './types' import { createStateDoc, isClassIndexable } from './utils' @@ -57,7 +57,7 @@ export const globalIndexer = { processingSize: 25 } -const rateLimitter = new RateLimitter(() => ({ rate: globalIndexer.allowParallel })) +const rateLimiter = new RateLimiter(globalIndexer.allowParallel) let indexCounter = 0 /** @@ -349,13 +349,11 @@ export class FullTextIndexPipeline implements FullTextPipeline { await this.processRemove() }) - const _classes = await rateLimitter.exec(() => { - return this.metrics.with( - 'processIndex', - { workspace: this.workspace.name }, - async (ctx) => await this.processIndex(ctx) - ) - }) + const _classes = await this.metrics.with( + 'processIndex', + { workspace: this.workspace.name }, + async (ctx) => await this.processIndex(ctx) + ) // Also update doc index state queries. _classes.push(core.class.DocIndexState) @@ -397,126 +395,128 @@ export class FullTextIndexPipeline implements FullTextPipeline { const _classUpdate = new Set>>() for (const st of this.stages) { idx++ - while (true) { - try { - if (this.cancelling) { - return Array.from(_classUpdate.values()) - } - if (!st.enabled) { - break - } - await ctx.with('flush', {}, async () => { - await this.flush(true) - }) - const toSkip = Array.from(this.skipped.entries()) - .filter((it) => it[1] > 3) - .map((it) => it[0]) - - let result = await ctx.with( - 'get-to-index', - {}, - async (ctx) => - await this.storage.findAll( - ctx, - core.class.DocIndexState, - { - [`stages.${st.stageId}`]: { $ne: st.stageValue }, - _id: { $nin: toSkip }, - removed: false - }, - { - limit: globalIndexer.processingSize - } - ) - ) - const toRemove: DocIndexState[] = [] - // Check and remove missing class documents. - result = toFindResult( - result.filter((doc) => { - const _class = this.model.findObject(doc.objectClass) - if (_class === undefined) { - // no _class present, remove doc - toRemove.push(doc) - return false - } - return true - }), - result.total - ) - - if (toRemove.length > 0) { - try { - await this.storage.clean( - DOMAIN_DOC_INDEX_STATE, - toRemove.map((it) => it._id) - ) - } catch (err: any) { - // QuotaExceededError, ignore - } - } - - if (result.length > 0) { - console.log( - this.workspace.name, - `Full text: Indexing ${this.indexId} ${st.stageId}`, - Object.entries(this.currentStages) - .map((it) => `${it[0]}:${it[1]}`) - .join(' ') - ) - } else { - // Nothing to index, check on next cycle. - break - } - - this.toIndex = new Map(result.map((it) => [it._id, it])) - - this.extraIndex.clear() - this.stageChanged = 0 - // Find documents matching query - const toIndex = this.matchStates(st) - - if (toIndex.length > 0) { - // Do Indexing - this.currentStage = st - - await ctx.with('collect-' + st.stageId, {}, async (ctx) => { - await st.collect(toIndex, this, ctx) - }) + await rateLimiter.exec(async () => { + while (true) { + try { if (this.cancelling) { + return Array.from(_classUpdate.values()) + } + if (!st.enabled) { + break + } + await ctx.with('flush', {}, async () => { + await this.flush(true) + }) + const toSkip = Array.from(this.skipped.entries()) + .filter((it) => it[1] > 3) + .map((it) => it[0]) + + let result = await ctx.with( + 'get-to-index', + {}, + async (ctx) => + await this.storage.findAll( + ctx, + core.class.DocIndexState, + { + [`stages.${st.stageId}`]: { $ne: st.stageValue }, + _id: { $nin: toSkip }, + removed: false + }, + { + limit: globalIndexer.processingSize + } + ) + ) + const toRemove: DocIndexState[] = [] + // Check and remove missing class documents. + result = toFindResult( + result.filter((doc) => { + const _class = this.model.findObject(doc.objectClass) + if (_class === undefined) { + // no _class present, remove doc + toRemove.push(doc) + return false + } + return true + }), + result.total + ) + + if (toRemove.length > 0) { + try { + await this.storage.clean( + DOMAIN_DOC_INDEX_STATE, + toRemove.map((it) => it._id) + ) + } catch (err: any) { + // QuotaExceededError, ignore + } + } + + if (result.length > 0) { + console.log( + this.workspace.name, + `Full text: Indexing ${this.indexId} ${st.stageId}`, + Object.entries(this.currentStages) + .map((it) => `${it[0]}:${it[1]}`) + .join(' ') + ) + } else { + // Nothing to index, check on next cycle. break } - toIndex.forEach((it) => _classUpdate.add(it.objectClass)) + this.toIndex = new Map(result.map((it) => [it._id, it])) - // go with next stages if they accept it - for (const nst of this.stages.slice(idx)) { - const toIndex2 = this.matchStates(nst) - if (toIndex2.length > 0) { - this.currentStage = nst - await ctx.with('collect-' + nst.stageId, {}, async (ctx) => { - await nst.collect(toIndex2, this, ctx) - }) - } + this.extraIndex.clear() + this.stageChanged = 0 + // Find documents matching query + const toIndex = this.matchStates(st) + + if (toIndex.length > 0) { + // Do Indexing + this.currentStage = st + + await ctx.with('collect-' + st.stageId, {}, async (ctx) => { + await st.collect(toIndex, this, ctx) + }) if (this.cancelling) { break } - } - } else { - break - } - // Check items with not updated state. - for (const d of toIndex) { - if (d.stages?.[st.stageId] === false) { - this.skipped.set(d._id, (this.skipped.get(d._id) ?? 0) + 1) + toIndex.forEach((it) => _classUpdate.add(it.objectClass)) + + // go with next stages if they accept it + for (const nst of this.stages.slice(idx)) { + const toIndex2 = this.matchStates(nst) + if (toIndex2.length > 0) { + this.currentStage = nst + await ctx.with('collect-' + nst.stageId, {}, async (ctx) => { + await nst.collect(toIndex2, this, ctx) + }) + } + if (this.cancelling) { + break + } + } } else { - this.skipped.delete(d._id) + break } + + // Check items with not updated state. + for (const d of toIndex) { + if (d.stages?.[st.stageId] === false) { + this.skipped.set(d._id, (this.skipped.get(d._id) ?? 0) + 1) + } else { + this.skipped.delete(d._id) + } + } + } catch (err: any) { + console.error(err) } - } catch (err: any) { - console.error(err) } - } + }) } return Array.from(_classUpdate.values()) } @@ -594,7 +594,7 @@ export class FullTextIndexPipeline implements FullTextPipeline { // TODO: Move to migration async checkIndexConsistency (dbStorage: ServerStorage): Promise { - await rateLimitter.exec(async () => { + await rateLimiter.exec(async () => { await this.metrics.with('check-index-consistency', {}, async (ctx) => { if (process.env.MODEL_VERSION !== undefined && process.env.MODEL_VERSION !== '') { const modelVersion = (await this.model.findAll(core.class.Version, {}))[0] diff --git a/server/core/src/limitter.ts b/server/core/src/limitter.ts index 56b7526338..a2c3405122 100644 --- a/server/core/src/limitter.ts +++ b/server/core/src/limitter.ts @@ -1 +1 @@ -export { RateLimitter } from '@hcengineering/core' +export { RateLimiter } from '@hcengineering/core'