diff --git a/dev/tool/src/benchmark.ts b/dev/tool/src/benchmark.ts index 8d6da2ed94..3f1e967fc3 100644 --- a/dev/tool/src/benchmark.ts +++ b/dev/tool/src/benchmark.ts @@ -227,7 +227,7 @@ export async function benchmark ( requestTime = (r?.value ?? 0) / (((r?.operations as number) ?? 0) + 1) - const tr = extract(json.metrics as Metrics, '🧲 session', 'client', 'handleRequest', '#send-data') + const tr = extract(json.metrics as Metrics, '🧲 session', '#send-data') transfer = (tr?.value ?? 0) - oldTransfer oldTransfer = tr?.value ?? 0 }) diff --git a/packages/core/src/client.ts b/packages/core/src/client.ts index 148b33ddc0..2c68ac8f36 100644 --- a/packages/core/src/client.ts +++ b/packages/core/src/client.ts @@ -148,10 +148,8 @@ class ClientImpl implements AccountClient, BackupClient { this.hierarchy.tx(tx) await this.model.tx(tx) } - // We need to handle it on server, before performing local live query updates. - const result = await this.conn.tx(tx) - return result + return await this.conn.tx(tx) } async updateFromRemote (...tx: Tx[]): Promise { diff --git a/packages/core/src/measurements/metrics.ts b/packages/core/src/measurements/metrics.ts index f87dd470e8..e77e2fcd82 100644 --- a/packages/core/src/measurements/metrics.ts +++ b/packages/core/src/measurements/metrics.ts @@ -102,7 +102,7 @@ export function childMetrics (root: Metrics, path: string[]): Metrics { * @public */ export function metricsAggregate (m: Metrics, limit: number = -1): Metrics { - const ms = aggregateMetrics(m.measurements, limit) + let ms = aggregateMetrics(m.measurements, limit) // Use child overage, if there is no top level value specified. const me = Object.entries(ms) @@ -127,6 +127,7 @@ export function metricsAggregate (m: Metrics, limit: number = -1): Metrics { break } } + ms = newMs } } diff --git a/packages/core/src/query.ts b/packages/core/src/query.ts index d42a81011c..703e8e9a24 100644 --- a/packages/core/src/query.ts +++ b/packages/core/src/query.ts @@ -156,7 +156,7 @@ export function matchQuery ( hierarchy: Hierarchy, skipLookup: boolean = false ): Doc[] { - let result = [...docs] + let result = docs for (const key in query) { if (skipLookup && key.startsWith('$lookup.')) { continue diff --git a/packages/core/src/utils.ts b/packages/core/src/utils.ts index 4d185dc29c..c567ddcc16 100644 --- a/packages/core/src/utils.ts +++ b/packages/core/src/utils.ts @@ -46,6 +46,7 @@ import { TxOperations } from './operations' import { isPredicate } from './predicate' import { DocumentQuery, FindResult } from './storage' import { DOMAIN_TX } from './tx' +import { DOMAIN_BENCHMARK } from './benchmark' function toHex (value: number, chars: number): string { const result = value.toString(16) @@ -712,7 +713,8 @@ export function isClassIndexable (hierarchy: Hierarchy, c: Ref>): boo domain === DOMAIN_TX || domain === DOMAIN_MODEL || domain === DOMAIN_BLOB || - domain === DOMAIN_TRANSIENT + domain === DOMAIN_TRANSIENT || + domain === DOMAIN_BENCHMARK ) { hierarchy.setClassifierProp(c, 'class_indexed', false) return false diff --git a/plugins/client-resources/src/connection.ts b/plugins/client-resources/src/connection.ts index f513eccb41..b5a5a84ef1 100644 --- a/plugins/client-resources/src/connection.ts +++ b/plugins/client-resources/src/connection.ts @@ -42,7 +42,8 @@ import core, { } from '@hcengineering/core' import { PlatformError, UNAUTHORIZED, broadcastEvent, getMetadata, unknownError } from '@hcengineering/platform' -import { HelloRequest, HelloResponse, ReqId, readResponse, serialize, type Response } from '@hcengineering/rpc' +import { HelloRequest, HelloResponse, ReqId, type Response } from '@hcengineering/rpc' +import { RPCHandler } from '@hcengineering/rpc' const SECOND = 1000 const pingTimeout = 10 * SECOND @@ -91,6 +92,8 @@ class Connection implements ClientConnection { private pingResponse: number = Date.now() + rpcHandler = new RPCHandler() + constructor ( private readonly url: string, private readonly handler: TxHandler, @@ -408,11 +411,11 @@ class Connection implements ClientConnection { } if (event.data instanceof Blob) { void event.data.arrayBuffer().then((data) => { - const resp = readResponse(data, this.binaryMode) + const resp = this.rpcHandler.readResponse(data, this.binaryMode) this.handleMsg(socketId, resp) }) } else { - const resp = readResponse(event.data, this.binaryMode) + const resp = this.rpcHandler.readResponse(event.data, this.binaryMode) this.handleMsg(socketId, resp) } } @@ -441,7 +444,7 @@ class Connection implements ClientConnection { binary: useBinary, compression: useCompression } - this.websocket?.send(serialize(helloRequest, false)) + this.websocket?.send(this.rpcHandler.serialize(helloRequest, false)) } wsocket.onerror = (event: any) => { @@ -475,8 +478,9 @@ class Connection implements ClientConnection { if (data.once === true) { // Check if has same request already then skip + const dparams = JSON.stringify(data.params) for (const [, v] of this.requests) { - if (v.method === data.method && JSON.stringify(v.params) === JSON.stringify(data.params)) { + if (v.method === data.method && JSON.stringify(v.params) === dparams) { // We have same unanswered, do not add one more. return } @@ -495,8 +499,9 @@ class Connection implements ClientConnection { const sendData = async (): Promise => { if (this.websocket?.readyState === ClientSocketReadyState.OPEN) { promise.startTime = Date.now() + this.websocket?.send( - serialize( + this.rpcHandler.serialize( { method: data.method, params: data.params, diff --git a/plugins/devmodel-resources/src/index.ts b/plugins/devmodel-resources/src/index.ts index c5129ea47f..3a77f3166d 100644 --- a/plugins/devmodel-resources/src/index.ts +++ b/plugins/devmodel-resources/src/index.ts @@ -13,7 +13,7 @@ // limitations under the License. // -import { +import core, { DOMAIN_MODEL, cutObjectArray, type Class, @@ -56,10 +56,13 @@ export class PresentationClientHook implements ClientHook { addTxListener((...tx) => { if (this.notifyEnabled) { - console.debug( - 'devmodel# notify=>', - testing ? JSON.stringify(cutObjectArray(tx)).slice(0, 160) : tx.length === 1 ? tx[0] : tx - ) + const rtx = tx.filter((tx) => (tx as any).objectClass !== core.class.BenchmarkDoc) + if (rtx.length > 0) { + console.debug( + 'devmodel# notify=>', + testing ? JSON.stringify(cutObjectArray(rtx)).slice(0, 160) : rtx.length === 1 ? rtx[0] : tx + ) + } } }) } @@ -152,7 +155,7 @@ export class PresentationClientHook implements ClientHook { async tx (client: Client, tx: Tx): Promise { const startTime = Date.now() const result = await client.tx(tx) - if (this.notifyEnabled) { + if (this.notifyEnabled && (tx as any).objectClass !== core.class.BenchmarkDoc) { console.debug( 'devmodel# tx=>', testing ? JSON.stringify(cutObjectArray(tx)).slice(0, 160) : tx, diff --git a/plugins/workbench-resources/src/components/ServerManagerGeneral.svelte b/plugins/workbench-resources/src/components/ServerManagerGeneral.svelte index 6e47b37892..4e4d38a31b 100644 --- a/plugins/workbench-resources/src/components/ServerManagerGeneral.svelte +++ b/plugins/workbench-resources/src/components/ServerManagerGeneral.svelte @@ -1,7 +1,8 @@ {#if isAdminUser()} @@ -34,6 +117,33 @@ +
+
+ Command benchmark {avgTime / opss} + {maxTime} - {active} +
+
+
+ total +
+
+ parallel +
+
+ dsize +
+
+ rsize +
+
+
2.
diff --git a/server-plugins/activity-resources/src/references.ts b/server-plugins/activity-resources/src/references.ts index 0c33147172..9a65a2f6f5 100644 --- a/server-plugins/activity-resources/src/references.ts +++ b/server-plugins/activity-resources/src/references.ts @@ -628,8 +628,8 @@ function guessReferenceTx (hierarchy: Hierarchy, tx: TxCUD): TxCUD { return tx } -async function ActivityReferenceCreate (tx: TxCUD, control: TriggerControl): Promise { - const ctx = TxProcessor.extractTx(tx) as TxCreateDoc +async function ActivityReferenceCreate (tx: TxCUD, etx: TxCUD, control: TriggerControl): Promise { + const ctx = etx as TxCreateDoc if (ctx._class !== core.class.TxCreateDoc) return [] if (control.hierarchy.isDerived(ctx.objectClass, notification.class.InboxNotification)) return [] @@ -658,8 +658,8 @@ async function ActivityReferenceCreate (tx: TxCUD, control: TriggerControl) return [] } -async function ActivityReferenceUpdate (tx: TxCUD, control: TriggerControl): Promise { - const ctx = TxProcessor.extractTx(tx) as TxUpdateDoc +async function ActivityReferenceUpdate (tx: TxCUD, etx: TxCUD, control: TriggerControl): Promise { + const ctx = etx as TxUpdateDoc const attributes = control.hierarchy.getAllAttributes(ctx.objectClass) let hasUpdates = false @@ -705,8 +705,8 @@ async function ActivityReferenceUpdate (tx: TxCUD, control: TriggerControl) return [] } -async function ActivityReferenceRemove (tx: Tx, control: TriggerControl): Promise { - const ctx = TxProcessor.extractTx(tx) as TxRemoveDoc +async function ActivityReferenceRemove (tx: Tx, etx: TxCUD, control: TriggerControl): Promise { + const ctx = etx as TxRemoveDoc const attributes = control.hierarchy.getAllAttributes(ctx.objectClass) let hasMarkdown = false @@ -741,13 +741,13 @@ export async function ReferenceTrigger (tx: TxCUD, control: TriggerControl) if (control.hierarchy.isDerived(etx.objectClass, notification.class.InboxNotification)) return [] if (etx._class === core.class.TxCreateDoc) { - result.push(...(await ActivityReferenceCreate(tx, control))) + result.push(...(await ActivityReferenceCreate(tx, etx, control))) } if (etx._class === core.class.TxUpdateDoc) { - result.push(...(await ActivityReferenceUpdate(tx, control))) + result.push(...(await ActivityReferenceUpdate(tx, etx, control))) } if (etx._class === core.class.TxRemoveDoc) { - result.push(...(await ActivityReferenceRemove(tx, control))) + result.push(...(await ActivityReferenceRemove(tx, etx, control))) } return result } diff --git a/server/core/src/benchmark/index.ts b/server/core/src/benchmark/index.ts index d38e938dea..52560a198b 100644 --- a/server/core/src/benchmark/index.ts +++ b/server/core/src/benchmark/index.ts @@ -16,6 +16,7 @@ import core, { generateId, toFindResult, + TxProcessor, type BenchmarkDoc, type Class, type Doc, @@ -27,6 +28,9 @@ import core, { type ModelDb, type Ref, type Space, + type Tx, + type TxCreateDoc, + type TxResult, type WorkspaceId } from '@hcengineering/core' import type { DbAdapter } from '../adapter' @@ -85,6 +89,30 @@ class BenchmarkDbAdapter extends DummyDbAdapter { return toFindResult(result as T[]) } + + async tx (ctx: MeasureContext, ...tx: Tx[]): Promise { + if (benchData === '') { + benchData = genData(1024 * 1024) + } + for (const t of tx) { + if (t._class === core.class.TxCreateDoc) { + const doc = TxProcessor.createDoc2Doc(t as TxCreateDoc) + const request = doc.request + + if (request?.size != null) { + const dataSize = + typeof request.size === 'number' ? request.size : request.size.from + Math.random() * request.size.to + return [ + { + response: benchData.slice(0, dataSize) + } + ] + } + } + } + + return [{}] + } } /** * @public diff --git a/server/core/src/fulltext.ts b/server/core/src/fulltext.ts index d643c9fd36..358c6fc0e9 100644 --- a/server/core/src/fulltext.ts +++ b/server/core/src/fulltext.ts @@ -128,7 +128,6 @@ export class FullTextIndex implements WithFind { await ctx.with('queue', {}, async (ctx) => { await this.indexer.queue(ctx, stDocs) }) - this.indexer.triggerIndexing() return {} } diff --git a/server/core/src/server/domainHelper.ts b/server/core/src/server/domainHelper.ts index e5b6892a15..919caf25ff 100644 --- a/server/core/src/server/domainHelper.ts +++ b/server/core/src/server/domainHelper.ts @@ -9,7 +9,7 @@ import type { ModelDb, WorkspaceId } from '@hcengineering/core' -import core, { DOMAIN_MODEL, IndexKind, IndexOrder } from '@hcengineering/core' +import core, { DOMAIN_BENCHMARK, DOMAIN_MODEL, IndexKind, IndexOrder } from '@hcengineering/core' import { deepEqual } from 'fast-equals' import type { DomainHelper, DomainHelperOperations } from '../adapter' @@ -38,7 +38,7 @@ export class DomainIndexHelperImpl implements DomainHelper { for (const c of classes) { try { const domain = hierarchy.findDomain(c._id) - if (domain === undefined || domain === DOMAIN_MODEL) { + if (domain === undefined || domain === DOMAIN_MODEL || domain === DOMAIN_BENCHMARK) { continue } const attrs = hierarchy.getAllAttributes(c._id) @@ -103,7 +103,7 @@ export class DomainIndexHelperImpl implements DomainHelper { try { const has50Documents = await operations.hasDocuments(domain, 50) const allIndexes = (await operations.listIndexes(domain)).filter((it) => it.name !== '_id_') - console.log('check indexes', domain, has50Documents) + ctx.info('check indexes', { domain, has50Documents }) if (has50Documents) { for (const vv of [...(domainInfo?.values() ?? []), ...(cfg?.indexes ?? [])]) { try { diff --git a/server/core/src/server/storage.ts b/server/core/src/server/storage.ts index d0f718ff0a..65dcd97652 100644 --- a/server/core/src/server/storage.ts +++ b/server/core/src/server/storage.ts @@ -174,7 +174,7 @@ export class TServerStorage implements ServerStorage { await this.fulltext.close() for (const [domain, info] of this.domainInfo.entries()) { if (info.checkPromise !== undefined) { - console.log('wait for check domain', domain) + this.metrics.info('wait for check domain', { domain }) // We need to be sure we wait for check to be complete await info.checkPromise } @@ -218,21 +218,22 @@ export class TServerStorage implements ServerStorage { } private async routeTx (ctx: MeasureContext, removedDocs: Map, Doc>, ...txes: Tx[]): Promise { - let part: TxCUD[] = [] - let lastDomain: Domain | undefined const result: TxResult[] = [] - const processPart = async (): Promise => { - if (part.length > 0) { + + const domainGroups = new Map[]>() + + const processPart = async (domain: Domain, txes: TxCUD[]): Promise => { + if (txes.length > 0) { // Find all deleted documents - const adapter = this.getAdapter(lastDomain as Domain, true) - const toDelete = part.filter((it) => it._class === core.class.TxRemoveDoc).map((it) => it.objectId) + const adapter = this.getAdapter(domain, true) + const toDelete = txes.filter((it) => it._class === core.class.TxRemoveDoc).map((it) => it.objectId) if (toDelete.length > 0) { const toDeleteDocs = await ctx.with( 'adapter-load', - { domain: lastDomain }, - async () => await adapter.load(ctx, lastDomain as Domain, toDelete) + { domain }, + async () => await adapter.load(ctx, domain, toDelete) ) for (const ddoc of toDeleteDocs) { @@ -240,18 +241,18 @@ export class TServerStorage implements ServerStorage { } } - const r = await ctx.with('adapter-tx', { domain: lastDomain }, async (ctx) => await adapter.tx(ctx, ...part)) + const r = await ctx.with('adapter-tx', { domain }, async (ctx) => await adapter.tx(ctx, ...txes)) // Update server live queries. - await this.liveQuery.tx(...part) + await this.liveQuery.tx(...txes) if (Array.isArray(r)) { result.push(...r) } else { result.push(r) } - part = [] } } + for (const tx of txes) { const txCUD = TxProcessor.extractTx(tx) as TxCUD if (!this.hierarchy.isDerived(txCUD._class, core.class.TxCUD)) { @@ -260,18 +261,17 @@ export class TServerStorage implements ServerStorage { continue } const domain = this.hierarchy.getDomain(txCUD.objectClass) - if (part.length > 0) { - if (lastDomain !== domain) { - await processPart() - } - lastDomain = domain - part.push(txCUD) - } else { - lastDomain = domain - part.push(txCUD) + + let group = domainGroups.get(domain) + if (group === undefined) { + group = [] + domainGroups.set(domain, group) } + group.push(txCUD) + } + for (const [domain, txes] of domainGroups.entries()) { + await processPart(domain, txes) } - await processPart() return result } @@ -664,7 +664,6 @@ export class TServerStorage implements ServerStorage { classes.add((etx as TxCUD).objectClass) } } - console.log('Broadcasting compact bulk', derived.length) const bevent = createBroadcastEvent(Array.from(classes)) this.options.broadcast([bevent], target, exclude) } @@ -678,7 +677,6 @@ export class TServerStorage implements ServerStorage { await sendWithPart(derived, target, exclude) } else { // Let's send after our response will go out - console.log('Broadcasting', derived.length, derived.length) this.options.broadcast(derived, target, exclude) } } @@ -879,7 +877,11 @@ export class TServerStorage implements ServerStorage { if (!this.hierarchy.isDerived(tx._class, core.class.TxApplyIf)) { if (tx.space !== core.space.DerivedTx) { if (this.hierarchy.isDerived(tx._class, core.class.TxCUD)) { - if (this.hierarchy.findDomain((tx as TxCUD).objectClass) !== DOMAIN_TRANSIENT) { + const objectClass = (tx as TxCUD).objectClass + if ( + objectClass !== core.class.BenchmarkDoc && + this.hierarchy.findDomain(objectClass) !== DOMAIN_TRANSIENT + ) { txToStore.push(tx) } } else { @@ -944,10 +946,17 @@ export class TServerStorage implements ServerStorage { await this.triggers.tx(tx) await this.modelDb.tx(tx) } - await ctx.with('domain-tx', {}, async (ctx) => await this.getAdapter(DOMAIN_TX, true).tx(ctx.ctx, ...txToStore), { - count: txToStore.length - }) - result.push(...(await ctx.with('apply', {}, (ctx) => this.routeTx(ctx.ctx, removedMap, ...txToProcess))), { + if (txToStore.length > 0) { + await ctx.with( + 'domain-tx', + {}, + async (ctx) => await this.getAdapter(DOMAIN_TX, true).tx(ctx.ctx, ...txToStore), + { + count: txToStore.length + } + ) + } + result.push(...(await ctx.with('routeTx', {}, (ctx) => this.routeTx(ctx.ctx, removedMap, ...txToProcess))), { count: txToProcess.length }) @@ -955,14 +964,17 @@ export class TServerStorage implements ServerStorage { derived.push(...(await this.processDerived(ctx, txToProcess, _findAll, removedMap))) // index object - await ctx.with( - 'fulltext-tx', - {}, - async (ctx) => { - await this.fulltext.tx(ctx.ctx, [...txToProcess, ...derived]) - }, - { count: txToProcess.length + derived.length } - ) + const ftx = [...txToProcess, ...derived] + if (ftx.length > 0) { + await ctx.with( + 'fulltext-tx', + {}, + async (ctx) => { + await this.fulltext.tx(ctx.ctx, ftx) + }, + { count: txToProcess.length + derived.length } + ) + } } catch (err: any) { ctx.ctx.error('error process tx', { error: err }) Analytics.handleError(err) diff --git a/server/core/src/triggers.ts b/server/core/src/triggers.ts index 22c398a3f2..bc2232ec5e 100644 --- a/server/core/src/triggers.ts +++ b/server/core/src/triggers.ts @@ -107,7 +107,7 @@ export class Triggers { ...ctrl, operationContext: ctx, ctx: ctx.ctx, - txFactory: new TxFactory(core.account.System, true), + txFactory: null as any, // Will be set later findAll: async (clazz, query, options) => await ctrl.findAllCtx(ctx.ctx, clazz, query, options), apply: async (tx, needResult) => { apply.push(...tx) @@ -195,9 +195,9 @@ export class Triggers { return } if (typeof q[key] === 'string') { - const descendants = this.hierarchy.getDescendants(q[key]) + const descendants = this.hierarchy.getDescendants(q[key] as Ref>) q[key] = { - $in: [...(q[key].$in ?? []), ...descendants] + $in: [q[key] as Ref>, ...descendants] } } else { if (Array.isArray(q[key].$in)) { diff --git a/server/rpc/src/rpc.ts b/server/rpc/src/rpc.ts index 6c2201b76f..d4df2e8a6b 100644 --- a/server/rpc/src/rpc.ts +++ b/server/rpc/src/rpc.ts @@ -16,8 +16,6 @@ import platform, { PlatformError, Severity, Status } from '@hcengineering/platform' import { Packr } from 'msgpackr' -const packr = new Packr({ structuredClone: true, bundleStrings: true, copyBuffers: true }) - /** * @public */ @@ -49,79 +47,6 @@ export interface HelloResponse extends Response { reconnect?: boolean } -/** - * Response object define a server response on transaction request. - * Also used to inform other clients about operations being performed by server. - * - * @public - */ -export interface Response { - result?: R - id?: ReqId - error?: Status - chunk?: { - index: number - final: boolean - } - time?: number // Server time to perform operation - bfst?: number // Server time to perform operation - queue?: number -} - -/** - * @public - * @param object - - * @returns - */ -export function protoSerialize (object: object, binary: boolean): any { - if (!binary) { - return JSON.stringify(object, replacer) - } - return new Uint8Array(packr.pack(object)) -} - -/** - * @public - * @param data - - * @returns - */ -export function protoDeserialize (data: any, binary: boolean): any { - if (!binary) { - let _data = data - if (_data instanceof ArrayBuffer) { - const decoder = new TextDecoder() - _data = decoder.decode(_data) - } - return JSON.parse(_data.toString(), receiver) - } - return packr.unpack(new Uint8Array(data)) -} - -/** - * @public - * @param object - - * @returns - */ -export function serialize (object: Request | Response, binary: boolean): any { - if ((object as any).result !== undefined) { - ;(object as any).result = replacer('result', (object as any).result) - } - return protoSerialize(object, binary) -} - -/** - * @public - * @param response - - * @returns - */ -export function readResponse (response: any, binary: boolean): Response { - const data = protoDeserialize(response, binary) - if (data.result !== undefined) { - data.result = receiver('result', data.result) - } - return data -} - function replacer (key: string, value: any): any { if (Array.isArray(value) && ((value as any).total !== undefined || (value as any).lookupMap !== undefined)) { return { @@ -145,16 +70,82 @@ function receiver (key: string, value: any): any { } /** + * Response object define a server response on transaction request. + * Also used to inform other clients about operations being performed by server. + * * @public - * @param request - - * @returns */ -export function readRequest

(request: any, binary: boolean): Request

{ - const result: Request

= protoDeserialize(request, binary) - if (typeof result.method !== 'string') { - throw new PlatformError(new Status(Severity.ERROR, platform.status.BadRequest, {})) +export interface Response { + result?: R + id?: ReqId + error?: Status + chunk?: { + index: number + final: boolean + } + time?: number // Server time to perform operation + bfst?: number // Server time to perform operation + queue?: number +} + +export class RPCHandler { + packr = new Packr({ structuredClone: true, bundleStrings: true, copyBuffers: true }) + protoSerialize (object: object, binary: boolean): any { + if (!binary) { + return JSON.stringify(object, replacer) + } + return new Uint8Array(this.packr.pack(object)) + } + + protoDeserialize (data: any, binary: boolean): any { + if (!binary) { + let _data = data + if (_data instanceof ArrayBuffer) { + const decoder = new TextDecoder() + _data = decoder.decode(_data) + } + return JSON.parse(_data.toString(), receiver) + } + return this.packr.unpack(new Uint8Array(data)) + } + + /** + * @public + * @param object - + * @returns + */ + serialize (object: Request | Response, binary: boolean): any { + if ((object as any).result !== undefined) { + ;(object as any).result = replacer('result', (object as any).result) + } + return this.protoSerialize(object, binary) + } + + /** + * @public + * @param response - + * @returns + */ + readResponse(response: any, binary: boolean): Response { + const data = this.protoDeserialize(response, binary) + if (data.result !== undefined) { + data.result = receiver('result', data.result) + } + return data + } + + /** + * @public + * @param request - + * @returns + */ + readRequest

(request: any, binary: boolean): Request

{ + const result: Request

= this.protoDeserialize(request, binary) + if (typeof result.method !== 'string') { + throw new PlatformError(new Status(Severity.ERROR, platform.status.BadRequest, {})) + } + return result } - return result } /** diff --git a/server/server/src/starter.ts b/server/server/src/starter.ts index 8d87611c7f..0faabb2051 100644 --- a/server/server/src/starter.ts +++ b/server/server/src/starter.ts @@ -18,7 +18,7 @@ export interface ServerEnv { export function serverConfigFromEnv (): ServerEnv { const serverPort = parseInt(process.env.SERVER_PORT ?? '3333') - const enableCompression = (process.env.ENABLE_COMPRESSION ?? 'true') === 'true' + const enableCompression = (process.env.ENABLE_COMPRESSION ?? 'false') === 'true' const url = process.env.MONGO_URL if (url === undefined) { diff --git a/server/tool/src/index.ts b/server/tool/src/index.ts index 01b0fd4536..aa6c6da530 100644 --- a/server/tool/src/index.ts +++ b/server/tool/src/index.ts @@ -18,6 +18,7 @@ import core, { BackupClient, Branding, Client as CoreClient, + DOMAIN_BENCHMARK, DOMAIN_MIGRATION, DOMAIN_MODEL, DOMAIN_TRANSIENT, @@ -480,7 +481,7 @@ async function createUpdateIndexes ( let completed = 0 const allDomains = connection.getHierarchy().domains() for (const domain of allDomains) { - if (domain === DOMAIN_MODEL || domain === DOMAIN_TRANSIENT) { + if (domain === DOMAIN_MODEL || domain === DOMAIN_TRANSIENT || domain === DOMAIN_BENCHMARK) { continue } const result = await domainHelper.checkDomain(ctx, domain, false, dbHelper) diff --git a/server/ws/src/__tests__/server.test.ts b/server/ws/src/__tests__/server.test.ts index 3c99b2e51a..be06d7a935 100644 --- a/server/ws/src/__tests__/server.test.ts +++ b/server/ws/src/__tests__/server.test.ts @@ -15,7 +15,7 @@ // import { UNAUTHORIZED } from '@hcengineering/platform' -import { readResponse, serialize } from '@hcengineering/rpc' +import { RPCHandler } from '@hcengineering/rpc' import { generateToken } from '@hcengineering/server-token' import WebSocket from 'ws' import { start } from '../server' @@ -45,6 +45,7 @@ import { startHttpServer } from '../server_http' import { genMinModel } from './minmodel' describe('server', () => { + const handler = new RPCHandler() async function getModelDb (): Promise { const txes = genMinModel() const hierarchy = new Hierarchy() @@ -116,7 +117,7 @@ describe('server', () => { conn.close(1000) }) conn.on('message', (msg: string) => { - const resp = readResponse(msg, false) + const resp = handler.readResponse(msg, false) expect(resp.result === 'hello') expect(resp.error?.code).toBe(UNAUTHORIZED.code) conn.close(1000) @@ -132,12 +133,12 @@ describe('server', () => { // const start = Date.now() conn.on('open', () => { for (let i = 0; i < total; i++) { - conn.send(serialize({ method: 'tx', params: [], id: i }, false)) + conn.send(handler.serialize({ method: 'tx', params: [], id: i }, false)) } }) let received = 0 conn.on('message', (msg: string) => { - readResponse(msg, false) + handler.readResponse(msg, false) if (++received === total) { // console.log('resp:', resp, ' Time: ', Date.now() - start) conn.close(1000) @@ -199,8 +200,8 @@ describe('server', () => { timeoutPromise, new Promise((resolve) => { newConn.on('open', () => { - newConn.send(serialize({ method: 'hello', params: [], id: -1 }, false)) - newConn.send(serialize({ method: 'findAll', params: [], id: -1 }, false)) + newConn.send(handler.serialize({ method: 'hello', params: [], id: -1 }, false)) + newConn.send(handler.serialize({ method: 'findAll', params: [], id: -1 }, false)) resolve(null) }) }) @@ -216,13 +217,13 @@ describe('server', () => { newConn.on('message', (msg: Buffer) => { try { console.log('resp:', msg.toString()) - const parsedMsg = readResponse(msg.toString(), false) // Hello + const parsedMsg = handler.readResponse(msg.toString(), false) // Hello if (!helloReceived) { expect(parsedMsg.result === 'hello') helloReceived = true return } - responseMsg = readResponse(msg.toString(), false) // our message + responseMsg = handler.readResponse(msg.toString(), false) // our message resolve(null) } catch (err: any) { console.error(err) diff --git a/server/ws/src/client.ts b/server/ws/src/client.ts index 9739a415a5..6919bccb97 100644 --- a/server/ws/src/client.ts +++ b/server/ws/src/client.ts @@ -37,11 +37,13 @@ import core, { import { SessionContextImpl, createBroadcastEvent, type Pipeline } from '@hcengineering/server-core' import { type Token } from '@hcengineering/server-token' import { type ClientSessionCtx, type Session, type SessionRequest, type StatisticsElement } from './types' +import { RPCHandler } from '@hcengineering/rpc' /** * @public */ export class ClientSession implements Session { + handler = new RPCHandler() createTime = Date.now() requests = new Map() binaryMode: boolean = false @@ -256,7 +258,6 @@ export class ClientSession implements Session { await this.sendWithPart(derived, ctx, target, exclude) } else { // Let's send after our response will go out - console.log('Broadcasting', derived.length, derived.length) await ctx.send(derived, target, exclude) } } @@ -315,7 +316,6 @@ export class ClientSession implements Session { classes.add((etx as TxCUD).objectClass) } } - console.log('Broadcasting compact bulk', derived.length) const bevent = createBroadcastEvent(Array.from(classes)) await ctx.send([bevent], target, exclude) } diff --git a/server/ws/src/server.ts b/server/ws/src/server.ts index 6b335a048f..9b5b1ed49a 100644 --- a/server/ws/src/server.ts +++ b/server/ws/src/server.ts @@ -502,12 +502,6 @@ class TSessionManager implements SessionManager { if (workspace?.upgrade ?? false) { return } - if (LOGGING_ENABLED) { - this.ctx.info('server broadcasting to clients...', { - workspace: workspaceId.name, - count: workspace.sessions.size - }) - } const sessions = [...workspace.sessions.values()] const ctx = this.ctx.newChild('📭 broadcast', {}) @@ -824,7 +818,7 @@ class TSessionManager implements SessionManager { void userCtx.with(`🧭 ${backupMode ? 'handleBackup' : 'handleRequest'}`, {}, async (ctx) => { if (request.time != null) { const delta = Date.now() - request.time - userCtx.measure('receive msg', delta) + requestCtx.measure('msg-receive-delta', delta) } const wsRef = this.workspaces.get(workspace) if (wsRef === undefined) { @@ -843,7 +837,7 @@ class TSessionManager implements SessionManager { let done = false if (wsRef.upgrade) { done = true - console.log('FORCE CLOSE', workspace) + this.ctx.warn('FORCE CLOSE', { workspace }) // In case of upgrade, we need to force close workspace not in interval handler await this.forceClose(workspace, ws) } @@ -882,12 +876,12 @@ class TSessionManager implements SessionManager { binary: service.binaryMode, reconnect } - await ws.send(ctx, helloResponse, false, false) + await ws.send(requestCtx, helloResponse, false, false) return } const opContext = (ctx: MeasureContext): ClientSessionCtx => ({ sendResponse: async (msg) => { - await sendResponse(ctx, service, ws, { + await sendResponse(requestCtx, service, ws, { id: request.id, result: msg, time: Date.now() - st, diff --git a/server/ws/src/server_http.ts b/server/ws/src/server_http.ts index 94757c8a30..1f86b81f20 100644 --- a/server/ws/src/server_http.ts +++ b/server/ws/src/server_http.ts @@ -16,7 +16,7 @@ import { Analytics } from '@hcengineering/analytics' import { generateId, toWorkspaceString, type MeasureContext } from '@hcengineering/core' import { UNAUTHORIZED } from '@hcengineering/platform' -import { serialize, type Response } from '@hcengineering/rpc' +import { RPCHandler, type Response } from '@hcengineering/rpc' import { decodeToken, type Token } from '@hcengineering/server-token' import compression from 'compression' import cors from 'cors' @@ -109,7 +109,7 @@ export function startHttpServer ( res.end(json) } catch (err: any) { Analytics.handleError(err) - console.error(err) + ctx.error('error', { err }) res.writeHead(404, {}) res.end() } @@ -158,7 +158,7 @@ export function startHttpServer ( res.end() } catch (err: any) { Analytics.handleError(err) - console.error(err) + ctx.error('error', { err }) res.writeHead(404, {}) res.end() } @@ -313,16 +313,21 @@ export function startHttpServer ( // eslint-disable-next-line @typescript-eslint/no-misused-promises ws.on('message', (msg: RawData) => { try { - let buff: any | undefined + let buff: Buffer | undefined if (msg instanceof Buffer) { - buff = Buffer.copyBytesFrom(msg) + buff = msg } else if (Array.isArray(msg)) { - buff = Buffer.copyBytesFrom(Buffer.concat(msg)) + buff = Buffer.concat(msg) } if (buff !== undefined) { - doSessionOp(webSocketData, (s) => { - processRequest(s.session, cs, s.context, s.workspaceId, buff, handleRequest) - }) + doSessionOp( + webSocketData, + (s, buff) => { + s.context.measure('receive-data', buff?.length ?? 0) + processRequest(s.session, cs, s.context, s.workspaceId, buff, handleRequest) + }, + buff + ) } } catch (err: any) { Analytics.handleError(err) @@ -333,18 +338,26 @@ export function startHttpServer ( }) // eslint-disable-next-line @typescript-eslint/no-misused-promises ws.on('close', async (code: number, reason: Buffer) => { - doSessionOp(webSocketData, (s) => { - if (!(s.session.workspaceClosed ?? false)) { - // remove session after 1seconds, give a time to reconnect. - void sessions.close(ctx, cs, toWorkspaceString(token.workspace)) - } - }) + doSessionOp( + webSocketData, + (s) => { + if (!(s.session.workspaceClosed ?? false)) { + // remove session after 1seconds, give a time to reconnect. + void sessions.close(ctx, cs, toWorkspaceString(token.workspace)) + } + }, + Buffer.from('') + ) }) ws.on('error', (err) => { - doSessionOp(webSocketData, (s) => { - console.error(s.session.getUser(), 'error', err) - }) + doSessionOp( + webSocketData, + (s) => { + ctx.error('error', { err, user: s.session.getUser() }) + }, + Buffer.from('') + ) }) } wss.on('connection', handleConnection as any) @@ -372,18 +385,19 @@ export function startHttpServer ( if (LOGGING_ENABLED) { ctx.error('invalid token', err) } + const handler = new RPCHandler() wss.handleUpgrade(request, socket, head, (ws) => { const resp: Response = { id: -1, error: UNAUTHORIZED, result: 'hello' } - ws.send(serialize(resp, false), { binary: false }) + ws.send(handler.serialize(resp, false), { binary: false }) ws.onmessage = (msg) => { const resp: Response = { error: UNAUTHORIZED } - ws.send(serialize(resp, false), { binary: false }) + ws.send(handler.serialize(resp, false), { binary: false }) } }) } @@ -419,6 +433,7 @@ function createWebsocketClientSocket ( ws: WebSocket, data: { remoteAddress: string, userAgent: string, language: string, email: string, mode: any, model: any } ): ConnectionSocket { + const handler = new RPCHandler() const cs: ConnectionSocket = { id: generateId(), isClosed: false, @@ -426,34 +441,31 @@ function createWebsocketClientSocket ( cs.isClosed = true ws.close() }, + readRequest: (buffer: Buffer, binary: boolean) => { + return handler.readRequest(buffer, binary) + }, data: () => data, send: async (ctx: MeasureContext, msg, binary, compression) => { - const smsg = serialize(msg, binary) + const sst = Date.now() + const smsg = handler.serialize(msg, binary) + ctx.measure('serialize', Date.now() - sst) ctx.measure('send-data', smsg.length) - await new Promise((resolve, reject) => { - const doSend = (): void => { - if (ws.readyState !== ws.OPEN || cs.isClosed) { - return + const st = Date.now() + if (ws.readyState !== ws.OPEN || cs.isClosed) { + return + } + if (ws.bufferedAmount > 16 * 1024) { + ctx.measure('send-bufferAmmount', 1) + } + ws.send(smsg, { binary: true, compress: compression }, (err) => { + if (err != null) { + if (!`${err.message}`.includes('WebSocket is not open')) { + ctx.error('send error', { err }) + Analytics.handleError(err) } - if (ws.bufferedAmount > 16 * 1024) { - setTimeout(doSend) - return - } - ws.send(smsg, { binary: true, compress: compression }, (err) => { - if (err != null) { - if (!`${err.message}`.includes('WebSocket is not open')) { - ctx.error('send error', { err }) - Analytics.handleError(err) - reject(err) - } - // In case socket not open, just resolve - resolve() - } - resolve() - }) } - doSend() + ctx.measure('msg-send-delta', Date.now() - st) }) return smsg.length } diff --git a/server/ws/src/server_u.ts b/server/ws/src/server_u.ts index a50d1b59bf..2dae4318a3 100644 --- a/server/ws/src/server_u.ts +++ b/server/ws/src/server_u.ts @@ -20,7 +20,7 @@ import { generateId, toWorkspaceString, type MeasureContext } from '@hcengineeri import { decodeToken } from '@hcengineering/server-token' import { Analytics } from '@hcengineering/analytics' -import { serialize } from '@hcengineering/rpc' +import { RPCHandler } from '@hcengineering/rpc' import { getStatistics, wipeStatistics } from './stats' import { LOGGING_ENABLED, @@ -157,23 +157,28 @@ export function startUWebsocketServer ( }, message: (ws, message, isBinary) => { const data = ws.getUserData() - const msg = Buffer.copyBytesFrom(Buffer.from(message)) + const msg = Buffer.from(message) - doSessionOp(data, (s) => { - processRequest( - s.session, - data.connectionSocket as ConnectionSocket, - s.context, - s.workspaceId, - msg, - handleRequest - ) - }) + doSessionOp( + data, + (s, msg) => { + processRequest( + s.session, + data.connectionSocket as ConnectionSocket, + s.context, + s.workspaceId, + msg, + handleRequest + ) + }, + msg + ) }, drain: (ws) => { const data = ws.getUserData() while (data.unsendMsg.length > 0) { - if (ws.send(data.unsendMsg[0].data, data.unsendMsg[0].binary, data.unsendMsg[0].compression) !== 1) { + const sendResult = ws.send(data.unsendMsg[0].data, data.unsendMsg[0].binary, data.unsendMsg[0].compression) + if (sendResult !== 2) { ctx.measure('send-data', data.unsendMsg[0].data.length) data.unsendMsg.shift() @@ -185,19 +190,25 @@ export function startUWebsocketServer ( return } } + data.backPressureResolve?.() + data.backPressure = undefined }, close: (ws, code, message) => { const data = ws.getUserData() - doSessionOp(data, (s) => { - if (!(s.session.workspaceClosed ?? false)) { - // remove session after 1seconds, give a time to reconnect. - void sessions.close( - ctx, - data.connectionSocket as ConnectionSocket, - toWorkspaceString(data.payload.workspace) - ) - } - }) + doSessionOp( + data, + (s) => { + if (!(s.session.workspaceClosed ?? false)) { + // remove session after 1seconds, give a time to reconnect. + void sessions.close( + ctx, + data.connectionSocket as ConnectionSocket, + toWorkspaceString(data.payload.workspace) + ) + } + }, + Buffer.from('') + ) } }) .any('/api/v1/statistics', (response, request) => { @@ -363,6 +374,7 @@ function createWebSocketClientSocket ( ws: uWebSockets.WebSocket, data: WebsocketUserData ): ConnectionSocket { + const handler = new RPCHandler() const cs: ConnectionSocket = { id: generateId(), isClosed: false, @@ -375,9 +387,14 @@ function createWebSocketClientSocket ( // Ignore closed } }, + readRequest: (buffer: Buffer, binary: boolean) => { + return handler.readRequest(buffer, binary) + }, send: async (ctx, msg, binary, compression): Promise => { - await data.backPressure - const serialized = serialize(msg, binary) + if (data.backPressure !== undefined) { + await data.backPressure + } + const serialized = handler.serialize(msg, binary) try { const sendR = ws.send(serialized, binary, compression) if (sendR === 2) { diff --git a/server/ws/src/types.ts b/server/ws/src/types.ts index 5e2c4ab496..2d33744bd4 100644 --- a/server/ws/src/types.ts +++ b/server/ws/src/types.ts @@ -104,6 +104,8 @@ export interface ConnectionSocket { close: () => void send: (ctx: MeasureContext, msg: Response, binary: boolean, compression: boolean) => Promise data: () => Record + + readRequest: (buffer: Buffer, binary: boolean) => Request } /** diff --git a/server/ws/src/utils.ts b/server/ws/src/utils.ts index 3dde20a315..b1e66fddd7 100644 --- a/server/ws/src/utils.ts +++ b/server/ws/src/utils.ts @@ -1,5 +1,5 @@ import { toFindResult, type FindResult, type MeasureContext } from '@hcengineering/core' -import { readRequest, type Response } from '@hcengineering/rpc' +import { type Response } from '@hcengineering/rpc' import type { Token } from '@hcengineering/server-token' import type { AddSessionActive, AddSessionResponse, ConnectionSocket, HandleRequestFunction, Session } from './types' @@ -11,17 +11,23 @@ export interface WebsocketData { url: string } -export function doSessionOp (data: WebsocketData, op: (session: AddSessionActive) => void): void { +export function doSessionOp ( + data: WebsocketData, + op: (session: AddSessionActive, msg: Buffer) => void, + msg: Buffer +): void { if (data.session instanceof Promise) { + // We need to copy since we will out of protected buffer area + const msgCopy = Buffer.copyBytesFrom(msg) void data.session.then((_session) => { data.session = _session if ('session' in _session) { - op(_session) + op(_session, msgCopy) } }) } else { if (data.session !== undefined && 'session' in data.session) { - op(data.session) + op(data.session, msg) } } } @@ -34,8 +40,19 @@ export function processRequest ( buff: any, handleRequest: HandleRequestFunction ): void { - const request = readRequest(buff, session.binaryMode) - handleRequest(context, session, cs, request, workspaceId) + const st = Date.now() + try { + const request = cs.readRequest(buff, session.binaryMode) + const ed = Date.now() + context.measure('deserialize', ed - st) + handleRequest(context, session, cs, request, workspaceId) + } catch (err: any) { + if (err.message === 'Data read, but end of buffer not reached 123') { + // ignore it + } else { + throw err + } + } } export async function sendResponse (