UBERF-6598: Perform upgrade all workspaces to new versions (#5392)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-04-18 13:28:13 +07:00 committed by GitHub
parent 4536ad8355
commit 808b5c8f7d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 2192 additions and 2073 deletions

View File

@ -30,10 +30,8 @@ import {
replacePassword,
setAccountAdmin,
setRole,
updateWorkspace,
upgradeWorkspace,
type Workspace,
type WorkspaceInfo
UpgradeWorker
} from '@hcengineering/account'
import { setMetadata } from '@hcengineering/platform'
import {
@ -45,7 +43,7 @@ import {
restore
} from '@hcengineering/server-backup'
import serverToken, { decodeToken, generateToken } from '@hcengineering/server-token'
import toolPlugin, { FileModelLogger } from '@hcengineering/server-tool'
import toolPlugin from '@hcengineering/server-tool'
import { program, type Command } from 'commander'
import { type Db, type MongoClient } from 'mongodb'
@ -56,7 +54,6 @@ import core, {
getWorkspaceId,
MeasureMetricsContext,
metricsToString,
RateLimiter,
versionToString,
type AccountRole,
type Data,
@ -69,7 +66,6 @@ import { getMongoClient, getWorkspaceDB } from '@hcengineering/mongo'
import { openAIConfigDefaults } from '@hcengineering/openai'
import { type StorageAdapter } from '@hcengineering/server-core'
import { deepEqual } from 'fast-equals'
import path from 'path'
import { benchmark } from './benchmark'
import {
cleanArchivedSpaces,
@ -345,93 +341,15 @@ export function devTool (
.option('-f|--force [force]', 'Force update', false)
.action(async (cmd: { parallel: string, logs: string, retry: string, force: boolean, console: boolean }) => {
const { mongodbUri, version, txes, migrateOperations } = prepareTools()
await withDatabase(mongodbUri, async (db) => {
const workspaces = await listWorkspacesRaw(db, productId)
workspaces.sort((a, b) => b.lastVisit - a.lastVisit)
// We need to update workspaces with missing workspaceUrl
for (const ws of workspaces) {
if (ws.workspaceUrl == null) {
const upd: Partial<Workspace> = {
workspaceUrl: ws.workspace
}
if (ws.workspaceName == null) {
upd.workspaceName = ws.workspace
}
await updateWorkspace(db, productId, ws, upd)
}
}
const withError: string[] = []
let toProcess = workspaces.length
const st = Date.now()
async function _upgradeWorkspace (ws: WorkspaceInfo): Promise<void> {
if (ws.disabled === true) {
return
}
const t = Date.now()
const logger = cmd.console
? consoleModelLogger
: new FileModelLogger(path.join(cmd.logs, `${ws.workspace}.log`))
const avgTime = (Date.now() - st) / (workspaces.length - toProcess + 1)
console.log(
'----------------------------------------------------------\n---UPGRADING----',
'pending: ',
toProcess,
'ETA: ',
Math.floor(avgTime * toProcess),
ws.workspace
)
toProcess--
try {
await upgradeWorkspace(
toolCtx,
version,
txes,
migrateOperations,
productId,
db,
ws.workspaceUrl ?? ws.workspace,
logger,
cmd.force
)
console.log('---done---------', 'pending: ', toProcess, 'TIME:', Date.now() - t, ws.workspace)
} catch (err: any) {
withError.push(ws.workspace)
logger.log('error', JSON.stringify(err))
console.log(' FAILED-------', 'pending: ', toProcess, 'TIME:', Date.now() - t, ws.workspace)
} finally {
if (!cmd.console) {
;(logger as FileModelLogger).close()
}
}
}
if (cmd.parallel !== '0') {
const parallel = parseInt(cmd.parallel) ?? 1
const rateLimit = new RateLimiter(parallel)
console.log('parallel upgrade', parallel, cmd.parallel)
await Promise.all(
workspaces.map((it) =>
rateLimit.add(() => {
return _upgradeWorkspace(it)
})
)
)
console.log('Upgrade done')
// console.log((process as any)._getActiveHandles())
// console.log((process as any)._getActiveRequests())
// process.exit()
} else {
console.log('UPGRADE write logs at:', cmd.logs)
for (const ws of workspaces) {
await _upgradeWorkspace(ws)
}
if (withError.length > 0) {
console.log('Failed workspaces', withError)
}
}
await withDatabase(mongodbUri, async (db, client) => {
const worker = new UpgradeWorker(db, client, version, txes, migrateOperations, productId)
await worker.upgradeAll(toolCtx, {
errorHandler: async (ws, err) => {},
force: cmd.force,
console: cmd.console,
logs: cmd.logs,
parallel: parseInt(cmd.parallel ?? '1')
})
})
})

View File

@ -26,8 +26,8 @@ import contact, { contactId, createModel as contactModel } from '@hcengineering/
import { createModel as coreModel } from '@hcengineering/model-core'
import document, { documentId, createModel as documentModel } from '@hcengineering/model-document'
import gmail, { gmailId, createModel as gmailModel } from '@hcengineering/model-gmail'
import { guestId, createModel as guestModel } from '@hcengineering/model-guest'
import hr, { hrId, createModel as hrModel } from '@hcengineering/model-hr'
import { timeId, createModel as timeModel } from '@hcengineering/model-time'
import inventory, { inventoryId, createModel as inventoryModel } from '@hcengineering/model-inventory'
import lead, { leadId, createModel as leadModel } from '@hcengineering/model-lead'
import notification, { notificationId, createModel as notificationModel } from '@hcengineering/model-notification'
@ -37,16 +37,17 @@ import recruit, { recruitId, createModel as recruitModel } from '@hcengineering/
import { requestId, createModel as requestModel } from '@hcengineering/model-request'
import { serverActivityId, createModel as serverActivityModel } from '@hcengineering/model-server-activity'
import { serverAttachmentId, createModel as serverAttachmentModel } from '@hcengineering/model-server-attachment'
import { serverCalendarId, createModel as serverCalendarModel } from '@hcengineering/model-server-calendar'
import { serverChunterId, createModel as serverChunterModel } from '@hcengineering/model-server-chunter'
import {
serverCollaborationId,
createModel as serverCollaborationModel
} from '@hcengineering/model-server-collaboration'
import { serverCalendarId, createModel as serverCalendarModel } from '@hcengineering/model-server-calendar'
import { serverChunterId, createModel as serverChunterModel } from '@hcengineering/model-server-chunter'
import { serverContactId, createModel as serverContactModel } from '@hcengineering/model-server-contact'
import { serverCoreId, createModel as serverCoreModel } from '@hcengineering/model-server-core'
import { serverDocumentId, createModel as serverDocumentModel } from '@hcengineering/model-server-document'
import { serverGmailId, createModel as serverGmailModel } from '@hcengineering/model-server-gmail'
import { serverGuestId, createModel as serverGuestModel } from '@hcengineering/model-server-guest'
import { serverHrId, createModel as serverHrModel } from '@hcengineering/model-server-hr'
import { serverInventoryId, createModel as serverInventoryModel } from '@hcengineering/model-server-inventory'
import { serverLeadId, createModel as serverLeadModel } from '@hcengineering/model-server-lead'
@ -58,6 +59,7 @@ import { serverTagsId, createModel as serverTagsModel } from '@hcengineering/mod
import { serverTaskId, createModel as serverTaskModel } from '@hcengineering/model-server-task'
import { serverTelegramId, createModel as serverTelegramModel } from '@hcengineering/model-server-telegram'
import { serverTemplatesId, createModel as serverTemplatesModel } from '@hcengineering/model-server-templates'
import { serverTimeId, createModel as serverTimeModel } from '@hcengineering/model-server-time'
import { serverTrackerId, createModel as serverTrackerModel } from '@hcengineering/model-server-tracker'
import { serverViewId, createModel as serverViewModel } from '@hcengineering/model-server-view'
import setting, { settingId, createModel as settingModel } from '@hcengineering/model-setting'
@ -67,12 +69,10 @@ import { taskId, createModel as taskModel } from '@hcengineering/model-task'
import telegram, { telegramId, createModel as telegramModel } from '@hcengineering/model-telegram'
import { templatesId, createModel as templatesModel } from '@hcengineering/model-templates'
import { textEditorId, createModel as textEditorModel } from '@hcengineering/model-text-editor'
import { timeId, createModel as timeModel } from '@hcengineering/model-time'
import tracker, { trackerId, createModel as trackerModel } from '@hcengineering/model-tracker'
import view, { viewId, createModel as viewModel } from '@hcengineering/model-view'
import workbench, { workbenchId, createModel as workbenchModel } from '@hcengineering/model-workbench'
import { guestId, createModel as guestModel } from '@hcengineering/model-guest'
import { serverGuestId, createModel as serverGuestModel } from '@hcengineering/model-server-guest'
import { serverTimeId, createModel as serverTimeModel } from '@hcengineering/model-server-time'
import { openAIId, createModel as serverOpenAI } from '@hcengineering/model-server-openai'
import { createModel as serverTranslate, translateId } from '@hcengineering/model-server-translate'
@ -95,6 +95,8 @@ export function getModelVersion (): Data<Version> {
return { major: 0, minor: 6, patch: 0 }
}
export type { MigrateOperation } from '@hcengineering/model'
/**
* @public
* @param enabled - a set of enabled plugins

View File

@ -26,11 +26,9 @@ export const coreOperation: MigrateOperation = {
// We need to delete all documents in doc index state for missing classes
const allClasses = client.hierarchy.getDescendants(core.class.Doc)
const allIndexed = allClasses.filter((it) => isClassIndexable(client.hierarchy, it))
const indexed = new Set(allIndexed)
const skipped = allClasses.filter((it) => !indexed.has(it))
// Next remove all non indexed classes and missing classes as well.
const updated = await client.update(
await client.update(
DOMAIN_DOC_INDEX_STATE,
{ objectClass: { $nin: allIndexed } },
{
@ -39,7 +37,6 @@ export const coreOperation: MigrateOperation = {
}
}
)
console.log('clearing non indexed documents', skipped, updated.updated, updated.matched)
},
async upgrade (client: MigrationUpgradeClient): Promise<void> {
await tryUpgrade(client, coreId, [

View File

@ -13,7 +13,6 @@
// limitations under the License.
//
import { getMethods } from '@hcengineering/account'
import { MeasureMetricsContext, newMetrics, type Tx } from '@hcengineering/core'
import builder, { getModelVersion, migrateOperations } from '@hcengineering/model-all'
import { serveAccount } from '.'
@ -25,4 +24,4 @@ const txes = JSON.parse(JSON.stringify(builder(enabled, disabled).getTxes())) as
const metricsContext = new MeasureMetricsContext('account', {}, {}, newMetrics())
serveAccount(metricsContext, getMethods(getModelVersion(), txes, migrateOperations))
serveAccount(metricsContext, getModelVersion(), txes, migrateOperations)

View File

@ -14,11 +14,18 @@
// limitations under the License.
//
import account, { ACCOUNT_DB, type AccountMethod, accountId, cleanInProgressWorkspaces } from '@hcengineering/account'
import account, {
ACCOUNT_DB,
UpgradeWorker,
accountId,
cleanInProgressWorkspaces,
getMethods
} from '@hcengineering/account'
import accountEn from '@hcengineering/account/lang/en.json'
import accountRu from '@hcengineering/account/lang/ru.json'
import { registerProviders } from '@hcengineering/auth-providers'
import { type MeasureContext } from '@hcengineering/core'
import { type Data, type MeasureContext, type Tx, type Version } from '@hcengineering/core'
import { getModelVersion, type MigrateOperation } from '@hcengineering/model-all'
import platform, { Severity, Status, addStringsLoader, setMetadata } from '@hcengineering/platform'
import serverToken from '@hcengineering/server-token'
import toolPlugin from '@hcengineering/server-tool'
@ -32,7 +39,14 @@ import { MongoClient } from 'mongodb'
/**
* @public
*/
export function serveAccount (measureCtx: MeasureContext, methods: Record<string, AccountMethod>, productId = ''): void {
export function serveAccount (
measureCtx: MeasureContext,
version: Data<Version>,
txes: Tx[],
migrateOperations: [string, MigrateOperation][],
productId: string = ''
): void {
const methods = getMethods(getModelVersion(), txes, migrateOperations)
const ACCOUNT_PORT = parseInt(process.env.ACCOUNT_PORT ?? '3000')
const dbUri = process.env.MONGO_URL
if (dbUri === undefined) {
@ -90,12 +104,21 @@ export function serveAccount (measureCtx: MeasureContext, methods: Record<string
const app = new Koa()
const router = new Router()
void client.then((p: MongoClient) => {
void client.then(async (p: MongoClient) => {
const db = p.db(ACCOUNT_DB)
registerProviders(measureCtx, app, router, db, productId, serverSecret, frontURL)
// We need to clean workspace with creating === true, since server is restarted.
void cleanInProgressWorkspaces(db, productId)
const worker = new UpgradeWorker(db, p, version, txes, migrateOperations, productId)
await worker.upgradeAll(measureCtx, {
errorHandler: async (ws, err) => {},
force: false,
console: false,
logs: 'upgrade-logs',
parallel: parseInt(process.env.PARALLEL ?? '1')
})
})
const extractToken = (header: IncomingHttpHeaders): string | undefined => {

View File

@ -66,7 +66,6 @@ import notification, {
} from '@hcengineering/notification'
import { getMetadata, getResource, translate } from '@hcengineering/platform'
import type { TriggerControl } from '@hcengineering/server-core'
import { stripTags } from '@hcengineering/text'
import serverCore from '@hcengineering/server-core'
import serverNotification, {
getEmployee,
@ -74,6 +73,7 @@ import serverNotification, {
getPersonAccountById,
NOTIFICATION_BODY_SIZE
} from '@hcengineering/server-notification'
import { stripTags } from '@hcengineering/text'
import { workbenchId } from '@hcengineering/workbench'
import webpush, { WebPushError } from 'web-push'
import { Content, NotifyResult } from './types'

View File

@ -17,7 +17,7 @@
import builder, { migrateOperations, getModelVersion } from '@hcengineering/model-all'
import { randomBytes } from 'crypto'
import { Db, MongoClient } from 'mongodb'
import accountPlugin, { getAccount, getMethods, getWorkspaceByUrl } from '..'
import accountPlugin, { getAccount, getMethods, getWorkspaceByUrl } from '../operations'
import { setMetadata } from '@hcengineering/platform'
import { MeasureMetricsContext } from '@hcengineering/core'

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -8,7 +8,7 @@ export const accountId = 'account' as Plugin
/**
* @public
*/
const accountPlugin = plugin(accountId, {
export const accountPlugin = plugin(accountId, {
metadata: {
FrontURL: '' as Metadata<string>,
SES_URL: '' as Metadata<string>,
@ -26,5 +26,3 @@ const accountPlugin = plugin(accountId, {
InviteSubject: '' as IntlString
}
})
export default accountPlugin

View File

@ -0,0 +1,160 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
import { BaseWorkspaceInfo, Data, RateLimiter, Tx, Version, type MeasureContext } from '@hcengineering/core'
import { MigrateOperation, ModelLogger } from '@hcengineering/model'
import { FileModelLogger } from '@hcengineering/server-tool'
import { Db, MongoClient } from 'mongodb'
import path from 'path'
import { listWorkspacesRaw, updateWorkspace, upgradeWorkspace, Workspace, WorkspaceInfo } from './operations'
export type UpgradeErrorHandler = (workspace: BaseWorkspaceInfo, error: any) => Promise<void>
export interface UpgradeOptions {
errorHandler: (workspace: BaseWorkspaceInfo, error: any) => Promise<void>
force: boolean
console: boolean
logs: string
parallel: number
}
export class UpgradeWorker {
constructor (
readonly db: Db,
readonly client: MongoClient,
readonly version: Data<Version>,
readonly txes: Tx[],
readonly migrationOperation: [string, MigrateOperation][],
readonly productId: string
) {}
canceled = false
st: number = Date.now()
workspaces: BaseWorkspaceInfo[] = []
toProcess: number = 0
async close (): Promise<void> {
this.canceled = true
}
private async _upgradeWorkspace (ctx: MeasureContext, ws: WorkspaceInfo, opt: UpgradeOptions): Promise<void> {
if (ws.disabled === true) {
return
}
const t = Date.now()
const ctxModelLogger: ModelLogger = {
log (msg: string, data: any): void {
void ctx.info(msg, data)
},
error (msg: string, data: any): void {
void ctx.error(msg, data)
}
}
const logger = opt.console ? ctxModelLogger : new FileModelLogger(path.join(opt.logs, `${ws.workspace}.log`))
const avgTime = (Date.now() - this.st) / (this.workspaces.length - this.toProcess + 1)
await ctx.info('----------------------------------------------------------\n---UPGRADING----', {
pending: this.toProcess,
eta: Math.floor(avgTime * this.toProcess),
workspace: ws.workspace
})
this.toProcess--
try {
await upgradeWorkspace(
ctx,
this.version,
this.txes,
this.migrationOperation,
this.productId,
this.db,
ws.workspaceUrl ?? ws.workspace,
logger,
opt.force
)
await ctx.info('---done---------', {
pending: this.toProcess,
time: Date.now() - t,
workspace: ws.workspace
})
} catch (err: any) {
await opt.errorHandler(ws, err)
logger.log('error', err)
if (!opt.console) {
await ctx.error('error', err)
}
await ctx.info('---failed---------', {
pending: this.toProcess,
time: Date.now() - t,
workspace: ws.workspace
})
} finally {
if (!opt.console) {
;(logger as FileModelLogger).close()
}
}
}
async upgradeAll (ctx: MeasureContext, opt: UpgradeOptions): Promise<void> {
const workspaces = await listWorkspacesRaw(this.db, this.productId)
workspaces.sort((a, b) => b.lastVisit - a.lastVisit)
// We need to update workspaces with missing workspaceUrl
for (const ws of workspaces) {
if (ws.workspaceUrl == null) {
const upd: Partial<Workspace> = {
workspaceUrl: ws.workspace
}
if (ws.workspaceName == null) {
upd.workspaceName = ws.workspace
}
await updateWorkspace(this.db, this.productId, ws, upd)
}
}
const withError: string[] = []
this.toProcess = workspaces.length
this.st = Date.now()
if (opt.parallel !== 0) {
const parallel = opt.parallel
const rateLimit = new RateLimiter(parallel)
await ctx.info('parallel upgrade', { parallel })
await Promise.all(
workspaces.map((it) =>
rateLimit.add(async () => {
await ctx.with('do-upgrade', {}, async () => {
await this._upgradeWorkspace(ctx, it, opt)
})
})
)
)
await ctx.info('Upgrade done')
} else {
await ctx.info('UPGRADE write logs at:', { logs: opt.logs })
for (const ws of workspaces) {
await this._upgradeWorkspace(ctx, ws, opt)
}
if (withError.length > 0) {
await ctx.info('Failed workspaces', withError)
}
}
}
}