EQMS-6844 Do not keep platform client open (#5539)

Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
This commit is contained in:
Alexander Onnikov 2024-05-08 13:06:38 +07:00 committed by GitHub
parent bf80ba02e5
commit ceb1c95c7c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 73 additions and 73 deletions

View File

@ -17,7 +17,7 @@ import { type DocumentId, type PlatformDocumentId } from '@hcengineering/collabo
import { WorkspaceId, generateId } from '@hcengineering/core' import { WorkspaceId, generateId } from '@hcengineering/core'
import { decodeToken } from '@hcengineering/server-token' import { decodeToken } from '@hcengineering/server-token'
import { onAuthenticatePayload } from '@hocuspocus/server' import { onAuthenticatePayload } from '@hocuspocus/server'
import { ClientFactory, Controller, getClientFactory } from './platform' import { ClientFactory, simpleClientFactory } from './platform'
export interface Context { export interface Context {
connectionId: string connectionId: string
@ -36,7 +36,7 @@ export type withContext<T extends WithContext> = Omit<T, 'context'> & {
context: Context context: Context
} }
export function buildContext (data: onAuthenticatePayload, controller: Controller): Context { export function buildContext (data: onAuthenticatePayload): Context {
const context = data.context as Partial<Context> const context = data.context as Partial<Context>
const connectionId = context.connectionId ?? generateId() const connectionId = context.connectionId ?? generateId()
@ -48,7 +48,7 @@ export function buildContext (data: onAuthenticatePayload, controller: Controlle
return { return {
connectionId, connectionId,
workspaceId: decodedToken.workspace, workspaceId: decodedToken.workspace,
clientFactory: getClientFactory(decodedToken, controller), clientFactory: simpleClientFactory(decodedToken),
initialContentId, initialContentId,
platformDocumentId platformDocumentId
} }

View File

@ -20,11 +20,9 @@ import { Extension, onAuthenticatePayload } from '@hocuspocus/server'
import { getWorkspaceInfo } from '../account' import { getWorkspaceInfo } from '../account'
import { Context, buildContext } from '../context' import { Context, buildContext } from '../context'
import { Controller } from '../platform'
export interface AuthenticationConfiguration { export interface AuthenticationConfiguration {
ctx: MeasureContext ctx: MeasureContext
controller: Controller
} }
export class AuthenticationExtension implements Extension { export class AuthenticationExtension implements Extension {
@ -35,10 +33,10 @@ export class AuthenticationExtension implements Extension {
} }
async onAuthenticate (data: onAuthenticatePayload): Promise<Context> { async onAuthenticate (data: onAuthenticatePayload): Promise<Context> {
this.configuration.ctx.measure('authenticate', 1) const ctx = this.configuration.ctx
const { workspaceUrl, collaborativeDoc } = parseDocumentId(data.documentName as DocumentId) const { workspaceUrl, collaborativeDoc } = parseDocumentId(data.documentName as DocumentId)
return await ctx.with('authenticate', { workspace: workspaceUrl }, async () => {
// verify workspace can be accessed with the token // verify workspace can be accessed with the token
const workspaceInfo = await getWorkspaceInfo(data.token) const workspaceInfo = await getWorkspaceInfo(data.token)
@ -49,6 +47,7 @@ export class AuthenticationExtension implements Extension {
data.connection.readOnly = isReadonlyDoc(collaborativeDoc) data.connection.readOnly = isReadonlyDoc(collaborativeDoc)
return buildContext(data, this.configuration.controller) return buildContext(data)
})
} }
} }

View File

@ -15,7 +15,7 @@
import client from '@hcengineering/client' import client from '@hcengineering/client'
import clientResources from '@hcengineering/client-resources' import clientResources from '@hcengineering/client-resources'
import core, { Client, Tx, TxOperations, WorkspaceId, systemAccountEmail, toWorkspaceString } from '@hcengineering/core' import core, { Client, TxOperations, WorkspaceId, systemAccountEmail, toWorkspaceString } from '@hcengineering/core'
import { setMetadata } from '@hcengineering/platform' import { setMetadata } from '@hcengineering/platform'
import { Token, generateToken } from '@hcengineering/server-token' import { Token, generateToken } from '@hcengineering/server-token'
import config from './config' import config from './config'
@ -52,13 +52,25 @@ export interface ClientFactoryParams {
/** /**
* @public * @public
*/ */
export type ClientFactory = (params: ClientFactoryParams) => Promise<TxOperations> export type ClientFactory = (params?: ClientFactoryParams) => Promise<TxOperations>
/** /**
* @public * @public
*/ */
export function getClientFactory (token: Token, controller: Controller): ClientFactory { export function simpleClientFactory (token: Token): ClientFactory {
return async ({ derived }: ClientFactoryParams) => { return async (params?: ClientFactoryParams) => {
const derived = params?.derived ?? false
const client = await connect(generateToken(token.email, token.workspace))
return await getTxOperations(client, token, derived)
}
}
/**
* @public
*/
export function reusableClientFactory (token: Token, controller: Controller): ClientFactory {
return async (params?: ClientFactoryParams) => {
const derived = params?.derived ?? false
const workspaceClient = await controller.get(token.workspace) const workspaceClient = await controller.get(token.workspace)
return await getTxOperations(workspaceClient.client, token, derived) return await getTxOperations(workspaceClient.client, token, derived)
} }
@ -94,16 +106,10 @@ export class Controller {
* @public * @public
*/ */
export class WorkspaceClient { export class WorkspaceClient {
private readonly txHandlers: ((...tx: Tx[]) => Promise<void>)[] = []
private constructor ( private constructor (
readonly workspace: WorkspaceId, readonly workspace: WorkspaceId,
readonly client: Client readonly client: Client
) { ) {}
this.client.notify = (...tx: Tx[]) => {
void this.txHandler(...tx)
}
}
static async create (workspace: WorkspaceId): Promise<WorkspaceClient> { static async create (workspace: WorkspaceId): Promise<WorkspaceClient> {
const token = generateToken(systemAccountEmail, workspace) const token = generateToken(systemAccountEmail, workspace)
@ -114,10 +120,4 @@ export class WorkspaceClient {
async close (): Promise<void> { async close (): Promise<void> {
await this.client.close() await this.client.close()
} }
private async txHandler (...tx: Tx[]): Promise<void> {
for (const h of this.txHandlers) {
await h(...tx)
}
}
} }

View File

@ -18,7 +18,7 @@ import { MeasureContext, generateId, metricsAggregate } from '@hcengineering/cor
import { MinioService } from '@hcengineering/minio' import { MinioService } from '@hcengineering/minio'
import { Token, decodeToken } from '@hcengineering/server-token' import { Token, decodeToken } from '@hcengineering/server-token'
import { ServerKit } from '@hcengineering/text' import { ServerKit } from '@hcengineering/text'
import { Hocuspocus, onDestroyPayload } from '@hocuspocus/server' import { Hocuspocus } from '@hocuspocus/server'
import bp from 'body-parser' import bp from 'body-parser'
import compression from 'compression' import compression from 'compression'
import cors from 'cors' import cors from 'cors'
@ -31,7 +31,7 @@ import { Config } from './config'
import { Context } from './context' import { Context } from './context'
import { AuthenticationExtension } from './extensions/authentication' import { AuthenticationExtension } from './extensions/authentication'
import { StorageExtension } from './extensions/storage' import { StorageExtension } from './extensions/storage'
import { Controller, getClientFactory } from './platform' import { simpleClientFactory } from './platform'
import { RpcErrorResponse, RpcRequest, RpcResponse, methods } from './rpc' import { RpcErrorResponse, RpcRequest, RpcResponse, methods } from './rpc'
import { PlatformStorageAdapter } from './storage/platform' import { PlatformStorageAdapter } from './storage/platform'
import { MarkupTransformer } from './transformers/markup' import { MarkupTransformer } from './transformers/markup'
@ -83,8 +83,6 @@ export async function start (
const extensionsCtx = ctx.newChild('extensions', {}) const extensionsCtx = ctx.newChild('extensions', {})
const controller = new Controller()
const transformer = new MarkupTransformer(extensions) const transformer = new MarkupTransformer(extensions)
const hocuspocus = new Hocuspocus({ const hocuspocus = new Hocuspocus({
@ -124,18 +122,13 @@ export async function start (
extensions: [ extensions: [
new AuthenticationExtension({ new AuthenticationExtension({
ctx: extensionsCtx.newChild('authenticate', {}), ctx: extensionsCtx.newChild('authenticate', {})
controller
}), }),
new StorageExtension({ new StorageExtension({
ctx: extensionsCtx.newChild('storage', {}), ctx: extensionsCtx.newChild('storage', {}),
adapter: new PlatformStorageAdapter({ minio }, mongo, transformer) adapter: new PlatformStorageAdapter({ minio }, mongo, transformer)
}) })
], ]
async onDestroy (data: onDestroyPayload): Promise<void> {
await controller.close()
}
}) })
const rpcCtx = ctx.newChild('rpc', {}) const rpcCtx = ctx.newChild('rpc', {})
@ -144,7 +137,7 @@ export async function start (
return { return {
connectionId: generateId(), connectionId: generateId(),
workspaceId: token.workspace, workspaceId: token.workspace,
clientFactory: getClientFactory(token, controller) clientFactory: simpleClientFactory(token)
} }
} }
@ -192,9 +185,11 @@ export async function start (
} }
res.status(400).send(response) res.status(400).send(response)
} else { } else {
await rpcCtx.withLog('/rpc', { method: request.method }, async (ctx) => { await rpcCtx.with('/rpc', { method: request.method }, async (ctx) => {
try { try {
const response: RpcResponse = await method(ctx, context, request.payload, { hocuspocus, minio, transformer }) const response: RpcResponse = await rpcCtx.with(request.method, {}, async (ctx) => {
return await method(ctx, context, request.payload, { hocuspocus, minio, transformer })
})
res.status(200).send(response) res.status(200).send(response)
} catch (err: any) { } catch (err: any) {
res.status(500).send({ error: err.message }) res.status(500).send({ error: err.message })

View File

@ -29,6 +29,7 @@ import core, {
CollaborativeDoc, CollaborativeDoc,
Doc, Doc,
MeasureContext, MeasureContext,
TxOperations,
collaborativeDocWithLastVersion, collaborativeDocWithLastVersion,
toWorkspaceString toWorkspaceString
} from '@hcengineering/core' } from '@hcengineering/core'
@ -87,7 +88,7 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
const { platformDocumentId } = context const { platformDocumentId } = context
if (platformDocumentId !== undefined) { if (platformDocumentId !== undefined) {
ctx.info('load document platform content', { documentId, platformDocumentId }) ctx.info('load document platform content', { documentId, platformDocumentId })
const ydoc = await ctx.with('load-document', { storage: 'platform' }, async (ctx) => { const ydoc = await ctx.with('load-from-platform', {}, async (ctx) => {
try { try {
return await this.loadDocumentFromPlatform(ctx, platformDocumentId, context) return await this.loadDocumentFromPlatform(ctx, platformDocumentId, context)
} catch (err) { } catch (err) {
@ -112,10 +113,17 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
} }
async saveDocument (ctx: MeasureContext, documentId: DocumentId, document: YDoc, context: Context): Promise<void> { async saveDocument (ctx: MeasureContext, documentId: DocumentId, document: YDoc, context: Context): Promise<void> {
const { clientFactory } = context
const client = await ctx.with('connect', {}, async () => {
return await clientFactory()
})
try {
let snapshot: YDocVersion | undefined let snapshot: YDocVersion | undefined
try { try {
ctx.info('take document snapshot', { documentId }) ctx.info('take document snapshot', { documentId })
snapshot = await this.takeSnapshot(ctx, documentId, document, context) snapshot = await this.takeSnapshot(ctx, client, documentId, document, context)
} catch (err) { } catch (err) {
ctx.error('failed to take document snapshot', { documentId, error: err }) ctx.error('failed to take document snapshot', { documentId, error: err })
} }
@ -130,10 +138,13 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
const { platformDocumentId } = context const { platformDocumentId } = context
if (platformDocumentId !== undefined) { if (platformDocumentId !== undefined) {
ctx.info('save document content to platform', { documentId, platformDocumentId }) ctx.info('save document content to platform', { documentId, platformDocumentId })
await ctx.with('save-document', { storage: 'platform' }, async (ctx) => { await ctx.with('save-to-platform', {}, async (ctx) => {
await this.saveDocumentToPlatform(ctx, documentId, platformDocumentId, document, snapshot, context) await this.saveDocumentToPlatform(ctx, client, documentId, platformDocumentId, document, snapshot, context)
}) })
} }
} finally {
await client.close()
}
} }
getStorageAdapter (storage: string): StorageAdapter { getStorageAdapter (storage: string): StorageAdapter {
@ -180,6 +191,7 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
async takeSnapshot ( async takeSnapshot (
ctx: MeasureContext, ctx: MeasureContext,
client: Omit<TxOperations, 'close'>,
documentId: DocumentId, documentId: DocumentId,
document: YDoc, document: YDoc,
context: Context context: Context
@ -187,9 +199,8 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
const { storage, collaborativeDoc } = parseDocumentId(documentId) const { storage, collaborativeDoc } = parseDocumentId(documentId)
const adapter = this.getStorageAdapter(storage) const adapter = this.getStorageAdapter(storage)
const { clientFactory, workspaceId } = context const { workspaceId } = context
const client = await clientFactory({ derived: false })
const timestamp = Date.now() const timestamp = Date.now()
const yDocVersion: YDocVersion = { const yDocVersion: YDocVersion = {
@ -233,6 +244,7 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
async saveDocumentToPlatform ( async saveDocumentToPlatform (
ctx: MeasureContext, ctx: MeasureContext,
client: Omit<TxOperations, 'close'>,
documentName: string, documentName: string,
platformDocumentId: PlatformDocumentId, platformDocumentId: PlatformDocumentId,
document: YDoc, document: YDoc,
@ -241,12 +253,6 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
): Promise<void> { ): Promise<void> {
const { objectClass, objectId, objectAttr } = parsePlatformDocumentId(platformDocumentId) const { objectClass, objectId, objectAttr } = parsePlatformDocumentId(platformDocumentId)
const { clientFactory } = context
const client = await ctx.with('connect', {}, async () => {
return await clientFactory({ derived: false })
})
const attribute = client.getHierarchy().findAttribute(objectClass, objectAttr) const attribute = client.getHierarchy().findAttribute(objectClass, objectAttr)
if (attribute === undefined) { if (attribute === undefined) {
ctx.info('attribute not found', { documentName, objectClass, objectAttr }) ctx.info('attribute not found', { documentName, objectClass, objectAttr })