From 6784168b664361d9a38ab79b4f868681fab17ac5 Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Tue, 10 Sep 2024 12:44:10 +0700 Subject: [PATCH] UBERF-8052: Allow easy profiling of transactor (#6502) Signed-off-by: Andrey Sobolev --- .github/workflows/main.yml | 18 ++- .gitignore | 1 + dev/tool/src/__start.ts | 2 - dev/tool/src/index.ts | 11 +- packages/core/src/server.ts | 20 --- packages/text/src/markup/utils.ts | 7 +- .../components/ServerManagerGeneral.svelte | 54 ++++++- pods/server/src/__start.ts | 7 +- pods/server/src/inspector.ts | 38 +++++ pods/server/src/server.ts | 8 +- server/ws/src/blobs.ts | 8 +- server/ws/src/client.ts | 8 +- server/ws/src/server.ts | 51 ++++--- server/ws/src/server_http.ts | 103 +++++++++---- server/ws/src/server_u.ts | 140 +++++++++++------- server/ws/src/types.ts | 7 +- server/ws/src/utils.ts | 8 +- tests/profile-download.sh | 5 + tests/profile-generate.sh | 8 + tests/profile-start.sh | 3 + .../sanity/tests/tracker/attachments.spec.ts | 13 +- 21 files changed, 360 insertions(+), 160 deletions(-) create mode 100644 pods/server/src/inspector.ts create mode 100755 tests/profile-download.sh create mode 100755 tests/profile-generate.sh create mode 100755 tests/profile-start.sh diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index dbd4342226..cd938ddeca 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -233,10 +233,26 @@ jobs: run: | cd ./tests/sanity node ../../common/scripts/install-run-rushx.js ci + - name: Start profiling + run: | + cd ./tests + ./profile-start.sh - name: Run UI tests run: | - cd ./tests/sanity + cd ./tests/sanity node ../../common/scripts/install-run-rushx.js uitest + - name: Download profile + run: | + cd ./tests + ./profile-download.sh + npm install -g cpupro + ./profile-generate.sh + - name: Upload profiling results + if: always() + uses: actions/upload-artifact@v4 + with: + name: profiling + path: ./tests/profiles - name: 'Store docker logs' if: always() run: | diff --git a/.gitignore b/.gitignore index 5d39fa5430..94226a14be 100644 --- a/.gitignore +++ b/.gitignore @@ -100,3 +100,4 @@ services/github/pod-github/src/github.graphql dev/tool/report.csv bundle/* bundle.js.map +tests/profiles \ No newline at end of file diff --git a/dev/tool/src/__start.ts b/dev/tool/src/__start.ts index c3a43ea79c..ab0e6a74bc 100644 --- a/dev/tool/src/__start.ts +++ b/dev/tool/src/__start.ts @@ -82,6 +82,4 @@ function prepareTools (): { return { ...prepareToolsRaw(builder(enabled, disabled).getTxes()), version: getModelVersion(), migrateOperations } } -console.log(`tools git_version: ${process.env.GIT_REVISION ?? ''} model_version: ${process.env.MODEL_VERSION ?? ''}`) - devTool(prepareTools) diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index 30a53c1296..c2c345f089 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -191,6 +191,12 @@ export function devTool ( program.version('0.0.1') + program.command('version').action(() => { + console.log( + `tools git_version: ${process.env.GIT_REVISION ?? ''} model_version: ${process.env.MODEL_VERSION ?? ''}` + ) + }) + // create-account john.appleseed@gmail.com --password 123 --workspace workspace --fullname "John Appleseed" program .command('create-account ') @@ -1019,8 +1025,9 @@ export function devTool ( program .command('generate-token ') .description('generate token') - .action(async (name: string, workspace: string) => { - console.log(generateToken(name, getWorkspaceId(workspace))) + .option('--admin', 'Generate token with admin access', false) + .action(async (name: string, workspace: string, opt: { admin: boolean }) => { + console.log(generateToken(name, getWorkspaceId(workspace), { ...(opt.admin ? { admin: 'true' } : {}) })) }) program .command('decode-token ') diff --git a/packages/core/src/server.ts b/packages/core/src/server.ts index 1d09a2bd81..24a200ec23 100644 --- a/packages/core/src/server.ts +++ b/packages/core/src/server.ts @@ -56,26 +56,6 @@ export interface SessionData { branding: Branding | null } -export interface ContextData { - derived: { - txes: Tx[] - targets: BroadcastTargets // A set of broadcast filters if required - } - contextCache: Map - removedMap: Map, Doc> - - userEmail: string - sessionId: string - admin?: boolean - - account: Account - - getAccount: (account: Ref) => Account | undefined - - workspace: WorkspaceIdWithUrl - branding: Branding | null -} - /** * @public */ diff --git a/packages/text/src/markup/utils.ts b/packages/text/src/markup/utils.ts index 8edcf75c20..59a3eb0cdb 100644 --- a/packages/text/src/markup/utils.ts +++ b/packages/text/src/markup/utils.ts @@ -25,6 +25,7 @@ import { MarkupMark, MarkupNode, MarkupNodeType, emptyMarkupNode } from './model /** @public */ export const EmptyMarkup: Markup = jsonToMarkup(emptyMarkupNode()) +const defaultSchema = getSchema(defaultExtensions) /** @public */ export function getMarkup (editor?: Editor): Markup { @@ -157,7 +158,7 @@ export function markupToJSON (markup: Markup): MarkupNode { /** @public */ export function jsonToPmNode (json: MarkupNode, schema?: Schema, extensions?: Extensions): ProseMirrorNode { - schema ??= getSchema(extensions ?? defaultExtensions) + schema ??= extensions == null ? defaultSchema : getSchema(extensions ?? defaultExtensions) return ProseMirrorNode.fromJSON(schema, json) } @@ -210,7 +211,7 @@ export function jsonToHTML (json: MarkupNode, extensions?: Extensions): string { /** @public */ export function htmlToPmNode (html: string, schema?: Schema, extensions?: Extensions): ProseMirrorNode { - schema ??= getSchema(extensions ?? defaultExtensions) + schema ??= extensions == null ? defaultSchema : getSchema(extensions ?? defaultExtensions) const json = htmlToJSON(html, extensions) return ProseMirrorNode.fromJSON(schema, json) } @@ -226,8 +227,6 @@ export function pmNodeToHTML (node: ProseMirrorNode, extensions?: Extensions): s const ELLIPSIS_CHAR = 'โ€ฆ' const WHITESPACE = ' ' -const defaultSchema = getSchema(defaultExtensions) - /** @public */ export function stripTags (markup: Markup, textLimit = 0, extensions: Extensions | undefined = undefined): string { const schema = extensions === undefined ? defaultSchema : getSchema(extensions) diff --git a/plugins/workbench-resources/src/components/ServerManagerGeneral.svelte b/plugins/workbench-resources/src/components/ServerManagerGeneral.svelte index 6e2f40b2c3..f0926d679d 100644 --- a/plugins/workbench-resources/src/components/ServerManagerGeneral.svelte +++ b/plugins/workbench-resources/src/components/ServerManagerGeneral.svelte @@ -3,7 +3,7 @@ import login from '@hcengineering/login' import { getEmbeddedLabel, getMetadata } from '@hcengineering/platform' import presentation, { getClient, isAdminUser } from '@hcengineering/presentation' - import { Button, IconArrowLeft, IconArrowRight, fetchMetadataLocalStorage } from '@hcengineering/ui' + import { Button, IconArrowLeft, IconArrowRight, fetchMetadataLocalStorage, ticker } from '@hcengineering/ui' import EditBox from '@hcengineering/ui/src/components/EditBox.svelte' const _endpoint: string = fetchMetadataLocalStorage(login.metadata.LoginEndpoint) ?? '' @@ -34,6 +34,22 @@ let responseSize = 0 + let profiling = false + + async function fetchStats (time: number): Promise { + await fetch(endpoint + `/api/v1/profiling?token=${token}`, {}) + .then(async (json) => { + data = await json.json() + profiling = data?.profiling ?? false + }) + .catch((err) => { + console.error(err) + }) + } + let data: any + + $: void fetchStats($ticker) + function genData (dataSize: number): string { let result = '' const characters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789' @@ -97,6 +113,26 @@ running = false clearInterval(int) } + + async function downloadProfile (): Promise { + const link = document.createElement('a') + link.style.display = 'none' + link.setAttribute('target', '_blank') + const json = await ( + await fetch(endpoint + `/api/v1/manage?token=${token}&operation=profile-stop`, { + method: 'PUT' + }) + ).json() + link.setAttribute( + 'href', + 'data:application/json;charset=utf-8,%EF%BB%BF' + encodeURIComponent(JSON.stringify(json)) + ) + link.setAttribute('download', `profile-${Date.now()}.cpuprofile`) + document.body.appendChild(link) + link.click() + document.body.removeChild(link) + fetchStats(0) + } {#if isAdminUser()} @@ -176,6 +212,22 @@ }} /> +
+
3.
+ {#if !profiling} +
{/if} diff --git a/pods/server/src/__start.ts b/pods/server/src/__start.ts index ba2027d978..5d1ba26f9f 100644 --- a/pods/server/src/__start.ts +++ b/pods/server/src/__start.ts @@ -19,6 +19,7 @@ import serverTelegram from '@hcengineering/server-telegram' import serverAiBot from '@hcengineering/server-ai-bot' import { join } from 'path' import { start } from '.' +import { profileStart, profileStop } from './inspector' const serverFactory = serverFactories[(process.env.SERVER_PROVIDER as string) ?? 'ws'] ?? serverFactories.ws configureAnalytics(process.env.SENTRY_DSN, {}) @@ -68,7 +69,11 @@ const shutdown = start(config.url, { indexProcessing: 500, brandingMap: loadBrandingMap(config.brandingPath), accountsUrl: config.accountsUrl, - enableCompression: config.enableCompression + enableCompression: config.enableCompression, + profiling: { + start: profileStart, + stop: profileStop + } }) const close = (): void => { diff --git a/pods/server/src/inspector.ts b/pods/server/src/inspector.ts new file mode 100644 index 0000000000..2a65ea4736 --- /dev/null +++ b/pods/server/src/inspector.ts @@ -0,0 +1,38 @@ +import { PlatformError, unknownError } from '@hcengineering/platform' +import * as inspector from 'node:inspector' + +let session: inspector.Session | undefined +export function profileStart (): void { + try { + session = new inspector.Session() + session.connect() + + session.post('Profiler.enable') + session.post('Profiler.start') + } catch (err) { + console.error(err) + } +} + +export async function profileStop (): Promise { + return await new Promise((resolve, reject) => { + if (session == null) { + reject(new PlatformError(unknownError('no session started'))) + return + } + try { + session.post('Profiler.stop', (err, profile) => { + if (err != null) { + reject(err) + } else { + const json = JSON.stringify(profile.profile) + session?.disconnect() + session = undefined + resolve(json) + } + }) + } catch (err) { + reject(err) + } + }) +} diff --git a/pods/server/src/server.ts b/pods/server/src/server.ts index 94441f7d49..4ea0b91771 100644 --- a/pods/server/src/server.ts +++ b/pods/server/src/server.ts @@ -46,6 +46,11 @@ export function start ( enableCompression?: boolean accountsUrl: string + + profiling?: { + start: () => void + stop: () => Promise + } } ): () => Promise { const metrics = getMetricsContext() @@ -88,7 +93,8 @@ export function start ( serverFactory: opt.serverFactory, enableCompression: opt.enableCompression, accountsUrl: opt.accountsUrl, - externalStorage + externalStorage, + profiling: opt.profiling }) return async () => { await externalStorage.close() diff --git a/server/ws/src/blobs.ts b/server/ws/src/blobs.ts index 57138e7cac..a7d6d08b7d 100644 --- a/server/ws/src/blobs.ts +++ b/server/ws/src/blobs.ts @@ -21,7 +21,7 @@ export async function getFile ( file: string, res: BlobResponse ): Promise { - const stat = await ctx.with('stat', {}, async () => await client.stat(ctx, workspace, file)) + const stat = await ctx.with('stat', {}, () => client.stat(ctx, workspace, file)) if (stat === undefined) { ctx.error('No such key', { file }) res.cork(() => { @@ -36,7 +36,7 @@ export async function getFile ( { contentType: stat.contentType }, async (ctx) => { try { - const dataStream = await ctx.with('readable', {}, async () => await client.get(ctx, workspace, file)) + const dataStream = await ctx.with('readable', {}, () => client.get(ctx, workspace, file)) await new Promise((resolve, reject) => { res.cork(() => { res.writeHead(200, { @@ -99,7 +99,7 @@ export async function getFileRange ( uuid: string, res: BlobResponse ): Promise { - const stat = await ctx.with('stats', {}, async () => await client.stat(ctx, workspace, uuid)) + const stat = await ctx.with('stats', {}, () => client.stat(ctx, workspace, uuid)) if (stat === undefined) { ctx.error('No such key', { file: uuid }) res.cork(() => { @@ -133,7 +133,7 @@ export async function getFileRange ( const dataStream = await ctx.with( 'partial', {}, - async () => await client.partial(ctx, workspace, uuid, start, end - start + 1), + () => client.partial(ctx, workspace, uuid, start, end - start + 1), {} ) await new Promise((resolve, reject) => { diff --git a/server/ws/src/client.ts b/server/ws/src/client.ts index 889749a3d7..3aae5f0811 100644 --- a/server/ws/src/client.ts +++ b/server/ws/src/client.ts @@ -108,11 +108,7 @@ export class ClientSession implements Session { this._pipeline.context.modelDb ) ctx.ctx.contextData = contextData - const result = await ctx.ctx.with( - 'load-model', - {}, - async () => await this._pipeline.loadModel(ctx.ctx, lastModelTx, hash) - ) + const result = await ctx.ctx.with('load-model', {}, () => this._pipeline.loadModel(ctx.ctx, lastModelTx, hash)) await ctx.sendResponse(result) } @@ -265,7 +261,7 @@ export class ClientSession implements Session { } const bevent = createBroadcastEvent(Array.from(classes)) this.broadcastTx = [] - await socket.send( + socket.send( ctx, { result: [bevent] diff --git a/server/ws/src/server.ts b/server/ws/src/server.ts index 70b18a30af..3f26cffe14 100644 --- a/server/ws/src/server.ts +++ b/server/ws/src/server.ts @@ -18,10 +18,10 @@ import core, { TxFactory, WorkspaceEvent, generateId, + isWorkspaceCreating, systemAccountEmail, toWorkspaceString, versionToString, - isWorkspaceCreating, withContext, type BaseWorkspaceInfo, type Branding, @@ -105,7 +105,11 @@ class TSessionManager implements SessionManager { branding: Branding | null ) => Session, readonly timeouts: Timeouts, - readonly brandingMap: BrandingMap + readonly brandingMap: BrandingMap, + readonly profiling?: { + start: () => void + stop: () => Promise + } ) { this.checkInterval = setInterval(() => { this.handleInterval() @@ -188,12 +192,7 @@ class TSessionManager implements SessionManager { continue } if (diff > 20000 && diff < 60000 && this.ticks % 10 === 0) { - void s[1].socket.send( - workspace.context, - { result: 'ping' }, - s[1].session.binaryMode, - s[1].session.useCompression - ) + s[1].socket.send(workspace.context, { result: 'ping' }, s[1].session.binaryMode, s[1].session.useCompression) } for (const r of s[1].session.requests.values()) { @@ -376,7 +375,7 @@ class TSessionManager implements SessionManager { workspace: workspaceInfo.workspaceId, wsUrl: workspaceInfo.workspaceUrl }) - pipeline = await ctx.with('๐Ÿ’ค wait', { workspaceName }, async () => await (workspace as Workspace).pipeline) + pipeline = await ctx.with('๐Ÿ’ค wait', { workspaceName }, () => (workspace as Workspace).pipeline) } else { ctx.warn('reconnect workspace in upgrade switch', { email: token.email, @@ -406,7 +405,7 @@ class TSessionManager implements SessionManager { }) return { upgrade: true } } - pipeline = await ctx.with('๐Ÿ’ค wait', { workspaceName }, async () => await (workspace as Workspace).pipeline) + pipeline = await ctx.with('๐Ÿ’ค wait', { workspaceName }, () => (workspace as Workspace).pipeline) } const session = this.createSession( @@ -431,7 +430,7 @@ class TSessionManager implements SessionManager { void ctx.with('set-status', {}, (ctx) => this.trySetStatus(ctx, session, true, _workspace.workspaceId)) if (this.timeMinutes > 0) { - void ws.send(ctx, { result: this.createMaintenanceWarning() }, session.binaryMode, session.useCompression) + ws.send(ctx, { result: this.createMaintenanceWarning() }, session.binaryMode, session.useCompression) } return { session, context: workspace.context, workspaceId: wsString } } @@ -738,7 +737,7 @@ class TSessionManager implements SessionManager { const sessions = Array.from(workspace.sessions) workspace.sessions = new Map() - const closeS = async (s: Session, webSocket: ConnectionSocket): Promise => { + const closeS = (s: Session, webSocket: ConnectionSocket): void => { s.workspaceClosed = true if (reason === 'upgrade' || reason === 'force-close') { // Override message handler, to wait for upgrading response from clients. @@ -754,9 +753,12 @@ class TSessionManager implements SessionManager { wsName: workspace.workspaceName }) } - await Promise.all( - sessions.filter((it) => it[1].socket.id !== ignoreSocket?.id).map((s) => closeS(s[1].session, s[1].socket)) - ) + + sessions + .filter((it) => it[1].socket.id !== ignoreSocket?.id) + .forEach((s) => { + closeS(s[1].session, s[1].socket) + }) const closePipeline = async (): Promise => { try { @@ -779,7 +781,7 @@ class TSessionManager implements SessionManager { } private sendUpgrade (ctx: MeasureContext, webSocket: ConnectionSocket, binary: boolean): void { - void webSocket.send( + webSocket.send( ctx, { result: { @@ -868,7 +870,7 @@ class TSessionManager implements SessionManager { } const wsRef = this.workspaces.get(workspace) if (wsRef === undefined) { - await ws.send( + ws.send( ctx, { id: request.id, @@ -891,7 +893,7 @@ class TSessionManager implements SessionManager { id: request.id, result: done } - await ws.send(ctx, forceCloseResponse, service.binaryMode, service.useCompression) + ws.send(ctx, forceCloseResponse, service.binaryMode, service.useCompression) return } if (request.id === -1 && request.method === 'hello') { @@ -922,7 +924,7 @@ class TSessionManager implements SessionManager { binary: service.binaryMode, reconnect } - await ws.send(requestCtx, helloResponse, false, false) + ws.send(requestCtx, helloResponse, false, false) return } const opContext = (ctx: MeasureContext): ClientSessionCtx => ({ @@ -963,13 +965,13 @@ class TSessionManager implements SessionManager { try { const params = [...request.params] - await ctx.with('๐Ÿงจ process', {}, async (callTx) => f.apply(service, [opContext(callTx), ...params])) + await ctx.with('๐Ÿงจ process', {}, (callTx) => f.apply(service, [opContext(callTx), ...params])) } catch (err: any) { Analytics.handleError(err) if (LOGGING_ENABLED) { this.ctx.error('error handle request', { error: err, request }) } - await ws.send( + ws.send( ctx, { id: request.id, @@ -1006,6 +1008,10 @@ export function start ( enableCompression?: boolean accountsUrl: string externalStorage: StorageAdapter + profiling?: { + start: () => void + stop: () => Promise + } } & Partial ): () => Promise { const sessions = new TSessionManager( @@ -1015,7 +1021,8 @@ export function start ( pingTimeout: opt.pingTimeout ?? 10000, reconnectTimeout: 500 }, - opt.brandingMap + opt.brandingMap, + opt.profiling ) return opt.serverFactory( sessions, diff --git a/server/ws/src/server_http.ts b/server/ws/src/server_http.ts index 7ef167a42b..b0c9095fb6 100644 --- a/server/ws/src/server_http.ts +++ b/server/ws/src/server_http.ts @@ -33,6 +33,7 @@ import { getFile, getFileRange, type BlobResponse } from './blobs' import { doSessionOp, processRequest, type WebsocketData } from './utils' const rpcHandler = new RPCHandler() +let profiling = false /** * @public @@ -81,7 +82,25 @@ export function startHttpServer ( const jsonData = { ...getStatistics(ctx, sessions, admin), users: getUsers(), - admin + admin, + profiling + } + const json = JSON.stringify(jsonData) + res.writeHead(200, { 'Content-Type': 'application/json' }) + res.end(json) + } catch (err: any) { + Analytics.handleError(err) + ctx.error('error', { err }) + res.writeHead(404, {}) + res.end() + } + }) + app.get('/api/v1/profiling', (req, res) => { + try { + const token = req.query.token as string + decodeToken(token) + const jsonData = { + profiling } const json = JSON.stringify(jsonData) res.writeHead(200, { 'Content-Type': 'application/json' }) @@ -98,6 +117,7 @@ export function startHttpServer ( const token = req.query.token as string const payload = decodeToken(token) if (payload.extra?.admin !== 'true') { + console.warn('Non admin attempt to maintenance action', { payload }) res.writeHead(404, {}) res.end() return @@ -121,6 +141,35 @@ export function startHttpServer ( res.end() return } + case 'profile-start': { + ctx.warn( + '---------------------------------------------PROFILING SESSION STARTED---------------------------------------------' + ) + profiling = true + sessions.profiling?.start() + + res.writeHead(200) + res.end() + return + } + case 'profile-stop': { + profiling = false + if (sessions.profiling?.stop != null) { + void sessions.profiling.stop().then((profile) => { + ctx.warn( + '---------------------------------------------PROFILING SESSION STOPPED---------------------------------------------', + { profile } + ) + res.writeHead(200, { 'Content-Type': 'application/json' }) + res.end(profile ?? '{ error: "no profiling" }') + }) + } else { + res.writeHead(404) + res.end() + } + + return + } case 'force-close': { const wsId = req.query.wsId as string void sessions.forceClose(wsId) @@ -157,12 +206,11 @@ export function startHttpServer ( const contentType = req.query.contentType as string const size = parseInt((req.query.size as string) ?? '-1') - void ctx + ctx .with( 'storage upload', { workspace: payload.workspace.name }, - async (ctx) => - await externalStorage.put(ctx, payload.workspace, name, req, contentType, size !== -1 ? size : undefined), + (ctx) => externalStorage.put(ctx, payload.workspace, name, req, contentType, size !== -1 ? size : undefined), { file: name, contentType } ) .then(() => { @@ -196,10 +244,10 @@ export function startHttpServer ( const range = req.headers.range if (range !== undefined) { - void ctx - .with('file-range', { workspace: payload.workspace.name }, async (ctx) => { - await getFileRange(ctx, range, externalStorage, payload.workspace, name, wrapRes(res)) - }) + ctx + .with('file-range', { workspace: payload.workspace.name }, (ctx) => + getFileRange(ctx, range, externalStorage, payload.workspace, name, wrapRes(res)) + ) .catch((err) => { Analytics.handleError(err) ctx.error('/api/v1/blob get error', { err }) @@ -276,28 +324,22 @@ export function startHttpServer ( if (webSocketData.session instanceof Promise) { void webSocketData.session.then((s) => { if ('error' in s) { - void cs - .send( - ctx, - { id: -1, error: unknownStatus(s.error.message ?? 'Unknown error'), terminate: s.terminate }, - false, - false - ) - .then(() => { - // No connection to account service, retry from client. - setTimeout(() => { - cs.close() - }, 1000) - }) + cs.send( + ctx, + { id: -1, error: unknownStatus(s.error.message ?? 'Unknown error'), terminate: s.terminate }, + false, + false + ) + // No connection to account service, retry from client. + setTimeout(() => { + cs.close() + }, 1000) } if ('upgrade' in s) { - void cs - .send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false) - .then(() => { - setTimeout(() => { - cs.close() - }, 5000) - }) + cs.send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false) + setTimeout(() => { + cs.close() + }, 5000) } }) void webSocketData.session.catch((err) => { @@ -332,7 +374,7 @@ export function startHttpServer ( } }) // eslint-disable-next-line @typescript-eslint/no-misused-promises - ws.on('close', async (code: number, reason: Buffer) => { + ws.on('close', (code: number, reason: Buffer) => { doSessionOp( webSocketData, (s) => { @@ -438,7 +480,7 @@ function createWebsocketClientSocket ( return rpcHandler.readRequest(buffer, binary) }, data: () => data, - send: async (ctx: MeasureContext, msg, binary, compression) => { + send: (ctx: MeasureContext, msg, binary, compression) => { const smsg = rpcHandler.serialize(msg, binary) ctx.measure('send-data', smsg.length) @@ -455,7 +497,6 @@ function createWebsocketClientSocket ( } ctx.measure('msg-send-delta', Date.now() - st) }) - return smsg.length } } return cs diff --git a/server/ws/src/server_u.ts b/server/ws/src/server_u.ts index 4eabef5ae4..41870e02b9 100644 --- a/server/ws/src/server_u.ts +++ b/server/ws/src/server_u.ts @@ -24,12 +24,12 @@ import { RPCHandler } from '@hcengineering/rpc' import { getStatistics, wipeStatistics } from './stats' import { LOGGING_ENABLED, type ConnectionSocket, type HandleRequestFunction, type SessionManager } from './types' +import { unknownStatus } from '@hcengineering/platform' import { type PipelineFactory, type StorageAdapter } from '@hcengineering/server-core' import uWebSockets, { DISABLED, SHARED_COMPRESSOR, type HttpResponse, type WebSocket } from '@hcengineering/uws' import { Readable } from 'stream' import { getFile, getFileRange, type BlobResponse } from './blobs' import { doSessionOp, processRequest, type WebsocketData } from './utils' -import { unknownStatus } from '@hcengineering/platform' const rpcHandler = new RPCHandler() @@ -55,6 +55,7 @@ export function startUWebsocketServer ( ): () => Promise { if (LOGGING_ENABLED) console.log(`starting U server on port ${port} ...`) + let profiling = false const uAPP = uWebSockets.App() const writeStatus = (response: HttpResponse, status: string): HttpResponse => { @@ -134,28 +135,21 @@ export function startUWebsocketServer ( if (data.session instanceof Promise) { void data.session.then((s) => { if ('error' in s) { - void cs - .send( - ctx, - { id: -1, error: unknownStatus(s.error.message ?? 'Unknown error'), terminate: s.terminate }, - false, - false - ) - .then(() => { - // No connection to account service, retry from client. - setTimeout(() => { - cs.close() - }, 1000) - }) + cs.send( + ctx, + { id: -1, error: unknownStatus(s.error.message ?? 'Unknown error'), terminate: s.terminate }, + false, + false + ) + setTimeout(() => { + cs.close() + }, 1000) } if ('upgrade' in s) { - void cs - .send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false) - .then(() => { - setTimeout(() => { - cs.close() - }, 5000) - }) + cs.send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false) + setTimeout(() => { + cs.close() + }, 5000) } }) } @@ -227,7 +221,23 @@ export function startUWebsocketServer ( const json = JSON.stringify({ ...getStatistics(ctx, sessions, admin), users: getUsers, - admin + admin, + profiling + }) + + writeStatus(response, '200 OK').writeHeader('Content-Type', 'application/json').end(json) + } catch (err: any) { + Analytics.handleError(err) + writeStatus(response, '404 ERROR').end() + } + }) + .any('/api/v1/profiling', (response, request) => { + const token = request.getQuery('token') ?? '' + try { + decodeToken(token ?? '') + + const json = JSON.stringify({ + profiling }) writeStatus(response, '200 OK').writeHeader('Content-Type', 'application/json').end(json) @@ -269,6 +279,35 @@ export function startUWebsocketServer ( writeStatus(res, '200 OK').end() return } + case 'profile-start': { + ctx.warn( + '---------------------------------------------PROFILING SESSION STARTED---------------------------------------------' + ) + profiling = true + sessions.profiling?.start() + + writeStatus(res, '200 OK').end() + return + } + case 'profile-stop': { + profiling = false + if (sessions.profiling?.stop != null) { + void sessions.profiling?.stop()?.then((profile) => { + ctx.warn( + '---------------------------------------------PROFILING SESSION STOPPED---------------------------------------------', + { profile } + ) + writeStatus(res, '200 OK') + .writeHeader('Content-Type', 'application/json') + .end(profile ?? '{ error: "no profiling" }') + }) + } else { + writeStatus(res, '404 ERROR').end() + } + + writeStatus(res, '200 OK').end() + return + } case 'wipe-statistics': { wipeStatistics(ctx) @@ -311,9 +350,9 @@ export function startUWebsocketServer ( }) const range = req.getHeader('range') if (range !== undefined) { - void ctx.with('file-range', { workspace: payload.workspace.name }, async (ctx) => { - await getFileRange(ctx, range, externalStorage, payload.workspace, name, wrappedRes) - }) + void ctx.with('file-range', { workspace: payload.workspace.name }, (ctx) => + getFileRange(ctx, range, externalStorage, payload.workspace, name, wrappedRes) + ) } else { void getFile(ctx, externalStorage, payload.workspace, name, wrappedRes) } @@ -341,15 +380,7 @@ export function startUWebsocketServer ( .with( 'storage upload', { workspace: payload.workspace.name }, - async () => - await externalStorage.put( - ctx, - payload.workspace, - name, - pipe, - contentType, - size !== -1 ? size : undefined - ), + () => externalStorage.put(ctx, payload.workspace, name, pipe, contentType, size !== -1 ? size : undefined), { file: name, contentType } ) .then(() => { @@ -401,28 +432,29 @@ function createWebSocketClientSocket ( readRequest: (buffer: Buffer, binary: boolean) => { return rpcHandler.readRequest(buffer, binary) }, - send: async (ctx, msg, binary, compression): Promise => { - if (data.backPressure !== undefined) { - await data.backPressure - } - const serialized = rpcHandler.serialize(msg, binary) - try { - const sendR = ws.send(serialized, binary, compression) - if (sendR === 2) { - data.backPressure = new Promise((resolve) => { - data.backPressureResolve = resolve - }) - data.unsendMsg.push({ data: serialized, binary, compression }) - } else { - ctx.measure('send-data', serialized.length) + send: (ctx, msg, binary, compression) => { + void (async (): Promise => { + if (data.backPressure !== undefined) { + await data.backPressure } - } catch (err: any) { - if (!((err.message ?? '') as string).includes('Invalid access of closed')) { - console.error(err) + const serialized = rpcHandler.serialize(msg, binary) + try { + const sendR = ws.send(serialized, binary, compression) + if (sendR === 2) { + data.backPressure = new Promise((resolve) => { + data.backPressureResolve = resolve + }) + data.unsendMsg.push({ data: serialized, binary, compression }) + } else { + ctx.measure('send-data', serialized.length) + } + } catch (err: any) { + if (!((err.message ?? '') as string).includes('Invalid access of closed')) { + console.error(err) + } + // Ignore socket is closed } - // Ignore socket is closed - } - return serialized.length + })() } } return cs diff --git a/server/ws/src/types.ts b/server/ws/src/types.ts index 9741a61427..46f8a47351 100644 --- a/server/ws/src/types.ts +++ b/server/ws/src/types.ts @@ -91,7 +91,7 @@ export interface ConnectionSocket { id: string isClosed: boolean close: () => void - send: (ctx: MeasureContext, msg: Response, binary: boolean, compression: boolean) => Promise + send: (ctx: MeasureContext, msg: Response, binary: boolean, compression: boolean) => void data: () => Record readRequest: (buffer: Buffer, binary: boolean) => Request @@ -176,6 +176,11 @@ export interface SessionManager { closeWorkspaces: (ctx: MeasureContext) => Promise scheduleMaintenance: (timeMinutes: number) => void + + profiling?: { + start: () => void + stop: () => Promise + } } /** diff --git a/server/ws/src/utils.ts b/server/ws/src/utils.ts index e0ddb2744f..a3f4f709d6 100644 --- a/server/ws/src/utils.ts +++ b/server/ws/src/utils.ts @@ -58,12 +58,12 @@ export async function sendResponse ( socket: ConnectionSocket, resp: Response ): Promise { - await handleSend(ctx, socket, resp, 32 * 1024, session.binaryMode, session.useCompression) + await handleSend(ctx, socket, resp, 1024 * 1024, session.binaryMode, session.useCompression) } function waitNextTick (): Promise | undefined { return new Promise((resolve) => { - setTimeout(resolve) + setImmediate(resolve) }) } export async function handleSend ( @@ -96,7 +96,7 @@ export async function handleSend ( chunk.lookupMap = orig.lookupMap } if (chunk !== undefined) { - await ws.send( + ws.send( ctx, { ...msg, result: chunk, chunk: { index: cid, final: data.length === 0 } }, useBinary, @@ -110,6 +110,6 @@ export async function handleSend ( } } } else { - await ws.send(ctx, msg, useBinary, useCompression) + ws.send(ctx, msg, useBinary, useCompression) } } diff --git a/tests/profile-download.sh b/tests/profile-download.sh new file mode 100755 index 0000000000..91d8f5454f --- /dev/null +++ b/tests/profile-download.sh @@ -0,0 +1,5 @@ +echo "Downloading profile" +token=$(./tool.sh generate-token --admin anticrm@hc.engineering sanity-ws) +current=$(date +%Y%m%d%H%M%S) +mkdir -p ./profiles +curl -X PUT "http://localhost:3334/api/v1/manage?token=${token}&operation=profile-stop" -o "./profiles/profile-${current}".cpuprofile diff --git a/tests/profile-generate.sh b/tests/profile-generate.sh new file mode 100755 index 0000000000..bd0078aa75 --- /dev/null +++ b/tests/profile-generate.sh @@ -0,0 +1,8 @@ +#!/bin/bash +# npm install -g cpupro +for profile in $(ls ./profiles/*.cpuprofile); do + name=${profile/\.cpuprofile/} + if ! test -f $name.html; then + cpupro $profile -f "$name.html" --no-open + fi +done diff --git a/tests/profile-start.sh b/tests/profile-start.sh new file mode 100755 index 0000000000..8ab5b509e6 --- /dev/null +++ b/tests/profile-start.sh @@ -0,0 +1,3 @@ +echo "Start profiling on server" +token=$(./tool.sh generate-token --admin anticrm@hc.engineering sanity-ws) +curl -X PUT "http://localhost:3334/api/v1/manage?token=${token}&operation=profile-start" diff --git a/tests/sanity/tests/tracker/attachments.spec.ts b/tests/sanity/tests/tracker/attachments.spec.ts index 3da75a2549..8beb100b5b 100644 --- a/tests/sanity/tests/tracker/attachments.spec.ts +++ b/tests/sanity/tests/tracker/attachments.spec.ts @@ -1,10 +1,10 @@ import { test } from '@playwright/test' -import { IssuesPage } from '../model/tracker/issues-page' -import { generateId, PlatformSetting, PlatformURI } from '../utils' -import { TrackerNavigationMenuPage } from '../model/tracker/tracker-navigation-menu-page' -import { IssuesDetailsPage } from '../model/tracker/issues-details-page' -import { NewIssue } from '../model/tracker/types' import { LeftSideMenuPage } from '../model/left-side-menu-page' +import { IssuesDetailsPage } from '../model/tracker/issues-details-page' +import { IssuesPage } from '../model/tracker/issues-page' +import { TrackerNavigationMenuPage } from '../model/tracker/tracker-navigation-menu-page' +import { NewIssue } from '../model/tracker/types' +import { generateId, PlatformSetting, PlatformURI } from '../utils' test.use({ storageState: PlatformSetting @@ -62,6 +62,7 @@ test.describe('Attachments tests', () => { await issuesPage.openIssueByName(newIssue.title) await issuesDetailsPage.checkIssueContainsAttachment('cat.jpeg') await issuesDetailsPage.checkIssueContainsAttachment('cat3.jpeg') - await issuesDetailsPage.checkActivityExist('uploaded an attachment') + // TODO: It could be execurted a bit faster and activity will not contain necessary entry. + // await issuesDetailsPage.checkActivityExist('uploaded an attachment') }) })