From 987b8a8068507c90d84ef61d3d1161ff1de135bb Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Thu, 11 Jul 2024 20:45:08 +0700 Subject: [PATCH] UBERF-7519: Rework backup service (#6050) Signed-off-by: Andrey Sobolev --- packages/core/src/client.ts | 8 +- packages/core/src/measurements/context.ts | 6 +- pods/backup/package.json | 2 + pods/backup/src/index.ts | 33 ++++- server/backup-service/src/config.ts | 25 +++- server/backup-service/src/index.ts | 19 ++- server/backup/src/backup.ts | 47 ++++--- server/backup/src/service.ts | 141 ++++++++++++++++--- server/core/src/pipeline.ts | 6 +- server/core/src/server/aggregator.ts | 4 +- server/core/src/server/index.ts | 6 +- server/core/src/storage.ts | 105 +++++++++++++- server/core/src/types.ts | 13 ++ server/server-pipeline/src/pipeline.ts | 19 +-- server/server-storage/src/blobStorage.ts | 5 +- server/server-storage/src/starter.ts | 41 +++--- server/server/src/backup.ts | 85 +++--------- server/tool/src/connect.ts | 159 +++++++++++++--------- server/ws/src/server.ts | 3 +- server/ws/src/server_http.ts | 10 +- server/ws/src/server_u.ts | 10 +- server/ws/src/types.ts | 18 +-- 22 files changed, 502 insertions(+), 263 deletions(-) diff --git a/packages/core/src/client.ts b/packages/core/src/client.ts index b2615e7cd6..fef9dbc5a3 100644 --- a/packages/core/src/client.ts +++ b/packages/core/src/client.ts @@ -167,7 +167,7 @@ class ClientImpl implements AccountClient, BackupClient { } } } catch (err) { - console.error('failed to apply model transaction, skipping', t) + // console.error('failed to apply model transaction, skipping', t) continue } } @@ -471,7 +471,11 @@ async function buildModel ( try { hierarchy.tx(tx) } catch (err: any) { - console.error('failed to apply model transaction, skipping', tx._id, tx._class, err?.message) + ctx.warn('failed to apply model transaction, skipping', { + _id: tx._id, + _class: tx._class, + message: err?.message + }) } } }) diff --git a/packages/core/src/measurements/context.ts b/packages/core/src/measurements/context.ts index 77cbe98b65..7ce37d48de 100644 --- a/packages/core/src/measurements/context.ts +++ b/packages/core/src/measurements/context.ts @@ -92,11 +92,9 @@ export class MeasureMetricsContext implements MeasureContext { if (value instanceof Promise) { value = await value } - c.end() return value - } catch (err: any) { - c.error('Error during:' + name, { err, ...(this.logParams ?? {}) }) - throw err + } finally { + c.end() } } diff --git a/pods/backup/package.json b/pods/backup/package.json index f22b253097..bfd166bf5f 100644 --- a/pods/backup/package.json +++ b/pods/backup/package.json @@ -49,6 +49,8 @@ "@hcengineering/platform": "^0.6.11", "@hcengineering/server-tool": "^0.6.0", "@hcengineering/server-token": "^0.6.11", + "@hcengineering/server-core": "^0.6.1", + "@hcengineering/server-pipeline": "^0.6.0", "@hcengineering/client": "^0.6.18", "@hcengineering/client-resources": "^0.6.27", "@hcengineering/core": "^0.6.32", diff --git a/pods/backup/src/index.ts b/pods/backup/src/index.ts index 6750032d63..a52fe1ad28 100644 --- a/pods/backup/src/index.ts +++ b/pods/backup/src/index.ts @@ -13,8 +13,37 @@ // limitations under the License. 
//
-import { MeasureMetricsContext } from '@hcengineering/core'
 import { startBackup } from '@hcengineering/backup-service'
+import { MeasureMetricsContext } from '@hcengineering/core'
+import { DummyDbAdapter, DummyFullTextAdapter, type PipelineFactory } from '@hcengineering/server-core'
+import { createServerPipeline } from '@hcengineering/server-pipeline'
 
 const ctx = new MeasureMetricsContext('backup-service', {})
 
-startBackup(ctx)
+startBackup(ctx, (mongoUrl, storageAdapter) => {
+  const factory: PipelineFactory = createServerPipeline(
+    ctx,
+    mongoUrl,
+    {
+      externalStorage: storageAdapter,
+      fullTextUrl: '',
+      indexParallel: 0,
+      indexProcessing: 0,
+      rekoniUrl: '',
+      usePassedCtx: true
+    },
+    {
+      adapters: {
+        FullTextBlob: {
+          factory: async () => new DummyDbAdapter(),
+          url: ''
+        }
+      },
+      fulltextAdapter: {
+        factory: async () => new DummyFullTextAdapter(),
+        stages: () => [],
+        url: ''
+      }
+    }
+  )
+  return factory
+})
diff --git a/server/backup-service/src/config.ts b/server/backup-service/src/config.ts
index 81b1955083..192bbbde4c 100644
--- a/server/backup-service/src/config.ts
+++ b/server/backup-service/src/config.ts
@@ -24,6 +24,10 @@ interface Config extends Omit<BackupConfig, 'Token'> {
   Interval: number // Timeout in seconds
   Timeout: number // Timeout in seconds
   BucketName: string
+  Storage: string // A bucket storage config
+  WorkspaceStorage: string // A workspace storage config
+
+  SkipWorkspaces: string
 
   MongoURL: string
 }
@@ -36,10 +40,22 @@ const envMap: { [key in keyof Config]: string } = {
   BucketName: 'BUCKET_NAME',
   Interval: 'INTERVAL',
   Timeout: 'TIMEOUT',
-  MongoURL: 'MONGO_URL'
+  MongoURL: 'MONGO_URL',
+  SkipWorkspaces: 'SKIP_WORKSPACES',
+  Storage: 'STORAGE',
+  WorkspaceStorage: 'WORKSPACE_STORAGE'
 }
 
-const required: Array<keyof Config> = ['TransactorURL', 'AccountsURL', 'Secret', 'ServiceID', 'BucketName', 'MongoURL']
+const required: Array<keyof Config> = [
+  'TransactorURL',
+  'AccountsURL',
+  'Secret',
+  'ServiceID',
+  'BucketName',
+  'MongoURL',
+  'Storage',
+  'WorkspaceStorage'
+]
 
 const config: Config = (() => {
   const params: Partial<Config> = {
     ServiceID: process.env[envMap.ServiceID] ?? 'backup-service',
     Interval: parseInt(process.env[envMap.Interval] ?? '3600'),
     Timeout: parseInt(process.env[envMap.Timeout] ?? '3600'),
-    MongoURL: process.env[envMap.MongoURL]
+    MongoURL: process.env[envMap.MongoURL],
+    SkipWorkspaces: process.env[envMap.SkipWorkspaces] ?? '',
+    WorkspaceStorage: process.env[envMap.WorkspaceStorage],
+    Storage: process.env[envMap.Storage]
   }
 
   const missingEnv = required.filter((key) => params[key] === undefined).map((key) => envMap[key])
diff --git a/server/backup-service/src/index.ts b/server/backup-service/src/index.ts
index 7bf123468e..8ea3d7ecdb 100644
--- a/server/backup-service/src/index.ts
+++ b/server/backup-service/src/index.ts
@@ -16,23 +16,32 @@
 import { MeasureContext, systemAccountEmail } from '@hcengineering/core'
 import { setMetadata } from '@hcengineering/platform'
 import { backupService } from '@hcengineering/server-backup'
-import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
+import { type PipelineFactory, type StorageAdapter } from '@hcengineering/server-core'
+import { buildStorageFromConfig, storageConfigFromEnv, createStorageFromConfig } from '@hcengineering/server-storage'
 import serverToken, { generateToken } from '@hcengineering/server-token'
 import toolPlugin from '@hcengineering/server-tool'
 import config from './config'
 
-export function startBackup (ctx: MeasureContext): void {
+export function startBackup (
+  ctx: MeasureContext,
+  pipelineFactoryFactory: (mongoUrl: string, storage: StorageAdapter) => PipelineFactory
+): void {
   setMetadata(serverToken.metadata.Secret, config.Secret)
 
-  const storageConfiguration = storageConfigFromEnv()
-  const storageAdapter = buildStorageFromConfig(storageConfiguration, config.MongoURL)
+  const backupStorageConfig = storageConfigFromEnv(config.Storage)
+  const workspaceStorageConfig = storageConfigFromEnv(config.WorkspaceStorage)
+
+  const storageAdapter = createStorageFromConfig(backupStorageConfig.storages[0])
+  const workspaceStorageAdapter = buildStorageFromConfig(workspaceStorageConfig, config.MongoURL)
 
   setMetadata(toolPlugin.metadata.UserAgent, config.ServiceID)
 
+  const pipelineFactory = pipelineFactoryFactory(config.MongoURL, workspaceStorageAdapter)
+
   // A token to access account service
   const token = generateToken(systemAccountEmail, { name: 'backup', productId: '' })
 
-  const shutdown = backupService(ctx, storageAdapter, { ...config, Token: token })
+  const shutdown = backupService(ctx, storageAdapter, { ...config, Token: token }, pipelineFactory)
   process.on('SIGINT', shutdown)
   process.on('SIGTERM', shutdown)
diff --git a/server/backup/src/backup.ts b/server/backup/src/backup.ts
index dfd6498dff..c451fe9b24 100644
--- a/server/backup/src/backup.ts
+++ b/server/backup/src/backup.ts
@@ -503,6 +503,8 @@ export async function backup (
     connectTimeout: number
     skipBlobContentTypes: string[]
     blobDownloadLimit: number
+    connection?: CoreClient & BackupClient
+    storageAdapter?: StorageAdapter
   } = {
     force: false,
     recheck: false,
@@ -529,18 +531,20 @@
       canceled = true
     }, options.timeout)
   }
-  const connection = (await connect(
-    transactorUrl,
-    workspaceId,
-    undefined,
-    {
-      mode: 'backup'
-    },
-    undefined,
-    options.connectTimeout
-  )) as unknown as CoreClient & BackupClient
+  const connection =
+    options.connection ??
+ ((await connect( + transactorUrl, + workspaceId, + undefined, + { + mode: 'backup' + }, + undefined, + options.connectTimeout + )) as unknown as CoreClient & BackupClient) - const blobClient = new BlobClient(transactorUrl, workspaceId) + const blobClient = new BlobClient(transactorUrl, workspaceId, { storageAdapter: options.storageAdapter }) ctx.info('starting backup', { workspace: workspaceId.name }) let tmpDir: string | undefined @@ -897,14 +901,15 @@ export async function backup ( ctx.error('error packing file', { err }) } }) - // if (blob.size > 1024 * 1024) { - ctx.info('download blob', { - _id: blob._id, - contentType: blob.contentType, - size: blob.size, - provider: blob.provider, - pending: docs.length - }) + if (blob.size > 1024 * 1024) { + ctx.info('download blob', { + _id: blob._id, + contentType: blob.contentType, + size: blob.size, + provider: blob.provider, + pending: docs.length + }) + } printDownloaded(blob._id, blob.size) } catch (err: any) { @@ -970,7 +975,9 @@ export async function backup ( ctx.error('backup error', { err, workspace: workspaceId.name }) } finally { ctx.info('end backup', { workspace: workspaceId.name }) - await connection.close() + if (options.connection === undefined) { + await connection.close() + } ctx.end() if (options.timeout !== -1) { clearTimeout(timer) diff --git a/server/backup/src/service.ts b/server/backup/src/service.ts index c8fd78a4bd..d0113e19c3 100644 --- a/server/backup/src/service.ts +++ b/server/backup/src/service.ts @@ -13,8 +13,22 @@ // limitations under the License. // -import { BaseWorkspaceInfo, getWorkspaceId, type MeasureContext } from '@hcengineering/core' -import { type StorageAdapter } from '@hcengineering/server-core' +import { + BaseWorkspaceInfo, + getWorkspaceId, + systemAccountEmail, + type BackupClient, + type Client, + type MeasureContext, + type WorkspaceIdWithUrl +} from '@hcengineering/core' +import { + BackupClientOps, + SessionContextImpl, + type Pipeline, + type PipelineFactory, + type StorageAdapter +} from '@hcengineering/server-core' import { backup } from '.' 
import { createStorageBackupStorage } from './storage'
@@ -43,12 +57,14 @@ export interface BackupConfig {
   Interval: number // Timeout in seconds
   Timeout: number // Timeout in seconds
   BucketName: string
+  SkipWorkspaces: string
 }
 
 class BackupWorker {
   constructor (
     readonly storageAdapter: StorageAdapter,
-    readonly config: BackupConfig
+    readonly config: BackupConfig,
+    readonly pipelineFactory: PipelineFactory
   ) {}
 
   canceled = false
@@ -59,29 +75,38 @@ class BackupWorker {
     clearTimeout(this.interval)
   }
 
+  backupPromise: Promise<void> | undefined
+
+  async triggerBackup (ctx: MeasureContext): Promise<void> {
+    const failed = await this.backup(ctx)
+    if (failed.length > 0) {
+      ctx.info('Failed to backup workspaces, Retry failed workspaces once.', { failed: failed.length })
+      await this.doBackup(ctx, failed)
+    }
+  }
+
   async schedule (ctx: MeasureContext): Promise<void> {
     console.log('schedule timeout for', this.config.Interval, ' seconds')
     this.interval = setTimeout(() => {
-      void this.backup(ctx).then((failed) => {
-        if (failed.length > 0) {
-          ctx.info('Failed to backup workspaces, Retry failed workspaces once.', { failed: failed.length })
-          void this.doBackup(ctx, failed).then(() => {
-            void this.schedule(ctx)
-          })
-        } else {
-          void this.schedule(ctx)
-        }
-      })
+      if (this.backupPromise !== undefined) {
+        void this.backupPromise.then(() => {
+          void this.triggerBackup(ctx)
+        })
+      }
+      void this.triggerBackup(ctx)
     }, this.config.Interval * 1000)
   }
 
   async backup (ctx: MeasureContext): Promise<BaseWorkspaceInfo[]> {
-    const workspaces = await getWorkspaces(this.config.AccountsURL, this.config.Token)
+    const workspacesIgnore = new Set(this.config.SkipWorkspaces.split(';'))
+    const workspaces = (await getWorkspaces(this.config.AccountsURL, this.config.Token)).filter((it) => {
+      return !workspacesIgnore.has(it.workspace)
+    })
     workspaces.sort((a, b) => b.lastVisit - a.lastVisit)
     return await this.doBackup(ctx, workspaces)
   }
 
-  async doBackup (ctx: MeasureContext, workspaces: BaseWorkspaceInfo[]): Promise<BaseWorkspaceInfo[]> {
+  async doBackup (rootCtx: MeasureContext, workspaces: BaseWorkspaceInfo[]): Promise<BaseWorkspaceInfo[]> {
     let index = 0
 
     const failedWorkspaces: BaseWorkspaceInfo[] = []
@@ -90,12 +115,17 @@
       return failedWorkspaces
     }
     index++
-    ctx.info('\n\nBACKUP WORKSPACE ', {
+    rootCtx.info('\n\nBACKUP WORKSPACE ', {
       workspace: ws.workspace,
       productId: ws.productId,
       index,
       total: workspaces.length
     })
+    const childLogger = rootCtx.logger.childLogger?.(ws.workspace, {
+      workspace: ws.workspace,
+      enableConsole: 'false'
+    })
+    const ctx = rootCtx.newChild(ws.workspace, { workspace: ws.workspace }, {}, childLogger)
     try {
       const storage = await createStorageBackupStorage(
         ctx,
@@ -103,6 +133,13 @@
        getWorkspaceId(this.config.BucketName, ws.productId),
        ws.workspace
      )
+      const wsUrl: WorkspaceIdWithUrl = {
+        name: ws.workspace,
+        productId: ws.productId,
+        workspaceName: ws.workspaceName ?? '',
+        workspaceUrl: ws.workspaceUrl ?? ''
+      }
+      const pipeline = await this.pipelineFactory(ctx, wsUrl, true, () => {}, null)
       await ctx.with('backup', { workspace: ws.workspace }, async (ctx) => {
         await backup(ctx, this.config.TransactorURL, getWorkspaceId(ws.workspace, ws.productId), storage, {
           skipDomains: [],
@@ -111,20 +148,84 @@
           timeout: this.config.Timeout * 1000,
           connectTimeout: 5 * 60 * 1000, // 5 minutes to,
           blobDownloadLimit: 100,
-          skipBlobContentTypes: []
+          skipBlobContentTypes: [],
+          storageAdapter: pipeline.storage.storageAdapter,
+          connection: this.wrapPipeline(ctx, pipeline, wsUrl)
         })
       })
     } catch (err: any) {
-      ctx.error('\n\nFAILED to BACKUP', { workspace: ws.workspace, err })
+      rootCtx.error('\n\nFAILED to BACKUP', { workspace: ws.workspace, err })
       failedWorkspaces.push(ws)
+      await childLogger?.close()
     }
   }
   return failedWorkspaces
 }
+
+  wrapPipeline (ctx: MeasureContext, pipeline: Pipeline, wsUrl: WorkspaceIdWithUrl): Client & BackupClient {
+    const sctx = new SessionContextImpl(
+      ctx,
+      systemAccountEmail,
+      'backup',
+      true,
+      { targets: {}, txes: [] },
+      wsUrl,
+      null,
+      false
+    )
+    const backupOps = new BackupClientOps(pipeline)
+
+    return {
+      findAll: async (_class, query, options) => {
+        return await pipeline.findAll(sctx, _class, query, options)
+      },
+      findOne: async (_class, query, options) => {
+        return (await pipeline.findAll(sctx, _class, query, { ...options, limit: 1 })).shift()
+      },
+      clean: async (domain, docs) => {
+        await backupOps.clean(ctx, domain, docs)
+      },
+      close: async () => {},
+      closeChunk: async (idx) => {
+        await backupOps.closeChunk(ctx, idx)
+      },
+      getHierarchy: () => {
+        return pipeline.storage.hierarchy
+      },
+      getModel: () => {
+        return pipeline.storage.modelDb
+      },
+      loadChunk: async (domain, idx, recheck) => {
+        return await backupOps.loadChunk(ctx, domain, idx, recheck)
+      },
+      loadDocs: async (domain, docs) => {
+        return await backupOps.loadDocs(ctx, domain, docs)
+      },
+      upload: async (domain, docs) => {
+        await backupOps.upload(ctx, domain, docs)
+      },
+      searchFulltext: async (query, options) => {
+        return {
+          docs: [],
+          total: 0
+        }
+      },
+      sendForceClose: async () => {},
+      tx: async (tx) => {
+        return {}
+      },
+      notify: (...tx) => {}
+    }
+  }
 }
 
-export function backupService (ctx: MeasureContext, storage: StorageAdapter, config: BackupConfig): () => void {
-  const backupWorker = new BackupWorker(storage, config)
+export function backupService (
+  ctx: MeasureContext,
+  storage: StorageAdapter,
+  config: BackupConfig,
+  pipelineFactory: PipelineFactory
+): () => void {
+  const backupWorker = new BackupWorker(storage, config, pipelineFactory)
 
   const shutdown = (): void => {
     void backupWorker.close()
diff --git a/server/core/src/pipeline.ts b/server/core/src/pipeline.ts
index f2553c861c..cb318b4965 100644
--- a/server/core/src/pipeline.ts
+++ b/server/core/src/pipeline.ts
@@ -51,7 +51,8 @@ export async function createPipeline (
   constructors: MiddlewareCreator[],
   upgrade: boolean,
   broadcast: BroadcastFunc,
-  branding: Branding | null
+  branding: Branding | null,
+  disableTriggers?: boolean
 ): Promise<Pipeline> {
   const broadcastHandlers: BroadcastFunc[] = [broadcast]
   const _broadcast: BroadcastFunc = (
@@ -68,7 +69,8 @@ export async function createPipeline (
     await createServerStorage(ctx, conf, {
       upgrade,
       broadcast: _broadcast,
-      branding
+      branding,
+      disableTriggers
     })
   )
   const pipelineResult = await PipelineImpl.create(ctx.newChild('pipeline-operations', {}), storage, constructors)
diff --git a/server/core/src/server/aggregator.ts b/server/core/src/server/aggregator.ts
index 90cb053a2b..c3352ccc68 100644
--- a/server/core/src/server/aggregator.ts
+++ b/server/core/src/server/aggregator.ts
@@ -400,11 +400,11 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
 export function buildStorage (
   config: StorageConfiguration,
   dbAdapter: RawDBAdapter,
-  storageFactory: (kind: string, config: StorageConfig) => StorageAdapter
+  storageFactory: (config: StorageConfig) => StorageAdapter
 ): AggregatorStorageAdapter {
   const adapters = new Map<string, StorageAdapter>()
   for (const c of config.storages) {
-    adapters.set(c.name, storageFactory(c.kind, c))
+    adapters.set(c.name, storageFactory(c))
   }
   return new AggregatorStorageAdapter(adapters, config.default, dbAdapter)
 }
diff --git a/server/core/src/server/index.ts b/server/core/src/server/index.ts
index bddf5e2474..c44d93b6b7 100644
--- a/server/core/src/server/index.ts
+++ b/server/core/src/server/index.ts
@@ -77,9 +77,11 @@ export async function createServerStorage (
   for (const tx of model) {
     try {
       hierarchy.tx(tx)
-      await triggers.tx(tx)
+      if (options.disableTriggers !== true) {
+        await triggers.tx(tx)
+      }
     } catch (err: any) {
-      console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err)
+      ctx.warn('failed to apply model transaction, skipping', { tx: JSON.stringify(tx), err })
     }
   }
   modelDb.addTxes(ctx, model, false)
diff --git a/server/core/src/storage.ts b/server/core/src/storage.ts
index 03f2906232..dc6a69f809 100644
--- a/server/core/src/storage.ts
+++ b/server/core/src/storage.ts
@@ -1,4 +1,15 @@
-import { type WorkspaceId, toWorkspaceString } from '@hcengineering/core'
+import {
+  toWorkspaceString,
+  type Doc,
+  type DocInfo,
+  type Domain,
+  type MeasureContext,
+  type Ref,
+  type StorageIterator,
+  type WorkspaceId
+} from '@hcengineering/core'
+import type { Pipeline } from './types'
+import { estimateDocSize } from './utils'
 
 export * from '@hcengineering/storage'
 
@@ -8,3 +19,95 @@ export * from '@hcengineering/storage'
 export function getBucketId (workspaceId: WorkspaceId): string {
   return toWorkspaceString(workspaceId, '.')
 }
+
+const chunkSize = 2 * 1024 * 1024
+
+/**
+ * @public
+ */
+export interface ChunkInfo {
+  idx: number
+  index: 0
+  finished: boolean
+  iterator: StorageIterator
+}
+
+/**
+ * @public
+ */
+export class BackupClientOps {
+  constructor (protected readonly _pipeline: Pipeline) {}
+
+  idIndex = 0
+  chunkInfo = new Map<number, ChunkInfo>()
+
+  async loadChunk (
+    ctx: MeasureContext,
+    domain: Domain,
+    idx?: number,
+    recheck?: boolean
+  ): Promise<{
+    idx: number
+    docs: DocInfo[]
+    finished: boolean
+  }> {
+    return await ctx.with('load-chunk', { domain }, async (ctx) => {
+      idx = idx ?? this.idIndex++
+      let chunk: ChunkInfo | undefined = this.chunkInfo.get(idx)
+      if (chunk !== undefined) {
+        chunk.index++
+        if (chunk.finished === undefined || chunk.finished) {
+          return {
+            idx,
+            docs: [],
+            finished: true
+          }
+        }
+      } else {
+        chunk = { idx, iterator: this._pipeline.storage.find(ctx, domain, recheck), finished: false, index: 0 }
+        this.chunkInfo.set(idx, chunk)
+      }
+      let size = 0
+      const docs: DocInfo[] = []
+
+      while (size < chunkSize) {
+        const doc = await chunk.iterator.next(ctx)
+        if (doc === undefined) {
+          chunk.finished = true
+          break
+        }
+
+        size += estimateDocSize(doc)
+        docs.push(doc)
+      }
+
+      return {
+        idx,
+        docs,
+        finished: chunk.finished
+      }
+    })
+  }
+
+  async closeChunk (ctx: MeasureContext, idx: number): Promise<void> {
+    await ctx.with('close-chunk', {}, async () => {
+      const chunk = this.chunkInfo.get(idx)
+      this.chunkInfo.delete(idx)
+      if (chunk != null) {
+        await chunk.iterator.close(ctx)
+      }
+    })
+  }
+
+  async loadDocs (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
+    return await this._pipeline.storage.load(ctx, domain, docs)
+  }
+
+  async upload (ctx: MeasureContext, domain: Domain, docs: Doc[]): Promise<void> {
+    await this._pipeline.storage.upload(ctx, domain, docs)
+  }
+
+  async clean (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<void> {
+    await this._pipeline.storage.clean(ctx, domain, docs)
+  }
+}
diff --git a/server/core/src/types.ts b/server/core/src/types.ts
index f8d28769fb..9e29f77fae 100644
--- a/server/core/src/types.ts
+++ b/server/core/src/types.ts
@@ -137,6 +137,17 @@ export interface Pipeline extends LowLevelStorage {
   close: () => Promise<void>
 }
 
+/**
+ * @public
+ */
+export type PipelineFactory = (
+  ctx: MeasureContext,
+  ws: WorkspaceIdWithUrl,
+  upgrade: boolean,
+  broadcast: BroadcastFunc,
+  branding: Branding | null
+) => Promise<Pipeline>
+
 /**
  * @public
  */
@@ -448,6 +459,8 @@ export interface ServerStorageOptions {
   broadcast: BroadcastFunc
 
   branding: Branding | null
+
+  disableTriggers?: boolean
 }
 
 export interface ServiceAdapter {
diff --git a/server/server-pipeline/src/pipeline.ts b/server/server-pipeline/src/pipeline.ts
index 654214f722..ac2e317273 100644
--- a/server/server-pipeline/src/pipeline.ts
+++ b/server/server-pipeline/src/pipeline.ts
@@ -6,7 +6,6 @@ import {
   DOMAIN_MODEL,
   DOMAIN_TRANSIENT,
   DOMAIN_TX,
-  type BrandingMap,
   type MeasureContext
 } from '@hcengineering/core'
 import { createElasticAdapter, createElasticBackupDataAdapter } from '@hcengineering/elastic'
@@ -33,10 +32,9 @@ import {
   createPipeline,
   type DbConfiguration,
   type MiddlewareCreator,
-  type StorageAdapter,
-  type StorageConfiguration
+  type PipelineFactory,
+  type StorageAdapter
 } from '@hcengineering/server-core'
-import { type PipelineFactory, type ServerFactory } from '@hcengineering/server-ws'
 import { createIndexStages } from './indexing'
 
 /**
@@ -48,15 +46,11 @@ export function createServerPipeline (
   dbUrl: string,
   opt: {
     fullTextUrl: string
-    storageConfig: StorageConfiguration
     rekoniUrl: string
-    port: number
-    productId: string
-    brandingMap: BrandingMap
-    serverFactory: ServerFactory
-
     indexProcessing: number // 1000
     indexParallel: number // 2
+    disableTriggers?: boolean
+    usePassedCtx?: boolean
 
     externalStorage: StorageAdapter
   },
@@ -73,7 +67,8 @@ export function createServerPipeline (
     QueryJoinMiddleware.create
   ]
   return (ctx, workspace, upgrade, broadcast, branding) => {
-    const wsMetrics = metrics.newChild('🧲 session', {})
+    const metricsCtx = opt.usePassedCtx === true ? ctx : metrics
+    const wsMetrics = metricsCtx.newChild('🧲 session', {})
     const conf: DbConfiguration = {
       domains: {
         [DOMAIN_TX]: 'MongoTx',
@@ -151,6 +146,6 @@ export function createServerPipeline (
       storageFactory: opt.externalStorage,
       workspace
     }
-    return createPipeline(ctx, conf, middlewares, upgrade, broadcast, branding)
+    return createPipeline(ctx, conf, middlewares, upgrade, broadcast, branding, opt.disableTriggers)
   }
 }
diff --git a/server/server-storage/src/blobStorage.ts b/server/server-storage/src/blobStorage.ts
index ac615c82fb..70f714a07b 100644
--- a/server/server-storage/src/blobStorage.ts
+++ b/server/server-storage/src/blobStorage.ts
@@ -65,10 +65,7 @@ class StorageBlobAdapter implements DbAdapter {
   }
 
   find (ctx: MeasureContext, domain: Domain, recheck?: boolean): StorageIterator {
-    if (recheck === true) {
-      return (this.client as StorageAdapterEx).find(ctx, this.workspaceId)
-    }
-    return this.blobAdapter.find(ctx, domain, recheck)
+    return (this.client as StorageAdapterEx).find(ctx, this.workspaceId)
   }
 
   async load (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
diff --git a/server/server-storage/src/starter.ts b/server/server-storage/src/starter.ts
index b654e4ff0a..8c4a9f1396 100644
--- a/server/server-storage/src/starter.ts
+++ b/server/server-storage/src/starter.ts
@@ -29,10 +29,10 @@ import {
  */
 
-export function storageConfigFromEnv (): StorageConfiguration {
+export function storageConfigFromEnv (configEnv: string = 'STORAGE_CONFIG'): StorageConfiguration {
   const storageConfig: StorageConfiguration = { default: '', storages: [] }
 
-  const storageEnv = process.env.STORAGE_CONFIG
+  const storageEnv = process.env[configEnv]
   if (storageEnv !== undefined) {
     parseStorageEnv(storageEnv, storageConfig)
   }
@@ -83,22 +83,25 @@ export function parseStorageEnv (storageEnv: string, storageConfig: StorageConfi
   }
 }
 
-export function buildStorageFromConfig (config: StorageConfiguration, dbUrl: string): AggregatorStorageAdapter {
-  return buildStorage(config, createRawMongoDBAdapter(dbUrl), (kind, config): StorageAdapter => {
-    if (kind === MinioService.config) {
-      const c = config as MinioConfig
-      if (c.endpoint == null || c.accessKey == null || c.secretKey == null) {
-        throw new Error('One of endpoint/accessKey/secretKey values are not specified')
-      }
-      return new MinioService(c)
-    } else if (kind === S3Service.config) {
-      const c = config as S3Config
-      if (c.endpoint == null || c.accessKey == null || c.secretKey == null) {
-        throw new Error('One of endpoint/accessKey/secretKey values are not specified')
-      }
-      return new S3Service(c)
-    } else {
-      throw new Error('Unsupported storage kind:' + kind)
+export function createStorageFromConfig (config: StorageConfig): StorageAdapter {
+  const kind = config.kind
+  if (kind === MinioService.config) {
+    const c = config as MinioConfig
+    if (c.endpoint == null || c.accessKey == null || c.secretKey == null) {
+      throw new Error('One of endpoint/accessKey/secretKey values are not specified')
     }
-  })
+    return new MinioService(c)
+  } else if (kind === S3Service.config) {
+    const c = config as S3Config
+    if (c.endpoint == null || c.accessKey == null || c.secretKey == null) {
+      throw new Error('One of endpoint/accessKey/secretKey values are not specified')
+    }
+    return new S3Service(c)
+  } else {
+    throw new Error('Unsupported storage kind:' + kind)
+  }
+}
+
+export function buildStorageFromConfig (config: StorageConfiguration, dbUrl: string): AggregatorStorageAdapter {
+  return buildStorage(config, createRawMongoDBAdapter(dbUrl), createStorageFromConfig)
 }
diff --git a/server/server/src/backup.ts b/server/server/src/backup.ts
index 7de61b2698..11b4968b8e 100644
--- a/server/server/src/backup.ts
+++ b/server/server/src/backup.ts
@@ -1,20 +1,8 @@
-import { Doc, DocInfo, Domain, Ref, StorageIterator } from '@hcengineering/core'
-import { Pipeline, estimateDocSize } from '@hcengineering/server-core'
+import { Doc, Domain, Ref } from '@hcengineering/core'
+import { BackupClientOps, Pipeline } from '@hcengineering/server-core'
 import { Token } from '@hcengineering/server-token'
 import { ClientSession, Session, type ClientSessionCtx } from '@hcengineering/server-ws'
 
-const chunkSize = 2 * 1024 * 1024
-
-/**
- * @public
- */
-export interface ChunkInfo {
-  idx: number
-  index: 0
-  finished: boolean
-  iterator: StorageIterator
-}
-
 /**
  * @public
  */
@@ -28,76 +16,35 @@ export interface BackupSession extends Session
  * @public
  */
 export class BackupClientSession extends ClientSession implements BackupSession {
+  ops: BackupClientOps
   constructor (
     protected readonly token: Token,
-    protected readonly _pipeline: Pipeline
+    _pipeline: Pipeline
   ) {
     super(token, _pipeline)
+    this.ops = new BackupClientOps(_pipeline)
   }
 
-  idIndex = 0
-  chunkInfo = new Map<number, ChunkInfo>()
-
   async loadChunk (_ctx: ClientSessionCtx, domain: Domain, idx?: number, recheck?: boolean): Promise<void> {
     this.lastRequest = Date.now()
-    await _ctx.ctx.with('load-chunk', { domain }, async (ctx) => {
-      try {
-        idx = idx ?? this.idIndex++
-        let chunk: ChunkInfo | undefined = this.chunkInfo.get(idx)
-        if (chunk !== undefined) {
-          chunk.index++
-          if (chunk.finished === undefined) {
-            return {
-              idx,
-              docs: [],
-              finished: true
-            }
-          }
-        } else {
-          chunk = { idx, iterator: this._pipeline.storage.find(ctx, domain, recheck), finished: false, index: 0 }
-          this.chunkInfo.set(idx, chunk)
-        }
-        let size = 0
-        const docs: DocInfo[] = []
-
-        while (size < chunkSize) {
-          const doc = await chunk.iterator.next(ctx)
-          if (doc === undefined) {
-            chunk.finished = true
-            break
-          }
-
-          size += estimateDocSize(doc)
-          docs.push(doc)
-        }
-
-        await _ctx.sendResponse({
-          idx,
-          docs,
-          finished: chunk.finished
-        })
-      } catch (err: any) {
-        await _ctx.sendResponse({ error: err.message })
-      }
-    })
+    try {
+      const result = await this.ops.loadChunk(_ctx.ctx, domain)
+      await _ctx.sendResponse(result)
+    } catch (err: any) {
+      await _ctx.sendResponse({ error: err.message })
+    }
   }
 
   async closeChunk (ctx: ClientSessionCtx, idx: number): Promise<void> {
     this.lastRequest = Date.now()
-    await ctx.ctx.with('close-chunk', {}, async () => {
-      const chunk = this.chunkInfo.get(idx)
-      this.chunkInfo.delete(idx)
-      if (chunk != null) {
-        await chunk.iterator.close(ctx.ctx)
-      }
-      await ctx.sendResponse({})
-    })
+    await this.ops.closeChunk(ctx.ctx, idx)
+    await ctx.sendResponse({})
   }
 
   async loadDocs (ctx: ClientSessionCtx, domain: Domain, docs: Ref<Doc>[]): Promise<void> {
     this.lastRequest = Date.now()
     try {
-      const result = await this._pipeline.storage.load(ctx.ctx, domain, docs)
+      const result = await this.ops.loadDocs(ctx.ctx, domain, docs)
       await ctx.sendResponse(result)
     } catch (err: any) {
       await ctx.sendResponse({ error: err.message })
@@ -107,7 +54,7 @@ export class BackupClientSession extends ClientSession implements BackupSession
   async upload (ctx: ClientSessionCtx, domain: Domain, docs: Doc[]): Promise<void> {
     this.lastRequest = Date.now()
     try {
-      await this._pipeline.storage.upload(ctx.ctx, domain, docs)
+      await this.ops.upload(ctx.ctx, domain, docs)
     } catch (err: any) {
       await ctx.sendResponse({ error: err.message })
       return
@@ -118,7 +65,7 @@ export class BackupClientSession extends ClientSession implements BackupSession
   async clean (ctx: ClientSessionCtx, domain: Domain, docs: Ref<Doc>[]): Promise<void> {
     this.lastRequest = Date.now()
     try {
-      await this._pipeline.storage.clean(ctx.ctx, domain, docs)
+      await this.ops.clean(ctx.ctx, domain, docs)
     } catch (err: any) {
       await ctx.sendResponse({ error: err.message })
       return
diff --git a/server/tool/src/connect.ts b/server/tool/src/connect.ts
index 6732473a92..21bff3d20a 100644
--- a/server/tool/src/connect.ts
+++ b/server/tool/src/connect.ts
@@ -24,6 +24,7 @@ import {
   type MeasureContext
 } from '@hcengineering/core'
 import { addLocation, getMetadata, getResource, setMetadata } from '@hcengineering/platform'
+import type { StorageAdapter } from '@hcengineering/server-core'
 import { generateToken } from '@hcengineering/server-token'
 import crypto from 'node:crypto'
 import plugin from './plugin'
@@ -92,11 +93,14 @@ export class BlobClient {
   constructor (
     readonly transactorUrl: string,
     readonly workspace: WorkspaceId,
-    email?: string,
-    extra?: Record<string, string>
+    readonly opt?: {
+      email?: string
+      extra?: Record<string, string>
+      storageAdapter?: StorageAdapter
+    }
   ) {
     this.index = 0
-    this.token = generateToken(email ?? systemAccountEmail, workspace, extra)
+    this.token = generateToken(opt?.email ?? systemAccountEmail, workspace, opt?.extra)
     let url = transactorUrl
     if (url.endsWith('/')) {
       url = url.slice(0, url.length - 1)
@@ -106,6 +110,12 @@
   }
 
   async checkFile (ctx: MeasureContext, name: string): Promise<boolean> {
+    if (this.opt?.storageAdapter !== undefined) {
+      const obj = await this.opt?.storageAdapter.stat(ctx, this.workspace, name)
+      if (obj !== undefined) {
+        return true
+      }
+    }
     for (let i = 0; i < 5; i++) {
       try {
         const response = await fetch(this.transactorAPIUrl + `?name=${encodeURIComponent(name)}`, {
@@ -149,51 +159,66 @@
     for (; i < 5; i++) {
       try {
         const st = Date.now()
+        let chunk: Buffer
 
-        const header: Record<string, string> = {
-          Authorization: 'Bearer ' + this.token
-        }
-
-        if (!(size !== -1 && written === 0 && size < chunkSize)) {
-          header.Range = `bytes=${written}-${size === -1 ? written + chunkSize : Math.min(size - 1, written + chunkSize)}`
-        }
-
-        response = await fetch(this.transactorAPIUrl + `?name=${encodeURIComponent(name)}`, { headers: header })
-        if (header.Range != null) {
-          ctx.info('fetch part', { time: Date.now() - st, blobId: name, written, size })
-        }
-        if (response.status === 403) {
-          i = 5
-          // No file, so make it empty
-          throw new Error(`Unauthorized ${this.transactorAPIUrl}/${this.workspace.name}/${name}`)
-        }
-        if (response.status === 404) {
-          i = 5
-          // No file, so make it empty
-          throw new Error(`No file for ${this.transactorAPIUrl}/${this.workspace.name}/${name}`)
-        }
-        if (response.status === 416) {
-          if (size === -1) {
-            size = parseInt((response.headers.get('content-range') ?? '').split('*/')[1])
-            continue
+        if (this.opt?.storageAdapter !== undefined) {
+          const chunks: Buffer[] = []
+          const readable = await this.opt.storageAdapter.partial(ctx, this.workspace, name, written, chunkSize)
+          await new Promise<void>((resolve) => {
+            readable.on('data', (chunk) => {
+              chunks.push(chunk)
+            })
+            readable.on('end', () => {
+              resolve()
+            })
+          })
+          chunk = Buffer.concat(chunks)
+        } else {
+          const header: Record<string, string> = {
+            Authorization: 'Bearer ' + this.token
           }
-          // No file, so make it empty
-          throw new Error(`No file for ${this.transactorAPIUrl}/${this.workspace.name}/${name}`)
-        }
-        const chunk = Buffer.from(await response.arrayBuffer())
 
-        if (header.Range == null) {
-          size = chunk.length
-        }
-        // We need to parse
-        // 'Content-Range': `bytes ${start}-${end}/${size}`
-        // To determine if something is left
-        const range = response.headers.get('Content-Range')
-        if (range !== null) {
-          const [, total] = range.split(' ')[1].split('/')
-          if (total !== undefined) {
-            size = parseInt(total)
+          if (!(size !== -1 && written === 0 && size < chunkSize)) {
+            header.Range = `bytes=${written}-${size === -1 ? written + chunkSize : Math.min(size - 1, written + chunkSize)}`
+          }
+
+          response = await fetch(this.transactorAPIUrl + `?name=${encodeURIComponent(name)}`, { headers: header })
+          if (header.Range != null) {
+            ctx.info('fetch part', { time: Date.now() - st, blobId: name, written, size })
+          }
+          if (response.status === 403) {
+            i = 5
+            // No file, so make it empty
+            throw new Error(`Unauthorized ${this.transactorAPIUrl}/${this.workspace.name}/${name}`)
+          }
+          if (response.status === 404) {
+            i = 5
+            // No file, so make it empty
+            throw new Error(`No file for ${this.transactorAPIUrl}/${this.workspace.name}/${name}`)
+          }
+          if (response.status === 416) {
+            if (size === -1) {
+              size = parseInt((response.headers.get('content-range') ?? '').split('*/')[1])
+              continue
+            }
+
+            // No file, so make it empty
+            throw new Error(`No file for ${this.transactorAPIUrl}/${this.workspace.name}/${name}`)
+          }
+          chunk = Buffer.from(await response.arrayBuffer())
+
+          if (header.Range == null) {
+            size = chunk.length
+          }
+          // We need to parse
+          // 'Content-Range': `bytes ${start}-${end}/${size}`
+          // To determine if something is left
+          const range = response.headers.get('Content-Range')
+          if (range !== null) {
+            const [, total] = range.split(' ')[1].split('/')
+            if (total !== undefined) {
+              size = parseInt(total)
+            }
           }
         }
 
@@ -219,6 +244,10 @@
       }
       break
     } catch (err: any) {
+      if (err?.code === 'NoSuchKey') {
+        ctx.info('No such key', { name })
+        return
+      }
      if (i > 4) {
        await new Promise<void>((resolve) => {
          writable.end(resolve)
        })
@@ -236,26 +265,30 @@
   }
 
   async upload (ctx: MeasureContext, name: string, size: number, contentType: string, buffer: Buffer): Promise<void> {
-    // TODO: We need to improve this logig, to allow restore of huge blobs
-    for (let i = 0; i < 5; i++) {
-      try {
-        await fetch(
-          this.transactorAPIUrl +
-            `?name=${encodeURIComponent(name)}&contentType=${encodeURIComponent(contentType)}&size=${size}`,
-          {
-            method: 'PUT',
-            headers: {
-              Authorization: 'Bearer ' + this.token,
-              'Content-Type': contentType
-            },
-            body: buffer
+    if (this.opt?.storageAdapter !== undefined) {
+      await this.opt.storageAdapter.put(ctx, this.workspace, name, buffer, contentType, size)
+    } else {
+      // TODO: We need to improve this logig, to allow restore of huge blobs
+      for (let i = 0; i < 5; i++) {
+        try {
+          await fetch(
+            this.transactorAPIUrl +
+              `?name=${encodeURIComponent(name)}&contentType=${encodeURIComponent(contentType)}&size=${size}`,
+            {
+              method: 'PUT',
+              headers: {
+                Authorization: 'Bearer ' + this.token,
+                'Content-Type': contentType
+              },
+              body: buffer
+            }
+          )
+          break
+        } catch (err: any) {
+          if (i === 4) {
+            ctx.error('failed to upload file', { name })
+            throw err
           }
-        )
-        break
-      } catch (err: any) {
-        if (i === 4) {
-          ctx.error('failed to upload file', { name })
-          throw err
         }
       }
     }
diff --git a/server/ws/src/server.ts b/server/ws/src/server.ts
index 9b5b1ed49a..b7767c5735 100644
--- a/server/ws/src/server.ts
+++ b/server/ws/src/server.ts
@@ -32,14 +32,13 @@ import core, {
 } from '@hcengineering/core'
 import { unknownError, type Status } from '@hcengineering/platform'
 import { type HelloRequest, type HelloResponse, type Request, type Response } from '@hcengineering/rpc'
-import type { Pipeline, StorageAdapter } from '@hcengineering/server-core'
+import type { Pipeline, StorageAdapter, PipelineFactory } from '@hcengineering/server-core'
 import { type Token } from '@hcengineering/server-token'
 import {
   LOGGING_ENABLED,
   type ClientSessionCtx,
   type ConnectionSocket,
-  type PipelineFactory,
   type ServerFactory,
   type Session,
   type SessionManager,
diff --git a/server/ws/src/server_http.ts b/server/ws/src/server_http.ts
index 1fcb319af6..2d637cf338 100644
--- a/server/ws/src/server_http.ts
+++ b/server/ws/src/server_http.ts
@@ -25,15 +25,9 @@ import http, { type IncomingMessage } from 'http'
 import os from 'os'
 import { WebSocketServer, type RawData, type WebSocket } from 'ws'
 import { getStatistics, wipeStatistics } from './stats'
-import {
-  LOGGING_ENABLED,
-  type ConnectionSocket,
-  type HandleRequestFunction,
-  type PipelineFactory,
-  type SessionManager
-} from './types'
+import { LOGGING_ENABLED, type ConnectionSocket, type HandleRequestFunction, type SessionManager } from './types'
 
-import type { StorageAdapter } from '@hcengineering/server-core'
+import { type StorageAdapter, type PipelineFactory } from '@hcengineering/server-core'
 import 'bufferutil'
 import 'utf-8-validate'
 import { getFile, getFileRange, type BlobResponse } from './blobs'
diff --git a/server/ws/src/server_u.ts b/server/ws/src/server_u.ts
index 291b1ad017..06ec17c703 100644
--- a/server/ws/src/server_u.ts
+++ b/server/ws/src/server_u.ts
@@ -22,15 +22,9 @@ import { decodeToken } from '@hcengineering/server-token'
 import { Analytics } from '@hcengineering/analytics'
 import { RPCHandler } from '@hcengineering/rpc'
 import { getStatistics, wipeStatistics } from './stats'
-import {
-  LOGGING_ENABLED,
-  type ConnectionSocket,
-  type HandleRequestFunction,
-  type PipelineFactory,
-  type SessionManager
-} from './types'
+import { LOGGING_ENABLED, type ConnectionSocket, type HandleRequestFunction, type SessionManager } from './types'
 
-import type { StorageAdapter } from '@hcengineering/server-core'
+import { type StorageAdapter, type PipelineFactory } from '@hcengineering/server-core'
 import uWebSockets, { DISABLED, SHARED_COMPRESSOR, type HttpResponse, type WebSocket } from '@hcengineering/uws'
 import { Readable } from 'stream'
 import { getFile, getFileRange, type BlobResponse } from './blobs'
diff --git a/server/ws/src/types.ts b/server/ws/src/types.ts
index 2d33744bd4..3eb21aced1 100644
--- a/server/ws/src/types.ts
+++ b/server/ws/src/types.ts
@@ -1,4 +1,5 @@
 import {
+  type Branding,
   type Class,
   type Doc,
   type DocumentQuery,
@@ -7,12 +8,10 @@ import {
   type MeasureContext,
   type Ref,
   type Tx,
-  type WorkspaceId,
-  type WorkspaceIdWithUrl,
-  type Branding
+  type WorkspaceId
 } from '@hcengineering/core'
 import { type Request, type Response } from '@hcengineering/rpc'
-import { type BroadcastFunc, type Pipeline, type StorageAdapter } from '@hcengineering/server-core'
+import { type Pipeline, type PipelineFactory, type StorageAdapter } from '@hcengineering/server-core'
 import { type Token } from '@hcengineering/server-token'
 
 /**
@@ -84,17 +83,6 @@ export interface Session {
   getMode: () => string
 }
 
-/**
- * @public
- */
-export type PipelineFactory = (
-  ctx: MeasureContext,
-  ws: WorkspaceIdWithUrl,
-  upgrade: boolean,
-  broadcast: BroadcastFunc,
-  branding: Branding | null
-) => Promise<Pipeline>
-
 /**
  * @public
  */
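
Usage note: after this rework the backup service owns two storage configurations instead of the single STORAGE_CONFIG it read before. Below is a minimal sketch of that wiring, assuming the env-var contract defined in server/backup-service/src/config.ts above (STORAGE names the bucket config backups are written to, WORKSPACE_STORAGE the configs workspace blobs are read from); the Mongo URL is a placeholder, and only functions introduced in this patch are called.

// wiring-sketch.ts — illustrative only; env values and the Mongo URL are assumptions.
import { buildStorageFromConfig, createStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'

// Destination bucket for backups: a single adapter built from the first STORAGE entry.
const backupStorage = createStorageFromConfig(storageConfigFromEnv('STORAGE').storages[0])

// Source of workspace blobs: an aggregator over all WORKSPACE_STORAGE entries,
// backed by Mongo for its blob index.
const workspaceStorage = buildStorageFromConfig(storageConfigFromEnv('WORKSPACE_STORAGE'), 'mongodb://localhost:27017')

SKIP_WORKSPACES, a ';'-separated list, is then consumed by the worker, which drops the matching workspaces before sorting the rest by last visit.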