From b06f433babd8c5dc89aa4e1f63599d6b1b0ff2a3 Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Tue, 17 Sep 2024 02:25:31 +0700 Subject: [PATCH] UBERF-8122: Fix backup service Signed-off-by: Andrey Sobolev --- .vscode/launch.json | 7 +- dev/tool/package.json | 3 +- dev/tool/src/clean.ts | 29 +++--- dev/tool/src/storage.ts | 125 ++++++++++++----------- packages/core/src/server.ts | 2 +- packages/storage/src/index.ts | 26 ++--- pods/server/Dockerfile | 2 - server/backup/src/backup.ts | 69 ++++++------- server/core/src/__tests__/memAdapters.ts | 5 +- server/core/src/adapter.ts | 2 +- server/core/src/mem.ts | 2 +- server/core/src/server/aggregator.ts | 62 +++++------ server/core/src/storage.ts | 14 +-- server/minio/src/index.ts | 14 +-- server/mongo/src/rawAdapter.ts | 12 ++- server/mongo/src/storage.ts | 82 ++++++++------- server/mongo/src/utils.ts | 21 +--- server/postgres/src/storage.ts | 79 +++++++------- server/s3/src/index.ts | 11 +- server/server-storage/src/blobStorage.ts | 4 +- 20 files changed, 287 insertions(+), 284 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 2e571562ed..ac8ccac484 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -221,7 +221,7 @@ "name": "Debug backup tool", "type": "node", "request": "launch", - "args": ["src/index.ts", "backup", "../../../dump/alex-staff-agency2", "alex-staff-agency"], + "args": ["src/index.ts", "backup", "../../../dump/platform2", "platform"], "env": { "MINIO_ACCESS_KEY": "minioadmin", "MINIO_SECRET_KEY": "minioadmin", @@ -234,7 +234,10 @@ "runtimeArgs": ["--nolazy", "-r", "ts-node/register"], "sourceMaps": true, "cwd": "${workspaceRoot}/dev/tool", - "protocol": "inspector" + "protocol": "inspector", + "outputCapture": "std", + "runtimeVersion": "20", + "showAsyncStacks": true, }, { "name": "Debug tool upgrade", diff --git a/dev/tool/package.json b/dev/tool/package.json index 476c331442..bd62efdc73 100644 --- a/dev/tool/package.json +++ b/dev/tool/package.json @@ -14,12 +14,13 @@ "_phase:bundle": "rushx bundle", "_phase:docker-build": "rushx docker:build", "_phase:docker-staging": "rushx docker:staging", - "bundle": "mkdir -p bundle && esbuild src/__start.ts --bundle --minify --platform=node --define:process.env.MODEL_VERSION=$(node ../../common/scripts/show_version.js) --define:process.env.GIT_REVISION=$(../../common/scripts/git_version.sh) > bundle/bundle.js", + "bundle": "mkdir -p bundle && esbuild src/__start.ts --bundle --keep-names --sourcemap=external --platform=node --define:process.env.MODEL_VERSION=$(node ../../common/scripts/show_version.js) --define:process.env.GIT_REVISION=$(../../common/scripts/git_version.sh) --log-level=error --outfile=bundle/bundle.js", "docker:build": "../../common/scripts/docker_build.sh hardcoreeng/tool", "docker:tbuild": "docker build -t hardcoreeng/tool . 
--platform=linux/amd64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/tool", "docker:staging": "../../common/scripts/docker_tag.sh hardcoreeng/tool staging", "docker:push": "../../common/scripts/docker_tag.sh hardcoreeng/tool", "run-local": "rush bundle --to @hcengineering/tool >/dev/null && cross-env SERVER_SECRET=secret ACCOUNTS_URL=http://localhost:3000 TRANSACTOR_URL=ws://localhost:3333 MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost MONGO_URL=mongodb://localhost:27017 TELEGRAM_DATABASE=telegram-service ELASTIC_URL=http://localhost:9200 REKONI_URL=http://localhost:4004 MODEL_VERSION=$(node ../../common/scripts/show_version.js) GIT_REVISION=$(git describe --all --long) node --max-old-space-size=18000 ./bundle/bundle.js", + "run-local-brk": "rush bundle --to @hcengineering/tool >/dev/null && cross-env SERVER_SECRET=secret ACCOUNTS_URL=http://localhost:3000 TRANSACTOR_URL=ws://localhost:3333 MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost MONGO_URL=mongodb://localhost:27017 TELEGRAM_DATABASE=telegram-service ELASTIC_URL=http://localhost:9200 REKONI_URL=http://localhost:4004 MODEL_VERSION=$(node ../../common/scripts/show_version.js) GIT_REVISION=$(git describe --all --long) node --inspect-brk --enable-source-maps --max-old-space-size=18000 ./bundle/bundle.js", "run": "rush bundle --to @hcengineering/tool >/dev/null && cross-env node --max-old-space-size=8000 ./bundle/bundle.js", "upgrade": "rushx run-local upgrade", "format": "format src", diff --git a/dev/tool/src/clean.ts b/dev/tool/src/clean.ts index 612dcb0a92..cd199eec1e 100644 --- a/dev/tool/src/clean.ts +++ b/dev/tool/src/clean.ts @@ -104,12 +104,15 @@ export async function cleanWorkspace ( const minioList = await storageAdapter.listStream(ctx, workspaceId) const toClean: string[] = [] while (true) { - const mv = await minioList.next() - if (mv === undefined) { + const mvFiles = await minioList.next() + if (mvFiles.length === 0) { break } - if (!files.has(mv._id)) { - toClean.push(mv._id) + + for (const mv of mvFiles) { + if (!files.has(mv._id)) { + toClean.push(mv._id) + } } } await storageAdapter.remove(ctx, workspaceId, toClean) @@ -192,16 +195,18 @@ export async function fixMinioBW ( const list = await storageService.listStream(ctx, workspaceId) let removed = 0 while (true) { - const obj = await list.next() - if (obj === undefined) { + const objs = await list.next() + if (objs.length === 0) { break } - if (obj.modifiedOn < from) continue - if ((obj._id as string).includes('%preview%')) { - await storageService.remove(ctx, workspaceId, [obj._id]) - removed++ - if (removed % 100 === 0) { - console.log('removed: ', removed) + for (const obj of objs) { + if (obj.modifiedOn < from) continue + if ((obj._id as string).includes('%preview%')) { + await storageService.remove(ctx, workspaceId, [obj._id]) + removed++ + if (removed % 100 === 0) { + console.log('removed: ', removed) + } } } } diff --git a/dev/tool/src/storage.ts b/dev/tool/src/storage.ts index 1dc0f55d96..e5a305a910 100644 --- a/dev/tool/src/storage.ts +++ b/dev/tool/src/storage.ts @@ -40,20 +40,22 @@ export async function syncFiles ( const iterator = await adapter.listStream(ctx, workspaceId) try { while (true) { - const data = await iterator.next() - if (data === undefined) break + const dataBulk = await iterator.next() + if (dataBulk.length === 0) break - const blob = await exAdapter.stat(ctx, workspaceId, data._id) - if (blob !== undefined) continue + for (const data of dataBulk) { + const blob = 
await exAdapter.stat(ctx, workspaceId, data._id) + if (blob !== undefined) continue - await exAdapter.syncBlobFromStorage(ctx, workspaceId, data._id, name) + await exAdapter.syncBlobFromStorage(ctx, workspaceId, data._id, name) - count += 1 - if (count % 100 === 0) { - const duration = Date.now() - time - time = Date.now() + count += 1 + if (count % 100 === 0) { + const duration = Date.now() - time + time = Date.now() - console.log('...processed', count, Math.round(duration / 1000) + 's') + console.log('...processed', count, Math.round(duration / 1000) + 's') + } } } console.log('processed', count) @@ -112,64 +114,67 @@ async function processAdapter ( const iterator = await source.listStream(ctx, workspaceId) try { while (true) { - const data = await iterator.next() - if (data === undefined) break + const dataBulk = await iterator.next() + if (dataBulk.length === 0) break - const blob = (await exAdapter.stat(ctx, workspaceId, data._id)) ?? (await source.stat(ctx, workspaceId, data._id)) + for (const data of dataBulk) { + const blob = + (await exAdapter.stat(ctx, workspaceId, data._id)) ?? (await source.stat(ctx, workspaceId, data._id)) - if (blob === undefined) { - console.error('blob not found', data._id) - continue - } - - if (blob.provider !== exAdapter.defaultAdapter) { - if (blob.size <= params.blobSizeLimitMb * 1024 * 1024) { - await rateLimiter.exec(async () => { - try { - await retryOnFailure( - ctx, - 5, - async () => { - await processFile(ctx, source, params.move ? exAdapter : target, workspaceId, blob) - }, - 50 - ) - movedCnt += 1 - movedBytes += blob.size - batchBytes += blob.size - } catch (err) { - console.error('failed to process blob', data._id, err) - } - }) - } else { - skippedCnt += 1 - console.log('skipping large blob', data._id, Math.round(blob.size / 1024 / 1024)) + if (blob === undefined) { + console.error('blob not found', data._id) + continue } - } - processedCnt += 1 - processedBytes += blob.size + if (blob.provider !== exAdapter.defaultAdapter) { + if (blob.size <= params.blobSizeLimitMb * 1024 * 1024) { + await rateLimiter.exec(async () => { + try { + await retryOnFailure( + ctx, + 5, + async () => { + await processFile(ctx, source, params.move ? 
exAdapter : target, workspaceId, blob) + }, + 50 + ) + movedCnt += 1 + movedBytes += blob.size + batchBytes += blob.size + } catch (err) { + console.error('failed to process blob', data._id, err) + } + }) + } else { + skippedCnt += 1 + console.log('skipping large blob', data._id, Math.round(blob.size / 1024 / 1024)) + } + } - if (processedCnt % 100 === 0) { - await rateLimiter.waitProcessing() + processedCnt += 1 + processedBytes += blob.size - const duration = Date.now() - time + if (processedCnt % 100 === 0) { + await rateLimiter.waitProcessing() - console.log( - '...processed', - processedCnt, - Math.round(processedBytes / 1024 / 1024) + 'MB', - 'moved', - movedCnt, - Math.round(movedBytes / 1024 / 1024) + 'MB', - '+' + Math.round(batchBytes / 1024 / 1024) + 'MB', - 'skipped', - skippedCnt, - Math.round(duration / 1000) + 's' - ) + const duration = Date.now() - time - batchBytes = 0 - time = Date.now() + console.log( + '...processed', + processedCnt, + Math.round(processedBytes / 1024 / 1024) + 'MB', + 'moved', + movedCnt, + Math.round(movedBytes / 1024 / 1024) + 'MB', + '+' + Math.round(batchBytes / 1024 / 1024) + 'MB', + 'skipped', + skippedCnt, + Math.round(duration / 1000) + 's' + ) + + batchBytes = 0 + time = Date.now() + } } } diff --git a/packages/core/src/server.ts b/packages/core/src/server.ts index b6c86d422f..8ae4596a6e 100644 --- a/packages/core/src/server.ts +++ b/packages/core/src/server.ts @@ -31,7 +31,7 @@ export interface DocInfo { * @public */ export interface StorageIterator { - next: (ctx: MeasureContext) => Promise + next: (ctx: MeasureContext) => Promise close: (ctx: MeasureContext) => Promise } diff --git a/packages/storage/src/index.ts b/packages/storage/src/index.ts index 95a0e97cc0..5a1a850d42 100644 --- a/packages/storage/src/index.ts +++ b/packages/storage/src/index.ts @@ -24,7 +24,7 @@ export interface UploadedObjectInfo { } export interface BlobStorageIterator { - next: () => Promise + next: () => Promise close: () => Promise } @@ -99,7 +99,7 @@ export class DummyStorageAdapter implements StorageAdapter, StorageAdapterEx { find (ctx: MeasureContext, workspaceId: WorkspaceId): StorageIterator { return { - next: async (ctx) => undefined, + next: async (ctx) => [], close: async (ctx) => {} } } @@ -120,8 +120,8 @@ export class DummyStorageAdapter implements StorageAdapter, StorageAdapterEx { async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise { return { - next: async (): Promise => { - return undefined + next: async (): Promise => { + return [] }, close: async () => {} } @@ -179,14 +179,16 @@ export async function removeAllObjects ( const iterator = await storage.listStream(ctx, workspaceId) let bulk: string[] = [] while (true) { - const obj = await iterator.next() - if (obj === undefined) { + const objs = await iterator.next() + if (objs.length === 0) { break } - bulk.push(obj.storageId) - if (bulk.length > 50) { - await storage.remove(ctx, workspaceId, bulk) - bulk = [] + for (const obj of objs) { + bulk.push(obj.storageId) + if (bulk.length > 50) { + await storage.remove(ctx, workspaceId, bulk) + bulk = [] + } } } if (bulk.length > 0) { @@ -206,10 +208,10 @@ export async function objectsToArray ( const bulk: ListBlobResult[] = [] while (true) { const obj = await iterator.next() - if (obj === undefined) { + if (obj.length === 0) { break } - bulk.push(obj) + bulk.push(...obj) } await iterator.close() return bulk diff --git a/pods/server/Dockerfile b/pods/server/Dockerfile index 555b5c694d..938ff0edc2 100644 --- a/pods/server/Dockerfile 
+++ b/pods/server/Dockerfile @@ -2,7 +2,6 @@ FROM node:20 WORKDIR /usr/src/app RUN npm install --ignore-scripts=false --verbose bufferutil utf-8-validate @mongodb-js/zstd snappy msgpackr msgpackr-extract --unsafe-perm -RUN npm install --ignore-scripts=false --verbose uNetworking/uWebSockets.js#v20.47.0 RUN apt-get update RUN apt-get install libjemalloc2 @@ -10,7 +9,6 @@ RUN apt-get install libjemalloc2 ENV LD_PRELOAD=libjemalloc.so.2 ENV MALLOC_CONF=dirty_decay_ms:1000,narenas:2,background_thread:true -RUN mv node_modules/uWebSockets.js/*.node . COPY bundle/bundle.js ./ COPY bundle/bundle.js.map ./ diff --git a/server/backup/src/backup.ts b/server/backup/src/backup.ts index d92f368002..f464372c88 100644 --- a/server/backup/src/backup.ts +++ b/server/backup/src/backup.ts @@ -41,7 +41,6 @@ import { BlobClient, createClient } from '@hcengineering/server-client' import { fullTextPushStagePrefix, type StorageAdapter } from '@hcengineering/server-core' import { generateToken } from '@hcengineering/server-token' import { connect } from '@hcengineering/server-tool' -import { mkdtemp, writeFile } from 'node:fs/promises' import { PassThrough } from 'node:stream' import { createGzip } from 'node:zlib' import { join } from 'path' @@ -488,6 +487,16 @@ async function cleanDomain (ctx: MeasureContext, connection: CoreClient & Backup } } +function doTrimHash (s: string | undefined): string { + if (s == null) { + return '' + } + if (s.startsWith('"') && s.endsWith('"')) { + return s.slice(1, s.length - 1) + } + return s +} + /** * @public */ @@ -526,11 +535,15 @@ export async function backup ( let canceled = false let timer: any + let ops = 0 if (options.timeout > 0) { - timer = setTimeout(() => { - ctx.error('Timeout during backup', { workspace: workspaceId.name, timeout: options.timeout / 1000 }) - canceled = true + timer = setInterval(() => { + if (ops === 0) { + ctx.error('Timeout during backup', { workspace: workspaceId.name, timeout: options.timeout / 1000 }) + ops = 0 + canceled = true + } }, options.timeout) } @@ -545,8 +558,6 @@ export async function backup ( const blobClient = new BlobClient(transactorUrl, token, workspaceId, { storageAdapter: options.storageAdapter }) ctx.info('starting backup', { workspace: workspaceId.name }) - let tmpDir: string | undefined - try { const domains = [ ...connection @@ -613,6 +624,7 @@ export async function backup ( if (size == null || Number.isNaN(size)) { return } + ops++ downloaded += size const newDownloadedMb = Math.round(downloaded / (1024 * 1024)) const newId = Math.round(newDownloadedMb / 10) @@ -641,6 +653,7 @@ export async function backup ( try { const currentChunk = await ctx.with('loadChunk', {}, () => connection.loadChunk(domain, idx, options.recheck)) idx = currentChunk.idx + ops++ let needRetrieve: Ref[] = [] let currentNeedRetrieveSize = 0 @@ -656,17 +669,18 @@ export async function backup ( }) st = Date.now() } - const kHash = digest.get(id as Ref) + const _hash = doTrimHash(hash) + const kHash = doTrimHash(digest.get(id as Ref)) if (kHash !== undefined) { digest.delete(id as Ref) - if (kHash !== hash) { - changes.updated.set(id as Ref, hash) + if (kHash !== _hash) { + changes.updated.set(id as Ref, _hash) needRetrieve.push(id as Ref) currentNeedRetrieveSize += size changed++ } } else { - changes.added.set(id as Ref, hash) + changes.added.set(id as Ref, _hash) needRetrieve.push(id as Ref) changed++ currentNeedRetrieveSize += size @@ -728,19 +742,13 @@ export async function backup ( } // Cumulative digest - const digest = await ctx.with( - 
'load-digest', - {}, - async (ctx) => await loadDigest(ctx, storage, backupInfo.snapshots, domain) - ) + const digest = await ctx.with('load-digest', {}, (ctx) => loadDigest(ctx, storage, backupInfo.snapshots, domain)) let _pack: Pack | undefined let addedDocuments = 0 - let { changed, needRetrieveChunks } = await ctx.with( - 'load-chunks', - { domain }, - async (ctx) => await loadChangesFromServer(ctx, domain, digest, changes) + let { changed, needRetrieveChunks } = await ctx.with('load-chunks', { domain }, (ctx) => + loadChangesFromServer(ctx, domain, digest, changes) ) if (needRetrieveChunks.length > 0) { @@ -761,6 +769,7 @@ export async function backup ( let docs: Doc[] = [] try { docs = await ctx.with('load-docs', {}, async (ctx) => await connection.loadDocs(domain, needRetrieve)) + ops++ } catch (err: any) { ctx.error('error loading docs', { domain, err, workspace: workspaceId.name }) // Put back. @@ -876,16 +885,12 @@ export async function backup ( const finalBuffer = Buffer.concat(buffers) if (finalBuffer.length !== blob.size) { - tmpDir = tmpDir ?? (await mkdtemp('backup', {})) - const tmpFile = join(tmpDir, blob._id) - await writeFile(tmpFile, finalBuffer) - await writeFile(tmpFile + '.json', JSON.stringify(blob, undefined, 2)) ctx.error('download blob size mismatch', { _id: blob._id, contentType: blob.contentType, size: blob.size, - provider: blob.provider, - tempDir: tmpDir + bufferSize: finalBuffer.length, + provider: blob.provider }) } _pack.entry({ name: d._id + '.json' }, descrJson, (err) => { @@ -975,7 +980,7 @@ export async function backup ( } ctx.end() if (options.timeout !== -1) { - clearTimeout(timer) + clearInterval(timer) } } } @@ -1200,22 +1205,12 @@ export async function restore ( workspace: workspaceId.name }) - const doTrim = (s: string | undefined): string | undefined => { - if (s == null) { - return s - } - if (s.startsWith('"') && s.endsWith('"')) { - return s.slice(1, s.length - 1) - } - return s - } - // Let's find difference const docsToAdd = new Map( Array.from(changeset.entries()).filter( ([it]) => !serverChangeset.has(it) || - (serverChangeset.has(it) && doTrim(serverChangeset.get(it)) !== doTrim(changeset.get(it))) + (serverChangeset.has(it) && doTrimHash(serverChangeset.get(it)) !== doTrimHash(changeset.get(it))) ) ) const docsToRemove = Array.from(serverChangeset.keys()).filter((it) => !changeset.has(it)) diff --git a/server/core/src/__tests__/memAdapters.ts b/server/core/src/__tests__/memAdapters.ts index a3109c6a59..02060086e2 100644 --- a/server/core/src/__tests__/memAdapters.ts +++ b/server/core/src/__tests__/memAdapters.ts @@ -56,7 +56,7 @@ export class MemStorageAdapter implements StorageAdapter { const files = Array.from(this.files.values()).filter((it) => it.workspace === workspaceId.name) return { next: async () => { - return files.shift() + return files.splice(0, 100) }, close: async () => {} } @@ -189,8 +189,7 @@ export class MemRawDBAdapter implements RawDBAdapter { } return { next: async () => { - const doc = result.shift() - return doc + return result.splice(0, 50) }, close: async () => {} } diff --git a/server/core/src/adapter.ts b/server/core/src/adapter.ts index 2c14a685dd..5d25a954b6 100644 --- a/server/core/src/adapter.ts +++ b/server/core/src/adapter.ts @@ -57,7 +57,7 @@ export interface DomainHelper { } export interface RawDBAdapterStream { - next: () => Promise + next: () => Promise close: () => Promise } diff --git a/server/core/src/mem.ts b/server/core/src/mem.ts index 60fa7c117a..8b615044d6 100644 --- a/server/core/src/mem.ts 
+++ b/server/core/src/mem.ts @@ -86,7 +86,7 @@ export class DummyDbAdapter implements DbAdapter { find (ctx: MeasureContext, domain: Domain): StorageIterator { return { - next: async () => undefined, + next: async () => [], close: async () => {} } } diff --git a/server/core/src/server/aggregator.ts b/server/core/src/server/aggregator.ts index d000c7a2b0..e176aeb6b0 100644 --- a/server/core/src/server/aggregator.ts +++ b/server/core/src/server/aggregator.ts @@ -99,31 +99,20 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE find (ctx: MeasureContext, workspaceId: WorkspaceId): StorageIterator { const storageIterator = this.makeStorageIterator(ctx, workspaceId) - let buffer: ListBlobResult[] = [] - return { - next: async (ctx) => { - const docInfo = await storageIterator.next() - if (docInfo !== undefined) { - buffer.push(docInfo) + next: async () => { + const docInfos = await storageIterator.next() + if (docInfos.length > 0) { + await this.doSyncDocs(ctx, workspaceId, docInfos) } - if (buffer.length > 50) { - await this.doSyncDocs(ctx, workspaceId, buffer) - buffer = [] - } - if (docInfo !== undefined) { - return { - hash: docInfo.etag, - id: docInfo._id, - size: docInfo.size - } - } + return docInfos.map((it) => ({ + hash: it.etag, + id: it._id, + size: it.size + })) }, close: async (ctx) => { - if (buffer.length > 0) { - await this.doSyncDocs(ctx, workspaceId, buffer) - } await storageIterator.close() } } @@ -134,22 +123,21 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE let iterator: BlobStorageIterator | undefined return { next: async () => { - while (true) { - if (iterator === undefined && adapters.length > 0) { - iterator = await (adapters.shift() as StorageAdapter).listStream(ctx, workspaceId) - } - if (iterator === undefined) { - return undefined - } - const docInfo = await iterator.next() - if (docInfo !== undefined) { - // We need to check if our stored version is fine - return docInfo - } else { - // We need to take next adapter - await iterator.close() - iterator = undefined - } + if (iterator === undefined && adapters.length > 0) { + iterator = await (adapters.shift() as StorageAdapter).listStream(ctx, workspaceId) + } + if (iterator === undefined) { + return [] + } + const docInfos = await iterator.next() + if (docInfos.length > 0) { + // We need to check if our stored version is fine + return docInfos + } else { + // We need to take next adapter + await iterator.close() + iterator = undefined + return [] } }, close: async () => { @@ -227,7 +215,7 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise { const data = await this.dbAdapter.findStream(ctx, workspaceId, DOMAIN_BLOB, {}) return { - next: async (): Promise => { + next: async (): Promise => { return await data.next() }, close: async () => { diff --git a/server/core/src/storage.ts b/server/core/src/storage.ts index e252bd4bd9..8d436aa733 100644 --- a/server/core/src/storage.ts +++ b/server/core/src/storage.ts @@ -9,6 +9,7 @@ import { type StorageIterator, type WorkspaceId } from '@hcengineering/core' +import { estimateDocSize } from './utils' export * from '@hcengineering/storage' @@ -19,7 +20,7 @@ export function getBucketId (workspaceId: WorkspaceId): string { return toWorkspaceString(workspaceId) } -const chunkSize = 2 * 1024 * 1024 +const chunkSize = 512 * 1024 /** * @public @@ -70,14 +71,15 @@ export class BackupClientOps { const docs: DocInfo[] 
= [] while (size < chunkSize) { - const doc = await chunk.iterator.next(ctx) - if (doc === undefined) { + const _docs = await chunk.iterator.next(ctx) + if (_docs.length === 0) { chunk.finished = true break } - - size += doc.size - docs.push(doc) + for (const doc of _docs) { + size += estimateDocSize(doc) + docs.push(doc) + } } return { diff --git a/server/minio/src/index.ts b/server/minio/src/index.ts index f0987da53f..1a56fba05c 100644 --- a/server/minio/src/index.ts +++ b/server/minio/src/index.ts @@ -192,7 +192,7 @@ export class MinioService implements StorageAdapter { const rootPrefix = this.rootPrefix(workspaceId) return { - next: async (): Promise => { + next: async (): Promise => { try { if (stream === undefined && !done) { const rprefix = rootPrefix ?? '' @@ -227,7 +227,7 @@ export class MinioService implements StorageAdapter { }) } onNext() - if (buffer.length > 5) { + if (buffer.length > 100) { stream?.pause() } }) @@ -236,24 +236,24 @@ export class MinioService implements StorageAdapter { const msg = (err?.message as string) ?? '' if (msg.includes('Invalid bucket name') || msg.includes('The specified bucket does not exist')) { hasMore = false - return + return [] } error = err } if (buffer.length > 0) { - return buffer.shift() + return buffer.splice(0, 50) } if (!hasMore) { - return undefined + return [] } - return await new Promise((resolve, reject) => { + return await new Promise((resolve, reject) => { onNext = () => { if (error != null) { reject(error) } onNext = () => {} - resolve(buffer.shift()) + resolve(buffer.splice(0, 50)) } stream?.resume() }) diff --git a/server/mongo/src/rawAdapter.ts b/server/mongo/src/rawAdapter.ts index e1b0a1135d..f677e4eda5 100644 --- a/server/mongo/src/rawAdapter.ts +++ b/server/mongo/src/rawAdapter.ts @@ -105,7 +105,17 @@ export function createRawMongoDBAdapter (url: string): RawDBAdapter { const { cursor } = await getCursor(workspace, domain, query, options) return { - next: async () => (await cursor.next()) ?? 
undefined, + next: async () => { + const result: T[] = [] + const doc = await cursor.next() + if (doc != null) { + result.push(doc) + } + if (cursor.bufferedCount() > 0) { + result.push(...cursor.readBufferedDocuments()) + } + return result + }, close: async () => { await cursor.close() } diff --git a/server/mongo/src/storage.ts b/server/mongo/src/storage.ts index 6e5a2d733c..48b982d49b 100644 --- a/server/mongo/src/storage.ts +++ b/server/mongo/src/storage.ts @@ -16,7 +16,6 @@ import core, { DOMAIN_MODEL, DOMAIN_TX, - type Iterator, SortingOrder, TxProcessor, addOperation, @@ -30,6 +29,7 @@ import core, { type AttachedDoc, type Class, type Doc, + type DocInfo, type DocumentQuery, type DocumentUpdate, type Domain, @@ -38,6 +38,7 @@ import core, { type FindOptions, type FindResult, type Hierarchy, + type Iterator, type Lookup, type MeasureContext, type Mixin, @@ -135,7 +136,7 @@ export async function toArray (cursor: AbstractCursor): Promise { } export interface DbAdapterOptions { - calculateHash?: (doc: Doc) => string + calculateHash?: (doc: Doc) => { digest: string, size: number } } abstract class MongoAdapterBase implements DbAdapter { @@ -1034,44 +1035,17 @@ abstract class MongoAdapterBase implements DbAdapter { iterator = coll.find({ '%hash%': { $in: ['', null] } }) d = await ctx.with('next', { mode }, async () => await iterator.next()) } - if (d == null) { - return undefined + const result: DocInfo[] = [] + if (d != null) { + result.push(this.toDocInfo(d, bulkUpdate)) } - let digest: string | null = (d as any)['%hash%'] - if ('%hash%' in d) { - delete d['%hash%'] - } - const pos = (digest ?? '').indexOf('|') - if (digest == null || digest === '') { - const cs = ctx.newChild('calc-size', {}) - const size = estimateDocSize(d) - cs.end() - - if (this.options?.calculateHash !== undefined) { - digest = this.options.calculateHash(d) - } else { - const hash = createHash('sha256') - updateHashForDoc(hash, d) - digest = hash.digest('base64') - } - - bulkUpdate.set(d._id, `${digest}|${size.toString(16)}`) - - await ctx.with('flush', {}, async () => { - await flush() - }) - return { - id: d._id, - hash: digest, - size - } - } else { - return { - id: d._id, - hash: digest.slice(0, pos), - size: parseInt(digest.slice(pos + 1), 16) - } + if (iterator.bufferedCount() > 0) { + result.push(...iterator.readBufferedDocuments().map((it) => this.toDocInfo(it, bulkUpdate))) } + await ctx.with('flush', {}, async () => { + await flush() + }) + return result }, close: async () => { await ctx.with('flush', {}, async () => { @@ -1085,6 +1059,38 @@ abstract class MongoAdapterBase implements DbAdapter { } } + private toDocInfo (d: Doc, bulkUpdate: Map, string>): DocInfo { + let digest: string | null = (d as any)['%hash%'] + if ('%hash%' in d) { + delete d['%hash%'] + } + const pos = (digest ?? 
'').indexOf('|') + if (digest == null || digest === '') { + let size = estimateDocSize(d) + + if (this.options?.calculateHash !== undefined) { + ;({ digest, size } = this.options.calculateHash(d)) + } else { + const hash = createHash('sha256') + updateHashForDoc(hash, d) + digest = hash.digest('base64') + } + + bulkUpdate.set(d._id, `${digest}|${size.toString(16)}`) + return { + id: d._id, + hash: digest, + size + } + } else { + return { + id: d._id, + hash: digest.slice(0, pos), + size: parseInt(digest.slice(pos + 1), 16) + } + } + } + async load (ctx: MeasureContext, domain: Domain, docs: Ref[]): Promise { return await ctx.with('load', { domain }, async () => { if (docs.length === 0) { diff --git a/server/mongo/src/utils.ts b/server/mongo/src/utils.ts index 8f555a3ff8..616e8dab30 100644 --- a/server/mongo/src/utils.ts +++ b/server/mongo/src/utils.ts @@ -23,7 +23,7 @@ import { } from '@hcengineering/core' import { PlatformError, unknownStatus } from '@hcengineering/platform' import { type DomainHelperOperations } from '@hcengineering/server-core' -import { MongoClient, type Collection, type Db, type Document, type MongoClientOptions } from 'mongodb' +import { MongoClient, type Collection, type Db, type Document } from 'mongodb' const connections = new Map() @@ -121,31 +121,20 @@ export class ClientRef implements MongoClientReference { * Initialize a workspace connection to DB * @public */ -export function getMongoClient (uri: string, options?: MongoClientOptions): MongoClientReference { +export function getMongoClient (uri: string): MongoClientReference { const extraOptions = JSON.parse(process.env.MONGO_OPTIONS ?? '{}') - const key = `${uri}${process.env.MONGO_OPTIONS ?? '{}'}_${JSON.stringify(options ?? {})}` + const key = `${uri}${process.env.MONGO_OPTIONS ?? '{}'}` let existing = connections.get(key) - const allOptions: MongoClientOptions = { - ...options, - ...extraOptions - } - - // Make poll size stable - if (allOptions.maxPoolSize !== undefined) { - allOptions.minPoolSize = allOptions.maxPoolSize - } - allOptions.monitorCommands = false - allOptions.noDelay = true - // If not created or closed if (existing === undefined) { existing = new MongoClientReferenceImpl( MongoClient.connect(uri, { + retryReads: true, appName: 'transactor', enableUtf8Validation: false, - ...allOptions + ...extraOptions }), () => { connections.delete(key) diff --git a/server/postgres/src/storage.ts b/server/postgres/src/storage.ts index 5cb5eeceb1..7f50259e79 100644 --- a/server/postgres/src/storage.ts +++ b/server/postgres/src/storage.ts @@ -18,6 +18,7 @@ import core, { type AttachedDoc, type Class, type Doc, + type DocInfo, type DocumentQuery, type DocumentUpdate, type Domain, @@ -941,12 +942,12 @@ abstract class PostgresAdapterBase implements DbAdapter { ) } - const next = async (): Promise => { - const result = await client.query(`FETCH 1 FROM ${cursorName}`) + const next = async (limit: number): Promise => { + const result = await client.query(`FETCH ${limit} FROM ${cursorName}`) if (result.rows.length === 0) { - return null + return [] } - return result.rows[0] !== undefined ? 
parseDoc(result.rows[0]) : null + return result.rows.filter((it) => it != null).map((it) => parseDoc(it)) } const flush = async (flush = false): Promise => { @@ -975,47 +976,51 @@ abstract class PostgresAdapterBase implements DbAdapter { await init('_id, data', "data ->> '%hash%' IS NOT NULL AND data ->> '%hash%' <> ''") initialized = true } - let d = await ctx.with('next', { mode }, async () => await next()) - if (d == null && mode === 'hashed') { + let docs = await ctx.with('next', { mode }, async () => await next(50)) + if (docs.length === 0 && mode === 'hashed') { await close(cursorName) mode = 'non_hashed' await init('*', "data ->> '%hash%' IS NULL OR data ->> '%hash%' = ''") - d = await ctx.with('next', { mode }, async () => await next()) + docs = await ctx.with('next', { mode }, async () => await next(50)) } - if (d == null) { - return undefined + if (docs.length === 0) { + return [] } - let digest: string | null = (d as any)['%hash%'] - if ('%hash%' in d) { - delete d['%hash%'] - } - const pos = (digest ?? '').indexOf('|') - if (digest == null || digest === '') { - const cs = ctx.newChild('calc-size', {}) - const size = estimateDocSize(d) - cs.end() - - const hash = createHash('sha256') - updateHashForDoc(hash, d) - digest = hash.digest('base64') - - bulkUpdate.set(d._id, `${digest}|${size.toString(16)}`) - - await ctx.with('flush', {}, async () => { - await flush() - }) - return { - id: d._id, - hash: digest, - size + const result: DocInfo[] = [] + for (const d of docs) { + let digest: string | null = (d as any)['%hash%'] + if ('%hash%' in d) { + delete d['%hash%'] } - } else { - return { - id: d._id, - hash: digest.slice(0, pos), - size: parseInt(digest.slice(pos + 1), 16) + const pos = (digest ?? '').indexOf('|') + if (digest == null || digest === '') { + const cs = ctx.newChild('calc-size', {}) + const size = estimateDocSize(d) + cs.end() + + const hash = createHash('sha256') + updateHashForDoc(hash, d) + digest = hash.digest('base64') + + bulkUpdate.set(d._id, `${digest}|${size.toString(16)}`) + + await ctx.with('flush', {}, async () => { + await flush() + }) + result.push({ + id: d._id, + hash: digest, + size + }) + } else { + result.push({ + id: d._id, + hash: digest.slice(0, pos), + size: parseInt(digest.slice(pos + 1), 16) + }) } } + return result }, close: async () => { await ctx.with('flush', {}, async () => { diff --git a/server/s3/src/index.ts b/server/s3/src/index.ts index cc2c435481..1314ebd98e 100644 --- a/server/s3/src/index.ts +++ b/server/s3/src/index.ts @@ -239,9 +239,9 @@ export class S3Service implements StorageAdapter { const rootPrefix = this.rootPrefix(workspaceId) return { - next: async (): Promise => { + next: async (): Promise => { try { - if (hasMore && buffer.length === 0) { + while (hasMore && buffer.length < 50) { const res = await this.client.listObjectsV2({ Bucket: this.getBucketId(workspaceId), Prefix: rootPrefix ?? 
'', @@ -271,12 +271,7 @@ export class S3Service implements StorageAdapter { } catch (err: any) { ctx.error('Failed to get list', { error: err, workspaceId: workspaceId.name }) } - if (buffer.length > 0) { - return buffer.shift() - } - if (!hasMore) { - return undefined - } + return buffer.splice(0, 50) }, close: async () => {} } diff --git a/server/server-storage/src/blobStorage.ts b/server/server-storage/src/blobStorage.ts index 5f9202d98e..a44d147ad0 100644 --- a/server/server-storage/src/blobStorage.ts +++ b/server/server-storage/src/blobStorage.ts @@ -170,9 +170,9 @@ export async function createStorageDataAdapter ( calculateHash: (d) => { const blob = d as Blob if (storageEx?.adapters !== undefined && storageEx.adapters.get(blob.provider) === undefined) { - return blob.etag + '_' + storageEx.defaultAdapter // Replace tag to be able to move to new provider + return { digest: blob.etag + '_' + storageEx.defaultAdapter, size: blob.size } } - return blob.etag + return { digest: blob.etag, size: blob.size } } }) return new StorageBlobAdapter(workspaceId, storage, ctx, blobAdapter)
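
The recurring change across these files is that storage and DB iterators now return batches: next() resolves to an array of results and an empty array signals completion, replacing the old one-item-or-undefined protocol. A minimal consumer sketch of that contract, using simplified stand-in types rather than the real @hcengineering imports (names here are illustrative only, not part of the patch):

    // Simplified stand-ins for the real ListBlobResult / BlobStorageIterator types.
    interface ListBlobResult {
      _id: string
      size: number
    }

    interface BlobStorageIterator {
      // After this patch, next() yields a batch; an empty array means the stream is done.
      next: () => Promise<ListBlobResult[]>
      close: () => Promise<void>
    }

    // Drains an iterator using the batched contract and returns all blob ids.
    async function collectBlobIds (iterator: BlobStorageIterator): Promise<string[]> {
      const ids: string[] = []
      try {
        while (true) {
          const batch = await iterator.next()
          if (batch.length === 0) break
          for (const blob of batch) {
            ids.push(blob._id)
          }
        }
      } finally {
        await iterator.close()
      }
      return ids
    }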