mirror of
https://github.com/hcengineering/platform.git
synced 2024-11-21 16:09:12 +03:00
UBERF-8259: Do not store system model into DB (#6716)
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
parent
0da0d6e80a
commit
6fd8018baf
2
.vscode/launch.json
vendored
2
.vscode/launch.json
vendored
@ -129,7 +129,7 @@
|
||||
"MINIO_ACCESS_KEY": "minioadmin",
|
||||
"MINIO_SECRET_KEY": "minioadmin",
|
||||
"MINIO_ENDPOINT": "localhost",
|
||||
"MODEL_VERSION": "v0.6.286"
|
||||
"MODEL_VERSION": "v0.6.287"
|
||||
// "INIT_SCRIPT_URL": "https://raw.githubusercontent.com/hcengineering/init/main/script.yaml",
|
||||
// "INIT_WORKSPACE": "onboarding",
|
||||
},
|
||||
|
@ -1503,7 +1503,7 @@ export function devTool (
|
||||
.option('-w, --workspace <workspace>', 'Selected workspace only', '')
|
||||
.option('-c, --concurrency <concurrency>', 'Number of documents being processed concurrently', '10')
|
||||
.action(async (cmd: { workspace: string, concurrency: string }) => {
|
||||
const { mongodbUri, dbUrl } = prepareTools()
|
||||
const { mongodbUri, dbUrl, txes } = prepareTools()
|
||||
await withDatabase(mongodbUri, async (db, client) => {
|
||||
await withStorage(mongodbUri, async (adapter) => {
|
||||
const workspaces = await listWorkspacesPure(db)
|
||||
@ -1521,7 +1521,7 @@ export function devTool (
|
||||
workspaceUrl: workspace.workspaceUrl ?? ''
|
||||
}
|
||||
|
||||
const { pipeline } = await getServerPipeline(toolCtx, mongodbUri, dbUrl, wsUrl)
|
||||
const { pipeline } = await getServerPipeline(toolCtx, txes, mongodbUri, dbUrl, wsUrl)
|
||||
|
||||
await migrateMarkup(toolCtx, adapter, wsId, client, pipeline, parseInt(cmd.concurrency))
|
||||
|
||||
|
@ -15,7 +15,7 @@ import core, {
|
||||
} from '@hcengineering/core'
|
||||
import { getMongoClient, getWorkspaceDB } from '@hcengineering/mongo'
|
||||
import { type Pipeline, type StorageAdapter } from '@hcengineering/server-core'
|
||||
import { connect, fetchModel } from '@hcengineering/server-tool'
|
||||
import { connect } from '@hcengineering/server-tool'
|
||||
import { jsonToText, markupToYDoc } from '@hcengineering/text'
|
||||
import { type Db, type FindCursor, type MongoClient } from 'mongodb'
|
||||
|
||||
@ -123,7 +123,7 @@ export async function migrateMarkup (
|
||||
pipeline: Pipeline,
|
||||
concurrency: number
|
||||
): Promise<void> {
|
||||
const { hierarchy } = await fetchModel(ctx, pipeline)
|
||||
const hierarchy = pipeline.context.hierarchy
|
||||
|
||||
const workspaceDb = client.db(workspaceId.name)
|
||||
|
||||
|
@ -57,6 +57,7 @@
|
||||
"dotenv": "~16.0.0",
|
||||
"@hcengineering/backup-service": "^0.6.0",
|
||||
"@hcengineering/analytics": "^0.6.0",
|
||||
"@hcengineering/analytics-service": "^0.6.0"
|
||||
"@hcengineering/analytics-service": "^0.6.0",
|
||||
"@hcengineering/model-all": "^0.6.0"
|
||||
}
|
||||
}
|
||||
|
@ -16,12 +16,19 @@
|
||||
import { Analytics } from '@hcengineering/analytics'
|
||||
import { configureAnalytics, SplitLogger } from '@hcengineering/analytics-service'
|
||||
import { startBackup } from '@hcengineering/backup-service'
|
||||
import { MeasureMetricsContext, metricsToString, newMetrics } from '@hcengineering/core'
|
||||
import { MeasureMetricsContext, metricsToString, newMetrics, type Tx } from '@hcengineering/core'
|
||||
import { type PipelineFactory } from '@hcengineering/server-core'
|
||||
import { createBackupPipeline, getConfig } from '@hcengineering/server-pipeline'
|
||||
import { writeFile } from 'fs/promises'
|
||||
import { join } from 'path'
|
||||
|
||||
import builder from '@hcengineering/model-all'
|
||||
|
||||
const enabled = (process.env.MODEL_ENABLED ?? '*').split(',').map((it) => it.trim())
|
||||
const disabled = (process.env.MODEL_DISABLED ?? '').split(',').map((it) => it.trim())
|
||||
|
||||
const model = JSON.parse(JSON.stringify(builder(enabled, disabled).getTxes())) as Tx[]
|
||||
|
||||
const metricsContext = new MeasureMetricsContext(
|
||||
'backup',
|
||||
{},
|
||||
@ -58,7 +65,7 @@ const onClose = (): void => {
|
||||
startBackup(
|
||||
metricsContext,
|
||||
(mongoUrl, storageAdapter) => {
|
||||
const factory: PipelineFactory = createBackupPipeline(metricsContext, mongoUrl, {
|
||||
const factory: PipelineFactory = createBackupPipeline(metricsContext, mongoUrl, model, {
|
||||
externalStorage: storageAdapter,
|
||||
usePassedCtx: true
|
||||
})
|
||||
|
@ -72,6 +72,7 @@
|
||||
"@hcengineering/server-telegram": "^0.6.0",
|
||||
"@hcengineering/pod-telegram-bot": "^0.6.0",
|
||||
"@hcengineering/server-ai-bot": "^0.6.0",
|
||||
"@hcengineering/server-ai-bot-resources": "^0.6.0"
|
||||
"@hcengineering/server-ai-bot-resources": "^0.6.0",
|
||||
"@hcengineering/model-all": "^0.6.0"
|
||||
}
|
||||
}
|
||||
|
@ -14,7 +14,7 @@
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
import { type Branding, type BrandingMap, type WorkspaceIdWithUrl } from '@hcengineering/core'
|
||||
import { type Branding, type BrandingMap, type Tx, type WorkspaceIdWithUrl } from '@hcengineering/core'
|
||||
import { buildStorageFromConfig, getMetricsContext } from '@hcengineering/server'
|
||||
|
||||
import { ClientSession, startSessionManager, type ServerFactory, type Session } from '@hcengineering/server'
|
||||
@ -25,6 +25,13 @@ import { serverAiBotId } from '@hcengineering/server-ai-bot'
|
||||
import { createAIBotAdapter } from '@hcengineering/server-ai-bot-resources'
|
||||
import { createServerPipeline, registerServerPlugins, registerStringLoaders } from '@hcengineering/server-pipeline'
|
||||
|
||||
import builder from '@hcengineering/model-all'
|
||||
|
||||
const enabled = (process.env.MODEL_ENABLED ?? '*').split(',').map((it) => it.trim())
|
||||
const disabled = (process.env.MODEL_DISABLED ?? '').split(',').map((it) => it.trim())
|
||||
|
||||
const model = JSON.parse(JSON.stringify(builder(enabled, disabled).getTxes())) as Tx[]
|
||||
|
||||
registerStringLoaders()
|
||||
|
||||
/**
|
||||
@ -64,6 +71,7 @@ export function start (
|
||||
const pipelineFactory = createServerPipeline(
|
||||
metrics,
|
||||
dbUrls,
|
||||
model,
|
||||
{ ...opt, externalStorage, adapterSecurity: rawDbUrl !== undefined },
|
||||
{
|
||||
serviceAdapters: {
|
||||
|
@ -14,14 +14,22 @@
|
||||
//
|
||||
|
||||
import core, {
|
||||
type Doc,
|
||||
type LoadModelResponse,
|
||||
type MeasureContext,
|
||||
type Timestamp,
|
||||
type Tx,
|
||||
type TxCUD,
|
||||
DOMAIN_TX
|
||||
} from '@hcengineering/core'
|
||||
import { PlatformError, unknownError } from '@hcengineering/platform'
|
||||
import type { Middleware, PipelineContext, TxAdapter, TxMiddlewareResult } from '@hcengineering/server-core'
|
||||
import type {
|
||||
Middleware,
|
||||
MiddlewareCreator,
|
||||
PipelineContext,
|
||||
TxAdapter,
|
||||
TxMiddlewareResult
|
||||
} from '@hcengineering/server-core'
|
||||
import { BaseMiddleware } from '@hcengineering/server-core'
|
||||
import crypto from 'node:crypto'
|
||||
|
||||
@ -34,20 +42,46 @@ export class ModelMiddleware extends BaseMiddleware implements Middleware {
|
||||
lastHashResponse!: Promise<LoadModelResponse>
|
||||
model!: Tx[]
|
||||
|
||||
static async create (ctx: MeasureContext, context: PipelineContext, next?: Middleware): Promise<Middleware> {
|
||||
const middleware = new ModelMiddleware(context, next)
|
||||
constructor (
|
||||
context: PipelineContext,
|
||||
next: Middleware | undefined,
|
||||
readonly systemTx: Tx[]
|
||||
) {
|
||||
super(context, next)
|
||||
}
|
||||
|
||||
static async doCreate (
|
||||
ctx: MeasureContext,
|
||||
context: PipelineContext,
|
||||
next: Middleware | undefined,
|
||||
systemTx: Tx[]
|
||||
): Promise<Middleware> {
|
||||
const middleware = new ModelMiddleware(context, next, systemTx)
|
||||
await middleware.init(ctx)
|
||||
return middleware
|
||||
}
|
||||
|
||||
static create (tx: Tx[]): MiddlewareCreator {
|
||||
return (ctx, context, next) => {
|
||||
return this.doCreate(ctx, context, next, tx)
|
||||
}
|
||||
}
|
||||
|
||||
async init (ctx: MeasureContext): Promise<void> {
|
||||
if (this.context.adapterManager == null) {
|
||||
throw new PlatformError(unknownError('Adapter manager should be configured'))
|
||||
}
|
||||
const txAdapter = this.context.adapterManager.getAdapter(DOMAIN_TX, true) as TxAdapter
|
||||
|
||||
const isUserTx = (it: Tx): boolean =>
|
||||
it.modifiedBy !== core.account.System ||
|
||||
(it as TxCUD<Doc>).objectClass === 'contact:class:Person' ||
|
||||
(it as TxCUD<Doc>).objectClass === 'contact:class:PersonAccount'
|
||||
|
||||
this.model = await ctx.with('get-model', {}, async (ctx) => {
|
||||
const model = await ctx.with('fetch-model', {}, (ctx) => txAdapter.getModel(ctx))
|
||||
const allUserTxes = await ctx.with('fetch-model', {}, (ctx) => txAdapter.getModel(ctx))
|
||||
const userTxes = allUserTxes.filter((it) => isUserTx(it))
|
||||
const model = this.systemTx.concat(userTxes)
|
||||
for (const tx of model) {
|
||||
try {
|
||||
this.context.hierarchy.tx(tx)
|
||||
@ -55,7 +89,7 @@ export class ModelMiddleware extends BaseMiddleware implements Middleware {
|
||||
ctx.warn('failed to apply model transaction, skipping', { tx: JSON.stringify(tx), err })
|
||||
}
|
||||
}
|
||||
this.context.modelDb.addTxes(ctx, model, false)
|
||||
this.context.modelDb.addTxes(ctx, model, true)
|
||||
return model
|
||||
})
|
||||
|
||||
|
@ -1174,7 +1174,9 @@ abstract class MongoAdapterBase implements DbAdapter {
|
||||
|
||||
async clean (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<void> {
|
||||
await ctx.with('clean', {}, async () => {
|
||||
await this.db.collection<Doc>(domain).deleteMany({ _id: { $in: docs } })
|
||||
if (docs.length > 0) {
|
||||
await this.db.collection<Doc>(domain).deleteMany({ _id: { $in: docs } })
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -1693,7 +1695,7 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter {
|
||||
)
|
||||
}
|
||||
model.forEach((tx) => (tx.modifiedBy === core.account.System && !isPersonAccount(tx) ? systemTx : userTx).push(tx))
|
||||
return systemTx.concat(userTx)
|
||||
return this.stripHash(systemTx.concat(userTx)) as Tx[]
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -169,6 +169,7 @@ export class DBCollectionHelper implements DomainHelperOperations {
|
||||
}
|
||||
|
||||
async init (domain?: Domain): Promise<void> {
|
||||
// Check and create DB if missin
|
||||
if (domain === undefined) {
|
||||
// Init existing collecfions
|
||||
for (const c of (await this.db.listCollections({}, { nameOnly: true }).toArray()).map((it) => it.name)) {
|
||||
|
@ -10,6 +10,7 @@ import {
|
||||
ModelDb,
|
||||
type Branding,
|
||||
type MeasureContext,
|
||||
type Tx,
|
||||
type WorkspaceIdWithUrl
|
||||
} from '@hcengineering/core'
|
||||
import { createElasticAdapter, createElasticBackupDataAdapter } from '@hcengineering/elastic'
|
||||
@ -100,6 +101,7 @@ export function getTxAdapterFactory (
|
||||
export function createServerPipeline (
|
||||
metrics: MeasureContext,
|
||||
dbUrls: string,
|
||||
model: Tx[],
|
||||
opt: {
|
||||
fullTextUrl: string
|
||||
rekoniUrl: string
|
||||
@ -139,7 +141,7 @@ export function createServerPipeline (
|
||||
DomainFindMiddleware.create,
|
||||
DomainTxMiddleware.create,
|
||||
DBAdapterInitMiddleware.create,
|
||||
ModelMiddleware.create,
|
||||
ModelMiddleware.create(model),
|
||||
DBAdapterMiddleware.create(conf), // Configure DB adapters
|
||||
BroadcastMiddleware.create(broadcast)
|
||||
]
|
||||
@ -164,6 +166,7 @@ export function createServerPipeline (
|
||||
export function createBackupPipeline (
|
||||
metrics: MeasureContext,
|
||||
dbUrls: string,
|
||||
systemTx: Tx[],
|
||||
opt: {
|
||||
usePassedCtx?: boolean
|
||||
adapterSecurity?: boolean
|
||||
@ -208,7 +211,7 @@ export function createBackupPipeline (
|
||||
ContextNameMiddleware.create,
|
||||
DomainFindMiddleware.create,
|
||||
DBAdapterInitMiddleware.create,
|
||||
ModelMiddleware.create,
|
||||
ModelMiddleware.create(systemTx),
|
||||
DBAdapterMiddleware.create(conf)
|
||||
]
|
||||
|
||||
@ -227,6 +230,7 @@ export function createBackupPipeline (
|
||||
|
||||
export async function getServerPipeline (
|
||||
ctx: MeasureContext,
|
||||
model: Tx[],
|
||||
mongodbUri: string,
|
||||
dbUrl: string | undefined,
|
||||
wsUrl: WorkspaceIdWithUrl
|
||||
@ -242,6 +246,7 @@ export async function getServerPipeline (
|
||||
const pipelineFactory = createServerPipeline(
|
||||
ctx,
|
||||
dbUrls,
|
||||
model,
|
||||
{
|
||||
externalStorage: storageAdapter,
|
||||
fullTextUrl: 'http://localhost:9200',
|
||||
|
@ -36,15 +36,16 @@ import core, {
|
||||
WorkspaceId,
|
||||
WorkspaceIdWithUrl,
|
||||
type Doc,
|
||||
type Ref,
|
||||
type TxCUD
|
||||
} from '@hcengineering/core'
|
||||
import { consoleModelLogger, MigrateOperation, ModelLogger, tryMigrate } from '@hcengineering/model'
|
||||
import {
|
||||
AggregatorStorageAdapter,
|
||||
DbAdapter,
|
||||
DomainIndexHelperImpl,
|
||||
Pipeline,
|
||||
StorageAdapter
|
||||
StorageAdapter,
|
||||
type DbAdapter
|
||||
} from '@hcengineering/server-core'
|
||||
import { connect } from './connect'
|
||||
import { InitScript, WorkspaceInitializer } from './initializer'
|
||||
@ -141,9 +142,16 @@ export async function initModel (
|
||||
logger.log('transactions deleted.', { workspaceId: workspaceId.name })
|
||||
}
|
||||
|
||||
logger.log('creating model...', workspaceId)
|
||||
await adapter.upload(ctx, DOMAIN_TX, txes)
|
||||
logger.log('model transactions inserted.', { count: txes.length })
|
||||
logger.log('creating database...', workspaceId)
|
||||
await adapter.upload(ctx, DOMAIN_TX, [
|
||||
{
|
||||
_class: core.class.Tx,
|
||||
_id: 'first-tx' as Ref<Doc>,
|
||||
modifiedBy: core.account.System,
|
||||
modifiedOn: Date.now(),
|
||||
space: core.space.DerivedTx
|
||||
}
|
||||
])
|
||||
|
||||
await progress(30)
|
||||
|
||||
@ -264,11 +272,14 @@ export async function upgradeModel (
|
||||
throw Error('Model txes must target only core.space.Model')
|
||||
}
|
||||
|
||||
const prevModel = await fetchModel(ctx, pipeline)
|
||||
const newModelRes = await ctx.with('load-model', {}, (ctx) => pipeline.loadModel(ctx, 0))
|
||||
const newModel = Array.isArray(newModelRes) ? newModelRes : newModelRes.transactions
|
||||
|
||||
const { hierarchy, modelDb, model } = await buildModel(ctx, newModel)
|
||||
const { migrateClient: preMigrateClient } = await prepareMigrationClient(
|
||||
pipeline,
|
||||
prevModel.hierarchy,
|
||||
prevModel.modelDb,
|
||||
hierarchy,
|
||||
modelDb,
|
||||
logger,
|
||||
storageAdapter,
|
||||
workspaceId
|
||||
@ -305,36 +316,9 @@ export async function upgradeModel (
|
||||
}
|
||||
logger.log('removing model...', { workspaceId: workspaceId.name })
|
||||
await progress(10)
|
||||
const toRemove = await pipeline.findAll(ctx, core.class.Tx, {
|
||||
objectSpace: core.space.Model,
|
||||
modifiedBy: core.account.System,
|
||||
objectClass: { $nin: [contact.class.PersonAccount, 'contact:class:EmployeeAccount'] }
|
||||
})
|
||||
await pipeline.context.lowLevelStorage.clean(
|
||||
ctx,
|
||||
DOMAIN_TX,
|
||||
toRemove.map((p) => p._id)
|
||||
)
|
||||
logger.log('transactions deleted.', { workspaceId: workspaceId.name, count: toRemove.length })
|
||||
logger.log('creating model...', { workspaceId: workspaceId.name })
|
||||
await pipeline.context.lowLevelStorage.upload(ctx, DOMAIN_TX, txes)
|
||||
|
||||
logger.log('model transactions inserted.', { workspaceId: workspaceId.name, count: txes.length })
|
||||
await progress(20)
|
||||
}
|
||||
const newModel = [
|
||||
...txes,
|
||||
...Array.from(
|
||||
prevModel.model.filter(
|
||||
(it) =>
|
||||
it.modifiedBy !== core.account.System ||
|
||||
(it as TxCUD<Doc>).objectClass === contact.class.Person ||
|
||||
(it as TxCUD<Doc>).objectClass === 'contact:class:PersonAccount'
|
||||
)
|
||||
)
|
||||
]
|
||||
|
||||
const { hierarchy, modelDb, model } = await fetchModel(ctx, pipeline, newModel)
|
||||
const { migrateClient, migrateState } = await prepareMigrationClient(
|
||||
pipeline,
|
||||
hierarchy,
|
||||
@ -383,6 +367,21 @@ export async function upgradeModel (
|
||||
{
|
||||
state: 'indexes-v4',
|
||||
func: upgradeIndexes
|
||||
},
|
||||
{
|
||||
state: 'delete-model',
|
||||
func: async (client) => {
|
||||
const model = await client.find<Tx>(DOMAIN_TX, { objectSpace: core.space.Model })
|
||||
|
||||
// Ignore Employee accounts.
|
||||
const isUserTx = (it: Tx): boolean =>
|
||||
it.modifiedBy !== core.account.System ||
|
||||
(it as TxCUD<Doc>).objectClass === 'contact:class:Person' ||
|
||||
(it as TxCUD<Doc>).objectClass === 'contact:class:PersonAccount'
|
||||
|
||||
const toDelete = model.filter((it) => !isUserTx(it)).map((it) => it._id)
|
||||
await client.deleteMany(DOMAIN_TX, { _id: { $in: toDelete } })
|
||||
}
|
||||
}
|
||||
])
|
||||
})
|
||||
@ -463,26 +462,20 @@ async function prepareMigrationClient (
|
||||
return { migrateClient, migrateState }
|
||||
}
|
||||
|
||||
export async function fetchModel (
|
||||
export async function buildModel (
|
||||
ctx: MeasureContext,
|
||||
pipeline: Pipeline,
|
||||
model?: Tx[]
|
||||
model: Tx[]
|
||||
): Promise<{ hierarchy: Hierarchy, modelDb: ModelDb, model: Tx[] }> {
|
||||
const hierarchy = new Hierarchy()
|
||||
const modelDb = new ModelDb(hierarchy)
|
||||
|
||||
if (model === undefined) {
|
||||
const res = await ctx.with('load-model', {}, (ctx) => pipeline.loadModel(ctx, 0))
|
||||
model = Array.isArray(res) ? res : res.transactions
|
||||
}
|
||||
|
||||
ctx.withSync('build local model', {}, () => {
|
||||
for (const tx of model ?? []) {
|
||||
try {
|
||||
hierarchy.tx(tx)
|
||||
} catch (err: any) {}
|
||||
}
|
||||
modelDb.addTxes(ctx, model as Tx[], false)
|
||||
modelDb.addTxes(ctx, model, false)
|
||||
})
|
||||
return { hierarchy, modelDb, model: model ?? [] }
|
||||
}
|
||||
|
@ -163,6 +163,7 @@ export async function createWorkspace (
|
||||
const factory: PipelineFactory = createServerPipeline(
|
||||
ctx,
|
||||
dbUrls,
|
||||
txes,
|
||||
{
|
||||
externalStorage: storageAdapter,
|
||||
fullTextUrl: 'http://localhost:9200',
|
||||
@ -288,7 +289,7 @@ export async function upgradeWorkspace (
|
||||
let pipeline: Pipeline | undefined
|
||||
let storageAdapter: StorageAdapter | undefined
|
||||
try {
|
||||
;({ pipeline, storageAdapter } = await getServerPipeline(ctx, mongodbUri, dbUrl, wsUrl))
|
||||
;({ pipeline, storageAdapter } = await getServerPipeline(ctx, txes, mongodbUri, dbUrl, wsUrl))
|
||||
const contextData = new SessionDataImpl(
|
||||
systemAccountEmail,
|
||||
'backup',
|
||||
|
Loading…
Reference in New Issue
Block a user