Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
Andrey Sobolev 2024-07-10 13:17:00 +07:00 committed by GitHub
parent ded0b286ba
commit 5c4f0c55cb
24 changed files with 443 additions and 250 deletions

View File

@@ -227,7 +227,7 @@ export async function benchmark (
       requestTime = (r?.value ?? 0) / (((r?.operations as number) ?? 0) + 1)
-      const tr = extract(json.metrics as Metrics, '🧲 session', 'client', 'handleRequest', '#send-data')
+      const tr = extract(json.metrics as Metrics, '🧲 session', '#send-data')
       transfer = (tr?.value ?? 0) - oldTransfer
       oldTransfer = tr?.value ?? 0
     })

View File

@@ -148,10 +148,8 @@ class ClientImpl implements AccountClient, BackupClient {
       this.hierarchy.tx(tx)
       await this.model.tx(tx)
     }
     // We need to handle it on server, before performing local live query updates.
-    const result = await this.conn.tx(tx)
-    return result
+    return await this.conn.tx(tx)
   }

   async updateFromRemote (...tx: Tx[]): Promise<void> {

View File

@@ -102,7 +102,7 @@ export function childMetrics (root: Metrics, path: string[]): Metrics {
  * @public
  */
 export function metricsAggregate (m: Metrics, limit: number = -1): Metrics {
-  const ms = aggregateMetrics(m.measurements, limit)
+  let ms = aggregateMetrics(m.measurements, limit)

   // Use child overage, if there is no top level value specified.
   const me = Object.entries(ms)
@@ -127,6 +127,7 @@ export function metricsAggregate (m: Metrics, limit: number = -1): Metrics {
         break
       }
     }
+    ms = newMs
   }
 }

View File

@@ -156,7 +156,7 @@ export function matchQuery<T extends Doc> (
   hierarchy: Hierarchy,
   skipLookup: boolean = false
 ): Doc[] {
-  let result = [...docs]
+  let result = docs
   for (const key in query) {
     if (skipLookup && key.startsWith('$lookup.')) {
       continue

View File

@@ -46,6 +46,7 @@ import { TxOperations } from './operations'
 import { isPredicate } from './predicate'
 import { DocumentQuery, FindResult } from './storage'
 import { DOMAIN_TX } from './tx'
+import { DOMAIN_BENCHMARK } from './benchmark'

 function toHex (value: number, chars: number): string {
   const result = value.toString(16)
@@ -712,7 +713,8 @@ export function isClassIndexable (hierarchy: Hierarchy, c: Ref<Class<Doc>>): boo
     domain === DOMAIN_TX ||
     domain === DOMAIN_MODEL ||
     domain === DOMAIN_BLOB ||
-    domain === DOMAIN_TRANSIENT
+    domain === DOMAIN_TRANSIENT ||
+    domain === DOMAIN_BENCHMARK
   ) {
     hierarchy.setClassifierProp(c, 'class_indexed', false)
     return false

View File

@@ -42,7 +42,8 @@ import core, {
 } from '@hcengineering/core'
 import { PlatformError, UNAUTHORIZED, broadcastEvent, getMetadata, unknownError } from '@hcengineering/platform'
-import { HelloRequest, HelloResponse, ReqId, readResponse, serialize, type Response } from '@hcengineering/rpc'
+import { HelloRequest, HelloResponse, ReqId, type Response } from '@hcengineering/rpc'
+import { RPCHandler } from '@hcengineering/rpc'

 const SECOND = 1000
 const pingTimeout = 10 * SECOND
@@ -91,6 +92,8 @@ class Connection implements ClientConnection {
   private pingResponse: number = Date.now()

+  rpcHandler = new RPCHandler()
+
   constructor (
     private readonly url: string,
     private readonly handler: TxHandler,
@@ -408,11 +411,11 @@ class Connection implements ClientConnection {
     }
     if (event.data instanceof Blob) {
       void event.data.arrayBuffer().then((data) => {
-        const resp = readResponse<any>(data, this.binaryMode)
+        const resp = this.rpcHandler.readResponse<any>(data, this.binaryMode)
         this.handleMsg(socketId, resp)
       })
     } else {
-      const resp = readResponse<any>(event.data, this.binaryMode)
+      const resp = this.rpcHandler.readResponse<any>(event.data, this.binaryMode)
       this.handleMsg(socketId, resp)
     }
   }
@@ -441,7 +444,7 @@ class Connection implements ClientConnection {
       binary: useBinary,
       compression: useCompression
     }
-    this.websocket?.send(serialize(helloRequest, false))
+    this.websocket?.send(this.rpcHandler.serialize(helloRequest, false))
   }

   wsocket.onerror = (event: any) => {
@@ -475,8 +478,9 @@ class Connection implements ClientConnection {
     if (data.once === true) {
       // Check if has same request already then skip
+      const dparams = JSON.stringify(data.params)
       for (const [, v] of this.requests) {
-        if (v.method === data.method && JSON.stringify(v.params) === JSON.stringify(data.params)) {
+        if (v.method === data.method && JSON.stringify(v.params) === dparams) {
           // We have same unanswered, do not add one more.
           return
         }
@@ -495,8 +499,9 @@ class Connection implements ClientConnection {
     const sendData = async (): Promise<void> => {
       if (this.websocket?.readyState === ClientSocketReadyState.OPEN) {
         promise.startTime = Date.now()
         this.websocket?.send(
-          serialize(
+          this.rpcHandler.serialize(
             {
               method: data.method,
               params: data.params,

View File

@@ -13,7 +13,7 @@
 // limitations under the License.
 //

-import {
+import core, {
   DOMAIN_MODEL,
   cutObjectArray,
   type Class,
@@ -56,11 +56,14 @@ export class PresentationClientHook implements ClientHook {
     addTxListener((...tx) => {
       if (this.notifyEnabled) {
+        const rtx = tx.filter((tx) => (tx as any).objectClass !== core.class.BenchmarkDoc)
+        if (rtx.length > 0) {
           console.debug(
             'devmodel# notify=>',
-            testing ? JSON.stringify(cutObjectArray(tx)).slice(0, 160) : tx.length === 1 ? tx[0] : tx
+            testing ? JSON.stringify(cutObjectArray(rtx)).slice(0, 160) : rtx.length === 1 ? rtx[0] : tx
           )
         }
+      }
     })
   }
@@ -152,7 +155,7 @@ export class PresentationClientHook implements ClientHook {
   async tx (client: Client, tx: Tx): Promise<TxResult> {
     const startTime = Date.now()
     const result = await client.tx(tx)
-    if (this.notifyEnabled) {
+    if (this.notifyEnabled && (tx as any).objectClass !== core.class.BenchmarkDoc) {
       console.debug(
         'devmodel# tx=>',
         testing ? JSON.stringify(cutObjectArray(tx)).slice(0, 160) : tx,

View File

@@ -1,7 +1,8 @@
 <script lang="ts">
+  import core, { RateLimiter } from '@hcengineering/core'
   import login from '@hcengineering/login'
   import { getEmbeddedLabel, getMetadata } from '@hcengineering/platform'
-  import presentation, { isAdminUser } from '@hcengineering/presentation'
+  import presentation, { getClient, isAdminUser } from '@hcengineering/presentation'
   import { Button, IconArrowRight, fetchMetadataLocalStorage } from '@hcengineering/ui'
   import EditBox from '@hcengineering/ui/src/components/EditBox.svelte'
@@ -13,6 +14,88 @@
     endpoint = endpoint.substring(0, endpoint.length - 1)
   }
   let warningTimeout = 15
+  let commandsToSend = 1000
+  let commandsToSendParallel = 1
+  let running = false
+  let maxTime = 0
+  let avgTime = 0
+  let rps = 0
+  let active = 0
+  let opss = 0
+
+  let dataSize = 0
+  let responseSize = 0
+
+  function genData (dataSize: number): string {
+    let result = ''
+    const characters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
+    const charactersLength = characters.length
+    for (let i = 0; i < dataSize; i++) {
+      result += characters.charAt(Math.floor(Math.random() * charactersLength))
+    }
+    return result
+  }
+
+  async function doBenchmark (): Promise<void> {
+    avgTime = 0
+    maxTime = 0
+    let count = commandsToSend
+    let ops = 0
+    avgTime = 0
+    opss = 0
+    const int = setInterval(() => {
+      rps = ops
+      ops = 0
+    }, 1000)
+    const rate = new RateLimiter(commandsToSendParallel)
+    const doOp = async () => {
+      const st = Date.now()
+      active++
+      await getClient().createDoc(core.class.BenchmarkDoc, core.space.Configuration, {
+        source: genData(dataSize),
+        request: {
+          documents: 1,
+          size: responseSize
+        }
+      })
+      active--
+      const ed = Date.now()
+      if (ed - st > maxTime) {
+        maxTime = ed - st
+      }
+      if (avgTime !== 0) {
+        avgTime += ed - st
+      } else {
+        avgTime = ed - st
+      }
+      ops++
+      opss++
+      count--
+    }
+    // eslint-disable-next-line no-unmodified-loop-condition
+    while (count > 0 && running) {
+      if (commandsToSendParallel > 0) {
+        await rate.add(async () => {
+          await doOp()
+        })
+      } else {
+        await doOp()
+      }
+    }
+    await rate.waitProcessing()
+    running = false
+    clearInterval(int)
+  }
 </script>

 {#if isAdminUser()}
@@ -34,6 +117,33 @@
     </div>
   </div>
   </div>
+  <div class="flex-col p-1">
+    <div class="flex-row-center p-1">
+      Command benchmark {avgTime / opss}
+      {maxTime} - {active}
+    </div>
+    <div class="flex-row-center p-1">
+      <div class="flex-row-center p-1">
+        <EditBox kind={'underline'} format={'number'} bind:value={commandsToSend} /> total
+      </div>
+      <div class="flex-row-center p-1">
+        <EditBox kind={'underline'} format={'number'} bind:value={commandsToSendParallel} /> parallel
+      </div>
+      <div class="flex-row-center p-1">
+        <EditBox kind={'underline'} format={'number'} bind:value={dataSize} /> dsize
+      </div>
+      <div class="flex-row-center p-1">
+        <EditBox kind={'underline'} format={'number'} bind:value={responseSize} /> rsize
+      </div>
+      <Button
+        label={getEmbeddedLabel('Benchmark')}
+        on:click={() => {
+          running = !running
+          void doBenchmark()
+        }}
+      />
+    </div>
+  </div>
   <div class="flex-row-center p-1">
     <div class="p-3">2.</div>

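Note on the pattern used above: the component drives load through RateLimiter from @hcengineering/core, awaiting rate.add(...) for every operation and rate.waitProcessing() at the end. As a rough, hedged illustration of that pattern only (assuming add schedules a task while respecting the concurrency cap and waitProcessing resolves once all scheduled tasks settle), a bounded runner could look like this:

import { RateLimiter } from '@hcengineering/core'

// Illustrative sketch only: run `total` async operations with at most
// `parallel` of them in flight at any time.
async function runBounded (total: number, parallel: number, op: () => Promise<void>): Promise<void> {
  const limiter = new RateLimiter(parallel)
  for (let i = 0; i < total; i++) {
    await limiter.add(async () => {
      await op()
    })
  }
  await limiter.waitProcessing()
}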
View File

@@ -628,8 +628,8 @@ function guessReferenceTx (hierarchy: Hierarchy, tx: TxCUD<Doc>): TxCUD<Doc> {
   return tx
 }

-async function ActivityReferenceCreate (tx: TxCUD<Doc>, control: TriggerControl): Promise<Tx[]> {
-  const ctx = TxProcessor.extractTx(tx) as TxCreateDoc<Doc>
+async function ActivityReferenceCreate (tx: TxCUD<Doc>, etx: TxCUD<Doc>, control: TriggerControl): Promise<Tx[]> {
+  const ctx = etx as TxCreateDoc<Doc>
   if (ctx._class !== core.class.TxCreateDoc) return []
   if (control.hierarchy.isDerived(ctx.objectClass, notification.class.InboxNotification)) return []
@@ -658,8 +658,8 @@ async function ActivityReferenceCreate (tx: TxCUD<Doc>, control: TriggerControl)
   return []
 }

-async function ActivityReferenceUpdate (tx: TxCUD<Doc>, control: TriggerControl): Promise<Tx[]> {
-  const ctx = TxProcessor.extractTx(tx) as TxUpdateDoc<Doc>
+async function ActivityReferenceUpdate (tx: TxCUD<Doc>, etx: TxCUD<Doc>, control: TriggerControl): Promise<Tx[]> {
+  const ctx = etx as TxUpdateDoc<Doc>
   const attributes = control.hierarchy.getAllAttributes(ctx.objectClass)
   let hasUpdates = false
@@ -705,8 +705,8 @@ async function ActivityReferenceUpdate (tx: TxCUD<Doc>, control: TriggerControl)
   return []
 }

-async function ActivityReferenceRemove (tx: Tx, control: TriggerControl): Promise<Tx[]> {
-  const ctx = TxProcessor.extractTx(tx) as TxRemoveDoc<Doc>
+async function ActivityReferenceRemove (tx: Tx, etx: TxCUD<Doc>, control: TriggerControl): Promise<Tx[]> {
+  const ctx = etx as TxRemoveDoc<Doc>
   const attributes = control.hierarchy.getAllAttributes(ctx.objectClass)
   let hasMarkdown = false
@@ -741,13 +741,13 @@ export async function ReferenceTrigger (tx: TxCUD<Doc>, control: TriggerControl)
   if (control.hierarchy.isDerived(etx.objectClass, notification.class.InboxNotification)) return []

   if (etx._class === core.class.TxCreateDoc) {
-    result.push(...(await ActivityReferenceCreate(tx, control)))
+    result.push(...(await ActivityReferenceCreate(tx, etx, control)))
   }
   if (etx._class === core.class.TxUpdateDoc) {
-    result.push(...(await ActivityReferenceUpdate(tx, control)))
+    result.push(...(await ActivityReferenceUpdate(tx, etx, control)))
   }
   if (etx._class === core.class.TxRemoveDoc) {
-    result.push(...(await ActivityReferenceRemove(tx, control)))
+    result.push(...(await ActivityReferenceRemove(tx, etx, control)))
   }
   return result
 }

View File

@@ -16,6 +16,7 @@
 import core, {
   generateId,
   toFindResult,
+  TxProcessor,
   type BenchmarkDoc,
   type Class,
   type Doc,
@@ -27,6 +28,9 @@ import core, {
   type ModelDb,
   type Ref,
   type Space,
+  type Tx,
+  type TxCreateDoc,
+  type TxResult,
   type WorkspaceId
 } from '@hcengineering/core'
 import type { DbAdapter } from '../adapter'
@@ -85,6 +89,30 @@ class BenchmarkDbAdapter extends DummyDbAdapter {
     return toFindResult<T>(result as T[])
   }
+
+  async tx (ctx: MeasureContext, ...tx: Tx[]): Promise<TxResult[]> {
+    if (benchData === '') {
+      benchData = genData(1024 * 1024)
+    }
+    for (const t of tx) {
+      if (t._class === core.class.TxCreateDoc) {
+        const doc = TxProcessor.createDoc2Doc(t as TxCreateDoc<BenchmarkDoc>)
+        const request = doc.request
+        if (request?.size != null) {
+          const dataSize =
+            typeof request.size === 'number' ? request.size : request.size.from + Math.random() * request.size.to
+          return [
+            {
+              response: benchData.slice(0, dataSize)
+            }
+          ]
+        }
+      }
+    }
+    return [{}]
+  }
 }

 /**
  * @public

View File

@@ -128,7 +128,6 @@ export class FullTextIndex implements WithFind {
     await ctx.with('queue', {}, async (ctx) => {
       await this.indexer.queue(ctx, stDocs)
     })
-    this.indexer.triggerIndexing()
     return {}
   }

View File

@@ -9,7 +9,7 @@ import type {
   ModelDb,
   WorkspaceId
 } from '@hcengineering/core'
-import core, { DOMAIN_MODEL, IndexKind, IndexOrder } from '@hcengineering/core'
+import core, { DOMAIN_BENCHMARK, DOMAIN_MODEL, IndexKind, IndexOrder } from '@hcengineering/core'
 import { deepEqual } from 'fast-equals'
 import type { DomainHelper, DomainHelperOperations } from '../adapter'
@@ -38,7 +38,7 @@ export class DomainIndexHelperImpl implements DomainHelper {
     for (const c of classes) {
       try {
         const domain = hierarchy.findDomain(c._id)
-        if (domain === undefined || domain === DOMAIN_MODEL) {
+        if (domain === undefined || domain === DOMAIN_MODEL || domain === DOMAIN_BENCHMARK) {
           continue
         }
         const attrs = hierarchy.getAllAttributes(c._id)
@@ -103,7 +103,7 @@ export class DomainIndexHelperImpl implements DomainHelper {
       try {
         const has50Documents = await operations.hasDocuments(domain, 50)
         const allIndexes = (await operations.listIndexes(domain)).filter((it) => it.name !== '_id_')
-        console.log('check indexes', domain, has50Documents)
+        ctx.info('check indexes', { domain, has50Documents })
         if (has50Documents) {
           for (const vv of [...(domainInfo?.values() ?? []), ...(cfg?.indexes ?? [])]) {
             try {

View File

@@ -174,7 +174,7 @@ export class TServerStorage implements ServerStorage {
     await this.fulltext.close()
     for (const [domain, info] of this.domainInfo.entries()) {
       if (info.checkPromise !== undefined) {
-        console.log('wait for check domain', domain)
+        this.metrics.info('wait for check domain', { domain })
        // We need to be sure we wait for check to be complete
        await info.checkPromise
      }
@@ -218,21 +218,22 @@
   }

   private async routeTx (ctx: MeasureContext, removedDocs: Map<Ref<Doc>, Doc>, ...txes: Tx[]): Promise<TxResult[]> {
-    let part: TxCUD<Doc>[] = []
-    let lastDomain: Domain | undefined
     const result: TxResult[] = []
-    const processPart = async (): Promise<void> => {
-      if (part.length > 0) {
+
+    const domainGroups = new Map<Domain, TxCUD<Doc>[]>()
+
+    const processPart = async (domain: Domain, txes: TxCUD<Doc>[]): Promise<void> => {
+      if (txes.length > 0) {
         // Find all deleted documents
-        const adapter = this.getAdapter(lastDomain as Domain, true)
-        const toDelete = part.filter((it) => it._class === core.class.TxRemoveDoc).map((it) => it.objectId)
+        const adapter = this.getAdapter(domain, true)
+        const toDelete = txes.filter((it) => it._class === core.class.TxRemoveDoc).map((it) => it.objectId)
         if (toDelete.length > 0) {
           const toDeleteDocs = await ctx.with(
             'adapter-load',
-            { domain: lastDomain },
-            async () => await adapter.load(ctx, lastDomain as Domain, toDelete)
+            { domain },
+            async () => await adapter.load(ctx, domain, toDelete)
           )

           for (const ddoc of toDeleteDocs) {
@@ -240,18 +241,18 @@
           }
         }

-        const r = await ctx.with('adapter-tx', { domain: lastDomain }, async (ctx) => await adapter.tx(ctx, ...part))
+        const r = await ctx.with('adapter-tx', { domain }, async (ctx) => await adapter.tx(ctx, ...txes))

         // Update server live queries.
-        await this.liveQuery.tx(...part)
+        await this.liveQuery.tx(...txes)
         if (Array.isArray(r)) {
           result.push(...r)
         } else {
           result.push(r)
         }
-        part = []
       }
     }

     for (const tx of txes) {
       const txCUD = TxProcessor.extractTx(tx) as TxCUD<Doc>
       if (!this.hierarchy.isDerived(txCUD._class, core.class.TxCUD)) {
@@ -260,18 +261,17 @@
         continue
       }
       const domain = this.hierarchy.getDomain(txCUD.objectClass)
-      if (part.length > 0) {
-        if (lastDomain !== domain) {
-          await processPart()
-        }
-        lastDomain = domain
-        part.push(txCUD)
-      } else {
-        lastDomain = domain
-        part.push(txCUD)
-      }
+      let group = domainGroups.get(domain)
+      if (group === undefined) {
+        group = []
+        domainGroups.set(domain, group)
+      }
+      group.push(txCUD)
     }
-    await processPart()
+    for (const [domain, txes] of domainGroups.entries()) {
+      await processPart(domain, txes)
+    }
     return result
   }
@@ -664,7 +664,6 @@
         classes.add((etx as TxCUD<Doc>).objectClass)
       }
     }
-    console.log('Broadcasting compact bulk', derived.length)
     const bevent = createBroadcastEvent(Array.from(classes))
     this.options.broadcast([bevent], target, exclude)
   }
@@ -678,7 +677,6 @@
     await sendWithPart(derived, target, exclude)
   } else {
     // Let's send after our response will go out
-    console.log('Broadcasting', derived.length, derived.length)
     this.options.broadcast(derived, target, exclude)
   }
 }
@@ -879,7 +877,11 @@
     if (!this.hierarchy.isDerived(tx._class, core.class.TxApplyIf)) {
       if (tx.space !== core.space.DerivedTx) {
         if (this.hierarchy.isDerived(tx._class, core.class.TxCUD)) {
-          if (this.hierarchy.findDomain((tx as TxCUD<Doc>).objectClass) !== DOMAIN_TRANSIENT) {
+          const objectClass = (tx as TxCUD<Doc>).objectClass
+          if (
+            objectClass !== core.class.BenchmarkDoc &&
+            this.hierarchy.findDomain(objectClass) !== DOMAIN_TRANSIENT
+          ) {
             txToStore.push(tx)
           }
         } else {
@@ -944,10 +946,17 @@
       await this.triggers.tx(tx)
       await this.modelDb.tx(tx)
     }
-    await ctx.with('domain-tx', {}, async (ctx) => await this.getAdapter(DOMAIN_TX, true).tx(ctx.ctx, ...txToStore), {
+    if (txToStore.length > 0) {
+      await ctx.with(
+        'domain-tx',
+        {},
+        async (ctx) => await this.getAdapter(DOMAIN_TX, true).tx(ctx.ctx, ...txToStore),
+        {
           count: txToStore.length
-    })
-    result.push(...(await ctx.with('apply', {}, (ctx) => this.routeTx(ctx.ctx, removedMap, ...txToProcess))), {
+        }
+      )
+    }
+    result.push(...(await ctx.with('routeTx', {}, (ctx) => this.routeTx(ctx.ctx, removedMap, ...txToProcess))), {
       count: txToProcess.length
     })
@@ -955,14 +964,17 @@
     derived.push(...(await this.processDerived(ctx, txToProcess, _findAll, removedMap)))

     // index object
+    const ftx = [...txToProcess, ...derived]
+    if (ftx.length > 0) {
       await ctx.with(
         'fulltext-tx',
         {},
         async (ctx) => {
-          await this.fulltext.tx(ctx.ctx, [...txToProcess, ...derived])
+          await this.fulltext.tx(ctx.ctx, ftx)
         },
         { count: txToProcess.length + derived.length }
       )
+    }
   } catch (err: any) {
     ctx.ctx.error('error process tx', { error: err })
     Analytics.handleError(err)

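The routeTx change above replaces the order-sensitive part/lastDomain batching with a single pass that buckets TxCUD transactions by domain and then flushes one batch per domain, so interleaved transactions for the same domain now reach the adapter in one call instead of several. A minimal, self-contained sketch of just that grouping step (plain TypeScript, not the server code itself):

// Group items by a derived key so each group can be processed as one batch.
function groupByKey<K, T> (items: T[], keyOf: (it: T) => K): Map<K, T[]> {
  const groups = new Map<K, T[]>()
  for (const it of items) {
    const key = keyOf(it)
    let group = groups.get(key)
    if (group === undefined) {
      group = []
      groups.set(key, group)
    }
    group.push(it)
  }
  return groups
}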
View File

@@ -107,7 +107,7 @@ export class Triggers {
       ...ctrl,
       operationContext: ctx,
       ctx: ctx.ctx,
-      txFactory: new TxFactory(core.account.System, true),
+      txFactory: null as any, // Will be set later
       findAll: async (clazz, query, options) => await ctrl.findAllCtx(ctx.ctx, clazz, query, options),
       apply: async (tx, needResult) => {
         apply.push(...tx)
@@ -195,9 +195,9 @@
         return
       }
       if (typeof q[key] === 'string') {
-        const descendants = this.hierarchy.getDescendants(q[key])
+        const descendants = this.hierarchy.getDescendants(q[key] as Ref<Class<Doc>>)
         q[key] = {
-          $in: [...(q[key].$in ?? []), ...descendants]
+          $in: [q[key] as Ref<Class<Doc>>, ...descendants]
         }
       } else {
         if (Array.isArray(q[key].$in)) {

View File

@@ -16,8 +16,6 @@
 import platform, { PlatformError, Severity, Status } from '@hcengineering/platform'
 import { Packr } from 'msgpackr'

-const packr = new Packr({ structuredClone: true, bundleStrings: true, copyBuffers: true })
-
 /**
  * @public
  */
@@ -49,79 +47,6 @@ export interface HelloResponse extends Response<any> {
   reconnect?: boolean
 }

-/**
- * Response object define a server response on transaction request.
- * Also used to inform other clients about operations being performed by server.
- *
- * @public
- */
-export interface Response<R> {
-  result?: R
-  id?: ReqId
-  error?: Status
-  chunk?: {
-    index: number
-    final: boolean
-  }
-  time?: number // Server time to perform operation
-  bfst?: number // Server time to perform operation
-  queue?: number
-}
-
-/**
- * @public
- * @param object -
- * @returns
- */
-export function protoSerialize (object: object, binary: boolean): any {
-  if (!binary) {
-    return JSON.stringify(object, replacer)
-  }
-  return new Uint8Array(packr.pack(object))
-}
-
-/**
- * @public
- * @param data -
- * @returns
- */
-export function protoDeserialize (data: any, binary: boolean): any {
-  if (!binary) {
-    let _data = data
-    if (_data instanceof ArrayBuffer) {
-      const decoder = new TextDecoder()
-      _data = decoder.decode(_data)
-    }
-    return JSON.parse(_data.toString(), receiver)
-  }
-  return packr.unpack(new Uint8Array(data))
-}
-
-/**
- * @public
- * @param object -
- * @returns
- */
-export function serialize (object: Request<any> | Response<any>, binary: boolean): any {
-  if ((object as any).result !== undefined) {
-    ;(object as any).result = replacer('result', (object as any).result)
-  }
-  return protoSerialize(object, binary)
-}
-
-/**
- * @public
- * @param response -
- * @returns
- */
-export function readResponse<D> (response: any, binary: boolean): Response<D> {
-  const data = protoDeserialize(response, binary)
-  if (data.result !== undefined) {
-    data.result = receiver('result', data.result)
-  }
-  return data
-}
-
 function replacer (key: string, value: any): any {
   if (Array.isArray(value) && ((value as any).total !== undefined || (value as any).lookupMap !== undefined)) {
     return {
@@ -145,16 +70,82 @@ function receiver (key: string, value: any): any {
 }

 /**
+ * Response object define a server response on transaction request.
+ * Also used to inform other clients about operations being performed by server.
+ *
+ * @public
+ */
+export interface Response<R> {
+  result?: R
+  id?: ReqId
+  error?: Status
+  chunk?: {
+    index: number
+    final: boolean
+  }
+  time?: number // Server time to perform operation
+  bfst?: number // Server time to perform operation
+  queue?: number
+}
+
+export class RPCHandler {
+  packr = new Packr({ structuredClone: true, bundleStrings: true, copyBuffers: true })
+
+  protoSerialize (object: object, binary: boolean): any {
+    if (!binary) {
+      return JSON.stringify(object, replacer)
+    }
+    return new Uint8Array(this.packr.pack(object))
+  }
+
+  protoDeserialize (data: any, binary: boolean): any {
+    if (!binary) {
+      let _data = data
+      if (_data instanceof ArrayBuffer) {
+        const decoder = new TextDecoder()
+        _data = decoder.decode(_data)
+      }
+      return JSON.parse(_data.toString(), receiver)
+    }
+    return this.packr.unpack(new Uint8Array(data))
+  }
+
+  /**
+   * @public
+   * @param object -
+   * @returns
+   */
+  serialize (object: Request<any> | Response<any>, binary: boolean): any {
+    if ((object as any).result !== undefined) {
+      ;(object as any).result = replacer('result', (object as any).result)
+    }
+    return this.protoSerialize(object, binary)
+  }
+
+  /**
+   * @public
+   * @param response -
+   * @returns
+   */
+  readResponse<D>(response: any, binary: boolean): Response<D> {
+    const data = this.protoDeserialize(response, binary)
+    if (data.result !== undefined) {
+      data.result = receiver('result', data.result)
+    }
+    return data
+  }
+
 /**
  * @public
  * @param request -
  * @returns
  */
-export function readRequest<P extends any[]> (request: any, binary: boolean): Request<P> {
-  const result: Request<P> = protoDeserialize(request, binary)
+  readRequest<P extends any[]>(request: any, binary: boolean): Request<P> {
+    const result: Request<P> = this.protoDeserialize(request, binary)
   if (typeof result.method !== 'string') {
     throw new PlatformError(new Status(Severity.ERROR, platform.status.BadRequest, {}))
   }
   return result
+  }
 }

 /**

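With this change the module-level serialize/readResponse/readRequest helpers become methods of RPCHandler, so each connection can own its own Packr instance instead of sharing one module-wide. A hedged usage sketch, based only on the signatures visible in this diff:

import { RPCHandler, type Response } from '@hcengineering/rpc'

const handler = new RPCHandler()

// Encode a response; `false` selects the JSON text mode, `true` the msgpackr binary mode.
const encoded = handler.serialize({ id: 1, result: 'pong' }, false)

// The receiving side decodes with its own handler instance and the same mode flag.
const resp: Response<string> = handler.readResponse<string>(encoded, false)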
View File

@@ -18,7 +18,7 @@ export interface ServerEnv {
 export function serverConfigFromEnv (): ServerEnv {
   const serverPort = parseInt(process.env.SERVER_PORT ?? '3333')

-  const enableCompression = (process.env.ENABLE_COMPRESSION ?? 'true') === 'true'
+  const enableCompression = (process.env.ENABLE_COMPRESSION ?? 'false') === 'true'

   const url = process.env.MONGO_URL
   if (url === undefined) {

View File

@@ -18,6 +18,7 @@ import core, {
   BackupClient,
   Branding,
   Client as CoreClient,
+  DOMAIN_BENCHMARK,
   DOMAIN_MIGRATION,
   DOMAIN_MODEL,
   DOMAIN_TRANSIENT,
@@ -480,7 +481,7 @@ async function createUpdateIndexes (
   let completed = 0
   const allDomains = connection.getHierarchy().domains()
   for (const domain of allDomains) {
-    if (domain === DOMAIN_MODEL || domain === DOMAIN_TRANSIENT) {
+    if (domain === DOMAIN_MODEL || domain === DOMAIN_TRANSIENT || domain === DOMAIN_BENCHMARK) {
       continue
     }
     const result = await domainHelper.checkDomain(ctx, domain, false, dbHelper)

View File

@@ -15,7 +15,7 @@
 //
 import { UNAUTHORIZED } from '@hcengineering/platform'
-import { readResponse, serialize } from '@hcengineering/rpc'
+import { RPCHandler } from '@hcengineering/rpc'
 import { generateToken } from '@hcengineering/server-token'
 import WebSocket from 'ws'
 import { start } from '../server'
@@ -45,6 +45,7 @@ import { startHttpServer } from '../server_http'
 import { genMinModel } from './minmodel'

 describe('server', () => {
+  const handler = new RPCHandler()
   async function getModelDb (): Promise<ModelDb> {
     const txes = genMinModel()
     const hierarchy = new Hierarchy()
@@ -116,7 +117,7 @@ describe('server', () => {
       conn.close(1000)
     })
     conn.on('message', (msg: string) => {
-      const resp = readResponse(msg, false)
+      const resp = handler.readResponse(msg, false)
       expect(resp.result === 'hello')
       expect(resp.error?.code).toBe(UNAUTHORIZED.code)
       conn.close(1000)
@@ -132,12 +133,12 @@ describe('server', () => {
     // const start = Date.now()
     conn.on('open', () => {
       for (let i = 0; i < total; i++) {
-        conn.send(serialize({ method: 'tx', params: [], id: i }, false))
+        conn.send(handler.serialize({ method: 'tx', params: [], id: i }, false))
       }
     })
     let received = 0
     conn.on('message', (msg: string) => {
-      readResponse(msg, false)
+      handler.readResponse(msg, false)
       if (++received === total) {
         // console.log('resp:', resp, ' Time: ', Date.now() - start)
         conn.close(1000)
@@ -199,8 +200,8 @@ describe('server', () => {
       timeoutPromise,
       new Promise((resolve) => {
         newConn.on('open', () => {
-          newConn.send(serialize({ method: 'hello', params: [], id: -1 }, false))
-          newConn.send(serialize({ method: 'findAll', params: [], id: -1 }, false))
+          newConn.send(handler.serialize({ method: 'hello', params: [], id: -1 }, false))
+          newConn.send(handler.serialize({ method: 'findAll', params: [], id: -1 }, false))
           resolve(null)
         })
       })
@@ -216,13 +217,13 @@ describe('server', () => {
     newConn.on('message', (msg: Buffer) => {
       try {
         console.log('resp:', msg.toString())
-        const parsedMsg = readResponse(msg.toString(), false) // Hello
+        const parsedMsg = handler.readResponse(msg.toString(), false) // Hello
         if (!helloReceived) {
           expect(parsedMsg.result === 'hello')
           helloReceived = true
           return
         }
-        responseMsg = readResponse(msg.toString(), false) // our message
+        responseMsg = handler.readResponse(msg.toString(), false) // our message
         resolve(null)
       } catch (err: any) {
         console.error(err)

View File

@@ -37,11 +37,13 @@ import core, {
 import { SessionContextImpl, createBroadcastEvent, type Pipeline } from '@hcengineering/server-core'
 import { type Token } from '@hcengineering/server-token'
 import { type ClientSessionCtx, type Session, type SessionRequest, type StatisticsElement } from './types'
+import { RPCHandler } from '@hcengineering/rpc'

 /**
  * @public
  */
 export class ClientSession implements Session {
+  handler = new RPCHandler()
   createTime = Date.now()
   requests = new Map<string, SessionRequest>()
   binaryMode: boolean = false
@@ -256,7 +258,6 @@ export class ClientSession implements Session {
       await this.sendWithPart(derived, ctx, target, exclude)
     } else {
       // Let's send after our response will go out
-      console.log('Broadcasting', derived.length, derived.length)
       await ctx.send(derived, target, exclude)
     }
   }
@@ -315,7 +316,6 @@ export class ClientSession implements Session {
         classes.add((etx as TxCUD<Doc>).objectClass)
       }
     }
-    console.log('Broadcasting compact bulk', derived.length)
     const bevent = createBroadcastEvent(Array.from(classes))
     await ctx.send([bevent], target, exclude)
   }

View File

@@ -502,12 +502,6 @@ class TSessionManager implements SessionManager {
     if (workspace?.upgrade ?? false) {
       return
     }
-    if (LOGGING_ENABLED) {
-      this.ctx.info('server broadcasting to clients...', {
-        workspace: workspaceId.name,
-        count: workspace.sessions.size
-      })
-    }

     const sessions = [...workspace.sessions.values()]
     const ctx = this.ctx.newChild('📭 broadcast', {})
@@ -824,7 +818,7 @@ class TSessionManager implements SessionManager {
     void userCtx.with(`🧭 ${backupMode ? 'handleBackup' : 'handleRequest'}`, {}, async (ctx) => {
       if (request.time != null) {
         const delta = Date.now() - request.time
-        userCtx.measure('receive msg', delta)
+        requestCtx.measure('msg-receive-delta', delta)
       }
       const wsRef = this.workspaces.get(workspace)
       if (wsRef === undefined) {
@@ -843,7 +837,7 @@ class TSessionManager implements SessionManager {
       let done = false
       if (wsRef.upgrade) {
         done = true
-        console.log('FORCE CLOSE', workspace)
+        this.ctx.warn('FORCE CLOSE', { workspace })
         // In case of upgrade, we need to force close workspace not in interval handler
         await this.forceClose(workspace, ws)
       }
@@ -882,12 +876,12 @@ class TSessionManager implements SessionManager {
         binary: service.binaryMode,
         reconnect
       }
-      await ws.send(ctx, helloResponse, false, false)
+      await ws.send(requestCtx, helloResponse, false, false)
       return
     }

     const opContext = (ctx: MeasureContext): ClientSessionCtx => ({
       sendResponse: async (msg) => {
-        await sendResponse(ctx, service, ws, {
+        await sendResponse(requestCtx, service, ws, {
           id: request.id,
           result: msg,
           time: Date.now() - st,

View File

@@ -16,7 +16,7 @@
 import { Analytics } from '@hcengineering/analytics'
 import { generateId, toWorkspaceString, type MeasureContext } from '@hcengineering/core'
 import { UNAUTHORIZED } from '@hcengineering/platform'
-import { serialize, type Response } from '@hcengineering/rpc'
+import { RPCHandler, type Response } from '@hcengineering/rpc'
 import { decodeToken, type Token } from '@hcengineering/server-token'
 import compression from 'compression'
 import cors from 'cors'
@@ -109,7 +109,7 @@ export function startHttpServer (
       res.end(json)
     } catch (err: any) {
       Analytics.handleError(err)
-      console.error(err)
+      ctx.error('error', { err })
       res.writeHead(404, {})
       res.end()
     }
@@ -158,7 +158,7 @@ export function startHttpServer (
       res.end()
     } catch (err: any) {
       Analytics.handleError(err)
-      console.error(err)
+      ctx.error('error', { err })
       res.writeHead(404, {})
       res.end()
     }
@@ -313,16 +313,21 @@ export function startHttpServer (
     // eslint-disable-next-line @typescript-eslint/no-misused-promises
     ws.on('message', (msg: RawData) => {
       try {
-        let buff: any | undefined
+        let buff: Buffer | undefined
         if (msg instanceof Buffer) {
-          buff = Buffer.copyBytesFrom(msg)
+          buff = msg
         } else if (Array.isArray(msg)) {
-          buff = Buffer.copyBytesFrom(Buffer.concat(msg))
+          buff = Buffer.concat(msg)
         }
         if (buff !== undefined) {
-          doSessionOp(webSocketData, (s) => {
-            processRequest(s.session, cs, s.context, s.workspaceId, buff, handleRequest)
-          })
+          doSessionOp(
+            webSocketData,
+            (s, buff) => {
+              s.context.measure('receive-data', buff?.length ?? 0)
+              processRequest(s.session, cs, s.context, s.workspaceId, buff, handleRequest)
+            },
+            buff
+          )
         }
       } catch (err: any) {
         Analytics.handleError(err)
@@ -333,18 +338,26 @@ export function startHttpServer (
     })
     // eslint-disable-next-line @typescript-eslint/no-misused-promises
     ws.on('close', async (code: number, reason: Buffer) => {
-      doSessionOp(webSocketData, (s) => {
+      doSessionOp(
+        webSocketData,
+        (s) => {
           if (!(s.session.workspaceClosed ?? false)) {
             // remove session after 1seconds, give a time to reconnect.
             void sessions.close(ctx, cs, toWorkspaceString(token.workspace))
           }
-      })
+        },
+        Buffer.from('')
+      )
     })
     ws.on('error', (err) => {
-      doSessionOp(webSocketData, (s) => {
-        console.error(s.session.getUser(), 'error', err)
-      })
+      doSessionOp(
+        webSocketData,
+        (s) => {
+          ctx.error('error', { err, user: s.session.getUser() })
+        },
+        Buffer.from('')
+      )
     })
   }
   wss.on('connection', handleConnection as any)
@@ -372,18 +385,19 @@ export function startHttpServer (
     if (LOGGING_ENABLED) {
       ctx.error('invalid token', err)
     }
+    const handler = new RPCHandler()
     wss.handleUpgrade(request, socket, head, (ws) => {
       const resp: Response<any> = {
         id: -1,
         error: UNAUTHORIZED,
        result: 'hello'
      }
-      ws.send(serialize(resp, false), { binary: false })
+      ws.send(handler.serialize(resp, false), { binary: false })
      ws.onmessage = (msg) => {
        const resp: Response<any> = {
          error: UNAUTHORIZED
        }
-        ws.send(serialize(resp, false), { binary: false })
+        ws.send(handler.serialize(resp, false), { binary: false })
      }
    })
  }
@@ -419,6 +433,7 @@ function createWebsocketClientSocket (
   ws: WebSocket,
   data: { remoteAddress: string, userAgent: string, language: string, email: string, mode: any, model: any }
 ): ConnectionSocket {
+  const handler = new RPCHandler()
   const cs: ConnectionSocket = {
     id: generateId(),
     isClosed: false,
@@ -426,34 +441,31 @@ function createWebsocketClientSocket (
       cs.isClosed = true
      ws.close()
    },
+    readRequest: (buffer: Buffer, binary: boolean) => {
+      return handler.readRequest(buffer, binary)
+    },
    data: () => data,
    send: async (ctx: MeasureContext, msg, binary, compression) => {
-      const smsg = serialize(msg, binary)
+      const sst = Date.now()
+      const smsg = handler.serialize(msg, binary)
+      ctx.measure('serialize', Date.now() - sst)
      ctx.measure('send-data', smsg.length)
-      await new Promise<void>((resolve, reject) => {
-        const doSend = (): void => {
+      const st = Date.now()
      if (ws.readyState !== ws.OPEN || cs.isClosed) {
        return
      }
      if (ws.bufferedAmount > 16 * 1024) {
-        setTimeout(doSend)
-        return
+        ctx.measure('send-bufferAmmount', 1)
      }
      ws.send(smsg, { binary: true, compress: compression }, (err) => {
        if (err != null) {
          if (!`${err.message}`.includes('WebSocket is not open')) {
            ctx.error('send error', { err })
            Analytics.handleError(err)
-            reject(err)
          }
-          // In case socket not open, just resolve
-          resolve()
        }
-        resolve()
+        ctx.measure('msg-send-delta', Date.now() - st)
      })
-        }
-        doSend()
-      })
      return smsg.length
    }

View File

@@ -20,7 +20,7 @@ import { generateId, toWorkspaceString, type MeasureContext } from '@hcengineeri
 import { decodeToken } from '@hcengineering/server-token'

 import { Analytics } from '@hcengineering/analytics'
-import { serialize } from '@hcengineering/rpc'
+import { RPCHandler } from '@hcengineering/rpc'
 import { getStatistics, wipeStatistics } from './stats'
 import {
   LOGGING_ENABLED,
@@ -157,9 +157,11 @@ export function startUWebsocketServer (
     },
     message: (ws, message, isBinary) => {
       const data = ws.getUserData()
-      const msg = Buffer.copyBytesFrom(Buffer.from(message))
+      const msg = Buffer.from(message)

-      doSessionOp(data, (s) => {
+      doSessionOp(
+        data,
+        (s, msg) => {
           processRequest(
             s.session,
             data.connectionSocket as ConnectionSocket,
@@ -168,12 +170,15 @@ export function startUWebsocketServer (
             msg,
             handleRequest
           )
-      })
+        },
+        msg
+      )
     },
     drain: (ws) => {
       const data = ws.getUserData()
       while (data.unsendMsg.length > 0) {
-        if (ws.send(data.unsendMsg[0].data, data.unsendMsg[0].binary, data.unsendMsg[0].compression) !== 1) {
+        const sendResult = ws.send(data.unsendMsg[0].data, data.unsendMsg[0].binary, data.unsendMsg[0].compression)
+        if (sendResult !== 2) {
           ctx.measure('send-data', data.unsendMsg[0].data.length)
           data.unsendMsg.shift()
@@ -185,10 +190,14 @@ export function startUWebsocketServer (
           return
         }
       }
+      data.backPressureResolve?.()
+      data.backPressure = undefined
     },
     close: (ws, code, message) => {
       const data = ws.getUserData()
-      doSessionOp(data, (s) => {
+      doSessionOp(
+        data,
+        (s) => {
           if (!(s.session.workspaceClosed ?? false)) {
             // remove session after 1seconds, give a time to reconnect.
             void sessions.close(
@@ -197,7 +206,9 @@ export function startUWebsocketServer (
               toWorkspaceString(data.payload.workspace)
             )
           }
-      })
+        },
+        Buffer.from('')
+      )
     }
   })
   .any('/api/v1/statistics', (response, request) => {
@@ -363,6 +374,7 @@ function createWebSocketClientSocket (
   ws: uWebSockets.WebSocket<WebsocketUserData>,
   data: WebsocketUserData
 ): ConnectionSocket {
+  const handler = new RPCHandler()
   const cs: ConnectionSocket = {
     id: generateId(),
     isClosed: false,
@@ -375,9 +387,14 @@ function createWebSocketClientSocket (
         // Ignore closed
       }
     },
+    readRequest: (buffer: Buffer, binary: boolean) => {
+      return handler.readRequest(buffer, binary)
+    },
     send: async (ctx, msg, binary, compression): Promise<number> => {
+      if (data.backPressure !== undefined) {
         await data.backPressure
-      const serialized = serialize(msg, binary)
+      }
+      const serialized = handler.serialize(msg, binary)
       try {
         const sendR = ws.send(serialized, binary, compression)
         if (sendR === 2) {

View File

@@ -104,6 +104,8 @@ export interface ConnectionSocket {
   close: () => void
   send: (ctx: MeasureContext, msg: Response<any>, binary: boolean, compression: boolean) => Promise<number>
   data: () => Record<string, any>
+
+  readRequest: (buffer: Buffer, binary: boolean) => Request<any>
 }

 /**

View File

@@ -1,5 +1,5 @@
 import { toFindResult, type FindResult, type MeasureContext } from '@hcengineering/core'
-import { readRequest, type Response } from '@hcengineering/rpc'
+import { type Response } from '@hcengineering/rpc'
 import type { Token } from '@hcengineering/server-token'
 import type { AddSessionActive, AddSessionResponse, ConnectionSocket, HandleRequestFunction, Session } from './types'
@@ -11,17 +11,23 @@ export interface WebsocketData {
   url: string
 }

-export function doSessionOp (data: WebsocketData, op: (session: AddSessionActive) => void): void {
+export function doSessionOp (
+  data: WebsocketData,
+  op: (session: AddSessionActive, msg: Buffer) => void,
+  msg: Buffer
+): void {
   if (data.session instanceof Promise) {
+    // We need to copy since we will out of protected buffer area
+    const msgCopy = Buffer.copyBytesFrom(msg)
     void data.session.then((_session) => {
       data.session = _session
       if ('session' in _session) {
-        op(_session)
+        op(_session, msgCopy)
       }
     })
   } else {
     if (data.session !== undefined && 'session' in data.session) {
-      op(data.session)
+      op(data.session, msg)
     }
   }
 }
@@ -34,8 +40,19 @@ export function processRequest (
   buff: any,
   handleRequest: HandleRequestFunction
 ): void {
-  const request = readRequest(buff, session.binaryMode)
+  const st = Date.now()
+  try {
+    const request = cs.readRequest(buff, session.binaryMode)
+    const ed = Date.now()
+    context.measure('deserialize', ed - st)
     handleRequest(context, session, cs, request, workspaceId)
+  } catch (err: any) {
+    if (err.message === 'Data read, but end of buffer not reached 123') {
+      // ignore it
+    } else {
+      throw err
+    }
+  }
 }

 export async function sendResponse (
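For context on the buffer handling above: doSessionOp now receives the raw message and copies it only when the session is still a pending promise, because by the time that promise resolves the socket layer may already be reusing the underlying memory (the "protected buffer area" comment in the diff). A tiny illustration of that defensive copy, assuming a Node.js version where Buffer.copyBytesFrom is available (added in Node 19.8):

// Defer handling of a message: copy the bytes first so later reuse of the
// original buffer by the socket layer cannot corrupt what we process.
function deferMessage (msg: Buffer, handle: (data: Buffer) => void): void {
  const copy = Buffer.copyBytesFrom(msg)
  queueMicrotask(() => {
    handle(copy)
  })
}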