From 7625b73f084dd73ede36baf76acd05df536013db Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Fri, 10 May 2024 13:10:04 +0700 Subject: [PATCH] UBERF-6888: Async triggers (#5565) --- models/server-activity/src/index.ts | 3 +- models/server-chunter/src/index.ts | 3 +- .../activity-resources/src/references.ts | 68 ++++---- .../attachment-resources/src/index.ts | 11 +- .../collaboration-resources/src/index.ts | 15 +- server-plugins/contact-resources/src/index.ts | 24 ++- server/core/src/configuration.ts | 2 +- server/core/src/server/index.ts | 2 +- server/core/src/server/storage.ts | 164 +++++++++++++----- server/core/src/triggers.ts | 104 ++++++++--- server/core/src/types.ts | 11 +- server/core/src/utils.ts | 54 +++++- server/ws/src/client.ts | 59 +------ server/ws/src/server.ts | 85 ++++++--- server/ws/src/server_http.ts | 32 ++-- server/ws/src/types.ts | 1 - 16 files changed, 401 insertions(+), 237 deletions(-) diff --git a/models/server-activity/src/index.ts b/models/server-activity/src/index.ts index 9857a24c58..9e5f65f36a 100644 --- a/models/server-activity/src/index.ts +++ b/models/server-activity/src/index.ts @@ -39,6 +39,7 @@ export function createModel (builder: Builder): void { }) builder.createDoc(serverCore.class.Trigger, core.space.Model, { - trigger: serverActivity.trigger.ReferenceTrigger + trigger: serverActivity.trigger.ReferenceTrigger, + isAsync: true }) } diff --git a/models/server-chunter/src/index.ts b/models/server-chunter/src/index.ts index e7a2fea304..b9687e59e1 100644 --- a/models/server-chunter/src/index.ts +++ b/models/server-chunter/src/index.ts @@ -83,6 +83,7 @@ export function createModel (builder: Builder): void { _class: core.class.TxCollectionCUD, 'tx._class': core.class.TxCreateDoc, 'tx.objectClass': chunter.class.ChatMessage - } + }, + isAsync: true }) } diff --git a/server-plugins/activity-resources/src/references.ts b/server-plugins/activity-resources/src/references.ts index 7c313a80c0..8b402926e8 100644 --- a/server-plugins/activity-resources/src/references.ts +++ b/server-plugins/activity-resources/src/references.ts @@ -597,27 +597,25 @@ async function ActivityReferenceCreate (tx: TxCUD, control: TriggerControl) if (control.hierarchy.isDerived(ctx.objectClass, notification.class.InboxNotification)) return [] if (control.hierarchy.isDerived(ctx.objectClass, activity.class.ActivityReference)) return [] - control.storageFx(async (adapter) => { - const txFactory = new TxFactory(control.txFactory.account) + const txFactory = new TxFactory(control.txFactory.account) - const doc = TxProcessor.createDoc2Doc(ctx) - const targetTx = guessReferenceTx(control.hierarchy, tx) + const doc = TxProcessor.createDoc2Doc(ctx) + const targetTx = guessReferenceTx(control.hierarchy, tx) - const txes: Tx[] = await getCreateReferencesTxes( - control, - adapter, - txFactory, - doc, - targetTx.objectId, - targetTx.objectClass, - targetTx.objectSpace, - tx - ) + const txes: Tx[] = await getCreateReferencesTxes( + control, + control.storageAdapter, + txFactory, + doc, + targetTx.objectId, + targetTx.objectClass, + targetTx.objectSpace, + tx + ) - if (txes.length !== 0) { - await control.apply(txes, true) - } - }) + if (txes.length !== 0) { + await control.apply(txes, true) + } return [] } @@ -647,26 +645,24 @@ async function ActivityReferenceUpdate (tx: TxCUD, control: TriggerControl) return [] } - control.storageFx(async (adapter) => { - const txFactory = new TxFactory(control.txFactory.account) - const doc = TxProcessor.updateDoc2Doc(rawDoc, ctx) - const targetTx = guessReferenceTx(control.hierarchy, tx) + const txFactory = new TxFactory(control.txFactory.account) + const doc = TxProcessor.updateDoc2Doc(rawDoc, ctx) + const targetTx = guessReferenceTx(control.hierarchy, tx) - const txes: Tx[] = await getUpdateReferencesTxes( - control, - adapter, - txFactory, - doc, - targetTx.objectId, - targetTx.objectClass, - targetTx.objectSpace, - tx - ) + const txes: Tx[] = await getUpdateReferencesTxes( + control, + control.storageAdapter, + txFactory, + doc, + targetTx.objectId, + targetTx.objectClass, + targetTx.objectSpace, + tx + ) - if (txes.length !== 0) { - await control.apply(txes, true) - } - }) + if (txes.length !== 0) { + await control.apply(txes, true) + } return [] } diff --git a/server-plugins/attachment-resources/src/index.ts b/server-plugins/attachment-resources/src/index.ts index 32ec9c26c3..b277437f92 100644 --- a/server-plugins/attachment-resources/src/index.ts +++ b/server-plugins/attachment-resources/src/index.ts @@ -15,7 +15,7 @@ // import type { Attachment } from '@hcengineering/attachment' -import type { Doc, Ref, Tx, TxRemoveDoc } from '@hcengineering/core' +import type { Tx, TxRemoveDoc } from '@hcengineering/core' import { TxProcessor } from '@hcengineering/core' import type { TriggerControl } from '@hcengineering/server-core' @@ -24,7 +24,7 @@ import type { TriggerControl } from '@hcengineering/server-core' */ export async function OnAttachmentDelete ( tx: Tx, - { findAll, hierarchy, fulltextFx, storageFx, removedMap, ctx }: TriggerControl + { removedMap, ctx, storageAdapter, workspace }: TriggerControl ): Promise { const rmTx = TxProcessor.extractTx(tx) as TxRemoveDoc @@ -34,13 +34,8 @@ export async function OnAttachmentDelete ( if (attach === undefined) { return [] } - fulltextFx(async (adapter) => { - await adapter.remove([attach.file as Ref]) - }) - storageFx(async (adapter, bucket) => { - await adapter.remove(ctx, bucket, [attach.file]) - }) + await storageAdapter.remove(ctx, workspace, [attach.file]) return [] } diff --git a/server-plugins/collaboration-resources/src/index.ts b/server-plugins/collaboration-resources/src/index.ts index 18c945ecf0..2999fa6902 100644 --- a/server-plugins/collaboration-resources/src/index.ts +++ b/server-plugins/collaboration-resources/src/index.ts @@ -21,7 +21,10 @@ import type { TriggerControl } from '@hcengineering/server-core' /** * @public */ -export async function OnDelete (tx: Tx, { hierarchy, storageFx, removedMap, ctx }: TriggerControl): Promise { +export async function OnDelete ( + tx: Tx, + { hierarchy, storageAdapter, workspace, removedMap, ctx }: TriggerControl +): Promise { const rmTx = TxProcessor.extractTx(tx) as TxRemoveDoc if (rmTx._class !== core.class.TxRemoveDoc) { @@ -44,12 +47,10 @@ export async function OnDelete (tx: Tx, { hierarchy, storageFx, removedMap, ctx } } - storageFx(async (adapter, bucket) => { - // TODO This is not accurate way to delete collaborative document - // Even though we are deleting it here, the document can be currently in use by someone else - // and when editing session ends, the collborator service will recreate the document again - await removeCollaborativeDoc(adapter, bucket, toDelete, ctx) - }) + // TODO This is not accurate way to delete collaborative document + // Even though we are deleting it here, the document can be currently in use by someone else + // and when editing session ends, the collborator service will recreate the document again + await removeCollaborativeDoc(storageAdapter, workspace, toDelete, ctx) return [] } diff --git a/server-plugins/contact-resources/src/index.ts b/server-plugins/contact-resources/src/index.ts index 6a02263968..afca77c3c6 100644 --- a/server-plugins/contact-resources/src/index.ts +++ b/server-plugins/contact-resources/src/index.ts @@ -90,7 +90,7 @@ export async function OnPersonAccountCreate (tx: Tx, control: TriggerControl): P */ export async function OnContactDelete ( tx: Tx, - { findAll, hierarchy, storageFx, removedMap, txFactory, ctx }: TriggerControl + { findAll, hierarchy, storageAdapter, workspace, removedMap, txFactory, ctx }: TriggerControl ): Promise { const rmTx = tx as TxRemoveDoc @@ -112,20 +112,18 @@ export async function OnContactDelete ( return [] } - storageFx(async (adapter, bucket) => { - await adapter.remove(ctx, bucket, [avatar]) + await storageAdapter.remove(ctx, workspace, [avatar]) - if (avatar != null) { - const extra = await adapter.list(ctx, bucket, avatar) - if (extra.length > 0) { - await adapter.remove( - ctx, - bucket, - Array.from(extra.entries()).map((it) => it[1]._id) - ) - } + if (avatar != null) { + const extra = await storageAdapter.list(ctx, workspace, avatar) + if (extra.length > 0) { + await storageAdapter.remove( + ctx, + workspace, + Array.from(extra.entries()).map((it) => it[1]._id) + ) } - }) + } const result: Tx[] = [] diff --git a/server/core/src/configuration.ts b/server/core/src/configuration.ts index 6caaa53cd3..b123cc81c4 100644 --- a/server/core/src/configuration.ts +++ b/server/core/src/configuration.ts @@ -69,5 +69,5 @@ export interface DbConfiguration { contentAdapters: Record serviceAdapters: Record defaultContentAdapter: string - storageFactory?: () => StorageAdapter + storageFactory: () => StorageAdapter } diff --git a/server/core/src/server/index.ts b/server/core/src/server/index.ts index 3a76a0027a..1e6269b02b 100644 --- a/server/core/src/server/index.ts +++ b/server/core/src/server/index.ts @@ -54,7 +54,7 @@ export async function createServerStorage ( const adapters = new Map() const modelDb = new ModelDb(hierarchy) - const storageAdapter = conf.storageFactory?.() + const storageAdapter = conf.storageFactory() await ctx.with('create-adapters', {}, async (ctx) => { for (const key in conf.adapters) { diff --git a/server/core/src/server/storage.ts b/server/core/src/server/storage.ts index c28d995603..359100c66d 100644 --- a/server/core/src/server/storage.ts +++ b/server/core/src/server/storage.ts @@ -65,7 +65,14 @@ import serverCore from '../plugin' import { type ServiceAdaptersManager } from '../service' import { type StorageAdapter } from '../storage' import { type Triggers } from '../triggers' -import type { FullTextAdapter, ObjectDDParticipant, ServerStorageOptions, TriggerControl } from '../types' +import type { + FullTextAdapter, + ObjectDDParticipant, + ServerStorageOptions, + SessionContext, + TriggerControl +} from '../types' +import { SessionContextImpl, createBroadcastEvent } from '../utils' export class TServerStorage implements ServerStorage { private readonly fulltext: FullTextIndex @@ -86,7 +93,7 @@ export class TServerStorage implements ServerStorage { hierarchy: Hierarchy, private readonly triggers: Triggers, private readonly fulltextAdapter: FullTextAdapter, - readonly storageAdapter: StorageAdapter | undefined, + readonly storageAdapter: StorageAdapter, private readonly serviceAdaptersManager: ServiceAdaptersManager, readonly modelDb: ModelDb, private readonly workspace: WorkspaceIdWithUrl, @@ -555,10 +562,82 @@ export class TServerStorage implements ServerStorage { return result } + private async broadcastCtx (derived: SessionOperationContext['derived']): Promise { + const toSendTarget = new Map() + + const getTxes = (key: string): Tx[] => { + let txes = toSendTarget.get(key) + if (txes === undefined) { + txes = [] + toSendTarget.set(key, txes) + } + return txes + } + + // Put current user as send target + for (const txd of derived) { + if (txd.target === undefined) { + getTxes('') // Be sure we have empty one + + // Also add to all other targeted sends + for (const v of toSendTarget.values()) { + v.push(...txd.derived) + } + } else { + for (const t of txd.target) { + getTxes(t).push(...txd.derived) + } + } + } + + const sendWithPart = async ( + derived: Tx[], + target: string | undefined, + exclude: string[] | undefined + ): Promise => { + const classes = new Set>>() + for (const dtx of derived) { + if (this.hierarchy.isDerived(dtx._class, core.class.TxCUD)) { + classes.add((dtx as TxCUD).objectClass) + } + const etx = TxProcessor.extractTx(dtx) + if (this.hierarchy.isDerived(etx._class, core.class.TxCUD)) { + classes.add((etx as TxCUD).objectClass) + } + } + console.log('Broadcasting compact bulk', derived.length) + const bevent = createBroadcastEvent(Array.from(classes)) + this.options.broadcast([bevent], target, exclude) + } + + const handleSend = async (derived: Tx[], target?: string, exclude?: string[]): Promise => { + if (derived.length === 0) { + return + } + + if (derived.length > 10000) { + await sendWithPart(derived, target, exclude) + } else { + // Let's send after our response will go out + console.log('Broadcasting', derived.length, derived.length) + this.options.broadcast(derived, target, exclude) + } + } + + const toSendAll = toSendTarget.get('') ?? [] + toSendTarget.delete('') + + // Then send targeted and all other + for (const [k, v] of toSendTarget.entries()) { + void handleSend(v, k) + } + // Send all other except us. + void handleSend(toSendAll, undefined, Array.from(toSendTarget.keys())) + } + private async processDerived ( ctx: SessionOperationContext, txes: Tx[], - triggerFx: Effects, findAll: ServerStorage['findAll'], removedMap: Map, Doc> ): Promise { @@ -582,21 +661,8 @@ export class TServerStorage implements ServerStorage { const triggerControl: Omit = { removedMap, workspace: this.workspace, - fx: triggerFx.fx, - fulltextFx: (f) => { - triggerFx.fx(() => f(this.fulltextAdapter)) - }, - storageFx: (f) => { - const adapter = this.storageAdapter - if (adapter === undefined) { - return - } - - triggerFx.fx(() => f(adapter, this.workspace)) - }, - serviceFx: (f) => { - triggerFx.fx(() => f(this.serviceAdaptersManager)) - }, + storageAdapter: this.storageAdapter, + serviceAdaptersManager: this.serviceAdaptersManager, findAll: fAll(ctx.ctx), findAllCtx: findAll, modelDb: this.modelDb, @@ -614,26 +680,46 @@ export class TServerStorage implements ServerStorage { } const triggers = await ctx.with('process-triggers', {}, async (ctx) => { const result: Tx[] = [] - result.push( - ...(await this.triggers.apply(ctx, txes, { - ...triggerControl, - ctx: ctx.ctx, - findAll: fAll(ctx.ctx), - result - })) - ) + const { transactions, performAsync } = await this.triggers.apply(ctx, txes, { + ...triggerControl, + ctx: ctx.ctx, + findAll: fAll(ctx.ctx), + result + }) + result.push(...transactions) + + if (performAsync !== undefined) { + const asyncTriggerProcessor = async (): Promise => { + await ctx.with('process-async-triggers', {}, async (ctx) => { + const sctx = ctx as SessionContext + const applyCtx: SessionContextImpl = new SessionContextImpl( + ctx.ctx, + sctx.userEmail, + sctx.sessionId, + sctx.admin, + [] + ) + const result = await performAsync(applyCtx) + // We need to broadcast changes + await this.broadcastCtx([{ derived: result }, ...applyCtx.derived]) + }) + } + setImmediate(() => { + void asyncTriggerProcessor() + }) + } + return result }) const derived = [...removed, ...collections, ...moves, ...triggers] - return await this.processDerivedTxes(derived, ctx, triggerFx, findAll, removedMap) + return await this.processDerivedTxes(derived, ctx, findAll, removedMap) } private async processDerivedTxes ( derived: Tx[], ctx: SessionOperationContext, - triggerFx: Effects, findAll: ServerStorage['findAll'], removedMap: Map, Doc> ): Promise { @@ -643,7 +729,7 @@ export class TServerStorage implements ServerStorage { const nestedTxes: Tx[] = [] if (derived.length > 0) { - nestedTxes.push(...(await this.processDerived(ctx, derived, triggerFx, findAll, removedMap))) + nestedTxes.push(...(await this.processDerived(ctx, derived, findAll, removedMap))) } const res = [...derived, ...nestedTxes] @@ -740,7 +826,6 @@ export class TServerStorage implements ServerStorage { const modelTx: Tx[] = [] const applyTxes: Tx[] = [] const txToProcess: Tx[] = [] - const triggerFx = new Effects() const removedMap = new Map, Doc>() const onEnds: (() => void)[] = [] const result: TxResult[] = [] @@ -779,16 +864,12 @@ export class TServerStorage implements ServerStorage { result.push(...(await ctx.with('apply', {}, (ctx) => this.routeTx(ctx.ctx, removedMap, ...txToProcess)))) // invoke triggers and store derived objects - derived.push(...(await this.processDerived(ctx, txToProcess, triggerFx, _findAll, removedMap))) + derived.push(...(await this.processDerived(ctx, txToProcess, _findAll, removedMap))) // index object await ctx.with('fulltext-tx', {}, async (ctx) => { await this.fulltext.tx(ctx.ctx, [...txToProcess, ...derived]) }) - - for (const fx of triggerFx.effects) { - await fx() - } } catch (err: any) { ctx.ctx.error('error process tx', { error: err }) throw err @@ -825,16 +906,3 @@ export class TServerStorage implements ServerStorage { await this.getAdapter(domain).clean(ctx, domain, docs) } } - -type Effect = () => Promise -class Effects { - private readonly _effects: Effect[] = [] - - public fx = (f: Effect): void => { - this._effects.push(f) - } - - get effects (): Effect[] { - return [...this._effects] - } -} diff --git a/server/core/src/triggers.ts b/server/core/src/triggers.ts index 8956ef24d9..e12557b577 100644 --- a/server/core/src/triggers.ts +++ b/server/core/src/triggers.ts @@ -35,11 +35,15 @@ import type { Trigger, TriggerControl, TriggerFunc } from './types' import serverCore from './plugin' +interface TriggerRecord { + query?: DocumentQuery + trigger: { op: TriggerFunc, resource: Resource, isAsync: boolean } +} /** * @public */ export class Triggers { - private readonly triggers: [DocumentQuery | undefined, TriggerFunc, Resource][] = [] + private readonly triggers: TriggerRecord[] = [] constructor (protected readonly hierarchy: Hierarchy) {} @@ -50,17 +54,59 @@ export class Triggers { if (tx._class === core.class.TxCreateDoc) { const createTx = tx as TxCreateDoc if (createTx.objectClass === serverCore.class.Trigger) { - const trigger = (createTx as TxCreateDoc).attributes.trigger const match = (createTx as TxCreateDoc).attributes.txMatch + + const trigger = (createTx as TxCreateDoc).attributes.trigger const func = await getResource(trigger) - this.triggers.push([match, func, trigger]) + const isAsync = (createTx as TxCreateDoc).attributes.isAsync === true + + this.triggers.push({ + query: match, + trigger: { op: func, resource: trigger, isAsync } + }) } } } - async apply (ctx: SessionOperationContext, tx: Tx[], ctrl: Omit): Promise { + async apply ( + ctx: SessionOperationContext, + tx: Tx[], + ctrl: Omit + ): Promise<{ + transactions: Tx[] + performAsync?: (ctx: SessionOperationContext) => Promise + }> { const result: Tx[] = [] - for (const [query, trigger, resource] of this.triggers) { + + const asyncRequest: { + matches: Tx[] + trigger: TriggerRecord['trigger'] + }[] = [] + + const applyTrigger = async ( + ctx: SessionOperationContext, + matches: Tx[], + trigger: TriggerRecord['trigger'], + result: Tx[] + ): Promise => { + for (const tx of matches) { + result.push( + ...(await trigger.op(tx, { + ...ctrl, + ctx: ctx.ctx, + txFactory: new TxFactory(tx.modifiedBy, true), + findAll: async (clazz, query, options) => await ctrl.findAllCtx(ctx.ctx, clazz, query, options), + apply: async (tx, broadcast, target) => { + return await ctrl.applyCtx(ctx, tx, broadcast, target) + }, + result + })) + ) + } + } + + const promises: Promise[] = [] + for (const { query, trigger } of this.triggers) { let matches = tx if (query !== undefined) { this.addDerived(query, 'objectClass') @@ -68,25 +114,39 @@ export class Triggers { matches = matchQuery(tx, query, core.class.Tx, ctrl.hierarchy) as Tx[] } if (matches.length > 0) { - await ctx.with(resource, {}, async (ctx) => { - for (const tx of matches) { - result.push( - ...(await trigger(tx, { - ...ctrl, - ctx: ctx.ctx, - txFactory: new TxFactory(tx.modifiedBy, true), - findAll: async (clazz, query, options) => await ctrl.findAllCtx(ctx.ctx, clazz, query, options), - apply: async (tx, broadcast, target) => { - return await ctrl.applyCtx(ctx, tx, broadcast, target) - }, - result - })) - ) - } - }) + if (trigger.isAsync) { + asyncRequest.push({ + matches, + trigger + }) + } else { + promises.push( + ctx.with(trigger.resource, {}, async (ctx) => { + await applyTrigger(ctx, matches, trigger, result) + }) + ) + } } } - return result + // Wait all regular triggers to complete in parallel + await Promise.all(promises) + + return { + transactions: result, + performAsync: + asyncRequest.length > 0 + ? async (ctx) => { + // If we have async triggers let's sheculed them after IO phase. + const result: Tx[] = [] + for (const request of asyncRequest) { + await ctx.with(request.trigger.resource, {}, async (ctx) => { + await applyTrigger(ctx, request.matches, request.trigger, result) + }) + } + return result + } + : undefined + } } private addDerived (q: DocumentQuery, key: string): void { diff --git a/server/core/src/types.ts b/server/core/src/types.ts index 420d1b0577..a248938eb7 100644 --- a/server/core/src/types.ts +++ b/server/core/src/types.ts @@ -72,7 +72,7 @@ export interface Middleware { /** * @public */ -export type BroadcastFunc = (tx: Tx[], targets?: string[]) => void +export type BroadcastFunc = (tx: Tx[], targets?: string | string[], exclude?: string[]) => void /** * @public */ @@ -123,12 +123,10 @@ export interface TriggerControl { modelDb: ModelDb removedMap: Map, Doc> - fulltextFx: (f: (adapter: FullTextAdapter) => Promise) => void // Since we don't have other storages let's consider adapter is MinioClient // Later can be replaced with generic one with bucket encapsulated inside. - storageFx: (f: (adapter: StorageAdapter, workspaceId: WorkspaceId) => Promise) => void - fx: (f: () => Promise) => void - serviceFx: (f: (adapter: ServiceAdaptersManager) => Promise) => void + storageAdapter: StorageAdapter + serviceAdaptersManager: ServiceAdaptersManager // Bulk operations in case trigger require some apply: (tx: Tx[], broadcast: boolean, target?: string[]) => Promise applyCtx: (ctx: SessionOperationContext, tx: Tx[], broadcast: boolean, target?: string[]) => Promise @@ -155,6 +153,9 @@ export type TriggerFunc = (tx: Tx, ctrl: TriggerControl) => Promise export interface Trigger extends Doc { trigger: Resource + // In case defiled, trigger will be executed asyncronously after transaction will be done, trigger shouod use + isAsync?: boolean + // We should match transaction txMatch?: DocumentQuery } diff --git a/server/core/src/utils.ts b/server/core/src/utils.ts index 03e9d83d4c..5134532864 100644 --- a/server/core/src/utils.ts +++ b/server/core/src/utils.ts @@ -1,5 +1,18 @@ -import { getTypeOf } from '@hcengineering/core' +import core, { + WorkspaceEvent, + generateId, + getTypeOf, + type BulkUpdateEvent, + type Class, + type Doc, + type FullParamsType, + type MeasureContext, + type ParamsType, + type Ref, + type TxWorkspaceEvent +} from '@hcengineering/core' import { type Hash } from 'crypto' +import type { SessionContext } from './types' /** * Return some estimation for object size @@ -116,3 +129,42 @@ export function updateHashForDoc (hash: Hash, _obj: any): void { } } } + +export class SessionContextImpl implements SessionContext { + constructor ( + readonly ctx: MeasureContext, + readonly userEmail: string, + readonly sessionId: string, + readonly admin: boolean | undefined, + readonly derived: SessionContext['derived'] + ) {} + + with( + name: string, + params: ParamsType, + op: (ctx: SessionContext) => T | Promise, + fullParams?: FullParamsType + ): Promise { + return this.ctx.with( + name, + params, + async (ctx) => await op(new SessionContextImpl(ctx, this.userEmail, this.sessionId, this.admin, this.derived)), + fullParams + ) + } +} + +export function createBroadcastEvent (classes: Ref>[]): TxWorkspaceEvent { + return { + _class: core.class.TxWorkspaceEvent, + _id: generateId(), + event: WorkspaceEvent.BulkUpdate, + params: { + _class: classes + }, + modifiedBy: core.account.System, + modifiedOn: Date.now(), + objectSpace: core.space.DerivedTx, + space: core.space.DerivedTx + } +} diff --git a/server/ws/src/client.ts b/server/ws/src/client.ts index 84b5568d33..e93699c19b 100644 --- a/server/ws/src/client.ts +++ b/server/ws/src/client.ts @@ -17,58 +17,27 @@ import core, { AccountRole, TxFactory, TxProcessor, - WorkspaceEvent, - generateId, toIdMap, type Account, - type BulkUpdateEvent, type Class, type Doc, type DocumentQuery, type FindOptions, type FindResult, - type FullParamsType, type MeasureContext, - type ParamsType, type Ref, type SearchOptions, type SearchQuery, - type SessionOperationContext, type Timestamp, type Tx, type TxApplyIf, type TxApplyResult, - type TxCUD, - type TxWorkspaceEvent + type TxCUD } from '@hcengineering/core' -import { type Pipeline, type SessionContext } from '@hcengineering/server-core' +import { SessionContextImpl, createBroadcastEvent, type Pipeline } from '@hcengineering/server-core' import { type Token } from '@hcengineering/server-token' import { type ClientSessionCtx, type Session, type SessionRequest, type StatisticsElement } from './types' -class SessionContextImpl implements SessionContext { - constructor ( - readonly ctx: MeasureContext, - readonly userEmail: string, - readonly sessionId: string, - readonly admin: boolean | undefined, - readonly derived: SessionContext['derived'] - ) {} - - with( - name: string, - params: ParamsType, - op: (ctx: SessionOperationContext) => T | Promise, - fullParams?: FullParamsType - ): Promise { - return this.ctx.with( - name, - params, - async (ctx) => await op(new SessionContextImpl(ctx, this.userEmail, this.sessionId, this.admin, this.derived)), - fullParams - ) - } -} - /** * @public */ @@ -197,11 +166,6 @@ export class ClientSession implements Session { await ctx.sendResponse(await this._pipeline.searchFulltext(context, query, options)) } - async txRaw (ctx: MeasureContext, tx: Tx): Promise { - // Just do Tx and do not send anything - await this.tx({ ctx, sendResponse: async () => {}, send: async () => {}, sendError: async () => {} }, tx) - } - async tx (ctx: ClientSessionCtx, tx: Tx): Promise { this.lastRequest = Date.now() this.total.tx++ @@ -238,7 +202,7 @@ export class ClientSession implements Session { toSendTarget.set(this.getUser(), []) for (const txd of context.derived) { if (txd.target === undefined) { - getTxes('').push(...txd.derived) + getTxes('') // be sure we have empty one // Also add to all other targeted sends for (const v of toSendTarget.values()) { @@ -320,22 +284,7 @@ export class ClientSession implements Session { } } console.log('Broadcasting compact bulk', derived.length) - const bevent = this.createBroadcastEvent(Array.from(classes)) + const bevent = createBroadcastEvent(Array.from(classes)) await ctx.send([bevent], target, exclude) } - - private createBroadcastEvent (classes: Ref>[]): TxWorkspaceEvent { - return { - _class: core.class.TxWorkspaceEvent, - _id: generateId(), - event: WorkspaceEvent.BulkUpdate, - params: { - _class: classes - }, - modifiedBy: core.account.System, - modifiedOn: Date.now(), - objectSpace: core.space.DerivedTx, - space: core.space.DerivedTx - } - } } diff --git a/server/ws/src/server.ts b/server/ws/src/server.ts index 687466d9fc..1daa28de2d 100644 --- a/server/ws/src/server.ts +++ b/server/ws/src/server.ts @@ -18,7 +18,6 @@ import core, { TxFactory, WorkspaceEvent, generateId, - getWorkspaceId, systemAccountEmail, toWorkspaceString, versionToString, @@ -371,7 +370,8 @@ class TSessionManager implements SessionManager { workspace.sessions.set(session.sessionId, { session, socket: ws }) // We do not need to wait for set-status, just return session to client - void ctx.with('set-status', {}, (ctx) => this.trySetStatus(ctx, session, true)) + const _workspace = workspace + void ctx.with('set-status', {}, (ctx) => this.trySetStatus(ctx, session, true, _workspace.workspaceId)) if (this.timeMinutes > 0) { void ws.send(ctx, { result: this.createMaintenanceWarning() }, session.binaryMode, session.useCompression) @@ -428,20 +428,27 @@ class TSessionManager implements SessionManager { ctx, { ...token.workspace, workspaceUrl, workspaceName }, true, - (tx, targets) => { - this.broadcastAll(workspace, tx, targets) + (tx, targets, exclude) => { + this.broadcastAll(workspace, tx, targets, exclude) } ) return await workspace.pipeline } - broadcastAll (workspace: Workspace, tx: Tx[], targets?: string[]): void { + broadcastAll (workspace: Workspace, tx: Tx[], target?: string | string[], exclude?: string[]): void { if (workspace.upgrade) { return } + if (target !== undefined && !Array.isArray(target)) { + target = [target] + } const ctx = this.ctx.newChild('📬 broadcast-all', {}) const sessions = [...workspace.sessions.values()].filter((it) => { - return it !== undefined && (targets === undefined || targets.includes(it.session.getUser())) + if (it === undefined) { + return false + } + const tt = it.session.getUser() + return (target === undefined && !(exclude ?? []).includes(tt)) || (target?.includes(tt) ?? false) }) function send (): void { for (const session of sessions) { @@ -540,18 +547,28 @@ class TSessionManager implements SessionManager { return workspace } - private async trySetStatus (ctx: MeasureContext, session: Session, online: boolean): Promise { + private async trySetStatus ( + ctx: MeasureContext, + session: Session, + online: boolean, + workspaceId: WorkspaceId + ): Promise { const current = this.statusPromises.get(session.getUser()) if (current !== undefined) { await current } - const promise = this.setStatus(ctx, session, online) + const promise = this.setStatus(ctx, session, online, workspaceId) this.statusPromises.set(session.getUser(), promise) await promise this.statusPromises.delete(session.getUser()) } - private async setStatus (ctx: MeasureContext, session: Session, online: boolean): Promise { + private async setStatus ( + ctx: MeasureContext, + session: Session, + online: boolean, + workspaceId: WorkspaceId + ): Promise { try { const user = ( await session.pipeline().modelDb.findAll( @@ -563,6 +580,20 @@ class TSessionManager implements SessionManager { ) )[0] if (user === undefined) return + + const clientCtx: ClientSessionCtx = { + sendResponse: async (msg) => { + // No response + }, + ctx, + send: async (msg, target, exclude) => { + this.broadcast(null, workspaceId, msg, target, exclude) + }, + sendError: async (msg, error: Status) => { + // Assume no error send + } + } + const status = (await session.findAllRaw(ctx, core.class.UserStatus, { user: user._id }, { limit: 1 }))[0] const txFactory = new TxFactory(user._id, true) if (status === undefined) { @@ -570,12 +601,12 @@ class TSessionManager implements SessionManager { online, user: user._id }) - await session.txRaw(ctx, tx) + await session.tx(clientCtx, tx) } else if (status.online !== online) { const tx = txFactory.createTxUpdateDoc(status._class, status.space, status._id, { online }) - await session.txRaw(ctx, tx) + await session.tx(clientCtx, tx) } } catch {} } @@ -607,7 +638,7 @@ class TSessionManager implements SessionManager { if (workspace !== undefined) { const another = Array.from(workspace.sessions.values()).findIndex((p) => p.session.getUser() === user) if (another === -1 && !workspace.upgrade) { - void this.trySetStatus(workspace.context, sessionRef.session, false) + void this.trySetStatus(workspace.context, sessionRef.session, false, workspace.workspaceId) } } }, this.timeouts.reconnectTimeout) @@ -761,7 +792,7 @@ class TSessionManager implements SessionManager { service: S, ws: ConnectionSocket, request: Request, - workspace: string + workspace: string // wsId, toWorkspaceString() ): void { const userCtx = requestCtx.newChild('📞 client', { workspace: '🧲 ' + workspace @@ -778,16 +809,26 @@ class TSessionManager implements SessionManager { const delta = Date.now() - request.time userCtx.measure('receive msg', delta) } + const wsRef = this.workspaces.get(workspace) + if (wsRef === undefined) { + await ws.send( + ctx, + { + id: request.id, + error: unknownError('No workspace') + }, + service.binaryMode, + service.useCompression + ) + return + } if (request.method === 'forceClose') { - const wsRef = this.workspaces.get(workspace) let done = false - if (wsRef !== undefined) { - if (wsRef.upgrade) { - done = true - console.log('FORCE CLOSE', workspace) - // In case of upgrade, we need to force close workspace not in interval handler - await this.forceClose(workspace, ws) - } + if (wsRef.upgrade) { + done = true + console.log('FORCE CLOSE', workspace) + // In case of upgrade, we need to force close workspace not in interval handler + await this.forceClose(workspace, ws) } const forceCloseResponse: Response = { id: request.id, @@ -840,7 +881,7 @@ class TSessionManager implements SessionManager { }, ctx, send: async (msg, target, exclude) => { - this.broadcast(service, getWorkspaceId(workspace), msg, target, exclude) + this.broadcast(service, wsRef.workspaceId, msg, target, exclude) }, sendError: async (msg, error: Status) => { await sendResponse(ctx, service, ws, { diff --git a/server/ws/src/server_http.ts b/server/ws/src/server_http.ts index ffa91f7ac4..bf9178b519 100644 --- a/server/ws/src/server_http.ts +++ b/server/ws/src/server_http.ts @@ -234,9 +234,9 @@ export function startHttpServer ( try { let buff: any | undefined if (msg instanceof Buffer) { - buff = msg + buff = Buffer.copyBytesFrom(msg) } else if (Array.isArray(msg)) { - buff = Buffer.concat(msg) + buff = Buffer.copyBytesFrom(Buffer.concat(msg)) } if (buff !== undefined) { doSessionOp(webSocketData, (s) => { @@ -347,24 +347,26 @@ function createWebsocketClientSocket ( }, data: () => data, send: async (ctx: MeasureContext, msg, binary, compression) => { - if (ws.readyState !== ws.OPEN && !cs.isClosed) { - return 0 - } const smsg = serialize(msg, binary) - while (ws.bufferedAmount > 16 * 1024 && ws.readyState === ws.OPEN) { - await new Promise((resolve) => { - setImmediate(resolve) - }) - } ctx.measure('send-data', smsg.length) await new Promise((resolve, reject) => { - ws.send(smsg, { binary: true, compress: compression }, (err) => { - if (err != null) { - reject(err) + const doSend = (): void => { + if (ws.readyState !== ws.OPEN && !cs.isClosed) { + return } - resolve() - }) + if (ws.bufferedAmount > 16 * 1024) { + setImmediate(doSend) + return + } + ws.send(smsg, { binary: true, compress: compression }, (err) => { + if (err != null) { + reject(err) + } + resolve() + }) + } + doSend() }) return smsg.length } diff --git a/server/ws/src/types.ts b/server/ws/src/types.ts index 810903142a..b454c2fbec 100644 --- a/server/ws/src/types.ts +++ b/server/ws/src/types.ts @@ -62,7 +62,6 @@ export interface Session { options?: FindOptions ) => Promise> tx: (ctx: ClientSessionCtx, tx: Tx) => Promise - txRaw: (ctx: MeasureContext, tx: Tx) => Promise // Session restore information sessionId: string