From b64dc7b54f710b0271b9eab839b1fb41bf53ad6d Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Wed, 17 Apr 2024 22:18:15 +0700 Subject: [PATCH] UBERF-6490: Rework backup tool (#5386) Signed-off-by: Andrey Sobolev --- common/config/rush/pnpm-lock.yaml | 47 +- dev/tool/src/index.ts | 78 +- packages/core/src/classes.ts | 17 + plugins/client-resources/src/connection.ts | 17 +- .../src/components/GuestApp.svelte | 4 +- plugins/guest-resources/src/connect.ts | 22 +- .../components/issues/edit/EditIssue.svelte | 4 +- .../src/components/WorkbenchApp.svelte | 4 +- plugins/workbench-resources/src/connect.ts | 33 +- pods/backup/package.json | 6 +- pods/backup/src/index.ts | 24 +- pods/backup/src/platform.ts | 119 -- rush.json | 5 + server/account/src/index.ts | 25 +- server/backup-service/.eslintrc.js | 7 + server/backup-service/.npmignore | 4 + server/backup-service/config/rig.json | 4 + server/backup-service/jest.config.js | 7 + server/backup-service/package.json | 51 + .../backup-service}/src/config.ts | 15 +- server/backup-service/src/index.ts | 58 + server/backup-service/tsconfig.json | 10 + server/backup/src/backup.ts | 1322 +++++++++++++++++ server/backup/src/index.ts | 1234 +-------------- server/backup/src/service.ts | 115 ++ server/ws/src/server.ts | 52 +- server/ws/src/server_http.ts | 8 +- 27 files changed, 1824 insertions(+), 1468 deletions(-) delete mode 100644 pods/backup/src/platform.ts create mode 100644 server/backup-service/.eslintrc.js create mode 100644 server/backup-service/.npmignore create mode 100644 server/backup-service/config/rig.json create mode 100644 server/backup-service/jest.config.js create mode 100644 server/backup-service/package.json rename {pods/backup => server/backup-service}/src/config.ts (81%) create mode 100644 server/backup-service/src/index.ts create mode 100644 server/backup-service/tsconfig.json create mode 100644 server/backup/src/backup.ts create mode 100644 server/backup/src/service.ts diff --git a/common/config/rush/pnpm-lock.yaml b/common/config/rush/pnpm-lock.yaml index 877b590bba..64b07447dd 100644 --- a/common/config/rush/pnpm-lock.yaml +++ b/common/config/rush/pnpm-lock.yaml @@ -56,6 +56,9 @@ dependencies: '@rush-temp/auth-providers': specifier: file:./projects/auth-providers.tgz version: file:projects/auth-providers.tgz(@types/node@20.11.19)(esbuild@0.20.1)(ts-node@10.9.2) + '@rush-temp/backup-service': + specifier: file:./projects/backup-service.tgz + version: file:projects/backup-service.tgz(esbuild@0.20.1)(ts-node@10.9.2) '@rush-temp/bitrix': specifier: file:./projects/bitrix.tgz version: file:projects/bitrix.tgz(@types/node@20.11.19)(esbuild@0.20.1)(ts-node@10.9.2) @@ -17038,7 +17041,7 @@ packages: dev: false file:projects/account.tgz(@types/node@20.11.19)(bufferutil@4.0.8)(esbuild@0.20.1)(ts-node@10.9.2): - resolution: {integrity: sha512-gdOrX67T6n/Km9z3E9vHU90AXEZ8QwyOYLrLQfO0w+EanVjFErtvCNomVUIsgTkfjWk4diMN/FscChJ2FfO0lw==, tarball: file:projects/account.tgz} + resolution: {integrity: sha512-9RuPhqNNHTYjQwezirzcJ6WZpMJTzbjc72jZSJC6dQFG7WqW0a2QZ8VTaa32IM902stPP950kaRUDGGEKlxEwg==, tarball: file:projects/account.tgz} id: file:projects/account.tgz name: '@rush-temp/account' version: 0.0.0 @@ -17416,6 +17419,38 @@ packages: - ts-node dev: false + file:projects/backup-service.tgz(esbuild@0.20.1)(ts-node@10.9.2): + resolution: {integrity: sha512-V3tol7QGRHUEOi2hp9fv+nnjYVJJiMo/Pvfl5aRoL/CySk9vq/8KdQ6dBI2cFTLsRVjHiS4F4tzgsxcgZ09DQw==, tarball: file:projects/backup-service.tgz} + id: file:projects/backup-service.tgz + name: 
'@rush-temp/backup-service' + version: 0.0.0 + dependencies: + '@types/jest': 29.5.12 + '@types/node': 20.11.19 + '@types/tar-stream': 2.2.3 + '@typescript-eslint/eslint-plugin': 6.21.0(@typescript-eslint/parser@6.21.0)(eslint@8.56.0)(typescript@5.3.3) + '@typescript-eslint/parser': 6.21.0(eslint@8.56.0)(typescript@5.3.3) + eslint: 8.56.0 + eslint-config-standard-with-typescript: 40.0.0(@typescript-eslint/eslint-plugin@6.21.0)(eslint-plugin-import@2.29.1)(eslint-plugin-n@15.7.0)(eslint-plugin-promise@6.1.1)(eslint@8.56.0)(typescript@5.3.3) + eslint-plugin-import: 2.29.1(eslint@8.56.0) + eslint-plugin-n: 15.7.0(eslint@8.56.0) + eslint-plugin-promise: 6.1.1(eslint@8.56.0) + jest: 29.7.0(@types/node@20.11.19)(ts-node@10.9.2) + prettier: 3.2.5 + tar-stream: 2.2.0 + ts-jest: 29.1.2(esbuild@0.20.1)(jest@29.7.0)(typescript@5.3.3) + typescript: 5.3.3 + transitivePeerDependencies: + - '@babel/core' + - '@jest/types' + - babel-jest + - babel-plugin-macros + - esbuild + - node-notifier + - supports-color + - ts-node + dev: false + file:projects/bitrix-assets.tgz(esbuild@0.20.1)(ts-node@10.9.2): resolution: {integrity: sha512-aQD6a0vO8LgVaG8WlZUbCBOj/Al75h7pzI3u4zDr9QTqcCi4mj302yoJzPj7dRHBoTtmE0U/kqZsfVZaeOURxw==, tarball: file:projects/bitrix-assets.tgz} id: file:projects/bitrix-assets.tgz @@ -18081,7 +18116,7 @@ packages: dev: false file:projects/contact-resources.tgz(@types/node@20.11.19)(esbuild@0.20.1)(postcss-load-config@4.0.2)(postcss@8.4.35)(ts-node@10.9.2): - resolution: {integrity: sha512-VgYcfJpD4o2gzdBvypvWGaC7aOMeMYNL+Mc7+CnvxKI7mwSu1urHvxzrJCsKnptub6dBNxarnj0ESfdb4Gr+ww==, tarball: file:projects/contact-resources.tgz} + resolution: {integrity: sha512-IFjZO0UGQ83ccZ4ki0quTRNpYBaVMpRyv5gf7QoOm85RbUzxhBm7jJOH2HXGBfL+XSSlmgt8TFxhLJDoGBLw0g==, tarball: file:projects/contact-resources.tgz} id: file:projects/contact-resources.tgz name: '@rush-temp/contact-resources' version: 0.0.0 @@ -18840,7 +18875,7 @@ packages: dev: false file:projects/hr-resources.tgz(@types/node@20.11.19)(esbuild@0.20.1)(postcss-load-config@4.0.2)(postcss@8.4.35)(ts-node@10.9.2): - resolution: {integrity: sha512-82/nL+Vm0JAnfzAJV1hW8bQWYYeOFHhjYh/rXvkOJDttpW+m/J/b4uE9xIsSAmP+g2SSfzFEWysCr6p5gfEmmQ==, tarball: file:projects/hr-resources.tgz} + resolution: {integrity: sha512-qkvCJBEeNFe/BPRkLU3SasE4DMHQBxotTd4FahDfLz8x/gDDbphklWmXIJh5rOdV+XLiCfpKGyWg9yYUELcuRg==, tarball: file:projects/hr-resources.tgz} id: file:projects/hr-resources.tgz name: '@rush-temp/hr-resources' version: 0.0.0 @@ -22371,7 +22406,7 @@ packages: dev: false file:projects/server-notification-resources.tgz(@types/node@20.11.19)(esbuild@0.20.1)(ts-node@10.9.2): - resolution: {integrity: sha512-9ctaiwEU+M3RZ4Qz0/OuQ1lLh4Os9V8vlpf2FK/cpGOvAt5RUfV0qwiJLaImsLVbS62uz6hbZ1oy7Q3T+qex8Q==, tarball: file:projects/server-notification-resources.tgz} + resolution: {integrity: sha512-uhmmKoMg/lF5f1B/DlmNUTfo8sDauKZSZEgqSNbNw8SsYoRJG9/xQHlXRPHs71tKeskt+jvwgs/z2385w5R3IA==, tarball: file:projects/server-notification-resources.tgz} id: file:projects/server-notification-resources.tgz name: '@rush-temp/server-notification-resources' version: 0.0.0 @@ -23007,7 +23042,7 @@ packages: dev: false file:projects/server-tracker-resources.tgz(@types/node@20.11.19)(esbuild@0.20.1)(ts-node@10.9.2): - resolution: {integrity: sha512-sSU5k2FJCNeg9Jsmbkv3MskBYBjaYD+QOJBZCWaN5z0wpj2Gr3+dzhInFV3NMNn1wT8jjx91i2lQ3DOxK1CdJA==, tarball: file:projects/server-tracker-resources.tgz} + resolution: {integrity: 
sha512-XBDFV1uHjmsarrrNZIixKXw9ov021Xq0yqDIwNWtJ3pLLXK1gFYPTWDARljpQx1DKFvB6Hy+qUxTmOj4xunNAQ==, tarball: file:projects/server-tracker-resources.tgz} id: file:projects/server-tracker-resources.tgz name: '@rush-temp/server-tracker-resources' version: 0.0.0 @@ -23929,7 +23964,7 @@ packages: dev: false file:projects/text-editor.tgz(@types/node@20.11.19)(bufferutil@4.0.8)(esbuild@0.20.1)(postcss-load-config@4.0.2)(postcss@8.4.35)(prosemirror-model@1.19.4)(ts-node@10.9.2): - resolution: {integrity: sha512-JbV3dbxXXUQ8HmfqUkI/XcmvHmhOOAwJcwkFybbg8pNHPO8jTRKmYQB6HyXNMgoe4XFfxKDwWRFG/DS0QBaLzg==, tarball: file:projects/text-editor.tgz} + resolution: {integrity: sha512-G6/SsJ7NNINfdGwdI+6F9alZ+ow8Kmv+48UNx08Jh7Xz6lwvHuXWKRgh16LpzR7b5IPZJLyK3uz4zFo50JMTxA==, tarball: file:projects/text-editor.tgz} id: file:projects/text-editor.tgz name: '@rush-temp/text-editor' version: 0.0.0 diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index a190fe3066..7be5ea2ddb 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -25,7 +25,6 @@ import { getAccount, getWorkspaceById, listAccounts, - listWorkspaces, listWorkspacesPure, listWorkspacesRaw, replacePassword, @@ -58,6 +57,7 @@ import core, { MeasureMetricsContext, metricsToString, RateLimiter, + versionToString, type AccountRole, type Data, type Tx, @@ -68,6 +68,7 @@ import contact from '@hcengineering/model-contact' import { getMongoClient, getWorkspaceDB } from '@hcengineering/mongo' import { openAIConfigDefaults } from '@hcengineering/openai' import { type StorageAdapter } from '@hcengineering/server-core' +import { deepEqual } from 'fast-equals' import path from 'path' import { benchmark } from './benchmark' import { @@ -86,6 +87,18 @@ import { fixMixinForeignAttributes, showMixinForeignAttributes } from './mixin' import { openAIConfig } from './openai' import { fixAccountEmails, renameAccount } from './renameAccount' +const colorConstants = { + colorRed: '\u001b[31m', + colorBlue: '\u001b[34m', + colorWhiteCyan: '\u001b[37;46m', + colorRedYellow: '\u001b[31;43m', + colorPing: '\u001b[38;5;201m', + colorLavander: '\u001b[38;5;147m', + colorAqua: '\u001b[38;2;145;231;255m', + colorPencil: '\u001b[38;2;253;182;0m', + reset: '\u001b[0m' +} + /** * @public */ @@ -468,11 +481,42 @@ export function devTool ( program .command('list-workspaces') .description('List workspaces') - .action(async () => { + .option('-e|--expired [expired]', 'Show only expired', false) + .action(async (cmd: { expired: boolean }) => { const { mongodbUri, version } = prepareTools() await withDatabase(mongodbUri, async (db) => { - const workspacesJSON = JSON.stringify(await listWorkspaces(toolCtx, db, productId), null, 2) - console.info(workspacesJSON) + const workspacesJSON = await listWorkspacesPure(db, productId) + for (const ws of workspacesJSON) { + let lastVisit = Math.floor((Date.now() - ws.lastVisit) / 1000 / 3600 / 24) + if (cmd.expired && lastVisit <= 7) { + continue + } + console.log( + colorConstants.colorBlue + + '####################################################################################################' + + colorConstants.reset + ) + console.log('id:', colorConstants.colorWhiteCyan + ws.workspace + colorConstants.reset) + console.log('url:', ws.workspaceUrl, 'name:', ws.workspaceName) + console.log( + 'version:', + ws.version !== undefined ? versionToString(ws.version) : 'not-set', + !deepEqual(ws.version, version) ? 
`upgrade to ${versionToString(version)} is required` : '' + ) + console.log('disabled:', ws.disabled) + console.log('created by:', ws.createdBy) + console.log('members:', (ws.accounts ?? []).length) + if (Number.isNaN(lastVisit)) { + lastVisit = 365 + } + if (lastVisit > 30) { + console.log(colorConstants.colorRed + `last visit: ${lastVisit} days ago` + colorConstants.reset) + } else if (lastVisit > 7) { + console.log(colorConstants.colorRedYellow + `last visit: ${lastVisit} days ago` + colorConstants.reset) + } else { + console.log('last visit:', lastVisit, 'days ago') + } + } console.log('latest model version:', JSON.stringify(version)) }) @@ -481,7 +525,7 @@ export function devTool ( program.command('fix-person-accounts').action(async () => { const { mongodbUri, version } = prepareTools() await withDatabase(mongodbUri, async (db, client) => { - const ws = await listWorkspaces(toolCtx, db, productId) + const ws = await listWorkspacesPure(db, productId) for (const w of ws) { const wsDb = getWorkspaceDB(client, { name: w.workspace, productId }) await wsDb.collection('tx').updateMany( @@ -534,6 +578,7 @@ export function devTool ( .action(async (dirName: string, workspace: string, cmd: { skip: string, force: boolean }) => { const storage = await createFileBackupStorage(dirName) await backup( + toolCtx, transactorUrl, getWorkspaceId(workspace, productId), storage, @@ -548,7 +593,7 @@ export function devTool ( .option('-f, --force', 'Force compact.', false) .action(async (dirName: string, cmd: { force: boolean }) => { const storage = await createFileBackupStorage(dirName) - await compactBackup(storage, cmd.force) + await compactBackup(toolCtx, storage, cmd.force) }) program @@ -557,7 +602,14 @@ export function devTool ( .description('dump workspace transactions and minio resources') .action(async (dirName: string, workspace: string, date, cmd: { merge: boolean }) => { const storage = await createFileBackupStorage(dirName) - await restore(transactorUrl, getWorkspaceId(workspace, productId), storage, parseInt(date ?? '-1'), cmd.merge) + await restore( + toolCtx, + transactorUrl, + getWorkspaceId(workspace, productId), + storage, + parseInt(date ?? '-1'), + cmd.merge + ) }) program @@ -579,7 +631,7 @@ export function devTool ( getWorkspaceId(bucketName, productId), dirName ) - await backup(transactorUrl, getWorkspaceId(workspace, productId), storage) + await backup(toolCtx, transactorUrl, getWorkspaceId(workspace, productId), storage) }) program @@ -594,7 +646,7 @@ export function devTool ( getWorkspaceId(bucketName, productId), dirName ) - await compactBackup(storage, cmd.force) + await compactBackup(toolCtx, storage, cmd.force) }) program @@ -611,11 +663,11 @@ export function devTool ( getWorkspaceId(bucketName, productId), dirName ) - const workspaces = await listWorkspaces(toolCtx, db, productId) + const workspaces = await listWorkspacesPure(db, productId) for (const w of workspaces) { console.log(`clearing ${w.workspace} history:`) - await compactBackup(storage, cmd.force) + await compactBackup(toolCtx, storage, cmd.force) } }) }) @@ -625,7 +677,7 @@ export function devTool ( .action(async (bucketName: string, dirName: string, workspace: string, date, cmd) => { const { storageAdapter } = prepareTools() const storage = await createStorageBackupStorage(toolCtx, storageAdapter, getWorkspaceId(bucketName), dirName) - await restore(transactorUrl, getWorkspaceId(workspace, productId), storage, parseInt(date ?? 
'-1')) + await restore(toolCtx, transactorUrl, getWorkspaceId(workspace, productId), storage, parseInt(date ?? '-1')) }) program .command('backup-s3-list ') @@ -695,7 +747,7 @@ export function devTool ( process.exit(1) } - const workspaces = await listWorkspaces(toolCtx, db, productId) + const workspaces = await listWorkspacesPure(db, productId) for (const w of workspaces) { console.log(`clearing ${w.workspace} history:`) diff --git a/packages/core/src/classes.ts b/packages/core/src/classes.ts index e2e576d83e..29c89f1b84 100644 --- a/packages/core/src/classes.ts +++ b/packages/core/src/classes.ts @@ -650,3 +650,20 @@ export interface DomainIndexConfiguration extends Doc { skip?: string[] } + +export interface BaseWorkspaceInfo { + workspace: string // An uniq workspace name, Database names + productId: string + disabled?: boolean + version?: Data + + workspaceUrl?: string | null // An optional url to the workspace, if not set workspace will be used + workspaceName?: string // An displayed workspace name + createdOn: number + lastVisit: number + + createdBy: string + + creating?: boolean + createProgress?: number // Some progress +} diff --git a/plugins/client-resources/src/connection.ts b/plugins/client-resources/src/connection.ts index 0e4a67a297..a06793ecac 100644 --- a/plugins/client-resources/src/connection.ts +++ b/plugins/client-resources/src/connection.ts @@ -79,6 +79,8 @@ class Connection implements ClientConnection { private sessionId: string | undefined private closed = false + private upgrading: boolean = false + private pingResponse: number = Date.now() constructor ( @@ -161,7 +163,11 @@ class Connection implements ClientConnection { throw new Error('connection closed') } this.pending = undefined - console.log('failed to connect', err) + if (!this.upgrading) { + console.log('connection: failed to connect', this.lastId) + } else { + console.log('connection: workspace during upgrade', this.lastId) + } if (err?.code === UNAUTHORIZED.code) { Analytics.handleError(err) this.onUnauthorized?.() @@ -169,7 +175,9 @@ class Connection implements ClientConnection { } await new Promise((resolve) => { setTimeout(() => { - console.log(`delay ${this.delay} second`) + if (!this.upgrading) { + console.log(`delay ${this.delay} second`) + } resolve(null) if (this.delay < 5) { this.delay++ @@ -220,7 +228,12 @@ class Connection implements ClientConnection { websocket.onmessage = (event: MessageEvent) => { const resp = readResponse(event.data, binaryResponse) + if (resp.id === -1 && resp.result === 'upgrading') { + this.upgrading = true + return + } if (resp.id === -1 && resp.result === 'hello') { + this.upgrading = false if ((resp as HelloResponse).alreadyConnected === true) { this.sessionId = generateId() if (typeof sessionStorage !== 'undefined') { diff --git a/plugins/guest-resources/src/components/GuestApp.svelte b/plugins/guest-resources/src/components/GuestApp.svelte index f54a42deb3..cd9893882d 100644 --- a/plugins/guest-resources/src/components/GuestApp.svelte +++ b/plugins/guest-resources/src/components/GuestApp.svelte @@ -26,11 +26,11 @@ {#await connect(getMetadata(workbench.metadata.PlatformTitle) ?? 'Platform')} {:then client} - {#if !client && versionError} + {#if $versionError}

- {versionError} + {$versionError}
{:else if client} diff --git a/plugins/guest-resources/src/connect.ts b/plugins/guest-resources/src/connect.ts index 549681c9b2..41f670cb7f 100644 --- a/plugins/guest-resources/src/connect.ts +++ b/plugins/guest-resources/src/connect.ts @@ -2,18 +2,19 @@ import { Analytics } from '@hcengineering/analytics' import client from '@hcengineering/client' import core, { ClientConnectEvent, + setCurrentAccount, versionToString, type AccountClient, type Client, - type Version, - setCurrentAccount + type Version } from '@hcengineering/core' import login, { loginId } from '@hcengineering/login' import { getMetadata, getResource, setMetadata } from '@hcengineering/platform' import presentation, { closeClient, refreshClient, setClient } from '@hcengineering/presentation' import { fetchMetadataLocalStorage, getCurrentLocation, navigate, setMetadataLocalStorage } from '@hcengineering/ui' +import { writable } from 'svelte/store' -export let versionError: string | undefined = '' +export const versionError = writable(undefined) let _token: string | undefined let _client: AccountClient | undefined @@ -93,7 +94,7 @@ export async function connect (title: string): Promise { if (currentVersionStr !== reconnectVersionStr) { // It seems upgrade happened // location.reload() - versionError = `${currentVersionStr} != ${reconnectVersionStr}` + versionError.set(`${currentVersionStr} != ${reconnectVersionStr}`) } const serverVersion: { version: string } = await ( await fetch(serverEndpoint + '/api/v1/version', {}) @@ -101,7 +102,7 @@ export async function connect (title: string): Promise { console.log('Server version', serverVersion.version) if (serverVersion.version !== '' && serverVersion.version !== currentVersionStr) { - versionError = `${currentVersionStr} => ${serverVersion.version}` + versionError.set(`${currentVersionStr} => ${serverVersion.version}`) } } })() @@ -131,7 +132,7 @@ export async function connect (title: string): Promise { const versionStr = versionToString(version) if (version === undefined || requiredVersion !== versionStr) { - versionError = `${versionStr} => ${requiredVersion}` + versionError.set(`${versionStr} => ${requiredVersion}`) return undefined } } @@ -139,17 +140,17 @@ export async function connect (title: string): Promise { try { const serverVersion: { version: string } = await (await fetch(serverEndpoint + '/api/v1/version', {})).json() - console.log('Server version', serverVersion.version) + console.log('Server version', serverVersion.version, version !== undefined ? versionToString(version) : '') if ( serverVersion.version !== '' && (version === undefined || serverVersion.version !== versionToString(version)) ) { const versionStr = version !== undefined ? 
versionToString(version) : 'unknown' - versionError = `${versionStr} => ${serverVersion.version}` + versionError.set(`${versionStr} => ${serverVersion.version}`) return } } catch (err: any) { - versionError = 'server version not available' + versionError.set('server version not available') return } } catch (err: any) { @@ -158,11 +159,12 @@ export async function connect (title: string): Promise { const requirdVersion = getMetadata(presentation.metadata.RequiredVersion) console.log('checking min model version', requirdVersion) if (requirdVersion !== undefined) { - versionError = `'unknown' => ${requirdVersion}` + versionError.set(`'unknown' => ${requirdVersion}`) return undefined } } + versionError.set(undefined) // Update window title document.title = [ws, title].filter((it) => it).join(' - ') _clientSet = true diff --git a/plugins/tracker-resources/src/components/issues/edit/EditIssue.svelte b/plugins/tracker-resources/src/components/issues/edit/EditIssue.svelte index 64a4e66e37..64ff383df7 100644 --- a/plugins/tracker-resources/src/components/issues/edit/EditIssue.svelte +++ b/plugins/tracker-resources/src/components/issues/edit/EditIssue.svelte @@ -206,13 +206,13 @@ {/if} {#if saved}
diff --git a/plugins/workbench-resources/src/components/WorkbenchApp.svelte b/plugins/workbench-resources/src/components/WorkbenchApp.svelte
{:then client} - {#if !client && versionError} + {#if $versionError}
{#if isNeedUpgrade} @@ -73,7 +73,7 @@ {:else}

{/if} - {versionError} + {$versionError}
{:else if client} diff --git a/plugins/workbench-resources/src/connect.ts b/plugins/workbench-resources/src/connect.ts index 4f91a28cdd..3b6d23e7f0 100644 --- a/plugins/workbench-resources/src/connect.ts +++ b/plugins/workbench-resources/src/connect.ts @@ -22,10 +22,11 @@ import { networkStatus, setMetadataLocalStorage } from '@hcengineering/ui' +import { writable } from 'svelte/store' import plugin from './plugin' import { workspaceCreating } from './utils' -export let versionError: string | undefined = '' +export const versionError = writable(undefined) let _token: string | undefined let _client: AccountClient | undefined @@ -176,7 +177,7 @@ export async function connect (title: string): Promise { if (currentVersionStr !== reconnectVersionStr) { // It seems upgrade happened // location.reload() - versionError = `${currentVersionStr} != ${reconnectVersionStr}` + versionError.set(`${currentVersionStr} != ${reconnectVersionStr}`) } const serverVersion: { version: string } = await ctx.with( 'fetch-server-version', @@ -184,9 +185,15 @@ export async function connect (title: string): Promise { async () => await (await fetch(serverEndpoint + '/api/v1/version', {})).json() ) - console.log('Server version', serverVersion.version) + console.log( + 'Server version', + serverVersion.version, + version !== undefined ? versionToString(version) : '' + ) if (serverVersion.version !== '' && serverVersion.version !== currentVersionStr) { - versionError = `${currentVersionStr} => ${serverVersion.version}` + versionError.set(`${currentVersionStr} => ${serverVersion.version}`) + } else { + versionError.set(undefined) } } })() @@ -237,7 +244,7 @@ export async function connect (title: string): Promise { const versionStr = versionToString(version) if (version === undefined || requiredVersion !== versionStr) { - versionError = `${versionStr} => ${requiredVersion}` + versionError.set(`${versionStr} => ${requiredVersion}`) return undefined } } @@ -249,30 +256,32 @@ export async function connect (title: string): Promise { async () => await (await fetch(serverEndpoint + '/api/v1/version', {})).json() ) - console.log('Server version', serverVersion.version) + console.log('Server version', serverVersion.version, version !== undefined ? versionToString(version) : '') if ( serverVersion.version !== '' && (version === undefined || serverVersion.version !== versionToString(version)) ) { const versionStr = version !== undefined ? 
versionToString(version) : 'unknown' - versionError = `${versionStr} => ${serverVersion.version}` + versionError.set(`${versionStr} => ${serverVersion.version}`) return } } catch (err: any) { - versionError = 'server version not available' + versionError.set('server version not available') return } } catch (err: any) { console.error(err) Analytics.handleError(err) - const requirdVersion = getMetadata(presentation.metadata.RequiredVersion) - console.log('checking min model version', requirdVersion) - if (requirdVersion !== undefined) { - versionError = `'unknown' => ${requirdVersion}` + const requiredVersion = getMetadata(presentation.metadata.RequiredVersion) + console.log('checking min model version', requiredVersion) + if (requiredVersion !== undefined) { + versionError.set(`'unknown' => ${requiredVersion}`) return undefined } } + versionError.set(undefined) + // Update window title document.title = [ws, title].filter((it) => it).join(' - ') _clientSet = true diff --git a/pods/backup/package.json b/pods/backup/package.json index a81f6d5fc1..3adbc7bc1d 100644 --- a/pods/backup/package.json +++ b/pods/backup/package.json @@ -47,16 +47,12 @@ }, "dependencies": { "@hcengineering/platform": "^0.6.9", - "mongodb": "^6.3.0", "@hcengineering/server-tool": "^0.6.0", "@hcengineering/server-token": "^0.6.7", "@hcengineering/client": "^0.6.14", "@hcengineering/client-resources": "^0.6.23", "@hcengineering/core": "^0.6.28", "dotenv": "~16.0.0", - "got": "^11.8.3", - "@hcengineering/server-backup": "^0.6.0", - "@hcengineering/server-core": "^0.6.1", - "@hcengineering/minio": "^0.6.0" + "@hcengineering/backup-service": "^0.6.0" } } diff --git a/pods/backup/src/index.ts b/pods/backup/src/index.ts index d65a115781..6750032d63 100644 --- a/pods/backup/src/index.ts +++ b/pods/backup/src/index.ts @@ -13,24 +13,8 @@ // limitations under the License. // -import { PlatformWorker } from './platform' +import { MeasureMetricsContext } from '@hcengineering/core' +import { startBackup } from '@hcengineering/backup-service' -const main = async (): Promise => { - const platformWorker = await PlatformWorker.create() - - const shutdown = (): void => { - void platformWorker.close().then(() => { - process.exit() - }) - } - - process.on('SIGINT', shutdown) - process.on('SIGTERM', shutdown) - process.on('uncaughtException', (e) => { - console.error(e) - }) - process.on('unhandledRejection', (e) => { - console.error(e) - }) -} -void main() +const ctx = new MeasureMetricsContext('backup-service', {}) +startBackup(ctx) diff --git a/pods/backup/src/platform.ts b/pods/backup/src/platform.ts deleted file mode 100644 index 7a272ee752..0000000000 --- a/pods/backup/src/platform.ts +++ /dev/null @@ -1,119 +0,0 @@ -// -// Copyright © 2022 Hardcore Engineering Inc. -// -// Licensed under the Eclipse Public License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. You may -// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// -// See the License for the specific language governing permissions and -// limitations under the License. 
-// - -import { getWorkspaceId, MeasureMetricsContext } from '@hcengineering/core' -import { MinioService } from '@hcengineering/minio' -import { setMetadata } from '@hcengineering/platform' -import { backup, createStorageBackupStorage } from '@hcengineering/server-backup' -import { type StorageAdapter } from '@hcengineering/server-core' -import serverToken from '@hcengineering/server-token' -import toolPlugin from '@hcengineering/server-tool' -import got from 'got' -import { type ObjectId } from 'mongodb' -import config from './config' - -/** - * @public - */ -export interface Workspace { - _id: ObjectId - workspace: string - organisation: string - accounts: ObjectId[] - productId: string -} - -async function getWorkspaces (): Promise { - const { body }: { body: { error?: string, result?: any[] } } = await got.post(config.AccountsURL, { - json: { - method: 'listWorkspaces', - params: [] - }, - responseType: 'json' - }) - - if (body.error !== undefined) { - throw Error(body.error) - } - - return (body.result as Workspace[]) ?? [] -} - -export class PlatformWorker { - storageAdapter!: StorageAdapter - - async close (): Promise {} - - async init (): Promise { - setMetadata(serverToken.metadata.Secret, config.Secret) - let minioPort = 9000 - let minioEndpoint = config.MinioEndpoint - const sp = minioEndpoint.split(':') - if (sp.length > 1) { - minioEndpoint = sp[0] - minioPort = parseInt(sp[1]) - } - - this.storageAdapter = new MinioService({ - endPoint: minioEndpoint, - port: minioPort, - useSSL: false, - accessKey: config.MinioAccessKey, - secretKey: config.MinioSecretKey - }) - - setMetadata(toolPlugin.metadata.UserAgent, config.ServiceID) - - await this.backup().then(() => { - void this.schedule() - }) - } - - async schedule (): Promise { - console.log('schedule timeout for', config.Interval, ' seconds') - setTimeout(() => { - void this.backup().then(() => { - void this.schedule() - }) - }, config.Interval * 1000) - } - - async backup (): Promise { - const workspaces = await getWorkspaces() - const ctx = new MeasureMetricsContext('backup', {}) - for (const ws of workspaces) { - console.log('\n\nBACKUP WORKSPACE ', ws.workspace, ws.productId) - try { - const storage = await createStorageBackupStorage( - ctx, - this.storageAdapter, - getWorkspaceId('backups', ws.productId), - ws.workspace - ) - await backup(config.TransactorURL, getWorkspaceId(ws.workspace, ws.productId), storage) - } catch (err: any) { - console.error('\n\nFAILED to BACKUP', ws, err) - } - } - } - - static async create (): Promise { - const worker = new PlatformWorker() - await worker.init() - return worker - } - - private constructor () {} -} diff --git a/rush.json b/rush.json index f019add768..05e546beb6 100644 --- a/rush.json +++ b/rush.json @@ -1367,6 +1367,11 @@ "projectFolder": "server/backup", "shouldPublish": false }, + { + "packageName": "@hcengineering/backup-service", + "projectFolder": "server/backup-service", + "shouldPublish": false + }, { "packageName": "@hcengineering/pod-backup", "projectFolder": "pods/backup", diff --git a/server/account/src/index.ts b/server/account/src/index.ts index 34fb689864..735eb7e9be 100644 --- a/server/account/src/index.ts +++ b/server/account/src/index.ts @@ -26,6 +26,7 @@ import contact, { } from '@hcengineering/contact' import core, { AccountRole, + BaseWorkspaceInfo, Client, concatLink, Data, @@ -100,23 +101,9 @@ export interface Account { /** * @public */ -export interface Workspace { +export interface Workspace extends BaseWorkspaceInfo { _id: ObjectId - workspace: string 
// An uniq workspace name, Database names accounts: ObjectId[] - productId: string - disabled?: boolean - version?: Data - - workspaceUrl?: string | null // An optional url to the workspace, if not set workspace will be used - workspaceName?: string // An displayed workspace name - createdOn: number - lastVisit: number - - createdBy: string - - creating?: boolean - createProgress?: number // Some progress } /** @@ -664,7 +651,13 @@ export async function createAccount ( /** * @public */ -export async function listWorkspaces (ctx: MeasureContext, db: Db, productId: string): Promise { +export async function listWorkspaces ( + ctx: MeasureContext, + db: Db, + productId: string, + token: string +): Promise { + decodeToken(token) // Just verify token is valid return (await db.collection(WORKSPACE_COLLECTION).find(withProductId(productId, {})).toArray()) .map((it) => ({ ...it, productId })) .filter((it) => it.disabled !== true) diff --git a/server/backup-service/.eslintrc.js b/server/backup-service/.eslintrc.js new file mode 100644 index 0000000000..72235dc283 --- /dev/null +++ b/server/backup-service/.eslintrc.js @@ -0,0 +1,7 @@ +module.exports = { + extends: ['./node_modules/@hcengineering/platform-rig/profiles/default/eslint.config.json'], + parserOptions: { + tsconfigRootDir: __dirname, + project: './tsconfig.json' + } +} diff --git a/server/backup-service/.npmignore b/server/backup-service/.npmignore new file mode 100644 index 0000000000..e3ec093c38 --- /dev/null +++ b/server/backup-service/.npmignore @@ -0,0 +1,4 @@ +* +!/lib/** +!CHANGELOG.md +/lib/**/__tests__/ diff --git a/server/backup-service/config/rig.json b/server/backup-service/config/rig.json new file mode 100644 index 0000000000..0110930f55 --- /dev/null +++ b/server/backup-service/config/rig.json @@ -0,0 +1,4 @@ +{ + "$schema": "https://developer.microsoft.com/json-schemas/rig-package/rig.schema.json", + "rigPackageName": "@hcengineering/platform-rig" +} diff --git a/server/backup-service/jest.config.js b/server/backup-service/jest.config.js new file mode 100644 index 0000000000..2cfd408b67 --- /dev/null +++ b/server/backup-service/jest.config.js @@ -0,0 +1,7 @@ +module.exports = { + preset: 'ts-jest', + testEnvironment: 'node', + testMatch: ['**/?(*.)+(spec|test).[jt]s?(x)'], + roots: ["./src"], + coverageReporters: ["text-summary", "html"] +} diff --git a/server/backup-service/package.json b/server/backup-service/package.json new file mode 100644 index 0000000000..07769d2ddf --- /dev/null +++ b/server/backup-service/package.json @@ -0,0 +1,51 @@ +{ + "name": "@hcengineering/backup-service", + "version": "0.6.0", + "main": "lib/index.js", + "svelte": "src/index.ts", + "types": "types/index.d.ts", + "author": "Anticrm Platform Contributors", + "license": "EPL-2.0", + "scripts": { + "build": "compile", + "build:watch": "compile", + "format": "format src", + "run": "ts-node", + "test": "jest --passWithNoTests --silent", + "_phase:build": "compile transpile src", + "_phase:test": "jest --passWithNoTests --silent", + "_phase:format": "format src", + "_phase:validate": "compile validate" + }, + "devDependencies": { + "@hcengineering/platform-rig": "^0.6.0", + "@typescript-eslint/eslint-plugin": "^6.11.0", + "eslint-plugin-import": "^2.26.0", + "eslint-plugin-promise": "^6.1.1", + "eslint-plugin-n": "^15.4.0", + "eslint": "^8.54.0", + "@typescript-eslint/parser": "^6.11.0", + "eslint-config-standard-with-typescript": "^40.0.0", + "prettier": "^3.1.0", + "typescript": "^5.3.3", + "@types/tar-stream": "^2.2.2", + "@types/node": 
"~20.11.16", + "jest": "^29.7.0", + "ts-jest": "^29.1.1", + "@types/jest": "^29.5.5" + }, + "dependencies": { + "@hcengineering/platform": "^0.6.9", + "@hcengineering/core": "^0.6.28", + "@hcengineering/contact": "^0.6.20", + "@hcengineering/client-resources": "^0.6.23", + "@hcengineering/client": "^0.6.14", + "@hcengineering/model": "^0.6.7", + "tar-stream": "^2.2.0", + "@hcengineering/server-tool": "^0.6.0", + "@hcengineering/server-core": "^0.6.1", + "@hcengineering/server-backup": "^0.6.0", + "@hcengineering/minio": "^0.6.0", + "@hcengineering/server-token": "^0.6.7" + } +} diff --git a/pods/backup/src/config.ts b/server/backup-service/src/config.ts similarity index 81% rename from pods/backup/src/config.ts rename to server/backup-service/src/config.ts index 3c9c859437..f1fb66c2fe 100644 --- a/pods/backup/src/config.ts +++ b/server/backup-service/src/config.ts @@ -13,13 +13,16 @@ // limitations under the License. // -interface Config { +import { type BackupConfig } from '@hcengineering/server-backup' + +interface Config extends Omit { TransactorURL: string AccountsURL: string ServiceID: string Secret: string - Interval: number + Interval: number // Timeout in seconds + Timeout: number // Timeout in seconds BucketName: string MinioEndpoint: string @@ -36,7 +39,8 @@ const envMap: { [key in keyof Config]: string } = { Interval: 'INTERVAL', MinioEndpoint: 'MINIO_ENDPOINT', MinioAccessKey: 'MINIO_ACCESS_KEY', - MinioSecretKey: 'MINIO_SECRET_KEY' + MinioSecretKey: 'MINIO_SECRET_KEY', + Timeout: 'TIMEOUT' } const required: Array = [ @@ -55,12 +59,13 @@ const config: Config = (() => { TransactorURL: process.env[envMap.TransactorURL], AccountsURL: process.env[envMap.AccountsURL], Secret: process.env[envMap.Secret], - BucketName: process.env[envMap.BucketName], + BucketName: process.env[envMap.BucketName] ?? 'backups', ServiceID: process.env[envMap.ServiceID] ?? 'backup-service', Interval: parseInt(process.env[envMap.Interval] ?? '3600'), MinioEndpoint: process.env[envMap.MinioEndpoint], MinioAccessKey: process.env[envMap.MinioAccessKey], - MinioSecretKey: process.env[envMap.MinioSecretKey] + MinioSecretKey: process.env[envMap.MinioSecretKey], + Timeout: parseInt(process.env[envMap.Timeout] ?? '3600') } const missingEnv = required.filter((key) => params[key] === undefined).map((key) => envMap[key]) diff --git a/server/backup-service/src/index.ts b/server/backup-service/src/index.ts new file mode 100644 index 0000000000..0f56280947 --- /dev/null +++ b/server/backup-service/src/index.ts @@ -0,0 +1,58 @@ +// +// Copyright © 2022 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. 
+// + +import { MeasureContext, systemAccountEmail } from '@hcengineering/core' +import { MinioService } from '@hcengineering/minio' +import { setMetadata } from '@hcengineering/platform' +import { backupService } from '@hcengineering/server-backup' +import serverToken, { generateToken } from '@hcengineering/server-token' +import toolPlugin from '@hcengineering/server-tool' +import config from './config' + +export function startBackup (ctx: MeasureContext): void { + setMetadata(serverToken.metadata.Secret, config.Secret) + + let minioPort = 9000 + let minioEndpoint = config.MinioEndpoint + const sp = minioEndpoint.split(':') + if (sp.length > 1) { + minioEndpoint = sp[0] + minioPort = parseInt(sp[1]) + } + + const storageAdapter = new MinioService({ + endPoint: minioEndpoint, + port: minioPort, + useSSL: false, + accessKey: config.MinioAccessKey, + secretKey: config.MinioSecretKey + }) + + setMetadata(toolPlugin.metadata.UserAgent, config.ServiceID) + + // A token to access account service + const token = generateToken(systemAccountEmail, { name: 'backup', productId: '' }) + + const shutdown = backupService(ctx, storageAdapter, { ...config, Token: token }) + + process.on('SIGINT', shutdown) + process.on('SIGTERM', shutdown) + process.on('uncaughtException', (e) => { + void ctx.error('uncaughtException', { err: e }) + }) + process.on('unhandledRejection', (e) => { + void ctx.error('unhandledRejection', { err: e }) + }) +} diff --git a/server/backup-service/tsconfig.json b/server/backup-service/tsconfig.json new file mode 100644 index 0000000000..59e4fd4297 --- /dev/null +++ b/server/backup-service/tsconfig.json @@ -0,0 +1,10 @@ +{ + "extends": "./node_modules/@hcengineering/platform-rig/profiles/default/tsconfig.json", + + "compilerOptions": { + "rootDir": "./src", + "outDir": "./lib", + "declarationDir": "./types", + "tsBuildInfoFile": ".build/build.tsbuildinfo" + } +} \ No newline at end of file diff --git a/server/backup/src/backup.ts b/server/backup/src/backup.ts new file mode 100644 index 0000000000..808bf9ff1a --- /dev/null +++ b/server/backup/src/backup.ts @@ -0,0 +1,1322 @@ +// +// Copyright © 2020, 2021 Anticrm Platform Contributors. +// Copyright © 2021 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. 
+// + +import core, { + AttachedDoc, + BackupClient, + BlobData, + Client as CoreClient, + Doc, + Domain, + DOMAIN_MODEL, + DOMAIN_TRANSIENT, + MeasureContext, + Ref, + SortingOrder, + TxCollectionCUD, + WorkspaceId +} from '@hcengineering/core' +import { connect } from '@hcengineering/server-tool' +import { createGzip } from 'node:zlib' +import { join } from 'path' +import { Writable } from 'stream' +import { extract, Pack, pack } from 'tar-stream' +import { createGunzip, gunzipSync, gzipSync } from 'zlib' +import { BackupStorage } from './storage' +export * from './storage' + +const dataBlobSize = 50 * 1024 * 1024 +const dataUploadSize = 2 * 1024 * 1024 +const retrieveChunkSize = 2 * 1024 * 1024 + +const defaultLevel = 9 + +/** + * @public + */ +export interface Snapshot { + added: Map, string> + updated: Map, string> + removed: Ref[] +} + +/** + * @public + */ +export interface SnapshotV6 { + added: Record, string> + updated: Record, string> + removed: Ref[] +} + +/** + * @public + */ +export interface DomainData { + snapshot?: string // 0.6 json snapshot + snapshots?: string[] + storage?: string[] + + // Some statistics + added: number + updated: number + removed: number +} + +/** + * @public + */ +export interface BackupSnapshot { + // _id => hash of added items. + domains: Record + date: number +} + +/** + * @public + */ +export interface BackupInfo { + workspace: string + version: string + productId: string + snapshots: BackupSnapshot[] + snapshotsIndex?: number + lastTxId?: string +} + +async function loadDigest ( + ctx: MeasureContext, + storage: BackupStorage, + snapshots: BackupSnapshot[], + domain: Domain, + date?: number +): Promise, string>> { + ctx = ctx.newChild('load digest', { domain, count: snapshots.length }) + const result = new Map, string>() + for (const s of snapshots) { + const d = s.domains[domain] + + // Load old JSON snapshot + if (d?.snapshot !== undefined) { + const dChanges: SnapshotV6 = JSON.parse(gunzipSync(await storage.loadFile(d.snapshot)).toString()) + for (const [k, v] of Object.entries(dChanges.added)) { + result.set(k as Ref, v) + } + for (const [k, v] of Object.entries(dChanges.updated)) { + result.set(k as Ref, v) + } + for (const d of dChanges.removed) { + result.delete(d) + } + } + for (const snapshot of d?.snapshots ?? []) { + try { + const dataBlob = gunzipSync(await storage.loadFile(snapshot)) + .toString() + .split('\n') + const addedCount = parseInt(dataBlob.shift() ?? '0') + const added = dataBlob.splice(0, addedCount) + for (const it of added) { + const [k, v] = it.split(';') + result.set(k as Ref, v) + } + + const updatedCount = parseInt(dataBlob.shift() ?? '0') + const updated = dataBlob.splice(0, updatedCount) + for (const it of updated) { + const [k, v] = it.split(';') + result.set(k as Ref, v) + } + + const removedCount = parseInt(dataBlob.shift() ?? 
'0') + const removed = dataBlob.splice(0, removedCount) + for (const k of removed) { + result.delete(k as Ref) + } + } catch (err: any) { + await ctx.error('digest is broken, will do full backup for', { domain }) + } + } + // Stop if stop date is matched and provided + if (date !== undefined && date === s.date) { + break + } + } + ctx.end() + return result +} + +async function write (chunk: any, stream: Writable): Promise { + let needDrain = false + await new Promise((resolve, reject) => { + needDrain = !stream.write(chunk, (err) => { + if (err != null) { + reject(err) + } else { + resolve(null) + } + }) + }) + if (needDrain) { + await new Promise((resolve, reject) => stream.once('drain', resolve)) + } +} + +async function writeChanges (storage: BackupStorage, snapshot: string, changes: Snapshot): Promise { + const snapshotWritable = await storage.write(snapshot) + const writable = createGzip({ level: defaultLevel }) + writable.pipe(snapshotWritable) + + // Write size + await write(`${changes.added.size}\n`, writable) + for (const [k, v] of changes.added.entries()) { + await write(`${k};${v}\n`, writable) + } + await write(`${changes.updated.size}\n`, writable) + for (const [k, v] of changes.updated.entries()) { + await write(`${k};${v}\n`, writable) + } + await write(`${changes.removed.length}\n`, writable) + for (const k of changes.removed) { + await write(`${k}\n`, writable) + } + writable.end() + await new Promise((resolve) => { + writable.flush(() => { + resolve(null) + }) + }) +} + +/** + * @public + */ +export async function cloneWorkspace ( + transactorUrl: string, + sourceWorkspaceId: WorkspaceId, + targetWorkspaceId: WorkspaceId, + clearTime: boolean = true, + progress: (value: number) => Promise +): Promise { + const sourceConnection = (await connect(transactorUrl, sourceWorkspaceId, undefined, { + mode: 'backup' + })) as unknown as CoreClient & BackupClient + const targetConnection = (await connect(transactorUrl, targetWorkspaceId, undefined, { + mode: 'backup' + })) as unknown as CoreClient & BackupClient + try { + const domains = sourceConnection + .getHierarchy() + .domains() + .filter((it) => it !== DOMAIN_TRANSIENT && it !== DOMAIN_MODEL) + + let i = 0 + for (const c of domains) { + console.log('clone domain...', c) + + const changes: Snapshot = { + added: new Map(), + updated: new Map(), + removed: [] + } + + let idx: number | undefined + + // update digest tar + const needRetrieveChunks: Ref[][] = [] + + let processed = 0 + let st = Date.now() + // Load all digest from collection. 
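+ // Stream the source domain's chunk index; if loading a chunk fails, the open chunk is closed and the scan restarts from the beginning.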
+ while (true) { + try { + const it = await sourceConnection.loadChunk(c, idx) + idx = it.idx + + let needRetrieve: Ref[] = [] + let needRetrieveSize = 0 + + for (const { id, hash, size } of it.docs) { + processed++ + if (Date.now() - st > 2500) { + console.log('processed', processed, Date.now() - st) + st = Date.now() + } + + changes.added.set(id as Ref, hash) + needRetrieve.push(id as Ref) + needRetrieveSize += size + + if (needRetrieveSize > retrieveChunkSize) { + needRetrieveChunks.push(needRetrieve) + needRetrieveSize = 0 + needRetrieve = [] + } + } + if (needRetrieve.length > 0) { + needRetrieveChunks.push(needRetrieve) + } + if (it.finished) { + await sourceConnection.closeChunk(idx) + break + } + } catch (err: any) { + console.error(err) + if (idx !== undefined) { + await sourceConnection.closeChunk(idx) + } + // Try again + idx = undefined + processed = 0 + } + } + while (needRetrieveChunks.length > 0) { + const needRetrieve = needRetrieveChunks.shift() as Ref[] + + console.log('Retrieve chunk:', needRetrieve.length) + let docs: Doc[] = [] + try { + // We need to clean target connection before copying something. + await cleanDomain(targetConnection, c) + + docs = await sourceConnection.loadDocs(c, needRetrieve) + if (clearTime) { + docs = docs.map((p) => { + let collectionCud = false + try { + collectionCud = sourceConnection.getHierarchy().isDerived(p._class, core.class.TxCollectionCUD) + } catch (err: any) { + console.log(err) + } + if (collectionCud) { + return { + ...p, + modifiedOn: Date.now(), + createdOn: Date.now(), + tx: { + ...(p as TxCollectionCUD).tx, + modifiedOn: Date.now(), + createdOn: Date.now() + } + } + } else { + return { + ...p, + modifiedOn: Date.now(), + createdOn: Date.now() + } + } + }) + } + await targetConnection.upload(c, docs) + } catch (err: any) { + console.log(err) + // Put back. + needRetrieveChunks.push(needRetrieve) + continue + } + } + + i++ + await progress((100 / domains.length) * i) + } + } catch (err: any) { + console.error(err) + } finally { + console.log('end clone') + await sourceConnection.close() + await targetConnection.close() + } +} + +async function cleanDomain (connection: CoreClient & BackupClient, domain: Domain): Promise { + // Load all digest from collection. 
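+ // Collect every document id in the target domain via the chunk API, then remove them in batches of 5000.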
+ let idx: number | undefined + const ids: Ref[] = [] + while (true) { + try { + const it = await connection.loadChunk(domain, idx) + idx = it.idx + + ids.push(...it.docs.map((it) => it.id as Ref)) + if (it.finished) { + break + } + } catch (err: any) { + console.error(err) + if (idx !== undefined) { + await connection.closeChunk(idx) + } + } + } + while (ids.length > 0) { + const part = ids.splice(0, 5000) + await connection.clean(domain, part) + } +} + +/** + * @public + */ +export async function backup ( + ctx: MeasureContext, + transactorUrl: string, + workspaceId: WorkspaceId, + storage: BackupStorage, + skipDomains: string[] = [], + force: boolean = false, + timeout: number = -1 +): Promise { + ctx = ctx.newChild('backup', { workspaceId: workspaceId.name, force }) + const connection = await ctx.with( + 'connect', + {}, + async () => + (await connect(transactorUrl, workspaceId, undefined, { + mode: 'backup' + })) as unknown as CoreClient & BackupClient + ) + await ctx.info('starting backup', { workspace: workspaceId.name }) + + let canceled = false + let timer: any + + if (timeout > 0) { + timer = setTimeout(() => { + void ctx.error('Timeout during backup', { workspace: workspaceId.name, timeout: timeout / 1000 }) + canceled = true + }, timeout) + } + + try { + const domains = [ + ...connection + .getHierarchy() + .domains() + .filter((it) => it !== DOMAIN_TRANSIENT && it !== DOMAIN_MODEL && !skipDomains.includes(it)) + ] + await ctx.info('domains for dump', { domains: domains.length }) + + let backupInfo: BackupInfo = { + workspace: workspaceId.name, + productId: workspaceId.productId, + version: '0.6.2', + snapshots: [] + } + + // Version 0.6.2, format of digest file is changed to + + const infoFile = 'backup.json.gz' + + if (await storage.exists(infoFile)) { + backupInfo = JSON.parse(gunzipSync(await storage.loadFile(infoFile)).toString()) + } + backupInfo.version = '0.6.2' + + backupInfo.workspace = workspaceId.name + backupInfo.productId = workspaceId.productId + + // Skip backup if there is no transaction changes. + const lastTx = await connection.findOne( + core.class.Tx, + {}, + { limit: 1, sort: { modifiedOn: SortingOrder.Descending } } + ) + if (lastTx !== undefined) { + if (lastTx._id === backupInfo.lastTxId && !force) { + await ctx.info('No transaction changes. Skipping backup.', { workspace: workspaceId.name }) + return + } + } + + backupInfo.lastTxId = '' // Clear until full backup will be complete + + const snapshot: BackupSnapshot = { + date: Date.now(), + domains: {} + } + + backupInfo.snapshots.push(snapshot) + let backupIndex = `${backupInfo.snapshotsIndex ?? backupInfo.snapshots.length}` + while (backupIndex.length < 6) { + backupIndex = '0' + backupIndex + } + + async function loadChangesFromServer ( + ctx: MeasureContext, + domain: Domain, + digest: Map, string>, + changes: Snapshot + ): Promise<{ changed: number, needRetrieveChunks: Ref[][] }> { + let idx: number | undefined + let processed = 0 + let st = Date.now() + let changed: number = 0 + const needRetrieveChunks: Ref[][] = [] + // Load all digest from collection. 
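+ // Walk the server chunk index and compare each document hash with the accumulated digest to decide what has to be retrieved.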
+ while (true) { + try { + const currentChunk = await ctx.with('loadChunk', {}, async () => await connection.loadChunk(domain, idx)) + idx = currentChunk.idx + + let needRetrieve: Ref[] = [] + let currentNeedRetrieveSize = 0 + + for (const { id, hash, size } of currentChunk.docs) { + processed++ + if (Date.now() - st > 2500) { + await ctx.info('processed', { + processed, + digest: digest.size, + time: Date.now() - st, + workspace: workspaceId.name + }) + st = Date.now() + } + const kHash = digest.get(id as Ref) + if (kHash !== undefined) { + digest.delete(id as Ref) + if (kHash !== hash) { + changes.updated.set(id as Ref, hash) + needRetrieve.push(id as Ref) + currentNeedRetrieveSize += size + changed++ + } + } else { + changes.added.set(id as Ref, hash) + needRetrieve.push(id as Ref) + changed++ + currentNeedRetrieveSize += size + } + + if (currentNeedRetrieveSize > retrieveChunkSize) { + needRetrieveChunks.push(needRetrieve) + currentNeedRetrieveSize = 0 + needRetrieve = [] + } + } + if (needRetrieve.length > 0) { + needRetrieveChunks.push(needRetrieve) + } + if (currentChunk.finished) { + await ctx.with('closeChunk', {}, async () => { + await connection.closeChunk(idx as number) + }) + break + } + } catch (err: any) { + console.error(err) + await ctx.error('failed to load chunks', { error: err }) + if (idx !== undefined) { + await ctx.with('loadChunk', {}, async () => { + await connection.closeChunk(idx as number) + }) + } + // Try again + idx = undefined + processed = 0 + } + } + return { changed, needRetrieveChunks } + } + + async function processDomain (ctx: MeasureContext, domain: Domain): Promise { + const changes: Snapshot = { + added: new Map(), + updated: new Map(), + removed: [] + } + + const processedChanges: Snapshot = { + added: new Map(), + updated: new Map(), + removed: [] + } + + let stIndex = 0 + let snapshotIndex = 0 + const domainInfo: DomainData = { + snapshot: undefined, + snapshots: [], + storage: [], + added: 0, + updated: 0, + removed: 0 + } + + // Cumulative digest + const digest = await ctx.with( + 'load-digest', + {}, + async (ctx) => await loadDigest(ctx, storage, backupInfo.snapshots, domain) + ) + + let _pack: Pack | undefined + let addedDocuments = 0 + + let { changed, needRetrieveChunks } = await ctx.with( + 'load-chunks', + { domain }, + async (ctx) => await loadChangesFromServer(ctx, domain, digest, changes) + ) + + while (needRetrieveChunks.length > 0) { + if (canceled) { + return + } + const needRetrieve = needRetrieveChunks.shift() as Ref[] + + await ctx.info('Retrieve chunk', { + needRetrieve: needRetrieveChunks.reduce((v, docs) => v + docs.length, 0), + toLoad: needRetrieve.length, + workspace: workspaceId.name + }) + let docs: Doc[] = [] + try { + docs = await ctx.with('load-docs', {}, async (ctx) => await connection.loadDocs(domain, needRetrieve)) + } catch (err: any) { + await ctx.error('error loading docs', { domain, err, workspace: workspaceId.name }) + // Put back. 
+ needRetrieveChunks.push(needRetrieve) + continue + } + + // Chunk data into small pieces + if (addedDocuments > dataBlobSize && _pack !== undefined) { + _pack.finalize() + _pack = undefined + addedDocuments = 0 + + if (changed > 0) { + snapshot.domains[domain] = domainInfo + domainInfo.added += processedChanges.added.size + domainInfo.updated += processedChanges.updated.size + domainInfo.removed += processedChanges.removed.length + + const snapshotFile = join(backupIndex, `${domain}-${snapshot.date}-${snapshotIndex}.snp.gz`) + snapshotIndex++ + domainInfo.snapshots = [...(domainInfo.snapshots ?? []), snapshotFile] + await writeChanges(storage, snapshotFile, processedChanges) + + processedChanges.added.clear() + processedChanges.removed = [] + processedChanges.updated.clear() + await storage.writeFile( + infoFile, + gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }) + ) + } + } + if (_pack === undefined) { + _pack = pack() + stIndex++ + const storageFile = join(backupIndex, `${domain}-data-${snapshot.date}-${stIndex}.tar.gz`) + await ctx.info('storing from domain', { domain, storageFile, workspace: workspaceId.name }) + domainInfo.storage = [...(domainInfo.storage ?? []), storageFile] + const dataStream = await storage.write(storageFile) + const storageZip = createGzip({ level: defaultLevel }) + + _pack.pipe(storageZip) + storageZip.pipe(dataStream) + } + + while (docs.length > 0) { + if (canceled) { + return + } + const d = docs.shift() + if (d === undefined) { + break + } + + // Move processed document to processedChanges + if (changes.added.has(d._id)) { + processedChanges.added.set(d._id, changes.added.get(d._id) ?? '') + changes.added.delete(d._id) + } else { + processedChanges.updated.set(d._id, changes.updated.get(d._id) ?? '') + changes.updated.delete(d._id) + } + if (d._class === core.class.BlobData) { + const blob = d as BlobData + const data = Buffer.from(blob.base64Data, 'base64') + blob.base64Data = '' + const descrJson = JSON.stringify(d) + addedDocuments += descrJson.length + addedDocuments += data.length + _pack.entry({ name: d._id + '.json' }, descrJson, function (err) { + if (err != null) throw err + }) + _pack.entry({ name: d._id }, data, function (err) { + if (err != null) throw err + }) + } else { + const data = JSON.stringify(d) + addedDocuments += data.length + _pack.entry({ name: d._id + '.json' }, data, function (err) { + if (err != null) throw err + }) + } + } + } + processedChanges.removed = Array.from(digest.keys()) + if (processedChanges.removed.length > 0) { + changed++ + } + + if (changed > 0) { + snapshot.domains[domain] = domainInfo + domainInfo.added += processedChanges.added.size + domainInfo.updated += processedChanges.updated.size + domainInfo.removed += processedChanges.removed.length + + const snapshotFile = join(backupIndex, `${domain}-${snapshot.date}-${snapshotIndex}.snp.gz`) + snapshotIndex++ + domainInfo.snapshots = [...(domainInfo.snapshots ?? []), snapshotFile] + await writeChanges(storage, snapshotFile, processedChanges) + + processedChanges.added.clear() + processedChanges.removed = [] + processedChanges.updated.clear() + _pack?.finalize() + // This will allow to retry in case of critical error. 
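+ // Persisting backup.json.gz after each finalized snapshot keeps completed work recoverable if a later step fails.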
+ await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel })) + } + } + + for (const domain of domains) { + if (canceled) { + break + } + await ctx.info('dumping domain...', { workspace: workspaceId.name, domain }) + + await ctx.with('process-domain', { domain }, async (ctx) => { + await processDomain(ctx, domain) + }) + } + if (!canceled) { + backupInfo.snapshotsIndex = backupInfo.snapshots.length + backupInfo.lastTxId = lastTx?._id ?? '0' // We could store last tx, since full backup is complete + await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel })) + } + } catch (err: any) { + await ctx.error('backup error', { err, workspace: workspaceId.name }) + } finally { + await ctx.info('end backup', { workspace: workspaceId.name }) + await connection.close() + ctx.end() + if (timeout !== -1) { + clearTimeout(timer) + } + } +} + +/** + * @public + */ +export async function backupList (storage: BackupStorage): Promise { + const infoFile = 'backup.json.gz' + + if (!(await storage.exists(infoFile))) { + throw new Error(`${infoFile} should present to restore`) + } + const backupInfo: BackupInfo = JSON.parse(gunzipSync(await storage.loadFile(infoFile)).toString()) + console.log('workspace:', backupInfo.workspace ?? '', backupInfo.version) + for (const s of backupInfo.snapshots) { + console.log('snapshot: id:', s.date, ' date:', new Date(s.date)) + } +} + +/** + * @public + * Restore state of DB to specified point. + */ +export async function restore ( + ctx: MeasureContext, + transactorUrl: string, + workspaceId: WorkspaceId, + storage: BackupStorage, + date: number, + merge?: boolean +): Promise { + const infoFile = 'backup.json.gz' + + if (!(await storage.exists(infoFile))) { + throw new Error(`${infoFile} should present to restore`) + } + const backupInfo: BackupInfo = JSON.parse(gunzipSync(await storage.loadFile(infoFile)).toString()) + let snapshots = backupInfo.snapshots + if (date !== -1) { + const bk = backupInfo.snapshots.findIndex((it) => it.date === date) + if (bk === -1) { + throw new Error(`${infoFile} could not restore to ${date}. Snapshot is missing.`) + } + snapshots = backupInfo.snapshots.slice(0, bk + 1) + } else { + date = snapshots[snapshots.length - 1].date + } + console.log('restore to ', date, new Date(date)) + const rsnapshots = Array.from(snapshots).reverse() + + // Collect all possible domains + const domains = new Set() + for (const s of snapshots) { + Object.keys(s.domains).forEach((it) => domains.add(it as Domain)) + } + + console.log('connecting:', transactorUrl, workspaceId.name) + const connection = (await connect(transactorUrl, workspaceId, undefined, { + mode: 'backup', + model: 'upgrade' + })) as unknown as CoreClient & BackupClient + console.log('connected') + + // We need to find empty domains and clean them. 
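// An illustrative sketch of how restore() above chooses the snapshot list for a restore point:
// -1 means "restore to the latest snapshot", otherwise snapshots up to and including the
// matching date are kept. BackupSnapshotLike and selectSnapshots are simplified stand-ins.
interface BackupSnapshotLike {
  date: number
}

function selectSnapshots (snapshots: BackupSnapshotLike[], date: number): BackupSnapshotLike[] {
  if (date === -1) {
    return snapshots // restore to the latest snapshot
  }
  const idx = snapshots.findIndex((it) => it.date === date)
  if (idx === -1) {
    throw new Error(`could not restore to ${date}, snapshot is missing`)
  }
  return snapshots.slice(0, idx + 1)
}

// The selected snapshots are then replayed newest-first to find each document's latest copy.
console.log(selectSnapshots([{ date: 1 }, { date: 2 }, { date: 3 }], 2)) // -> [{ date: 1 }, { date: 2 }]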
+ const allDomains = connection.getHierarchy().domains() + for (const d of allDomains) { + domains.add(d) + } + + async function processDomain (c: Domain): Promise { + const changeset = await loadDigest(ctx, storage, snapshots, c, date) + // We need to load full changeset from server + const serverChangeset = new Map, string>() + + let idx: number | undefined + let loaded = 0 + let el = 0 + let chunks = 0 + try { + while (true) { + const st = Date.now() + const it = await connection.loadChunk(c, idx) + chunks++ + + idx = it.idx + el += Date.now() - st + + for (const { id, hash } of it.docs) { + serverChangeset.set(id as Ref, hash) + loaded++ + } + + if (el > 2500) { + console.log(' loaded from server', loaded, el, chunks) + el = 0 + chunks = 0 + } + if (it.finished) { + break + } + } + } finally { + if (idx !== undefined) { + await connection.closeChunk(idx) + } + } + console.log(' loaded', loaded) + console.log('\tcompare documents', changeset.size, serverChangeset.size) + + // Let's find difference + const docsToAdd = new Map( + Array.from(changeset.entries()).filter( + ([it]) => !serverChangeset.has(it) || (serverChangeset.has(it) && serverChangeset.get(it) !== changeset.get(it)) + ) + ) + const docsToRemove = Array.from(serverChangeset.keys()).filter((it) => !changeset.has(it)) + + const docs: Doc[] = [] + const blobs = new Map() + let sendSize = 0 + let totalSend = 0 + async function sendChunk (doc: Doc | undefined, len: number): Promise { + if (doc !== undefined) { + docsToAdd.delete(doc._id) + docs.push(doc) + } + sendSize = sendSize + len + if (sendSize > dataUploadSize || (doc === undefined && docs.length > 0)) { + console.log('upload', docs.length, `send: ${totalSend} from ${docsToAdd.size + totalSend}`, 'size:', sendSize) + totalSend += docs.length + await connection.upload(c, docs) + docs.length = 0 + sendSize = 0 + } + } + let processed = 0 + + for (const s of rsnapshots) { + const d = s.domains[c] + + if (d !== undefined && docsToAdd.size > 0) { + const sDigest = await loadDigest(ctx, storage, [s], c) + const requiredDocs = new Map(Array.from(sDigest.entries()).filter(([it]) => docsToAdd.has(it))) + if (requiredDocs.size > 0) { + console.log('updating', c, requiredDocs.size) + // We have required documents here. + for (const sf of d.storage ?? []) { + if (docsToAdd.size === 0) { + break + } + console.log('processing', sf, processed) + + const readStream = await storage.load(sf) + const ex = extract() + + ex.on('entry', (headers, stream, next) => { + const name = headers.name ?? '' + processed++ + // We found blob data + if (requiredDocs.has(name as Ref)) { + const chunks: Buffer[] = [] + stream.on('data', (chunk) => { + chunks.push(chunk) + }) + stream.on('end', () => { + const bf = Buffer.concat(chunks) + const d = blobs.get(name) + if (d === undefined) { + blobs.set(name, { doc: undefined, buffer: bf }) + next() + } else { + const d = blobs.get(name) + blobs.delete(name) + const doc = d?.doc as BlobData + doc.base64Data = bf.toString('base64') ?? 
'' + void sendChunk(doc, bf.length).finally(() => { + requiredDocs.delete(doc._id) + next() + }) + } + }) + } else if (name.endsWith('.json') && requiredDocs.has(name.substring(0, name.length - 5) as Ref)) { + const chunks: Buffer[] = [] + const bname = name.substring(0, name.length - 5) + stream.on('data', (chunk) => { + chunks.push(chunk) + }) + stream.on('end', () => { + const bf = Buffer.concat(chunks) + const doc = JSON.parse(bf.toString()) as Doc + if (doc._class === core.class.BlobData) { + const d = blobs.get(bname) + if (d === undefined) { + blobs.set(bname, { doc, buffer: undefined }) + next() + } else { + const d = blobs.get(bname) + blobs.delete(bname) + ;(doc as BlobData).base64Data = d?.buffer?.toString('base64') ?? '' + ;(doc as any)['%hash%'] = changeset.get(doc._id) + void sendChunk(doc, bf.length).finally(() => { + requiredDocs.delete(doc._id) + next() + }) + } + } else { + ;(doc as any)['%hash%'] = changeset.get(doc._id) + void sendChunk(doc, bf.length).finally(() => { + requiredDocs.delete(doc._id) + next() + }) + } + }) + } else { + next() + } + stream.resume() // just auto drain the stream + }) + + const endPromise = new Promise((resolve) => { + ex.on('finish', () => { + resolve(null) + }) + }) + const unzip = createGunzip({ level: defaultLevel }) + + readStream.on('end', () => { + readStream.destroy() + }) + readStream.pipe(unzip) + unzip.pipe(ex) + + await endPromise + } + } else { + console.log('domain had no changes', c) + } + } + } + + await sendChunk(undefined, 0) + if (docsToRemove.length > 0 && merge !== true) { + console.log('cleanup', docsToRemove.length) + while (docsToRemove.length > 0) { + const part = docsToRemove.splice(0, 10000) + await connection.clean(c, part) + } + } + } + + try { + for (const c of domains) { + console.log('processing domain', c) + let retry = 5 + let delay = 1 + while (retry > 0) { + retry-- + try { + await processDomain(c) + if (delay > 1) { + console.log('retry-success') + } + break + } catch (err: any) { + console.error('error', err) + if (retry !== 0) { + console.log('cool-down to retry', delay) + await new Promise((resolve) => setTimeout(resolve, delay * 1000)) + delay++ + } + } + } + } + } finally { + await connection.close() + } +} + +/** + * Compacting backup into just one snapshot. + * @public + */ +export async function compactBackup ( + ctx: MeasureContext, + storage: BackupStorage, + force: boolean = false +): Promise { + console.log('starting backup compaction') + try { + let backupInfo: BackupInfo + + // Version 0.6.2, format of digest file is changed to + + const infoFile = 'backup.json.gz' + + if (await storage.exists(infoFile)) { + backupInfo = JSON.parse(gunzipSync(await storage.loadFile(infoFile)).toString()) + } else { + console.log('No backup found') + return + } + if (backupInfo.version !== '0.6.2') { + console.log('Invalid backup version') + return + } + + if (backupInfo.snapshots.length < 5 && !force) { + console.log('No need to compact, less 5 snapshots') + return + } + + const snapshot: BackupSnapshot = { + date: Date.now(), + domains: {} + } + + const oldSnapshots = [...backupInfo.snapshots] + + backupInfo.snapshots = [snapshot] + let backupIndex = `${backupInfo.snapshotsIndex ?? 
oldSnapshots.length}` + while (backupIndex.length < 6) { + backupIndex = '0' + backupIndex + } + + const domains: Domain[] = [] + for (const sn of oldSnapshots) { + for (const d of Object.keys(sn.domains)) { + if (!domains.includes(d as Domain)) { + domains.push(d as Domain) + } + } + } + + for (const domain of domains) { + console.log('compacting domain...', domain) + + const processedChanges: Snapshot = { + added: new Map(), + updated: new Map(), + removed: [] + } + + let changed = 0 + let stIndex = 0 + let snapshotIndex = 0 + const domainInfo: DomainData = { + snapshot: undefined, + snapshots: [], + storage: [], + added: 0, + updated: 0, + removed: 0 + } + + // Cumulative digest + const digest = await loadDigest(ctx, storage, oldSnapshots, domain) + const digestAdded = new Map, string>() + + const rsnapshots = Array.from(oldSnapshots).reverse() + + let _pack: Pack | undefined + let addedDocuments = 0 + + let processed = 0 + + const blobs = new Map() + + async function pushDocs (docs: Doc[], size: number): Promise { + addedDocuments += size + changed += docs.length + // Chunk data into small pieces + if (addedDocuments > dataBlobSize && _pack !== undefined) { + _pack.finalize() + _pack = undefined + addedDocuments = 0 + + if (changed > 0) { + snapshot.domains[domain] = domainInfo + domainInfo.added += processedChanges.added.size + domainInfo.updated += processedChanges.updated.size + domainInfo.removed += processedChanges.removed.length + + const snapshotFile = join(backupIndex, `${domain}-${snapshot.date}-${snapshotIndex}.snp.gz`) + snapshotIndex++ + domainInfo.snapshots = [...(domainInfo.snapshots ?? []), snapshotFile] + await writeChanges(storage, snapshotFile, processedChanges) + + processedChanges.added.clear() + processedChanges.removed = [] + processedChanges.updated.clear() + await storage.writeFile( + infoFile, + gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }) + ) + } + } + if (_pack === undefined) { + _pack = pack() + stIndex++ + const storageFile = join(backupIndex, `${domain}-data-${snapshot.date}-${stIndex}.tar.gz`) + console.log('storing from domain', domain, storageFile) + domainInfo.storage = [...(domainInfo.storage ?? []), storageFile] + const dataStream = await storage.write(storageFile) + const storageZip = createGzip({ level: defaultLevel }) + + _pack.pipe(storageZip) + storageZip.pipe(dataStream) + } + + while (docs.length > 0) { + const d = docs.shift() + if (d === undefined) { + break + } + + // Move processed document to processedChanges + processedChanges.added.set(d._id, digestAdded.get(d._id) ?? '') + + if (d._class === core.class.BlobData) { + const blob = d as BlobData + const data = Buffer.from(blob.base64Data, 'base64') + blob.base64Data = '' + const descrJson = JSON.stringify(d) + addedDocuments += descrJson.length + addedDocuments += data.length + _pack.entry({ name: d._id + '.json' }, descrJson, function (err) { + if (err != null) throw err + }) + _pack.entry({ name: d._id }, data, function (err) { + if (err != null) throw err + }) + } else { + const data = JSON.stringify(d) + addedDocuments += data.length + _pack.entry({ name: d._id + '.json' }, data, function (err) { + if (err != null) throw err + }) + } + } + } + async function sendChunk (doc: Doc | undefined, len: number): Promise { + if (doc !== undefined) { + const hash = digest.get(doc._id) + digest.delete(doc._id) + digestAdded.set(doc._id, hash ?? 
'') + await pushDocs([doc], len) + } + } + + for (const s of rsnapshots) { + const d = s.domains[domain] + + if (d !== undefined && digest.size > 0) { + const sDigest = await loadDigest(ctx, storage, [s], domain) + const requiredDocs = new Map(Array.from(sDigest.entries()).filter(([it]) => digest.has(it))) + if (requiredDocs.size > 0) { + console.log('updating', domain, requiredDocs.size) + // We have required documents here. + for (const sf of d.storage ?? []) { + if (digest.size === 0) { + break + } + console.log('processing', sf, processed) + + const readStream = await storage.load(sf) + const ex = extract() + + ex.on('entry', (headers, stream, next) => { + const name = headers.name ?? '' + processed++ + // We found blob data + if (requiredDocs.has(name as Ref)) { + const chunks: Buffer[] = [] + stream.on('data', (chunk) => { + chunks.push(chunk) + }) + stream.on('end', () => { + const bf = Buffer.concat(chunks) + const d = blobs.get(name) + if (d === undefined) { + blobs.set(name, { doc: undefined, buffer: bf }) + next() + } else { + const d = blobs.get(name) + blobs.delete(name) + const doc = d?.doc as BlobData + doc.base64Data = bf.toString('base64') ?? '' + void sendChunk(doc, bf.length).finally(() => { + requiredDocs.delete(doc._id) + next() + }) + } + }) + } else if (name.endsWith('.json') && requiredDocs.has(name.substring(0, name.length - 5) as Ref)) { + const chunks: Buffer[] = [] + const bname = name.substring(0, name.length - 5) + stream.on('data', (chunk) => { + chunks.push(chunk) + }) + stream.on('end', () => { + const bf = Buffer.concat(chunks) + const doc = JSON.parse(bf.toString()) as Doc + if (doc._class === core.class.BlobData) { + const d = blobs.get(bname) + if (d === undefined) { + blobs.set(bname, { doc, buffer: undefined }) + next() + } else { + const d = blobs.get(bname) + blobs.delete(bname) + ;(doc as BlobData).base64Data = d?.buffer?.toString('base64') ?? '' + ;(doc as any)['%hash%'] = digest.get(doc._id) + void sendChunk(doc, bf.length).finally(() => { + requiredDocs.delete(doc._id) + next() + }) + } + } else { + ;(doc as any)['%hash%'] = digest.get(doc._id) + void sendChunk(doc, bf.length).finally(() => { + requiredDocs.delete(doc._id) + next() + }) + } + }) + } else { + next() + } + stream.resume() // just auto drain the stream + }) + + const endPromise = new Promise((resolve) => { + ex.on('finish', () => { + resolve(null) + }) + }) + const unzip = createGunzip({ level: defaultLevel }) + + readStream.on('end', () => { + readStream.destroy() + }) + readStream.pipe(unzip) + unzip.pipe(ex) + + await endPromise + } + } else { + console.log('domain had no changes', domain) + } + } + } + + if (changed > 0) { + snapshot.domains[domain] = domainInfo + domainInfo.added += processedChanges.added.size + domainInfo.updated += processedChanges.updated.size + domainInfo.removed += processedChanges.removed.length + + const snapshotFile = join(backupIndex, `${domain}-${snapshot.date}-${snapshotIndex}.snp.gz`) + snapshotIndex++ + domainInfo.snapshots = [...(domainInfo.snapshots ?? []), snapshotFile] + await writeChanges(storage, snapshotFile, processedChanges) + + processedChanges.added.clear() + processedChanges.removed = [] + processedChanges.updated.clear() + _pack?.finalize() + // This will allow to retry in case of critical error. + await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel })) + } + } + + // We could get rid of all old snapshot files. 
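// A minimal sketch of the read side used during restore and compaction above: a stored .tar.gz
// snapshot is piped through gunzip into tar-stream's extract(), and each entry stream is drained
// before the next one is requested. The file path and listEntries name are illustrative.
import { createReadStream } from 'fs'
import { extract } from 'tar-stream'
import { createGunzip } from 'zlib'

async function listEntries (path: string): Promise<string[]> {
  const names: string[] = []
  const ex = extract()

  ex.on('entry', (headers, stream, next) => {
    names.push(headers.name ?? '')
    stream.on('end', () => {
      next()
    })
    stream.resume() // drain the entry so the next one can be emitted
  })

  const done = new Promise<void>((resolve, reject) => {
    ex.on('finish', () => {
      resolve()
    })
    ex.on('error', reject)
  })

  createReadStream(path).pipe(createGunzip()).pipe(ex)
  await done
  return names
}

// Entries named `<id>.json` carry document descriptors; bare `<id>` entries carry blob bytes.
void listEntries('/tmp/example-data.tar.gz').then((names) => {
  console.log(names)
})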
+ for (const s of oldSnapshots) { + for (const [, dta] of Object.entries(s.domains)) { + for (const sf of dta.storage ?? []) { + console.log('removing', sf) + await storage.delete(sf) + } + for (const sf of dta.snapshots ?? []) { + console.log('removing', sf) + await storage.delete(sf) + } + if (dta.snapshot !== undefined) { + await storage.delete(dta.snapshot) + } + } + } + + backupInfo.snapshotsIndex = backupInfo.snapshots.length + await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel })) + } catch (err: any) { + console.error(err) + } finally { + console.log('end compacting') + } +} + +export * from './service' diff --git a/server/backup/src/index.ts b/server/backup/src/index.ts index 5cba896145..b61a2c25e0 100644 --- a/server/backup/src/index.ts +++ b/server/backup/src/index.ts @@ -1,6 +1,5 @@ // -// Copyright © 2020, 2021 Anticrm Platform Contributors. -// Copyright © 2021 Hardcore Engineering Inc. +// Copyright © 2024 Hardcore Engineering Inc. // // Licensed under the Eclipse Public License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. You may @@ -13,1232 +12,5 @@ // See the License for the specific language governing permissions and // limitations under the License. // - -import core, { - AttachedDoc, - BackupClient, - BlobData, - Client as CoreClient, - Doc, - Domain, - DOMAIN_MODEL, - DOMAIN_TRANSIENT, - Ref, - SortingOrder, - TxCollectionCUD, - WorkspaceId -} from '@hcengineering/core' -import { connect } from '@hcengineering/server-tool' -import { createGzip } from 'node:zlib' -import { join } from 'path' -import { Writable } from 'stream' -import { extract, Pack, pack } from 'tar-stream' -import { createGunzip, gunzipSync, gzipSync } from 'zlib' -import { BackupStorage } from './storage' -export * from './storage' - -const dataBlobSize = 50 * 1024 * 1024 -const dataUploadSize = 2 * 1024 * 1024 -const retrieveChunkSize = 2 * 1024 * 1024 - -const defaultLevel = 9 - -/** - * @public - */ -export interface Snapshot { - added: Map, string> - updated: Map, string> - removed: Ref[] -} - -/** - * @public - */ -export interface SnapshotV6 { - added: Record, string> - updated: Record, string> - removed: Ref[] -} - -/** - * @public - */ -export interface DomainData { - snapshot?: string // 0.6 json snapshot - snapshots?: string[] - storage?: string[] - - // Some statistics - added: number - updated: number - removed: number -} - -/** - * @public - */ -export interface BackupSnapshot { - // _id => hash of added items. - domains: Record - date: number -} - -/** - * @public - */ -export interface BackupInfo { - workspace: string - version: string - productId: string - snapshots: BackupSnapshot[] - snapshotsIndex?: number - lastTxId?: string -} - -async function loadDigest ( - storage: BackupStorage, - snapshots: BackupSnapshot[], - domain: Domain, - date?: number -): Promise, string>> { - const result = new Map, string>() - for (const s of snapshots) { - const d = s.domains[domain] - - // Load old JSON snapshot - if (d?.snapshot !== undefined) { - const dChanges: SnapshotV6 = JSON.parse(gunzipSync(await storage.loadFile(d.snapshot)).toString()) - for (const [k, v] of Object.entries(dChanges.added)) { - result.set(k as Ref, v) - } - for (const [k, v] of Object.entries(dChanges.updated)) { - result.set(k as Ref, v) - } - for (const d of dChanges.removed) { - result.delete(d) - } - } - for (const snapshot of d?.snapshots ?? 
[]) { - try { - const dataBlob = gunzipSync(await storage.loadFile(snapshot)) - .toString() - .split('\n') - console.log('loaded', snapshot, dataBlob.length) - - const addedCount = parseInt(dataBlob.shift() ?? '0') - const added = dataBlob.splice(0, addedCount) - for (const it of added) { - const [k, v] = it.split(';') - result.set(k as Ref, v) - } - - const updatedCount = parseInt(dataBlob.shift() ?? '0') - const updated = dataBlob.splice(0, updatedCount) - for (const it of updated) { - const [k, v] = it.split(';') - result.set(k as Ref, v) - } - - const removedCount = parseInt(dataBlob.shift() ?? '0') - const removed = dataBlob.splice(0, removedCount) - for (const k of removed) { - result.delete(k as Ref) - } - } catch (err: any) { - console.log('digest is broken, will do full backup for', domain) - } - } - // Stop if stop date is matched and provided - if (date !== undefined && date === s.date) { - break - } - } - return result -} - -async function write (chunk: any, stream: Writable): Promise { - let needDrain = false - await new Promise((resolve, reject) => { - needDrain = !stream.write(chunk, (err) => { - if (err != null) { - reject(err) - } else { - resolve(null) - } - }) - }) - if (needDrain) { - await new Promise((resolve, reject) => stream.once('drain', resolve)) - } -} - -async function writeChanges (storage: BackupStorage, snapshot: string, changes: Snapshot): Promise { - const snapshotWritable = await storage.write(snapshot) - const writable = createGzip({ level: defaultLevel }) - writable.pipe(snapshotWritable) - - // Write size - await write(`${changes.added.size}\n`, writable) - for (const [k, v] of changes.added.entries()) { - await write(`${k};${v}\n`, writable) - } - await write(`${changes.updated.size}\n`, writable) - for (const [k, v] of changes.updated.entries()) { - await write(`${k};${v}\n`, writable) - } - await write(`${changes.removed.length}\n`, writable) - for (const k of changes.removed) { - await write(`${k}\n`, writable) - } - writable.end() - await new Promise((resolve) => { - writable.flush(() => { - resolve(null) - }) - }) -} - -/** - * @public - */ -export async function cloneWorkspace ( - transactorUrl: string, - sourceWorkspaceId: WorkspaceId, - targetWorkspaceId: WorkspaceId, - clearTime: boolean = true, - progress: (value: number) => Promise -): Promise { - const sourceConnection = (await connect(transactorUrl, sourceWorkspaceId, undefined, { - mode: 'backup' - })) as unknown as CoreClient & BackupClient - const targetConnection = (await connect(transactorUrl, targetWorkspaceId, undefined, { - mode: 'backup' - })) as unknown as CoreClient & BackupClient - try { - const domains = sourceConnection - .getHierarchy() - .domains() - .filter((it) => it !== DOMAIN_TRANSIENT && it !== DOMAIN_MODEL) - - let i = 0 - for (const c of domains) { - console.log('clone domain...', c) - - const changes: Snapshot = { - added: new Map(), - updated: new Map(), - removed: [] - } - - let idx: number | undefined - - // update digest tar - const needRetrieveChunks: Ref[][] = [] - - let processed = 0 - let st = Date.now() - // Load all digest from collection. 
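// An illustrative round-trip of the plain-text digest format written by writeChanges() and read
// back by loadDigest(): a count line followed by `id;hash` lines for added, the same for updated,
// then a count and bare ids for removed. Changes, serializeChanges and parseChanges are
// simplified stand-ins, not part of the patch.
interface Changes {
  added: Map<string, string>
  updated: Map<string, string>
  removed: string[]
}

function serializeChanges (changes: Changes): string {
  const lines: string[] = []
  lines.push(`${changes.added.size}`)
  for (const [k, v] of changes.added.entries()) lines.push(`${k};${v}`)
  lines.push(`${changes.updated.size}`)
  for (const [k, v] of changes.updated.entries()) lines.push(`${k};${v}`)
  lines.push(`${changes.removed.length}`)
  for (const k of changes.removed) lines.push(`${k}`)
  return lines.join('\n') + '\n'
}

function parseChanges (data: string): Changes {
  const lines = data.split('\n')
  const added = new Map<string, string>()
  const updated = new Map<string, string>()
  const addedCount = parseInt(lines.shift() ?? '0')
  for (const it of lines.splice(0, addedCount)) {
    const [k, v] = it.split(';')
    added.set(k, v)
  }
  const updatedCount = parseInt(lines.shift() ?? '0')
  for (const it of lines.splice(0, updatedCount)) {
    const [k, v] = it.split(';')
    updated.set(k, v)
  }
  const removedCount = parseInt(lines.shift() ?? '0')
  const removed = lines.splice(0, removedCount)
  return { added, updated, removed }
}

const sample: Changes = { added: new Map([['d1', 'h1']]), updated: new Map(), removed: ['d2'] }
console.log(parseChanges(serializeChanges(sample)))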
- while (true) { - try { - const it = await sourceConnection.loadChunk(c, idx) - idx = it.idx - - let needRetrieve: Ref[] = [] - let needRetrieveSize = 0 - - for (const { id, hash, size } of it.docs) { - processed++ - if (Date.now() - st > 2500) { - console.log('processed', processed, Date.now() - st) - st = Date.now() - } - - changes.added.set(id as Ref, hash) - needRetrieve.push(id as Ref) - needRetrieveSize += size - - if (needRetrieveSize > retrieveChunkSize) { - needRetrieveChunks.push(needRetrieve) - needRetrieveSize = 0 - needRetrieve = [] - } - } - if (needRetrieve.length > 0) { - needRetrieveChunks.push(needRetrieve) - } - if (it.finished) { - await sourceConnection.closeChunk(idx) - break - } - } catch (err: any) { - console.error(err) - if (idx !== undefined) { - await sourceConnection.closeChunk(idx) - } - // Try again - idx = undefined - processed = 0 - } - } - while (needRetrieveChunks.length > 0) { - const needRetrieve = needRetrieveChunks.shift() as Ref[] - - console.log('Retrieve chunk:', needRetrieve.length) - let docs: Doc[] = [] - try { - // We need to clean target connection before copying something. - await cleanDomain(targetConnection, c) - - docs = await sourceConnection.loadDocs(c, needRetrieve) - if (clearTime) { - docs = docs.map((p) => { - let collectionCud = false - try { - collectionCud = sourceConnection.getHierarchy().isDerived(p._class, core.class.TxCollectionCUD) - } catch (err: any) { - console.log(err) - } - if (collectionCud) { - return { - ...p, - modifiedOn: Date.now(), - createdOn: Date.now(), - tx: { - ...(p as TxCollectionCUD).tx, - modifiedOn: Date.now(), - createdOn: Date.now() - } - } - } else { - return { - ...p, - modifiedOn: Date.now(), - createdOn: Date.now() - } - } - }) - } - await targetConnection.upload(c, docs) - } catch (err: any) { - console.log(err) - // Put back. - needRetrieveChunks.push(needRetrieve) - continue - } - } - - i++ - await progress((100 / domains.length) * i) - } - } catch (err: any) { - console.error(err) - } finally { - console.log('end clone') - await sourceConnection.close() - await targetConnection.close() - } -} - -async function cleanDomain (connection: CoreClient & BackupClient, domain: Domain): Promise { - // Load all digest from collection. 
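// A simplified sketch of the clearTime handling in the (removed) cloneWorkspace above: cloned
// documents get fresh modifiedOn/createdOn stamps, and collection transactions also refresh the
// timestamps of their nested tx. TxLike is a reduced stand-in for the real transaction types.
interface TxLike {
  modifiedOn: number
  createdOn: number
  tx?: TxLike // present on collection transactions (TxCollectionCUD)
}

function refreshTimestamps (doc: TxLike, now: number = Date.now()): TxLike {
  const updated: TxLike = { ...doc, modifiedOn: now, createdOn: now }
  if (doc.tx !== undefined) {
    updated.tx = { ...doc.tx, modifiedOn: now, createdOn: now }
  }
  return updated
}

console.log(refreshTimestamps({ modifiedOn: 0, createdOn: 0, tx: { modifiedOn: 0, createdOn: 0 } }))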
- let idx: number | undefined - const ids: Ref[] = [] - while (true) { - try { - const it = await connection.loadChunk(domain, idx) - idx = it.idx - - ids.push(...it.docs.map((it) => it.id as Ref)) - if (it.finished) { - break - } - } catch (err: any) { - console.error(err) - if (idx !== undefined) { - await connection.closeChunk(idx) - } - } - } - while (ids.length > 0) { - const part = ids.splice(0, 5000) - await connection.clean(domain, part) - } -} - -/** - * @public - */ -export async function backup ( - transactorUrl: string, - workspaceId: WorkspaceId, - storage: BackupStorage, - skipDomains: string[] = [], - force: boolean = false -): Promise { - const connection = (await connect(transactorUrl, workspaceId, undefined, { - mode: 'backup' - })) as unknown as CoreClient & BackupClient - console.log('starting backup') - try { - const domains = [ - ...connection - .getHierarchy() - .domains() - .filter((it) => it !== DOMAIN_TRANSIENT && it !== DOMAIN_MODEL && !skipDomains.includes(it)) - ] - console.log('domains for dump', domains.length) - - let backupInfo: BackupInfo = { - workspace: workspaceId.name, - productId: workspaceId.productId, - version: '0.6.2', - snapshots: [] - } - - // Version 0.6.2, format of digest file is changed to - - const infoFile = 'backup.json.gz' - - if (await storage.exists(infoFile)) { - backupInfo = JSON.parse(gunzipSync(await storage.loadFile(infoFile)).toString()) - } - backupInfo.version = '0.6.2' - - backupInfo.workspace = workspaceId.name - backupInfo.productId = workspaceId.productId - - // Skip backup if there is no transaction changes. - const lastTx = await connection.findOne( - core.class.Tx, - {}, - { limit: 1, sort: { modifiedOn: SortingOrder.Descending } } - ) - if (lastTx !== undefined) { - if (lastTx._id === backupInfo.lastTxId && !force) { - console.log('No transaction changes. Skipping backup.') - return - } - } - - backupInfo.lastTxId = '' // Clear until full backup will be complete - - const snapshot: BackupSnapshot = { - date: Date.now(), - domains: {} - } - - backupInfo.snapshots.push(snapshot) - let backupIndex = `${backupInfo.snapshotsIndex ?? backupInfo.snapshots.length}` - while (backupIndex.length < 6) { - backupIndex = '0' + backupIndex - } - - for (const domain of domains) { - console.log('dumping domain...', domain) - - const changes: Snapshot = { - added: new Map(), - updated: new Map(), - removed: [] - } - - const processedChanges: Snapshot = { - added: new Map(), - updated: new Map(), - removed: [] - } - - let changed = 0 - let stIndex = 0 - let snapshotIndex = 0 - const domainInfo: DomainData = { - snapshot: undefined, - snapshots: [], - storage: [], - added: 0, - updated: 0, - removed: 0 - } - - // Cumulative digest - const digest = await loadDigest(storage, backupInfo.snapshots, domain) - - let idx: number | undefined - - let _pack: Pack | undefined - let addedDocuments = 0 - - // update digest tar - const needRetrieveChunks: Ref[][] = [] - - let processed = 0 - let st = Date.now() - // Load all digest from collection. 
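// An illustrative sketch of the batched cleanup pattern used by cleanDomain() and by restore()'s
// docsToRemove handling above: ids are spliced off in fixed-size parts so a single clean request
// never grows unbounded. cleanInBatches and cleanFn are hypothetical names; cleanFn stands in
// for connection.clean.
async function cleanInBatches (
  ids: string[],
  batchSize: number,
  cleanFn: (part: string[]) => Promise<void>
): Promise<void> {
  while (ids.length > 0) {
    const part = ids.splice(0, batchSize)
    await cleanFn(part)
  }
}

// Usage sketch: remove ten thousand ids, 5000 per request.
void cleanInBatches(
  Array.from({ length: 10000 }, (_, i) => `doc-${i}`),
  5000,
  async (part) => {
    console.log('clean', part.length)
  }
)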
- while (true) { - try { - const currentChunk = await connection.loadChunk(domain, idx) - idx = currentChunk.idx - - let needRetrieve: Ref[] = [] - let currentNeedRetrieveSize = 0 - - for (const { id, hash, size } of currentChunk.docs) { - processed++ - if (Date.now() - st > 2500) { - console.log('processed', processed, digest.size, Date.now() - st) - st = Date.now() - } - const kHash = digest.get(id as Ref) - if (kHash !== undefined) { - digest.delete(id as Ref) - if (kHash !== hash) { - changes.updated.set(id as Ref, hash) - needRetrieve.push(id as Ref) - currentNeedRetrieveSize += size - changed++ - } - } else { - changes.added.set(id as Ref, hash) - needRetrieve.push(id as Ref) - changed++ - currentNeedRetrieveSize += size - } - - if (currentNeedRetrieveSize > retrieveChunkSize) { - needRetrieveChunks.push(needRetrieve) - currentNeedRetrieveSize = 0 - needRetrieve = [] - } - } - if (needRetrieve.length > 0) { - needRetrieveChunks.push(needRetrieve) - } - if (currentChunk.finished) { - await connection.closeChunk(idx) - break - } - } catch (err: any) { - console.error(err) - if (idx !== undefined) { - await connection.closeChunk(idx) - } - // Try again - idx = undefined - processed = 0 - } - } - while (needRetrieveChunks.length > 0) { - const needRetrieve = needRetrieveChunks.shift() as Ref[] - - console.log('Retrieve chunk:', needRetrieve.length) - let docs: Doc[] = [] - try { - docs = await connection.loadDocs(domain, needRetrieve) - } catch (err: any) { - console.log(err) - // Put back. - needRetrieveChunks.push(needRetrieve) - continue - } - - // Chunk data into small pieces - if (addedDocuments > dataBlobSize && _pack !== undefined) { - _pack.finalize() - _pack = undefined - addedDocuments = 0 - - if (changed > 0) { - snapshot.domains[domain] = domainInfo - domainInfo.added += processedChanges.added.size - domainInfo.updated += processedChanges.updated.size - domainInfo.removed += processedChanges.removed.length - - const snapshotFile = join(backupIndex, `${domain}-${snapshot.date}-${snapshotIndex}.snp.gz`) - snapshotIndex++ - domainInfo.snapshots = [...(domainInfo.snapshots ?? []), snapshotFile] - await writeChanges(storage, snapshotFile, processedChanges) - - processedChanges.added.clear() - processedChanges.removed = [] - processedChanges.updated.clear() - await storage.writeFile( - infoFile, - gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }) - ) - } - } - if (_pack === undefined) { - _pack = pack() - stIndex++ - const storageFile = join(backupIndex, `${domain}-data-${snapshot.date}-${stIndex}.tar.gz`) - console.log('storing from domain', domain, storageFile) - domainInfo.storage = [...(domainInfo.storage ?? []), storageFile] - const dataStream = await storage.write(storageFile) - const storageZip = createGzip({ level: defaultLevel }) - - _pack.pipe(storageZip) - storageZip.pipe(dataStream) - } - - while (docs.length > 0) { - const d = docs.shift() - if (d === undefined) { - break - } - - // Move processed document to processedChanges - if (changes.added.has(d._id)) { - processedChanges.added.set(d._id, changes.added.get(d._id) ?? '') - changes.added.delete(d._id) - } else { - processedChanges.updated.set(d._id, changes.updated.get(d._id) ?? 
'') - changes.updated.delete(d._id) - } - if (d._class === core.class.BlobData) { - const blob = d as BlobData - const data = Buffer.from(blob.base64Data, 'base64') - blob.base64Data = '' - const descrJson = JSON.stringify(d) - addedDocuments += descrJson.length - addedDocuments += data.length - _pack.entry({ name: d._id + '.json' }, descrJson, function (err) { - if (err != null) throw err - }) - _pack.entry({ name: d._id }, data, function (err) { - if (err != null) throw err - }) - } else { - const data = JSON.stringify(d) - addedDocuments += data.length - _pack.entry({ name: d._id + '.json' }, data, function (err) { - if (err != null) throw err - }) - } - } - } - processedChanges.removed = Array.from(digest.keys()) - if (processedChanges.removed.length > 0) { - changed++ - } - - if (changed > 0) { - snapshot.domains[domain] = domainInfo - domainInfo.added += processedChanges.added.size - domainInfo.updated += processedChanges.updated.size - domainInfo.removed += processedChanges.removed.length - - const snapshotFile = join(backupIndex, `${domain}-${snapshot.date}-${snapshotIndex}.snp.gz`) - snapshotIndex++ - domainInfo.snapshots = [...(domainInfo.snapshots ?? []), snapshotFile] - await writeChanges(storage, snapshotFile, processedChanges) - - processedChanges.added.clear() - processedChanges.removed = [] - processedChanges.updated.clear() - _pack?.finalize() - // This will allow to retry in case of critical error. - await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel })) - } - } - - backupInfo.snapshotsIndex = backupInfo.snapshots.length - backupInfo.lastTxId = lastTx?._id ?? '0' // We could store last tx, since full backup is complete - await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel })) - } catch (err: any) { - console.error(err) - } finally { - console.log('end backup') - await connection.close() - } -} - -/** - * @public - */ -export async function backupList (storage: BackupStorage): Promise { - const infoFile = 'backup.json.gz' - - if (!(await storage.exists(infoFile))) { - throw new Error(`${infoFile} should present to restore`) - } - const backupInfo: BackupInfo = JSON.parse(gunzipSync(await storage.loadFile(infoFile)).toString()) - console.log('workspace:', backupInfo.workspace ?? '', backupInfo.version) - for (const s of backupInfo.snapshots) { - console.log('snapshot: id:', s.date, ' date:', new Date(s.date)) - } -} - -/** - * @public - * Restore state of DB to specified point. - */ -export async function restore ( - transactorUrl: string, - workspaceId: WorkspaceId, - storage: BackupStorage, - date: number, - merge?: boolean -): Promise { - const infoFile = 'backup.json.gz' - - if (!(await storage.exists(infoFile))) { - throw new Error(`${infoFile} should present to restore`) - } - const backupInfo: BackupInfo = JSON.parse(gunzipSync(await storage.loadFile(infoFile)).toString()) - let snapshots = backupInfo.snapshots - if (date !== -1) { - const bk = backupInfo.snapshots.findIndex((it) => it.date === date) - if (bk === -1) { - throw new Error(`${infoFile} could not restore to ${date}. 
Snapshot is missing.`) - } - snapshots = backupInfo.snapshots.slice(0, bk + 1) - } else { - date = snapshots[snapshots.length - 1].date - } - console.log('restore to ', date, new Date(date)) - const rsnapshots = Array.from(snapshots).reverse() - - // Collect all possible domains - const domains = new Set() - for (const s of snapshots) { - Object.keys(s.domains).forEach((it) => domains.add(it as Domain)) - } - - console.log('connecting:', transactorUrl, workspaceId.name) - const connection = (await connect(transactorUrl, workspaceId, undefined, { - mode: 'backup', - model: 'upgrade' - })) as unknown as CoreClient & BackupClient - console.log('connected') - - // We need to find empty domains and clean them. - const allDomains = connection.getHierarchy().domains() - for (const d of allDomains) { - domains.add(d) - } - - async function processDomain (c: Domain): Promise { - const changeset = await loadDigest(storage, snapshots, c, date) - // We need to load full changeset from server - const serverChangeset = new Map, string>() - - let idx: number | undefined - let loaded = 0 - let el = 0 - let chunks = 0 - try { - while (true) { - const st = Date.now() - const it = await connection.loadChunk(c, idx) - chunks++ - - idx = it.idx - el += Date.now() - st - - for (const { id, hash } of it.docs) { - serverChangeset.set(id as Ref, hash) - loaded++ - } - - if (el > 2500) { - console.log(' loaded from server', loaded, el, chunks) - el = 0 - chunks = 0 - } - if (it.finished) { - break - } - } - } finally { - if (idx !== undefined) { - await connection.closeChunk(idx) - } - } - console.log(' loaded', loaded) - console.log('\tcompare documents', changeset.size, serverChangeset.size) - - // Let's find difference - const docsToAdd = new Map( - Array.from(changeset.entries()).filter( - ([it]) => !serverChangeset.has(it) || (serverChangeset.has(it) && serverChangeset.get(it) !== changeset.get(it)) - ) - ) - const docsToRemove = Array.from(serverChangeset.keys()).filter((it) => !changeset.has(it)) - - const docs: Doc[] = [] - const blobs = new Map() - let sendSize = 0 - let totalSend = 0 - async function sendChunk (doc: Doc | undefined, len: number): Promise { - if (doc !== undefined) { - docsToAdd.delete(doc._id) - docs.push(doc) - } - sendSize = sendSize + len - if (sendSize > dataUploadSize || (doc === undefined && docs.length > 0)) { - console.log('upload', docs.length, `send: ${totalSend} from ${docsToAdd.size + totalSend}`, 'size:', sendSize) - totalSend += docs.length - await connection.upload(c, docs) - docs.length = 0 - sendSize = 0 - } - } - let processed = 0 - - for (const s of rsnapshots) { - const d = s.domains[c] - - if (d !== undefined && docsToAdd.size > 0) { - const sDigest = await loadDigest(storage, [s], c) - const requiredDocs = new Map(Array.from(sDigest.entries()).filter(([it]) => docsToAdd.has(it))) - if (requiredDocs.size > 0) { - console.log('updating', c, requiredDocs.size) - // We have required documents here. - for (const sf of d.storage ?? []) { - if (docsToAdd.size === 0) { - break - } - console.log('processing', sf, processed) - - const readStream = await storage.load(sf) - const ex = extract() - - ex.on('entry', (headers, stream, next) => { - const name = headers.name ?? 
'' - processed++ - // We found blob data - if (requiredDocs.has(name as Ref)) { - const chunks: Buffer[] = [] - stream.on('data', (chunk) => { - chunks.push(chunk) - }) - stream.on('end', () => { - const bf = Buffer.concat(chunks) - const d = blobs.get(name) - if (d === undefined) { - blobs.set(name, { doc: undefined, buffer: bf }) - next() - } else { - const d = blobs.get(name) - blobs.delete(name) - const doc = d?.doc as BlobData - doc.base64Data = bf.toString('base64') ?? '' - void sendChunk(doc, bf.length).finally(() => { - requiredDocs.delete(doc._id) - next() - }) - } - }) - } else if (name.endsWith('.json') && requiredDocs.has(name.substring(0, name.length - 5) as Ref)) { - const chunks: Buffer[] = [] - const bname = name.substring(0, name.length - 5) - stream.on('data', (chunk) => { - chunks.push(chunk) - }) - stream.on('end', () => { - const bf = Buffer.concat(chunks) - const doc = JSON.parse(bf.toString()) as Doc - if (doc._class === core.class.BlobData) { - const d = blobs.get(bname) - if (d === undefined) { - blobs.set(bname, { doc, buffer: undefined }) - next() - } else { - const d = blobs.get(bname) - blobs.delete(bname) - ;(doc as BlobData).base64Data = d?.buffer?.toString('base64') ?? '' - ;(doc as any)['%hash%'] = changeset.get(doc._id) - void sendChunk(doc, bf.length).finally(() => { - requiredDocs.delete(doc._id) - next() - }) - } - } else { - ;(doc as any)['%hash%'] = changeset.get(doc._id) - void sendChunk(doc, bf.length).finally(() => { - requiredDocs.delete(doc._id) - next() - }) - } - }) - } else { - next() - } - stream.resume() // just auto drain the stream - }) - - const endPromise = new Promise((resolve) => { - ex.on('finish', () => { - resolve(null) - }) - }) - const unzip = createGunzip({ level: defaultLevel }) - - readStream.on('end', () => { - readStream.destroy() - }) - readStream.pipe(unzip) - unzip.pipe(ex) - - await endPromise - } - } else { - console.log('domain had no changes', c) - } - } - } - - await sendChunk(undefined, 0) - if (docsToRemove.length > 0 && merge !== true) { - console.log('cleanup', docsToRemove.length) - while (docsToRemove.length > 0) { - const part = docsToRemove.splice(0, 10000) - await connection.clean(c, part) - } - } - } - - try { - for (const c of domains) { - console.log('processing domain', c) - let retry = 5 - let delay = 1 - while (retry > 0) { - retry-- - try { - await processDomain(c) - if (delay > 1) { - console.log('retry-success') - } - break - } catch (err: any) { - console.error('error', err) - if (retry !== 0) { - console.log('cool-down to retry', delay) - await new Promise((resolve) => setTimeout(resolve, delay * 1000)) - delay++ - } - } - } - } - } finally { - await connection.close() - } -} - -/** - * Compacting backup into just one snapshot. 
- * @public - */ -export async function compactBackup (storage: BackupStorage, force: boolean = false): Promise { - console.log('starting backup compaction') - try { - let backupInfo: BackupInfo - - // Version 0.6.2, format of digest file is changed to - - const infoFile = 'backup.json.gz' - - if (await storage.exists(infoFile)) { - backupInfo = JSON.parse(gunzipSync(await storage.loadFile(infoFile)).toString()) - } else { - console.log('No backup found') - return - } - if (backupInfo.version !== '0.6.2') { - console.log('Invalid backup version') - return - } - - if (backupInfo.snapshots.length < 5 && !force) { - console.log('No need to compact, less 5 snapshots') - return - } - - const snapshot: BackupSnapshot = { - date: Date.now(), - domains: {} - } - - const oldSnapshots = [...backupInfo.snapshots] - - backupInfo.snapshots = [snapshot] - let backupIndex = `${backupInfo.snapshotsIndex ?? oldSnapshots.length}` - while (backupIndex.length < 6) { - backupIndex = '0' + backupIndex - } - - const domains: Domain[] = [] - for (const sn of oldSnapshots) { - for (const d of Object.keys(sn.domains)) { - if (!domains.includes(d as Domain)) { - domains.push(d as Domain) - } - } - } - - for (const domain of domains) { - console.log('compacting domain...', domain) - - const processedChanges: Snapshot = { - added: new Map(), - updated: new Map(), - removed: [] - } - - let changed = 0 - let stIndex = 0 - let snapshotIndex = 0 - const domainInfo: DomainData = { - snapshot: undefined, - snapshots: [], - storage: [], - added: 0, - updated: 0, - removed: 0 - } - - // Cumulative digest - const digest = await loadDigest(storage, oldSnapshots, domain) - const digestAdded = new Map, string>() - - const rsnapshots = Array.from(oldSnapshots).reverse() - - let _pack: Pack | undefined - let addedDocuments = 0 - - let processed = 0 - - const blobs = new Map() - - async function pushDocs (docs: Doc[], size: number): Promise { - addedDocuments += size - changed += docs.length - // Chunk data into small pieces - if (addedDocuments > dataBlobSize && _pack !== undefined) { - _pack.finalize() - _pack = undefined - addedDocuments = 0 - - if (changed > 0) { - snapshot.domains[domain] = domainInfo - domainInfo.added += processedChanges.added.size - domainInfo.updated += processedChanges.updated.size - domainInfo.removed += processedChanges.removed.length - - const snapshotFile = join(backupIndex, `${domain}-${snapshot.date}-${snapshotIndex}.snp.gz`) - snapshotIndex++ - domainInfo.snapshots = [...(domainInfo.snapshots ?? []), snapshotFile] - await writeChanges(storage, snapshotFile, processedChanges) - - processedChanges.added.clear() - processedChanges.removed = [] - processedChanges.updated.clear() - await storage.writeFile( - infoFile, - gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }) - ) - } - } - if (_pack === undefined) { - _pack = pack() - stIndex++ - const storageFile = join(backupIndex, `${domain}-data-${snapshot.date}-${stIndex}.tar.gz`) - console.log('storing from domain', domain, storageFile) - domainInfo.storage = [...(domainInfo.storage ?? []), storageFile] - const dataStream = await storage.write(storageFile) - const storageZip = createGzip({ level: defaultLevel }) - - _pack.pipe(storageZip) - storageZip.pipe(dataStream) - } - - while (docs.length > 0) { - const d = docs.shift() - if (d === undefined) { - break - } - - // Move processed document to processedChanges - processedChanges.added.set(d._id, digestAdded.get(d._id) ?? 
'') - - if (d._class === core.class.BlobData) { - const blob = d as BlobData - const data = Buffer.from(blob.base64Data, 'base64') - blob.base64Data = '' - const descrJson = JSON.stringify(d) - addedDocuments += descrJson.length - addedDocuments += data.length - _pack.entry({ name: d._id + '.json' }, descrJson, function (err) { - if (err != null) throw err - }) - _pack.entry({ name: d._id }, data, function (err) { - if (err != null) throw err - }) - } else { - const data = JSON.stringify(d) - addedDocuments += data.length - _pack.entry({ name: d._id + '.json' }, data, function (err) { - if (err != null) throw err - }) - } - } - } - async function sendChunk (doc: Doc | undefined, len: number): Promise { - if (doc !== undefined) { - const hash = digest.get(doc._id) - digest.delete(doc._id) - digestAdded.set(doc._id, hash ?? '') - await pushDocs([doc], len) - } - } - - for (const s of rsnapshots) { - const d = s.domains[domain] - - if (d !== undefined && digest.size > 0) { - const sDigest = await loadDigest(storage, [s], domain) - const requiredDocs = new Map(Array.from(sDigest.entries()).filter(([it]) => digest.has(it))) - if (requiredDocs.size > 0) { - console.log('updating', domain, requiredDocs.size) - // We have required documents here. - for (const sf of d.storage ?? []) { - if (digest.size === 0) { - break - } - console.log('processing', sf, processed) - - const readStream = await storage.load(sf) - const ex = extract() - - ex.on('entry', (headers, stream, next) => { - const name = headers.name ?? '' - processed++ - // We found blob data - if (requiredDocs.has(name as Ref)) { - const chunks: Buffer[] = [] - stream.on('data', (chunk) => { - chunks.push(chunk) - }) - stream.on('end', () => { - const bf = Buffer.concat(chunks) - const d = blobs.get(name) - if (d === undefined) { - blobs.set(name, { doc: undefined, buffer: bf }) - next() - } else { - const d = blobs.get(name) - blobs.delete(name) - const doc = d?.doc as BlobData - doc.base64Data = bf.toString('base64') ?? '' - void sendChunk(doc, bf.length).finally(() => { - requiredDocs.delete(doc._id) - next() - }) - } - }) - } else if (name.endsWith('.json') && requiredDocs.has(name.substring(0, name.length - 5) as Ref)) { - const chunks: Buffer[] = [] - const bname = name.substring(0, name.length - 5) - stream.on('data', (chunk) => { - chunks.push(chunk) - }) - stream.on('end', () => { - const bf = Buffer.concat(chunks) - const doc = JSON.parse(bf.toString()) as Doc - if (doc._class === core.class.BlobData) { - const d = blobs.get(bname) - if (d === undefined) { - blobs.set(bname, { doc, buffer: undefined }) - next() - } else { - const d = blobs.get(bname) - blobs.delete(bname) - ;(doc as BlobData).base64Data = d?.buffer?.toString('base64') ?? 
'' - ;(doc as any)['%hash%'] = digest.get(doc._id) - void sendChunk(doc, bf.length).finally(() => { - requiredDocs.delete(doc._id) - next() - }) - } - } else { - ;(doc as any)['%hash%'] = digest.get(doc._id) - void sendChunk(doc, bf.length).finally(() => { - requiredDocs.delete(doc._id) - next() - }) - } - }) - } else { - next() - } - stream.resume() // just auto drain the stream - }) - - const endPromise = new Promise((resolve) => { - ex.on('finish', () => { - resolve(null) - }) - }) - const unzip = createGunzip({ level: defaultLevel }) - - readStream.on('end', () => { - readStream.destroy() - }) - readStream.pipe(unzip) - unzip.pipe(ex) - - await endPromise - } - } else { - console.log('domain had no changes', domain) - } - } - } - - if (changed > 0) { - snapshot.domains[domain] = domainInfo - domainInfo.added += processedChanges.added.size - domainInfo.updated += processedChanges.updated.size - domainInfo.removed += processedChanges.removed.length - - const snapshotFile = join(backupIndex, `${domain}-${snapshot.date}-${snapshotIndex}.snp.gz`) - snapshotIndex++ - domainInfo.snapshots = [...(domainInfo.snapshots ?? []), snapshotFile] - await writeChanges(storage, snapshotFile, processedChanges) - - processedChanges.added.clear() - processedChanges.removed = [] - processedChanges.updated.clear() - _pack?.finalize() - // This will allow to retry in case of critical error. - await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel })) - } - } - - // We could get rid of all old snapshot files. - for (const s of oldSnapshots) { - for (const [, dta] of Object.entries(s.domains)) { - for (const sf of dta.storage ?? []) { - console.log('removing', sf) - await storage.delete(sf) - } - for (const sf of dta.snapshots ?? []) { - console.log('removing', sf) - await storage.delete(sf) - } - if (dta.snapshot !== undefined) { - await storage.delete(dta.snapshot) - } - } - } - - backupInfo.snapshotsIndex = backupInfo.snapshots.length - await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel })) - } catch (err: any) { - console.error(err) - } finally { - console.log('end compacting') - } -} +export * from './backup' +export * from './service' diff --git a/server/backup/src/service.ts b/server/backup/src/service.ts new file mode 100644 index 0000000000..97e2d9d909 --- /dev/null +++ b/server/backup/src/service.ts @@ -0,0 +1,115 @@ +// +// Copyright © 2024 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import { BaseWorkspaceInfo, getWorkspaceId, type MeasureContext } from '@hcengineering/core' +import { type StorageAdapter } from '@hcengineering/server-core' +import { backup } from '.' 
+import { createStorageBackupStorage } from './storage' + +async function getWorkspaces (accounts: string, token: string): Promise { + const workspaces = await ( + await fetch(accounts, { + method: 'POST', + headers: { + 'Content-Type': 'application/json' + }, + body: JSON.stringify({ + method: 'listWorkspaces', + params: [token] + }) + }) + ).json() + + return (workspaces.result as BaseWorkspaceInfo[]) ?? [] +} + +export interface BackupConfig { + TransactorURL: string + AccountsURL: string + Token: string + + Interval: number // Timeout in seconds + Timeout: number // Timeout in seconds + BucketName: string +} + +class BackupWorker { + constructor ( + readonly storageAdapter: StorageAdapter, + readonly config: BackupConfig + ) {} + + canceled = false + interval: any + + async close (): Promise { + this.canceled = true + clearTimeout(this.interval) + } + + async schedule (ctx: MeasureContext): Promise { + console.log('schedule timeout for', this.config.Interval, ' seconds') + this.interval = setTimeout(() => { + void this.backup(ctx).then(() => { + void this.schedule(ctx) + }) + }, this.config.Interval * 1000) + } + + async backup (ctx: MeasureContext): Promise { + const workspaces = await getWorkspaces(this.config.AccountsURL, this.config.Token) + workspaces.sort((a, b) => b.lastVisit - a.lastVisit) + for (const ws of workspaces) { + if (this.canceled) { + return + } + await ctx.info('\n\nBACKUP WORKSPACE ', { workspace: ws.workspace, productId: ws.productId }) + try { + const storage = await createStorageBackupStorage( + ctx, + this.storageAdapter, + getWorkspaceId(this.config.BucketName, ws.productId), + ws.workspace + ) + await ctx.with('backup', { workspace: ws.workspace }, async (ctx) => { + await backup( + ctx, + this.config.TransactorURL, + getWorkspaceId(ws.workspace, ws.productId), + storage, + [], + false, + this.config.Timeout * 1000 + ) + }) + } catch (err: any) { + await ctx.error('\n\nFAILED to BACKUP', { workspace: ws.workspace, err }) + } + } + } +} + +export function backupService (ctx: MeasureContext, storage: StorageAdapter, config: BackupConfig): () => void { + const backupWorker = new BackupWorker(storage, config) + + const shutdown = (): void => { + void backupWorker.close() + } + + void backupWorker.backup(ctx).then(() => { + void backupWorker.schedule(ctx) + }) + return shutdown +} diff --git a/server/ws/src/server.ts b/server/ws/src/server.ts index 5a18f85079..48a448f15b 100644 --- a/server/ws/src/server.ts +++ b/server/ws/src/server.ts @@ -20,6 +20,8 @@ import core, { generateId, systemAccountEmail, toWorkspaceString, + versionToString, + type BaseWorkspaceInfo, type MeasureContext, type Ref, type Space, @@ -43,10 +45,7 @@ import { type Workspace } from './types' -interface WorkspaceLoginInfo { - workspaceName?: string // A company name - workspace: string -} +interface WorkspaceLoginInfo extends BaseWorkspaceInfo {} function timeoutPromise (time: number): Promise { return new Promise((resolve) => { @@ -75,6 +74,8 @@ class TSessionManager implements SessionManager { maintenanceTimer: any timeMinutes = 0 + modelVersion = process.env.MODEL_VERSION ?? 
'' + constructor ( readonly ctx: MeasureContext, readonly sessionFactory: (token: Token, pipeline: Pipeline, broadcast: BroadcastCall) => Session, @@ -179,15 +180,7 @@ class TSessionManager implements SessionManager { return this.sessionFactory(token, pipeline, this.broadcast.bind(this)) } - async getWorkspaceInfo ( - accounts: string, - token: string - ): Promise<{ - workspace: string - workspaceUrl?: string | null - workspaceName?: string - creating?: boolean - }> { + async getWorkspaceInfo (accounts: string, token: string): Promise { const userInfo = await ( await fetch(accounts, { method: 'POST', @@ -230,10 +223,25 @@ class TSessionManager implements SessionManager { if (workspaceInfo === undefined && token.extra?.admin !== 'true') { // No access to workspace for token. return { error: new Error(`No access to workspace for token ${token.email} ${token.workspace.name}`) } - } else { + } else if (workspaceInfo === undefined) { workspaceInfo = this.wsFromToken(token) } + if ( + this.modelVersion !== '' && + workspaceInfo.version !== undefined && + this.modelVersion !== versionToString(workspaceInfo.version) && + token.extra?.model !== 'upgrade' && + token.extra?.mode !== 'backup' + ) { + await ctx.info('model version mismatch', { + version: this.modelVersion, + workspaceVersion: versionToString(workspaceInfo.version) + }) + // Version mismatch, return upgrading. + return { upgrade: true } + } + let workspace = this.workspaces.get(wsString) if (workspace?.closeTimeout !== undefined) { await ctx.info('Cancel workspace warm close', { wsString }) @@ -311,16 +319,18 @@ class TSessionManager implements SessionManager { }) } - private wsFromToken (token: Token): { - workspace: string - workspaceUrl?: string | null - workspaceName?: string - creating?: boolean - } { + private wsFromToken (token: Token): BaseWorkspaceInfo { return { workspace: token.workspace.name, workspaceUrl: token.workspace.name, - workspaceName: token.workspace.name + workspaceName: token.workspace.name, + createdBy: '', + createdOn: Date.now(), + lastVisit: Date.now(), + productId: '', + createProgress: 100, + creating: false, + disabled: false } } diff --git a/server/ws/src/server_http.ts b/server/ws/src/server_http.ts index 7538158e2e..02d233f966 100644 --- a/server/ws/src/server_http.ts +++ b/server/ws/src/server_http.ts @@ -13,6 +13,7 @@ // limitations under the License. // +import { Analytics } from '@hcengineering/analytics' import { generateId, type MeasureContext } from '@hcengineering/core' import { UNAUTHORIZED } from '@hcengineering/platform' import { serialize, type Response } from '@hcengineering/rpc' @@ -30,7 +31,6 @@ import { type PipelineFactory, type SessionManager } from './types' -import { Analytics } from '@hcengineering/analytics' /** * @public @@ -223,7 +223,11 @@ export function startHttpServer ( if ('error' in session) { void ctx.error('error', { error: session.error?.message, stack: session.error?.stack }) } - cs.close() + await cs.send(ctx, { id: -1, result: 'upgrading' }, false, false) + // Wait 1 second before closing the connection + setTimeout(() => { + cs.close() + }, 1000) return } // eslint-disable-next-line @typescript-eslint/no-misused-promises
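// A hedged sketch of the notify-then-close behaviour introduced in server_http.ts above: when a
// session cannot be created (for example, the workspace is upgrading), the client is first told
// about it and the socket is closed only after a short grace period, instead of an abrupt drop.
// ConnectionSocketLike is a simplified stand-in for the real ConnectionSocket interface.
interface ConnectionSocketLike {
  send: (msg: object) => void
  close: () => void
}

function rejectWithGracePeriod (cs: ConnectionSocketLike, graceMs: number = 1000): void {
  // Let the client render an "upgrading" state before the connection goes away.
  cs.send({ id: -1, result: 'upgrading' })
  setTimeout(() => {
    cs.close()
  }, graceMs)
}

rejectWithGracePeriod({
  send: (msg) => {
    console.log('send', msg)
  },
  close: () => {
    console.log('closed')
  }
})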