UBERF-8122: Fix backup service

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-09-17 02:25:31 +07:00
parent c3a41ea1bb
commit b06f433bab
No known key found for this signature in database
GPG Key ID: BD80F68D68D8F7F2
20 changed files with 287 additions and 284 deletions

7
.vscode/launch.json vendored
View File

@ -221,7 +221,7 @@
"name": "Debug backup tool",
"type": "node",
"request": "launch",
"args": ["src/index.ts", "backup", "../../../dump/alex-staff-agency2", "alex-staff-agency"],
"args": ["src/index.ts", "backup", "../../../dump/platform2", "platform"],
"env": {
"MINIO_ACCESS_KEY": "minioadmin",
"MINIO_SECRET_KEY": "minioadmin",
@ -234,7 +234,10 @@
"runtimeArgs": ["--nolazy", "-r", "ts-node/register"],
"sourceMaps": true,
"cwd": "${workspaceRoot}/dev/tool",
"protocol": "inspector"
"protocol": "inspector",
"outputCapture": "std",
"runtimeVersion": "20",
"showAsyncStacks": true,
},
{
"name": "Debug tool upgrade",

View File

@ -14,12 +14,13 @@
"_phase:bundle": "rushx bundle",
"_phase:docker-build": "rushx docker:build",
"_phase:docker-staging": "rushx docker:staging",
"bundle": "mkdir -p bundle && esbuild src/__start.ts --bundle --minify --platform=node --define:process.env.MODEL_VERSION=$(node ../../common/scripts/show_version.js) --define:process.env.GIT_REVISION=$(../../common/scripts/git_version.sh) > bundle/bundle.js",
"bundle": "mkdir -p bundle && esbuild src/__start.ts --bundle --keep-names --sourcemap=external --platform=node --define:process.env.MODEL_VERSION=$(node ../../common/scripts/show_version.js) --define:process.env.GIT_REVISION=$(../../common/scripts/git_version.sh) --log-level=error --outfile=bundle/bundle.js",
"docker:build": "../../common/scripts/docker_build.sh hardcoreeng/tool",
"docker:tbuild": "docker build -t hardcoreeng/tool . --platform=linux/amd64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/tool",
"docker:staging": "../../common/scripts/docker_tag.sh hardcoreeng/tool staging",
"docker:push": "../../common/scripts/docker_tag.sh hardcoreeng/tool",
"run-local": "rush bundle --to @hcengineering/tool >/dev/null && cross-env SERVER_SECRET=secret ACCOUNTS_URL=http://localhost:3000 TRANSACTOR_URL=ws://localhost:3333 MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost MONGO_URL=mongodb://localhost:27017 TELEGRAM_DATABASE=telegram-service ELASTIC_URL=http://localhost:9200 REKONI_URL=http://localhost:4004 MODEL_VERSION=$(node ../../common/scripts/show_version.js) GIT_REVISION=$(git describe --all --long) node --max-old-space-size=18000 ./bundle/bundle.js",
"run-local-brk": "rush bundle --to @hcengineering/tool >/dev/null && cross-env SERVER_SECRET=secret ACCOUNTS_URL=http://localhost:3000 TRANSACTOR_URL=ws://localhost:3333 MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost MONGO_URL=mongodb://localhost:27017 TELEGRAM_DATABASE=telegram-service ELASTIC_URL=http://localhost:9200 REKONI_URL=http://localhost:4004 MODEL_VERSION=$(node ../../common/scripts/show_version.js) GIT_REVISION=$(git describe --all --long) node --inspect-brk --enable-source-maps --max-old-space-size=18000 ./bundle/bundle.js",
"run": "rush bundle --to @hcengineering/tool >/dev/null && cross-env node --max-old-space-size=8000 ./bundle/bundle.js",
"upgrade": "rushx run-local upgrade",
"format": "format src",

View File

@ -104,14 +104,17 @@ export async function cleanWorkspace (
const minioList = await storageAdapter.listStream(ctx, workspaceId)
const toClean: string[] = []
while (true) {
const mv = await minioList.next()
if (mv === undefined) {
const mvFiles = await minioList.next()
if (mvFiles.length === 0) {
break
}
for (const mv of mvFiles) {
if (!files.has(mv._id)) {
toClean.push(mv._id)
}
}
}
await storageAdapter.remove(ctx, workspaceId, toClean)
if (opt.recruit) {
@ -192,10 +195,11 @@ export async function fixMinioBW (
const list = await storageService.listStream(ctx, workspaceId)
let removed = 0
while (true) {
const obj = await list.next()
if (obj === undefined) {
const objs = await list.next()
if (objs.length === 0) {
break
}
for (const obj of objs) {
if (obj.modifiedOn < from) continue
if ((obj._id as string).includes('%preview%')) {
await storageService.remove(ctx, workspaceId, [obj._id])
@ -205,6 +209,7 @@ export async function fixMinioBW (
}
}
}
}
console.log('FINISH, removed: ', removed)
}

View File

@ -40,9 +40,10 @@ export async function syncFiles (
const iterator = await adapter.listStream(ctx, workspaceId)
try {
while (true) {
const data = await iterator.next()
if (data === undefined) break
const dataBulk = await iterator.next()
if (dataBulk.length === 0) break
for (const data of dataBulk) {
const blob = await exAdapter.stat(ctx, workspaceId, data._id)
if (blob !== undefined) continue
@ -56,6 +57,7 @@ export async function syncFiles (
console.log('...processed', count, Math.round(duration / 1000) + 's')
}
}
}
console.log('processed', count)
} finally {
await iterator.close()
@ -112,10 +114,12 @@ async function processAdapter (
const iterator = await source.listStream(ctx, workspaceId)
try {
while (true) {
const data = await iterator.next()
if (data === undefined) break
const dataBulk = await iterator.next()
if (dataBulk.length === 0) break
const blob = (await exAdapter.stat(ctx, workspaceId, data._id)) ?? (await source.stat(ctx, workspaceId, data._id))
for (const data of dataBulk) {
const blob =
(await exAdapter.stat(ctx, workspaceId, data._id)) ?? (await source.stat(ctx, workspaceId, data._id))
if (blob === undefined) {
console.error('blob not found', data._id)
@ -172,6 +176,7 @@ async function processAdapter (
time = Date.now()
}
}
}
await rateLimiter.waitProcessing()
} finally {

View File

@ -31,7 +31,7 @@ export interface DocInfo {
* @public
*/
export interface StorageIterator {
next: (ctx: MeasureContext) => Promise<DocInfo | undefined>
next: (ctx: MeasureContext) => Promise<DocInfo[]>
close: (ctx: MeasureContext) => Promise<void>
}

View File

@ -24,7 +24,7 @@ export interface UploadedObjectInfo {
}
export interface BlobStorageIterator {
next: () => Promise<ListBlobResult | undefined>
next: () => Promise<ListBlobResult[]>
close: () => Promise<void>
}
@ -99,7 +99,7 @@ export class DummyStorageAdapter implements StorageAdapter, StorageAdapterEx {
find (ctx: MeasureContext, workspaceId: WorkspaceId): StorageIterator {
return {
next: async (ctx) => undefined,
next: async (ctx) => [],
close: async (ctx) => {}
}
}
@ -120,8 +120,8 @@ export class DummyStorageAdapter implements StorageAdapter, StorageAdapterEx {
async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<BlobStorageIterator> {
return {
next: async (): Promise<ListBlobResult | undefined> => {
return undefined
next: async (): Promise<ListBlobResult[]> => {
return []
},
close: async () => {}
}
@ -179,16 +179,18 @@ export async function removeAllObjects (
const iterator = await storage.listStream(ctx, workspaceId)
let bulk: string[] = []
while (true) {
const obj = await iterator.next()
if (obj === undefined) {
const objs = await iterator.next()
if (objs.length === 0) {
break
}
for (const obj of objs) {
bulk.push(obj.storageId)
if (bulk.length > 50) {
await storage.remove(ctx, workspaceId, bulk)
bulk = []
}
}
}
if (bulk.length > 0) {
await storage.remove(ctx, workspaceId, bulk)
bulk = []
@ -206,10 +208,10 @@ export async function objectsToArray (
const bulk: ListBlobResult[] = []
while (true) {
const obj = await iterator.next()
if (obj === undefined) {
if (obj.length === 0) {
break
}
bulk.push(obj)
bulk.push(...obj)
}
await iterator.close()
return bulk

View File

@ -2,7 +2,6 @@ FROM node:20
WORKDIR /usr/src/app
RUN npm install --ignore-scripts=false --verbose bufferutil utf-8-validate @mongodb-js/zstd snappy msgpackr msgpackr-extract --unsafe-perm
RUN npm install --ignore-scripts=false --verbose uNetworking/uWebSockets.js#v20.47.0
RUN apt-get update
RUN apt-get install libjemalloc2
@ -10,7 +9,6 @@ RUN apt-get install libjemalloc2
ENV LD_PRELOAD=libjemalloc.so.2
ENV MALLOC_CONF=dirty_decay_ms:1000,narenas:2,background_thread:true
RUN mv node_modules/uWebSockets.js/*.node .
COPY bundle/bundle.js ./
COPY bundle/bundle.js.map ./

View File

@ -41,7 +41,6 @@ import { BlobClient, createClient } from '@hcengineering/server-client'
import { fullTextPushStagePrefix, type StorageAdapter } from '@hcengineering/server-core'
import { generateToken } from '@hcengineering/server-token'
import { connect } from '@hcengineering/server-tool'
import { mkdtemp, writeFile } from 'node:fs/promises'
import { PassThrough } from 'node:stream'
import { createGzip } from 'node:zlib'
import { join } from 'path'
@ -488,6 +487,16 @@ async function cleanDomain (ctx: MeasureContext, connection: CoreClient & Backup
}
}
function doTrimHash (s: string | undefined): string {
if (s == null) {
return ''
}
if (s.startsWith('"') && s.endsWith('"')) {
return s.slice(1, s.length - 1)
}
return s
}
/**
* @public
*/
@ -526,11 +535,15 @@ export async function backup (
let canceled = false
let timer: any
let ops = 0
if (options.timeout > 0) {
timer = setTimeout(() => {
timer = setInterval(() => {
if (ops === 0) {
ctx.error('Timeout during backup', { workspace: workspaceId.name, timeout: options.timeout / 1000 })
ops = 0
canceled = true
}
}, options.timeout)
}
@ -545,8 +558,6 @@ export async function backup (
const blobClient = new BlobClient(transactorUrl, token, workspaceId, { storageAdapter: options.storageAdapter })
ctx.info('starting backup', { workspace: workspaceId.name })
let tmpDir: string | undefined
try {
const domains = [
...connection
@ -613,6 +624,7 @@ export async function backup (
if (size == null || Number.isNaN(size)) {
return
}
ops++
downloaded += size
const newDownloadedMb = Math.round(downloaded / (1024 * 1024))
const newId = Math.round(newDownloadedMb / 10)
@ -641,6 +653,7 @@ export async function backup (
try {
const currentChunk = await ctx.with('loadChunk', {}, () => connection.loadChunk(domain, idx, options.recheck))
idx = currentChunk.idx
ops++
let needRetrieve: Ref<Doc>[] = []
let currentNeedRetrieveSize = 0
@ -656,17 +669,18 @@ export async function backup (
})
st = Date.now()
}
const kHash = digest.get(id as Ref<Doc>)
const _hash = doTrimHash(hash)
const kHash = doTrimHash(digest.get(id as Ref<Doc>))
if (kHash !== undefined) {
digest.delete(id as Ref<Doc>)
if (kHash !== hash) {
changes.updated.set(id as Ref<Doc>, hash)
if (kHash !== _hash) {
changes.updated.set(id as Ref<Doc>, _hash)
needRetrieve.push(id as Ref<Doc>)
currentNeedRetrieveSize += size
changed++
}
} else {
changes.added.set(id as Ref<Doc>, hash)
changes.added.set(id as Ref<Doc>, _hash)
needRetrieve.push(id as Ref<Doc>)
changed++
currentNeedRetrieveSize += size
@ -728,19 +742,13 @@ export async function backup (
}
// Cumulative digest
const digest = await ctx.with(
'load-digest',
{},
async (ctx) => await loadDigest(ctx, storage, backupInfo.snapshots, domain)
)
const digest = await ctx.with('load-digest', {}, (ctx) => loadDigest(ctx, storage, backupInfo.snapshots, domain))
let _pack: Pack | undefined
let addedDocuments = 0
let { changed, needRetrieveChunks } = await ctx.with(
'load-chunks',
{ domain },
async (ctx) => await loadChangesFromServer(ctx, domain, digest, changes)
let { changed, needRetrieveChunks } = await ctx.with('load-chunks', { domain }, (ctx) =>
loadChangesFromServer(ctx, domain, digest, changes)
)
if (needRetrieveChunks.length > 0) {
@ -761,6 +769,7 @@ export async function backup (
let docs: Doc[] = []
try {
docs = await ctx.with('load-docs', {}, async (ctx) => await connection.loadDocs(domain, needRetrieve))
ops++
} catch (err: any) {
ctx.error('error loading docs', { domain, err, workspace: workspaceId.name })
// Put back.
@ -876,16 +885,12 @@ export async function backup (
const finalBuffer = Buffer.concat(buffers)
if (finalBuffer.length !== blob.size) {
tmpDir = tmpDir ?? (await mkdtemp('backup', {}))
const tmpFile = join(tmpDir, blob._id)
await writeFile(tmpFile, finalBuffer)
await writeFile(tmpFile + '.json', JSON.stringify(blob, undefined, 2))
ctx.error('download blob size mismatch', {
_id: blob._id,
contentType: blob.contentType,
size: blob.size,
provider: blob.provider,
tempDir: tmpDir
bufferSize: finalBuffer.length,
provider: blob.provider
})
}
_pack.entry({ name: d._id + '.json' }, descrJson, (err) => {
@ -975,7 +980,7 @@ export async function backup (
}
ctx.end()
if (options.timeout !== -1) {
clearTimeout(timer)
clearInterval(timer)
}
}
}
@ -1200,22 +1205,12 @@ export async function restore (
workspace: workspaceId.name
})
const doTrim = (s: string | undefined): string | undefined => {
if (s == null) {
return s
}
if (s.startsWith('"') && s.endsWith('"')) {
return s.slice(1, s.length - 1)
}
return s
}
// Let's find difference
const docsToAdd = new Map(
Array.from(changeset.entries()).filter(
([it]) =>
!serverChangeset.has(it) ||
(serverChangeset.has(it) && doTrim(serverChangeset.get(it)) !== doTrim(changeset.get(it)))
(serverChangeset.has(it) && doTrimHash(serverChangeset.get(it)) !== doTrimHash(changeset.get(it)))
)
)
const docsToRemove = Array.from(serverChangeset.keys()).filter((it) => !changeset.has(it))

View File

@ -56,7 +56,7 @@ export class MemStorageAdapter implements StorageAdapter {
const files = Array.from(this.files.values()).filter((it) => it.workspace === workspaceId.name)
return {
next: async () => {
return files.shift()
return files.splice(0, 100)
},
close: async () => {}
}
@ -189,8 +189,7 @@ export class MemRawDBAdapter implements RawDBAdapter {
}
return {
next: async () => {
const doc = result.shift()
return doc
return result.splice(0, 50)
},
close: async () => {}
}

View File

@ -57,7 +57,7 @@ export interface DomainHelper {
}
export interface RawDBAdapterStream<T extends Doc> {
next: () => Promise<T | undefined>
next: () => Promise<T[]>
close: () => Promise<void>
}

View File

@ -86,7 +86,7 @@ export class DummyDbAdapter implements DbAdapter {
find (ctx: MeasureContext, domain: Domain): StorageIterator {
return {
next: async () => undefined,
next: async () => [],
close: async () => {}
}
}

View File

@ -99,31 +99,20 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
find (ctx: MeasureContext, workspaceId: WorkspaceId): StorageIterator {
const storageIterator = this.makeStorageIterator(ctx, workspaceId)
let buffer: ListBlobResult[] = []
return {
next: async (ctx) => {
const docInfo = await storageIterator.next()
if (docInfo !== undefined) {
buffer.push(docInfo)
next: async () => {
const docInfos = await storageIterator.next()
if (docInfos.length > 0) {
await this.doSyncDocs(ctx, workspaceId, docInfos)
}
if (buffer.length > 50) {
await this.doSyncDocs(ctx, workspaceId, buffer)
buffer = []
}
if (docInfo !== undefined) {
return {
hash: docInfo.etag,
id: docInfo._id,
size: docInfo.size
}
}
return docInfos.map((it) => ({
hash: it.etag,
id: it._id,
size: it.size
}))
},
close: async (ctx) => {
if (buffer.length > 0) {
await this.doSyncDocs(ctx, workspaceId, buffer)
}
await storageIterator.close()
}
}
@ -134,22 +123,21 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
let iterator: BlobStorageIterator | undefined
return {
next: async () => {
while (true) {
if (iterator === undefined && adapters.length > 0) {
iterator = await (adapters.shift() as StorageAdapter).listStream(ctx, workspaceId)
}
if (iterator === undefined) {
return undefined
return []
}
const docInfo = await iterator.next()
if (docInfo !== undefined) {
const docInfos = await iterator.next()
if (docInfos.length > 0) {
// We need to check if our stored version is fine
return docInfo
return docInfos
} else {
// We need to take next adapter
await iterator.close()
iterator = undefined
}
return []
}
},
close: async () => {
@ -227,7 +215,7 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<BlobStorageIterator> {
const data = await this.dbAdapter.findStream<Blob>(ctx, workspaceId, DOMAIN_BLOB, {})
return {
next: async (): Promise<ListBlobResult | undefined> => {
next: async (): Promise<ListBlobResult[]> => {
return await data.next()
},
close: async () => {

View File

@ -9,6 +9,7 @@ import {
type StorageIterator,
type WorkspaceId
} from '@hcengineering/core'
import { estimateDocSize } from './utils'
export * from '@hcengineering/storage'
@ -19,7 +20,7 @@ export function getBucketId (workspaceId: WorkspaceId): string {
return toWorkspaceString(workspaceId)
}
const chunkSize = 2 * 1024 * 1024
const chunkSize = 512 * 1024
/**
* @public
@ -70,15 +71,16 @@ export class BackupClientOps {
const docs: DocInfo[] = []
while (size < chunkSize) {
const doc = await chunk.iterator.next(ctx)
if (doc === undefined) {
const _docs = await chunk.iterator.next(ctx)
if (_docs.length === 0) {
chunk.finished = true
break
}
size += doc.size
for (const doc of _docs) {
size += estimateDocSize(doc)
docs.push(doc)
}
}
return {
idx,

View File

@ -192,7 +192,7 @@ export class MinioService implements StorageAdapter {
const rootPrefix = this.rootPrefix(workspaceId)
return {
next: async (): Promise<ListBlobResult | undefined> => {
next: async (): Promise<ListBlobResult[]> => {
try {
if (stream === undefined && !done) {
const rprefix = rootPrefix ?? ''
@ -227,7 +227,7 @@ export class MinioService implements StorageAdapter {
})
}
onNext()
if (buffer.length > 5) {
if (buffer.length > 100) {
stream?.pause()
}
})
@ -236,24 +236,24 @@ export class MinioService implements StorageAdapter {
const msg = (err?.message as string) ?? ''
if (msg.includes('Invalid bucket name') || msg.includes('The specified bucket does not exist')) {
hasMore = false
return
return []
}
error = err
}
if (buffer.length > 0) {
return buffer.shift()
return buffer.splice(0, 50)
}
if (!hasMore) {
return undefined
return []
}
return await new Promise<ListBlobResult | undefined>((resolve, reject) => {
return await new Promise<ListBlobResult[]>((resolve, reject) => {
onNext = () => {
if (error != null) {
reject(error)
}
onNext = () => {}
resolve(buffer.shift())
resolve(buffer.splice(0, 50))
}
stream?.resume()
})

View File

@ -105,7 +105,17 @@ export function createRawMongoDBAdapter (url: string): RawDBAdapter {
const { cursor } = await getCursor(workspace, domain, query, options)
return {
next: async () => (await cursor.next()) ?? undefined,
next: async () => {
const result: T[] = []
const doc = await cursor.next()
if (doc != null) {
result.push(doc)
}
if (cursor.bufferedCount() > 0) {
result.push(...cursor.readBufferedDocuments())
}
return result
},
close: async () => {
await cursor.close()
}

View File

@ -16,7 +16,6 @@
import core, {
DOMAIN_MODEL,
DOMAIN_TX,
type Iterator,
SortingOrder,
TxProcessor,
addOperation,
@ -30,6 +29,7 @@ import core, {
type AttachedDoc,
type Class,
type Doc,
type DocInfo,
type DocumentQuery,
type DocumentUpdate,
type Domain,
@ -38,6 +38,7 @@ import core, {
type FindOptions,
type FindResult,
type Hierarchy,
type Iterator,
type Lookup,
type MeasureContext,
type Mixin,
@ -135,7 +136,7 @@ export async function toArray<T> (cursor: AbstractCursor<T>): Promise<T[]> {
}
export interface DbAdapterOptions {
calculateHash?: (doc: Doc) => string
calculateHash?: (doc: Doc) => { digest: string, size: number }
}
abstract class MongoAdapterBase implements DbAdapter {
@ -1034,21 +1035,41 @@ abstract class MongoAdapterBase implements DbAdapter {
iterator = coll.find({ '%hash%': { $in: ['', null] } })
d = await ctx.with('next', { mode }, async () => await iterator.next())
}
if (d == null) {
return undefined
const result: DocInfo[] = []
if (d != null) {
result.push(this.toDocInfo(d, bulkUpdate))
}
if (iterator.bufferedCount() > 0) {
result.push(...iterator.readBufferedDocuments().map((it) => this.toDocInfo(it, bulkUpdate)))
}
await ctx.with('flush', {}, async () => {
await flush()
})
return result
},
close: async () => {
await ctx.with('flush', {}, async () => {
await flush(true)
})
await ctx.with('close', {}, async () => {
await iterator.close()
})
ctx.end()
}
}
}
private toDocInfo (d: Doc, bulkUpdate: Map<Ref<Doc>, string>): DocInfo {
let digest: string | null = (d as any)['%hash%']
if ('%hash%' in d) {
delete d['%hash%']
}
const pos = (digest ?? '').indexOf('|')
if (digest == null || digest === '') {
const cs = ctx.newChild('calc-size', {})
const size = estimateDocSize(d)
cs.end()
let size = estimateDocSize(d)
if (this.options?.calculateHash !== undefined) {
digest = this.options.calculateHash(d)
;({ digest, size } = this.options.calculateHash(d))
} else {
const hash = createHash('sha256')
updateHashForDoc(hash, d)
@ -1056,10 +1077,6 @@ abstract class MongoAdapterBase implements DbAdapter {
}
bulkUpdate.set(d._id, `${digest}|${size.toString(16)}`)
await ctx.with('flush', {}, async () => {
await flush()
})
return {
id: d._id,
hash: digest,
@ -1072,17 +1089,6 @@ abstract class MongoAdapterBase implements DbAdapter {
size: parseInt(digest.slice(pos + 1), 16)
}
}
},
close: async () => {
await ctx.with('flush', {}, async () => {
await flush(true)
})
await ctx.with('close', {}, async () => {
await iterator.close()
})
ctx.end()
}
}
}
async load (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {

View File

@ -23,7 +23,7 @@ import {
} from '@hcengineering/core'
import { PlatformError, unknownStatus } from '@hcengineering/platform'
import { type DomainHelperOperations } from '@hcengineering/server-core'
import { MongoClient, type Collection, type Db, type Document, type MongoClientOptions } from 'mongodb'
import { MongoClient, type Collection, type Db, type Document } from 'mongodb'
const connections = new Map<string, MongoClientReferenceImpl>()
@ -121,31 +121,20 @@ export class ClientRef implements MongoClientReference {
* Initialize a workspace connection to DB
* @public
*/
export function getMongoClient (uri: string, options?: MongoClientOptions): MongoClientReference {
export function getMongoClient (uri: string): MongoClientReference {
const extraOptions = JSON.parse(process.env.MONGO_OPTIONS ?? '{}')
const key = `${uri}${process.env.MONGO_OPTIONS ?? '{}'}_${JSON.stringify(options ?? {})}`
const key = `${uri}${process.env.MONGO_OPTIONS ?? '{}'}`
let existing = connections.get(key)
const allOptions: MongoClientOptions = {
...options,
...extraOptions
}
// Make poll size stable
if (allOptions.maxPoolSize !== undefined) {
allOptions.minPoolSize = allOptions.maxPoolSize
}
allOptions.monitorCommands = false
allOptions.noDelay = true
// If not created or closed
if (existing === undefined) {
existing = new MongoClientReferenceImpl(
MongoClient.connect(uri, {
retryReads: true,
appName: 'transactor',
enableUtf8Validation: false,
...allOptions
...extraOptions
}),
() => {
connections.delete(key)

View File

@ -18,6 +18,7 @@ import core, {
type AttachedDoc,
type Class,
type Doc,
type DocInfo,
type DocumentQuery,
type DocumentUpdate,
type Domain,
@ -941,12 +942,12 @@ abstract class PostgresAdapterBase implements DbAdapter {
)
}
const next = async (): Promise<Doc | null> => {
const result = await client.query(`FETCH 1 FROM ${cursorName}`)
const next = async (limit: number): Promise<Doc[]> => {
const result = await client.query(`FETCH ${limit} FROM ${cursorName}`)
if (result.rows.length === 0) {
return null
return []
}
return result.rows[0] !== undefined ? parseDoc(result.rows[0]) : null
return result.rows.filter((it) => it != null).map((it) => parseDoc(it))
}
const flush = async (flush = false): Promise<void> => {
@ -975,16 +976,18 @@ abstract class PostgresAdapterBase implements DbAdapter {
await init('_id, data', "data ->> '%hash%' IS NOT NULL AND data ->> '%hash%' <> ''")
initialized = true
}
let d = await ctx.with('next', { mode }, async () => await next())
if (d == null && mode === 'hashed') {
let docs = await ctx.with('next', { mode }, async () => await next(50))
if (docs.length === 0 && mode === 'hashed') {
await close(cursorName)
mode = 'non_hashed'
await init('*', "data ->> '%hash%' IS NULL OR data ->> '%hash%' = ''")
d = await ctx.with('next', { mode }, async () => await next())
docs = await ctx.with('next', { mode }, async () => await next(50))
}
if (d == null) {
return undefined
if (docs.length === 0) {
return []
}
const result: DocInfo[] = []
for (const d of docs) {
let digest: string | null = (d as any)['%hash%']
if ('%hash%' in d) {
delete d['%hash%']
@ -1004,18 +1007,20 @@ abstract class PostgresAdapterBase implements DbAdapter {
await ctx.with('flush', {}, async () => {
await flush()
})
return {
result.push({
id: d._id,
hash: digest,
size
}
})
} else {
return {
result.push({
id: d._id,
hash: digest.slice(0, pos),
size: parseInt(digest.slice(pos + 1), 16)
})
}
}
return result
},
close: async () => {
await ctx.with('flush', {}, async () => {

View File

@ -239,9 +239,9 @@ export class S3Service implements StorageAdapter {
const rootPrefix = this.rootPrefix(workspaceId)
return {
next: async (): Promise<ListBlobResult | undefined> => {
next: async (): Promise<ListBlobResult[]> => {
try {
if (hasMore && buffer.length === 0) {
while (hasMore && buffer.length < 50) {
const res = await this.client.listObjectsV2({
Bucket: this.getBucketId(workspaceId),
Prefix: rootPrefix ?? '',
@ -271,12 +271,7 @@ export class S3Service implements StorageAdapter {
} catch (err: any) {
ctx.error('Failed to get list', { error: err, workspaceId: workspaceId.name })
}
if (buffer.length > 0) {
return buffer.shift()
}
if (!hasMore) {
return undefined
}
return buffer.splice(0, 50)
},
close: async () => {}
}

View File

@ -170,9 +170,9 @@ export async function createStorageDataAdapter (
calculateHash: (d) => {
const blob = d as Blob
if (storageEx?.adapters !== undefined && storageEx.adapters.get(blob.provider) === undefined) {
return blob.etag + '_' + storageEx.defaultAdapter // Replace tag to be able to move to new provider
return { digest: blob.etag + '_' + storageEx.defaultAdapter, size: blob.size }
}
return blob.etag
return { digest: blob.etag, size: blob.size }
}
})
return new StorageBlobAdapter(workspaceId, storage, ctx, blobAdapter)