remove mutex (#7056)
Some checks are pending
CI / build (push) Waiting to run
CI / svelte-check (push) Blocked by required conditions
CI / formatting (push) Blocked by required conditions
CI / test (push) Blocked by required conditions
CI / uitest (push) Waiting to run
CI / uitest-pg (push) Waiting to run
CI / uitest-qms (push) Waiting to run
CI / docker-build (push) Blocked by required conditions
CI / dist-build (push) Blocked by required conditions

Signed-off-by: Denis Bykhov <bykhov.denis@gmail.com>
This commit is contained in:
Denis Bykhov 2024-10-30 18:51:18 +05:00 committed by GitHub
parent 1b7d559180
commit e4bf53ad1b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 75 additions and 88 deletions

View File

@ -57,6 +57,7 @@ export class MeasureMetricsContext implements MeasureContext {
private readonly fullParams: FullParamsType | (() => FullParamsType) = {}
logger: MeasureLogger
metrics: Metrics
id?: string
st = Date.now()
contextData: object = {}
@ -125,6 +126,8 @@ export class MeasureMetricsContext implements MeasureContext {
this,
this.logParams
)
result.id = this.id
result.onEnd = this.onEnd.bind(this)
result.contextData = this.contextData
return result
}
@ -197,6 +200,8 @@ export class MeasureMetricsContext implements MeasureContext {
end (): void {
this.done()
}
async onEnd (ctx: MeasureContext): Promise<void> {}
}
/**

View File

@ -111,4 +111,6 @@ export interface MeasureContext<Q = any> {
// Mark current context as complete
// If no value is passed, time difference will be used.
end: (value?: number) => void
onEnd?: (ctx: MeasureContext) => Promise<void>
}

View File

@ -17,6 +17,7 @@
import core, {
TxFactory,
TxProcessor,
generateId,
groupByArray,
matchQuery,
type Class,
@ -149,8 +150,14 @@ export class Triggers {
trigger.resource,
{},
async (ctx) => {
if (mode === 'async') {
ctx.id = generateId()
}
const tresult = await this.applyTrigger(ctx, ctrl, matches, { trigger, arrays })
result.push(...tresult)
if (ctx.onEnd !== undefined && mode === 'async') {
await ctx.onEnd(ctx)
}
},
{ count: matches.length, arrays }
)

View File

@ -160,8 +160,6 @@ export interface PipelineContext {
head?: Middleware
broadcastEvent?: (ctx: MeasureContext, tx: Tx[]) => Promise<void>
endContext?: (ctx: MeasureContext) => Promise<void>
}
/**
* @public

View File

@ -23,9 +23,12 @@ export class ConnectionMgrMiddleware extends BaseMiddleware implements Middlewar
if (ctx.id === undefined) {
ctx.id = generateId()
}
ctx.onEnd = async (_ctx: MeasureContext) => {
await this.context.adapterManager?.closeContext?.(_ctx)
}
const result = await this.provideTx(ctx, tx)
this.context.endContext = async (_ctx: MeasureContext) => {
await this.context.adapterManager?.closeContext?.(ctx)
if (ctx.onEnd !== undefined) {
await ctx.onEnd(ctx)
}
return result
}

View File

@ -33,6 +33,7 @@ import core, {
type TxRemoveDoc,
type TxUpdateDoc,
addOperation,
generateId,
toFindResult,
withContext
} from '@hcengineering/core'
@ -225,11 +226,16 @@ export class TriggersMiddleware extends BaseMiddleware implements Middleware {
)
if (aresult.length > 0) {
await ctx.with('process-aync-result', {}, async (ctx) => {
ctx.id = generateId()
await this.processDerivedTxes(ctx, aresult)
// We need to send all to recipients
await this.context.head?.handleBroadcast(ctx)
if (ctx.onEnd !== undefined) {
await ctx.onEnd(ctx)
}
})
}
await this.context.endContext?.(ctx)
}
private async processDerivedTxes (ctx: MeasureContext<SessionData>, derived: Tx[]): Promise<void> {

View File

@ -79,7 +79,6 @@ import {
isDataField,
isOwner,
type JoinProps,
Mutex,
parseDoc,
parseDocWithProjection,
parseUpdate,
@ -90,16 +89,36 @@ import {
abstract class PostgresAdapterBase implements DbAdapter {
protected readonly _helper: DBCollectionHelper
protected readonly tableFields = new Map<string, string[]>()
protected readonly mutex = new Mutex()
protected readonly connections = new Map<string, postgres.ReservedSql>()
protected readonly connections = new Map<string, postgres.ReservedSql | Promise<postgres.ReservedSql>>()
protected readonly retryTxn = async (
connection: postgres.ReservedSql,
client: postgres.ReservedSql,
fn: (client: postgres.ReservedSql) => Promise<any>
): Promise<void> => {
await this.mutex.runExclusive(async () => {
await this.processOps(connection, fn)
})
const backoffInterval = 100 // millis
const maxTries = 5
let tries = 0
while (true) {
await client.unsafe('BEGIN;')
tries++
try {
const result = await fn(client)
await client.unsafe('COMMIT;')
return result
} catch (err: any) {
await client.unsafe('ROLLBACK;')
if (err.code !== '40001' || tries === maxTries) {
throw err
} else {
console.log('Transaction failed. Retrying.')
console.log(err.message)
await new Promise((resolve) => setTimeout(resolve, tries * backoffInterval))
}
}
}
}
constructor (
@ -133,7 +152,11 @@ abstract class PostgresAdapterBase implements DbAdapter {
if (ctx.id === undefined) return
const conn = this.connections.get(ctx.id)
if (conn !== undefined) {
if (conn instanceof Promise) {
;(await conn).release()
} else {
conn.release()
}
this.connections.delete(ctx.id)
}
}
@ -141,40 +164,10 @@ abstract class PostgresAdapterBase implements DbAdapter {
protected async getConnection (ctx: MeasureContext): Promise<postgres.ReservedSql | undefined> {
if (ctx.id === undefined) return
const conn = this.connections.get(ctx.id)
if (conn !== undefined) return conn
const client = await this.client.reserve()
if (conn !== undefined) return await conn
const client = this.client.reserve()
this.connections.set(ctx.id, client)
return client
}
private async processOps (
client: postgres.ReservedSql,
operation: (client: postgres.ReservedSql) => Promise<any>
): Promise<void> {
const backoffInterval = 100 // millis
const maxTries = 5
let tries = 0
while (true) {
await client.unsafe('BEGIN;')
tries++
try {
const result = await operation(client)
await client.unsafe('COMMIT;')
return result
} catch (err: any) {
await client.unsafe('ROLLBACK;')
if (err.code !== '40001' || tries === maxTries) {
throw err
} else {
console.log('Transaction failed. Retrying.')
console.log(err.message)
await new Promise((resolve) => setTimeout(resolve, tries * backoffInterval))
}
}
}
return await client
}
async traverse<T extends Doc>(
@ -239,9 +232,13 @@ abstract class PostgresAdapterBase implements DbAdapter {
abstract init (): Promise<void>
async close (): Promise<void> {
this.connections.forEach((c) => {
for (const c of this.connections.values()) {
if (c instanceof Promise) {
;(await c).release()
} else {
c.release()
})
}
}
this.refClient.close()
}
@ -1495,7 +1492,7 @@ class PostgresAdapter extends PostgresAdapterBase {
return { object }
}
} catch (err) {
console.error(err)
console.error(err, { tx, params, updates })
}
})
return {}

View File

@ -241,6 +241,10 @@ export function inferType (val: any): string {
if (typeof val === 'boolean') {
return '::boolean'
}
if (Array.isArray(val)) {
const type = inferType(val[0])
return type + '[]'
}
return ''
}
@ -409,38 +413,3 @@ export interface JoinProps {
toClass: Ref<Class<Doc>>
classes?: Ref<Class<Doc>>[] // filter by classes
}
export class Mutex {
private locked: boolean = false
private readonly waitingQueue: Array<(value: boolean) => void> = []
private async acquire (): Promise<void> {
while (this.locked) {
await new Promise<boolean>((resolve) => {
this.waitingQueue.push(resolve)
})
}
this.locked = true
}
private release (): void {
if (!this.locked) {
throw new Error('Mutex is not locked')
}
this.locked = false
const nextResolver = this.waitingQueue.shift()
if (nextResolver !== undefined) {
nextResolver(true)
}
}
async runExclusive<T>(fn: () => Promise<T> | T): Promise<T> {
await this.acquire()
try {
return await fn()
} finally {
this.release()
}
}
}