diff --git a/.vscode/launch.json b/.vscode/launch.json index feb3517f20..212b10d824 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -293,16 +293,17 @@ "args": ["src/index.ts"], "env": { "ACCOUNTS_URL": "http://localhost:3000", - "MINIO_ENDPOINT": "localhost", - "MINIO_ACCESS_KEY": "minioadmin", - "MINIO_SECRET_KEY": "minioadmin", + "STORAGE": "minio|localhost?accessKey=minioadmin&secretKey=minioadmin", + "WORKSPACE_STORAGE": "minio|localhost?accessKey=minioadmin&secretKey=minioadmin", + "MONGO_URL": "mongodb://localhost:27017", "SECRET": "secret", - "BUCKET_NAME":"test_backups", - "INTERVAL":"30" + "BUCKET_NAME":"backups", + "INTERVAL":"30", }, "runtimeArgs": ["--nolazy", "-r", "ts-node/register"], "showAsyncStacks": true, "sourceMaps": true, + "outputCapture": "std", "cwd": "${workspaceRoot}/pods/backup", "protocol": "inspector" }, diff --git a/desktop/src/main/backup.ts b/desktop/src/main/backup.ts index d74f591375..7857638632 100644 --- a/desktop/src/main/backup.ts +++ b/desktop/src/main/backup.ts @@ -55,7 +55,7 @@ async function doBackup (dirName: string, token: string, endpoint: string, works progress: (value: number): void => { notify('backup', value) }, - connection: client + getConnection: async () => client }) } finally { await client.close() diff --git a/packages/analytics-service/src/logging.ts b/packages/analytics-service/src/logging.ts index 83eed07dd4..0288c690ed 100644 --- a/packages/analytics-service/src/logging.ts +++ b/packages/analytics-service/src/logging.ts @@ -71,8 +71,6 @@ export class SplitLogger implements MeasureLogger { ) }) ) - } else { - console.log('Logging only in files') } this.logger.info( '####################################################################################################################' diff --git a/pods/backup/src/index.ts b/pods/backup/src/index.ts index 8648efad7b..c2ec43a82e 100644 --- a/pods/backup/src/index.ts +++ b/pods/backup/src/index.ts @@ -14,16 +14,16 @@ // import { Analytics } from '@hcengineering/analytics' +import { configureAnalytics, SplitLogger } from '@hcengineering/analytics-service' import { startBackup } from '@hcengineering/backup-service' import { MeasureMetricsContext, metricsToString, newMetrics } from '@hcengineering/core' -import { DummyDbAdapter, DummyFullTextAdapter, type PipelineFactory } from '@hcengineering/server-core' -import { createServerPipeline } from '@hcengineering/server-pipeline' -import { configureAnalytics, SplitLogger } from '@hcengineering/analytics-service' +import { type PipelineFactory } from '@hcengineering/server-core' +import { createBackupPipeline, getConfig } from '@hcengineering/server-pipeline' import { writeFile } from 'fs/promises' import { join } from 'path' const metricsContext = new MeasureMetricsContext( - 'github', + 'backup', {}, {}, newMetrics(), @@ -55,35 +55,26 @@ const onClose = (): void => { metricsContext.info('Closed') } -startBackup(metricsContext, (mongoUrl, storageAdapter) => { - const factory: PipelineFactory = createServerPipeline( - metricsContext, - mongoUrl, - { +startBackup( + metricsContext, + (mongoUrl, storageAdapter) => { + const factory: PipelineFactory = createBackupPipeline(metricsContext, mongoUrl, { externalStorage: storageAdapter, + usePassedCtx: true + }) + return factory + }, + (ctx, dbUrls, workspace, branding, externalStorage) => { + return getConfig(ctx, dbUrls, workspace, branding, ctx, { + externalStorage, fullTextUrl: '', indexParallel: 0, indexProcessing: 0, - disableTriggers: true, rekoniUrl: '', - usePassedCtx: true - }, - { - adapters: { - FullTextBlob: { - factory: async () => new DummyDbAdapter(), - url: '' - } - }, - fulltextAdapter: { - factory: async () => new DummyFullTextAdapter(), - stages: () => [], - url: '' - } - } - ) - return factory -}) + disableTriggers: true + }) + } +) process.on('SIGINT', onClose) process.on('SIGTERM', onClose) diff --git a/server/backup-service/src/index.ts b/server/backup-service/src/index.ts index b22c9e6103..42bcd7ec9f 100644 --- a/server/backup-service/src/index.ts +++ b/server/backup-service/src/index.ts @@ -13,30 +13,39 @@ // limitations under the License. // -import { MeasureContext, systemAccountEmail } from '@hcengineering/core' +import { MeasureContext, systemAccountEmail, type Branding, type WorkspaceIdWithUrl } from '@hcengineering/core' import { setMetadata } from '@hcengineering/platform' import { backupService } from '@hcengineering/server-backup' import serverClientPlugin from '@hcengineering/server-client' -import { type PipelineFactory, type StorageAdapter } from '@hcengineering/server-core' +import { type DbConfiguration, type PipelineFactory, type StorageAdapter } from '@hcengineering/server-core' import { buildStorageFromConfig, createStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage' import serverToken, { generateToken } from '@hcengineering/server-token' import config from './config' export function startBackup ( ctx: MeasureContext, - pipelineFactoryFactory: (mongoUrl: string, storage: StorageAdapter) => PipelineFactory + pipelineFactoryFactory: (mongoUrl: string, storage: StorageAdapter) => PipelineFactory, + getConfig: ( + ctx: MeasureContext, + dbUrls: string, + workspace: WorkspaceIdWithUrl, + branding: Branding | null, + externalStorage: StorageAdapter + ) => DbConfiguration ): void { setMetadata(serverToken.metadata.Secret, config.Secret) setMetadata(serverClientPlugin.metadata.Endpoint, config.AccountsURL) setMetadata(serverClientPlugin.metadata.UserAgent, config.ServiceID) + const [mainDbUrl, rawDbUrl] = config.MongoURL.split(';') + const backupStorageConfig = storageConfigFromEnv(config.Storage) const workspaceStorageConfig = storageConfigFromEnv(config.WorkspaceStorage) const storageAdapter = createStorageFromConfig(backupStorageConfig.storages[0]) - const workspaceStorageAdapter = buildStorageFromConfig(workspaceStorageConfig, config.MongoURL) + const workspaceStorageAdapter = buildStorageFromConfig(workspaceStorageConfig, rawDbUrl ?? mainDbUrl) - const pipelineFactory = pipelineFactoryFactory(config.MongoURL, workspaceStorageAdapter) + const pipelineFactory = pipelineFactoryFactory(mainDbUrl, workspaceStorageAdapter) // A token to access account service const token = generateToken(systemAccountEmail, { name: 'backup' }) @@ -46,7 +55,10 @@ export function startBackup ( storageAdapter, { ...config, Token: token }, pipelineFactory, - workspaceStorageAdapter + workspaceStorageAdapter, + (ctx, workspace, branding, externalStorage) => { + return getConfig(ctx, mainDbUrl, workspace, branding, externalStorage) + } ) process.on('SIGINT', shutdown) diff --git a/server/backup/src/backup.ts b/server/backup/src/backup.ts index e998441fb8..1f89dd1935 100644 --- a/server/backup/src/backup.ts +++ b/server/backup/src/backup.ts @@ -35,7 +35,8 @@ import core, { TxCollectionCUD, WorkspaceId, type Blob, - type DocIndexState + type DocIndexState, + type Tx } from '@hcengineering/core' import { BlobClient, createClient } from '@hcengineering/server-client' import { fullTextPushStagePrefix, type StorageAdapter } from '@hcengineering/server-core' @@ -514,7 +515,8 @@ export async function backup ( connectTimeout: number skipBlobContentTypes: string[] blobDownloadLimit: number - connection?: CoreClient & BackupClient + getLastTx?: () => Promise + getConnection?: () => Promise storageAdapter?: StorageAdapter // Return true in case isCanceled?: () => boolean @@ -529,7 +531,7 @@ export async function backup ( skipBlobContentTypes: [], blobDownloadLimit: 15 } -): Promise { +): Promise { ctx = ctx.newChild('backup', { workspaceId: workspaceId.name, force: options.force, @@ -555,36 +557,11 @@ export async function backup ( }, options.timeout) } - const token = - options.token ?? - generateToken(systemAccountEmail, workspaceId, { - mode: 'backup' - }) - - const connection = - options.connection ?? - ((await createClient(transactorUrl, options.token ?? token, undefined, options.connectTimeout)) as CoreClient & - BackupClient) - - const blobClient = new BlobClient(transactorUrl, token, workspaceId, { storageAdapter: options.storageAdapter }) - ctx.info('starting backup', { workspace: workspaceId.name }) + const st = Date.now() + let connection!: CoreClient & BackupClient + let printEnd = true try { - const domains = [ - ...connection - .getHierarchy() - .domains() - .filter( - (it) => - it !== DOMAIN_TRANSIENT && - it !== DOMAIN_MODEL && - it !== ('fulltext-blob' as Domain) && - !options.skipDomains.includes(it) && - (options.include === undefined || options.include.has(it)) - ) - ] - ctx.info('domains for dump', { domains: domains.length }) - let backupInfo: BackupInfo = { workspace: workspaceId.name, version: '0.6.2', @@ -602,19 +579,70 @@ export async function backup ( backupInfo.workspace = workspaceId.name + let lastTx: Tx | undefined + + let lastTxChecked = false // Skip backup if there is no transaction changes. - const lastTx = await connection.findOne( - core.class.Tx, - {}, - { limit: 1, sort: { modifiedOn: SortingOrder.Descending } } - ) - if (lastTx !== undefined) { - if (lastTx._id === backupInfo.lastTxId && !options.force) { - ctx.info('No transaction changes. Skipping backup.', { workspace: workspaceId.name }) - return + if (options.getLastTx !== undefined) { + lastTx = await options.getLastTx() + if (lastTx !== undefined) { + if (lastTx._id === backupInfo.lastTxId && !options.force) { + printEnd = false + ctx.info('No transaction changes. Skipping backup.', { workspace: workspaceId.name }) + return false + } + } + lastTxChecked = true + } + const token = + options.token ?? + generateToken(systemAccountEmail, workspaceId, { + mode: 'backup' + }) + + ctx.warn('starting backup', { workspace: workspaceId.name }) + + connection = + options.getConnection !== undefined + ? await options.getConnection() + : ((await createClient( + transactorUrl, + options.token ?? token, + undefined, + options.connectTimeout + )) as CoreClient & BackupClient) + + if (!lastTxChecked) { + lastTx = await connection.findOne(core.class.Tx, {}, { limit: 1, sort: { modifiedOn: SortingOrder.Descending } }) + if (lastTx !== undefined) { + if (lastTx._id === backupInfo.lastTxId && !options.force) { + ctx.info('No transaction changes. Skipping backup.', { workspace: workspaceId.name }) + if (options.getConnection === undefined) { + await connection.close() + } + return false + } } } + const blobClient = new BlobClient(transactorUrl, token, workspaceId, { storageAdapter: options.storageAdapter }) + + const domains = [ + ...connection + .getHierarchy() + .domains() + .filter( + (it) => + it !== DOMAIN_TRANSIENT && + it !== DOMAIN_MODEL && + it !== ('fulltext-blob' as Domain) && + !options.skipDomains.includes(it) && + (options.include === undefined || options.include.has(it)) + ) + ] + + ctx.info('domains for dump', { domains: domains.length }) + backupInfo.lastTxId = '' // Clear until full backup will be complete const snapshot: BackupSnapshot = { @@ -1006,11 +1034,15 @@ export async function backup ( backupInfo.lastTxId = lastTx?._id ?? '0' // We could store last tx, since full backup is complete await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel })) } + return true } catch (err: any) { ctx.error('backup error', { err, workspace: workspaceId.name }) + return false } finally { - ctx.info('end backup', { workspace: workspaceId.name }) - if (options.connection === undefined) { + if (printEnd) { + ctx.info('end backup', { workspace: workspaceId.name, totalTime: Date.now() - st }) + } + if (options.getConnection === undefined && connection !== undefined) { await connection.close() } ctx.end() diff --git a/server/backup/src/service.ts b/server/backup/src/service.ts index ecaf26d10d..78e432189b 100644 --- a/server/backup/src/service.ts +++ b/server/backup/src/service.ts @@ -15,11 +15,17 @@ import { BaseWorkspaceInfo, + DOMAIN_TX, getWorkspaceId, + Hierarchy, + ModelDb, + SortingOrder, systemAccountEmail, type BackupClient, + type Branding, type Client, type MeasureContext, + type Tx, type WorkspaceIdWithUrl } from '@hcengineering/core' import { PlatformError, unknownError } from '@hcengineering/platform' @@ -27,6 +33,7 @@ import { listAccountWorkspaces } from '@hcengineering/server-client' import { BackupClientOps, SessionDataImpl, + type DbConfiguration, type Pipeline, type PipelineFactory, type StorageAdapter @@ -48,7 +55,13 @@ class BackupWorker { readonly storageAdapter: StorageAdapter, readonly config: BackupConfig, readonly pipelineFactory: PipelineFactory, - readonly workspaceStorageAdapter: StorageAdapter + readonly workspaceStorageAdapter: StorageAdapter, + readonly getConfig: ( + ctx: MeasureContext, + workspace: WorkspaceIdWithUrl, + branding: Branding | null, + externalStorage: StorageAdapter + ) => DbConfiguration ) {} canceled = false @@ -61,11 +74,26 @@ class BackupWorker { backupPromise: Promise | undefined + printStats ( + ctx: MeasureContext, + stats: { failedWorkspaces: BaseWorkspaceInfo[], processed: number, skipped: number } + ): void { + ctx.warn( + `**************************************** + backup statistics:`, + { + backuped: stats.processed, + notChanges: stats.skipped, + failed: stats.failedWorkspaces.length + } + ) + } + async triggerBackup (ctx: MeasureContext): Promise { - const failed = await this.backup(ctx) - if (failed.length > 0) { - ctx.info('Failed to backup workspaces, Retry failed workspaces once.', { failed: failed.length }) - await this.doBackup(ctx, failed) + const { failedWorkspaces } = await this.backup(ctx) + if (failedWorkspaces.length > 0) { + ctx.info('Failed to backup workspaces, Retry failed workspaces once.', { failed: failedWorkspaces.length }) + this.printStats(ctx, await this.doBackup(ctx, failedWorkspaces)) } } @@ -81,7 +109,9 @@ class BackupWorker { }, this.config.Interval * 1000) } - async backup (ctx: MeasureContext): Promise { + async backup ( + ctx: MeasureContext + ): Promise<{ failedWorkspaces: BaseWorkspaceInfo[], processed: number, skipped: number }> { const workspacesIgnore = new Set(this.config.SkipWorkspaces.split(';')) const workspaces = (await listAccountWorkspaces(this.config.Token)).filter((it) => { return !workspacesIgnore.has(it.workspace) @@ -90,16 +120,20 @@ class BackupWorker { return await this.doBackup(ctx, workspaces) } - async doBackup (rootCtx: MeasureContext, workspaces: BaseWorkspaceInfo[]): Promise { + async doBackup ( + rootCtx: MeasureContext, + workspaces: BaseWorkspaceInfo[] + ): Promise<{ failedWorkspaces: BaseWorkspaceInfo[], processed: number, skipped: number }> { let index = 0 const failedWorkspaces: BaseWorkspaceInfo[] = [] + let processed = 0 for (const ws of workspaces) { if (this.canceled) { - return failedWorkspaces + return { failedWorkspaces, processed, skipped: workspaces.length - processed } } index++ - rootCtx.info('\n\nBACKUP WORKSPACE ', { + rootCtx.warn('\n\nBACKUP WORKSPACE ', { workspace: ws.workspace, index, total: workspaces.length @@ -109,6 +143,7 @@ class BackupWorker { enableConsole: 'true' }) const ctx = rootCtx.newChild(ws.workspace, { workspace: ws.workspace }, {}, childLogger) + let pipeline: Pipeline | undefined try { const storage = await createStorageBackupStorage( ctx, @@ -121,28 +156,67 @@ class BackupWorker { workspaceName: ws.workspaceName ?? '', workspaceUrl: ws.workspaceUrl ?? '' } - const pipeline = await this.pipelineFactory(ctx, wsUrl, true, () => {}, null) + processed += (await ctx.with( + 'backup', + { workspace: ws.workspace }, + async (ctx) => + await backup(ctx, '', getWorkspaceId(ws.workspace), storage, { + skipDomains: [], + force: false, + recheck: false, + timeout: this.config.Timeout * 1000, + connectTimeout: 5 * 60 * 1000, // 5 minutes to, + blobDownloadLimit: 100, + skipBlobContentTypes: [], + storageAdapter: this.workspaceStorageAdapter, + getLastTx: async (): Promise => { + const config = this.getConfig(ctx, wsUrl, null, this.workspaceStorageAdapter) + const adapterConf = config.adapters[config.domains[DOMAIN_TX]] + const hierarchy = new Hierarchy() + const modelDb = new ModelDb(hierarchy) + const txAdapter = await adapterConf.factory( + ctx, + hierarchy, + adapterConf.url, + wsUrl, + modelDb, + this.workspaceStorageAdapter + ) + try { + await txAdapter.init?.() - await ctx.with('backup', { workspace: ws.workspace }, async (ctx) => { - await backup(ctx, '', getWorkspaceId(ws.workspace), storage, { - skipDomains: [], - force: false, - recheck: false, - timeout: this.config.Timeout * 1000, - connectTimeout: 5 * 60 * 1000, // 5 minutes to, - blobDownloadLimit: 100, - skipBlobContentTypes: [], - storageAdapter: this.workspaceStorageAdapter, - connection: this.wrapPipeline(ctx, pipeline, wsUrl) - }) - }) + return ( + await txAdapter.rawFindAll( + DOMAIN_TX, + {}, + { limit: 1, sort: { modifiedOn: SortingOrder.Descending } } + ) + ).shift() + } finally { + await txAdapter.close() + } + }, + getConnection: async () => { + if (pipeline === undefined) { + pipeline = await this.pipelineFactory(ctx, wsUrl, true, () => {}, null) + } + return this.wrapPipeline(ctx, pipeline, wsUrl) + } + }) + )) + ? 1 + : 0 } catch (err: any) { rootCtx.error('\n\nFAILED to BACKUP', { workspace: ws.workspace, err }) failedWorkspaces.push(ws) await childLogger?.close() + } finally { + if (pipeline !== undefined) { + await pipeline.close() + } } } - return failedWorkspaces + return { failedWorkspaces, processed, skipped: workspaces.length - processed } } wrapPipeline (ctx: MeasureContext, pipeline: Pipeline, wsUrl: WorkspaceIdWithUrl): Client & BackupClient { @@ -213,15 +287,22 @@ export function backupService ( storage: StorageAdapter, config: BackupConfig, pipelineFactory: PipelineFactory, - workspaceStorageAdapter: StorageAdapter + workspaceStorageAdapter: StorageAdapter, + getConfig: ( + ctx: MeasureContext, + workspace: WorkspaceIdWithUrl, + branding: Branding | null, + externalStorage: StorageAdapter + ) => DbConfiguration ): () => void { - const backupWorker = new BackupWorker(storage, config, pipelineFactory, workspaceStorageAdapter) + const backupWorker = new BackupWorker(storage, config, pipelineFactory, workspaceStorageAdapter, getConfig) const shutdown = (): void => { void backupWorker.close() } - void backupWorker.backup(ctx).then(() => { + void backupWorker.backup(ctx).then((res) => { + backupWorker.printStats(ctx, res) void backupWorker.schedule(ctx) }) return shutdown diff --git a/server/backup/src/storage.ts b/server/backup/src/storage.ts index 718e7b83f0..e127ab5ec7 100644 --- a/server/backup/src/storage.ts +++ b/server/backup/src/storage.ts @@ -82,7 +82,7 @@ class AdapterStorage implements BackupStorage { async exists (name: string): Promise { try { return (await this.client.stat(this.ctx, this.workspaceId, join(this.root, name))) !== undefined - } catch (err) { + } catch (err: any) { return false } } diff --git a/server/minio/src/index.ts b/server/minio/src/index.ts index 1a56fba05c..d0c2888bbb 100644 --- a/server/minio/src/index.ts +++ b/server/minio/src/index.ts @@ -286,6 +286,15 @@ export class MinioService implements StorageAdapter { version: result.versionId ?? null } } catch (err: any) { + if ( + err?.code === 'NoSuchKey' || + err?.code === 'NotFound' || + err?.message === 'No such key' || + err?.Code === 'NoSuchKey' + ) { + // Do not print error in this case + return + } ctx.error('no object found', { error: err, objectName, workspaceId: workspaceId.name }) } } diff --git a/server/server-pipeline/src/pipeline.ts b/server/server-pipeline/src/pipeline.ts index f777844232..52c62fe1c5 100644 --- a/server/server-pipeline/src/pipeline.ts +++ b/server/server-pipeline/src/pipeline.ts @@ -49,6 +49,7 @@ import { createBenchmarkAdapter, createInMemoryAdapter, createPipeline, + DummyDbAdapter, DummyFullTextAdapter, FullTextMiddleware, type AggregatorStorageAdapter, @@ -154,6 +155,74 @@ export function createServerPipeline ( } } +/** + * @public + */ + +export function createBackupPipeline ( + metrics: MeasureContext, + dbUrls: string, + opt: { + usePassedCtx?: boolean + adapterSecurity?: boolean + + externalStorage: StorageAdapter + } +): PipelineFactory { + return (ctx, workspace, upgrade, broadcast, branding) => { + const metricsCtx = opt.usePassedCtx === true ? ctx : metrics + const wsMetrics = metricsCtx.newChild('🧲 backup', {}) + const conf = getConfig( + metrics, + dbUrls, + workspace, + branding, + wsMetrics, + { + ...opt, + fullTextUrl: '', + indexParallel: 0, + indexProcessing: 0, + rekoniUrl: '', + disableTriggers: true + }, + { + adapters: { + FullTextBlob: { + factory: async () => new DummyDbAdapter(), + url: '' + } + }, + fulltextAdapter: { + factory: async () => new DummyFullTextAdapter(), + stages: () => [], + url: '' + } + } + ) + + const middlewares: MiddlewareCreator[] = [ + LowLevelMiddleware.create, + ContextNameMiddleware.create, + DomainFindMiddleware.create, + DBAdapterInitMiddleware.create, + ModelMiddleware.create, + DBAdapterMiddleware.create(conf) + ] + + const hierarchy = new Hierarchy() + const modelDb = new ModelDb(hierarchy) + const context: PipelineContext = { + workspace, + branding, + modelDb, + hierarchy, + storageAdapter: opt.externalStorage + } + return createPipeline(ctx, middlewares, context) + } +} + export async function getServerPipeline ( ctx: MeasureContext, mongodbUri: string, @@ -211,7 +280,7 @@ export async function getServerPipeline ( } } -function getConfig ( +export function getConfig ( metrics: MeasureContext, dbUrls: string, workspace: WorkspaceIdWithUrl, diff --git a/server/server/src/sessionManager.ts b/server/server/src/sessionManager.ts index e5c7e29627..b06aaa8dfb 100644 --- a/server/server/src/sessionManager.ts +++ b/server/server/src/sessionManager.ts @@ -865,16 +865,22 @@ class TSessionManager implements SessionManager { request: Request, workspace: string // wsId, toWorkspaceString() ): void { - const userCtx = requestCtx.newChild('📞 client', { - workspace: '🧲 ' + workspace - }) + const backupMode = service.getMode() === 'backup' + + const userCtx = requestCtx.newChild( + '📞 client', + !backupMode + ? { + workspace: '🧲 ' + workspace + } + : {} + ) // Calculate total number of clients const reqId = generateId() const st = Date.now() try { - const backupMode = 'loadChunk' in service void userCtx.with(`🧭 ${backupMode ? 'handleBackup' : 'handleRequest'}`, {}, async (ctx) => { if (request.time != null) { const delta = Date.now() - request.time