From 693f3741f3caacd5dbc1fbb726b687f92132ef2a Mon Sep 17 00:00:00 2001 From: Vyacheslav Tumanov Date: Sat, 30 Mar 2024 14:33:43 +0500 Subject: [PATCH] Single index (#5098) Signed-off-by: Vyacheslav Tumanov --- dev/docker-compose.yaml | 1 + dev/tool/src/index.ts | 1 + pods/server/src/__start.ts | 7 + server/core/src/indexer/fulltextPush.ts | 35 ++++- server/core/src/indexer/types.ts | 4 +- server/core/src/plugin.ts | 3 +- server/core/src/types.ts | 1 + server/elastic/src/adapter.ts | 171 ++++++++++++++++++------ server/elastic/src/backup.ts | 79 ++++++++--- tests/docker-compose.yaml | 1 + 10 files changed, 234 insertions(+), 69 deletions(-) diff --git a/dev/docker-compose.yaml b/dev/docker-compose.yaml index 8663e53bd4..ddc32ba0fe 100644 --- a/dev/docker-compose.yaml +++ b/dev/docker-compose.yaml @@ -159,6 +159,7 @@ services: - SERVER_PROVIDER=ws - ACCOUNTS_URL=http://account:3000 - LAST_NAME_FIRST=true + - ELASTIC_INDEX_NAME=local_storage_index restart: unless-stopped rekoni: image: hardcoreeng/rekoni-service diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index 5cbc0fe191..6164654b5c 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -482,6 +482,7 @@ export function devTool ( program .command('backup ') .description('dump workspace transactions and minio resources') + .requiredOption('-i --index ', 'Index name for elastic') .option('-s, --skip ', 'A list of ; separated domain names to skip during backup', '') .option('-f, --force', 'Force backup', false) .action(async (dirName: string, workspace: string, cmd: { skip: string, force: boolean }) => { diff --git a/pods/server/src/__start.ts b/pods/server/src/__start.ts index 5d7b82c321..14e2f103ba 100644 --- a/pods/server/src/__start.ts +++ b/pods/server/src/__start.ts @@ -89,6 +89,12 @@ if (accountsUrl === undefined) { process.exit(1) } +const elasticIndexName = process.env.ELASTIC_INDEX_NAME +if (elasticIndexName === undefined) { + console.log('Please provide ELASTIC_INDEX_NAME') + process.exit(1) +} + const sesUrl = process.env.SES_URL const cursorMaxTime = process.env.SERVER_CURSOR_MAXTIMEMS @@ -98,6 +104,7 @@ setMetadata(serverCore.metadata.FrontUrl, frontUrl) setMetadata(serverToken.metadata.Secret, serverSecret) setMetadata(serverNotification.metadata.SesUrl, sesUrl ?? '') setMetadata(contactPlugin.metadata.LastNameFirst, lastNameFirst) +setMetadata(serverCore.metadata.ElasticIndexName, elasticIndexName) // eslint-disable-next-line @typescript-eslint/no-floating-promises console.log( diff --git a/server/core/src/indexer/fulltextPush.ts b/server/core/src/indexer/fulltextPush.ts index 5e32b4bc90..a8513787f2 100644 --- a/server/core/src/indexer/fulltextPush.ts +++ b/server/core/src/indexer/fulltextPush.ts @@ -22,6 +22,7 @@ import core, { type DocumentQuery, type DocumentUpdate, extractDocKey, + type Hierarchy, isFullTextAttribute, isIndexedAttribute, type MeasureContext, @@ -119,7 +120,7 @@ export class FullTextPushStage implements FullTextPipelineStage { if ( attrObj !== null && attrObj !== undefined && - (isFullTextAttribute(attrObj) || isIndexedAttribute(attrObj)) && + isIndexedAttribute(attrObj) && (attrObj.type._class === core.class.RefTo || (attrObj.type._class === core.class.ArrOf && (attrObj.type as ArrOf).of._class === core.class.RefTo)) ) { @@ -183,7 +184,7 @@ export class FullTextPushStage implements FullTextPipelineStage { const elasticDoc = createElasticDoc(doc) try { await ctx.with('updateDoc2Elastic', {}, async () => { - updateDoc2Elastic(doc.attributes, elasticDoc) + updateDoc2Elastic(doc.attributes, elasticDoc, undefined, undefined, pipeline.hierarchy) }) // Include all child attributes @@ -193,7 +194,7 @@ export class FullTextPushStage implements FullTextPipelineStage { const fctx = getFullTextContext(pipeline.hierarchy, c.objectClass) if (fctx.parentPropagate ?? true) { await ctx.with('updateDoc2Elastic', {}, async () => { - updateDoc2Elastic(c.attributes, elasticDoc, c._id) + updateDoc2Elastic(c.attributes, elasticDoc, c._id, undefined, pipeline.hierarchy, true) }) } } @@ -214,7 +215,7 @@ export class FullTextPushStage implements FullTextPipelineStage { if (parentDoc !== undefined) { const ppdoc = parentDoc await ctx.with('updateDoc2Elastic', {}, async () => { - updateDoc2Elastic(ppdoc.attributes, elasticDoc, ppdoc._id) + updateDoc2Elastic(ppdoc.attributes, elasticDoc, ppdoc._id, undefined, pipeline.hierarchy, true) }) const collectClasses = collectPropagateClasses(pipeline, parentDoc.objectClass) @@ -226,7 +227,7 @@ export class FullTextPushStage implements FullTextPipelineStage { ) for (const c of collections) { await ctx.with('updateDoc2Elastic', {}, async () => { - updateDoc2Elastic(c.attributes, elasticDoc, c._id) + updateDoc2Elastic(c.attributes, elasticDoc, c._id, undefined, pipeline.hierarchy, true) }) } } @@ -303,7 +304,9 @@ function updateDoc2Elastic ( attributes: Record, doc: IndexedDoc, docIdOverride?: Ref, - refAttribute?: string + refAttribute?: string, + hierarchy?: Hierarchy, + isChildOrParentDoc?: boolean ): void { for (const [k, v] of Object.entries(attributes)) { if (v == null) { @@ -315,6 +318,26 @@ function updateDoc2Elastic ( } let vv: any = v + try { + const attribute = hierarchy?.getAttribute(_class ?? doc._class[0], attr) + if (attribute !== undefined && vv != null) { + if ( + isFullTextAttribute(attribute) || + (isChildOrParentDoc === true && + !( + attribute.type._class === core.class.RefTo || + (attribute.type._class === core.class.ArrOf && + (attribute.type as ArrOf).of._class === core.class.RefTo) + )) + ) { + if (!(doc.fulltextSummary ?? '').includes(vv)) { + doc.fulltextSummary = (doc.fulltextSummary ?? '') + vv + '\n' + continue + } + } + } + } catch (e) {} + if (vv != null && extra.includes('base64')) { vv = Buffer.from(v, 'base64').toString() } diff --git a/server/core/src/indexer/types.ts b/server/core/src/indexer/types.ts index f8d2f6de20..7484f39075 100644 --- a/server/core/src/indexer/types.ts +++ b/server/core/src/indexer/types.ts @@ -102,9 +102,9 @@ export const contentStageId = 'cnt-v2b' /** * @public */ -export const fieldStateId = 'fld-v13a' +export const fieldStateId = 'fld-v13b' /** * @public */ -export const fullTextPushStageId = 'fts-v11a' +export const fullTextPushStageId = 'fts-v11b' diff --git a/server/core/src/plugin.ts b/server/core/src/plugin.ts index ceebc82403..4186f2ed85 100644 --- a/server/core/src/plugin.ts +++ b/server/core/src/plugin.ts @@ -41,7 +41,8 @@ const serverCore = plugin(serverCoreId, { }, metadata: { FrontUrl: '' as Metadata, - CursorMaxTimeMS: '' as Metadata + CursorMaxTimeMS: '' as Metadata, + ElasticIndexName: '' as Metadata } }) diff --git a/server/core/src/types.ts b/server/core/src/types.ts index 53a1433731..48d177d77c 100644 --- a/server/core/src/types.ts +++ b/server/core/src/types.ts @@ -192,6 +192,7 @@ export interface IndexedDoc { searchTitle?: string searchShortTitle?: string searchIcon?: any + fulltextSummary?: string [key: string]: any } diff --git a/server/elastic/src/adapter.ts b/server/elastic/src/adapter.ts index 49dda73582..901d448c4d 100644 --- a/server/elastic/src/adapter.ts +++ b/server/elastic/src/adapter.ts @@ -34,12 +34,14 @@ import type { SearchScoring, IndexedDoc } from '@hcengineering/server-core' +import serverCore from '@hcengineering/server-core' import { Client, errors as esErr } from '@elastic/elasticsearch' import { Domain } from 'node:domain' +import { getMetadata } from '@hcengineering/platform' const DEFAULT_LIMIT = 200 - +const indexName = getMetadata(serverCore.metadata.ElasticIndexName) ?? 'storage_index' class ElasticAdapter implements FullTextAdapter { constructor ( private readonly client: Client, @@ -55,35 +57,66 @@ class ElasticAdapter implements FullTextAdapter { // const mappings = current.body[toWorkspaceString(this.workspaceId)] const result: Record = {} try { - const existsIndex = await this.client.indices.exists({ + const existsOldIndex = await this.client.indices.exists({ index: toWorkspaceString(this.workspaceId) }) + if (existsOldIndex.body) { + await this.client.indices.delete({ + index: toWorkspaceString(this.workspaceId) + }) + } + const existsIndex = await this.client.indices.exists({ + index: indexName + }) if (!existsIndex.body) { const createIndex = await this.client.indices.create({ - index: toWorkspaceString(this.workspaceId) + index: indexName, + body: { + settings: { + analysis: { + filter: { + english_stemmer: { + type: 'stemmer', + language: 'english' + }, + english_possessive_stemmer: { + type: 'stemmer', + language: 'possessive_english' + } + }, + analyzer: { + rebuilt_english: { + type: 'custom', + tokenizer: 'standard', + filter: ['english_possessive_stemmer', 'lowercase', 'english_stemmer'] + } + } + } + } + } }) console.log(createIndex) } const mappings = await this.client.indices.getMapping({ - index: toWorkspaceString(this.workspaceId) + index: indexName }) if (field !== undefined) { console.log('Mapping', mappings.body) } - let wsMappings = mappings.body[toWorkspaceString(this.workspaceId)] + let wsMappings = mappings.body.storage_index if (Object.keys(wsMappings?.mappings?.properties ?? {}).some((k) => k.includes('->'))) { await this.client.indices.delete({ - index: toWorkspaceString(this.workspaceId) + index: indexName }) const createIndex = await this.client.indices.create({ - index: toWorkspaceString(this.workspaceId) + index: indexName }) console.log('recreate index', createIndex) const mappings = await this.client.indices.getMapping({ - index: toWorkspaceString(this.workspaceId) + index: indexName }) - wsMappings = mappings.body[toWorkspaceString(this.workspaceId)] + wsMappings = mappings.body.storage_index } // Collect old values. @@ -93,11 +126,22 @@ class ElasticAdapter implements FullTextAdapter { result[k] = va?.dims as number } } + await this.client.indices.putMapping({ + index: indexName, + body: { + properties: { + fulltextSummary: { + type: 'text', + analyzer: 'rebuilt_english' + } + } + } + }) if (field?.key !== undefined) { if (!(wsMappings?.mappings?.properties?.[field.key]?.type === 'dense_vector')) { result[field.key] = field.dims await this.client.indices.putMapping({ - index: toWorkspaceString(this.workspaceId), + index: indexName, allow_no_indices: true, body: { properties: { @@ -134,19 +178,26 @@ class ElasticAdapter implements FullTextAdapter { function_score: { query: { bool: { - must: { - simple_query_string: { - query: query.query, - analyze_wildcard: true, - flags: 'OR|PREFIX|PHRASE|FUZZY|NOT|ESCAPE', - default_operator: 'and', - fields: [ - 'searchTitle^50', // boost - 'searchShortTitle^50', - '*' // Search in all other fields without a boost - ] + must: [ + { + simple_query_string: { + query: query.query, + analyze_wildcard: true, + flags: 'OR|PREFIX|PHRASE|FUZZY|NOT|ESCAPE', + default_operator: 'and', + fields: [ + 'searchTitle^50', // boost + 'searchShortTitle^50', + '*' // Search in all other fields without a boost + ] + } + }, + { + match: { + workspaceId: toWorkspaceString(this.workspaceId) + } } - } + ] } }, functions: [ @@ -209,7 +260,7 @@ class ElasticAdapter implements FullTextAdapter { } const result = await this.client.search({ - index: toWorkspaceString(this.workspaceId), + index: indexName, body: elasticQuery }) @@ -245,6 +296,11 @@ class ElasticAdapter implements FullTextAdapter { flags: 'OR|PREFIX|PHRASE|FUZZY|NOT|ESCAPE', default_operator: 'and' } + }, + { + match: { + workspaceId: toWorkspaceString(this.workspaceId) + } } ], should: [{ terms: this.getTerms(_classes, '_class', { boost: 10.0 }) }], @@ -288,7 +344,7 @@ class ElasticAdapter implements FullTextAdapter { try { const result = await this.client.search({ - index: toWorkspaceString(this.workspaceId), + index: indexName, body: { query: request, size: size ?? 200, @@ -349,6 +405,11 @@ class ElasticAdapter implements FullTextAdapter { } } ], + must: { + match: { + workspaceId: toWorkspaceString(this.workspaceId) + } + }, filter: [ { bool: { @@ -361,7 +422,7 @@ class ElasticAdapter implements FullTextAdapter { try { const result = await this.client.search({ - index: toWorkspaceString(this.workspaceId), + index: indexName, body: { query: request, size: options?.size ?? 200, @@ -382,20 +443,24 @@ class ElasticAdapter implements FullTextAdapter { } async index (doc: IndexedDoc): Promise { + const wsDoc = { + workspaceId: toWorkspaceString(this.workspaceId), + ...doc + } if (doc.data === undefined) { await this.client.index({ - index: toWorkspaceString(this.workspaceId), + index: indexName, id: doc.id, type: '_doc', - body: doc + body: wsDoc }) } else { await this.client.index({ - index: toWorkspaceString(this.workspaceId), + index: indexName, id: doc.id, type: '_doc', pipeline: 'attachment', - body: doc + body: wsDoc }) } return {} @@ -403,7 +468,7 @@ class ElasticAdapter implements FullTextAdapter { async update (id: Ref, update: Record): Promise { await this.client.update({ - index: toWorkspaceString(this.workspaceId), + index: indexName, id, body: { doc: update @@ -418,10 +483,10 @@ class ElasticAdapter implements FullTextAdapter { while (parts.length > 0) { const part = parts.splice(0, 1000) - const operations = part.flatMap((doc) => [ - { index: { _index: toWorkspaceString(this.workspaceId), _id: doc.id } }, - { ...doc, type: '_doc' } - ]) + const operations = part.flatMap((doc) => { + const wsDoc = { workspaceId: toWorkspaceString(this.workspaceId), ...doc } + return [{ index: { _index: indexName, _id: doc.id } }, { ...wsDoc, type: '_doc' }] + }) const response = await this.client.bulk({ refresh: true, body: operations }) if ((response as any).body.errors === true) { @@ -448,12 +513,23 @@ class ElasticAdapter implements FullTextAdapter { await this.client.deleteByQuery( { type: '_doc', - index: toWorkspaceString(this.workspaceId), + index: indexName, body: { query: { - terms: { - _id: part, - boost: 1.0 + bool: { + must: [ + { + terms: { + _id: part, + boost: 1.0 + } + }, + { + match: { + workspaceId: toWorkspaceString(this.workspaceId) + } + } + ] } }, size: part.length @@ -472,13 +548,24 @@ class ElasticAdapter implements FullTextAdapter { async load (docs: Ref[]): Promise { const resp = await this.client.search({ - index: toWorkspaceString(this.workspaceId), + index: indexName, type: '_doc', body: { query: { - terms: { - _id: docs, - boost: 1.0 + bool: { + must: [ + { + terms: { + _id: docs, + boost: 1.0 + } + }, + { + match: { + workspaceId: toWorkspaceString(this.workspaceId) + } + } + ] } }, size: docs.length diff --git a/server/elastic/src/backup.ts b/server/elastic/src/backup.ts index c3695f57f5..cce62a7d06 100644 --- a/server/elastic/src/backup.ts +++ b/server/elastic/src/backup.ts @@ -35,10 +35,11 @@ import core, { WorkspaceId, toWorkspaceString } from '@hcengineering/core' -import { PlatformError, unknownStatus } from '@hcengineering/platform' -import { DbAdapter, IndexedDoc } from '@hcengineering/server-core' +import { getMetadata, PlatformError, unknownStatus } from '@hcengineering/platform' +import serverCore, { DbAdapter, IndexedDoc } from '@hcengineering/server-core' import { createHash } from 'node:crypto' +const indexName = getMetadata(serverCore.metadata.ElasticIndexName) ?? 'storage_index' class ElasticDataAdapter implements DbAdapter { constructor ( readonly workspaceId: WorkspaceId, @@ -81,14 +82,20 @@ class ElasticDataAdapter implements DbAdapter { try { if (!listRecieved) { const q = { - index: toWorkspaceString(this.workspaceId), + index: indexName, type: '_doc', scroll: '23h', // search_type: 'scan', //if I use search_type then it requires size otherwise it shows 0 result size: 100, body: { query: { - match_all: {} + bool: { + must: { + match: { + workspaceId: toWorkspaceString(this.workspaceId) + } + } + } } } } @@ -163,13 +170,24 @@ class ElasticDataAdapter implements DbAdapter { const result: Doc[] = [] const resp = await this.client.search({ - index: toWorkspaceString(this.workspaceId), + index: indexName, type: '_doc', body: { query: { - terms: { - _id: docs, - boost: 1.0 + bool: { + must: [ + { + terms: { + _id: docs, + boost: 1.0 + } + }, + { + match: { + workspaceId: toWorkspaceString(this.workspaceId) + } + } + ] } }, size: docs.length @@ -198,12 +216,23 @@ class ElasticDataAdapter implements DbAdapter { await this.client.deleteByQuery( { type: '_doc', - index: toWorkspaceString(this.workspaceId), + index: indexName, body: { query: { - terms: { - _id: Array.from(part.map((it) => it._id)), - boost: 1.0 + bool: { + must: [ + { + terms: { + _id: Array.from(part.map((it) => it._id)), + boost: 1.0 + } + }, + { + match: { + workspaceId: toWorkspaceString(this.workspaceId) + } + } + ] } }, size: part.length @@ -216,8 +245,11 @@ class ElasticDataAdapter implements DbAdapter { } const operations = part.flatMap((doc) => [ - { index: { _index: toWorkspaceString(this.workspaceId), _id: doc._id } }, - (doc as FullTextData).data + { index: { _index: indexName, _id: doc._id } }, + { + workspaceId: toWorkspaceString(this.workspaceId), + ...(doc as FullTextData).data + } ]) await this.client.bulk({ refresh: true, body: operations }) @@ -234,12 +266,23 @@ class ElasticDataAdapter implements DbAdapter { await this.client.deleteByQuery( { type: '_doc', - index: toWorkspaceString(this.workspaceId), + index: indexName, body: { query: { - terms: { - _id: part, - boost: 1.0 + bool: { + must: [ + { + terms: { + _id: part, + boost: 1.0 + } + }, + { + match: { + workspaceId: toWorkspaceString(this.workspaceId) + } + } + ] } }, size: part.length diff --git a/tests/docker-compose.yaml b/tests/docker-compose.yaml index 42b9c4ff38..344f8450ac 100644 --- a/tests/docker-compose.yaml +++ b/tests/docker-compose.yaml @@ -103,6 +103,7 @@ services: - FRONT_URL=http://localhost:8083 - ACCOUNTS_URL=http://account:3003 - LAST_NAME_FIRST=true + - ELASTIC_INDEX_NAME=local_storage_index collaborator: image: hardcoreeng/collaborator links: