UBERF-7519: Rework backup service (#6050)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-07-11 20:45:08 +07:00 committed by GitHub
parent b14cc90039
commit 987b8a8068
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 502 additions and 263 deletions

View File

@ -167,7 +167,7 @@ class ClientImpl implements AccountClient, BackupClient {
}
}
} catch (err) {
console.error('failed to apply model transaction, skipping', t)
// console.error('failed to apply model transaction, skipping', t)
continue
}
}
@ -471,7 +471,11 @@ async function buildModel (
try {
hierarchy.tx(tx)
} catch (err: any) {
console.error('failed to apply model transaction, skipping', tx._id, tx._class, err?.message)
ctx.warn('failed to apply model transaction, skipping', {
_id: tx._id,
_class: tx._class,
message: err?.message
})
}
}
})

View File

@ -92,11 +92,9 @@ export class MeasureMetricsContext implements MeasureContext {
if (value instanceof Promise) {
value = await value
}
c.end()
return value
} catch (err: any) {
c.error('Error during:' + name, { err, ...(this.logParams ?? {}) })
throw err
} finally {
c.end()
}
}

View File

@ -49,6 +49,8 @@
"@hcengineering/platform": "^0.6.11",
"@hcengineering/server-tool": "^0.6.0",
"@hcengineering/server-token": "^0.6.11",
"@hcengineering/server-core": "^0.6.1",
"@hcengineering/server-pipeline": "^0.6.0",
"@hcengineering/client": "^0.6.18",
"@hcengineering/client-resources": "^0.6.27",
"@hcengineering/core": "^0.6.32",

View File

@ -13,8 +13,37 @@
// limitations under the License.
//
import { MeasureMetricsContext } from '@hcengineering/core'
import { startBackup } from '@hcengineering/backup-service'
import { MeasureMetricsContext } from '@hcengineering/core'
import { DummyDbAdapter, DummyFullTextAdapter, type PipelineFactory } from '@hcengineering/server-core'
import { createServerPipeline } from '@hcengineering/server-pipeline'
const ctx = new MeasureMetricsContext('backup-service', {})
startBackup(ctx)
startBackup(ctx, (mongoUrl, storageAdapter) => {
const factory: PipelineFactory = createServerPipeline(
ctx,
mongoUrl,
{
externalStorage: storageAdapter,
fullTextUrl: '',
indexParallel: 0,
indexProcessing: 0,
rekoniUrl: '',
usePassedCtx: true
},
{
adapters: {
FullTextBlob: {
factory: async () => new DummyDbAdapter(),
url: ''
}
},
fulltextAdapter: {
factory: async () => new DummyFullTextAdapter(),
stages: () => [],
url: ''
}
}
)
return factory
})

View File

@ -24,6 +24,10 @@ interface Config extends Omit<BackupConfig, 'Token'> {
Interval: number // Timeout in seconds
Timeout: number // Timeout in seconds
BucketName: string
Storage: string // A bucket storage config
WorkspaceStorage: string // A workspace storage config
SkipWorkspaces: string
MongoURL: string
}
@ -36,10 +40,22 @@ const envMap: { [key in keyof Config]: string } = {
BucketName: 'BUCKET_NAME',
Interval: 'INTERVAL',
Timeout: 'TIMEOUT',
MongoURL: 'MONGO_URL'
MongoURL: 'MONGO_URL',
SkipWorkspaces: 'SKIP_WORKSPACES',
Storage: 'STORAGE',
WorkspaceStorage: 'WORKSPACE_STORAGE'
}
const required: Array<keyof Config> = ['TransactorURL', 'AccountsURL', 'Secret', 'ServiceID', 'BucketName', 'MongoURL']
const required: Array<keyof Config> = [
'TransactorURL',
'AccountsURL',
'Secret',
'ServiceID',
'BucketName',
'MongoURL',
'Storage',
'WorkspaceStorage'
]
const config: Config = (() => {
const params: Partial<Config> = {
@ -50,7 +66,10 @@ const config: Config = (() => {
ServiceID: process.env[envMap.ServiceID] ?? 'backup-service',
Interval: parseInt(process.env[envMap.Interval] ?? '3600'),
Timeout: parseInt(process.env[envMap.Timeout] ?? '3600'),
MongoURL: process.env[envMap.MongoURL]
MongoURL: process.env[envMap.MongoURL],
SkipWorkspaces: process.env[envMap.SkipWorkspaces] ?? '',
WorkspaceStorage: process.env[envMap.WorkspaceStorage],
Storage: process.env[envMap.Storage]
}
const missingEnv = required.filter((key) => params[key] === undefined).map((key) => envMap[key])

View File

@ -16,23 +16,32 @@
import { MeasureContext, systemAccountEmail } from '@hcengineering/core'
import { setMetadata } from '@hcengineering/platform'
import { backupService } from '@hcengineering/server-backup'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
import { type PipelineFactory, type StorageAdapter } from '@hcengineering/server-core'
import { buildStorageFromConfig, storageConfigFromEnv, createStorageFromConfig } from '@hcengineering/server-storage'
import serverToken, { generateToken } from '@hcengineering/server-token'
import toolPlugin from '@hcengineering/server-tool'
import config from './config'
export function startBackup (ctx: MeasureContext): void {
export function startBackup (
ctx: MeasureContext,
pipelineFactoryFactory: (mongoUrl: string, storage: StorageAdapter) => PipelineFactory
): void {
setMetadata(serverToken.metadata.Secret, config.Secret)
const storageConfiguration = storageConfigFromEnv()
const storageAdapter = buildStorageFromConfig(storageConfiguration, config.MongoURL)
const backupStorageConfig = storageConfigFromEnv(config.Storage)
const workspaceStorageConfig = storageConfigFromEnv(config.WorkspaceStorage)
const storageAdapter = createStorageFromConfig(backupStorageConfig.storages[0])
const workspaceStorageAdapter = buildStorageFromConfig(workspaceStorageConfig, config.MongoURL)
setMetadata(toolPlugin.metadata.UserAgent, config.ServiceID)
const pipelineFactory = pipelineFactoryFactory(config.MongoURL, workspaceStorageAdapter)
// A token to access account service
const token = generateToken(systemAccountEmail, { name: 'backup', productId: '' })
const shutdown = backupService(ctx, storageAdapter, { ...config, Token: token })
const shutdown = backupService(ctx, storageAdapter, { ...config, Token: token }, pipelineFactory)
process.on('SIGINT', shutdown)
process.on('SIGTERM', shutdown)

View File

@ -503,6 +503,8 @@ export async function backup (
connectTimeout: number
skipBlobContentTypes: string[]
blobDownloadLimit: number
connection?: CoreClient & BackupClient
storageAdapter?: StorageAdapter
} = {
force: false,
recheck: false,
@ -529,18 +531,20 @@ export async function backup (
canceled = true
}, options.timeout)
}
const connection = (await connect(
transactorUrl,
workspaceId,
undefined,
{
mode: 'backup'
},
undefined,
options.connectTimeout
)) as unknown as CoreClient & BackupClient
const connection =
options.connection ??
((await connect(
transactorUrl,
workspaceId,
undefined,
{
mode: 'backup'
},
undefined,
options.connectTimeout
)) as unknown as CoreClient & BackupClient)
const blobClient = new BlobClient(transactorUrl, workspaceId)
const blobClient = new BlobClient(transactorUrl, workspaceId, { storageAdapter: options.storageAdapter })
ctx.info('starting backup', { workspace: workspaceId.name })
let tmpDir: string | undefined
@ -897,14 +901,15 @@ export async function backup (
ctx.error('error packing file', { err })
}
})
// if (blob.size > 1024 * 1024) {
ctx.info('download blob', {
_id: blob._id,
contentType: blob.contentType,
size: blob.size,
provider: blob.provider,
pending: docs.length
})
if (blob.size > 1024 * 1024) {
ctx.info('download blob', {
_id: blob._id,
contentType: blob.contentType,
size: blob.size,
provider: blob.provider,
pending: docs.length
})
}
printDownloaded(blob._id, blob.size)
} catch (err: any) {
@ -970,7 +975,9 @@ export async function backup (
ctx.error('backup error', { err, workspace: workspaceId.name })
} finally {
ctx.info('end backup', { workspace: workspaceId.name })
await connection.close()
if (options.connection === undefined) {
await connection.close()
}
ctx.end()
if (options.timeout !== -1) {
clearTimeout(timer)

View File

@ -13,8 +13,22 @@
// limitations under the License.
//
import { BaseWorkspaceInfo, getWorkspaceId, type MeasureContext } from '@hcengineering/core'
import { type StorageAdapter } from '@hcengineering/server-core'
import {
BaseWorkspaceInfo,
getWorkspaceId,
systemAccountEmail,
type BackupClient,
type Client,
type MeasureContext,
type WorkspaceIdWithUrl
} from '@hcengineering/core'
import {
BackupClientOps,
SessionContextImpl,
type Pipeline,
type PipelineFactory,
type StorageAdapter
} from '@hcengineering/server-core'
import { backup } from '.'
import { createStorageBackupStorage } from './storage'
@ -43,12 +57,14 @@ export interface BackupConfig {
Interval: number // Timeout in seconds
Timeout: number // Timeout in seconds
BucketName: string
SkipWorkspaces: string
}
class BackupWorker {
constructor (
readonly storageAdapter: StorageAdapter,
readonly config: BackupConfig
readonly config: BackupConfig,
readonly pipelineFactory: PipelineFactory
) {}
canceled = false
@ -59,29 +75,38 @@ class BackupWorker {
clearTimeout(this.interval)
}
backupPromise: Promise<void> | undefined
async triggerBackup (ctx: MeasureContext): Promise<void> {
const failed = await this.backup(ctx)
if (failed.length > 0) {
ctx.info('Failed to backup workspaces, Retry failed workspaces once.', { failed: failed.length })
await this.doBackup(ctx, failed)
}
}
async schedule (ctx: MeasureContext): Promise<void> {
console.log('schedule timeout for', this.config.Interval, ' seconds')
this.interval = setTimeout(() => {
void this.backup(ctx).then((failed) => {
if (failed.length > 0) {
ctx.info('Failed to backup workspaces, Retry failed workspaces once.', { failed: failed.length })
void this.doBackup(ctx, failed).then(() => {
void this.schedule(ctx)
})
} else {
void this.schedule(ctx)
}
})
if (this.backupPromise !== undefined) {
void this.backupPromise.then(() => {
void this.triggerBackup(ctx)
})
}
void this.triggerBackup(ctx)
}, this.config.Interval * 1000)
}
async backup (ctx: MeasureContext): Promise<BaseWorkspaceInfo[]> {
const workspaces = await getWorkspaces(this.config.AccountsURL, this.config.Token)
const workspacesIgnore = new Set(this.config.SkipWorkspaces.split(';'))
const workspaces = (await getWorkspaces(this.config.AccountsURL, this.config.Token)).filter((it) => {
return !workspacesIgnore.has(it.workspace)
})
workspaces.sort((a, b) => b.lastVisit - a.lastVisit)
return await this.doBackup(ctx, workspaces)
}
async doBackup (ctx: MeasureContext, workspaces: BaseWorkspaceInfo[]): Promise<BaseWorkspaceInfo[]> {
async doBackup (rootCtx: MeasureContext, workspaces: BaseWorkspaceInfo[]): Promise<BaseWorkspaceInfo[]> {
let index = 0
const failedWorkspaces: BaseWorkspaceInfo[] = []
@ -90,12 +115,17 @@ class BackupWorker {
return failedWorkspaces
}
index++
ctx.info('\n\nBACKUP WORKSPACE ', {
rootCtx.info('\n\nBACKUP WORKSPACE ', {
workspace: ws.workspace,
productId: ws.productId,
index,
total: workspaces.length
})
const childLogger = rootCtx.logger.childLogger?.(ws.workspace, {
workspace: ws.workspace,
enableConsole: 'false'
})
const ctx = rootCtx.newChild(ws.workspace, { workspace: ws.workspace }, {}, childLogger)
try {
const storage = await createStorageBackupStorage(
ctx,
@ -103,6 +133,13 @@ class BackupWorker {
getWorkspaceId(this.config.BucketName, ws.productId),
ws.workspace
)
const wsUrl: WorkspaceIdWithUrl = {
name: ws.workspace,
productId: ws.productId,
workspaceName: ws.workspaceName ?? '',
workspaceUrl: ws.workspaceUrl ?? ''
}
const pipeline = await this.pipelineFactory(ctx, wsUrl, true, () => {}, null)
await ctx.with('backup', { workspace: ws.workspace }, async (ctx) => {
await backup(ctx, this.config.TransactorURL, getWorkspaceId(ws.workspace, ws.productId), storage, {
skipDomains: [],
@ -111,20 +148,84 @@ class BackupWorker {
timeout: this.config.Timeout * 1000,
connectTimeout: 5 * 60 * 1000, // 5 minutes to,
blobDownloadLimit: 100,
skipBlobContentTypes: []
skipBlobContentTypes: [],
storageAdapter: pipeline.storage.storageAdapter,
connection: this.wrapPipeline(ctx, pipeline, wsUrl)
})
})
} catch (err: any) {
ctx.error('\n\nFAILED to BACKUP', { workspace: ws.workspace, err })
rootCtx.error('\n\nFAILED to BACKUP', { workspace: ws.workspace, err })
failedWorkspaces.push(ws)
await childLogger?.close()
}
}
return failedWorkspaces
}
wrapPipeline (ctx: MeasureContext, pipeline: Pipeline, wsUrl: WorkspaceIdWithUrl): Client & BackupClient {
const sctx = new SessionContextImpl(
ctx,
systemAccountEmail,
'backup',
true,
{ targets: {}, txes: [] },
wsUrl,
null,
false
)
const backupOps = new BackupClientOps(pipeline)
return {
findAll: async (_class, query, options) => {
return await pipeline.findAll(sctx, _class, query, options)
},
findOne: async (_class, query, options) => {
return (await pipeline.findAll(sctx, _class, query, { ...options, limit: 1 })).shift()
},
clean: async (domain, docs) => {
await backupOps.clean(ctx, domain, docs)
},
close: async () => {},
closeChunk: async (idx) => {
await backupOps.closeChunk(ctx, idx)
},
getHierarchy: () => {
return pipeline.storage.hierarchy
},
getModel: () => {
return pipeline.storage.modelDb
},
loadChunk: async (domain, idx, recheck) => {
return await backupOps.loadChunk(ctx, domain, idx, recheck)
},
loadDocs: async (domain, docs) => {
return await backupOps.loadDocs(ctx, domain, docs)
},
upload: async (domain, docs) => {
await backupOps.upload(ctx, domain, docs)
},
searchFulltext: async (query, options) => {
return {
docs: [],
total: 0
}
},
sendForceClose: async () => {},
tx: async (tx) => {
return {}
},
notify: (...tx) => {}
}
}
}
export function backupService (ctx: MeasureContext, storage: StorageAdapter, config: BackupConfig): () => void {
const backupWorker = new BackupWorker(storage, config)
export function backupService (
ctx: MeasureContext,
storage: StorageAdapter,
config: BackupConfig,
pipelineFactory: PipelineFactory
): () => void {
const backupWorker = new BackupWorker(storage, config, pipelineFactory)
const shutdown = (): void => {
void backupWorker.close()

View File

@ -51,7 +51,8 @@ export async function createPipeline (
constructors: MiddlewareCreator[],
upgrade: boolean,
broadcast: BroadcastFunc,
branding: Branding | null
branding: Branding | null,
disableTriggers?: boolean
): Promise<Pipeline> {
const broadcastHandlers: BroadcastFunc[] = [broadcast]
const _broadcast: BroadcastFunc = (
@ -68,7 +69,8 @@ export async function createPipeline (
await createServerStorage(ctx, conf, {
upgrade,
broadcast: _broadcast,
branding
branding,
disableTriggers
})
)
const pipelineResult = await PipelineImpl.create(ctx.newChild('pipeline-operations', {}), storage, constructors)

View File

@ -400,11 +400,11 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
export function buildStorage (
config: StorageConfiguration,
dbAdapter: RawDBAdapter,
storageFactory: (kind: string, config: StorageConfig) => StorageAdapter
storageFactory: (config: StorageConfig) => StorageAdapter
): AggregatorStorageAdapter {
const adapters = new Map<string, StorageAdapter>()
for (const c of config.storages) {
adapters.set(c.name, storageFactory(c.kind, c))
adapters.set(c.name, storageFactory(c))
}
return new AggregatorStorageAdapter(adapters, config.default, dbAdapter)
}

View File

@ -77,9 +77,11 @@ export async function createServerStorage (
for (const tx of model) {
try {
hierarchy.tx(tx)
await triggers.tx(tx)
if (options.disableTriggers !== true) {
await triggers.tx(tx)
}
} catch (err: any) {
console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err)
ctx.warn('failed to apply model transaction, skipping', { tx: JSON.stringify(tx), err })
}
}
modelDb.addTxes(ctx, model, false)

View File

@ -1,4 +1,15 @@
import { type WorkspaceId, toWorkspaceString } from '@hcengineering/core'
import {
toWorkspaceString,
type Doc,
type DocInfo,
type Domain,
type MeasureContext,
type Ref,
type StorageIterator,
type WorkspaceId
} from '@hcengineering/core'
import type { Pipeline } from './types'
import { estimateDocSize } from './utils'
export * from '@hcengineering/storage'
@ -8,3 +19,95 @@ export * from '@hcengineering/storage'
export function getBucketId (workspaceId: WorkspaceId): string {
return toWorkspaceString(workspaceId, '.')
}
const chunkSize = 2 * 1024 * 1024
/**
* @public
*/
export interface ChunkInfo {
idx: number
index: 0
finished: boolean
iterator: StorageIterator
}
/**
* @public
*/
export class BackupClientOps {
constructor (protected readonly _pipeline: Pipeline) {}
idIndex = 0
chunkInfo = new Map<number, ChunkInfo>()
async loadChunk (
ctx: MeasureContext,
domain: Domain,
idx?: number,
recheck?: boolean
): Promise<{
idx: number
docs: DocInfo[]
finished: boolean
}> {
return await ctx.with('load-chunk', { domain }, async (ctx) => {
idx = idx ?? this.idIndex++
let chunk: ChunkInfo | undefined = this.chunkInfo.get(idx)
if (chunk !== undefined) {
chunk.index++
if (chunk.finished === undefined || chunk.finished) {
return {
idx,
docs: [],
finished: true
}
}
} else {
chunk = { idx, iterator: this._pipeline.storage.find(ctx, domain, recheck), finished: false, index: 0 }
this.chunkInfo.set(idx, chunk)
}
let size = 0
const docs: DocInfo[] = []
while (size < chunkSize) {
const doc = await chunk.iterator.next(ctx)
if (doc === undefined) {
chunk.finished = true
break
}
size += estimateDocSize(doc)
docs.push(doc)
}
return {
idx,
docs,
finished: chunk.finished
}
})
}
async closeChunk (ctx: MeasureContext, idx: number): Promise<void> {
await ctx.with('close-chunk', {}, async () => {
const chunk = this.chunkInfo.get(idx)
this.chunkInfo.delete(idx)
if (chunk != null) {
await chunk.iterator.close(ctx)
}
})
}
async loadDocs (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
return await this._pipeline.storage.load(ctx, domain, docs)
}
async upload (ctx: MeasureContext, domain: Domain, docs: Doc[]): Promise<void> {
await this._pipeline.storage.upload(ctx, domain, docs)
}
async clean (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<void> {
await this._pipeline.storage.clean(ctx, domain, docs)
}
}

View File

@ -137,6 +137,17 @@ export interface Pipeline extends LowLevelStorage {
close: () => Promise<void>
}
/**
* @public
*/
export type PipelineFactory = (
ctx: MeasureContext,
ws: WorkspaceIdWithUrl,
upgrade: boolean,
broadcast: BroadcastFunc,
branding: Branding | null
) => Promise<Pipeline>
/**
* @public
*/
@ -448,6 +459,8 @@ export interface ServerStorageOptions {
broadcast: BroadcastFunc
branding: Branding | null
disableTriggers?: boolean
}
export interface ServiceAdapter {

View File

@ -6,7 +6,6 @@ import {
DOMAIN_MODEL,
DOMAIN_TRANSIENT,
DOMAIN_TX,
type BrandingMap,
type MeasureContext
} from '@hcengineering/core'
import { createElasticAdapter, createElasticBackupDataAdapter } from '@hcengineering/elastic'
@ -33,10 +32,9 @@ import {
createPipeline,
type DbConfiguration,
type MiddlewareCreator,
type StorageAdapter,
type StorageConfiguration
type PipelineFactory,
type StorageAdapter
} from '@hcengineering/server-core'
import { type PipelineFactory, type ServerFactory } from '@hcengineering/server-ws'
import { createIndexStages } from './indexing'
/**
@ -48,15 +46,11 @@ export function createServerPipeline (
dbUrl: string,
opt: {
fullTextUrl: string
storageConfig: StorageConfiguration
rekoniUrl: string
port: number
productId: string
brandingMap: BrandingMap
serverFactory: ServerFactory
indexProcessing: number // 1000
indexParallel: number // 2
disableTriggers?: boolean
usePassedCtx?: boolean
externalStorage: StorageAdapter
},
@ -73,7 +67,8 @@ export function createServerPipeline (
QueryJoinMiddleware.create
]
return (ctx, workspace, upgrade, broadcast, branding) => {
const wsMetrics = metrics.newChild('🧲 session', {})
const metricsCtx = opt.usePassedCtx === true ? ctx : metrics
const wsMetrics = metricsCtx.newChild('🧲 session', {})
const conf: DbConfiguration = {
domains: {
[DOMAIN_TX]: 'MongoTx',
@ -151,6 +146,6 @@ export function createServerPipeline (
storageFactory: opt.externalStorage,
workspace
}
return createPipeline(ctx, conf, middlewares, upgrade, broadcast, branding)
return createPipeline(ctx, conf, middlewares, upgrade, broadcast, branding, opt.disableTriggers)
}
}

View File

@ -65,10 +65,7 @@ class StorageBlobAdapter implements DbAdapter {
}
find (ctx: MeasureContext, domain: Domain, recheck?: boolean): StorageIterator {
if (recheck === true) {
return (this.client as StorageAdapterEx).find(ctx, this.workspaceId)
}
return this.blobAdapter.find(ctx, domain, recheck)
return (this.client as StorageAdapterEx).find(ctx, this.workspaceId)
}
async load (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {

View File

@ -29,10 +29,10 @@ import {
*/
export function storageConfigFromEnv (): StorageConfiguration {
export function storageConfigFromEnv (configEnv: string = 'STORAGE_CONFIG'): StorageConfiguration {
const storageConfig: StorageConfiguration = { default: '', storages: [] }
const storageEnv = process.env.STORAGE_CONFIG
const storageEnv = process.env[configEnv]
if (storageEnv !== undefined) {
parseStorageEnv(storageEnv, storageConfig)
}
@ -83,22 +83,25 @@ export function parseStorageEnv (storageEnv: string, storageConfig: StorageConfi
}
}
export function buildStorageFromConfig (config: StorageConfiguration, dbUrl: string): AggregatorStorageAdapter {
return buildStorage(config, createRawMongoDBAdapter(dbUrl), (kind, config): StorageAdapter => {
if (kind === MinioService.config) {
const c = config as MinioConfig
if (c.endpoint == null || c.accessKey == null || c.secretKey == null) {
throw new Error('One of endpoint/accessKey/secretKey values are not specified')
}
return new MinioService(c)
} else if (kind === S3Service.config) {
const c = config as S3Config
if (c.endpoint == null || c.accessKey == null || c.secretKey == null) {
throw new Error('One of endpoint/accessKey/secretKey values are not specified')
}
return new S3Service(c)
} else {
throw new Error('Unsupported storage kind:' + kind)
export function createStorageFromConfig (config: StorageConfig): StorageAdapter {
const kind = config.kind
if (kind === MinioService.config) {
const c = config as MinioConfig
if (c.endpoint == null || c.accessKey == null || c.secretKey == null) {
throw new Error('One of endpoint/accessKey/secretKey values are not specified')
}
})
return new MinioService(c)
} else if (kind === S3Service.config) {
const c = config as S3Config
if (c.endpoint == null || c.accessKey == null || c.secretKey == null) {
throw new Error('One of endpoint/accessKey/secretKey values are not specified')
}
return new S3Service(c)
} else {
throw new Error('Unsupported storage kind:' + kind)
}
}
export function buildStorageFromConfig (config: StorageConfiguration, dbUrl: string): AggregatorStorageAdapter {
return buildStorage(config, createRawMongoDBAdapter(dbUrl), createStorageFromConfig)
}

View File

@ -1,20 +1,8 @@
import { Doc, DocInfo, Domain, Ref, StorageIterator } from '@hcengineering/core'
import { Pipeline, estimateDocSize } from '@hcengineering/server-core'
import { Doc, Domain, Ref } from '@hcengineering/core'
import { BackupClientOps, Pipeline } from '@hcengineering/server-core'
import { Token } from '@hcengineering/server-token'
import { ClientSession, Session, type ClientSessionCtx } from '@hcengineering/server-ws'
const chunkSize = 2 * 1024 * 1024
/**
* @public
*/
export interface ChunkInfo {
idx: number
index: 0
finished: boolean
iterator: StorageIterator
}
/**
* @public
*/
@ -28,76 +16,35 @@ export interface BackupSession extends Session {
* @public
*/
export class BackupClientSession extends ClientSession implements BackupSession {
ops: BackupClientOps
constructor (
protected readonly token: Token,
protected readonly _pipeline: Pipeline
_pipeline: Pipeline
) {
super(token, _pipeline)
this.ops = new BackupClientOps(_pipeline)
}
idIndex = 0
chunkInfo = new Map<number, ChunkInfo>()
async loadChunk (_ctx: ClientSessionCtx, domain: Domain, idx?: number, recheck?: boolean): Promise<void> {
this.lastRequest = Date.now()
await _ctx.ctx.with('load-chunk', { domain }, async (ctx) => {
try {
idx = idx ?? this.idIndex++
let chunk: ChunkInfo | undefined = this.chunkInfo.get(idx)
if (chunk !== undefined) {
chunk.index++
if (chunk.finished === undefined) {
return {
idx,
docs: [],
finished: true
}
}
} else {
chunk = { idx, iterator: this._pipeline.storage.find(ctx, domain, recheck), finished: false, index: 0 }
this.chunkInfo.set(idx, chunk)
}
let size = 0
const docs: DocInfo[] = []
while (size < chunkSize) {
const doc = await chunk.iterator.next(ctx)
if (doc === undefined) {
chunk.finished = true
break
}
size += estimateDocSize(doc)
docs.push(doc)
}
await _ctx.sendResponse({
idx,
docs,
finished: chunk.finished
})
} catch (err: any) {
await _ctx.sendResponse({ error: err.message })
}
})
try {
const result = await this.ops.loadChunk(_ctx.ctx, domain)
await _ctx.sendResponse(result)
} catch (err: any) {
await _ctx.sendResponse({ error: err.message })
}
}
async closeChunk (ctx: ClientSessionCtx, idx: number): Promise<void> {
this.lastRequest = Date.now()
await ctx.ctx.with('close-chunk', {}, async () => {
const chunk = this.chunkInfo.get(idx)
this.chunkInfo.delete(idx)
if (chunk != null) {
await chunk.iterator.close(ctx.ctx)
}
await ctx.sendResponse({})
})
await this.ops.closeChunk(ctx.ctx, idx)
await ctx.sendResponse({})
}
async loadDocs (ctx: ClientSessionCtx, domain: Domain, docs: Ref<Doc>[]): Promise<void> {
this.lastRequest = Date.now()
try {
const result = await this._pipeline.storage.load(ctx.ctx, domain, docs)
const result = await this.ops.loadDocs(ctx.ctx, domain, docs)
await ctx.sendResponse(result)
} catch (err: any) {
await ctx.sendResponse({ error: err.message })
@ -107,7 +54,7 @@ export class BackupClientSession extends ClientSession implements BackupSession
async upload (ctx: ClientSessionCtx, domain: Domain, docs: Doc[]): Promise<void> {
this.lastRequest = Date.now()
try {
await this._pipeline.storage.upload(ctx.ctx, domain, docs)
await this.ops.upload(ctx.ctx, domain, docs)
} catch (err: any) {
await ctx.sendResponse({ error: err.message })
return
@ -118,7 +65,7 @@ export class BackupClientSession extends ClientSession implements BackupSession
async clean (ctx: ClientSessionCtx, domain: Domain, docs: Ref<Doc>[]): Promise<void> {
this.lastRequest = Date.now()
try {
await this._pipeline.storage.clean(ctx.ctx, domain, docs)
await this.ops.clean(ctx.ctx, domain, docs)
} catch (err: any) {
await ctx.sendResponse({ error: err.message })
return

View File

@ -24,6 +24,7 @@ import {
type MeasureContext
} from '@hcengineering/core'
import { addLocation, getMetadata, getResource, setMetadata } from '@hcengineering/platform'
import type { StorageAdapter } from '@hcengineering/server-core'
import { generateToken } from '@hcengineering/server-token'
import crypto from 'node:crypto'
import plugin from './plugin'
@ -92,11 +93,14 @@ export class BlobClient {
constructor (
readonly transactorUrl: string,
readonly workspace: WorkspaceId,
email?: string,
extra?: Record<string, string>
readonly opt?: {
email?: string
extra?: Record<string, string>
storageAdapter?: StorageAdapter
}
) {
this.index = 0
this.token = generateToken(email ?? systemAccountEmail, workspace, extra)
this.token = generateToken(opt?.email ?? systemAccountEmail, workspace, opt?.extra)
let url = transactorUrl
if (url.endsWith('/')) {
url = url.slice(0, url.length - 1)
@ -106,6 +110,12 @@ export class BlobClient {
}
async checkFile (ctx: MeasureContext, name: string): Promise<boolean> {
if (this.opt?.storageAdapter !== undefined) {
const obj = await this.opt?.storageAdapter.stat(ctx, this.workspace, name)
if (obj !== undefined) {
return true
}
}
for (let i = 0; i < 5; i++) {
try {
const response = await fetch(this.transactorAPIUrl + `?name=${encodeURIComponent(name)}`, {
@ -149,51 +159,66 @@ export class BlobClient {
for (; i < 5; i++) {
try {
const st = Date.now()
let chunk: Buffer
const header: Record<string, string> = {
Authorization: 'Bearer ' + this.token
}
if (!(size !== -1 && written === 0 && size < chunkSize)) {
header.Range = `bytes=${written}-${size === -1 ? written + chunkSize : Math.min(size - 1, written + chunkSize)}`
}
response = await fetch(this.transactorAPIUrl + `?name=${encodeURIComponent(name)}`, { headers: header })
if (header.Range != null) {
ctx.info('fetch part', { time: Date.now() - st, blobId: name, written, size })
}
if (response.status === 403) {
i = 5
// No file, so make it empty
throw new Error(`Unauthorized ${this.transactorAPIUrl}/${this.workspace.name}/${name}`)
}
if (response.status === 404) {
i = 5
// No file, so make it empty
throw new Error(`No file for ${this.transactorAPIUrl}/${this.workspace.name}/${name}`)
}
if (response.status === 416) {
if (size === -1) {
size = parseInt((response.headers.get('content-range') ?? '').split('*/')[1])
continue
if (this.opt?.storageAdapter !== undefined) {
const chunks: Buffer[] = []
const readable = await this.opt.storageAdapter.partial(ctx, this.workspace, name, written, chunkSize)
await new Promise<void>((resolve) => {
readable.on('data', (chunk) => {
chunks.push(chunk)
})
readable.on('end', () => {
resolve()
})
})
chunk = Buffer.concat(chunks)
} else {
const header: Record<string, string> = {
Authorization: 'Bearer ' + this.token
}
// No file, so make it empty
throw new Error(`No file for ${this.transactorAPIUrl}/${this.workspace.name}/${name}`)
}
const chunk = Buffer.from(await response.arrayBuffer())
if (!(size !== -1 && written === 0 && size < chunkSize)) {
header.Range = `bytes=${written}-${size === -1 ? written + chunkSize : Math.min(size - 1, written + chunkSize)}`
}
if (header.Range == null) {
size = chunk.length
}
// We need to parse
// 'Content-Range': `bytes ${start}-${end}/${size}`
// To determine if something is left
const range = response.headers.get('Content-Range')
if (range !== null) {
const [, total] = range.split(' ')[1].split('/')
if (total !== undefined) {
size = parseInt(total)
response = await fetch(this.transactorAPIUrl + `?name=${encodeURIComponent(name)}`, { headers: header })
if (header.Range != null) {
ctx.info('fetch part', { time: Date.now() - st, blobId: name, written, size })
}
if (response.status === 403) {
i = 5
// No file, so make it empty
throw new Error(`Unauthorized ${this.transactorAPIUrl}/${this.workspace.name}/${name}`)
}
if (response.status === 404) {
i = 5
// No file, so make it empty
throw new Error(`No file for ${this.transactorAPIUrl}/${this.workspace.name}/${name}`)
}
if (response.status === 416) {
if (size === -1) {
size = parseInt((response.headers.get('content-range') ?? '').split('*/')[1])
continue
}
// No file, so make it empty
throw new Error(`No file for ${this.transactorAPIUrl}/${this.workspace.name}/${name}`)
}
chunk = Buffer.from(await response.arrayBuffer())
if (header.Range == null) {
size = chunk.length
}
// We need to parse
// 'Content-Range': `bytes ${start}-${end}/${size}`
// To determine if something is left
const range = response.headers.get('Content-Range')
if (range !== null) {
const [, total] = range.split(' ')[1].split('/')
if (total !== undefined) {
size = parseInt(total)
}
}
}
@ -219,6 +244,10 @@ export class BlobClient {
}
break
} catch (err: any) {
if (err?.code === 'NoSuchKey') {
ctx.info('No such key', { name })
return
}
if (i > 4) {
await new Promise<void>((resolve) => {
writable.end(resolve)
@ -236,26 +265,30 @@ export class BlobClient {
}
async upload (ctx: MeasureContext, name: string, size: number, contentType: string, buffer: Buffer): Promise<void> {
// TODO: We need to improve this logig, to allow restore of huge blobs
for (let i = 0; i < 5; i++) {
try {
await fetch(
this.transactorAPIUrl +
`?name=${encodeURIComponent(name)}&contentType=${encodeURIComponent(contentType)}&size=${size}`,
{
method: 'PUT',
headers: {
Authorization: 'Bearer ' + this.token,
'Content-Type': contentType
},
body: buffer
if (this.opt?.storageAdapter !== undefined) {
await this.opt.storageAdapter.put(ctx, this.workspace, name, buffer, contentType, size)
} else {
// TODO: We need to improve this logig, to allow restore of huge blobs
for (let i = 0; i < 5; i++) {
try {
await fetch(
this.transactorAPIUrl +
`?name=${encodeURIComponent(name)}&contentType=${encodeURIComponent(contentType)}&size=${size}`,
{
method: 'PUT',
headers: {
Authorization: 'Bearer ' + this.token,
'Content-Type': contentType
},
body: buffer
}
)
break
} catch (err: any) {
if (i === 4) {
ctx.error('failed to upload file', { name })
throw err
}
)
break
} catch (err: any) {
if (i === 4) {
ctx.error('failed to upload file', { name })
throw err
}
}
}

View File

@ -32,14 +32,13 @@ import core, {
} from '@hcengineering/core'
import { unknownError, type Status } from '@hcengineering/platform'
import { type HelloRequest, type HelloResponse, type Request, type Response } from '@hcengineering/rpc'
import type { Pipeline, StorageAdapter } from '@hcengineering/server-core'
import type { Pipeline, StorageAdapter, PipelineFactory } from '@hcengineering/server-core'
import { type Token } from '@hcengineering/server-token'
import {
LOGGING_ENABLED,
type ClientSessionCtx,
type ConnectionSocket,
type PipelineFactory,
type ServerFactory,
type Session,
type SessionManager,

View File

@ -25,15 +25,9 @@ import http, { type IncomingMessage } from 'http'
import os from 'os'
import { WebSocketServer, type RawData, type WebSocket } from 'ws'
import { getStatistics, wipeStatistics } from './stats'
import {
LOGGING_ENABLED,
type ConnectionSocket,
type HandleRequestFunction,
type PipelineFactory,
type SessionManager
} from './types'
import { LOGGING_ENABLED, type ConnectionSocket, type HandleRequestFunction, type SessionManager } from './types'
import type { StorageAdapter } from '@hcengineering/server-core'
import { type StorageAdapter, type PipelineFactory } from '@hcengineering/server-core'
import 'bufferutil'
import 'utf-8-validate'
import { getFile, getFileRange, type BlobResponse } from './blobs'

View File

@ -22,15 +22,9 @@ import { decodeToken } from '@hcengineering/server-token'
import { Analytics } from '@hcengineering/analytics'
import { RPCHandler } from '@hcengineering/rpc'
import { getStatistics, wipeStatistics } from './stats'
import {
LOGGING_ENABLED,
type ConnectionSocket,
type HandleRequestFunction,
type PipelineFactory,
type SessionManager
} from './types'
import { LOGGING_ENABLED, type ConnectionSocket, type HandleRequestFunction, type SessionManager } from './types'
import type { StorageAdapter } from '@hcengineering/server-core'
import { type StorageAdapter, type PipelineFactory } from '@hcengineering/server-core'
import uWebSockets, { DISABLED, SHARED_COMPRESSOR, type HttpResponse, type WebSocket } from '@hcengineering/uws'
import { Readable } from 'stream'
import { getFile, getFileRange, type BlobResponse } from './blobs'

View File

@ -1,4 +1,5 @@
import {
type Branding,
type Class,
type Doc,
type DocumentQuery,
@ -7,12 +8,10 @@ import {
type MeasureContext,
type Ref,
type Tx,
type WorkspaceId,
type WorkspaceIdWithUrl,
type Branding
type WorkspaceId
} from '@hcengineering/core'
import { type Request, type Response } from '@hcengineering/rpc'
import { type BroadcastFunc, type Pipeline, type StorageAdapter } from '@hcengineering/server-core'
import { type Pipeline, type PipelineFactory, type StorageAdapter } from '@hcengineering/server-core'
import { type Token } from '@hcengineering/server-token'
/**
@ -84,17 +83,6 @@ export interface Session {
getMode: () => string
}
/**
* @public
*/
export type PipelineFactory = (
ctx: MeasureContext,
ws: WorkspaceIdWithUrl,
upgrade: boolean,
broadcast: BroadcastFunc,
branding: Branding | null
) => Promise<Pipeline>
/**
* @public
*/