UBERF-8510: Fix OOM in backup service (#6973)

Andrey Sobolev 2024-10-18 12:12:07 +07:00 committed by GitHub
parent 49f608bc98
commit f189e417c2
GPG Key ID: B5690EEEBB952194
5 changed files with 74 additions and 27 deletions

View File

@@ -19,8 +19,8 @@
"docker:tbuild": "docker build -t hardcoreeng/tool . --platform=linux/amd64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/tool",
"docker:staging": "../../common/scripts/docker_tag.sh hardcoreeng/tool staging",
"docker:push": "../../common/scripts/docker_tag.sh hardcoreeng/tool",
"run-local": "rush bundle --to @hcengineering/tool >/dev/null && cross-env SERVER_SECRET=secret ACCOUNTS_URL=http://localhost:3000 TRANSACTOR_URL=ws://localhost:3333 MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost MONGO_URL=mongodb://localhost:27017 DB_URL=mongodb://localhost:27017 TELEGRAM_DATABASE=telegram-service ELASTIC_URL=http://localhost:9200 REKONI_URL=http://localhost:4004 MODEL_VERSION=$(node ../../common/scripts/show_version.js) GIT_REVISION=$(git describe --all --long) node --max-old-space-size=18000 ./bundle/bundle.js",
"run-local-pg": "rush bundle --to @hcengineering/tool >/dev/null && cross-env SERVER_SECRET=secret ACCOUNTS_URL=http://localhost:3000 TRANSACTOR_URL=ws://localhost:3333 MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost MONGO_URL=mongodb://localhost:27017 DB_URL=postgresql://postgres:example@localhost:5432 TELEGRAM_DATABASE=telegram-service ELASTIC_URL=http://localhost:9200 REKONI_URL=http://localhost:4004 MODEL_VERSION=$(node ../../common/scripts/show_version.js) GIT_REVISION=$(git describe --all --long) node --max-old-space-size=18000 ./bundle/bundle.js",
"run-local": "rush bundle --to @hcengineering/tool >/dev/null && cross-env SERVER_SECRET=secret ACCOUNTS_URL=http://localhost:3000 TRANSACTOR_URL=ws://localhost:3333 MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost MONGO_URL=mongodb://localhost:27017 DB_URL=mongodb://localhost:27017 TELEGRAM_DATABASE=telegram-service ELASTIC_URL=http://localhost:9200 REKONI_URL=http://localhost:4004 MODEL_VERSION=$(node ../../common/scripts/show_version.js) GIT_REVISION=$(git describe --all --long) node --expose-gc --max-old-space-size=18000 ./bundle/bundle.js",
"run-local-pg": "rush bundle --to @hcengineering/tool >/dev/null && cross-env SERVER_SECRET=secret ACCOUNTS_URL=http://localhost:3000 TRANSACTOR_URL=ws://localhost:3333 MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost MONGO_URL=mongodb://localhost:27017 DB_URL=postgresql://postgres:example@localhost:5432 TELEGRAM_DATABASE=telegram-service ELASTIC_URL=http://localhost:9200 REKONI_URL=http://localhost:4004 MODEL_VERSION=$(node ../../common/scripts/show_version.js) GIT_REVISION=$(git describe --all --long) node --expose-gc --max-old-space-size=18000 ./bundle/bundle.js",
"run-local-brk": "rush bundle --to @hcengineering/tool >/dev/null && cross-env SERVER_SECRET=secret ACCOUNTS_URL=http://localhost:3000 TRANSACTOR_URL=ws://localhost:3333 MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost MONGO_URL=mongodb://localhost:27017 DB_URL=mongodb://localhost:27017 TELEGRAM_DATABASE=telegram-service ELASTIC_URL=http://localhost:9200 REKONI_URL=http://localhost:4004 MODEL_VERSION=$(node ../../common/scripts/show_version.js) GIT_REVISION=$(git describe --all --long) node --inspect-brk --enable-source-maps --max-old-space-size=18000 ./bundle/bundle.js",
"run": "rush bundle --to @hcengineering/tool >/dev/null && cross-env node --max-old-space-size=8000 ./bundle/bundle.js",
"upgrade": "rushx run-local upgrade",

View File

@@ -17,4 +17,4 @@ COPY bundle/bundle.js.map ./
COPY bundle/model.json ./
EXPOSE 3000
CMD [ "node", "bundle.js" ]
CMD [ "node", "--expose-gc", "bundle.js" ]

View File

@@ -26,6 +26,7 @@ import core, {
DOMAIN_FULLTEXT_BLOB,
DOMAIN_MODEL,
DOMAIN_TRANSIENT,
DOMAIN_TX,
MeasureContext,
MeasureMetricsContext,
RateLimiter,
@@ -44,8 +45,9 @@ import { type StorageAdapter } from '@hcengineering/server-core'
import { fullTextPushStagePrefix } from '@hcengineering/server-indexer'
import { generateToken } from '@hcengineering/server-token'
import { connect } from '@hcengineering/server-tool'
import { createWriteStream, existsSync, mkdirSync } from 'node:fs'
import { dirname } from 'node:path'
import { createReadStream, createWriteStream, existsSync, mkdirSync } from 'node:fs'
import { rm } from 'node:fs/promises'
import { basename, dirname } from 'node:path'
import { PassThrough } from 'node:stream'
import { createGzip } from 'node:zlib'
import { join } from 'path'
@@ -662,6 +664,13 @@ export async function backup (
(options.include === undefined || options.include.has(it))
)
]
domains.sort((a, b) => {
if (a === DOMAIN_TX) {
return -1
}
return a.localeCompare(b)
})
ctx.info('domains for dump', { domains: domains.length })
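
The dump order now puts DOMAIN_TX first, so the transaction domain is captured before the data domains, with the remaining domains sorted alphabetically. A symmetric illustration of that ordering (a sketch, not the committed comparator):

// Sketch: order domains so the transaction domain comes first, the rest alphabetical.
function orderDomains (domains: string[], txDomain: string): string[] {
  return [...domains].sort((a, b) => {
    if (a === txDomain) return -1
    if (b === txDomain) return 1
    return a.localeCompare(b)
  })
}
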
@@ -863,12 +872,15 @@ export async function backup (
const digest = await ctx.with('load-digest', {}, (ctx) => loadDigest(ctx, storage, backupInfo.snapshots, domain))
let _pack: Pack | undefined
let _packClose = async (): Promise<void> => {}
let addedDocuments = (): number => 0
progress(0)
let { changed, needRetrieveChunks } = await ctx.with('load-chunks', { domain }, (ctx) =>
loadChangesFromServer(ctx, domain, digest, changes)
)
processedChanges.removed = Array.from(digest.keys())
digest.clear()
progress(10)
if (needRetrieveChunks.length > 0) {
@@ -879,6 +891,10 @@ export async function backup (
let processed = 0
let blobs = 0
try {
global.gc?.()
} catch (err) {}
while (needRetrieveChunks.length > 0) {
if (canceled()) {
return
@@ -910,11 +926,16 @@ export async function backup (
while (docs.length > 0) {
// Chunk data into small pieces
if (addedDocuments() > dataBlobSize && _pack !== undefined) {
_pack.finalize()
_pack = undefined
if (
(addedDocuments() > dataBlobSize || processedChanges.added.size + processedChanges.updated.size > 500000) &&
_pack !== undefined
) {
await _packClose()
if (changed > 0) {
try {
global.gc?.()
} catch (err) {}
snapshot.domains[domain] = domainInfo
domainInfo.added += processedChanges.added.size
domainInfo.updated += processedChanges.updated.size
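
Pack rotation above is also tightened: the current archive is closed once more than 500 000 documents have been added or updated since the last flush, in addition to the existing byte budget, and the code now awaits `_packClose()` before the snapshot bookkeeping. A sketch of the rotation predicate (the byte budget value is an assumption for the sketch):

// Illustrative rotation check; the real dataBlobSize constant lives in backup.ts
// and its value here is assumed.
const dataBlobSize = 50 * 1024 * 1024
const needRotation = (bytesWritten: number, docsSinceFlush: number): boolean =>
  bytesWritten > dataBlobSize || docsSinceFlush > 500000
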
@@ -940,7 +961,9 @@ export async function backup (
const storageFile = join(backupIndex, `${domain}-data-${snapshot.date}-${stIndex}.tar.gz`)
ctx.info('storing from domain', { domain, storageFile, workspace: workspaceId.name })
domainInfo.storage = [...(domainInfo.storage ?? []), storageFile]
const dataStream = await storage.write(storageFile)
const tmpFile = basename(storageFile) + '.tmp'
const tempFile = createWriteStream(tmpFile)
// const dataStream = await storage.write(storageFile)
const sizePass = new PassThrough()
let sz = 0
@@ -951,12 +974,26 @@ export async function backup (
cb()
}
sizePass.pipe(dataStream)
sizePass.pipe(tempFile)
const storageZip = createGzip({ level: defaultLevel, memLevel: 9 })
addedDocuments = () => sz
_pack.pipe(storageZip)
storageZip.pipe(sizePass)
_packClose = async () => {
_pack?.finalize()
storageZip.destroy()
_pack?.destroy()
tempFile.destroy()
// We need to upload file to storage
ctx.info('Upload pack file', { storageFile, size: sz, workspace: workspaceId.name })
await storage.writeFile(storageFile, createReadStream(tmpFile))
await rm(tmpFile)
_pack = undefined
}
}
if (canceled()) {
return
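
This is the heart of the OOM fix for large domains: the gzipped tar is no longer piped straight into the storage writer; each chunk is first spilled to a local `*.tmp` file and only streamed into backup storage from `_packClose()`, after which the temp file is removed. A sketch of that pattern, with an assumed minimal sink interface:

// Sketch of the spill-to-disk-then-upload pattern; names and the sink shape are illustrative.
import { createReadStream, createWriteStream } from 'node:fs'
import { rm } from 'node:fs/promises'
import { pipeline } from 'node:stream/promises'
import type { Readable } from 'node:stream'

interface ArchiveSink {
  writeFile: (name: string, data: Readable) => Promise<void>
}

async function uploadViaTempFile (sink: ArchiveSink, name: string, source: Readable): Promise<void> {
  const tmp = name + '.tmp'
  await pipeline(source, createWriteStream(tmp)) // write the archive to local disk instead of RAM
  await sink.writeFile(name, createReadStream(tmp)) // stream the finished file to backup storage
  await rm(tmp) // remove the temporary copy
}
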
@@ -1025,7 +1062,7 @@ export async function backup (
}
})
const finalBuffer = Buffer.concat(buffers)
const finalBuffer = Buffer.concat(buffers as any)
if (finalBuffer.length !== blob.size) {
ctx.error('download blob size mismatch', {
_id: blob._id,
@@ -1078,7 +1115,7 @@ export async function backup (
}
}
}
processedChanges.removed = Array.from(digest.keys())
if (processedChanges.removed.length > 0) {
changed++
}
@@ -1097,7 +1134,7 @@ export async function backup (
processedChanges.added.clear()
processedChanges.removed = []
processedChanges.updated.clear()
_pack?.finalize()
await _packClose()
// This will allow to retry in case of critical error.
await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }))
}
@@ -1108,6 +1145,14 @@ export async function backup (
if (canceled()) {
break
}
const oldUsed = process.memoryUsage().heapUsed
try {
global.gc?.()
} catch (err) {}
ctx.info('memory-stats', {
old: Math.round(oldUsed / (1024 * 1024)),
current: Math.round(process.memoryUsage().heapUsed / (1024 * 1024))
})
await ctx.with('process-domain', { domain }, async (ctx) => {
await processDomain(ctx, domain, (value) => {
options.progress?.(Math.round(((domainProgress + value / 100) / domains.length) * 100))
@@ -1458,6 +1503,12 @@ export async function restore (
// We need to load full changeset from server
const serverChangeset = new Map<Ref<Doc>, string>()
const oldUsed = process.memoryUsage().heapUsed
try {
global.gc?.()
} catch (err) {}
ctx.info('memory-stats', { old: oldUsed / (1024 * 1024), current: process.memoryUsage().heapUsed / (1024 * 1024) })
let idx: number | undefined
let loaded = 0
let el = 0

View File

@@ -125,7 +125,9 @@ class BackupWorker {
}
return !workspacesIgnore.has(it.workspace)
})
workspaces.sort((a, b) => b.lastVisit - a.lastVisit)
workspaces.sort((a, b) => {
return (b.backupInfo?.backupSize ?? 0) - (a.backupInfo?.backupSize ?? 0)
})
ctx.info('Preparing for BACKUP', {
total: workspaces.length,

View File

@@ -12,7 +12,8 @@ export interface BackupStorage {
loadFile: (name: string) => Promise<Buffer>
load: (name: string) => Promise<Readable>
write: (name: string) => Promise<Writable>
writeFile: (name: string, data: string | Buffer) => Promise<void>
writeFile: (name: string, data: string | Buffer | Readable) => Promise<void>
exists: (name: string) => Promise<boolean>
stat: (name: string) => Promise<number>
@@ -51,14 +52,14 @@ class FileStorage implements BackupStorage {
await rm(join(this.root, name))
}
async writeFile (name: string, data: string | Buffer): Promise<void> {
async writeFile (name: string, data: string | Buffer | Readable): Promise<void> {
const fileName = join(this.root, name)
const dir = dirname(fileName)
if (!existsSync(dir)) {
await mkdir(dir, { recursive: true })
}
await writeFile(fileName, data)
await writeFile(fileName, data as any)
}
}
@@ -72,7 +73,7 @@ class AdapterStorage implements BackupStorage {
async loadFile (name: string): Promise<Buffer> {
const data = await this.client.read(this.ctx, this.workspaceId, join(this.root, name))
return Buffer.concat(data)
return Buffer.concat(data as any)
}
async write (name: string): Promise<Writable> {
@@ -106,16 +107,9 @@
await this.client.remove(this.ctx, this.workspaceId, [join(this.root, name)])
}
async writeFile (name: string, data: string | Buffer): Promise<void> {
async writeFile (name: string, data: string | Buffer | Readable): Promise<void> {
// TODO: add mime type detection here.
await this.client.put(
this.ctx,
this.workspaceId,
join(this.root, name),
data,
'application/octet-stream',
data.length
)
await this.client.put(this.ctx, this.workspaceId, join(this.root, name), data, 'application/octet-stream')
}
}
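
`writeFile` now accepts a `Readable` in addition to `string | Buffer`, so staged archives can be streamed into either backend without being buffered; the adapter backend also stops passing an explicit `data.length`, since a stream has no size known up front. A usage sketch (import path and file names assumed):

import { createReadStream } from 'node:fs'
import type { BackupStorage } from './storage'

// Stream a locally staged archive into backup storage without buffering it;
// strings and Buffers still work for small metadata files.
async function storeArchive (storage: BackupStorage, name: string, localPath: string): Promise<void> {
  await storage.writeFile(name, createReadStream(localPath))
}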