UBERF-8226: Fix backup service OOM (#6683)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-09-23 16:05:38 +07:00 committed by GitHub
parent 678da06af4
commit e72766a6b2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 316 additions and 117 deletions

11
.vscode/launch.json vendored
View File

@ -293,16 +293,17 @@
"args": ["src/index.ts"],
"env": {
"ACCOUNTS_URL": "http://localhost:3000",
"MINIO_ENDPOINT": "localhost",
"MINIO_ACCESS_KEY": "minioadmin",
"MINIO_SECRET_KEY": "minioadmin",
"STORAGE": "minio|localhost?accessKey=minioadmin&secretKey=minioadmin",
"WORKSPACE_STORAGE": "minio|localhost?accessKey=minioadmin&secretKey=minioadmin",
"MONGO_URL": "mongodb://localhost:27017",
"SECRET": "secret",
"BUCKET_NAME":"test_backups",
"INTERVAL":"30"
"BUCKET_NAME":"backups",
"INTERVAL":"30",
},
"runtimeArgs": ["--nolazy", "-r", "ts-node/register"],
"showAsyncStacks": true,
"sourceMaps": true,
"outputCapture": "std",
"cwd": "${workspaceRoot}/pods/backup",
"protocol": "inspector"
},

View File

@ -55,7 +55,7 @@ async function doBackup (dirName: string, token: string, endpoint: string, works
progress: (value: number): void => {
notify('backup', value)
},
connection: client
getConnection: async () => client
})
} finally {
await client.close()

View File

@ -71,8 +71,6 @@ export class SplitLogger implements MeasureLogger {
)
})
)
} else {
console.log('Logging only in files')
}
this.logger.info(
'####################################################################################################################'

View File

@ -14,16 +14,16 @@
//
import { Analytics } from '@hcengineering/analytics'
import { configureAnalytics, SplitLogger } from '@hcengineering/analytics-service'
import { startBackup } from '@hcengineering/backup-service'
import { MeasureMetricsContext, metricsToString, newMetrics } from '@hcengineering/core'
import { DummyDbAdapter, DummyFullTextAdapter, type PipelineFactory } from '@hcengineering/server-core'
import { createServerPipeline } from '@hcengineering/server-pipeline'
import { configureAnalytics, SplitLogger } from '@hcengineering/analytics-service'
import { type PipelineFactory } from '@hcengineering/server-core'
import { createBackupPipeline, getConfig } from '@hcengineering/server-pipeline'
import { writeFile } from 'fs/promises'
import { join } from 'path'
const metricsContext = new MeasureMetricsContext(
'github',
'backup',
{},
{},
newMetrics(),
@ -55,35 +55,26 @@ const onClose = (): void => {
metricsContext.info('Closed')
}
startBackup(metricsContext, (mongoUrl, storageAdapter) => {
const factory: PipelineFactory = createServerPipeline(
metricsContext,
mongoUrl,
{
startBackup(
metricsContext,
(mongoUrl, storageAdapter) => {
const factory: PipelineFactory = createBackupPipeline(metricsContext, mongoUrl, {
externalStorage: storageAdapter,
usePassedCtx: true
})
return factory
},
(ctx, dbUrls, workspace, branding, externalStorage) => {
return getConfig(ctx, dbUrls, workspace, branding, ctx, {
externalStorage,
fullTextUrl: '',
indexParallel: 0,
indexProcessing: 0,
disableTriggers: true,
rekoniUrl: '',
usePassedCtx: true
},
{
adapters: {
FullTextBlob: {
factory: async () => new DummyDbAdapter(),
url: ''
}
},
fulltextAdapter: {
factory: async () => new DummyFullTextAdapter(),
stages: () => [],
url: ''
}
}
)
return factory
})
disableTriggers: true
})
}
)
process.on('SIGINT', onClose)
process.on('SIGTERM', onClose)

View File

@ -13,30 +13,39 @@
// limitations under the License.
//
import { MeasureContext, systemAccountEmail } from '@hcengineering/core'
import { MeasureContext, systemAccountEmail, type Branding, type WorkspaceIdWithUrl } from '@hcengineering/core'
import { setMetadata } from '@hcengineering/platform'
import { backupService } from '@hcengineering/server-backup'
import serverClientPlugin from '@hcengineering/server-client'
import { type PipelineFactory, type StorageAdapter } from '@hcengineering/server-core'
import { type DbConfiguration, type PipelineFactory, type StorageAdapter } from '@hcengineering/server-core'
import { buildStorageFromConfig, createStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
import serverToken, { generateToken } from '@hcengineering/server-token'
import config from './config'
export function startBackup (
ctx: MeasureContext,
pipelineFactoryFactory: (mongoUrl: string, storage: StorageAdapter) => PipelineFactory
pipelineFactoryFactory: (mongoUrl: string, storage: StorageAdapter) => PipelineFactory,
getConfig: (
ctx: MeasureContext,
dbUrls: string,
workspace: WorkspaceIdWithUrl,
branding: Branding | null,
externalStorage: StorageAdapter
) => DbConfiguration
): void {
setMetadata(serverToken.metadata.Secret, config.Secret)
setMetadata(serverClientPlugin.metadata.Endpoint, config.AccountsURL)
setMetadata(serverClientPlugin.metadata.UserAgent, config.ServiceID)
const [mainDbUrl, rawDbUrl] = config.MongoURL.split(';')
const backupStorageConfig = storageConfigFromEnv(config.Storage)
const workspaceStorageConfig = storageConfigFromEnv(config.WorkspaceStorage)
const storageAdapter = createStorageFromConfig(backupStorageConfig.storages[0])
const workspaceStorageAdapter = buildStorageFromConfig(workspaceStorageConfig, config.MongoURL)
const workspaceStorageAdapter = buildStorageFromConfig(workspaceStorageConfig, rawDbUrl ?? mainDbUrl)
const pipelineFactory = pipelineFactoryFactory(config.MongoURL, workspaceStorageAdapter)
const pipelineFactory = pipelineFactoryFactory(mainDbUrl, workspaceStorageAdapter)
// A token to access account service
const token = generateToken(systemAccountEmail, { name: 'backup' })
@ -46,7 +55,10 @@ export function startBackup (
storageAdapter,
{ ...config, Token: token },
pipelineFactory,
workspaceStorageAdapter
workspaceStorageAdapter,
(ctx, workspace, branding, externalStorage) => {
return getConfig(ctx, mainDbUrl, workspace, branding, externalStorage)
}
)
process.on('SIGINT', shutdown)

View File

@ -35,7 +35,8 @@ import core, {
TxCollectionCUD,
WorkspaceId,
type Blob,
type DocIndexState
type DocIndexState,
type Tx
} from '@hcengineering/core'
import { BlobClient, createClient } from '@hcengineering/server-client'
import { fullTextPushStagePrefix, type StorageAdapter } from '@hcengineering/server-core'
@ -514,7 +515,8 @@ export async function backup (
connectTimeout: number
skipBlobContentTypes: string[]
blobDownloadLimit: number
connection?: CoreClient & BackupClient
getLastTx?: () => Promise<Tx | undefined>
getConnection?: () => Promise<CoreClient & BackupClient>
storageAdapter?: StorageAdapter
// Return true in case the backup should be canceled
isCanceled?: () => boolean
@ -529,7 +531,7 @@ export async function backup (
skipBlobContentTypes: [],
blobDownloadLimit: 15
}
): Promise<void> {
): Promise<boolean> {
ctx = ctx.newChild('backup', {
workspaceId: workspaceId.name,
force: options.force,
@ -555,36 +557,11 @@ export async function backup (
}, options.timeout)
}
const token =
options.token ??
generateToken(systemAccountEmail, workspaceId, {
mode: 'backup'
})
const connection =
options.connection ??
((await createClient(transactorUrl, options.token ?? token, undefined, options.connectTimeout)) as CoreClient &
BackupClient)
const blobClient = new BlobClient(transactorUrl, token, workspaceId, { storageAdapter: options.storageAdapter })
ctx.info('starting backup', { workspace: workspaceId.name })
const st = Date.now()
let connection!: CoreClient & BackupClient
let printEnd = true
try {
const domains = [
...connection
.getHierarchy()
.domains()
.filter(
(it) =>
it !== DOMAIN_TRANSIENT &&
it !== DOMAIN_MODEL &&
it !== ('fulltext-blob' as Domain) &&
!options.skipDomains.includes(it) &&
(options.include === undefined || options.include.has(it))
)
]
ctx.info('domains for dump', { domains: domains.length })
let backupInfo: BackupInfo = {
workspace: workspaceId.name,
version: '0.6.2',
@ -602,19 +579,70 @@ export async function backup (
backupInfo.workspace = workspaceId.name
let lastTx: Tx | undefined
let lastTxChecked = false
// Skip backup if there is no transaction changes.
const lastTx = await connection.findOne(
core.class.Tx,
{},
{ limit: 1, sort: { modifiedOn: SortingOrder.Descending } }
)
if (lastTx !== undefined) {
if (lastTx._id === backupInfo.lastTxId && !options.force) {
ctx.info('No transaction changes. Skipping backup.', { workspace: workspaceId.name })
return
if (options.getLastTx !== undefined) {
lastTx = await options.getLastTx()
if (lastTx !== undefined) {
if (lastTx._id === backupInfo.lastTxId && !options.force) {
printEnd = false
ctx.info('No transaction changes. Skipping backup.', { workspace: workspaceId.name })
return false
}
}
lastTxChecked = true
}
const token =
options.token ??
generateToken(systemAccountEmail, workspaceId, {
mode: 'backup'
})
ctx.warn('starting backup', { workspace: workspaceId.name })
connection =
options.getConnection !== undefined
? await options.getConnection()
: ((await createClient(
transactorUrl,
options.token ?? token,
undefined,
options.connectTimeout
)) as CoreClient & BackupClient)
if (!lastTxChecked) {
lastTx = await connection.findOne(core.class.Tx, {}, { limit: 1, sort: { modifiedOn: SortingOrder.Descending } })
if (lastTx !== undefined) {
if (lastTx._id === backupInfo.lastTxId && !options.force) {
ctx.info('No transaction changes. Skipping backup.', { workspace: workspaceId.name })
if (options.getConnection === undefined) {
await connection.close()
}
return false
}
}
}
const blobClient = new BlobClient(transactorUrl, token, workspaceId, { storageAdapter: options.storageAdapter })
const domains = [
...connection
.getHierarchy()
.domains()
.filter(
(it) =>
it !== DOMAIN_TRANSIENT &&
it !== DOMAIN_MODEL &&
it !== ('fulltext-blob' as Domain) &&
!options.skipDomains.includes(it) &&
(options.include === undefined || options.include.has(it))
)
]
ctx.info('domains for dump', { domains: domains.length })
backupInfo.lastTxId = '' // Clear until full backup will be complete
const snapshot: BackupSnapshot = {
@ -1006,11 +1034,15 @@ export async function backup (
backupInfo.lastTxId = lastTx?._id ?? '0' // We could store last tx, since full backup is complete
await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }))
}
return true
} catch (err: any) {
ctx.error('backup error', { err, workspace: workspaceId.name })
return false
} finally {
ctx.info('end backup', { workspace: workspaceId.name })
if (options.connection === undefined) {
if (printEnd) {
ctx.info('end backup', { workspace: workspaceId.name, totalTime: Date.now() - st })
}
if (options.getConnection === undefined && connection !== undefined) {
await connection.close()
}
ctx.end()

View File

@ -15,11 +15,17 @@
import {
BaseWorkspaceInfo,
DOMAIN_TX,
getWorkspaceId,
Hierarchy,
ModelDb,
SortingOrder,
systemAccountEmail,
type BackupClient,
type Branding,
type Client,
type MeasureContext,
type Tx,
type WorkspaceIdWithUrl
} from '@hcengineering/core'
import { PlatformError, unknownError } from '@hcengineering/platform'
@ -27,6 +33,7 @@ import { listAccountWorkspaces } from '@hcengineering/server-client'
import {
BackupClientOps,
SessionDataImpl,
type DbConfiguration,
type Pipeline,
type PipelineFactory,
type StorageAdapter
@ -48,7 +55,13 @@ class BackupWorker {
readonly storageAdapter: StorageAdapter,
readonly config: BackupConfig,
readonly pipelineFactory: PipelineFactory,
readonly workspaceStorageAdapter: StorageAdapter
readonly workspaceStorageAdapter: StorageAdapter,
readonly getConfig: (
ctx: MeasureContext,
workspace: WorkspaceIdWithUrl,
branding: Branding | null,
externalStorage: StorageAdapter
) => DbConfiguration
) {}
canceled = false
@ -61,11 +74,26 @@ class BackupWorker {
backupPromise: Promise<void> | undefined
/**
 * Emits a warning-level summary of a backup pass.
 *
 * @param ctx - Measure context used for logging.
 * @param stats - Outcome of a backup pass: the workspaces that failed plus the
 * processed and skipped counters.
 */
printStats (
  ctx: MeasureContext,
  stats: { failedWorkspaces: BaseWorkspaceInfo[], processed: number, skipped: number }
): void {
  const { failedWorkspaces, processed, skipped } = stats
  // Keys are part of the emitted log record and must stay as they are.
  const summary = {
    backuped: processed,
    notChanges: skipped,
    failed: failedWorkspaces.length
  }
  // The line break inside the template literal is part of the logged message.
  ctx.warn(
    `****************************************
backup statistics:`,
    summary
  )
}
async triggerBackup (ctx: MeasureContext): Promise<void> {
const failed = await this.backup(ctx)
if (failed.length > 0) {
ctx.info('Failed to backup workspaces, Retry failed workspaces once.', { failed: failed.length })
await this.doBackup(ctx, failed)
const { failedWorkspaces } = await this.backup(ctx)
if (failedWorkspaces.length > 0) {
ctx.info('Failed to backup workspaces, Retry failed workspaces once.', { failed: failedWorkspaces.length })
this.printStats(ctx, await this.doBackup(ctx, failedWorkspaces))
}
}
@ -81,7 +109,9 @@ class BackupWorker {
}, this.config.Interval * 1000)
}
async backup (ctx: MeasureContext): Promise<BaseWorkspaceInfo[]> {
async backup (
ctx: MeasureContext
): Promise<{ failedWorkspaces: BaseWorkspaceInfo[], processed: number, skipped: number }> {
const workspacesIgnore = new Set(this.config.SkipWorkspaces.split(';'))
const workspaces = (await listAccountWorkspaces(this.config.Token)).filter((it) => {
return !workspacesIgnore.has(it.workspace)
@ -90,16 +120,20 @@ class BackupWorker {
return await this.doBackup(ctx, workspaces)
}
async doBackup (rootCtx: MeasureContext, workspaces: BaseWorkspaceInfo[]): Promise<BaseWorkspaceInfo[]> {
async doBackup (
rootCtx: MeasureContext,
workspaces: BaseWorkspaceInfo[]
): Promise<{ failedWorkspaces: BaseWorkspaceInfo[], processed: number, skipped: number }> {
let index = 0
const failedWorkspaces: BaseWorkspaceInfo[] = []
let processed = 0
for (const ws of workspaces) {
if (this.canceled) {
return failedWorkspaces
return { failedWorkspaces, processed, skipped: workspaces.length - processed }
}
index++
rootCtx.info('\n\nBACKUP WORKSPACE ', {
rootCtx.warn('\n\nBACKUP WORKSPACE ', {
workspace: ws.workspace,
index,
total: workspaces.length
@ -109,6 +143,7 @@ class BackupWorker {
enableConsole: 'true'
})
const ctx = rootCtx.newChild(ws.workspace, { workspace: ws.workspace }, {}, childLogger)
let pipeline: Pipeline | undefined
try {
const storage = await createStorageBackupStorage(
ctx,
@ -121,28 +156,67 @@ class BackupWorker {
workspaceName: ws.workspaceName ?? '',
workspaceUrl: ws.workspaceUrl ?? ''
}
const pipeline = await this.pipelineFactory(ctx, wsUrl, true, () => {}, null)
processed += (await ctx.with(
'backup',
{ workspace: ws.workspace },
async (ctx) =>
await backup(ctx, '', getWorkspaceId(ws.workspace), storage, {
skipDomains: [],
force: false,
recheck: false,
timeout: this.config.Timeout * 1000,
connectTimeout: 5 * 60 * 1000, // 5 minutes to,
blobDownloadLimit: 100,
skipBlobContentTypes: [],
storageAdapter: this.workspaceStorageAdapter,
getLastTx: async (): Promise<Tx | undefined> => {
const config = this.getConfig(ctx, wsUrl, null, this.workspaceStorageAdapter)
const adapterConf = config.adapters[config.domains[DOMAIN_TX]]
const hierarchy = new Hierarchy()
const modelDb = new ModelDb(hierarchy)
const txAdapter = await adapterConf.factory(
ctx,
hierarchy,
adapterConf.url,
wsUrl,
modelDb,
this.workspaceStorageAdapter
)
try {
await txAdapter.init?.()
await ctx.with('backup', { workspace: ws.workspace }, async (ctx) => {
await backup(ctx, '', getWorkspaceId(ws.workspace), storage, {
skipDomains: [],
force: false,
recheck: false,
timeout: this.config.Timeout * 1000,
connectTimeout: 5 * 60 * 1000, // 5 minutes to,
blobDownloadLimit: 100,
skipBlobContentTypes: [],
storageAdapter: this.workspaceStorageAdapter,
connection: this.wrapPipeline(ctx, pipeline, wsUrl)
})
})
return (
await txAdapter.rawFindAll<Tx>(
DOMAIN_TX,
{},
{ limit: 1, sort: { modifiedOn: SortingOrder.Descending } }
)
).shift()
} finally {
await txAdapter.close()
}
},
getConnection: async () => {
if (pipeline === undefined) {
pipeline = await this.pipelineFactory(ctx, wsUrl, true, () => {}, null)
}
return this.wrapPipeline(ctx, pipeline, wsUrl)
}
})
))
? 1
: 0
} catch (err: any) {
rootCtx.error('\n\nFAILED to BACKUP', { workspace: ws.workspace, err })
failedWorkspaces.push(ws)
await childLogger?.close()
} finally {
if (pipeline !== undefined) {
await pipeline.close()
}
}
}
return failedWorkspaces
return { failedWorkspaces, processed, skipped: workspaces.length - processed }
}
wrapPipeline (ctx: MeasureContext, pipeline: Pipeline, wsUrl: WorkspaceIdWithUrl): Client & BackupClient {
@ -213,15 +287,22 @@ export function backupService (
storage: StorageAdapter,
config: BackupConfig,
pipelineFactory: PipelineFactory,
workspaceStorageAdapter: StorageAdapter
workspaceStorageAdapter: StorageAdapter,
getConfig: (
ctx: MeasureContext,
workspace: WorkspaceIdWithUrl,
branding: Branding | null,
externalStorage: StorageAdapter
) => DbConfiguration
): () => void {
const backupWorker = new BackupWorker(storage, config, pipelineFactory, workspaceStorageAdapter)
const backupWorker = new BackupWorker(storage, config, pipelineFactory, workspaceStorageAdapter, getConfig)
const shutdown = (): void => {
void backupWorker.close()
}
void backupWorker.backup(ctx).then(() => {
void backupWorker.backup(ctx).then((res) => {
backupWorker.printStats(ctx, res)
void backupWorker.schedule(ctx)
})
return shutdown

View File

@ -82,7 +82,7 @@ class AdapterStorage implements BackupStorage {
async exists (name: string): Promise<boolean> {
try {
return (await this.client.stat(this.ctx, this.workspaceId, join(this.root, name))) !== undefined
} catch (err) {
} catch (err: any) {
return false
}
}

View File

@ -286,6 +286,15 @@ export class MinioService implements StorageAdapter {
version: result.versionId ?? null
}
} catch (err: any) {
if (
err?.code === 'NoSuchKey' ||
err?.code === 'NotFound' ||
err?.message === 'No such key' ||
err?.Code === 'NoSuchKey'
) {
// Do not print error in this case
return
}
ctx.error('no object found', { error: err, objectName, workspaceId: workspaceId.name })
}
}

View File

@ -49,6 +49,7 @@ import {
createBenchmarkAdapter,
createInMemoryAdapter,
createPipeline,
DummyDbAdapter,
DummyFullTextAdapter,
FullTextMiddleware,
type AggregatorStorageAdapter,
@ -154,6 +155,74 @@ export function createServerPipeline (
}
}
/**
 * Builds a {@link PipelineFactory} for backup use.
 *
 * Each pipeline produced by the factory gets its database configuration from
 * `getConfig`, called with triggers disabled, empty full-text and rekoni URLs,
 * zero index parallelism/processing, and dummy adapters for `FullTextBlob` and
 * the full-text adapter. The pipeline is assembled from a fixed middleware list.
 *
 * @public
 * @param metrics - Measure context always handed to `getConfig`; also the parent
 * of the per-pipeline `🧲 backup` child context unless `opt.usePassedCtx` is `true`.
 * @param dbUrls - Database URL string forwarded to `getConfig`.
 * @param opt - Options forwarded to `getConfig`. `usePassedCtx === true` makes the
 * context passed to the factory the parent of the child metrics context;
 * `externalStorage` becomes the pipeline context's `storageAdapter`.
 * @returns A factory creating a pipeline for the given workspace.
 */
export function createBackupPipeline (
  metrics: MeasureContext,
  dbUrls: string,
  opt: {
    usePassedCtx?: boolean
    adapterSecurity?: boolean
    externalStorage: StorageAdapter
  }
): PipelineFactory {
  return (ctx, workspace, upgrade, broadcast, branding) => {
    // Choose which context the per-workspace backup metrics hang off.
    const parentCtx = opt.usePassedCtx === true ? ctx : metrics
    const backupMetrics = parentCtx.newChild('🧲 backup', {})

    // Caller options first, then the fixed backup overrides on top.
    const backupOptions = {
      ...opt,
      fullTextUrl: '',
      indexParallel: 0,
      indexProcessing: 0,
      rekoniUrl: '',
      disableTriggers: true
    }

    const dbConf = getConfig(metrics, dbUrls, workspace, branding, backupMetrics, backupOptions, {
      adapters: {
        FullTextBlob: {
          factory: async () => new DummyDbAdapter(),
          url: ''
        }
      },
      fulltextAdapter: {
        factory: async () => new DummyFullTextAdapter(),
        stages: () => [],
        url: ''
      }
    })

    const middlewares: MiddlewareCreator[] = [
      LowLevelMiddleware.create,
      ContextNameMiddleware.create,
      DomainFindMiddleware.create,
      DBAdapterInitMiddleware.create,
      ModelMiddleware.create,
      DBAdapterMiddleware.create(dbConf)
    ]

    const hierarchy = new Hierarchy()
    const pipelineContext: PipelineContext = {
      workspace,
      branding,
      modelDb: new ModelDb(hierarchy),
      hierarchy,
      storageAdapter: opt.externalStorage
    }

    return createPipeline(ctx, middlewares, pipelineContext)
  }
}
export async function getServerPipeline (
ctx: MeasureContext,
mongodbUri: string,
@ -211,7 +280,7 @@ export async function getServerPipeline (
}
}
function getConfig (
export function getConfig (
metrics: MeasureContext,
dbUrls: string,
workspace: WorkspaceIdWithUrl,

View File

@ -865,16 +865,22 @@ class TSessionManager implements SessionManager {
request: Request<any>,
workspace: string // wsId, toWorkspaceString()
): void {
const userCtx = requestCtx.newChild('📞 client', {
workspace: '🧲 ' + workspace
})
const backupMode = service.getMode() === 'backup'
const userCtx = requestCtx.newChild(
'📞 client',
!backupMode
? {
workspace: '🧲 ' + workspace
}
: {}
)
// Calculate total number of clients
const reqId = generateId()
const st = Date.now()
try {
const backupMode = 'loadChunk' in service
void userCtx.with(`🧭 ${backupMode ? 'handleBackup' : 'handleRequest'}`, {}, async (ctx) => {
if (request.time != null) {
const delta = Date.now() - request.time