Mirror of https://github.com/hcengineering/platform.git, synced 2024-11-22 11:42:30 +03:00.
UBERF-7510: add logging and catch errors on cleanup (#6003)
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
Parent: 245c4cb699
Commit: 8075bbef63
@@ -17,7 +17,6 @@ import {
   DOMAIN_BLOB,
   DOMAIN_CONFIGURATION,
   DOMAIN_DOC_INDEX_STATE,
-  DOMAIN_FULLTEXT_BLOB,
   DOMAIN_MIGRATION,
   DOMAIN_MODEL,
   IndexKind,
@@ -38,7 +37,6 @@ import {
   type Enum,
   type EnumOf,
   type FieldIndex,
-  type FullTextData,
   type FullTextSearchContext,
   type IndexStageState,
   type IndexingConfiguration,
@@ -309,10 +307,6 @@ export class TPluginConfiguration extends TDoc implements PluginConfiguration {
   enabled!: boolean
   beta!: boolean
 }

-@Model(core.class.FulltextData, core.class.Doc, DOMAIN_FULLTEXT_BLOB)
-export class TFulltextData extends TDoc implements FullTextData {
-  data!: any
-}
-
 @Model(core.class.DocIndexState, core.class.Doc, DOMAIN_DOC_INDEX_STATE)
 export class TDocIndexState extends TDoc implements DocIndexState {
@@ -49,7 +49,6 @@ import {
   TEnum,
   TEnumOf,
   TFullTextSearchContext,
-  TFulltextData,
   TIndexConfiguration,
   TIndexStageState,
   TInterface,
@@ -164,7 +163,6 @@ export function createModel (builder: Builder): void {
     TUserStatus,
     TEnum,
     TTypeAny,
-    TFulltextData,
     TTypeRelatedDocument,
     TDocIndexState,
     TIndexStageState,
@@ -494,14 +494,6 @@ export function versionToString (version: Version | Data<Version>): string {
   return `${version?.major}.${version?.minor}.${version?.patch}`
 }

-/**
- * Blob data from s3 storage
- * @public
- */
-export interface FullTextData extends Doc {
-  data: any
-}
-
 /**
  * @public
  *
@@ -30,7 +30,6 @@ import type {
   DomainIndexConfiguration,
   Enum,
   EnumOf,
-  FullTextData,
   FullTextSearchContext,
   Hyperlink,
   IndexStageState,
@@ -134,7 +133,6 @@ export default plugin(coreId, {
     Version: '' as Ref<Class<Version>>,
     PluginConfiguration: '' as Ref<Class<PluginConfiguration>>,
     UserStatus: '' as Ref<Class<UserStatus>>,
-    FulltextData: '' as Ref<Class<FullTextData>>,
     TypeRelatedDocument: '' as Ref<Class<Type<RelatedDocument>>>,
     DocIndexState: '' as Ref<Class<DocIndexState>>,
     IndexStageState: '' as Ref<Class<IndexStageState>>,
@@ -28,7 +28,6 @@ import {
   DocIndexState,
   DOMAIN_BLOB,
   DOMAIN_DOC_INDEX_STATE,
-  DOMAIN_FULLTEXT_BLOB,
   DOMAIN_MODEL,
   DOMAIN_TRANSIENT,
   FullTextSearchContext,
@@ -713,7 +712,6 @@ export function isClassIndexable (hierarchy: Hierarchy, c: Ref<Class<Doc>>): boo
     domain === DOMAIN_TX ||
     domain === DOMAIN_MODEL ||
     domain === DOMAIN_BLOB ||
-    domain === DOMAIN_FULLTEXT_BLOB ||
     domain === DOMAIN_TRANSIENT
   ) {
     hierarchy.setClassifierProp(c, 'class_indexed', false)
@@ -1276,9 +1276,8 @@ export class LiveQuery implements WithTx, Client {
     for (const tx of txes) {
       if (tx._class === core.class.TxWorkspaceEvent) {
         const evt = tx as TxWorkspaceEvent
-        console.info('checking workspace event', evt._id, evt.params)
-        await this.checkUpdateEvents(tx as TxWorkspaceEvent)
-        await this.changePrivateHandler(tx as TxWorkspaceEvent)
+        await this.checkUpdateEvents(evt)
+        await this.changePrivateHandler(evt)
       }
       result.push(await this._tx(tx, docCache))
     }
@@ -970,7 +970,6 @@ export async function createWorkspace (
       async (value) => {
         await updateInfo({ createProgress: 20 + Math.round((Math.min(value, 100) / 100) * 70) })
       },
-      true,
       getStorageAdapter()
     )
     const modelVersion = getModelVersion()
@@ -14,12 +14,15 @@
 // limitations under the License.
 //

+import { Analytics } from '@hcengineering/analytics'
 import core, {
   AttachedDoc,
   BackupClient,
   Client as CoreClient,
   Doc,
   Domain,
+  DOMAIN_BLOB,
+  DOMAIN_DOC_INDEX_STATE,
   DOMAIN_FULLTEXT_BLOB,
   DOMAIN_MODEL,
   DOMAIN_TRANSIENT,
@@ -33,7 +36,7 @@ import core, {
   type Blob,
   type DocIndexState
 } from '@hcengineering/core'
-import type { StorageAdapter } from '@hcengineering/server-core'
+import { fullTextPushStagePrefix, type StorageAdapter } from '@hcengineering/server-core'
 import { BlobClient, connect } from '@hcengineering/server-tool'
 import { mkdtemp, writeFile } from 'node:fs/promises'
 import { PassThrough } from 'node:stream'
@@ -43,7 +46,6 @@ import { Writable } from 'stream'
 import { extract, Pack, pack } from 'tar-stream'
 import { createGunzip, gunzipSync, gzipSync } from 'zlib'
 import { BackupStorage } from './storage'
-import { Analytics } from '@hcengineering/analytics'
 export * from './storage'

 const dataBlobSize = 50 * 1024 * 1024
@@ -231,7 +233,6 @@ export async function cloneWorkspace (
   targetWorkspaceId: WorkspaceId,
   clearTime: boolean = true,
   progress: (value: number) => Promise<void>,
-  skipFullText: boolean,
   storageAdapter: StorageAdapter
 ): Promise<void> {
   await ctx.with(
@@ -264,10 +265,6 @@ export async function cloneWorkspace (

       let i = 0
       for (const c of domains) {
-        if (skipFullText && c === DOMAIN_FULLTEXT_BLOB) {
-          ctx.info('clone skip domain...', { domain: c, workspace: targetWorkspaceId.name })
-          continue
-        }
         ctx.info('clone domain...', { domain: c, workspace: targetWorkspaceId.name })

         // We need to clean target connection before copying something.
@@ -347,7 +344,7 @@ export async function cloneWorkspace (
         try {
           docs = await ctx.with('load-docs', {}, async (ctx) => await sourceConnection.loadDocs(c, needRetrieve))
           if (clearTime) {
-            docs = prepareClonedDocuments(docs, sourceConnection, skipFullText)
+            docs = prepareClonedDocuments(docs, sourceConnection)
           }
           const executor = new RateLimiter(10)
           for (const d of docs) {
@@ -422,11 +419,7 @@
   )
 }

-function prepareClonedDocuments (
-  docs: Doc[],
-  sourceConnection: CoreClient & BackupClient,
-  skipFullText: boolean
-): Doc[] {
+function prepareClonedDocuments (docs: Doc[], sourceConnection: CoreClient & BackupClient): Doc[] {
   docs = docs.map((p) => {
     let collectionCud = false
     try {
@@ -436,8 +429,13 @@ function prepareClonedDocuments (
     }

-    // if full text is skipped, we need to clean stages for indexes.
-    if (p._class === core.class.DocIndexState && skipFullText) {
-      ;(p as DocIndexState).stages = {}
+    if (p._class === core.class.DocIndexState) {
+      for (const k of Object.keys((p as DocIndexState).stages)) {
+        if (k.startsWith(fullTextPushStagePrefix)) {
+          // eslint-disable-next-line @typescript-eslint/no-dynamic-delete
+          delete (p as DocIndexState).stages[k]
+        }
+      }
     }

     if (collectionCud) {
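
prepareClonedDocuments no longer takes a skipFullText flag; it now always strips the fulltext push stages from cloned DocIndexState documents, matching them by the shared prefix so any versioned stage id (such as 'fts-v17') is caught. A minimal standalone sketch of that filtering, with a hypothetical helper name and a simplified stages shape; only fullTextPushStagePrefix is taken from the real package:

import { fullTextPushStagePrefix } from '@hcengineering/server-core'

// Hypothetical helper: drop only fulltext push stages, keep all others.
function dropFullTextStages (stages: Record<string, boolean | string>): void {
  for (const k of Object.keys(stages)) {
    if (k.startsWith(fullTextPushStagePrefix)) {
      // eslint-disable-next-line @typescript-eslint/no-dynamic-delete
      delete stages[k]
    }
  }
}

// Keeps 'fld-v15', removes 'fts-v17'.
dropFullTextStages({ 'fld-v15': true, 'fts-v17': true })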
@@ -556,6 +554,7 @@ export async function backup (
       (it) =>
         it !== DOMAIN_TRANSIENT &&
         it !== DOMAIN_MODEL &&
+        it !== ('fulltext-blob' as Domain) &&
         !options.skipDomains.includes(it) &&
         (options.include === undefined || options.include.has(it))
     )
@@ -1090,6 +1089,7 @@ export async function restore (
   const infoFile = 'backup.json.gz'

   if (!(await storage.exists(infoFile))) {
+    ctx.error('file not present', { file: infoFile })
     throw new Error(`${infoFile} should be present to restore`)
   }
   const backupInfo: BackupInfo = JSON.parse(gunzipSync(await storage.loadFile(infoFile)).toString())
@@ -1097,13 +1097,14 @@
   if (opt.date !== -1) {
     const bk = backupInfo.snapshots.findIndex((it) => it.date === opt.date)
     if (bk === -1) {
+      ctx.error('could not restore to', { date: opt.date, file: infoFile, workspaceId: workspaceId.name })
       throw new Error(`${infoFile} could not restore to ${opt.date}. Snapshot is missing.`)
     }
     snapshots = backupInfo.snapshots.slice(0, bk + 1)
   } else {
     opt.date = snapshots[snapshots.length - 1].date
   }
-  console.log('restore to ', opt.date, new Date(opt.date))
+  ctx.info('restore to ', { id: opt.date, date: new Date(opt.date).toDateString() })
   const rsnapshots = Array.from(snapshots).reverse()

   // Collect all possible domains
@@ -1112,7 +1113,7 @@ export async function restore (
     Object.keys(s.domains).forEach((it) => domains.add(it as Domain))
   }

-  console.log('connecting:', transactorUrl, workspaceId.name)
+  ctx.info('connecting:', { transactorUrl, workspace: workspaceId.name })
   const connection = (await connect(transactorUrl, workspaceId, undefined, {
     mode: 'backup',
     model: 'upgrade'
@@ -1127,6 +1128,9 @@
     domains.add(d)
   }

+  // We do not back up elastic anymore
+  domains.delete('fulltext-blob' as Domain)
+
   let uploadedMb = 0
   let uploaded = 0

@@ -1138,7 +1142,8 @@
       uploadedMb = newId
       ctx.info('Uploaded', {
         msg,
-        written: newDownloadedMb
+        written: newDownloadedMb,
+        workspace: workspaceId.name
       })
     }
   }
@@ -1167,7 +1172,7 @@
       }

       if (el > 2500) {
-        console.log(c, ' loaded from server', loaded, el, chunks)
+        ctx.info('loaded from server', { domain: c, loaded, el, chunks, workspace: workspaceId.name })
         el = 0
         chunks = 0
       }
@@ -1180,8 +1185,12 @@
       await connection.closeChunk(idx)
     }
   }
-  console.log(' loaded', loaded)
-  console.log('\tcompare documents', changeset.size, serverChangeset.size)
+  ctx.info('loaded', { loaded, workspace: workspaceId.name })
+  ctx.info('\tcompare documents', {
+    size: changeset.size,
+    serverSize: serverChangeset.size,
+    workspace: workspaceId.name
+  })

   // Let's find difference
   const docsToAdd = new Map(
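
Throughout restore(), bare console.log calls are replaced with the MeasureContext logger that is already threaded through the function, so every record carries named fields (now including the workspace) instead of positional arguments. The pattern, as applied above:

// Before: positional arguments, hard to filter in aggregated logs.
console.log(' loaded', loaded)

// After: structured fields; the workspace tag travels with every record.
ctx.info('loaded', { loaded, workspace: workspaceId.name })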
@@ -1208,7 +1217,13 @@

     if (sendSize > dataUploadSize || (doc === undefined && docs.length > 0)) {
       totalSend += docs.length
-      console.log('upload', docs.length, `send: ${totalSend} from ${docsToAdd.size + totalSend}`, 'size:', sendSize)
+      ctx.info('upload', {
+        docs: docs.length,
+        totalSend,
+        from: docsToAdd.size + totalSend,
+        sendSize,
+        workspace: workspaceId.name
+      })
       await connection.upload(c, docs)
       docs.length = 0
       sendSize = 0
@@ -1224,13 +1239,13 @@
     const sDigest = await loadDigest(ctx, storage, [s], c)
     const requiredDocs = new Map(Array.from(sDigest.entries()).filter(([it]) => docsToAdd.has(it)))
     if (requiredDocs.size > 0) {
-      console.log('updating', c, requiredDocs.size)
+      ctx.info('updating', { domain: c, requiredDocs: requiredDocs.size, workspace: workspaceId.name })
       // We have required documents here.
       for (const sf of d.storage ?? []) {
         if (docsToAdd.size === 0) {
           break
         }
-        console.log('processing', sf, processed)
+        ctx.info('processing', { storageFile: sf, processed, workspace: workspaceId.name })

         const readStream = await storage.load(sf)
         const ex = extract()
@@ -1332,11 +1347,27 @@ export async function restore (
     }

     await sendChunk(undefined, 0)
-    if (docsToRemove.length > 0 && opt.merge !== true) {
-      console.log('cleanup', docsToRemove.length)
+    async function performCleanOfDomain (docsToRemove: Ref<Doc>[], c: Domain): Promise<void> {
+      ctx.info('cleanup', { toRemove: docsToRemove.length, workspace: workspaceId.name, domain: c })
       while (docsToRemove.length > 0) {
         const part = docsToRemove.splice(0, 10000)
-        await connection.clean(c, part)
+        try {
+          await connection.clean(c, part)
+        } catch (err: any) {
+          ctx.error('failed to clean, will retry', { error: err, workspaceId: workspaceId.name })
+          docsToRemove.push(...part)
+        }
       }
     }
+    if (c !== DOMAIN_BLOB) {
+      // Clean domain documents if not blob
+      if (docsToRemove.length > 0 && opt.merge !== true) {
+        if (c === DOMAIN_DOC_INDEX_STATE) {
+          // We need to clean the FULLTEXT domain as well
+          await performCleanOfDomain([...docsToRemove], DOMAIN_FULLTEXT_BLOB)
+        }
+
+        await performCleanOfDomain(docsToRemove, c)
+      }
+    }
   }
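
performCleanOfDomain pushes a failed batch back onto the queue, so a batch that keeps failing would be retried indefinitely. A hedged variant with the same splice-and-requeue idea but a bounded number of attempts; only the types come from '@hcengineering/core', everything else is illustrative:

import { type Doc, type Domain, type Ref } from '@hcengineering/core'

interface CleanConnection {
  clean: (domain: Domain, docs: Ref<Doc>[]) => Promise<void>
}

// Same batch loop as above, but failed parts are collected and retried
// at most maxAttempts times instead of being re-queued forever.
async function cleanWithRetry (
  connection: CleanConnection,
  domain: Domain,
  docsToRemove: Ref<Doc>[],
  maxAttempts: number = 3
): Promise<void> {
  let pending = docsToRemove
  for (let attempt = 1; pending.length > 0 && attempt <= maxAttempts; attempt++) {
    const failed: Ref<Doc>[] = []
    while (pending.length > 0) {
      const part = pending.splice(0, 10000)
      try {
        await connection.clean(domain, part)
      } catch (err: any) {
        failed.push(...part) // keep the batch for the next attempt
      }
    }
    pending = failed
  }
}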
@@ -1352,7 +1383,7 @@
         continue
       }
       await limiter.exec(async () => {
-        console.log('processing domain', c)
+        ctx.info('processing domain', { domain: c, workspaceId: workspaceId.name })
         let retry = 5
         let delay = 1
         while (retry > 0) {
@@ -1360,13 +1391,13 @@
           try {
             await processDomain(c)
             if (delay > 1) {
-              console.log('retry-success')
+              ctx.warn('retry-success', { retry, delay, workspaceId: workspaceId.name })
             }
             break
           } catch (err: any) {
-            console.error('error', err)
+            ctx.error('failed to process domain', { err, domain: c, workspaceId: workspaceId.name })
             if (retry !== 0) {
-              console.log('cool-down to retry', delay)
+              ctx.warn('cool-down to retry', { delay, domain: c, workspaceId: workspaceId.name })
               await new Promise((resolve) => setTimeout(resolve, delay * 1000))
               delay++
             }
@@ -13,6 +13,7 @@
 // limitations under the License.
 //

+import { Analytics } from '@hcengineering/analytics'
 import core, {
   type Class,
   DOMAIN_DOC_INDEX_STATE,
@@ -37,7 +38,6 @@ import { type DbAdapter } from '../adapter'
 import { RateLimiter } from '../limitter'
 import type { IndexedDoc } from '../types'
 import { type FullTextPipeline, type FullTextPipelineStage } from './types'
-import { Analytics } from '@hcengineering/analytics'

 export * from './content'
 export * from './field'
@@ -383,8 +383,12 @@ export class FullTextIndexPipeline implements FullTextPipeline {
     // We need to send index update event
     clearTimeout(this.updateBroadcast)
     this.updateBroadcast = setTimeout(() => {
-      this.broadcastUpdate(Array.from(this.broadcastClasses.values()))
-      this.broadcastClasses.clear()
+      this.broadcastClasses.delete(core.class.DocIndexState)
+      if (this.broadcastClasses.size > 0) {
+        const toSend = Array.from(this.broadcastClasses.values())
+        this.broadcastClasses.clear()
+        this.broadcastUpdate(toSend)
+      }
     }, 5000)

     await new Promise((resolve) => {
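
The broadcast timer now removes core.class.DocIndexState from the pending set before flushing, and skips the broadcast entirely when nothing else changed, instead of always sending whatever had accumulated. The debounce-and-batch shape of this change, with hypothetical names:

const pending = new Set<string>()
let timer: ReturnType<typeof setTimeout> | undefined

// Collect changed classes and flush them at most once per interval,
// dropping the bookkeeping class so it alone never triggers a broadcast.
function scheduleBroadcast (cls: string, flush: (classes: string[]) => void): void {
  pending.add(cls)
  clearTimeout(timer)
  timer = setTimeout(() => {
    pending.delete('core:class:DocIndexState') // assumed id of the bookkeeping class
    if (pending.size > 0) {
      const toSend = Array.from(pending)
      pending.clear()
      flush(toSend)
    }
  }, 5000)
}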
@@ -108,3 +108,8 @@ export const fieldStateId = 'fld-v15'
  * @public
  */
 export const fullTextPushStageId = 'fts-v17'
+
+/**
+ * @public
+ */
+export const fullTextPushStagePrefix = 'fts-'
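
The new constant is the common prefix of the versioned push-stage ids, which is what lets prepareClonedDocuments above match any stage version:

fullTextPushStageId.startsWith(fullTextPushStagePrefix) // true: 'fts-v17' starts with 'fts-'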
@@ -1,63 +0,0 @@
-import { DbAdapter } from '@hcengineering/server-core'
-import { Domain, getWorkspaceId, Hierarchy, MeasureMetricsContext } from '@hcengineering/core'
-import { createElasticBackupDataAdapter } from '../backup'
-
-import { Client } from '@elastic/elasticsearch'
-
-describe('Elastic Data Adapter', () => {
-  const url = process.env.ELASTIC_URL ?? 'http://localhost:9200/'
-  const domain = 'test' as Domain
-
-  let adapter: DbAdapter
-
-  beforeEach(async () => {
-    adapter = await createElasticBackupDataAdapter(
-      new MeasureMetricsContext('test', {}),
-      new Hierarchy(),
-      url,
-      getWorkspaceId('ws1', '')
-    )
-  })
-
-  afterEach(async () => {
-    await adapter.close()
-  })
-
-  it('should init', () => {
-    expect(adapter).toBeTruthy()
-  })
-
-  describe('Scroll Contexts', () => {
-    let client: Client
-
-    beforeEach(async () => {
-      client = new Client({ node: url })
-      await client.cluster.putSettings({
-        body: {
-          persistent: { 'search.max_open_scroll_context': '2' },
-          transient: { 'search.max_open_scroll_context': '2' }
-        }
-      })
-    })
-
-    // Use afterEach() to make sure we clean up even if test fail
-    afterEach(async () => {
-      await client.cluster.putSettings({
-        body: {
-          persistent: { 'search.max_open_scroll_context': null },
-          transient: { 'search.max_open_scroll_context': null }
-        }
-      })
-      await client.close()
-    })
-
-    it('should get properly closed', async () => {
-      const ctx = new MeasureMetricsContext('test', {})
-      for (let i = 0; i <= 3; i++) {
-        const cursor = adapter.find(ctx, domain)
-        await cursor.next(ctx)
-        await cursor.close(ctx)
-      }
-    })
-  })
-})
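The deleted suite above exercised Elasticsearch scroll contexts through adapter.find(); it goes away together with the scroll-based find, load, and upload implementations removed from ElasticDataAdapter below, which now throw 'Method not implemented.'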
@@ -18,7 +18,6 @@ import {
   Class,
   Doc,
   DocumentQuery,
-  FullTextData,
   IndexingConfiguration,
   MeasureContext,
   Ref,
@@ -54,8 +53,8 @@ function getIndexVersion (): string {

 class ElasticAdapter implements FullTextAdapter {
   private readonly workspaceString: string
-  private readonly getFulltextDocId: (doc: Ref<Doc>) => Ref<FullTextData>
-  private readonly getDocId: (fulltext: Ref<FullTextData>) => Ref<Doc>
+  private readonly getFulltextDocId: (doc: Ref<Doc>) => Ref<Doc>
+  private readonly getDocId: (fulltext: Ref<Doc>) => Ref<Doc>
   private readonly indexName: string

   constructor (
@@ -67,7 +66,7 @@ class ElasticAdapter implements FullTextAdapter {
   ) {
     this.indexName = `${indexBaseName}_${indexVersion}`
     this.workspaceString = toWorkspaceString(workspaceId)
-    this.getFulltextDocId = (doc) => `${doc}@${this.workspaceString}` as Ref<FullTextData>
+    this.getFulltextDocId = (doc) => `${doc}@${this.workspaceString}` as Ref<Doc>
     this.getDocId = (fulltext) => fulltext.slice(0, -1 * (this.workspaceString.length + 1)) as Ref<Doc>
   }
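
With the FullTextData class gone, both id mappers are typed in terms of plain Ref<Doc>. The mapping itself is unchanged: the fulltext document id is the source document id with '@<workspace>' appended, and getDocId strips that suffix again. The round trip in isolation, with an illustrative workspace string:

const workspaceString = 'ws1' // illustrative
const getFulltextDocId = (doc: string): string => `${doc}@${workspaceString}`
const getDocId = (fulltext: string): string => fulltext.slice(0, -1 * (workspaceString.length + 1))

getDocId(getFulltextDocId('doc-1')) // 'doc-1'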
@@ -13,9 +13,8 @@
 // limitations under the License.
 //

-import { ApiResponse, Client } from '@elastic/elasticsearch'
-import { SearchResponse } from '@elastic/elasticsearch/api/types'
-import core, {
+import { Client } from '@elastic/elasticsearch'
+import {
   Class,
   Doc,
   DocumentQuery,
@@ -23,21 +22,18 @@ import core, {
   Domain,
   FindOptions,
   FindResult,
-  FullTextData,
   Hierarchy,
   IndexingConfiguration,
   MeasureContext,
   Ref,
-  Space,
   StorageIterator,
+  toWorkspaceString,
   Tx,
   TxResult,
-  WorkspaceId,
-  toWorkspaceString
+  WorkspaceId
 } from '@hcengineering/core'
-import { getMetadata, PlatformError, unknownStatus } from '@hcengineering/platform'
-import serverCore, { DbAdapter, IndexedDoc } from '@hcengineering/server-core'
-import { createHash } from 'node:crypto'
+import { getMetadata } from '@hcengineering/platform'
+import serverCore, { DbAdapter } from '@hcengineering/server-core'

 function getIndexName (): string {
   return getMetadata(serverCore.metadata.ElasticIndexName) ?? 'storage_index'
@@ -49,8 +45,8 @@ function getIndexVersion (): string {

 class ElasticDataAdapter implements DbAdapter {
   private readonly workspaceString: string
-  private readonly getFulltextDocId: (doc: Ref<Doc>) => Ref<FullTextData>
-  private readonly getDocId: (fulltext: Ref<FullTextData>) => Ref<Doc>
+  private readonly getFulltextDocId: (doc: Ref<Doc>) => Ref<Doc>
+  private readonly getDocId: (fulltext: Ref<Doc>) => Ref<Doc>
   private readonly indexName: string

   constructor (
@@ -61,7 +57,7 @@ class ElasticDataAdapter implements DbAdapter {
   ) {
     this.indexName = `${indexBaseName}_${indexVersion}`
     this.workspaceString = toWorkspaceString(workspaceId)
-    this.getFulltextDocId = (doc) => `${doc}@${this.workspaceString}` as Ref<FullTextData>
+    this.getFulltextDocId = (doc) => `${doc}@${this.workspaceString}` as Ref<Doc>
     this.getDocId = (fulltext) => fulltext.slice(0, -1 * (this.workspaceString.length + 1)) as Ref<Doc>
   }
@@ -86,195 +82,15 @@ class ElasticDataAdapter implements DbAdapter {
   }

   find (ctx: MeasureContext, domain: Domain): StorageIterator {
-    let listRecieved = false
-    let pos = 0
-    let buffer: { _id: string, data: IndexedDoc }[] = []
-    let resp: ApiResponse | null = null
-    let finished = false
-    // eslint-disable-next-line @typescript-eslint/naming-convention
-    let scroll_id: string | undefined
-
-    const stIterator = {
-      next: async () => {
-        try {
-          if (!listRecieved) {
-            const q = {
-              index: this.indexName,
-              type: '_doc',
-              scroll: '23h',
-              // search_type: 'scan', //if I use search_type then it requires size otherwise it shows 0 result
-              size: 100,
-              body: {
-                query: {
-                  bool: {
-                    must: {
-                      match: {
-                        workspaceId: { query: this.workspaceString, operator: 'and' }
-                      }
-                    }
-                  }
-                }
-              }
-            }
-            resp = await this.client.search(q)
-            if (resp.statusCode !== 200) {
-              if (resp.body?.error?.type === 'index_not_found_exception') {
-                return undefined
-              }
-              console.error('failed elastic query', q, resp)
-              throw new PlatformError(unknownStatus(`failed to elastic query ${JSON.stringify(resp)}`))
-            }
-            buffer = resp.body.hits.hits.map((hit: any) => ({ _id: hit._id, data: hit._source }))
-            if (buffer.length === 0) {
-              finished = true
-              await stIterator.close()
-            }
-            scroll_id = (resp.body as SearchResponse)._scroll_id
-            listRecieved = true
-          }
-          if (resp !== null && pos === buffer.length && !finished) {
-            const params = {
-              scroll_id,
-              scroll: '23h'
-            }
-            resp = await this.client.scroll(params, { maxRetries: 5 })
-
-            if (resp.statusCode !== 200) {
-              console.error('failed elastic query scroll', scroll_id, resp)
-              throw new PlatformError(unknownStatus(`failed to elastic query ${JSON.stringify(resp)}`))
-            }
-            buffer = resp.body.hits.hits.map((hit: any) => ({ _id: hit._id, data: hit._source }))
-            if (buffer.length === 0) {
-              finished = true
-              await stIterator.close()
-            }
-            pos = 0
-          }
-          if (pos < buffer.length) {
-            const item = buffer[pos]
-            const hash = createHash('sha256')
-            const json = JSON.stringify(item.data)
-            hash.update(json)
-            const digest = hash.digest('base64')
-            const result = {
-              id: this.getDocId(item._id as Ref<FullTextData>),
-              hash: digest,
-              size: json.length
-            }
-            pos++
-            return result
-          }
-        } catch (e: any) {
-          if (e?.meta?.body?.error?.type === 'index_not_found_exception') {
-            return undefined
-          }
-          await stIterator.close()
-          console.error('elastic error:', e)
-          throw new PlatformError(e)
-        }
-      },
-      close: async () => {
-        if (scroll_id != null) {
-          await this.client.clearScroll({ scroll_id })
-          scroll_id = undefined
-        }
-      }
-    }
-    return stIterator
+    throw new Error('Method not implemented.')
   }

   async load (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
-    const result: Doc[] = []
-    const toLoad = [...docs]
-
-    while (toLoad.length > 0) {
-      const part = toLoad.splice(0, 5000)
-      const resp = await this.client.search({
-        index: this.indexName,
-        type: '_doc',
-        body: {
-          query: {
-            bool: {
-              must: [
-                {
-                  terms: {
-                    _id: part.map(this.getFulltextDocId),
-                    boost: 1.0
-                  }
-                },
-                {
-                  match: {
-                    workspaceId: { query: this.workspaceString, operator: 'and' }
-                  }
-                }
-              ]
-            }
-          },
-          size: part.length
-        }
-      })
-      const buffer = resp.body.hits.hits.map((hit: any) => ({ _id: hit._id, data: hit._source }))
-
-      for (const item of buffer) {
-        const dta: FullTextData = {
-          _id: this.getDocId(item._id) as Ref<FullTextData>, // Export without workspace portion of ID
-          _class: core.class.FulltextData,
-          space: 'fulltext-blob' as Ref<Space>,
-          modifiedOn: item.data.modifiedOn,
-          modifiedBy: item.data.modifiedBy,
-          data: item.data
-        }
-        result.push(dta)
-      }
-    }
-    return result
+    throw new Error('Method not implemented.')
   }

   async upload (ctx: MeasureContext, domain: Domain, docs: Doc[]): Promise<void> {
-    while (docs.length > 0) {
-      const part = docs.splice(0, 10000)
-      try {
-        await this.client.deleteByQuery(
-          {
-            type: '_doc',
-            index: this.indexName,
-            body: {
-              query: {
-                bool: {
-                  must: [
-                    {
-                      terms: {
-                        _id: part.map((it) => this.getFulltextDocId(it._id)),
-                        boost: 1.0
-                      }
-                    },
-                    {
-                      match: {
-                        workspaceId: { query: this.workspaceString, operator: 'and' }
-                      }
-                    }
-                  ]
-                }
-              },
-              size: part.length
-            }
-          },
-          undefined
-        )
-      } catch (err: any) {
-        console.error(err)
-      }
-
-      const operations = part.flatMap((doc) => [
-        { index: { _index: this.indexName, _id: this.getFulltextDocId(doc._id) } },
-        {
-          ...(doc as FullTextData).data,
-          workspaceId: this.workspaceString
-        }
-      ])
-
-      await this.client.bulk({ refresh: true, body: operations })
-    }
+    throw new Error('Method not implemented.')
   }

   async update (ctx: MeasureContext, domain: Domain, operations: Map<Ref<Doc>, DocumentUpdate<Doc>>): Promise<void> {
@@ -282,6 +98,14 @@ class ElasticDataAdapter implements DbAdapter {
   }

   async clean (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<void> {
+    const indexExists = await this.client.indices.exists({
+      index: this.indexName
+    })
+    if (!indexExists.body) {
+      // No need to clean, no index exists.
+      return
+    }
+
     while (docs.length > 0) {
       const part = docs.splice(0, 10000)
       await this.client.deleteByQuery(
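
clean() now asks Elasticsearch whether the index exists before issuing deleteByQuery, so cleaning a workspace that was never indexed is a cheap no-op. The removed find() handled the same situation reactively, by catching the error after the fact; an equivalent reactive form for comparison, with an illustrative query, relying on the error shape of the v7 @elastic/elasticsearch client used here:

// Reactive alternative: rely on the error instead of checking first.
try {
  await this.client.deleteByQuery({ index: this.indexName, body: { query: { match_all: {} } } })
} catch (err: any) {
  if (err?.meta?.body?.error?.type !== 'index_not_found_exception') throw err
}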
@@ -95,6 +95,7 @@ test.describe('channel tests', () => {
     await channelPageSecond.checkIfChannelDefaultExist(false, data.channelName)
     await channelPageSecond.clickChannelTab()
     await channelPageSecond.checkIfChannelTableExist(data.channelName, false)
+    await page2.close()
   })

   test('create new public channel tests and check if the new user have access to it by default', async ({
@@ -124,6 +125,7 @@ test.describe('channel tests', () => {
     await channelPageSecond.checkIfChannelDefaultExist(false, data.channelName)
     await channelPageSecond.clickChannelTab()
     await channelPageSecond.checkIfChannelTableExist(data.channelName, true)
+    await page2.close()
   })

   test('create new private channel and test if the user can exchange the messages', async ({ browser, page }) => {
@@ -158,6 +160,7 @@ test.describe('channel tests', () => {
     await channelPageSecond.checkMessageExist('My dream is to fly', true, 'My dream is to fly')
     await channelPage.clickOnClosePopupButton()
     await channelPage.checkMessageExist('My dream is to fly', true, 'My dream is to fly')
+    await page2.close()
   })

   test('create new private channel add user to it', async ({ browser, page }) => {
@@ -195,6 +198,7 @@ test.describe('channel tests', () => {
     await channelPageSecond.checkMessageExist('One two', true, 'One two')
     await channelPage.clickChooseChannel(data.channelName)
     await channelPage.checkMessageExist('One two', true, 'One two')
+    await page2.close()
   })

   test('go to general channel add user to it', async ({ browser, page }) => {
@@ -225,6 +229,7 @@ test.describe('channel tests', () => {
     await channelPage.clickOnClosePopupButton()
     await channelPage.clickChannel('general')
     await channelPage.checkMessageExist('One two', true, 'One two')
+    await page2.close()
   })

   test('go to random channel add user to it', async ({ browser, page }) => {
@@ -255,6 +260,7 @@ test.describe('channel tests', () => {
     await channelPage.clickOnClosePopupButton()
     await channelPage.clickChannel('random')
     await channelPage.checkMessageExist('One two', true, 'One two')
+    await page2.close()
   })

   test('check if user can add emoji', async () => {
@@ -374,6 +380,7 @@ test.describe('channel tests', () => {
     await channelPageSecond.clickChannel('general')
     await channelPageSecond.clickOnOpenChannelDetails()
     await channelPageSecond.checkIfUserIsAdded(data.lastName + ' ' + data.firstName, false)
+    await page2.close()
   })

   test('Check if we can create new public channel tests and check if the new user have can be added through preview', async ({
@@ -400,5 +407,6 @@ test.describe('channel tests', () => {
     await channelPage.clickChannel(data.channelName)
     await channelPage.clickOnOpenChannelDetails()
     await channelPage.addMemberToChannelPreview(newUser2.lastName + ' ' + newUser2.firstName)
+    await page2.close()
   })
 })
@@ -125,6 +125,7 @@ test.describe('Inbox tests', () => {
     await leftSideMenuPageSecond.clickTracker()
     await leftSideMenuPageSecond.clickNotification()
     await inboxPageSecond.checkIfTaskIsPresentInInbox(newIssue.title)
+    await page2.close()
   })

   test('User is able to assign someone else and he should be able to open the task', async ({ page, browser }) => {
@@ -158,6 +159,7 @@ test.describe('Inbox tests', () => {
       milestone: 'Milestone',
       estimation: '2h'
     })
+    await page2.close()
   })
   test.skip('User is able to create a task, assign a other user and close it from inbox', async ({ page, browser }) => {
     await leftSideMenuPage.openProfileMenu()
@@ -192,6 +194,7 @@ test.describe('Inbox tests', () => {
     })
     await inboxPage.clickCloseLeftSidePanel()
     // ADD ASSERT ONCE THE ISSUE IS FIXED
+    await page2.close()
   })

   test('User is able to send message to other user and he should see it in inbox', async ({ page, browser }) => {
@@ -221,6 +224,7 @@ test.describe('Inbox tests', () => {
     await inboxPageSecond.checkIfInboxChatExists('Channel general', true)
     await inboxPageSecond.clickOnInboxChat('Channel general')
     await inboxPageSecond.checkIfTextInChatIsPresent('Test message')
+    await page2.close()
   })

   test('User is able to turn off notification and he should not receive messages to inbox', async ({
@@ -254,6 +258,7 @@ test.describe('Inbox tests', () => {
     await channelPage.checkMessageExist('Test message', true, 'Test message')
     await leftSideMenuPageSecond.clickNotification()
     await inboxPageSecond.checkIfInboxChatExists('Channel general', false)
+    await page2.close()
   })

   test('User is able to change filter in inbox', async ({ page, browser }) => {
@@ -290,5 +295,6 @@ test.describe('Inbox tests', () => {
     await inboxPageSecond.clickOnInboxFilter('Issues')
     await inboxPageSecond.checkIfIssueIsPresentInInbox(newIssue.title)
     await inboxPageSecond.checkIfInboxChatExists('Channel general', false)
+    await page2.close()
   })
 })
@@ -154,6 +154,7 @@ test.describe('Workspace tests', () => {

     const leftSideMenuPage2 = new LeftSideMenuPage(page2)
     await leftSideMenuPage2.clickTracker()
+    await page2.close()
   })

   test('Create a workspace with join link - existing account', async ({ page, browser }) => {
@@ -198,6 +199,7 @@ test.describe('Workspace tests', () => {

     const leftSideMenuPage2 = new LeftSideMenuPage(page2)
     await leftSideMenuPage2.clickTracker()
+    await page2.close()
   })

   test('Create workspace with LastToken in the localStorage', async ({ page, browser }) => {
@@ -225,6 +227,7 @@ test.describe('Workspace tests', () => {
     // Use the tracker in the second context
     const leftSideMenuPageSecond = new LeftSideMenuPage(pageSecond)
     await leftSideMenuPageSecond.clickTracker()
+    await pageSecond.close()
   })
 })
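
Each multi-user test now closes its second page explicitly. Because the close call sits at the end of the test body, an assertion failure earlier in the test would still leak the page; wrapping the body in try/finally (or moving the close into a fixture or afterEach) would make the cleanup unconditional. A sketch under that assumption, using page objects from these suites:

const page2 = await browser.newPage()
try {
  const leftSideMenuPage2 = new LeftSideMenuPage(page2)
  await leftSideMenuPage2.clickTracker()
  // ...assertions that may throw...
} finally {
  await page2.close()
}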