Single index (#5098)

Signed-off-by: Vyacheslav Tumanov <me@slavatumanov.me>
This commit is contained in:
Vyacheslav Tumanov 2024-03-30 14:33:43 +05:00 committed by GitHub
parent ea006bf28c
commit 693f3741f3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 234 additions and 69 deletions

View File

@ -159,6 +159,7 @@ services:
- SERVER_PROVIDER=ws - SERVER_PROVIDER=ws
- ACCOUNTS_URL=http://account:3000 - ACCOUNTS_URL=http://account:3000
- LAST_NAME_FIRST=true - LAST_NAME_FIRST=true
- ELASTIC_INDEX_NAME=local_storage_index
restart: unless-stopped restart: unless-stopped
rekoni: rekoni:
image: hardcoreeng/rekoni-service image: hardcoreeng/rekoni-service

View File

@ -482,6 +482,7 @@ export function devTool (
program program
.command('backup <dirName> <workspace>') .command('backup <dirName> <workspace>')
.description('dump workspace transactions and minio resources') .description('dump workspace transactions and minio resources')
.requiredOption('-i --index <index>', 'Index name for elastic')
.option('-s, --skip <skip>', 'A list of ; separated domain names to skip during backup', '') .option('-s, --skip <skip>', 'A list of ; separated domain names to skip during backup', '')
.option('-f, --force', 'Force backup', false) .option('-f, --force', 'Force backup', false)
.action(async (dirName: string, workspace: string, cmd: { skip: string, force: boolean }) => { .action(async (dirName: string, workspace: string, cmd: { skip: string, force: boolean }) => {

View File

@ -89,6 +89,12 @@ if (accountsUrl === undefined) {
process.exit(1) process.exit(1)
} }
const elasticIndexName = process.env.ELASTIC_INDEX_NAME
if (elasticIndexName === undefined) {
console.log('Please provide ELASTIC_INDEX_NAME')
process.exit(1)
}
const sesUrl = process.env.SES_URL const sesUrl = process.env.SES_URL
const cursorMaxTime = process.env.SERVER_CURSOR_MAXTIMEMS const cursorMaxTime = process.env.SERVER_CURSOR_MAXTIMEMS
@ -98,6 +104,7 @@ setMetadata(serverCore.metadata.FrontUrl, frontUrl)
setMetadata(serverToken.metadata.Secret, serverSecret) setMetadata(serverToken.metadata.Secret, serverSecret)
setMetadata(serverNotification.metadata.SesUrl, sesUrl ?? '') setMetadata(serverNotification.metadata.SesUrl, sesUrl ?? '')
setMetadata(contactPlugin.metadata.LastNameFirst, lastNameFirst) setMetadata(contactPlugin.metadata.LastNameFirst, lastNameFirst)
setMetadata(serverCore.metadata.ElasticIndexName, elasticIndexName)
// eslint-disable-next-line @typescript-eslint/no-floating-promises // eslint-disable-next-line @typescript-eslint/no-floating-promises
console.log( console.log(

View File

@ -22,6 +22,7 @@ import core, {
type DocumentQuery, type DocumentQuery,
type DocumentUpdate, type DocumentUpdate,
extractDocKey, extractDocKey,
type Hierarchy,
isFullTextAttribute, isFullTextAttribute,
isIndexedAttribute, isIndexedAttribute,
type MeasureContext, type MeasureContext,
@ -119,7 +120,7 @@ export class FullTextPushStage implements FullTextPipelineStage {
if ( if (
attrObj !== null && attrObj !== null &&
attrObj !== undefined && attrObj !== undefined &&
(isFullTextAttribute(attrObj) || isIndexedAttribute(attrObj)) && isIndexedAttribute(attrObj) &&
(attrObj.type._class === core.class.RefTo || (attrObj.type._class === core.class.RefTo ||
(attrObj.type._class === core.class.ArrOf && (attrObj.type as ArrOf<any>).of._class === core.class.RefTo)) (attrObj.type._class === core.class.ArrOf && (attrObj.type as ArrOf<any>).of._class === core.class.RefTo))
) { ) {
@ -183,7 +184,7 @@ export class FullTextPushStage implements FullTextPipelineStage {
const elasticDoc = createElasticDoc(doc) const elasticDoc = createElasticDoc(doc)
try { try {
await ctx.with('updateDoc2Elastic', {}, async () => { await ctx.with('updateDoc2Elastic', {}, async () => {
updateDoc2Elastic(doc.attributes, elasticDoc) updateDoc2Elastic(doc.attributes, elasticDoc, undefined, undefined, pipeline.hierarchy)
}) })
// Include all child attributes // Include all child attributes
@ -193,7 +194,7 @@ export class FullTextPushStage implements FullTextPipelineStage {
const fctx = getFullTextContext(pipeline.hierarchy, c.objectClass) const fctx = getFullTextContext(pipeline.hierarchy, c.objectClass)
if (fctx.parentPropagate ?? true) { if (fctx.parentPropagate ?? true) {
await ctx.with('updateDoc2Elastic', {}, async () => { await ctx.with('updateDoc2Elastic', {}, async () => {
updateDoc2Elastic(c.attributes, elasticDoc, c._id) updateDoc2Elastic(c.attributes, elasticDoc, c._id, undefined, pipeline.hierarchy, true)
}) })
} }
} }
@ -214,7 +215,7 @@ export class FullTextPushStage implements FullTextPipelineStage {
if (parentDoc !== undefined) { if (parentDoc !== undefined) {
const ppdoc = parentDoc const ppdoc = parentDoc
await ctx.with('updateDoc2Elastic', {}, async () => { await ctx.with('updateDoc2Elastic', {}, async () => {
updateDoc2Elastic(ppdoc.attributes, elasticDoc, ppdoc._id) updateDoc2Elastic(ppdoc.attributes, elasticDoc, ppdoc._id, undefined, pipeline.hierarchy, true)
}) })
const collectClasses = collectPropagateClasses(pipeline, parentDoc.objectClass) const collectClasses = collectPropagateClasses(pipeline, parentDoc.objectClass)
@ -226,7 +227,7 @@ export class FullTextPushStage implements FullTextPipelineStage {
) )
for (const c of collections) { for (const c of collections) {
await ctx.with('updateDoc2Elastic', {}, async () => { await ctx.with('updateDoc2Elastic', {}, async () => {
updateDoc2Elastic(c.attributes, elasticDoc, c._id) updateDoc2Elastic(c.attributes, elasticDoc, c._id, undefined, pipeline.hierarchy, true)
}) })
} }
} }
@ -303,7 +304,9 @@ function updateDoc2Elastic (
attributes: Record<string, any>, attributes: Record<string, any>,
doc: IndexedDoc, doc: IndexedDoc,
docIdOverride?: Ref<DocIndexState>, docIdOverride?: Ref<DocIndexState>,
refAttribute?: string refAttribute?: string,
hierarchy?: Hierarchy,
isChildOrParentDoc?: boolean
): void { ): void {
for (const [k, v] of Object.entries(attributes)) { for (const [k, v] of Object.entries(attributes)) {
if (v == null) { if (v == null) {
@ -315,6 +318,26 @@ function updateDoc2Elastic (
} }
let vv: any = v let vv: any = v
try {
const attribute = hierarchy?.getAttribute(_class ?? doc._class[0], attr)
if (attribute !== undefined && vv != null) {
if (
isFullTextAttribute(attribute) ||
(isChildOrParentDoc === true &&
!(
attribute.type._class === core.class.RefTo ||
(attribute.type._class === core.class.ArrOf &&
(attribute.type as ArrOf<any>).of._class === core.class.RefTo)
))
) {
if (!(doc.fulltextSummary ?? '').includes(vv)) {
doc.fulltextSummary = (doc.fulltextSummary ?? '') + vv + '\n'
continue
}
}
}
} catch (e) {}
if (vv != null && extra.includes('base64')) { if (vv != null && extra.includes('base64')) {
vv = Buffer.from(v, 'base64').toString() vv = Buffer.from(v, 'base64').toString()
} }

View File

@ -102,9 +102,9 @@ export const contentStageId = 'cnt-v2b'
/** /**
* @public * @public
*/ */
export const fieldStateId = 'fld-v13a' export const fieldStateId = 'fld-v13b'
/** /**
* @public * @public
*/ */
export const fullTextPushStageId = 'fts-v11a' export const fullTextPushStageId = 'fts-v11b'

View File

@ -41,7 +41,8 @@ const serverCore = plugin(serverCoreId, {
}, },
metadata: { metadata: {
FrontUrl: '' as Metadata<string>, FrontUrl: '' as Metadata<string>,
CursorMaxTimeMS: '' as Metadata<string> CursorMaxTimeMS: '' as Metadata<string>,
ElasticIndexName: '' as Metadata<string>
} }
}) })

View File

@ -192,6 +192,7 @@ export interface IndexedDoc {
searchTitle?: string searchTitle?: string
searchShortTitle?: string searchShortTitle?: string
searchIcon?: any searchIcon?: any
fulltextSummary?: string
[key: string]: any [key: string]: any
} }

View File

@ -34,12 +34,14 @@ import type {
SearchScoring, SearchScoring,
IndexedDoc IndexedDoc
} from '@hcengineering/server-core' } from '@hcengineering/server-core'
import serverCore from '@hcengineering/server-core'
import { Client, errors as esErr } from '@elastic/elasticsearch' import { Client, errors as esErr } from '@elastic/elasticsearch'
import { Domain } from 'node:domain' import { Domain } from 'node:domain'
import { getMetadata } from '@hcengineering/platform'
const DEFAULT_LIMIT = 200 const DEFAULT_LIMIT = 200
const indexName = getMetadata(serverCore.metadata.ElasticIndexName) ?? 'storage_index'
class ElasticAdapter implements FullTextAdapter { class ElasticAdapter implements FullTextAdapter {
constructor ( constructor (
private readonly client: Client, private readonly client: Client,
@ -55,35 +57,66 @@ class ElasticAdapter implements FullTextAdapter {
// const mappings = current.body[toWorkspaceString(this.workspaceId)] // const mappings = current.body[toWorkspaceString(this.workspaceId)]
const result: Record<string, number> = {} const result: Record<string, number> = {}
try { try {
const existsIndex = await this.client.indices.exists({ const existsOldIndex = await this.client.indices.exists({
index: toWorkspaceString(this.workspaceId) index: toWorkspaceString(this.workspaceId)
}) })
if (existsOldIndex.body) {
await this.client.indices.delete({
index: toWorkspaceString(this.workspaceId)
})
}
const existsIndex = await this.client.indices.exists({
index: indexName
})
if (!existsIndex.body) { if (!existsIndex.body) {
const createIndex = await this.client.indices.create({ const createIndex = await this.client.indices.create({
index: toWorkspaceString(this.workspaceId) index: indexName,
body: {
settings: {
analysis: {
filter: {
english_stemmer: {
type: 'stemmer',
language: 'english'
},
english_possessive_stemmer: {
type: 'stemmer',
language: 'possessive_english'
}
},
analyzer: {
rebuilt_english: {
type: 'custom',
tokenizer: 'standard',
filter: ['english_possessive_stemmer', 'lowercase', 'english_stemmer']
}
}
}
}
}
}) })
console.log(createIndex) console.log(createIndex)
} }
const mappings = await this.client.indices.getMapping({ const mappings = await this.client.indices.getMapping({
index: toWorkspaceString(this.workspaceId) index: indexName
}) })
if (field !== undefined) { if (field !== undefined) {
console.log('Mapping', mappings.body) console.log('Mapping', mappings.body)
} }
let wsMappings = mappings.body[toWorkspaceString(this.workspaceId)] let wsMappings = mappings.body.storage_index
if (Object.keys(wsMappings?.mappings?.properties ?? {}).some((k) => k.includes('->'))) { if (Object.keys(wsMappings?.mappings?.properties ?? {}).some((k) => k.includes('->'))) {
await this.client.indices.delete({ await this.client.indices.delete({
index: toWorkspaceString(this.workspaceId) index: indexName
}) })
const createIndex = await this.client.indices.create({ const createIndex = await this.client.indices.create({
index: toWorkspaceString(this.workspaceId) index: indexName
}) })
console.log('recreate index', createIndex) console.log('recreate index', createIndex)
const mappings = await this.client.indices.getMapping({ const mappings = await this.client.indices.getMapping({
index: toWorkspaceString(this.workspaceId) index: indexName
}) })
wsMappings = mappings.body[toWorkspaceString(this.workspaceId)] wsMappings = mappings.body.storage_index
} }
// Collect old values. // Collect old values.
@ -93,11 +126,22 @@ class ElasticAdapter implements FullTextAdapter {
result[k] = va?.dims as number result[k] = va?.dims as number
} }
} }
await this.client.indices.putMapping({
index: indexName,
body: {
properties: {
fulltextSummary: {
type: 'text',
analyzer: 'rebuilt_english'
}
}
}
})
if (field?.key !== undefined) { if (field?.key !== undefined) {
if (!(wsMappings?.mappings?.properties?.[field.key]?.type === 'dense_vector')) { if (!(wsMappings?.mappings?.properties?.[field.key]?.type === 'dense_vector')) {
result[field.key] = field.dims result[field.key] = field.dims
await this.client.indices.putMapping({ await this.client.indices.putMapping({
index: toWorkspaceString(this.workspaceId), index: indexName,
allow_no_indices: true, allow_no_indices: true,
body: { body: {
properties: { properties: {
@ -134,19 +178,26 @@ class ElasticAdapter implements FullTextAdapter {
function_score: { function_score: {
query: { query: {
bool: { bool: {
must: { must: [
simple_query_string: { {
query: query.query, simple_query_string: {
analyze_wildcard: true, query: query.query,
flags: 'OR|PREFIX|PHRASE|FUZZY|NOT|ESCAPE', analyze_wildcard: true,
default_operator: 'and', flags: 'OR|PREFIX|PHRASE|FUZZY|NOT|ESCAPE',
fields: [ default_operator: 'and',
'searchTitle^50', // boost fields: [
'searchShortTitle^50', 'searchTitle^50', // boost
'*' // Search in all other fields without a boost 'searchShortTitle^50',
] '*' // Search in all other fields without a boost
]
}
},
{
match: {
workspaceId: toWorkspaceString(this.workspaceId)
}
} }
} ]
} }
}, },
functions: [ functions: [
@ -209,7 +260,7 @@ class ElasticAdapter implements FullTextAdapter {
} }
const result = await this.client.search({ const result = await this.client.search({
index: toWorkspaceString(this.workspaceId), index: indexName,
body: elasticQuery body: elasticQuery
}) })
@ -245,6 +296,11 @@ class ElasticAdapter implements FullTextAdapter {
flags: 'OR|PREFIX|PHRASE|FUZZY|NOT|ESCAPE', flags: 'OR|PREFIX|PHRASE|FUZZY|NOT|ESCAPE',
default_operator: 'and' default_operator: 'and'
} }
},
{
match: {
workspaceId: toWorkspaceString(this.workspaceId)
}
} }
], ],
should: [{ terms: this.getTerms(_classes, '_class', { boost: 10.0 }) }], should: [{ terms: this.getTerms(_classes, '_class', { boost: 10.0 }) }],
@ -288,7 +344,7 @@ class ElasticAdapter implements FullTextAdapter {
try { try {
const result = await this.client.search({ const result = await this.client.search({
index: toWorkspaceString(this.workspaceId), index: indexName,
body: { body: {
query: request, query: request,
size: size ?? 200, size: size ?? 200,
@ -349,6 +405,11 @@ class ElasticAdapter implements FullTextAdapter {
} }
} }
], ],
must: {
match: {
workspaceId: toWorkspaceString(this.workspaceId)
}
},
filter: [ filter: [
{ {
bool: { bool: {
@ -361,7 +422,7 @@ class ElasticAdapter implements FullTextAdapter {
try { try {
const result = await this.client.search({ const result = await this.client.search({
index: toWorkspaceString(this.workspaceId), index: indexName,
body: { body: {
query: request, query: request,
size: options?.size ?? 200, size: options?.size ?? 200,
@ -382,20 +443,24 @@ class ElasticAdapter implements FullTextAdapter {
} }
async index (doc: IndexedDoc): Promise<TxResult> { async index (doc: IndexedDoc): Promise<TxResult> {
const wsDoc = {
workspaceId: toWorkspaceString(this.workspaceId),
...doc
}
if (doc.data === undefined) { if (doc.data === undefined) {
await this.client.index({ await this.client.index({
index: toWorkspaceString(this.workspaceId), index: indexName,
id: doc.id, id: doc.id,
type: '_doc', type: '_doc',
body: doc body: wsDoc
}) })
} else { } else {
await this.client.index({ await this.client.index({
index: toWorkspaceString(this.workspaceId), index: indexName,
id: doc.id, id: doc.id,
type: '_doc', type: '_doc',
pipeline: 'attachment', pipeline: 'attachment',
body: doc body: wsDoc
}) })
} }
return {} return {}
@ -403,7 +468,7 @@ class ElasticAdapter implements FullTextAdapter {
async update (id: Ref<Doc>, update: Record<string, any>): Promise<TxResult> { async update (id: Ref<Doc>, update: Record<string, any>): Promise<TxResult> {
await this.client.update({ await this.client.update({
index: toWorkspaceString(this.workspaceId), index: indexName,
id, id,
body: { body: {
doc: update doc: update
@ -418,10 +483,10 @@ class ElasticAdapter implements FullTextAdapter {
while (parts.length > 0) { while (parts.length > 0) {
const part = parts.splice(0, 1000) const part = parts.splice(0, 1000)
const operations = part.flatMap((doc) => [ const operations = part.flatMap((doc) => {
{ index: { _index: toWorkspaceString(this.workspaceId), _id: doc.id } }, const wsDoc = { workspaceId: toWorkspaceString(this.workspaceId), ...doc }
{ ...doc, type: '_doc' } return [{ index: { _index: indexName, _id: doc.id } }, { ...wsDoc, type: '_doc' }]
]) })
const response = await this.client.bulk({ refresh: true, body: operations }) const response = await this.client.bulk({ refresh: true, body: operations })
if ((response as any).body.errors === true) { if ((response as any).body.errors === true) {
@ -448,12 +513,23 @@ class ElasticAdapter implements FullTextAdapter {
await this.client.deleteByQuery( await this.client.deleteByQuery(
{ {
type: '_doc', type: '_doc',
index: toWorkspaceString(this.workspaceId), index: indexName,
body: { body: {
query: { query: {
terms: { bool: {
_id: part, must: [
boost: 1.0 {
terms: {
_id: part,
boost: 1.0
}
},
{
match: {
workspaceId: toWorkspaceString(this.workspaceId)
}
}
]
} }
}, },
size: part.length size: part.length
@ -472,13 +548,24 @@ class ElasticAdapter implements FullTextAdapter {
async load (docs: Ref<Doc>[]): Promise<IndexedDoc[]> { async load (docs: Ref<Doc>[]): Promise<IndexedDoc[]> {
const resp = await this.client.search({ const resp = await this.client.search({
index: toWorkspaceString(this.workspaceId), index: indexName,
type: '_doc', type: '_doc',
body: { body: {
query: { query: {
terms: { bool: {
_id: docs, must: [
boost: 1.0 {
terms: {
_id: docs,
boost: 1.0
}
},
{
match: {
workspaceId: toWorkspaceString(this.workspaceId)
}
}
]
} }
}, },
size: docs.length size: docs.length

View File

@ -35,10 +35,11 @@ import core, {
WorkspaceId, WorkspaceId,
toWorkspaceString toWorkspaceString
} from '@hcengineering/core' } from '@hcengineering/core'
import { PlatformError, unknownStatus } from '@hcengineering/platform' import { getMetadata, PlatformError, unknownStatus } from '@hcengineering/platform'
import { DbAdapter, IndexedDoc } from '@hcengineering/server-core' import serverCore, { DbAdapter, IndexedDoc } from '@hcengineering/server-core'
import { createHash } from 'node:crypto' import { createHash } from 'node:crypto'
const indexName = getMetadata(serverCore.metadata.ElasticIndexName) ?? 'storage_index'
class ElasticDataAdapter implements DbAdapter { class ElasticDataAdapter implements DbAdapter {
constructor ( constructor (
readonly workspaceId: WorkspaceId, readonly workspaceId: WorkspaceId,
@ -81,14 +82,20 @@ class ElasticDataAdapter implements DbAdapter {
try { try {
if (!listRecieved) { if (!listRecieved) {
const q = { const q = {
index: toWorkspaceString(this.workspaceId), index: indexName,
type: '_doc', type: '_doc',
scroll: '23h', scroll: '23h',
// search_type: 'scan', //if I use search_type then it requires size otherwise it shows 0 result // search_type: 'scan', //if I use search_type then it requires size otherwise it shows 0 result
size: 100, size: 100,
body: { body: {
query: { query: {
match_all: {} bool: {
must: {
match: {
workspaceId: toWorkspaceString(this.workspaceId)
}
}
}
} }
} }
} }
@ -163,13 +170,24 @@ class ElasticDataAdapter implements DbAdapter {
const result: Doc[] = [] const result: Doc[] = []
const resp = await this.client.search({ const resp = await this.client.search({
index: toWorkspaceString(this.workspaceId), index: indexName,
type: '_doc', type: '_doc',
body: { body: {
query: { query: {
terms: { bool: {
_id: docs, must: [
boost: 1.0 {
terms: {
_id: docs,
boost: 1.0
}
},
{
match: {
workspaceId: toWorkspaceString(this.workspaceId)
}
}
]
} }
}, },
size: docs.length size: docs.length
@ -198,12 +216,23 @@ class ElasticDataAdapter implements DbAdapter {
await this.client.deleteByQuery( await this.client.deleteByQuery(
{ {
type: '_doc', type: '_doc',
index: toWorkspaceString(this.workspaceId), index: indexName,
body: { body: {
query: { query: {
terms: { bool: {
_id: Array.from(part.map((it) => it._id)), must: [
boost: 1.0 {
terms: {
_id: Array.from(part.map((it) => it._id)),
boost: 1.0
}
},
{
match: {
workspaceId: toWorkspaceString(this.workspaceId)
}
}
]
} }
}, },
size: part.length size: part.length
@ -216,8 +245,11 @@ class ElasticDataAdapter implements DbAdapter {
} }
const operations = part.flatMap((doc) => [ const operations = part.flatMap((doc) => [
{ index: { _index: toWorkspaceString(this.workspaceId), _id: doc._id } }, { index: { _index: indexName, _id: doc._id } },
(doc as FullTextData).data {
workspaceId: toWorkspaceString(this.workspaceId),
...(doc as FullTextData).data
}
]) ])
await this.client.bulk({ refresh: true, body: operations }) await this.client.bulk({ refresh: true, body: operations })
@ -234,12 +266,23 @@ class ElasticDataAdapter implements DbAdapter {
await this.client.deleteByQuery( await this.client.deleteByQuery(
{ {
type: '_doc', type: '_doc',
index: toWorkspaceString(this.workspaceId), index: indexName,
body: { body: {
query: { query: {
terms: { bool: {
_id: part, must: [
boost: 1.0 {
terms: {
_id: part,
boost: 1.0
}
},
{
match: {
workspaceId: toWorkspaceString(this.workspaceId)
}
}
]
} }
}, },
size: part.length size: part.length

View File

@ -103,6 +103,7 @@ services:
- FRONT_URL=http://localhost:8083 - FRONT_URL=http://localhost:8083
- ACCOUNTS_URL=http://account:3003 - ACCOUNTS_URL=http://account:3003
- LAST_NAME_FIRST=true - LAST_NAME_FIRST=true
- ELASTIC_INDEX_NAME=local_storage_index
collaborator: collaborator:
image: hardcoreeng/collaborator image: hardcoreeng/collaborator
links: links: