UBERF-6161: Storage configuration (#5109)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-04-02 14:05:16 +07:00 committed by GitHub
parent 5da63b70f4
commit 0803bb4ea2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
50 changed files with 1254 additions and 713 deletions

File diff suppressed because it is too large Load Diff

View File

@ -75,6 +75,7 @@ services:
environment:
- SERVER_PORT=8080
- SERVER_SECRET=secret
- MONGO_URL=mongodb://mongodb:27017
- ACCOUNTS_URL=http://localhost:3000
- REKONI_URL=http://localhost:4004
- CALENDAR_URL=http://localhost:8095

View File

@ -69,6 +69,7 @@ class InMemoryTxAdapter extends DummyDbAdapter implements TxAdapter {
* @public
*/
export async function createInMemoryTxAdapter (
ctx: MeasureContext,
hierarchy: Hierarchy,
url: string,
workspace: WorkspaceId

View File

@ -18,6 +18,7 @@ import chunter, { type ChatMessage } from '@hcengineering/chunter'
import contact from '@hcengineering/contact'
import core, {
DOMAIN_TX,
type MeasureContext,
SortingOrder,
TxOperations,
TxProcessor,
@ -43,6 +44,7 @@ import { MongoClient } from 'mongodb'
export const DOMAIN_ACTIVITY = 'activity' as Domain
export async function cleanWorkspace (
ctx: MeasureContext,
mongoUrl: string,
workspaceId: WorkspaceId,
storageAdapter: StorageAdapter,
@ -67,14 +69,14 @@ export async function cleanWorkspace (
attachments.map((it) => it.file).concat(contacts.map((it) => it.avatar).filter((it) => it) as string[])
)
const minioList = await storageAdapter.list(workspaceId)
const minioList = await storageAdapter.list(ctx, workspaceId)
const toClean: string[] = []
for (const mv of minioList) {
if (!files.has(mv.name)) {
toClean.push(mv.name)
if (!files.has(mv._id)) {
toClean.push(mv._id)
}
}
await storageAdapter.remove(workspaceId, toClean)
await storageAdapter.remove(ctx, workspaceId, toClean)
// connection.loadChunk(DOMAIN_BLOB, idx = )
if (opt.recruit) {
@ -145,16 +147,20 @@ export async function cleanWorkspace (
}
}
export async function fixMinioBW (workspaceId: WorkspaceId, storageService: StorageAdapter): Promise<void> {
export async function fixMinioBW (
ctx: MeasureContext,
workspaceId: WorkspaceId,
storageService: StorageAdapter
): Promise<void> {
console.log('try clean bw miniature for ', workspaceId.name)
const from = new Date(new Date().setDate(new Date().getDate() - 7))
const list = await storageService.list(workspaceId)
const from = new Date(new Date().setDate(new Date().getDate() - 7)).getTime()
const list = await storageService.list(ctx, workspaceId)
console.log('found', list.length)
let removed = 0
for (const obj of list) {
if (obj.lastModified < from) continue
if (obj.name.includes('%size%')) {
await storageService.remove(workspaceId, [obj.name])
if (obj.modifiedOn < from) continue
if ((obj._id as string).includes('%size%')) {
await storageService.remove(ctx, workspaceId, [obj._id])
removed++
if (removed % 100 === 0) {
console.log('removed: ', removed)

View File

@ -1,5 +1,5 @@
import { dropWorkspace, setWorkspaceDisabled, type Workspace } from '@hcengineering/account'
import core, { AccountRole, MeasureMetricsContext, SortingOrder } from '@hcengineering/core'
import core, { AccountRole, type MeasureContext, MeasureMetricsContext, SortingOrder } from '@hcengineering/core'
import contact from '@hcengineering/model-contact'
import { getWorkspaceDB } from '@hcengineering/mongo'
import { type StorageAdapter } from '@hcengineering/server-core'
@ -7,6 +7,7 @@ import { connect } from '@hcengineering/server-tool'
import { type Db, type MongoClient } from 'mongodb'
export async function checkOrphanWorkspaces (
ctx: MeasureContext,
workspaces: Workspace[],
transactorUrl: string,
productId: string,
@ -40,7 +41,7 @@ export async function checkOrphanWorkspaces (
// Find last transaction index:
const wspace = { name: ws.workspace, productId }
const hasBucket = await storageAdapter.exists(wspace)
const hasBucket = await storageAdapter.exists(ctx, wspace)
const [lastTx] = await connection.findAll(
core.class.Tx,
{
@ -69,12 +70,13 @@ export async function checkOrphanWorkspaces (
const workspaceDb = getWorkspaceDB(client, { name: ws.workspace, productId })
await workspaceDb.dropDatabase()
if (storageAdapter !== undefined && hasBucket) {
const docs = await storageAdapter.list(wspace)
const docs = await storageAdapter.list(ctx, wspace)
await storageAdapter.remove(
ctx,
wspace,
docs.map((it) => it.name)
docs.map((it) => it._id)
)
await storageAdapter?.delete(wspace)
await storageAdapter.delete(ctx, wspace)
}
}
}

View File

@ -410,6 +410,7 @@ export function devTool (
// We need to update workspaces with missing workspaceUrl
await checkOrphanWorkspaces(
toolCtx,
workspaces,
transactorUrl,
productId,
@ -482,7 +483,6 @@ export function devTool (
program
.command('backup <dirName> <workspace>')
.description('dump workspace transactions and minio resources')
.requiredOption('-i --index <index>', 'Index name for elastic')
.option('-s, --skip <skip>', 'A list of ; separated domain names to skip during backup', '')
.option('-f, --force', 'Force backup', false)
.action(async (dirName: string, workspace: string, cmd: { skip: string, force: boolean }) => {
@ -518,7 +518,12 @@ export function devTool (
.description('dump workspace transactions and minio resources')
.action(async (bucketName: string, dirName: string, workspace: string, cmd) => {
const { storageAdapter } = prepareTools()
const storage = await createStorageBackupStorage(storageAdapter, getWorkspaceId(bucketName, productId), dirName)
const storage = await createStorageBackupStorage(
toolCtx,
storageAdapter,
getWorkspaceId(bucketName, productId),
dirName
)
await backup(transactorUrl, getWorkspaceId(workspace, productId), storage)
})
program
@ -526,7 +531,7 @@ export function devTool (
.description('dump workspace transactions and minio resources')
.action(async (bucketName: string, dirName: string, workspace: string, date, cmd) => {
const { storageAdapter } = prepareTools()
const storage = await createStorageBackupStorage(storageAdapter, getWorkspaceId(bucketName), dirName)
const storage = await createStorageBackupStorage(toolCtx, storageAdapter, getWorkspaceId(bucketName), dirName)
await restore(transactorUrl, getWorkspaceId(workspace, productId), storage, parseInt(date ?? '-1'))
})
program
@ -535,7 +540,12 @@ export function devTool (
.action(async (bucketName: string, dirName: string, cmd) => {
const { storageAdapter } = prepareTools()
const storage = await createStorageBackupStorage(storageAdapter, getWorkspaceId(bucketName, productId), dirName)
const storage = await createStorageBackupStorage(
toolCtx,
storageAdapter,
getWorkspaceId(bucketName, productId),
dirName
)
await backupList(storage)
})
@ -576,7 +586,7 @@ export function devTool (
}
console.log(`clearing ${workspace} history:`)
await clearTelegramHistory(mongodbUri, getWorkspaceId(workspace, productId), telegramDB, minio)
await clearTelegramHistory(toolCtx, mongodbUri, getWorkspaceId(workspace, productId), telegramDB, minio)
})
})
@ -596,7 +606,7 @@ export function devTool (
for (const w of workspaces) {
console.log(`clearing ${w.workspace} history:`)
await clearTelegramHistory(mongodbUri, getWorkspaceId(w.workspace, productId), telegramDB, minio)
await clearTelegramHistory(toolCtx, mongodbUri, getWorkspaceId(w.workspace, productId), telegramDB, minio)
}
})
})
@ -624,6 +634,7 @@ export function devTool (
const { mongodbUri, storageAdapter: minio } = prepareTools()
await withDatabase(mongodbUri, async (db) => {
await cleanWorkspace(
toolCtx,
mongodbUri,
getWorkspaceId(workspace, productId),
minio,
@ -636,7 +647,7 @@ export function devTool (
program.command('fix-bw-workspace <workspace>').action(async (workspace: string) => {
const { storageAdapter: minio } = prepareTools()
await fixMinioBW(getWorkspaceId(workspace, productId), minio)
await fixMinioBW(toolCtx, getWorkspaceId(workspace, productId), minio)
})
program

View File

@ -14,7 +14,7 @@
// limitations under the License.
//
import { DOMAIN_TX, type Ref, type WorkspaceId } from '@hcengineering/core'
import { DOMAIN_TX, type MeasureContext, type Ref, type WorkspaceId } from '@hcengineering/core'
import { type StorageAdapter } from '@hcengineering/server-core'
import { DOMAIN_ATTACHMENT } from '@hcengineering/model-attachment'
import contact, { DOMAIN_CHANNEL } from '@hcengineering/model-contact'
@ -29,6 +29,7 @@ const LastMessages = 'last-msgs'
* @public
*/
export async function clearTelegramHistory (
ctx: MeasureContext,
mongoUrl: string,
workspaceId: WorkspaceId,
tgDb: string,
@ -90,7 +91,7 @@ export async function clearTelegramHistory (
workspaceDB.collection(DOMAIN_ATTACHMENT).deleteMany({
attachedToClass: telegram.class.Message
}),
storageAdapter.remove(workspaceId, Array.from(attachments))
storageAdapter.remove(ctx, workspaceId, Array.from(attachments))
])
console.log('clearing telegram service data...')

View File

@ -21,6 +21,12 @@ export default mergeIds(coreId, core, {
Archived: '' as IntlString,
ClassLabel: '' as IntlString,
ClassPropertyLabel: '' as IntlString,
Members: '' as IntlString
Members: '' as IntlString,
Blob: '' as IntlString,
BlobContentType: '' as IntlString,
BlobEtag: '' as IntlString,
BlobVersion: '' as IntlString,
BlobStorageId: '' as IntlString,
BlobSize: '' as IntlString
}
})

View File

@ -14,7 +14,9 @@
//
import {
type Blob,
DOMAIN_BLOB,
DOMAIN_BLOB_DATA,
DOMAIN_CONFIGURATION,
DOMAIN_DOC_INDEX_STATE,
DOMAIN_FULLTEXT_BLOB,
@ -63,6 +65,7 @@ import {
ReadOnly,
TypeBoolean,
TypeIntlString,
TypeNumber,
TypeRecord,
TypeRef,
TypeString,
@ -129,6 +132,34 @@ export class TAttachedDoc extends TDoc implements AttachedDoc {
collection!: string
}
@Model(core.class.Blob, core.class.Doc, DOMAIN_BLOB_DATA)
@UX(core.string.Object)
export class TBlob extends TDoc implements Blob {
@Prop(TypeString(), core.string.Blob)
@ReadOnly()
provider!: string
@Prop(TypeString(), core.string.BlobContentType)
@ReadOnly()
contentType!: string
@Prop(TypeString(), core.string.BlobStorageId)
@ReadOnly()
storageId!: string
@Prop(TypeString(), core.string.BlobEtag)
@ReadOnly()
etag!: string
@Prop(TypeString(), core.string.BlobVersion)
@ReadOnly()
version!: string
@Prop(TypeNumber(), core.string.BlobSize)
@ReadOnly()
size!: number
}
@UX(core.string.ClassLabel)
@Model(core.class.Class, core.class.Doc, DOMAIN_MODEL)
export class TClass extends TDoc implements Class<Obj> {

View File

@ -29,6 +29,7 @@ import {
TArrOf,
TAttachedDoc,
TAttribute,
TBlob,
TBlobData,
TClass,
TCollection,
@ -151,7 +152,8 @@ export function createModel (builder: Builder): void {
TIndexConfiguration,
TStatus,
TStatusCategory,
TMigrationState
TMigrationState,
TBlob
)
builder.createDoc(

View File

@ -337,6 +337,12 @@ export const DOMAIN_TRANSIENT = 'transient' as Domain
*/
export const DOMAIN_BLOB = 'blob' as Domain
/**
* Special domain to access s3 blob data.
* @public
*/
export const DOMAIN_BLOB_DATA = 'blob-data' as Domain
/**
* Special domain to access s3 blob data.
* @public
@ -535,6 +541,29 @@ export interface IndexStageState extends Doc {
attributes: Record<string, any>
}
/**
* @public
*
* A blob document to manage blob attached documents.
*
* _id: is a platform ID and it created using our regular generateId(),
* and storageId is a provider specified storage id.
*/
export interface Blob extends Doc {
// Provider
provider: string
// A provider specific id
storageId: string
// A content type for blob
contentType: string
// A etag for blob
etag: string
// Document version if supported by provider
version: string | null
// A document size
size: number
}
/**
* @public
*

View File

@ -20,6 +20,7 @@ import type {
AnyAttribute,
ArrOf,
AttachedDoc,
Blob,
BlobData,
Class,
Collection,
@ -82,6 +83,7 @@ export default plugin(coreId, {
class: {
Obj: '' as Ref<Class<Obj>>,
Doc: '' as Ref<Class<Doc>>,
Blob: '' as Ref<Class<Blob>>,
AttachedDoc: '' as Ref<Class<AttachedDoc>>,
Class: '' as Ref<Class<Class<Obj>>>,
Mixin: '' as Ref<Class<Mixin<Doc>>>,

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
import { getWorkspaceId } from '@hcengineering/core'
import { getWorkspaceId, MeasureMetricsContext } from '@hcengineering/core'
import { MinioService } from '@hcengineering/minio'
import { setMetadata } from '@hcengineering/platform'
import { backup, createStorageBackupStorage } from '@hcengineering/server-backup'
@ -92,10 +92,12 @@ export class PlatformWorker {
async backup (): Promise<void> {
const workspaces = await getWorkspaces()
const ctx = new MeasureMetricsContext('backup', {})
for (const ws of workspaces) {
console.log('\n\nBACKUP WORKSPACE ', ws.workspace, ws.productId)
try {
const storage = await createStorageBackupStorage(
ctx,
this.storageAdapter,
getWorkspaceId('backups', ws.productId),
ws.workspace

View File

@ -15,87 +15,29 @@
//
// Add this to the VERY top of the first file loaded in your app
import { setMetadata } from '@hcengineering/platform'
import serverCore from '@hcengineering/server-core'
import serverToken from '@hcengineering/server-token'
import { serverFactories } from '@hcengineering/server-ws'
import { start } from '.'
import serverNotification from '@hcengineering/server-notification'
import contactPlugin from '@hcengineering/contact'
import { setMetadata } from '@hcengineering/platform'
import { serverConfigFromEnv, storageConfigFromEnv } from '@hcengineering/server'
import serverCore, { type StorageConfiguration } from '@hcengineering/server-core'
import serverNotification from '@hcengineering/server-notification'
import serverToken from '@hcengineering/server-token'
import { start } from '.'
const serverPort = parseInt(process.env.SERVER_PORT ?? '3333')
const {
url,
frontUrl,
serverSecret,
sesUrl,
elasticUrl,
elasticIndexName,
accountsUrl,
rekoniUrl,
serverFactory,
serverPort,
enableCompression
} = serverConfigFromEnv()
const storageConfig: StorageConfiguration = storageConfigFromEnv()
const serverFactory = serverFactories[(process.env.SERVER_PROVIDER as string) ?? 'ws'] ?? serverFactories.ws
const enableCompression = (process.env.ENABLE_COMPRESSION ?? 'true') === 'true'
const url = process.env.MONGO_URL
if (url === undefined) {
console.error('please provide mongodb url')
process.exit(1)
}
const elasticUrl = process.env.ELASTIC_URL
if (elasticUrl === undefined) {
console.error('please provide elastic url')
process.exit(1)
}
const minioEndpoint = process.env.MINIO_ENDPOINT
if (minioEndpoint === undefined) {
console.error('MINIO_ENDPOINT is required')
process.exit(1)
}
const minioAccessKey = process.env.MINIO_ACCESS_KEY
if (minioAccessKey === undefined) {
console.error('MINIO_ACCESS_KEY is required')
process.exit(1)
}
const minioSecretKey = process.env.MINIO_SECRET_KEY
if (minioSecretKey === undefined) {
console.error('MINIO_SECRET_KEY is required')
process.exit(1)
}
const minioConf = {
endPoint: minioEndpoint,
accessKey: minioAccessKey,
secretKey: minioSecretKey
}
const serverSecret = process.env.SERVER_SECRET
if (serverSecret === undefined) {
console.log('Please provide server secret')
process.exit(1)
}
const rekoniUrl = process.env.REKONI_URL
if (rekoniUrl === undefined) {
console.log('Please provide REKONI_URL url')
process.exit(1)
}
const frontUrl = process.env.FRONT_URL
if (frontUrl === undefined) {
console.log('Please provide FRONT_URL url')
process.exit(1)
}
const accountsUrl = process.env.ACCOUNTS_URL
if (accountsUrl === undefined) {
console.log('Please provide ACCOUNTS_URL url')
process.exit(1)
}
const elasticIndexName = process.env.ELASTIC_INDEX_NAME
if (elasticIndexName === undefined) {
console.log('Please provide ELASTIC_INDEX_NAME')
process.exit(1)
}
const sesUrl = process.env.SES_URL
const cursorMaxTime = process.env.SERVER_CURSOR_MAXTIMEMS
const lastNameFirst = process.env.LAST_NAME_FIRST === 'true'
@ -114,7 +56,7 @@ console.log(
)
const shutdown = start(url, {
fullTextUrl: elasticUrl,
minioConf,
storageConfig,
rekoniUrl,
port: serverPort,
serverFactory,

View File

@ -25,27 +25,26 @@ import {
type ServerStorage,
type WorkspaceId
} from '@hcengineering/core'
import { MinioService } from '@hcengineering/minio'
import { createElasticAdapter, createElasticBackupDataAdapter } from '@hcengineering/elastic'
import {
ConfigurationMiddleware,
ModifiedMiddleware,
PrivateMiddleware,
QueryJoinMiddleware,
SpaceSecurityMiddleware,
SpacePermissionsMiddleware
SpacePermissionsMiddleware,
SpaceSecurityMiddleware
} from '@hcengineering/middleware'
import { createMongoAdapter, createMongoTxAdapter } from '@hcengineering/mongo'
import { OpenAIEmbeddingsStage, openAIId, openAIPluginImpl } from '@hcengineering/openai'
import { addLocation, addStringsLoader } from '@hcengineering/platform'
import {
BackupClientSession,
buildStorageFromConfig,
createNullAdapter,
createRekoniAdapter,
createStorageDataAdapter,
createYDocAdapter,
getMetricsContext,
type MinioConfig
getMetricsContext
} from '@hcengineering/server'
import { serverActivityId } from '@hcengineering/server-activity'
import { serverAttachmentId } from '@hcengineering/server-attachment'
@ -61,13 +60,14 @@ import {
FullTextPushStage,
globalIndexer,
IndexedFieldStage,
type StorageAdapter,
type StorageConfiguration,
type ContentTextAdapter,
type DbConfiguration,
type FullTextAdapter,
type FullTextPipelineStage,
type MiddlewareCreator,
type Pipeline
type Pipeline,
type StorageAdapter
} from '@hcengineering/server-core'
import { serverDocumentId } from '@hcengineering/server-document'
import { serverGmailId } from '@hcengineering/server-gmail'
@ -188,7 +188,7 @@ export function start (
dbUrl: string,
opt: {
fullTextUrl: string
minioConf: MinioConfig
storageConfig: StorageConfiguration
rekoniUrl: string
port: number
productId: string
@ -236,6 +236,9 @@ export function start (
]
const metrics = getMetricsContext()
const externalStorage = buildStorageFromConfig(opt.storageConfig, dbUrl)
function createIndexStages (
fullText: MeasureContext,
workspace: WorkspaceId,
@ -361,12 +364,7 @@ export function start (
},
serviceAdapters: {},
defaultContentAdapter: 'Rekoni',
storageFactory: () =>
new MinioService({
...opt.minioConf,
port: 9000,
useSSL: false
}),
storageFactory: () => externalStorage,
workspace
}
return createPipeline(ctx, conf, middlewares, upgrade, broadcast)

View File

@ -24,7 +24,7 @@ import type { TriggerControl } from '@hcengineering/server-core'
*/
export async function OnAttachmentDelete (
tx: Tx,
{ findAll, hierarchy, fulltextFx, storageFx, removedMap }: TriggerControl
{ findAll, hierarchy, fulltextFx, storageFx, removedMap, ctx }: TriggerControl
): Promise<Tx[]> {
const rmTx = TxProcessor.extractTx(tx) as TxRemoveDoc<Attachment>
@ -39,7 +39,7 @@ export async function OnAttachmentDelete (
})
storageFx(async (adapter, bucket) => {
await adapter.remove(bucket, [attach.file])
await adapter.remove(ctx, bucket, [attach.file])
})
return []

View File

@ -106,12 +106,7 @@ export class CollaborativeContentRetrievalStage implements FullTextPipelineStage
if (collaborativeDoc !== undefined && collaborativeDoc !== '') {
const { documentId } = parseCollaborativeDoc(collaborativeDoc)
let docInfo: any | undefined
try {
docInfo = await this.storageAdapter?.stat(this.workspace, documentId)
} catch (err: any) {
// not found.
}
const docInfo: any | undefined = await this.storageAdapter?.stat(this.metrics, this.workspace, documentId)
if (docInfo !== undefined) {
const digest = docInfo.etag
@ -120,7 +115,7 @@ export class CollaborativeContentRetrievalStage implements FullTextPipelineStage
;(update as any)[docUpdKey(digestKey)] = digest
const contentType = ((docInfo.metaData['content-type'] as string) ?? '').split(';')[0]
const readable = await this.storageAdapter?.get(this.workspace, documentId)
const readable = await this.storageAdapter?.get(this.metrics, this.workspace, documentId)
if (readable !== undefined) {
let textContent = await this.metrics.with(

View File

@ -38,7 +38,7 @@ import { workbenchId } from '@hcengineering/workbench'
*/
export async function OnContactDelete (
tx: Tx,
{ findAll, hierarchy, storageFx, removedMap, txFactory }: TriggerControl
{ findAll, hierarchy, storageFx, removedMap, txFactory, ctx }: TriggerControl
): Promise<Tx[]> {
const rmTx = tx as TxRemoveDoc<Contact>
@ -61,14 +61,15 @@ export async function OnContactDelete (
}
storageFx(async (adapter, bucket) => {
await adapter.remove(bucket, [avatar])
await adapter.remove(ctx, bucket, [avatar])
if (avatar != null) {
const extra = await adapter.list(bucket, avatar)
const extra = await adapter.list(ctx, bucket, avatar)
if (extra.length > 0) {
await adapter.remove(
ctx,
bucket,
Array.from(extra.entries()).map((it) => it[1].name)
Array.from(extra.entries()).map((it) => it[1]._id)
)
}
}

View File

@ -837,7 +837,7 @@ export async function createWorkspace (
const initWS = getMetadata(toolPlugin.metadata.InitWorkspace)
const wsId = getWorkspaceId(workspaceInfo.workspace, productId)
if (initWS !== undefined && (await getWorkspaceById(db, productId, initWS)) !== null) {
client = await initModel(getTransactor(), wsId, txes, [], ctxModellogger)
client = await initModel(ctx, getTransactor(), wsId, txes, [], ctxModellogger)
await client.close()
await cloneWorkspace(
getTransactor(),
@ -846,7 +846,7 @@ export async function createWorkspace (
)
client = await upgradeModel(getTransactor(), wsId, txes, migrationOperation, ctxModellogger)
} else {
client = await initModel(getTransactor(), wsId, txes, migrationOperation, ctxModellogger)
client = await initModel(ctx, getTransactor(), wsId, txes, migrationOperation, ctxModellogger)
}
} catch (err: any) {
return { workspaceInfo, err, client: {} as any }

View File

@ -1,4 +1,4 @@
import { WorkspaceId } from '@hcengineering/core'
import { MeasureContext, WorkspaceId } from '@hcengineering/core'
import { StorageAdapter } from '@hcengineering/server-core'
import { createReadStream, createWriteStream, existsSync } from 'fs'
import { mkdir, readFile, writeFile } from 'fs/promises'
@ -55,35 +55,43 @@ class AdapterStorage implements BackupStorage {
constructor (
readonly client: StorageAdapter,
readonly workspaceId: WorkspaceId,
readonly root: string
readonly root: string,
readonly ctx: MeasureContext
) {}
async loadFile (name: string): Promise<Buffer> {
const data = await this.client.read(this.workspaceId, join(this.root, name))
const data = await this.client.read(this.ctx, this.workspaceId, join(this.root, name))
return Buffer.concat(data)
}
async write (name: string): Promise<Writable> {
const wr = new PassThrough()
void this.client.put(this.workspaceId, join(this.root, name), wr)
void this.client.put(this.ctx, this.workspaceId, join(this.root, name), wr, 'application/octet-stream')
return wr
}
async load (name: string): Promise<Readable> {
return await this.client.get(this.workspaceId, join(this.root, name))
return await this.client.get(this.ctx, this.workspaceId, join(this.root, name))
}
async exists (name: string): Promise<boolean> {
try {
await this.client.stat(this.workspaceId, join(this.root, name))
return true
return (await this.client.stat(this.ctx, this.workspaceId, join(this.root, name))) !== undefined
} catch (err) {
return false
}
}
async writeFile (name: string, data: string | Buffer): Promise<void> {
void this.client.put(this.workspaceId, join(this.root, name), data, data.length)
// TODO: add mime type detection here.
await this.client.put(
this.ctx,
this.workspaceId,
join(this.root, name),
data,
'application/octet-stream',
data.length
)
}
}
@ -101,12 +109,13 @@ export async function createFileBackupStorage (fileName: string): Promise<Backup
* @public
*/
export async function createStorageBackupStorage (
ctx: MeasureContext,
client: StorageAdapter,
workspaceId: WorkspaceId,
root: string
): Promise<BackupStorage> {
if (!(await client.exists(workspaceId))) {
await client.make(workspaceId)
if (!(await client.exists(ctx, workspaceId))) {
await client.make(ctx, workspaceId)
}
return new AdapterStorage(client, workspaceId, root)
return new AdapterStorage(client, workspaceId, root, ctx)
}

View File

@ -47,7 +47,7 @@ export async function loadCollaborativeDoc (
return await ctx.with('loadCollaborativeDoc', { type: 'content' }, async (ctx) => {
const yContent = await ctx.with('yDocFromMinio', { type: 'content' }, async () => {
return await yDocFromStorage(storageAdapter, workspace, documentId, new YDoc({ gc: false }))
return await yDocFromStorage(ctx, storageAdapter, workspace, documentId, new YDoc({ gc: false }))
})
// the document does not exist
@ -60,7 +60,7 @@ export async function loadCollaborativeDoc (
}
const yHistory = await ctx.with('yDocFromMinio', { type: 'history' }, async () => {
return await yDocFromStorage(storageAdapter, workspace, historyDocumentId, new YDoc())
return await yDocFromStorage(ctx, storageAdapter, workspace, historyDocumentId, new YDoc())
})
// the history document does not exist
@ -98,7 +98,7 @@ export async function saveCollaborativeDocVersion (
await ctx.with('saveCollaborativeDoc', {}, async (ctx) => {
if (versionId === 'HEAD') {
await ctx.with('yDocToMinio', {}, async () => {
await yDocToStorage(storageAdapter, workspace, documentId, ydoc)
await yDocToStorage(ctx, storageAdapter, workspace, documentId, ydoc)
})
} else {
console.warn('Cannot save non HEAD document version', documentId, versionId)
@ -125,7 +125,7 @@ export async function removeCollaborativeDoc (
}
if (toRemove.length > 0) {
await ctx.with('remove', {}, async () => {
await storageAdapter.remove(workspace, toRemove)
await storageAdapter.remove(ctx, workspace, toRemove)
})
}
})
@ -181,7 +181,7 @@ export async function takeCollaborativeDocSnapshot (
await ctx.with('takeCollaborativeDocSnapshot', {}, async (ctx) => {
const yHistory =
(await ctx.with('yDocFromMinio', { type: 'history' }, async () => {
return await yDocFromStorage(storageAdapter, workspace, historyDocumentId, new YDoc({ gc: false }))
return await yDocFromStorage(ctx, storageAdapter, workspace, historyDocumentId, new YDoc({ gc: false }))
})) ?? new YDoc()
await ctx.with('createYdocSnapshot', {}, async () => {
@ -189,7 +189,7 @@ export async function takeCollaborativeDocSnapshot (
})
await ctx.with('yDocToMinio', { type: 'history' }, async () => {
await yDocToStorage(storageAdapter, workspace, historyDocumentId, yHistory)
await yDocToStorage(ctx, storageAdapter, workspace, historyDocumentId, yHistory)
})
})
}

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
import { WorkspaceId } from '@hcengineering/core'
import { MeasureContext, WorkspaceId } from '@hcengineering/core'
import { StorageAdapter } from '@hcengineering/server-core'
import { Doc as YDoc } from 'yjs'
@ -21,6 +21,7 @@ import { yDocFromBuffer, yDocToBuffer } from './ydoc'
/** @public */
export async function yDocFromStorage (
ctx: MeasureContext,
storageAdapter: StorageAdapter,
workspace: WorkspaceId,
minioDocumentId: string,
@ -31,7 +32,7 @@ export async function yDocFromStorage (
ydoc ??= new YDoc({ gc: false })
try {
const buffer = await storageAdapter.read(workspace, minioDocumentId)
const buffer = await storageAdapter.read(ctx, workspace, minioDocumentId)
return yDocFromBuffer(Buffer.concat(buffer), ydoc)
} catch (err: any) {
if (err?.code === 'NoSuchKey' || err?.code === 'NotFound') {
@ -43,12 +44,12 @@ export async function yDocFromStorage (
/** @public */
export async function yDocToStorage (
ctx: MeasureContext,
storageAdapter: StorageAdapter,
workspace: WorkspaceId,
minioDocumentId: string,
ydoc: YDoc
): Promise<void> {
const buffer = yDocToBuffer(ydoc)
const metadata = { 'content-type': 'application/ydoc' }
await storageAdapter.put(workspace, minioDocumentId, buffer, buffer.length, metadata)
await storageAdapter.put(ctx, workspace, minioDocumentId, buffer, 'application/ydoc', buffer.length)
}

View File

@ -39,7 +39,7 @@ export async function removeDocument (
const historyDocumentId = collaborativeHistoryDocId(minioDocumentId)
try {
await minio.remove(workspaceId, [minioDocumentId, historyDocumentId])
await minio.remove(ctx, workspaceId, [minioDocumentId, historyDocumentId])
} catch (err) {
await ctx.error('failed to remove document', { documentId, error: err })
}

View File

@ -57,7 +57,7 @@ export async function takeSnapshot (
const historyDocumentId = collaborativeHistoryDocId(minioDocumentId)
const yHistory =
(await ctx.with('yDocFromMinio', {}, async () => {
return await yDocFromStorage(minio, workspaceId, historyDocumentId)
return await yDocFromStorage(ctx, minio, workspaceId, historyDocumentId)
})) ?? new YDoc()
await ctx.with('createYdocSnapshot', {}, async () => {
@ -67,7 +67,7 @@ export async function takeSnapshot (
})
await ctx.with('yDocToMinio', {}, async () => {
await yDocToStorage(minio, workspaceId, historyDocumentId, yHistory)
await yDocToStorage(ctx, minio, workspaceId, historyDocumentId, yHistory)
})
return { ...version }

View File

@ -32,13 +32,15 @@
"@types/html-to-text": "^8.1.1",
"jest": "^29.7.0",
"ts-jest": "^29.1.1",
"@types/jest": "^29.5.5"
"@types/jest": "^29.5.5",
"@types/uuid": "^8.3.1"
},
"dependencies": {
"@hcengineering/core": "^0.6.28",
"@hcengineering/platform": "^0.6.9",
"@hcengineering/query": "^0.6.8",
"fast-equals": "^2.0.3",
"html-to-text": "^9.0.3"
"html-to-text": "^9.0.3",
"uuid": "^8.3.2"
}
}

View File

@ -33,6 +33,19 @@ import {
} from '@hcengineering/core'
import { type StorageAdapter } from './storage'
/**
* @public
*/
export interface RawDBAdapter {
find: <T extends Doc>(
workspace: WorkspaceId,
domain: Domain,
query: DocumentQuery<T>,
options?: Omit<FindOptions<T>, 'projection' | 'lookup'>
) => Promise<FindResult<T>>
upload: <T extends Doc>(workspace: WorkspaceId, domain: Domain, docs: T[]) => Promise<void>
}
/**
* @public
*/
@ -77,6 +90,7 @@ export interface TxAdapter extends DbAdapter {
* @public
*/
export type DbAdapterFactory = (
ctx: MeasureContext,
hierarchy: Hierarchy,
url: string,
workspaceId: WorkspaceId,

View File

@ -100,12 +100,7 @@ export class ContentRetrievalStage implements FullTextPipelineStage {
// We need retrieve value of attached document content.
const ref = doc.attributes[docKey(val.name, { _class: val.attributeOf })] as Ref<Doc>
if (ref !== undefined && ref !== '') {
let docInfo: any | undefined
try {
docInfo = await this.storageAdapter?.stat(this.workspace, ref)
} catch (err: any) {
// not found.
}
const docInfo: any | undefined = await this.storageAdapter?.stat(this.metrics, this.workspace, ref)
if (docInfo !== undefined && docInfo.size < 30 * 1024 * 1024) {
// We have blob, we need to decode it to string.
const contentType = ((docInfo.metaData['content-type'] as string) ?? '').split(';')[0]
@ -116,7 +111,7 @@ export class ContentRetrievalStage implements FullTextPipelineStage {
if (doc.attributes[digestKey] !== digest) {
;(update as any)[docUpdKey(digestKey)] = digest
const readable = await this.storageAdapter?.get(this.workspace, ref)
const readable = await this.storageAdapter?.get(this.metrics, this.workspace, ref)
if (readable !== undefined) {
let textContent = await this.metrics.with(

View File

@ -115,6 +115,7 @@ class InMemoryAdapter extends DummyDbAdapter implements DbAdapter {
* @public
*/
export async function createInMemoryAdapter (
ctx: MeasureContext,
hierarchy: Hierarchy,
url: string,
workspaceId: WorkspaceId

View File

@ -0,0 +1,197 @@
import core, {
DOMAIN_BLOB_DATA,
generateId,
groupByArray,
type Blob,
type MeasureContext,
type Ref,
type WorkspaceId
} from '@hcengineering/core'
import { type Readable } from 'stream'
import { type RawDBAdapter } from '../adapter'
import { type ListBlobResult, type StorageAdapter, type UploadedObjectInfo } from '../storage'
import { v4 as uuid } from 'uuid'
import { type StorageConfig, type StorageConfiguration } from '../types'
/**
* Perform operations on storage adapter and map required information into BinaryDocument into provided DbAdapter storage.
*/
export class AggregatorStorageAdapter implements StorageAdapter {
constructor (
readonly adapters: Map<string, StorageAdapter>,
readonly defaultAdapter: string, // Adapter will be used to put new documents into
readonly dbAdapter: RawDBAdapter
) {}
async initialize (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {
// We need to initialize internal table if it miss documents.
}
async exists (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<boolean> {
for (const a of this.adapters.values()) {
if (!(await a.exists(ctx, workspaceId))) {
return false
}
}
return true
}
async make (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {
for (const a of this.adapters.values()) {
if (!(await a.exists(ctx, workspaceId))) {
await a.make(ctx, workspaceId)
}
}
}
async delete (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {
for (const a of this.adapters.values()) {
if (await a.exists(ctx, workspaceId)) {
await a.delete(ctx, workspaceId)
}
}
}
async remove (ctx: MeasureContext, workspaceId: WorkspaceId, objectNames: string[]): Promise<void> {
const docs = await this.dbAdapter.find<Blob>(workspaceId, DOMAIN_BLOB_DATA, {
_class: core.class.Blob,
_id: { $in: objectNames as Ref<Blob>[] }
})
// Group by provider and delegate into it.
const byProvider = groupByArray(docs, (item) => item.provider)
for (const [k, docs] of byProvider) {
const adapter = this.adapters.get(k)
if (adapter !== undefined) {
await adapter.remove(
ctx,
workspaceId,
docs.map((it) => it._id)
)
}
}
}
async list (ctx: MeasureContext, workspaceId: WorkspaceId, prefix?: string | undefined): Promise<ListBlobResult[]> {
return await this.dbAdapter.find<Blob>(workspaceId, DOMAIN_BLOB_DATA, {
_class: core.class.Blob,
_id: { $regex: `/^${prefix ?? ''}/i` }
})
}
async stat (ctx: MeasureContext, workspaceId: WorkspaceId, name: string): Promise<Blob | undefined> {
return (
await this.dbAdapter.find<Blob>(
workspaceId,
DOMAIN_BLOB_DATA,
{ _class: core.class.Blob, _id: name as Ref<Blob> },
{ limit: 1 }
)
).shift()
}
async get (ctx: MeasureContext, workspaceId: WorkspaceId, name: string): Promise<Readable> {
const { provider, stat } = await this.findProvider(workspaceId, ctx, name)
return await provider.get(ctx, workspaceId, stat.storageId)
}
private async findProvider (
workspaceId: WorkspaceId,
ctx: MeasureContext,
objectName: string
): Promise<{ provider: StorageAdapter, stat: Blob }> {
const stat = (
await this.dbAdapter.find<Blob>(
workspaceId,
DOMAIN_BLOB_DATA,
{ _class: core.class.Blob, _id: objectName as Ref<Blob> },
{ limit: 1 }
)
).shift()
if (stat === undefined) {
throw new Error('No such object found')
}
const provider = this.adapters.get(stat.provider)
if (provider === undefined) {
throw new Error('No such provider found')
}
return { provider, stat }
}
async partial (
ctx: MeasureContext,
workspaceId: WorkspaceId,
objectName: string,
offset: number,
length?: number | undefined
): Promise<Readable> {
const { provider, stat } = await this.findProvider(workspaceId, ctx, objectName)
return await provider.partial(ctx, workspaceId, stat.storageId, offset, length)
}
async read (ctx: MeasureContext, workspaceId: WorkspaceId, name: string): Promise<Buffer[]> {
const { provider, stat } = await this.findProvider(workspaceId, ctx, name)
return await provider.read(ctx, workspaceId, stat.storageId)
}
async put (
ctx: MeasureContext,
workspaceId: WorkspaceId,
objectName: string,
stream: string | Readable | Buffer,
contentType: string,
size?: number | undefined
): Promise<UploadedObjectInfo> {
const provider = this.adapters.get(this.defaultAdapter)
if (provider === undefined) {
throw new Error('No such provider found')
}
const storageId = uuid()
const result = await provider.put(ctx, workspaceId, storageId, stream, contentType, size)
if (size === undefined || size === 0) {
const docStats = await provider.stat(ctx, workspaceId, storageId)
if (docStats !== undefined) {
if (contentType !== docStats.contentType) {
contentType = docStats.contentType
}
size = docStats.size
}
}
const blobDoc: Blob = {
_class: core.class.Blob,
_id: generateId(),
modifiedBy: core.account.System,
modifiedOn: Date.now(),
space: core.space.Configuration,
provider: this.defaultAdapter,
storageId,
size: size ?? 0,
contentType,
etag: result.etag,
version: result.versionId ?? null
}
await this.dbAdapter.upload<Blob>(workspaceId, DOMAIN_BLOB_DATA, [blobDoc])
return result
}
}
/**
* @public
*/
export function buildStorage (
config: StorageConfiguration,
dbAdapter: RawDBAdapter,
storageFactory: (kind: string, config: StorageConfig) => StorageAdapter
): StorageAdapter {
const adapters = new Map<string, StorageAdapter>()
for (const c of config.storages) {
adapters.set(c.name, storageFactory(c.kind, c))
}
return new AggregatorStorageAdapter(adapters, config.default, dbAdapter)
}

View File

@ -35,11 +35,11 @@ import { type DbConfiguration } from '../configuration'
import { createContentAdapter } from '../content'
import { FullTextIndex } from '../fulltext'
import { FullTextIndexPipeline } from '../indexer'
import { createServiceAdaptersManager } from '../service'
import { type StorageAdapter } from '../storage'
import { Triggers } from '../triggers'
import { type ServerStorageOptions } from '../types'
import { TServerStorage } from './storage'
import { createServiceAdaptersManager } from '../service'
/**
* @public
@ -58,7 +58,10 @@ export async function createServerStorage (
for (const key in conf.adapters) {
const adapterConf = conf.adapters[key]
adapters.set(key, await adapterConf.factory(hierarchy, adapterConf.url, conf.workspace, modelDb, storageAdapter))
adapters.set(
key,
await adapterConf.factory(ctx, hierarchy, adapterConf.url, conf.workspace, modelDb, storageAdapter)
)
}
const txAdapter = adapters.get(conf.domains[DOMAIN_TX]) as TxAdapter
@ -187,17 +190,21 @@ export async function createServerStorage (
*/
export function createNullStorageFactory (): StorageAdapter {
return {
exists: async (workspaceId: WorkspaceId) => {
initialize: async (ctx, workspaceId) => {},
exists: async (ctx, workspaceId: WorkspaceId) => {
return false
},
make: async (workspaceId: WorkspaceId) => {},
remove: async (workspaceId: WorkspaceId, objectNames: string[]) => {},
delete: async (workspaceId: WorkspaceId) => {},
list: async (workspaceId: WorkspaceId, prefix?: string) => [],
stat: async (workspaceId: WorkspaceId, objectName: string) => ({}) as any,
get: async (workspaceId: WorkspaceId, objectName: string) => ({}) as any,
put: async (workspaceId: WorkspaceId, objectName: string, stream: any, size?: number, qwe?: any) => ({}) as any,
read: async (workspaceId: WorkspaceId, name: string) => ({}) as any,
partial: async (workspaceId: WorkspaceId, objectName: string, offset: number, length?: number) => ({}) as any
make: async (ctx, workspaceId: WorkspaceId) => {},
remove: async (ctx, workspaceId: WorkspaceId, objectNames: string[]) => {},
delete: async (ctx, workspaceId: WorkspaceId) => {},
list: async (ctx, workspaceId: WorkspaceId, prefix?: string) => [],
stat: async (ctx, workspaceId: WorkspaceId, objectName: string) => ({}) as any,
get: async (ctx, workspaceId: WorkspaceId, objectName: string) => ({}) as any,
put: async (ctx, workspaceId: WorkspaceId, objectName: string, stream: any, contentType: string, size?: number) =>
({}) as any,
read: async (ctx, workspaceId: WorkspaceId, name: string) => ({}) as any,
partial: async (ctx, workspaceId: WorkspaceId, objectName: string, offset: number, length?: number) => ({}) as any
}
}
export { AggregatorStorageAdapter, buildStorage } from './aggregator'

View File

@ -1,51 +1,11 @@
import { type WorkspaceId, toWorkspaceString } from '@hcengineering/core'
import { type Blob, type MeasureContext, type WorkspaceId, toWorkspaceString } from '@hcengineering/core'
import { type Readable } from 'stream'
export interface MetadataItem {
Key: string
Value: string
}
export type BucketItem =
| {
name: string
size: number
etag: string
prefix?: never
lastModified: Date
}
| {
name?: never
etag?: never
lastModified?: never
prefix: string
size: 0
}
export interface BucketItemStat {
size: number
etag: string
lastModified: Date
metaData: ItemBucketMetadata
versionId?: string | null
}
export interface UploadedObjectInfo {
etag: string
versionId: string | null
}
export interface ItemBucketMetadataList {
Items: MetadataItem[]
}
export type ItemBucketMetadata = Record<string, any>
export type BucketItemWithMetadata = BucketItem & {
metadata?: ItemBucketMetadata | ItemBucketMetadataList
}
/**
* @public
*/
export type WorkspaceItem = Required<BucketItem> & { metaData: ItemBucketMetadata }
/**
* @public
*/
@ -53,22 +13,33 @@ export function getBucketId (workspaceId: WorkspaceId): string {
return toWorkspaceString(workspaceId, '.')
}
export interface StorageAdapter {
exists: (workspaceId: WorkspaceId) => Promise<boolean>
export type ListBlobResult = Omit<Blob, 'contentType' | 'version'>
make: (workspaceId: WorkspaceId) => Promise<void>
remove: (workspaceId: WorkspaceId, objectNames: string[]) => Promise<void>
delete: (workspaceId: WorkspaceId) => Promise<void>
list: (workspaceId: WorkspaceId, prefix?: string) => Promise<WorkspaceItem[]>
stat: (workspaceId: WorkspaceId, objectName: string) => Promise<BucketItemStat>
get: (workspaceId: WorkspaceId, objectName: string) => Promise<Readable>
export interface StorageAdapter {
initialize: (ctx: MeasureContext, workspaceId: WorkspaceId) => Promise<void>
exists: (ctx: MeasureContext, workspaceId: WorkspaceId) => Promise<boolean>
make: (ctx: MeasureContext, workspaceId: WorkspaceId) => Promise<void>
delete: (ctx: MeasureContext, workspaceId: WorkspaceId) => Promise<void>
remove: (ctx: MeasureContext, workspaceId: WorkspaceId, objectNames: string[]) => Promise<void>
list: (ctx: MeasureContext, workspaceId: WorkspaceId, prefix?: string) => Promise<ListBlobResult[]>
stat: (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string) => Promise<Blob | undefined>
get: (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string) => Promise<Readable>
put: (
ctx: MeasureContext,
workspaceId: WorkspaceId,
objectName: string,
stream: Readable | Buffer | string,
size?: number,
metaData?: ItemBucketMetadata
contentType: string,
size?: number
) => Promise<UploadedObjectInfo>
read: (workspaceId: WorkspaceId, name: string) => Promise<Buffer[]>
partial: (workspaceId: WorkspaceId, objectName: string, offset: number, length?: number) => Promise<Readable>
read: (ctx: MeasureContext, workspaceId: WorkspaceId, name: string) => Promise<Buffer[]>
partial: (
ctx: MeasureContext,
workspaceId: WorkspaceId,
objectName: string,
offset: number,
length?: number
) => Promise<Readable>
}

View File

@ -41,9 +41,9 @@ import {
type WorkspaceIdWithUrl
} from '@hcengineering/core'
import type { Asset, Resource } from '@hcengineering/platform'
import { type StorageAdapter } from './storage'
import { type Readable } from 'stream'
import { type ServiceAdaptersManager } from './service'
import { type StorageAdapter } from './storage'
/**
* @public
@ -426,3 +426,13 @@ export interface ServiceAdapterConfig {
db: string
url: string
}
export interface StorageConfig {
name: string
kind: string
}
export interface StorageConfiguration {
default: string
storages: StorageConfig[]
}

View File

@ -1,5 +1,5 @@
import { DbAdapter } from '@hcengineering/server-core'
import { Domain, getWorkspaceId, Hierarchy } from '@hcengineering/core'
import { Domain, getWorkspaceId, Hierarchy, MeasureMetricsContext } from '@hcengineering/core'
import { createElasticBackupDataAdapter } from '../backup'
import { Client } from '@elastic/elasticsearch'
@ -11,7 +11,12 @@ describe('Elastic Data Adapter', () => {
let adapter: DbAdapter
beforeEach(async () => {
adapter = await createElasticBackupDataAdapter(new Hierarchy(), url, getWorkspaceId('ws1', ''))
adapter = await createElasticBackupDataAdapter(
new MeasureMetricsContext('test', {}),
new Hierarchy(),
url,
getWorkspaceId('ws1', '')
)
})
afterEach(async () => {

View File

@ -298,6 +298,7 @@ class ElasticDataAdapter implements DbAdapter {
* @public
*/
export async function createElasticBackupDataAdapter (
ctx: MeasureContext,
hierarchy: Hierarchy,
url: string,
workspaceId: WorkspaceId

View File

@ -51,11 +51,13 @@
"cors": "^2.8.5",
"@hcengineering/elastic": "^0.6.0",
"@hcengineering/server-core": "^0.6.1",
"@hcengineering/server": "^0.6.4",
"@hcengineering/server-token": "^0.6.7",
"@hcengineering/attachment": "^0.6.9",
"body-parser": "^1.20.2",
"sharp": "~0.32.0",
"@hcengineering/minio": "^0.6.0",
"@hcengineering/mongo": "^0.6.1",
"morgan": "^1.10.0"
}
}

View File

@ -15,7 +15,7 @@
//
import { MeasureContext, WorkspaceId, metricsAggregate } from '@hcengineering/core'
import { MinioService } from '@hcengineering/minio'
import { StorageAdapter } from '@hcengineering/server-core'
import { Token, decodeToken } from '@hcengineering/server-token'
import bp from 'body-parser'
import cors from 'cors'
@ -35,19 +35,16 @@ const cacheControlNoCache = 'max-age=1d, no-cache, must-revalidate'
async function minioUpload (
ctx: MeasureContext,
minio: MinioService,
storageAdapter: StorageAdapter,
workspace: WorkspaceId,
file: UploadedFile
): Promise<string> {
const id = uuid()
const meta: any = {
'Content-Type': file.mimetype
}
const resp = await ctx.with(
'storage upload',
{ workspace: workspace.name },
async () => await minio.put(workspace, id, file.data, file.size, meta),
async () => await storageAdapter.put(ctx, workspace, id, file.data, file.mimetype, file.size),
{ file: file.name, contentType: file.mimetype }
)
@ -76,13 +73,17 @@ function getRange (range: string, size: number): [number, number] {
async function getFileRange (
ctx: MeasureContext,
range: string,
client: MinioService,
client: StorageAdapter,
workspace: WorkspaceId,
uuid: string,
res: Response
): Promise<void> {
const stat = await ctx.with('stats', {}, async () => await client.stat(workspace, uuid))
const stat = await ctx.with('stats', {}, async () => await client.stat(ctx, workspace, uuid))
if (stat === undefined) {
await ctx.error('No such key', { file: uuid })
res.status(404).send()
return
}
const size: number = stat.size
const [start, end] = getRange(range, size)
@ -97,13 +98,13 @@ async function getFileRange (
await ctx.with(
'write',
{ contentType: stat.metaData['content-type'] },
{ contentType: stat.contentType },
async (ctx) => {
try {
const dataStream = await ctx.with(
'partial',
{},
async () => await client.partial(workspace, uuid, start, end - start + 1),
async () => await client.partial(ctx, workspace, uuid, start, end - start + 1),
{}
)
res.writeHead(206, {
@ -111,9 +112,9 @@ async function getFileRange (
'Content-Range': `bytes ${start}-${end}/${size}`,
'Accept-Ranges': 'bytes',
'Content-Length': end - start + 1,
'Content-Type': stat.metaData['content-type'],
'Content-Type': stat.contentType,
Etag: stat.etag,
'Last-Modified': stat.lastModified.toISOString()
'Last-Modified': new Date(stat.modifiedOn).toISOString()
})
dataStream.pipe(res)
@ -144,33 +145,38 @@ async function getFileRange (
res.status(500).send()
}
},
{ ...stat.metaData, uuid, start, end: end - start + 1, ...stat.metaData }
{ uuid, start, end: end - start + 1 }
)
}
async function getFile (
ctx: MeasureContext,
client: MinioService,
client: StorageAdapter,
workspace: WorkspaceId,
uuid: string,
req: Request,
res: Response
): Promise<void> {
const stat = await ctx.with('stat', {}, async () => await client.stat(workspace, uuid))
const stat = await ctx.with('stat', {}, async () => await client.stat(ctx, workspace, uuid))
if (stat === undefined) {
await ctx.error('No such key', { file: req.query.file })
res.status(404).send()
return
}
const etag = stat.etag
if (
preConditions.IfNoneMatch(req.headers, { etag }) === 'notModified' ||
preConditions.IfMatch(req.headers, { etag }) === 'notModified' ||
preConditions.IfModifiedSince(req.headers, { lastModified: stat.lastModified }) === 'notModified'
preConditions.IfModifiedSince(req.headers, { lastModified: new Date(stat.modifiedOn) }) === 'notModified'
) {
// Matched, return not modified
res.statusCode = 304
res.end()
return
}
if (preConditions.IfUnmodifiedSince(req.headers, { lastModified: stat.lastModified }) === 'failed') {
if (preConditions.IfUnmodifiedSince(req.headers, { lastModified: new Date(stat.modifiedOn) }) === 'failed') {
// Send 412 (Precondition Failed)
res.statusCode = 412
res.end()
@ -179,14 +185,14 @@ async function getFile (
await ctx.with(
'write',
{ contentType: stat.metaData['content-type'] },
{ contentType: stat.contentType },
async (ctx) => {
try {
const dataStream = await ctx.with('readable', {}, async () => await client.get(workspace, uuid))
const dataStream = await ctx.with('readable', {}, async () => await client.get(ctx, workspace, uuid))
res.writeHead(200, {
'Content-Type': stat.metaData['content-type'],
'Content-Type': stat.contentType,
Etag: stat.etag,
'Last-Modified': stat.lastModified.toISOString(),
'Last-Modified': new Date(stat.modifiedOn).toISOString(),
'Cache-Control': cacheControlValue
})
@ -210,7 +216,7 @@ async function getFile (
res.status(500).send()
}
},
{ ...stat.metaData }
{}
)
}
@ -223,7 +229,7 @@ export function start (
config: {
transactorEndpoint: string
elasticUrl: string
minio: MinioService
storageAdapter: StorageAdapter
accountsUrl: string
uploadUrl: string
modelVersion: string
@ -325,8 +331,13 @@ export function start (
let uuid = req.query.file as string
const size = req.query.size as 'inline' | 'tiny' | 'x-small' | 'small' | 'medium' | 'large' | 'x-large' | 'full'
uuid = await getResizeID(size, uuid, config, payload)
const stat = await config.minio.stat(payload.workspace, uuid)
uuid = await getResizeID(ctx, size, uuid, config, payload)
const stat = await config.storageAdapter.stat(ctx, payload.workspace, uuid)
if (stat === undefined) {
await ctx.error('No such key', { file: req.query.file })
res.status(404).send()
return
}
const fileSize = stat.size
@ -334,14 +345,14 @@ export function start (
'accept-ranges': 'bytes',
'content-length': fileSize,
Etag: stat.etag,
'Last-Modified': stat.lastModified.toISOString()
'Last-Modified': new Date(stat.modifiedOn).toISOString()
})
res.status(200)
res.end()
} catch (error: any) {
if (error?.code === 'NoSuchKey' || error?.code === 'NotFound') {
console.log('No such key', req.query.file)
await ctx.error('No such key', { file: req.query.file })
res.status(404).send()
return
} else {
@ -390,9 +401,9 @@ export function start (
const d = await ctx.with(
'notoken-stat',
{ workspace: payload.workspace.name },
async () => await config.minio.stat(payload.workspace, uuid)
async () => await config.storageAdapter.stat(ctx, payload.workspace, uuid)
)
if (!((d.metaData['content-type'] as string) ?? '').includes('image')) {
if (d !== undefined && !(d.contentType ?? '').includes('image')) {
// Do not allow to return non images with no token.
if (token === undefined) {
res.status(403).send()
@ -404,19 +415,19 @@ export function start (
const size = req.query.size as 'inline' | 'tiny' | 'x-small' | 'small' | 'medium' | 'large' | 'x-large' | 'full'
uuid = await ctx.with('resize', {}, async () => await getResizeID(size, uuid, config, payload))
uuid = await ctx.with('resize', {}, async () => await getResizeID(ctx, size, uuid, config, payload))
const range = req.headers.range
if (range !== undefined) {
await ctx.with('file-range', { workspace: payload.workspace.name }, async (ctx) => {
await getFileRange(ctx, range, config.minio, payload.workspace, uuid, res)
await getFileRange(ctx, range, config.storageAdapter, payload.workspace, uuid, res)
})
} else {
await ctx.with(
'file',
{ workspace: payload.workspace.name },
async (ctx) => {
await getFile(ctx, config.minio, payload.workspace, uuid, req, res)
await getFile(ctx, config.storageAdapter, payload.workspace, uuid, req, res)
},
{ uuid }
)
@ -479,7 +490,7 @@ export function start (
try {
const token = authHeader.split(' ')[1]
const payload = decodeToken(token)
const uuid = await minioUpload(ctx, config.minio, payload.workspace, file)
const uuid = await minioUpload(ctx, config.storageAdapter, payload.workspace, file)
res.status(200).send(uuid)
} catch (error: any) {
@ -508,13 +519,16 @@ export function start (
}
// TODO: We need to allow delete only of user attached documents. (https://front.hc.engineering/workbench/platform/tracker/TSK-1081)
await config.minio.remove(payload.workspace, [uuid])
await config.storageAdapter.remove(ctx, payload.workspace, [uuid])
const extra = await config.minio.list(payload.workspace, uuid)
// TODO: Add support for related documents.
// TODO: Move support of image resize/format change to separate place.
const extra = await config.storageAdapter.list(ctx, payload.workspace, uuid)
if (extra.length > 0) {
await config.minio.remove(
await config.storageAdapter.remove(
ctx,
payload.workspace,
Array.from(extra.entries()).map((it) => it[1].name)
Array.from(extra.entries()).map((it) => it[1]._id)
)
}
@ -570,10 +584,7 @@ export function start (
return
}
const id = uuid()
const contentType = response.headers['content-type']
const meta = {
'Content-Type': contentType
}
const contentType = response.headers['content-type'] ?? 'application/octet-stream'
const data: Buffer[] = []
response
.on('data', function (chunk) {
@ -581,8 +592,8 @@ export function start (
})
.on('end', function () {
const buffer = Buffer.concat(data)
config.minio
.put(payload.workspace, id, buffer, 0, meta)
config.storageAdapter
.put(ctx, payload.workspace, id, buffer, contentType, buffer.length)
.then(async (objInfo) => {
console.log('uploaded uuid', id, objInfo.etag)
@ -649,9 +660,6 @@ export function start (
}
const id = uuid()
const contentType = response.headers['content-type']
const meta = {
'Content-Type': contentType
}
const data: Buffer[] = []
response
.on('data', function (chunk) {
@ -660,8 +668,8 @@ export function start (
.on('end', function () {
const buffer = Buffer.concat(data)
// eslint-disable-next-line @typescript-eslint/no-misused-promises
config.minio
.put(payload.workspace, id, buffer, 0, meta)
config.storageAdapter
.put(ctx, payload.workspace, id, buffer, contentType ?? 'application/octet-stream', buffer.length)
.then(async () => {
console.log('uploaded uuid', id)
@ -753,9 +761,10 @@ export function start (
// | '2x-large'
// | 'full'
async function getResizeID (
ctx: MeasureContext,
size: string,
uuid: string,
config: { minio: MinioService },
config: { storageAdapter: StorageAdapter },
payload: Token
): Promise<string> {
if (size !== undefined && size !== 'full') {
@ -784,7 +793,7 @@ async function getResizeID (
let hasSmall = false
const sizeId = uuid + `%size%${width}`
try {
const d = await config.minio.stat(payload.workspace, sizeId)
const d = await config.storageAdapter.stat(ctx, payload.workspace, sizeId)
hasSmall = d !== undefined && d.size > 0
} catch (err: any) {
if (err.code !== 'NotFound') {
@ -796,7 +805,7 @@ async function getResizeID (
uuid = sizeId
} else {
// Let's get data and resize it
const data = Buffer.concat(await config.minio.read(payload.workspace, uuid))
const data = Buffer.concat(await config.storageAdapter.read(ctx, payload.workspace, uuid))
const dataBuff = await sharp(data)
.resize({
@ -804,9 +813,9 @@ async function getResizeID (
})
.jpeg()
.toBuffer()
await config.minio.put(payload.workspace, sizeId, dataBuff, dataBuff.length, {
'Content-Type': 'image/jpeg'
})
// Add support of avif as well.
await config.storageAdapter.put(ctx, payload.workspace, sizeId, dataBuff, 'image/jpeg', dataBuff.length)
uuid = sizeId
}
}

View File

@ -15,8 +15,9 @@
//
import { MeasureContext } from '@hcengineering/core'
import { MinioService } from '@hcengineering/minio'
import { setMetadata } from '@hcengineering/platform'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server'
import { StorageConfiguration } from '@hcengineering/server-core'
import serverToken from '@hcengineering/server-token'
import { start } from '.'
@ -31,37 +32,20 @@ export function startFront (ctx: MeasureContext, extraConfig?: Record<string, st
process.exit(1)
}
const url = process.env.MONGO_URL
if (url === undefined) {
console.error('please provide mongodb url')
process.exit(1)
}
const elasticUrl = process.env.ELASTIC_URL
if (elasticUrl === undefined) {
console.error('please provide elastic url')
process.exit(1)
}
const minioEndpoint = process.env.MINIO_ENDPOINT
if (minioEndpoint === undefined) {
console.error('please provide minio endpoint')
process.exit(1)
}
const minioAccessKey = process.env.MINIO_ACCESS_KEY
if (minioAccessKey === undefined) {
console.error('please provide minio access key')
process.exit(1)
}
const minioSecretKey = process.env.MINIO_SECRET_KEY
if (minioSecretKey === undefined) {
console.error('please provide minio secret key')
process.exit(1)
}
const minio = new MinioService({
endPoint: minioEndpoint,
port: 9000,
useSSL: false,
accessKey: minioAccessKey,
secretKey: minioSecretKey
})
const storageConfig: StorageConfiguration = storageConfigFromEnv()
const storageAdapter = buildStorageFromConfig(storageConfig, url)
const accountsUrl = process.env.ACCOUNTS_URL
if (accountsUrl === undefined) {
@ -132,7 +116,7 @@ export function startFront (ctx: MeasureContext, extraConfig?: Record<string, st
const config = {
transactorEndpoint,
elasticUrl,
minio,
storageAdapter,
accountsUrl,
uploadUrl,
modelVersion,

View File

@ -13,11 +13,17 @@
// limitations under the License.
//
import { Client, type BucketItemStat, type ItemBucketMetadata, type UploadedObjectInfo } from 'minio'
import { Client, type UploadedObjectInfo } from 'minio'
import { toWorkspaceString, type WorkspaceId } from '@hcengineering/core'
import core, {
toWorkspaceString,
type Blob,
type MeasureContext,
type Ref,
type WorkspaceId
} from '@hcengineering/core'
import { type StorageAdapter, type WorkspaceItem } from '@hcengineering/server-core'
import { type ListBlobResult, type StorageAdapter, type StorageConfig } from '@hcengineering/server-core'
import { type Readable } from 'stream'
/**
@ -27,39 +33,63 @@ export function getBucketId (workspaceId: WorkspaceId): string {
return toWorkspaceString(workspaceId, '.')
}
export interface MinioConfig extends StorageConfig {
kind: 'minio'
region: string
endpoint: string
accessKeyId: string
secretAccessKey: string
port: number
useSSL: boolean
}
/**
* @public
*/
export class MinioService implements StorageAdapter {
static config = 'minio'
client: Client
constructor (opt: { endPoint: string, port: number, accessKey: string, secretKey: string, useSSL: boolean }) {
this.client = new Client(opt)
}
async exists (workspaceId: WorkspaceId): Promise<boolean> {
async initialize (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {}
async exists (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<boolean> {
return await this.client.bucketExists(getBucketId(workspaceId))
}
async make (workspaceId: WorkspaceId): Promise<void> {
async make (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {
await this.client.makeBucket(getBucketId(workspaceId), 'k8s')
}
async remove (workspaceId: WorkspaceId, objectNames: string[]): Promise<void> {
async remove (ctx: MeasureContext, workspaceId: WorkspaceId, objectNames: string[]): Promise<void> {
await this.client.removeObjects(getBucketId(workspaceId), objectNames)
}
async delete (workspaceId: WorkspaceId): Promise<void> {
async delete (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {
await this.client.removeBucket(getBucketId(workspaceId))
}
async list (workspaceId: WorkspaceId, prefix?: string): Promise<WorkspaceItem[]> {
async list (ctx: MeasureContext, workspaceId: WorkspaceId, prefix?: string): Promise<ListBlobResult[]> {
try {
const items = new Map<string, WorkspaceItem>()
const items = new Map<string, ListBlobResult>()
const list = this.client.listObjects(getBucketId(workspaceId), prefix, true)
await new Promise((resolve, reject) => {
list.on('data', (data) => {
if (data.name !== undefined) {
items.set(data.name, { metaData: {}, ...data } as any)
items.set(data.name, {
_id: data.name as Ref<Blob>,
_class: core.class.Blob,
etag: data.etag,
size: data.size,
provider: '',
space: core.space.Configuration,
modifiedBy: core.account.ConfigUser,
modifiedOn: data.lastModified.getTime(),
storageId: data.name
})
}
})
list.on('end', () => {
@ -67,6 +97,7 @@ export class MinioService implements StorageAdapter {
resolve(null)
})
list.on('error', (err) => {
list.destroy()
reject(err)
})
})
@ -80,25 +111,41 @@ export class MinioService implements StorageAdapter {
}
}
async stat (workspaceId: WorkspaceId, objectName: string): Promise<BucketItemStat> {
return await this.client.statObject(getBucketId(workspaceId), objectName)
async stat (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<Blob> {
const result = await this.client.statObject(getBucketId(workspaceId), objectName)
return {
provider: '',
_class: core.class.Blob,
_id: objectName as Ref<Blob>,
storageId: objectName,
contentType: result.metaData['content-type'],
size: result.size,
etag: result.etag,
space: core.space.Configuration,
modifiedBy: core.account.System,
modifiedOn: result.lastModified.getTime(),
version: result.versionId ?? null
}
}
async get (workspaceId: WorkspaceId, objectName: string): Promise<Readable> {
async get (ctx: MeasureContext, workspaceId: WorkspaceId, objectName: string): Promise<Readable> {
return await this.client.getObject(getBucketId(workspaceId), objectName)
}
async put (
ctx: MeasureContext,
workspaceId: WorkspaceId,
objectName: string,
stream: Readable | Buffer | string,
size?: number,
metaData?: ItemBucketMetadata
contentType: string,
size?: number
): Promise<UploadedObjectInfo> {
return await this.client.putObject(getBucketId(workspaceId), objectName, stream, size, metaData)
return await this.client.putObject(getBucketId(workspaceId), objectName, stream, size, {
'Content-Type': contentType
})
}
async read (workspaceId: WorkspaceId, name: string): Promise<Buffer[]> {
async read (ctx: MeasureContext, workspaceId: WorkspaceId, name: string): Promise<Buffer[]> {
const data = await this.client.getObject(getBucketId(workspaceId), name)
const chunks: Buffer[] = []
@ -122,7 +169,13 @@ export class MinioService implements StorageAdapter {
return chunks
}
async partial (workspaceId: WorkspaceId, objectName: string, offset: number, length?: number): Promise<Readable> {
async partial (
ctx: MeasureContext,
workspaceId: WorkspaceId,
objectName: string,
offset: number,
length?: number
): Promise<Readable> {
return await this.client.getPartialObject(getBucketId(workspaceId), objectName, offset, length)
}
}

View File

@ -54,6 +54,7 @@ const txes = genMinModel()
createTaskModel(txes)
async function createNullAdapter (
ctx: MeasureContext,
hierarchy: Hierarchy,
url: string,
db: WorkspaceId,
@ -116,7 +117,13 @@ describe('mongo operations', () => {
}
const mctx = new MeasureMetricsContext('', {})
const txStorage = await createMongoTxAdapter(hierarchy, mongodbUri, getWorkspaceId(dbId, ''), model)
const txStorage = await createMongoTxAdapter(
new MeasureMetricsContext('', {}),
hierarchy,
mongodbUri,
getWorkspaceId(dbId, ''),
model
)
// Put all transactions to Tx
for (const t of txes) {

View File

@ -14,5 +14,6 @@
// limitations under the License.
//
export * from './rawAdapter'
export * from './storage'
export * from './utils'

View File

@ -0,0 +1,84 @@
import {
SortingOrder,
cutObjectArray,
toFindResult,
type Doc,
type DocumentQuery,
type Domain,
type FindOptions,
type FindResult,
type WorkspaceId
} from '@hcengineering/core'
import type { RawDBAdapter } from '@hcengineering/server-core'
import { type Document, type Filter, type Sort } from 'mongodb'
import { toArray, uploadDocuments } from './storage'
import { getMongoClient, getWorkspaceDB } from './utils'
export function createRawMongoDBAdapter (url: string): RawDBAdapter {
const client = getMongoClient(url)
const collectSort = (options: FindOptions<Doc>): Sort | undefined => {
if (options?.sort === undefined) {
return undefined
}
const sort: Sort = {}
let count = 0
for (const key in options.sort) {
const order = options.sort[key] === SortingOrder.Ascending ? 1 : -1
sort[key] = order
count++
}
if (count === 0) {
return undefined
}
return sort
}
return {
find: async function <T extends Doc>(
workspace: WorkspaceId,
domain: Domain,
query: DocumentQuery<T>,
options?: Omit<FindOptions<T>, 'projection' | 'lookup'>
): Promise<FindResult<T>> {
const db = getWorkspaceDB(await client.getClient(), workspace)
const coll = db.collection(domain)
let cursor = coll.find<T>(query as Filter<Document>, {
checkKeys: false,
enableUtf8Validation: false
})
let total: number = -1
if (options != null) {
if (options.sort !== undefined) {
const sort = collectSort(options)
if (sort !== undefined) {
cursor = cursor.sort(sort)
}
}
if (options.limit !== undefined || typeof query._id === 'string') {
if (options.total === true) {
total = await coll.countDocuments(query)
}
cursor = cursor.limit(options.limit ?? 1)
}
}
// Error in case of timeout
try {
const res = await toArray<T>(cursor)
if (options?.total === true && options?.limit === undefined) {
total = res.length
}
return toFindResult(res, total)
} catch (e) {
console.error('error during executing cursor in findAll', cutObjectArray(query), options, e)
throw e
}
},
upload: async (workspace, domain, docs) => {
const db = getWorkspaceDB(await client.getClient(), workspace)
const coll = db.collection(domain)
await uploadDocuments(docs, coll)
}
}
}

View File

@ -100,6 +100,67 @@ interface LookupStep {
pipeline?: any
}
export async function toArray<T> (cursor: AbstractCursor<T>): Promise<T[]> {
const data = await cursor.toArray()
await cursor.close()
return data
}
/**
* Return some estimation for object size
*/
function calcSize (obj: any): number {
if (typeof obj === 'undefined') {
return 0
}
if (typeof obj === 'function') {
return 0
}
let result = 0
for (const key in obj) {
// include prototype properties
const value = obj[key]
const type = getTypeOf(value)
result += key.length
switch (type) {
case 'Array':
result += 4 + calcSize(value)
break
case 'Object':
result += calcSize(value)
break
case 'Date':
result += 24 // Some value
break
case 'string':
result += (value as string).length
break
case 'number':
result += 8
break
case 'boolean':
result += 1
break
case 'symbol':
result += (value as symbol).toString().length
break
case 'bigint':
result += (value as bigint).toString().length
break
case 'undefined':
result += 1
break
case 'null':
result += 1
break
default:
result += value.toString().length
}
}
return result
}
abstract class MongoAdapterBase implements DbAdapter {
constructor (
protected readonly db: Db,
@ -110,12 +171,6 @@ abstract class MongoAdapterBase implements DbAdapter {
async init (): Promise<void> {}
async toArray<T>(cursor: AbstractCursor<T>): Promise<T[]> {
const data = await cursor.toArray()
await cursor.close()
return data
}
async createIndexes (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>): Promise<void> {
for (const vv of config.indexes) {
try {
@ -469,7 +524,7 @@ abstract class MongoAdapterBase implements DbAdapter {
let result: WithLookup<T>[] = []
let total = options?.total === true ? 0 : -1
try {
result = (await ctx.with('toArray', {}, async (ctx) => await this.toArray(cursor), {
result = (await ctx.with('toArray', {}, async (ctx) => await toArray(cursor), {
domain,
pipeline
})) as any[]
@ -489,7 +544,7 @@ abstract class MongoAdapterBase implements DbAdapter {
checkKeys: false,
enableUtf8Validation: false
})
const arr = await this.toArray(totalCursor)
const arr = await toArray(totalCursor)
total = arr?.[0]?.total ?? 0
}
return toFindResult(this.stripHash(result), total)
@ -620,7 +675,7 @@ abstract class MongoAdapterBase implements DbAdapter {
// Error in case of timeout
try {
const res: T[] = await ctx.with('toArray', {}, async (ctx) => await this.toArray(cursor), {
const res: T[] = await ctx.with('toArray', {}, async (ctx) => await toArray(cursor), {
mongoQuery,
options,
domain
@ -729,7 +784,7 @@ abstract class MongoAdapterBase implements DbAdapter {
}
const pos = (digest ?? '').indexOf('|')
if (digest == null || digest === '' || pos === -1) {
const size = this.calcSize(d)
const size = calcSize(d)
digest = hashID // we just need some random value
bulkUpdate.set(d._id, `${digest}|${size.toString(16)}`)
@ -755,95 +810,19 @@ abstract class MongoAdapterBase implements DbAdapter {
}
}
/**
* Return some estimation for object size
*/
calcSize (obj: any): number {
if (typeof obj === 'undefined') {
return 0
}
if (typeof obj === 'function') {
return 0
}
let result = 0
for (const key in obj) {
// include prototype properties
const value = obj[key]
const type = getTypeOf(value)
result += key.length
switch (type) {
case 'Array':
result += 4 + this.calcSize(value)
break
case 'Object':
result += this.calcSize(value)
break
case 'Date':
result += 24 // Some value
break
case 'string':
result += (value as string).length
break
case 'number':
result += 8
break
case 'boolean':
result += 1
break
case 'symbol':
result += (value as symbol).toString().length
break
case 'bigint':
result += (value as bigint).toString().length
break
case 'undefined':
result += 1
break
case 'null':
result += 1
break
default:
result += value.toString().length
}
}
return result
}
async load (domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
if (docs.length === 0) {
return []
}
const cursor = this.db.collection<Doc>(domain).find<Doc>({ _id: { $in: docs } }, { limit: docs.length })
const result = await this.toArray(cursor)
const result = await toArray(cursor)
return this.stripHash(this.stripHash(result))
}
async upload (domain: Domain, docs: Doc[]): Promise<void> {
const coll = this.db.collection(domain)
const ops = Array.from(docs)
while (ops.length > 0) {
const part = ops.splice(0, 500)
await coll.bulkWrite(
part.map((it) => {
const digest: string | null = (it as any)['%hash%']
if ('%hash%' in it) {
delete it['%hash%']
}
const size = this.calcSize(it)
return {
replaceOne: {
filter: { _id: it._id },
replacement: { ...it, '%hash%': digest == null ? null : `${digest}|${size.toString(16)}` },
upsert: true
}
}
})
)
}
await uploadDocuments(docs, coll)
}
async update (domain: Domain, operations: Map<Ref<Doc>, DocumentUpdate<Doc>>): Promise<void> {
@ -1290,7 +1269,7 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter {
.collection(DOMAIN_TX)
.find<Tx>({ objectSpace: core.space.Model })
.sort({ _id: 1, modifiedOn: 1 })
const model = await this.toArray(cursor)
const model = await toArray(cursor)
// We need to put all core.account.System transactions first
const systemTx: Tx[] = []
const userTx: Tx[] = []
@ -1311,6 +1290,31 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter {
}
}
export async function uploadDocuments (docs: Doc[], coll: Collection<Document>): Promise<void> {
const ops = Array.from(docs)
while (ops.length > 0) {
const part = ops.splice(0, 500)
await coll.bulkWrite(
part.map((it) => {
const digest: string | null = (it as any)['%hash%']
if ('%hash%' in it) {
delete it['%hash%']
}
const size = calcSize(it)
return {
replaceOne: {
filter: { _id: it._id },
replacement: { ...it, '%hash%': digest == null ? null : `${digest}|${size.toString(16)}` },
upsert: true
}
}
})
)
}
}
function fillEnumSort (
enumOf: Enum,
key: string,
@ -1402,6 +1406,7 @@ function translateLikeQuery (pattern: string): { $regex: string, $options: strin
* @public
*/
export async function createMongoAdapter (
ctx: MeasureContext,
hierarchy: Hierarchy,
url: string,
workspaceId: WorkspaceId,
@ -1417,6 +1422,7 @@ export async function createMongoAdapter (
* @public
*/
export async function createMongoTxAdapter (
ctx: MeasureContext,
hierarchy: Hierarchy,
url: string,
workspaceId: WorkspaceId,

View File

@ -41,6 +41,7 @@
"@hcengineering/server-core": "^0.6.1",
"@hcengineering/server-ws": "^0.6.11",
"@hcengineering/mongo": "^0.6.1",
"@hcengineering/minio": "^0.6.0",
"@hcengineering/elastic": "^0.6.0",
"elastic-apm-node": "~3.26.0",
"@hcengineering/server-token": "^0.6.7",

View File

@ -21,3 +21,4 @@ export * from './backup'
export * from './metrics'
export * from './rekoni'
export * from './ydoc'
export * from './starter'

View File

@ -33,12 +33,13 @@ import core, {
TxResult,
WorkspaceId
} from '@hcengineering/core'
import { DbAdapter, StorageAdapter, WorkspaceItem } from '@hcengineering/server-core'
import { DbAdapter, ListBlobResult, StorageAdapter } from '@hcengineering/server-core'
class StorageBlobAdapter implements DbAdapter {
constructor (
readonly workspaceId: WorkspaceId,
readonly client: StorageAdapter
readonly client: StorageAdapter,
readonly ctx: MeasureContext
) {}
async findAll<T extends Doc>(
@ -63,18 +64,18 @@ class StorageBlobAdapter implements DbAdapter {
find (domain: Domain): StorageIterator {
let listReceived = false
let items: WorkspaceItem[] = []
let items: ListBlobResult[] = []
let pos = 0
return {
next: async () => {
if (!listReceived) {
items = await this.client.list(this.workspaceId)
items = await this.client.list(this.ctx, this.workspaceId)
listReceived = true
}
if (pos < items?.length) {
const item = items[pos]
const result = {
id: item.name,
id: item._id,
hash: item.etag,
size: item.size
}
@ -89,17 +90,20 @@ class StorageBlobAdapter implements DbAdapter {
async load (domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
const result: Doc[] = []
for (const item of docs) {
const stat = await this.client.stat(this.workspaceId, item)
const chunks: Buffer[] = await this.client.read(this.workspaceId, item)
const stat = await this.client.stat(this.ctx, this.workspaceId, item)
if (stat === undefined) {
throw new Error(`Could not find blob ${item}`)
}
const chunks: Buffer[] = await this.client.read(this.ctx, this.workspaceId, item)
const final = Buffer.concat(chunks)
const dta: BlobData = {
_id: item as Ref<BlobData>,
_class: core.class.BlobData,
name: item as string,
size: stat.size,
type: stat.metaData['content-type'],
type: stat.contentType,
space: 'blob' as Ref<Space>,
modifiedOn: stat.lastModified.getTime(),
modifiedOn: stat.modifiedOn,
modifiedBy: core.account.System,
base64Data: final.toString('base64')
}
@ -118,20 +122,19 @@ class StorageBlobAdapter implements DbAdapter {
const blob = d as unknown as BlobData
// Remove existing document
try {
await this.client.remove(this.workspaceId, [blob._id])
await this.client.remove(this.ctx, this.workspaceId, [blob._id])
} catch (ee) {
// ignore error
}
const buffer = Buffer.from(blob.base64Data, 'base64')
await this.client.put(this.workspaceId, blob._id, buffer, buffer.length, {
'Content-Type': blob.type,
lastModified: new Date(blob.modifiedOn)
})
// TODO: Add support of
/// lastModified: new Date(blob.modifiedOn)
await this.client.put(this.ctx, this.workspaceId, blob._id, buffer, blob.type, buffer.length)
}
}
async clean (domain: Domain, docs: Ref<Doc>[]): Promise<void> {
await this.client.remove(this.workspaceId, docs)
await this.client.remove(this.ctx, this.workspaceId, docs)
}
async update (domain: Domain, operations: Map<Ref<Doc>, DocumentUpdate<Doc>>): Promise<void> {
@ -143,6 +146,7 @@ class StorageBlobAdapter implements DbAdapter {
* @public
*/
export async function createStorageDataAdapter (
ctx: MeasureContext,
hierarchy: Hierarchy,
url: string,
workspaceId: WorkspaceId,
@ -154,9 +158,9 @@ export async function createStorageDataAdapter (
}
// We need to create bucket if it doesn't exist
if (storage !== undefined) {
if (!(await storage.exists(workspaceId))) {
await storage.make(workspaceId)
if (!(await storage.exists(ctx, workspaceId))) {
await storage.make(ctx, workspaceId)
}
}
return new StorageBlobAdapter(workspaceId, storage)
return new StorageBlobAdapter(workspaceId, storage, ctx)
}

View File

@ -13,13 +13,14 @@
// limitations under the License.
//
import { Hierarchy, ModelDb, WorkspaceId } from '@hcengineering/core'
import { Hierarchy, MeasureContext, ModelDb, WorkspaceId } from '@hcengineering/core'
import { DbAdapter, DummyDbAdapter } from '@hcengineering/server-core'
/**
* @public
*/
export async function createNullAdapter (
ctx: MeasureContext,
hierarchy: Hierarchy,
url: string,
workspaceId: WorkspaceId,
@ -27,12 +28,3 @@ export async function createNullAdapter (
): Promise<DbAdapter> {
return new DummyDbAdapter()
}
/**
* @public
*/
export interface MinioConfig {
endPoint: string
accessKey: string
secretKey: string
}

View File

@ -0,0 +1,159 @@
import { MinioConfig, MinioService } from '@hcengineering/minio'
import { createRawMongoDBAdapter } from '@hcengineering/mongo'
import { buildStorage, StorageAdapter, StorageConfiguration } from '@hcengineering/server-core'
import { serverFactories, ServerFactory } from '@hcengineering/server-ws'
export function storageConfigFromEnv (): StorageConfiguration {
const storageConfig: StorageConfiguration = JSON.parse(
process.env.STORAGE_CONFIG ?? '{ "default": "", "storages": []}'
)
if (storageConfig.storages.length === 0 || storageConfig.default === '') {
console.info('STORAGE_CONFIG is required for complex configuration, fallback to minio config')
let minioEndpoint = process.env.MINIO_ENDPOINT
if (minioEndpoint === undefined) {
console.error('MINIO_ENDPOINT is required')
process.exit(1)
}
const minioAccessKey = process.env.MINIO_ACCESS_KEY
if (minioAccessKey === undefined) {
console.error('MINIO_ACCESS_KEY is required')
process.exit(1)
}
let minioPort = 9000
const sp = minioEndpoint.split(':')
if (sp.length > 1) {
minioEndpoint = sp[0]
minioPort = parseInt(sp[1])
}
const minioSecretKey = process.env.MINIO_SECRET_KEY
if (minioSecretKey === undefined) {
console.error('MINIO_SECRET_KEY is required')
process.exit(1)
}
const minioConfig: MinioConfig = {
kind: 'minio',
name: 'minio',
port: minioPort,
region: 'us-east-1',
useSSL: false,
endpoint: minioEndpoint,
accessKeyId: minioAccessKey,
secretAccessKey: minioSecretKey
}
storageConfig.storages.push(minioConfig)
storageConfig.default = 'minio'
}
return storageConfig
}
export function serverConfigFromEnv (): {
url: string
elasticUrl: string
serverSecret: string
rekoniUrl: string
frontUrl: string
sesUrl: string | undefined
accountsUrl: string
serverPort: number
serverFactory: ServerFactory
enableCompression: boolean
elasticIndexName: string
} {
const serverPort = parseInt(process.env.SERVER_PORT ?? '3333')
const serverFactory = serverFactories[(process.env.SERVER_PROVIDER as string) ?? 'ws'] ?? serverFactories.ws
const enableCompression = (process.env.ENABLE_COMPRESSION ?? 'true') === 'true'
const url = process.env.MONGO_URL
if (url === undefined) {
console.error('please provide mongodb url')
process.exit(1)
}
const elasticUrl = process.env.ELASTIC_URL
if (elasticUrl === undefined) {
console.error('please provide elastic url')
process.exit(1)
}
const elasticIndexName = process.env.ELASTIC_INDEX_NAME
if (elasticIndexName === undefined) {
console.log('Please provide ELASTIC_INDEX_NAME')
process.exit(1)
}
const serverSecret = process.env.SERVER_SECRET
if (serverSecret === undefined) {
console.log('Please provide server secret')
process.exit(1)
}
const rekoniUrl = process.env.REKONI_URL
if (rekoniUrl === undefined) {
console.log('Please provide REKONI_URL url')
process.exit(1)
}
const frontUrl = process.env.FRONT_URL
if (frontUrl === undefined) {
console.log('Please provide FRONT_URL url')
process.exit(1)
}
const sesUrl = process.env.SES_URL
const accountsUrl = process.env.ACCOUNTS_URL
if (accountsUrl === undefined) {
console.log('Please provide ACCOUNTS_URL url')
process.exit(1)
}
return {
url,
elasticUrl,
elasticIndexName,
serverSecret,
rekoniUrl,
frontUrl,
sesUrl,
accountsUrl,
serverPort,
serverFactory,
enableCompression
}
}
// Temporary solution, until migration will be implemented.
const ONLY_MINIO = true
export function buildStorageFromConfig (config: StorageConfiguration, dbUrl: string): StorageAdapter {
if (ONLY_MINIO) {
const minioConfig = config.storages.find((it) => it.kind === 'minio') as MinioConfig
if (minioConfig === undefined) {
throw new Error('minio config is required')
}
return new MinioService({
accessKey: minioConfig.accessKeyId,
secretKey: minioConfig.accessKeyId,
endPoint: minioConfig.endpoint,
port: minioConfig.port,
useSSL: minioConfig.useSSL
})
}
return buildStorage(config, createRawMongoDBAdapter(dbUrl), (kind, config) => {
if (kind === MinioService.config) {
const c = config as MinioConfig
return new MinioService({
accessKey: c.accessKeyId,
secretKey: c.accessKeyId,
endPoint: c.endpoint,
port: c.port,
useSSL: c.useSSL
})
} else {
throw new Error('Unsupported storage kind:' + kind)
}
})
}

View File

@ -43,6 +43,7 @@
"@hcengineering/model": "^0.6.7",
"@hcengineering/server-token": "^0.6.7",
"@hcengineering/server-core": "^0.6.1",
"@hcengineering/server": "^0.6.4",
"@hcengineering/mongo": "^0.6.1",
"@hcengineering/minio": "^0.6.0"
}

View File

@ -25,14 +25,15 @@ import core, {
Hierarchy,
IndexKind,
IndexOrder,
MeasureContext,
ModelDb,
Tx,
WorkspaceId
} from '@hcengineering/core'
import { MinioService } from '@hcengineering/minio'
import { consoleModelLogger, MigrateOperation, ModelLogger } from '@hcengineering/model'
import { getWorkspaceDB } from '@hcengineering/mongo'
import { StorageAdapter } from '@hcengineering/server-core'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server'
import { StorageAdapter, StorageConfiguration } from '@hcengineering/server-core'
import { Db, Document, MongoClient } from 'mongodb'
import { connect } from './connect'
import toolPlugin from './plugin'
@ -70,7 +71,7 @@ export class FileModelLogger implements ModelLogger {
* @public
*/
export function prepareTools (rawTxes: Tx[]): { mongodbUri: string, storageAdapter: StorageAdapter, txes: Tx[] } {
let minioEndpoint = process.env.MINIO_ENDPOINT
const minioEndpoint = process.env.MINIO_ENDPOINT
if (minioEndpoint === undefined) {
console.error('please provide minio endpoint')
process.exit(1)
@ -94,28 +95,18 @@ export function prepareTools (rawTxes: Tx[]): { mongodbUri: string, storageAdapt
process.exit(1)
}
let minioPort = 9000
const sp = minioEndpoint.split(':')
if (sp.length > 1) {
minioEndpoint = sp[0]
minioPort = parseInt(sp[1])
}
const storageConfig: StorageConfiguration = storageConfigFromEnv()
const minio = new MinioService({
endPoint: minioEndpoint,
port: minioPort,
useSSL: false,
accessKey: minioAccessKey,
secretKey: minioSecretKey
})
const storageAdapter = buildStorageFromConfig(storageConfig, mongodbUri)
return { mongodbUri, storageAdapter: minio, txes: JSON.parse(JSON.stringify(rawTxes)) as Tx[] }
return { mongodbUri, storageAdapter, txes: JSON.parse(JSON.stringify(rawTxes)) as Tx[] }
}
/**
* @public
*/
export async function initModel (
ctx: MeasureContext,
transactorUrl: string,
workspaceId: WorkspaceId,
rawTxes: Tx[],
@ -154,8 +145,8 @@ export async function initModel (
await createUpdateIndexes(connection, db, logger)
logger.log('create minio bucket', { workspaceId })
if (!(await minio.exists(workspaceId))) {
await minio.make(workspaceId)
if (!(await minio.exists(ctx, workspaceId))) {
await minio.make(ctx, workspaceId)
}
} catch (e: any) {
logger.error('error', { error: e })

View File

@ -65,6 +65,7 @@ services:
- SERVER_PORT=8083
- SERVER_SECRET=secret
- ACCOUNTS_URL=http://localhost:3003
- MONGO_URL=mongodb://mongodb:27018
- UPLOAD_URL=/files
- TRANSACTOR_URL=ws://localhost:3334
- ELASTIC_URL=http://elastic:9200