mirror of
https://github.com/hcengineering/platform.git
synced 2024-12-22 19:11:33 +03:00
Fix for blobs backup (#6751)
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
parent
b359d080ac
commit
28dbc1bae5
@ -196,7 +196,13 @@
|
||||
{#if isAdmin && ws.lastVisit != null && ws.lastVisit !== 0}
|
||||
<div class="text-sm">
|
||||
{#if ws.backupInfo != null}
|
||||
{ws.backupInfo.backupSize}Mb -
|
||||
{@const sz = ws.backupInfo.dataSize + ws.backupInfo.blobsSize}
|
||||
{@const szGb = Math.round((sz * 100) / 1024) / 100}
|
||||
{#if szGb > 0}
|
||||
{Math.round((sz * 100) / 1024) / 100}Gb -
|
||||
{:else}
|
||||
{Math.round(sz)}Mb -
|
||||
{/if}
|
||||
{/if}
|
||||
({lastUsageDays} days)
|
||||
</div>
|
||||
|
@ -43,7 +43,7 @@ import { BlobClient, createClient } from '@hcengineering/server-client'
|
||||
import { fullTextPushStagePrefix, type StorageAdapter } from '@hcengineering/server-core'
|
||||
import { generateToken } from '@hcengineering/server-token'
|
||||
import { connect } from '@hcengineering/server-tool'
|
||||
import { createWriteStream, existsSync, mkdirSync, statSync } from 'node:fs'
|
||||
import { createWriteStream, existsSync, mkdirSync } from 'node:fs'
|
||||
import { dirname } from 'node:path'
|
||||
import { PassThrough } from 'node:stream'
|
||||
import { createGzip } from 'node:zlib'
|
||||
@ -132,6 +132,7 @@ async function loadDigest (
|
||||
date?: number
|
||||
): Promise<Map<Ref<Doc>, string>> {
|
||||
ctx = ctx.newChild('load digest', { domain, count: snapshots.length })
|
||||
ctx.info('load-digest', { domain, count: snapshots.length })
|
||||
const result = new Map<Ref<Doc>, string>()
|
||||
for (const s of snapshots) {
|
||||
const d = s.domains[domain]
|
||||
@ -492,9 +493,9 @@ async function cleanDomain (ctx: MeasureContext, connection: CoreClient & Backup
|
||||
}
|
||||
}
|
||||
|
||||
function doTrimHash (s: string | undefined): string {
|
||||
function doTrimHash (s: string | undefined): string | undefined {
|
||||
if (s == null) {
|
||||
return ''
|
||||
return undefined
|
||||
}
|
||||
if (s.startsWith('"') && s.endsWith('"')) {
|
||||
return s.slice(1, s.length - 1)
|
||||
@ -716,6 +717,24 @@ export async function backup (
|
||||
time: Date.now() - st,
|
||||
workspace: workspaceId.name
|
||||
})
|
||||
const oldHash = new Map<Ref<Doc>, string>()
|
||||
|
||||
function removeFromNeedRetrieve (needRetrieve: Ref<Doc>[], id: string): void {
|
||||
const pos = needRetrieve.indexOf(id as Ref<Doc>)
|
||||
if (pos !== -1) {
|
||||
needRetrieve.splice(pos, 1)
|
||||
processed--
|
||||
changed--
|
||||
}
|
||||
for (const ch of needRetrieveChunks) {
|
||||
const pos = ch.indexOf(id as Ref<Doc>)
|
||||
if (pos !== -1) {
|
||||
ch.splice(pos, 1)
|
||||
processed--
|
||||
changed--
|
||||
}
|
||||
}
|
||||
}
|
||||
while (true) {
|
||||
try {
|
||||
const currentChunk = await ctx.with('loadChunk', {}, () => connection.loadChunk(domain, idx, options.recheck))
|
||||
@ -741,17 +760,31 @@ export async function backup (
|
||||
})
|
||||
st = Date.now()
|
||||
}
|
||||
const _hash = doTrimHash(hash)
|
||||
const kHash = doTrimHash(digest.get(id as Ref<Doc>))
|
||||
const _hash = doTrimHash(hash) as string
|
||||
const kHash = doTrimHash(digest.get(id as Ref<Doc>) ?? oldHash.get(id as Ref<Doc>))
|
||||
if (kHash !== undefined) {
|
||||
digest.delete(id as Ref<Doc>)
|
||||
if (digest.delete(id as Ref<Doc>)) {
|
||||
oldHash.set(id as Ref<Doc>, kHash)
|
||||
}
|
||||
if (kHash !== _hash) {
|
||||
if (changes.updated.has(id as Ref<Doc>)) {
|
||||
removeFromNeedRetrieve(needRetrieve, id as Ref<Doc>)
|
||||
}
|
||||
changes.updated.set(id as Ref<Doc>, _hash)
|
||||
needRetrieve.push(id as Ref<Doc>)
|
||||
currentNeedRetrieveSize += size
|
||||
changed++
|
||||
} else if (changes.updated.has(id as Ref<Doc>)) {
|
||||
// We have same
|
||||
changes.updated.delete(id as Ref<Doc>)
|
||||
removeFromNeedRetrieve(needRetrieve, id as Ref<Doc>)
|
||||
processed -= 1
|
||||
}
|
||||
} else {
|
||||
if (domain === DOMAIN_BLOB && changes.added.has(id as Ref<Doc>)) {
|
||||
// We need to clean old need retrieve in case of duplicates.
|
||||
removeFromNeedRetrieve(needRetrieve, id)
|
||||
}
|
||||
changes.added.set(id as Ref<Doc>, _hash)
|
||||
needRetrieve.push(id as Ref<Doc>)
|
||||
changed++
|
||||
@ -759,7 +792,9 @@ export async function backup (
|
||||
}
|
||||
|
||||
if (currentNeedRetrieveSize > retrieveChunkSize) {
|
||||
if (needRetrieve.length > 0) {
|
||||
needRetrieveChunks.push(needRetrieve)
|
||||
}
|
||||
currentNeedRetrieveSize = 0
|
||||
needRetrieve = []
|
||||
}
|
||||
@ -841,12 +876,17 @@ export async function backup (
|
||||
|
||||
const totalChunks = needRetrieveChunks.flatMap((it) => it.length).reduce((p, c) => p + c, 0)
|
||||
let processed = 0
|
||||
let blobs = 0
|
||||
|
||||
while (needRetrieveChunks.length > 0) {
|
||||
if (canceled()) {
|
||||
return
|
||||
}
|
||||
const needRetrieve = needRetrieveChunks.shift() as Ref<Doc>[]
|
||||
|
||||
if (needRetrieve.length === 0) {
|
||||
continue
|
||||
}
|
||||
ctx.info('Retrieve chunk', {
|
||||
needRetrieve: needRetrieveChunks.reduce((v, docs) => v + docs.length, 0),
|
||||
toLoad: needRetrieve.length,
|
||||
@ -855,6 +895,10 @@ export async function backup (
|
||||
let docs: Doc[] = []
|
||||
try {
|
||||
docs = await ctx.with('load-docs', {}, async (ctx) => await connection.loadDocs(domain, needRetrieve))
|
||||
if (docs.length !== needRetrieve.length) {
|
||||
const nr = new Set(docs.map((it) => it._id))
|
||||
ctx.error('failed to retrieve all documents', { missing: needRetrieve.filter((it) => !nr.has(it)) })
|
||||
}
|
||||
ops++
|
||||
} catch (err: any) {
|
||||
ctx.error('error loading docs', { domain, err, workspace: workspaceId.name })
|
||||
@ -998,7 +1042,8 @@ export async function backup (
|
||||
ctx.error('error packing file', { err })
|
||||
}
|
||||
})
|
||||
if (blob.size > 1024 * 1024) {
|
||||
blobs++
|
||||
if (blob.size > 1024 * 1024 || blobs >= 10) {
|
||||
ctx.info('download blob', {
|
||||
_id: blob._id,
|
||||
contentType: blob.contentType,
|
||||
@ -1006,6 +1051,9 @@ export async function backup (
|
||||
provider: blob.provider,
|
||||
pending: docs.length
|
||||
})
|
||||
if (blobs >= 10) {
|
||||
blobs = 0
|
||||
}
|
||||
}
|
||||
|
||||
printDownloaded('', blob.size)
|
||||
@ -1179,15 +1227,16 @@ export async function backupDownload (storage: BackupStorage, storeIn: string):
|
||||
|
||||
const backupInfo: BackupInfo = JSON.parse(gunzipSync(await storage.loadFile(infoFile)).toString())
|
||||
console.log('workspace:', backupInfo.workspace ?? '', backupInfo.version)
|
||||
const addFileSize = async (file: string | undefined | null): Promise<void> => {
|
||||
if (file != null && (await storage.exists(file))) {
|
||||
const fileSize = await storage.stat(file)
|
||||
|
||||
const addFileSize = async (file: string | undefined | null, force: boolean = false): Promise<void> => {
|
||||
if (file != null) {
|
||||
const target = join(storeIn, file)
|
||||
const dir = dirname(target)
|
||||
if (!existsSync(dir)) {
|
||||
mkdirSync(dir, { recursive: true })
|
||||
}
|
||||
if (!existsSync(target) || fileSize !== statSync(target).size) {
|
||||
if (!existsSync(target) || force) {
|
||||
const fileSize = await storage.stat(file)
|
||||
console.log('downloading', file, fileSize)
|
||||
const readStream = await storage.load(file)
|
||||
const outp = createWriteStream(target)
|
||||
@ -1200,8 +1249,10 @@ export async function backupDownload (storage: BackupStorage, storeIn: string):
|
||||
resolve()
|
||||
})
|
||||
})
|
||||
}
|
||||
size += fileSize
|
||||
} else {
|
||||
console.log('file-same', file)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1217,7 +1268,7 @@ export async function backupDownload (storage: BackupStorage, storeIn: string):
|
||||
}
|
||||
}
|
||||
}
|
||||
await addFileSize(infoFile)
|
||||
await addFileSize(infoFile, true)
|
||||
|
||||
console.log('Backup size', size / (1024 * 1024), 'Mb')
|
||||
}
|
||||
@ -1693,7 +1744,7 @@ export async function compactBackup (
|
||||
const oldSnapshots = [...backupInfo.snapshots]
|
||||
|
||||
backupInfo.snapshots = [snapshot]
|
||||
let backupIndex = `${backupInfo.snapshotsIndex ?? oldSnapshots.length}`
|
||||
let backupIndex = `${(backupInfo.snapshotsIndex ?? oldSnapshots.length) + 1}`
|
||||
while (backupIndex.length < 6) {
|
||||
backupIndex = '0' + backupIndex
|
||||
}
|
||||
|
@ -95,9 +95,12 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
|
||||
for (const d of docs) {
|
||||
const blobInfo = existingBlobs.get(d._id)
|
||||
if (
|
||||
blobInfo === undefined ||
|
||||
this.doTrimHash(blobInfo.etag) !== this.doTrimHash(d.etag) ||
|
||||
blobInfo.size !== d.size
|
||||
blobInfo === undefined || // Blob info undefined
|
||||
// Provider are same and etag or size are diffrent.
|
||||
(d.provider === blobInfo.provider &&
|
||||
(this.doTrimHash(blobInfo.etag) !== this.doTrimHash(d.etag) || blobInfo.size !== d.size)) ||
|
||||
// We have replacement in default
|
||||
(d.provider === this.defaultAdapter && blobInfo?.provider !== d.provider)
|
||||
) {
|
||||
const stat = await this.adapters.get(d.provider)?.stat(ctx, workspaceId, d._id)
|
||||
if (stat !== undefined) {
|
||||
|
Loading…
Reference in New Issue
Block a user