UBERF-8608: Rework connection management (#7248)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-12-02 14:37:12 +07:00 committed by GitHub
parent e0f2c87ed3
commit e4221f779f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 588 additions and 454 deletions

View File

@ -114,7 +114,6 @@ export class MeasureMetricsContext implements MeasureContext {
this.logParams
)
result.id = this.id
result.onEnd = this.onEnd.bind(this)
result.contextData = this.contextData
return result
}
@ -190,8 +189,6 @@ export class MeasureMetricsContext implements MeasureContext {
end (): void {
this.done()
}
async onEnd (ctx: MeasureContext): Promise<void> {}
}
/**
@ -220,7 +217,10 @@ export function registerOperationLog (ctx: MeasureContext): { opLogMetrics?: Met
}
const op: OperationLog = { start: Date.now(), ops: [], end: -1 }
let opLogMetrics: Metrics | undefined
ctx.id = generateId()
if (ctx.id === undefined) {
ctx.id = 'op_' + generateId()
}
if (ctx.metrics !== undefined) {
if (ctx.metrics.opLog === undefined) {
ctx.metrics.opLog = {}

View File

@ -111,6 +111,4 @@ export interface MeasureContext<Q = any> {
// Mark current context as complete
// If no value is passed, time difference will be used.
end: (value?: number) => void
onEnd?: (ctx: MeasureContext) => Promise<void>
}

View File

@ -59,6 +59,8 @@ export interface SessionData {
branding: Branding | null
fulltextUpdates?: Map<Ref<DocIndexState>, DocIndexState>
asyncRequests?: (() => Promise<void>)[]
}
/**

View File

@ -144,9 +144,6 @@ class WorkspaceIndexer {
await helper.checkDomain(ctx, DOMAIN_DOC_INDEX_STATE, 10000, dhelper)
}
}
},
async (ctx: MeasureContext) => {
await result.pipeline.context.adapterManager?.closeContext?.(ctx)
}
)
await result.fulltext.startIndexing(() => {

View File

@ -69,8 +69,7 @@ export interface DbAdapter extends LowLevelStorage {
helper?: () => DomainHelperOperations
closeContext?: (ctx: MeasureContext) => Promise<void>
reserveContext?: (id: string) => () => void
close: () => Promise<void>
findAll: <T extends Doc>(
ctx: MeasureContext,

View File

@ -42,16 +42,22 @@ export class DbAdapterManagerImpl implements DBAdapterManager {
private readonly adapters: Map<string, DbAdapter>
) {}
async closeContext (ctx: MeasureContext): Promise<void> {
reserveContext (id: string): () => void {
const ops: (() => void)[] = []
for (const adapter of this.adapters.values()) {
try {
if (adapter.closeContext !== undefined) {
await adapter.closeContext(ctx)
if (adapter.reserveContext !== undefined) {
ops.push(adapter.reserveContext(id))
}
} catch (err: any) {
Analytics.handleError(err)
}
}
return () => {
for (const op of ops) {
op()
}
}
}
getDefaultAdapter (): DbAdapter {

View File

@ -17,7 +17,6 @@
import core, {
TxFactory,
TxProcessor,
generateId,
groupByArray,
matchQuery,
type Class,
@ -145,13 +144,11 @@ export class Triggers {
trigger.resource,
{},
async (ctx) => {
if (mode === 'async') {
ctx.id = generateId()
}
try {
const tresult = await this.applyTrigger(ctx, ctrl, matches, { trigger })
result.push(...tresult)
if (ctx.onEnd !== undefined && mode === 'async') {
await ctx.onEnd(ctx)
} catch (err: any) {
ctx.error('error during async processing', { err })
}
},
{ count: matches.length }

View File

@ -152,7 +152,7 @@ export interface DBAdapterManager {
initAdapters: (ctx: MeasureContext) => Promise<void>
closeContext: (ctx: MeasureContext) => Promise<void>
reserveContext: (id: string) => () => void
domainHelper?: DomainHelper
}

View File

@ -127,8 +127,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
readonly storageAdapter: StorageAdapter,
readonly contentAdapter: ContentTextAdapter,
readonly broadcastUpdate: (ctx: MeasureContext, classes: Ref<Class<Doc>>[]) => void,
readonly checkIndexes: () => Promise<void>,
readonly closeContext: (ctx: MeasureContext) => Promise<void>
readonly checkIndexes: () => Promise<void>
) {
this.contexts = new Map(model.findAllSync(core.class.FullTextSearchContext, {}).map((it) => [it.toClass, it]))
}
@ -406,7 +405,6 @@ export class FullTextIndexPipeline implements FullTextPipeline {
await rateLimiter.exec(async () => {
let st = Date.now()
ctx.id = generateId()
let groupBy = await this.storage.groupBy(ctx, DOMAIN_DOC_INDEX_STATE, 'objectClass', { needIndex: true })
const total = Array.from(groupBy.values()).reduce((a, b) => a + b, 0)
while (true) {
@ -524,7 +522,6 @@ export class FullTextIndexPipeline implements FullTextPipeline {
this.metrics.error('error during index', { error: err })
}
}
await this.closeContext(ctx)
})
return { classUpdate: Array.from(_classUpdate.values()), processed }
}

View File

@ -1,35 +0,0 @@
import { generateId, type MeasureContext, type Tx } from '@hcengineering/core'
import {
BaseMiddleware,
type Middleware,
type PipelineContext,
type TxMiddlewareResult
} from '@hcengineering/server-core'
/**
* Will support apply tx
* @public
*/
export class ConnectionMgrMiddleware extends BaseMiddleware implements Middleware {
static async create (
ctx: MeasureContext,
context: PipelineContext,
next?: Middleware
): Promise<Middleware | undefined> {
return new ConnectionMgrMiddleware(context, next)
}
async tx (ctx: MeasureContext, tx: Tx[]): Promise<TxMiddlewareResult> {
if (ctx.id === undefined) {
ctx.id = generateId()
}
ctx.onEnd = async (_ctx: MeasureContext) => {
await this.context.adapterManager?.closeContext?.(_ctx)
}
const result = await this.provideTx(ctx, tx)
if (ctx.onEnd !== undefined) {
await ctx.onEnd(ctx)
}
return result
}
}

View File

@ -16,7 +16,6 @@
export * from './applyTx'
export * from './broadcast'
export * from './configuration'
export * from './connectionMgr'
export * from './contextName'
export * from './dbAdapter'
export * from './dbAdapterHelper'

View File

@ -33,7 +33,6 @@ import core, {
type TxRemoveDoc,
type TxUpdateDoc,
addOperation,
generateId,
toFindResult,
withContext
} from '@hcengineering/core'
@ -170,15 +169,18 @@ export class TriggersMiddleware extends BaseMiddleware implements Middleware {
await this.processDerivedTxes(ctx, derived)
}
const asyncProcess = this.processAsyncTriggers(ctx, triggerControl, findAll, txes, triggers)
// In case of async context, we execute both async and sync triggers as sync
const performSync = (ctx as MeasureContext<SessionDataImpl>).contextData.isAsyncContext ?? false
if ((ctx as MeasureContext<SessionDataImpl>).contextData.isAsyncContext ?? false) {
await asyncProcess
if (performSync) {
await this.processAsyncTriggers(ctx, triggerControl, findAll, txes, triggers)
} else {
asyncProcess.catch((err) => {
ctx.error('error during processing async triggers', { err })
})
ctx.contextData.asyncRequests = [
...(ctx.contextData.asyncRequests ?? []),
async () => {
// In case of async context, we execute both async and sync triggers as sync
await this.processAsyncTriggers(ctx, triggerControl, findAll, txes, triggers)
}
]
}
}
@ -224,10 +226,6 @@ export class TriggersMiddleware extends BaseMiddleware implements Middleware {
this.context.modelDb
)
ctx.contextData = asyncContextData
if ((ctx as MeasureContext<SessionDataImpl>).contextData.isAsyncContext ?? false) {
ctx.id = 'async_tr' + generateId()
}
const aresult = await this.triggers.apply(
ctx,
txes,
@ -247,9 +245,6 @@ export class TriggersMiddleware extends BaseMiddleware implements Middleware {
await this.context.head?.handleBroadcast(ctx)
})
}
if (ctx.onEnd !== undefined) {
await ctx.onEnd(ctx)
}
}
private async processDerivedTxes (ctx: MeasureContext<SessionData>, derived: Tx[]): Promise<void> {

View File

@ -27,6 +27,7 @@ import core, {
DOMAIN_TX,
type FindOptions,
type FindResult,
generateId,
groupByArray,
type Hierarchy,
isOperator,
@ -50,6 +51,7 @@ import core, {
type TxRemoveDoc,
type TxResult,
type TxUpdateDoc,
withContext,
type WithLookup,
type WorkspaceId
} from '@hcengineering/core'
@ -107,88 +109,229 @@ async function * createCursorGenerator (
}
}
abstract class PostgresAdapterBase implements DbAdapter {
protected readonly _helper: DBCollectionHelper
protected readonly tableFields = new Map<string, string[]>()
protected readonly connections = new Map<string, postgres.ReservedSql | Promise<postgres.ReservedSql>>()
class ConnectionInfo {
// It should preserve at least one available connection in pool, other connection should be closed
available: postgres.ReservedSql[] = []
protected readonly retryTxn = async (
client: postgres.ReservedSql,
fn: (client: postgres.ReservedSql) => Promise<any>
): Promise<void> => {
const backoffInterval = 100 // millis
released: boolean = false
constructor (
readonly mgrId: string,
readonly connectionId: string,
protected readonly client: postgres.Sql,
readonly managed: boolean
) {}
async withReserve (
reserveOrPool: boolean,
action: (reservedClient: postgres.ReservedSql | postgres.Sql) => Promise<any>
): Promise<any> {
let reserved: postgres.ReservedSql | undefined
// Check if we have at least one available connection and reserve one more if required.
if (this.available.length === 0) {
if (reserveOrPool) {
reserved = await this.client.reserve()
}
} else {
reserved = this.available.shift() as postgres.ReservedSql
}
try {
// Use reserved or pool
return await action(reserved ?? this.client)
} catch (err: any) {
console.error(err)
throw err
} finally {
if (this.released) {
reserved?.release()
} else {
// after use we put into available
if (reserved !== undefined) {
this.available.push(reserved)
}
if (this.available.length > 1) {
// We need to release any >= 1
const toRelease = this.available.splice(1, this.available.length - 1)
for (const r of toRelease) {
r.release()
}
}
}
}
}
release (): void {
for (const c of this.available) {
c.release()
}
this.available = []
}
}
const connections = new Map<string, ConnectionInfo>()
class ConnectionMgr {
constructor (
protected readonly client: postgres.Sql,
readonly mgrId: string
) {}
async write (
id: string | undefined,
fn: (client: postgres.Sql | postgres.ReservedSql) => Promise<any>
): Promise<void> {
const backoffInterval = 25 // millis
const maxTries = 5
let tries = 0
while (true) {
await client.unsafe('BEGIN;')
tries++
const realId = id ?? generateId()
const connection = this.getConnection(realId, false)
try {
const result = await fn(client)
while (true) {
const retry: boolean | Error = await connection.withReserve(true, async (client) => {
tries++
try {
await client.unsafe('BEGIN;')
await fn(client)
await client.unsafe('COMMIT;')
return result
return true
} catch (err: any) {
await client.unsafe('ROLLBACK;')
if (err.code !== '40001' || tries === maxTries) {
throw err
return err
} else {
console.log('Transaction failed. Retrying.')
console.log(err.message)
await new Promise((resolve) => setTimeout(resolve, tries * backoffInterval))
return false
}
}
})
if (retry === true) {
break
}
if (retry instanceof Error) {
// Pass it to exit
throw retry
}
// Retry for a timeout
await new Promise((resolve) => setTimeout(resolve, backoffInterval))
}
} finally {
if (!connection.managed) {
// We need to relase in case it temporaty connection was used
connection.release()
}
}
}
async read (id: string | undefined, fn: (client: postgres.Sql | postgres.ReservedSql) => Promise<any>): Promise<any> {
const backoffInterval = 25 // millis
const maxTries = 5
let tries = 0
const realId = id ?? generateId()
// Will reuse reserved if had and use new one if not
const connection = this.getConnection(realId, false)
try {
while (true) {
const retry: false | { result: any } | Error = await connection.withReserve(false, async (client) => {
tries++
try {
return { result: await fn(client) }
} catch (err: any) {
if (err.code !== '40001' || tries === maxTries) {
return err
} else {
console.log('Read Transaction failed. Retrying.')
console.log(err.message)
return false
}
}
})
if (retry instanceof Error) {
// Pass it to exit
throw retry
}
if (retry === false) {
// Retry for a timeout
await new Promise((resolve) => setTimeout(resolve, backoffInterval))
continue
}
return retry.result
}
} finally {
if (!connection.managed) {
// We need to relase in case it temporaty connection was used
connection.release()
}
}
}
release (id: string): void {
const conn = connections.get(id)
if (conn !== undefined) {
conn.released = true
connections.delete(id) // We need to delete first
conn.release()
} else {
console.log('wrne')
}
}
close (): void {
for (const [k, conn] of Array.from(connections.entries()).filter(
([, it]: [string, ConnectionInfo]) => it.mgrId === this.mgrId
)) {
connections.delete(k)
conn.release()
}
}
getConnection (id: string, managed: boolean = true): ConnectionInfo {
let conn = connections.get(id)
if (conn === undefined) {
conn = new ConnectionInfo(this.mgrId, id, this.client, managed)
}
if (managed) {
connections.set(id, conn)
}
return conn
}
}
abstract class PostgresAdapterBase implements DbAdapter {
protected readonly _helper: DBCollectionHelper
protected readonly tableFields = new Map<string, string[]>()
mgr: ConnectionMgr
constructor (
protected readonly client: postgres.Sql,
protected readonly refClient: PostgresClientReference,
protected readonly workspaceId: WorkspaceId,
protected readonly hierarchy: Hierarchy,
protected readonly modelDb: ModelDb
protected readonly modelDb: ModelDb,
readonly mgrId: string
) {
this._helper = new DBCollectionHelper(this.client, this.workspaceId)
this.mgr = new ConnectionMgr(client, mgrId)
}
protected async withConnection<T>(
ctx: MeasureContext,
operation: (client: postgres.ReservedSql) => Promise<T>
): Promise<T> {
const connection = await this.getConnection(ctx)
if (connection !== undefined) {
return await operation(connection)
} else {
const client = await this.client.reserve()
try {
return await operation(client)
} finally {
client.release()
}
}
}
async closeContext (ctx: MeasureContext): Promise<void> {
if (ctx.id === undefined) return
const conn = this.connections.get(ctx.id)
if (conn !== undefined) {
if (conn instanceof Promise) {
;(await conn).release()
} else {
reserveContext (id: string): () => void {
const conn = this.mgr.getConnection(id, true)
return () => {
conn.released = true
conn.release()
connections.delete(id) // We need to delete first
}
this.connections.delete(ctx.id)
}
}
protected async getConnection (ctx: MeasureContext): Promise<postgres.ReservedSql | undefined> {
if (ctx.id === undefined) return
const conn = this.connections.get(ctx.id)
if (conn !== undefined) return await conn
const client = this.client.reserve()
this.connections.set(ctx.id, client)
return await client
}
async traverse<T extends Doc>(
@ -236,13 +379,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
abstract init (): Promise<void>
async close (): Promise<void> {
for (const c of this.connections.values()) {
if (c instanceof Promise) {
;(await c).release()
} else {
c.release()
}
}
this.mgr.close()
this.refClient.close()
}
@ -258,8 +395,8 @@ abstract class PostgresAdapterBase implements DbAdapter {
sqlChunks.push(`LIMIT ${options.limit}`)
}
const finalSql: string = [select, ...sqlChunks].join(' ')
const result = await this.client.unsafe(finalSql)
return result.map((p) => parseDocWithProjection(p as any, domain, options?.projection))
const result: DBDoc[] = await this.mgr.read(undefined, (client) => client.unsafe(finalSql))
return result.map((p) => parseDocWithProjection(p, domain, options?.projection))
}
buildRawOrder<T extends Doc>(domain: string, sort: SortingQuery<T>): string {
@ -305,25 +442,22 @@ abstract class PostgresAdapterBase implements DbAdapter {
if ((operations as any)['%hash%'] === undefined) {
;(operations as any)['%hash%'] = null
}
const domainFields = new Set(getDocFieldsByDomains(domain))
if (isOps) {
const conn = await this.client.reserve()
try {
await this.retryTxn(conn, async (client) => {
const res = await client.unsafe(
`SELECT * FROM ${translateDomain(domain)} WHERE ${translatedQuery} FOR UPDATE`
)
await this.mgr.write(undefined, async (client) => {
const res = await client.unsafe(`SELECT * FROM ${translateDomain(domain)} WHERE ${translatedQuery} FOR UPDATE`)
const schema = getSchema(domain)
const docs = res.map((p) => parseDoc(p as any, schema))
const domainFields = new Set(getDocFieldsByDomains(domain))
for (const doc of docs) {
if (doc === undefined) continue
const prevAttachedTo = (doc as any).attachedTo
TxProcessor.applyUpdate(doc, operations)
;(doc as any)['%hash%'] = null
const converted = convertDoc(domain, doc, this.workspaceId.name, domainFields)
const params: any[] = [doc._id, this.workspaceId.name]
let paramsIndex = params.length + 1
const updates: string[] = []
const { extractedFields, remainingData } = parseUpdate(domain, operations)
const { extractedFields, remainingData } = parseUpdate(operations, domainFields)
const newAttachedTo = (doc as any).attachedTo
if (Object.keys(extractedFields).length > 0) {
for (const key in extractedFields) {
@ -347,24 +481,22 @@ abstract class PostgresAdapterBase implements DbAdapter {
)
}
})
} finally {
conn.release()
}
} else {
await this.rawUpdateDoc(domain, query, operations)
await this.rawUpdateDoc(domain, query, operations, domainFields)
}
}
private async rawUpdateDoc<T extends Doc>(
domain: Domain,
query: DocumentQuery<T>,
operations: DocumentUpdate<T>
operations: DocumentUpdate<T>,
domainFields: Set<string>
): Promise<void> {
const translatedQuery = this.buildRawQuery(domain, query)
const updates: string[] = []
const params: any[] = []
let paramsIndex = params.length + 1
const { extractedFields, remainingData } = parseUpdate(domain, operations)
const { extractedFields, remainingData } = parseUpdate(operations, domainFields)
const { space, attachedTo, ...ops } = operations as any
for (const key in extractedFields) {
updates.push(`"${key}" = $${paramsIndex++}`)
@ -382,32 +514,19 @@ abstract class PostgresAdapterBase implements DbAdapter {
if (dataUpdated) {
updates.push(`data = ${from}`)
}
const conn = await this.client.reserve()
try {
await this.retryTxn(conn, async (client) => {
await this.mgr.write(undefined, async (client) => {
await client.unsafe(
`UPDATE ${translateDomain(domain)} SET ${updates.join(', ')} WHERE ${translatedQuery}`,
params
)
})
} catch (err: any) {
console.error(err, { domain, params, updates })
throw err
} finally {
conn.release()
}
}
async rawDeleteMany<T extends Doc>(domain: Domain, query: DocumentQuery<T>): Promise<void> {
const translatedQuery = this.buildRawQuery(domain, query)
const conn = await this.client.reserve()
try {
await this.retryTxn(conn, async (client) => {
await this.mgr.write(undefined, async (client) => {
await client.unsafe(`DELETE FROM ${translateDomain(domain)} WHERE ${translatedQuery}`)
})
} finally {
conn.release()
}
}
findAll<T extends Doc>(
@ -447,8 +566,10 @@ abstract class PostgresAdapterBase implements DbAdapter {
sqlChunks.push(this.buildJoinString(joins))
}
sqlChunks.push(`WHERE ${this.buildQuery(_class, domain, query, joins, options)}`)
const connection = (await this.getConnection(ctx)) ?? this.client
const findId = ctx.id ?? generateId()
return (await this.mgr.read(findId, async (connection) => {
let total = options?.total === true ? 0 : -1
if (options?.total === true) {
const totalReq = `SELECT COUNT(${domain}._id) as count FROM ${domain}`
@ -475,6 +596,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
const res = this.parseLookup<T>(result, joins, options?.projection, domain)
return toFindResult(res, total)
}
})) as FindResult<T>
} catch (err) {
ctx.error('Error in findAll', { err })
throw err
@ -1131,20 +1253,23 @@ abstract class PostgresAdapterBase implements DbAdapter {
const tdomain = translateDomain(domain)
const schema = getSchema(domain)
const findId = generateId()
const flush = async (flush = false): Promise<void> => {
if (bulkUpdate.size > 1000 || flush) {
if (bulkUpdate.size > 0) {
const entries = Array.from(bulkUpdate.entries())
bulkUpdate.clear()
const updateClient = await this.client.reserve()
try {
while (entries.length > 0) {
const part = entries.splice(0, 200)
const data: string[] = part.flat()
const indexes = part.map((val, idx) => `($${2 * idx + 1}::text, $${2 * idx + 2}::text)`).join(', ')
await ctx.with('bulk-write-find', {}, () => {
return this.retryTxn(updateClient, (client) =>
client.unsafe(
return this.mgr.write(
findId,
async (client) =>
await client.unsafe(
`
UPDATE ${tdomain} SET "%hash%" = update_data.hash
FROM (values ${indexes}) AS update_data(_id, hash)
@ -1157,8 +1282,6 @@ abstract class PostgresAdapterBase implements DbAdapter {
}
} catch (err: any) {
ctx.error('failed to update hash', { err })
} finally {
updateClient.release()
}
}
}
@ -1182,10 +1305,10 @@ abstract class PostgresAdapterBase implements DbAdapter {
}
if (recheck === true) {
await this.retryTxn(
client,
(client) =>
client`UPDATE ${client(tdomain)} SET "%hash%" = NULL WHERE "workspaceId" = ${this.workspaceId.name} AND "%hash%" IS NOT NULL`
await this.mgr.write(
findId,
async (client) =>
await client`UPDATE ${client(tdomain)} SET "%hash%" = NULL WHERE "workspaceId" = ${this.workspaceId.name} AND "%hash%" IS NOT NULL`
)
}
@ -1207,10 +1330,10 @@ abstract class PostgresAdapterBase implements DbAdapter {
if (pos === -1) {
await bulk.return([]) // We need to close generator
docs = { done: true, value: undefined }
await this.retryTxn(
client,
(client) =>
client`UPDATE ${client(tdomain)} SET "%hash%" = NULL WHERE "workspaceId" = ${this.workspaceId.name} AND "%hash%" IS NOT NULL`
await this.mgr.write(
findId,
async (client) =>
await client`UPDATE ${client(tdomain)} SET "%hash%" = NULL WHERE "workspaceId" = ${this.workspaceId.name} AND "%hash%" IS NOT NULL`
)
break
}
@ -1247,14 +1370,12 @@ abstract class PostgresAdapterBase implements DbAdapter {
if (docs.length === 0) {
return []
}
const client = await this.client.reserve()
try {
return await this.mgr.read('', async (client) => {
const res =
await client`SELECT * FROM ${client(translateDomain(domain))} WHERE _id = ANY(${docs}) AND "workspaceId" = ${this.workspaceId.name}`
return res.map((p) => parseDocWithProjection(p as any, domain))
} finally {
client.release()
}
})
})
}
@ -1271,7 +1392,6 @@ abstract class PostgresAdapterBase implements DbAdapter {
const insertStr = insertFields.join(', ')
const onConflictStr = onConflict.join(', ')
const client = await this.client.reserve()
try {
const domainFields = new Set(getDocFieldsByDomains(domain))
const toUpload = [...docs]
@ -1305,8 +1425,10 @@ abstract class PostgresAdapterBase implements DbAdapter {
}
const vals = vars.join(',')
await this.retryTxn(client, (client) =>
client.unsafe(
await this.mgr.write(
ctx.id,
async (client) =>
await client.unsafe(
`INSERT INTO ${tdomain} ("workspaceId", ${insertStr}) VALUES ${vals}
ON CONFLICT ("workspaceId", _id) DO UPDATE SET ${onConflictStr};`,
values
@ -1316,30 +1438,23 @@ abstract class PostgresAdapterBase implements DbAdapter {
} catch (err: any) {
ctx.error('failed to upload', { err })
throw err
} finally {
client.release()
}
})
}
async clean (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<void> {
const updateClient = await this.client.reserve()
try {
const tdomain = translateDomain(domain)
const toClean = [...docs]
while (toClean.length > 0) {
const part = toClean.splice(0, 200)
await ctx.with('clean', {}, () => {
return this.retryTxn(
updateClient,
return this.mgr.write(
ctx.id,
(client) =>
client`DELETE FROM ${client(tdomain)} WHERE _id = ANY(${part}) AND "workspaceId" = ${this.workspaceId.name}`
)
})
}
} finally {
updateClient.release()
}
}
groupBy<T, P extends Doc>(
@ -1350,12 +1465,13 @@ abstract class PostgresAdapterBase implements DbAdapter {
): Promise<Map<T, number>> {
const key = isDataField(domain, field) ? `data ->> '${field}'` : `"${field}"`
return ctx.with('groupBy', { domain }, async (ctx) => {
const connection = (await this.getConnection(ctx)) ?? this.client
try {
return await this.mgr.read(ctx.id ?? generateId(), async (connection) => {
const result = await connection.unsafe(
`SELECT DISTINCT ${key} as ${field}, Count(*) AS count FROM ${translateDomain(domain)} WHERE ${this.buildRawQuery(domain, query ?? {})} GROUP BY ${key}`
)
return new Map(result.map((r) => [r[field.toLocaleLowerCase()], parseInt(r.count)]))
})
} catch (err) {
ctx.error('Error while grouping by', { domain, field })
throw err
@ -1365,13 +1481,12 @@ abstract class PostgresAdapterBase implements DbAdapter {
update (ctx: MeasureContext, domain: Domain, operations: Map<Ref<Doc>, DocumentUpdate<Doc>>): Promise<void> {
const ids = Array.from(operations.keys())
return this.withConnection(ctx, (client) => {
return this.retryTxn(client, async (client) => {
return this.mgr.write(ctx.id, async (client) => {
try {
const res =
const res: DBDoc[] =
await client`SELECT * FROM ${client(translateDomain(domain))} WHERE _id = ANY(${ids}) AND "workspaceId" = ${this.workspaceId.name} FOR UPDATE`
const schema = getSchema(domain)
const docs = res.map((p) => parseDoc(p as any, schema))
const docs = res.map((p) => parseDoc(p, schema))
const map = new Map(docs.map((d) => [d._id, d]))
const domainFields = new Set(getDocFieldsByDomains(domain))
for (const [_id, ops] of operations) {
@ -1385,7 +1500,7 @@ abstract class PostgresAdapterBase implements DbAdapter {
const converted = convertDoc(domain, doc, this.workspaceId.name, domainFields)
const columns: string[] = []
const { extractedFields, remainingData } = parseUpdate(domain, op)
const { extractedFields, remainingData } = parseUpdate(op, domainFields)
for (const key in extractedFields) {
columns.push(key)
}
@ -1404,18 +1519,18 @@ abstract class PostgresAdapterBase implements DbAdapter {
throw err
}
})
})
}
@withContext('insert')
async insert (ctx: MeasureContext, domain: string, docs: Doc[]): Promise<TxResult> {
const fields = getDocFieldsByDomains(domain)
const schema = getSchema(domain)
const fields = Object.keys(schema)
const filedsWithData = [...fields, 'data']
const columns: string[] = ['workspaceId']
for (const field of filedsWithData) {
columns.push(field)
}
await this.withConnection(ctx, async (connection) => {
const domainFields = new Set(getDocFieldsByDomains(domain))
const domainFields = new Set(fields)
while (docs.length > 0) {
const part = docs.splice(0, 500)
const values: DBDoc[] = []
@ -1424,15 +1539,27 @@ abstract class PostgresAdapterBase implements DbAdapter {
const d = convertDoc(domain, doc, this.workspaceId.name, domainFields)
values.push(d)
}
await this.retryTxn(connection, async (client) => {
await this.mgr.write(ctx.id, async (client) => {
try {
await client`INSERT INTO ${client(translateDomain(domain))} ${client(values, columns)}`
})
} catch (err: any) {
console.error('inserting error', err)
}
})
}
return {}
}
}
interface OperationBulk {
add: Doc[]
updates: TxUpdateDoc<Doc>[]
removes: TxRemoveDoc<Doc>[]
mixins: TxMixin<Doc, Doc>[]
}
class PostgresAdapter extends PostgresAdapterBase {
async init (domains?: string[], excludeDomains?: string[]): Promise<void> {
let resultDomains = domains ?? this.hierarchy.domains()
@ -1443,17 +1570,20 @@ class PostgresAdapter extends PostgresAdapterBase {
this._helper.domains = new Set(resultDomains as Domain[])
}
private async process (ctx: MeasureContext, tx: Tx): Promise<TxResult | undefined> {
private process (ops: OperationBulk, tx: Tx): void {
switch (tx._class) {
case core.class.TxCreateDoc:
return await this.txCreateDoc(ctx, tx as TxCreateDoc<Doc>)
ops.add.push(TxProcessor.createDoc2Doc(tx as TxCreateDoc<Doc>))
break
case core.class.TxUpdateDoc:
return await this.txUpdateDoc(ctx, tx as TxUpdateDoc<Doc>)
ops.updates.push(tx as TxUpdateDoc<Doc>)
break
case core.class.TxRemoveDoc:
await this.txRemoveDoc(ctx, tx as TxRemoveDoc<Doc>)
ops.removes.push(tx as TxRemoveDoc<Doc>)
break
case core.class.TxMixin:
return await this.txMixin(ctx, tx as TxMixin<Doc, Doc>)
ops.mixins.push(tx as TxMixin<Doc, Doc>)
break
case core.class.TxApplyIf:
return undefined
default:
@ -1462,17 +1592,16 @@ class PostgresAdapter extends PostgresAdapterBase {
}
}
private async txMixin (ctx: MeasureContext, tx: TxMixin<Doc, Doc>): Promise<TxResult> {
private async txMixin (ctx: MeasureContext, tx: TxMixin<Doc, Doc>, domainFields: Set<string>): Promise<TxResult> {
await ctx.with('tx-mixin', { _class: tx.objectClass, mixin: tx.mixin }, async (ctx) => {
await this.withConnection(ctx, async (connection) => {
await this.retryTxn(connection, async (client) => {
await this.mgr.write(ctx.id, async (client) => {
const doc = await this.findDoc(ctx, client, tx.objectClass, tx.objectId, true)
if (doc === undefined) return
TxProcessor.updateMixin4Doc(doc, tx)
;(doc as any)['%hash%'] = null
const domain = this.hierarchy.getDomain(tx.objectClass)
const converted = convertDoc(domain, doc, this.workspaceId.name)
const { extractedFields } = parseUpdate(domain, tx.attributes as Partial<Doc>)
const converted = convertDoc(domain, doc, this.workspaceId.name, domainFields)
const { extractedFields } = parseUpdate(tx.attributes as Partial<Doc>, domainFields)
const columns = new Set<string>()
for (const key in extractedFields) {
columns.add(key)
@ -1483,7 +1612,6 @@ class PostgresAdapter extends PostgresAdapterBase {
await client`UPDATE ${client(translateDomain(domain))} SET ${client(converted, Array.from(columns))} WHERE _id = ${tx.objectId} AND "workspaceId" = ${this.workspaceId.name}`
})
})
})
return {}
}
@ -1501,25 +1629,52 @@ class PostgresAdapter extends PostgresAdapterBase {
if (domain === undefined) {
continue
}
const ops: OperationBulk = {
add: [],
mixins: [],
removes: [],
updates: []
}
for (const tx of txs) {
const res = await this.process(ctx, tx)
if (res !== undefined) {
this.process(ops, tx)
}
const domainFields = new Set(getDocFieldsByDomains(domain))
if (ops.add.length > 0) {
const res = await this.insert(ctx, domain, ops.add)
if (Object.keys(res).length > 0) {
result.push(res)
}
}
// TODO: Optimize updates
if (ops.updates.length > 0) {
for (const upd of ops.updates) {
const res = await this.txUpdateDoc(ctx, upd, domainFields)
if (Object.keys(res).length > 0) {
result.push(res)
}
}
}
// TODO: Optimize mixins
for (const mix of ops.mixins) {
const res = await this.txMixin(ctx, mix, domainFields)
if (Object.keys(res).length > 0) {
result.push(res)
}
}
if (ops.removes.length > 0) {
await this.clean(
ctx,
domain,
ops.removes.map((it) => it.objectId)
)
}
}
return result
}
protected async txCreateDoc (ctx: MeasureContext, tx: TxCreateDoc<Doc>): Promise<TxResult> {
const doc = TxProcessor.createDoc2Doc(tx)
return await ctx.with('create-doc', { _class: doc._class }, (_ctx) => {
return this.insert(_ctx, this.hierarchy.getDomain(doc._class), [doc])
})
}
protected txUpdateDoc (ctx: MeasureContext, tx: TxUpdateDoc<Doc>): Promise<TxResult> {
protected txUpdateDoc (ctx: MeasureContext, tx: TxUpdateDoc<Doc>, domainFields: Set<string>): Promise<TxResult> {
return ctx.with('tx-update-doc', { _class: tx.objectClass }, (_ctx) => {
if (isOperator(tx.operations)) {
let doc: Doc | undefined
@ -1528,17 +1683,17 @@ class PostgresAdapter extends PostgresAdapterBase {
'update with operations',
{ operations: JSON.stringify(Object.keys(tx.operations)) },
async (ctx) => {
return await this.withConnection(ctx, async (connection) => {
await this.retryTxn(connection, async (client) => {
await this.mgr.write(ctx.id, async (client) => {
doc = await this.findDoc(ctx, client, tx.objectClass, tx.objectId, true)
if (doc === undefined) return {}
ops.modifiedBy = tx.modifiedBy
ops.modifiedOn = tx.modifiedOn
TxProcessor.applyUpdate(doc, ops)
;(doc as any)['%hash%'] = null
const domain = this.hierarchy.getDomain(tx.objectClass)
const converted = convertDoc(domain, doc, this.workspaceId.name)
const converted = convertDoc(domain, doc, this.workspaceId.name, domainFields)
const columns: string[] = []
const { extractedFields, remainingData } = parseUpdate(domain, ops)
const { extractedFields, remainingData } = parseUpdate(ops, domainFields)
for (const key in extractedFields) {
columns.push(key)
}
@ -1551,22 +1706,25 @@ class PostgresAdapter extends PostgresAdapterBase {
return { object: doc }
}
return {}
})
}
)
} else {
return this.updateDoc(_ctx, tx, tx.retrieve ?? false)
return this.updateDoc(_ctx, tx, tx.retrieve ?? false, domainFields)
}
})
}
private updateDoc<T extends Doc>(ctx: MeasureContext, tx: TxUpdateDoc<T>, retrieve: boolean): Promise<TxResult> {
private updateDoc<T extends Doc>(
ctx: MeasureContext,
tx: TxUpdateDoc<T>,
retrieve: boolean,
domainFields: Set<string>
): Promise<TxResult> {
return ctx.with('update jsonb_set', {}, async (_ctx) => {
const updates: string[] = ['"modifiedBy" = $1', '"modifiedOn" = $2']
const params: any[] = [tx.modifiedBy, tx.modifiedOn, tx.objectId, this.workspaceId.name]
let paramsIndex = params.length + 1
const domain = this.hierarchy.getDomain(tx.objectClass)
const { extractedFields, remainingData } = parseUpdate(domain, tx.operations)
const { extractedFields, remainingData } = parseUpdate(tx.operations, domainFields)
const { space, attachedTo, ...ops } = tx.operations as any
if (ops['%hash%'] === undefined) {
ops['%hash%'] = null
@ -1587,29 +1745,29 @@ class PostgresAdapter extends PostgresAdapterBase {
if (dataUpdated) {
updates.push(`data = ${from}`)
}
await this.withConnection(ctx, async (connection) => {
try {
await this.retryTxn(connection, (client) =>
client.unsafe(
await this.mgr.write(ctx.id, async (client) => {
await client.unsafe(
`UPDATE ${translateDomain(this.hierarchy.getDomain(tx.objectClass))} SET ${updates.join(', ')} WHERE _id = $3 AND "workspaceId" = $4`,
params
)
)
})
if (retrieve) {
const object = await this.findDoc(_ctx, connection, tx.objectClass, tx.objectId)
return await this.mgr.read(ctx.id, async (client) => {
const object = await this.findDoc(_ctx, client, tx.objectClass, tx.objectId)
return { object }
})
}
} catch (err) {
console.error(err, { tx, params, updates })
}
})
return {}
})
}
private findDoc (
ctx: MeasureContext,
client: postgres.Sql | postgres.ReservedSql,
client: postgres.ReservedSql | postgres.Sql,
_class: Ref<Class<Doc>>,
_id: Ref<Doc>,
forUpdate: boolean = false
@ -1617,27 +1775,13 @@ class PostgresAdapter extends PostgresAdapterBase {
const domain = this.hierarchy.getDomain(_class)
return ctx.with('find-doc', { _class }, async () => {
const res =
await client`SELECT * FROM ${this.client(translateDomain(domain))} WHERE _id = ${_id} AND "workspaceId" = ${this.workspaceId.name} ${
await client`SELECT * FROM ${client(translateDomain(domain))} WHERE _id = ${_id} AND "workspaceId" = ${this.workspaceId.name} ${
forUpdate ? client` FOR UPDATE` : client``
}`
const dbDoc = res[0]
return dbDoc !== undefined ? parseDoc(dbDoc as any, getSchema(domain)) : undefined
const dbDoc = res[0] as any
return dbDoc !== undefined ? parseDoc(dbDoc, getSchema(domain)) : undefined
})
}
protected async txRemoveDoc (ctx: MeasureContext, tx: TxRemoveDoc<Doc>): Promise<TxResult> {
await ctx.with('tx-remove-doc', { _class: tx.objectClass }, async (_ctx) => {
const domain = translateDomain(this.hierarchy.getDomain(tx.objectClass))
await this.withConnection(_ctx, async (connection) => {
await this.retryTxn(
connection,
(client) =>
client`DELETE FROM ${client(domain)} WHERE _id = ${tx.objectId} AND "workspaceId" = ${this.workspaceId.name}`
)
})
})
return {}
}
}
class PostgresTxAdapter extends PostgresAdapterBase implements TxAdapter {
@ -1674,10 +1818,13 @@ class PostgresTxAdapter extends PostgresAdapterBase implements TxAdapter {
}
async getModel (ctx: MeasureContext): Promise<Tx[]> {
const res = await this
.client`SELECT * FROM ${this.client(translateDomain(DOMAIN_MODEL_TX))} WHERE "workspaceId" = ${this.workspaceId.name} ORDER BY _id ASC, "modifiedOn" ASC`
const res: DBDoc[] = await this.mgr.read(
undefined,
(client) =>
client`SELECT * FROM ${client(translateDomain(DOMAIN_MODEL_TX))} WHERE "workspaceId" = ${this.workspaceId.name} ORDER BY _id ASC, "modifiedOn" ASC`
)
const model = res.map((p) => parseDoc<Tx>(p as any, getSchema(DOMAIN_MODEL_TX)))
const model = res.map((p) => parseDoc<Tx>(p, getSchema(DOMAIN_MODEL_TX)))
// We need to put all core.account.System transactions first
const systemTx: Tx[] = []
const userTx: Tx[] = []
@ -1698,7 +1845,14 @@ export async function createPostgresAdapter (
): Promise<DbAdapter> {
const client = getDBClient(url)
const connection = await client.getClient()
const adapter = new PostgresAdapter(connection, client, workspaceId, hierarchy, modelDb)
const adapter = new PostgresAdapter(
connection,
client,
workspaceId,
hierarchy,
modelDb,
'default-' + workspaceId.name
)
return adapter
}
@ -1714,7 +1868,7 @@ export async function createPostgresTxAdapter (
): Promise<TxAdapter> {
const client = getDBClient(url)
const connection = await client.getClient()
const adapter = new PostgresTxAdapter(connection, client, workspaceId, hierarchy, modelDb)
const adapter = new PostgresTxAdapter(connection, client, workspaceId, hierarchy, modelDb, 'tx' + workspaceId.name)
await adapter.init()
return adapter
}

View File

@ -272,8 +272,9 @@ export function convertDoc<T extends Doc> (
createdBy: doc.createdBy,
modifiedBy: doc.modifiedBy,
modifiedOn: doc.modifiedOn,
createdOn: doc.createdOn,
_class: doc._class
createdOn: doc.createdOn ?? doc.modifiedOn,
_class: doc._class,
'%hash%': (doc as any)['%hash%'] ?? null
}
const remainingData: Partial<T> = {}
@ -326,8 +327,8 @@ export function inferType (val: any): string {
}
export function parseUpdate<T extends Doc> (
domain: string,
ops: DocumentUpdate<T> | MixinUpdate<Doc, T>
ops: DocumentUpdate<T> | MixinUpdate<Doc, T>,
fields: Set<string>
): {
extractedFields: Partial<T>
remainingData: Partial<T>
@ -339,14 +340,14 @@ export function parseUpdate<T extends Doc> (
const val = (ops as any)[key]
if (key.startsWith('$')) {
for (const k in val) {
if (getDocFieldsByDomains(domain).includes(k)) {
if (fields.has(k)) {
;(extractedFields as any)[k] = val[key]
} else {
;(remainingData as any)[k] = val[key]
}
}
} else {
if (getDocFieldsByDomains(domain).includes(key)) {
if (fields.has(key)) {
;(extractedFields as any)[key] = val
} else {
;(remainingData as any)[key] = val

View File

@ -17,7 +17,6 @@ import {
ApplyTxMiddleware,
BroadcastMiddleware,
ConfigurationMiddleware,
ConnectionMgrMiddleware,
ContextNameMiddleware,
DBAdapterInitMiddleware,
DBAdapterMiddleware,
@ -114,7 +113,6 @@ export function createServerPipeline (
SpacePermissionsMiddleware.create,
ConfigurationMiddleware.create,
ContextNameMiddleware.create,
ConnectionMgrMiddleware.create,
MarkDerivedEntryMiddleware.create,
ApplyTxMiddleware.create, // Extract apply
TxMiddleware.create, // Store tx into transaction domain

View File

@ -15,6 +15,7 @@
import core, {
AccountRole,
generateId,
TxFactory,
TxProcessor,
type Account,
@ -29,6 +30,7 @@ import core, {
type Ref,
type SearchOptions,
type SearchQuery,
type SessionData,
type Timestamp,
type Tx,
type TxCUD,
@ -49,6 +51,8 @@ import {
import { type Token } from '@hcengineering/server-token'
import { handleSend } from './utils'
const useReserveContext = (process.env.USE_RESERVE_CTX ?? 'true') === 'true'
/**
* @public
*/
@ -192,6 +196,10 @@ export class ClientSession implements Session {
this.current.tx++
this.includeSessionContext(ctx.ctx)
let cid = 'client_' + generateId()
ctx.ctx.id = cid
let onEnd = useReserveContext ? this._pipeline.context.adapterManager?.reserveContext?.(cid) : undefined
try {
const result = await this._pipeline.tx(ctx.ctx, [tx])
// Send result immideately
@ -199,6 +207,24 @@ export class ClientSession implements Session {
// We need to broadcast all collected transactions
await this._pipeline.handleBroadcast(ctx.ctx)
} finally {
onEnd?.()
}
// ok we could perform async requests if any
const asyncs = (ctx.ctx.contextData as SessionData).asyncRequests ?? []
if (asyncs.length > 0) {
cid = 'client_async_' + generateId()
ctx.ctx.id = cid
onEnd = useReserveContext ? this._pipeline.context.adapterManager?.reserveContext?.(cid) : undefined
try {
for (const r of (ctx.ctx.contextData as SessionData).asyncRequests ?? []) {
await r()
}
} finally {
onEnd?.()
}
}
}
broadcast (ctx: MeasureContext, socket: ConnectionSocket, tx: Tx[]): void {