diff --git a/dev/storage/src/storage.ts b/dev/storage/src/storage.ts index f7b3947b50..66829a5a94 100644 --- a/dev/storage/src/storage.ts +++ b/dev/storage/src/storage.ts @@ -54,12 +54,6 @@ class InMemoryTxAdapter extends DummyDbAdapter implements TxAdapter { return r } - async init (model: Tx[]): Promise { - for (const tx of model) { - await this.txdb.tx(tx) - } - } - async getModel (): Promise { return builder().getTxes() } diff --git a/models/activity/src/index.ts b/models/activity/src/index.ts index cbae182738..f2bba38719 100644 --- a/models/activity/src/index.ts +++ b/models/activity/src/index.ts @@ -240,7 +240,7 @@ export class TReaction extends TAttachedDoc implements Reaction { @Index(IndexKind.Indexed) declare attachedTo: Ref - @Prop(TypeRef(activity.class.ActivityMessage), core.string.AttachedToClass) + @Prop(TypeRef(core.class.Class), core.string.AttachedToClass) @Index(IndexKind.Indexed) declare attachedToClass: Ref> diff --git a/models/hr/src/index.ts b/models/hr/src/index.ts index a0068f7a9d..8051287922 100644 --- a/models/hr/src/index.ts +++ b/models/hr/src/index.ts @@ -51,15 +51,15 @@ import { } from '@hcengineering/model' import attachment from '@hcengineering/model-attachment' import calendar from '@hcengineering/model-calendar' +import chunter from '@hcengineering/model-chunter' import contact, { TEmployee, TPersonAccount } from '@hcengineering/model-contact' import core, { TAttachedDoc, TDoc, TSpace, TType } from '@hcengineering/model-core' import view, { classPresenter, createAction } from '@hcengineering/model-view' import workbench from '@hcengineering/model-workbench' import notification from '@hcengineering/notification' import { type Asset, type IntlString } from '@hcengineering/platform' -import hr from './plugin' import { PaletteColorIndexes } from '@hcengineering/ui/src/colors' -import chunter from '@hcengineering/model-chunter' +import hr from './plugin' export { hrId } from '@hcengineering/hr' export { hrOperation } from './migration' @@ -147,6 +147,7 @@ export class TRequest extends TAttachedDoc implements Request { @Index(IndexKind.Indexed) declare attachedTo: Ref + @Prop(TypeRef(core.class.Class), core.string.Class) @Index(IndexKind.Indexed) declare attachedToClass: Ref> diff --git a/models/notification/src/index.ts b/models/notification/src/index.ts index e8cd4ed4e0..c85d83f9de 100644 --- a/models/notification/src/index.ts +++ b/models/notification/src/index.ts @@ -75,7 +75,7 @@ import { notificationId, type MentionInboxNotification } from '@hcengineering/notification' -import { type Asset, type IntlString } from '@hcengineering/platform' +import { getEmbeddedLabel, type Asset, type IntlString } from '@hcengineering/platform' import setting from '@hcengineering/setting' import { type AnyComponent } from '@hcengineering/ui/src/types' @@ -178,12 +178,15 @@ export class TNotificationContextPresenter extends TClass implements Notificatio @Model(notification.class.DocUpdates, core.class.Doc, DOMAIN_NOTIFICATION) export class TDocUpdates extends TDoc implements DocUpdates { + @Prop(TypeRef(core.class.Account), core.string.Account) @Index(IndexKind.Indexed) user!: Ref + @Prop(TypeRef(core.class.Account), core.string.AttachedTo) @Index(IndexKind.Indexed) attachedTo!: Ref + @Prop(TypeRef(core.class.Account), getEmbeddedLabel('Hidden')) @Index(IndexKind.Indexed) hidden!: boolean diff --git a/packages/core/package.json b/packages/core/package.json index 6d7eba6b7d..9263b77fed 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -34,7 +34,6 @@ }, "dependencies": { "@hcengineering/platform": "^0.6.9", - "just-clone": "~6.2.0", "fast-equals": "^2.0.3" }, "repository": "https://github.com/hcengineering/anticrm", diff --git a/packages/core/src/__tests__/client.test.ts b/packages/core/src/__tests__/client.test.ts index 8b47bcfa75..ba831ccf12 100644 --- a/packages/core/src/__tests__/client.test.ts +++ b/packages/core/src/__tests__/client.test.ts @@ -25,6 +25,7 @@ import type { DocumentQuery, FindResult, TxResult, SearchQuery, SearchOptions, S import { Tx, TxFactory, TxProcessor } from '../tx' import { connect } from './connection' import { genMinModel } from './minmodel' +import { clone } from '../clone' describe('client', () => { it('should create client and spaces', async () => { @@ -118,7 +119,7 @@ describe('client', () => { loadDocs: async (domain: Domain, docs: Ref[]) => [], upload: async (domain: Domain, docs: Doc[]) => {}, clean: async (domain: Domain, docs: Ref[]) => {}, - loadModel: async (last: Timestamp) => txes, + loadModel: async (last: Timestamp) => clone(txes), getAccount: async () => null as unknown as Account, measure: async () => { return async () => ({ time: 0, serverTime: 0 }) @@ -141,8 +142,8 @@ describe('client', () => { expect(result1).toHaveLength(1) expect(result1[0]._id).toStrictEqual(txCreateDoc1.objectId) - expect(spyCreate).toHaveBeenLastCalledWith(txCreateDoc1) - expect(spyUpdate).toBeCalledTimes(0) + expect(spyCreate).toHaveBeenLastCalledWith(txCreateDoc1, false) + expect(spyUpdate).toHaveBeenCalledTimes(0) await client1.close() const pluginData2 = { @@ -159,8 +160,8 @@ describe('client', () => { expect(result2).toHaveLength(2) expect(result2[0]._id).toStrictEqual(txCreateDoc1.objectId) expect(result2[1]._id).toStrictEqual(txCreateDoc2.objectId) - expect(spyCreate).toHaveBeenLastCalledWith(txCreateDoc2) - expect(spyUpdate).toBeCalledTimes(0) + expect(spyCreate).toHaveBeenLastCalledWith(txCreateDoc2, false) + expect(spyUpdate).toHaveBeenCalledTimes(0) await client2.close() const pluginData3 = { @@ -181,7 +182,7 @@ describe('client', () => { expect(result3).toHaveLength(1) expect(result3[0]._id).toStrictEqual(txCreateDoc2.objectId) - expect(spyCreate).toHaveBeenLastCalledWith(txCreateDoc2) + expect(spyCreate).toHaveBeenLastCalledWith(txCreateDoc2, false) expect(spyUpdate.mock.calls[1][1]).toStrictEqual(txUpdateDoc) expect(spyUpdate).toBeCalledTimes(2) await client3.close() diff --git a/packages/core/src/client.ts b/packages/core/src/client.ts index 559584a64f..7a2b987a89 100644 --- a/packages/core/src/client.ts +++ b/packages/core/src/client.ts @@ -18,11 +18,12 @@ import { BackupClient, DocChunk } from './backup' import { Account, AttachedDoc, Class, DOMAIN_MODEL, Doc, Domain, PluginConfiguration, Ref, Timestamp } from './classes' import core from './component' import { Hierarchy } from './hierarchy' +import { MeasureContext, MeasureMetricsContext } from './measurements' import { ModelDb } from './memdb' import type { DocumentQuery, FindOptions, FindResult, FulltextStorage, Storage, TxResult, WithLookup } from './storage' import { SearchOptions, SearchQuery, SearchResult, SortingOrder } from './storage' import { Tx, TxCUD, TxCollectionCUD, TxCreateDoc, TxProcessor, TxUpdateDoc } from './tx' -import { toFindResult } from './utils' +import { toFindResult, toIdMap } from './utils' const transactionThreshold = 500 @@ -222,8 +223,10 @@ export async function createClient ( connect: (txHandler: TxHandler) => Promise, // If set will build model with only allowed plugins. allowedPlugins?: Plugin[], - txPersistence?: TxPersistenceStore + txPersistence?: TxPersistenceStore, + _ctx?: MeasureContext ): Promise { + const ctx = _ctx ?? new MeasureMetricsContext('createClient', {}) let client: ClientImpl | null = null // Temporal buffer, while we apply model @@ -248,9 +251,13 @@ export async function createClient ( } const configs = new Map, PluginConfiguration>() - const conn = await connect(txHandler) + const conn = await ctx.with('connect', {}, async () => await connect(txHandler)) - await loadModel(conn, allowedPlugins, configs, hierarchy, model, false, txPersistence) + await ctx.with( + 'load-model', + { reload: false }, + async (ctx) => await loadModel(ctx, conn, allowedPlugins, configs, hierarchy, model, false, txPersistence) + ) txBuffer = txBuffer.filter((tx) => tx.space !== core.space.Model) @@ -264,14 +271,20 @@ export async function createClient ( conn.onConnect = async (event) => { console.log('Client: onConnect', event) // Find all new transactions and apply - const loadModelResponse = await loadModel(conn, allowedPlugins, configs, hierarchy, model, true, txPersistence) + const loadModelResponse = await ctx.with( + 'connect', + { reload: true }, + async (ctx) => await loadModel(ctx, conn, allowedPlugins, configs, hierarchy, model, true, txPersistence) + ) if (event === ClientConnectEvent.Reconnected && loadModelResponse.full) { // We have upgrade procedure and need rebuild all stuff. hierarchy = new Hierarchy() model = new ModelDb(hierarchy) - await buildModel(loadModelResponse, allowedPlugins, configs, hierarchy, model) + await ctx.with('build-model', {}, async (ctx) => { + await buildModel(ctx, loadModelResponse, allowedPlugins, configs, hierarchy, model) + }) await oldOnConnect?.(ClientConnectEvent.Upgraded) // No need to fetch more stuff since upgrade was happened. @@ -285,10 +298,15 @@ export async function createClient ( } // We need to look for last {transactionThreshold} transactions and if it is more since lastTx one we receive, we need to perform full refresh. - const atxes = await conn.findAll( - core.class.Tx, - { modifiedOn: { $gt: lastTx }, objectSpace: { $ne: core.space.Model } }, - { sort: { modifiedOn: SortingOrder.Ascending, _id: SortingOrder.Ascending }, limit: transactionThreshold } + const atxes = await ctx.with( + 'find-atx', + {}, + async () => + await conn.findAll( + core.class.Tx, + { modifiedOn: { $gt: lastTx }, objectSpace: { $ne: core.space.Model } }, + { sort: { modifiedOn: SortingOrder.Ascending, _id: SortingOrder.Ascending }, limit: transactionThreshold } + ) ) let needFullRefresh = false @@ -318,14 +336,23 @@ export async function createClient ( } async function tryLoadModel ( + ctx: MeasureContext, conn: ClientConnection, reload: boolean, persistence?: TxPersistenceStore ): Promise { - const current = (await persistence?.load()) ?? { full: true, transactions: [], hash: '' } + const current = (await ctx.with('persistence-load', {}, async () => await persistence?.load())) ?? { + full: true, + transactions: [], + hash: '' + } const lastTxTime = getLastTxTime(current.transactions) - const result = await conn.loadModel(lastTxTime, current.hash) + const result = await ctx.with( + 'connection-load-model', + { hash: current.hash !== '' }, + async (ctx) => await conn.loadModel(lastTxTime, current.hash) + ) if (Array.isArray(result)) { // Fallback to old behavior, only for tests @@ -337,10 +364,15 @@ async function tryLoadModel ( } // Save concatenated - await persistence?.store({ - ...result, - transactions: !result.full ? current.transactions.concat(result.transactions) : result.transactions - }) + void (await ctx.with( + 'persistence-store', + {}, + async (ctx) => + await persistence?.store({ + ...result, + transactions: !result.full ? current.transactions.concat(result.transactions) : result.transactions + }) + )) if (!result.full && !reload) { result.transactions = current.transactions.concat(result.transactions) @@ -361,6 +393,7 @@ function isPersonAccount (tx: Tx): boolean { } async function loadModel ( + ctx: MeasureContext, conn: ClientConnection, allowedPlugins: Plugin[] | undefined, configs: Map, PluginConfiguration>, @@ -371,7 +404,11 @@ async function loadModel ( ): Promise { const t = Date.now() - const modelResponse = await tryLoadModel(conn, reload, persistence) + const modelResponse = await ctx.with( + 'try-load-model', + { reload }, + async (ctx) => await tryLoadModel(ctx, conn, reload, persistence) + ) if (reload && modelResponse.full) { return modelResponse @@ -385,11 +422,14 @@ async function loadModel ( ) } - await buildModel(modelResponse, allowedPlugins, configs, hierarchy, model) + await ctx.with('build-model', {}, async (ctx) => { + await buildModel(ctx, modelResponse, allowedPlugins, configs, hierarchy, model) + }) return modelResponse } async function buildModel ( + ctx: MeasureContext, modelResponse: LoadModelResponse, allowedPlugins: Plugin[] | undefined, configs: Map, PluginConfiguration>, @@ -400,38 +440,45 @@ async function buildModel ( const userTx: Tx[] = [] const atxes = modelResponse.transactions - atxes.forEach((tx) => - ((tx.modifiedBy === core.account.ConfigUser || tx.modifiedBy === core.account.System) && !isPersonAccount(tx) - ? systemTx - : userTx - ).push(tx) - ) + + await ctx.with('split txes', {}, async () => { + atxes.forEach((tx) => + ((tx.modifiedBy === core.account.ConfigUser || tx.modifiedBy === core.account.System) && !isPersonAccount(tx) + ? systemTx + : userTx + ).push(tx) + ) + }) if (allowedPlugins != null) { - fillConfiguration(systemTx, configs) - fillConfiguration(userTx, configs) + await ctx.with('fill config system', {}, async () => { + fillConfiguration(systemTx, configs) + }) + await ctx.with('fill config user', {}, async () => { + fillConfiguration(userTx, configs) + }) const excludedPlugins = Array.from(configs.values()).filter( (it) => !it.enabled || !allowedPlugins.includes(it.pluginId) ) - systemTx = pluginFilterTx(excludedPlugins, configs, systemTx) + await ctx.with('filter txes', {}, async () => { + systemTx = pluginFilterTx(excludedPlugins, configs, systemTx) + }) } const txes = systemTx.concat(userTx) - for (const tx of txes) { - try { - hierarchy.tx(tx) - } catch (err: any) { - console.error('failed to apply model transaction, skipping', tx._id, tx._class, err?.message) + await ctx.with('build hierarchy', {}, async () => { + for (const tx of txes) { + try { + hierarchy.tx(tx) + } catch (err: any) { + console.error('failed to apply model transaction, skipping', tx._id, tx._class, err?.message) + } } - } - for (const tx of txes) { - try { - await model.tx(tx) - } catch (err: any) { - console.error('failed to apply model transaction, skipping', tx._id, tx._class, err?.message) - } - } + }) + await ctx.with('build model', {}, async (ctx) => { + model.addTxes(ctx, txes, false) + }) } function getLastTxTime (txes: Tx[]): number { @@ -468,14 +515,15 @@ function pluginFilterTx ( configs: Map, PluginConfiguration>, systemTx: Tx[] ): Tx[] { + const stx = toIdMap(systemTx) + const totalExcluded = new Set>() for (const a of excludedPlugins) { for (const c of configs.values()) { if (a.pluginId === c.pluginId) { - const excluded = new Set>() for (const id of c.transactions) { if (c.classFilter !== undefined) { const filter = new Set(c.classFilter) - const tx = systemTx.find((it) => it._id === id) + const tx = stx.get(id as Ref) if ( tx?._class === core.class.TxCreateDoc || tx?._class === core.class.TxUpdateDoc || @@ -483,18 +531,17 @@ function pluginFilterTx ( ) { const cud = tx as TxCUD if (filter.has(cud.objectClass)) { - excluded.add(id as Ref) + totalExcluded.add(id as Ref) } } } else { - excluded.add(id as Ref) + totalExcluded.add(id as Ref) } } - const exclude = systemTx.filter((t) => excluded.has(t._id)) - console.log('exclude plugin', c.pluginId, exclude.length) - systemTx = systemTx.filter((t) => !excluded.has(t._id)) + console.log('exclude plugin', c.pluginId, c.transactions.length) } } } + systemTx = systemTx.filter((t) => !totalExcluded.has(t._id)) return systemTx } diff --git a/packages/core/src/clone.ts b/packages/core/src/clone.ts new file mode 100644 index 0000000000..baa6437a0e --- /dev/null +++ b/packages/core/src/clone.ts @@ -0,0 +1,72 @@ +const se = typeof Symbol !== 'undefined' +const ste = se && typeof Symbol.toStringTag !== 'undefined' + +export function getTypeOf (obj: any): string { + const typeofObj = typeof obj + if (typeofObj !== 'object') { + return typeofObj + } + if (obj === null) { + return 'null' + } + + if (Array.isArray(obj) && (!ste || !(Symbol.toStringTag in obj))) { + return 'Array' + } + + const stringTag = ste && obj[Symbol.toStringTag] + if (typeof stringTag === 'string') { + return stringTag + } + + const objPrototype = Object.getPrototypeOf(obj) + + if (objPrototype === RegExp.prototype) { + return 'RegExp' + } + if (objPrototype === Date.prototype) { + return 'Date' + } + + if (objPrototype === null) { + return 'Object' + } + return {}.toString.call(obj).slice(8, -1) +} + +export function clone (obj: any, as?: (doc: any, m: any) => any, needAs?: (value: any) => any | undefined): any { + if (typeof obj === 'undefined') { + return undefined + } + if (typeof obj === 'function') { + return obj + } + const typeOf = getTypeOf(obj) + if (typeOf === 'Date') { + return new Date(obj.getTime()) + } else if (typeOf === 'Array' || typeOf === 'Object') { + const isArray = Array.isArray(obj) + const result: any = isArray ? [] : Object.assign({}, obj) + for (const key in obj) { + // include prototype properties + const value = obj[key] + const type = getTypeOf(value) + if (type === 'Array') { + result[key] = clone(value, as, needAs) + } else if (type === 'Object') { + const m = needAs?.(value) + const valClone = clone(value, as, needAs) + result[key] = m !== undefined && as !== undefined ? as(valClone, m) : valClone + } else if (type === 'Date') { + result[key] = new Date(value.getTime()) + } else { + if (isArray) { + result[key] = value + } + } + } + return result + } else { + return obj + } +} diff --git a/packages/core/src/hierarchy.ts b/packages/core/src/hierarchy.ts index 9496b74d37..61e4dfed60 100644 --- a/packages/core/src/hierarchy.ts +++ b/packages/core/src/hierarchy.ts @@ -16,11 +16,11 @@ import { FindOptions, Lookup, ToClassRefT, WithLookup } from '.' import type { AnyAttribute, Class, Classifier, Doc, Domain, Interface, Mixin, Obj, Ref } from './classes' import { ClassifierKind } from './classes' +import { clone as deepClone } from './clone' import core from './component' import { _createMixinProxy, _mixinClass, _toDoc } from './proxy' import type { Tx, TxCreateDoc, TxMixin, TxRemoveDoc, TxUpdateDoc } from './tx' import { TxProcessor } from './tx' -import { getTypeOf } from './typeof' /** * @public @@ -587,33 +587,11 @@ export class Hierarchy { } clone (obj: any): any { - if (typeof obj === 'undefined') { - return undefined - } - if (typeof obj === 'function') { - return obj - } - const isArray = Array.isArray(obj) - const result: any = isArray ? [] : Object.assign({}, obj) - for (const key in obj) { - // include prototype properties - const value = obj[key] - const type = getTypeOf(value) - if (type === 'Array') { - result[key] = this.clone(value) - } else if (type === 'Object') { - const m = Hierarchy.mixinClass(value) - const valClone = this.clone(value) - result[key] = m !== undefined ? this.as(valClone, m) : valClone - } else if (type === 'Date') { - result[key] = new Date(value.getTime()) - } else { - if (isArray) { - result[key] = value - } - } - } - return result + return deepClone( + obj, + (doc, m) => this.as(doc, m), + (value) => Hierarchy.mixinClass(value) + ) } domains (): Domain[] { diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 4303f8e73d..9652e82543 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -30,6 +30,6 @@ export * from './tx' export * from './utils' export * from './backup' export * from './status' -export * from './typeof' +export * from './clone' export * from './common' export * from './time' diff --git a/packages/core/src/memdb.ts b/packages/core/src/memdb.ts index 2fa1e580b9..8a102d0de4 100644 --- a/packages/core/src/memdb.ts +++ b/packages/core/src/memdb.ts @@ -14,13 +14,13 @@ // import { PlatformError, Severity, Status } from '@hcengineering/platform' -import { Lookup, ReverseLookups, getObjectValue } from '.' -import type { Class, Doc, Ref } from './classes' +import { Lookup, MeasureContext, ReverseLookups, getObjectValue } from '.' +import type { AttachedDoc, Class, Doc, Ref } from './classes' import core from './component' import { Hierarchy } from './hierarchy' import { checkMixinKey, matchQuery, resultSort } from './query' import type { DocumentQuery, FindOptions, FindResult, LookupData, Storage, TxResult, WithLookup } from './storage' -import type { Tx, TxCreateDoc, TxMixin, TxRemoveDoc, TxUpdateDoc } from './tx' +import type { Tx, TxCollectionCUD, TxCreateDoc, TxMixin, TxRemoveDoc, TxUpdateDoc } from './tx' import { TxProcessor } from './tx' import { toFindResult } from './utils' @@ -28,17 +28,17 @@ import { toFindResult } from './utils' * @public */ export abstract class MemDb extends TxProcessor implements Storage { - private readonly objectsByClass = new Map>, Doc[]>() + private readonly objectsByClass = new Map>, Map, Doc>>() private readonly objectById = new Map, Doc>() constructor (protected readonly hierarchy: Hierarchy) { super() } - private getObjectsByClass (_class: Ref>): Doc[] { + private getObjectsByClass (_class: Ref>): Map, Doc> { const result = this.objectsByClass.get(_class) if (result === undefined) { - const result: Doc[] = [] + const result = new Map, Doc>() this.objectsByClass.set(_class, result) return result } @@ -46,10 +46,9 @@ export abstract class MemDb extends TxProcessor implements Storage { } private cleanObjectByClass (_class: Ref>, _id: Ref): void { - let result = this.objectsByClass.get(_class) + const result = this.objectsByClass.get(_class) if (result !== undefined) { - result = result.filter((cl) => cl._id !== _id) - this.objectsByClass.set(_class, result) + result.delete(_id) } } @@ -152,7 +151,7 @@ export abstract class MemDb extends TxProcessor implements Storage { ) { result = this.getByIdQuery(query, baseClass) } else { - result = this.getObjectsByClass(baseClass) + result = Array.from(this.getObjectsByClass(baseClass).values()) } result = matchQuery(result, query, _class, this.hierarchy, true) @@ -195,7 +194,7 @@ export abstract class MemDb extends TxProcessor implements Storage { ) { result = this.getByIdQuery(query, baseClass) } else { - result = this.getObjectsByClass(baseClass) + result = Array.from(this.getObjectsByClass(baseClass).values()) } result = matchQuery(result, query, _class, this.hierarchy, true) @@ -214,12 +213,7 @@ export abstract class MemDb extends TxProcessor implements Storage { addDoc (doc: Doc): void { this.hierarchy.getAncestors(doc._class).forEach((_class) => { const arr = this.getObjectsByClass(_class) - const index = arr.findIndex((p) => p._id === doc._id) - if (index === -1) { - arr.push(doc) - } else { - arr[index] = doc - } + arr.set(doc._id, doc) }) this.objectById.set(doc._id, doc) } @@ -275,6 +269,74 @@ export class ModelDb extends MemDb { return {} } + addTxes (ctx: MeasureContext, txes: Tx[], clone: boolean): void { + for (const tx of txes) { + switch (tx._class) { + case core.class.TxCreateDoc: + this.addDoc(TxProcessor.createDoc2Doc(tx as TxCreateDoc, clone)) + break + case core.class.TxCollectionCUD: { + // We need update only create transactions to contain attached, attachedToClass. + const cud = tx as TxCollectionCUD> + if (cud.tx._class === core.class.TxCreateDoc) { + const createTx = cud.tx as TxCreateDoc + const d: TxCreateDoc = { + ...createTx, + attributes: { + ...createTx.attributes, + attachedTo: cud.objectId, + attachedToClass: cud.objectClass, + collection: cud.collection + } + } + this.addDoc(TxProcessor.createDoc2Doc(d as TxCreateDoc, clone)) + } + this.addTxes(ctx, [cud.tx], clone) + break + } + case core.class.TxUpdateDoc: { + const cud = tx as TxUpdateDoc + const doc = this.findObject(cud.objectId) + if (doc !== undefined) { + TxProcessor.updateDoc2Doc(doc, cud) + } else { + void ctx.error('no document found, failed to apply model transaction, skipping', { + _id: tx._id, + _class: tx._class, + objectId: cud.objectId + }) + } + break + } + case core.class.TxRemoveDoc: + try { + this.delDoc((tx as TxRemoveDoc).objectId) + } catch (err: any) { + void ctx.error('no document found, failed to apply model transaction, skipping', { + _id: tx._id, + _class: tx._class, + objectId: (tx as TxRemoveDoc).objectId + }) + } + break + case core.class.TxMixin: { + const mix = tx as TxMixin + const obj = this.findObject(mix.objectId) + if (obj !== undefined) { + TxProcessor.updateMixin4Doc(obj, mix) + } else { + void ctx.error('no document found, failed to apply model transaction, skipping', { + _id: tx._id, + _class: tx._class, + objectId: mix.objectId + }) + } + break + } + } + } + } + protected async txUpdateDoc (tx: TxUpdateDoc): Promise { const doc = this.getObject(tx.objectId) as any TxProcessor.updateDoc2Doc(doc, tx) diff --git a/packages/core/src/objvalue.ts b/packages/core/src/objvalue.ts index 8b9733bdfc..64c979cae0 100644 --- a/packages/core/src/objvalue.ts +++ b/packages/core/src/objvalue.ts @@ -1,7 +1,7 @@ import { PlatformError, Severity, Status } from '@hcengineering/platform' import { Doc } from './classes' +import { clone } from './clone' import core from './component' -import justClone from 'just-clone' /** * @public @@ -60,7 +60,7 @@ export function setObjectValue (key: string, doc: Doc, newValue: any): void { value = lvalue } } - value[last] = justClone(newValue) + value[last] = clone(newValue) return value } diff --git a/packages/core/src/tx.ts b/packages/core/src/tx.ts index f2f1ad17b2..a4bf47aaac 100644 --- a/packages/core/src/tx.ts +++ b/packages/core/src/tx.ts @@ -13,7 +13,6 @@ // limitations under the License. // -import justClone from 'just-clone' import type { KeysByType } from 'simplytyped' import type { Account, @@ -35,6 +34,7 @@ import { _getOperator } from './operator' import { _toDoc } from './proxy' import type { DocumentQuery, TxResult } from './storage' import { generateId } from './utils' +import { clone } from './clone' /** * @public @@ -357,10 +357,10 @@ export abstract class TxProcessor implements WithTx { return result } - static createDoc2Doc(tx: TxCreateDoc): T { + static createDoc2Doc(tx: TxCreateDoc, doClone = true): T { // eslint-disable-next-line @typescript-eslint/consistent-type-assertions return { - ...justClone(tx.attributes), + ...(doClone ? clone(tx.attributes) : tx.attributes), _id: tx.objectId, _class: tx.objectClass, space: tx.objectSpace, diff --git a/packages/core/src/typeof.ts b/packages/core/src/typeof.ts deleted file mode 100644 index 1e175eccf9..0000000000 --- a/packages/core/src/typeof.ts +++ /dev/null @@ -1,35 +0,0 @@ -const se = typeof Symbol !== 'undefined' -const ste = se && typeof Symbol.toStringTag !== 'undefined' - -export function getTypeOf (obj: any): string { - const typeofObj = typeof obj - if (typeofObj !== 'object') { - return typeofObj - } - if (obj === null) { - return 'null' - } - - if (Array.isArray(obj) && (!ste || !(Symbol.toStringTag in obj))) { - return 'Array' - } - - const stringTag = ste && obj[Symbol.toStringTag] - if (typeof stringTag === 'string') { - return stringTag - } - - const objPrototype = Object.getPrototypeOf(obj) - - if (objPrototype === RegExp.prototype) { - return 'RegExp' - } - if (objPrototype === Date.prototype) { - return 'Date' - } - - if (objPrototype === null) { - return 'Object' - } - return {}.toString.call(obj).slice(8, -1) -} diff --git a/packages/ui/package.json b/packages/ui/package.json index 9033e94ad6..ebceba2253 100644 --- a/packages/ui/package.json +++ b/packages/ui/package.json @@ -41,7 +41,6 @@ "@hcengineering/platform": "^0.6.9", "@hcengineering/theme": "^0.6.3", "@hcengineering/core": "^0.6.28", - "just-clone": "~6.2.0", "svelte": "^4.2.12", "fast-equals": "^2.0.3", "autolinker": "4.0.0", diff --git a/packages/ui/src/location.ts b/packages/ui/src/location.ts index 43e472af39..85d0a2d50a 100644 --- a/packages/ui/src/location.ts +++ b/packages/ui/src/location.ts @@ -13,7 +13,7 @@ // limitations under the License. // -import justClone from 'just-clone' +import { clone } from '@hcengineering/core' import { derived, get, writable } from 'svelte/store' import { closePopup } from './popups' import { type Location as PlatformLocation } from './types' @@ -105,7 +105,7 @@ export function getRawCurrentLocation (): PlatformLocation { } export function getCurrentResolvedLocation (): PlatformLocation { - return justClone(resolvedLocation) + return clone(resolvedLocation) } declare global { @@ -130,7 +130,7 @@ if (!embeddedPlatform) { }) } -export const location = derived(locationWritable, (loc) => justClone(loc)) +export const location = derived(locationWritable, (loc) => clone(loc)) /** * Unlike {@link location}, exposes raw browser location as seen in URL @@ -143,7 +143,7 @@ export const workspaceId = derived(location, (loc) => loc.path[1]) * @public */ export function getLocation (): PlatformLocation { - return justClone(get(location)) + return clone(get(location)) } export const resolvedLocationStore = writable(getRawCurrentLocation()) @@ -151,12 +151,12 @@ let resolvedLocation = getRawCurrentLocation() export function setResolvedLocation (location: PlatformLocation): void { resolvedLocation = location - resolvedLocationStore.set(justClone(location)) + resolvedLocationStore.set(clone(location)) } export function getCurrentLocation (): PlatformLocation { if (embeddedPlatform) { - return justClone(get(locationWritable)) + return clone(get(locationWritable)) } return getRawCurrentLocation() } diff --git a/plugins/client-resources/src/connection.ts b/plugins/client-resources/src/connection.ts index 3d841613d8..be302ee23a 100644 --- a/plugins/client-resources/src/connection.ts +++ b/plugins/client-resources/src/connection.ts @@ -128,7 +128,7 @@ class Connection implements ClientConnection { } } - delay = 1 + delay = 0 pending: Promise | undefined private async waitOpenConnection (): Promise { @@ -140,7 +140,7 @@ class Connection implements ClientConnection { } this.pending = this.openConnection() await this.pending - this.delay = 5 + this.delay = 0 return await this.pending } catch (err: any) { this.pending = undefined @@ -154,7 +154,7 @@ class Connection implements ClientConnection { setTimeout(() => { console.log(`delay ${this.delay} second`) resolve(null) - if (this.delay !== 15) { + if (this.delay < 5) { this.delay++ } }, this.delay * SECOND) diff --git a/plugins/client-resources/src/index.ts b/plugins/client-resources/src/index.ts index 4a3eb00783..ac0ed2a129 100644 --- a/plugins/client-resources/src/index.ts +++ b/plugins/client-resources/src/index.ts @@ -18,6 +18,7 @@ import core, { AccountClient, ClientConnectEvent, LoadModelResponse, + MeasureContext, Tx, TxHandler, TxPersistenceStore, @@ -37,6 +38,28 @@ import { connect } from './connection' export { connect } +let dbRequest: IDBOpenDBRequest | undefined +let dbPromise: Promise = Promise.resolve(undefined) + +if (typeof localStorage !== 'undefined') { + const st = Date.now() + dbPromise = new Promise((resolve) => { + dbRequest = indexedDB.open('model.db.persistence', 2) + + dbRequest.onupgradeneeded = function () { + const db = (dbRequest as IDBOpenDBRequest).result + if (!db.objectStoreNames.contains('model')) { + db.createObjectStore('model', { keyPath: 'id' }) + } + } + dbRequest.onsuccess = function () { + const db = (dbRequest as IDBOpenDBRequest).result + console.log('init DB complete', Date.now() - st) + resolve(db) + } + }) +} + // eslint-disable-next-line @typescript-eslint/explicit-function-return-type export default async () => { return { @@ -46,7 +69,8 @@ export default async () => { endpoint: string, onUpgrade?: () => void, onUnauthorized?: () => void, - onConnect?: (event: ClientConnectEvent) => void + onConnect?: (event: ClientConnectEvent) => void, + ctx?: MeasureContext ): Promise => { const filterModel = getMetadata(clientPlugin.metadata.FilterModel) ?? false @@ -75,7 +99,8 @@ export default async () => { return connect(url.href, upgradeHandler, onUpgrade, onUnauthorized, onConnect) }, filterModel ? [...getPlugins(), ...(getMetadata(clientPlugin.metadata.ExtraPlugins) ?? [])] : undefined, - createModelPersistence(getWSFromToken(token)) + createModelPersistence(getWSFromToken(token)), + ctx ) // Check if we had dev hook for client. client = hookClient(client) @@ -90,25 +115,6 @@ function createModelPersistence (workspace: string): TxPersistenceStore | undefi return overrideStore } - let dbRequest: IDBOpenDBRequest | undefined - let dbPromise: Promise = Promise.resolve(undefined) - - if (typeof localStorage !== 'undefined') { - dbPromise = new Promise((resolve) => { - dbRequest = indexedDB.open('model.db.persistence', 2) - - dbRequest.onupgradeneeded = function () { - const db = (dbRequest as IDBOpenDBRequest).result - if (!db.objectStoreNames.contains('model')) { - db.createObjectStore('model', { keyPath: 'id' }) - } - } - dbRequest.onsuccess = function () { - const db = (dbRequest as IDBOpenDBRequest).result - resolve(db) - } - }) - } return { load: async () => { const db = await dbPromise diff --git a/plugins/client/src/index.ts b/plugins/client/src/index.ts index a2b1621c9c..938d0b86a9 100644 --- a/plugins/client/src/index.ts +++ b/plugins/client/src/index.ts @@ -13,7 +13,7 @@ // limitations under the License. // -import type { AccountClient, ClientConnectEvent, TxPersistenceStore } from '@hcengineering/core' +import type { AccountClient, ClientConnectEvent, MeasureContext, TxPersistenceStore } from '@hcengineering/core' import type { Plugin, Resource } from '@hcengineering/platform' import { Metadata, plugin } from '@hcengineering/platform' @@ -66,7 +66,8 @@ export type ClientFactory = ( endpoint: string, onUpgrade?: () => void, onUnauthorized?: () => void, - onConnect?: (event: ClientConnectEvent) => void + onConnect?: (event: ClientConnectEvent) => void, + ctx?: MeasureContext ) => Promise export default plugin(clientId, { diff --git a/plugins/workbench-resources/src/connect.ts b/plugins/workbench-resources/src/connect.ts index 655e48cb01..276a466190 100644 --- a/plugins/workbench-resources/src/connect.ts +++ b/plugins/workbench-resources/src/connect.ts @@ -3,6 +3,8 @@ import client from '@hcengineering/client' import core, { ClientConnectEvent, getCurrentAccount, + MeasureMetricsContext, + metricsToString, setCurrentAccount, versionToString, type AccountClient, @@ -41,6 +43,7 @@ export async function disconnect (): Promise { } export async function connect (title: string): Promise { + const ctx = new MeasureMetricsContext('connect', {}) const loc = getCurrentLocation() const ws = loc.path[1] if (ws === undefined) { @@ -62,7 +65,7 @@ export async function connect (title: string): Promise { let token = tokens[ws] if (token === undefined && getMetadata(presentation.metadata.Token) !== undefined) { const selectWorkspace = await getResource(login.function.SelectWorkspace) - const loginInfo = (await selectWorkspace(ws))[1] + const loginInfo = await ctx.with('select-workspace', {}, async () => (await selectWorkspace(ws))[1]) if (loginInfo !== undefined) { tokens[ws] = loginInfo.token token = loginInfo.token @@ -88,8 +91,12 @@ export async function connect (title: string): Promise { if (_token !== token && _client !== undefined) { // We need to flush all data from memory - await purgeClient() - await _client.close() + await ctx.with('purge-client', {}, async () => { + await purgeClient() + }) + await ctx.with('close previous client', {}, async () => { + await _client?.close() + }) _client = undefined tokenChanged = true } @@ -104,63 +111,79 @@ export async function connect (title: string): Promise { serverEndpoint = serverEndpoint.substring(0, serverEndpoint.length - 1) } const clientFactory = await getResource(client.function.GetClient) - _client = await clientFactory( - token, - endpoint, - () => { - location.reload() - }, - () => { - clearMetadata(ws) - navigate({ - path: [loginId], - query: {} - }) - }, - // We need to refresh all active live queries and clear old queries. - (event: ClientConnectEvent) => { - console.log('WorkbenchClient: onConnect', event) - try { - if ((_clientSet && event === ClientConnectEvent.Connected) || event === ClientConnectEvent.Refresh) { - void refreshClient(tokenChanged) - tokenChanged = false - } - - if (event === ClientConnectEvent.Upgraded) { - window.location.reload() - } - - void (async () => { - if (_client !== undefined) { - const newVersion = await _client.findOne(core.class.Version, {}) - console.log('Reconnect Model version', newVersion) - - const currentVersionStr = versionToString(version as Version) - const reconnectVersionStr = versionToString(newVersion as Version) - - if (currentVersionStr !== reconnectVersionStr) { - // It seems upgrade happened - // location.reload() - versionError = `${currentVersionStr} != ${reconnectVersionStr}` + const newClient = await ctx.with( + 'create-client', + {}, + async (ctx) => + await clientFactory( + token, + endpoint, + () => { + location.reload() + }, + () => { + clearMetadata(ws) + navigate({ + path: [loginId], + query: {} + }) + }, + // We need to refresh all active live queries and clear old queries. + (event: ClientConnectEvent) => { + console.log('WorkbenchClient: onConnect', event) + try { + if ((_clientSet && event === ClientConnectEvent.Connected) || event === ClientConnectEvent.Refresh) { + void ctx.with('refresh client', {}, async () => { + await refreshClient(tokenChanged) + }) + tokenChanged = false } - const serverVersion: { version: string } = await ( - await fetch(serverEndpoint + '/api/v1/version', {}) - ).json() - console.log('Server version', serverVersion.version) - if (serverVersion.version !== '' && serverVersion.version !== currentVersionStr) { - versionError = `${currentVersionStr} => ${serverVersion.version}` + if (event === ClientConnectEvent.Upgraded) { + window.location.reload() } + + void (async () => { + if (_client !== undefined) { + const newVersion = await ctx.with( + 'find-version', + {}, + async () => await newClient.findOne(core.class.Version, {}) + ) + console.log('Reconnect Model version', newVersion) + + const currentVersionStr = versionToString(version as Version) + const reconnectVersionStr = versionToString(newVersion as Version) + + if (currentVersionStr !== reconnectVersionStr) { + // It seems upgrade happened + // location.reload() + versionError = `${currentVersionStr} != ${reconnectVersionStr}` + } + const serverVersion: { version: string } = await ctx.with( + 'fetch-server-version', + {}, + async () => await (await fetch(serverEndpoint + '/api/v1/version', {})).json() + ) + + console.log('Server version', serverVersion.version) + if (serverVersion.version !== '' && serverVersion.version !== currentVersionStr) { + versionError = `${currentVersionStr} => ${serverVersion.version}` + } + } + })() + } catch (err) { + console.error(err) } - })() - } catch (err) { - console.error(err) - } - } + }, + ctx + ) ) + + _client = newClient console.log('logging in as', email) - const me = await _client?.getAccount() + const me = await ctx.with('get-account', {}, async () => await newClient.getAccount()) if (me !== undefined) { Analytics.setUser(me.email) Analytics.setTag('workspace', ws) @@ -176,11 +199,18 @@ export async function connect (title: string): Promise { // Update on connect, so it will be triggered _clientSet = true - await setClient(_client) + const client = _client + await ctx.with('set-client', {}, async () => { + await setClient(client) + }) return } try { - version = await _client.findOne(core.class.Version, {}) + version = await ctx.with( + 'find-model-version', + {}, + async () => await newClient.findOne(core.class.Version, {}) + ) console.log('Model version', version) const requiredVersion = getMetadata(presentation.metadata.RequiredVersion) @@ -195,7 +225,11 @@ export async function connect (title: string): Promise { } try { - const serverVersion: { version: string } = await (await fetch(serverEndpoint + '/api/v1/version', {})).json() + const serverVersion: { version: string } = await ctx.with( + 'find-server-version', + {}, + async () => await (await fetch(serverEndpoint + '/api/v1/version', {})).json() + ) console.log('Server version', serverVersion.version) if ( @@ -223,10 +257,14 @@ export async function connect (title: string): Promise { // Update window title document.title = [ws, title].filter((it) => it).join(' - ') _clientSet = true - await setClient(_client) - await broadcastEvent(plugin.event.NotifyConnection, getCurrentAccount()) - - return _client + await ctx.with('set-client', {}, async () => { + await setClient(newClient) + }) + await ctx.with('broadcast-connected', {}, async () => { + await broadcastEvent(plugin.event.NotifyConnection, getCurrentAccount()) + }) + console.log(metricsToString(ctx.metrics, 'connect', 50)) + return newClient } function clearMetadata (ws: string): void { diff --git a/server/core/src/adapter.ts b/server/core/src/adapter.ts index f731bdb049..f55bb2368c 100644 --- a/server/core/src/adapter.ts +++ b/server/core/src/adapter.ts @@ -50,11 +50,6 @@ export interface RawDBAdapter { * @public */ export interface DbAdapter { - /** - * Method called after hierarchy is ready to use. - */ - init: (model: Tx[]) => Promise - createIndexes: (domain: Domain, config: Pick, 'indexes'>) => Promise removeOldIndex: (domain: Domain, deletePattern: RegExp, keepPattern: RegExp) => Promise @@ -83,7 +78,7 @@ export interface DbAdapter { * @public */ export interface TxAdapter extends DbAdapter { - getModel: () => Promise + getModel: (ctx: MeasureContext) => Promise } /** diff --git a/server/core/src/fulltext.ts b/server/core/src/fulltext.ts index 20ce7815e9..bf441dc50c 100644 --- a/server/core/src/fulltext.ts +++ b/server/core/src/fulltext.ts @@ -63,7 +63,7 @@ export class FullTextIndex implements WithFind { readonly indexer: FullTextIndexPipeline, private readonly upgrade: boolean ) { - if (!upgrade) { + if (!this.upgrade) { // Schedule indexing after consistency check void this.indexer.startIndexing() } diff --git a/server/core/src/indexer/indexer.ts b/server/core/src/indexer/indexer.ts index 316105411e..df9464087c 100644 --- a/server/core/src/indexer/indexer.ts +++ b/server/core/src/indexer/indexer.ts @@ -91,13 +91,12 @@ export class FullTextIndexPipeline implements FullTextPipeline { } async cancel (): Promise { - console.log(this.workspace.name, 'Cancel indexing', this.indexId) this.cancelling = true clearTimeout(this.skippedReiterationTimeout) this.triggerIndexing() await this.indexing await this.flush(true) - console.log(this.workspace.name, 'Indexing canceled', this.indexId) + await this.metrics.info('Cancel indexing', { workspace: this.workspace.name, indexId: this.indexId }) } async markRemove (doc: DocIndexState): Promise { @@ -336,7 +335,10 @@ export class FullTextIndexPipeline implements FullTextPipeline { try { this.hierarchy.getClass(core.class.DocIndexState) } catch (err: any) { - console.log(this.workspace.name, 'Models is not upgraded to support indexer', this.indexId) + await this.metrics.info('Models is not upgraded to support indexer', { + indexId: this.indexId, + workspace: this.workspace.name + }) return } await this.metrics.with('init-states', {}, async () => { @@ -367,12 +369,12 @@ export class FullTextIndexPipeline implements FullTextPipeline { _classes.forEach((it) => this.broadcastClasses.add(it)) if (this.triggerCounts > 0) { - console.log('No wait, trigger counts', this.triggerCounts) + await this.metrics.info('No wait, trigger counts', { triggerCount: this.triggerCounts }) } if (this.toIndex.size === 0 && this.stageChanged === 0 && this.triggerCounts === 0) { if (this.toIndex.size === 0) { - console.log(this.workspace.name, 'Indexing complete', this.indexId) + await this.metrics.info('Indexing complete', { indexId: this.indexId, workspace: this.workspace.name }) } if (!this.cancelling) { // We need to send index update event @@ -398,7 +400,7 @@ export class FullTextIndexPipeline implements FullTextPipeline { } } } - console.log(this.workspace.name, 'Exit indexer', this.indexId) + await this.metrics.info('Exit indexer', { indexId: this.indexId, workspace: this.workspace.name }) } private async processIndex (ctx: MeasureContext): Promise>[]> { @@ -470,13 +472,12 @@ export class FullTextIndexPipeline implements FullTextPipeline { } if (result.length > 0) { - console.log( - this.workspace.name, - `Full text: Indexing ${this.indexId} ${st.stageId}`, - Object.entries(this.currentStages) - .map((it) => `${it[0]}:${it[1]}`) - .join(' ') - ) + await this.metrics.info('Full text: Indexing', { + indexId: this.indexId, + stageId: st.stageId, + workspace: this.workspace.name, + ...this.currentStages + }) } else { // Nothing to index, check on next cycle. break @@ -528,7 +529,7 @@ export class FullTextIndexPipeline implements FullTextPipeline { } } } catch (err: any) { - console.error(err) + await this.metrics.error('error during index', { error: err }) } } }) diff --git a/server/core/src/mem.ts b/server/core/src/mem.ts index 892b2e62b3..f443953c40 100644 --- a/server/core/src/mem.ts +++ b/server/core/src/mem.ts @@ -38,7 +38,6 @@ import { type DbAdapter } from './adapter' * @public */ export class DummyDbAdapter implements DbAdapter { - async init (model: Tx[]): Promise {} async findAll( ctx: MeasureContext, _class: Ref>, @@ -99,16 +98,6 @@ class InMemoryAdapter extends DummyDbAdapter implements DbAdapter { async tx (ctx: MeasureContext, ...tx: Tx[]): Promise { return await this.modeldb.tx(...tx) } - - async init (model: Tx[]): Promise { - for (const tx of model) { - try { - await this.modeldb.tx(tx) - } catch (err: any) { - console.error('skip broken TX', err) - } - } - } } /** diff --git a/server/core/src/pipeline.ts b/server/core/src/pipeline.ts index 3e656975b9..cafb9986e2 100644 --- a/server/core/src/pipeline.ts +++ b/server/core/src/pipeline.ts @@ -31,8 +31,8 @@ import { type Tx, type TxResult } from '@hcengineering/core' -import { createServerStorage } from './server' import { type DbConfiguration } from './configuration' +import { createServerStorage } from './server' import { type BroadcastFunc, type HandledBroadcastFunc, @@ -67,12 +67,12 @@ export async function createPipeline ( } }) ) - const pipeline = ctx.with( - 'create pipeline', - {}, - async (ctx) => await PipelineImpl.create(ctx, storage, constructors, broadcast) + const pipelineResult = await PipelineImpl.create( + ctx.newChild('pipeline-operations', {}), + storage, + constructors, + broadcast ) - const pipelineResult = await pipeline broadcastHook = (tx, targets) => { return pipelineResult.handleBroadcast(tx, targets) } diff --git a/server/core/src/server/index.ts b/server/core/src/server/index.ts index 3dcf475df7..ea15b7a9d0 100644 --- a/server/core/src/server/index.ts +++ b/server/core/src/server/index.ts @@ -56,18 +56,20 @@ export async function createServerStorage ( const storageAdapter = conf.storageFactory?.() - for (const key in conf.adapters) { - const adapterConf = conf.adapters[key] - adapters.set( - key, - await adapterConf.factory(ctx, hierarchy, adapterConf.url, conf.workspace, modelDb, storageAdapter) - ) - } + await ctx.with('create-adapters', {}, async (ctx) => { + for (const key in conf.adapters) { + const adapterConf = conf.adapters[key] + adapters.set( + key, + await adapterConf.factory(ctx, hierarchy, adapterConf.url, conf.workspace, modelDb, storageAdapter) + ) + } + }) const txAdapter = adapters.get(conf.domains[DOMAIN_TX]) as TxAdapter const model = await ctx.with('get model', {}, async (ctx) => { - const model = await txAdapter.getModel() + const model = await ctx.with('fetch-model', {}, async (ctx) => await txAdapter.getModel(ctx)) for (const tx of model) { try { hierarchy.tx(tx) @@ -76,22 +78,10 @@ export async function createServerStorage ( console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err) } } - for (const tx of model) { - try { - await modelDb.tx(tx) - } catch (err: any) { - console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err) - } - } + modelDb.addTxes(ctx, model, false) return model }) - for (const [adn, adapter] of adapters) { - await ctx.with('init-adapter', { name: adn }, async (ctx) => { - await adapter.init(model) - }) - } - const fulltextAdapter = await ctx.with( 'create full text adapter', {}, diff --git a/server/core/src/server/storage.ts b/server/core/src/server/storage.ts index 2808f42951..4e1b9e2b0f 100644 --- a/server/core/src/server/storage.ts +++ b/server/core/src/server/storage.ts @@ -60,10 +60,10 @@ import crypto from 'node:crypto' import { type DbAdapter } from '../adapter' import { type FullTextIndex } from '../fulltext' import serverCore from '../plugin' +import { type ServiceAdaptersManager } from '../service' +import { type StorageAdapter } from '../storage' import { type Triggers } from '../triggers' import type { FullTextAdapter, ObjectDDParticipant, ServerStorageOptions, TriggerControl } from '../types' -import { type StorageAdapter } from '../storage' -import { type ServiceAdaptersManager } from '../service' export class TServerStorage implements ServerStorage { private readonly fulltext: FullTextIndex @@ -137,15 +137,11 @@ export class TServerStorage implements ServerStorage { } async close (): Promise { - console.timeLog(this.workspace.name, 'closing') await this.fulltext.close() - console.timeLog(this.workspace.name, 'closing adapters') for (const o of this.adapters.values()) { await o.close() } - console.timeLog(this.workspace.name, 'closing fulltext') await this.fulltextAdapter.close() - console.timeLog(this.workspace.name, 'closing service adapters') await this.serviceAdaptersManager.close() } @@ -199,7 +195,7 @@ export class TServerStorage implements ServerStorage { const txCUD = TxProcessor.extractTx(tx) as TxCUD if (!this.hierarchy.isDerived(txCUD._class, core.class.TxCUD)) { // Skip unsupported tx - console.error('Unsupported transaction', tx) + await ctx.error('Unsupported transaction', tx) continue } const domain = this.hierarchy.getDomain(txCUD.objectClass) @@ -397,7 +393,7 @@ export class TServerStorage implements ServerStorage { { clazz, query, options } ) if (Date.now() - st > 1000) { - console.error('FindAll', Date.now() - st, clazz, query, options) + await ctx.error('FindAll', { time: Date.now() - st, clazz, query, options }) } return result } @@ -794,7 +790,7 @@ export class TServerStorage implements ServerStorage { await fx() } } catch (err: any) { - console.log(err) + await ctx.error('error process tx', { error: err }) throw err } finally { onEnds.forEach((p) => { diff --git a/server/elastic/src/backup.ts b/server/elastic/src/backup.ts index 2d24b0ebfb..c3e4fb547b 100644 --- a/server/elastic/src/backup.ts +++ b/server/elastic/src/backup.ts @@ -59,8 +59,6 @@ class ElasticDataAdapter implements DbAdapter { return [] } - async init (model: Tx[]): Promise {} - async createIndexes (domain: Domain, config: Pick, 'indexes'>): Promise {} async removeOldIndex (domain: Domain, deletePattern: RegExp, keepPattern: RegExp): Promise {} diff --git a/server/front/src/index.ts b/server/front/src/index.ts index ddc59fbaac..e8aa9a0bbe 100644 --- a/server/front/src/index.ts +++ b/server/front/src/index.ts @@ -292,7 +292,9 @@ export function start ( const token = req.query.token as string const payload = decodeToken(token) const admin = payload.extra?.admin === 'true' - res.writeHead(200, { 'Content-Type': 'application/json' }) + res.status(200) + res.setHeader('Content-Type', 'application/json') + res.setHeader('Cache-Control', cacheControlNoCache) const json = JSON.stringify({ metrics: metricsAggregate((ctx as any).metrics), @@ -301,7 +303,6 @@ export function start ( }, admin }) - res.set('Cache-Control', 'private, no-cache') res.end(json) } catch (err) { console.error(err) diff --git a/server/middleware/src/spaceSecurity.ts b/server/middleware/src/spaceSecurity.ts index 9acd4ecbc6..7a57e4fb65 100644 --- a/server/middleware/src/spaceSecurity.ts +++ b/server/middleware/src/spaceSecurity.ts @@ -62,6 +62,8 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar private spaceMeasureCtx!: MeasureContext + private spaceSecurityInit: Promise | undefined + private readonly systemSpaces = [ core.space.Configuration, core.space.DerivedTx, @@ -86,7 +88,7 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar ): Promise { const res = new SpaceSecurityMiddleware(broadcast, storage, next) res.spaceMeasureCtx = ctx.newChild('space chain', {}) - await res.init(res.spaceMeasureCtx) + res.spaceSecurityInit = res.init(res.spaceMeasureCtx) return res } @@ -124,6 +126,13 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar this.publicSpaces = spaces.filter((it) => !it.private).map((p) => p._id) } + async waitInit (): Promise { + if (this.spaceSecurityInit !== undefined) { + await this.spaceSecurityInit + this.spaceSecurityInit = undefined + } + } + private removeMemberSpace (member: Ref, space: Ref): void { const arr = this.allowedSpaces[member] if (arr !== undefined) { @@ -240,6 +249,8 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar } private async handleUpdate (ctx: SessionContext, tx: TxCUD): Promise { + await this.waitInit() + const updateDoc = tx as TxUpdateDoc if (!this.storage.hierarchy.isDerived(updateDoc.objectClass, core.class.Space)) return @@ -285,6 +296,7 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar } private async handleTx (ctx: SessionContext, tx: TxCUD): Promise { + await this.waitInit() if (tx._class === core.class.TxCreateDoc) { this.handleCreate(tx) } else if (tx._class === core.class.TxUpdateDoc) { @@ -370,6 +382,7 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar } async tx (ctx: SessionContext, tx: Tx): Promise { + await this.waitInit() const account = await getUser(this.storage, ctx) if (account.role === AccountRole.Guest) { throw new PlatformError(new Status(Severity.ERROR, platform.status.Forbidden, {})) @@ -385,6 +398,7 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar handleBroadcast (tx: Tx[], targets?: string[]): Tx[] { const process = async (): Promise => { + await this.waitInit() for (const t of tx) { if (this.storage.hierarchy.isDerived(t._class, core.class.TxCUD)) { await this.processTxSpaceDomain(t as TxCUD) @@ -476,6 +490,8 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar query: DocumentQuery, options?: FindOptions ): Promise> { + await this.waitInit() + const domain = this.storage.hierarchy.getDomain(_class) const newQuery = query const account = await getUser(this.storage, ctx) @@ -509,6 +525,7 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar query: SearchQuery, options: SearchOptions ): Promise { + await this.waitInit() const newQuery = { ...query } const account = await getUser(this.storage, ctx) if (!isSystem(account)) { diff --git a/server/mongo/src/storage.ts b/server/mongo/src/storage.ts index 5082abde6f..7742a4488d 100644 --- a/server/mongo/src/storage.ts +++ b/server/mongo/src/storage.ts @@ -1264,12 +1264,21 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter { return this.txColl } - async getModel (): Promise { - const cursor = this.db - .collection(DOMAIN_TX) - .find({ objectSpace: core.space.Model }) - .sort({ _id: 1, modifiedOn: 1 }) - const model = await toArray(cursor) + async getModel (ctx: MeasureContext): Promise { + const modelProjection = { + '%hash%': 0, + objectSpace: 0, + createdBy: 0, + space: 0 + } + const cursor = await ctx.with('find', {}, async () => + this.db + .collection(DOMAIN_TX) + .find({ objectSpace: core.space.Model }) + .sort({ _id: 1, modifiedOn: 1 }) + .project(modelProjection) + ) + const model = await ctx.with('to-array', {}, async () => await toArray(cursor)) // We need to put all core.account.System transactions first const systemTx: Tx[] = [] const userTx: Tx[] = [] @@ -1284,7 +1293,6 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter { (tx as TxCUD).objectClass === 'contact:class:EmployeeAccount') ) } - model.forEach((tx) => (tx.modifiedBy === core.account.System && !isPersonAccount(tx) ? systemTx : userTx).push(tx)) return systemTx.concat(userTx) } diff --git a/server/server/src/metrics.ts b/server/server/src/metrics.ts index 6785e9dfc2..0225f406b8 100644 --- a/server/server/src/metrics.ts +++ b/server/server/src/metrics.ts @@ -17,7 +17,7 @@ let metricsContext: MeasureContext | undefined /** * @public */ -export function getMetricsContext (): MeasureContext { +export function getMetricsContext (factory?: () => MeasureMetricsContext): MeasureContext { if (metricsContext !== undefined) { return metricsContext } @@ -25,7 +25,11 @@ export function getMetricsContext (): MeasureContext { console.info('please provide apm server url for monitoring') const metrics = newMetrics() - metricsContext = new MeasureMetricsContext('System', {}, {}, metrics) + if (factory !== undefined) { + metricsContext = factory() + } else { + metricsContext = new MeasureMetricsContext('System', {}, {}, metrics) + } if (metricsFile !== undefined || metricsConsole) { console.info('storing measurements into local file', metricsFile) diff --git a/server/server/src/minio.ts b/server/server/src/minio.ts index d2a5d018ad..79df7fb6fb 100644 --- a/server/server/src/minio.ts +++ b/server/server/src/minio.ts @@ -55,8 +55,6 @@ class StorageBlobAdapter implements DbAdapter { return [] } - async init (model: Tx[]): Promise {} - async createIndexes (domain: Domain, config: Pick, 'indexes'>): Promise {} async removeOldIndex (domain: Domain, deletePattern: RegExp, keepPattern: RegExp): Promise {} diff --git a/server/tool/src/index.ts b/server/tool/src/index.ts index c7e59b88e7..9cf60378f2 100644 --- a/server/tool/src/index.ts +++ b/server/tool/src/index.ts @@ -268,7 +268,7 @@ async function fetchModelFromMongo ( const txAdapter = await createMongoTxAdapter(ctx, hierarchy, mongodbUri, workspaceId, modelDb) - const model = await ctx.with('get-model', {}, async () => await txAdapter.getModel()) + const model = await ctx.with('get-model', {}, async (ctx) => await txAdapter.getModel(ctx)) await ctx.with('build local model', {}, async () => { for (const tx of model) { diff --git a/server/ws/src/__tests__/server.test.ts b/server/ws/src/__tests__/server.test.ts index c596e13e91..1721c38608 100644 --- a/server/ws/src/__tests__/server.test.ts +++ b/server/ws/src/__tests__/server.test.ts @@ -42,12 +42,9 @@ import { import { type SessionContext } from '@hcengineering/server-core' import { ClientSession } from '../client' import { startHttpServer } from '../server_http' -import { disableLogging } from '../types' import { genMinModel } from './minmodel' describe('server', () => { - disableLogging() - async function getModelDb (): Promise { const txes = genMinModel() const hierarchy = new Hierarchy() diff --git a/server/ws/src/client.ts b/server/ws/src/client.ts index 82b9146c90..07e8f00541 100644 --- a/server/ws/src/client.ts +++ b/server/ws/src/client.ts @@ -48,6 +48,7 @@ import { type BroadcastCall, type Session, type SessionRequest, type StatisticsE * @public */ export class ClientSession implements Session { + createTime = Date.now() requests = new Map() binaryResponseMode: boolean = false useCompression: boolean = true @@ -81,7 +82,7 @@ export class ClientSession implements Session { } async loadModel (ctx: MeasureContext, lastModelTx: Timestamp, hash?: string): Promise { - return await this._pipeline.storage.loadModel(lastModelTx, hash) + return await ctx.with('load-model', {}, async () => await this._pipeline.storage.loadModel(lastModelTx, hash)) } async getAccount (ctx: MeasureContext): Promise { diff --git a/server/ws/src/server.ts b/server/ws/src/server.ts index 0959ac76b1..8f42bf43ae 100644 --- a/server/ws/src/server.ts +++ b/server/ws/src/server.ts @@ -145,7 +145,7 @@ class TSessionManager implements SessionManager { const now = Date.now() const diff = now - s[1].session.lastRequest if (diff > 60000 && this.ticks % 10 === 0) { - console.log('session hang, closing...', h[0], s[1].session.getUser()) + void this.ctx.error('session hang, closing...', { sessionId: h[0], user: s[1].session.getUser() }) void this.close(s[1].socket, h[1].workspaceId, 1001, 'CLIENT_HANGOUT') continue } @@ -160,7 +160,11 @@ class TSessionManager implements SessionManager { for (const r of s[1].session.requests.values()) { if (now - r.start > 30000) { - console.log(h[0], 'request hang found, 30sec', h[0], s[1].session.getUser(), r.params) + void this.ctx.info('request hang found, 30sec', { + sessionId: h[0], + user: s[1].session.getUser(), + ...r.params + }) } } } @@ -212,8 +216,9 @@ class TSessionManager implements SessionManager { return await baseCtx.with('📲 add-session', {}, async (ctx) => { const wsString = toWorkspaceString(token.workspace, '@') - let workspaceInfo = + let workspaceInfo = await ctx.with('check-token', {}, async (ctx) => accountsUrl !== '' ? await this.getWorkspaceInfo(accountsUrl, rawToken) : this.wsFromToken(token) + ) if (workspaceInfo === undefined && token.extra?.admin !== 'true') { // No access to workspace for token. return { error: new Error(`No access to workspace for token ${token.email} ${token.workspace.name}`) } @@ -222,6 +227,10 @@ class TSessionManager implements SessionManager { } let workspace = this.workspaces.get(wsString) + if (workspace?.closeTimeout !== undefined) { + await ctx.info('Cancel workspace warm close', { wsString }) + clearTimeout(workspace?.closeTimeout) + } await workspace?.closing workspace = this.workspaces.get(wsString) if (sessionId !== undefined && workspace?.sessions?.has(sessionId) === true) { @@ -278,7 +287,9 @@ class TSessionManager implements SessionManager { this.sessions.set(ws.id, { session, socket: ws }) // We need to delete previous session with Id if found. workspace.sessions.set(session.sessionId, { session, socket: ws }) - await ctx.with('set-status', {}, () => this.setStatus(ctx, session, true)) + + // We do not need to wait for set-status, just return session to client + void ctx.with('set-status', {}, (ctx) => this.setStatus(ctx, session, true)) if (this.timeMinutes > 0) { void ws.send( @@ -316,7 +327,7 @@ class TSessionManager implements SessionManager { workspaceName: string ): Promise { if (LOGGING_ENABLED) { - console.log(workspaceName, 'reloading workspace', JSON.stringify(token)) + await ctx.info('reloading workspace', { workspaceName, token: JSON.stringify(token) }) } // If upgrade client is used. // Drop all existing clients @@ -351,12 +362,16 @@ class TSessionManager implements SessionManager { for (const session of sessions.splice(0, 1)) { if (targets !== undefined && !targets.includes(session.session.getUser())) continue for (const _tx of tx) { - void session.socket.send( - ctx, - { result: _tx }, - session.session.binaryResponseMode, - session.session.useCompression - ) + try { + void session.socket.send( + ctx, + { result: _tx }, + session.session.binaryResponseMode, + session.session.useCompression + ) + } catch (err: any) { + void ctx.error('error during send', { error: err }) + } } } if (sessions.length > 0) { @@ -377,11 +392,12 @@ class TSessionManager implements SessionManager { ): Workspace { const upgrade = token.extra?.model === 'upgrade' const context = ctx.newChild('🧲 session', {}) + const pipelineCtx = context.newChild('🧲 pipeline-factory', {}) const workspace: Workspace = { context, id: generateId(), pipeline: pipelineFactory( - context, + pipelineCtx, { ...token.workspace, workspaceUrl, workspaceName }, upgrade, (tx, targets) => { @@ -393,8 +409,6 @@ class TSessionManager implements SessionManager { workspaceId: token.workspace, workspaceName } - if (LOGGING_ENABLED) console.time(workspaceName) - if (LOGGING_ENABLED) console.timeLog(workspaceName, 'Creating Workspace:', workspace.id) this.workspaces.set(toWorkspaceString(token.workspace), workspace) return workspace } @@ -429,11 +443,12 @@ class TSessionManager implements SessionManager { } async close (ws: ConnectionSocket, workspaceId: WorkspaceId, code: number, reason: string): Promise { - // if (LOGGING_ENABLED) console.log(workspaceId.name, `closing websocket, code: ${code}, reason: ${reason}`) const wsid = toWorkspaceString(workspaceId) const workspace = this.workspaces.get(wsid) if (workspace === undefined) { - if (LOGGING_ENABLED) console.error(new Error('internal: cannot find sessions')) + if (LOGGING_ENABLED) { + await this.ctx.error('internal: cannot find sessions', { id: ws.id, workspace: workspaceId.name, code, reason }) + } return } const sessionRef = this.sessions.get(ws.id) @@ -458,7 +473,9 @@ class TSessionManager implements SessionManager { if (!workspace.upgrade) { // Wait some time for new client to appear before closing workspace. if (workspace.sessions.size === 0) { - setTimeout(() => { + clearTimeout(workspace.closeTimeout) + void this.ctx.info('schedule warm closing', { workspace: workspace.workspaceName, wsid }) + workspace.closeTimeout = setTimeout(() => { void this.performWorkspaceCloseCheck(workspace, workspaceId, wsid) }, this.timeouts.shutdownWarmTimeout) } @@ -469,7 +486,15 @@ class TSessionManager implements SessionManager { } async closeAll (wsId: string, workspace: Workspace, code: number, reason: 'upgrade' | 'shutdown'): Promise { - if (LOGGING_ENABLED) console.timeLog(wsId, `closing workspace ${workspace.id}, code: ${code}, reason: ${reason}`) + if (LOGGING_ENABLED) { + await this.ctx.info('closing workspace', { + workspace: workspace.id, + wsName: workspace.workspaceName, + code, + reason, + wsId + }) + } const sessions = Array.from(workspace.sessions) workspace.sessions = new Map() @@ -484,21 +509,30 @@ class TSessionManager implements SessionManager { await this.setStatus(workspace.context, s, false) } - if (LOGGING_ENABLED) console.timeLog(wsId, workspace.id, 'Clients disconnected. Closing Workspace...') + if (LOGGING_ENABLED) { + await this.ctx.info('Clients disconnected. Closing Workspace...', { + wsId, + workspace: workspace.id, + wsName: workspace.workspaceName + }) + } await Promise.all(sessions.map((s) => closeS(s[1].session, s[1].socket))) const closePipeline = async (): Promise => { try { - if (LOGGING_ENABLED) console.timeLog(wsId, 'closing pipeline') - await (await workspace.pipeline).close() - if (LOGGING_ENABLED) console.timeLog(wsId, 'closing pipeline done') + await this.ctx.with('close-pipeline', {}, async () => { + await (await workspace.pipeline).close() + }) } catch (err: any) { - console.error(err) + await this.ctx.error('close-pipeline-error', { error: err }) } } - await Promise.race([closePipeline(), timeoutPromise(15000)]) - if (LOGGING_ENABLED) console.timeLog(wsId, 'Workspace closed...') - console.timeEnd(wsId) + await this.ctx.with('closing', {}, async () => { + await Promise.race([closePipeline(), timeoutPromise(15000)]) + }) + if (LOGGING_ENABLED) { + await this.ctx.info('Workspace closed...', { workspace: workspace.id, wsId, wsName: workspace.workspaceName }) + } } private async sendUpgrade (ctx: MeasureContext, webSocket: ConnectionSocket, binary: boolean): Promise { @@ -530,31 +564,36 @@ class TSessionManager implements SessionManager { ): Promise { if (workspace.sessions.size === 0) { const wsUID = workspace.id + const logParams = { wsid, workspace: workspace.id, wsName: workspaceId.name } if (LOGGING_ENABLED) { - console.log(workspaceId.name, 'no sessions for workspace', wsid, wsUID) + await this.ctx.info('no sessions for workspace', logParams) } - const waitAndClose = async (workspace: Workspace): Promise => { - try { - const pl = await workspace.pipeline - await Promise.race([pl, timeoutPromise(60000)]) - await Promise.race([pl.close(), timeoutPromise(60000)]) + if (workspace.closing === undefined) { + const waitAndClose = async (workspace: Workspace): Promise => { + try { + if (workspace.sessions.size === 0) { + const pl = await workspace.pipeline + await Promise.race([pl, timeoutPromise(60000)]) + await Promise.race([pl.close(), timeoutPromise(60000)]) - if (this.workspaces.get(wsid)?.id === wsUID) { + if (this.workspaces.get(wsid)?.id === wsUID) { + this.workspaces.delete(wsid) + } + workspace.context.end() + if (LOGGING_ENABLED) { + await this.ctx.info('Closed workspace', logParams) + } + } + } catch (err: any) { this.workspaces.delete(wsid) - } - workspace.context.end() - if (LOGGING_ENABLED) { - console.timeLog(workspaceId.name, 'Closed workspace', wsUID) - } - } catch (err: any) { - this.workspaces.delete(wsid) - if (LOGGING_ENABLED) { - console.error(workspaceId.name, err) + if (LOGGING_ENABLED) { + await this.ctx.error('failed', { ...logParams, error: err }) + } } } + workspace.closing = waitAndClose(workspace) } - workspace.closing = waitAndClose(workspace) await workspace.closing } } @@ -562,13 +601,22 @@ class TSessionManager implements SessionManager { broadcast (from: Session | null, workspaceId: WorkspaceId, resp: Response, target?: string[]): void { const workspace = this.workspaces.get(toWorkspaceString(workspaceId)) if (workspace === undefined) { - console.error(new Error('internal: cannot find sessions')) + void this.ctx.error('internal: cannot find sessions', { + workspaceId: workspaceId.name, + target, + userId: from?.getUser() ?? '$unknown' + }) return } if (workspace?.upgrade ?? false) { return } - if (LOGGING_ENABLED) console.log(workspaceId.name, `server broadcasting to ${workspace.sessions.size} clients...`) + if (LOGGING_ENABLED) { + void this.ctx.info('server broadcasting to clients...', { + workspace: workspaceId.name, + count: workspace.sessions.size + }) + } const sessions = [...workspace.sessions.values()] const ctx = this.ctx.newChild('📭 broadcast', {}) @@ -627,19 +675,14 @@ class TSessionManager implements SessionManager { service.useBroadcast = hello.broadcast ?? false if (LOGGING_ENABLED) { - console.timeLog( - workspace, - 'hello happen', - service.getUser(), - 'binary:', - service.binaryResponseMode, - 'compression:', - service.useCompression, - 'workspace users:', - this.workspaces.get(workspace)?.sessions?.size, - 'total users:', - this.sessions.size - ) + await ctx.info('hello happen', { + user: service.getUser(), + binary: service.binaryResponseMode, + compression: service.useCompression, + timeToHello: Date.now() - service.createTime, + workspaceUsers: this.workspaces.get(workspace)?.sessions?.size, + totalUsers: this.sessions.size + }) } const helloResponse: HelloResponse = { id: -1, @@ -684,7 +727,9 @@ class TSessionManager implements SessionManager { service.useCompression ) } catch (err: any) { - if (LOGGING_ENABLED) console.error(err) + if (LOGGING_ENABLED) { + await this.ctx.error('error handle request', { error: err, request }) + } const resp: Response = { id: request.id, error: unknownError(err), @@ -726,7 +771,9 @@ class TSessionManager implements SessionManager { service.useCompression ) } catch (err: any) { - if (LOGGING_ENABLED) console.error(err) + if (LOGGING_ENABLED) { + await ctx.error('error handle measure', { error: err, request }) + } const resp: Response = { id: request.id, error: unknownError(err), diff --git a/server/ws/src/server_http.ts b/server/ws/src/server_http.ts index 33a64034ba..22dbdc7676 100644 --- a/server/ws/src/server_http.ts +++ b/server/ws/src/server_http.ts @@ -47,7 +47,9 @@ export function startHttpServer ( enableCompression: boolean, accountsUrl: string ): () => Promise { - if (LOGGING_ENABLED) console.log(`starting server on port ${port} ...`) + if (LOGGING_ENABLED) { + void ctx.info('starting server on', { port, productId, enableCompression, accountsUrl }) + } const app = express() app.use(cors()) @@ -209,21 +211,27 @@ export function startHttpServer ( ) if ('upgrade' in session || 'error' in session) { if ('error' in session) { - console.error(session.error) + void ctx.error('error', { error: session.error }) } cs.close() return } // eslint-disable-next-line @typescript-eslint/no-misused-promises ws.on('message', (msg: RawData) => { - let buff: any | undefined - if (msg instanceof Buffer) { - buff = msg?.toString() - } else if (Array.isArray(msg)) { - buff = Buffer.concat(msg).toString() - } - if (buff !== undefined) { - void handleRequest(session.context, session.session, cs, buff, session.workspaceName) + try { + let buff: any | undefined + if (msg instanceof Buffer) { + buff = msg?.toString() + } else if (Array.isArray(msg)) { + buff = Buffer.concat(msg).toString() + } + if (buff !== undefined) { + void handleRequest(session.context, session.session, cs, buff, session.workspaceName) + } + } catch (err: any) { + if (LOGGING_ENABLED) { + void ctx.error('message error', err) + } } }) // eslint-disable-next-line @typescript-eslint/no-misused-promises @@ -251,12 +259,17 @@ export function startHttpServer ( const sessionId = url.searchParams.get('sessionId') if (payload.workspace.productId !== productId) { + if (LOGGING_ENABLED) { + void ctx.error('invalid product', { required: payload.workspace.productId, productId }) + } throw new Error('Invalid workspace product') } wss.handleUpgrade(request, socket, head, (ws) => wss.emit('connection', ws, request, payload, token, sessionId)) - } catch (err) { - if (LOGGING_ENABLED) console.error('invalid token', err) + } catch (err: any) { + if (LOGGING_ENABLED) { + void ctx.error('invalid token', err) + } wss.handleUpgrade(request, socket, head, (ws) => { const resp: Response = { id: -1, @@ -274,7 +287,9 @@ export function startHttpServer ( } }) httpServer.on('error', (err) => { - if (LOGGING_ENABLED) console.error('server error', err) + if (LOGGING_ENABLED) { + void ctx.error('server error', err) + } }) httpServer.listen(port) diff --git a/server/ws/src/types.ts b/server/ws/src/types.ts index 8360eaf045..ef78c88879 100644 --- a/server/ws/src/types.ts +++ b/server/ws/src/types.ts @@ -35,6 +35,7 @@ export interface StatisticsElement { * @public */ export interface Session { + createTime: number getUser: () => string pipeline: () => Pipeline ping: () => Promise @@ -117,7 +118,9 @@ export interface Workspace { pipeline: Promise sessions: Map upgrade: boolean + closing?: Promise + closeTimeout?: any workspaceId: WorkspaceId workspaceName: string