UBERF-6888: Async triggers (#5565)

This commit is contained in:
Andrey Sobolev 2024-05-10 13:10:04 +07:00 committed by GitHub
parent 0a8d3cce69
commit 7625b73f08
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 401 additions and 237 deletions

View File

@ -39,6 +39,7 @@ export function createModel (builder: Builder): void {
})
builder.createDoc(serverCore.class.Trigger, core.space.Model, {
trigger: serverActivity.trigger.ReferenceTrigger
trigger: serverActivity.trigger.ReferenceTrigger,
isAsync: true
})
}

View File

@ -83,6 +83,7 @@ export function createModel (builder: Builder): void {
_class: core.class.TxCollectionCUD,
'tx._class': core.class.TxCreateDoc,
'tx.objectClass': chunter.class.ChatMessage
}
},
isAsync: true
})
}

View File

@ -597,27 +597,25 @@ async function ActivityReferenceCreate (tx: TxCUD<Doc>, control: TriggerControl)
if (control.hierarchy.isDerived(ctx.objectClass, notification.class.InboxNotification)) return []
if (control.hierarchy.isDerived(ctx.objectClass, activity.class.ActivityReference)) return []
control.storageFx(async (adapter) => {
const txFactory = new TxFactory(control.txFactory.account)
const txFactory = new TxFactory(control.txFactory.account)
const doc = TxProcessor.createDoc2Doc(ctx)
const targetTx = guessReferenceTx(control.hierarchy, tx)
const doc = TxProcessor.createDoc2Doc(ctx)
const targetTx = guessReferenceTx(control.hierarchy, tx)
const txes: Tx[] = await getCreateReferencesTxes(
control,
adapter,
txFactory,
doc,
targetTx.objectId,
targetTx.objectClass,
targetTx.objectSpace,
tx
)
const txes: Tx[] = await getCreateReferencesTxes(
control,
control.storageAdapter,
txFactory,
doc,
targetTx.objectId,
targetTx.objectClass,
targetTx.objectSpace,
tx
)
if (txes.length !== 0) {
await control.apply(txes, true)
}
})
if (txes.length !== 0) {
await control.apply(txes, true)
}
return []
}
@ -647,26 +645,24 @@ async function ActivityReferenceUpdate (tx: TxCUD<Doc>, control: TriggerControl)
return []
}
control.storageFx(async (adapter) => {
const txFactory = new TxFactory(control.txFactory.account)
const doc = TxProcessor.updateDoc2Doc(rawDoc, ctx)
const targetTx = guessReferenceTx(control.hierarchy, tx)
const txFactory = new TxFactory(control.txFactory.account)
const doc = TxProcessor.updateDoc2Doc(rawDoc, ctx)
const targetTx = guessReferenceTx(control.hierarchy, tx)
const txes: Tx[] = await getUpdateReferencesTxes(
control,
adapter,
txFactory,
doc,
targetTx.objectId,
targetTx.objectClass,
targetTx.objectSpace,
tx
)
const txes: Tx[] = await getUpdateReferencesTxes(
control,
control.storageAdapter,
txFactory,
doc,
targetTx.objectId,
targetTx.objectClass,
targetTx.objectSpace,
tx
)
if (txes.length !== 0) {
await control.apply(txes, true)
}
})
if (txes.length !== 0) {
await control.apply(txes, true)
}
return []
}

View File

@ -15,7 +15,7 @@
//
import type { Attachment } from '@hcengineering/attachment'
import type { Doc, Ref, Tx, TxRemoveDoc } from '@hcengineering/core'
import type { Tx, TxRemoveDoc } from '@hcengineering/core'
import { TxProcessor } from '@hcengineering/core'
import type { TriggerControl } from '@hcengineering/server-core'
@ -24,7 +24,7 @@ import type { TriggerControl } from '@hcengineering/server-core'
*/
export async function OnAttachmentDelete (
tx: Tx,
{ findAll, hierarchy, fulltextFx, storageFx, removedMap, ctx }: TriggerControl
{ removedMap, ctx, storageAdapter, workspace }: TriggerControl
): Promise<Tx[]> {
const rmTx = TxProcessor.extractTx(tx) as TxRemoveDoc<Attachment>
@ -34,13 +34,8 @@ export async function OnAttachmentDelete (
if (attach === undefined) {
return []
}
fulltextFx(async (adapter) => {
await adapter.remove([attach.file as Ref<Doc>])
})
storageFx(async (adapter, bucket) => {
await adapter.remove(ctx, bucket, [attach.file])
})
await storageAdapter.remove(ctx, workspace, [attach.file])
return []
}

View File

@ -21,7 +21,10 @@ import type { TriggerControl } from '@hcengineering/server-core'
/**
* @public
*/
export async function OnDelete (tx: Tx, { hierarchy, storageFx, removedMap, ctx }: TriggerControl): Promise<Tx[]> {
export async function OnDelete (
tx: Tx,
{ hierarchy, storageAdapter, workspace, removedMap, ctx }: TriggerControl
): Promise<Tx[]> {
const rmTx = TxProcessor.extractTx(tx) as TxRemoveDoc<Doc>
if (rmTx._class !== core.class.TxRemoveDoc) {
@ -44,12 +47,10 @@ export async function OnDelete (tx: Tx, { hierarchy, storageFx, removedMap, ctx
}
}
storageFx(async (adapter, bucket) => {
// TODO This is not accurate way to delete collaborative document
// Even though we are deleting it here, the document can be currently in use by someone else
// and when editing session ends, the collborator service will recreate the document again
await removeCollaborativeDoc(adapter, bucket, toDelete, ctx)
})
// TODO This is not accurate way to delete collaborative document
// Even though we are deleting it here, the document can be currently in use by someone else
// and when editing session ends, the collborator service will recreate the document again
await removeCollaborativeDoc(storageAdapter, workspace, toDelete, ctx)
return []
}

View File

@ -90,7 +90,7 @@ export async function OnPersonAccountCreate (tx: Tx, control: TriggerControl): P
*/
export async function OnContactDelete (
tx: Tx,
{ findAll, hierarchy, storageFx, removedMap, txFactory, ctx }: TriggerControl
{ findAll, hierarchy, storageAdapter, workspace, removedMap, txFactory, ctx }: TriggerControl
): Promise<Tx[]> {
const rmTx = tx as TxRemoveDoc<Contact>
@ -112,20 +112,18 @@ export async function OnContactDelete (
return []
}
storageFx(async (adapter, bucket) => {
await adapter.remove(ctx, bucket, [avatar])
await storageAdapter.remove(ctx, workspace, [avatar])
if (avatar != null) {
const extra = await adapter.list(ctx, bucket, avatar)
if (extra.length > 0) {
await adapter.remove(
ctx,
bucket,
Array.from(extra.entries()).map((it) => it[1]._id)
)
}
if (avatar != null) {
const extra = await storageAdapter.list(ctx, workspace, avatar)
if (extra.length > 0) {
await storageAdapter.remove(
ctx,
workspace,
Array.from(extra.entries()).map((it) => it[1]._id)
)
}
})
}
const result: Tx[] = []

View File

@ -69,5 +69,5 @@ export interface DbConfiguration {
contentAdapters: Record<string, ContentTextAdapterConfiguration>
serviceAdapters: Record<string, ServiceAdapterConfig>
defaultContentAdapter: string
storageFactory?: () => StorageAdapter
storageFactory: () => StorageAdapter
}

View File

@ -54,7 +54,7 @@ export async function createServerStorage (
const adapters = new Map<string, DbAdapter>()
const modelDb = new ModelDb(hierarchy)
const storageAdapter = conf.storageFactory?.()
const storageAdapter = conf.storageFactory()
await ctx.with('create-adapters', {}, async (ctx) => {
for (const key in conf.adapters) {

View File

@ -65,7 +65,14 @@ import serverCore from '../plugin'
import { type ServiceAdaptersManager } from '../service'
import { type StorageAdapter } from '../storage'
import { type Triggers } from '../triggers'
import type { FullTextAdapter, ObjectDDParticipant, ServerStorageOptions, TriggerControl } from '../types'
import type {
FullTextAdapter,
ObjectDDParticipant,
ServerStorageOptions,
SessionContext,
TriggerControl
} from '../types'
import { SessionContextImpl, createBroadcastEvent } from '../utils'
export class TServerStorage implements ServerStorage {
private readonly fulltext: FullTextIndex
@ -86,7 +93,7 @@ export class TServerStorage implements ServerStorage {
hierarchy: Hierarchy,
private readonly triggers: Triggers,
private readonly fulltextAdapter: FullTextAdapter,
readonly storageAdapter: StorageAdapter | undefined,
readonly storageAdapter: StorageAdapter,
private readonly serviceAdaptersManager: ServiceAdaptersManager,
readonly modelDb: ModelDb,
private readonly workspace: WorkspaceIdWithUrl,
@ -555,10 +562,82 @@ export class TServerStorage implements ServerStorage {
return result
}
private async broadcastCtx (derived: SessionOperationContext['derived']): Promise<void> {
const toSendTarget = new Map<string, Tx[]>()
const getTxes = (key: string): Tx[] => {
let txes = toSendTarget.get(key)
if (txes === undefined) {
txes = []
toSendTarget.set(key, txes)
}
return txes
}
// Put current user as send target
for (const txd of derived) {
if (txd.target === undefined) {
getTxes('') // Be sure we have empty one
// Also add to all other targeted sends
for (const v of toSendTarget.values()) {
v.push(...txd.derived)
}
} else {
for (const t of txd.target) {
getTxes(t).push(...txd.derived)
}
}
}
const sendWithPart = async (
derived: Tx[],
target: string | undefined,
exclude: string[] | undefined
): Promise<void> => {
const classes = new Set<Ref<Class<Doc>>>()
for (const dtx of derived) {
if (this.hierarchy.isDerived(dtx._class, core.class.TxCUD)) {
classes.add((dtx as TxCUD<Doc>).objectClass)
}
const etx = TxProcessor.extractTx(dtx)
if (this.hierarchy.isDerived(etx._class, core.class.TxCUD)) {
classes.add((etx as TxCUD<Doc>).objectClass)
}
}
console.log('Broadcasting compact bulk', derived.length)
const bevent = createBroadcastEvent(Array.from(classes))
this.options.broadcast([bevent], target, exclude)
}
const handleSend = async (derived: Tx[], target?: string, exclude?: string[]): Promise<void> => {
if (derived.length === 0) {
return
}
if (derived.length > 10000) {
await sendWithPart(derived, target, exclude)
} else {
// Let's send after our response will go out
console.log('Broadcasting', derived.length, derived.length)
this.options.broadcast(derived, target, exclude)
}
}
const toSendAll = toSendTarget.get('') ?? []
toSendTarget.delete('')
// Then send targeted and all other
for (const [k, v] of toSendTarget.entries()) {
void handleSend(v, k)
}
// Send all other except us.
void handleSend(toSendAll, undefined, Array.from(toSendTarget.keys()))
}
private async processDerived (
ctx: SessionOperationContext,
txes: Tx[],
triggerFx: Effects,
findAll: ServerStorage['findAll'],
removedMap: Map<Ref<Doc>, Doc>
): Promise<Tx[]> {
@ -582,21 +661,8 @@ export class TServerStorage implements ServerStorage {
const triggerControl: Omit<TriggerControl, 'txFactory' | 'ctx' | 'result'> = {
removedMap,
workspace: this.workspace,
fx: triggerFx.fx,
fulltextFx: (f) => {
triggerFx.fx(() => f(this.fulltextAdapter))
},
storageFx: (f) => {
const adapter = this.storageAdapter
if (adapter === undefined) {
return
}
triggerFx.fx(() => f(adapter, this.workspace))
},
serviceFx: (f) => {
triggerFx.fx(() => f(this.serviceAdaptersManager))
},
storageAdapter: this.storageAdapter,
serviceAdaptersManager: this.serviceAdaptersManager,
findAll: fAll(ctx.ctx),
findAllCtx: findAll,
modelDb: this.modelDb,
@ -614,26 +680,46 @@ export class TServerStorage implements ServerStorage {
}
const triggers = await ctx.with('process-triggers', {}, async (ctx) => {
const result: Tx[] = []
result.push(
...(await this.triggers.apply(ctx, txes, {
...triggerControl,
ctx: ctx.ctx,
findAll: fAll(ctx.ctx),
result
}))
)
const { transactions, performAsync } = await this.triggers.apply(ctx, txes, {
...triggerControl,
ctx: ctx.ctx,
findAll: fAll(ctx.ctx),
result
})
result.push(...transactions)
if (performAsync !== undefined) {
const asyncTriggerProcessor = async (): Promise<void> => {
await ctx.with('process-async-triggers', {}, async (ctx) => {
const sctx = ctx as SessionContext
const applyCtx: SessionContextImpl = new SessionContextImpl(
ctx.ctx,
sctx.userEmail,
sctx.sessionId,
sctx.admin,
[]
)
const result = await performAsync(applyCtx)
// We need to broadcast changes
await this.broadcastCtx([{ derived: result }, ...applyCtx.derived])
})
}
setImmediate(() => {
void asyncTriggerProcessor()
})
}
return result
})
const derived = [...removed, ...collections, ...moves, ...triggers]
return await this.processDerivedTxes(derived, ctx, triggerFx, findAll, removedMap)
return await this.processDerivedTxes(derived, ctx, findAll, removedMap)
}
private async processDerivedTxes (
derived: Tx[],
ctx: SessionOperationContext,
triggerFx: Effects,
findAll: ServerStorage['findAll'],
removedMap: Map<Ref<Doc>, Doc>
): Promise<Tx[]> {
@ -643,7 +729,7 @@ export class TServerStorage implements ServerStorage {
const nestedTxes: Tx[] = []
if (derived.length > 0) {
nestedTxes.push(...(await this.processDerived(ctx, derived, triggerFx, findAll, removedMap)))
nestedTxes.push(...(await this.processDerived(ctx, derived, findAll, removedMap)))
}
const res = [...derived, ...nestedTxes]
@ -740,7 +826,6 @@ export class TServerStorage implements ServerStorage {
const modelTx: Tx[] = []
const applyTxes: Tx[] = []
const txToProcess: Tx[] = []
const triggerFx = new Effects()
const removedMap = new Map<Ref<Doc>, Doc>()
const onEnds: (() => void)[] = []
const result: TxResult[] = []
@ -779,16 +864,12 @@ export class TServerStorage implements ServerStorage {
result.push(...(await ctx.with('apply', {}, (ctx) => this.routeTx(ctx.ctx, removedMap, ...txToProcess))))
// invoke triggers and store derived objects
derived.push(...(await this.processDerived(ctx, txToProcess, triggerFx, _findAll, removedMap)))
derived.push(...(await this.processDerived(ctx, txToProcess, _findAll, removedMap)))
// index object
await ctx.with('fulltext-tx', {}, async (ctx) => {
await this.fulltext.tx(ctx.ctx, [...txToProcess, ...derived])
})
for (const fx of triggerFx.effects) {
await fx()
}
} catch (err: any) {
ctx.ctx.error('error process tx', { error: err })
throw err
@ -825,16 +906,3 @@ export class TServerStorage implements ServerStorage {
await this.getAdapter(domain).clean(ctx, domain, docs)
}
}
type Effect = () => Promise<void>
class Effects {
private readonly _effects: Effect[] = []
public fx = (f: Effect): void => {
this._effects.push(f)
}
get effects (): Effect[] {
return [...this._effects]
}
}

View File

@ -35,11 +35,15 @@ import type { Trigger, TriggerControl, TriggerFunc } from './types'
import serverCore from './plugin'
interface TriggerRecord {
query?: DocumentQuery<Tx>
trigger: { op: TriggerFunc, resource: Resource<TriggerFunc>, isAsync: boolean }
}
/**
* @public
*/
export class Triggers {
private readonly triggers: [DocumentQuery<Tx> | undefined, TriggerFunc, Resource<TriggerFunc>][] = []
private readonly triggers: TriggerRecord[] = []
constructor (protected readonly hierarchy: Hierarchy) {}
@ -50,17 +54,59 @@ export class Triggers {
if (tx._class === core.class.TxCreateDoc) {
const createTx = tx as TxCreateDoc<Doc>
if (createTx.objectClass === serverCore.class.Trigger) {
const trigger = (createTx as TxCreateDoc<Trigger>).attributes.trigger
const match = (createTx as TxCreateDoc<Trigger>).attributes.txMatch
const trigger = (createTx as TxCreateDoc<Trigger>).attributes.trigger
const func = await getResource(trigger)
this.triggers.push([match, func, trigger])
const isAsync = (createTx as TxCreateDoc<Trigger>).attributes.isAsync === true
this.triggers.push({
query: match,
trigger: { op: func, resource: trigger, isAsync }
})
}
}
}
async apply (ctx: SessionOperationContext, tx: Tx[], ctrl: Omit<TriggerControl, 'txFactory'>): Promise<Tx[]> {
async apply (
ctx: SessionOperationContext,
tx: Tx[],
ctrl: Omit<TriggerControl, 'txFactory'>
): Promise<{
transactions: Tx[]
performAsync?: (ctx: SessionOperationContext) => Promise<Tx[]>
}> {
const result: Tx[] = []
for (const [query, trigger, resource] of this.triggers) {
const asyncRequest: {
matches: Tx[]
trigger: TriggerRecord['trigger']
}[] = []
const applyTrigger = async (
ctx: SessionOperationContext,
matches: Tx[],
trigger: TriggerRecord['trigger'],
result: Tx[]
): Promise<void> => {
for (const tx of matches) {
result.push(
...(await trigger.op(tx, {
...ctrl,
ctx: ctx.ctx,
txFactory: new TxFactory(tx.modifiedBy, true),
findAll: async (clazz, query, options) => await ctrl.findAllCtx(ctx.ctx, clazz, query, options),
apply: async (tx, broadcast, target) => {
return await ctrl.applyCtx(ctx, tx, broadcast, target)
},
result
}))
)
}
}
const promises: Promise<void>[] = []
for (const { query, trigger } of this.triggers) {
let matches = tx
if (query !== undefined) {
this.addDerived(query, 'objectClass')
@ -68,25 +114,39 @@ export class Triggers {
matches = matchQuery(tx, query, core.class.Tx, ctrl.hierarchy) as Tx[]
}
if (matches.length > 0) {
await ctx.with(resource, {}, async (ctx) => {
for (const tx of matches) {
result.push(
...(await trigger(tx, {
...ctrl,
ctx: ctx.ctx,
txFactory: new TxFactory(tx.modifiedBy, true),
findAll: async (clazz, query, options) => await ctrl.findAllCtx(ctx.ctx, clazz, query, options),
apply: async (tx, broadcast, target) => {
return await ctrl.applyCtx(ctx, tx, broadcast, target)
},
result
}))
)
}
})
if (trigger.isAsync) {
asyncRequest.push({
matches,
trigger
})
} else {
promises.push(
ctx.with(trigger.resource, {}, async (ctx) => {
await applyTrigger(ctx, matches, trigger, result)
})
)
}
}
}
return result
// Wait all regular triggers to complete in parallel
await Promise.all(promises)
return {
transactions: result,
performAsync:
asyncRequest.length > 0
? async (ctx) => {
// If we have async triggers let's sheculed them after IO phase.
const result: Tx[] = []
for (const request of asyncRequest) {
await ctx.with(request.trigger.resource, {}, async (ctx) => {
await applyTrigger(ctx, request.matches, request.trigger, result)
})
}
return result
}
: undefined
}
}
private addDerived (q: DocumentQuery<Tx>, key: string): void {

View File

@ -72,7 +72,7 @@ export interface Middleware {
/**
* @public
*/
export type BroadcastFunc = (tx: Tx[], targets?: string[]) => void
export type BroadcastFunc = (tx: Tx[], targets?: string | string[], exclude?: string[]) => void
/**
* @public
*/
@ -123,12 +123,10 @@ export interface TriggerControl {
modelDb: ModelDb
removedMap: Map<Ref<Doc>, Doc>
fulltextFx: (f: (adapter: FullTextAdapter) => Promise<void>) => void
// Since we don't have other storages let's consider adapter is MinioClient
// Later can be replaced with generic one with bucket encapsulated inside.
storageFx: (f: (adapter: StorageAdapter, workspaceId: WorkspaceId) => Promise<void>) => void
fx: (f: () => Promise<void>) => void
serviceFx: (f: (adapter: ServiceAdaptersManager) => Promise<void>) => void
storageAdapter: StorageAdapter
serviceAdaptersManager: ServiceAdaptersManager
// Bulk operations in case trigger require some
apply: (tx: Tx[], broadcast: boolean, target?: string[]) => Promise<TxResult>
applyCtx: (ctx: SessionOperationContext, tx: Tx[], broadcast: boolean, target?: string[]) => Promise<TxResult>
@ -155,6 +153,9 @@ export type TriggerFunc = (tx: Tx, ctrl: TriggerControl) => Promise<Tx[]>
export interface Trigger extends Doc {
trigger: Resource<TriggerFunc>
// In case defiled, trigger will be executed asyncronously after transaction will be done, trigger shouod use
isAsync?: boolean
// We should match transaction
txMatch?: DocumentQuery<Tx>
}

View File

@ -1,5 +1,18 @@
import { getTypeOf } from '@hcengineering/core'
import core, {
WorkspaceEvent,
generateId,
getTypeOf,
type BulkUpdateEvent,
type Class,
type Doc,
type FullParamsType,
type MeasureContext,
type ParamsType,
type Ref,
type TxWorkspaceEvent
} from '@hcengineering/core'
import { type Hash } from 'crypto'
import type { SessionContext } from './types'
/**
* Return some estimation for object size
@ -116,3 +129,42 @@ export function updateHashForDoc (hash: Hash, _obj: any): void {
}
}
}
export class SessionContextImpl implements SessionContext {
constructor (
readonly ctx: MeasureContext,
readonly userEmail: string,
readonly sessionId: string,
readonly admin: boolean | undefined,
readonly derived: SessionContext['derived']
) {}
with<T>(
name: string,
params: ParamsType,
op: (ctx: SessionContext) => T | Promise<T>,
fullParams?: FullParamsType
): Promise<T> {
return this.ctx.with(
name,
params,
async (ctx) => await op(new SessionContextImpl(ctx, this.userEmail, this.sessionId, this.admin, this.derived)),
fullParams
)
}
}
export function createBroadcastEvent (classes: Ref<Class<Doc>>[]): TxWorkspaceEvent<BulkUpdateEvent> {
return {
_class: core.class.TxWorkspaceEvent,
_id: generateId(),
event: WorkspaceEvent.BulkUpdate,
params: {
_class: classes
},
modifiedBy: core.account.System,
modifiedOn: Date.now(),
objectSpace: core.space.DerivedTx,
space: core.space.DerivedTx
}
}

View File

@ -17,58 +17,27 @@ import core, {
AccountRole,
TxFactory,
TxProcessor,
WorkspaceEvent,
generateId,
toIdMap,
type Account,
type BulkUpdateEvent,
type Class,
type Doc,
type DocumentQuery,
type FindOptions,
type FindResult,
type FullParamsType,
type MeasureContext,
type ParamsType,
type Ref,
type SearchOptions,
type SearchQuery,
type SessionOperationContext,
type Timestamp,
type Tx,
type TxApplyIf,
type TxApplyResult,
type TxCUD,
type TxWorkspaceEvent
type TxCUD
} from '@hcengineering/core'
import { type Pipeline, type SessionContext } from '@hcengineering/server-core'
import { SessionContextImpl, createBroadcastEvent, type Pipeline } from '@hcengineering/server-core'
import { type Token } from '@hcengineering/server-token'
import { type ClientSessionCtx, type Session, type SessionRequest, type StatisticsElement } from './types'
class SessionContextImpl implements SessionContext {
constructor (
readonly ctx: MeasureContext,
readonly userEmail: string,
readonly sessionId: string,
readonly admin: boolean | undefined,
readonly derived: SessionContext['derived']
) {}
with<T>(
name: string,
params: ParamsType,
op: (ctx: SessionOperationContext) => T | Promise<T>,
fullParams?: FullParamsType
): Promise<T> {
return this.ctx.with(
name,
params,
async (ctx) => await op(new SessionContextImpl(ctx, this.userEmail, this.sessionId, this.admin, this.derived)),
fullParams
)
}
}
/**
* @public
*/
@ -197,11 +166,6 @@ export class ClientSession implements Session {
await ctx.sendResponse(await this._pipeline.searchFulltext(context, query, options))
}
async txRaw (ctx: MeasureContext, tx: Tx): Promise<void> {
// Just do Tx and do not send anything
await this.tx({ ctx, sendResponse: async () => {}, send: async () => {}, sendError: async () => {} }, tx)
}
async tx (ctx: ClientSessionCtx, tx: Tx): Promise<void> {
this.lastRequest = Date.now()
this.total.tx++
@ -238,7 +202,7 @@ export class ClientSession implements Session {
toSendTarget.set(this.getUser(), [])
for (const txd of context.derived) {
if (txd.target === undefined) {
getTxes('').push(...txd.derived)
getTxes('') // be sure we have empty one
// Also add to all other targeted sends
for (const v of toSendTarget.values()) {
@ -320,22 +284,7 @@ export class ClientSession implements Session {
}
}
console.log('Broadcasting compact bulk', derived.length)
const bevent = this.createBroadcastEvent(Array.from(classes))
const bevent = createBroadcastEvent(Array.from(classes))
await ctx.send([bevent], target, exclude)
}
private createBroadcastEvent (classes: Ref<Class<Doc>>[]): TxWorkspaceEvent<BulkUpdateEvent> {
return {
_class: core.class.TxWorkspaceEvent,
_id: generateId(),
event: WorkspaceEvent.BulkUpdate,
params: {
_class: classes
},
modifiedBy: core.account.System,
modifiedOn: Date.now(),
objectSpace: core.space.DerivedTx,
space: core.space.DerivedTx
}
}
}

View File

@ -18,7 +18,6 @@ import core, {
TxFactory,
WorkspaceEvent,
generateId,
getWorkspaceId,
systemAccountEmail,
toWorkspaceString,
versionToString,
@ -371,7 +370,8 @@ class TSessionManager implements SessionManager {
workspace.sessions.set(session.sessionId, { session, socket: ws })
// We do not need to wait for set-status, just return session to client
void ctx.with('set-status', {}, (ctx) => this.trySetStatus(ctx, session, true))
const _workspace = workspace
void ctx.with('set-status', {}, (ctx) => this.trySetStatus(ctx, session, true, _workspace.workspaceId))
if (this.timeMinutes > 0) {
void ws.send(ctx, { result: this.createMaintenanceWarning() }, session.binaryMode, session.useCompression)
@ -428,20 +428,27 @@ class TSessionManager implements SessionManager {
ctx,
{ ...token.workspace, workspaceUrl, workspaceName },
true,
(tx, targets) => {
this.broadcastAll(workspace, tx, targets)
(tx, targets, exclude) => {
this.broadcastAll(workspace, tx, targets, exclude)
}
)
return await workspace.pipeline
}
broadcastAll (workspace: Workspace, tx: Tx[], targets?: string[]): void {
broadcastAll (workspace: Workspace, tx: Tx[], target?: string | string[], exclude?: string[]): void {
if (workspace.upgrade) {
return
}
if (target !== undefined && !Array.isArray(target)) {
target = [target]
}
const ctx = this.ctx.newChild('📬 broadcast-all', {})
const sessions = [...workspace.sessions.values()].filter((it) => {
return it !== undefined && (targets === undefined || targets.includes(it.session.getUser()))
if (it === undefined) {
return false
}
const tt = it.session.getUser()
return (target === undefined && !(exclude ?? []).includes(tt)) || (target?.includes(tt) ?? false)
})
function send (): void {
for (const session of sessions) {
@ -540,18 +547,28 @@ class TSessionManager implements SessionManager {
return workspace
}
private async trySetStatus (ctx: MeasureContext, session: Session, online: boolean): Promise<void> {
private async trySetStatus (
ctx: MeasureContext,
session: Session,
online: boolean,
workspaceId: WorkspaceId
): Promise<void> {
const current = this.statusPromises.get(session.getUser())
if (current !== undefined) {
await current
}
const promise = this.setStatus(ctx, session, online)
const promise = this.setStatus(ctx, session, online, workspaceId)
this.statusPromises.set(session.getUser(), promise)
await promise
this.statusPromises.delete(session.getUser())
}
private async setStatus (ctx: MeasureContext, session: Session, online: boolean): Promise<void> {
private async setStatus (
ctx: MeasureContext,
session: Session,
online: boolean,
workspaceId: WorkspaceId
): Promise<void> {
try {
const user = (
await session.pipeline().modelDb.findAll(
@ -563,6 +580,20 @@ class TSessionManager implements SessionManager {
)
)[0]
if (user === undefined) return
const clientCtx: ClientSessionCtx = {
sendResponse: async (msg) => {
// No response
},
ctx,
send: async (msg, target, exclude) => {
this.broadcast(null, workspaceId, msg, target, exclude)
},
sendError: async (msg, error: Status) => {
// Assume no error send
}
}
const status = (await session.findAllRaw(ctx, core.class.UserStatus, { user: user._id }, { limit: 1 }))[0]
const txFactory = new TxFactory(user._id, true)
if (status === undefined) {
@ -570,12 +601,12 @@ class TSessionManager implements SessionManager {
online,
user: user._id
})
await session.txRaw(ctx, tx)
await session.tx(clientCtx, tx)
} else if (status.online !== online) {
const tx = txFactory.createTxUpdateDoc(status._class, status.space, status._id, {
online
})
await session.txRaw(ctx, tx)
await session.tx(clientCtx, tx)
}
} catch {}
}
@ -607,7 +638,7 @@ class TSessionManager implements SessionManager {
if (workspace !== undefined) {
const another = Array.from(workspace.sessions.values()).findIndex((p) => p.session.getUser() === user)
if (another === -1 && !workspace.upgrade) {
void this.trySetStatus(workspace.context, sessionRef.session, false)
void this.trySetStatus(workspace.context, sessionRef.session, false, workspace.workspaceId)
}
}
}, this.timeouts.reconnectTimeout)
@ -761,7 +792,7 @@ class TSessionManager implements SessionManager {
service: S,
ws: ConnectionSocket,
request: Request<any>,
workspace: string
workspace: string // wsId, toWorkspaceString()
): void {
const userCtx = requestCtx.newChild('📞 client', {
workspace: '🧲 ' + workspace
@ -778,16 +809,26 @@ class TSessionManager implements SessionManager {
const delta = Date.now() - request.time
userCtx.measure('receive msg', delta)
}
const wsRef = this.workspaces.get(workspace)
if (wsRef === undefined) {
await ws.send(
ctx,
{
id: request.id,
error: unknownError('No workspace')
},
service.binaryMode,
service.useCompression
)
return
}
if (request.method === 'forceClose') {
const wsRef = this.workspaces.get(workspace)
let done = false
if (wsRef !== undefined) {
if (wsRef.upgrade) {
done = true
console.log('FORCE CLOSE', workspace)
// In case of upgrade, we need to force close workspace not in interval handler
await this.forceClose(workspace, ws)
}
if (wsRef.upgrade) {
done = true
console.log('FORCE CLOSE', workspace)
// In case of upgrade, we need to force close workspace not in interval handler
await this.forceClose(workspace, ws)
}
const forceCloseResponse: Response<any> = {
id: request.id,
@ -840,7 +881,7 @@ class TSessionManager implements SessionManager {
},
ctx,
send: async (msg, target, exclude) => {
this.broadcast(service, getWorkspaceId(workspace), msg, target, exclude)
this.broadcast(service, wsRef.workspaceId, msg, target, exclude)
},
sendError: async (msg, error: Status) => {
await sendResponse(ctx, service, ws, {

View File

@ -234,9 +234,9 @@ export function startHttpServer (
try {
let buff: any | undefined
if (msg instanceof Buffer) {
buff = msg
buff = Buffer.copyBytesFrom(msg)
} else if (Array.isArray(msg)) {
buff = Buffer.concat(msg)
buff = Buffer.copyBytesFrom(Buffer.concat(msg))
}
if (buff !== undefined) {
doSessionOp(webSocketData, (s) => {
@ -347,24 +347,26 @@ function createWebsocketClientSocket (
},
data: () => data,
send: async (ctx: MeasureContext, msg, binary, compression) => {
if (ws.readyState !== ws.OPEN && !cs.isClosed) {
return 0
}
const smsg = serialize(msg, binary)
while (ws.bufferedAmount > 16 * 1024 && ws.readyState === ws.OPEN) {
await new Promise<void>((resolve) => {
setImmediate(resolve)
})
}
ctx.measure('send-data', smsg.length)
await new Promise<void>((resolve, reject) => {
ws.send(smsg, { binary: true, compress: compression }, (err) => {
if (err != null) {
reject(err)
const doSend = (): void => {
if (ws.readyState !== ws.OPEN && !cs.isClosed) {
return
}
resolve()
})
if (ws.bufferedAmount > 16 * 1024) {
setImmediate(doSend)
return
}
ws.send(smsg, { binary: true, compress: compression }, (err) => {
if (err != null) {
reject(err)
}
resolve()
})
}
doSend()
})
return smsg.length
}

View File

@ -62,7 +62,6 @@ export interface Session {
options?: FindOptions<T>
) => Promise<FindResult<T>>
tx: (ctx: ClientSessionCtx, tx: Tx) => Promise<void>
txRaw: (ctx: MeasureContext, tx: Tx) => Promise<void>
// Session restore information
sessionId: string