diff --git a/packages/core/src/measurements/context.ts b/packages/core/src/measurements/context.ts index 25595c0889..f6e3bddc98 100644 --- a/packages/core/src/measurements/context.ts +++ b/packages/core/src/measurements/context.ts @@ -57,6 +57,7 @@ export class MeasureMetricsContext implements MeasureContext { private readonly fullParams: FullParamsType | (() => FullParamsType) = {} logger: MeasureLogger metrics: Metrics + id?: string st = Date.now() contextData: object = {} @@ -125,6 +126,8 @@ export class MeasureMetricsContext implements MeasureContext { this, this.logParams ) + result.id = this.id + result.onEnd = this.onEnd.bind(this) result.contextData = this.contextData return result } @@ -197,6 +200,8 @@ export class MeasureMetricsContext implements MeasureContext { end (): void { this.done() } + + async onEnd (ctx: MeasureContext): Promise {} } /** diff --git a/packages/core/src/measurements/types.ts b/packages/core/src/measurements/types.ts index 85c7cceed3..7ea9339a54 100644 --- a/packages/core/src/measurements/types.ts +++ b/packages/core/src/measurements/types.ts @@ -111,4 +111,6 @@ export interface MeasureContext { // Mark current context as complete // If no value is passed, time difference will be used. end: (value?: number) => void + + onEnd?: (ctx: MeasureContext) => Promise } diff --git a/server/core/src/triggers.ts b/server/core/src/triggers.ts index 4caeafeb2a..7ed316b42b 100644 --- a/server/core/src/triggers.ts +++ b/server/core/src/triggers.ts @@ -17,6 +17,7 @@ import core, { TxFactory, TxProcessor, + generateId, groupByArray, matchQuery, type Class, @@ -149,8 +150,14 @@ export class Triggers { trigger.resource, {}, async (ctx) => { + if (mode === 'async') { + ctx.id = generateId() + } const tresult = await this.applyTrigger(ctx, ctrl, matches, { trigger, arrays }) result.push(...tresult) + if (ctx.onEnd !== undefined && mode === 'async') { + await ctx.onEnd(ctx) + } }, { count: matches.length, arrays } ) diff --git a/server/core/src/types.ts b/server/core/src/types.ts index 98cda36589..e570de7e97 100644 --- a/server/core/src/types.ts +++ b/server/core/src/types.ts @@ -160,8 +160,6 @@ export interface PipelineContext { head?: Middleware broadcastEvent?: (ctx: MeasureContext, tx: Tx[]) => Promise - - endContext?: (ctx: MeasureContext) => Promise } /** * @public diff --git a/server/middleware/src/connectionMgr.ts b/server/middleware/src/connectionMgr.ts index fc6665dc75..6abb509ba5 100644 --- a/server/middleware/src/connectionMgr.ts +++ b/server/middleware/src/connectionMgr.ts @@ -23,9 +23,12 @@ export class ConnectionMgrMiddleware extends BaseMiddleware implements Middlewar if (ctx.id === undefined) { ctx.id = generateId() } + ctx.onEnd = async (_ctx: MeasureContext) => { + await this.context.adapterManager?.closeContext?.(_ctx) + } const result = await this.provideTx(ctx, tx) - this.context.endContext = async (_ctx: MeasureContext) => { - await this.context.adapterManager?.closeContext?.(ctx) + if (ctx.onEnd !== undefined) { + await ctx.onEnd(ctx) } return result } diff --git a/server/middleware/src/triggers.ts b/server/middleware/src/triggers.ts index 7a64d6a37f..6ecce4c76f 100644 --- a/server/middleware/src/triggers.ts +++ b/server/middleware/src/triggers.ts @@ -33,6 +33,7 @@ import core, { type TxRemoveDoc, type TxUpdateDoc, addOperation, + generateId, toFindResult, withContext } from '@hcengineering/core' @@ -225,11 +226,16 @@ export class TriggersMiddleware extends BaseMiddleware implements Middleware { ) if (aresult.length > 0) { - await this.processDerivedTxes(ctx, aresult) - // We need to send all to recipients - await this.context.head?.handleBroadcast(ctx) + await ctx.with('process-aync-result', {}, async (ctx) => { + ctx.id = generateId() + await this.processDerivedTxes(ctx, aresult) + // We need to send all to recipients + await this.context.head?.handleBroadcast(ctx) + if (ctx.onEnd !== undefined) { + await ctx.onEnd(ctx) + } + }) } - await this.context.endContext?.(ctx) } private async processDerivedTxes (ctx: MeasureContext, derived: Tx[]): Promise { diff --git a/server/postgres/src/storage.ts b/server/postgres/src/storage.ts index 255a7cd668..89935257ed 100644 --- a/server/postgres/src/storage.ts +++ b/server/postgres/src/storage.ts @@ -79,7 +79,6 @@ import { isDataField, isOwner, type JoinProps, - Mutex, parseDoc, parseDocWithProjection, parseUpdate, @@ -90,16 +89,36 @@ import { abstract class PostgresAdapterBase implements DbAdapter { protected readonly _helper: DBCollectionHelper protected readonly tableFields = new Map() - protected readonly mutex = new Mutex() - protected readonly connections = new Map() + protected readonly connections = new Map>() protected readonly retryTxn = async ( - connection: postgres.ReservedSql, + client: postgres.ReservedSql, fn: (client: postgres.ReservedSql) => Promise ): Promise => { - await this.mutex.runExclusive(async () => { - await this.processOps(connection, fn) - }) + const backoffInterval = 100 // millis + const maxTries = 5 + let tries = 0 + + while (true) { + await client.unsafe('BEGIN;') + tries++ + + try { + const result = await fn(client) + await client.unsafe('COMMIT;') + return result + } catch (err: any) { + await client.unsafe('ROLLBACK;') + + if (err.code !== '40001' || tries === maxTries) { + throw err + } else { + console.log('Transaction failed. Retrying.') + console.log(err.message) + await new Promise((resolve) => setTimeout(resolve, tries * backoffInterval)) + } + } + } } constructor ( @@ -133,7 +152,11 @@ abstract class PostgresAdapterBase implements DbAdapter { if (ctx.id === undefined) return const conn = this.connections.get(ctx.id) if (conn !== undefined) { - conn.release() + if (conn instanceof Promise) { + ;(await conn).release() + } else { + conn.release() + } this.connections.delete(ctx.id) } } @@ -141,40 +164,10 @@ abstract class PostgresAdapterBase implements DbAdapter { protected async getConnection (ctx: MeasureContext): Promise { if (ctx.id === undefined) return const conn = this.connections.get(ctx.id) - if (conn !== undefined) return conn - const client = await this.client.reserve() + if (conn !== undefined) return await conn + const client = this.client.reserve() this.connections.set(ctx.id, client) - return client - } - - private async processOps ( - client: postgres.ReservedSql, - operation: (client: postgres.ReservedSql) => Promise - ): Promise { - const backoffInterval = 100 // millis - const maxTries = 5 - let tries = 0 - - while (true) { - await client.unsafe('BEGIN;') - tries++ - - try { - const result = await operation(client) - await client.unsafe('COMMIT;') - return result - } catch (err: any) { - await client.unsafe('ROLLBACK;') - - if (err.code !== '40001' || tries === maxTries) { - throw err - } else { - console.log('Transaction failed. Retrying.') - console.log(err.message) - await new Promise((resolve) => setTimeout(resolve, tries * backoffInterval)) - } - } - } + return await client } async traverse( @@ -239,9 +232,13 @@ abstract class PostgresAdapterBase implements DbAdapter { abstract init (): Promise async close (): Promise { - this.connections.forEach((c) => { - c.release() - }) + for (const c of this.connections.values()) { + if (c instanceof Promise) { + ;(await c).release() + } else { + c.release() + } + } this.refClient.close() } @@ -1495,7 +1492,7 @@ class PostgresAdapter extends PostgresAdapterBase { return { object } } } catch (err) { - console.error(err) + console.error(err, { tx, params, updates }) } }) return {} diff --git a/server/postgres/src/utils.ts b/server/postgres/src/utils.ts index 791fba465e..fc0b04a9d6 100644 --- a/server/postgres/src/utils.ts +++ b/server/postgres/src/utils.ts @@ -241,6 +241,10 @@ export function inferType (val: any): string { if (typeof val === 'boolean') { return '::boolean' } + if (Array.isArray(val)) { + const type = inferType(val[0]) + return type + '[]' + } return '' } @@ -409,38 +413,3 @@ export interface JoinProps { toClass: Ref> classes?: Ref>[] // filter by classes } - -export class Mutex { - private locked: boolean = false - private readonly waitingQueue: Array<(value: boolean) => void> = [] - - private async acquire (): Promise { - while (this.locked) { - await new Promise((resolve) => { - this.waitingQueue.push(resolve) - }) - } - this.locked = true - } - - private release (): void { - if (!this.locked) { - throw new Error('Mutex is not locked') - } - - this.locked = false - const nextResolver = this.waitingQueue.shift() - if (nextResolver !== undefined) { - nextResolver(true) - } - } - - async runExclusive(fn: () => Promise | T): Promise { - await this.acquire() - try { - return await fn() - } finally { - this.release() - } - } -}