Indexer fix (#4729)

Signed-off-by: Vyacheslav Tumanov <me@slavatumanov.me>
This commit is contained in:
Vyacheslav Tumanov 2024-02-20 20:17:11 +05:00 committed by GitHub
parent 2955033b68
commit bdf6f3e80d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 143 additions and 143 deletions

View File

@ -48,7 +48,7 @@ import { type Db, MongoClient } from 'mongodb'
import { clearTelegramHistory } from './telegram'
import { diffWorkspace, updateField } from './workspace'
import { type Data, getWorkspaceId, RateLimitter, type Tx, type Version } from '@hcengineering/core'
import { type Data, getWorkspaceId, RateLimiter, type Tx, type Version } from '@hcengineering/core'
import { type MinioService } from '@hcengineering/minio'
import { consoleModelLogger, type MigrateOperation } from '@hcengineering/model'
import { openAIConfigDefaults } from '@hcengineering/openai'
@ -306,7 +306,7 @@ export function devTool (
}
if (cmd.parallel !== '0') {
const parallel = parseInt(cmd.parallel) ?? 1
const rateLimit = new RateLimitter(() => ({ rate: parallel }))
const rateLimit = new RateLimiter(parallel)
console.log('parallel upgrade', parallel, cmd.parallel)
for (const ws of workspaces) {
await rateLimit.exec(() => {

View File

@ -305,20 +305,22 @@ export class DocManager {
* @public
*/
export class RateLimitter {
export class RateLimiter {
idCounter: number = 0
processingQueue = new Map<string, Promise<void>>()
last: number = 0
rate: number
queue: (() => Promise<void>)[] = []
constructor (readonly config: () => { rate: number, perSecond?: number }) {}
constructor (rate: number) {
this.rate = rate
}
async exec<T, B extends Record<string, any> = any>(op: (args?: B) => Promise<T>, args?: B): Promise<T> {
const processingId = `${this.idCounter++}`
const cfg = this.config()
while (this.processingQueue.size > cfg.rate) {
while (this.processingQueue.size > this.rate) {
await Promise.race(this.processingQueue.values())
}
try {
@ -331,9 +333,7 @@ export class RateLimitter {
}
async add<T, B extends Record<string, any> = any>(op: (args?: B) => Promise<T>, args?: B): Promise<void> {
const cfg = this.config()
if (this.processingQueue.size < cfg.rate) {
if (this.processingQueue.size < this.rate) {
void this.exec(op, args)
} else {
await this.exec(op, args)

View File

@ -20,7 +20,7 @@
DocumentQuery,
DocumentUpdate,
FindOptions,
RateLimitter,
RateLimiter,
Ref,
Space
} from '@hcengineering/core'
@ -57,7 +57,7 @@
const dispatch = createEventDispatcher()
const limiter = new RateLimitter(() => ({ rate: 10 }))
const limiter = new RateLimiter(10)
async function move (state: CategoryType): Promise<void> {
if (dragCard === undefined) {

View File

@ -20,7 +20,7 @@
DocumentQuery,
FindOptions,
IdMap,
RateLimitter,
RateLimiter,
Ref,
Space,
toIdMap
@ -44,7 +44,7 @@
export let query: DocumentQuery<DocWithRank> = {}
export let options: FindOptions<DocWithRank> | undefined = undefined
export let groupByKey: any
export let limiter: RateLimitter
export let limiter: RateLimiter
export let cardDragOver: (evt: CardDragEvent, object: Item) => void
export let cardDrop: (evt: CardDragEvent, object: Item) => void

View File

@ -20,7 +20,7 @@
FindOptions,
Ref,
Space,
RateLimitter,
RateLimiter,
mergeQueries
} from '@hcengineering/core'
import { IntlString, getResource } from '@hcengineering/platform'
@ -52,7 +52,7 @@
export let compactMode: boolean = false
export let listProvider: SelectionFocusProvider
const limiter = new RateLimitter(() => ({ rate: 10 }))
const limiter = new RateLimiter(10)
let docs: Doc[] = []
let fastDocs: Doc[] = []

View File

@ -21,7 +21,7 @@
FindOptions,
generateId,
Lookup,
RateLimitter,
RateLimiter,
Ref,
Space
} from '@hcengineering/core'
@ -83,7 +83,7 @@
export let resultQuery: DocumentQuery<Doc>
export let resultOptions: FindOptions<Doc>
export let limiter: RateLimitter
export let limiter: RateLimiter
export let listProvider: SelectionFocusProvider
$: groupByKey = viewOptions.groupBy[level] ?? noCategory

View File

@ -23,7 +23,7 @@
Hierarchy,
Lookup,
PrimitiveType,
RateLimitter,
RateLimiter,
Ref,
Space
} from '@hcengineering/core'
@ -85,7 +85,7 @@
export let resultQuery: DocumentQuery<Doc>
export let resultOptions: FindOptions<Doc>
export let parentCategories: number = 0
export let limiter: RateLimitter
export let limiter: RateLimiter
export let listProvider: SelectionFocusProvider
$: lastLevel = level + 1 >= viewOptions.groupBy.length

View File

@ -38,7 +38,7 @@ import {
IndexedDoc,
isIndexingRequired,
loadIndexStageStage,
RateLimitter
RateLimiter
} from '@hcengineering/server-core'
import got from 'got'
@ -94,7 +94,7 @@ export class OpenAIEmbeddingsStage implements FullTextPipelineStage {
stageValue: boolean | string = true
limitter = new RateLimitter(() => ({ rate: this.rate }))
limiter = new RateLimiter(this.rate)
indexState?: IndexStageState
@ -280,9 +280,9 @@ export class OpenAIEmbeddingsStage implements FullTextPipelineStage {
if (pipeline.cancelling) {
return
}
await this.limitter.add(() => this.collectDoc(doc, pipeline, metrics))
await this.limiter.add(() => this.collectDoc(doc, pipeline, metrics))
}
await this.limitter.waitProcessing()
await this.limiter.waitProcessing()
}
async collectDoc (doc: DocIndexState, pipeline: FullTextPipeline, metrics: MeasureContext): Promise<void> {

View File

@ -1,5 +1,5 @@
import { Configuration } from '@hcengineering/core'
import { RateLimitter } from '@hcengineering/server-core'
import { RateLimiter } from '@hcengineering/server-core'
/**
* @public
@ -30,4 +30,4 @@ export const openAIConfigDefaults: {
* @public
*
*/
export const openAIRatelimitter = new RateLimitter(() => ({ rate: 3 }))
export const openAIRatelimitter = new RateLimiter(3)

View File

@ -37,7 +37,7 @@ import core, {
versionToString
} from '@hcengineering/core'
import { DbAdapter } from '../adapter'
import { RateLimitter } from '../limitter'
import { RateLimiter } from '../limitter'
import type { IndexedDoc } from '../types'
import { FullTextPipeline, FullTextPipelineStage } from './types'
import { createStateDoc, isClassIndexable } from './utils'
@ -57,7 +57,7 @@ export const globalIndexer = {
processingSize: 25
}
const rateLimitter = new RateLimitter(() => ({ rate: globalIndexer.allowParallel }))
const rateLimiter = new RateLimiter(globalIndexer.allowParallel)
let indexCounter = 0
/**
@ -349,13 +349,11 @@ export class FullTextIndexPipeline implements FullTextPipeline {
await this.processRemove()
})
const _classes = await rateLimitter.exec(() => {
return this.metrics.with(
'processIndex',
{ workspace: this.workspace.name },
async (ctx) => await this.processIndex(ctx)
)
})
const _classes = await this.metrics.with(
'processIndex',
{ workspace: this.workspace.name },
async (ctx) => await this.processIndex(ctx)
)
// Also update doc index state queries.
_classes.push(core.class.DocIndexState)
@ -397,126 +395,128 @@ export class FullTextIndexPipeline implements FullTextPipeline {
const _classUpdate = new Set<Ref<Class<Doc>>>()
for (const st of this.stages) {
idx++
while (true) {
try {
if (this.cancelling) {
return Array.from(_classUpdate.values())
}
if (!st.enabled) {
break
}
await ctx.with('flush', {}, async () => {
await this.flush(true)
})
const toSkip = Array.from(this.skipped.entries())
.filter((it) => it[1] > 3)
.map((it) => it[0])
let result = await ctx.with(
'get-to-index',
{},
async (ctx) =>
await this.storage.findAll(
ctx,
core.class.DocIndexState,
{
[`stages.${st.stageId}`]: { $ne: st.stageValue },
_id: { $nin: toSkip },
removed: false
},
{
limit: globalIndexer.processingSize
}
)
)
const toRemove: DocIndexState[] = []
// Check and remove missing class documents.
result = toFindResult(
result.filter((doc) => {
const _class = this.model.findObject(doc.objectClass)
if (_class === undefined) {
// no _class present, remove doc
toRemove.push(doc)
return false
}
return true
}),
result.total
)
if (toRemove.length > 0) {
try {
await this.storage.clean(
DOMAIN_DOC_INDEX_STATE,
toRemove.map((it) => it._id)
)
} catch (err: any) {
// QuotaExceededError, ignore
}
}
if (result.length > 0) {
console.log(
this.workspace.name,
`Full text: Indexing ${this.indexId} ${st.stageId}`,
Object.entries(this.currentStages)
.map((it) => `${it[0]}:${it[1]}`)
.join(' ')
)
} else {
// Nothing to index, check on next cycle.
break
}
this.toIndex = new Map(result.map((it) => [it._id, it]))
this.extraIndex.clear()
this.stageChanged = 0
// Find documents matching query
const toIndex = this.matchStates(st)
if (toIndex.length > 0) {
// Do Indexing
this.currentStage = st
await ctx.with('collect-' + st.stageId, {}, async (ctx) => {
await st.collect(toIndex, this, ctx)
})
await rateLimiter.exec(async () => {
while (true) {
try {
if (this.cancelling) {
return Array.from(_classUpdate.values())
}
if (!st.enabled) {
break
}
await ctx.with('flush', {}, async () => {
await this.flush(true)
})
const toSkip = Array.from(this.skipped.entries())
.filter((it) => it[1] > 3)
.map((it) => it[0])
let result = await ctx.with(
'get-to-index',
{},
async (ctx) =>
await this.storage.findAll(
ctx,
core.class.DocIndexState,
{
[`stages.${st.stageId}`]: { $ne: st.stageValue },
_id: { $nin: toSkip },
removed: false
},
{
limit: globalIndexer.processingSize
}
)
)
const toRemove: DocIndexState[] = []
// Check and remove missing class documents.
result = toFindResult(
result.filter((doc) => {
const _class = this.model.findObject(doc.objectClass)
if (_class === undefined) {
// no _class present, remove doc
toRemove.push(doc)
return false
}
return true
}),
result.total
)
if (toRemove.length > 0) {
try {
await this.storage.clean(
DOMAIN_DOC_INDEX_STATE,
toRemove.map((it) => it._id)
)
} catch (err: any) {
// QuotaExceededError, ignore
}
}
if (result.length > 0) {
console.log(
this.workspace.name,
`Full text: Indexing ${this.indexId} ${st.stageId}`,
Object.entries(this.currentStages)
.map((it) => `${it[0]}:${it[1]}`)
.join(' ')
)
} else {
// Nothing to index, check on next cycle.
break
}
toIndex.forEach((it) => _classUpdate.add(it.objectClass))
this.toIndex = new Map(result.map((it) => [it._id, it]))
// go with next stages if they accept it
for (const nst of this.stages.slice(idx)) {
const toIndex2 = this.matchStates(nst)
if (toIndex2.length > 0) {
this.currentStage = nst
await ctx.with('collect-' + nst.stageId, {}, async (ctx) => {
await nst.collect(toIndex2, this, ctx)
})
}
this.extraIndex.clear()
this.stageChanged = 0
// Find documents matching query
const toIndex = this.matchStates(st)
if (toIndex.length > 0) {
// Do Indexing
this.currentStage = st
await ctx.with('collect-' + st.stageId, {}, async (ctx) => {
await st.collect(toIndex, this, ctx)
})
if (this.cancelling) {
break
}
}
} else {
break
}
// Check items with not updated state.
for (const d of toIndex) {
if (d.stages?.[st.stageId] === false) {
this.skipped.set(d._id, (this.skipped.get(d._id) ?? 0) + 1)
toIndex.forEach((it) => _classUpdate.add(it.objectClass))
// go with next stages if they accept it
for (const nst of this.stages.slice(idx)) {
const toIndex2 = this.matchStates(nst)
if (toIndex2.length > 0) {
this.currentStage = nst
await ctx.with('collect-' + nst.stageId, {}, async (ctx) => {
await nst.collect(toIndex2, this, ctx)
})
}
if (this.cancelling) {
break
}
}
} else {
this.skipped.delete(d._id)
break
}
// Check items with not updated state.
for (const d of toIndex) {
if (d.stages?.[st.stageId] === false) {
this.skipped.set(d._id, (this.skipped.get(d._id) ?? 0) + 1)
} else {
this.skipped.delete(d._id)
}
}
} catch (err: any) {
console.error(err)
}
} catch (err: any) {
console.error(err)
}
}
})
}
return Array.from(_classUpdate.values())
}
@ -594,7 +594,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
// TODO: Move to migration
async checkIndexConsistency (dbStorage: ServerStorage): Promise<void> {
await rateLimitter.exec(async () => {
await rateLimiter.exec(async () => {
await this.metrics.with('check-index-consistency', {}, async (ctx) => {
if (process.env.MODEL_VERSION !== undefined && process.env.MODEL_VERSION !== '') {
const modelVersion = (await this.model.findAll(core.class.Version, {}))[0]

View File

@ -1 +1 @@
export { RateLimitter } from '@hcengineering/core'
export { RateLimiter } from '@hcengineering/core'