diff --git a/server/backup-service/package.json b/server/backup-service/package.json index 07769d2ddf..bb9d7dedd2 100644 --- a/server/backup-service/package.json +++ b/server/backup-service/package.json @@ -44,6 +44,7 @@ "tar-stream": "^2.2.0", "@hcengineering/server-tool": "^0.6.0", "@hcengineering/server-core": "^0.6.1", + "@hcengineering/server-storage": "^0.6.0", "@hcengineering/server-backup": "^0.6.0", "@hcengineering/minio": "^0.6.0", "@hcengineering/server-token": "^0.6.7" diff --git a/server/backup-service/src/config.ts b/server/backup-service/src/config.ts index f1fb66c2fe..81b1955083 100644 --- a/server/backup-service/src/config.ts +++ b/server/backup-service/src/config.ts @@ -25,9 +25,7 @@ interface Config extends Omit { Timeout: number // Timeout in seconds BucketName: string - MinioEndpoint: string - MinioAccessKey: string - MinioSecretKey: string + MongoURL: string } const envMap: { [key in keyof Config]: string } = { @@ -37,22 +35,11 @@ const envMap: { [key in keyof Config]: string } = { Secret: 'SECRET', BucketName: 'BUCKET_NAME', Interval: 'INTERVAL', - MinioEndpoint: 'MINIO_ENDPOINT', - MinioAccessKey: 'MINIO_ACCESS_KEY', - MinioSecretKey: 'MINIO_SECRET_KEY', - Timeout: 'TIMEOUT' + Timeout: 'TIMEOUT', + MongoURL: 'MONGO_URL' } -const required: Array = [ - 'TransactorURL', - 'AccountsURL', - 'Secret', - 'ServiceID', - 'BucketName', - 'MinioEndpoint', - 'MinioAccessKey', - 'MinioSecretKey' -] +const required: Array = ['TransactorURL', 'AccountsURL', 'Secret', 'ServiceID', 'BucketName', 'MongoURL'] const config: Config = (() => { const params: Partial = { @@ -62,10 +49,8 @@ const config: Config = (() => { BucketName: process.env[envMap.BucketName] ?? 'backups', ServiceID: process.env[envMap.ServiceID] ?? 'backup-service', Interval: parseInt(process.env[envMap.Interval] ?? '3600'), - MinioEndpoint: process.env[envMap.MinioEndpoint], - MinioAccessKey: process.env[envMap.MinioAccessKey], - MinioSecretKey: process.env[envMap.MinioSecretKey], - Timeout: parseInt(process.env[envMap.Timeout] ?? '3600') + Timeout: parseInt(process.env[envMap.Timeout] ?? '3600'), + MongoURL: process.env[envMap.MongoURL] } const missingEnv = required.filter((key) => params[key] === undefined).map((key) => envMap[key]) diff --git a/server/backup-service/src/index.ts b/server/backup-service/src/index.ts index e5d6d57497..7bf123468e 100644 --- a/server/backup-service/src/index.ts +++ b/server/backup-service/src/index.ts @@ -14,32 +14,18 @@ // import { MeasureContext, systemAccountEmail } from '@hcengineering/core' -import { MinioService } from '@hcengineering/minio' import { setMetadata } from '@hcengineering/platform' import { backupService } from '@hcengineering/server-backup' +import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage' import serverToken, { generateToken } from '@hcengineering/server-token' import toolPlugin from '@hcengineering/server-tool' import config from './config' -import { StorageAdapter } from '@hcengineering/server-core' export function startBackup (ctx: MeasureContext): void { setMetadata(serverToken.metadata.Secret, config.Secret) - let minioPort = 9000 - let minioEndpoint = config.MinioEndpoint - const sp = minioEndpoint.split(':') - if (sp.length > 1) { - minioEndpoint = sp[0] - minioPort = parseInt(sp[1]) - } - - const storageAdapter: StorageAdapter = new MinioService({ - endpoint: minioEndpoint, - port: minioPort, - useSSL: 'false', - accessKey: config.MinioAccessKey, - secretKey: config.MinioSecretKey - }) + const storageConfiguration = storageConfigFromEnv() + const storageAdapter = buildStorageFromConfig(storageConfiguration, config.MongoURL) setMetadata(toolPlugin.metadata.UserAgent, config.ServiceID) diff --git a/server/backup/src/backup.ts b/server/backup/src/backup.ts index a686f4aed2..b0e555cba7 100644 --- a/server/backup/src/backup.ts +++ b/server/backup/src/backup.ts @@ -664,32 +664,59 @@ export async function backup ( break } - // Move processed document to processedChanges - if (changes.added.has(d._id)) { - processedChanges.added.set(d._id, changes.added.get(d._id) ?? '') - changes.added.delete(d._id) - } else { - processedChanges.updated.set(d._id, changes.updated.get(d._id) ?? '') - changes.updated.delete(d._id) + function processChanges (d: Doc, error: boolean = false): void { + // Move processed document to processedChanges + if (changes.added.has(d._id)) { + if (!error) { + processedChanges.added.set(d._id, changes.added.get(d._id) ?? '') + } + changes.added.delete(d._id) + } else { + if (!error) { + processedChanges.updated.set(d._id, changes.updated.get(d._id) ?? '') + } + changes.updated.delete(d._id) + } } if (d._class === core.class.Blob) { const blob = d as Blob const descrJson = JSON.stringify(d) addedDocuments += descrJson.length addedDocuments += blob.size + + let blobFiled = false + if (!(await blobClient.checkFile(ctx, blob._id))) { + ctx.error('failed to download blob', { blob: blob._id, provider: blob.provider }) + processChanges(d, true) + continue + } + _pack.entry({ name: d._id + '.json' }, descrJson, function (err) { if (err != null) throw err }) - - _pack.entry({ name: d._id }, await blobClient.pipeFromStorage(blob._id, blob.size), function (err) { - if (err != null) throw err - }) + try { + const entry = _pack?.entry({ name: d._id, size: blob.size }, (err) => { + if (err != null) { + ctx.error('error packing file', err) + } + }) + await blobClient.writeTo(ctx, blob._id, blob.size, entry) + } catch (err: any) { + if (err.message?.startsWith('No file for') === true) { + ctx.error('failed to download blob', { message: err.message }) + } else { + ctx.error('failed to download blob', { err }) + } + blobFiled = true + } + processChanges(d, blobFiled) } else { const data = JSON.stringify(d) addedDocuments += data.length _pack.entry({ name: d._id + '.json' }, data, function (err) { if (err != null) throw err }) + processChanges(d) } } } diff --git a/server/core/src/adapter.ts b/server/core/src/adapter.ts index c5af59a3f6..977e25575b 100644 --- a/server/core/src/adapter.ts +++ b/server/core/src/adapter.ts @@ -127,5 +127,5 @@ export type DbAdapterFactory = ( url: string, workspaceId: WorkspaceId, modelDb: ModelDb, - storage?: StorageAdapter + storage: StorageAdapter ) => Promise diff --git a/server/server-storage/src/blobStorage.ts b/server/server-storage/src/blobStorage.ts index 6eebb42956..4ce82ba230 100644 --- a/server/server-storage/src/blobStorage.ts +++ b/server/server-storage/src/blobStorage.ts @@ -94,15 +94,13 @@ export async function createStorageDataAdapter ( url: string, workspaceId: WorkspaceId, modelDb: ModelDb, - storage?: StorageAdapter + storage: StorageAdapter ): Promise { if (storage === undefined) { throw new Error('minio storage adapter require minio') } // We need to create bucket if it doesn't exist - if (storage !== undefined) { - await storage.make(ctx, workspaceId) - } + await storage.make(ctx, workspaceId) const blobAdapter = await createMongoAdapter(ctx, hierarchy, url, workspaceId, modelDb, undefined, { calculateHash: (d) => { return (d as Blob).etag diff --git a/server/tool/src/connect.ts b/server/tool/src/connect.ts index 3d2617abd0..c0d42acd9e 100644 --- a/server/tool/src/connect.ts +++ b/server/tool/src/connect.ts @@ -15,11 +15,19 @@ // import client, { clientId } from '@hcengineering/client' -import { Client, LoadModelResponse, systemAccountEmail, Tx, WorkspaceId } from '@hcengineering/core' +import { + Client, + LoadModelResponse, + systemAccountEmail, + Tx, + WorkspaceId, + type MeasureContext +} from '@hcengineering/core' import { addLocation, getMetadata, getResource, setMetadata } from '@hcengineering/platform' import { generateToken } from '@hcengineering/server-token' import { mkdtempSync } from 'fs' import crypto from 'node:crypto' +import { type Writable } from 'stream' import plugin from './plugin' /** @@ -89,14 +97,33 @@ export class BlobClient { this.tmpDir = mkdtempSync('blobs') } - async pipeFromStorage (name: string, size: number): Promise { + async checkFile (ctx: MeasureContext, name: string): Promise { + try { + const response = await fetch(this.transactorAPIUrl + `?name=${encodeURIComponent(name)}`, { + headers: { + Authorization: 'Bearer ' + this.token, + Range: 'bytes=0-1' + } + }) + if (response.status === 404) { + return false + } + const buff = await response.arrayBuffer() + return buff.byteLength > 0 + } catch (err: any) { + ctx.error('Failed to check file', { name, error: err }) + return false + } + } + + async writeTo (ctx: MeasureContext, name: string, size: number, writable: Writable): Promise { let written = 0 const chunkSize = 1024 * 1024 - const chunks: Buffer[] = [] // Use ranges to iterave through file with retry if required. while (written < size) { - for (let i = 0; i < 5; i++) { + let i = 0 + for (; i < 5; i++) { try { const response = await fetch(this.transactorAPIUrl + `?name=${encodeURIComponent(name)}`, { headers: { @@ -104,23 +131,40 @@ export class BlobClient { Range: `bytes=${written}-${Math.min(size - 1, written + chunkSize)}` } }) + if (response.status === 404) { + i = 5 + // No file, so make it empty + throw new Error(`No file for ${this.transactorAPIUrl}/${name}`) + } const chunk = Buffer.from(await response.arrayBuffer()) - chunks.push(chunk) + await new Promise((resolve, reject) => { + writable.write(chunk, (err) => { + if (err != null) { + reject(err) + } + resolve() + }) + }) + written += chunk.length if (size > 1024 * 1024) { - console.log('Downloaded', Math.round(written / (1024 * 1024)), 'Mb of', Math.round(size / (1024 * 1024))) + ctx.info('Downloaded', { + name, + written: Math.round(written / (1024 * 1024)), + of: Math.round(size / (1024 * 1024)) + }) } break } catch (err: any) { - if (i === 4) { - console.error(err) + if (i > 4) { + writable.end() throw err } // retry } } } - return Buffer.concat(chunks) + writable.end() } async upload (name: string, size: number, contentType: string, buffer: Buffer): Promise { diff --git a/server/ws/src/server_http.ts b/server/ws/src/server_http.ts index 33788dd977..92f9bd5de0 100644 --- a/server/ws/src/server_http.ts +++ b/server/ws/src/server_http.ts @@ -419,12 +419,18 @@ function createWebsocketClientSocket ( setImmediate(doSend) return } - ws.send(smsg, { binary: true, compress: compression }, (err) => { - if (err != null) { - reject(err) + try { + ws.send(smsg, { binary: true, compress: compression }, (err) => { + if (err != null) { + reject(err) + } + resolve() + }) + } catch (err: any) { + if (err.message !== 'WebSocket is not open') { + ctx.error('send error', { err }) } - resolve() - }) + } } doSend() })