Rebuild elastic (#652)

Signed-off-by: Denis Bykhov <80476319+BykhovDenis@users.noreply.github.com>
This commit is contained in:
Denis Bykhov 2021-12-17 15:09:26 +06:00 committed by GitHub
parent b0ffc0ed46
commit 51e83c183d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 272 additions and 33 deletions

View File

@ -11880,10 +11880,11 @@ packages:
dev: false
file:projects/tool.tgz:
resolution: {integrity: sha512-Ymz2K4mDdajWonLpamGYn7fTN7OqKKjW6SWBB0iZzBxb9BtZOpIY3IVaV9Bm3PYDpm7WOgJ82EjwZbO9+nczFQ==, tarball: file:projects/tool.tgz}
resolution: {integrity: sha512-lU+YKp5b1L8tgTQ+RopEhvT+Jn94y4eIclpXgrcaLG+xk2YAfRdF+fTD8QyA09jXAlWL9Eua+mi3GD/P0a+KJQ==, tarball: file:projects/tool.tgz}
name: '@rush-temp/tool'
version: 0.0.0
dependencies:
'@elastic/elasticsearch': 7.15.0
'@rushstack/heft': 0.41.1
'@types/heft-jest': 1.0.2
'@types/minio': 7.0.10

View File

@ -12,7 +12,7 @@
"bundle": "esbuild src/index.ts --bundle --minify --platform=node > bundle.js",
"docker:build": "docker build -t anticrm/tool .",
"docker:push": "docker push anticrm/tool",
"run-local": "cross-env MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost MONGO_URL=mongodb://localhost:27017 TRANSACTOR_URL=ws:/localhost:3333 TELEGRAM_DATABASE=telegram-service ts-node ./src/index.ts",
"run-local": "cross-env MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost MONGO_URL=mongodb://localhost:27017 TRANSACTOR_URL=ws:/localhost:3333 TELEGRAM_DATABASE=telegram-service ELASTIC_URL=http://localhost:9200 ts-node ./src/index.ts",
"lint": "eslint src",
"format": "prettier --write src && eslint --fix src"
},
@ -52,6 +52,18 @@
"@anticrm/client": "~0.6.1",
"@anticrm/platform": "~0.6.5",
"@anticrm/model": "~0.6.0",
"fast-equals": "^2.0.3"
"fast-equals": "^2.0.3",
"@elastic/elasticsearch": "^7.14.0",
"@anticrm/elastic": "~0.6.0",
"@anticrm/server-core": "~0.6.1",
"@anticrm/model-attachment": "~0.6.0",
"@anticrm/mongo": "~0.6.0",
"@anticrm/dev-storage": "~0.6.0",
"@anticrm/server-chunter": "~0.6.1",
"@anticrm/server-chunter-resources": "~0.6.0",
"@anticrm/server-recruit": "~0.6.0",
"@anticrm/server-recruit-resources": "~0.6.0",
"@anticrm/server-task": "~0.6.0",
"@anticrm/server-task-resources": "~0.6.0"
}
}

165
dev/tool/src/elastic.ts Normal file
View File

@ -0,0 +1,165 @@
//
// Copyright © 2020, 2021 Anticrm Platform Contributors.
// Copyright © 2021 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import core, { Account, Doc, DOMAIN_TX, generateId, Ref, ServerStorage, Tx } from '@anticrm/core'
import { Client as ElasticClient } from '@elastic/elasticsearch'
import { Db, MongoClient } from 'mongodb'
import { Client } from 'minio'
import { createElasticAdapter } from '@anticrm/elastic'
import { createServerStorage, DbConfiguration, FullTextAdapter, IndexedDoc } from '@anticrm/server-core'
import { DOMAIN_ATTACHMENT } from '@anticrm/model-attachment'
import { createInMemoryAdapter, createInMemoryTxAdapter } from '@anticrm/dev-storage'
import { serverChunterId } from '@anticrm/server-chunter'
import { serverRecruitId } from '@anticrm/server-recruit'
import { serverViewId } from '@anticrm/server-task'
import { addLocation } from '@anticrm/platform'
import { listMinioObjects } from './minio'
export async function rebuildElastic (
mongoUrl: string,
dbName: string,
minio: Client,
elasticUrl: string
): Promise<void> {
await dropElastic(elasticUrl, dbName)
return await restoreElastic(mongoUrl, dbName, minio, elasticUrl)
}
async function dropElastic (elasticUrl: string, dbName: string): Promise<void> {
const client = new ElasticClient({
node: elasticUrl
})
await new Promise((resolve, reject) => {
client.indices.exists(
{
index: dbName
},
(err: any, result: any) => {
if (err != null) reject(err)
if (result.body === true) {
client.indices.delete(
{
index: dbName
},
(err: any, result: any) => {
if (err != null) reject(err)
resolve(result)
}
)
} else {
resolve(result)
}
}
)
})
await client.close()
}
async function restoreElastic (mongoUrl: string, dbName: string, minio: Client, elasticUrl: string): Promise<void> {
addLocation(serverChunterId, () => import('@anticrm/server-chunter-resources'))
addLocation(serverRecruitId, () => import('@anticrm/server-recruit-resources'))
addLocation(serverViewId, () => import('@anticrm/server-task-resources'))
const mongoClient = new MongoClient(mongoUrl)
try {
await mongoClient.connect()
const db = mongoClient.db(dbName)
const elastic = await createElasticAdapter(elasticUrl, dbName)
const storage = await createStorage(mongoUrl, elasticUrl, dbName)
const txes = (await db.collection(DOMAIN_TX).find().sort({ _id: 1 }).toArray()) as Tx[]
const data = txes.filter((tx) => tx.objectSpace !== core.space.Model)
for (const tx of data) {
await storage.tx(tx)
}
if (await minio.bucketExists(dbName)) {
const minioObjects = await listMinioObjects(minio, dbName)
for (const d of minioObjects) {
await indexAttachment(elastic, minio, db, dbName, d.name)
}
}
} finally {
await mongoClient.close()
}
}
async function createStorage (mongoUrl: string, elasticUrl: string, workspace: string): Promise<ServerStorage> {
const conf: DbConfiguration = {
domains: {
[DOMAIN_TX]: 'MongoTx'
},
defaultAdapter: 'InMemory',
adapters: {
MongoTx: {
factory: createInMemoryTxAdapter,
url: mongoUrl
},
InMemory: {
factory: createInMemoryAdapter,
url: ''
}
},
fulltextAdapter: {
factory: createElasticAdapter,
url: elasticUrl
},
workspace
}
return await createServerStorage(conf)
}
async function indexAttachment (
elastic: FullTextAdapter,
minio: Client,
db: Db,
dbName: string,
name: string
): Promise<void> {
const doc = await db.collection(DOMAIN_ATTACHMENT).findOne({
file: name
})
if (doc == null) return
const data = await minio.getObject(dbName, name)
const chunks: Buffer[] = []
await new Promise((resolve) => {
data.on('readable', () => {
let chunk
while ((chunk = data.read()) !== null) {
const b = chunk as Buffer
chunks.push(b)
}
})
data.on('end', () => {
resolve(null)
})
})
const id: Ref<Doc> = (generateId() + '/attachments/') as Ref<Doc>
const indexedDoc: IndexedDoc = {
id: id,
_class: doc._class,
space: doc.space,
modifiedOn: doc.modifiedOn,
modifiedBy: 'core:account:System' as Ref<Account>,
attachedTo: doc.attachedTo,
data: Buffer.concat(chunks).toString('base64')
}
await elastic.index(indexedDoc)
}

View File

@ -15,7 +15,14 @@
//
import {
ACCOUNT_DB, assignWorkspace, createAccount, createWorkspace, dropAccount, dropWorkspace, getAccount, listWorkspaces
ACCOUNT_DB,
assignWorkspace,
createAccount,
createWorkspace,
dropAccount,
dropWorkspace,
getAccount,
listWorkspaces
} from '@anticrm/account'
import contact, { combineName } from '@anticrm/contact'
import core, { TxOperations } from '@anticrm/core'
@ -23,6 +30,7 @@ import { program } from 'commander'
import { Client } from 'minio'
import { Db, MongoClient } from 'mongodb'
import { connect } from './connect'
import { rebuildElastic } from './elastic'
import { clearTelegramHistory } from './telegram'
import { diffWorkspace, dumpWorkspace, initWorkspace, restoreWorkspace, upgradeWorkspace } from './workspace'
@ -56,6 +64,12 @@ if (minioSecretKey === undefined) {
process.exit(1)
}
const elasticUrl = process.env.ELASTIC_URL
if (elasticUrl === undefined) {
console.error('please provide elastic url')
process.exit(1)
}
const minio = new Client({
endPoint: minioEndpoint,
port: 9000,
@ -192,7 +206,7 @@ program
.command('restore-workspace <workspace> <dirName>')
.description('restore workspace transactions and minio resources from previous dump.')
.action(async (workspace, dirName, cmd) => {
return await restoreWorkspace(mongodbUri, workspace, dirName, minio)
return await restoreWorkspace(mongodbUri, workspace, dirName, minio, elasticUrl)
})
program
@ -225,4 +239,12 @@ program
})
})
program
.command('rebuild-elastic')
.description('rebuild elastic index')
.option('-w, --workspace <workspace>', 'target workspace')
.action(async (w, cmd) => {
await rebuildElastic(mongodbUri, w.workspace, minio, elasticUrl)
})
program.parse(process.argv)

33
dev/tool/src/minio.ts Normal file
View File

@ -0,0 +1,33 @@
//
// Copyright © 2020, 2021 Anticrm Platform Contributors.
// Copyright © 2021 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { BucketItem, Client, ItemBucketMetadata } from 'minio'
export type MinioWorkspaceItem = BucketItem & { metaData: ItemBucketMetadata }
export async function listMinioObjects (minio: Client, dbName: string): Promise<MinioWorkspaceItem[]> {
const items: MinioWorkspaceItem[] = []
const list = await minio.listObjects(dbName, undefined, true)
await new Promise((resolve) => {
list.on('data', (data) => {
items.push({ ...data, metaData: {} })
})
list.on('end', () => {
resolve(null)
})
})
return items
}

View File

@ -19,19 +19,26 @@ import core, { DOMAIN_TX, Tx } from '@anticrm/core'
import builder, { migrateOperations } from '@anticrm/model-all'
import { existsSync } from 'fs'
import { mkdir, open, readFile, writeFile } from 'fs/promises'
import { BucketItem, Client, ItemBucketMetadata } from 'minio'
import { Client } from 'minio'
import { Document, MongoClient } from 'mongodb'
import { join } from 'path'
import { connect } from './connect'
import { MigrateClientImpl } from './upgrade'
import { generateModelDiff, printDiff } from './mdiff'
import { listMinioObjects, MinioWorkspaceItem } from './minio'
import { rebuildElastic } from './elastic'
const txes = JSON.parse(JSON.stringify(builder.getTxes())) as Tx[]
/**
* @public
*/
export async function initWorkspace (mongoUrl: string, dbName: string, transactorUrl: string, minio: Client): Promise<void> {
export async function initWorkspace (
mongoUrl: string,
dbName: string,
transactorUrl: string,
minio: Client
): Promise<void> {
const client = new MongoClient(mongoUrl)
try {
await client.connect()
@ -114,8 +121,6 @@ interface CollectionInfo {
file: string
}
type MinioWorkspaceItem = BucketItem & { metaData: ItemBucketMetadata }
interface WorkspaceInfo {
version: string
collections: CollectionInfo[]
@ -169,7 +174,7 @@ export async function dumpWorkspace (mongoUrl: string, dbName: string, fileName:
data.on('readable', () => {
let chunk
while ((chunk = data.read()) !== null) {
const b = (chunk as Buffer)
const b = chunk as Buffer
chunks.push(b)
}
})
@ -191,21 +196,13 @@ export async function dumpWorkspace (mongoUrl: string, dbName: string, fileName:
}
}
async function listMinioObjects (minio: Client, dbName: string): Promise<MinioWorkspaceItem[]> {
const items: MinioWorkspaceItem[] = []
const list = await minio.listObjects(dbName, undefined, true)
await new Promise((resolve) => {
list.on('data', (data) => {
items.push({ ...data, metaData: {} })
})
list.on('end', () => {
resolve(null)
})
})
return items
}
export async function restoreWorkspace (mongoUrl: string, dbName: string, fileName: string, minio: Client): Promise<void> {
export async function restoreWorkspace (
mongoUrl: string,
dbName: string,
fileName: string,
minio: Client,
elasticUrl: string
): Promise<void> {
const client = new MongoClient(mongoUrl)
try {
await client.connect()
@ -231,7 +228,7 @@ export async function restoreWorkspace (mongoUrl: string, dbName: string, fileNa
console.log('Restore minio objects')
if (await minio.bucketExists(dbName)) {
const objectNames = (await listMinioObjects(minio, dbName)).map(i => i.name)
const objectNames = (await listMinioObjects(minio, dbName)).map((i) => i.name)
await minio.removeObjects(dbName, objectNames)
await minio.removeBucket(dbName)
}
@ -242,6 +239,8 @@ export async function restoreWorkspace (mongoUrl: string, dbName: string, fileNa
const data = await readFile(join(minioDbLocation, d.name))
await minio.putObject(dbName, d.name, data, d.size, d.metaData)
}
await rebuildElastic(mongoUrl, dbName, minio, elasticUrl)
} finally {
await client.close()
}
@ -255,16 +254,21 @@ export async function diffWorkspace (mongoUrl: string, dbName: string): Promise<
console.log('diffing transactions...')
const currentModel = await db.collection(DOMAIN_TX).find<Tx>({
const currentModel = await db
.collection(DOMAIN_TX)
.find<Tx>({
objectSpace: core.space.Model,
modifiedBy: core.account.System,
objectClass: { $ne: contact.class.EmployeeAccount }
}).toArray()
})
.toArray()
const txes = builder.getTxes().filter(tx => {
return tx.objectSpace === core.space.Model &&
const txes = builder.getTxes().filter((tx) => {
return (
tx.objectSpace === core.space.Model &&
tx.modifiedBy === core.account.System &&
(tx as any).objectClass !== contact.class.EmployeeAccount
)
})
const { diffTx, dropTx } = await generateModelDiff(currentModel, txes)

View File

@ -34,6 +34,9 @@ import type { IndexedDoc, FullTextAdapter, WithFind } from './types'
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
const NO_INDEX = [] as AnyAttribute[]
/**
* @public
*/
export class FullTextIndex extends TxProcessor implements Storage {
private readonly indexes = new Map<Ref<Class<Obj>>, AnyAttribute[]>()
@ -160,9 +163,7 @@ export class FullTextIndex extends TxProcessor implements Storage {
private getContent (attributes: AnyAttribute[] | undefined, doc: Doc): any[] {
if (attributes === undefined) return []
return attributes.map((attr) =>
(doc as any)[attr.name]?.toString() ?? ''
)
return attributes.map((attr) => (doc as any)[attr.name]?.toString() ?? '')
}
private async updateAttachedDocs (tx: TxUpdateDoc<Doc>, update: any): Promise<void> {

View File

@ -15,5 +15,6 @@
//
export * from './types'
export * from './fulltext'
export * from './storage'
export { default } from './plugin'