UBERF-8508: Get rid of Mongo in storage adapter (#6989)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-10-21 23:23:26 +07:00 committed by GitHub
parent 8edf748d30
commit 187c489b2c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
57 changed files with 653 additions and 1332 deletions

View File

@ -54,17 +54,11 @@ export function docImportTool (): void {
const uploadUrl = process.env.UPLOAD_URL ?? '/files'
const mongodbUri = process.env.MONGO_URL
if (mongodbUri === undefined) {
console.log('Please provide mongodb url')
process.exit(1)
}
setMetadata(serverClientPlugin.metadata.Endpoint, accountUrl)
setMetadata(serverToken.metadata.Secret, serverSecret)
async function withStorage (mongodbUri: string, f: (storageAdapter: StorageAdapter) => Promise<any>): Promise<void> {
const adapter = buildStorageFromConfig(storageConfigFromEnv(), mongodbUri)
async function withStorage (f: (storageAdapter: StorageAdapter) => Promise<any>): Promise<void> {
const adapter = buildStorageFromConfig(storageConfigFromEnv())
try {
await f(adapter)
} catch (err: any) {
@ -94,7 +88,7 @@ export function docImportTool (): void {
}, space: ${cmd.space}, backend: ${cmd.backend}`
)
await withStorage(mongodbUri, async (storageAdapter) => {
await withStorage(async (storageAdapter) => {
const workspaceId = getWorkspaceId(workspace)
const config: Config = {

View File

@ -106,7 +106,6 @@ services:
environment:
# - WS_OPERATION=create
- SERVER_SECRET=secret
- MONGO_URL=${MONGO_URL}
- DB_URL=${MONGO_URL}
# - DB_URL=postgresql://postgres:example@postgres:5432
- SES_URL=
@ -133,7 +132,6 @@ services:
environment:
# - WS_OPERATION=create
- SERVER_SECRET=secret
- MONGO_URL=${MONGO_URL}
- DB_URL=postgresql://postgres:example@postgres:5432
- SES_URL=
- REGION=pg
@ -161,8 +159,6 @@ services:
- COLLABORATOR_PORT=3078
- SECRET=secret
- ACCOUNTS_URL=http://host.docker.internal:3000
- MONGO_URL=${MONGO_URL}
- 'MONGO_OPTIONS={"appName":"collaborator","maxPoolSize":2}'
- STORAGE_CONFIG=${STORAGE_CONFIG}
restart: unless-stopped
front:
@ -179,11 +175,8 @@ services:
- 8087:8080
- 8088:8080
environment:
- UV_THREADPOOL_SIZE=10
- SERVER_PORT=8080
- SERVER_SECRET=secret
- MONGO_URL=${MONGO_URL}
- 'MONGO_OPTIONS={"appName":"front","maxPoolSize":1}'
- ACCOUNTS_URL=http://host.docker.internal:3000
- UPLOAD_URL=/files
- ELASTIC_URL=http://host.docker.internal:9200
@ -298,8 +291,6 @@ services:
- 4005:4005
environment:
- SECRET=secret
- MONGO_URL=${MONGO_URL}
- 'MONGO_OPTIONS={"appName":"print","maxPoolSize":1}'
- STORAGE_CONFIG=${STORAGE_CONFIG}
deploy:
resources:
@ -317,8 +308,6 @@ services:
- ../services/sign/pod-sign/debug/branding.json:/var/cfg/branding.json
environment:
- SECRET=secret
- MONGO_URL=${MONGO_URL}
- 'MONGO_OPTIONS={"appName":"sign","maxPoolSize":1}'
- MINIO_ENDPOINT=minio
- MINIO_ACCESS_KEY=minioadmin
- ACCOUNTS_URL=http://host.docker.internal:3000

View File

@ -72,7 +72,6 @@ addLocation(serverDriveId, () => import('@hcengineering/server-drive-resources')
addLocation(serverAiBotId, () => import('@hcengineering/server-ai-bot-resources'))
function prepareTools (): {
mongodbUri: string | undefined
dbUrl: string
txes: Tx[]
version: Data<Version>
@ -84,4 +83,13 @@ function prepareTools (): {
return { ...prepareToolsRaw(builder(enabled, disabled).getTxes()), version: getModelVersion(), migrateOperations }
}
export function getMongoDBUrl (): string {
const url = process.env.MONGO_URL
if (url === undefined) {
console.error('please provide mongo DB URL')
process.exit(1)
}
return url
}
devTool(prepareTools)

View File

@ -92,6 +92,7 @@ import { backupDownload } from '@hcengineering/server-backup/src/backup'
import type { StorageAdapter, StorageAdapterEx } from '@hcengineering/server-core'
import { deepEqual } from 'fast-equals'
import { createWriteStream, readFileSync } from 'fs'
import { getMongoDBUrl } from './__start'
import {
benchmark,
benchmarkWorker,
@ -117,7 +118,7 @@ import { moveAccountDbFromMongoToPG, moveFromMongoToPG, moveWorkspaceFromMongoTo
import { fixJsonMarkup, migrateMarkup, restoreLostMarkup } from './markup'
import { fixMixinForeignAttributes, showMixinForeignAttributes } from './mixin'
import { fixAccountEmails, renameAccount } from './renameAccount'
import { moveFiles, showLostFiles, syncFiles } from './storage'
import { moveFiles, showLostFiles } from './storage'
const colorConstants = {
colorRed: '\u001b[31m',
@ -136,7 +137,6 @@ const colorConstants = {
*/
export function devTool (
prepareTools: () => {
mongodbUri: string | undefined
dbUrl: string
txes: Tx[]
version: Data<Version>
@ -195,8 +195,8 @@ export function devTool (
await shutdown()
}
async function withStorage (dbUrl: string, f: (storageAdapter: StorageAdapter) => Promise<any>): Promise<void> {
const adapter = buildStorageFromConfig(storageConfigFromEnv(), dbUrl)
async function withStorage (f: (storageAdapter: StorageAdapter) => Promise<any>): Promise<void> {
const adapter = buildStorageFromConfig(storageConfigFromEnv())
try {
await f(adapter)
} catch (err: any) {
@ -263,11 +263,12 @@ export function devTool (
})
program
.command('compact-db')
.command('compact-db-mongo')
.description('compact all db collections')
.option('-w, --workspace <workspace>', 'A selected "workspace" only', '')
.action(async (cmd: { workspace: string }) => {
const { dbUrl, mongodbUri } = prepareTools()
const { dbUrl } = prepareTools()
const mongodbUri = getMongoDBUrl()
await withDatabase(dbUrl, async (db) => {
console.log('compacting db ...')
let gtotal: number = 0
@ -508,14 +509,14 @@ export function devTool (
})
program
.command('list-unused-workspaces')
.command('list-unused-workspaces-mongo')
.description(
'remove unused workspaces, please pass --remove to really delete them. Without it will only mark them disabled'
)
.option('-r|--remove [remove]', 'Force remove', false)
.option('-t|--timeout [timeout]', 'Timeout in days', '7')
.action(async (cmd: { remove: boolean, disable: boolean, exclude: string, timeout: string }) => {
const { dbUrl, mongodbUri } = prepareTools()
const { dbUrl } = prepareTools()
await withDatabase(dbUrl, async (db) => {
const workspaces = new Map((await listWorkspacesPure(db)).map((p) => [p._id.toString(), p]))
@ -523,8 +524,9 @@ export function devTool (
const _timeout = parseInt(cmd.timeout) ?? 7
await withStorage(dbUrl, async (adapter) => {
await withStorage(async (adapter) => {
// We need to update workspaces with missing workspaceUrl
const mongodbUri = getMongoDBUrl()
const client = getMongoClient(mongodbUri ?? dbUrl)
const _client = await client.getClient()
try {
@ -572,13 +574,14 @@ export function devTool (
})
program
.command('drop-workspace <name>')
.command('drop-workspace-mongo <name>')
.description('drop workspace')
.option('--full [full]', 'Force remove all data', false)
.action(async (workspace, cmd: { full: boolean }) => {
const { dbUrl, mongodbUri } = prepareTools()
const { dbUrl } = prepareTools()
const mongodbUri = getMongoDBUrl()
await withStorage(dbUrl, async (storageAdapter) => {
await withStorage(async (storageAdapter) => {
await withDatabase(dbUrl, async (db) => {
const ws = await getWorkspaceById(db, workspace)
if (ws === null) {
@ -601,12 +604,13 @@ export function devTool (
})
program
.command('drop-workspace-by-email <email>')
.command('drop-workspace-by-email-mongo <email>')
.description('drop workspace')
.option('--full [full]', 'Force remove all data', false)
.action(async (email, cmd: { full: boolean }) => {
const { dbUrl, mongodbUri } = prepareTools()
await withStorage(dbUrl, async (storageAdapter) => {
const { dbUrl } = prepareTools()
const mongodbUri = getMongoDBUrl()
await withStorage(async (storageAdapter) => {
await withDatabase(dbUrl, async (db) => {
const client = getMongoClient(mongodbUri ?? dbUrl)
const _client = await client.getClient()
@ -638,12 +642,13 @@ export function devTool (
})
program
.command('drop-workspace-last-visit')
.command('drop-workspace-last-visit-mongo')
.description('drop old workspaces')
.action(async (cmd: any) => {
const { dbUrl, mongodbUri } = prepareTools()
const { dbUrl } = prepareTools()
const mongodbUri = getMongoDBUrl()
await withStorage(dbUrl, async (storageAdapter) => {
await withStorage(async (storageAdapter) => {
await withDatabase(dbUrl, async (db) => {
const workspacesJSON = await listWorkspacesPure(db)
const client = getMongoClient(mongodbUri ?? dbUrl)
@ -706,11 +711,12 @@ export function devTool (
})
})
program.command('fix-person-accounts').action(async () => {
const { dbUrl, mongodbUri, version } = prepareTools()
program.command('fix-person-accounts-mongo').action(async () => {
const { dbUrl, version } = prepareTools()
const mongodbUri = getMongoDBUrl()
await withDatabase(dbUrl, async (db) => {
const ws = await listWorkspacesPure(db)
const client = getMongoClient(mongodbUri ?? dbUrl)
const client = getMongoClient(mongodbUri)
const _client = await client.getClient()
try {
for (const w of ws) {
@ -872,8 +878,7 @@ export function devTool (
.command('backup-s3 <bucketName> <dirName> <workspace>')
.description('dump workspace transactions and minio resources')
.action(async (bucketName: string, dirName: string, workspace: string, cmd) => {
const { dbUrl } = prepareTools()
await withStorage(dbUrl, async (adapter) => {
await withStorage(async (adapter) => {
const storage = await createStorageBackupStorage(toolCtx, adapter, getWorkspaceId(bucketName), dirName)
const wsid = getWorkspaceId(workspace)
const endpoint = await getTransactorEndpoint(generateToken(systemAccountEmail, wsid), 'external')
@ -1048,8 +1053,8 @@ export function devTool (
.command('diff-workspace <workspace>')
.description('restore workspace transactions and minio resources from previous dump.')
.action(async (workspace: string, cmd) => {
const { dbUrl, mongodbUri, txes } = prepareTools()
await diffWorkspace(mongodbUri ?? dbUrl, getWorkspaceId(workspace), txes)
const { dbUrl, txes } = prepareTools()
await diffWorkspace(dbUrl, getWorkspaceId(workspace), txes)
})
program
@ -1057,8 +1062,8 @@ export function devTool (
.description('clear telegram history')
.option('-w, --workspace <workspace>', 'target workspace')
.action(async (workspace: string, cmd) => {
const { dbUrl, mongodbUri } = prepareTools()
await withStorage(dbUrl, async (adapter) => {
const { dbUrl } = prepareTools()
await withStorage(async (adapter) => {
await withDatabase(dbUrl, async (db) => {
const telegramDB = process.env.TELEGRAM_DATABASE
if (telegramDB === undefined) {
@ -1067,7 +1072,7 @@ export function devTool (
}
console.log(`clearing ${workspace} history:`)
await clearTelegramHistory(toolCtx, mongodbUri ?? dbUrl, getWorkspaceId(workspace), telegramDB, adapter)
await clearTelegramHistory(toolCtx, dbUrl, getWorkspaceId(workspace), telegramDB, adapter)
})
})
})
@ -1076,8 +1081,8 @@ export function devTool (
.command('clear-telegram-all-history')
.description('clear telegram history')
.action(async (cmd) => {
const { dbUrl, mongodbUri } = prepareTools()
await withStorage(dbUrl, async (adapter) => {
const { dbUrl } = prepareTools()
await withStorage(async (adapter) => {
await withDatabase(dbUrl, async (db) => {
const telegramDB = process.env.TELEGRAM_DATABASE
if (telegramDB === undefined) {
@ -1089,7 +1094,7 @@ export function devTool (
for (const w of workspaces) {
console.log(`clearing ${w.workspace} history:`)
await clearTelegramHistory(toolCtx, mongodbUri ?? dbUrl, getWorkspaceId(w.workspace), telegramDB, adapter)
await clearTelegramHistory(toolCtx, dbUrl, getWorkspaceId(w.workspace), telegramDB, adapter)
}
})
})
@ -1116,18 +1121,17 @@ export function devTool (
.option('--tracker', 'Clean tracker', false)
.option('--removedTx', 'Clean removed transactions', false)
.action(async (workspace: string, cmd: { recruit: boolean, tracker: boolean, removedTx: boolean }) => {
const { dbUrl, mongodbUri } = prepareTools()
await withStorage(dbUrl, async (adapter) => {
const { dbUrl } = prepareTools()
await withStorage(async (adapter) => {
await withDatabase(dbUrl, async (db) => {
const wsid = getWorkspaceId(workspace)
const endpoint = await getTransactorEndpoint(generateToken(systemAccountEmail, wsid), 'external')
await cleanWorkspace(toolCtx, mongodbUri ?? dbUrl, wsid, adapter, getElasticUrl(), endpoint, cmd)
await cleanWorkspace(toolCtx, dbUrl, wsid, adapter, getElasticUrl(), endpoint, cmd)
})
})
})
program.command('clean-empty-buckets').action(async (cmd: any) => {
const { dbUrl } = prepareTools()
await withStorage(dbUrl, async (adapter) => {
await withStorage(async (adapter) => {
const buckets = await adapter.listBuckets(toolCtx)
for (const ws of buckets) {
const l = await ws.list()
@ -1145,39 +1149,33 @@ export function devTool (
program
.command('upload-file <workspace> <local> <remote> <contentType>')
.action(async (workspace: string, local: string, remote: string, contentType: string, cmd: any) => {
const { dbUrl } = prepareTools()
await withStorage(dbUrl, async (adapter) => {
const wsId: WorkspaceId = {
name: workspace
}
const token = generateToken(systemAccountEmail, wsId)
const endpoint = await getTransactorEndpoint(token)
const blobClient = new BlobClient(endpoint, token, wsId)
const buffer = readFileSync(local)
await blobClient.upload(toolCtx, remote, buffer.length, contentType, buffer)
})
const wsId: WorkspaceId = {
name: workspace
}
const token = generateToken(systemAccountEmail, wsId)
const endpoint = await getTransactorEndpoint(token)
const blobClient = new BlobClient(endpoint, token, wsId)
const buffer = readFileSync(local)
await blobClient.upload(toolCtx, remote, buffer.length, contentType, buffer)
})
program
.command('download-file <workspace> <remote> <local>')
.action(async (workspace: string, remote: string, local: string, cmd: any) => {
const { dbUrl } = prepareTools()
await withStorage(dbUrl, async (adapter) => {
const wsId: WorkspaceId = {
name: workspace
const wsId: WorkspaceId = {
name: workspace
}
const token = generateToken(systemAccountEmail, wsId)
const endpoint = await getTransactorEndpoint(token)
const blobClient = new BlobClient(endpoint, token, wsId)
const wrstream = createWriteStream(local)
await blobClient.writeTo(toolCtx, remote, -1, {
write: (buffer, cb) => {
wrstream.write(buffer, cb)
},
end: (cb) => {
wrstream.end(cb)
}
const token = generateToken(systemAccountEmail, wsId)
const endpoint = await getTransactorEndpoint(token)
const blobClient = new BlobClient(endpoint, token, wsId)
const wrstream = createWriteStream(local)
await blobClient.writeTo(toolCtx, remote, -1, {
write: (buffer, cb) => {
wrstream.write(buffer, cb)
},
end: (cb) => {
wrstream.end(cb)
}
})
})
})
@ -1197,14 +1195,14 @@ export function devTool (
const { dbUrl } = prepareTools()
await withDatabase(dbUrl, async (db) => {
await withStorage(dbUrl, async (adapter) => {
await withStorage(async (adapter) => {
try {
const exAdapter = adapter as StorageAdapterEx
if (exAdapter.adapters === undefined || exAdapter.adapters.size < 2) {
if (exAdapter.adapters === undefined || exAdapter.adapters.length < 2) {
throw new Error('bad storage config, at least two storage providers are required')
}
console.log('moving files to storage provider', exAdapter.defaultAdapter)
console.log('moving files to storage provider', exAdapter.adapters[0].name)
let index = 1
const workspaces = await listWorkspacesPure(db)
@ -1237,59 +1235,16 @@ export function devTool (
)
program
.command('sync-files')
.option('-w, --workspace <workspace>', 'Selected workspace only', '')
.option('--disabled', 'Include disabled workspaces', false)
.action(async (cmd: { workspace: string, disabled: boolean }) => {
const { dbUrl } = prepareTools()
await withDatabase(dbUrl, async (db) => {
await withStorage(dbUrl, async (adapter) => {
try {
const exAdapter = adapter as StorageAdapterEx
console.log('syncing files from storage provider')
let index = 1
const workspaces = await listWorkspacesPure(db)
workspaces.sort((a, b) => b.lastVisit - a.lastVisit)
for (const workspace of workspaces) {
if (workspace.disabled === true && !cmd.disabled) {
console.log('ignore disabled workspace', workspace.workspace)
continue
}
if (cmd.workspace !== '' && workspace.workspace !== cmd.workspace) {
continue
}
try {
console.log('start', workspace.workspace, index, '/', workspaces.length)
await syncFiles(toolCtx, getWorkspaceId(workspace.workspace), exAdapter)
console.log('done', workspace.workspace)
} catch (err) {
console.warn('failed to sync files', err)
}
index += 1
}
} catch (err: any) {
console.error(err)
}
})
})
})
program
.command('show-lost-files')
.command('show-lost-files-mongo')
.option('-w, --workspace <workspace>', 'Selected workspace only', '')
.option('--disabled', 'Include disabled workspaces', false)
.option('--all', 'Show all files', false)
.action(async (cmd: { workspace: string, disabled: boolean, all: boolean }) => {
const { dbUrl, mongodbUri } = prepareTools()
const { dbUrl } = prepareTools()
await withDatabase(dbUrl, async (db) => {
await withStorage(dbUrl, async (adapter) => {
const client = getMongoClient(mongodbUri ?? dbUrl)
await withStorage(async (adapter) => {
const mongodbUri = getMongoDBUrl()
const client = getMongoClient(mongodbUri)
const _client = await client.getClient()
try {
let index = 1
@ -1330,7 +1285,7 @@ export function devTool (
program.command('show-lost-markup <workspace>').action(async (workspace: string, cmd: any) => {
const { dbUrl } = prepareTools()
await withDatabase(dbUrl, async (db) => {
await withStorage(dbUrl, async (adapter) => {
await withStorage(async (adapter) => {
try {
const workspaceId = getWorkspaceId(workspace)
const token = generateToken(systemAccountEmail, workspaceId)
@ -1346,7 +1301,7 @@ export function devTool (
program.command('restore-lost-markup <workspace>').action(async (workspace: string, cmd: any) => {
const { dbUrl } = prepareTools()
await withDatabase(dbUrl, async (db) => {
await withStorage(dbUrl, async (adapter) => {
await withStorage(async (adapter) => {
try {
const workspaceId = getWorkspaceId(workspace)
const token = generateToken(systemAccountEmail, workspaceId)
@ -1360,8 +1315,7 @@ export function devTool (
})
program.command('fix-bw-workspace <workspace>').action(async (workspace: string) => {
const { dbUrl } = prepareTools()
await withStorage(dbUrl, async (adapter) => {
await withStorage(async (adapter) => {
await fixMinioBW(toolCtx, getWorkspaceId(workspace), adapter)
})
})
@ -1410,16 +1364,16 @@ export function devTool (
})
program
.command('mixin-fix-foreign-attributes <workspace>')
.command('mixin-fix-foreign-attributes-mongo <workspace>')
.description('mixin-fix-foreign-attributes')
.option('--mixin <mixin>', 'Mixin class', '')
.option('--property <property>', 'Property name', '')
.action(async (workspace: string, cmd: { mixin: string, property: string }) => {
const { dbUrl, mongodbUri } = prepareTools()
const mongodbUri = getMongoDBUrl()
const wsid = getWorkspaceId(workspace)
const token = generateToken(systemAccountEmail, wsid)
const endpoint = await getTransactorEndpoint(token)
await fixMixinForeignAttributes(mongodbUri ?? dbUrl, wsid, endpoint, cmd)
await fixMixinForeignAttributes(mongodbUri, wsid, endpoint, cmd)
})
program
@ -1551,36 +1505,36 @@ export function devTool (
})
program
.command('fix-skills <workspace> <step>')
.command('fix-skills-mongo <workspace> <step>')
.description('fix skills for workspace')
.action(async (workspace: string, step: string) => {
const { dbUrl, mongodbUri } = prepareTools()
const mongodbUri = getMongoDBUrl()
const wsid = getWorkspaceId(workspace)
const token = generateToken(systemAccountEmail, wsid)
const endpoint = await getTransactorEndpoint(token)
await fixSkills(mongodbUri ?? dbUrl, wsid, endpoint, step)
await fixSkills(mongodbUri, wsid, endpoint, step)
})
program
.command('restore-ats-types <workspace>')
.command('restore-ats-types-mongo <workspace>')
.description('Restore recruiting task types for workspace')
.action(async (workspace: string) => {
const { dbUrl, mongodbUri } = prepareTools()
const mongodbUri = getMongoDBUrl()
console.log('Restoring recruiting task types in workspace ', workspace, '...')
const wsid = getWorkspaceId(workspace)
const endpoint = await getTransactorEndpoint(generateToken(systemAccountEmail, wsid), 'external')
await restoreRecruitingTaskTypes(mongodbUri ?? dbUrl, wsid, endpoint)
await restoreRecruitingTaskTypes(mongodbUri, wsid, endpoint)
})
program
.command('restore-ats-types-2 <workspace>')
.command('restore-ats-types-2-mongo <workspace>')
.description('Restore recruiting task types for workspace 2')
.action(async (workspace: string) => {
const { dbUrl, mongodbUri } = prepareTools()
const mongodbUri = getMongoDBUrl()
console.log('Restoring recruiting task types in workspace ', workspace, '...')
const wsid = getWorkspaceId(workspace)
const endpoint = await getTransactorEndpoint(generateToken(systemAccountEmail, wsid), 'external')
await restoreHrTaskTypesFromUpdates(mongodbUri ?? dbUrl, wsid, endpoint)
await restoreHrTaskTypesFromUpdates(mongodbUri, wsid, endpoint)
})
program
@ -1591,33 +1545,32 @@ export function devTool (
.requiredOption('--attribute <attribute>')
.requiredOption('--type <type>', 'number | string')
.requiredOption('--value <value>')
.requiredOption('--domain <domain>')
.action(
async (
workspace: string,
cmd: { objectId: string, objectClass: string, type: string, attribute: string, value: string, domain: string }
) => {
const { dbUrl, mongodbUri } = prepareTools()
const wsid = getWorkspaceId(workspace)
const endpoint = await getTransactorEndpoint(generateToken(systemAccountEmail, wsid), 'external')
await updateField(mongodbUri ?? dbUrl, wsid, endpoint, cmd)
await updateField(wsid, endpoint, cmd)
}
)
program
.command('recreate-elastic-indexes <workspace>')
.command('recreate-elastic-indexes-mongo <workspace>')
.description('reindex workspace to elastic')
.action(async (workspace: string) => {
const { dbUrl, mongodbUri } = prepareTools()
const mongodbUri = getMongoDBUrl()
const wsid = getWorkspaceId(workspace)
await recreateElastic(mongodbUri ?? dbUrl, wsid)
await recreateElastic(mongodbUri, wsid)
})
program
.command('recreate-all-elastic-indexes')
.command('recreate-all-elastic-indexes-mongo')
.description('reindex elastic')
.action(async () => {
const { dbUrl, mongodbUri } = prepareTools()
const { dbUrl } = prepareTools()
const mongodbUri = getMongoDBUrl()
await withDatabase(dbUrl, async (db) => {
const workspaces = await listWorkspacesRaw(db)
@ -1630,28 +1583,29 @@ export function devTool (
})
program
.command('fix-json-markup <workspace>')
.command('fix-json-markup-mongo <workspace>')
.description('fixes double converted json markup')
.action(async (workspace: string) => {
const { dbUrl, mongodbUri } = prepareTools()
await withStorage(dbUrl, async (adapter) => {
const mongodbUri = getMongoDBUrl()
await withStorage(async (adapter) => {
const wsid = getWorkspaceId(workspace)
const endpoint = await getTransactorEndpoint(generateToken(systemAccountEmail, wsid), 'external')
await fixJsonMarkup(toolCtx, mongodbUri ?? dbUrl, adapter, wsid, endpoint)
await fixJsonMarkup(toolCtx, mongodbUri, adapter, wsid, endpoint)
})
})
program
.command('migrate-markup')
.command('migrate-markup-mongo')
.description('migrates collaborative markup to storage')
.option('-w, --workspace <workspace>', 'Selected workspace only', '')
.option('-c, --concurrency <concurrency>', 'Number of documents being processed concurrently', '10')
.action(async (cmd: { workspace: string, concurrency: string }) => {
const { dbUrl, mongodbUri, txes } = prepareTools()
const { dbUrl, txes } = prepareTools()
const mongodbUri = getMongoDBUrl()
await withDatabase(dbUrl, async (db) => {
await withStorage(dbUrl, async (adapter) => {
await withStorage(async (adapter) => {
const workspaces = await listWorkspacesPure(db)
const client = getMongoClient(mongodbUri ?? dbUrl)
const client = getMongoClient(mongodbUri)
const _client = await client.getClient()
let index = 0
try {
@ -1671,7 +1625,7 @@ export function devTool (
registerServerPlugins()
registerStringLoaders()
const { pipeline } = await getServerPipeline(toolCtx, txes, mongodbUri ?? dbUrl, dbUrl, wsUrl)
const { pipeline } = await getServerPipeline(toolCtx, txes, dbUrl, wsUrl)
await migrateMarkup(toolCtx, adapter, wsId, _client, pipeline, parseInt(cmd.concurrency))
@ -1686,20 +1640,18 @@ export function devTool (
})
program
.command('remove-duplicates-ids <workspaces>')
.command('remove-duplicates-ids-mongo <workspaces>')
.description('remove duplicates ids for futue migration')
.action(async (workspaces: string) => {
const { dbUrl, mongodbUri } = prepareTools()
await withStorage(dbUrl, async (adapter) => {
await removeDuplicateIds(toolCtx, mongodbUri ?? dbUrl, adapter, accountsUrl, workspaces)
const mongodbUri = getMongoDBUrl()
await withStorage(async (adapter) => {
await removeDuplicateIds(toolCtx, mongodbUri, adapter, accountsUrl, workspaces)
})
})
program.command('move-to-pg <region>').action(async (region: string) => {
const { dbUrl, mongodbUri } = prepareTools()
if (mongodbUri === undefined) {
throw new Error('mongodbUri is not set')
}
const { dbUrl } = prepareTools()
const mongodbUri = getMongoDBUrl()
await withDatabase(mongodbUri, async (db) => {
const workspaces = await listWorkspacesRaw(db)
@ -1715,10 +1667,8 @@ export function devTool (
})
program.command('move-workspace-to-pg <workspace> <region>').action(async (workspace: string, region: string) => {
const { dbUrl, mongodbUri } = prepareTools()
if (mongodbUri === undefined) {
throw new Error('mongodbUri is not set')
}
const { dbUrl } = prepareTools()
const mongodbUri = getMongoDBUrl()
await withDatabase(mongodbUri, async (db) => {
const workspaceInfo = await getWorkspaceById(db, workspace)
@ -1733,11 +1683,8 @@ export function devTool (
})
program.command('move-account-db-to-pg').action(async () => {
const { dbUrl, mongodbUri } = prepareTools()
if (mongodbUri === undefined) {
throw new Error('MONGO_URL is not set')
}
const { dbUrl } = prepareTools()
const mongodbUri = getMongoDBUrl()
if (mongodbUri === dbUrl) {
throw new Error('MONGO_URL and DB_URL are the same')

View File

@ -16,7 +16,12 @@
import { type Attachment } from '@hcengineering/attachment'
import { type Blob, type MeasureContext, type Ref, type WorkspaceId, RateLimiter } from '@hcengineering/core'
import { DOMAIN_ATTACHMENT } from '@hcengineering/model-attachment'
import { type ListBlobResult, type StorageAdapter, type StorageAdapterEx } from '@hcengineering/server-core'
import {
type ListBlobResult,
type StorageAdapter,
type StorageAdapterEx,
type UploadedObjectInfo
} from '@hcengineering/server-core'
import { type Db } from 'mongodb'
import { PassThrough } from 'stream'
@ -25,54 +30,6 @@ export interface MoveFilesParams {
move: boolean
}
export async function syncFiles (
ctx: MeasureContext,
workspaceId: WorkspaceId,
exAdapter: StorageAdapterEx
): Promise<void> {
if (exAdapter.adapters === undefined) return
for (const [name, adapter] of [...exAdapter.adapters.entries()].reverse()) {
await adapter.make(ctx, workspaceId)
await retryOnFailure(ctx, 5, async () => {
let time = Date.now()
let count = 0
const iterator = await adapter.listStream(ctx, workspaceId)
try {
while (true) {
const dataBulk = await iterator.next()
if (dataBulk.length === 0) break
for (const data of dataBulk) {
const blob = await exAdapter.stat(ctx, workspaceId, data._id)
if (blob !== undefined) {
if (blob.provider !== name && name === exAdapter.defaultAdapter) {
await exAdapter.syncBlobFromStorage(ctx, workspaceId, data._id, exAdapter.defaultAdapter)
}
continue
}
await exAdapter.syncBlobFromStorage(ctx, workspaceId, data._id, name)
count += 1
if (count % 100 === 0) {
const duration = Date.now() - time
time = Date.now()
console.log('...processed', count, Math.round(duration / 1000) + 's')
}
}
}
console.log('processed', count)
} finally {
await iterator.close()
}
})
}
}
export async function moveFiles (
ctx: MeasureContext,
workspaceId: WorkspaceId,
@ -81,15 +38,13 @@ export async function moveFiles (
): Promise<void> {
if (exAdapter.adapters === undefined) return
const target = exAdapter.adapters.get(exAdapter.defaultAdapter)
const target = exAdapter.adapters[0].adapter
if (target === undefined) return
// We assume that the adapter moves all new files to the default adapter
await target.make(ctx, workspaceId)
for (const [name, adapter] of exAdapter.adapters.entries()) {
if (name === exAdapter.defaultAdapter) continue
for (const { name, adapter } of exAdapter.adapters.slice(1).reverse()) {
console.log('moving from', name, 'limit', 'concurrency', params.concurrency)
// we attempt retry the whole process in case of failure
@ -192,14 +147,9 @@ async function processAdapter (
}
for (const data of dataBulk) {
let targetBlob: Blob | ListBlobResult | undefined = targetBlobs.get(data._id)
const targetBlob: Blob | ListBlobResult | undefined = targetBlobs.get(data._id)
if (targetBlob !== undefined) {
console.log('Target blob already exists', targetBlob._id)
const aggrBlob = await exAdapter.stat(ctx, workspaceId, data._id)
if (aggrBlob === undefined || aggrBlob?.provider !== targetBlob.provider) {
targetBlob = await exAdapter.syncBlobFromStorage(ctx, workspaceId, targetBlob._id, exAdapter.defaultAdapter)
}
// We could safely delete source blob
toRemove.push(data._id)
}
@ -211,15 +161,13 @@ async function processAdapter (
console.error('blob not found', data._id)
continue
}
targetBlob = await rateLimiter.exec(async () => {
const info = await rateLimiter.exec(async () => {
try {
const result = await retryOnFailure(
ctx,
5,
async () => {
await processFile(ctx, source, target, workspaceId, sourceBlob)
// We need to sync and update aggregator table for now.
return await exAdapter.syncBlobFromStorage(ctx, workspaceId, sourceBlob._id, exAdapter.defaultAdapter)
return await processFile(ctx, source, target, workspaceId, sourceBlob)
},
50
)
@ -232,8 +180,8 @@ async function processAdapter (
}
})
if (targetBlob !== undefined) {
// We could safely delete source blob
// We could safely delete source blob
if (info !== undefined) {
toRemove.push(sourceBlob._id)
}
processedBytes += sourceBlob.size
@ -266,14 +214,14 @@ async function processFile (
target: Pick<StorageAdapter, 'put'>,
workspaceId: WorkspaceId,
blob: Blob
): Promise<void> {
): Promise<UploadedObjectInfo> {
const readable = await source.get(ctx, workspaceId, blob._id)
try {
readable.on('end', () => {
readable.destroy()
})
const stream = readable.pipe(new PassThrough())
await target.put(ctx, workspaceId, blob._id, stream, blob.contentType, blob.size)
return await target.put(ctx, workspaceId, blob._id, stream, blob.contentType, blob.size)
} finally {
readable.destroy()
}

View File

@ -17,6 +17,7 @@
import contact from '@hcengineering/contact'
import core, {
type BackupClient,
type Class,
type Client as CoreClient,
type Doc,
DOMAIN_DOC_INDEX_STATE,
@ -72,7 +73,6 @@ export async function diffWorkspace (mongoUrl: string, workspace: WorkspaceId, r
}
export async function updateField (
mongoUrl: string,
workspaceId: WorkspaceId,
transactorUrl: string,
cmd: { objectId: string, objectClass: string, type: string, attribute: string, value: string, domain: string }
@ -80,19 +80,18 @@ export async function updateField (
const connection = (await connect(transactorUrl, workspaceId, undefined, {
mode: 'backup'
})) as unknown as CoreClient & BackupClient
const client = getMongoClient(mongoUrl)
let valueToPut: string | number = cmd.value
if (cmd.type === 'number') valueToPut = parseFloat(valueToPut)
try {
const _client = await client.getClient()
try {
const db = getWorkspaceMongoDB(_client, workspaceId)
await db
.collection(cmd.domain)
.updateOne({ _id: cmd.objectId as Ref<Doc> }, { $set: { [cmd.attribute]: valueToPut } })
} finally {
client.close()
const doc = await connection.findOne(cmd.objectClass as Ref<Class<Doc>>, { _id: cmd.objectId as Ref<Doc> })
if (doc === undefined) {
console.error('Document not found')
process.exit(1)
}
let valueToPut: string | number = cmd.value
if (cmd.type === 'number') valueToPut = parseFloat(valueToPut)
;(doc as any)[cmd.attribute] = valueToPut
await connection.upload(connection.getHierarchy().getDomain(doc?._class), [doc])
} finally {
await connection.close()
}

View File

@ -210,17 +210,17 @@ async function createDocumentCategories (tx: TxOperations): Promise<void> {
{ code: 'CM', title: 'Client Management' }
]
await Promise.all(
categories.map((c) =>
createOrUpdate(
tx,
documents.class.DocumentCategory,
documents.space.QualityDocuments,
{ ...c, attachments: 0 },
((documents.category.DOC as string) + ' - ' + c.code) as Ref<DocumentCategory>
)
const ops = tx.apply()
for (const c of categories) {
await createOrUpdate(
ops,
documents.class.DocumentCategory,
documents.space.QualityDocuments,
{ ...c, attachments: 0 },
((documents.category.DOC as string) + ' - ' + c.code) as Ref<DocumentCategory>
)
)
}
await ops.commit()
}
async function createTagCategories (tx: TxOperations): Promise<void> {

View File

@ -551,8 +551,6 @@ export interface Blob extends Doc {
// Provider
provider: string
// A provider specific id
storageId: string
// A content type for blob
contentType: string
// A etag for blob
etag: string

View File

@ -69,16 +69,13 @@ export interface StorageAdapter {
getUrl: (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string) => Promise<string>
}
export interface StorageAdapterEx extends StorageAdapter {
defaultAdapter: string
adapters?: Map<string, StorageAdapter>
export interface NamedStorageAdapter {
name: string
adapter: StorageAdapter
}
syncBlobFromStorage: (
ctx: MeasureContext,
workspaceId: WorkspaceId,
objectName: string,
provider?: string
) => Promise<Blob>
export interface StorageAdapterEx extends StorageAdapter {
adapters?: NamedStorageAdapter[]
find: (ctx: MeasureContext, workspaceId: WorkspaceId) => StorageIterator
}
@ -187,7 +184,7 @@ export async function removeAllObjects (
break
}
for (const obj of objs) {
bulk.push(obj.storageId)
bulk.push(obj._id)
if (bulk.length > 50) {
await storage.remove(ctx, workspaceId, bulk)
bulk = []

View File

@ -13,6 +13,7 @@
// limitations under the License.
-->
<script lang="ts">
import { Analytics } from '@hcengineering/analytics'
import { Channel, Person, getName } from '@hcengineering/contact'
import core, {
ArrOf,
@ -122,15 +123,17 @@
}
await client.update(targetPerson, _update)
}
const ops = client.apply()
for (const channel of resultChannels.values()) {
if (channel.attachedTo === targetPerson._id) continue
await client.update(channel, { attachedTo: targetPerson._id })
await ops.update(channel, { attachedTo: targetPerson._id })
}
for (const old of oldChannels) {
if (!(enabledChannels.get(old._id) ?? true)) {
await client.remove(old)
await ops.remove(old)
}
}
for (const mixin in mixinUpdate) {
const attrs = (mixinUpdate as any)[mixin]
if (Object.keys(attrs).length > 0) {
@ -151,6 +154,7 @@
)
}
}
await ops.commit()
await updateAllRefs(client, sourcePerson, targetPerson)
dispatch('close')
@ -249,28 +253,31 @@
if (attr.name === '_id') {
continue
}
const to = attr.type as RefTo<Doc>
const descendants = h.getDescendants(attr.attributeOf)
for (const d of descendants) {
if (h.isDerived(d, core.class.Tx) || h.isDerived(d, core.class.BenchmarkDoc)) {
continue
}
if (h.findDomain(d) !== undefined) {
while (true) {
const values = await client.findAll(d, { [attr.name]: sourceAccount._id }, { limit: 100 })
if (values.length === 0) {
break
}
try {
const descendants = h.getDescendants(attr.attributeOf)
for (const d of descendants) {
if (h.isDerived(d, core.class.Tx) || h.isDerived(d, core.class.BenchmarkDoc)) {
continue
}
if (h.findDomain(d) !== undefined) {
while (true) {
const values = await client.findAll(d, { [attr.name]: sourceAccount._id }, { limit: 100 })
if (values.length === 0) {
break
}
const builder = client.apply(sourceAccount._id)
for (const v of values) {
await updateAttribute(builder, v, d, { key: attr.name, attr }, targetAccount._id)
}
if (builder.txes.length > 0) {
await builder.commit()
const builder = client.apply(sourceAccount._id)
for (const v of values) {
await updateAttribute(builder, v, d, { key: attr.name, attr }, targetAccount._id)
}
if (builder.txes.length > 0) {
await builder.commit()
}
}
}
}
} catch (err: any) {
Analytics.handleError(err)
}
}
const arrs = (await client.findAll(core.class.Attribute, { 'type._class': core.class.ArrOf })).filter((it) => {

View File

@ -1,7 +1,8 @@
<script lang="ts">
import { Metrics } from '@hcengineering/core'
import { Metrics, type MetricsData } from '@hcengineering/core'
import { getEmbeddedLabel } from '@hcengineering/platform'
import { Button, Expandable, showPopup } from '@hcengineering/ui'
import DropdownLabels from '@hcengineering/ui/src/components/DropdownLabels.svelte'
import { FixedColumn } from '@hcengineering/view-resources'
import Params from './Params.svelte'
@ -23,8 +24,29 @@
}
return `${Math.floor((time / ops) * 100) / 100}`
}
export let sortingOrder: 'avg' | 'ops' | 'total' = 'ops'
const sortOrder = [
{ id: 'avg', label: 'Average' },
{ id: 'ops', label: 'Operations' },
{ id: 'total', label: 'Total' }
]
const getSorted = (v: Record<string, MetricsData>, sortingOrder: 'avg' | 'ops' | 'total') => {
if (sortingOrder === 'avg') {
return Object.entries(v).sort((a, b) => b[1].value / (b[1].operations + 1) - a[1].value / (a[1].operations + 1))
} else if (sortingOrder === 'ops') {
return Object.entries(v).sort((a, b) => b[1].operations + 1 - (a[1].operations + 1))
} else {
return Object.entries(v).sort((a, b) => b[1].value - a[1].value)
}
}
</script>
{#if level === 0}
<div class="p-1 flex flex-grow flex-reverse">
<DropdownLabels bind:selected={sortingOrder} items={sortOrder}></DropdownLabels>
</div>
{/if}
<Expandable
expanded={level === 0}
expandable={level !== 0 && haschilds}
@ -93,12 +115,12 @@
{/if}
{#each Object.entries(metrics.measurements) as [k, v], i (k)}
<div style:margin-left={`${level * 0.5}rem`}>
<svelte:self metrics={v} name="{i}. {k}" level={level + 1} />
<svelte:self metrics={v} name="{i}. {k}" level={level + 1} {sortingOrder} />
</div>
{/each}
{#each Object.entries(metrics.params) as [k, v], i}
<div style:margin-left={`${level * 0.5}rem`}>
{#each Object.entries(v).sort((a, b) => b[1].value / (b[1].operations + 1) - a[1].value / (a[1].operations + 1)) as [kk, vv]}
{#each getSorted(v, sortingOrder) as [kk, vv]}
{@const childExpandable =
vv.topResult !== undefined &&
vv.topResult.length > 0 &&

View File

@ -67,8 +67,8 @@ startBackup(
})
return factory
},
(ctx, dbUrls, workspace, branding, externalStorage) => {
return getConfig(ctx, dbUrls, workspace, branding, ctx, {
(ctx, dbUrl, workspace, branding, externalStorage) => {
return getConfig(ctx, dbUrl, workspace, branding, ctx, {
externalStorage,
fullTextUrl: '',
indexParallel: 0,

View File

@ -59,7 +59,7 @@ setMetadata(serverCore.metadata.ElasticIndexVersion, 'v1')
setMetadata(serverTelegram.metadata.BotUrl, process.env.TELEGRAM_BOT_URL)
setMetadata(serverAiBot.metadata.SupportWorkspaceId, process.env.SUPPORT_WORKSPACE)
const shutdown = start(config.url, {
const shutdown = start(config.dbUrl, {
fullTextUrl: config.elasticUrl,
storageConfig,
rekoniUrl: config.rekoniUrl,
@ -73,7 +73,8 @@ const shutdown = start(config.url, {
profiling: {
start: profileStart,
stop: profileStop
}
},
mongoUrl: config.mongoUrl
})
const close = (): void => {

View File

@ -35,7 +35,7 @@ registerStringLoaders()
* @public
*/
export function start (
dbUrls: string,
dbUrl: string,
opt: {
fullTextUrl: string
storageConfig: StorageConfiguration
@ -55,30 +55,32 @@ export function start (
start: () => void
stop: () => Promise<string | undefined>
}
mongoUrl?: string
}
): () => Promise<void> {
const metrics = getMetricsContext()
registerServerPlugins()
const [mainDbUrl, rawDbUrl] = dbUrls.split(';')
const externalStorage = buildStorageFromConfig(opt.storageConfig, rawDbUrl ?? mainDbUrl)
const externalStorage = buildStorageFromConfig(opt.storageConfig)
const pipelineFactory = createServerPipeline(
metrics,
dbUrls,
dbUrl,
model,
{ ...opt, externalStorage, adapterSecurity: rawDbUrl !== undefined },
{
serviceAdapters: {
[serverAiBotId]: {
factory: createAIBotAdapter,
db: '%ai-bot',
url: rawDbUrl ?? mainDbUrl
{ ...opt, externalStorage, adapterSecurity: dbUrl.startsWith('postgresql') },
opt.mongoUrl !== undefined
? {
serviceAdapters: {
[serverAiBotId]: {
factory: createAIBotAdapter,
db: '%ai-bot',
url: opt.mongoUrl
}
}
}
}
}
: {}
)
const sessionFactory = (
token: Token,

View File

@ -20,7 +20,7 @@
"docker:abuild": "docker build -t hardcoreeng/workspace . --platform=linux/arm64 && ../../common/scripts/docker_tag_push.sh hardcoreeng/workspace",
"docker:staging": "../../common/scripts/docker_tag.sh hardcoreeng/workspace staging",
"docker:push": "../../common/scripts/docker_tag.sh hardcoreeng/workspace",
"run-local": "cross-env DB_URL=mongodb://localhost:27017 MONGO_URL=mongodb://localhost:27017 MINIO_ACCESS_KEY=minioadmi MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost SERVER_SECRET='secret' TRANSACTOR_URL=ws://localhost:3333 ts-node src/__start.ts",
"run-local": "cross-env DB_URL=mongodb://localhost:27017 MINIO_ACCESS_KEY=minioadmi MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost SERVER_SECRET='secret' TRANSACTOR_URL=ws://localhost:3333 ts-node src/__start.ts",
"format": "format src",
"test": "jest --passWithNoTests --silent --forceExit",
"_phase:build": "compile transpile src",

View File

@ -27,7 +27,7 @@ export function startBackup (
pipelineFactoryFactory: (mongoUrl: string, storage: StorageAdapter) => PipelineFactory,
getConfig: (
ctx: MeasureContext,
dbUrls: string,
dbUrl: string,
workspace: WorkspaceIdWithUrl,
branding: Branding | null,
externalStorage: StorageAdapter
@ -38,13 +38,12 @@ export function startBackup (
setMetadata(serverClientPlugin.metadata.UserAgent, config.ServiceID)
const mainDbUrl = config.DbURL
const rawDbUrl = config.MongoURL
const backupStorageConfig = storageConfigFromEnv(config.Storage)
const workspaceStorageConfig = storageConfigFromEnv(config.WorkspaceStorage)
const storageAdapter = createStorageFromConfig(backupStorageConfig.storages[0])
const workspaceStorageAdapter = buildStorageFromConfig(workspaceStorageConfig, rawDbUrl ?? mainDbUrl)
const workspaceStorageAdapter = buildStorageFromConfig(workspaceStorageConfig)
const pipelineFactory = pipelineFactoryFactory(mainDbUrl, workspaceStorageAdapter)

View File

@ -1856,7 +1856,7 @@ export async function restore (
chunks.push(chunk)
})
stream.on('end', () => {
const bf = Buffer.concat(chunks)
const bf = Buffer.concat(chunks as any)
const doc = JSON.parse(bf.toString()) as Doc
if (doc._class === core.class.Blob || doc._class === 'core:class:BlobData') {
const data = migradeBlobData(doc as Blob, changeset.get(doc._id) as string)
@ -2211,7 +2211,7 @@ export async function compactBackup (
chunks.push(chunk)
})
stream.on('end', () => {
const bf = Buffer.concat(chunks)
const bf = Buffer.concat(chunks as any)
const doc = JSON.parse(bf.toString()) as Doc
if (doc._class === core.class.Blob || doc._class === 'core:class:BlobData') {
const d = blobs.get(bname)
@ -2314,7 +2314,6 @@ function migradeBlobData (blob: Blob, etag: string): string {
if (blob._class === 'core:class:BlobData') {
const bd = blob as unknown as BlobData
blob.contentType = blob.contentType ?? bd.type
blob.storageId = bd._id
blob.etag = etag
blob._class = core.class.Blob
delete (blob as any).type

View File

@ -25,7 +25,6 @@ export interface Config {
Port: number
AccountsUrl: string
MongoUrl: string
}
const envMap: { [key in keyof Config]: string } = {
@ -33,11 +32,10 @@ const envMap: { [key in keyof Config]: string } = {
Secret: 'SECRET',
Interval: 'INTERVAL',
Port: 'COLLABORATOR_PORT',
AccountsUrl: 'ACCOUNTS_URL',
MongoUrl: 'MONGO_URL'
AccountsUrl: 'ACCOUNTS_URL'
}
const required: Array<keyof Config> = ['Secret', 'ServiceID', 'Port', 'AccountsUrl', 'MongoUrl']
const required: Array<keyof Config> = ['Secret', 'ServiceID', 'Port', 'AccountsUrl']
const config: Config = (() => {
const params: Partial<Config> = {
@ -45,8 +43,7 @@ const config: Config = (() => {
ServiceID: process.env[envMap.ServiceID] ?? 'collaborator-service',
Interval: parseInt(process.env[envMap.Interval] ?? '30000'),
Port: parseInt(process.env[envMap.Port] ?? '3078'),
AccountsUrl: process.env[envMap.AccountsUrl],
MongoUrl: process.env[envMap.MongoUrl]
AccountsUrl: process.env[envMap.AccountsUrl]
}
const missingEnv = required.filter((key) => params[key] === undefined).map((key) => envMap[key])

View File

@ -18,11 +18,11 @@ import { setMetadata } from '@hcengineering/platform'
import serverToken from '@hcengineering/server-token'
import type { MeasureContext } from '@hcengineering/core'
import serverClient from '@hcengineering/server-client'
import type { StorageConfiguration } from '@hcengineering/server-core'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
import config from './config'
import { start } from './server'
import serverClient from '@hcengineering/server-client'
export async function startCollaborator (ctx: MeasureContext, onClose?: () => void): Promise<void> {
setMetadata(serverToken.metadata.Secret, config.Secret)
@ -30,7 +30,7 @@ export async function startCollaborator (ctx: MeasureContext, onClose?: () => vo
setMetadata(serverClient.metadata.Endpoint, config.AccountsUrl)
const storageConfig: StorageConfiguration = storageConfigFromEnv()
const storageAdapter = buildStorageFromConfig(storageConfig, config.MongoUrl)
const storageAdapter = buildStorageFromConfig(storageConfig)
const shutdown = await start(ctx, config, storageAdapter)

View File

@ -14,16 +14,15 @@
//
import {
type LowLevelStorage,
type Class,
type Doc,
type DocumentQuery,
type DocumentUpdate,
type Domain,
type FieldIndexConfig,
type FindOptions,
type FindResult,
type Hierarchy,
type LowLevelStorage,
type MeasureContext,
type ModelDb,
type Ref,
@ -56,40 +55,6 @@ export interface DomainHelper {
) => Promise<void>
}
export interface RawDBAdapterStream<T extends Doc> {
next: () => Promise<T[]>
close: () => Promise<void>
}
/**
* @public
*/
export interface RawDBAdapter {
find: <T extends Doc>(
ctx: MeasureContext,
workspace: WorkspaceId,
domain: Domain,
query: DocumentQuery<T>,
options?: Omit<FindOptions<T>, 'projection' | 'lookup' | 'total'>
) => Promise<FindResult<T>>
findStream: <T extends Doc>(
ctx: MeasureContext,
workspace: WorkspaceId,
domain: Domain,
query: DocumentQuery<T>,
options?: Omit<FindOptions<T>, 'projection' | 'lookup' | 'total'>
) => Promise<RawDBAdapterStream<T>>
upload: <T extends Doc>(ctx: MeasureContext, workspace: WorkspaceId, domain: Domain, docs: T[]) => Promise<void>
update: <T extends Doc>(
ctx: MeasureContext,
workspace: WorkspaceId,
domain: Domain,
docs: Map<Ref<T>, DocumentUpdate<T>>
) => Promise<void>
clean: <T extends Doc>(ctx: MeasureContext, workspace: WorkspaceId, domain: Domain, docs: Ref<T>[]) => Promise<void>
close: () => Promise<void>
}
export type DbAdapterHandler = (
domain: Domain,
event: 'add' | 'update' | 'delete' | 'read',
@ -102,7 +67,7 @@ export type DbAdapterHandler = (
export interface DbAdapter extends LowLevelStorage {
init?: (domains?: string[], excludeDomains?: string[]) => Promise<void>
helper: () => DomainHelperOperations
helper?: () => DomainHelperOperations
close: () => Promise<void>
findAll: <T extends Doc>(

View File

@ -89,7 +89,6 @@ export class DatalakeService implements StorageAdapter {
provider: '',
_class: core.class.Blob,
_id: objectName as Ref<Blob>,
storageId: objectName,
contentType: result.type,
size: result.size ?? 0,
etag: result.etag ?? '',

View File

@ -897,8 +897,7 @@ async function getGeneratePreview (
_id: sizeId as Ref<PlatformBlob>,
size: dataBuff.size,
contentType,
etag: upload.etag,
storageId: sizeId
etag: upload.etag
}
} catch (err: any) {
Analytics.handleError(err)

View File

@ -24,12 +24,6 @@ import { start } from '.'
export function startFront (ctx: MeasureContext, extraConfig?: Record<string, string | undefined>): void {
const SERVER_PORT = parseInt(process.env.SERVER_PORT ?? '8080')
const url = process.env.MONGO_URL
if (url === undefined) {
console.error('please provide mongodb url')
process.exit(1)
}
const elasticUrl = process.env.ELASTIC_URL
if (elasticUrl === undefined) {
console.error('please provide elastic url')
@ -37,7 +31,7 @@ export function startFront (ctx: MeasureContext, extraConfig?: Record<string, st
}
const storageConfig: StorageConfiguration = storageConfigFromEnv()
const storageAdapter = buildStorageFromConfig(storageConfig, url)
const storageAdapter = buildStorageFromConfig(storageConfig)
const accountsUrl = process.env.ACCOUNTS_URL
if (accountsUrl === undefined) {

View File

@ -88,8 +88,10 @@ export class DomainTxMiddleware extends BaseMiddleware implements Middleware {
}
}
const r = await ctx.with('adapter-tx', { domain }, async (ctx) => await adapter.tx(ctx, ...txes), {
txes: txes.length
const r = await ctx.with('adapter-tx', { domain }, (ctx) => adapter.tx(ctx, ...txes), {
txes: txes.length,
classes: Array.from(new Set(txes.map((it) => it.objectClass))),
_classes: Array.from(new Set(txes.map((it) => it._class)))
})
if (Array.isArray(r)) {

View File

@ -225,8 +225,7 @@ export class MinioService implements StorageAdapter {
provider: this.opt.name,
space: core.space.Configuration,
modifiedBy: core.account.ConfigUser,
modifiedOn: data.lastModified.getTime(),
storageId: _id
modifiedOn: data.lastModified.getTime()
})
}
onNext()
@ -279,7 +278,6 @@ export class MinioService implements StorageAdapter {
provider: '',
_class: core.class.Blob,
_id: this.stripPrefix(rootPrefix, objectName) as Ref<Blob>,
storageId: this.stripPrefix(rootPrefix, objectName),
contentType: result.metaData['content-type'],
size: result.size,
etag: result.etag,

View File

@ -14,6 +14,5 @@
// limitations under the License.
//
export * from './rawAdapter'
export * from './storage'
export * from './utils'

View File

@ -1,194 +0,0 @@
import {
SortingOrder,
cutObjectArray,
toFindResult,
type Doc,
type DocumentQuery,
type DocumentUpdate,
type Domain,
type FindOptions,
type FindResult,
type MeasureContext,
type Ref,
type WorkspaceId
} from '@hcengineering/core'
import type { RawDBAdapter, RawDBAdapterStream } from '@hcengineering/server-core'
import { type Document, type Filter, type FindCursor, type MongoClient, type Sort } from 'mongodb'
import { toArray, uploadDocuments } from './storage'
import { getMongoClient, getWorkspaceMongoDB } from './utils'
export function createRawMongoDBAdapter (url: string): RawDBAdapter {
const client = getMongoClient(url)
let mongoClient: MongoClient | undefined
const collectSort = (options: FindOptions<Doc>): Sort | undefined => {
if (options?.sort === undefined) {
return undefined
}
const sort: Sort = {}
let count = 0
for (const key in options.sort) {
const order = options.sort[key] === SortingOrder.Ascending ? 1 : -1
sort[key] = order
count++
}
if (count === 0) {
return undefined
}
return sort
}
async function getCursor<T extends Doc> (
workspace: WorkspaceId,
domain: Domain,
query: DocumentQuery<T>,
options?: Omit<FindOptions<T>, 'projection' | 'lookup' | 'total'>
): Promise<{
cursor: FindCursor<T>
total: number
}> {
mongoClient = mongoClient ?? (await client.getClient())
const db = getWorkspaceMongoDB(mongoClient, workspace)
const coll = db.collection(domain)
let cursor = coll.find<T>(query as Filter<Document>, {
checkKeys: false
})
const total: number = -1
if (options != null) {
if (options.sort !== undefined) {
const sort = collectSort(options)
if (sort !== undefined) {
cursor = cursor.sort(sort)
}
}
if (options.limit !== undefined || typeof query._id === 'string') {
cursor = cursor.limit(options.limit ?? 1)
}
}
return { cursor, total }
}
return {
find: async function <T extends Doc>(
ctx: MeasureContext,
workspace: WorkspaceId,
domain: Domain,
query: DocumentQuery<T>,
options?: Omit<FindOptions<T>, 'projection' | 'lookup' | 'total'>
): Promise<FindResult<T>> {
const { cursor, total } = await ctx.with(
'get-cursor',
{},
async () => await getCursor(workspace, domain, query, options)
)
// Error in case of timeout
try {
const res = await ctx.with('to-array', {}, async () => await toArray<T>(cursor), {
...query,
...options
})
return toFindResult(res, total)
} catch (e) {
console.error('error during executing cursor in findAll', cutObjectArray(query), options, e)
throw e
}
},
findStream: async function <T extends Doc>(
ctx: MeasureContext,
workspace: WorkspaceId,
domain: Domain,
query: DocumentQuery<T>,
options?: Omit<FindOptions<T>, 'projection' | 'lookup' | 'total'>
): Promise<RawDBAdapterStream<T>> {
const { cursor } = await getCursor(workspace, domain, query, options)
return {
next: async () => {
const result: T[] = []
const doc = await cursor.next()
if (doc != null) {
result.push(doc)
}
if (cursor.bufferedCount() > 0) {
result.push(...cursor.readBufferedDocuments())
}
return result
},
close: async () => {
await cursor.close()
}
}
},
upload: async (ctx: MeasureContext, workspace, domain, docs) => {
mongoClient = mongoClient ?? (await client.getClient())
const db = getWorkspaceMongoDB(mongoClient, workspace)
const coll = db.collection(domain)
await uploadDocuments(ctx, docs, coll)
},
close: async () => {
client.close()
},
clean: async (ctx, workspace, domain, docs) => {
mongoClient = mongoClient ?? (await client.getClient())
const db = getWorkspaceMongoDB(mongoClient, workspace)
const coll = db.collection<Doc>(domain)
await coll.deleteMany({ _id: { $in: docs } })
},
update: async (
ctx: MeasureContext,
workspace: WorkspaceId,
domain: Domain,
operations: Map<Ref<Doc>, DocumentUpdate<Doc>>
): Promise<void> => {
await ctx.with('update', { domain }, async () => {
mongoClient = mongoClient ?? (await client.getClient())
const db = getWorkspaceMongoDB(mongoClient, workspace)
const coll = db.collection(domain)
// remove old and insert new ones
const ops = Array.from(operations.entries())
let skip = 500
while (ops.length > 0) {
const part = ops.splice(0, skip)
try {
await ctx.with('raw-bulk-write', {}, async () => {
await coll.bulkWrite(
part.map((it) => {
const { $unset, ...set } = it[1] as any
if ($unset !== undefined) {
for (const k of Object.keys(set)) {
if ($unset[k] === '') {
// eslint-disable-next-line @typescript-eslint/no-dynamic-delete
delete $unset[k]
}
}
}
return {
updateOne: {
filter: { _id: it[0] },
update: {
$set: { ...set, '%hash%': null },
...($unset !== undefined ? { $unset } : {})
}
}
}
}),
{
ordered: false
}
)
})
} catch (err: any) {
ctx.error('failed on bulk write', { error: err, skip })
if (skip !== 1) {
ops.push(...part)
skip = 1 // Let's update one by one, to loose only one failed variant.
}
}
}
})
}
}
}

View File

@ -1300,8 +1300,8 @@ class MongoAdapter extends MongoAdapterBase {
const coll = this.db.collection<Doc>(domain)
promises.push(
addOperation(ctx, 'bulk-write', { domain, operations: ops.length }, async (ctx) => {
await ctx.with(
addOperation(ctx, 'bulk-write', { domain, operations: ops.length }, (ctx) =>
ctx.with(
'bulk-write',
{ domain },
async () => {
@ -1318,7 +1318,7 @@ class MongoAdapter extends MongoAdapterBase {
operations: ops.length
}
)
})
)
)
}
if (domainBulk.findUpdate.size > 0) {
@ -1337,7 +1337,7 @@ class MongoAdapter extends MongoAdapterBase {
ctx,
'find-result',
{},
async (ctx) => await coll.find({ _id: { $in: Array.from(domainBulk.findUpdate) } }).toArray(),
(ctx) => coll.find({ _id: { $in: Array.from(domainBulk.findUpdate) } }).toArray(),
{ domain, _ids: domainBulk.findUpdate.size, queueTime: stTime - st }
)
result.push(...docs)
@ -1665,19 +1665,8 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter {
@withContext('get-model')
async getModel (ctx: MeasureContext): Promise<Tx[]> {
const txCollection = this.db.collection<Tx>(DOMAIN_TX)
const cursor = await ctx.with('find', {}, async () => {
const c = txCollection.find(
{ objectSpace: core.space.Model },
{
sort: {
_id: 1,
modifiedOn: 1
}
}
)
return c
})
const model = await ctx.with('to-array', {}, async () => await toArray<Tx>(cursor))
const cursor = txCollection.find({ objectSpace: core.space.Model })
const model = await toArray<Tx>(cursor)
// We need to put all core.account.System transactions first
const systemTx: Tx[] = []
const userTx: Tx[] = []

View File

@ -263,8 +263,7 @@ export class S3Service implements StorageAdapter {
provider: this.opt.name,
space: core.space.Configuration,
modifiedBy: core.account.ConfigUser,
modifiedOn: data.LastModified?.getTime() ?? 0,
storageId: _id
modifiedOn: data.LastModified?.getTime() ?? 0
})
}
}
@ -289,7 +288,6 @@ export class S3Service implements StorageAdapter {
provider: '',
_class: core.class.Blob,
_id: this.stripPrefix(rootPrefix, objectName) as Ref<Blob>,
storageId: this.stripPrefix(rootPrefix, objectName),
contentType: result.ContentType ?? '',
size: result.ContentLength ?? 0,
etag: result.ETag ?? '',

View File

@ -71,7 +71,7 @@ import { createIndexStages } from './indexing'
export function getTxAdapterFactory (
metrics: MeasureContext,
dbUrls: string,
dbUrl: string,
workspace: WorkspaceIdWithUrl,
branding: Branding | null,
opt: {
@ -86,7 +86,7 @@ export function getTxAdapterFactory (
},
extensions?: Partial<DbConfiguration>
): DbAdapterFactory {
const conf = getConfig(metrics, dbUrls, workspace, branding, metrics, opt, extensions)
const conf = getConfig(metrics, dbUrl, workspace, branding, metrics, opt, extensions)
const adapterName = conf.domains[DOMAIN_TX] ?? conf.defaultAdapter
const adapter = conf.adapters[adapterName]
return adapter.factory
@ -98,7 +98,7 @@ export function getTxAdapterFactory (
export function createServerPipeline (
metrics: MeasureContext,
dbUrls: string,
dbUrl: string,
model: Tx[],
opt: {
fullTextUrl: string
@ -116,7 +116,7 @@ export function createServerPipeline (
return (ctx, workspace, upgrade, broadcast, branding) => {
const metricsCtx = opt.usePassedCtx === true ? ctx : metrics
const wsMetrics = metricsCtx.newChild('🧲 session', {})
const conf = getConfig(metrics, dbUrls, workspace, branding, wsMetrics, opt, extensions)
const conf = getConfig(metrics, dbUrl, workspace, branding, wsMetrics, opt, extensions)
const middlewares: MiddlewareCreator[] = [
LookupMiddleware.create,
@ -163,7 +163,7 @@ export function createServerPipeline (
export function createBackupPipeline (
metrics: MeasureContext,
dbUrls: string,
dbUrl: string,
systemTx: Tx[],
opt: {
usePassedCtx?: boolean
@ -177,7 +177,7 @@ export function createBackupPipeline (
const wsMetrics = metricsCtx.newChild('🧲 backup', {})
const conf = getConfig(
metrics,
dbUrls,
dbUrl,
workspace,
branding,
wsMetrics,
@ -229,25 +229,19 @@ export function createBackupPipeline (
export async function getServerPipeline (
ctx: MeasureContext,
model: Tx[],
mongodbUri: string | undefined,
dbUrl: string,
wsUrl: WorkspaceIdWithUrl
): Promise<{
pipeline: Pipeline
storageAdapter: StorageAdapter
}> {
const dbUrls = mongodbUri !== undefined && mongodbUri !== dbUrl ? `${dbUrl};${mongodbUri}` : dbUrl
const storageConfig: StorageConfiguration = storageConfigFromEnv()
if (mongodbUri === undefined) {
throw new Error('MONGO_URL is not provided')
}
const storageAdapter = buildStorageFromConfig(storageConfig, mongodbUri)
const storageAdapter = buildStorageFromConfig(storageConfig)
const pipelineFactory = createServerPipeline(
ctx,
dbUrls,
dbUrl,
model,
{
externalStorage: storageAdapter,
@ -291,7 +285,7 @@ export async function getServerPipeline (
export function getConfig (
metrics: MeasureContext,
dbUrls: string,
dbUrl: string,
workspace: WorkspaceIdWithUrl,
branding: Branding | null,
ctx: MeasureContext,
@ -309,7 +303,6 @@ export function getConfig (
): DbConfiguration {
const metricsCtx = opt.usePassedCtx === true ? ctx : metrics
const wsMetrics = metricsCtx.newChild('🧲 session', {})
const [dbUrl, mongoUrl] = dbUrls.split(';')
const conf: DbConfiguration & FulltextDBConfiguration = {
domains: {
[DOMAIN_TX]: 'Tx',
@ -324,11 +317,11 @@ export function getConfig (
defaultAdapter: extensions?.defaultAdapter ?? 'Main',
adapters: {
Tx: {
factory: mongoUrl !== undefined ? createPostgresTxAdapter : createMongoTxAdapter,
factory: dbUrl.startsWith('postgresql') ? createPostgresTxAdapter : createMongoTxAdapter,
url: dbUrl
},
Main: {
factory: mongoUrl !== undefined ? createPostgresAdapter : createMongoAdapter,
factory: dbUrl.startsWith('postgresql') ? createPostgresAdapter : createMongoAdapter,
url: dbUrl
},
Null: {
@ -341,7 +334,7 @@ export function getConfig (
},
StorageData: {
factory: createStorageDataAdapter,
url: mongoUrl ?? dbUrl
url: ''
},
FullTextBlob: {
factory: createElasticBackupDataAdapter,

View File

@ -1,416 +0,0 @@
import core, {
DOMAIN_BLOB,
groupByArray,
toIdMap,
withContext,
type Blob,
type MeasureContext,
type Ref,
type StorageIterator,
type WorkspaceId
} from '@hcengineering/core'
import { type Readable } from 'stream'
import { getMetadata } from '@hcengineering/platform'
import {
type BlobStorageIterator,
type BucketInfo,
type ListBlobResult,
type StorageAdapter,
type StorageAdapterEx,
type UploadedObjectInfo
} from '@hcengineering/storage'
import { Analytics } from '@hcengineering/analytics'
import serverCore, {
type RawDBAdapter,
type StorageConfig,
type StorageConfiguration
} from '@hcengineering/server-core'
class NoSuchKeyError extends Error {
code: string
constructor (msg: string) {
super(msg)
this.code = 'NoSuchKey'
}
}
/**
* Perform operations on storage adapter and map required information into BinaryDocument into provided DbAdapter storage.
*/
export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterEx {
constructor (
readonly adapters: Map<string, StorageAdapter>,
readonly defaultAdapter: string, // Adapter will be used to put new documents into, if not matched by content type
readonly dbAdapter: RawDBAdapter
) {}
async syncBlobFromStorage (
ctx: MeasureContext,
workspaceId: WorkspaceId,
objectName: string,
providerId?: string
): Promise<Blob> {
let current: Blob | undefined = (
await this.dbAdapter.find<Blob>(ctx, workspaceId, DOMAIN_BLOB, { _id: objectName as Ref<Blob> }, { limit: 1 })
).shift()
let updated = false
if (current === undefined && providerId !== undefined) {
current = await this.adapters.get(providerId)?.stat(ctx, workspaceId, objectName)
if (current !== undefined) {
current.provider = providerId
updated = true
}
}
const provider = this.adapters.get(providerId ?? current?.provider ?? this.defaultAdapter)
if (provider === undefined) {
throw new NoSuchKeyError('No such provider found')
}
const stat = updated ? current : await provider.stat(ctx, workspaceId, objectName)
if (stat !== undefined) {
stat.provider = providerId ?? current?.provider ?? this.defaultAdapter
if (current !== undefined && !updated) {
await this.dbAdapter.clean(ctx, workspaceId, DOMAIN_BLOB, [current._id])
}
await this.dbAdapter.upload<Blob>(ctx, workspaceId, DOMAIN_BLOB, [stat])
// TODO: We need to send notification about Blob is changed.
return stat
} else {
throw new NoSuchKeyError('No such blob found')
}
}
async initialize (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {}
doTrimHash (s: string | undefined): string {
if (s == null) {
return ''
}
if (s.startsWith('"') && s.endsWith('"')) {
return s.slice(1, s.length - 1)
}
return s
}
async doSyncDocs (ctx: MeasureContext, workspaceId: WorkspaceId, docs: ListBlobResult[]): Promise<void> {
const existingBlobs = toIdMap(
await this.dbAdapter.find<Blob>(ctx, workspaceId, DOMAIN_BLOB, { _id: { $in: docs.map((it) => it._id) } })
)
const toUpdate: Blob[] = []
for (const d of docs) {
const blobInfo = existingBlobs.get(d._id)
if (
blobInfo === undefined || // Blob info undefined
// Provider are same and etag or size are diffrent.
(d.provider === blobInfo.provider &&
(this.doTrimHash(blobInfo.etag) !== this.doTrimHash(d.etag) || blobInfo.size !== d.size)) ||
// We have replacement in default
(d.provider === this.defaultAdapter && blobInfo?.provider !== d.provider)
) {
const stat = await this.adapters.get(d.provider)?.stat(ctx, workspaceId, d._id)
if (stat !== undefined) {
stat.provider = d.provider
toUpdate.push(stat)
} else {
ctx.error('blob not found for sync', { provider: d.provider, id: d._id, workspace: workspaceId.name })
}
}
}
if (toUpdate.length > 0) {
await this.dbAdapter.clean(ctx, workspaceId, DOMAIN_BLOB, Array.from(toUpdate.map((it) => it._id)))
await this.dbAdapter.upload(ctx, workspaceId, DOMAIN_BLOB, toUpdate)
}
}
find (ctx: MeasureContext, workspaceId: WorkspaceId): StorageIterator {
const storageIterator = this.makeStorageIterator(ctx, workspaceId)
return {
next: async () => {
const docInfos = await storageIterator.next()
if (docInfos.length > 0) {
await this.doSyncDocs(ctx, workspaceId, docInfos)
}
return docInfos.map((it) => ({
hash: it.etag,
id: it._id,
size: it.size
}))
},
close: async (ctx) => {
await storageIterator.close()
}
}
}
private makeStorageIterator (ctx: MeasureContext, workspaceId: WorkspaceId): BlobStorageIterator {
const adapters = Array.from(this.adapters.entries())
let provider: [string, StorageAdapter] | undefined
let iterator: BlobStorageIterator | undefined
return {
next: async () => {
while (true) {
if (iterator === undefined && adapters.length > 0) {
provider = adapters.shift() as [string, StorageAdapter]
iterator = await provider[1].listStream(ctx, workspaceId)
}
if (iterator === undefined) {
return []
}
const docInfos = await iterator.next()
if (docInfos.length > 0) {
for (const d of docInfos) {
d.provider = provider?.[0] as string
}
// We need to check if our stored version is fine
return docInfos
} else {
// We need to take next adapter
await iterator.close()
iterator = undefined
continue
}
}
},
close: async () => {
if (iterator !== undefined) {
await iterator.close()
}
}
}
}
async close (): Promise<void> {
for (const a of this.adapters.values()) {
await a.close()
}
await this.dbAdapter.close()
}
async exists (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<boolean> {
for (const a of this.adapters.values()) {
if (!(await a.exists(ctx, workspaceId))) {
return false
}
}
return true
}
@withContext('aggregator-make', {})
async make (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {
for (const [k, a] of this.adapters.entries()) {
try {
if (!(await a.exists(ctx, workspaceId))) {
await a.make(ctx, workspaceId)
}
} catch (err: any) {
ctx.error('failed to init adapter', { adapter: k, workspaceId, error: err })
// Do not throw error in case default adapter is ok
Analytics.handleError(err)
if (k === this.defaultAdapter) {
// We should throw in case default one is not valid
throw err
}
}
}
}
@withContext('aggregator-listBuckets', {})
async listBuckets (ctx: MeasureContext): Promise<BucketInfo[]> {
const result: BucketInfo[] = []
for (const a of this.adapters.values()) {
result.push(...(await a.listBuckets(ctx)))
}
return result
}
@withContext('aggregator-delete', {})
async delete (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {
for (const a of this.adapters.values()) {
if (await a.exists(ctx, workspaceId)) {
await a.delete(ctx, workspaceId)
}
}
}
@withContext('aggregator-remove', {})
async remove (ctx: MeasureContext, workspaceId: WorkspaceId, objectNames: string[]): Promise<void> {
const docs = await this.dbAdapter.find<Blob>(ctx, workspaceId, DOMAIN_BLOB, {
_id: { $in: objectNames as Ref<Blob>[] }
})
// Group by provider and delegate into it.
const byProvider = groupByArray(docs, (item) => item.provider)
for (const [k, docs] of byProvider) {
const adapter = this.adapters.get(k)
if (adapter !== undefined) {
await adapter.remove(
ctx,
workspaceId,
docs.map((it) => it._id)
)
}
}
await this.dbAdapter.clean(ctx, workspaceId, DOMAIN_BLOB, objectNames as Ref<Blob>[])
}
async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<BlobStorageIterator> {
const data = await this.dbAdapter.findStream<Blob>(ctx, workspaceId, DOMAIN_BLOB, {})
return {
next: async (): Promise<ListBlobResult[]> => {
return await data.next()
},
close: async () => {
await data.close()
}
}
}
@withContext('aggregator-stat', {})
async stat (ctx: MeasureContext, workspaceId: WorkspaceId, name: string): Promise<Blob | undefined> {
const result = await this.dbAdapter.find<Blob>(
ctx,
workspaceId,
DOMAIN_BLOB,
{ _id: name as Ref<Blob> },
{ limit: 1 }
)
return result.shift()
}
@withContext('aggregator-get', {})
async get (ctx: MeasureContext, workspaceId: WorkspaceId, name: string): Promise<Readable> {
const { provider, stat } = await this.findProvider(ctx, workspaceId, name)
return await provider.get(ctx, workspaceId, stat.storageId)
}
@withContext('find-provider', {})
private async findProvider (
ctx: MeasureContext,
workspaceId: WorkspaceId,
objectName: string
): Promise<{ provider: StorageAdapter, stat: Blob }> {
const stat = (
await this.dbAdapter.find<Blob>(ctx, workspaceId, DOMAIN_BLOB, { _id: objectName as Ref<Blob> }, { limit: 1 })
).shift()
if (stat === undefined) {
throw new NoSuchKeyError(`No such object found ${objectName}`)
}
const provider = this.adapters.get(stat.provider)
if (provider === undefined) {
throw new NoSuchKeyError(`No such provider found: ${provider}`)
}
return { provider, stat }
}
@withContext('aggregator-partial', {})
async partial (
ctx: MeasureContext,
workspaceId: WorkspaceId,
objectName: string,
offset: number,
length?: number | undefined
): Promise<Readable> {
const { provider, stat } = await this.findProvider(ctx, workspaceId, objectName)
return await provider.partial(ctx, workspaceId, stat.storageId, offset, length)
}
@withContext('aggregator-read', {})
async read (ctx: MeasureContext, workspaceId: WorkspaceId, name: string): Promise<Buffer[]> {
const { provider, stat } = await this.findProvider(ctx, workspaceId, name)
return await provider.read(ctx, workspaceId, stat.storageId)
}
selectProvider (
forceProvider: string | undefined,
contentType: string
): { adapter: StorageAdapter, provider: string } {
if (forceProvider !== undefined) {
return {
adapter: this.adapters.get(forceProvider) ?? (this.adapters.get(this.defaultAdapter) as StorageAdapter),
provider: forceProvider
}
}
return { adapter: this.adapters.get(this.defaultAdapter) as StorageAdapter, provider: this.defaultAdapter }
}
@withContext('aggregator-put', {})
async put (
ctx: MeasureContext,
workspaceId: WorkspaceId,
objectName: string,
stream: string | Readable | Buffer,
contentType: string,
size?: number | undefined
): Promise<UploadedObjectInfo> {
const stat = (
await this.dbAdapter.find<Blob>(ctx, workspaceId, DOMAIN_BLOB, { _id: objectName as Ref<Blob> }, { limit: 1 })
).shift()
const { provider, adapter } = this.selectProvider(undefined, contentType)
const result = await adapter.put(ctx, workspaceId, objectName, stream, contentType, size)
if (size === undefined || size === 0 || !Number.isInteger(size)) {
const docStats = await adapter.stat(ctx, workspaceId, objectName)
if (docStats !== undefined) {
if (contentType !== docStats.contentType) {
contentType = docStats.contentType
}
size = docStats.size
}
}
const blobDoc: Blob = {
_class: core.class.Blob,
_id: objectName as Ref<Blob>,
modifiedBy: core.account.System,
modifiedOn: Date.now(),
space: core.space.Configuration,
provider,
storageId: objectName,
size: size ?? 0,
contentType,
etag: result.etag,
version: result.versionId ?? null
}
await this.dbAdapter.upload<Blob>(ctx, workspaceId, DOMAIN_BLOB, [blobDoc])
// If the file is already stored in different provider, we need to remove it.
if (stat !== undefined && stat.provider !== provider) {
// TODO temporary not needed
// const adapter = this.adapters.get(stat.provider)
// await adapter?.remove(ctx, workspaceId, [stat._id])
}
return result
}
@withContext('aggregator-getUrl', {})
async getUrl (ctx: MeasureContext, workspaceId: WorkspaceId, name: string): Promise<string> {
// const { provider, stat } = await this.findProvider(ctx, workspaceId, name)
// return await provider.getUrl(ctx, workspaceId, stat.storageId)
const filesUrl = getMetadata(serverCore.metadata.FilesUrl) ?? ''
return filesUrl.replaceAll(':workspace', workspaceId.name).replaceAll(':blobId', name)
}
}
/**
* @public
*/
export function buildStorage (
config: StorageConfiguration,
dbAdapter: RawDBAdapter,
storageFactory: (config: StorageConfig) => StorageAdapter
): AggregatorStorageAdapter {
const adapters = new Map<string, StorageAdapter>()
for (const c of config.storages) {
adapters.set(c.name, storageFactory(c))
}
return new AggregatorStorageAdapter(adapters, config.default, dbAdapter)
}

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
import core, {
import {
Class,
Doc,
DocumentQuery,
@ -28,27 +28,20 @@ import core, {
ModelDb,
Ref,
StorageIterator,
toFindResult,
Tx,
TxResult,
WorkspaceId,
type Blob
} from '@hcengineering/core'
import { createMongoAdapter } from '@hcengineering/mongo'
import { PlatformError, unknownError } from '@hcengineering/platform'
import {
DbAdapter,
DbAdapterHandler,
StorageAdapter,
type DomainHelperOperations,
type StorageAdapterEx
} from '@hcengineering/server-core'
import { DbAdapter, DbAdapterHandler, StorageAdapter, type StorageAdapterEx } from '@hcengineering/server-core'
class StorageBlobAdapter implements DbAdapter {
constructor (
readonly workspaceId: WorkspaceId,
readonly client: StorageAdapter, // Should not be closed
readonly ctx: MeasureContext,
readonly blobAdapter: DbAdapter // A real blob adapter for Blob documents.
readonly client: StorageAdapterEx, // Should not be closed
readonly ctx: MeasureContext
) {}
async traverse<T extends Doc>(
@ -56,23 +49,26 @@ class StorageBlobAdapter implements DbAdapter {
query: DocumentQuery<T>,
options?: Pick<FindOptions<T>, 'sort' | 'limit' | 'projection'>
): Promise<Iterator<T>> {
return await this.blobAdapter.traverse(domain, query, options)
return {
next: async () => {
return toFindResult<T>([])
},
close: async () => {}
}
}
init?: ((domains?: string[], excludeDomains?: string[]) => Promise<void>) | undefined
on?: ((handler: DbAdapterHandler) => void) | undefined
async rawFindAll<T extends Doc>(domain: Domain, query: DocumentQuery<T>, options?: FindOptions<T>): Promise<T[]> {
return await this.blobAdapter.rawFindAll(domain, query, options)
return []
}
async rawUpdate<T extends Doc>(
domain: Domain,
query: DocumentQuery<T>,
operations: DocumentUpdate<T>
): Promise<void> {
await this.blobAdapter.rawUpdate(domain, query, operations)
}
): Promise<void> {}
async findAll<T extends Doc>(
ctx: MeasureContext,
@ -80,15 +76,11 @@ class StorageBlobAdapter implements DbAdapter {
query: DocumentQuery<T>,
options?: FindOptions<T>
): Promise<FindResult<T>> {
return await this.blobAdapter.findAll(ctx, _class, query, options)
}
helper (): DomainHelperOperations {
return this.blobAdapter.helper()
return toFindResult([])
}
async groupBy<T>(ctx: MeasureContext, domain: Domain, field: string): Promise<Set<T>> {
return await this.blobAdapter.groupBy(ctx, domain, field)
return new Set()
}
async tx (ctx: MeasureContext, ...tx: Tx[]): Promise<TxResult[]> {
@ -98,53 +90,32 @@ class StorageBlobAdapter implements DbAdapter {
async createIndexes (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>): Promise<void> {}
async removeOldIndex (domain: Domain, deletePattern: RegExp[], keepPattern: RegExp[]): Promise<void> {}
async close (): Promise<void> {
await this.blobAdapter.close()
}
async close (): Promise<void> {}
find (ctx: MeasureContext, domain: Domain, recheck?: boolean): StorageIterator {
return (this.client as StorageAdapterEx).find(ctx, this.workspaceId)
return this.client.find(ctx, this.workspaceId)
}
async load (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
return await this.blobAdapter.load(ctx, domain, docs)
const blobs: Blob[] = []
for (const d of docs) {
const bb = await this.client.stat(ctx, this.workspaceId, d)
if (bb !== undefined) {
blobs.push(bb)
}
}
return blobs
}
async upload (ctx: MeasureContext, domain: Domain, docs: Doc[]): Promise<void> {
// We need to update docs to have provider === defualt one.
if ('adapters' in this.client) {
const toUpload: Doc[] = []
const adapterEx = this.client as StorageAdapterEx
for (const d of docs) {
// We need sync stats to be sure all info are correct from storage.
if (d._class === core.class.Blob) {
const blob = d as Blob
const blobStat = await this.client.stat(ctx, this.workspaceId, blob.storageId)
if (blobStat !== undefined) {
blob.provider = adapterEx.defaultAdapter
blob.etag = blobStat.etag
blob.contentType = blobStat.contentType
blob.version = blobStat.version
blob.size = blobStat.size
delete (blob as any).downloadUrl
delete (blob as any).downloadUrlExpire
toUpload.push(blob)
}
}
}
docs = toUpload
}
await this.blobAdapter.upload(ctx, domain, docs)
// Nothing to do
}
async clean (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<void> {
await Promise.all([this.blobAdapter.clean(ctx, domain, docs), this.client.remove(this.ctx, this.workspaceId, docs)])
await this.client.remove(this.ctx, this.workspaceId, docs)
}
async update (ctx: MeasureContext, domain: Domain, operations: Map<Ref<Doc>, DocumentUpdate<Doc>>): Promise<void> {
await this.blobAdapter.update(ctx, domain, operations)
}
async update (ctx: MeasureContext, domain: Domain, operations: Map<Ref<Doc>, DocumentUpdate<Doc>>): Promise<void> {}
}
/**
@ -163,17 +134,5 @@ export async function createStorageDataAdapter (
}
// We need to create bucket if it doesn't exist
await storage.make(ctx, workspaceId)
const storageEx = 'adapters' in storage ? (storage as StorageAdapterEx) : undefined
const blobAdapter = await createMongoAdapter(ctx, hierarchy, url, workspaceId, modelDb, undefined, {
calculateHash: (d) => {
const blob = d as Blob
if (storageEx?.adapters !== undefined && storageEx.adapters.get(blob.provider) === undefined) {
return { digest: blob.etag + '_' + storageEx.defaultAdapter, size: blob.size }
}
return { digest: blob.etag, size: blob.size }
}
})
return new StorageBlobAdapter(workspaceId, storage, ctx, blobAdapter)
return new StorageBlobAdapter(workspaceId, storage as StorageAdapterEx, ctx)
}

View File

@ -0,0 +1,269 @@
import {
withContext,
type Blob,
type MeasureContext,
type StorageIterator,
type WorkspaceId
} from '@hcengineering/core'
import { type Readable } from 'stream'
import { getMetadata } from '@hcengineering/platform'
import {
type BlobStorageIterator,
type BucketInfo,
type ListBlobResult,
type NamedStorageAdapter,
type StorageAdapter,
type StorageAdapterEx,
type UploadedObjectInfo
} from '@hcengineering/storage'
import { Analytics } from '@hcengineering/analytics'
import serverCore, { type StorageConfig, type StorageConfiguration } from '@hcengineering/server-core'
class NoSuchKeyError extends Error {
code: string
constructor (msg: string) {
super(msg)
this.code = 'NoSuchKey'
}
}
/**
* Perform operations on storage adapter and map required information into BinaryDocument into provided DbAdapter storage.
*/
export class FallbackStorageAdapter implements StorageAdapter, StorageAdapterEx {
// Adapters should be in reverse order, first one is target one, and next ones are for fallback
constructor (readonly adapters: NamedStorageAdapter[]) {}
async initialize (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {}
doTrimHash (s: string | undefined): string {
if (s == null) {
return ''
}
if (s.startsWith('"') && s.endsWith('"')) {
return s.slice(1, s.length - 1)
}
return s
}
find (ctx: MeasureContext, workspaceId: WorkspaceId): StorageIterator {
const storageIterator = this.makeStorageIterator(ctx, workspaceId)
return {
next: async () => {
const docInfos = await storageIterator.next()
return docInfos.map((it) => ({
hash: it.etag,
id: it._id,
size: it.size
}))
},
close: async (ctx) => {
await storageIterator.close()
}
}
}
private makeStorageIterator (ctx: MeasureContext, workspaceId: WorkspaceId): BlobStorageIterator {
// We need to reverse, since we need to iterate on latest document last
const adapters = [...this.adapters].reverse()
let provider: NamedStorageAdapter | undefined
let iterator: BlobStorageIterator | undefined
return {
next: async () => {
while (true) {
if (iterator === undefined && adapters.length > 0) {
provider = adapters.shift() as NamedStorageAdapter
iterator = await provider.adapter.listStream(ctx, workspaceId)
}
if (iterator === undefined) {
return []
}
const docInfos = await iterator.next()
if (docInfos.length > 0) {
for (const d of docInfos) {
d.provider = provider?.name as string
}
// We need to check if our stored version is fine
return docInfos
} else {
// We need to take next adapter
await iterator.close()
iterator = undefined
continue
}
}
},
close: async () => {
if (iterator !== undefined) {
await iterator.close()
}
}
}
}
async close (): Promise<void> {
for (const { adapter } of this.adapters) {
await adapter.close()
}
}
async exists (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<boolean> {
for (const { adapter } of this.adapters) {
if (!(await adapter.exists(ctx, workspaceId))) {
return false
}
}
return true
}
@withContext('aggregator-make', {})
async make (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {
for (const { name, adapter } of this.adapters) {
try {
if (!(await adapter.exists(ctx, workspaceId))) {
await adapter.make(ctx, workspaceId)
}
} catch (err: any) {
ctx.error('failed to init adapter', { adapter: name, workspaceId, error: err })
// Do not throw error in case default adapter is ok
Analytics.handleError(err)
}
}
}
@withContext('aggregator-listBuckets', {})
async listBuckets (ctx: MeasureContext): Promise<BucketInfo[]> {
const result: BucketInfo[] = []
for (const { adapter } of this.adapters) {
result.push(...(await adapter.listBuckets(ctx)))
}
return result
}
@withContext('aggregator-delete', {})
async delete (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {
for (const { adapter } of this.adapters) {
if (await adapter.exists(ctx, workspaceId)) {
await adapter.delete(ctx, workspaceId)
}
}
}
@withContext('aggregator-remove', {})
async remove (ctx: MeasureContext, workspaceId: WorkspaceId, objectNames: string[]): Promise<void> {
// Group by provider and delegate into it.
for (const { adapter } of this.adapters) {
await adapter.remove(ctx, workspaceId, objectNames)
}
}
async listStream (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<BlobStorageIterator> {
const storageIterator = this.makeStorageIterator(ctx, workspaceId)
return {
next: async (): Promise<ListBlobResult[]> => {
return await storageIterator.next()
},
close: async () => {
await storageIterator.close()
}
}
}
@withContext('aggregator-stat', {})
async stat (ctx: MeasureContext, workspaceId: WorkspaceId, name: string): Promise<Blob | undefined> {
const result = await this.findProvider(ctx, workspaceId, name)
if (result !== undefined) {
result.stat.provider = result.name
}
return result?.stat
}
@withContext('aggregator-get', {})
async get (ctx: MeasureContext, workspaceId: WorkspaceId, name: string): Promise<Readable> {
const result = await this.findProvider(ctx, workspaceId, name)
if (result === undefined) {
throw new NoSuchKeyError(`${workspaceId.name} missing ${name}`)
}
return await result.adapter.get(ctx, workspaceId, result.stat._id)
}
@withContext('find-provider', {})
private async findProvider (
ctx: MeasureContext,
workspaceId: WorkspaceId,
objectName: string
): Promise<{ name: string, adapter: StorageAdapter, stat: Blob } | undefined> {
// Group by provider and delegate into it.
for (const { name, adapter } of this.adapters) {
const stat = await adapter.stat(ctx, workspaceId, objectName)
if (stat !== undefined) {
return { name, adapter, stat }
}
}
}
@withContext('aggregator-partial', {})
async partial (
ctx: MeasureContext,
workspaceId: WorkspaceId,
objectName: string,
offset: number,
length?: number | undefined
): Promise<Readable> {
const result = await this.findProvider(ctx, workspaceId, objectName)
if (result === undefined) {
throw new NoSuchKeyError(`${workspaceId.name} missing ${objectName}`)
}
return await result.adapter.partial(ctx, workspaceId, result.stat._id, offset, length)
}
@withContext('aggregator-read', {})
async read (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<Buffer[]> {
const result = await this.findProvider(ctx, workspaceId, objectName)
if (result === undefined) {
throw new NoSuchKeyError(`${workspaceId.name} missing ${objectName}`)
}
return await result.adapter.read(ctx, workspaceId, result.stat._id)
}
@withContext('aggregator-put', {})
put (
ctx: MeasureContext,
workspaceId: WorkspaceId,
objectName: string,
stream: string | Readable | Buffer,
contentType: string,
size?: number | undefined
): Promise<UploadedObjectInfo> {
const adapter = this.adapters[0].adapter
// Remove in other storages, if appicable
return adapter.put(ctx, workspaceId, objectName, stream, contentType, size)
}
@withContext('aggregator-getUrl', {})
async getUrl (ctx: MeasureContext, workspaceId: WorkspaceId, name: string): Promise<string> {
// const { provider, stat } = await this.findProvider(ctx, workspaceId, name)
// return await provider.getUrl(ctx, workspaceId, stat.storageId)
const filesUrl = getMetadata(serverCore.metadata.FilesUrl) ?? ''
return filesUrl.replaceAll(':workspace', workspaceId.name).replaceAll(':blobId', name)
}
}
/**
* @public
*/
export function buildStorage (
config: StorageConfiguration,
storageFactory: (config: StorageConfig) => StorageAdapter
): FallbackStorageAdapter {
const adapters: NamedStorageAdapter[] = []
for (const c of config.storages) {
adapters.push({ name: c.name, adapter: storageFactory(c) })
}
// Reverse adapter's so latest one will be target one.
return new FallbackStorageAdapter(adapters.reverse())
}

View File

@ -14,6 +14,6 @@
// limitations under the License.
//
export * from './aggregator'
export * from './fallback'
export * from './blobStorage'
export * from './starter'

View File

@ -1,9 +1,8 @@
import { DatalakeService, type DatalakeConfig } from '@hcengineering/datalake'
import { MinioConfig, MinioService, addMinioFallback } from '@hcengineering/minio'
import { createRawMongoDBAdapter } from '@hcengineering/mongo'
import { S3Service, type S3Config } from '@hcengineering/s3'
import { StorageAdapter, StorageConfiguration, type StorageConfig } from '@hcengineering/server-core'
import { AggregatorStorageAdapter, buildStorage } from './aggregator'
import { FallbackStorageAdapter, buildStorage } from './fallback'
/*
@ -14,8 +13,6 @@ import { AggregatorStorageAdapter, buildStorage } from './aggregator'
* kind - an storage kind minior/s3 for now.
* name - a symbolic name for provider, name could be ommited in case kind will be used as name.
* uri - an storage URI with encoded parameters.
* contentTypes - a comma separated list of content type patterns. Like 'image/*,video/gif' will match all image/* and video/gif formats.
So * will be replaced to `.*` for regexp
Last one is used as default one, or one with conrent type matched will be used.
@ -103,6 +100,6 @@ export function createStorageFromConfig (config: StorageConfig): StorageAdapter
}
}
export function buildStorageFromConfig (config: StorageConfiguration, dbUrl: string): AggregatorStorageAdapter {
return buildStorage(config, createRawMongoDBAdapter(dbUrl), createStorageFromConfig)
export function buildStorageFromConfig (config: StorageConfiguration): FallbackStorageAdapter {
return buildStorage(config, createStorageFromConfig)
}

View File

@ -1,24 +1,23 @@
import { MeasureMetricsContext, type MeasureContext, type WorkspaceId } from '@hcengineering/core'
import type { StorageAdapter } from '@hcengineering/storage'
import { AggregatorStorageAdapter } from '../aggregator'
import { MemRawDBAdapter, MemStorageAdapter } from './memAdapters'
import type { NamedStorageAdapter } from '@hcengineering/storage'
import { FallbackStorageAdapter } from '../fallback'
import { MemStorageAdapter } from './memAdapters'
describe('aggregator tests', () => {
function prepare1 (): {
mem1: MemStorageAdapter
mem2: MemStorageAdapter
aggr: AggregatorStorageAdapter
aggr: FallbackStorageAdapter
testCtx: MeasureContext
ws1: WorkspaceId
} {
const mem1 = new MemStorageAdapter()
const mem2 = new MemStorageAdapter()
const adapters = new Map<string, StorageAdapter>()
adapters.set('mem1', mem1)
adapters.set('mem2', mem2)
const blobs = new MemRawDBAdapter()
const aggr = new AggregatorStorageAdapter(adapters, 'mem2', blobs)
const adapters: NamedStorageAdapter[] = []
adapters.push({ name: 'mem2', adapter: mem2 })
adapters.push({ name: 'mem1', adapter: mem1 })
const aggr = new FallbackStorageAdapter(adapters)
const testCtx = new MeasureMetricsContext('test', {})
const ws1: WorkspaceId = { name: 'ws1' }
@ -29,17 +28,15 @@ describe('aggregator tests', () => {
// Test default provider
await mem1.put(testCtx, ws1, 'test', 'data', 'text/plain')
await aggr.syncBlobFromStorage(testCtx, ws1, 'test', 'mem1')
const stat = await aggr.stat(testCtx, ws1, 'test')
expect(stat?.provider).toEqual('mem1')
// Test content typed provider
await aggr.put(testCtx, ws1, 'test', 'data2', 'text/plain')
const stat2 = await aggr.stat(testCtx, ws1, 'test')
expect(stat2?.provider).toEqual('mem2')
const dta = Buffer.concat(await aggr.read(testCtx, ws1, 'test')).toString()
const dta = Buffer.concat((await aggr.read(testCtx, ws1, 'test')) as any).toString()
expect(dta).toEqual('data2')
})
})

View File

@ -1,22 +1,4 @@
import core, {
Hierarchy,
ModelDb,
TxProcessor,
toFindResult,
type Blob,
type Class,
type Doc,
type DocumentQuery,
type DocumentUpdate,
type Domain,
type FindOptions,
type FindResult,
type MeasureContext,
type Ref,
type WorkspaceId
} from '@hcengineering/core'
import { genMinModel } from '@hcengineering/core/src/__tests__/minmodel'
import type { RawDBAdapter, RawDBAdapterStream } from '@hcengineering/server-core'
import core, { type Blob, type MeasureContext, type WorkspaceId } from '@hcengineering/core'
import type { BlobStorageIterator, BucketInfo, StorageAdapter, UploadedObjectInfo } from '@hcengineering/storage'
import { Readable } from 'stream'
@ -102,7 +84,7 @@ export class MemStorageAdapter implements StorageAdapter {
})
})
}
const data = Buffer.concat(buffer)
const data = Buffer.concat(buffer as any)
const dta = {
_class: core.class.Blob,
_id: objectName as any,
@ -114,7 +96,6 @@ export class MemStorageAdapter implements StorageAdapter {
modifiedOn: Date.now(),
provider: '_test',
space: '' as any,
storageId: objectName,
version: null,
workspace: workspaceId.name
}
@ -148,95 +129,3 @@ export class MemStorageAdapter implements StorageAdapter {
return '/files/' + objectName
}
}
export class MemRawDBAdapter implements RawDBAdapter {
hierarchy: Hierarchy
workspaces = new Map<string, ModelDb>()
constructor () {
this.hierarchy = new Hierarchy()
const minModel = genMinModel()
minModel.forEach((it) => {
this.hierarchy.tx(it)
})
}
async find<T extends Doc>(
ctx: MeasureContext,
workspace: WorkspaceId,
domain: Domain,
query: DocumentQuery<T>,
options?: Omit<FindOptions<T>, 'projection' | 'lookup'>
): Promise<FindResult<T>> {
const db = this.workspaces.get(workspace.name)
if (db === undefined) {
return toFindResult([])
}
return await db.findAll(core.class.Blob as Ref<Class<T>>, query, options)
}
async findStream<T extends Doc>(
ctx: MeasureContext,
workspace: WorkspaceId,
domain: Domain,
query: DocumentQuery<T>,
options?: Omit<FindOptions<T>, 'projection' | 'lookup'>
): Promise<RawDBAdapterStream<T>> {
const db = this.workspaces.get(workspace.name)
let result: T[] = []
if (db !== undefined) {
result = await db.findAll(core.class.Blob as Ref<Class<T>>, query, options)
}
return {
next: async () => {
return result.splice(0, 50)
},
close: async () => {}
}
}
async upload<T extends Doc>(ctx: MeasureContext, workspace: WorkspaceId, domain: Domain, docs: T[]): Promise<void> {
let db = this.workspaces.get(workspace.name)
if (db === undefined) {
db = new ModelDb(this.hierarchy)
this.workspaces.set(workspace.name, db)
}
for (const d of docs) {
db.addDoc(d)
}
}
async update<T extends Doc>(
ctx: MeasureContext,
workspace: WorkspaceId,
domain: Domain,
docs: Map<Ref<T>, DocumentUpdate<T>>
): Promise<void> {
let db = this.workspaces.get(workspace.name)
if (db === undefined) {
db = new ModelDb(this.hierarchy)
this.workspaces.set(workspace.name, db)
}
for (const [du, upd] of docs.entries()) {
const doc = db.getObject(du)
TxProcessor.applyUpdate<T>(doc, upd)
}
}
async clean<T extends Doc>(
ctx: MeasureContext,
workspace: WorkspaceId,
domain: Domain,
docs: Ref<T>[]
): Promise<void> {
const db = this.workspaces.get(workspace.name)
if (db === undefined) {
return
}
for (const d of docs) {
db.delDoc(d)
}
}
async close (): Promise<void> {}
}

View File

@ -1,5 +1,6 @@
export interface ServerEnv {
url: string
dbUrl: string
mongoUrl?: string
elasticUrl: string
serverSecret: string
rekoniUrl: string
@ -27,12 +28,6 @@ export function serverConfigFromEnv (): ServerEnv {
}
const mongoUrl = process.env.MONGO_URL
if (mongoUrl === undefined) {
console.error('please provide mongodb url')
process.exit(1)
}
const url = dbUrl !== mongoUrl ? `${dbUrl};${mongoUrl}` : dbUrl
const elasticUrl = process.env.ELASTIC_URL
if (elasticUrl === undefined) {
@ -78,7 +73,8 @@ export function serverConfigFromEnv (): ServerEnv {
const brandingPath = process.env.BRANDING_PATH
return {
url,
dbUrl,
mongoUrl,
elasticUrl,
elasticIndexName,
serverSecret,

View File

@ -35,7 +35,8 @@ import core, {
WorkspaceId,
WorkspaceIdWithUrl,
type Doc,
type Ref
type Ref,
type WithLookup
} from '@hcengineering/core'
import { consoleModelLogger, MigrateOperation, ModelLogger, tryMigrate } from '@hcengineering/model'
import { DomainIndexHelperImpl, Pipeline, StorageAdapter, type DbAdapter } from '@hcengineering/server-core'
@ -79,11 +80,9 @@ export class FileModelLogger implements ModelLogger {
* @public
*/
export function prepareTools (rawTxes: Tx[]): {
mongodbUri: string | undefined
dbUrl: string
txes: Tx[]
} {
const mongodbUri = process.env.MONGO_URL
const dbUrl = process.env.DB_URL
if (dbUrl === undefined) {
console.error('please provide db url.')
@ -91,7 +90,6 @@ export function prepareTools (rawTxes: Tx[]): {
}
return {
mongodbUri,
dbUrl,
txes: JSON.parse(JSON.stringify(rawTxes)) as Tx[]
}
@ -157,7 +155,12 @@ export async function updateModel (
const states = await connection.findAll<MigrationState>(core.class.MigrationState, {})
const sts = Array.from(groupByArray(states, (it) => it.plugin).entries())
const migrateState = new Map(sts.map((it) => [it[0], new Set(it[1].map((q) => q.state))]))
const _toSet = (vals: WithLookup<MigrationState>[]): Set<string> => {
return new Set(vals.map((q) => q.state))
}
const migrateState = new Map<string, Set<string>>(sts.map((it) => [it[0], _toSet(it[1])]))
try {
let i = 0
@ -447,9 +450,11 @@ async function createUpdateIndexes (
if (adapter === undefined) {
throw new PlatformError(unknownError(`Adapter for domain ${domain} not found`))
}
const dbHelper = adapter.helper()
const dbHelper = adapter.helper?.()
await domainHelper.checkDomain(ctx, domain, await dbHelper.estimatedCount(domain), dbHelper)
if (dbHelper !== undefined) {
await domainHelper.checkDomain(ctx, domain, await dbHelper.estimatedCount(domain), dbHelper)
}
completed++
await progress((100 / allDomains.length) * completed)
}

View File

@ -74,13 +74,6 @@ export function serveWorkspaceAccount (
process.exit(1)
}
// Required by the tool
const dbUri = process.env.MONGO_URL
if (dbUri === undefined) {
console.log('Please provide mongodb url')
process.exit(1)
}
const waitTimeout = parseInt(process.env.WAIT_TIMEOUT ?? '5000')
setMetadata(serverToken.metadata.Secret, serverSecret)

View File

@ -113,20 +113,16 @@ export async function createWorkspace (
await handleWsEvent?.('create-started', version, 10)
const { mongodbUri, dbUrl } = prepareTools([])
if (mongodbUri === undefined) {
throw new Error('No MONGO_URL specified')
}
const dbUrls = mongodbUri !== undefined && dbUrl !== mongodbUri ? `${dbUrl};${mongodbUri}` : dbUrl
const { dbUrl } = prepareTools([])
const hierarchy = new Hierarchy()
const modelDb = new ModelDb(hierarchy)
registerServerPlugins()
registerStringLoaders()
const { pipeline, storageAdapter } = await getServerPipeline(ctx, txes, mongodbUri, dbUrl, wsUrl)
const { pipeline, storageAdapter } = await getServerPipeline(ctx, txes, dbUrl, wsUrl)
try {
const txFactory = getTxAdapterFactory(ctx, dbUrls, wsUrl, null, {
const txFactory = getTxAdapterFactory(ctx, dbUrl, wsUrl, null, {
externalStorage: storageAdapter,
fullTextUrl: 'http://localhost:9200',
indexParallel: 0,
@ -134,7 +130,7 @@ export async function createWorkspace (
rekoniUrl: '',
usePassedCtx: true
})
const txAdapter = await txFactory(ctx, hierarchy, dbUrl ?? mongodbUri, wsId, modelDb, storageAdapter)
const txAdapter = await txFactory(ctx, hierarchy, dbUrl, wsId, modelDb, storageAdapter)
await childLogger.withLog('init-workspace', {}, async (ctx) => {
await initModel(ctx, wsId, txes, txAdapter, storageAdapter, ctxModellogger, async (value) => {
@ -204,17 +200,14 @@ export async function upgradeWorkspace (
forceIndexes: boolean = false,
external: boolean = false
): Promise<void> {
const { mongodbUri, dbUrl } = prepareTools([])
if (mongodbUri === undefined) {
throw new Error('No MONGO_URL specified')
}
const { dbUrl } = prepareTools([])
let pipeline: Pipeline | undefined
let storageAdapter: StorageAdapter | undefined
registerServerPlugins()
registerStringLoaders()
try {
;({ pipeline, storageAdapter } = await getServerPipeline(ctx, txes, mongodbUri, dbUrl, {
;({ pipeline, storageAdapter } = await getServerPipeline(ctx, txes, dbUrl, {
name: ws.workspace,
workspaceName: ws.workspaceName ?? '',
workspaceUrl: ws.workspaceUrl ?? ''

View File

@ -91,7 +91,7 @@ export class PlatformWorker {
this.userManager = new UserManager(db.collection<GithubUserRecord>('users'))
const storageConfig = storageConfigFromEnv()
this.storageAdapter = buildStorageFromConfig(storageConfig, config.MongoURL)
this.storageAdapter = buildStorageFromConfig(storageConfig)
}
async close (): Promise<void> {

View File

@ -44,7 +44,7 @@ export const main = async (): Promise<void> => {
setMetadata(serverToken.metadata.Secret, config.Secret)
const storageConfig: StorageConfiguration = storageConfigFromEnv()
const storageAdapter = buildStorageFromConfig(storageConfig, config.MongoURI)
const storageAdapter = buildStorageFromConfig(storageConfig)
const db = await getDB()
const gmailController = GmailController.create(ctx, db, storageAdapter)

View File

@ -51,7 +51,7 @@ export const main = async (): Promise<void> => {
const storageConfigs: StorageConfiguration = storageConfigFromEnv()
const ctx = new MeasureMetricsContext('love', {}, {}, newMetrics())
const storageConfig = storageConfigs.storages.findLast((p) => p.name === config.StorageProviderName)
const storageAdapter = buildStorageFromConfig(storageConfigs, config.MongoUrl)
const storageAdapter = buildStorageFromConfig(storageConfigs)
const app = express()
const port = config.Port
app.use(cors())
@ -78,12 +78,14 @@ export const main = async (): Promise<void> => {
for (const res of event.egressInfo.fileResults) {
const data = dataByUUID.get(res.filename)
if (data !== undefined) {
const client = await WorkspaceClient.create(data.workspace)
const prefix = rootPrefix(storageConfig, data.workspaceId)
const filename = stripPrefix(prefix, res.filename)
await storageAdapter.syncBlobFromStorage(ctx, data.workspaceId, filename, storageConfig?.name)
await client.saveFile(filename, data.name)
await client.close()
const storedBlob = await storageAdapter.stat(ctx, data.workspaceId, filename)
if (storedBlob !== undefined) {
const client = await WorkspaceClient.create(data.workspace)
await client.saveFile(filename, data.name, storedBlob)
await client.close()
}
dataByUUID.delete(res.filename)
} else {
console.log('no data found for', res.filename)

View File

@ -41,7 +41,7 @@ export class WorkspaceClient {
return this.client
}
async saveFile (uuid: string, name: string): Promise<void> {
async saveFile (uuid: string, name: string, blob: Blob): Promise<void> {
const current = await this.client.findOne(drive.class.Drive, { _id: love.space.Drive })
if (current === undefined) {
await this.client.createDoc(
@ -59,23 +59,19 @@ export class WorkspaceClient {
love.space.Drive
)
}
const blob = await this.client.findOne(core.class.Blob, { _id: uuid as Ref<Blob> })
if (blob !== undefined) {
const data = {
file: uuid as Ref<Blob>,
title: name,
size: blob.size,
type: blob.contentType,
lastModified: blob.modifiedOn,
// hardcoded values from preset we use
// https://docs.livekit.io/realtime/egress/overview/#EncodingOptionsPreset
metadata: {
originalHeight: 720,
originalWidth: 1280
}
const data = {
file: uuid as Ref<Blob>,
title: name,
size: blob.size,
type: blob.contentType,
lastModified: blob.modifiedOn,
// hardcoded values from preset we use
// https://docs.livekit.io/realtime/egress/overview/#EncodingOptionsPreset
metadata: {
originalHeight: 720,
originalWidth: 1280
}
await createFile(this.client, love.space.Drive, drive.ids.Root, data)
}
await createFile(this.client, love.space.Drive, drive.ids.Root, data)
}
}

View File

@ -4,7 +4,6 @@
export interface Config {
Port: number
DbURL: string
Secret: string
}
@ -13,7 +12,6 @@ const parseNumber = (str: string | undefined): number | undefined => (str !== un
const config: Config = (() => {
const params: Partial<Config> = {
Port: parseNumber(process.env.PORT) ?? 4005,
DbURL: process.env.MONGO_URL,
Secret: process.env.SECRET
}

View File

@ -17,7 +17,7 @@ export const main = async (): Promise<void> => {
setupMetadata()
const storageConfig = storageConfigFromEnv()
const { app, close } = createServer(config.DbURL, storageConfig)
const { app, close } = createServer(storageConfig)
const server = listen(app, config.Port)
const shutdown = (): void => {

View File

@ -110,8 +110,8 @@ const wrapRequest = (fn: AsyncRequestHandler) => (req: Request, res: Response, n
handleRequest(fn, req, res, next)
}
export function createServer (dbUrl: string, storageConfig: StorageConfiguration): { app: Express, close: () => void } {
const storageAdapter = buildStorageFromConfig(storageConfig, dbUrl)
export function createServer (storageConfig: StorageConfiguration): { app: Express, close: () => void } {
const storageAdapter = buildStorageFromConfig(storageConfig)
const measureCtx = new MeasureMetricsContext('print', {})
const app = express()
@ -187,7 +187,7 @@ export function createServer (dbUrl: string, storageConfig: StorageConfiguration
throw new ApiError(404, `File ${file} not found`)
}
const htmlRes = await convertToHtml(Buffer.concat(originalFile))
const htmlRes = await convertToHtml(Buffer.concat(originalFile as any))
if (htmlRes === undefined) {
throw new ApiError(400, 'Failed to convert')

View File

@ -8,7 +8,6 @@ export interface Config {
AccountsUrl: string
Cert: Buffer
CertPwd: string
DbURL: string
Port: number
Secret: string
ServiceID: string
@ -23,7 +22,6 @@ const config: Config = (() => {
AccountsUrl: process.env.ACCOUNTS_URL,
Cert: process.env.CERTIFICATE_PATH !== undefined ? fs.readFileSync(process.env.CERTIFICATE_PATH) : undefined,
CertPwd: process.env.CERTIFICATE_PASSWORD ?? '',
DbURL: process.env.MONGO_URL,
Port: parseNumber(process.env.PORT) ?? 4006,
Secret: process.env.SECRET,
ServiceID: process.env.SERVICE_ID,

View File

@ -3,9 +3,9 @@
//
import { setMetadata } from '@hcengineering/platform'
import { storageConfigFromEnv } from '@hcengineering/server-storage'
import serverClient from '@hcengineering/server-client'
import { loadBrandingMap } from '@hcengineering/server-core'
import { storageConfigFromEnv } from '@hcengineering/server-storage'
import serverToken from '@hcengineering/server-token'
import config from './config'
@ -20,7 +20,7 @@ const setupMetadata = (): void => {
export const main = async (): Promise<void> => {
setupMetadata()
const storageConfig = storageConfigFromEnv()
const server = listen(createServer(config.DbURL, storageConfig, loadBrandingMap(config.BrandingPath)), config.Port)
const server = listen(createServer(storageConfig, loadBrandingMap(config.BrandingPath)), config.Port)
const shutdown = (): void => {
server.close(() => process.exit())

View File

@ -14,19 +14,19 @@
// limitations under the License.
//
import cors from 'cors'
import express, { type Express, type NextFunction, type Request, type Response } from 'express'
import { Token } from '@hcengineering/server-token'
import { type Server } from 'http'
import { MeasureMetricsContext, generateId } from '@hcengineering/core'
import { StorageConfiguration } from '@hcengineering/server-core'
import { buildStorageFromConfig } from '@hcengineering/server-storage'
import { MeasureMetricsContext, generateId } from '@hcengineering/core'
import { Token } from '@hcengineering/server-token'
import cors from 'cors'
import express, { type Express, type NextFunction, type Request, type Response } from 'express'
import { type Server } from 'http'
import { type Branding, type BrandingMap, extractBranding } from './branding'
import config from './config'
import { ApiError } from './error'
import { signPDF } from './sign'
import { extractToken } from './token'
import { type Branding, type BrandingMap, extractBranding } from './branding'
import config from './config'
type AsyncRequestHandler = (
req: Request,
@ -58,8 +58,8 @@ const wrapRequest =
handleRequest(fn, brandings, req, res, next)
}
export function createServer (dbUrl: string, storageConfig: StorageConfiguration, brandings: BrandingMap): Express {
const storageAdapter = buildStorageFromConfig(storageConfig, dbUrl)
export function createServer (storageConfig: StorageConfiguration, brandings: BrandingMap): Express {
const storageAdapter = buildStorageFromConfig(storageConfig)
const measureCtx = new MeasureMetricsContext('sign', {})
const app = express()

View File

@ -78,7 +78,7 @@ export const start = async (): Promise<void> => {
registerLoaders()
const storageConfig: StorageConfiguration = storageConfigFromEnv()
const storageAdapter = buildStorageFromConfig(storageConfig, config.MongoURL)
const storageAdapter = buildStorageFromConfig(storageConfig)
const worker = await PlatformWorker.create(ctx, storageAdapter)
const bot = await setUpBot(worker)

View File

@ -27,7 +27,7 @@ export const main = async (): Promise<void> => {
setMetadata(serverToken.metadata.Secret, config.Secret)
const storageConfig: StorageConfiguration = storageConfigFromEnv()
const storageAdapter = buildStorageFromConfig(storageConfig, config.MongoURI)
const storageAdapter = buildStorageFromConfig(storageConfig)
const platformWorker = await PlatformWorker.create(ctx, storageAdapter)
const endpoints: Array<[string, Handler]> = [

View File

@ -4,7 +4,6 @@ services:
- DB_URL=postgresql://postgres:example@postgres:5432
transactor:
environment:
- MONGO_URL=mongodb://mongodb:27018
- DB_URL=postgresql://postgres:example@postgres:5432
workspace:
environment:

View File

@ -65,7 +65,6 @@ services:
environment:
- SERVER_SECRET=secret
- DB_URL=mongodb://mongodb:27018
- MONGO_URL=mongodb://mongodb:27018
- TRANSACTOR_URL=ws://transactor:3334;ws://localhost:3334
- STORAGE_CONFIG=${STORAGE_CONFIG}
- MODEL_ENABLED=*
@ -90,7 +89,6 @@ services:
- SERVER_PORT=8083
- SERVER_SECRET=secret
- ACCOUNTS_URL=http://localhost:3003
- MONGO_URL=mongodb://mongodb:27018
- UPLOAD_URL=/files
- ELASTIC_URL=http://elastic:9200
- GMAIL_URL=http://localhost:8088
@ -142,7 +140,6 @@ services:
- COLLABORATOR_PORT=3079
- SECRET=secret
- ACCOUNTS_URL=http://account:3003
- MONGO_URL=mongodb://mongodb:27018
- STORAGE_CONFIG=${STORAGE_CONFIG}
restart: unless-stopped
rekoni:

View File

@ -15,4 +15,4 @@
./tool-pg.sh configure sanity-ws --list
# setup issue createdOn for yesterday
./tool-pg.sh change-field sanity-ws --objectId 65e47f1f1b875b51e3b4b983 --objectClass tracker:class:Issue --attribute createdOn --value $(($(date +%s)*1000 - 86400000)) --type number --domain task
./tool-pg.sh change-field sanity-ws --objectId 65e47f1f1b875b51e3b4b983 --objectClass tracker:class:Issue --attribute createdOn --value $(($(date +%s)*1000 - 86400000)) --type number

View File

@ -15,4 +15,4 @@
./tool.sh configure sanity-ws --list
# setup issue createdOn for yesterday
./tool.sh change-field sanity-ws --objectId 65e47f1f1b875b51e3b4b983 --objectClass tracker:class:Issue --attribute createdOn --value $(($(date +%s)*1000 - 86400000)) --type number --domain task
./tool.sh change-field sanity-ws --objectId 65e47f1f1b875b51e3b4b983 --objectClass tracker:class:Issue --attribute createdOn --value $(($(date +%s)*1000 - 86400000)) --type number