mirror of
https://github.com/hcengineering/platform.git
synced 2024-12-23 03:22:19 +03:00
Restore DB should upgrade model before Elastic rebuild (#760)
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
parent
a7699648dd
commit
2248599550
@ -63,6 +63,7 @@ export async function rebuildElastic (
|
||||
}
|
||||
|
||||
async function dropElastic (elasticUrl: string, dbName: string): Promise<void> {
|
||||
console.log('drop existing elastic docment')
|
||||
const client = new ElasticClient({
|
||||
node: elasticUrl
|
||||
})
|
||||
@ -166,18 +167,30 @@ async function restoreElastic (mongoUrl: string, dbName: string, minio: Client,
|
||||
const tool = new ElasticTool(mongoUrl, dbName, minio, elasticUrl)
|
||||
const done = await tool.connect()
|
||||
try {
|
||||
const txes = (await tool.db.collection<Tx>(DOMAIN_TX).find().sort({ _id: 1 }).toArray())
|
||||
const txes = (await tool.db.collection<Tx>(DOMAIN_TX).find().toArray())
|
||||
const data = txes.filter((tx) => tx.objectSpace !== core.space.Model)
|
||||
const metricsCtx = new MeasureMetricsContext('elastic', {})
|
||||
for (const tx of data) {
|
||||
await tool.storage.tx(metricsCtx, tx)
|
||||
}
|
||||
if (await minio.bucketExists(dbName)) {
|
||||
const minioObjects = await listMinioObjects(minio, dbName)
|
||||
console.log('reply elastic documents', minioObjects.length)
|
||||
for (const d of minioObjects) {
|
||||
await tool.indexAttachment(d.name)
|
||||
}
|
||||
}
|
||||
console.log('replay elastic transactions', data.length)
|
||||
let pos = 0
|
||||
for (const tx of data) {
|
||||
pos++
|
||||
if (pos % 10000 === 0) {
|
||||
console.log('replay elastic transactions', pos, data.length)
|
||||
}
|
||||
try {
|
||||
await tool.storage.tx(metricsCtx, tx)
|
||||
} catch (err: any) {
|
||||
console.error('failed to replay tx', tx, err.message)
|
||||
}
|
||||
}
|
||||
console.log('replay elastic transactions done')
|
||||
} finally {
|
||||
await done()
|
||||
}
|
||||
|
@ -218,7 +218,7 @@ program
|
||||
.command('restore-workspace <workspace> <dirName>')
|
||||
.description('restore workspace transactions and minio resources from previous dump.')
|
||||
.action(async (workspace, dirName, cmd) => {
|
||||
return await restoreWorkspace(mongodbUri, workspace, dirName, minio, elasticUrl)
|
||||
return await restoreWorkspace(mongodbUri, workspace, dirName, minio, elasticUrl, transactorUrl)
|
||||
})
|
||||
|
||||
program
|
||||
|
@ -205,21 +205,21 @@ export async function restoreWorkspace (
|
||||
dbName: string,
|
||||
fileName: string,
|
||||
minio: Client,
|
||||
elasticUrl: string
|
||||
elasticUrl: string,
|
||||
transactorUrl: string
|
||||
): Promise<void> {
|
||||
const client = new MongoClient(mongoUrl)
|
||||
try {
|
||||
await client.connect()
|
||||
const db = client.db(dbName)
|
||||
|
||||
console.log('dumping transactions...')
|
||||
|
||||
const workspaceInfo = JSON.parse((await readFile(fileName + '.workspace.json')).toString()) as WorkspaceInfo
|
||||
|
||||
// Drop existing collections
|
||||
|
||||
const cols = await db.collections()
|
||||
for (const c of cols) {
|
||||
console.log('dropping existing table', c.collectionName)
|
||||
await db.dropCollection(c.collectionName)
|
||||
}
|
||||
// Restore collections.
|
||||
@ -228,11 +228,11 @@ export async function restoreWorkspace (
|
||||
await collection.deleteMany({})
|
||||
const data = JSON.parse((await readFile(fileName + c.name + '.json')).toString()) as Document[]
|
||||
if (data.length > 0) {
|
||||
console.log('restore existing collection', c.name, data.length)
|
||||
await collection.insertMany(data)
|
||||
}
|
||||
}
|
||||
|
||||
console.log('Restore minio objects')
|
||||
if (await minio.bucketExists(dbName)) {
|
||||
const objectNames = (await listMinioObjects(minio, dbName)).map((i) => i.name)
|
||||
await minio.removeObjects(dbName, objectNames)
|
||||
@ -241,11 +241,14 @@ export async function restoreWorkspace (
|
||||
await minio.makeBucket(dbName, 'k8s')
|
||||
|
||||
const minioDbLocation = fileName + '.minio'
|
||||
console.log('Restore minio objects', workspaceInfo.minioData.length)
|
||||
for (const d of workspaceInfo.minioData) {
|
||||
const data = await readFile(join(minioDbLocation, d.name))
|
||||
await minio.putObject(dbName, d.name, data, d.size, d.metaData)
|
||||
}
|
||||
|
||||
await upgradeWorkspace(mongoUrl, dbName, transactorUrl, minio)
|
||||
|
||||
await rebuildElastic(mongoUrl, dbName, minio, elasticUrl)
|
||||
} finally {
|
||||
await client.close()
|
||||
|
@ -119,7 +119,6 @@ export class FullTextIndex implements WithFind {
|
||||
const { _id, $search, ...mainQuery } = query
|
||||
if ($search === undefined) return []
|
||||
const docs = await this.adapter.search(_class, query, options?.limit)
|
||||
console.log(docs)
|
||||
const ids: Set<Ref<Doc>> = new Set<Ref<Doc>>(docs.map(p => p.id))
|
||||
for (const doc of docs) {
|
||||
if (doc.attachedTo !== undefined) {
|
||||
@ -238,7 +237,16 @@ export class FullTextIndex implements WithFind {
|
||||
}
|
||||
}
|
||||
for (const attached of allAttached) {
|
||||
await this.adapter.update(attached._id, docUpdate)
|
||||
try {
|
||||
await this.adapter.update(attached._id, docUpdate)
|
||||
} catch (err: any) {
|
||||
if (((err.message as string) ?? '').includes('document_missing_exception:')) {
|
||||
console.error('missing document in elastic for', tx.objectId, 'attached', attached._id, 'collection', attached.collection)
|
||||
// We have no document for attached object, so ignore for now. it is probable rebuild of elastic DB.
|
||||
continue
|
||||
}
|
||||
throw err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -95,44 +95,32 @@ class ElasticAdapter implements FullTextAdapter {
|
||||
|
||||
async index (doc: IndexedDoc): Promise<TxResult> {
|
||||
if (doc.data === undefined) {
|
||||
try {
|
||||
await this.client.index({
|
||||
index: this.db,
|
||||
id: doc.id,
|
||||
type: '_doc',
|
||||
body: doc
|
||||
})
|
||||
} catch (err: any) {
|
||||
console.error('elastic-exception', err)
|
||||
}
|
||||
await this.client.index({
|
||||
index: this.db,
|
||||
id: doc.id,
|
||||
type: '_doc',
|
||||
body: doc
|
||||
})
|
||||
} else {
|
||||
try {
|
||||
await this.client.index({
|
||||
index: this.db,
|
||||
id: doc.id,
|
||||
type: '_doc',
|
||||
pipeline: 'attachment',
|
||||
body: doc
|
||||
})
|
||||
} catch (err: any) {
|
||||
console.error('elastic-exception', err)
|
||||
}
|
||||
await this.client.index({
|
||||
index: this.db,
|
||||
id: doc.id,
|
||||
type: '_doc',
|
||||
pipeline: 'attachment',
|
||||
body: doc
|
||||
})
|
||||
}
|
||||
return {}
|
||||
}
|
||||
|
||||
async update (id: Ref<Doc>, update: Record<string, any>): Promise<TxResult> {
|
||||
try {
|
||||
await this.client.update({
|
||||
index: this.db,
|
||||
id,
|
||||
body: {
|
||||
doc: update
|
||||
}
|
||||
})
|
||||
} catch (err: any) {
|
||||
console.error('elastic-exception', err)
|
||||
}
|
||||
await this.client.update({
|
||||
index: this.db,
|
||||
id,
|
||||
body: {
|
||||
doc: update
|
||||
}
|
||||
})
|
||||
|
||||
return {}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user