mirror of
https://github.com/hcengineering/platform.git
synced 2024-11-22 21:50:34 +03:00
UBERF-5837 Enhance logging in collaborator (#4929)
Signed-off-by: Alexander Onnikov <Alexander.Onnikov@xored.com>
This commit is contained in:
parent
45475bb2dd
commit
d5b6942f7d
@ -35,18 +35,18 @@ export type withContext<T extends WithContext> = Omit<T, 'context'> & {
|
||||
}
|
||||
|
||||
export function buildContext (data: onAuthenticatePayload, controller: Controller): Context {
|
||||
const connectionId = generateId()
|
||||
const context = data.context as Partial<Context>
|
||||
|
||||
const connectionId = context.connectionId ?? generateId()
|
||||
const decodedToken = decodeToken(data.token)
|
||||
const initialContentId = data.requestParameters.get('initialContentId') as string
|
||||
const targetContentId = data.requestParameters.get('targetContentId') as string
|
||||
|
||||
const context: Context = {
|
||||
return {
|
||||
connectionId,
|
||||
workspaceId: decodedToken.workspace,
|
||||
clientFactory: getClientFactory(decodedToken, controller),
|
||||
initialContentId: initialContentId ?? '',
|
||||
targetContentId: targetContentId ?? ''
|
||||
}
|
||||
|
||||
return context
|
||||
}
|
||||
|
58
server/collaborator/src/extensions/authentication.ts
Normal file
58
server/collaborator/src/extensions/authentication.ts
Normal file
@ -0,0 +1,58 @@
|
||||
//
|
||||
// Copyright © 2023 Hardcore Engineering Inc.
|
||||
//
|
||||
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License. You may
|
||||
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
//
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
import { isReadonlyDocVersion } from '@hcengineering/collaboration'
|
||||
import { MeasureContext } from '@hcengineering/core'
|
||||
import { Extension, onAuthenticatePayload } from '@hocuspocus/server'
|
||||
|
||||
import { getWorkspaceInfo } from '../account'
|
||||
import { Context, buildContext } from '../context'
|
||||
import { Controller } from '../platform'
|
||||
import { parseDocumentId } from '../storage/minio'
|
||||
|
||||
export interface AuthenticationConfiguration {
|
||||
ctx: MeasureContext
|
||||
controller: Controller
|
||||
}
|
||||
|
||||
export class AuthenticationExtension implements Extension {
|
||||
private readonly configuration: AuthenticationConfiguration
|
||||
|
||||
constructor (configuration: AuthenticationConfiguration) {
|
||||
this.configuration = configuration
|
||||
}
|
||||
|
||||
async onAuthenticate (data: onAuthenticatePayload): Promise<Context> {
|
||||
this.configuration.ctx.measure('authenticate', 1)
|
||||
|
||||
let documentName = data.documentName
|
||||
if (documentName.includes('://')) {
|
||||
documentName = documentName.split('://', 2)[1]
|
||||
}
|
||||
|
||||
const { workspaceUrl, versionId } = parseDocumentId(documentName)
|
||||
|
||||
// verify workspace can be accessed with the token
|
||||
const workspaceInfo = await getWorkspaceInfo(data.token)
|
||||
// verify workspace url in the document matches the token
|
||||
if (workspaceInfo.workspace !== workspaceUrl) {
|
||||
throw new Error('documentName must include workspace')
|
||||
}
|
||||
|
||||
data.connection.readOnly = isReadonlyDocVersion(versionId)
|
||||
|
||||
return buildContext(data, this.configuration.controller)
|
||||
}
|
||||
}
|
@ -19,6 +19,7 @@ import {
|
||||
Extension,
|
||||
afterUnloadDocumentPayload,
|
||||
onChangePayload,
|
||||
onConnectPayload,
|
||||
onDisconnectPayload,
|
||||
onLoadDocumentPayload,
|
||||
onStoreDocumentPayload
|
||||
@ -47,16 +48,18 @@ export class StorageExtension implements Extension {
|
||||
}
|
||||
|
||||
async onLoadDocument ({ context, documentName }: withContext<onLoadDocumentPayload>): Promise<any> {
|
||||
await this.configuration.ctx.info('load document', { documentId: documentName })
|
||||
return await this.configuration.ctx.with('load-document', {}, async () => {
|
||||
return await this.loadDocument(documentName, context)
|
||||
})
|
||||
}
|
||||
|
||||
async onStoreDocument ({ context, documentName, document }: withContext<onStoreDocumentPayload>): Promise<void> {
|
||||
const collaborators = this.collaborators.get(documentName)
|
||||
await this.configuration.ctx.info('store document', { documentId: documentName })
|
||||
|
||||
const collaborators = this.collaborators.get(documentName)
|
||||
if (collaborators === undefined || collaborators.size === 0) {
|
||||
console.log('no changes for document', documentName)
|
||||
await this.configuration.ctx.info('no changes for document', { documentId: documentName })
|
||||
return
|
||||
}
|
||||
|
||||
@ -66,12 +69,21 @@ export class StorageExtension implements Extension {
|
||||
})
|
||||
}
|
||||
|
||||
async onConnect ({ context, documentName, instance }: withContext<onConnectPayload>): Promise<any> {
|
||||
const connections = instance.documents.get(documentName)?.getConnectionsCount() ?? 0
|
||||
const params = { documentId: documentName, connectionId: context.connectionId, connections }
|
||||
await this.configuration.ctx.info('connect to document', params)
|
||||
}
|
||||
|
||||
async onDisconnect ({ context, documentName, document }: withContext<onDisconnectPayload>): Promise<any> {
|
||||
const { connectionId } = context
|
||||
const collaborators = this.collaborators.get(documentName)
|
||||
|
||||
const params = { documentId: documentName, connectionId, connections: document.getConnectionsCount() }
|
||||
await this.configuration.ctx.info('disconnect from document', params)
|
||||
|
||||
const collaborators = this.collaborators.get(documentName)
|
||||
if (collaborators === undefined || !collaborators.has(connectionId)) {
|
||||
console.log('no changes for document', documentName)
|
||||
await this.configuration.ctx.info('no changes for document', { documentId: documentName })
|
||||
return
|
||||
}
|
||||
|
||||
@ -82,25 +94,25 @@ export class StorageExtension implements Extension {
|
||||
}
|
||||
|
||||
async afterUnloadDocument ({ documentName }: afterUnloadDocumentPayload): Promise<any> {
|
||||
await this.configuration.ctx.info('unload document', { documentId: documentName })
|
||||
this.collaborators.delete(documentName)
|
||||
}
|
||||
|
||||
async loadDocument (documentId: string, context: Context): Promise<YDoc | undefined> {
|
||||
const { adapter } = this.configuration
|
||||
|
||||
console.log('load document', documentId)
|
||||
try {
|
||||
const ydoc = await adapter.loadDocument(documentId, context)
|
||||
if (ydoc !== undefined) {
|
||||
return ydoc
|
||||
}
|
||||
} catch (err) {
|
||||
console.error('failed to load document', documentId, err)
|
||||
await this.configuration.ctx.error('failed to load document', { documentId, error: err })
|
||||
}
|
||||
|
||||
const { initialContentId } = context
|
||||
if (initialContentId !== undefined && initialContentId.length > 0) {
|
||||
console.log('load document initial content', initialContentId)
|
||||
await this.configuration.ctx.info('load document initial content', { documentId, initialContentId })
|
||||
try {
|
||||
const ydoc = await adapter.loadDocument(initialContentId, context)
|
||||
|
||||
@ -112,7 +124,11 @@ export class StorageExtension implements Extension {
|
||||
|
||||
return ydoc
|
||||
} catch (err) {
|
||||
console.error('failed to load document', initialContentId, err)
|
||||
await this.configuration.ctx.error('failed to load document initial content', {
|
||||
documentId,
|
||||
initialContentId,
|
||||
error: err
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -120,20 +136,23 @@ export class StorageExtension implements Extension {
|
||||
async storeDocument (documentId: string, document: Document, context: Context): Promise<void> {
|
||||
const { adapter } = this.configuration
|
||||
|
||||
console.log('store document', documentId)
|
||||
try {
|
||||
await adapter.saveDocument(documentId, document, context)
|
||||
} catch (err) {
|
||||
console.error('failed to save document', documentId, err)
|
||||
await this.configuration.ctx.error('failed to save document', { documentId, error: err })
|
||||
}
|
||||
|
||||
const { targetContentId } = context
|
||||
if (targetContentId !== undefined && targetContentId.length > 0) {
|
||||
console.log('store document target content', targetContentId)
|
||||
await this.configuration.ctx.info('store document target content', { documentId, targetContentId })
|
||||
try {
|
||||
await adapter.saveDocument(targetContentId, document, context)
|
||||
} catch (err) {
|
||||
console.error('failed to save document', targetContentId, err)
|
||||
await this.configuration.ctx.error('failed to save document target content', {
|
||||
documentId,
|
||||
targetContentId,
|
||||
error: err
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -41,7 +41,7 @@ export async function removeDocument (
|
||||
try {
|
||||
await minio.remove(workspaceId, [minioDocumentId, historyDocumentId])
|
||||
} catch (err) {
|
||||
console.error(err)
|
||||
await ctx.error('failed to remove document', { documentId, error: err })
|
||||
}
|
||||
|
||||
return {}
|
||||
|
@ -13,12 +13,11 @@
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
import { isReadonlyDocVersion } from '@hcengineering/collaboration'
|
||||
import { MeasureContext, generateId } from '@hcengineering/core'
|
||||
import { MinioService } from '@hcengineering/minio'
|
||||
import { Token, decodeToken } from '@hcengineering/server-token'
|
||||
import { ServerKit } from '@hcengineering/text'
|
||||
import { Hocuspocus, onAuthenticatePayload, onDestroyPayload } from '@hocuspocus/server'
|
||||
import { Hocuspocus, onDestroyPayload } from '@hocuspocus/server'
|
||||
import bp from 'body-parser'
|
||||
import compression from 'compression'
|
||||
import cors from 'cors'
|
||||
@ -27,17 +26,17 @@ import { IncomingMessage, createServer } from 'http'
|
||||
import { MongoClient } from 'mongodb'
|
||||
import { WebSocket, WebSocketServer } from 'ws'
|
||||
|
||||
import { getWorkspaceInfo } from './account'
|
||||
import { Config } from './config'
|
||||
import { Context, buildContext } from './context'
|
||||
import { HtmlTransformer } from './transformers/html'
|
||||
import { Context } from './context'
|
||||
import { AuthenticationExtension } from './extensions/authentication'
|
||||
import { StorageExtension } from './extensions/storage'
|
||||
import { Controller, getClientFactory } from './platform'
|
||||
import { MinioStorageAdapter, parseDocumentId } from './storage/minio'
|
||||
import { RpcErrorResponse, RpcRequest, RpcResponse, methods } from './rpc'
|
||||
import { MinioStorageAdapter } from './storage/minio'
|
||||
import { MongodbStorageAdapter } from './storage/mongodb'
|
||||
import { PlatformStorageAdapter } from './storage/platform'
|
||||
import { RouterStorageAdapter } from './storage/router'
|
||||
import { RpcErrorResponse, RpcRequest, RpcResponse, methods } from './rpc'
|
||||
import { HtmlTransformer } from './transformers/html'
|
||||
|
||||
/**
|
||||
* @public
|
||||
@ -54,7 +53,8 @@ export async function start (
|
||||
mongo: MongoClient
|
||||
): Promise<Shutdown> {
|
||||
const port = config.Port
|
||||
console.log(`starting server on :${port} ...`)
|
||||
|
||||
await ctx.info('Starting collaborator server', { port })
|
||||
|
||||
const app = express()
|
||||
app.use(cors())
|
||||
@ -125,6 +125,10 @@ export async function start (
|
||||
unloadImmediately: false,
|
||||
|
||||
extensions: [
|
||||
new AuthenticationExtension({
|
||||
ctx: extensionsCtx.newChild('authenticate', {}),
|
||||
controller
|
||||
}),
|
||||
new StorageExtension({
|
||||
ctx: extensionsCtx.newChild('storage', {}),
|
||||
adapter: new RouterStorageAdapter(
|
||||
@ -138,28 +142,6 @@ export async function start (
|
||||
})
|
||||
],
|
||||
|
||||
async onAuthenticate (data: onAuthenticatePayload): Promise<Context> {
|
||||
ctx.measure('authenticate', 1)
|
||||
|
||||
let documentName = data.documentName
|
||||
if (documentName.includes('://')) {
|
||||
documentName = documentName.split('://', 2)[1]
|
||||
}
|
||||
|
||||
const { workspaceUrl, versionId } = parseDocumentId(documentName)
|
||||
|
||||
// verify workspace can be accessed with the token
|
||||
const workspaceInfo = await getWorkspaceInfo(data.token)
|
||||
// verify workspace url in the document matches the token
|
||||
if (workspaceInfo.workspace !== workspaceUrl) {
|
||||
throw new Error('documentName must include workspace')
|
||||
}
|
||||
|
||||
data.connection.readOnly = isReadonlyDocVersion(versionId)
|
||||
|
||||
return buildContext(data, controller)
|
||||
},
|
||||
|
||||
async onDestroy (data: onDestroyPayload): Promise<void> {
|
||||
await controller.close()
|
||||
}
|
||||
@ -196,7 +178,7 @@ export async function start (
|
||||
}
|
||||
res.status(400).send(response)
|
||||
} else {
|
||||
await rpcCtx.with(request.method, {}, async (ctx) => {
|
||||
await rpcCtx.withLog('/rpc', { method: request.method }, async (ctx) => {
|
||||
try {
|
||||
const response: RpcResponse = await method(ctx, context, request.payload, { hocuspocus, minio, transformer })
|
||||
res.status(200).send(response)
|
||||
@ -227,7 +209,8 @@ export async function start (
|
||||
})
|
||||
|
||||
wss.on('connection', (incoming: WebSocket, request: IncomingMessage) => {
|
||||
hocuspocus.handleConnection(incoming, request)
|
||||
const context: Partial<Context> = { connectionId: generateId() }
|
||||
hocuspocus.handleConnection(incoming, request, context)
|
||||
})
|
||||
|
||||
const server = createServer(app)
|
||||
@ -239,7 +222,8 @@ export async function start (
|
||||
})
|
||||
|
||||
server.listen(port)
|
||||
console.log(`started server on :${port}`)
|
||||
|
||||
await ctx.info('Running collaborator server', { port })
|
||||
|
||||
return async () => {
|
||||
server.close()
|
||||
|
@ -50,14 +50,15 @@ export async function startCollaborator (): Promise<void> {
|
||||
void shutdown().then(() => {
|
||||
void mongoClient.close()
|
||||
})
|
||||
void metricsContext.info('closed')
|
||||
}
|
||||
|
||||
process.on('uncaughtException', (e) => {
|
||||
console.error(e)
|
||||
void metricsContext.error('UncaughtException', { error: e })
|
||||
})
|
||||
|
||||
process.on('unhandledRejection', (reason, promise) => {
|
||||
console.error('Unhandled Rejection at:', promise, 'reason:', reason)
|
||||
void metricsContext.error('Unhandled Rejection at:', { promise, reason })
|
||||
})
|
||||
|
||||
process.on('SIGINT', close)
|
||||
|
@ -42,8 +42,8 @@ export function parseDocumentId (documentId: string): MinioDocumentId {
|
||||
}
|
||||
}
|
||||
|
||||
function isValidDocumentId (documentId: MinioDocumentId): boolean {
|
||||
return documentId.workspaceUrl !== '' && documentId.minioDocumentId !== '' && documentId.versionId !== ''
|
||||
function isValidDocumentId (documentId: Omit<MinioDocumentId, 'workspaceUrl'>): boolean {
|
||||
return documentId.minioDocumentId !== '' && documentId.versionId !== ''
|
||||
}
|
||||
|
||||
export class MinioStorageAdapter implements StorageAdapter {
|
||||
@ -55,10 +55,10 @@ export class MinioStorageAdapter implements StorageAdapter {
|
||||
async loadDocument (documentId: string, context: Context): Promise<YDoc | undefined> {
|
||||
const { workspaceId } = context
|
||||
|
||||
const { workspaceUrl, minioDocumentId, versionId } = parseDocumentId(documentId)
|
||||
const { minioDocumentId, versionId } = parseDocumentId(documentId)
|
||||
|
||||
if (!isValidDocumentId({ workspaceUrl, minioDocumentId, versionId })) {
|
||||
console.warn('malformed document id', documentId)
|
||||
if (!isValidDocumentId({ minioDocumentId, versionId })) {
|
||||
await this.ctx.error('malformed document id', { documentId })
|
||||
return undefined
|
||||
}
|
||||
|
||||
@ -75,10 +75,10 @@ export class MinioStorageAdapter implements StorageAdapter {
|
||||
async saveDocument (documentId: string, document: YDoc, context: Context): Promise<void> {
|
||||
const { workspaceId } = context
|
||||
|
||||
const { workspaceUrl, minioDocumentId, versionId } = parseDocumentId(documentId)
|
||||
const { minioDocumentId, versionId } = parseDocumentId(documentId)
|
||||
|
||||
if (!isValidDocumentId({ workspaceUrl, minioDocumentId, versionId })) {
|
||||
console.warn('malformed document id', documentId)
|
||||
if (!isValidDocumentId({ minioDocumentId, versionId })) {
|
||||
await this.ctx.error('malformed document id', { documentId })
|
||||
return undefined
|
||||
}
|
||||
|
||||
|
@ -39,11 +39,8 @@ function parseDocumentId (documentId: string): MongodbDocumentId {
|
||||
}
|
||||
}
|
||||
|
||||
function isValidDocumentId (documentId: MongodbDocumentId, context: Context): boolean {
|
||||
return (
|
||||
documentId.objectDomain !== '' && documentId.objectId !== '' && documentId.objectAttr !== ''
|
||||
// && documentId.workspace === context.workspaceId.name
|
||||
)
|
||||
function isValidDocumentId (documentId: Omit<MongodbDocumentId, 'workspaceUrl'>, context: Context): boolean {
|
||||
return documentId.objectDomain !== '' && documentId.objectId !== '' && documentId.objectAttr !== ''
|
||||
}
|
||||
|
||||
export class MongodbStorageAdapter implements StorageAdapter {
|
||||
@ -54,10 +51,10 @@ export class MongodbStorageAdapter implements StorageAdapter {
|
||||
) {}
|
||||
|
||||
async loadDocument (documentId: string, context: Context): Promise<YDoc | undefined> {
|
||||
const { workspaceUrl, objectId, objectDomain, objectAttr } = parseDocumentId(documentId)
|
||||
const { objectId, objectDomain, objectAttr } = parseDocumentId(documentId)
|
||||
|
||||
if (!isValidDocumentId({ workspaceUrl, objectId, objectDomain, objectAttr }, context)) {
|
||||
console.warn('malformed document id', documentId)
|
||||
if (!isValidDocumentId({ objectId, objectDomain, objectAttr }, context)) {
|
||||
await this.ctx.error('malformed document id', { documentId })
|
||||
return undefined
|
||||
}
|
||||
|
||||
@ -77,7 +74,7 @@ export class MongodbStorageAdapter implements StorageAdapter {
|
||||
})
|
||||
}
|
||||
|
||||
async saveDocument (_documentId: string, _document: YDoc, _context: Context): Promise<void> {
|
||||
// do nothing, not supported
|
||||
async saveDocument (documentId: string, _document: YDoc, _context: Context): Promise<void> {
|
||||
await this.ctx.error('saving documents into mongodb not supported', { documentId })
|
||||
}
|
||||
}
|
||||
|
@ -39,11 +39,8 @@ function parseDocumentId (documentId: string): PlatformDocumentId {
|
||||
}
|
||||
}
|
||||
|
||||
function isValidDocumentId (documentId: PlatformDocumentId, context: Context): boolean {
|
||||
return (
|
||||
documentId.objectClass !== '' && documentId.objectId !== '' && documentId.objectAttr !== '' // &&
|
||||
// documentId.workspace === context.workspaceId.name
|
||||
)
|
||||
function isValidDocumentId (documentId: Omit<PlatformDocumentId, 'workspaceUrl'>, context: Context): boolean {
|
||||
return documentId.objectClass !== '' && documentId.objectId !== '' && documentId.objectAttr !== ''
|
||||
}
|
||||
|
||||
export class PlatformStorageAdapter implements StorageAdapter {
|
||||
@ -53,54 +50,16 @@ export class PlatformStorageAdapter implements StorageAdapter {
|
||||
) {}
|
||||
|
||||
async loadDocument (documentId: string, context: Context): Promise<YDoc | undefined> {
|
||||
console.warn('loading documents from the platform not supported', documentId)
|
||||
|
||||
const { clientFactory } = context
|
||||
const { workspaceUrl, objectId, objectClass, objectAttr } = parseDocumentId(documentId)
|
||||
|
||||
if (!isValidDocumentId({ workspaceUrl, objectId, objectClass, objectAttr }, context)) {
|
||||
console.warn('malformed document id', documentId)
|
||||
return undefined
|
||||
}
|
||||
|
||||
return await this.ctx.with('load-document', {}, async (ctx) => {
|
||||
let content = ''
|
||||
|
||||
const client = await ctx.with('connect', {}, async () => {
|
||||
return await clientFactory({ derived: false })
|
||||
})
|
||||
|
||||
const hierarchy = client.getHierarchy()
|
||||
const attribute = hierarchy.findAttribute(objectClass, objectAttr)
|
||||
if (attribute === undefined) {
|
||||
console.warn('invalid attribute', objectAttr)
|
||||
return undefined
|
||||
}
|
||||
|
||||
if (hierarchy.isDerived(attribute.type._class, core.class.TypeCollaborativeMarkup)) {
|
||||
console.warn('unsupported attribute type', attribute?.type._class)
|
||||
return undefined
|
||||
}
|
||||
|
||||
const doc = await ctx.with('query', {}, async () => {
|
||||
return await client.findOne(objectClass, { _id: objectId }, { projection: { [objectAttr]: 1 } })
|
||||
})
|
||||
if (doc !== undefined && objectAttr in doc) {
|
||||
content = (doc as any)[objectAttr] as string
|
||||
}
|
||||
|
||||
return await ctx.with('transform', {}, () => {
|
||||
return this.transformer.toYdoc(content, objectAttr)
|
||||
})
|
||||
})
|
||||
await this.ctx.error('loading documents from the platform not supported', { documentId })
|
||||
return undefined
|
||||
}
|
||||
|
||||
async saveDocument (documentId: string, document: YDoc, context: Context): Promise<void> {
|
||||
const { clientFactory } = context
|
||||
const { workspaceUrl, objectId, objectClass, objectAttr } = parseDocumentId(documentId)
|
||||
const { objectId, objectClass, objectAttr } = parseDocumentId(documentId)
|
||||
|
||||
if (!isValidDocumentId({ workspaceUrl, objectId, objectClass, objectAttr }, context)) {
|
||||
console.warn('malformed document id', documentId)
|
||||
if (!isValidDocumentId({ objectId, objectClass, objectAttr }, context)) {
|
||||
await this.ctx.error('malformed document id', { documentId })
|
||||
return undefined
|
||||
}
|
||||
|
||||
@ -111,7 +70,7 @@ export class PlatformStorageAdapter implements StorageAdapter {
|
||||
|
||||
const attribute = client.getHierarchy().findAttribute(objectClass, objectAttr)
|
||||
if (attribute === undefined) {
|
||||
console.warn('attribute not found', objectClass, objectAttr)
|
||||
await this.ctx.info('attribute not found', { documentId, objectClass, objectAttr })
|
||||
return
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user