UBER-1085: Improve upgrade tool (#3852)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2023-10-18 16:33:37 +07:00 committed by GitHub
parent d39f81600a
commit 92226819ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 154 additions and 61 deletions

View File

@ -29,7 +29,11 @@ export default async () => {
client = await createClient(connect)
for (const op of migrateOperations) {
console.log('Migrate', op[0])
await op[1].upgrade(client)
await op[1].upgrade(client, {
log (...data) {
console.log(...data)
}
})
}
}
// Check if we had dev hook for client.

View File

@ -29,7 +29,8 @@ import {
replacePassword,
setAccountAdmin,
setRole,
upgradeWorkspace
upgradeWorkspace,
WorkspaceInfoOnly
} from '@hcengineering/account'
import { setMetadata } from '@hcengineering/platform'
import {
@ -40,17 +41,18 @@ import {
restore
} from '@hcengineering/server-backup'
import serverToken, { decodeToken, generateToken } from '@hcengineering/server-token'
import toolPlugin from '@hcengineering/server-tool'
import toolPlugin, { FileModelLogger } from '@hcengineering/server-tool'
import { program } from 'commander'
import { Db, MongoClient } from 'mongodb'
import { clearTelegramHistory } from './telegram'
import { diffWorkspace } from './workspace'
import { Data, getWorkspaceId, Tx, Version } from '@hcengineering/core'
import { Data, getWorkspaceId, RateLimitter, Tx, Version } from '@hcengineering/core'
import { MinioService } from '@hcengineering/minio'
import { MigrateOperation } from '@hcengineering/model'
import { openAIConfigDefaults } from '@hcengineering/openai'
import path from 'path'
import { benchmark } from './benchmark'
import {
cleanArchivedSpaces,
@ -232,19 +234,47 @@ export function devTool (
program
.command('upgrade')
.description('upgrade')
.option('-p|--parallel', 'Parallel upgrade', false)
.action(async (cmd: { parallel: boolean }) => {
.option('-p|--parallel <parallel>', 'Parallel upgrade', '0')
.option('-l|--logs <logs>', 'Default logs folder', './logs')
.option('-r|--retry <retry>', 'Number of apply retries', '0')
.option('-f|--force [force]', 'Force update', false)
.action(async (cmd: { parallel: string, logs: string, retry: string, force: boolean }) => {
const { mongodbUri, version, txes, migrateOperations } = prepareTools()
return await withDatabase(mongodbUri, async (db) => {
const workspaces = await listWorkspaces(db, productId)
if (cmd.parallel) {
await Promise.all(
workspaces.map((ws) => upgradeWorkspace(version, txes, migrateOperations, productId, db, ws.workspace))
)
} else {
const withError: string[] = []
async function _upgradeWorkspace (ws: WorkspaceInfoOnly): Promise<void> {
const t = Date.now()
const logger = new FileModelLogger(path.join(cmd.logs, `${ws.workspace}.log`))
console.log('---UPGRADING----', ws.workspace, logger.file)
try {
await upgradeWorkspace(version, txes, migrateOperations, productId, db, ws.workspace, logger, cmd.force)
console.log('---UPGRADING-DONE----', ws.workspace, Date.now() - t)
} catch (err: any) {
withError.push(ws.workspace)
logger.log('error', JSON.stringify(err))
console.log('---UPGRADING-FAILED----', ws.workspace, Date.now() - t)
} finally {
logger.close()
}
}
if (cmd.parallel !== '0') {
const parallel = parseInt(cmd.parallel) ?? 1
const rateLimit = new RateLimitter(() => ({ rate: parallel }))
console.log('parallel upgrade', parallel, cmd.parallel)
for (const ws of workspaces) {
console.log('---UPGRADING----', ws.workspace)
await upgradeWorkspace(version, txes, migrateOperations, productId, db, ws.workspace)
await rateLimit.exec(() => {
return _upgradeWorkspace(ws)
})
}
} else {
console.log('UPGRADE write logs at:', cmd.logs)
for (const ws of workspaces) {
await _upgradeWorkspace(ws)
}
if (withError.length > 0) {
console.log('Failed workspaces', withError)
}
}
})

View File

@ -376,7 +376,7 @@ export interface MigrationState extends Doc {
/**
* @public
*/
export function versionToString (version: Version): string {
export function versionToString (version: Version | Data<Version>): string {
return `${version?.major}.${version?.minor}.${version?.patch}`
}

View File

@ -358,11 +358,13 @@ async function loadModel (
return modelResponse
}
console.log(
'find' + (modelResponse.full ? 'full model' : 'model diff'),
modelResponse.transactions.length,
Date.now() - t
)
if (typeof window !== 'undefined') {
console.log(
'find' + (modelResponse.full ? 'full model' : 'model diff'),
modelResponse.transactions.length,
Date.now() - t
)
}
await buildModel(modelResponse, allowedPlugins, configs, hierarchy, model)
return modelResponse

View File

@ -17,6 +17,7 @@ import core, {
TxOperations,
Data
} from '@hcengineering/core'
import { ModelLogger } from './utils'
/**
* @public
@ -95,9 +96,9 @@ export type MigrationUpgradeClient = Client
*/
export interface MigrateOperation {
// Perform low level migration
migrate: (client: MigrationClient) => Promise<void>
migrate: (client: MigrationClient, logger: ModelLogger) => Promise<void>
// Perform high level upgrade operations.
upgrade: (client: MigrationUpgradeClient) => Promise<void>
upgrade: (client: MigrationUpgradeClient, logger: ModelLogger) => Promise<void>
}
/**

View File

@ -51,3 +51,19 @@ export async function createOrUpdate<T extends Doc> (
await client.createDoc<T>(_class, space, data, _id)
}
}
/**
* @public
*/
export interface ModelLogger {
log: (...data: any[]) => void
}
/**
* @public
*/
export const consoleModelLogger: ModelLogger = {
log (...data: any[]): void {
console.log(...data)
}
}

View File

@ -78,7 +78,6 @@ class Connection implements ClientConnection {
private readonly onUnauthorized?: () => void,
readonly onConnect?: (event: ClientConnectEvent) => Promise<void>
) {
console.log('connection created')
this.interval = setInterval(() => {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
@ -171,7 +170,6 @@ class Connection implements ClientConnection {
typeof sessionStorage !== 'undefined'
? sessionStorage.getItem('session.id.' + this.url) ?? undefined
: undefined
console.log('find sessionId', this.sessionId)
this.sessionId = this.sessionId ?? generateId()
if (typeof sessionStorage !== 'undefined') {
sessionStorage.setItem('session.id.' + this.url, this.sessionId)
@ -203,7 +201,6 @@ class Connection implements ClientConnection {
v.reconnect?.()
}
resolve(websocket)
console.log('reconnect info', (resp as HelloResponse).reconnect)
void this.onConnect?.(
(resp as HelloResponse).reconnect === true ? ClientConnectEvent.Reconnected : ClientConnectEvent.Connected
@ -289,7 +286,6 @@ class Connection implements ClientConnection {
websocket.onopen = () => {
const useBinary = getMetadata(client.metadata.UseBinaryProtocol) ?? true
const useCompression = getMetadata(client.metadata.UseProtocolCompression) ?? false
console.log('connection opened...', socketId, useBinary, useCompression)
clearTimeout(dialTimer)
const helloRequest: HelloRequest = {
method: 'hello',

View File

@ -50,7 +50,7 @@ export default async () => {
let client = createClient(
(handler: TxHandler) => {
const url = new URL(`/${token}`, endpoint)
console.log('connecting to', url.href)
const upgradeHandler: TxHandler = (tx) => {
if (tx?._class === core.class.TxWorkspaceEvent) {
const event = tx as TxWorkspaceEvent

View File

@ -32,9 +32,10 @@ import core, {
Tx,
TxOperations,
Version,
versionToString,
WorkspaceId
} from '@hcengineering/core'
import { MigrateOperation } from '@hcengineering/model'
import { consoleModelLogger, MigrateOperation, ModelLogger } from '@hcengineering/model'
import platform, {
getMetadata,
Metadata,
@ -118,6 +119,7 @@ export interface Workspace {
accounts: ObjectId[]
productId: string
disabled?: boolean
version?: Data<Version>
}
/**
@ -609,7 +611,9 @@ export async function upgradeWorkspace (
migrationOperation: [string, MigrateOperation][],
productId: string,
db: Db,
workspace: string
workspace: string,
logger: ModelLogger = consoleModelLogger,
forceUpdate: boolean = true
): Promise<string> {
const ws = await getWorkspace(db, productId, workspace)
if (ws === null) {
@ -620,14 +624,26 @@ export async function upgradeWorkspace (
throw new PlatformError(new Status(Severity.ERROR, platform.status.ProductIdMismatch, { productId }))
}
}
const versionStr = versionToString(version)
const currentVersion = await db.collection<Workspace>(WORKSPACE_COLLECTION).findOne({ workspace })
console.log(
`${forceUpdate ? 'force-' : ''}upgrade from "${
currentVersion?.version !== undefined ? versionToString(currentVersion.version) : ''
}" to "${versionStr}"`
)
if (currentVersion?.version !== undefined && !forceUpdate && versionStr === versionToString(currentVersion.version)) {
return versionStr
}
await db.collection(WORKSPACE_COLLECTION).updateOne(
{ workspace },
{
$set: { version }
}
)
await upgradeModel(getTransactor(), getWorkspaceId(workspace, productId), txes, migrationOperation)
return `${version.major}.${version.minor}.${version.patch}`
await upgradeModel(getTransactor(), getWorkspaceId(workspace, productId), txes, migrationOperation, logger)
return versionStr
}
/**

View File

@ -29,17 +29,37 @@ import core, {
WorkspaceId
} from '@hcengineering/core'
import { MinioService } from '@hcengineering/minio'
import { MigrateOperation } from '@hcengineering/model'
import { consoleModelLogger, MigrateOperation, ModelLogger } from '@hcengineering/model'
import { getWorkspaceDB } from '@hcengineering/mongo'
import { Db, Document, MongoClient } from 'mongodb'
import { connect } from './connect'
import toolPlugin from './plugin'
import { MigrateClientImpl } from './upgrade'
import fs from 'fs'
import path from 'path'
export * from './connect'
export * from './plugin'
export { toolPlugin as default }
export class FileModelLogger implements ModelLogger {
handle: fs.WriteStream
constructor (readonly file: string) {
fs.mkdirSync(path.dirname(this.file), { recursive: true })
this.handle = fs.createWriteStream(this.file, { flags: 'a' })
}
log (...data: any[]): void {
this.handle.write(data.map((it: any) => JSON.stringify(it)).join(' ') + '\n')
}
close (): void {
this.handle.close()
}
}
/**
* @public
*/
@ -93,7 +113,8 @@ export async function initModel (
transactorUrl: string,
workspaceId: WorkspaceId,
rawTxes: Tx[],
migrateOperations: [string, MigrateOperation][]
migrateOperations: [string, MigrateOperation][],
logger: ModelLogger = consoleModelLogger
): Promise<void> {
const { mongodbUri, minio, txes } = prepareTools(rawTxes)
if (txes.some((tx) => tx.objectSpace !== core.space.Model)) {
@ -105,33 +126,33 @@ export async function initModel (
await client.connect()
const db = getWorkspaceDB(client, workspaceId)
console.log('dropping database...')
logger.log('dropping database...')
await db.dropDatabase()
console.log('creating model...')
logger.log('creating model...')
const model = txes
const result = await db.collection(DOMAIN_TX).insertMany(model as Document[])
console.log(`${result.insertedCount} model transactions inserted.`)
logger.log(`${result.insertedCount} model transactions inserted.`)
console.log('creating data...')
logger.log('creating data...')
const connection = (await connect(transactorUrl, workspaceId, undefined, {
model: 'upgrade'
})) as unknown as CoreClient & BackupClient
try {
for (const op of migrateOperations) {
console.log('Migrage', op[0])
await op[1].upgrade(connection)
logger.log('Migrage', op[0])
await op[1].upgrade(connection, logger)
}
} catch (e) {
console.log(e)
logger.log(e)
} finally {
await connection.close()
}
// Create update indexes
await createUpdateIndexes(connection, db)
await createUpdateIndexes(connection, db, logger)
console.log('create minio bucket')
logger.log('create minio bucket')
if (!(await minio.exists(workspaceId))) {
await minio.make(workspaceId)
}
@ -147,7 +168,8 @@ export async function upgradeModel (
transactorUrl: string,
workspaceId: WorkspaceId,
rawTxes: Tx[],
migrateOperations: [string, MigrateOperation][]
migrateOperations: [string, MigrateOperation][],
logger: ModelLogger = consoleModelLogger
): Promise<void> {
const { mongodbUri, txes } = prepareTools(rawTxes)
@ -160,19 +182,19 @@ export async function upgradeModel (
await client.connect()
const db = getWorkspaceDB(client, workspaceId)
console.log(`${workspaceId.name}: removing model...`)
logger.log(`${workspaceId.name}: removing model...`)
// we're preserving accounts (created by core.account.System).
const result = await db.collection(DOMAIN_TX).deleteMany({
objectSpace: core.space.Model,
modifiedBy: core.account.System,
objectClass: { $nin: [contact.class.PersonAccount, 'contact:class:EmployeeAccount'] }
})
console.log(`${workspaceId.name}: ${result.deletedCount} transactions deleted.`)
logger.log(`${workspaceId.name}: ${result.deletedCount} transactions deleted.`)
console.log(`${workspaceId.name}: creating model...`)
logger.log(`${workspaceId.name}: creating model...`)
const model = txes
const insert = await db.collection(DOMAIN_TX).insertMany(model as Document[])
console.log(`${workspaceId.name}: ${insert.insertedCount} model transactions inserted.`)
logger.log(`${workspaceId.name}: ${insert.insertedCount} model transactions inserted.`)
const hierarchy = new Hierarchy()
const modelDb = new ModelDb(hierarchy)
@ -189,20 +211,20 @@ export async function upgradeModel (
const migrateClient = new MigrateClientImpl(db, hierarchy, modelDb)
for (const op of migrateOperations) {
console.log(`${workspaceId.name}: migrate:`, op[0])
await op[1].migrate(migrateClient)
logger.log(`${workspaceId.name}: migrate:`, op[0])
await op[1].migrate(migrateClient, logger)
}
console.log(`${workspaceId.name}: Apply upgrade operations`)
logger.log(`${workspaceId.name}: Apply upgrade operations`)
const connection = await connect(transactorUrl, workspaceId, undefined, { mode: 'backup', model: 'upgrade' })
// Create update indexes
await createUpdateIndexes(connection, db)
await createUpdateIndexes(connection, db, logger)
for (const op of migrateOperations) {
console.log(`${workspaceId.name}: upgrade:`, op[0])
await op[1].upgrade(connection)
logger.log(`${workspaceId.name}: upgrade:`, op[0])
await op[1].upgrade(connection, logger)
}
await connection.close()
@ -211,7 +233,7 @@ export async function upgradeModel (
}
}
async function createUpdateIndexes (connection: CoreClient, db: Db): Promise<void> {
async function createUpdateIndexes (connection: CoreClient, db: Db, logger: ModelLogger): Promise<void> {
const classes = await connection.findAll(core.class.Class, {})
const hierarchy = connection.getHierarchy()
@ -252,12 +274,12 @@ async function createUpdateIndexes (connection: CoreClient, db: Db): Promise<voi
try {
await collection.createIndex(vv)
} catch (err: any) {
console.error(err)
logger.log('error', JSON.stringify(err))
}
bb.push(vv)
}
if (bb.length > 0) {
console.log('created indexes', d, bb)
logger.log('created indexes', d, bb)
}
}
}

View File

@ -141,7 +141,7 @@ class TSessionManager implements SessionManager {
return this.sessionFactory(token, pipeline, this.broadcast.bind(this))
}
upgradeId: string | undefined
upgradeIdMap: Map<string, string> = new Map()
async addSession (
ctx: MeasureContext,
@ -163,15 +163,18 @@ class TSessionManager implements SessionManager {
}
let pipeline: Pipeline
const upgradeId = this.upgradeIdMap.get(token.workspace.name)
if (token.extra?.model === 'upgrade') {
if (this.upgradeId !== undefined && sessionId !== this.upgradeId) {
if (upgradeId !== undefined && sessionId !== upgradeId) {
ws.close()
throw new Error('Another Upgrade in progress....')
}
this.upgradeId = sessionId
if (sessionId !== undefined) {
this.upgradeIdMap.set(token.workspace.name, sessionId)
}
pipeline = await this.createUpgradeSession(token, sessionId, ctx, wsString, workspace, pipelineFactory, ws)
} else {
if (workspace.upgrade && sessionId !== this.upgradeId) {
if (workspace.upgrade && sessionId !== upgradeId) {
ws.close()
throw new Error('Upgrade in progress....')
}
@ -316,8 +319,9 @@ class TSessionManager implements SessionManager {
}
const sessionRef = this.sessions.get(ws.id)
if (sessionRef !== undefined) {
if (this.upgradeId === sessionRef.session.sessionId) {
this.upgradeId = undefined
const upgradeId = this.upgradeIdMap.get(workspaceId.name)
if (upgradeId === sessionRef.session.sessionId) {
this.upgradeIdMap.delete(workspaceId.name)
}
this.sessions.delete(ws.id)
workspace.sessions.delete(sessionRef.session.sessionId)
@ -422,12 +426,14 @@ class TSessionManager implements SessionManager {
if (this.workspaces.get(wsid)?.id === wsUID) {
this.workspaces.delete(wsid)
this.upgradeIdMap.delete(workspaceId.name)
}
if (LOGGING_ENABLED) {
console.timeLog(workspaceId.name, 'Closed workspace', wsUID)
}
} catch (err: any) {
this.workspaces.delete(wsid)
this.upgradeIdMap.delete(workspaceId.name)
if (LOGGING_ENABLED) {
console.error(workspaceId.name, err)
}