diff --git a/models/contact/src/index.ts b/models/contact/src/index.ts index 96733b1fc5..904be893ef 100644 --- a/models/contact/src/index.ts +++ b/models/contact/src/index.ts @@ -173,6 +173,7 @@ export class TEmployee extends TPerson implements Employee { export class TEmployeeAccount extends TAccount implements EmployeeAccount { employee!: Ref name!: string + mergedTo!: Ref } @Model(contact.class.Organizations, core.class.Space) diff --git a/models/server-contact/src/index.ts b/models/server-contact/src/index.ts index 75e5d2c794..9720813774 100644 --- a/models/server-contact/src/index.ts +++ b/models/server-contact/src/index.ts @@ -47,8 +47,11 @@ export function createModel (builder: Builder): void { trigger: serverContact.trigger.OnChannelUpdate }) - builder.createDoc(serverCore.class.AsyncTrigger, core.space.Model, { + builder.createDoc(serverCore.class.Trigger, core.space.Model, { trigger: serverContact.trigger.OnEmployeeUpdate, - classes: [contact.class.Employee] + txMatch: { + objectClass: contact.class.Employee, + _class: core.class.TxUpdateDoc + } }) } diff --git a/models/server-core/src/index.ts b/models/server-core/src/index.ts index 157a14afdb..7e40e2a6f7 100644 --- a/models/server-core/src/index.ts +++ b/models/server-core/src/index.ts @@ -14,30 +14,21 @@ // limitations under the License. // -import { Model, Builder } from '@hcengineering/model' -import type { Resource } from '@hcengineering/platform' +import { Builder, Model } from '@hcengineering/model' import { TClass, TDoc } from '@hcengineering/model-core' +import type { Resource } from '@hcengineering/platform' -import type { - AsyncTrigger, - ObjectDDParticipant, - Trigger, - TriggerFunc, - AsyncTriggerState, - AsyncTriggerFunc -} from '@hcengineering/server-core' import core, { Class, + DOMAIN_MODEL, Doc, DocumentQuery, - DOMAIN_DOC_INDEX_STATE, - DOMAIN_MODEL, FindOptions, FindResult, Hierarchy, - Ref, - TxCUD + Ref } from '@hcengineering/core' +import type { ObjectDDParticipant, Trigger, TriggerFunc } from '@hcengineering/server-core' import serverCore from '@hcengineering/server-core' @Model(serverCore.class.Trigger, core.class.Doc, DOMAIN_MODEL) @@ -45,18 +36,6 @@ export class TTrigger extends TDoc implements Trigger { trigger!: Resource } -@Model(serverCore.class.AsyncTrigger, core.class.Doc, DOMAIN_MODEL) -export class TAsyncTrigger extends TDoc implements AsyncTrigger { - trigger!: Resource - classes!: Ref>[] -} - -@Model(serverCore.class.AsyncTriggerState, core.class.Doc, DOMAIN_DOC_INDEX_STATE) -export class TAsyncTriggerState extends TDoc implements AsyncTriggerState { - tx!: TxCUD - message!: string -} - @Model(serverCore.mixin.ObjectDDParticipant, core.class.Class) export class TObjectDDParticipant extends TClass implements ObjectDDParticipant { collectDocs!: Resource< @@ -73,5 +52,5 @@ export class TObjectDDParticipant extends TClass implements ObjectDDParticipant } export function createModel (builder: Builder): void { - builder.createModel(TTrigger, TObjectDDParticipant, TAsyncTriggerState, TAsyncTrigger) + builder.createModel(TTrigger, TObjectDDParticipant) } diff --git a/models/server-openai/src/index.ts b/models/server-openai/src/index.ts index c5b0fea263..98e669543b 100644 --- a/models/server-openai/src/index.ts +++ b/models/server-openai/src/index.ts @@ -44,8 +44,11 @@ export class TOpenAIConfiguration extends TConfiguration implements OpenAIConfig export function createModel (builder: Builder): void { builder.createModel(TOpenAIConfiguration) - builder.createDoc(serverCore.class.AsyncTrigger, core.space.Model, { + builder.createDoc(serverCore.class.Trigger, core.space.Model, { trigger: openai.trigger.AsyncOnGPTRequest, - classes: [chunter.class.Comment, recruit.class.ApplicantMatch] + txMatch: { + objectClass: { $in: [chunter.class.Comment, recruit.class.ApplicantMatch] }, + _class: core.class.TxCreateDoc + } }) } diff --git a/packages/core/src/server.ts b/packages/core/src/server.ts index a475e2402e..180b2fc2f2 100644 --- a/packages/core/src/server.ts +++ b/packages/core/src/server.ts @@ -66,6 +66,6 @@ export interface ServerStorage extends LowLevelStorage { options?: FindOptions ) => Promise> tx: (ctx: MeasureContext, tx: Tx) => Promise<[TxResult, Tx[]]> - apply: (ctx: MeasureContext, tx: Tx[], broadcast: boolean, updateTx: boolean) => Promise + apply: (ctx: MeasureContext, tx: Tx[], broadcast: boolean) => Promise close: () => Promise } diff --git a/plugins/chunter-resources/src/components/CommentPresenter.svelte b/plugins/chunter-resources/src/components/CommentPresenter.svelte index 822afb5a21..a150908427 100644 --- a/plugins/chunter-resources/src/components/CommentPresenter.svelte +++ b/plugins/chunter-resources/src/components/CommentPresenter.svelte @@ -17,26 +17,28 @@ import { AttachmentDocList } from '@hcengineering/attachment-resources' import type { Comment } from '@hcengineering/chunter' import chunter from '@hcengineering/chunter' - import contact, { Employee, EmployeeAccount, getName } from '@hcengineering/contact' - import { Avatar, employeeByIdStore } from '@hcengineering/contact-resources' - import { Ref } from '@hcengineering/core' - import { getClient, MessageViewer } from '@hcengineering/presentation' + import { Employee, EmployeeAccount, getName } from '@hcengineering/contact' + import { Avatar, employeeAccountByIdStore, employeeByIdStore } from '@hcengineering/contact-resources' + import { IdMap, Ref } from '@hcengineering/core' + import { MessageViewer } from '@hcengineering/presentation' import { Icon, ShowMore, TimeSince } from '@hcengineering/ui' export let value: Comment export let inline: boolean = false export let disableClick = false - const client = getClient() - const cutId = (str: string): string => { return str.slice(0, 4) + '...' + str.slice(-4) } - async function getEmployee (value: Comment): Promise { - const acc = await client.findOne(contact.class.EmployeeAccount, { _id: value.modifiedBy as Ref }) + async function getEmployee ( + value: Comment, + employees: IdMap, + accounts: IdMap + ): Promise { + const acc = accounts.get(value.modifiedBy as Ref) if (acc !== undefined) { - const emp = $employeeByIdStore.get(acc.employee) + const emp = employees.get(acc.employee) return emp } } @@ -52,7 +54,7 @@  #{cutId(value._id.toString())} {:else}
- {#await getEmployee(value) then employee} + {#await getEmployee(value, $employeeByIdStore, $employeeAccountByIdStore) then employee}
diff --git a/plugins/chunter-resources/src/components/PinnedMessagesPopup.svelte b/plugins/chunter-resources/src/components/PinnedMessagesPopup.svelte index 09d090275d..9d88f4dbba 100644 --- a/plugins/chunter-resources/src/components/PinnedMessagesPopup.svelte +++ b/plugins/chunter-resources/src/components/PinnedMessagesPopup.svelte @@ -1,10 +1,9 @@ {#if account} diff --git a/plugins/contact-resources/src/components/SpaceMembers.svelte b/plugins/contact-resources/src/components/SpaceMembers.svelte index c3fd02d094..d30d479411 100644 --- a/plugins/contact-resources/src/components/SpaceMembers.svelte +++ b/plugins/contact-resources/src/components/SpaceMembers.svelte @@ -39,7 +39,12 @@ const query: DocumentQuery = isSearch > 0 ? { name: { $like: '%' + search + '%' } } : { _id: { $in: accounts as Ref[] } } const employess = await client.findAll(contact.class.EmployeeAccount, query) - members = new Set(employess.filter((p) => accounts.includes(p._id)).map((p) => p.employee)) + members = new Set( + employess + .filter((it) => it.mergedTo == null) + .filter((p) => accounts.includes(p._id)) + .map((p) => p.employee) + ) return await client.findAll( contact.class.Employee, { diff --git a/plugins/contact-resources/src/utils.ts b/plugins/contact-resources/src/utils.ts index ee7007574a..94a2b673f5 100644 --- a/plugins/contact-resources/src/utils.ts +++ b/plugins/contact-resources/src/utils.ts @@ -223,6 +223,9 @@ async function generateLocation (loc: Location, id: Ref): Promise>(new Map()) export const employeesStore = writable([]) + +export const employeeAccountByIdStore = writable>(new Map()) + export const channelProviders = writable([]) function fillStores (): void { @@ -234,6 +237,22 @@ function fillStores (): void { employeeByIdStore.set(toIdMap(res)) }) + const accountQ = createQuery(true) + accountQ.query(contact.class.EmployeeAccount, {}, (res) => { + const mergedEmployees = res.filter((it) => it.mergedTo !== undefined) + const activeEmployees = res.filter((it) => it.mergedTo === undefined) + const ids = toIdMap(activeEmployees) + for (const e of mergedEmployees) { + if (e.mergedTo !== undefined) { + const mergeTo = ids.get(e.mergedTo) + if (mergeTo !== undefined) { + ids.set(e._id, mergeTo) + } + } + } + employeeAccountByIdStore.set(ids) + }) + const providerQuery = createQuery(true) providerQuery.query(contact.class.ChannelProvider, {}, (res) => channelProviders.set(res)) } else { diff --git a/plugins/contact/src/index.ts b/plugins/contact/src/index.ts index 268018fef1..50b956042c 100644 --- a/plugins/contact/src/index.ts +++ b/plugins/contact/src/index.ts @@ -151,6 +151,7 @@ export interface Employee extends Person { export interface EmployeeAccount extends Account { employee: Ref name: string + mergedTo?: Ref } /** diff --git a/plugins/gmail-resources/src/components/Chats.svelte b/plugins/gmail-resources/src/components/Chats.svelte index 21fb832f21..f2a5864cde 100644 --- a/plugins/gmail-resources/src/components/Chats.svelte +++ b/plugins/gmail-resources/src/components/Chats.svelte @@ -14,9 +14,9 @@ // limitations under the License. --> {#if object !== undefined} @@ -298,8 +292,8 @@ - {#if messages && accounts} - + {#if messages} + {/if} diff --git a/plugins/tracker-resources/src/components/issues/edit/ControlPanel.svelte b/plugins/tracker-resources/src/components/issues/edit/ControlPanel.svelte index 642834db35..15a8377a2a 100644 --- a/plugins/tracker-resources/src/components/issues/edit/ControlPanel.svelte +++ b/plugins/tracker-resources/src/components/issues/edit/ControlPanel.svelte @@ -13,15 +13,15 @@ // limitations under the License. -->
diff --git a/plugins/workbench-resources/src/connect.ts b/plugins/workbench-resources/src/connect.ts index 0377dfa3c0..521c84c4b1 100644 --- a/plugins/workbench-resources/src/connect.ts +++ b/plugins/workbench-resources/src/connect.ts @@ -66,7 +66,7 @@ export async function connect (title: string): Promise { void (async () => { const newVersion = await _client?.findOne(core.class.Version, {}) - console.log('Reconnect Model version', version) + console.log('Reconnect Model version', newVersion) const currentVersionStr = versionToString(version as Version) const reconnectVersionStr = versionToString(newVersion as Version) diff --git a/server-plugins/contact-resources/src/index.ts b/server-plugins/contact-resources/src/index.ts index a3065203fe..c681c63665 100644 --- a/server-plugins/contact-resources/src/index.ts +++ b/server-plugins/contact-resources/src/index.ts @@ -41,7 +41,7 @@ import core, { } from '@hcengineering/core' import notification, { Collaborators } from '@hcengineering/notification' import { getMetadata } from '@hcengineering/platform' -import serverCore, { AsyncTriggerControl, TriggerControl } from '@hcengineering/server-core' +import serverCore, { TriggerControl } from '@hcengineering/server-core' import { workbenchId } from '@hcengineering/workbench' /** @@ -110,13 +110,13 @@ export async function OnContactDelete ( } async function updateAllRefs ( - control: AsyncTriggerControl, + control: TriggerControl, sourceAccount: EmployeeAccount, targetAccount: EmployeeAccount, modifiedOn: Timestamp, modifiedBy: Ref ): Promise { - const res: Tx[] = [] + console.log('merge employee:', sourceAccount.name, 'to', targetAccount.name) // Move all possible references to Account and Employee and replace to target one. const reftos = (await control.modelDb.findAll(core.class.Attribute, { 'type._class': core.class.RefTo })).filter( (it) => { @@ -133,6 +133,9 @@ async function updateAllRefs ( if (to.to === contact.class.Employee) { const descendants = control.hierarchy.getDescendants(attr.attributeOf) for (const d of descendants) { + if (control.hierarchy.isDerived(d, core.class.Tx)) { + continue + } if (control.hierarchy.findDomain(d) !== undefined) { while (true) { const values = await control.findAll(d, { [attr.name]: sourceAccount.employee }, { limit: 100 }) @@ -144,7 +147,10 @@ async function updateAllRefs ( for (const v of values) { await updateAttribute(builder, v, d, { key: attr.name, attr }, targetAccount.employee, targetAccount._id) } - await control.apply(builder.txes, true, true) + if (builder.txes.length > 0) { + console.log('merge employee:', sourceAccount.name, 'to', targetAccount.name, d, builder.txes.length) + await control.apply(builder.txes, false) + } } } } @@ -156,6 +162,9 @@ async function updateAllRefs ( ) { const descendants = control.hierarchy.getDescendants(attr.attributeOf) for (const d of descendants) { + if (control.hierarchy.isDerived(d, core.class.Tx)) { + continue + } if (control.hierarchy.findDomain(d) !== undefined) { while (true) { const values = await control.findAll(d, { [attr.name]: sourceAccount._id }, { limit: 100 }) @@ -166,7 +175,10 @@ async function updateAllRefs ( for (const v of values) { await updateAttribute(builder, v, d, { key: attr.name, attr }, targetAccount._id, targetAccount._id) } - await control.apply(builder.txes, true, true) + if (builder.txes.length > 0) { + console.log('merge employee:', sourceAccount.name, 'to', targetAccount.name, d, builder.txes.length) + await control.apply(builder.txes, false) + } } } } @@ -181,6 +193,9 @@ async function updateAllRefs ( if (to.to === contact.class.Employee) { const descendants = control.hierarchy.getDescendants(attr.attributeOf) for (const d of descendants) { + if (control.hierarchy.isDerived(d, core.class.Tx)) { + continue + } if (control.hierarchy.findDomain(d) !== undefined) { while (true) { const values = await control.findAll( @@ -195,7 +210,10 @@ async function updateAllRefs ( for (const v of values) { await updateAttribute(builder, v, d, { key: attr.name, attr }, targetAccount.employee, targetAccount._id) } - await control.apply(builder.txes, true, true) + if (builder.txes.length > 0) { + console.log('merge employee:', sourceAccount.name, 'to', targetAccount.name, d, builder.txes.length) + await control.apply(builder.txes, false) + } } } } @@ -207,6 +225,9 @@ async function updateAllRefs ( ) { const descendants = control.hierarchy.getDescendants(attr.attributeOf) for (const d of descendants) { + if (control.hierarchy.isDerived(d, core.class.Tx)) { + continue + } if (control.hierarchy.findDomain(d) !== undefined) { while (true) { const values = await control.findAll(d, { [attr.name]: sourceAccount._id }, { limit: 100 }) @@ -217,27 +238,30 @@ async function updateAllRefs ( for (const v of values) { await updateAttribute(builder, v, d, { key: attr.name, attr }, targetAccount._id, targetAccount._id) } - await control.apply(builder.txes, true, true) + if (builder.txes.length > 0) { + console.log('merge employee:', sourceAccount.name, 'to', targetAccount.name, d, builder.txes.length) + await control.apply(builder.txes, false) + } } } } } } const employee = (await control.findAll(contact.class.Employee, { _id: sourceAccount.employee })).shift() + const builder = new TxBuilder(control.hierarchy, control.modelDb, modifiedBy) - await builder.remove(sourceAccount) if (employee !== undefined) { await builder.remove(employee) } - await control.apply(builder.txes, true, true) + await builder.update(sourceAccount, { mergedTo: targetAccount._id }) + await control.apply(builder.txes, true) - return res + return [] } -async function mergeEmployee (control: AsyncTriggerControl, uTx: TxUpdateDoc): Promise { +async function mergeEmployee (control: TriggerControl, uTx: TxUpdateDoc): Promise { if (uTx.operations.mergedTo === undefined) return [] const target = uTx.operations.mergedTo - const res: Tx[] = [] const attributes = control.hierarchy.getAllAttributes(contact.class.Employee) @@ -245,19 +269,26 @@ async function mergeEmployee (control: AsyncTriggerControl, uTx: TxUpdateDoc - const allAttached = await control.findAll(collection.of, { attachedTo: uTx.objectId }) - for (const attached of allAttached) { - const tx = control.txFactory.createTxUpdateDoc(attached._class, attached.space, attached._id, { - attachedTo: target - }) - const parent = control.txFactory.createTxCollectionCUD( - attached.attachedToClass, - target, - attached.space, - attached.collection, - tx - ) - res.push(parent) + const res: Tx[] = [] + while (true) { + const allAttached = await control.findAll(collection.of, { attachedTo: uTx.objectId }, { limit: 100 }) + if (allAttached.length === 0) { + break + } + for (const attached of allAttached) { + const tx = control.txFactory.createTxUpdateDoc(attached._class, attached.space, attached._id, { + attachedTo: target + }) + const parent = control.txFactory.createTxCollectionCUD( + attached.attachedToClass, + target, + attached.space, + attached.collection, + tx + ) + res.push(parent) + } + await control.apply(res, false) } } } @@ -267,32 +298,18 @@ async function mergeEmployee (control: AsyncTriggerControl, uTx: TxUpdateDoc { - if (tx._class !== core.class.TxUpdateDoc) { - return [] - } - +export async function OnEmployeeUpdate (tx: Tx, control: TriggerControl): Promise { const uTx = tx as TxUpdateDoc - if (!control.hierarchy.isDerived(uTx.objectClass, contact.class.Employee)) { - return [] - } - const result: Tx[] = [] const txes = await mergeEmployee(control, uTx) diff --git a/server-plugins/contact/src/index.ts b/server-plugins/contact/src/index.ts index bab205c25f..f8929a8380 100644 --- a/server-plugins/contact/src/index.ts +++ b/server-plugins/contact/src/index.ts @@ -14,9 +14,9 @@ // limitations under the License. // -import type { Resource, Plugin } from '@hcengineering/platform' +import type { Plugin, Resource } from '@hcengineering/platform' import { plugin } from '@hcengineering/platform' -import type { AsyncTriggerFunc, TriggerFunc } from '@hcengineering/server-core' +import type { TriggerFunc } from '@hcengineering/server-core' import { Presenter } from '@hcengineering/server-notification' /** @@ -31,7 +31,7 @@ export default plugin(serverContactId, { trigger: { OnContactDelete: '' as Resource, OnChannelUpdate: '' as Resource, - OnEmployeeUpdate: '' as Resource + OnEmployeeUpdate: '' as Resource }, function: { PersonHTMLPresenter: '' as Resource, diff --git a/server-plugins/openai/src/plugin.ts b/server-plugins/openai/src/plugin.ts index c92715a1a3..a95ddf305d 100644 --- a/server-plugins/openai/src/plugin.ts +++ b/server-plugins/openai/src/plugin.ts @@ -17,7 +17,7 @@ import type { Plugin, Resource } from '@hcengineering/platform' import { plugin } from '@hcengineering/platform' import type { Account, Class, Ref } from '@hcengineering/core' -import { AsyncTriggerFunc } from '@hcengineering/server-core' +import { TriggerFunc } from '@hcengineering/server-core' import type { OpenAIConfiguration } from './types' export * from './types' @@ -31,7 +31,7 @@ export const openAIId = 'openai' as Plugin */ const openaiPlugin = plugin(openAIId, { trigger: { - AsyncOnGPTRequest: '' as Resource + AsyncOnGPTRequest: '' as Resource }, class: { OpenAIConfiguration: '' as Ref> diff --git a/server-plugins/openai/src/resources.ts b/server-plugins/openai/src/resources.ts index 3cb4b79996..68d0cdd6ed 100644 --- a/server-plugins/openai/src/resources.ts +++ b/server-plugins/openai/src/resources.ts @@ -27,7 +27,7 @@ import core, { TxProcessor } from '@hcengineering/core' import recruit, { ApplicantMatch } from '@hcengineering/recruit' -import type { AsyncTriggerControl } from '@hcengineering/server-core' +import type { TriggerControl } from '@hcengineering/server-core' import got from 'got' import { convert } from 'html-to-text' import { chunks } from './encoder/encoder' @@ -111,7 +111,7 @@ async function performCompletion ( /** * @public */ -export async function AsyncOnGPTRequest (tx: Tx, tc: AsyncTriggerControl): Promise { +export async function AsyncOnGPTRequest (tx: Tx, tc: TriggerControl): Promise { const actualTx = TxProcessor.extractTx(tx) if (tc.hierarchy.isDerived(actualTx._class, core.class.TxCUD) && actualTx.modifiedBy !== openai.account.GPT) { @@ -127,7 +127,7 @@ export async function AsyncOnGPTRequest (tx: Tx, tc: AsyncTriggerControl): Promi return [] } -async function handleComment (tx: Tx, tc: AsyncTriggerControl): Promise { +async function handleComment (tx: Tx, tc: TriggerControl): Promise { const actualTx = TxProcessor.extractTx(tx) const cud: TxCUD = actualTx as TxCUD @@ -269,7 +269,7 @@ async function summarizeVacancy (config: OpenAIConfiguration, chunks: string[], return getText(await performCompletion(candidateSummaryRequest, options, config, maxLen)) ?? chunks[0] } -async function handleApplicantMatch (tx: Tx, tc: AsyncTriggerControl): Promise { +async function handleApplicantMatch (tx: Tx, tc: TriggerControl): Promise { const [config] = await tc.findAll(openai.class.OpenAIConfiguration, {}) if (!(config?.enabled ?? false)) { diff --git a/server/backup/src/index.ts b/server/backup/src/index.ts index f0b6293a59..18cb64d682 100644 --- a/server/backup/src/index.ts +++ b/server/backup/src/index.ts @@ -500,18 +500,27 @@ export async function restore ( let idx: number | undefined let loaded = 0 let last = 0 + let el = 0 + let chunks = 0 while (true) { + const st = Date.now() const it = await connection.loadChunk(c, idx) + chunks++ + idx = it.idx + el += Date.now() - st for (const [_id, hash] of Object.entries(it.docs)) { serverChangeset.set(_id as Ref, hash) loaded++ } + const mr = Math.round(loaded / 10000) if (mr !== last) { last = mr - console.log(' loaded', loaded) + console.log(' loaded from server', loaded, el, chunks) + el = 0 + chunks = 0 } if (it.finished) { break @@ -649,7 +658,10 @@ export async function restore ( await sendChunk(undefined, 0) if (docsToRemove.length > 0 && merge !== true) { console.log('cleanup', docsToRemove.length) - await connection.clean(c, docsToRemove) + while (docsToRemove.length > 0) { + const part = docsToRemove.splice(0, 10000) + await connection.clean(c, part) + } } } } finally { diff --git a/server/core/src/plugin.ts b/server/core/src/plugin.ts index a0db3b361d..ee786e04a2 100644 --- a/server/core/src/plugin.ts +++ b/server/core/src/plugin.ts @@ -18,7 +18,7 @@ import type { Metadata, Plugin } from '@hcengineering/platform' import { plugin } from '@hcengineering/platform' import type { Class, Ref, Space } from '@hcengineering/core' -import type { AsyncTrigger, AsyncTriggerState, ObjectDDParticipant, Trigger } from './types' +import type { ObjectDDParticipant, Trigger } from './types' /** * @public @@ -30,9 +30,7 @@ export const serverCoreId = 'server-core' as Plugin */ const serverCore = plugin(serverCoreId, { class: { - Trigger: '' as Ref>, - AsyncTrigger: '' as Ref>, - AsyncTriggerState: '' as Ref> + Trigger: '' as Ref> }, mixin: { ObjectDDParticipant: '' as Ref diff --git a/server/core/src/processor/index.ts b/server/core/src/processor/index.ts deleted file mode 100644 index 9acc702fb6..0000000000 --- a/server/core/src/processor/index.ts +++ /dev/null @@ -1,133 +0,0 @@ -import core, { - Class, - Doc, - Hierarchy, - MeasureContext, - ModelDb, - Ref, - ServerStorage, - Tx, - TxCUD, - TxFactory, - TxProcessor -} from '@hcengineering/core' -import { getResource } from '@hcengineering/platform' -import plugin from '../plugin' -import { AsyncTrigger, AsyncTriggerControl, AsyncTriggerFunc } from '../types' -/** - * @public - */ -export class AsyncTriggerProcessor { - canceling: boolean = false - - processing: Promise | undefined - - triggers: AsyncTrigger[] = [] - - classes: Ref>[] = [] - - factory = new TxFactory(core.account.System, true) - - functions: AsyncTriggerFunc[] = [] - - trigger = (): void => {} - - control: AsyncTriggerControl - - constructor ( - readonly model: ModelDb, - readonly hierarchy: Hierarchy, - readonly storage: ServerStorage, - readonly metrics: MeasureContext - ) { - this.control = { - hierarchy: this.hierarchy, - modelDb: this.model, - txFactory: this.factory, - findAll: async (_class, query, options) => { - return await this.storage.findAll(this.metrics, _class, query, options) - }, - apply: async (tx: Tx[], broadcast: boolean, updateTx: boolean): Promise => { - await this.storage.apply(this.metrics, tx, broadcast, updateTx) - } - } - } - - async cancel (): Promise { - this.canceling = true - await this.processing - } - - async start (): Promise { - await this.updateTriggers() - this.processing = this.doProcessing() - } - - async updateTriggers (): Promise { - try { - this.triggers = await this.model.findAll(plugin.class.AsyncTrigger, {}) - this.classes = this.triggers.reduce>[]>((arr, it) => arr.concat(it.classes), []) - this.functions = await Promise.all(this.triggers.map(async (trigger) => await getResource(trigger.trigger))) - } catch (err: any) { - console.error(err) - } - } - - async tx (tx: Tx[]): Promise { - const result: Tx[] = [] - for (const _tx of tx) { - const actualTx = TxProcessor.extractTx(_tx) - if ( - this.hierarchy.isDerived(actualTx._class, core.class.TxCUD) && - this.hierarchy.isDerived(_tx._class, core.class.TxCUD) - ) { - const cud = actualTx as TxCUD - if (this.classes.some((it) => this.hierarchy.isDerived(cud.objectClass, it))) { - // We need processing - result.push( - this.factory.createTxCreateDoc(plugin.class.AsyncTriggerState, plugin.space.TriggerState, { - tx: _tx as TxCUD, - message: 'Processing...' - }) - ) - } - } - } - if (result.length > 0) { - await this.storage.apply(this.metrics, result, false, false) - this.processing = this.doProcessing() - } - } - - private async doProcessing (): Promise { - while (!this.canceling) { - const docs = await this.storage.findAll(this.metrics, plugin.class.AsyncTriggerState, {}, { limit: 10 }) - if (docs.length === 0) { - return - } - - for (const doc of docs) { - const result: Tx[] = [] - if (this.canceling) { - break - } - - try { - for (const f of this.functions) { - result.push(...(await f(doc.tx, this.control))) - } - } catch (err: any) { - console.error(err) - } - await this.storage.apply( - this.metrics, - [this.factory.createTxRemoveDoc(doc._class, doc.space, doc._id)], - false, - false - ) - - await this.storage.apply(this.metrics, result, true, false) - } - } - } -} diff --git a/server/core/src/storage.ts b/server/core/src/storage.ts index 69856b3901..0ca412b05a 100644 --- a/server/core/src/storage.ts +++ b/server/core/src/storage.ts @@ -58,7 +58,6 @@ import { FullTextIndex } from './fulltext' import { FullTextIndexPipeline } from './indexer' import { FullTextPipelineStage } from './indexer/types' import serverCore from './plugin' -import { AsyncTriggerProcessor } from './processor' import { Triggers } from './triggers' import type { ContentAdapterFactory, @@ -68,7 +67,7 @@ import type { ObjectDDParticipant, TriggerControl } from './types' -import { createCacheFindAll } from './utils' +import { createFindAll } from './utils' /** * @public @@ -104,7 +103,6 @@ export interface DbConfiguration { class TServerStorage implements ServerStorage { private readonly fulltext: FullTextIndex hierarchy: Hierarchy - triggerProcessor: AsyncTriggerProcessor scopes = new Map>() @@ -124,15 +122,11 @@ class TServerStorage implements ServerStorage { ) { this.hierarchy = hierarchy this.fulltext = indexFactory(this) - this.triggerProcessor = new AsyncTriggerProcessor(modelDb, hierarchy, this, metrics.newChild('triggers', {})) - void this.triggerProcessor.start() } async close (): Promise { console.timeLog(this.workspace.name, 'closing') await this.fulltext.close() - console.timeLog(this.workspace.name, 'closing triggers') - await this.triggerProcessor.cancel() console.timeLog(this.workspace.name, 'closing adapters') for (const o of this.adapters.values()) { await o.close() @@ -525,13 +519,15 @@ class TServerStorage implements ServerStorage { }, findAll: fAll(ctx), modelDb: this.modelDb, - hierarchy: this.hierarchy + hierarchy: this.hierarchy, + apply: async (tx, broadcast) => { + await this.apply(ctx, tx, broadcast) + } } const triggers = await ctx.with('process-triggers', {}, async (ctx) => { const result: Tx[] = [] for (const tx of txes) { result.push(...(await this.triggers.apply(tx.modifiedBy, tx, triggerControl))) - await ctx.with('async-triggers', {}, (ctx) => this.triggerProcessor.tx([tx])) } return result }) @@ -597,26 +593,14 @@ class TServerStorage implements ServerStorage { return { passed, onEnd } } - async apply (ctx: MeasureContext, tx: Tx[], broadcast: boolean, updateTx: boolean): Promise { + async apply (ctx: MeasureContext, tx: Tx[], broadcast: boolean): Promise { const triggerFx = new Effects() - const cacheFind = createCacheFindAll(this) + const _findAll = createFindAll(this) const txToStore = tx.filter( (it) => it.space !== core.space.DerivedTx && !this.hierarchy.isDerived(it._class, core.class.TxApplyIf) ) - if (updateTx) { - const ops = new Map( - tx - .filter((it) => it._class === core.class.TxUpdateDoc) - .map((it) => [(it as TxUpdateDoc).objectId, (it as TxUpdateDoc).operations]) - ) - - if (ops.size > 0) { - await ctx.with('domain-tx-update', {}, async () => await this.getAdapter(DOMAIN_TX).update(DOMAIN_TX, ops)) - } - } else { - await ctx.with('domain-tx', {}, async () => await this.getAdapter(DOMAIN_TX).tx(...txToStore)) - } + await ctx.with('domain-tx', {}, async () => await this.getAdapter(DOMAIN_TX).tx(...txToStore)) const removedMap = new Map, Doc>() await ctx.with('apply', {}, (ctx) => this.routeTx(ctx, removedMap, ...tx)) @@ -626,7 +610,7 @@ class TServerStorage implements ServerStorage { this.options?.broadcast?.(tx) } // invoke triggers and store derived objects - const derived = await this.processDerived(ctx, tx, triggerFx, cacheFind, removedMap) + const derived = await this.processDerived(ctx, tx, triggerFx, _findAll, removedMap) // index object for (const _tx of tx) { @@ -641,13 +625,16 @@ class TServerStorage implements ServerStorage { for (const fx of triggerFx.effects) { await fx() } + if (broadcast && derived.length > 0) { + this.options?.broadcast?.(derived) + } return [...tx, ...derived] } async tx (ctx: MeasureContext, tx: Tx): Promise<[TxResult, Tx[]]> { // store tx const _class = txClass(tx) - const cacheFind = createCacheFindAll(this) + const _findAll = createFindAll(this) const objClass = txObjectClass(tx) const removedDocs = new Map, Doc>() return await ctx.with('tx', { _class, objClass }, async (ctx) => { @@ -673,7 +660,7 @@ class TServerStorage implements ServerStorage { const applyIf = tx as TxApplyIf // Wait for scope promise if found let passed: boolean - ;({ passed, onEnd } = await this.verifyApplyIf(ctx, applyIf, cacheFind)) + ;({ passed, onEnd } = await this.verifyApplyIf(ctx, applyIf, _findAll)) result = passed if (passed) { // Store apply if transaction's if required @@ -681,13 +668,13 @@ class TServerStorage implements ServerStorage { const atx = await this.getAdapter(DOMAIN_TX) await atx.tx(...applyIf.txes) }) - derived = await this.processDerivedTxes(applyIf.txes, ctx, triggerFx, cacheFind, removedDocs) + derived = await this.processDerivedTxes(applyIf.txes, ctx, triggerFx, _findAll, removedDocs) } } else { // store object result = await ctx.with('route-tx', { _class, objClass }, (ctx) => this.routeTx(ctx, removedDocs, tx)) // invoke triggers and store derived objects - derived = await this.processDerived(ctx, [tx], triggerFx, cacheFind, removedDocs) + derived = await this.processDerived(ctx, [tx], triggerFx, _findAll, removedDocs) } // index object diff --git a/server/core/src/triggers.ts b/server/core/src/triggers.ts index 55801a2a1a..d4cb4e650e 100644 --- a/server/core/src/triggers.ts +++ b/server/core/src/triggers.ts @@ -14,8 +14,18 @@ // limitations under the License. // -import type { Tx, Doc, TxCreateDoc, Ref, Account, TxCollectionCUD, AttachedDoc } from '@hcengineering/core' -import core, { TxFactory } from '@hcengineering/core' +import core, { + Tx, + Doc, + TxCreateDoc, + Ref, + Account, + TxCollectionCUD, + AttachedDoc, + DocumentQuery, + matchQuery, + TxFactory +} from '@hcengineering/core' import { getResource } from '@hcengineering/platform' import type { Trigger, TriggerFunc, TriggerControl } from './types' @@ -26,7 +36,7 @@ import serverCore from './plugin' * @public */ export class Triggers { - private readonly triggers: TriggerFunc[] = [] + private readonly triggers: [DocumentQuery | undefined, TriggerFunc][] = [] async tx (tx: Tx): Promise { if (tx._class === core.class.TxCollectionCUD) { @@ -36,14 +46,18 @@ export class Triggers { const createTx = tx as TxCreateDoc if (createTx.objectClass === serverCore.class.Trigger) { const trigger = (createTx as TxCreateDoc).attributes.trigger + const match = (createTx as TxCreateDoc).attributes.txMatch const func = await getResource(trigger) - this.triggers.push(func) + this.triggers.push([match, func]) } } } async apply (account: Ref, tx: Tx, ctrl: Omit): Promise { - const derived = this.triggers.map((trigger) => trigger(tx, { ...ctrl, txFactory: new TxFactory(account, true) })) + const control = { ...ctrl, txFactory: new TxFactory(account, true) } + const derived = this.triggers + .filter(([query]) => query === undefined || matchQuery([tx], query, core.class.Tx, control.hierarchy).length > 0) + .map(([, trigger]) => trigger(tx, control)) const result = await Promise.all(derived) return result.flatMap((x) => x) } diff --git a/server/core/src/types.ts b/server/core/src/types.ts index e369ca0421..73f0bd21ca 100644 --- a/server/core/src/types.ts +++ b/server/core/src/types.ts @@ -32,7 +32,6 @@ import { Storage, Timestamp, Tx, - TxCUD, TxFactory, TxResult, WorkspaceId @@ -104,6 +103,9 @@ export interface TriggerControl { // Later can be replaced with generic one with bucket encapsulated inside. storageFx: (f: (adapter: MinioService, workspaceId: WorkspaceId) => Promise) => void fx: (f: () => Promise) => void + + // Bulk operations in case trigger require some + apply: (tx: Tx[], broadcast: boolean) => Promise } /** @@ -111,42 +113,14 @@ export interface TriggerControl { */ export type TriggerFunc = (tx: Tx, ctrl: TriggerControl) => Promise -/** - * @public - */ -export interface AsyncTriggerControl { - txFactory: TxFactory - findAll: Storage['findAll'] - apply: (tx: Tx[], broadcast: boolean, updateTx: boolean) => Promise - hierarchy: Hierarchy - modelDb: ModelDb -} -/** - * @public - */ -export type AsyncTriggerFunc = (tx: Tx, ctrl: AsyncTriggerControl) => Promise - /** * @public */ export interface Trigger extends Doc { trigger: Resource -} -/** - * @public - */ -export interface AsyncTrigger extends Doc { - trigger: Resource - classes: Ref>[] -} - -/** - * @public - */ -export interface AsyncTriggerState extends Doc { - tx: TxCUD - message: string + // We should match transaction + txMatch?: DocumentQuery } /** diff --git a/server/core/src/utils.ts b/server/core/src/utils.ts index 967473abb8..8977cf5b0a 100644 --- a/server/core/src/utils.ts +++ b/server/core/src/utils.ts @@ -12,23 +12,13 @@ import { /** * @public */ -export function createCacheFindAll (storage: ServerStorage): ServerStorage['findAll'] { - // We will cache all queries for same objects for all derived data checks. - const queryCache = new Map>() - +export function createFindAll (storage: ServerStorage): ServerStorage['findAll'] { return async ( ctx: MeasureContext, clazz: Ref>, query: DocumentQuery, options?: FindOptions ): Promise> => { - const key = JSON.stringify(clazz) + JSON.stringify(query) + JSON.stringify(options) - let cacheResult = queryCache.get(key) - if (cacheResult !== undefined) { - return cacheResult as FindResult - } - cacheResult = await storage.findAll(ctx, clazz, query, options) - queryCache.set(key, cacheResult) - return storage.hierarchy.clone(cacheResult) as FindResult + return await storage.findAll(ctx, clazz, query, options) } } diff --git a/server/mongo/src/__tests__/minmodel.ts b/server/mongo/src/__tests__/minmodel.ts index de1461d332..8c9fa4ecd7 100644 --- a/server/mongo/src/__tests__/minmodel.ts +++ b/server/mongo/src/__tests__/minmodel.ts @@ -33,7 +33,6 @@ import core, { } from '@hcengineering/core' import type { IntlString, Plugin } from '@hcengineering/platform' import { plugin } from '@hcengineering/platform' -import server from '@hcengineering/server-core' export const txFactory = new TxFactory(core.account.System) @@ -126,22 +125,7 @@ export function genMinModel (): TxCUD[] { domain: DOMAIN_DOC_INDEX_STATE }) ) - txes.push( - createClass(server.class.AsyncTrigger, { - label: 'AsyncTrigger' as IntlString, - extends: core.class.Doc, - kind: ClassifierKind.CLASS, - domain: DOMAIN_MODEL - }) - ) - txes.push( - createClass(server.class.AsyncTriggerState, { - label: 'AsyncTriggerState' as IntlString, - extends: core.class.Doc, - kind: ClassifierKind.CLASS, - domain: DOMAIN_DOC_INDEX_STATE - }) - ) + txes.push( createClass(core.class.Account, { label: 'Account' as IntlString, diff --git a/server/mongo/src/storage.ts b/server/mongo/src/storage.ts index 1275eb8bdd..8be3f2f75d 100644 --- a/server/mongo/src/storage.ts +++ b/server/mongo/src/storage.ts @@ -542,7 +542,7 @@ abstract class MongoAdapterBase implements DbAdapter { find (domain: Domain): StorageIterator { const coll = this.db.collection(domain) - const iterator = coll.find({}, {}) + const iterator = coll.find({}, {}).batchSize(100) return { next: async () => { diff --git a/server/server/src/backup.ts b/server/server/src/backup.ts index d7e39a5584..12475a121a 100644 --- a/server/server/src/backup.ts +++ b/server/server/src/backup.ts @@ -65,7 +65,7 @@ export class BackupClientSession extends ClientSession implements BackupSession break } - size = size + doc.id.length + doc.hash.length + doc.size + size = size + doc.size docs[doc.id] = doc.hash } diff --git a/server/ws/src/server.ts b/server/ws/src/server.ts index 9435f6e841..249c1147c1 100644 --- a/server/ws/src/server.ts +++ b/server/ws/src/server.ts @@ -355,7 +355,7 @@ async function handleRequest ( } }, 30000) - const result = await f.apply(service, params) + let result = await f.apply(service, params) clearTimeout(timeout) clearTimeout(hangTimeout) const resp: Response = { id: request.id, result } @@ -373,7 +373,11 @@ async function handleRequest ( diff ) } - ws.send(serialize(resp)) + const toSend = serialize(resp) + // Clear for gc to make work + resp.result = undefined + result = undefined + ws.send(toSend) } catch (err: any) { if (LOGGING_ENABLED) console.error(err) clearTimeout(timeout) @@ -427,7 +431,7 @@ export function start ( }) const session = await sessions.addSession(ctx, ws, token, pipelineFactory, productId, sessionId) // eslint-disable-next-line @typescript-eslint/no-misused-promises - ws.on('message', async (msg: RawData) => { + ws.on('message', (msg: RawData) => { let msgStr = '' if (typeof msg === 'string') { msgStr = msg @@ -436,7 +440,7 @@ export function start ( } else if (Array.isArray(msg)) { msgStr = Buffer.concat(msg).toString() } - await handleRequest(ctx, session, ws, msgStr, token.workspace.name) + void handleRequest(ctx, session, ws, msgStr, token.workspace.name) }) // eslint-disable-next-line @typescript-eslint/no-misused-promises ws.on('close', (code: number, reason: Buffer) => {