UBERF-6313: Improve backup/restore (#5241)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
Author: Andrey Sobolev, 2024-04-08 17:43:01 +07:00, committed by GitHub
parent 8b9fc860a0
commit 9e183faf6b
9 changed files with 81 additions and 43 deletions


@@ -93,7 +93,7 @@ class ServerStorageWrapper implements ClientConnection {
   async close (): Promise<void> {}
   async loadChunk (domain: Domain, idx?: number): Promise<DocChunk> {
-    return { idx: -1, docs: {}, finished: true }
+    return { idx: -1, docs: [], finished: true }
   }
   async closeChunk (idx: number): Promise<void> {}


@@ -111,7 +111,7 @@ describe('client', () => {
     loadChunk: async (domain: Domain, idx?: number) => ({
       idx: -1,
       index: -1,
-      docs: {},
+      docs: [],
       finished: true,
       digest: ''
     }),


@@ -62,7 +62,7 @@ export async function connect (handler: (tx: Tx) => void): Promise<ClientConnect
     loadChunk: async (domain: Domain, idx?: number) => ({
       idx: -1,
       index: -1,
-      docs: {},
+      docs: [],
       finished: true,
       digest: ''
     }),


@@ -1,4 +1,5 @@
 import { Doc, Domain, Ref } from './classes'
+import { DocInfo } from './server'
 /**
  * @public
@@ -8,7 +9,7 @@ import { Doc, Domain, Ref } from './classes'
 export interface DocChunk {
   idx: number
   // _id => hash mapping
-  docs: Record<string, string>
+  docs: DocInfo[]
   finished: boolean
 }
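The DocChunk payload changes here from an _id => hash record to an array of DocInfo entries. Judging by how the new code destructures them later in this diff ({ id, hash, size }), a DocInfo can be assumed to look roughly like the sketch below; the real declaration lives in @hcengineering/core and may carry more fields.

// Assumed shapes, inferred from the destructuring used in backup.ts below; not copied from the source.
interface DocInfoSketch {
  id: string    // document _id
  hash: string  // content hash used to detect changes between backups
  size: number  // estimated document size, used to cap retrieval batches
}

interface DocChunkSketch {
  idx: number
  docs: DocInfoSketch[]
  finished: boolean
}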


@@ -2,7 +2,7 @@ const { join, dirname } = require("path")
 const { readFileSync, existsSync, mkdirSync, createWriteStream } = require('fs')
 const { spawn } = require('child_process')
-async function execProcess(cmd, logFile, args) {
+async function execProcess(cmd, logFile, args, useConsole) {
   let compileRoot = dirname(dirname(process.argv[1]))
   console.log("Svelte check...\n", process.cwd(), args)
@@ -22,6 +22,10 @@ async function execProcess(cmd, logFile, args) {
     compileOut.stdout.pipe(outPipe)
     compileOut.stdout.on('end', function (data) {
       outPipe.close()
+      if( useConsole ) {
+        console.log(readFileSync(stdoutFilePath).toString())
+        console.log(readFileSync(stderrFilePath).toString())
+      }
       resolve()
     })
   } else {
@@ -64,14 +68,22 @@ async function execProcess(cmd, logFile, args) {
   }
 }
-let args = process.argv.splice(2)
+let args = [] // process.argv.slice(2)
+let useConsole = false
+for(const a of process.argv.slice(2)) {
+  if( a === '--console') {
+    useConsole = true
+  } else {
+    args.push(a)
+  }
+}
 let st = Date.now()
 execProcess(
   'svelte-check',
   'svelte-check', [
   '--output', 'human',
-  ...process.argv.splice(2)
-])
+  ...args
+], useConsole)
   .then(() => {
     console.log("Svelte check time: ", Date.now() - st)
   })
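The script now copies the CLI arguments instead of splicing them out of process.argv, and strips the new --console flag before forwarding the rest to svelte-check. A quick illustration (not part of the PR, hypothetical values) of why that matters, since Array.prototype.splice mutates the array while slice does not; the removed lines above called process.argv.splice(2) in two places, so the second call could only ever see an empty list:

// splice consumes the arguments, slice copies them.
const argv = ['node', 'svelte-check.js', '--console', '--output', 'human']
const taken = argv.splice(2)  // ['--console', '--output', 'human'], argv now has only 2 elements
const again = argv.splice(2)  // [] – the arguments were already consumed by the first splice
const copied = ['node', 'svelte-check.js', '--console'].slice(2) // ['--console'], original array untouched
console.log(taken, again, copied)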


@@ -83,7 +83,7 @@ FulltextStorage & {
     loadChunk: async (domain: Domain, idx?: number) => ({
       idx: -1,
       index: -1,
-      docs: {},
+      docs: [],
       finished: true,
       digest: ''
     }),


@@ -39,6 +39,7 @@ export * from './storage'
 const dataBlobSize = 50 * 1024 * 1024
 const dataUploadSize = 2 * 1024 * 1024
+const retrieveChunkSize = 2 * 1024 * 1024
 const defaultLevel = 9
@@ -241,17 +242,25 @@ export async function cloneWorkspace (
         const it = await sourceConnection.loadChunk(c, idx)
         idx = it.idx
-        const needRetrieve: Ref<Doc>[] = []
+        let needRetrieve: Ref<Doc>[] = []
+        let needRetrieveSize = 0
-        for (const [k, v] of Object.entries(it.docs)) {
+        for (const { id, hash, size } of it.docs) {
           processed++
           if (Date.now() - st > 2500) {
             console.log('processed', processed, Date.now() - st)
             st = Date.now()
           }
-          changes.added.set(k as Ref<Doc>, v)
-          needRetrieve.push(k as Ref<Doc>)
+          changes.added.set(id as Ref<Doc>, hash)
+          needRetrieve.push(id as Ref<Doc>)
+          needRetrieveSize += size
+          if (needRetrieveSize > retrieveChunkSize) {
+            needRetrieveChunks.push(needRetrieve)
+            needRetrieveSize = 0
+            needRetrieve = []
+          }
         }
         if (needRetrieve.length > 0) {
           needRetrieveChunks.push(needRetrieve)
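Both cloneWorkspace and backup now flush the pending needRetrieve list whenever the accumulated size estimate exceeds retrieveChunkSize (2 MB), instead of turning everything a chunk reports into a single retrieval request. A minimal standalone sketch of that batching pattern, with illustrative names that are not from the source:

// Split items into batches whose estimated total size stays under a cap.
function batchBySize<T> (items: T[], sizeOf: (item: T) => number, cap: number): T[][] {
  const batches: T[][] = []
  let current: T[] = []
  let currentSize = 0
  for (const item of items) {
    current.push(item)
    currentSize += sizeOf(item)
    if (currentSize > cap) {
      // flush the batch as soon as it grows past the cap
      batches.push(current)
      current = []
      currentSize = 0
    }
  }
  if (current.length > 0) {
    batches.push(current)
  }
  return batches
}

// e.g. batchBySize(chunk.docs, (d) => d.size, 2 * 1024 * 1024)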
@@ -375,11 +384,11 @@ export async function backup (
       if (lastTx._id === backupInfo.lastTxId && !force) {
         console.log('No transaction changes. Skipping backup.')
         return
-      } else {
-        backupInfo.lastTxId = lastTx._id
       }
     }
+    backupInfo.lastTxId = '' // Clear until full backup will be complete
     const snapshot: BackupSnapshot = {
       date: Date.now(),
       domains: {}
@@ -434,35 +443,44 @@ export async function backup (
       // Load all digest from collection.
       while (true) {
         try {
-          const it = await connection.loadChunk(domain, idx)
-          idx = it.idx
+          const currentChunk = await connection.loadChunk(domain, idx)
+          idx = currentChunk.idx
-          const needRetrieve: Ref<Doc>[] = []
+          let needRetrieve: Ref<Doc>[] = []
+          let currentNeedRetrieveSize = 0
-          for (const [k, v] of Object.entries(it.docs)) {
+          for (const { id, hash, size } of currentChunk.docs) {
            processed++
            if (Date.now() - st > 2500) {
              console.log('processed', processed, digest.size, Date.now() - st)
              st = Date.now()
            }
-            const kHash = digest.get(k as Ref<Doc>)
+            const kHash = digest.get(id as Ref<Doc>)
            if (kHash !== undefined) {
-              digest.delete(k as Ref<Doc>)
-              if (kHash !== v) {
-                changes.updated.set(k as Ref<Doc>, v)
-                needRetrieve.push(k as Ref<Doc>)
+              digest.delete(id as Ref<Doc>)
+              if (kHash !== hash) {
+                changes.updated.set(id as Ref<Doc>, hash)
+                needRetrieve.push(id as Ref<Doc>)
+                currentNeedRetrieveSize += size
                changed++
              }
            } else {
-              changes.added.set(k as Ref<Doc>, v)
-              needRetrieve.push(k as Ref<Doc>)
+              changes.added.set(id as Ref<Doc>, hash)
+              needRetrieve.push(id as Ref<Doc>)
              changed++
+              currentNeedRetrieveSize += size
            }
+            if (currentNeedRetrieveSize > retrieveChunkSize) {
+              needRetrieveChunks.push(needRetrieve)
+              currentNeedRetrieveSize = 0
+              needRetrieve = []
+            }
          }
          if (needRetrieve.length > 0) {
            needRetrieveChunks.push(needRetrieve)
          }
-          if (it.finished) {
+          if (currentChunk.finished) {
            await connection.closeChunk(idx)
            break
          }
@@ -510,7 +528,10 @@ export async function backup (
            processedChanges.added.clear()
            processedChanges.removed = []
            processedChanges.updated.clear()
-            await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2)))
+            await storage.writeFile(
+              infoFile,
+              gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel })
+            )
          }
        }
        if (_pack === undefined) {
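Every place that writes the gzipped backup info file now passes an explicit compression level. In Node's zlib API, gzipSync accepts an options object whose level ranges from 0 (no compression) to 9 (best compression, the value of defaultLevel here). A small self-contained example with an illustrative payload:

import { constants, gunzipSync, gzipSync } from 'zlib'

const defaultLevel = 9 // same value as constants.Z_BEST_COMPRESSION
const payload = JSON.stringify({ snapshots: [], snapshotsIndex: 0 }, undefined, 2)
const packed = gzipSync(payload, { level: defaultLevel })
// Smaller buffer on disk, lossless round trip when read back.
console.log(packed.length, gunzipSync(packed).toString() === payload)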
@@ -583,12 +604,13 @@ export async function backup (
          processedChanges.updated.clear()
          _pack?.finalize()
          // This will allow to retry in case of critical error.
-          await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2)))
+          await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }))
        }
      }
    backupInfo.snapshotsIndex = backupInfo.snapshots.length
-    await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2)))
+    backupInfo.lastTxId = lastTx?._id ?? '0' // We could store last tx, since full backup is complete
+    await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }))
  } catch (err: any) {
    console.error(err)
  } finally {
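Together with the earlier hunk that clears backupInfo.lastTxId before any work starts, this change makes the "no transaction changes, skip" optimization safe against interrupted runs: the marker is only written back once the full backup has been persisted. A condensed sketch of that ordering, with illustrative names rather than the PR's code:

async function backupGuard (
  info: { lastTxId: string },
  currentTxId: string,
  force: boolean,
  runBackup: () => Promise<void>,
  persistInfo: () => Promise<void>
): Promise<void> {
  if (info.lastTxId === currentTxId && !force) {
    return // nothing changed since the last *completed* backup
  }
  info.lastTxId = '' // clear first: a crash below must not look like a finished backup
  await runBackup()
  info.lastTxId = currentTxId // only mark completion after everything is written
  await persistInfo()
}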
@@ -680,8 +702,8 @@ export async function restore (
    idx = it.idx
    el += Date.now() - st
-    for (const [_id, hash] of Object.entries(it.docs)) {
-      serverChangeset.set(_id as Ref<Doc>, hash)
+    for (const { id, hash } of it.docs) {
+      serverChangeset.set(id as Ref<Doc>, hash)
      loaded++
    }
@@ -979,7 +1001,10 @@ export async function compactBackup (storage: BackupStorage, force: boolean = fa
          processedChanges.added.clear()
          processedChanges.removed = []
          processedChanges.updated.clear()
-          await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2)))
+          await storage.writeFile(
+            infoFile,
+            gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel })
+          )
        }
      }
      if (_pack === undefined) {
@@ -1154,7 +1179,7 @@ export async function compactBackup (storage: BackupStorage, force: boolean = fa
        processedChanges.updated.clear()
        _pack?.finalize()
        // This will allow to retry in case of critical error.
-        await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2)))
+        await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }))
      }
    }
@@ -1176,7 +1201,7 @@ export async function compactBackup (storage: BackupStorage, force: boolean = fa
    }
    backupInfo.snapshotsIndex = backupInfo.snapshots.length
-    await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2)))
+    await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }))
  } catch (err: any) {
    console.error(err)
  } finally {


@@ -1,9 +1,9 @@
-import { Doc, DocChunk, Domain, MeasureContext, Ref, StorageIterator } from '@hcengineering/core'
-import { Pipeline } from '@hcengineering/server-core'
+import { Doc, DocChunk, DocInfo, Domain, MeasureContext, Ref, StorageIterator } from '@hcengineering/core'
+import { estimateDocSize, Pipeline } from '@hcengineering/server-core'
 import { Token } from '@hcengineering/server-token'
 import { BroadcastCall, ClientSession, Session } from '@hcengineering/server-ws'
-const chunkSize = 1024 * 1024
+const chunkSize = 2 * 1024 * 1024
 /**
  * @public
@@ -48,7 +48,7 @@ export class BackupClientSession extends ClientSession implements BackupSession
     if (chunk.finished === undefined) {
       return {
         idx,
-        docs: {},
+        docs: [],
         finished: true
       }
     }
@@ -57,7 +57,7 @@ export class BackupClientSession extends ClientSession implements BackupSession
       this.chunkInfo.set(idx, chunk)
     }
     let size = 0
-    const docs: Record<string, string> = {}
+    const docs: DocInfo[] = []
     while (size < chunkSize) {
       const doc = await chunk.iterator.next(ctx)
@@ -66,8 +66,8 @@ export class BackupClientSession extends ClientSession implements BackupSession
         break
       }
-      size = size + doc.size
-      docs[doc.id] = doc.hash
+      size += estimateDocSize(doc)
+      docs.push(doc)
     }
     return {
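With these server-side changes, each chunk is filled with whole DocInfo entries until the estimated size reaches 2 MB, and the clients consume docs as an array. A simplified sketch of the client-side loop, condensed from the restore() hunk earlier in this diff (error handling omitted, types inlined to keep it self-contained):

async function scanDomain (connection: {
  loadChunk: (domain: string, idx?: number) => Promise<{ idx: number, docs: { id: string, hash: string, size: number }[], finished: boolean }>
  closeChunk: (idx: number) => Promise<void>
}, domain: string): Promise<Map<string, string>> {
  const digest = new Map<string, string>() // _id -> hash
  let idx: number | undefined
  while (true) {
    const chunk = await connection.loadChunk(domain, idx)
    idx = chunk.idx
    for (const { id, hash } of chunk.docs) {
      digest.set(id, hash) // remember the server-side state of every document
    }
    if (chunk.finished) {
      await connection.closeChunk(idx)
      break
    }
  }
  return digest
}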


@@ -211,7 +211,7 @@ export function startHttpServer (
     )
     if ('upgrade' in session || 'error' in session) {
       if ('error' in session) {
-        void ctx.error('error', { error: session.error })
+        void ctx.error('error', { error: session.error?.message, stack: session.error?.stack })
       }
       cs.close()
       return
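Logging session.error?.message and session.error?.stack instead of the Error object itself matters when the logger serializes its parameters to JSON: Error's own message and stack properties are non-enumerable, so the previous form would come out as an empty object. A quick demonstration:

const err = new Error('connection rejected')
console.log(JSON.stringify({ error: err }))                           // {"error":{}}
console.log(JSON.stringify({ error: err.message, stack: err.stack })) // message and stack survive serialization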