mirror of
https://github.com/hcengineering/platform.git
synced 2024-11-22 21:50:34 +03:00
indexing-related fixes (#4360)
This commit is contained in:
parent
960c7d9963
commit
7ba9e53639
@ -142,6 +142,7 @@ services:
|
||||
environment:
|
||||
- SERVER_PORT=3333
|
||||
- SERVER_SECRET=secret
|
||||
- SERVER_CURSOR_MAXTIMEMS=30000
|
||||
- ELASTIC_URL=http://elastic:9200
|
||||
- MONGO_URL=mongodb://mongodb:27017
|
||||
- METRICS_CONSOLE=false
|
||||
|
@ -501,3 +501,21 @@ function getInNiN (query1: any, query2: any): any {
|
||||
}
|
||||
return {}
|
||||
}
|
||||
|
||||
export function cutObjectArray (obj: any): any {
|
||||
const r = {}
|
||||
for (const key of Object.keys(obj)) {
|
||||
if (Array.isArray(obj[key])) {
|
||||
if (obj[key].length > 3) {
|
||||
Object.assign(r, { [key]: `[${obj[key].slice(0, 3)}, ... and ${obj[key].length - 3} more]` })
|
||||
} else Object.assign(r, { [key]: obj[key] })
|
||||
continue
|
||||
}
|
||||
if (typeof obj[key] === 'object') {
|
||||
Object.assign(r, { [key]: cutObjectArray(obj[key]) })
|
||||
continue
|
||||
}
|
||||
Object.assign(r, { [key]: obj[key] })
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
@ -83,7 +83,9 @@ if (frontUrl === undefined) {
|
||||
}
|
||||
|
||||
const sesUrl = process.env.SES_URL
|
||||
const cursorMaxTime = process.env.SERVER_CURSOR_MAXTIMEMS
|
||||
|
||||
setMetadata(serverCore.metadata.CursorMaxTimeMS, cursorMaxTime)
|
||||
setMetadata(serverCore.metadata.FrontUrl, frontUrl)
|
||||
setMetadata(serverToken.metadata.Secret, serverSecret)
|
||||
setMetadata(serverNotification.metadata.SesUrl, sesUrl ?? '')
|
||||
|
@ -43,6 +43,7 @@ export interface DbAdapter {
|
||||
init: (model: Tx[]) => Promise<void>
|
||||
|
||||
createIndexes: (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>) => Promise<void>
|
||||
removeOldIndex: (domain: Domain, deletePattern: RegExp, keepPattern: RegExp) => Promise<void>
|
||||
|
||||
close: () => Promise<void>
|
||||
findAll: <T extends Doc>(
|
||||
@ -102,6 +103,7 @@ export class DummyDbAdapter implements DbAdapter {
|
||||
}
|
||||
|
||||
async createIndexes (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>): Promise<void> {}
|
||||
async removeOldIndex (domain: Domain, deletePattern: RegExp, keepPattern: RegExp): Promise<void> {}
|
||||
|
||||
async tx (...tx: Tx[]): Promise<TxResult> {
|
||||
return {}
|
||||
|
@ -310,7 +310,12 @@ function updateDoc2Elastic (
|
||||
const docIdAttr = docKey(attr, docKeyOpts)
|
||||
if (vv !== null) {
|
||||
// Since we replace array of values, we could ignore null
|
||||
doc[docIdAttr] = typeof doc[docIdAttr] === 'string' ? [doc[docIdAttr]] : [...(doc[docIdAttr] ?? [])]
|
||||
doc[docIdAttr] =
|
||||
doc[docIdAttr] == null
|
||||
? []
|
||||
: typeof doc[docIdAttr] === 'string' || !Array.isArray(doc[docIdAttr])
|
||||
? [doc[docIdAttr]]
|
||||
: doc[docIdAttr]
|
||||
if (vv !== '') {
|
||||
if (typeof vv !== 'object') {
|
||||
doc[docIdAttr] = Array.from(new Set([...doc[docIdAttr], vv]))
|
||||
|
@ -292,7 +292,12 @@ export class FullTextIndexPipeline implements FullTextPipeline {
|
||||
if (!this.indexesCreated) {
|
||||
this.indexesCreated = true
|
||||
// We need to be sure we have individual indexes per stage.
|
||||
const oldStagesRegex = [/fld-v.*/, /cnt-v.*/, /fts-v.*/, /sum-v.*/]
|
||||
for (const st of this.stages) {
|
||||
const regexp = oldStagesRegex.find((r) => r.test(st.stageId))
|
||||
if (regexp !== undefined) {
|
||||
await this.storage.removeOldIndex(DOMAIN_DOC_INDEX_STATE, regexp, new RegExp(st.stageId))
|
||||
}
|
||||
await this.storage.createIndexes(DOMAIN_DOC_INDEX_STATE, {
|
||||
indexes: [
|
||||
{
|
||||
|
@ -107,4 +107,4 @@ export const fieldStateId = 'fld-v11'
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export const fullTextPushStageId = 'fts-v8'
|
||||
export const fullTextPushStageId = 'fts-v9'
|
||||
|
@ -40,7 +40,8 @@ const serverCore = plugin(serverCoreId, {
|
||||
TriggerState: '' as Ref<Space>
|
||||
},
|
||||
metadata: {
|
||||
FrontUrl: '' as Metadata<string>
|
||||
FrontUrl: '' as Metadata<string>,
|
||||
CursorMaxTimeMS: '' as Metadata<string>
|
||||
}
|
||||
})
|
||||
|
||||
|
@ -59,6 +59,7 @@ class ElasticDataAdapter implements DbAdapter {
|
||||
async init (model: Tx[]): Promise<void> {}
|
||||
|
||||
async createIndexes (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>): Promise<void> {}
|
||||
async removeOldIndex (domain: Domain, deletePattern: RegExp, keepPattern: RegExp): Promise<void> {}
|
||||
|
||||
async close (): Promise<void> {
|
||||
await this.client.close()
|
||||
|
@ -57,6 +57,7 @@ import core, {
|
||||
getTypeOf
|
||||
} from '@hcengineering/core'
|
||||
import type { DbAdapter, TxAdapter } from '@hcengineering/server-core'
|
||||
import serverCore from '@hcengineering/server-core'
|
||||
import {
|
||||
type AnyBulkWriteOperation,
|
||||
type Collection,
|
||||
@ -69,6 +70,8 @@ import {
|
||||
} from 'mongodb'
|
||||
import { createHash } from 'node:crypto'
|
||||
import { getMongoClient, getWorkspaceDB } from './utils'
|
||||
import { cutObjectArray } from '@hcengineering/core'
|
||||
import { getMetadata } from '@hcengineering/platform'
|
||||
|
||||
function translateDoc (doc: Doc): Document {
|
||||
return { ...doc, '%hash%': null }
|
||||
@ -118,6 +121,21 @@ abstract class MongoAdapterBase implements DbAdapter {
|
||||
}
|
||||
}
|
||||
|
||||
async removeOldIndex (domain: Domain, deletePattern: RegExp, keepPattern: RegExp): Promise<void> {
|
||||
try {
|
||||
const existingIndexes = await this.db.collection(domain).indexes()
|
||||
for (const existingIndex of existingIndexes) {
|
||||
const name: string = existingIndex.name
|
||||
if (deletePattern.test(name) && !keepPattern.test(name)) {
|
||||
console.log('removing old index', name, keepPattern)
|
||||
await this.db.collection(domain).dropIndex(existingIndex.name)
|
||||
}
|
||||
}
|
||||
} catch (err: any) {
|
||||
console.error(err)
|
||||
}
|
||||
}
|
||||
|
||||
async tx (...tx: Tx[]): Promise<TxResult> {
|
||||
return {}
|
||||
}
|
||||
@ -433,8 +451,13 @@ abstract class MongoAdapterBase implements DbAdapter {
|
||||
checkKeys: false,
|
||||
enableUtf8Validation: false
|
||||
})
|
||||
cursor.maxTimeMS(30000)
|
||||
const res = (await cursor.toArray())[0]
|
||||
cursor.maxTimeMS(parseInt(getMetadata(serverCore.metadata.CursorMaxTimeMS) ?? '30000'))
|
||||
let res: Document = []
|
||||
try {
|
||||
res = (await cursor.toArray())[0]
|
||||
} catch (e) {
|
||||
console.error('error during executing cursor in findWithPipeline', clazz, cutObjectArray(query), options, e)
|
||||
}
|
||||
const result = res.results as WithLookup<T>[]
|
||||
const total = options?.total === true ? res.totalCount?.shift()?.count ?? 0 : -1
|
||||
for (const row of result) {
|
||||
@ -572,10 +595,14 @@ abstract class MongoAdapterBase implements DbAdapter {
|
||||
}
|
||||
|
||||
// Error in case of timeout
|
||||
cursor.maxTimeMS(30000)
|
||||
cursor.maxTimeMS(parseInt(getMetadata(serverCore.metadata.CursorMaxTimeMS) ?? '30000'))
|
||||
cursor.maxAwaitTimeMS(30000)
|
||||
|
||||
const res = await cursor.toArray()
|
||||
let res: T[] = []
|
||||
try {
|
||||
res = await cursor.toArray()
|
||||
} catch (e) {
|
||||
console.error('error during executing cursor in findAll', _class, cutObjectArray(query), options, e)
|
||||
}
|
||||
if (options?.total === true && options?.limit === undefined) {
|
||||
total = res.length
|
||||
}
|
||||
|
@ -56,6 +56,7 @@ class MinioBlobAdapter implements DbAdapter {
|
||||
async init (model: Tx[]): Promise<void> {}
|
||||
|
||||
async createIndexes (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>): Promise<void> {}
|
||||
async removeOldIndex (domain: Domain, deletePattern: RegExp, keepPattern: RegExp): Promise<void> {}
|
||||
|
||||
async close (): Promise<void> {}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user