mirror of
https://github.com/hcengineering/platform.git
synced 2025-01-08 21:27:45 +03:00
UBERF-4555: Fix elastic backup/restore (#4144)
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
parent
926c2fdd9a
commit
7b5fdb67c2
@ -14,6 +14,7 @@
|
||||
//
|
||||
|
||||
import { ApiResponse, Client } from '@elastic/elasticsearch'
|
||||
import { SearchResponse } from '@elastic/elasticsearch/api/types'
|
||||
import core, {
|
||||
Class,
|
||||
Doc,
|
||||
@ -28,15 +29,14 @@ import core, {
|
||||
Ref,
|
||||
Space,
|
||||
StorageIterator,
|
||||
toWorkspaceString,
|
||||
Tx,
|
||||
TxResult,
|
||||
WorkspaceId
|
||||
WorkspaceId,
|
||||
toWorkspaceString
|
||||
} from '@hcengineering/core'
|
||||
import { PlatformError, unknownStatus } from '@hcengineering/platform'
|
||||
import { DbAdapter, IndexedDoc } from '@hcengineering/server-core'
|
||||
import { createHash } from 'node:crypto'
|
||||
import { SearchResponse } from '@elastic/elasticsearch/api/types'
|
||||
|
||||
class ElasticDataAdapter implements DbAdapter {
|
||||
constructor (
|
||||
@ -70,7 +70,10 @@ class ElasticDataAdapter implements DbAdapter {
|
||||
let buffer: { _id: string, data: IndexedDoc }[] = []
|
||||
let resp: ApiResponse | null = null
|
||||
let finished = false
|
||||
return {
|
||||
// eslint-disable-next-line @typescript-eslint/naming-convention
|
||||
let scroll_id: string | undefined
|
||||
|
||||
const stIterator = {
|
||||
next: async () => {
|
||||
try {
|
||||
if (!listRecieved) {
|
||||
@ -79,7 +82,7 @@ class ElasticDataAdapter implements DbAdapter {
|
||||
type: '_doc',
|
||||
scroll: '23h',
|
||||
// search_type: 'scan', //if I use search_type then it requires size otherwise it shows 0 result
|
||||
size: 2500,
|
||||
size: 100,
|
||||
body: {
|
||||
query: {
|
||||
match_all: {}
|
||||
@ -97,21 +100,26 @@ class ElasticDataAdapter implements DbAdapter {
|
||||
buffer = resp.body.hits.hits.map((hit: any) => ({ _id: hit._id, data: hit._source }))
|
||||
if (buffer.length === 0) {
|
||||
finished = true
|
||||
await stIterator.close()
|
||||
}
|
||||
scroll_id = (resp.body as SearchResponse)._scroll_id
|
||||
listRecieved = true
|
||||
}
|
||||
if (resp !== null && pos === buffer.length && !finished) {
|
||||
const params = {
|
||||
scroll_id: (resp.body as SearchResponse)._scroll_id
|
||||
scroll_id,
|
||||
scroll: '23h'
|
||||
}
|
||||
resp = await this.client.scroll(params, { maxRetries: 5 })
|
||||
|
||||
if (resp.statusCode !== 200) {
|
||||
console.error('failed elastic query scroll', params, resp)
|
||||
console.error('failed elastic query scroll', scroll_id, resp)
|
||||
throw new PlatformError(unknownStatus(`failed to elastic query ${JSON.stringify(resp)}`))
|
||||
}
|
||||
buffer = resp.body.hits.hits.map((hit: any) => ({ _id: hit._id, data: hit._source }))
|
||||
if (buffer.length === 0) {
|
||||
finished = true
|
||||
await stIterator.close()
|
||||
}
|
||||
pos = 0
|
||||
}
|
||||
@ -133,18 +141,19 @@ class ElasticDataAdapter implements DbAdapter {
|
||||
if (e?.meta?.body?.error?.type === 'index_not_found_exception') {
|
||||
return undefined
|
||||
}
|
||||
await stIterator.close()
|
||||
console.error('elastic error:', e)
|
||||
throw new PlatformError(e)
|
||||
}
|
||||
},
|
||||
close: async () => {
|
||||
if (resp !== null && (resp.body as SearchResponse)?._scroll_id != null) {
|
||||
await this.client.clearScroll({
|
||||
scroll_id: (resp.body as SearchResponse)._scroll_id
|
||||
})
|
||||
if (scroll_id != null) {
|
||||
await this.client.clearScroll({ scroll_id })
|
||||
scroll_id = undefined
|
||||
}
|
||||
}
|
||||
}
|
||||
return stIterator
|
||||
}
|
||||
|
||||
async load (domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
|
||||
|
Loading…
Reference in New Issue
Block a user