UBERF-8595: Fix backup/restore performance (#7188)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
Andrey Sobolev committed via GitHub on 2024-11-22 13:23:50 +07:00
parent 1e2b3645b0
commit e63bbd563b
30 changed files with 498 additions and 370 deletions

.vscode/launch.json (vendored, 33 lines changed)
View File

@ -35,9 +35,11 @@
"args": ["src/__start.ts"],
"env": {
"FULLTEXT_URL": "http://localhost:4700",
"MONGO_URL": "mongodb://localhost:27017",
"DB_URL": "mongodb://localhost:27017",
// "MONGO_URL": "mongodb://localhost:27017",
// "DB_URL": "mongodb://localhost:27017",
// "DB_URL": "postgresql://postgres:example@localhost:5432",
"DB_URL": "postgresql://root@host.docker.internal:26257/defaultdb?sslmode=disable",
"SERVER_PORT": "3332",
"APM_SERVER_URL2": "http://localhost:8200",
"METRICS_CONSOLE": "false",
"METRICS_FILE": "${workspaceRoot}/metrics.txt", // Show metrics in console evert 30 seconds.,
@ -253,7 +255,7 @@
"name": "Debug backup tool",
"type": "node",
"request": "launch",
"args": ["src/__start.ts", "backup", "../temp/backup-test", "platform", "--force"],
"args": ["src/__start.ts", "backup-restore", "../../../hardware/dump/alex-staff-agency", "w-haiodo-alex-staff-c-673ee7ab-87df5406ea-2b8b4d", "--skip", "blob"],
"env": {
"MINIO_ACCESS_KEY": "minioadmin",
"MINIO_SECRET_KEY": "minioadmin",
@ -295,6 +297,31 @@
"outputCapture": "std",
"cwd": "${workspaceRoot}/dev/tool"
},
{
"name": "Debug tool upgrade PG(tests)",
"type": "node",
"request": "launch",
"args": ["src/__start.ts", "upgrade", "--force"],
"env": {
"SERVER_SECRET": "secret",
"MINIO_ACCESS_KEY": "minioadmin",
"MINIO_SECRET_KEY": "minioadmin",
"MINIO_ENDPOINT": "localhost:9002",
"TRANSACTOR_URL": "ws://localhost:3334",
"ACCOUNT_DB_URL": "postgresql://postgres:example@localhost:5433",
"DB_URL": "postgresql://postgres:example@localhost:5433",
"MONGO_URL": "mongodb://localhost:27018",
"ACCOUNTS_URL": "http://localhost:3003",
"TELEGRAM_DATABASE": "telegram-service",
"ELASTIC_URL": "http://localhost:9201",
"REKONI_URL": "http://localhost:4004",
"MODEL_VERSION": "0.6.287"
},
"runtimeArgs": ["--nolazy", "-r", "ts-node/register"],
"sourceMaps": true,
"outputCapture": "std",
"cwd": "${workspaceRoot}/dev/tool"
},
{
"name": "Debug tool move",
"type": "node",

View File

@ -1075,7 +1075,7 @@ dependencies:
version: file:projects/time-resources.tgz(@tiptap/core@2.6.6)(@tiptap/pm@2.6.6)(@types/node@20.11.19)(esbuild@0.20.1)(postcss-load-config@4.0.2)(postcss@8.4.35)(ts-node@10.9.2)
'@rush-temp/tool':
specifier: file:./projects/tool.tgz
version: file:projects/tool.tgz(bufferutil@4.0.8)(utf-8-validate@6.0.4)
version: file:projects/tool.tgz
'@rush-temp/tracker':
specifier: file:./projects/tracker.tgz
version: file:projects/tracker.tgz(@types/node@20.11.19)(esbuild@0.20.1)(ts-node@10.9.2)
@ -1766,9 +1766,6 @@ dependencies:
pdfjs-dist:
specifier: 2.12.313
version: 2.12.313
pg:
specifier: 8.12.0
version: 8.12.0
png-chunks-extract:
specifier: ^1.0.0
version: 1.0.0
@ -1782,8 +1779,8 @@ dependencies:
specifier: ^7.0.2
version: 7.3.4(postcss@8.4.35)(typescript@5.3.3)(webpack@5.90.3)
postgres:
specifier: ^3.4.4
version: 3.4.4
specifier: ^3.4.5
version: 3.4.5
posthog-js:
specifier: ~1.122.0
version: 1.122.0
@ -16887,8 +16884,8 @@ packages:
resolution: {integrity: sha512-i/hbxIE9803Alj/6ytL7UHQxRvZkI9O4Sy+J3HGc4F4oo/2eQAjTSNJ0bfxyse3bH0nuVesCk+3IRLaMtG3H6w==}
dev: false
/postgres@3.4.4:
resolution: {integrity: sha512-IbyN+9KslkqcXa8AO9fxpk97PA4pzewvpi2B3Dwy9u4zpV32QicaEdgmF3eSQUzdRk7ttDHQejNgAEr4XoeH4A==}
/postgres@3.4.5:
resolution: {integrity: sha512-cDWgoah1Gez9rN3H4165peY9qfpEo+SA61oQv65O3cRUE1pOEoJWwddwcqKE8XZYjbblOJlYDlLV4h67HrEVDg==}
engines: {node: '>=12'}
dev: false
@ -21012,7 +21009,7 @@ packages:
dev: false
file:projects/account.tgz(@types/node@20.11.19)(esbuild@0.20.1)(ts-node@10.9.2):
resolution: {integrity: sha512-7ojvPVqMSMXTMLyJeAIRk/7DFH31sUkEO0taWQx2aMHp7D5pfaIhFwWUFEwIAiszGSxb45SHxQNJqeRZ/+IDdw==, tarball: file:projects/account.tgz}
resolution: {integrity: sha512-R9WSNuYeIemHJ7eAv7sCyf8IZHZott9CY1b/KJLAknaJfozXLQ6yOYszJjS/iWQRPl4TFFLgYsQaLWd1gDiqww==, tarball: file:projects/account.tgz}
id: file:projects/account.tgz
name: '@rush-temp/account'
version: 0.0.0
@ -21029,7 +21026,7 @@ packages:
jest: 29.7.0(@types/node@20.11.19)(ts-node@10.9.2)
mongodb: 6.10.0
otp-generator: 4.0.1
postgres: 3.4.4
postgres: 3.4.5
prettier: 3.2.5
ts-jest: 29.1.2(esbuild@0.20.1)(jest@29.7.0)(typescript@5.3.3)
typescript: 5.3.3
@ -22174,7 +22171,7 @@ packages:
dev: false
file:projects/cloud-datalake.tgz(@types/node@20.11.19)(bufferutil@4.0.8)(esbuild@0.20.1)(ts-node@10.9.2)(utf-8-validate@6.0.4):
resolution: {integrity: sha512-KdKIEaVTjeWtCEUYfkvyrIYxglGcfM8iICOnTxRzPiVmOIP3kswu7PaUKeH/xeFZQcvRBvhtNI7XT0dBr5mODA==, tarball: file:projects/cloud-datalake.tgz}
resolution: {integrity: sha512-4xq66fA2IsBhSePKZhWWfD1oCMF7eavO8dkefUDupGqhNUr/gv+4zPrsvoJZ2x5ZozqNvBHzbDQQeIuxg1uxNQ==, tarball: file:projects/cloud-datalake.tgz}
id: file:projects/cloud-datalake.tgz
name: '@rush-temp/cloud-datalake'
version: 0.0.0
@ -22191,7 +22188,7 @@ packages:
eslint-plugin-promise: 6.1.1(eslint@8.56.0)
itty-router: 5.0.18
jest: 29.7.0(@types/node@20.11.19)(ts-node@10.9.2)
postgres: 3.4.4
postgres: 3.4.5
prettier: 3.2.5
ts-jest: 29.1.2(esbuild@0.20.1)(jest@29.7.0)(typescript@5.6.2)
typescript: 5.6.2
@ -27301,7 +27298,7 @@ packages:
dev: false
file:projects/postgres.tgz(esbuild@0.20.1)(ts-node@10.9.2):
resolution: {integrity: sha512-/6jhoJPjD7X4j5u87Epy0kCDI5gedKzlzCM/anvCuB57Igtpi6Ji1cX6LrzyJ33gbswI8tlygVs4miiYWToa5g==, tarball: file:projects/postgres.tgz}
resolution: {integrity: sha512-L+wKYPpR4mHAuZkVGw2tuKwfngukoXRgeSCMf5TOn5bIFkwqV+HUpi0O4iDCIizyFvGftWdPQARYFRvRa0qlXg==, tarball: file:projects/postgres.tgz}
id: file:projects/postgres.tgz
name: '@rush-temp/postgres'
version: 0.0.0
@ -27318,7 +27315,7 @@ packages:
eslint-plugin-promise: 6.1.1(eslint@8.56.0)
jest: 29.7.0(@types/node@20.11.19)(ts-node@10.9.2)
pg: 8.12.0
postgres: 3.4.4
postgres: 3.4.5
prettier: 3.2.5
ts-jest: 29.1.2(esbuild@0.20.1)(jest@29.7.0)(typescript@5.3.3)
typescript: 5.3.3
@ -27329,7 +27326,6 @@ packages:
- babel-plugin-macros
- esbuild
- node-notifier
- pg-native
- supports-color
- ts-node
dev: false
@ -31936,9 +31932,8 @@ packages:
- ts-node
dev: false
file:projects/tool.tgz(bufferutil@4.0.8)(utf-8-validate@6.0.4):
resolution: {integrity: sha512-jFXGlkB1O4yIpKHye9XX55H4xZNemZfh3KaoCr5CKcaNIE5qZV8PGRihn2SuMNt5Y10PP2cUMXhJcBEO8tfNPQ==, tarball: file:projects/tool.tgz}
id: file:projects/tool.tgz
file:projects/tool.tgz:
resolution: {integrity: sha512-J1h1FNTEQWAGhkKNN3srbnXslsYEAoTnynbgY+6H1XzfABHP8gP0+g6aySbSfMn2HDM8TKfyPh/o9DGHs3tEfw==, tarball: file:projects/tool.tgz}
name: '@rush-temp/tool'
version: 0.0.0
dependencies:
@ -31947,11 +31942,11 @@ packages:
'@types/mime-types': 2.1.4
'@types/minio': 7.0.18
'@types/node': 20.11.19
'@types/pg': 8.11.6
'@types/request': 2.48.12
'@types/ws': 8.5.11
'@typescript-eslint/eslint-plugin': 6.21.0(@typescript-eslint/parser@6.21.0)(eslint@8.56.0)(typescript@5.3.3)
'@typescript-eslint/parser': 6.21.0(eslint@8.56.0)(typescript@5.3.3)
bufferutil: 4.0.8
commander: 8.3.0
cross-env: 7.0.3
csv-parse: 5.1.0
@ -31968,12 +31963,14 @@ packages:
libphonenumber-js: 1.10.56
mime-types: 2.1.35
mongodb: 6.10.0
pg: 8.12.0
postgres: 3.4.4
msgpackr: 1.11.0
msgpackr-extract: 3.0.3
postgres: 3.4.5
prettier: 3.2.5
ts-jest: 29.1.2(esbuild@0.20.1)(jest@29.7.0)(typescript@5.3.3)
ts-node: 10.9.2(@types/node@20.11.19)(typescript@5.3.3)
typescript: 5.3.3
utf-8-validate: 6.0.4
ws: 8.18.0(bufferutil@4.0.8)(utf-8-validate@6.0.4)
transitivePeerDependencies:
- '@aws-sdk/credential-providers'
@ -31984,7 +31981,6 @@ packages:
- '@swc/wasm'
- babel-jest
- babel-plugin-macros
- bufferutil
- gcp-metadata
- kerberos
- mongodb-client-encryption
@ -31992,7 +31988,6 @@ packages:
- snappy
- socks
- supports-color
- utf-8-validate
dev: false
file:projects/tracker-assets.tgz(esbuild@0.20.1)(ts-node@10.9.2):

View File

@ -2,6 +2,8 @@ FROM node:20
WORKDIR /usr/src/app
RUN npm install --ignore-scripts=false --verbose bufferutil utf-8-validate @mongodb-js/zstd snappy msgpackr msgpackr-extract --unsafe-perm
COPY bundle/bundle.js ./
CMD [ "bash" ]

View File

@ -158,7 +158,11 @@
"libphonenumber-js": "^1.9.46",
"mime-types": "~2.1.34",
"mongodb": "^6.10.0",
"postgres": "^3.4.4",
"ws": "^8.18.0"
"postgres": "^3.4.5",
"ws": "^8.18.0",
"bufferutil": "^4.0.8",
"utf-8-validate": "^6.0.4",
"msgpackr": "^1.11.0",
"msgpackr-extract": "^3.0.3"
}
}

View File

@ -127,6 +127,7 @@ import { fixJsonMarkup, migrateMarkup } from './markup'
import { fixMixinForeignAttributes, showMixinForeignAttributes } from './mixin'
import { fixAccountEmails, renameAccount } from './renameAccount'
import { moveFiles, showLostFiles } from './storage'
import { getModelVersion } from '@hcengineering/model-all'
const colorConstants = {
colorRed: '\u001b[31m',
@ -217,7 +218,7 @@ export function devTool (
program.command('version').action(() => {
console.log(
`tools git_version: ${process.env.GIT_REVISION ?? ''} model_version: ${process.env.MODEL_VERSION ?? ''}`
`tools git_version: ${process.env.GIT_REVISION ?? ''} model_version: ${process.env.MODEL_VERSION ?? ''} ${JSON.stringify(getModelVersion())}`
)
})
@ -363,34 +364,42 @@ export function devTool (
.requiredOption('-w, --workspaceName <workspaceName>', 'Workspace name')
.option('-e, --email <email>', 'Author email', 'platform@email.com')
.option('-i, --init <ws>', 'Init from workspace')
.option('-r, --region <region>', 'Region')
.option('-b, --branding <key>', 'Branding key')
.action(async (workspace, cmd: { email: string, workspaceName: string, init?: string, branding?: string }) => {
const { dbUrl, txes, version, migrateOperations } = prepareTools()
await withDatabase(dbUrl, async (db) => {
const measureCtx = new MeasureMetricsContext('create-workspace', {})
const brandingObj =
cmd.branding !== undefined || cmd.init !== undefined ? { key: cmd.branding, initWorkspace: cmd.init } : null
const wsInfo = await createWorkspaceRecord(measureCtx, db, brandingObj, cmd.email, cmd.workspaceName, workspace)
.action(
async (
workspace,
cmd: { email: string, workspaceName: string, init?: string, branding?: string, region?: string }
) => {
const { dbUrl, txes, version, migrateOperations } = prepareTools()
await withDatabase(dbUrl, async (db) => {
const measureCtx = new MeasureMetricsContext('create-workspace', {})
const brandingObj =
cmd.branding !== undefined || cmd.init !== undefined ? { key: cmd.branding, initWorkspace: cmd.init } : null
const wsInfo = await createWorkspaceRecord(
measureCtx,
db,
brandingObj,
cmd.email,
cmd.workspaceName,
workspace,
cmd.region,
'manual-creation'
)
// update the record so it's not taken by one of the workers for the next 60 seconds
await updateWorkspace(db, wsInfo, {
mode: 'creating',
progress: 0,
lastProcessingTime: Date.now() + 1000 * 60
await createWorkspace(measureCtx, version, brandingObj, wsInfo, txes, migrateOperations, undefined, true)
await updateWorkspace(db, wsInfo, {
mode: 'active',
progress: 100,
disabled: false,
version
})
console.log('create-workspace done')
})
await createWorkspace(measureCtx, version, brandingObj, wsInfo, txes, migrateOperations, undefined, true)
await updateWorkspace(db, wsInfo, {
mode: 'active',
progress: 100,
disabled: false,
version
})
console.log('create-workspace done')
})
})
}
)
program
.command('set-user-role <email> <workspace> <role>')

View File

@ -114,7 +114,7 @@ interface ConfigurablePlugin extends Omit<Data<PluginConfiguration>, 'pluginId'
type BuilderConfig = [(b: Builder) => void, Plugin] | [(b: Builder) => void, Plugin, ConfigurablePlugin | undefined]
export function getModelVersion (): Data<Version> {
const rawVersion = (process.env.MODEL_VERSION ?? '0.6.0').trim().replace('v', '').split('.')
const rawVersion = (process.env.MODEL_VERSION ?? '0.6.0').replace('"', '').trim().replace('v', '').split('.')
if (rawVersion.length === 3) {
return {
major: parseInt(rawVersion[0]),

View File

@ -648,6 +648,7 @@ export interface DomainIndexConfiguration extends Doc {
}
export type WorkspaceMode =
| 'manual-creation'
| 'pending-creation'
| 'creating'
| 'upgrading'

View File

@ -0,0 +1,85 @@
import core, {
DOMAIN_TX,
generateId,
MeasureMetricsContext,
type Doc,
type LowLevelStorage,
type Ref,
type TxCreateDoc
} from '@hcengineering/core'
import builder from '@hcengineering/model-all'
import { wrapPipeline } from '@hcengineering/server-core'
import { getServerPipeline } from '@hcengineering/server-pipeline'
const model = builder().getTxes()
// const dbURL = 'postgresql://root@localhost:26257/defaultdb?sslmode=disable'
const dbURL = 'postgresql://postgres:example@localhost:5432'
const STORAGE_CONFIG = 'minio|localhost:9000?accessKey=minioadmin&secretKey=minioadmin&useSSL=false'
describe.skip('test-backup-find', () => {
it('check create/load/clean', async () => {
const toolCtx = new MeasureMetricsContext('-', {})
// We should set up a DB with documents and try to back them up.
const wsUrl = { name: 'testdb-backup-test', workspaceName: 'test', workspaceUrl: 'test' }
const { pipeline, storageAdapter } = await getServerPipeline(toolCtx, model, dbURL, wsUrl, {
storageConfig: STORAGE_CONFIG,
disableTriggers: true
})
try {
const client = wrapPipeline(toolCtx, pipeline, wsUrl)
const lowLevel = pipeline.context.lowLevelStorage as LowLevelStorage
// We need to create backup docs if they are missing.
await prepareTxes(lowLevel, toolCtx)
const docs: Doc[] = []
while (true) {
const chunk = await client.loadChunk(DOMAIN_TX, 0, true)
const part = await client.loadDocs(
DOMAIN_TX,
chunk.docs.map((doc) => doc.id as Ref<Doc>)
)
docs.push(...part)
if (chunk.finished) {
break
}
}
await client.closeChunk(0)
expect(docs.length).toBeGreaterThan(459)
await client.clean(
DOMAIN_TX,
docs.map((doc) => doc._id)
)
const findDocs = await client.findAll(core.class.Tx, {})
expect(findDocs.length).toBe(0)
//
} finally {
await pipeline.close()
await storageAdapter.close()
}
})
})
async function prepareTxes (lowLevel: LowLevelStorage, toolCtx: MeasureMetricsContext): Promise<void> {
const docs = await lowLevel.rawFindAll(DOMAIN_TX, {})
if ((docs?.length ?? 0) < 500) {
// We need to fill in some documents so they are present
const docs: TxCreateDoc<Doc>[] = []
for (let i = 0; i < 500; i++) {
docs.push({
_class: core.class.TxCreateDoc,
_id: generateId(),
space: core.space.Tx,
modifiedBy: core.account.ConfigUser,
modifiedOn: Date.now(),
attributes: {
qwe: generateId()
},
objectClass: core.class.DocIndexState,
objectId: generateId(),
objectSpace: core.space.Workspace
})
}
await lowLevel.upload(toolCtx, DOMAIN_TX, docs)
}
}

View File

@ -41,7 +41,7 @@
"@hcengineering/mongo": "^0.6.1",
"@hcengineering/postgres": "^0.6.0",
"mongodb": "^6.10.0",
"postgres": "^3.4.4",
"postgres": "^3.4.5",
"@hcengineering/platform": "^0.6.11",
"@hcengineering/core": "^0.6.32",
"@hcengineering/contact": "^0.6.24",

View File

@ -267,6 +267,7 @@ export class WorkspaceMongoDbCollection extends MongoDbCollection<Workspace> imp
// at the time of retrieval and not after some additional processing.
const query: Filter<Workspace> = {
$and: [
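// Manually created workspaces are driven by the tool directly and must not be picked up by background workers.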
{ mode: { $ne: 'manual-creation' } },
operationQuery,
attemptsQuery,
region !== '' ? { region } : defaultRegionQuery,

View File

@ -398,6 +398,7 @@ export class WorkspacePostgresDbCollection extends PostgresDbCollection<Workspac
// TODO: support returning pending deletion workspaces when we will actually want
// to clear them with the worker.
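// Likewise, skip manually created workspaces; the tool drives those directly.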
whereChunks.push("mode <> 'manual-creation'")
whereChunks.push('(attempts IS NULL OR attempts <= 3)')
whereChunks.push(`("lastProcessingTime" IS NULL OR "lastProcessingTime" < $${values.length + 1})`)
values.push(Date.now() - processingTimeoutMs)

View File

@ -42,7 +42,8 @@ import core, {
versionToString,
WorkspaceId,
type BackupStatus,
type Branding
type Branding,
type WorkspaceMode
} from '@hcengineering/core'
import platform, { getMetadata, PlatformError, Severity, Status, translate } from '@hcengineering/platform'
import { type StorageAdapter } from '@hcengineering/server-core'
@ -940,7 +941,8 @@ async function generateWorkspaceRecord (
branding: Branding | null,
workspaceName: string,
fixedWorkspace?: string,
region?: string
region?: string,
initMode: WorkspaceMode = 'pending-creation'
): Promise<Workspace> {
type WorkspaceData = Omit<Workspace, '_id' | 'endpoint'>
const brandingKey = branding?.key ?? 'huly'
@ -972,7 +974,7 @@ async function generateWorkspaceRecord (
accounts: [],
disabled: true,
region: region ?? '',
mode: 'pending-creation',
mode: initMode,
progress: 0,
createdOn: Date.now(),
lastVisit: Date.now(),
@ -1010,7 +1012,7 @@ async function generateWorkspaceRecord (
accounts: [],
disabled: true,
region: region ?? '',
mode: 'pending-creation',
mode: initMode,
progress: 0,
createdOn: Date.now(),
lastVisit: Date.now(),
@ -1051,12 +1053,13 @@ export async function createWorkspace (
email: string,
workspaceName: string,
workspace?: string,
region?: string
region?: string,
initMode: WorkspaceMode = 'pending-creation'
): Promise<Workspace> {
// We need to search for duplicate workspaceUrl
// Safely generate the workspace record.
return await createQueue.exec(async () => {
return await generateWorkspaceRecord(db, email, branding, workspaceName, workspace, region)
return await generateWorkspaceRecord(db, email, branding, workspaceName, workspace, region, initMode)
})
}
@ -1122,6 +1125,7 @@ export async function updateWorkspaceInfo (
if (workspaceInfo === null) {
throw new PlatformError(new Status(Severity.ERROR, platform.status.WorkspaceNotFound, { workspace: workspaceId }))
}
progress = Math.round(progress)
const update: Partial<WorkspaceInfo> = {}
switch (event) {

View File

@ -23,6 +23,7 @@ import core, {
DOMAIN_BLOB,
DOMAIN_DOC_INDEX_STATE,
DOMAIN_MODEL,
DOMAIN_MODEL_TX,
DOMAIN_TRANSIENT,
DOMAIN_TX,
MeasureContext,
@ -789,6 +790,7 @@ export async function backup (
const blobClient = new BlobClient(transactorUrl, token, workspaceId, { storageAdapter: options.storageAdapter })
const domains = [
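// Always include the model transactions domain so the workspace model is captured in the backup.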
DOMAIN_MODEL_TX,
...connection
.getHierarchy()
.domains()
@ -1751,6 +1753,22 @@ export async function restore (
sendSize,
workspace: workspaceId.name
})
// Correct docs without space
for (const d of docs) {
if (d._class === core.class.DocIndexState) {
// We need to clean old stuff from restored document.
if ('stages' in d) {
delete (d as any).stages
delete (d as any).attributes
;(d as any).needIndex = true
;(d as any)['%hash%'] = ''
}
}
if (d.space == null) {
d.space = core.space.Workspace
;(d as any)['%hash%'] = ''
}
}
await connection.upload(c, docs)
docs.length = 0
sendSize = 0
@ -1787,7 +1805,7 @@ export async function restore (
chunks.push(chunk)
})
stream.on('end', () => {
const bf = Buffer.concat(chunks)
const bf = Buffer.concat(chunks as any)
const d = blobs.get(name)
if (d === undefined) {
blobs.set(name, { doc: undefined, buffer: bf })

View File

@ -22,19 +22,15 @@ import core, {
ModelDb,
SortingOrder,
systemAccountEmail,
type BackupClient,
type BackupStatus,
type Branding,
type Client,
type MeasureContext,
type Tx,
type WorkspaceIdWithUrl
} from '@hcengineering/core'
import { PlatformError, unknownError } from '@hcengineering/platform'
import { listAccountWorkspaces, updateBackupInfo } from '@hcengineering/server-client'
import {
BackupClientOps,
SessionDataImpl,
wrapPipeline,
type DbConfiguration,
type Pipeline,
type PipelineFactory,
@ -227,7 +223,7 @@ class BackupWorker {
if (pipeline === undefined) {
pipeline = await this.pipelineFactory(ctx, wsUrl, true, () => {}, null)
}
return this.wrapPipeline(ctx, pipeline, wsUrl)
return wrapPipeline(ctx, pipeline, wsUrl)
}
})
)
@ -266,68 +262,6 @@ class BackupWorker {
}
return { failedWorkspaces, processed, skipped: workspaces.length - processed }
}
wrapPipeline (ctx: MeasureContext, pipeline: Pipeline, wsUrl: WorkspaceIdWithUrl): Client & BackupClient {
const contextData = new SessionDataImpl(
systemAccountEmail,
'backup',
true,
{ targets: {}, txes: [] },
wsUrl,
null,
false,
new Map(),
new Map(),
pipeline.context.modelDb
)
ctx.contextData = contextData
if (pipeline.context.lowLevelStorage === undefined) {
throw new PlatformError(unknownError('Low level storage is not available'))
}
const backupOps = new BackupClientOps(pipeline.context.lowLevelStorage)
return {
findAll: async (_class, query, options) => {
return await pipeline.findAll(ctx, _class, query, options)
},
findOne: async (_class, query, options) => {
return (await pipeline.findAll(ctx, _class, query, { ...options, limit: 1 })).shift()
},
clean: async (domain, docs) => {
await backupOps.clean(ctx, domain, docs)
},
close: async () => {},
closeChunk: async (idx) => {
await backupOps.closeChunk(ctx, idx)
},
getHierarchy: () => {
return pipeline.context.hierarchy
},
getModel: () => {
return pipeline.context.modelDb
},
loadChunk: async (domain, idx, recheck) => {
return await backupOps.loadChunk(ctx, domain, idx, recheck)
},
loadDocs: async (domain, docs) => {
return await backupOps.loadDocs(ctx, domain, docs)
},
upload: async (domain, docs) => {
await backupOps.upload(ctx, domain, docs)
},
searchFulltext: async (query, options) => {
return {
docs: [],
total: 0
}
},
sendForceClose: async () => {},
tx: async (tx) => {
return {}
},
notify: (...tx) => {}
}
}
}
export function backupService (

View File

@ -5,20 +5,26 @@ import core, {
getTypeOf,
systemAccountEmail,
type Account,
type BackupClient,
type Branding,
type BrandingMap,
type BulkUpdateEvent,
type Class,
type Client,
type Doc,
type DocInfo,
type MeasureContext,
type ModelDb,
type Ref,
type SessionData,
type TxWorkspaceEvent,
type WorkspaceIdWithUrl
} from '@hcengineering/core'
import platform, { PlatformError, Severity, Status } from '@hcengineering/platform'
import { type Hash } from 'crypto'
import platform, { PlatformError, Severity, Status, unknownError } from '@hcengineering/platform'
import { createHash, type Hash } from 'crypto'
import fs from 'fs'
import { BackupClientOps } from './storage'
import type { Pipeline } from './types'
/**
* Return some estimation for object size
@ -213,3 +219,77 @@ export function loadBrandingMap (brandingPath?: string): BrandingMap {
return brandings
}
export function toDocInfo (d: Doc, bulkUpdate: Map<Ref<Doc>, string>, recheck?: boolean): DocInfo {
let digest: string | null = (d as any)['%hash%']
if ('%hash%' in d) {
delete d['%hash%']
}
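// The stored '%hash%' value has the form '<sha256 base64>|<size hex>'; split it to recover hash and size.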
const pos = (digest ?? '').indexOf('|')
const oldDigest = digest
if (digest == null || digest === '' || recheck === true) {
const size = estimateDocSize(d)
const hash = createHash('sha256')
updateHashForDoc(hash, d)
digest = hash.digest('base64')
const newDigest = `${digest}|${size.toString(16)}`
if (recheck !== true || oldDigest !== newDigest) {
bulkUpdate.set(d._id, `${digest}|${size.toString(16)}`)
}
return {
id: d._id,
hash: digest,
size
}
} else {
return {
id: d._id,
hash: pos >= 0 ? digest.slice(0, pos) : digest,
size: pos >= 0 ? parseInt(digest.slice(pos + 1), 16) : 0
}
}
}
export function wrapPipeline (
ctx: MeasureContext,
pipeline: Pipeline,
wsUrl: WorkspaceIdWithUrl
): Client & BackupClient {
const contextData = new SessionDataImpl(
systemAccountEmail,
'pipeline',
true,
{ targets: {}, txes: [] },
wsUrl,
null,
true,
new Map(),
new Map(),
pipeline.context.modelDb
)
ctx.contextData = contextData
if (pipeline.context.lowLevelStorage === undefined) {
throw new PlatformError(unknownError('Low level storage is not available'))
}
const backupOps = new BackupClientOps(pipeline.context.lowLevelStorage)
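// Expose a minimal Client & BackupClient facade over the pipeline and its low-level storage.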
return {
findAll: (_class, query, options) => pipeline.findAll(ctx, _class, query, options),
findOne: async (_class, query, options) =>
(await pipeline.findAll(ctx, _class, query, { ...options, limit: 1 })).shift(),
clean: (domain, docs) => backupOps.clean(ctx, domain, docs),
close: () => pipeline.close(),
closeChunk: (idx) => backupOps.closeChunk(ctx, idx),
getHierarchy: () => pipeline.context.hierarchy,
getModel: () => pipeline.context.modelDb,
loadChunk: (domain, idx, recheck) => backupOps.loadChunk(ctx, domain, idx, recheck),
loadDocs: (domain, docs) => backupOps.loadDocs(ctx, domain, docs),
upload: (domain, docs) => backupOps.upload(ctx, domain, docs),
searchFulltext: async (query, options) => ({ docs: [], total: 0 }),
sendForceClose: async () => {},
tx: (tx) => pipeline.tx(ctx, [tx]),
notify: (...tx) => {}
}
}

View File

@ -63,7 +63,7 @@ import core, {
} from '@hcengineering/core'
import {
estimateDocSize,
updateHashForDoc,
toDocInfo,
type DbAdapter,
type DbAdapterHandler,
type DomainHelperOperations,
@ -71,8 +71,6 @@ import {
type StorageAdapter,
type TxAdapter
} from '@hcengineering/server-core'
import { calculateObjectSize } from 'bson'
import { createHash } from 'crypto'
import {
type AbstractCursor,
type AnyBulkWriteOperation,
@ -1062,15 +1060,16 @@ abstract class MongoAdapterBase implements DbAdapter {
if (d == null && mode === 'hashed' && recheck !== true) {
mode = 'non-hashed'
await iterator.close()
await flush(true) // Flush so that documents with wrong ids get updated.
iterator = coll.find({ '%hash%': { $in: ['', null] } })
d = await ctx.with('next', { mode }, () => iterator.next())
}
const result: DocInfo[] = []
if (d != null) {
result.push(this.toDocInfo(d, bulkUpdate, recheck))
result.push(toDocInfo(d, bulkUpdate, recheck))
}
if (iterator.bufferedCount() > 0) {
result.push(...iterator.readBufferedDocuments().map((it) => this.toDocInfo(it, bulkUpdate, recheck)))
result.push(...iterator.readBufferedDocuments().map((it) => toDocInfo(it, bulkUpdate, recheck)))
}
await ctx.with('flush', {}, () => flush())
return result
@ -1083,42 +1082,6 @@ abstract class MongoAdapterBase implements DbAdapter {
}
}
private toDocInfo (d: Doc, bulkUpdate: Map<Ref<Doc>, string>, recheck?: boolean): DocInfo {
let digest: string | null = (d as any)['%hash%']
if ('%hash%' in d) {
delete d['%hash%']
}
const pos = (digest ?? '').indexOf('|')
const oldDigest = digest
if (digest == null || digest === '' || recheck === true) {
let size = estimateDocSize(d)
if (this.options?.calculateHash !== undefined) {
;({ digest, size } = this.options.calculateHash(d))
} else {
const hash = createHash('sha256')
updateHashForDoc(hash, d)
digest = hash.digest('base64')
}
const newDigest = `${digest}|${size.toString(16)}`
if (recheck !== true || oldDigest !== newDigest) {
bulkUpdate.set(d._id, `${digest}|${size.toString(16)}`)
}
return {
id: d._id,
hash: digest,
size
}
} else {
return {
id: d._id,
hash: digest.slice(0, pos),
size: parseInt(digest.slice(pos + 1), 16)
}
}
}
load (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
return ctx.with('load', { domain }, async () => {
if (docs.length === 0) {
@ -1678,8 +1641,7 @@ export async function uploadDocuments (ctx: MeasureContext, docs: Doc[], coll: C
if ('%hash%' in it) {
delete it['%hash%']
}
const size = digest != null ? calculateObjectSize(it) : 0
const size = digest != null ? estimateDocSize(it) : 0
return {
replaceOne: {
filter: { _id: it._id },

View File

@ -34,8 +34,7 @@
"@types/node": "~20.11.16"
},
"dependencies": {
"pg": "8.12.0",
"postgres": "^3.4.4",
"postgres": "^3.4.5",
"@hcengineering/core": "^0.6.32",
"@hcengineering/platform": "^0.6.11",
"@hcengineering/server-core": "^0.6.1"

View File

@ -60,12 +60,11 @@ import {
type DomainHelperOperations,
estimateDocSize,
type ServerFindOptions,
type TxAdapter,
updateHashForDoc
toDocInfo,
type TxAdapter
} from '@hcengineering/server-core'
import { createHash } from 'crypto'
import type postgres from 'postgres'
import { getDocFieldsByDomains, translateDomain } from './schemas'
import { getDocFieldsByDomains, getSchema, translateDomain } from './schemas'
import { type ValueType } from './types'
import {
convertDoc,
@ -173,6 +172,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
query: DocumentQuery<T>,
options?: Pick<FindOptions<T>, 'sort' | 'limit' | 'projection'>
): Promise<Iterator<T>> {
const schema = getSchema(_domain)
const client = await this.client.reserve()
let closed = false
const cursorName = `cursor_${translateDomain(this.workspaceId.name)}_${translateDomain(_domain)}_${generateId()}`
@ -209,7 +209,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
await close(cursorName)
return null
}
return result.map((p) => parseDoc(p as any, _domain))
return result.map((p) => parseDoc(p as any, schema))
}
await init()
@ -306,12 +306,14 @@ abstract class PostgresAdapterBase implements DbAdapter {
const res = await client.unsafe(
`SELECT * FROM ${translateDomain(domain)} WHERE ${translatedQuery} FOR UPDATE`
)
const docs = res.map((p) => parseDoc(p as any, domain))
const schema = getSchema(domain)
const docs = res.map((p) => parseDoc(p as any, schema))
const domainFields = new Set(getDocFieldsByDomains(domain))
for (const doc of docs) {
if (doc === undefined) continue
const prevAttachedTo = (doc as any).attachedTo
TxProcessor.applyUpdate(doc, operations)
const converted = convertDoc(domain, doc, this.workspaceId.name)
const converted = convertDoc(domain, doc, this.workspaceId.name, domainFields)
const params: any[] = [doc._id, this.workspaceId.name]
let paramsIndex = params.length + 1
const updates: string[] = []
@ -516,6 +518,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
let joinIndex: number | undefined
let skip = false
try {
const schema = getSchema(domain)
for (const column in row) {
if (column.startsWith('reverse_lookup_')) {
if (row[column] != null) {
@ -527,7 +530,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
if (res === undefined) continue
const { obj, key } = res
const parsed = row[column].map((p: any) => parseDoc(p, domain))
const parsed = row[column].map((p: any) => parseDoc(p, schema))
obj[key] = parsed
}
} else if (column.startsWith('lookup_')) {
@ -1112,115 +1115,127 @@ abstract class PostgresAdapterBase implements DbAdapter {
find (_ctx: MeasureContext, domain: Domain, recheck?: boolean): StorageIterator {
const ctx = _ctx.newChild('find', { domain })
const getCursorName = (): string => {
return `cursor_${translateDomain(this.workspaceId.name)}_${translateDomain(domain)}_${mode}`
}
let initialized: boolean = false
let client: postgres.ReservedSql
let mode: 'hashed' | 'non_hashed' = 'hashed'
let cursorName = getCursorName()
const bulkUpdate = new Map<Ref<Doc>, string>()
const close = async (cursorName: string): Promise<void> => {
try {
await client.unsafe(`CLOSE ${cursorName}`)
await client.unsafe('COMMIT')
} catch (err) {
ctx.error('Error while closing cursor', { cursorName, err })
} finally {
client.release()
}
}
const init = async (projection: string, query: string): Promise<void> => {
cursorName = getCursorName()
client = await this.client.reserve()
await client.unsafe('BEGIN')
await client.unsafe(
`DECLARE ${cursorName} CURSOR FOR SELECT ${projection} FROM ${translateDomain(domain)} WHERE "workspaceId" = $1 AND ${query}`,
[this.workspaceId.name]
)
}
const next = async (limit: number): Promise<Doc[]> => {
const result = await client.unsafe(`FETCH ${limit} FROM ${cursorName}`)
if (result.length === 0) {
return []
}
return result.filter((it) => it != null).map((it) => parseDoc(it as any, domain))
}
const tdomain = translateDomain(domain)
const schema = getSchema(domain)
const flush = async (flush = false): Promise<void> => {
if (bulkUpdate.size > 1000 || flush) {
if (bulkUpdate.size > 0) {
await ctx.with('bulk-write-find', {}, () => {
const updates = new Map(Array.from(bulkUpdate.entries()).map((it) => [it[0], { '%hash%': it[1] }]))
return this.update(ctx, domain, updates)
})
const entries = Array.from(bulkUpdate.entries())
bulkUpdate.clear()
const updateClient = await this.client.reserve()
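// Persist hash updates in batches of 200, each batch as a single UPDATE ... FROM (VALUES ...) statement.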
try {
while (entries.length > 0) {
const part = entries.splice(0, 200)
const data: string[] = part.flat()
const indexes = part.map((val, idx) => `($${2 * idx + 1}::text, $${2 * idx + 2}::text)`).join(', ')
await ctx.with('bulk-write-find', {}, () => {
return this.retryTxn(updateClient, (client) =>
client.unsafe(
`
UPDATE ${tdomain} SET "%hash%" = update_data.hash
FROM (values ${indexes}) AS update_data(_id, hash)
WHERE ${tdomain}."workspaceId" = '${this.workspaceId.name}' AND ${tdomain}."_id" = update_data._id
`,
data
)
)
})
}
} catch (err: any) {
ctx.error('failed to update hash', { err })
} finally {
updateClient.release()
}
}
bulkUpdate.clear()
}
}
const workspaceId = this.workspaceId
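// Stream documents through a server-side cursor, `limit` rows per batch, parsing each row with the domain schema.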
async function * createBulk (projection: string, query: string, limit = 50): AsyncGenerator<Doc[]> {
const cursor = client
.unsafe(`SELECT ${projection} FROM ${tdomain} WHERE "workspaceId" = '${workspaceId.name}' AND ${query}`)
.cursor(limit)
try {
for await (const part of cursor) {
yield part.filter((it) => it != null).map((it) => parseDoc(it as any, schema))
}
} catch (err: any) {
ctx.error('failed to receive data', { err })
}
}
let bulk: AsyncGenerator<Doc[]>
let forcedRecheck = false
return {
next: async () => {
if (!initialized) {
if (recheck === true) {
await this.retryTxn(client, async (client) => {
await client`UPDATE ${client(translateDomain(domain))} SET '%hash%' = NULL WHERE "workspaceId" = ${this.workspaceId.name} AND '%hash%' IS NOT NULL`
})
if (client === undefined) {
client = await this.client.reserve()
}
await init('_id, data', "'%hash%' IS NOT NULL AND '%hash%' <> ''")
if (recheck === true) {
await this.retryTxn(
client,
(client) =>
client`UPDATE ${client(tdomain)} SET "%hash%" = NULL WHERE "workspaceId" = ${this.workspaceId.name} AND "%hash%" IS NOT NULL`
)
}
initialized = true
await flush(true) // Flush so that documents with wrong ids get updated.
bulk = createBulk('_id, "%hash%"', '"%hash%" IS NOT NULL AND "%hash%" <> \'\'')
// bulk = createBulk('_id, "%hash%, data', '"%hash%" IS NOT NULL AND "%hash%" <> \'\'')
}
let docs = await ctx.with('next', { mode }, () => next(50))
if (docs.length === 0 && mode === 'hashed') {
await close(cursorName)
let docs = await ctx.with('next', { mode }, () => bulk.next())
if (!forcedRecheck && docs.done !== true && docs.value?.length > 0) {
// Check if we have wrong hash stored, and update all of them.
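// A '%hash%' value without the '|<size>' suffix is considered stale; reset all hashes for this domain so they get recomputed.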
forcedRecheck = true
for (const d of docs.value) {
const digest: string | null = (d as any)['%hash%']
const pos = (digest ?? '').indexOf('|')
if (pos === -1) {
await bulk.return([]) // We need to close generator
docs = { done: true, value: undefined }
await this.retryTxn(
client,
(client) =>
client`UPDATE ${client(tdomain)} SET "%hash%" = NULL WHERE "workspaceId" = ${this.workspaceId.name} AND "%hash%" IS NOT NULL`
)
break
}
}
}
if ((docs.done === true || docs.value.length === 0) && mode === 'hashed') {
forcedRecheck = true
mode = 'non_hashed'
await init('*', "'%hash%' IS NULL OR '%hash%' = ''")
docs = await ctx.with('next', { mode }, () => next(50))
bulk = createBulk('*', '"%hash%" IS NULL OR "%hash%" = \'\'')
docs = await ctx.with('next', { mode }, () => bulk.next())
}
if (docs.length === 0) {
if (docs.done === true || docs.value.length === 0) {
return []
}
const result: DocInfo[] = []
for (const d of docs) {
let digest: string | null = (d as any)['%hash%']
if ('%hash%' in d) {
delete d['%hash%']
}
const pos = (digest ?? '').indexOf('|')
if (digest == null || digest === '') {
const cs = ctx.newChild('calc-size', {})
const size = estimateDocSize(d)
cs.end()
const hash = createHash('sha256')
updateHashForDoc(hash, d)
digest = hash.digest('base64')
bulkUpdate.set(d._id, `${digest}|${size.toString(16)}`)
await ctx.with('flush', {}, () => flush())
result.push({
id: d._id,
hash: digest,
size
})
} else {
result.push({
id: d._id,
hash: digest.slice(0, pos),
size: parseInt(digest.slice(pos + 1), 16)
})
}
for (const d of docs.value) {
result.push(toDocInfo(d, bulkUpdate))
}
await ctx.with('flush', {}, () => flush())
return result
},
close: async () => {
await ctx.with('flush', {}, () => flush(true))
await close(cursorName)
client?.release()
ctx.end()
}
}
@ -1231,16 +1246,16 @@ abstract class PostgresAdapterBase implements DbAdapter {
if (docs.length === 0) {
return []
}
const connection = (await this.getConnection(ctx)) ?? this.client
const res =
await connection`SELECT * FROM ${connection(translateDomain(domain))} WHERE _id = ANY(${docs}) AND "workspaceId" = ${this.workspaceId.name}`
return res.map((p) => parseDocWithProjection(p as any, domain))
return await this.withConnection(ctx, async (connection) => {
const res =
await connection`SELECT * FROM ${connection(translateDomain(domain))} WHERE _id = ANY(${docs}) AND "workspaceId" = ${this.workspaceId.name}`
return res.map((p) => parseDocWithProjection(p as any, domain))
})
})
}
upload (ctx: MeasureContext, domain: Domain, docs: Doc[]): Promise<void> {
return ctx.with('upload', { domain }, async (ctx) => {
const arr = docs.concat()
const fields = getDocFieldsByDomains(domain)
const filedsWithData = [...fields, 'data']
const insertFields: string[] = []
@ -1252,15 +1267,26 @@ abstract class PostgresAdapterBase implements DbAdapter {
const insertStr = insertFields.join(', ')
const onConflictStr = onConflict.join(', ')
await this.withConnection(ctx, async (connection) => {
while (arr.length > 0) {
const part = arr.splice(0, 500)
const domainFields = new Set(getDocFieldsByDomains(domain))
const toUpload = [...docs]
const tdomain = translateDomain(domain)
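// Upsert documents in batches of 200, keeping '%hash%' in the '<hash>|<size>' form used by the backup iterator.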
while (toUpload.length > 0) {
const part = toUpload.splice(0, 200)
const values: any[] = []
const vars: string[] = []
let index = 1
for (let i = 0; i < part.length; i++) {
const doc = part[i]
const variables: string[] = []
const d = convertDoc(domain, doc, this.workspaceId.name)
const digest: string | null = (doc as any)['%hash%']
if ('%hash%' in doc) {
delete doc['%hash%']
}
const size = digest != null ? estimateDocSize(doc) : 0
;(doc as any)['%hash%'] = digest == null ? null : `${digest}|${size.toString(16)}`
const d = convertDoc(domain, doc, this.workspaceId.name, domainFields)
values.push(d.workspaceId)
variables.push(`$${index++}`)
for (const field of fields) {
@ -1273,21 +1299,36 @@ abstract class PostgresAdapterBase implements DbAdapter {
}
const vals = vars.join(',')
await this.retryTxn(connection, async (client) => {
await client.unsafe(
`INSERT INTO ${translateDomain(domain)} ("workspaceId", ${insertStr}) VALUES ${vals}
await this.retryTxn(connection, (client) =>
client.unsafe(
`INSERT INTO ${tdomain} ("workspaceId", ${insertStr}) VALUES ${vals}
ON CONFLICT ("workspaceId", _id) DO UPDATE SET ${onConflictStr};`,
values
)
})
)
}
})
})
}
async clean (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<void> {
const connection = (await this.getConnection(ctx)) ?? this.client
await connection`DELETE FROM ${connection(translateDomain(domain))} WHERE _id = ANY(${docs}) AND "workspaceId" = ${this.workspaceId.name}`
const updateClient = await this.client.reserve()
try {
const tdomain = translateDomain(domain)
const toClean = [...docs]
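// Delete in chunks of 200 inside retried transactions to keep each statement small.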
while (toClean.length > 0) {
const part = toClean.splice(0, 200)
await ctx.with('clean', {}, () => {
return this.retryTxn(
updateClient,
(client) =>
client`DELETE FROM ${client(tdomain)} WHERE _id = ANY(${part}) AND "workspaceId" = ${this.workspaceId.name}`
)
})
}
} finally {
updateClient.release()
}
}
groupBy<T, P extends Doc>(
@ -1318,8 +1359,10 @@ abstract class PostgresAdapterBase implements DbAdapter {
try {
const res =
await client`SELECT * FROM ${client(translateDomain(domain))} WHERE _id = ANY(${ids}) AND "workspaceId" = ${this.workspaceId.name} FOR UPDATE`
const docs = res.map((p) => parseDoc(p as any, domain))
const schema = getSchema(domain)
const docs = res.map((p) => parseDoc(p as any, schema))
const map = new Map(docs.map((d) => [d._id, d]))
const domainFields = new Set(getDocFieldsByDomains(domain))
for (const [_id, ops] of operations) {
const doc = map.get(_id)
if (doc === undefined) continue
@ -1328,7 +1371,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
;(op as any)['%hash%'] = null
}
TxProcessor.applyUpdate(doc, op)
const converted = convertDoc(domain, doc, this.workspaceId.name)
const converted = convertDoc(domain, doc, this.workspaceId.name, domainFields)
const columns: string[] = []
const { extractedFields, remainingData } = parseUpdate(domain, op)
@ -1361,12 +1404,13 @@ abstract class PostgresAdapterBase implements DbAdapter {
columns.push(field)
}
await this.withConnection(ctx, async (connection) => {
const domainFields = new Set(getDocFieldsByDomains(domain))
while (docs.length > 0) {
const part = docs.splice(0, 500)
const values: DBDoc[] = []
for (let i = 0; i < part.length; i++) {
const doc = part[i]
const d = convertDoc(domain, doc, this.workspaceId.name)
const d = convertDoc(domain, doc, this.workspaceId.name, domainFields)
values.push(d)
}
await this.retryTxn(connection, async (client) => {
@ -1559,14 +1603,14 @@ class PostgresAdapter extends PostgresAdapterBase {
_id: Ref<Doc>,
forUpdate: boolean = false
): Promise<Doc | undefined> {
const domain = this.hierarchy.getDomain(_class)
return ctx.with('find-doc', { _class }, async () => {
const res =
await client`SELECT * FROM ${this.client(translateDomain(this.hierarchy.getDomain(_class)))} WHERE _id = ${_id} AND "workspaceId" = ${this.workspaceId.name} ${
await client`SELECT * FROM ${this.client(translateDomain(domain))} WHERE _id = ${_id} AND "workspaceId" = ${this.workspaceId.name} ${
forUpdate ? client` FOR UPDATE` : client``
}`
const dbDoc = res[0]
const domain = this.hierarchy.getDomain(_class)
return dbDoc !== undefined ? parseDoc(dbDoc as any, domain) : undefined
return dbDoc !== undefined ? parseDoc(dbDoc as any, getSchema(domain)) : undefined
})
}
@ -1622,7 +1666,7 @@ class PostgresTxAdapter extends PostgresAdapterBase implements TxAdapter {
const res = await this
.client`SELECT * FROM ${this.client(translateDomain(DOMAIN_MODEL_TX))} WHERE "workspaceId" = ${this.workspaceId.name} ORDER BY _id ASC, "modifiedOn" ASC`
const model = res.map((p) => parseDoc<Tx>(p as any, DOMAIN_MODEL_TX))
const model = res.map((p) => parseDoc<Tx>(p as any, getSchema(DOMAIN_MODEL_TX)))
// We need to put all core.account.System transactions first
const systemTx: Tx[] = []
const userTx: Tx[] = []

View File

@ -195,7 +195,6 @@ class PostgresClientReferenceImpl {
this.onclose()
const cl = await this.client
await cl.end()
console.log('Closed postgres connection')
})()
}
}
@ -261,7 +260,12 @@ export function getDBClient (connectionString: string, database?: string): Postg
return new ClientRef(existing)
}
export function convertDoc<T extends Doc> (domain: string, doc: T, workspaceId: string): DBDoc {
export function convertDoc<T extends Doc> (
domain: string,
doc: T,
workspaceId: string,
domainFields?: Set<string>
): DBDoc {
const extractedFields: Doc & Record<string, any> = {
_id: doc._id,
space: doc.space,
@ -273,9 +277,15 @@ export function convertDoc<T extends Doc> (domain: string, doc: T, workspaceId:
}
const remainingData: Partial<T> = {}
const extractedFieldsKeys = new Set(Object.keys(extractedFields))
domainFields = domainFields ?? new Set(getDocFieldsByDomains(domain))
for (const key in doc) {
if (Object.keys(extractedFields).includes(key)) continue
if (getDocFieldsByDomains(domain).includes(key)) {
if (extractedFieldsKeys.has(key)) {
continue
}
if (domainFields.has(key)) {
extractedFields[key] = doc[key]
} else {
remainingData[key] = doc[key]
@ -432,8 +442,7 @@ export function parseDocWithProjection<T extends Doc> (
return res
}
export function parseDoc<T extends Doc> (doc: DBDoc, domain: string): T {
const schema = getSchema(domain)
export function parseDoc<T extends Doc> (doc: DBDoc, schema: Schema): T {
const { workspaceId, data, ...rest } = doc
for (const key in rest) {
if ((rest as any)[key] === 'NULL' || (rest as any)[key] === null) {

View File

@ -68,10 +68,6 @@ export function getTxAdapterFactory (
workspace: WorkspaceIdWithUrl,
branding: Branding | null,
opt: {
fullTextUrl: string
rekoniUrl: string
indexProcessing: number // 1000
indexParallel: number // 2
disableTriggers?: boolean
usePassedCtx?: boolean
@ -199,19 +195,23 @@ export async function getServerPipeline (
ctx: MeasureContext,
model: Tx[],
dbUrl: string,
wsUrl: WorkspaceIdWithUrl
wsUrl: WorkspaceIdWithUrl,
opt?: {
storageConfig: string
disableTriggers?: boolean
}
): Promise<{
pipeline: Pipeline
storageAdapter: StorageAdapter
}> {
const storageConfig: StorageConfiguration = storageConfigFromEnv()
const storageConfig: StorageConfiguration = storageConfigFromEnv(opt?.storageConfig)
const storageAdapter = buildStorageFromConfig(storageConfig)
const pipelineFactory = createServerPipeline(ctx, dbUrl, model, {
externalStorage: storageAdapter,
usePassedCtx: true,
disableTriggers: false
disableTriggers: opt?.disableTriggers ?? false
})
try {

View File

@ -237,13 +237,14 @@ export class ClientSession implements Session {
return this.ops
}
async loadChunk (_ctx: ClientSessionCtx, domain: Domain, idx?: number, recheck?: boolean): Promise<void> {
async loadChunk (ctx: ClientSessionCtx, domain: Domain, idx?: number, recheck?: boolean): Promise<void> {
this.lastRequest = Date.now()
try {
const result = await this.getOps().loadChunk(_ctx.ctx, domain, idx, recheck)
await _ctx.sendResponse(result)
const result = await this.getOps().loadChunk(ctx.ctx, domain, idx, recheck)
await ctx.sendResponse(result)
} catch (err: any) {
await _ctx.sendResponse({ error: err.message })
await ctx.sendError('Failed to loadChunk', unknownError(err))
ctx.ctx.error('failed to loadChunk', { domain, err })
}
}
@ -259,7 +260,8 @@ export class ClientSession implements Session {
const result = await this.getOps().loadDocs(ctx.ctx, domain, docs)
await ctx.sendResponse(result)
} catch (err: any) {
await ctx.sendResponse({ error: err.message })
await ctx.sendError('Failed to loadDocs', unknownError(err))
ctx.ctx.error('failed to loadDocs', { domain, err })
}
}
@ -271,7 +273,8 @@ export class ClientSession implements Session {
try {
await this.getOps().upload(ctx.ctx, domain, docs)
} catch (err: any) {
await ctx.sendResponse({ error: err.message })
await ctx.sendError('Failed to upload', unknownError(err))
ctx.ctx.error('failed to upload', { domain, err })
return
}
await ctx.sendResponse({})
@ -285,7 +288,8 @@ export class ClientSession implements Session {
try {
await this.getOps().clean(ctx.ctx, domain, docs)
} catch (err: any) {
await ctx.sendResponse({ error: err.message })
await ctx.sendError('Failed to clean', unknownError(err))
ctx.ctx.error('failed to clean', { domain, err })
return
}
await ctx.sendResponse({})

View File

@ -243,8 +243,10 @@ class TSessionManager implements SessionManager {
}
}
for (const r of s[1].session.requests.values()) {
if (now - r.start > 30000) {
this.ctx.warn('request hang found, 30sec', {
const sec = Math.round((now - r.start) / 1000)
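// Warn every 30 seconds while a request is still outstanding, instead of only once after 30 seconds.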
if (sec > 0 && sec % 30 === 0) {
this.ctx.warn('request hang found', {
sec,
wsId,
user: s[1].session.getUser(),
...r.params

View File

@ -16,7 +16,7 @@ import core, {
} from '@hcengineering/core'
import { consoleModelLogger, type MigrateOperation, type ModelLogger } from '@hcengineering/model'
import { getTransactorEndpoint } from '@hcengineering/server-client'
import { SessionDataImpl, type Pipeline, type StorageAdapter } from '@hcengineering/server-core'
import { SessionDataImpl, wrapPipeline, type Pipeline, type StorageAdapter } from '@hcengineering/server-core'
import {
getServerPipeline,
getTxAdapterFactory,
@ -26,51 +26,6 @@ import {
import { generateToken } from '@hcengineering/server-token'
import { initializeWorkspace, initModel, prepareTools, updateModel, upgradeModel } from '@hcengineering/server-tool'
function wrapPipeline (ctx: MeasureContext, pipeline: Pipeline, wsUrl: WorkspaceIdWithUrl): Client {
const sctx = new SessionDataImpl(
systemAccountEmail,
'backup',
true,
{ targets: {}, txes: [] },
wsUrl,
null,
true,
new Map(),
new Map(),
pipeline.context.modelDb
)
ctx.contextData = sctx
return {
findAll: async (_class, query, options) => {
return await pipeline.findAll(ctx, _class, query, options)
},
findOne: async (_class, query, options) => {
return (await pipeline.findAll(ctx, _class, query, { ...options, limit: 1 })).shift()
},
close: async () => {
await pipeline.close()
},
getHierarchy: () => {
return pipeline.context.hierarchy
},
getModel: () => {
return pipeline.context.modelDb
},
searchFulltext: async (query, options) => {
return {
docs: [],
total: 0
}
},
tx: async (tx) => {
return await pipeline.tx(ctx, [tx])
},
notify: (...tx) => {}
}
}
/**
* @public
*/
@ -125,10 +80,6 @@ export async function createWorkspace (
try {
const txFactory = getTxAdapterFactory(ctx, dbUrl, wsUrl, null, {
externalStorage: storageAdapter,
fullTextUrl: 'http://localhost:9200',
indexParallel: 0,
indexProcessing: 0,
rekoniUrl: '',
usePassedCtx: true
})
const txAdapter = await txFactory(ctx, hierarchy, dbUrl, wsId, modelDb, storageAdapter)

View File

@ -34,7 +34,7 @@ export class WorkspaceClient {
}
protected async initClient (): Promise<TxOperations> {
const token = generateToken(systemAccountEmail, this.workspace)
const token = generateToken(systemAccountEmail, this.workspace, { client: 'analytics' })
this.client = await connectPlatform(token)
return new TxOperations(this.client, core.account.System)

View File

@ -27,7 +27,7 @@ export async function createNotification (
const existing = await client.findOne(notification.class.CommonInboxNotification, {
user: data.user,
message: data.message,
props: data.props
...Object.fromEntries(Object.entries(data.props).map(([k, v]) => [`props.${k}`, v]))
})
if (existing !== undefined) {
await client.update(existing, {

View File

@ -1,10 +1,4 @@
export MINIO_ACCESS_KEY=minioadmin
export MINIO_SECRET_KEY=minioadmin
export MINIO_ENDPOINT=localhost:9000
export MONGO_URL=mongodb://localhost:27017
export ELASTIC_URL=http://localhost:9200
export SERVER_SECRET=secret
#!/usr/bin/env bash
# Restore workspace contents in mongo/elastic
./tool-local.sh backup-restore ./sanity-ws sanity-ws

View File

@ -1,4 +1,4 @@
export MODEL_VERSION=$(node ../common/scripts/show_version.js)
export MINIO_ACCESS_KEY=minioadmin
export MINIO_SECRET_KEY=minioadmin
export MINIO_ENDPOINT=localhost:9000

View File

@ -1,5 +1,6 @@
#!/usr/bin/env bash
export MODEL_VERSION=$(node ../common/scripts/show_version.js)
export MINIO_ACCESS_KEY=minioadmin
export MINIO_SECRET_KEY=minioadmin
export MINIO_ENDPOINT=localhost:9002

View File

@ -1,5 +1,6 @@
#!/usr/bin/env bash
export MODEL_VERSION=$(node ../common/scripts/show_version.js)
export MINIO_ACCESS_KEY=minioadmin
export MINIO_SECRET_KEY=minioadmin
export MINIO_ENDPOINT=localhost:9002

View File

@ -38,6 +38,6 @@
"dependencies": {
"aws4fetch": "^1.0.20",
"itty-router": "^5.0.18",
"postgres": "^3.4.4"
"postgres": "^3.4.5"
}
}