UBERF-6374: Improve server logging and improve startup performance (#5210)

Andrey Sobolev 2024-04-06 15:14:06 +07:00 committed by GitHub
parent ceac67f3a3
commit 034700a65b
39 changed files with 631 additions and 405 deletions
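
Most of the changes below follow one instrumentation pattern: wrap each expensive startup step in ctx.with(...) so its duration is recorded in the measurement tree, then dump the tree with metricsToString once the client is connected. A minimal sketch of that pattern, using only the MeasureMetricsContext, ctx.with and metricsToString APIs that appear in the hunks below; the two step names are hypothetical placeholders, not steps from this commit.

import { MeasureMetricsContext, metricsToString } from '@hcengineering/core'

async function instrumentedStartup (): Promise<void> {
  const ctx = new MeasureMetricsContext('connect', {})

  // Each awaited step gets a named child in the measurement tree.
  await ctx.with('load-config', {}, async () => {
    // hypothetical placeholder for a real startup step
    await new Promise((resolve) => setTimeout(resolve, 10))
  })

  await ctx.with('load-model', { reload: false }, async (ctx) => {
    // nested calls are recorded as children of 'load-model'
    await ctx.with('persistence-load', {}, async () => {})
  })

  // Print the accumulated timings, as connect() does at the end of this commit.
  console.log(metricsToString(ctx.metrics, 'connect', 50))
}

void instrumentedStartup()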

View File

@ -54,12 +54,6 @@ class InMemoryTxAdapter extends DummyDbAdapter implements TxAdapter {
return r
}
async init (model: Tx[]): Promise<void> {
for (const tx of model) {
await this.txdb.tx(tx)
}
}
async getModel (): Promise<Tx[]> {
return builder().getTxes()
}

View File

@ -240,7 +240,7 @@ export class TReaction extends TAttachedDoc implements Reaction {
@Index(IndexKind.Indexed)
declare attachedTo: Ref<ActivityMessage>
@Prop(TypeRef(activity.class.ActivityMessage), core.string.AttachedToClass)
@Prop(TypeRef(core.class.Class), core.string.AttachedToClass)
@Index(IndexKind.Indexed)
declare attachedToClass: Ref<Class<ActivityMessage>>

View File

@ -51,15 +51,15 @@ import {
} from '@hcengineering/model'
import attachment from '@hcengineering/model-attachment'
import calendar from '@hcengineering/model-calendar'
import chunter from '@hcengineering/model-chunter'
import contact, { TEmployee, TPersonAccount } from '@hcengineering/model-contact'
import core, { TAttachedDoc, TDoc, TSpace, TType } from '@hcengineering/model-core'
import view, { classPresenter, createAction } from '@hcengineering/model-view'
import workbench from '@hcengineering/model-workbench'
import notification from '@hcengineering/notification'
import { type Asset, type IntlString } from '@hcengineering/platform'
import hr from './plugin'
import { PaletteColorIndexes } from '@hcengineering/ui/src/colors'
import chunter from '@hcengineering/model-chunter'
import hr from './plugin'
export { hrId } from '@hcengineering/hr'
export { hrOperation } from './migration'
@ -147,6 +147,7 @@ export class TRequest extends TAttachedDoc implements Request {
@Index(IndexKind.Indexed)
declare attachedTo: Ref<Staff>
@Prop(TypeRef(core.class.Class), core.string.Class)
@Index(IndexKind.Indexed)
declare attachedToClass: Ref<Class<Staff>>

View File

@ -75,7 +75,7 @@ import {
notificationId,
type MentionInboxNotification
} from '@hcengineering/notification'
import { type Asset, type IntlString } from '@hcengineering/platform'
import { getEmbeddedLabel, type Asset, type IntlString } from '@hcengineering/platform'
import setting from '@hcengineering/setting'
import { type AnyComponent } from '@hcengineering/ui/src/types'
@ -178,12 +178,15 @@ export class TNotificationContextPresenter extends TClass implements Notificatio
@Model(notification.class.DocUpdates, core.class.Doc, DOMAIN_NOTIFICATION)
export class TDocUpdates extends TDoc implements DocUpdates {
@Prop(TypeRef(core.class.Account), core.string.Account)
@Index(IndexKind.Indexed)
user!: Ref<Account>
@Prop(TypeRef(core.class.Account), core.string.AttachedTo)
@Index(IndexKind.Indexed)
attachedTo!: Ref<Doc>
@Prop(TypeRef(core.class.Account), getEmbeddedLabel('Hidden'))
@Index(IndexKind.Indexed)
hidden!: boolean

View File

@ -34,7 +34,6 @@
},
"dependencies": {
"@hcengineering/platform": "^0.6.9",
"just-clone": "~6.2.0",
"fast-equals": "^2.0.3"
},
"repository": "https://github.com/hcengineering/anticrm",

View File

@ -25,6 +25,7 @@ import type { DocumentQuery, FindResult, TxResult, SearchQuery, SearchOptions, S
import { Tx, TxFactory, TxProcessor } from '../tx'
import { connect } from './connection'
import { genMinModel } from './minmodel'
import { clone } from '../clone'
describe('client', () => {
it('should create client and spaces', async () => {
@ -118,7 +119,7 @@ describe('client', () => {
loadDocs: async (domain: Domain, docs: Ref<Doc>[]) => [],
upload: async (domain: Domain, docs: Doc[]) => {},
clean: async (domain: Domain, docs: Ref<Doc>[]) => {},
loadModel: async (last: Timestamp) => txes,
loadModel: async (last: Timestamp) => clone(txes),
getAccount: async () => null as unknown as Account,
measure: async () => {
return async () => ({ time: 0, serverTime: 0 })
@ -141,8 +142,8 @@ describe('client', () => {
expect(result1).toHaveLength(1)
expect(result1[0]._id).toStrictEqual(txCreateDoc1.objectId)
expect(spyCreate).toHaveBeenLastCalledWith(txCreateDoc1)
expect(spyUpdate).toBeCalledTimes(0)
expect(spyCreate).toHaveBeenLastCalledWith(txCreateDoc1, false)
expect(spyUpdate).toHaveBeenCalledTimes(0)
await client1.close()
const pluginData2 = {
@ -159,8 +160,8 @@ describe('client', () => {
expect(result2).toHaveLength(2)
expect(result2[0]._id).toStrictEqual(txCreateDoc1.objectId)
expect(result2[1]._id).toStrictEqual(txCreateDoc2.objectId)
expect(spyCreate).toHaveBeenLastCalledWith(txCreateDoc2)
expect(spyUpdate).toBeCalledTimes(0)
expect(spyCreate).toHaveBeenLastCalledWith(txCreateDoc2, false)
expect(spyUpdate).toHaveBeenCalledTimes(0)
await client2.close()
const pluginData3 = {
@ -181,7 +182,7 @@ describe('client', () => {
expect(result3).toHaveLength(1)
expect(result3[0]._id).toStrictEqual(txCreateDoc2.objectId)
expect(spyCreate).toHaveBeenLastCalledWith(txCreateDoc2)
expect(spyCreate).toHaveBeenLastCalledWith(txCreateDoc2, false)
expect(spyUpdate.mock.calls[1][1]).toStrictEqual(txUpdateDoc)
expect(spyUpdate).toBeCalledTimes(2)
await client3.close()

View File

@ -18,11 +18,12 @@ import { BackupClient, DocChunk } from './backup'
import { Account, AttachedDoc, Class, DOMAIN_MODEL, Doc, Domain, PluginConfiguration, Ref, Timestamp } from './classes'
import core from './component'
import { Hierarchy } from './hierarchy'
import { MeasureContext, MeasureMetricsContext } from './measurements'
import { ModelDb } from './memdb'
import type { DocumentQuery, FindOptions, FindResult, FulltextStorage, Storage, TxResult, WithLookup } from './storage'
import { SearchOptions, SearchQuery, SearchResult, SortingOrder } from './storage'
import { Tx, TxCUD, TxCollectionCUD, TxCreateDoc, TxProcessor, TxUpdateDoc } from './tx'
import { toFindResult } from './utils'
import { toFindResult, toIdMap } from './utils'
const transactionThreshold = 500
@ -222,8 +223,10 @@ export async function createClient (
connect: (txHandler: TxHandler) => Promise<ClientConnection>,
// If set, the model will be built with only the allowed plugins.
allowedPlugins?: Plugin[],
txPersistence?: TxPersistenceStore
txPersistence?: TxPersistenceStore,
_ctx?: MeasureContext
): Promise<AccountClient> {
const ctx = _ctx ?? new MeasureMetricsContext('createClient', {})
let client: ClientImpl | null = null
// Temporary buffer while we apply the model
@ -248,9 +251,13 @@ export async function createClient (
}
const configs = new Map<Ref<PluginConfiguration>, PluginConfiguration>()
const conn = await connect(txHandler)
const conn = await ctx.with('connect', {}, async () => await connect(txHandler))
await loadModel(conn, allowedPlugins, configs, hierarchy, model, false, txPersistence)
await ctx.with(
'load-model',
{ reload: false },
async (ctx) => await loadModel(ctx, conn, allowedPlugins, configs, hierarchy, model, false, txPersistence)
)
txBuffer = txBuffer.filter((tx) => tx.space !== core.space.Model)
@ -264,14 +271,20 @@ export async function createClient (
conn.onConnect = async (event) => {
console.log('Client: onConnect', event)
// Find all new transactions and apply
const loadModelResponse = await loadModel(conn, allowedPlugins, configs, hierarchy, model, true, txPersistence)
const loadModelResponse = await ctx.with(
'connect',
{ reload: true },
async (ctx) => await loadModel(ctx, conn, allowedPlugins, configs, hierarchy, model, true, txPersistence)
)
if (event === ClientConnectEvent.Reconnected && loadModelResponse.full) {
// An upgrade has happened, so we need to rebuild everything.
hierarchy = new Hierarchy()
model = new ModelDb(hierarchy)
await buildModel(loadModelResponse, allowedPlugins, configs, hierarchy, model)
await ctx.with('build-model', {}, async (ctx) => {
await buildModel(ctx, loadModelResponse, allowedPlugins, configs, hierarchy, model)
})
await oldOnConnect?.(ClientConnectEvent.Upgraded)
// No need to fetch anything else since an upgrade has already happened.
@ -285,10 +298,15 @@ export async function createClient (
}
// We need to look at the last {transactionThreshold} transactions; if more than that arrived since the lastTx we have, we need to perform a full refresh.
const atxes = await conn.findAll(
core.class.Tx,
{ modifiedOn: { $gt: lastTx }, objectSpace: { $ne: core.space.Model } },
{ sort: { modifiedOn: SortingOrder.Ascending, _id: SortingOrder.Ascending }, limit: transactionThreshold }
const atxes = await ctx.with(
'find-atx',
{},
async () =>
await conn.findAll(
core.class.Tx,
{ modifiedOn: { $gt: lastTx }, objectSpace: { $ne: core.space.Model } },
{ sort: { modifiedOn: SortingOrder.Ascending, _id: SortingOrder.Ascending }, limit: transactionThreshold }
)
)
let needFullRefresh = false
@ -318,14 +336,23 @@ export async function createClient (
}
async function tryLoadModel (
ctx: MeasureContext,
conn: ClientConnection,
reload: boolean,
persistence?: TxPersistenceStore
): Promise<LoadModelResponse> {
const current = (await persistence?.load()) ?? { full: true, transactions: [], hash: '' }
const current = (await ctx.with('persistence-load', {}, async () => await persistence?.load())) ?? {
full: true,
transactions: [],
hash: ''
}
const lastTxTime = getLastTxTime(current.transactions)
const result = await conn.loadModel(lastTxTime, current.hash)
const result = await ctx.with(
'connection-load-model',
{ hash: current.hash !== '' },
async (ctx) => await conn.loadModel(lastTxTime, current.hash)
)
if (Array.isArray(result)) {
// Fallback to old behavior, only for tests
@ -337,10 +364,15 @@ async function tryLoadModel (
}
// Save concatenated
await persistence?.store({
...result,
transactions: !result.full ? current.transactions.concat(result.transactions) : result.transactions
})
void (await ctx.with(
'persistence-store',
{},
async (ctx) =>
await persistence?.store({
...result,
transactions: !result.full ? current.transactions.concat(result.transactions) : result.transactions
})
))
if (!result.full && !reload) {
result.transactions = current.transactions.concat(result.transactions)
@ -361,6 +393,7 @@ function isPersonAccount (tx: Tx): boolean {
}
async function loadModel (
ctx: MeasureContext,
conn: ClientConnection,
allowedPlugins: Plugin[] | undefined,
configs: Map<Ref<PluginConfiguration>, PluginConfiguration>,
@ -371,7 +404,11 @@ async function loadModel (
): Promise<LoadModelResponse> {
const t = Date.now()
const modelResponse = await tryLoadModel(conn, reload, persistence)
const modelResponse = await ctx.with(
'try-load-model',
{ reload },
async (ctx) => await tryLoadModel(ctx, conn, reload, persistence)
)
if (reload && modelResponse.full) {
return modelResponse
@ -385,11 +422,14 @@ async function loadModel (
)
}
await buildModel(modelResponse, allowedPlugins, configs, hierarchy, model)
await ctx.with('build-model', {}, async (ctx) => {
await buildModel(ctx, modelResponse, allowedPlugins, configs, hierarchy, model)
})
return modelResponse
}
async function buildModel (
ctx: MeasureContext,
modelResponse: LoadModelResponse,
allowedPlugins: Plugin[] | undefined,
configs: Map<Ref<PluginConfiguration>, PluginConfiguration>,
@ -400,38 +440,45 @@ async function buildModel (
const userTx: Tx[] = []
const atxes = modelResponse.transactions
atxes.forEach((tx) =>
((tx.modifiedBy === core.account.ConfigUser || tx.modifiedBy === core.account.System) && !isPersonAccount(tx)
? systemTx
: userTx
).push(tx)
)
await ctx.with('split txes', {}, async () => {
atxes.forEach((tx) =>
((tx.modifiedBy === core.account.ConfigUser || tx.modifiedBy === core.account.System) && !isPersonAccount(tx)
? systemTx
: userTx
).push(tx)
)
})
if (allowedPlugins != null) {
fillConfiguration(systemTx, configs)
fillConfiguration(userTx, configs)
await ctx.with('fill config system', {}, async () => {
fillConfiguration(systemTx, configs)
})
await ctx.with('fill config user', {}, async () => {
fillConfiguration(userTx, configs)
})
const excludedPlugins = Array.from(configs.values()).filter(
(it) => !it.enabled || !allowedPlugins.includes(it.pluginId)
)
systemTx = pluginFilterTx(excludedPlugins, configs, systemTx)
await ctx.with('filter txes', {}, async () => {
systemTx = pluginFilterTx(excludedPlugins, configs, systemTx)
})
}
const txes = systemTx.concat(userTx)
for (const tx of txes) {
try {
hierarchy.tx(tx)
} catch (err: any) {
console.error('failed to apply model transaction, skipping', tx._id, tx._class, err?.message)
await ctx.with('build hierarchy', {}, async () => {
for (const tx of txes) {
try {
hierarchy.tx(tx)
} catch (err: any) {
console.error('failed to apply model transaction, skipping', tx._id, tx._class, err?.message)
}
}
}
for (const tx of txes) {
try {
await model.tx(tx)
} catch (err: any) {
console.error('failed to apply model transaction, skipping', tx._id, tx._class, err?.message)
}
}
})
await ctx.with('build model', {}, async (ctx) => {
model.addTxes(ctx, txes, false)
})
}
function getLastTxTime (txes: Tx[]): number {
@ -468,14 +515,15 @@ function pluginFilterTx (
configs: Map<Ref<PluginConfiguration>, PluginConfiguration>,
systemTx: Tx[]
): Tx[] {
const stx = toIdMap(systemTx)
const totalExcluded = new Set<Ref<Tx>>()
for (const a of excludedPlugins) {
for (const c of configs.values()) {
if (a.pluginId === c.pluginId) {
const excluded = new Set<Ref<Tx>>()
for (const id of c.transactions) {
if (c.classFilter !== undefined) {
const filter = new Set(c.classFilter)
const tx = systemTx.find((it) => it._id === id)
const tx = stx.get(id as Ref<Tx>)
if (
tx?._class === core.class.TxCreateDoc ||
tx?._class === core.class.TxUpdateDoc ||
@ -483,18 +531,17 @@ function pluginFilterTx (
) {
const cud = tx as TxCUD<Doc>
if (filter.has(cud.objectClass)) {
excluded.add(id as Ref<Tx>)
totalExcluded.add(id as Ref<Tx>)
}
}
} else {
excluded.add(id as Ref<Tx>)
totalExcluded.add(id as Ref<Tx>)
}
}
const exclude = systemTx.filter((t) => excluded.has(t._id))
console.log('exclude plugin', c.pluginId, exclude.length)
systemTx = systemTx.filter((t) => !excluded.has(t._id))
console.log('exclude plugin', c.pluginId, c.transactions.length)
}
}
}
systemTx = systemTx.filter((t) => !totalExcluded.has(t._id))
return systemTx
}
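
The pluginFilterTx rewrite above is one of the startup wins: instead of calling systemTx.find(...) once per excluded transaction id (quadratic in the worst case), it builds toIdMap(systemTx) once, collects everything into a single totalExcluded set, and filters systemTx once at the end. A rough self-contained sketch of that shape, assuming only that toIdMap indexes documents by _id; the MinTx type and sample ids are illustrative, not from the codebase.

interface MinTx { _id: string }

// Assumed behavior of toIdMap: index documents by _id for O(1) lookups.
function toIdMap<T extends MinTx> (docs: T[]): Map<string, T> {
  return new Map(docs.map((d) => [d._id, d]))
}

function filterExcluded (systemTx: MinTx[], excludedIds: string[]): MinTx[] {
  const stx = toIdMap(systemTx) // built once, O(n)
  const totalExcluded = new Set<string>()
  for (const id of excludedIds) {
    if (stx.has(id)) {
      totalExcluded.add(id) // O(1) per id instead of a linear systemTx.find
    }
  }
  // one filter pass at the end instead of re-filtering per excluded plugin
  return systemTx.filter((t) => !totalExcluded.has(t._id))
}

console.log(filterExcluded([{ _id: 'a' }, { _id: 'b' }], ['b']).map((t) => t._id)) // [ 'a' ]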

View File

@ -0,0 +1,72 @@
const se = typeof Symbol !== 'undefined'
const ste = se && typeof Symbol.toStringTag !== 'undefined'
export function getTypeOf (obj: any): string {
const typeofObj = typeof obj
if (typeofObj !== 'object') {
return typeofObj
}
if (obj === null) {
return 'null'
}
if (Array.isArray(obj) && (!ste || !(Symbol.toStringTag in obj))) {
return 'Array'
}
const stringTag = ste && obj[Symbol.toStringTag]
if (typeof stringTag === 'string') {
return stringTag
}
const objPrototype = Object.getPrototypeOf(obj)
if (objPrototype === RegExp.prototype) {
return 'RegExp'
}
if (objPrototype === Date.prototype) {
return 'Date'
}
if (objPrototype === null) {
return 'Object'
}
return {}.toString.call(obj).slice(8, -1)
}
export function clone (obj: any, as?: (doc: any, m: any) => any, needAs?: (value: any) => any | undefined): any {
if (typeof obj === 'undefined') {
return undefined
}
if (typeof obj === 'function') {
return obj
}
const typeOf = getTypeOf(obj)
if (typeOf === 'Date') {
return new Date(obj.getTime())
} else if (typeOf === 'Array' || typeOf === 'Object') {
const isArray = Array.isArray(obj)
const result: any = isArray ? [] : Object.assign({}, obj)
for (const key in obj) {
// include prototype properties
const value = obj[key]
const type = getTypeOf(value)
if (type === 'Array') {
result[key] = clone(value, as, needAs)
} else if (type === 'Object') {
const m = needAs?.(value)
const valClone = clone(value, as, needAs)
result[key] = m !== undefined && as !== undefined ? as(valClone, m) : valClone
} else if (type === 'Date') {
result[key] = new Date(value.getTime())
} else {
if (isArray) {
result[key] = value
}
}
}
return result
} else {
return obj
}
}
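
The new clone helper above replaces just-clone and is also what Hierarchy.clone delegates to (see the hierarchy.ts hunk below), using the optional as/needAs hooks to re-wrap mixin values. A small usage sketch with made-up sample data, assuming the module is re-exported from the package root as the index.ts hunk below adds:

import { clone } from '@hcengineering/core'

const original = {
  title: 'report',
  createdOn: new Date('2024-04-06'),
  tags: ['a', 'b'],
  nested: { count: 1 }
}

// Deep copy: Dates, arrays and plain objects are duplicated, not shared.
const copy = clone(original)
copy.nested.count = 2
copy.createdOn.setFullYear(2025)
copy.tags.push('c')

console.log(original.nested.count) // 1 - the nested object was copied
console.log(original.createdOn.getFullYear()) // 2024 - the Date instance was copied
console.log(original.tags.length) // 2 - the array was copied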

View File

@ -16,11 +16,11 @@
import { FindOptions, Lookup, ToClassRefT, WithLookup } from '.'
import type { AnyAttribute, Class, Classifier, Doc, Domain, Interface, Mixin, Obj, Ref } from './classes'
import { ClassifierKind } from './classes'
import { clone as deepClone } from './clone'
import core from './component'
import { _createMixinProxy, _mixinClass, _toDoc } from './proxy'
import type { Tx, TxCreateDoc, TxMixin, TxRemoveDoc, TxUpdateDoc } from './tx'
import { TxProcessor } from './tx'
import { getTypeOf } from './typeof'
/**
* @public
@ -587,33 +587,11 @@ export class Hierarchy {
}
clone (obj: any): any {
if (typeof obj === 'undefined') {
return undefined
}
if (typeof obj === 'function') {
return obj
}
const isArray = Array.isArray(obj)
const result: any = isArray ? [] : Object.assign({}, obj)
for (const key in obj) {
// include prototype properties
const value = obj[key]
const type = getTypeOf(value)
if (type === 'Array') {
result[key] = this.clone(value)
} else if (type === 'Object') {
const m = Hierarchy.mixinClass(value)
const valClone = this.clone(value)
result[key] = m !== undefined ? this.as(valClone, m) : valClone
} else if (type === 'Date') {
result[key] = new Date(value.getTime())
} else {
if (isArray) {
result[key] = value
}
}
}
return result
return deepClone(
obj,
(doc, m) => this.as(doc, m),
(value) => Hierarchy.mixinClass(value)
)
}
domains (): Domain[] {

View File

@ -30,6 +30,6 @@ export * from './tx'
export * from './utils'
export * from './backup'
export * from './status'
export * from './typeof'
export * from './clone'
export * from './common'
export * from './time'

View File

@ -14,13 +14,13 @@
//
import { PlatformError, Severity, Status } from '@hcengineering/platform'
import { Lookup, ReverseLookups, getObjectValue } from '.'
import type { Class, Doc, Ref } from './classes'
import { Lookup, MeasureContext, ReverseLookups, getObjectValue } from '.'
import type { AttachedDoc, Class, Doc, Ref } from './classes'
import core from './component'
import { Hierarchy } from './hierarchy'
import { checkMixinKey, matchQuery, resultSort } from './query'
import type { DocumentQuery, FindOptions, FindResult, LookupData, Storage, TxResult, WithLookup } from './storage'
import type { Tx, TxCreateDoc, TxMixin, TxRemoveDoc, TxUpdateDoc } from './tx'
import type { Tx, TxCollectionCUD, TxCreateDoc, TxMixin, TxRemoveDoc, TxUpdateDoc } from './tx'
import { TxProcessor } from './tx'
import { toFindResult } from './utils'
@ -28,17 +28,17 @@ import { toFindResult } from './utils'
* @public
*/
export abstract class MemDb extends TxProcessor implements Storage {
private readonly objectsByClass = new Map<Ref<Class<Doc>>, Doc[]>()
private readonly objectsByClass = new Map<Ref<Class<Doc>>, Map<Ref<Doc>, Doc>>()
private readonly objectById = new Map<Ref<Doc>, Doc>()
constructor (protected readonly hierarchy: Hierarchy) {
super()
}
private getObjectsByClass (_class: Ref<Class<Doc>>): Doc[] {
private getObjectsByClass (_class: Ref<Class<Doc>>): Map<Ref<Doc>, Doc> {
const result = this.objectsByClass.get(_class)
if (result === undefined) {
const result: Doc[] = []
const result = new Map<Ref<Doc>, Doc>()
this.objectsByClass.set(_class, result)
return result
}
@ -46,10 +46,9 @@ export abstract class MemDb extends TxProcessor implements Storage {
}
private cleanObjectByClass (_class: Ref<Class<Doc>>, _id: Ref<Doc>): void {
let result = this.objectsByClass.get(_class)
const result = this.objectsByClass.get(_class)
if (result !== undefined) {
result = result.filter((cl) => cl._id !== _id)
this.objectsByClass.set(_class, result)
result.delete(_id)
}
}
@ -152,7 +151,7 @@ export abstract class MemDb extends TxProcessor implements Storage {
) {
result = this.getByIdQuery(query, baseClass)
} else {
result = this.getObjectsByClass(baseClass)
result = Array.from(this.getObjectsByClass(baseClass).values())
}
result = matchQuery(result, query, _class, this.hierarchy, true)
@ -195,7 +194,7 @@ export abstract class MemDb extends TxProcessor implements Storage {
) {
result = this.getByIdQuery(query, baseClass)
} else {
result = this.getObjectsByClass(baseClass)
result = Array.from(this.getObjectsByClass(baseClass).values())
}
result = matchQuery(result, query, _class, this.hierarchy, true)
@ -214,12 +213,7 @@ export abstract class MemDb extends TxProcessor implements Storage {
addDoc (doc: Doc): void {
this.hierarchy.getAncestors(doc._class).forEach((_class) => {
const arr = this.getObjectsByClass(_class)
const index = arr.findIndex((p) => p._id === doc._id)
if (index === -1) {
arr.push(doc)
} else {
arr[index] = doc
}
arr.set(doc._id, doc)
})
this.objectById.set(doc._id, doc)
}
@ -275,6 +269,74 @@ export class ModelDb extends MemDb {
return {}
}
addTxes (ctx: MeasureContext, txes: Tx[], clone: boolean): void {
for (const tx of txes) {
switch (tx._class) {
case core.class.TxCreateDoc:
this.addDoc(TxProcessor.createDoc2Doc(tx as TxCreateDoc<Doc>, clone))
break
case core.class.TxCollectionCUD: {
// Only create transactions need to be updated to contain attachedTo and attachedToClass.
const cud = tx as TxCollectionCUD<Doc, AttachedDoc<Doc>>
if (cud.tx._class === core.class.TxCreateDoc) {
const createTx = cud.tx as TxCreateDoc<AttachedDoc>
const d: TxCreateDoc<AttachedDoc> = {
...createTx,
attributes: {
...createTx.attributes,
attachedTo: cud.objectId,
attachedToClass: cud.objectClass,
collection: cud.collection
}
}
this.addDoc(TxProcessor.createDoc2Doc(d as TxCreateDoc<Doc>, clone))
}
this.addTxes(ctx, [cud.tx], clone)
break
}
case core.class.TxUpdateDoc: {
const cud = tx as TxUpdateDoc<Doc>
const doc = this.findObject(cud.objectId)
if (doc !== undefined) {
TxProcessor.updateDoc2Doc(doc, cud)
} else {
void ctx.error('no document found, failed to apply model transaction, skipping', {
_id: tx._id,
_class: tx._class,
objectId: cud.objectId
})
}
break
}
case core.class.TxRemoveDoc:
try {
this.delDoc((tx as TxRemoveDoc<Doc>).objectId)
} catch (err: any) {
void ctx.error('no document found, failed to apply model transaction, skipping', {
_id: tx._id,
_class: tx._class,
objectId: (tx as TxRemoveDoc<Doc>).objectId
})
}
break
case core.class.TxMixin: {
const mix = tx as TxMixin<Doc, Doc>
const obj = this.findObject(mix.objectId)
if (obj !== undefined) {
TxProcessor.updateMixin4Doc(obj, mix)
} else {
void ctx.error('no document found, failed to apply model transaction, skipping', {
_id: tx._id,
_class: tx._class,
objectId: mix.objectId
})
}
break
}
}
}
}
protected async txUpdateDoc (tx: TxUpdateDoc<Doc>): Promise<TxResult> {
const doc = this.getObject(tx.objectId) as any
TxProcessor.updateDoc2Doc(doc, tx)
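
With addTxes in place, buildModel and createServerStorage apply the whole model transaction log synchronously instead of awaiting model.tx per transaction. A hedged sketch of that call sequence, assuming the Hierarchy, ModelDb and MeasureMetricsContext exports from @hcengineering/core and a txes array obtained elsewhere (e.g. from loadModel or getModel):

import { Hierarchy, MeasureMetricsContext, ModelDb, type Tx } from '@hcengineering/core'

function buildLocalModel (txes: Tx[]): ModelDb {
  const ctx = new MeasureMetricsContext('build-model', {})
  const hierarchy = new Hierarchy()

  // Feed classes/mixins into the hierarchy first, skipping broken transactions.
  for (const tx of txes) {
    try {
      hierarchy.tx(tx)
    } catch (err: any) {
      console.error('failed to apply model transaction, skipping', tx._id, tx._class, err?.message)
    }
  }

  const model = new ModelDb(hierarchy)
  // Bulk, synchronous apply; the final 'false' skips the deep clone of attributes.
  model.addTxes(ctx, txes, false)
  return model
}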

View File

@ -1,7 +1,7 @@
import { PlatformError, Severity, Status } from '@hcengineering/platform'
import { Doc } from './classes'
import { clone } from './clone'
import core from './component'
import justClone from 'just-clone'
/**
* @public
@ -60,7 +60,7 @@ export function setObjectValue (key: string, doc: Doc, newValue: any): void {
value = lvalue
}
}
value[last] = justClone(newValue)
value[last] = clone(newValue)
return value
}

View File

@ -13,7 +13,6 @@
// limitations under the License.
//
import justClone from 'just-clone'
import type { KeysByType } from 'simplytyped'
import type {
Account,
@ -35,6 +34,7 @@ import { _getOperator } from './operator'
import { _toDoc } from './proxy'
import type { DocumentQuery, TxResult } from './storage'
import { generateId } from './utils'
import { clone } from './clone'
/**
* @public
@ -357,10 +357,10 @@ export abstract class TxProcessor implements WithTx {
return result
}
static createDoc2Doc<T extends Doc>(tx: TxCreateDoc<T>): T {
static createDoc2Doc<T extends Doc>(tx: TxCreateDoc<T>, doClone = true): T {
// eslint-disable-next-line @typescript-eslint/consistent-type-assertions
return {
...justClone(tx.attributes),
...(doClone ? clone(tx.attributes) : tx.attributes),
_id: tx.objectId,
_class: tx.objectClass,
space: tx.objectSpace,

View File

@ -1,35 +0,0 @@
const se = typeof Symbol !== 'undefined'
const ste = se && typeof Symbol.toStringTag !== 'undefined'
export function getTypeOf (obj: any): string {
const typeofObj = typeof obj
if (typeofObj !== 'object') {
return typeofObj
}
if (obj === null) {
return 'null'
}
if (Array.isArray(obj) && (!ste || !(Symbol.toStringTag in obj))) {
return 'Array'
}
const stringTag = ste && obj[Symbol.toStringTag]
if (typeof stringTag === 'string') {
return stringTag
}
const objPrototype = Object.getPrototypeOf(obj)
if (objPrototype === RegExp.prototype) {
return 'RegExp'
}
if (objPrototype === Date.prototype) {
return 'Date'
}
if (objPrototype === null) {
return 'Object'
}
return {}.toString.call(obj).slice(8, -1)
}

View File

@ -41,7 +41,6 @@
"@hcengineering/platform": "^0.6.9",
"@hcengineering/theme": "^0.6.3",
"@hcengineering/core": "^0.6.28",
"just-clone": "~6.2.0",
"svelte": "^4.2.12",
"fast-equals": "^2.0.3",
"autolinker": "4.0.0",

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
import justClone from 'just-clone'
import { clone } from '@hcengineering/core'
import { derived, get, writable } from 'svelte/store'
import { closePopup } from './popups'
import { type Location as PlatformLocation } from './types'
@ -105,7 +105,7 @@ export function getRawCurrentLocation (): PlatformLocation {
}
export function getCurrentResolvedLocation (): PlatformLocation {
return justClone(resolvedLocation)
return clone(resolvedLocation)
}
declare global {
@ -130,7 +130,7 @@ if (!embeddedPlatform) {
})
}
export const location = derived(locationWritable, (loc) => justClone(loc))
export const location = derived(locationWritable, (loc) => clone(loc))
/**
* Unlike {@link location}, exposes raw browser location as seen in URL
@ -143,7 +143,7 @@ export const workspaceId = derived(location, (loc) => loc.path[1])
* @public
*/
export function getLocation (): PlatformLocation {
return justClone(get(location))
return clone(get(location))
}
export const resolvedLocationStore = writable(getRawCurrentLocation())
@ -151,12 +151,12 @@ let resolvedLocation = getRawCurrentLocation()
export function setResolvedLocation (location: PlatformLocation): void {
resolvedLocation = location
resolvedLocationStore.set(justClone(location))
resolvedLocationStore.set(clone(location))
}
export function getCurrentLocation (): PlatformLocation {
if (embeddedPlatform) {
return justClone(get(locationWritable))
return clone(get(locationWritable))
}
return getRawCurrentLocation()
}

View File

@ -128,7 +128,7 @@ class Connection implements ClientConnection {
}
}
delay = 1
delay = 0
pending: Promise<ClientSocket> | undefined
private async waitOpenConnection (): Promise<ClientSocket> {
@ -140,7 +140,7 @@ class Connection implements ClientConnection {
}
this.pending = this.openConnection()
await this.pending
this.delay = 5
this.delay = 0
return await this.pending
} catch (err: any) {
this.pending = undefined
@ -154,7 +154,7 @@ class Connection implements ClientConnection {
setTimeout(() => {
console.log(`delay ${this.delay} second`)
resolve(null)
if (this.delay !== 15) {
if (this.delay < 5) {
this.delay++
}
}, this.delay * SECOND)

View File

@ -18,6 +18,7 @@ import core, {
AccountClient,
ClientConnectEvent,
LoadModelResponse,
MeasureContext,
Tx,
TxHandler,
TxPersistenceStore,
@ -37,6 +38,28 @@ import { connect } from './connection'
export { connect }
let dbRequest: IDBOpenDBRequest | undefined
let dbPromise: Promise<IDBDatabase | undefined> = Promise.resolve(undefined)
if (typeof localStorage !== 'undefined') {
const st = Date.now()
dbPromise = new Promise<IDBDatabase>((resolve) => {
dbRequest = indexedDB.open('model.db.persistence', 2)
dbRequest.onupgradeneeded = function () {
const db = (dbRequest as IDBOpenDBRequest).result
if (!db.objectStoreNames.contains('model')) {
db.createObjectStore('model', { keyPath: 'id' })
}
}
dbRequest.onsuccess = function () {
const db = (dbRequest as IDBOpenDBRequest).result
console.log('init DB complete', Date.now() - st)
resolve(db)
}
})
}
// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
export default async () => {
return {
@ -46,7 +69,8 @@ export default async () => {
endpoint: string,
onUpgrade?: () => void,
onUnauthorized?: () => void,
onConnect?: (event: ClientConnectEvent) => void
onConnect?: (event: ClientConnectEvent) => void,
ctx?: MeasureContext
): Promise<AccountClient> => {
const filterModel = getMetadata(clientPlugin.metadata.FilterModel) ?? false
@ -75,7 +99,8 @@ export default async () => {
return connect(url.href, upgradeHandler, onUpgrade, onUnauthorized, onConnect)
},
filterModel ? [...getPlugins(), ...(getMetadata(clientPlugin.metadata.ExtraPlugins) ?? [])] : undefined,
createModelPersistence(getWSFromToken(token))
createModelPersistence(getWSFromToken(token)),
ctx
)
// Check if we had dev hook for client.
client = hookClient(client)
@ -90,25 +115,6 @@ function createModelPersistence (workspace: string): TxPersistenceStore | undefi
return overrideStore
}
let dbRequest: IDBOpenDBRequest | undefined
let dbPromise: Promise<IDBDatabase | undefined> = Promise.resolve(undefined)
if (typeof localStorage !== 'undefined') {
dbPromise = new Promise<IDBDatabase>((resolve) => {
dbRequest = indexedDB.open('model.db.persistence', 2)
dbRequest.onupgradeneeded = function () {
const db = (dbRequest as IDBOpenDBRequest).result
if (!db.objectStoreNames.contains('model')) {
db.createObjectStore('model', { keyPath: 'id' })
}
}
dbRequest.onsuccess = function () {
const db = (dbRequest as IDBOpenDBRequest).result
resolve(db)
}
})
}
return {
load: async () => {
const db = await dbPromise

View File

@ -13,7 +13,7 @@
// limitations under the License.
//
import type { AccountClient, ClientConnectEvent, TxPersistenceStore } from '@hcengineering/core'
import type { AccountClient, ClientConnectEvent, MeasureContext, TxPersistenceStore } from '@hcengineering/core'
import type { Plugin, Resource } from '@hcengineering/platform'
import { Metadata, plugin } from '@hcengineering/platform'
@ -66,7 +66,8 @@ export type ClientFactory = (
endpoint: string,
onUpgrade?: () => void,
onUnauthorized?: () => void,
onConnect?: (event: ClientConnectEvent) => void
onConnect?: (event: ClientConnectEvent) => void,
ctx?: MeasureContext
) => Promise<AccountClient>
export default plugin(clientId, {

View File

@ -3,6 +3,8 @@ import client from '@hcengineering/client'
import core, {
ClientConnectEvent,
getCurrentAccount,
MeasureMetricsContext,
metricsToString,
setCurrentAccount,
versionToString,
type AccountClient,
@ -41,6 +43,7 @@ export async function disconnect (): Promise<void> {
}
export async function connect (title: string): Promise<Client | undefined> {
const ctx = new MeasureMetricsContext('connect', {})
const loc = getCurrentLocation()
const ws = loc.path[1]
if (ws === undefined) {
@ -62,7 +65,7 @@ export async function connect (title: string): Promise<Client | undefined> {
let token = tokens[ws]
if (token === undefined && getMetadata(presentation.metadata.Token) !== undefined) {
const selectWorkspace = await getResource(login.function.SelectWorkspace)
const loginInfo = (await selectWorkspace(ws))[1]
const loginInfo = await ctx.with('select-workspace', {}, async () => (await selectWorkspace(ws))[1])
if (loginInfo !== undefined) {
tokens[ws] = loginInfo.token
token = loginInfo.token
@ -88,8 +91,12 @@ export async function connect (title: string): Promise<Client | undefined> {
if (_token !== token && _client !== undefined) {
// We need to flush all data from memory
await purgeClient()
await _client.close()
await ctx.with('purge-client', {}, async () => {
await purgeClient()
})
await ctx.with('close previous client', {}, async () => {
await _client?.close()
})
_client = undefined
tokenChanged = true
}
@ -104,63 +111,79 @@ export async function connect (title: string): Promise<Client | undefined> {
serverEndpoint = serverEndpoint.substring(0, serverEndpoint.length - 1)
}
const clientFactory = await getResource(client.function.GetClient)
_client = await clientFactory(
token,
endpoint,
() => {
location.reload()
},
() => {
clearMetadata(ws)
navigate({
path: [loginId],
query: {}
})
},
// We need to refresh all active live queries and clear old queries.
(event: ClientConnectEvent) => {
console.log('WorkbenchClient: onConnect', event)
try {
if ((_clientSet && event === ClientConnectEvent.Connected) || event === ClientConnectEvent.Refresh) {
void refreshClient(tokenChanged)
tokenChanged = false
}
if (event === ClientConnectEvent.Upgraded) {
window.location.reload()
}
void (async () => {
if (_client !== undefined) {
const newVersion = await _client.findOne<Version>(core.class.Version, {})
console.log('Reconnect Model version', newVersion)
const currentVersionStr = versionToString(version as Version)
const reconnectVersionStr = versionToString(newVersion as Version)
if (currentVersionStr !== reconnectVersionStr) {
// It seems upgrade happened
// location.reload()
versionError = `${currentVersionStr} != ${reconnectVersionStr}`
const newClient = await ctx.with(
'create-client',
{},
async (ctx) =>
await clientFactory(
token,
endpoint,
() => {
location.reload()
},
() => {
clearMetadata(ws)
navigate({
path: [loginId],
query: {}
})
},
// We need to refresh all active live queries and clear old queries.
(event: ClientConnectEvent) => {
console.log('WorkbenchClient: onConnect', event)
try {
if ((_clientSet && event === ClientConnectEvent.Connected) || event === ClientConnectEvent.Refresh) {
void ctx.with('refresh client', {}, async () => {
await refreshClient(tokenChanged)
})
tokenChanged = false
}
const serverVersion: { version: string } = await (
await fetch(serverEndpoint + '/api/v1/version', {})
).json()
console.log('Server version', serverVersion.version)
if (serverVersion.version !== '' && serverVersion.version !== currentVersionStr) {
versionError = `${currentVersionStr} => ${serverVersion.version}`
if (event === ClientConnectEvent.Upgraded) {
window.location.reload()
}
void (async () => {
if (_client !== undefined) {
const newVersion = await ctx.with(
'find-version',
{},
async () => await newClient.findOne<Version>(core.class.Version, {})
)
console.log('Reconnect Model version', newVersion)
const currentVersionStr = versionToString(version as Version)
const reconnectVersionStr = versionToString(newVersion as Version)
if (currentVersionStr !== reconnectVersionStr) {
// It seems upgrade happened
// location.reload()
versionError = `${currentVersionStr} != ${reconnectVersionStr}`
}
const serverVersion: { version: string } = await ctx.with(
'fetch-server-version',
{},
async () => await (await fetch(serverEndpoint + '/api/v1/version', {})).json()
)
console.log('Server version', serverVersion.version)
if (serverVersion.version !== '' && serverVersion.version !== currentVersionStr) {
versionError = `${currentVersionStr} => ${serverVersion.version}`
}
}
})()
} catch (err) {
console.error(err)
}
})()
} catch (err) {
console.error(err)
}
}
},
ctx
)
)
_client = newClient
console.log('logging in as', email)
const me = await _client?.getAccount()
const me = await ctx.with('get-account', {}, async () => await newClient.getAccount())
if (me !== undefined) {
Analytics.setUser(me.email)
Analytics.setTag('workspace', ws)
@ -176,11 +199,18 @@ export async function connect (title: string): Promise<Client | undefined> {
// Update on connect, so it will be triggered
_clientSet = true
await setClient(_client)
const client = _client
await ctx.with('set-client', {}, async () => {
await setClient(client)
})
return
}
try {
version = await _client.findOne<Version>(core.class.Version, {})
version = await ctx.with(
'find-model-version',
{},
async () => await newClient.findOne<Version>(core.class.Version, {})
)
console.log('Model version', version)
const requiredVersion = getMetadata(presentation.metadata.RequiredVersion)
@ -195,7 +225,11 @@ export async function connect (title: string): Promise<Client | undefined> {
}
try {
const serverVersion: { version: string } = await (await fetch(serverEndpoint + '/api/v1/version', {})).json()
const serverVersion: { version: string } = await ctx.with(
'find-server-version',
{},
async () => await (await fetch(serverEndpoint + '/api/v1/version', {})).json()
)
console.log('Server version', serverVersion.version)
if (
@ -223,10 +257,14 @@ export async function connect (title: string): Promise<Client | undefined> {
// Update window title
document.title = [ws, title].filter((it) => it).join(' - ')
_clientSet = true
await setClient(_client)
await broadcastEvent(plugin.event.NotifyConnection, getCurrentAccount())
return _client
await ctx.with('set-client', {}, async () => {
await setClient(newClient)
})
await ctx.with('broadcast-connected', {}, async () => {
await broadcastEvent(plugin.event.NotifyConnection, getCurrentAccount())
})
console.log(metricsToString(ctx.metrics, 'connect', 50))
return newClient
}
function clearMetadata (ws: string): void {

View File

@ -50,11 +50,6 @@ export interface RawDBAdapter {
* @public
*/
export interface DbAdapter {
/**
* Method called after hierarchy is ready to use.
*/
init: (model: Tx[]) => Promise<void>
createIndexes: (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>) => Promise<void>
removeOldIndex: (domain: Domain, deletePattern: RegExp, keepPattern: RegExp) => Promise<void>
@ -83,7 +78,7 @@ export interface DbAdapter {
* @public
*/
export interface TxAdapter extends DbAdapter {
getModel: () => Promise<Tx[]>
getModel: (ctx: MeasureContext) => Promise<Tx[]>
}
/**

View File

@ -63,7 +63,7 @@ export class FullTextIndex implements WithFind {
readonly indexer: FullTextIndexPipeline,
private readonly upgrade: boolean
) {
if (!upgrade) {
if (!this.upgrade) {
// Schedule indexing after consistency check
void this.indexer.startIndexing()
}

View File

@ -91,13 +91,12 @@ export class FullTextIndexPipeline implements FullTextPipeline {
}
async cancel (): Promise<void> {
console.log(this.workspace.name, 'Cancel indexing', this.indexId)
this.cancelling = true
clearTimeout(this.skippedReiterationTimeout)
this.triggerIndexing()
await this.indexing
await this.flush(true)
console.log(this.workspace.name, 'Indexing canceled', this.indexId)
await this.metrics.info('Cancel indexing', { workspace: this.workspace.name, indexId: this.indexId })
}
async markRemove (doc: DocIndexState): Promise<void> {
@ -336,7 +335,10 @@ export class FullTextIndexPipeline implements FullTextPipeline {
try {
this.hierarchy.getClass(core.class.DocIndexState)
} catch (err: any) {
console.log(this.workspace.name, 'Models is not upgraded to support indexer', this.indexId)
await this.metrics.info('Models is not upgraded to support indexer', {
indexId: this.indexId,
workspace: this.workspace.name
})
return
}
await this.metrics.with('init-states', {}, async () => {
@ -367,12 +369,12 @@ export class FullTextIndexPipeline implements FullTextPipeline {
_classes.forEach((it) => this.broadcastClasses.add(it))
if (this.triggerCounts > 0) {
console.log('No wait, trigger counts', this.triggerCounts)
await this.metrics.info('No wait, trigger counts', { triggerCount: this.triggerCounts })
}
if (this.toIndex.size === 0 && this.stageChanged === 0 && this.triggerCounts === 0) {
if (this.toIndex.size === 0) {
console.log(this.workspace.name, 'Indexing complete', this.indexId)
await this.metrics.info('Indexing complete', { indexId: this.indexId, workspace: this.workspace.name })
}
if (!this.cancelling) {
// We need to send index update event
@ -398,7 +400,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
}
}
}
console.log(this.workspace.name, 'Exit indexer', this.indexId)
await this.metrics.info('Exit indexer', { indexId: this.indexId, workspace: this.workspace.name })
}
private async processIndex (ctx: MeasureContext): Promise<Ref<Class<Doc>>[]> {
@ -470,13 +472,12 @@ export class FullTextIndexPipeline implements FullTextPipeline {
}
if (result.length > 0) {
console.log(
this.workspace.name,
`Full text: Indexing ${this.indexId} ${st.stageId}`,
Object.entries(this.currentStages)
.map((it) => `${it[0]}:${it[1]}`)
.join(' ')
)
await this.metrics.info('Full text: Indexing', {
indexId: this.indexId,
stageId: st.stageId,
workspace: this.workspace.name,
...this.currentStages
})
} else {
// Nothing to index, check on next cycle.
break
@ -528,7 +529,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
}
}
} catch (err: any) {
console.error(err)
await this.metrics.error('error during index', { error: err })
}
}
})

View File

@ -38,7 +38,6 @@ import { type DbAdapter } from './adapter'
* @public
*/
export class DummyDbAdapter implements DbAdapter {
async init (model: Tx[]): Promise<void> {}
async findAll<T extends Doc>(
ctx: MeasureContext,
_class: Ref<Class<T>>,
@ -99,16 +98,6 @@ class InMemoryAdapter extends DummyDbAdapter implements DbAdapter {
async tx (ctx: MeasureContext, ...tx: Tx[]): Promise<TxResult[]> {
return await this.modeldb.tx(...tx)
}
async init (model: Tx[]): Promise<void> {
for (const tx of model) {
try {
await this.modeldb.tx(tx)
} catch (err: any) {
console.error('skip broken TX', err)
}
}
}
}
/**

View File

@ -31,8 +31,8 @@ import {
type Tx,
type TxResult
} from '@hcengineering/core'
import { createServerStorage } from './server'
import { type DbConfiguration } from './configuration'
import { createServerStorage } from './server'
import {
type BroadcastFunc,
type HandledBroadcastFunc,
@ -67,12 +67,12 @@ export async function createPipeline (
}
})
)
const pipeline = ctx.with(
'create pipeline',
{},
async (ctx) => await PipelineImpl.create(ctx, storage, constructors, broadcast)
const pipelineResult = await PipelineImpl.create(
ctx.newChild('pipeline-operations', {}),
storage,
constructors,
broadcast
)
const pipelineResult = await pipeline
broadcastHook = (tx, targets) => {
return pipelineResult.handleBroadcast(tx, targets)
}

View File

@ -56,18 +56,20 @@ export async function createServerStorage (
const storageAdapter = conf.storageFactory?.()
for (const key in conf.adapters) {
const adapterConf = conf.adapters[key]
adapters.set(
key,
await adapterConf.factory(ctx, hierarchy, adapterConf.url, conf.workspace, modelDb, storageAdapter)
)
}
await ctx.with('create-adapters', {}, async (ctx) => {
for (const key in conf.adapters) {
const adapterConf = conf.adapters[key]
adapters.set(
key,
await adapterConf.factory(ctx, hierarchy, adapterConf.url, conf.workspace, modelDb, storageAdapter)
)
}
})
const txAdapter = adapters.get(conf.domains[DOMAIN_TX]) as TxAdapter
const model = await ctx.with('get model', {}, async (ctx) => {
const model = await txAdapter.getModel()
const model = await ctx.with('fetch-model', {}, async (ctx) => await txAdapter.getModel(ctx))
for (const tx of model) {
try {
hierarchy.tx(tx)
@ -76,22 +78,10 @@ export async function createServerStorage (
console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err)
}
}
for (const tx of model) {
try {
await modelDb.tx(tx)
} catch (err: any) {
console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err)
}
}
modelDb.addTxes(ctx, model, false)
return model
})
for (const [adn, adapter] of adapters) {
await ctx.with('init-adapter', { name: adn }, async (ctx) => {
await adapter.init(model)
})
}
const fulltextAdapter = await ctx.with(
'create full text adapter',
{},

View File

@ -60,10 +60,10 @@ import crypto from 'node:crypto'
import { type DbAdapter } from '../adapter'
import { type FullTextIndex } from '../fulltext'
import serverCore from '../plugin'
import { type ServiceAdaptersManager } from '../service'
import { type StorageAdapter } from '../storage'
import { type Triggers } from '../triggers'
import type { FullTextAdapter, ObjectDDParticipant, ServerStorageOptions, TriggerControl } from '../types'
import { type StorageAdapter } from '../storage'
import { type ServiceAdaptersManager } from '../service'
export class TServerStorage implements ServerStorage {
private readonly fulltext: FullTextIndex
@ -137,15 +137,11 @@ export class TServerStorage implements ServerStorage {
}
async close (): Promise<void> {
console.timeLog(this.workspace.name, 'closing')
await this.fulltext.close()
console.timeLog(this.workspace.name, 'closing adapters')
for (const o of this.adapters.values()) {
await o.close()
}
console.timeLog(this.workspace.name, 'closing fulltext')
await this.fulltextAdapter.close()
console.timeLog(this.workspace.name, 'closing service adapters')
await this.serviceAdaptersManager.close()
}
@ -199,7 +195,7 @@ export class TServerStorage implements ServerStorage {
const txCUD = TxProcessor.extractTx(tx) as TxCUD<Doc>
if (!this.hierarchy.isDerived(txCUD._class, core.class.TxCUD)) {
// Skip unsupported tx
console.error('Unsupported transaction', tx)
await ctx.error('Unsupported transaction', tx)
continue
}
const domain = this.hierarchy.getDomain(txCUD.objectClass)
@ -397,7 +393,7 @@ export class TServerStorage implements ServerStorage {
{ clazz, query, options }
)
if (Date.now() - st > 1000) {
console.error('FindAll', Date.now() - st, clazz, query, options)
await ctx.error('FindAll', { time: Date.now() - st, clazz, query, options })
}
return result
}
@ -794,7 +790,7 @@ export class TServerStorage implements ServerStorage {
await fx()
}
} catch (err: any) {
console.log(err)
await ctx.error('error process tx', { error: err })
throw err
} finally {
onEnds.forEach((p) => {

View File

@ -59,8 +59,6 @@ class ElasticDataAdapter implements DbAdapter {
return []
}
async init (model: Tx[]): Promise<void> {}
async createIndexes (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>): Promise<void> {}
async removeOldIndex (domain: Domain, deletePattern: RegExp, keepPattern: RegExp): Promise<void> {}

View File

@ -292,7 +292,9 @@ export function start (
const token = req.query.token as string
const payload = decodeToken(token)
const admin = payload.extra?.admin === 'true'
res.writeHead(200, { 'Content-Type': 'application/json' })
res.status(200)
res.setHeader('Content-Type', 'application/json')
res.setHeader('Cache-Control', cacheControlNoCache)
const json = JSON.stringify({
metrics: metricsAggregate((ctx as any).metrics),
@ -301,7 +303,6 @@ export function start (
},
admin
})
res.set('Cache-Control', 'private, no-cache')
res.end(json)
} catch (err) {
console.error(err)

View File

@ -62,6 +62,8 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar
private spaceMeasureCtx!: MeasureContext
private spaceSecurityInit: Promise<void> | undefined
private readonly systemSpaces = [
core.space.Configuration,
core.space.DerivedTx,
@ -86,7 +88,7 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar
): Promise<SpaceSecurityMiddleware> {
const res = new SpaceSecurityMiddleware(broadcast, storage, next)
res.spaceMeasureCtx = ctx.newChild('space chain', {})
await res.init(res.spaceMeasureCtx)
res.spaceSecurityInit = res.init(res.spaceMeasureCtx)
return res
}
@ -124,6 +126,13 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar
this.publicSpaces = spaces.filter((it) => !it.private).map((p) => p._id)
}
async waitInit (): Promise<void> {
if (this.spaceSecurityInit !== undefined) {
await this.spaceSecurityInit
this.spaceSecurityInit = undefined
}
}
private removeMemberSpace (member: Ref<Account>, space: Ref<Space>): void {
const arr = this.allowedSpaces[member]
if (arr !== undefined) {
@ -240,6 +249,8 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar
}
private async handleUpdate (ctx: SessionContext, tx: TxCUD<Space>): Promise<void> {
await this.waitInit()
const updateDoc = tx as TxUpdateDoc<Space>
if (!this.storage.hierarchy.isDerived(updateDoc.objectClass, core.class.Space)) return
@ -285,6 +296,7 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar
}
private async handleTx (ctx: SessionContext, tx: TxCUD<Space>): Promise<void> {
await this.waitInit()
if (tx._class === core.class.TxCreateDoc) {
this.handleCreate(tx)
} else if (tx._class === core.class.TxUpdateDoc) {
@ -370,6 +382,7 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar
}
async tx (ctx: SessionContext, tx: Tx): Promise<TxMiddlewareResult> {
await this.waitInit()
const account = await getUser(this.storage, ctx)
if (account.role === AccountRole.Guest) {
throw new PlatformError(new Status(Severity.ERROR, platform.status.Forbidden, {}))
@ -385,6 +398,7 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar
handleBroadcast (tx: Tx[], targets?: string[]): Tx[] {
const process = async (): Promise<void> => {
await this.waitInit()
for (const t of tx) {
if (this.storage.hierarchy.isDerived(t._class, core.class.TxCUD)) {
await this.processTxSpaceDomain(t as TxCUD<Doc>)
@ -476,6 +490,8 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar
query: DocumentQuery<T>,
options?: FindOptions<T>
): Promise<FindResult<T>> {
await this.waitInit()
const domain = this.storage.hierarchy.getDomain(_class)
const newQuery = query
const account = await getUser(this.storage, ctx)
@ -509,6 +525,7 @@ export class SpaceSecurityMiddleware extends BaseMiddleware implements Middlewar
query: SearchQuery,
options: SearchOptions
): Promise<SearchResult> {
await this.waitInit()
const newQuery = { ...query }
const account = await getUser(this.storage, ctx)
if (!isSystem(account)) {

View File

@ -1264,12 +1264,21 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter {
return this.txColl
}
async getModel (): Promise<Tx[]> {
const cursor = this.db
.collection(DOMAIN_TX)
.find<Tx>({ objectSpace: core.space.Model })
.sort({ _id: 1, modifiedOn: 1 })
const model = await toArray(cursor)
async getModel (ctx: MeasureContext): Promise<Tx[]> {
const modelProjection = {
'%hash%': 0,
objectSpace: 0,
createdBy: 0,
space: 0
}
const cursor = await ctx.with('find', {}, async () =>
this.db
.collection<Tx>(DOMAIN_TX)
.find({ objectSpace: core.space.Model })
.sort({ _id: 1, modifiedOn: 1 })
.project<Tx>(modelProjection)
)
const model = await ctx.with('to-array', {}, async () => await toArray<Tx>(cursor))
// We need to put all core.account.System transactions first
const systemTx: Tx[] = []
const userTx: Tx[] = []
@ -1284,7 +1293,6 @@ class MongoTxAdapter extends MongoAdapterBase implements TxAdapter {
(tx as TxCUD<Doc>).objectClass === 'contact:class:EmployeeAccount')
)
}
model.forEach((tx) => (tx.modifiedBy === core.account.System && !isPersonAccount(tx) ? systemTx : userTx).push(tx))
return systemTx.concat(userTx)
}

View File

@ -17,7 +17,7 @@ let metricsContext: MeasureContext | undefined
/**
* @public
*/
export function getMetricsContext (): MeasureContext {
export function getMetricsContext (factory?: () => MeasureMetricsContext): MeasureContext {
if (metricsContext !== undefined) {
return metricsContext
}
@ -25,7 +25,11 @@ export function getMetricsContext (): MeasureContext {
console.info('please provide apm server url for monitoring')
const metrics = newMetrics()
metricsContext = new MeasureMetricsContext('System', {}, {}, metrics)
if (factory !== undefined) {
metricsContext = factory()
} else {
metricsContext = new MeasureMetricsContext('System', {}, {}, metrics)
}
if (metricsFile !== undefined || metricsConsole) {
console.info('storing measurements into local file', metricsFile)

View File

@ -55,8 +55,6 @@ class StorageBlobAdapter implements DbAdapter {
return []
}
async init (model: Tx[]): Promise<void> {}
async createIndexes (domain: Domain, config: Pick<IndexingConfiguration<Doc>, 'indexes'>): Promise<void> {}
async removeOldIndex (domain: Domain, deletePattern: RegExp, keepPattern: RegExp): Promise<void> {}

View File

@ -268,7 +268,7 @@ async function fetchModelFromMongo (
const txAdapter = await createMongoTxAdapter(ctx, hierarchy, mongodbUri, workspaceId, modelDb)
const model = await ctx.with('get-model', {}, async () => await txAdapter.getModel())
const model = await ctx.with('get-model', {}, async (ctx) => await txAdapter.getModel(ctx))
await ctx.with('build local model', {}, async () => {
for (const tx of model) {

View File

@ -42,12 +42,9 @@ import {
import { type SessionContext } from '@hcengineering/server-core'
import { ClientSession } from '../client'
import { startHttpServer } from '../server_http'
import { disableLogging } from '../types'
import { genMinModel } from './minmodel'
describe('server', () => {
disableLogging()
async function getModelDb (): Promise<ModelDb> {
const txes = genMinModel()
const hierarchy = new Hierarchy()

View File

@ -48,6 +48,7 @@ import { type BroadcastCall, type Session, type SessionRequest, type StatisticsE
* @public
*/
export class ClientSession implements Session {
createTime = Date.now()
requests = new Map<string, SessionRequest>()
binaryResponseMode: boolean = false
useCompression: boolean = true
@ -81,7 +82,7 @@ export class ClientSession implements Session {
}
async loadModel (ctx: MeasureContext, lastModelTx: Timestamp, hash?: string): Promise<Tx[] | LoadModelResponse> {
return await this._pipeline.storage.loadModel(lastModelTx, hash)
return await ctx.with('load-model', {}, async () => await this._pipeline.storage.loadModel(lastModelTx, hash))
}
async getAccount (ctx: MeasureContext): Promise<Account> {

View File

@ -145,7 +145,7 @@ class TSessionManager implements SessionManager {
const now = Date.now()
const diff = now - s[1].session.lastRequest
if (diff > 60000 && this.ticks % 10 === 0) {
console.log('session hang, closing...', h[0], s[1].session.getUser())
void this.ctx.error('session hang, closing...', { sessionId: h[0], user: s[1].session.getUser() })
void this.close(s[1].socket, h[1].workspaceId, 1001, 'CLIENT_HANGOUT')
continue
}
@ -160,7 +160,11 @@ class TSessionManager implements SessionManager {
for (const r of s[1].session.requests.values()) {
if (now - r.start > 30000) {
console.log(h[0], 'request hang found, 30sec', h[0], s[1].session.getUser(), r.params)
void this.ctx.info('request hang found, 30sec', {
sessionId: h[0],
user: s[1].session.getUser(),
...r.params
})
}
}
}
@ -212,8 +216,9 @@ class TSessionManager implements SessionManager {
return await baseCtx.with('📲 add-session', {}, async (ctx) => {
const wsString = toWorkspaceString(token.workspace, '@')
let workspaceInfo =
let workspaceInfo = await ctx.with('check-token', {}, async (ctx) =>
accountsUrl !== '' ? await this.getWorkspaceInfo(accountsUrl, rawToken) : this.wsFromToken(token)
)
if (workspaceInfo === undefined && token.extra?.admin !== 'true') {
// No access to workspace for token.
return { error: new Error(`No access to workspace for token ${token.email} ${token.workspace.name}`) }
@ -222,6 +227,10 @@ class TSessionManager implements SessionManager {
}
let workspace = this.workspaces.get(wsString)
if (workspace?.closeTimeout !== undefined) {
await ctx.info('Cancel workspace warm close', { wsString })
clearTimeout(workspace?.closeTimeout)
}
await workspace?.closing
workspace = this.workspaces.get(wsString)
if (sessionId !== undefined && workspace?.sessions?.has(sessionId) === true) {
@ -278,7 +287,9 @@ class TSessionManager implements SessionManager {
this.sessions.set(ws.id, { session, socket: ws })
// We need to delete previous session with Id if found.
workspace.sessions.set(session.sessionId, { session, socket: ws })
await ctx.with('set-status', {}, () => this.setStatus(ctx, session, true))
// We do not need to wait for set-status, just return session to client
void ctx.with('set-status', {}, (ctx) => this.setStatus(ctx, session, true))
if (this.timeMinutes > 0) {
void ws.send(
@ -316,7 +327,7 @@ class TSessionManager implements SessionManager {
workspaceName: string
): Promise<Pipeline> {
if (LOGGING_ENABLED) {
console.log(workspaceName, 'reloading workspace', JSON.stringify(token))
await ctx.info('reloading workspace', { workspaceName, token: JSON.stringify(token) })
}
// If upgrade client is used.
// Drop all existing clients
@ -351,12 +362,16 @@ class TSessionManager implements SessionManager {
for (const session of sessions.splice(0, 1)) {
if (targets !== undefined && !targets.includes(session.session.getUser())) continue
for (const _tx of tx) {
void session.socket.send(
ctx,
{ result: _tx },
session.session.binaryResponseMode,
session.session.useCompression
)
try {
void session.socket.send(
ctx,
{ result: _tx },
session.session.binaryResponseMode,
session.session.useCompression
)
} catch (err: any) {
void ctx.error('error during send', { error: err })
}
}
}
if (sessions.length > 0) {
@ -377,11 +392,12 @@ class TSessionManager implements SessionManager {
): Workspace {
const upgrade = token.extra?.model === 'upgrade'
const context = ctx.newChild('🧲 session', {})
const pipelineCtx = context.newChild('🧲 pipeline-factory', {})
const workspace: Workspace = {
context,
id: generateId(),
pipeline: pipelineFactory(
context,
pipelineCtx,
{ ...token.workspace, workspaceUrl, workspaceName },
upgrade,
(tx, targets) => {
@ -393,8 +409,6 @@ class TSessionManager implements SessionManager {
workspaceId: token.workspace,
workspaceName
}
if (LOGGING_ENABLED) console.time(workspaceName)
if (LOGGING_ENABLED) console.timeLog(workspaceName, 'Creating Workspace:', workspace.id)
this.workspaces.set(toWorkspaceString(token.workspace), workspace)
return workspace
}
@ -429,11 +443,12 @@ class TSessionManager implements SessionManager {
}
async close (ws: ConnectionSocket, workspaceId: WorkspaceId, code: number, reason: string): Promise<void> {
// if (LOGGING_ENABLED) console.log(workspaceId.name, `closing websocket, code: ${code}, reason: ${reason}`)
const wsid = toWorkspaceString(workspaceId)
const workspace = this.workspaces.get(wsid)
if (workspace === undefined) {
if (LOGGING_ENABLED) {
await this.ctx.error('internal: cannot find sessions', { id: ws.id, workspace: workspaceId.name, code, reason })
}
return
}
const sessionRef = this.sessions.get(ws.id)
@ -458,7 +473,9 @@ class TSessionManager implements SessionManager {
if (!workspace.upgrade) {
// Wait some time for new client to appear before closing workspace.
if (workspace.sessions.size === 0) {
clearTimeout(workspace.closeTimeout)
void this.ctx.info('schedule warm closing', { workspace: workspace.workspaceName, wsid })
workspace.closeTimeout = setTimeout(() => {
void this.performWorkspaceCloseCheck(workspace, workspaceId, wsid)
}, this.timeouts.shutdownWarmTimeout)
}
@ -469,7 +486,15 @@ class TSessionManager implements SessionManager {
}
async closeAll (wsId: string, workspace: Workspace, code: number, reason: 'upgrade' | 'shutdown'): Promise<void> {
if (LOGGING_ENABLED) {
await this.ctx.info('closing workspace', {
workspace: workspace.id,
wsName: workspace.workspaceName,
code,
reason,
wsId
})
}
const sessions = Array.from(workspace.sessions)
workspace.sessions = new Map()
@ -484,21 +509,30 @@ class TSessionManager implements SessionManager {
await this.setStatus(workspace.context, s, false)
}
if (LOGGING_ENABLED) {
await this.ctx.info('Clients disconnected. Closing Workspace...', {
wsId,
workspace: workspace.id,
wsName: workspace.workspaceName
})
}
await Promise.all(sessions.map((s) => closeS(s[1].session, s[1].socket)))
const closePipeline = async (): Promise<void> => {
try {
await this.ctx.with('close-pipeline', {}, async () => {
await (await workspace.pipeline).close()
})
} catch (err: any) {
await this.ctx.error('close-pipeline-error', { error: err })
}
}
await this.ctx.with('closing', {}, async () => {
await Promise.race([closePipeline(), timeoutPromise(15000)])
})
if (LOGGING_ENABLED) {
await this.ctx.info('Workspace closed...', { workspace: workspace.id, wsId, wsName: workspace.workspaceName })
}
}
private async sendUpgrade (ctx: MeasureContext, webSocket: ConnectionSocket, binary: boolean): Promise<void> {
@ -530,31 +564,36 @@ class TSessionManager implements SessionManager {
): Promise<void> {
if (workspace.sessions.size === 0) {
const wsUID = workspace.id
const logParams = { wsid, workspace: workspace.id, wsName: workspaceId.name }
if (LOGGING_ENABLED) {
await this.ctx.info('no sessions for workspace', logParams)
}
if (workspace.closing === undefined) {
const waitAndClose = async (workspace: Workspace): Promise<void> => {
try {
if (workspace.sessions.size === 0) {
const pl = await workspace.pipeline
await Promise.race([pl, timeoutPromise(60000)])
await Promise.race([pl.close(), timeoutPromise(60000)])
if (this.workspaces.get(wsid)?.id === wsUID) {
this.workspaces.delete(wsid)
}
workspace.context.end()
if (LOGGING_ENABLED) {
await this.ctx.info('Closed workspace', logParams)
}
}
} catch (err: any) {
this.workspaces.delete(wsid)
if (LOGGING_ENABLED) {
await this.ctx.error('failed', { ...logParams, error: err })
}
}
}
workspace.closing = waitAndClose(workspace)
}
await workspace.closing
}
}
@ -562,13 +601,22 @@ class TSessionManager implements SessionManager {
broadcast (from: Session | null, workspaceId: WorkspaceId, resp: Response<any>, target?: string[]): void {
const workspace = this.workspaces.get(toWorkspaceString(workspaceId))
if (workspace === undefined) {
void this.ctx.error('internal: cannot find sessions', {
workspaceId: workspaceId.name,
target,
userId: from?.getUser() ?? '$unknown'
})
return
}
if (workspace?.upgrade ?? false) {
return
}
if (LOGGING_ENABLED) {
void this.ctx.info('server broadcasting to clients...', {
workspace: workspaceId.name,
count: workspace.sessions.size
})
}
const sessions = [...workspace.sessions.values()]
const ctx = this.ctx.newChild('📭 broadcast', {})
@ -627,19 +675,14 @@ class TSessionManager implements SessionManager {
service.useBroadcast = hello.broadcast ?? false
if (LOGGING_ENABLED) {
await ctx.info('hello happen', {
user: service.getUser(),
binary: service.binaryResponseMode,
compression: service.useCompression,
timeToHello: Date.now() - service.createTime,
workspaceUsers: this.workspaces.get(workspace)?.sessions?.size,
totalUsers: this.sessions.size
})
}
const helloResponse: HelloResponse = {
id: -1,
@ -684,7 +727,9 @@ class TSessionManager implements SessionManager {
service.useCompression
)
} catch (err: any) {
if (LOGGING_ENABLED) {
await this.ctx.error('error handle request', { error: err, request })
}
const resp: Response<any> = {
id: request.id,
error: unknownError(err),
@ -726,7 +771,9 @@ class TSessionManager implements SessionManager {
service.useCompression
)
} catch (err: any) {
if (LOGGING_ENABLED) {
await ctx.error('error handle measure', { error: err, request })
}
const resp: Response<any> = {
id: request.id,
error: unknownError(err),

View File

@ -47,7 +47,9 @@ export function startHttpServer (
enableCompression: boolean,
accountsUrl: string
): () => Promise<void> {
if (LOGGING_ENABLED) {
void ctx.info('starting server on', { port, productId, enableCompression, accountsUrl })
}
const app = express()
app.use(cors())
@ -209,21 +211,27 @@ export function startHttpServer (
)
if ('upgrade' in session || 'error' in session) {
if ('error' in session) {
void ctx.error('error', { error: session.error })
}
cs.close()
return
}
// eslint-disable-next-line @typescript-eslint/no-misused-promises
ws.on('message', (msg: RawData) => {
try {
let buff: any | undefined
if (msg instanceof Buffer) {
buff = msg?.toString()
} else if (Array.isArray(msg)) {
buff = Buffer.concat(msg).toString()
}
if (buff !== undefined) {
void handleRequest(session.context, session.session, cs, buff, session.workspaceName)
}
} catch (err: any) {
if (LOGGING_ENABLED) {
void ctx.error('message error', err)
}
}
})
// eslint-disable-next-line @typescript-eslint/no-misused-promises
@ -251,12 +259,17 @@ export function startHttpServer (
const sessionId = url.searchParams.get('sessionId')
if (payload.workspace.productId !== productId) {
if (LOGGING_ENABLED) {
void ctx.error('invalid product', { required: payload.workspace.productId, productId })
}
throw new Error('Invalid workspace product')
}
wss.handleUpgrade(request, socket, head, (ws) => wss.emit('connection', ws, request, payload, token, sessionId))
} catch (err: any) {
if (LOGGING_ENABLED) {
void ctx.error('invalid token', err)
}
wss.handleUpgrade(request, socket, head, (ws) => {
const resp: Response<any> = {
id: -1,
@ -274,7 +287,9 @@ export function startHttpServer (
}
})
httpServer.on('error', (err) => {
if (LOGGING_ENABLED) {
void ctx.error('server error', err)
}
})
httpServer.listen(port)

View File

@ -35,6 +35,7 @@ export interface StatisticsElement {
* @public
*/
export interface Session {
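// Creation timestamp of the session; used to report time-to-hello in the 'hello happen' log.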
createTime: number
getUser: () => string
pipeline: () => Pipeline
ping: () => Promise<string>
@ -117,7 +118,9 @@ export interface Workspace {
pipeline: Promise<Pipeline>
sessions: Map<string, { session: Session, socket: ConnectionSocket }>
upgrade: boolean
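// Resolves once a warm close of this workspace has completed.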
closing?: Promise<void>
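// Handle of the scheduled warm-close timer; cleared when a client reconnects.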
closeTimeout?: any
workspaceId: WorkspaceId
workspaceName: string