UBERF-8052: Allow easy profiling of transactor (#6502)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-09-10 12:44:10 +07:00 committed by GitHub
parent 87b8e59969
commit 6784168b66
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 360 additions and 160 deletions

View File

@ -233,10 +233,26 @@ jobs:
run: |
cd ./tests/sanity
node ../../common/scripts/install-run-rushx.js ci
- name: Start profiling
run: |
cd ./tests
./profile-start.sh
- name: Run UI tests
run: |
cd ./tests/sanity
node ../../common/scripts/install-run-rushx.js uitest
- name: Download profile
run: |
cd ./tests
./profile-download.sh
npm install -g cpupro
./profile-generate.sh
- name: Upload profiling results
if: always()
uses: actions/upload-artifact@v4
with:
name: profiling
path: ./tests/profiles
- name: 'Store docker logs'
if: always()
run: |

1
.gitignore vendored
View File

@ -100,3 +100,4 @@ services/github/pod-github/src/github.graphql
dev/tool/report.csv
bundle/*
bundle.js.map
tests/profiles

View File

@ -82,6 +82,4 @@ function prepareTools (): {
return { ...prepareToolsRaw(builder(enabled, disabled).getTxes()), version: getModelVersion(), migrateOperations }
}
console.log(`tools git_version: ${process.env.GIT_REVISION ?? ''} model_version: ${process.env.MODEL_VERSION ?? ''}`)
devTool(prepareTools)

View File

@ -191,6 +191,12 @@ export function devTool (
program.version('0.0.1')
program.command('version').action(() => {
console.log(
`tools git_version: ${process.env.GIT_REVISION ?? ''} model_version: ${process.env.MODEL_VERSION ?? ''}`
)
})
// create-account john.appleseed@gmail.com --password 123 --workspace workspace --fullname "John Appleseed"
program
.command('create-account <email>')
@ -1019,8 +1025,9 @@ export function devTool (
program
.command('generate-token <name> <workspace>')
.description('generate token')
.action(async (name: string, workspace: string) => {
console.log(generateToken(name, getWorkspaceId(workspace)))
.option('--admin', 'Generate token with admin access', false)
.action(async (name: string, workspace: string, opt: { admin: boolean }) => {
console.log(generateToken(name, getWorkspaceId(workspace), { ...(opt.admin ? { admin: 'true' } : {}) }))
})
program
.command('decode-token <token>')

View File

@ -56,26 +56,6 @@ export interface SessionData {
branding: Branding | null
}
export interface ContextData {
derived: {
txes: Tx[]
targets: BroadcastTargets // A set of broadcast filters if required
}
contextCache: Map<string, any>
removedMap: Map<Ref<Doc>, Doc>
userEmail: string
sessionId: string
admin?: boolean
account: Account
getAccount: (account: Ref<Account>) => Account | undefined
workspace: WorkspaceIdWithUrl
branding: Branding | null
}
/**
* @public
*/

View File

@ -25,6 +25,7 @@ import { MarkupMark, MarkupNode, MarkupNodeType, emptyMarkupNode } from './model
/** @public */
export const EmptyMarkup: Markup = jsonToMarkup(emptyMarkupNode())
const defaultSchema = getSchema(defaultExtensions)
/** @public */
export function getMarkup (editor?: Editor): Markup {
@ -157,7 +158,7 @@ export function markupToJSON (markup: Markup): MarkupNode {
/** @public */
export function jsonToPmNode (json: MarkupNode, schema?: Schema, extensions?: Extensions): ProseMirrorNode {
schema ??= getSchema(extensions ?? defaultExtensions)
schema ??= extensions == null ? defaultSchema : getSchema(extensions ?? defaultExtensions)
return ProseMirrorNode.fromJSON(schema, json)
}
@ -210,7 +211,7 @@ export function jsonToHTML (json: MarkupNode, extensions?: Extensions): string {
/** @public */
export function htmlToPmNode (html: string, schema?: Schema, extensions?: Extensions): ProseMirrorNode {
schema ??= getSchema(extensions ?? defaultExtensions)
schema ??= extensions == null ? defaultSchema : getSchema(extensions ?? defaultExtensions)
const json = htmlToJSON(html, extensions)
return ProseMirrorNode.fromJSON(schema, json)
}
@ -226,8 +227,6 @@ export function pmNodeToHTML (node: ProseMirrorNode, extensions?: Extensions): s
const ELLIPSIS_CHAR = '…'
const WHITESPACE = ' '
const defaultSchema = getSchema(defaultExtensions)
/** @public */
export function stripTags (markup: Markup, textLimit = 0, extensions: Extensions | undefined = undefined): string {
const schema = extensions === undefined ? defaultSchema : getSchema(extensions)

View File

@ -3,7 +3,7 @@
import login from '@hcengineering/login'
import { getEmbeddedLabel, getMetadata } from '@hcengineering/platform'
import presentation, { getClient, isAdminUser } from '@hcengineering/presentation'
import { Button, IconArrowLeft, IconArrowRight, fetchMetadataLocalStorage } from '@hcengineering/ui'
import { Button, IconArrowLeft, IconArrowRight, fetchMetadataLocalStorage, ticker } from '@hcengineering/ui'
import EditBox from '@hcengineering/ui/src/components/EditBox.svelte'
const _endpoint: string = fetchMetadataLocalStorage(login.metadata.LoginEndpoint) ?? ''
@ -34,6 +34,22 @@
let responseSize = 0
let profiling = false
async function fetchStats (time: number): Promise<void> {
await fetch(endpoint + `/api/v1/profiling?token=${token}`, {})
.then(async (json) => {
data = await json.json()
profiling = data?.profiling ?? false
})
.catch((err) => {
console.error(err)
})
}
let data: any
$: void fetchStats($ticker)
function genData (dataSize: number): string {
let result = ''
const characters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
@ -97,6 +113,26 @@
running = false
clearInterval(int)
}
async function downloadProfile (): Promise<void> {
const link = document.createElement('a')
link.style.display = 'none'
link.setAttribute('target', '_blank')
const json = await (
await fetch(endpoint + `/api/v1/manage?token=${token}&operation=profile-stop`, {
method: 'PUT'
})
).json()
link.setAttribute(
'href',
'data:application/json;charset=utf-8,%EF%BB%BF' + encodeURIComponent(JSON.stringify(json))
)
link.setAttribute('download', `profile-${Date.now()}.cpuprofile`)
document.body.appendChild(link)
link.click()
document.body.removeChild(link)
fetchStats(0)
}
</script>
{#if isAdminUser()}
@ -176,6 +212,22 @@
}}
/>
</div>
<div class="flex-row-center p-1">
<div class="p-3">3.</div>
{#if !profiling}
<Button
label={getEmbeddedLabel('Profile server')}
on:click={() => {
void fetch(endpoint + `/api/v1/manage?token=${token}&operation=profile-start`, {
method: 'PUT'
})
fetchStats(0)
}}
/>
{:else}
<Button label={getEmbeddedLabel('Profile Stop')} on:click={downloadProfile} />
{/if}
</div>
</div>
{/if}

View File

@ -19,6 +19,7 @@ import serverTelegram from '@hcengineering/server-telegram'
import serverAiBot from '@hcengineering/server-ai-bot'
import { join } from 'path'
import { start } from '.'
import { profileStart, profileStop } from './inspector'
const serverFactory = serverFactories[(process.env.SERVER_PROVIDER as string) ?? 'ws'] ?? serverFactories.ws
configureAnalytics(process.env.SENTRY_DSN, {})
@ -68,7 +69,11 @@ const shutdown = start(config.url, {
indexProcessing: 500,
brandingMap: loadBrandingMap(config.brandingPath),
accountsUrl: config.accountsUrl,
enableCompression: config.enableCompression
enableCompression: config.enableCompression,
profiling: {
start: profileStart,
stop: profileStop
}
})
const close = (): void => {

View File

@ -0,0 +1,38 @@
import { PlatformError, unknownError } from '@hcengineering/platform'
import * as inspector from 'node:inspector'
let session: inspector.Session | undefined
export function profileStart (): void {
try {
session = new inspector.Session()
session.connect()
session.post('Profiler.enable')
session.post('Profiler.start')
} catch (err) {
console.error(err)
}
}
export async function profileStop (): Promise<string | undefined> {
return await new Promise<string | undefined>((resolve, reject) => {
if (session == null) {
reject(new PlatformError(unknownError('no session started')))
return
}
try {
session.post('Profiler.stop', (err, profile) => {
if (err != null) {
reject(err)
} else {
const json = JSON.stringify(profile.profile)
session?.disconnect()
session = undefined
resolve(json)
}
})
} catch (err) {
reject(err)
}
})
}

View File

@ -46,6 +46,11 @@ export function start (
enableCompression?: boolean
accountsUrl: string
profiling?: {
start: () => void
stop: () => Promise<string | undefined>
}
}
): () => Promise<void> {
const metrics = getMetricsContext()
@ -88,7 +93,8 @@ export function start (
serverFactory: opt.serverFactory,
enableCompression: opt.enableCompression,
accountsUrl: opt.accountsUrl,
externalStorage
externalStorage,
profiling: opt.profiling
})
return async () => {
await externalStorage.close()

View File

@ -21,7 +21,7 @@ export async function getFile (
file: string,
res: BlobResponse
): Promise<void> {
const stat = await ctx.with('stat', {}, async () => await client.stat(ctx, workspace, file))
const stat = await ctx.with('stat', {}, () => client.stat(ctx, workspace, file))
if (stat === undefined) {
ctx.error('No such key', { file })
res.cork(() => {
@ -36,7 +36,7 @@ export async function getFile (
{ contentType: stat.contentType },
async (ctx) => {
try {
const dataStream = await ctx.with('readable', {}, async () => await client.get(ctx, workspace, file))
const dataStream = await ctx.with('readable', {}, () => client.get(ctx, workspace, file))
await new Promise<void>((resolve, reject) => {
res.cork(() => {
res.writeHead(200, {
@ -99,7 +99,7 @@ export async function getFileRange (
uuid: string,
res: BlobResponse
): Promise<void> {
const stat = await ctx.with('stats', {}, async () => await client.stat(ctx, workspace, uuid))
const stat = await ctx.with('stats', {}, () => client.stat(ctx, workspace, uuid))
if (stat === undefined) {
ctx.error('No such key', { file: uuid })
res.cork(() => {
@ -133,7 +133,7 @@ export async function getFileRange (
const dataStream = await ctx.with(
'partial',
{},
async () => await client.partial(ctx, workspace, uuid, start, end - start + 1),
() => client.partial(ctx, workspace, uuid, start, end - start + 1),
{}
)
await new Promise<void>((resolve, reject) => {

View File

@ -108,11 +108,7 @@ export class ClientSession implements Session {
this._pipeline.context.modelDb
)
ctx.ctx.contextData = contextData
const result = await ctx.ctx.with(
'load-model',
{},
async () => await this._pipeline.loadModel(ctx.ctx, lastModelTx, hash)
)
const result = await ctx.ctx.with('load-model', {}, () => this._pipeline.loadModel(ctx.ctx, lastModelTx, hash))
await ctx.sendResponse(result)
}
@ -265,7 +261,7 @@ export class ClientSession implements Session {
}
const bevent = createBroadcastEvent(Array.from(classes))
this.broadcastTx = []
await socket.send(
socket.send(
ctx,
{
result: [bevent]

View File

@ -18,10 +18,10 @@ import core, {
TxFactory,
WorkspaceEvent,
generateId,
isWorkspaceCreating,
systemAccountEmail,
toWorkspaceString,
versionToString,
isWorkspaceCreating,
withContext,
type BaseWorkspaceInfo,
type Branding,
@ -105,7 +105,11 @@ class TSessionManager implements SessionManager {
branding: Branding | null
) => Session,
readonly timeouts: Timeouts,
readonly brandingMap: BrandingMap
readonly brandingMap: BrandingMap,
readonly profiling?: {
start: () => void
stop: () => Promise<string | undefined>
}
) {
this.checkInterval = setInterval(() => {
this.handleInterval()
@ -188,12 +192,7 @@ class TSessionManager implements SessionManager {
continue
}
if (diff > 20000 && diff < 60000 && this.ticks % 10 === 0) {
void s[1].socket.send(
workspace.context,
{ result: 'ping' },
s[1].session.binaryMode,
s[1].session.useCompression
)
s[1].socket.send(workspace.context, { result: 'ping' }, s[1].session.binaryMode, s[1].session.useCompression)
}
for (const r of s[1].session.requests.values()) {
@ -376,7 +375,7 @@ class TSessionManager implements SessionManager {
workspace: workspaceInfo.workspaceId,
wsUrl: workspaceInfo.workspaceUrl
})
pipeline = await ctx.with('💤 wait', { workspaceName }, async () => await (workspace as Workspace).pipeline)
pipeline = await ctx.with('💤 wait', { workspaceName }, () => (workspace as Workspace).pipeline)
} else {
ctx.warn('reconnect workspace in upgrade switch', {
email: token.email,
@ -406,7 +405,7 @@ class TSessionManager implements SessionManager {
})
return { upgrade: true }
}
pipeline = await ctx.with('💤 wait', { workspaceName }, async () => await (workspace as Workspace).pipeline)
pipeline = await ctx.with('💤 wait', { workspaceName }, () => (workspace as Workspace).pipeline)
}
const session = this.createSession(
@ -431,7 +430,7 @@ class TSessionManager implements SessionManager {
void ctx.with('set-status', {}, (ctx) => this.trySetStatus(ctx, session, true, _workspace.workspaceId))
if (this.timeMinutes > 0) {
void ws.send(ctx, { result: this.createMaintenanceWarning() }, session.binaryMode, session.useCompression)
ws.send(ctx, { result: this.createMaintenanceWarning() }, session.binaryMode, session.useCompression)
}
return { session, context: workspace.context, workspaceId: wsString }
}
@ -738,7 +737,7 @@ class TSessionManager implements SessionManager {
const sessions = Array.from(workspace.sessions)
workspace.sessions = new Map()
const closeS = async (s: Session, webSocket: ConnectionSocket): Promise<void> => {
const closeS = (s: Session, webSocket: ConnectionSocket): void => {
s.workspaceClosed = true
if (reason === 'upgrade' || reason === 'force-close') {
// Override message handler, to wait for upgrading response from clients.
@ -754,9 +753,12 @@ class TSessionManager implements SessionManager {
wsName: workspace.workspaceName
})
}
await Promise.all(
sessions.filter((it) => it[1].socket.id !== ignoreSocket?.id).map((s) => closeS(s[1].session, s[1].socket))
)
sessions
.filter((it) => it[1].socket.id !== ignoreSocket?.id)
.forEach((s) => {
closeS(s[1].session, s[1].socket)
})
const closePipeline = async (): Promise<void> => {
try {
@ -779,7 +781,7 @@ class TSessionManager implements SessionManager {
}
private sendUpgrade (ctx: MeasureContext, webSocket: ConnectionSocket, binary: boolean): void {
void webSocket.send(
webSocket.send(
ctx,
{
result: {
@ -868,7 +870,7 @@ class TSessionManager implements SessionManager {
}
const wsRef = this.workspaces.get(workspace)
if (wsRef === undefined) {
await ws.send(
ws.send(
ctx,
{
id: request.id,
@ -891,7 +893,7 @@ class TSessionManager implements SessionManager {
id: request.id,
result: done
}
await ws.send(ctx, forceCloseResponse, service.binaryMode, service.useCompression)
ws.send(ctx, forceCloseResponse, service.binaryMode, service.useCompression)
return
}
if (request.id === -1 && request.method === 'hello') {
@ -922,7 +924,7 @@ class TSessionManager implements SessionManager {
binary: service.binaryMode,
reconnect
}
await ws.send(requestCtx, helloResponse, false, false)
ws.send(requestCtx, helloResponse, false, false)
return
}
const opContext = (ctx: MeasureContext): ClientSessionCtx => ({
@ -963,13 +965,13 @@ class TSessionManager implements SessionManager {
try {
const params = [...request.params]
await ctx.with('🧨 process', {}, async (callTx) => f.apply(service, [opContext(callTx), ...params]))
await ctx.with('🧨 process', {}, (callTx) => f.apply(service, [opContext(callTx), ...params]))
} catch (err: any) {
Analytics.handleError(err)
if (LOGGING_ENABLED) {
this.ctx.error('error handle request', { error: err, request })
}
await ws.send(
ws.send(
ctx,
{
id: request.id,
@ -1006,6 +1008,10 @@ export function start (
enableCompression?: boolean
accountsUrl: string
externalStorage: StorageAdapter
profiling?: {
start: () => void
stop: () => Promise<string | undefined>
}
} & Partial<Timeouts>
): () => Promise<void> {
const sessions = new TSessionManager(
@ -1015,7 +1021,8 @@ export function start (
pingTimeout: opt.pingTimeout ?? 10000,
reconnectTimeout: 500
},
opt.brandingMap
opt.brandingMap,
opt.profiling
)
return opt.serverFactory(
sessions,

View File

@ -33,6 +33,7 @@ import { getFile, getFileRange, type BlobResponse } from './blobs'
import { doSessionOp, processRequest, type WebsocketData } from './utils'
const rpcHandler = new RPCHandler()
let profiling = false
/**
* @public
@ -81,7 +82,25 @@ export function startHttpServer (
const jsonData = {
...getStatistics(ctx, sessions, admin),
users: getUsers(),
admin
admin,
profiling
}
const json = JSON.stringify(jsonData)
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(json)
} catch (err: any) {
Analytics.handleError(err)
ctx.error('error', { err })
res.writeHead(404, {})
res.end()
}
})
app.get('/api/v1/profiling', (req, res) => {
try {
const token = req.query.token as string
decodeToken(token)
const jsonData = {
profiling
}
const json = JSON.stringify(jsonData)
res.writeHead(200, { 'Content-Type': 'application/json' })
@ -98,6 +117,7 @@ export function startHttpServer (
const token = req.query.token as string
const payload = decodeToken(token)
if (payload.extra?.admin !== 'true') {
console.warn('Non admin attempt to maintenance action', { payload })
res.writeHead(404, {})
res.end()
return
@ -121,6 +141,35 @@ export function startHttpServer (
res.end()
return
}
case 'profile-start': {
ctx.warn(
'---------------------------------------------PROFILING SESSION STARTED---------------------------------------------'
)
profiling = true
sessions.profiling?.start()
res.writeHead(200)
res.end()
return
}
case 'profile-stop': {
profiling = false
if (sessions.profiling?.stop != null) {
void sessions.profiling.stop().then((profile) => {
ctx.warn(
'---------------------------------------------PROFILING SESSION STOPPED---------------------------------------------',
{ profile }
)
res.writeHead(200, { 'Content-Type': 'application/json' })
res.end(profile ?? '{ error: "no profiling" }')
})
} else {
res.writeHead(404)
res.end()
}
return
}
case 'force-close': {
const wsId = req.query.wsId as string
void sessions.forceClose(wsId)
@ -157,12 +206,11 @@ export function startHttpServer (
const contentType = req.query.contentType as string
const size = parseInt((req.query.size as string) ?? '-1')
void ctx
ctx
.with(
'storage upload',
{ workspace: payload.workspace.name },
async (ctx) =>
await externalStorage.put(ctx, payload.workspace, name, req, contentType, size !== -1 ? size : undefined),
(ctx) => externalStorage.put(ctx, payload.workspace, name, req, contentType, size !== -1 ? size : undefined),
{ file: name, contentType }
)
.then(() => {
@ -196,10 +244,10 @@ export function startHttpServer (
const range = req.headers.range
if (range !== undefined) {
void ctx
.with('file-range', { workspace: payload.workspace.name }, async (ctx) => {
await getFileRange(ctx, range, externalStorage, payload.workspace, name, wrapRes(res))
})
ctx
.with('file-range', { workspace: payload.workspace.name }, (ctx) =>
getFileRange(ctx, range, externalStorage, payload.workspace, name, wrapRes(res))
)
.catch((err) => {
Analytics.handleError(err)
ctx.error('/api/v1/blob get error', { err })
@ -276,28 +324,22 @@ export function startHttpServer (
if (webSocketData.session instanceof Promise) {
void webSocketData.session.then((s) => {
if ('error' in s) {
void cs
.send(
ctx,
{ id: -1, error: unknownStatus(s.error.message ?? 'Unknown error'), terminate: s.terminate },
false,
false
)
.then(() => {
// No connection to account service, retry from client.
setTimeout(() => {
cs.close()
}, 1000)
})
cs.send(
ctx,
{ id: -1, error: unknownStatus(s.error.message ?? 'Unknown error'), terminate: s.terminate },
false,
false
)
// No connection to account service, retry from client.
setTimeout(() => {
cs.close()
}, 1000)
}
if ('upgrade' in s) {
void cs
.send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false)
.then(() => {
setTimeout(() => {
cs.close()
}, 5000)
})
cs.send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false)
setTimeout(() => {
cs.close()
}, 5000)
}
})
void webSocketData.session.catch((err) => {
@ -332,7 +374,7 @@ export function startHttpServer (
}
})
// eslint-disable-next-line @typescript-eslint/no-misused-promises
ws.on('close', async (code: number, reason: Buffer) => {
ws.on('close', (code: number, reason: Buffer) => {
doSessionOp(
webSocketData,
(s) => {
@ -438,7 +480,7 @@ function createWebsocketClientSocket (
return rpcHandler.readRequest(buffer, binary)
},
data: () => data,
send: async (ctx: MeasureContext, msg, binary, compression) => {
send: (ctx: MeasureContext, msg, binary, compression) => {
const smsg = rpcHandler.serialize(msg, binary)
ctx.measure('send-data', smsg.length)
@ -455,7 +497,6 @@ function createWebsocketClientSocket (
}
ctx.measure('msg-send-delta', Date.now() - st)
})
return smsg.length
}
}
return cs

View File

@ -24,12 +24,12 @@ import { RPCHandler } from '@hcengineering/rpc'
import { getStatistics, wipeStatistics } from './stats'
import { LOGGING_ENABLED, type ConnectionSocket, type HandleRequestFunction, type SessionManager } from './types'
import { unknownStatus } from '@hcengineering/platform'
import { type PipelineFactory, type StorageAdapter } from '@hcengineering/server-core'
import uWebSockets, { DISABLED, SHARED_COMPRESSOR, type HttpResponse, type WebSocket } from '@hcengineering/uws'
import { Readable } from 'stream'
import { getFile, getFileRange, type BlobResponse } from './blobs'
import { doSessionOp, processRequest, type WebsocketData } from './utils'
import { unknownStatus } from '@hcengineering/platform'
const rpcHandler = new RPCHandler()
@ -55,6 +55,7 @@ export function startUWebsocketServer (
): () => Promise<void> {
if (LOGGING_ENABLED) console.log(`starting U server on port ${port} ...`)
let profiling = false
const uAPP = uWebSockets.App()
const writeStatus = (response: HttpResponse, status: string): HttpResponse => {
@ -134,28 +135,21 @@ export function startUWebsocketServer (
if (data.session instanceof Promise) {
void data.session.then((s) => {
if ('error' in s) {
void cs
.send(
ctx,
{ id: -1, error: unknownStatus(s.error.message ?? 'Unknown error'), terminate: s.terminate },
false,
false
)
.then(() => {
// No connection to account service, retry from client.
setTimeout(() => {
cs.close()
}, 1000)
})
cs.send(
ctx,
{ id: -1, error: unknownStatus(s.error.message ?? 'Unknown error'), terminate: s.terminate },
false,
false
)
setTimeout(() => {
cs.close()
}, 1000)
}
if ('upgrade' in s) {
void cs
.send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false)
.then(() => {
setTimeout(() => {
cs.close()
}, 5000)
})
cs.send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false)
setTimeout(() => {
cs.close()
}, 5000)
}
})
}
@ -227,7 +221,23 @@ export function startUWebsocketServer (
const json = JSON.stringify({
...getStatistics(ctx, sessions, admin),
users: getUsers,
admin
admin,
profiling
})
writeStatus(response, '200 OK').writeHeader('Content-Type', 'application/json').end(json)
} catch (err: any) {
Analytics.handleError(err)
writeStatus(response, '404 ERROR').end()
}
})
.any('/api/v1/profiling', (response, request) => {
const token = request.getQuery('token') ?? ''
try {
decodeToken(token ?? '')
const json = JSON.stringify({
profiling
})
writeStatus(response, '200 OK').writeHeader('Content-Type', 'application/json').end(json)
@ -269,6 +279,35 @@ export function startUWebsocketServer (
writeStatus(res, '200 OK').end()
return
}
case 'profile-start': {
ctx.warn(
'---------------------------------------------PROFILING SESSION STARTED---------------------------------------------'
)
profiling = true
sessions.profiling?.start()
writeStatus(res, '200 OK').end()
return
}
case 'profile-stop': {
profiling = false
if (sessions.profiling?.stop != null) {
void sessions.profiling?.stop()?.then((profile) => {
ctx.warn(
'---------------------------------------------PROFILING SESSION STOPPED---------------------------------------------',
{ profile }
)
writeStatus(res, '200 OK')
.writeHeader('Content-Type', 'application/json')
.end(profile ?? '{ error: "no profiling" }')
})
} else {
writeStatus(res, '404 ERROR').end()
}
writeStatus(res, '200 OK').end()
return
}
case 'wipe-statistics': {
wipeStatistics(ctx)
@ -311,9 +350,9 @@ export function startUWebsocketServer (
})
const range = req.getHeader('range')
if (range !== undefined) {
void ctx.with('file-range', { workspace: payload.workspace.name }, async (ctx) => {
await getFileRange(ctx, range, externalStorage, payload.workspace, name, wrappedRes)
})
void ctx.with('file-range', { workspace: payload.workspace.name }, (ctx) =>
getFileRange(ctx, range, externalStorage, payload.workspace, name, wrappedRes)
)
} else {
void getFile(ctx, externalStorage, payload.workspace, name, wrappedRes)
}
@ -341,15 +380,7 @@ export function startUWebsocketServer (
.with(
'storage upload',
{ workspace: payload.workspace.name },
async () =>
await externalStorage.put(
ctx,
payload.workspace,
name,
pipe,
contentType,
size !== -1 ? size : undefined
),
() => externalStorage.put(ctx, payload.workspace, name, pipe, contentType, size !== -1 ? size : undefined),
{ file: name, contentType }
)
.then(() => {
@ -401,28 +432,29 @@ function createWebSocketClientSocket (
readRequest: (buffer: Buffer, binary: boolean) => {
return rpcHandler.readRequest(buffer, binary)
},
send: async (ctx, msg, binary, compression): Promise<number> => {
if (data.backPressure !== undefined) {
await data.backPressure
}
const serialized = rpcHandler.serialize(msg, binary)
try {
const sendR = ws.send(serialized, binary, compression)
if (sendR === 2) {
data.backPressure = new Promise((resolve) => {
data.backPressureResolve = resolve
})
data.unsendMsg.push({ data: serialized, binary, compression })
} else {
ctx.measure('send-data', serialized.length)
send: (ctx, msg, binary, compression) => {
void (async (): Promise<void> => {
if (data.backPressure !== undefined) {
await data.backPressure
}
} catch (err: any) {
if (!((err.message ?? '') as string).includes('Invalid access of closed')) {
console.error(err)
const serialized = rpcHandler.serialize(msg, binary)
try {
const sendR = ws.send(serialized, binary, compression)
if (sendR === 2) {
data.backPressure = new Promise((resolve) => {
data.backPressureResolve = resolve
})
data.unsendMsg.push({ data: serialized, binary, compression })
} else {
ctx.measure('send-data', serialized.length)
}
} catch (err: any) {
if (!((err.message ?? '') as string).includes('Invalid access of closed')) {
console.error(err)
}
// Ignore socket is closed
}
// Ignore socket is closed
}
return serialized.length
})()
}
}
return cs

View File

@ -91,7 +91,7 @@ export interface ConnectionSocket {
id: string
isClosed: boolean
close: () => void
send: (ctx: MeasureContext, msg: Response<any>, binary: boolean, compression: boolean) => Promise<number>
send: (ctx: MeasureContext, msg: Response<any>, binary: boolean, compression: boolean) => void
data: () => Record<string, any>
readRequest: (buffer: Buffer, binary: boolean) => Request<any>
@ -176,6 +176,11 @@ export interface SessionManager {
closeWorkspaces: (ctx: MeasureContext) => Promise<void>
scheduleMaintenance: (timeMinutes: number) => void
profiling?: {
start: () => void
stop: () => Promise<string | undefined>
}
}
/**

View File

@ -58,12 +58,12 @@ export async function sendResponse (
socket: ConnectionSocket,
resp: Response<any>
): Promise<void> {
await handleSend(ctx, socket, resp, 32 * 1024, session.binaryMode, session.useCompression)
await handleSend(ctx, socket, resp, 1024 * 1024, session.binaryMode, session.useCompression)
}
function waitNextTick (): Promise<void> | undefined {
return new Promise<void>((resolve) => {
setTimeout(resolve)
setImmediate(resolve)
})
}
export async function handleSend (
@ -96,7 +96,7 @@ export async function handleSend (
chunk.lookupMap = orig.lookupMap
}
if (chunk !== undefined) {
await ws.send(
ws.send(
ctx,
{ ...msg, result: chunk, chunk: { index: cid, final: data.length === 0 } },
useBinary,
@ -110,6 +110,6 @@ export async function handleSend (
}
}
} else {
await ws.send(ctx, msg, useBinary, useCompression)
ws.send(ctx, msg, useBinary, useCompression)
}
}

5
tests/profile-download.sh Executable file
View File

@ -0,0 +1,5 @@
echo "Downloading profile"
token=$(./tool.sh generate-token --admin anticrm@hc.engineering sanity-ws)
current=$(date +%Y%m%d%H%M%S)
mkdir -p ./profiles
curl -X PUT "http://localhost:3334/api/v1/manage?token=${token}&operation=profile-stop" -o "./profiles/profile-${current}".cpuprofile

8
tests/profile-generate.sh Executable file
View File

@ -0,0 +1,8 @@
#!/bin/bash
# npm install -g cpupro
for profile in $(ls ./profiles/*.cpuprofile); do
name=${profile/\.cpuprofile/}
if ! test -f $name.html; then
cpupro $profile -f "$name.html" --no-open
fi
done

3
tests/profile-start.sh Executable file
View File

@ -0,0 +1,3 @@
echo "Start profiling on server"
token=$(./tool.sh generate-token --admin anticrm@hc.engineering sanity-ws)
curl -X PUT "http://localhost:3334/api/v1/manage?token=${token}&operation=profile-start"

View File

@ -1,10 +1,10 @@
import { test } from '@playwright/test'
import { IssuesPage } from '../model/tracker/issues-page'
import { generateId, PlatformSetting, PlatformURI } from '../utils'
import { TrackerNavigationMenuPage } from '../model/tracker/tracker-navigation-menu-page'
import { IssuesDetailsPage } from '../model/tracker/issues-details-page'
import { NewIssue } from '../model/tracker/types'
import { LeftSideMenuPage } from '../model/left-side-menu-page'
import { IssuesDetailsPage } from '../model/tracker/issues-details-page'
import { IssuesPage } from '../model/tracker/issues-page'
import { TrackerNavigationMenuPage } from '../model/tracker/tracker-navigation-menu-page'
import { NewIssue } from '../model/tracker/types'
import { generateId, PlatformSetting, PlatformURI } from '../utils'
test.use({
storageState: PlatformSetting
@ -62,6 +62,7 @@ test.describe('Attachments tests', () => {
await issuesPage.openIssueByName(newIssue.title)
await issuesDetailsPage.checkIssueContainsAttachment('cat.jpeg')
await issuesDetailsPage.checkIssueContainsAttachment('cat3.jpeg')
await issuesDetailsPage.checkActivityExist('uploaded an attachment')
// TODO: It could be execurted a bit faster and activity will not contain necessary entry.
// await issuesDetailsPage.checkActivityExist('uploaded an attachment')
})
})