mirror of
https://github.com/hcengineering/platform.git
synced 2024-11-22 11:42:30 +03:00
Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
parent
cf1e8b284f
commit
149d8cda46
1
.gitignore
vendored
1
.gitignore
vendored
@ -81,3 +81,4 @@ pods/front/dist
|
||||
*.cpuprofile
|
||||
*.pyc
|
||||
metrics.txt
|
||||
dev/tool/report.csv
|
||||
|
3
.vscode/launch.json
vendored
3
.vscode/launch.json
vendored
@ -45,6 +45,9 @@
|
||||
"SERVER_SECRET": "secret",
|
||||
"REKONI_URL": "http://localhost:4004",
|
||||
"FRONT_URL": "http://localhost:8080",
|
||||
// "SERVER_PROVIDER":"uweb"
|
||||
"SERVER_PROVIDER":"ws"
|
||||
|
||||
// "RETRANSLATE_URL": "http://127.0.0.1:4500",
|
||||
//"RETRANSLATE_URL": "https://208.167.249.201",
|
||||
// "RETRANSLATE_TOKEN": ""
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -141,6 +141,7 @@ services:
|
||||
- REKONI_URL=http://rekoni:4004
|
||||
- FRONT_URL=http://localhost:8087
|
||||
# - APM_SERVER_URL=http://apm-server:8200
|
||||
- SERVER_PROVIDER=ws
|
||||
restart: unless-stopped
|
||||
rekoni:
|
||||
image: hardcoreeng/rekoni-service
|
||||
|
@ -24,7 +24,7 @@ import {
|
||||
DummyFullTextAdapter,
|
||||
FullTextAdapter
|
||||
} from '@hcengineering/server-core'
|
||||
import { ClientSession, start as startJsonRpc } from '@hcengineering/server-ws'
|
||||
import { ClientSession, startHttpServer, start as startJsonRpc } from '@hcengineering/server-ws'
|
||||
|
||||
async function createNullFullTextAdapter (): Promise<FullTextAdapter> {
|
||||
return new DummyFullTextAdapter()
|
||||
@ -43,9 +43,8 @@ async function createNullContentTextAdapter (): Promise<ContentTextAdapter> {
|
||||
*/
|
||||
export async function start (port: number, host?: string): Promise<void> {
|
||||
const ctx = new MeasureMetricsContext('server', {})
|
||||
startJsonRpc(
|
||||
ctx,
|
||||
(ctx) => {
|
||||
startJsonRpc(ctx, {
|
||||
pipelineFactory: (ctx) => {
|
||||
const conf: DbConfiguration = {
|
||||
domains: {
|
||||
[DOMAIN_TX]: 'InMemoryTx'
|
||||
@ -75,9 +74,10 @@ export async function start (port: number, host?: string): Promise<void> {
|
||||
}
|
||||
return createPipeline(ctx, conf, [], false, () => {})
|
||||
},
|
||||
(token, pipeline, broadcast) => new ClientSession(broadcast, token, pipeline),
|
||||
sessionFactory: (token, pipeline, broadcast) => new ClientSession(broadcast, token, pipeline),
|
||||
port,
|
||||
'',
|
||||
host
|
||||
)
|
||||
productId: '',
|
||||
serverFactory: startHttpServer,
|
||||
chunking: -1
|
||||
})
|
||||
}
|
||||
|
@ -13,8 +13,8 @@
|
||||
"docker:build": "docker build -t hardcoreeng/tool .",
|
||||
"docker:staging": "../../common/scripts/docker_tag.sh hardcoreeng/tool staging",
|
||||
"docker:push": "../../common/scripts/docker_tag.sh hardcoreeng/tool",
|
||||
"run-local": "cross-env SERVER_SECRET=secret MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost MONGO_URL=mongodb://localhost:27017 TRANSACTOR_URL=ws:/localhost:3333 TELEGRAM_DATABASE=telegram-service ELASTIC_URL=http://localhost:9200 REKONI_URL=http://localhost:4004 node -r ts-node/register ./src/__start.ts",
|
||||
"run": "cross-env ts-node ./src/__start.ts",
|
||||
"run-local": "cross-env SERVER_SECRET=secret MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin MINIO_ENDPOINT=localhost MONGO_URL=mongodb://localhost:27017 TRANSACTOR_URL=ws:/localhost:3333 TELEGRAM_DATABASE=telegram-service ELASTIC_URL=http://localhost:9200 REKONI_URL=http://localhost:4004 node -r ts-node/register --max-old-space-size=8000 ./src/__start.ts",
|
||||
"run": "cross-env node -r ts-node/register --max-old-space-size=8000 ./src/__start.ts",
|
||||
"upgrade": "rushx run-local upgrade",
|
||||
"lint": "eslint src",
|
||||
"format": "prettier --write src && eslint --fix src"
|
||||
|
301
dev/tool/src/benchmark.ts
Normal file
301
dev/tool/src/benchmark.ts
Normal file
@ -0,0 +1,301 @@
|
||||
//
|
||||
// Copyright © 2023 Hardcore Engineering Inc.
|
||||
//
|
||||
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License. You may
|
||||
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
//
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
import contact from '@hcengineering/contact'
|
||||
import core, {
|
||||
ClassifierKind,
|
||||
Client,
|
||||
MeasureMetricsContext,
|
||||
WorkspaceId,
|
||||
generateId,
|
||||
metricsToString,
|
||||
newMetrics,
|
||||
systemAccountEmail
|
||||
} from '@hcengineering/core'
|
||||
import { RateLimitter } from '@hcengineering/server-core'
|
||||
import { generateToken } from '@hcengineering/server-token'
|
||||
import { connect } from '@hcengineering/server-tool'
|
||||
|
||||
import os from 'os'
|
||||
import { Worker, isMainThread, parentPort } from 'worker_threads'
|
||||
import { CSVWriter } from './csv'
|
||||
|
||||
interface StartMessage {
|
||||
workspaceId: WorkspaceId
|
||||
transactorUrl: string
|
||||
id: number
|
||||
idd: number
|
||||
workId: string
|
||||
options: {
|
||||
readRequests: number
|
||||
modelRequests: number
|
||||
limit: {
|
||||
min: number
|
||||
rand: number
|
||||
}
|
||||
sleep: number
|
||||
}
|
||||
}
|
||||
interface Msg {
|
||||
type: 'complete' | 'operate'
|
||||
}
|
||||
|
||||
interface CompleteMsg extends Msg {
|
||||
type: 'complete'
|
||||
workId: string
|
||||
}
|
||||
|
||||
// interface OperatingMsg extends Msg {
|
||||
// type: 'operate'
|
||||
// workId: string
|
||||
// }
|
||||
|
||||
export async function benchmark (
|
||||
workspaceId: WorkspaceId[],
|
||||
transactorUrl: string,
|
||||
cmd: { from: number, steps: number, sleep: number }
|
||||
): Promise<void> {
|
||||
let operating = 0
|
||||
const workers: Worker[] = []
|
||||
|
||||
const works = new Map<string, () => void>()
|
||||
|
||||
os.cpus().forEach(() => {
|
||||
/* Spawn a new thread running this source file */
|
||||
const worker = new Worker(__filename)
|
||||
workers.push(worker)
|
||||
worker.on('message', (data: Msg) => {
|
||||
if (data === undefined) {
|
||||
return
|
||||
}
|
||||
if (data.type === 'operate') {
|
||||
operating += 1
|
||||
}
|
||||
if (data.type === 'complete') {
|
||||
const resolve = works.get((data as CompleteMsg).workId)
|
||||
if (resolve != null) {
|
||||
resolve()
|
||||
operating -= 1
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
const m = newMetrics()
|
||||
const ctx = new MeasureMetricsContext('benchmark', {}, m)
|
||||
|
||||
const csvWriter = new CSVWriter<{
|
||||
time: number
|
||||
clients: number
|
||||
average: number
|
||||
moment: number
|
||||
mem: number
|
||||
memTotal: number
|
||||
cpu: number
|
||||
requestTime: number
|
||||
operations: number
|
||||
transfer: number
|
||||
}>({
|
||||
time: 'Time',
|
||||
clients: 'Clients',
|
||||
average: 'Average',
|
||||
moment: 'Moment Time',
|
||||
mem: 'Mem',
|
||||
memTotal: 'Mem total',
|
||||
cpu: 'CPU',
|
||||
requestTime: 'Request time',
|
||||
operations: 'OPS',
|
||||
transfer: 'Transfer(kb)'
|
||||
})
|
||||
|
||||
let opTime: number = 0
|
||||
let moment: number = 0
|
||||
let ops = 0
|
||||
let cpu: number = 0
|
||||
let memUsed: number = 0
|
||||
let memTotal: number = 0
|
||||
let elapsed = 0
|
||||
let requestTime: number = 0
|
||||
let operations = 0
|
||||
let transfer: number = 0
|
||||
|
||||
const token = generateToken(systemAccountEmail, workspaceId[0])
|
||||
|
||||
const monitorConnection = isMainThread
|
||||
? await ctx.with('connect', {}, async () => await connect(transactorUrl, workspaceId[0], undefined))
|
||||
: undefined
|
||||
let running = false
|
||||
|
||||
let timer: any
|
||||
if (isMainThread) {
|
||||
timer = setInterval(() => {
|
||||
const st = Date.now()
|
||||
|
||||
void fetch(transactorUrl.replace('ws:/', 'http:/') + '/' + token).then((res) => {
|
||||
void res.json().then((json) => {
|
||||
memUsed = json.statistics.memoryUsed
|
||||
memTotal = json.statistics.memoryTotal
|
||||
cpu = json.statistics.cpuUsage
|
||||
const r = json.metrics?.measurements?.client?.measurements?.handleRequest?.measurements?.call
|
||||
operations = r?.operations ?? 0
|
||||
requestTime = (r?.value ?? 0) / (((r?.operations as number) ?? 0) + 1)
|
||||
transfer =
|
||||
json.metrics?.measurements?.client?.measurements?.handleRequest?.measurements?.send?.measurements?.[
|
||||
'#send-data'
|
||||
]?.value ?? 0
|
||||
})
|
||||
})
|
||||
|
||||
if (!running) {
|
||||
running = true
|
||||
void ctx.with(
|
||||
'avg',
|
||||
{},
|
||||
async () =>
|
||||
await monitorConnection?.findAll(contact.class.Employee, {}).then((res) => {
|
||||
const cur = Date.now() - st
|
||||
opTime += cur
|
||||
moment = cur
|
||||
ops++
|
||||
running = false
|
||||
})
|
||||
)
|
||||
}
|
||||
elapsed++
|
||||
csvWriter.add(
|
||||
{
|
||||
time: elapsed,
|
||||
clients: operating,
|
||||
moment,
|
||||
average: Math.round(opTime / (ops + 1)),
|
||||
mem: memUsed,
|
||||
memTotal,
|
||||
cpu,
|
||||
requestTime,
|
||||
operations: Math.round(operations / (elapsed + 1)),
|
||||
transfer: Math.round(transfer / (elapsed + 1)) / 1024
|
||||
},
|
||||
true
|
||||
)
|
||||
void csvWriter.write('report.csv')
|
||||
}, 1000)
|
||||
}
|
||||
for (let i = cmd.from; i < cmd.from + cmd.steps; i++) {
|
||||
await ctx.with('iteration', { i }, async () => {
|
||||
await Promise.all(
|
||||
Array.from(Array(i))
|
||||
.map((it, idx) => idx)
|
||||
.map((it) => {
|
||||
const wsid = workspaceId[randNum(workspaceId.length)]
|
||||
const workId = generateId()
|
||||
const msg: StartMessage = {
|
||||
workspaceId: wsid,
|
||||
transactorUrl,
|
||||
id: i,
|
||||
idd: it,
|
||||
workId,
|
||||
options: {
|
||||
readRequests: 100,
|
||||
modelRequests: 0,
|
||||
limit: {
|
||||
min: 10,
|
||||
rand: 1000
|
||||
},
|
||||
sleep: cmd.sleep
|
||||
}
|
||||
}
|
||||
workers[i % workers.length].postMessage(msg)
|
||||
|
||||
return new Promise((resolve) => {
|
||||
works.set(workId, () => resolve(null))
|
||||
})
|
||||
})
|
||||
)
|
||||
})
|
||||
console.log(metricsToString(m, `iteration-${i}`, 120))
|
||||
}
|
||||
|
||||
if (isMainThread) {
|
||||
for (const w of workers) {
|
||||
await w.terminate()
|
||||
}
|
||||
clearInterval(timer)
|
||||
await monitorConnection?.close()
|
||||
}
|
||||
}
|
||||
|
||||
function randNum (value = 2): number {
|
||||
return Math.round(Math.random() * value) % value
|
||||
}
|
||||
|
||||
if (!isMainThread) {
|
||||
parentPort?.on('message', (msg: StartMessage) => {
|
||||
console.log('starting worker', msg.workId)
|
||||
void perform(msg)
|
||||
})
|
||||
}
|
||||
|
||||
const connectLimitter = new RateLimitter(() => ({ rate: 50 }))
|
||||
|
||||
async function perform (msg: StartMessage): Promise<void> {
|
||||
let connection: Client | undefined
|
||||
try {
|
||||
console.log('connecting to', msg.workspaceId)
|
||||
|
||||
connection = await connectLimitter.exec(async () => await connect(msg.transactorUrl, msg.workspaceId, undefined))
|
||||
|
||||
parentPort?.postMessage({
|
||||
type: 'operate',
|
||||
workId: msg.workId
|
||||
})
|
||||
|
||||
const h = connection.getHierarchy()
|
||||
const allClasses = await connection.getModel().findAll(core.class.Class, {})
|
||||
const classes = allClasses.filter((it) => it.kind === ClassifierKind.CLASS && h.findDomain(it._id) !== undefined)
|
||||
while (msg.options.readRequests + msg.options.modelRequests > 0) {
|
||||
if (msg.options.modelRequests > 0) {
|
||||
await connection?.findAll(core.class.Tx, {}, { sort: { _id: -1 } })
|
||||
msg.options.modelRequests--
|
||||
}
|
||||
if (msg.options.readRequests > 0) {
|
||||
const cl = classes[randNum(classes.length - 1)]
|
||||
if (cl !== undefined) {
|
||||
await connection?.findAll(
|
||||
cl._id,
|
||||
{},
|
||||
{
|
||||
sort: { _id: -1 },
|
||||
limit: msg.options.limit.min + randNum(msg.options.limit.rand)
|
||||
}
|
||||
)
|
||||
msg.options.readRequests--
|
||||
}
|
||||
}
|
||||
if (msg.options.sleep > 0) {
|
||||
await new Promise((resolve) => setTimeout(resolve, randNum(msg.options.sleep)))
|
||||
}
|
||||
}
|
||||
//
|
||||
// console.log(`${msg.idd} perform complete`)
|
||||
} catch (err: any) {
|
||||
console.error(msg.workspaceId, err)
|
||||
} finally {
|
||||
await connection?.close()
|
||||
}
|
||||
parentPort?.postMessage({
|
||||
type: 'complete',
|
||||
workId: msg.workId
|
||||
})
|
||||
}
|
41
dev/tool/src/csv.ts
Normal file
41
dev/tool/src/csv.ts
Normal file
@ -0,0 +1,41 @@
|
||||
import { writeFile } from 'fs/promises'
|
||||
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export class CSVWriter<T extends Record<string, string | number>> {
|
||||
data: string[] = []
|
||||
constructor (readonly fields: Record<keyof T, string>) {
|
||||
this.data.push(
|
||||
Object.entries(this.fields)
|
||||
.map(([key, value]) => `"${value}"`)
|
||||
.join(',')
|
||||
)
|
||||
}
|
||||
|
||||
toStr (val: string | number): string {
|
||||
if (typeof val === 'number') {
|
||||
return `"${Math.round(val * 100) / 100}"`.replace('.', ',')
|
||||
}
|
||||
return `"${val}"`
|
||||
}
|
||||
|
||||
add (record: T, print: boolean = true): void {
|
||||
this.data.push(
|
||||
Object.entries(this.fields)
|
||||
.map(([key, value]) => this.toStr(record[key]))
|
||||
.join(',')
|
||||
)
|
||||
if (print) {
|
||||
console.log(
|
||||
Object.entries(this.fields)
|
||||
.map(([key, value]) => `${value}=${this.toStr(record[key])}`)
|
||||
.join(' ')
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
async write (filename: string): Promise<void> {
|
||||
return await writeFile(filename, this.data.join('\n'))
|
||||
}
|
||||
}
|
@ -49,10 +49,11 @@ import { Data, getWorkspaceId, Tx, Version } from '@hcengineering/core'
|
||||
import { MinioService } from '@hcengineering/minio'
|
||||
import { MigrateOperation } from '@hcengineering/model'
|
||||
import { openAIConfigDefaults } from '@hcengineering/openai'
|
||||
import { benchmark } from './benchmark'
|
||||
import { cleanArchivedSpaces, cleanRemovedTransactions, cleanWorkspace } from './clean'
|
||||
import { changeConfiguration } from './configuration'
|
||||
import { rebuildElastic } from './elastic'
|
||||
import { openAIConfig } from './openai'
|
||||
import { changeConfiguration } from './configuration'
|
||||
|
||||
/**
|
||||
* @public
|
||||
@ -482,5 +483,25 @@ export function devTool (
|
||||
await changeConfiguration(getWorkspaceId(workspace, productId), transactorUrl, cmd)
|
||||
})
|
||||
|
||||
program
|
||||
.command('benchmark')
|
||||
.description('clean archived spaces')
|
||||
.option('--from <from>', 'Min client count', '10')
|
||||
.option('--steps <steps>', 'Step with client count', '10')
|
||||
.option('--sleep <sleep>', 'Random Delay max between operations', '0')
|
||||
.option('--workspaces <workspaces>', 'Workspaces to test on, comma separated', '')
|
||||
.action(async (cmd: { from: string, steps: string, sleep: string, workspaces: string }) => {
|
||||
console.log(JSON.stringify(cmd))
|
||||
await benchmark(
|
||||
cmd.workspaces.split(',').map((it) => getWorkspaceId(it, productId)),
|
||||
transactorUrl,
|
||||
{
|
||||
steps: parseInt(cmd.steps),
|
||||
from: parseInt(cmd.from),
|
||||
sleep: parseInt(cmd.sleep)
|
||||
}
|
||||
)
|
||||
})
|
||||
|
||||
program.parse(process.argv)
|
||||
}
|
||||
|
@ -246,11 +246,13 @@ async function loadModel (
|
||||
): Promise<boolean> {
|
||||
const t = Date.now()
|
||||
|
||||
const atxes = await conn.findAll(
|
||||
core.class.Tx,
|
||||
{ objectSpace: core.space.Model, _id: { $nin: Array.from(processedTx.values()) } },
|
||||
{ sort: { modifiedOn: SortingOrder.Ascending, _id: SortingOrder.Ascending } }
|
||||
)
|
||||
const mq: DocumentQuery<Tx> = { objectSpace: core.space.Model }
|
||||
if (processedTx.size > 0) {
|
||||
mq._id = { $nin: Array.from(processedTx.values()) }
|
||||
}
|
||||
const atxes = await conn.findAll(core.class.Tx, mq, {
|
||||
sort: { modifiedOn: SortingOrder.Ascending, _id: SortingOrder.Ascending }
|
||||
})
|
||||
|
||||
if (reload && atxes.length > modelTransactionThreshold) {
|
||||
return true
|
||||
|
@ -11,7 +11,7 @@ export class MeasureMetricsContext implements MeasureContext {
|
||||
private readonly params: Record<string, ParamType>
|
||||
logger: MeasureLogger
|
||||
metrics: Metrics
|
||||
private readonly done: () => void
|
||||
private readonly done: (value?: number) => void
|
||||
|
||||
constructor (name: string, params: Record<string, ParamType>, metrics: Metrics = newMetrics()) {
|
||||
this.name = name
|
||||
@ -29,6 +29,11 @@ export class MeasureMetricsContext implements MeasureContext {
|
||||
}
|
||||
}
|
||||
|
||||
measure (name: string, value: number): void {
|
||||
const c = new MeasureMetricsContext('#' + name, {}, childMetrics(this.metrics, ['#' + name]))
|
||||
c.done(value)
|
||||
}
|
||||
|
||||
newChild (name: string, params: Record<string, ParamType>): MeasureContext {
|
||||
return new MeasureMetricsContext(name, params, childMetrics(this.metrics, [name]))
|
||||
}
|
||||
|
@ -15,7 +15,7 @@ export const globals: Metrics = newMetrics()
|
||||
export function newMetrics (): Metrics {
|
||||
return {
|
||||
operations: 0,
|
||||
time: 0,
|
||||
value: 0,
|
||||
measurements: {},
|
||||
params: {}
|
||||
}
|
||||
@ -27,7 +27,7 @@ export function newMetrics (): Metrics {
|
||||
*/
|
||||
export function measure (metrics: Metrics, params: Record<string, ParamType>): () => void {
|
||||
const st = Date.now()
|
||||
return () => {
|
||||
return (value?: number) => {
|
||||
const ed = Date.now()
|
||||
// Update params if required
|
||||
for (const [k, v] of Object.entries(params)) {
|
||||
@ -41,15 +41,15 @@ export function measure (metrics: Metrics, params: Record<string, ParamType>): (
|
||||
if (param === undefined) {
|
||||
param = {
|
||||
operations: 0,
|
||||
time: 0
|
||||
value: 0
|
||||
}
|
||||
params[vKey] = param
|
||||
}
|
||||
param.time += ed - st
|
||||
param.value += value ?? ed - st
|
||||
param.operations++
|
||||
}
|
||||
// Update leaf data
|
||||
metrics.time += ed - st
|
||||
metrics.value += value ?? ed - st
|
||||
metrics.operations++
|
||||
}
|
||||
}
|
||||
@ -61,7 +61,7 @@ export function childMetrics (root: Metrics, path: string[]): Metrics {
|
||||
const segments = path
|
||||
let oop = root
|
||||
for (const p of segments) {
|
||||
const v = oop.measurements[p] ?? { operations: 0, time: 0, measurements: {}, params: {} }
|
||||
const v = oop.measurements[p] ?? { operations: 0, value: 0, measurements: {}, params: {} }
|
||||
oop.measurements[p] = v
|
||||
oop = v
|
||||
}
|
||||
@ -76,21 +76,24 @@ export function metricsAggregate (m: Metrics): Metrics {
|
||||
|
||||
// Use child overage, if there is no top level value specified.
|
||||
const keysLen = Object.keys(ms).length
|
||||
const childAverage = m.time === 0 && keysLen > 0
|
||||
const childAverage = m.value === 0 && keysLen > 0
|
||||
const sumVal: Metrics | undefined = childAverage
|
||||
? Object.values(ms).reduce(
|
||||
(p, v) => {
|
||||
p.operations += v.operations
|
||||
p.time += v.time
|
||||
return p
|
||||
},
|
||||
{
|
||||
operations: 0,
|
||||
time: 0,
|
||||
measurements: ms,
|
||||
params: {}
|
||||
}
|
||||
)
|
||||
? Object.entries(ms)
|
||||
.filter((it) => !it[0].startsWith('#'))
|
||||
.map((it) => it[1])
|
||||
.reduce(
|
||||
(p, v) => {
|
||||
p.operations += v.operations
|
||||
p.value += v.value
|
||||
return p
|
||||
},
|
||||
{
|
||||
operations: 0,
|
||||
value: 0,
|
||||
measurements: ms,
|
||||
params: {}
|
||||
}
|
||||
)
|
||||
: undefined
|
||||
if (sumVal !== undefined) {
|
||||
return {
|
||||
@ -107,7 +110,7 @@ export function metricsAggregate (m: Metrics): Metrics {
|
||||
|
||||
function aggregateMetrics (m: Record<string, Metrics>): Record<string, Metrics> {
|
||||
const result: Record<string, Metrics> = {}
|
||||
for (const [k, v] of Object.entries(m).sort((a, b) => b[1].time - a[1].time)) {
|
||||
for (const [k, v] of Object.entries(m).sort((a, b) => b[1].value - a[1].value)) {
|
||||
result[k] = metricsAggregate(v)
|
||||
}
|
||||
return result
|
||||
@ -140,8 +143,8 @@ function printMetricsParams (
|
||||
const joinP = (key: string, data: Record<string, MetricsData>): string[] => {
|
||||
return Object.entries(data).map(([k, vv]) =>
|
||||
`${toLen('', ' ', offset)}${toLen(key + '=' + k, '-', length - offset)}: avg ${
|
||||
vv.time / (vv.operations > 0 ? vv.operations : 1)
|
||||
} total: ${vv.time} ops: ${vv.operations}`.trim()
|
||||
vv.value / (vv.operations > 0 ? vv.operations : 1)
|
||||
} total: ${vv.value} ops: ${vv.operations}`.trim()
|
||||
)
|
||||
}
|
||||
const joinParams = Object.entries(params).reduce<string[]>((p, c) => [...p, ...joinP(c[0], c[1])], [])
|
||||
@ -154,8 +157,8 @@ function printMetricsParams (
|
||||
|
||||
function toString (name: string, m: Metrics, offset: number, length: number): string {
|
||||
let r = `${toLen('', ' ', offset)}${toLen(name, '-', length - offset)}: avg ${
|
||||
m.time / (m.operations > 0 ? m.operations : 1)
|
||||
} total: ${m.time} ops: ${m.operations}`.trim()
|
||||
m.value / (m.operations > 0 ? m.operations : 1)
|
||||
} total: ${m.value} ops: ${m.operations}`.trim()
|
||||
r += printMetricsParams(m.params, offset + 4, length)
|
||||
r += printMetricsChildren(m.measurements, offset + 4, length)
|
||||
return r
|
||||
@ -177,8 +180,8 @@ function printMetricsParamsRows (
|
||||
return Object.entries(data).map(([k, vv]) => [
|
||||
offset,
|
||||
`${key}=${k}`,
|
||||
vv.time / (vv.operations > 0 ? vv.operations : 1),
|
||||
vv.time,
|
||||
vv.value / (vv.operations > 0 ? vv.operations : 1),
|
||||
vv.value,
|
||||
vv.operations
|
||||
])
|
||||
}
|
||||
@ -198,7 +201,7 @@ function printMetricsChildrenRows (params: Record<string, Metrics>, offset: numb
|
||||
|
||||
function toStringRows (name: string, m: Metrics, offset: number): (number | string)[][] {
|
||||
const r: (number | string)[][] = [
|
||||
[offset, name, m.time / (m.operations > 0 ? m.operations : 1), m.time, m.operations]
|
||||
[offset, name, m.value / (m.operations > 0 ? m.operations : 1), m.value, m.operations]
|
||||
]
|
||||
r.push(...printMetricsParamsRows(m.params, offset + 1))
|
||||
r.push(...printMetricsChildrenRows(m.measurements, offset + 1))
|
||||
|
@ -8,7 +8,7 @@ export type ParamType = string | number | boolean | undefined
|
||||
*/
|
||||
export interface MetricsData {
|
||||
operations: number
|
||||
time: number
|
||||
value: number
|
||||
}
|
||||
|
||||
/**
|
||||
@ -37,9 +37,12 @@ export interface MeasureContext {
|
||||
|
||||
logger: MeasureLogger
|
||||
|
||||
measure: (name: string, value: number) => void
|
||||
|
||||
// Capture error
|
||||
error: (err: Error | string | any) => Promise<void>
|
||||
|
||||
// Mark current context as complete
|
||||
end: () => void
|
||||
// If no value is passed, time difference will be used.
|
||||
end: (value?: number) => void
|
||||
}
|
||||
|
@ -40,6 +40,10 @@ export interface Response<R> {
|
||||
result?: R
|
||||
id?: ReqId
|
||||
error?: Status
|
||||
chunk?: {
|
||||
index: number
|
||||
final: boolean
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -47,7 +47,7 @@ import {
|
||||
const SECOND = 1000
|
||||
const pingTimeout = 10 * SECOND
|
||||
const hangTimeout = 5 * 60 * SECOND
|
||||
const dialTimeout = 20 * SECOND
|
||||
const dialTimeout = 60 * SECOND
|
||||
|
||||
class RequestPromise {
|
||||
readonly promise: Promise<any>
|
||||
@ -60,6 +60,8 @@ class RequestPromise {
|
||||
this.reject = reject
|
||||
})
|
||||
}
|
||||
|
||||
chunks?: { index: number, data: any[] }[]
|
||||
}
|
||||
|
||||
class Connection implements ClientConnection {
|
||||
@ -192,6 +194,30 @@ class Connection implements ClientConnection {
|
||||
if (promise === undefined) {
|
||||
throw new Error(`unknown response id: ${resp.id}`)
|
||||
}
|
||||
|
||||
if (resp.chunk !== undefined) {
|
||||
promise.chunks = [
|
||||
...(promise.chunks ?? []),
|
||||
{
|
||||
index: resp.chunk.index,
|
||||
data: resp.result as []
|
||||
}
|
||||
]
|
||||
// console.log(socketId, 'chunk', promise.chunks.length, resp.chunk.total)
|
||||
if (resp.chunk.final) {
|
||||
promise.chunks.sort((a, b) => a.index - b.index)
|
||||
let result: any[] = []
|
||||
for (const c of promise.chunks) {
|
||||
result = result.concat(c.data)
|
||||
}
|
||||
resp.result = result
|
||||
resp.chunk = undefined
|
||||
} else {
|
||||
// Not all chunks are available yet.
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
this.requests.delete(resp.id)
|
||||
if (resp.error !== undefined) {
|
||||
console.log('ERROR', promise, resp.id)
|
||||
@ -228,7 +254,7 @@ class Connection implements ClientConnection {
|
||||
}
|
||||
}
|
||||
websocket.onclose = (ev) => {
|
||||
console.log('client websocket closed', socketId, ev?.reason)
|
||||
// console.log('client websocket closed', socketId, ev?.reason)
|
||||
|
||||
if (!(this.websocket instanceof Promise)) {
|
||||
this.websocket = null
|
||||
@ -237,7 +263,7 @@ class Connection implements ClientConnection {
|
||||
reject(new Error('websocket error'))
|
||||
}
|
||||
websocket.onopen = () => {
|
||||
console.log('connection opened...', socketId)
|
||||
// console.log('connection opened...', socketId)
|
||||
clearTimeout(dialTimer)
|
||||
websocket.send(
|
||||
serialize({
|
||||
|
@ -15,22 +15,21 @@
|
||||
|
||||
import login, { LoginInfo, Workspace, WorkspaceLoginInfo } from '@hcengineering/login'
|
||||
import {
|
||||
getMetadata,
|
||||
OK,
|
||||
PlatformError,
|
||||
Request,
|
||||
Response,
|
||||
serialize,
|
||||
setMetadata,
|
||||
Status,
|
||||
getMetadata,
|
||||
setMetadata,
|
||||
unknownError,
|
||||
unknownStatus
|
||||
} from '@hcengineering/platform'
|
||||
import presentation from '@hcengineering/presentation'
|
||||
import {
|
||||
Location,
|
||||
fetchMetadataLocalStorage,
|
||||
getCurrentLocation,
|
||||
Location,
|
||||
navigate,
|
||||
setMetadataLocalStorage
|
||||
} from '@hcengineering/ui'
|
||||
@ -67,7 +66,7 @@ export async function doLogin (email: string, password: string): Promise<[Status
|
||||
headers: {
|
||||
'Content-Type': 'application/json'
|
||||
},
|
||||
body: serialize(request)
|
||||
body: JSON.stringify(request)
|
||||
})
|
||||
const result: Response<any> = await response.json()
|
||||
console.log('login result', result)
|
||||
@ -109,7 +108,7 @@ export async function signUp (
|
||||
headers: {
|
||||
'Content-Type': 'application/json'
|
||||
},
|
||||
body: serialize(request)
|
||||
body: JSON.stringify(request)
|
||||
})
|
||||
const result: Response<any> = await response.json()
|
||||
return [result.error ?? OK, result.result]
|
||||
@ -155,7 +154,7 @@ export async function createWorkspace (workspace: string): Promise<[Status, Logi
|
||||
Authorization: 'Bearer ' + token,
|
||||
'Content-Type': 'application/json'
|
||||
},
|
||||
body: serialize(request)
|
||||
body: JSON.stringify(request)
|
||||
})
|
||||
const result: Response<any> = await response.json()
|
||||
return [result.error ?? OK, result.result]
|
||||
@ -204,7 +203,7 @@ export async function getWorkspaces (): Promise<Workspace[]> {
|
||||
Authorization: 'Bearer ' + token,
|
||||
'Content-Type': 'application/json'
|
||||
},
|
||||
body: serialize(request)
|
||||
body: JSON.stringify(request)
|
||||
})
|
||||
const result: Response<any> = await response.json()
|
||||
if (result.error != null) {
|
||||
@ -253,7 +252,7 @@ export async function selectWorkspace (workspace: string): Promise<[Status, Work
|
||||
Authorization: 'Bearer ' + token,
|
||||
'Content-Type': 'application/json'
|
||||
},
|
||||
body: serialize(request)
|
||||
body: JSON.stringify(request)
|
||||
})
|
||||
const result: Response<any> = await response.json()
|
||||
return [result.error ?? OK, result.result]
|
||||
@ -334,7 +333,7 @@ export async function checkJoined (inviteId: string): Promise<[Status, Workspace
|
||||
Authorization: 'Bearer ' + token,
|
||||
'Content-Type': 'application/json'
|
||||
},
|
||||
body: serialize(request)
|
||||
body: JSON.stringify(request)
|
||||
})
|
||||
const result: Response<any> = await response.json()
|
||||
return [result.error ?? OK, result.result]
|
||||
@ -372,7 +371,7 @@ export async function getInviteLink (expHours: number = 1, emailMask: string = '
|
||||
Authorization: 'Bearer ' + token,
|
||||
'Content-Type': 'application/json'
|
||||
},
|
||||
body: serialize(request)
|
||||
body: JSON.stringify(request)
|
||||
})
|
||||
const result: Response<any> = await response.json()
|
||||
return result.result
|
||||
@ -408,7 +407,7 @@ export async function join (
|
||||
headers: {
|
||||
'Content-Type': 'application/json'
|
||||
},
|
||||
body: serialize(request)
|
||||
body: JSON.stringify(request)
|
||||
})
|
||||
const result: Response<any> = await response.json()
|
||||
return [result.error ?? OK, result.result]
|
||||
@ -449,7 +448,7 @@ export async function signUpJoin (
|
||||
headers: {
|
||||
'Content-Type': 'application/json'
|
||||
},
|
||||
body: serialize(request)
|
||||
body: JSON.stringify(request)
|
||||
})
|
||||
const result: Response<any> = await response.json()
|
||||
return [result.error ?? OK, result.result]
|
||||
@ -485,7 +484,7 @@ export async function changeName (first: string, last: string): Promise<void> {
|
||||
Authorization: 'Bearer ' + token,
|
||||
'Content-Type': 'application/json'
|
||||
},
|
||||
body: serialize(request)
|
||||
body: JSON.stringify(request)
|
||||
})
|
||||
}
|
||||
|
||||
@ -516,7 +515,7 @@ export async function changePassword (oldPassword: string, password: string): Pr
|
||||
Authorization: 'Bearer ' + token,
|
||||
'Content-Type': 'application/json'
|
||||
},
|
||||
body: serialize(request)
|
||||
body: JSON.stringify(request)
|
||||
})
|
||||
const resp = await response.json()
|
||||
if (resp.error !== undefined) {
|
||||
@ -551,7 +550,7 @@ export async function leaveWorkspace (email: string): Promise<void> {
|
||||
Authorization: 'Bearer ' + token,
|
||||
'Content-Type': 'application/json'
|
||||
},
|
||||
body: serialize(request)
|
||||
body: JSON.stringify(request)
|
||||
})
|
||||
}
|
||||
|
||||
@ -582,7 +581,7 @@ export async function sendInvite (email: string): Promise<void> {
|
||||
Authorization: 'Bearer ' + token,
|
||||
'Content-Type': 'application/json'
|
||||
},
|
||||
body: serialize(request)
|
||||
body: JSON.stringify(request)
|
||||
})
|
||||
}
|
||||
|
||||
@ -611,7 +610,7 @@ export async function requestPassword (email: string): Promise<Status> {
|
||||
headers: {
|
||||
'Content-Type': 'application/json'
|
||||
},
|
||||
body: serialize(request)
|
||||
body: JSON.stringify(request)
|
||||
})
|
||||
const result: Response<any> = await response.json()
|
||||
return result.error ?? OK
|
||||
@ -639,7 +638,7 @@ export async function restorePassword (token: string, password: string): Promise
|
||||
Authorization: 'Bearer ' + token,
|
||||
'Content-Type': 'application/json'
|
||||
},
|
||||
body: serialize(request)
|
||||
body: JSON.stringify(request)
|
||||
})
|
||||
const result: Response<any> = await response.json()
|
||||
return [result.error ?? OK, result.result]
|
||||
|
@ -24,7 +24,20 @@
|
||||
|
||||
<Card on:close fullSize label={getEmbeddedLabel('Statistics')} okAction={() => {}} okLabel={getEmbeddedLabel('Ok')}>
|
||||
{#if data}
|
||||
{JSON.stringify(data.activeSessions, null, 2)}
|
||||
{#each Object.entries(data.statistics?.activeSessions) as act}
|
||||
{act[0]}: {act[1]}
|
||||
{/each}
|
||||
|
||||
<span class="fs-title flex-row-center">
|
||||
Memory usage: {data.statistics.memoryUsed} / {data.statistics.memoryTotal}
|
||||
</span>
|
||||
<span class="fs-title flex-row-center">
|
||||
CPU: {data.statistics.cpuUsage}
|
||||
</span>
|
||||
<span class="fs-title flex-row-center">
|
||||
Mem: {data.statistics.freeMem} / {data.statistics.totalMem}
|
||||
</span>
|
||||
|
||||
<table class="antiTable" class:highlightRows={true}>
|
||||
<thead class="scroller-thead">
|
||||
<tr>
|
||||
|
@ -1,8 +1,10 @@
|
||||
FROM node:18
|
||||
|
||||
RUN apt install git
|
||||
WORKDIR /usr/src/app
|
||||
|
||||
COPY bundle.js ./
|
||||
# COPY ./dist/*.node ./
|
||||
|
||||
EXPOSE 8080
|
||||
CMD [ "node", "bundle.js" ]
|
||||
|
@ -5,10 +5,11 @@
|
||||
"author": "Anticrm Platform Contributors",
|
||||
"license": "EPL-2.0",
|
||||
"scripts": {
|
||||
"start": "cross-env MONGO_URL=mongodb://localhost:27017 ELASTIC_URL=http://localhost:9200 MINIO_ENDPOINT=localhost MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin METRICS_CONSOLE=true SERVER_SECRET=secret REKONI_URL=http://localhost:4004 FRONT_URL=http://localhost:8080 ts-node src/__start.ts",
|
||||
"start": "cross-env MONGO_URL=mongodb://localhost:27017 ELASTIC_URL=http://localhost:9200 MINIO_ENDPOINT=localhost MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin METRICS_CONSOLE=false SERVER_SECRET=secret REKONI_URL=http://localhost:4004 FRONT_URL=http://localhost:8080 node --nolazy -r ts-node/register src/__start.ts",
|
||||
"build": "heft build",
|
||||
"lint:fix": "eslint --fix src",
|
||||
"bundle": "esbuild src/__start.ts --bundle --sourcemap=inline --minify --platform=node > bundle.js",
|
||||
"bundle:u": "esbuild src/__start.ts --bundle --sourcemap=inline --minify --platform=node > bundle.js && mkdir -p ./dist && cp -r ./node_modules/uWebSockets.js/*.node ./dist",
|
||||
"docker:build": "docker build -t hardcoreeng/transactor .",
|
||||
"docker:staging": "../../common/scripts/docker_tag.sh hardcoreeng/transactor staging",
|
||||
"docker:push": "../../common/scripts/docker_tag.sh hardcoreeng/transactor",
|
||||
@ -34,7 +35,7 @@
|
||||
"eslint-config-standard-with-typescript": "^23.0.0",
|
||||
"prettier": "^2.7.1",
|
||||
"@rushstack/heft": "^0.47.9",
|
||||
"typescript": "^4.3.5"
|
||||
"typescript": "^4.3.5"
|
||||
},
|
||||
"dependencies": {
|
||||
"@hcengineering/core": "^0.6.23",
|
||||
@ -137,5 +138,8 @@
|
||||
"@hcengineering/document": "^0.6.0",
|
||||
"@hcengineering/bitrix": "^0.6.34",
|
||||
"@hcengineering/request": "^0.6.0"
|
||||
},
|
||||
"optionalDependencies": {
|
||||
"uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.23.0"
|
||||
}
|
||||
}
|
||||
|
@ -16,12 +16,17 @@
|
||||
|
||||
// Add this to the VERY top of the first file loaded in your app
|
||||
import { setMetadata } from '@hcengineering/platform'
|
||||
import serverToken from '@hcengineering/server-token'
|
||||
import serverCore from '@hcengineering/server-core'
|
||||
import serverToken from '@hcengineering/server-token'
|
||||
import { serverFactories } from '@hcengineering/server-ws'
|
||||
import { start } from '.'
|
||||
|
||||
const serverPort = parseInt(process.env.SERVER_PORT ?? '3333')
|
||||
|
||||
const serverFactory = serverFactories[(process.env.SERVER_PROVIDER as string) ?? 'ws'] ?? serverFactories.ws
|
||||
|
||||
const serverChinking = parseInt(process.env.CHUNKING ?? '101')
|
||||
|
||||
const url = process.env.MONGO_URL
|
||||
if (url === undefined) {
|
||||
console.error('please provide mongodb url')
|
||||
@ -81,7 +86,17 @@ setMetadata(serverToken.metadata.Secret, serverSecret)
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-floating-promises
|
||||
console.log(`starting server on ${serverPort}`)
|
||||
const shutdown = start(url, elasticUrl, minioConf, rekoniUrl, serverPort, '')
|
||||
const shutdown = start(url, {
|
||||
fullTextUrl: elasticUrl,
|
||||
minioConf,
|
||||
rekoniUrl,
|
||||
port: serverPort,
|
||||
serverFactory,
|
||||
chunking: serverChinking,
|
||||
indexParallel: 2,
|
||||
indexProcessing: 500,
|
||||
productId: ''
|
||||
})
|
||||
|
||||
const close = (): void => {
|
||||
console.trace('Exiting from server')
|
||||
|
@ -29,6 +29,7 @@ import {
|
||||
ConfigurationMiddleware,
|
||||
ModifiedMiddleware,
|
||||
PrivateMiddleware,
|
||||
QueryJoinMiddleware,
|
||||
SpaceSecurityMiddleware
|
||||
} from '@hcengineering/middleware'
|
||||
import { MinioService } from '@hcengineering/minio'
|
||||
@ -76,7 +77,14 @@ import { serverTelegramId } from '@hcengineering/server-telegram'
|
||||
import { Token } from '@hcengineering/server-token'
|
||||
import { serverTrackerId } from '@hcengineering/server-tracker'
|
||||
import { serverViewId } from '@hcengineering/server-view'
|
||||
import { BroadcastCall, ClientSession, start as startJsonRpc } from '@hcengineering/server-ws'
|
||||
import {
|
||||
BroadcastCall,
|
||||
ClientSession,
|
||||
PipelineFactory,
|
||||
ServerFactory,
|
||||
Session,
|
||||
start as startJsonRpc
|
||||
} from '@hcengineering/server-ws'
|
||||
|
||||
import { activityId } from '@hcengineering/activity'
|
||||
import { attachmentId } from '@hcengineering/attachment'
|
||||
@ -168,12 +176,18 @@ addStringsLoader(requestId, async (lang: string) => requestEn)
|
||||
*/
|
||||
export function start (
|
||||
dbUrl: string,
|
||||
fullTextUrl: string,
|
||||
minioConf: MinioConfig,
|
||||
rekoniUrl: string,
|
||||
port: number,
|
||||
productId: string,
|
||||
host?: string
|
||||
opt: {
|
||||
fullTextUrl: string
|
||||
minioConf: MinioConfig
|
||||
rekoniUrl: string
|
||||
port: number
|
||||
productId: string
|
||||
serverFactory: ServerFactory
|
||||
chunking: number // 25
|
||||
|
||||
indexProcessing: number // 1000
|
||||
indexParallel: number // 2
|
||||
}
|
||||
): () => Promise<void> {
|
||||
addLocation(serverAttachmentId, () => import('@hcengineering/server-attachment-resources'))
|
||||
addLocation(serverContactId, () => import('@hcengineering/server-contact-resources'))
|
||||
@ -198,7 +212,8 @@ export function start (
|
||||
ModifiedMiddleware.create,
|
||||
PrivateMiddleware.create,
|
||||
SpaceSecurityMiddleware.create,
|
||||
ConfigurationMiddleware.create
|
||||
ConfigurationMiddleware.create,
|
||||
QueryJoinMiddleware.create // Should be last one
|
||||
]
|
||||
|
||||
const metrics = getMetricsContext().newChild('indexing', {})
|
||||
@ -211,8 +226,8 @@ export function start (
|
||||
contentAdapter: ContentTextAdapter
|
||||
): FullTextPipelineStage[] {
|
||||
// Allow 2 workspaces to be indexed in parallel
|
||||
globalIndexer.allowParallel = 2
|
||||
globalIndexer.processingSize = 1000
|
||||
globalIndexer.allowParallel = opt.indexParallel
|
||||
globalIndexer.processingSize = opt.indexProcessing
|
||||
|
||||
const stages: FullTextPipelineStage[] = []
|
||||
|
||||
@ -252,80 +267,77 @@ export function start (
|
||||
return stages
|
||||
}
|
||||
|
||||
return startJsonRpc(
|
||||
getMetricsContext(),
|
||||
(ctx, workspace, upgrade, broadcast) => {
|
||||
const conf: DbConfiguration = {
|
||||
domains: {
|
||||
[DOMAIN_TX]: 'MongoTx',
|
||||
[DOMAIN_TRANSIENT]: 'InMemory',
|
||||
[DOMAIN_BLOB]: 'MinioData',
|
||||
[DOMAIN_FULLTEXT_BLOB]: 'FullTextBlob',
|
||||
[DOMAIN_MODEL]: 'Null'
|
||||
const pipelineFactory: PipelineFactory = (ctx, workspace, upgrade, broadcast) => {
|
||||
const conf: DbConfiguration = {
|
||||
domains: {
|
||||
[DOMAIN_TX]: 'MongoTx',
|
||||
[DOMAIN_TRANSIENT]: 'InMemory',
|
||||
[DOMAIN_BLOB]: 'MinioData',
|
||||
[DOMAIN_FULLTEXT_BLOB]: 'FullTextBlob',
|
||||
[DOMAIN_MODEL]: 'Null'
|
||||
},
|
||||
metrics,
|
||||
defaultAdapter: 'Mongo',
|
||||
adapters: {
|
||||
MongoTx: {
|
||||
factory: createMongoTxAdapter,
|
||||
url: dbUrl
|
||||
},
|
||||
metrics,
|
||||
defaultAdapter: 'Mongo',
|
||||
adapters: {
|
||||
MongoTx: {
|
||||
factory: createMongoTxAdapter,
|
||||
url: dbUrl
|
||||
},
|
||||
Mongo: {
|
||||
factory: createMongoAdapter,
|
||||
url: dbUrl
|
||||
},
|
||||
Null: {
|
||||
factory: createNullAdapter,
|
||||
url: ''
|
||||
},
|
||||
InMemory: {
|
||||
factory: createInMemoryAdapter,
|
||||
url: ''
|
||||
},
|
||||
MinioData: {
|
||||
factory: createMinioDataAdapter,
|
||||
url: ''
|
||||
},
|
||||
FullTextBlob: {
|
||||
factory: createElasticBackupDataAdapter,
|
||||
url: fullTextUrl
|
||||
}
|
||||
Mongo: {
|
||||
factory: createMongoAdapter,
|
||||
url: dbUrl
|
||||
},
|
||||
fulltextAdapter: {
|
||||
factory: createElasticAdapter,
|
||||
url: fullTextUrl,
|
||||
stages: (adapter, storage, storageAdapter, contentAdapter) =>
|
||||
createIndexStages(
|
||||
metrics.newChild('stages', {}),
|
||||
workspace,
|
||||
adapter,
|
||||
storage,
|
||||
storageAdapter,
|
||||
contentAdapter
|
||||
)
|
||||
Null: {
|
||||
factory: createNullAdapter,
|
||||
url: ''
|
||||
},
|
||||
contentAdapter: {
|
||||
factory: createRekoniAdapter,
|
||||
url: rekoniUrl
|
||||
InMemory: {
|
||||
factory: createInMemoryAdapter,
|
||||
url: ''
|
||||
},
|
||||
storageFactory: () =>
|
||||
new MinioService({
|
||||
...minioConf,
|
||||
port: 9000,
|
||||
useSSL: false
|
||||
}),
|
||||
workspace
|
||||
}
|
||||
return createPipeline(ctx, conf, middlewares, upgrade, broadcast)
|
||||
},
|
||||
(token: Token, pipeline: Pipeline, broadcast: BroadcastCall) => {
|
||||
if (token.extra?.mode === 'backup') {
|
||||
return new BackupClientSession(broadcast, token, pipeline)
|
||||
}
|
||||
return new ClientSession(broadcast, token, pipeline)
|
||||
},
|
||||
port,
|
||||
productId,
|
||||
host
|
||||
)
|
||||
MinioData: {
|
||||
factory: createMinioDataAdapter,
|
||||
url: ''
|
||||
},
|
||||
FullTextBlob: {
|
||||
factory: createElasticBackupDataAdapter,
|
||||
url: opt.fullTextUrl
|
||||
}
|
||||
},
|
||||
fulltextAdapter: {
|
||||
factory: createElasticAdapter,
|
||||
url: opt.fullTextUrl,
|
||||
stages: (adapter, storage, storageAdapter, contentAdapter) =>
|
||||
createIndexStages(metrics.newChild('stages', {}), workspace, adapter, storage, storageAdapter, contentAdapter)
|
||||
},
|
||||
contentAdapter: {
|
||||
factory: createRekoniAdapter,
|
||||
url: opt.rekoniUrl
|
||||
},
|
||||
storageFactory: () =>
|
||||
new MinioService({
|
||||
...opt.minioConf,
|
||||
port: 9000,
|
||||
useSSL: false
|
||||
}),
|
||||
workspace
|
||||
}
|
||||
return createPipeline(ctx, conf, middlewares, upgrade, broadcast)
|
||||
}
|
||||
|
||||
const sessionFactory = (token: Token, pipeline: Pipeline, broadcast: BroadcastCall): Session => {
|
||||
if (token.extra?.mode === 'backup') {
|
||||
return new BackupClientSession(broadcast, token, pipeline)
|
||||
}
|
||||
return new ClientSession(broadcast, token, pipeline)
|
||||
}
|
||||
|
||||
return startJsonRpc(getMetricsContext(), {
|
||||
pipelineFactory,
|
||||
sessionFactory,
|
||||
port: opt.port,
|
||||
productId: opt.productId,
|
||||
chunking: opt.chunking,
|
||||
serverFactory: opt.serverFactory
|
||||
})
|
||||
}
|
||||
|
@ -20,4 +20,5 @@ export * from './fulltext'
|
||||
export * from './storage'
|
||||
export * from './pipeline'
|
||||
export * from './indexer'
|
||||
export * from './limitter'
|
||||
export { default, serverCoreId } from './plugin'
|
||||
|
@ -158,9 +158,11 @@ export class IndexedFieldStage implements FullTextPipelineStage {
|
||||
if (propagate.length > 0) {
|
||||
// We need to propagate all changes to all child's of following classes.
|
||||
if (allChildDocs === undefined) {
|
||||
allChildDocs = await this.dbStorage.findAll(metrics.newChild('propagate', {}), core.class.DocIndexState, {
|
||||
const pc = metrics.newChild('propagate', {})
|
||||
allChildDocs = await this.dbStorage.findAll(pc, core.class.DocIndexState, {
|
||||
attachedTo: { $in: docs.map((it) => it._id) }
|
||||
})
|
||||
pc.end()
|
||||
}
|
||||
const childs = allChildDocs.filter((it) => it.attachedTo === docState._id)
|
||||
for (const u of childs) {
|
||||
|
@ -102,9 +102,14 @@ export class FullTextPushStage implements FullTextPipelineStage {
|
||||
while (part.length > 0) {
|
||||
const toIndexPart = part.splice(0, 1000)
|
||||
|
||||
const allChildDocs = await this.dbStorage.findAll(metrics.newChild('find-child', {}), core.class.DocIndexState, {
|
||||
attachedTo: { $in: toIndexPart.map((it) => it._id) }
|
||||
})
|
||||
const allChildDocs = await metrics.with(
|
||||
'find-child',
|
||||
{},
|
||||
async (ctx) =>
|
||||
await this.dbStorage.findAll(ctx, core.class.DocIndexState, {
|
||||
attachedTo: { $in: toIndexPart.map((it) => it._id) }
|
||||
})
|
||||
)
|
||||
|
||||
for (const doc of toIndexPart) {
|
||||
if (pipeline.cancelling) {
|
||||
@ -133,10 +138,13 @@ export class FullTextPushStage implements FullTextPipelineStage {
|
||||
const propagate: Ref<Class<Doc>>[] = collectPropagate(pipeline, doc.attachedToClass)
|
||||
if (propagate.some((it) => pipeline.hierarchy.isDerived(doc.objectClass, it))) {
|
||||
// We need to include all parent content into this one.
|
||||
const [parentDoc] = await this.dbStorage.findAll(
|
||||
metrics.newChild('propagate', {}),
|
||||
core.class.DocIndexState,
|
||||
{ _id: doc.attachedTo as Ref<DocIndexState> }
|
||||
const [parentDoc] = await metrics.with(
|
||||
'find-parent',
|
||||
{},
|
||||
async (ctx) =>
|
||||
await this.dbStorage.findAll(ctx, core.class.DocIndexState, {
|
||||
_id: doc.attachedTo as Ref<DocIndexState>
|
||||
})
|
||||
)
|
||||
if (parentDoc !== undefined) {
|
||||
updateDoc2Elastic(parentDoc.attributes, elasticDoc, parentDoc._id)
|
||||
|
@ -20,4 +20,3 @@ export * from './types'
|
||||
export * from './utils'
|
||||
export * from './fulltextPush'
|
||||
export * from './summary'
|
||||
export * from './limitter'
|
||||
|
@ -33,7 +33,7 @@ import core, {
|
||||
} from '@hcengineering/core'
|
||||
import { DbAdapter } from '../adapter'
|
||||
import type { IndexedDoc } from '../types'
|
||||
import { RateLimitter } from './limitter'
|
||||
import { RateLimitter } from '../limitter'
|
||||
import { FullTextPipeline, FullTextPipelineStage } from './types'
|
||||
import { createStateDoc, isClassIndexable } from './utils'
|
||||
|
||||
|
@ -92,10 +92,13 @@ export class FullSummaryStage implements FullTextPipelineStage {
|
||||
while (part.length > 0) {
|
||||
const toIndexPart = part.splice(0, 1000)
|
||||
|
||||
const allChildDocs = await this.dbStorage.findAll(
|
||||
metrics.newChild('fulltext-find-child', {}),
|
||||
core.class.DocIndexState,
|
||||
{ attachedTo: { $in: toIndexPart.map((it) => it._id) } }
|
||||
const allChildDocs = await metrics.with(
|
||||
'find-child',
|
||||
{},
|
||||
async (ctx) =>
|
||||
await this.dbStorage.findAll(ctx, core.class.DocIndexState, {
|
||||
attachedTo: { $in: toIndexPart.map((it) => it._id) }
|
||||
})
|
||||
)
|
||||
|
||||
for (const doc of toIndexPart) {
|
||||
|
@ -5,15 +5,17 @@
|
||||
export class RateLimitter {
|
||||
idCounter: number = 0
|
||||
processingQueue = new Map<string, Promise<void>>()
|
||||
last: number = 0
|
||||
|
||||
queue: (() => Promise<void>)[] = []
|
||||
|
||||
constructor (readonly config: () => { rate: number }) {}
|
||||
constructor (readonly config: () => { rate: number, perSecond?: number }) {}
|
||||
|
||||
async exec<T, B extends Record<string, any> = {}>(op: (args?: B) => Promise<T>, args?: B): Promise<T> {
|
||||
const processingId = `${this.idCounter++}`
|
||||
const cfg = this.config()
|
||||
|
||||
if (this.processingQueue.size > this.config().rate) {
|
||||
if (this.processingQueue.size > cfg.rate) {
|
||||
await Promise.race(this.processingQueue.values())
|
||||
}
|
||||
try {
|
||||
@ -26,7 +28,9 @@ export class RateLimitter {
|
||||
}
|
||||
|
||||
async add<T, B extends Record<string, any> = {}>(op: (args?: B) => Promise<T>, args?: B): Promise<void> {
|
||||
if (this.processingQueue.size < this.config().rate) {
|
||||
const cfg = this.config()
|
||||
|
||||
if (this.processingQueue.size < cfg.rate) {
|
||||
void this.exec(op, args)
|
||||
} else {
|
||||
await this.exec(op, args)
|
@ -28,7 +28,7 @@ import {
|
||||
Tx,
|
||||
TxResult
|
||||
} from '@hcengineering/core'
|
||||
import { createServerStorage, DbConfiguration } from './storage'
|
||||
import { DbConfiguration, createServerStorage } from './storage'
|
||||
import { Middleware, MiddlewareCreator, Pipeline, SessionContext } from './types'
|
||||
|
||||
/**
|
||||
|
@ -29,6 +29,7 @@
|
||||
"@hcengineering/core": "^0.6.23",
|
||||
"@hcengineering/platform": "^0.6.8",
|
||||
"@hcengineering/server-core": "^0.6.1",
|
||||
"@hcengineering/server-preference": "^0.6.0"
|
||||
"@hcengineering/server-preference": "^0.6.0",
|
||||
"fast-equals": "^2.0.3"
|
||||
}
|
||||
}
|
||||
|
@ -14,7 +14,8 @@
|
||||
//
|
||||
|
||||
export * from './base'
|
||||
export * from './configuration'
|
||||
export * from './modified'
|
||||
export * from './private'
|
||||
export * from './configuration'
|
||||
export * from './queryJoin'
|
||||
export * from './spaceSecurity'
|
||||
|
125
server/middleware/src/queryJoin.ts
Normal file
125
server/middleware/src/queryJoin.ts
Normal file
@ -0,0 +1,125 @@
|
||||
//
|
||||
// Copyright © 2022 Hardcore Engineering Inc.
|
||||
//
|
||||
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License. You may
|
||||
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
//
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
import {
|
||||
Class,
|
||||
Doc,
|
||||
DocumentQuery,
|
||||
FindOptions,
|
||||
FindResult,
|
||||
MeasureContext,
|
||||
Ref,
|
||||
ServerStorage,
|
||||
Tx
|
||||
} from '@hcengineering/core'
|
||||
import { Middleware, SessionContext, TxMiddlewareResult } from '@hcengineering/server-core'
|
||||
import { BaseMiddleware } from './base'
|
||||
|
||||
import { deepEqual } from 'fast-equals'
|
||||
|
||||
interface Query {
|
||||
_class: Ref<Class<Doc>>
|
||||
query: DocumentQuery<Doc>
|
||||
result: FindResult<Doc> | Promise<FindResult<Doc>> | undefined
|
||||
options?: FindOptions<Doc>
|
||||
callbacks: number
|
||||
max: number
|
||||
}
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export class QueryJoinMiddleware extends BaseMiddleware implements Middleware {
|
||||
private readonly queries: Map<Ref<Class<Doc>>, Query[]> = new Map<Ref<Class<Doc>>, Query[]>()
|
||||
|
||||
private constructor (storage: ServerStorage, next?: Middleware) {
|
||||
super(storage, next)
|
||||
}
|
||||
|
||||
static async create (ctx: MeasureContext, storage: ServerStorage, next?: Middleware): Promise<QueryJoinMiddleware> {
|
||||
return new QueryJoinMiddleware(storage, next)
|
||||
}
|
||||
|
||||
async tx (ctx: SessionContext, tx: Tx): Promise<TxMiddlewareResult> {
|
||||
return await this.provideTx(ctx, tx)
|
||||
}
|
||||
|
||||
override async findAll<T extends Doc>(
|
||||
ctx: SessionContext,
|
||||
_class: Ref<Class<T>>,
|
||||
query: DocumentQuery<T>,
|
||||
options?: FindOptions<T>
|
||||
): Promise<FindResult<T>> {
|
||||
// Will find a query or add + 1 to callbacks
|
||||
const q = this.findQuery(_class, query, options) ?? this.createQuery(_class, query, options)
|
||||
if (q.result === undefined) {
|
||||
q.result = this.provideFindAll(ctx, _class, query, options)
|
||||
}
|
||||
if (q.result instanceof Promise) {
|
||||
const st = Date.now()
|
||||
q.result = await q.result
|
||||
const ed = Date.now()
|
||||
if (q.callbacks > 1) {
|
||||
console.log('QUERY Wait', _class, JSON.stringify(query), ed - st, q.callbacks, q.max)
|
||||
}
|
||||
q.callbacks--
|
||||
}
|
||||
this.removeFromQueue(q)
|
||||
|
||||
return q.result as FindResult<T>
|
||||
}
|
||||
|
||||
private findQuery<T extends Doc>(
|
||||
_class: Ref<Class<T>>,
|
||||
query: DocumentQuery<T>,
|
||||
options?: FindOptions<T>
|
||||
): Query | undefined {
|
||||
const queries = this.queries.get(_class)
|
||||
if (queries === undefined) return
|
||||
for (const q of queries) {
|
||||
if (!deepEqual(query, q.query) || !deepEqual(options, q.options)) {
|
||||
continue
|
||||
}
|
||||
q.callbacks++
|
||||
q.max++
|
||||
return q
|
||||
}
|
||||
}
|
||||
|
||||
private createQuery<T extends Doc>(_class: Ref<Class<T>>, query: DocumentQuery<T>, options?: FindOptions<T>): Query {
|
||||
const queries = this.queries.get(_class) ?? []
|
||||
const q: Query = {
|
||||
_class,
|
||||
query,
|
||||
result: undefined,
|
||||
options: options as FindOptions<Doc>,
|
||||
callbacks: 1,
|
||||
max: 1
|
||||
}
|
||||
|
||||
queries.push(q)
|
||||
this.queries.set(_class, queries)
|
||||
return q
|
||||
}
|
||||
|
||||
private removeFromQueue (q: Query): void {
|
||||
if (q.callbacks === 0) {
|
||||
const queries = this.queries.get(q._class) ?? []
|
||||
this.queries.set(
|
||||
q._class,
|
||||
queries.filter((it) => it !== q)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
@ -522,7 +522,10 @@ abstract class MongoAdapterBase implements DbAdapter {
|
||||
const domain = this.hierarchy.getDomain(_class)
|
||||
const coll = this.db.collection(domain)
|
||||
const mongoQuery = this.translateQuery(_class, query)
|
||||
let cursor = coll.find<T>(mongoQuery)
|
||||
let cursor = coll.find<T>(mongoQuery, {
|
||||
checkKeys: false,
|
||||
enableUtf8Validation: false
|
||||
})
|
||||
|
||||
if (options?.projection !== undefined) {
|
||||
const projection: Projection<T> = {}
|
||||
|
@ -59,6 +59,8 @@ export class APMMeasureContext implements MeasureContext {
|
||||
return new APMMeasureContext(this.agent, name, params, this.transaction)
|
||||
}
|
||||
|
||||
measure (name: string, value: number): void {}
|
||||
|
||||
async with<T>(
|
||||
name: string,
|
||||
params: Record<string, ParamType>,
|
||||
|
@ -34,5 +34,8 @@
|
||||
"@hcengineering/core": "^0.6.23",
|
||||
"@hcengineering/server-core": "^0.6.1",
|
||||
"@hcengineering/server-token": "^0.6.2"
|
||||
},
|
||||
"optionalDependencies": {
|
||||
"uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.23.0"
|
||||
}
|
||||
}
|
||||
|
@ -17,7 +17,7 @@
|
||||
import { readResponse, serialize, UNAUTHORIZED } from '@hcengineering/platform'
|
||||
import { generateToken } from '@hcengineering/server-token'
|
||||
import WebSocket from 'ws'
|
||||
import { disableLogging, start } from '../server'
|
||||
import { start } from '../server'
|
||||
|
||||
import {
|
||||
Account,
|
||||
@ -40,6 +40,8 @@ import {
|
||||
} from '@hcengineering/core'
|
||||
import { SessionContext } from '@hcengineering/server-core'
|
||||
import { ClientSession } from '../client'
|
||||
import { startHttpServer } from '../server_http'
|
||||
import { disableLogging } from '../types'
|
||||
import { genMinModel } from './minmodel'
|
||||
|
||||
describe('server', () => {
|
||||
@ -58,9 +60,8 @@ describe('server', () => {
|
||||
return modelDb
|
||||
}
|
||||
|
||||
const cancelOp = start(
|
||||
new MeasureMetricsContext('test', {}),
|
||||
async () => ({
|
||||
const cancelOp = start(new MeasureMetricsContext('test', {}), {
|
||||
pipelineFactory: async () => ({
|
||||
modelDb: await getModelDb(),
|
||||
findAll: async <T extends Doc>(
|
||||
ctx: SessionContext,
|
||||
@ -80,10 +81,12 @@ describe('server', () => {
|
||||
upload: async (domain: Domain, docs: Doc[]) => {},
|
||||
clean: async (domain: Domain, docs: Ref<Doc>[]) => {}
|
||||
}),
|
||||
(token, pipeline, broadcast) => new ClientSession(broadcast, token, pipeline),
|
||||
3335,
|
||||
''
|
||||
)
|
||||
sessionFactory: (token, pipeline, broadcast) => new ClientSession(broadcast, token, pipeline),
|
||||
port: 3335,
|
||||
productId: '',
|
||||
serverFactory: startHttpServer,
|
||||
chunking: -1
|
||||
})
|
||||
|
||||
function connect (): WebSocket {
|
||||
const token: string = generateToken('', getWorkspaceId('latest', ''))
|
||||
@ -143,9 +146,8 @@ describe('server', () => {
|
||||
})
|
||||
|
||||
it('reconnect', async () => {
|
||||
const cancelOp = start(
|
||||
new MeasureMetricsContext('test', {}),
|
||||
async () => ({
|
||||
const cancelOp = start(new MeasureMetricsContext('test', {}), {
|
||||
pipelineFactory: async () => ({
|
||||
modelDb: await getModelDb(),
|
||||
findAll: async <T extends Doc>(
|
||||
ctx: SessionContext,
|
||||
@ -175,10 +177,12 @@ describe('server', () => {
|
||||
upload: async (domain: Domain, docs: Doc[]) => {},
|
||||
clean: async (domain: Domain, docs: Ref<Doc>[]) => {}
|
||||
}),
|
||||
(token, pipeline, broadcast) => new ClientSession(broadcast, token, pipeline),
|
||||
3336,
|
||||
''
|
||||
)
|
||||
sessionFactory: (token, pipeline, broadcast) => new ClientSession(broadcast, token, pipeline),
|
||||
port: 3336,
|
||||
productId: '',
|
||||
serverFactory: startHttpServer,
|
||||
chunking: -1
|
||||
})
|
||||
|
||||
async function findClose (token: string, timeoutPromise: Promise<void>, code: number): Promise<string> {
|
||||
const newConn = new WebSocket(`ws://localhost:3336/${token}?sessionId=s1`)
|
||||
|
@ -24,14 +24,16 @@ import {
|
||||
Tx,
|
||||
TxResult
|
||||
} from '@hcengineering/core'
|
||||
import type { Pipeline, SessionContext } from '@hcengineering/server-core'
|
||||
import { Pipeline, SessionContext } from '@hcengineering/server-core'
|
||||
import { Token } from '@hcengineering/server-token'
|
||||
import { BroadcastCall, Session } from './types'
|
||||
import { BroadcastCall, Session, SessionRequest } from './types'
|
||||
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export class ClientSession implements Session {
|
||||
requests: Map<string, SessionRequest> = new Map()
|
||||
|
||||
constructor (
|
||||
protected readonly broadcast: BroadcastCall,
|
||||
protected readonly token: Token,
|
||||
|
@ -14,6 +14,19 @@
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
import { startHttpServer } from './server_http'
|
||||
// import { startUWebsocketServer } from './server_u'
|
||||
import { ServerFactory } from './types'
|
||||
|
||||
export { start } from './server'
|
||||
export * from './types'
|
||||
export * from './client'
|
||||
export * from './server_http'
|
||||
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export const serverFactories: Record<string, ServerFactory> = {
|
||||
ws: startHttpServer
|
||||
// uweb: startUWebsocketServer
|
||||
}
|
||||
|
@ -14,28 +14,29 @@
|
||||
//
|
||||
|
||||
import core, {
|
||||
metricsAggregate,
|
||||
generateId,
|
||||
MeasureContext,
|
||||
Ref,
|
||||
Space,
|
||||
toWorkspaceString,
|
||||
Tx,
|
||||
TxFactory,
|
||||
WorkspaceId
|
||||
WorkspaceId,
|
||||
generateId,
|
||||
toWorkspaceString
|
||||
} from '@hcengineering/core'
|
||||
import { readRequest, Response, serialize, UNAUTHORIZED, unknownError } from '@hcengineering/platform'
|
||||
import { Response, readRequest, unknownError } from '@hcengineering/platform'
|
||||
import type { Pipeline, SessionContext } from '@hcengineering/server-core'
|
||||
import { decodeToken, Token } from '@hcengineering/server-token'
|
||||
import { createServer, IncomingMessage, ServerResponse } from 'http'
|
||||
import WebSocket, { RawData, WebSocketServer } from 'ws'
|
||||
import { BroadcastCall, PipelineFactory, Session } from './types'
|
||||
import { Token } from '@hcengineering/server-token'
|
||||
// import WebSocket, { RawData } from 'ws'
|
||||
|
||||
let LOGGING_ENABLED = true
|
||||
|
||||
export function disableLogging (): void {
|
||||
LOGGING_ENABLED = false
|
||||
}
|
||||
import {
|
||||
BroadcastCall,
|
||||
ConnectionSocket,
|
||||
LOGGING_ENABLED,
|
||||
PipelineFactory,
|
||||
ServerFactory,
|
||||
Session,
|
||||
SessionManager
|
||||
} from './types'
|
||||
|
||||
function timeoutPromise (time: number): Promise<void> {
|
||||
return new Promise((resolve) => {
|
||||
@ -46,15 +47,35 @@ function timeoutPromise (time: number): Promise<void> {
|
||||
interface Workspace {
|
||||
id: string
|
||||
pipeline: Promise<Pipeline>
|
||||
sessions: [Session, WebSocket][]
|
||||
sessions: [Session, ConnectionSocket][]
|
||||
upgrade: boolean
|
||||
closing?: Promise<void>
|
||||
}
|
||||
|
||||
class SessionManager {
|
||||
class TSessionManager implements SessionManager {
|
||||
readonly workspaces = new Map<string, Workspace>()
|
||||
checkInterval: any
|
||||
|
||||
constructor (readonly sessionFactory: (token: Token, pipeline: Pipeline, broadcast: BroadcastCall) => Session) {}
|
||||
constructor (
|
||||
readonly ctx: MeasureContext,
|
||||
readonly sessionFactory: (token: Token, pipeline: Pipeline, broadcast: BroadcastCall) => Session
|
||||
) {
|
||||
this.checkInterval = setInterval(() => this.handleInterval(), 1000)
|
||||
}
|
||||
|
||||
handleInterval (): void {
|
||||
for (const h of this.workspaces.entries()) {
|
||||
for (const s of h[1].sessions) {
|
||||
for (const r of s[0].requests.values()) {
|
||||
const ed = Date.now()
|
||||
|
||||
if (ed - r.start > 30000) {
|
||||
console.log(h[0], 'request hang found, 30sec', h[0], s[0].getUser(), r.params)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
createSession (token: Token, pipeline: Pipeline): Session {
|
||||
return this.sessionFactory(token, pipeline, this.broadcast.bind(this))
|
||||
@ -64,90 +85,97 @@ class SessionManager {
|
||||
|
||||
async addSession (
|
||||
ctx: MeasureContext,
|
||||
ws: WebSocket,
|
||||
ws: ConnectionSocket,
|
||||
token: Token,
|
||||
pipelineFactory: PipelineFactory,
|
||||
productId: string,
|
||||
sessionId?: string
|
||||
): Promise<Session> {
|
||||
const wsString = toWorkspaceString(token.workspace, '@')
|
||||
return await ctx.with('add-session', {}, async (ctx) => {
|
||||
const wsString = toWorkspaceString(token.workspace, '@')
|
||||
|
||||
let workspace = this.workspaces.get(wsString)
|
||||
await workspace?.closing
|
||||
workspace = this.workspaces.get(wsString)
|
||||
let workspace = this.workspaces.get(wsString)
|
||||
await workspace?.closing
|
||||
workspace = this.workspaces.get(wsString)
|
||||
|
||||
if (workspace === undefined) {
|
||||
workspace = this.createWorkspace(ctx, pipelineFactory, token)
|
||||
}
|
||||
|
||||
if (token.extra?.model === 'upgrade') {
|
||||
if (LOGGING_ENABLED) console.log(token.workspace.name, 'reloading workspace', JSON.stringify(token))
|
||||
this.upgradeId = sessionId
|
||||
// If upgrade client is used.
|
||||
// Drop all existing clients
|
||||
await this.closeAll(ctx, wsString, workspace, 0, 'upgrade')
|
||||
// Wipe workspace and update values.
|
||||
if (!workspace.upgrade) {
|
||||
// This is previous workspace, intended to be closed.
|
||||
workspace.id = generateId()
|
||||
workspace.sessions = []
|
||||
workspace.upgrade = token.extra?.model === 'upgrade'
|
||||
if (workspace === undefined) {
|
||||
workspace = this.createWorkspace(ctx, pipelineFactory, token)
|
||||
}
|
||||
if (LOGGING_ENABLED) console.log(token.workspace.name, 'no sessions for workspace', wsString)
|
||||
// Re-create pipeline.
|
||||
workspace.pipeline = pipelineFactory(ctx, token.workspace, true, (tx) =>
|
||||
this.broadcastAll(workspace as Workspace, tx)
|
||||
)
|
||||
|
||||
const pipeline = await workspace.pipeline
|
||||
if (token.extra?.model === 'upgrade') {
|
||||
return await this.createUpgradeSession(token, sessionId, ctx, wsString, workspace, pipelineFactory, ws)
|
||||
}
|
||||
|
||||
if (workspace.upgrade && sessionId !== this.upgradeId) {
|
||||
ws.close()
|
||||
throw new Error('Upgrade in progress....')
|
||||
}
|
||||
|
||||
const pipeline = await ctx.with('pipeline', {}, async () => await (workspace as Workspace).pipeline)
|
||||
|
||||
const session = this.createSession(token, pipeline)
|
||||
session.sessionId = sessionId
|
||||
session.sessionInstanceId = generateId()
|
||||
workspace.sessions.push([session, ws])
|
||||
await ctx.with('set-status', {}, () => this.setStatus(ctx, session, true))
|
||||
return session
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
if (workspace.upgrade && sessionId !== this.upgradeId) {
|
||||
ws.close()
|
||||
throw new Error('Upgrade in progress....')
|
||||
private async createUpgradeSession (
|
||||
token: Token,
|
||||
sessionId: string | undefined,
|
||||
ctx: MeasureContext,
|
||||
wsString: string,
|
||||
workspace: Workspace,
|
||||
pipelineFactory: PipelineFactory,
|
||||
ws: ConnectionSocket
|
||||
): Promise<Session> {
|
||||
if (LOGGING_ENABLED) {
|
||||
console.log(token.workspace.name, 'reloading workspace', JSON.stringify(token))
|
||||
}
|
||||
this.upgradeId = sessionId
|
||||
// If upgrade client is used.
|
||||
// Drop all existing clients
|
||||
await this.closeAll(ctx, wsString, workspace, 0, 'upgrade')
|
||||
// Wipe workspace and update values.
|
||||
if (!workspace.upgrade) {
|
||||
// This is previous workspace, intended to be closed.
|
||||
workspace.id = generateId()
|
||||
workspace.sessions = []
|
||||
workspace.upgrade = token.extra?.model === 'upgrade'
|
||||
}
|
||||
if (LOGGING_ENABLED) {
|
||||
console.log(token.workspace.name, 'no sessions for workspace', wsString)
|
||||
}
|
||||
// Re-create pipeline.
|
||||
workspace.pipeline = pipelineFactory(ctx, token.workspace, true, (tx) => this.broadcastAll(workspace, tx))
|
||||
|
||||
const pipeline = await workspace.pipeline
|
||||
|
||||
if (sessionId !== undefined) {
|
||||
// try restore session
|
||||
const existingSession = workspace.sessions.find((it) => it[0].sessionId === sessionId)
|
||||
if (existingSession !== undefined) {
|
||||
if (LOGGING_ENABLED) {
|
||||
console.log(
|
||||
token.workspace.name,
|
||||
'found existing session',
|
||||
token.email,
|
||||
existingSession[0].sessionId,
|
||||
existingSession[0].sessionInstanceId
|
||||
)
|
||||
}
|
||||
// Update websocket
|
||||
clearTimeout(existingSession[0].closeTimeout)
|
||||
existingSession[0].closeTimeout = undefined
|
||||
existingSession[1] = ws
|
||||
return existingSession[0]
|
||||
}
|
||||
}
|
||||
|
||||
const session = this.createSession(token, pipeline)
|
||||
session.sessionId = sessionId
|
||||
session.sessionInstanceId = generateId()
|
||||
workspace.sessions.push([session, ws])
|
||||
await this.setStatus(ctx, session, true)
|
||||
return session
|
||||
}
|
||||
|
||||
broadcastAll (workspace: Workspace, tx: Tx[]): void {
|
||||
for (const _tx of tx) {
|
||||
const msg = serialize({ result: _tx })
|
||||
for (const session of workspace.sessions) {
|
||||
session[1].send(msg)
|
||||
if (workspace?.upgrade ?? false) {
|
||||
return
|
||||
}
|
||||
const ctx = this.ctx.newChild('broadcast-all', {})
|
||||
const sessions = [...workspace.sessions]
|
||||
function send (): void {
|
||||
for (const session of sessions.splice(0, 1)) {
|
||||
for (const _tx of tx) {
|
||||
void session[1].send(ctx, { result: _tx })
|
||||
}
|
||||
}
|
||||
if (sessions.length > 0) {
|
||||
setImmediate(send)
|
||||
} else {
|
||||
ctx.end()
|
||||
}
|
||||
}
|
||||
send()
|
||||
}
|
||||
|
||||
private createWorkspace (ctx: MeasureContext, pipelineFactory: PipelineFactory, token: Token): Workspace {
|
||||
@ -195,23 +223,27 @@ class SessionManager {
|
||||
|
||||
async close (
|
||||
ctx: MeasureContext,
|
||||
ws: WebSocket,
|
||||
ws: ConnectionSocket,
|
||||
workspaceId: WorkspaceId,
|
||||
code: number,
|
||||
reason: string
|
||||
): Promise<void> {
|
||||
if (LOGGING_ENABLED) console.log(workspaceId.name, `closing websocket, code: ${code}, reason: ${reason}`)
|
||||
// if (LOGGING_ENABLED) console.log(workspaceId.name, `closing websocket, code: ${code}, reason: ${reason}`)
|
||||
const wsid = toWorkspaceString(workspaceId)
|
||||
const workspace = this.workspaces.get(wsid)
|
||||
if (workspace === undefined) {
|
||||
if (LOGGING_ENABLED) console.error(new Error('internal: cannot find sessions'))
|
||||
return
|
||||
}
|
||||
const index = workspace.sessions.findIndex((p) => p[1] === ws)
|
||||
const index = workspace.sessions.findIndex((p) => p[1].id === ws.id)
|
||||
if (index !== -1) {
|
||||
const session = workspace.sessions[index]
|
||||
workspace.sessions.splice(index, 1)
|
||||
session[1].close()
|
||||
try {
|
||||
session[1].close()
|
||||
} catch (err) {
|
||||
// Ignore if closed
|
||||
}
|
||||
const user = session[0].getUser()
|
||||
const another = workspace.sessions.findIndex((p) => p[0].getUser() === user)
|
||||
if (another === -1) {
|
||||
@ -254,24 +286,14 @@ class SessionManager {
|
||||
const sessions = Array.from(workspace.sessions)
|
||||
workspace.sessions = []
|
||||
|
||||
const closeS = async (s: Session, webSocket: WebSocket): Promise<void> => {
|
||||
clearTimeout(s.closeTimeout)
|
||||
const closeS = async (s: Session, webSocket: ConnectionSocket): Promise<void> => {
|
||||
s.workspaceClosed = true
|
||||
if (reason === 'upgrade') {
|
||||
// await for message to go to client.
|
||||
await new Promise((resolve) => {
|
||||
// Override message handler, to wait for upgrading response from clients.
|
||||
webSocket.on('close', () => {
|
||||
resolve(null)
|
||||
})
|
||||
webSocket.send(
|
||||
serialize({
|
||||
result: {
|
||||
_class: core.class.TxModelUpgrade
|
||||
}
|
||||
})
|
||||
)
|
||||
setTimeout(resolve, 1000)
|
||||
// Override message handler, to wait for upgrading response from clients.
|
||||
void webSocket.send(ctx, {
|
||||
result: {
|
||||
_class: core.class.TxModelUpgrade
|
||||
}
|
||||
})
|
||||
}
|
||||
webSocket.close()
|
||||
@ -307,241 +329,133 @@ class SessionManager {
|
||||
console.error(new Error('internal: cannot find sessions'))
|
||||
return
|
||||
}
|
||||
if (workspace?.upgrade ?? false) {
|
||||
return
|
||||
}
|
||||
if (LOGGING_ENABLED) console.log(workspaceId.name, `server broadcasting to ${workspace.sessions.length} clients...`)
|
||||
const msg = serialize(resp)
|
||||
for (const session of workspace.sessions) {
|
||||
if (session[0] !== from) {
|
||||
if (target === undefined) {
|
||||
session[1].send(msg)
|
||||
} else if (target.includes(session[0].getUser())) {
|
||||
session[1].send(msg)
|
||||
|
||||
const sessions = [...workspace.sessions]
|
||||
const ctx = this.ctx.newChild('broadcast', {})
|
||||
function send (): void {
|
||||
for (const session of sessions.splice(0, 1)) {
|
||||
if (session[0] !== from) {
|
||||
if (target === undefined) {
|
||||
void session[1].send(ctx, resp)
|
||||
} else if (target.includes(session[0].getUser())) {
|
||||
void session[1].send(ctx, resp)
|
||||
}
|
||||
}
|
||||
}
|
||||
if (sessions.length > 0) {
|
||||
setImmediate(send)
|
||||
} else {
|
||||
ctx.end()
|
||||
}
|
||||
}
|
||||
send()
|
||||
}
|
||||
}
|
||||
|
||||
async function handleSend (
|
||||
ctx: MeasureContext,
|
||||
ws: ConnectionSocket,
|
||||
msg: Response<any>,
|
||||
chunkLimit: number
|
||||
): Promise<void> {
|
||||
// ws.send(msg)
|
||||
if (Array.isArray(msg.result) && chunkLimit > 0 && msg.result.length > chunkLimit) {
|
||||
// Split and send by chunks
|
||||
const data = [...msg.result]
|
||||
|
||||
let cid = 1
|
||||
while (data.length > 0) {
|
||||
const chunk = data.splice(0, chunkLimit)
|
||||
if (chunk !== undefined) {
|
||||
await ws.send(ctx, { ...msg, result: chunk, chunk: { index: cid, final: data.length === 0 } })
|
||||
}
|
||||
cid++
|
||||
}
|
||||
} else {
|
||||
await ws.send(ctx, msg)
|
||||
}
|
||||
}
|
||||
|
||||
async function handleRequest<S extends Session> (
|
||||
ctx: MeasureContext,
|
||||
rctx: MeasureContext,
|
||||
service: S,
|
||||
ws: WebSocket,
|
||||
ws: ConnectionSocket,
|
||||
msg: string,
|
||||
workspace: string
|
||||
workspace: string,
|
||||
chunkLimit: number
|
||||
): Promise<void> {
|
||||
const request = readRequest(msg)
|
||||
if (request.id === -1 && request.method === 'hello') {
|
||||
if (LOGGING_ENABLED) console.timeLog(workspace, 'hello happen', service.getUser())
|
||||
ws.send(serialize({ id: -1, result: 'hello' }))
|
||||
return
|
||||
}
|
||||
if (request.id === -1 && request.method === '#upgrade') {
|
||||
ws.close(0, 'upgrade')
|
||||
return
|
||||
}
|
||||
const userCtx = ctx.newChild('client', { workspace }) as SessionContext
|
||||
const userCtx = rctx.newChild('client', { workspace }) as SessionContext
|
||||
userCtx.sessionId = service.sessionInstanceId ?? ''
|
||||
const f = (service as any)[request.method]
|
||||
let timeout: any
|
||||
let hangTimeout: any
|
||||
|
||||
const reqId = generateId()
|
||||
|
||||
const st = Date.now()
|
||||
try {
|
||||
const params = [userCtx, ...request.params]
|
||||
|
||||
const st = Date.now()
|
||||
timeout = setTimeout(() => {
|
||||
if (LOGGING_ENABLED) console.timeLog(workspace, 'long request found', service.getUser(), request, params)
|
||||
}, 4000)
|
||||
|
||||
hangTimeout = setTimeout(() => {
|
||||
if (LOGGING_ENABLED) {
|
||||
console.timeLog(workspace, 'request hang found, 30sec', workspace, service.getUser(), request, params)
|
||||
await userCtx.with('handleRequest', {}, async (ctx) => {
|
||||
const request = await ctx.with('read', {}, async () => readRequest(msg))
|
||||
if (request.id === -1 && request.method === 'hello') {
|
||||
if (LOGGING_ENABLED) console.timeLog(workspace, 'hello happen', service.getUser())
|
||||
await ws.send(ctx, { id: -1, result: 'hello' })
|
||||
return
|
||||
}
|
||||
}, 30000)
|
||||
service.requests.set(reqId, {
|
||||
id: reqId,
|
||||
params: request,
|
||||
start: st
|
||||
})
|
||||
if (request.id === -1 && request.method === '#upgrade') {
|
||||
ws.close()
|
||||
return
|
||||
}
|
||||
const f = (service as any)[request.method]
|
||||
try {
|
||||
const params = [...request.params]
|
||||
|
||||
let result = await f.apply(service, params)
|
||||
clearTimeout(timeout)
|
||||
clearTimeout(hangTimeout)
|
||||
const resp: Response<any> = { id: request.id, result }
|
||||
const result = await ctx.with('call', {}, async (callTx) => f.apply(service, [callTx, ...params]))
|
||||
|
||||
const diff = Date.now() - st
|
||||
if (diff > 5000 && LOGGING_ENABLED) {
|
||||
console.timeLog(
|
||||
timeout,
|
||||
'very long request found',
|
||||
workspace,
|
||||
service.getUser(),
|
||||
request,
|
||||
params,
|
||||
Array.isArray(result) ? result.length : '0',
|
||||
diff
|
||||
)
|
||||
}
|
||||
const toSend = serialize(resp)
|
||||
// Clear for gc to make work
|
||||
resp.result = undefined
|
||||
result = undefined
|
||||
ws.send(toSend)
|
||||
} catch (err: any) {
|
||||
if (LOGGING_ENABLED) console.error(err)
|
||||
clearTimeout(timeout)
|
||||
clearTimeout(hangTimeout)
|
||||
const resp: Response<any> = {
|
||||
id: request.id,
|
||||
error: unknownError(err)
|
||||
}
|
||||
ws.send(serialize(resp))
|
||||
const resp: Response<any> = { id: request.id, result }
|
||||
|
||||
await handleSend(ctx, ws, resp, chunkLimit)
|
||||
} catch (err: any) {
|
||||
if (LOGGING_ENABLED) console.error(err)
|
||||
const resp: Response<any> = {
|
||||
id: request.id,
|
||||
error: unknownError(err)
|
||||
}
|
||||
await ws.send(ctx, resp)
|
||||
}
|
||||
})
|
||||
} finally {
|
||||
userCtx.end()
|
||||
service.requests.delete(reqId)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @public
|
||||
* @param sessionFactory -
|
||||
* @param port -
|
||||
* @param host -
|
||||
*/
|
||||
export function start (
|
||||
ctx: MeasureContext,
|
||||
pipelineFactory: PipelineFactory,
|
||||
sessionFactory: (token: Token, pipeline: Pipeline, broadcast: BroadcastCall) => Session,
|
||||
port: number,
|
||||
productId: string,
|
||||
host?: string
|
||||
): () => Promise<void> {
|
||||
if (LOGGING_ENABLED) console.log(`starting server on port ${port} ...`)
|
||||
|
||||
const sessions = new SessionManager(sessionFactory)
|
||||
|
||||
const wss = new WebSocketServer({
|
||||
noServer: true,
|
||||
perMessageDeflate: {
|
||||
zlibDeflateOptions: {
|
||||
// See zlib defaults.
|
||||
chunkSize: 10 * 1024,
|
||||
memLevel: 7,
|
||||
level: 3
|
||||
},
|
||||
zlibInflateOptions: {
|
||||
chunkSize: 10 * 1024
|
||||
}
|
||||
}
|
||||
})
|
||||
// eslint-disable-next-line @typescript-eslint/no-misused-promises
|
||||
wss.on('connection', async (ws: WebSocket, request: any, token: Token, sessionId?: string) => {
|
||||
let buffer: string[] | undefined = []
|
||||
|
||||
ws.on('message', (msg: string) => {
|
||||
buffer?.push(msg)
|
||||
})
|
||||
const session = await sessions.addSession(ctx, ws, token, pipelineFactory, productId, sessionId)
|
||||
// eslint-disable-next-line @typescript-eslint/no-misused-promises
|
||||
ws.on('message', (msg: RawData) => {
|
||||
let msgStr = ''
|
||||
if (typeof msg === 'string') {
|
||||
msgStr = msg
|
||||
} else if (msg instanceof Buffer) {
|
||||
msgStr = msg.toString()
|
||||
} else if (Array.isArray(msg)) {
|
||||
msgStr = Buffer.concat(msg).toString()
|
||||
}
|
||||
void handleRequest(ctx, session, ws, msgStr, token.workspace.name)
|
||||
})
|
||||
// eslint-disable-next-line @typescript-eslint/no-misused-promises
|
||||
ws.on('close', (code: number, reason: Buffer) => {
|
||||
if (session.workspaceClosed ?? false) {
|
||||
return
|
||||
}
|
||||
// remove session after 1seconds, give a time to reconnect.
|
||||
if (code === 1000) {
|
||||
if (LOGGING_ENABLED) console.log(token.workspace.name, `client "${token.email}" closed normally`)
|
||||
void sessions.close(ctx, ws, token.workspace, code, reason.toString())
|
||||
} else {
|
||||
if (LOGGING_ENABLED) {
|
||||
console.log(
|
||||
token.workspace.name,
|
||||
`client "${token.email}" closed abnormally, waiting reconnect`,
|
||||
code,
|
||||
reason.toString()
|
||||
)
|
||||
}
|
||||
session.closeTimeout = setTimeout(() => {
|
||||
if (LOGGING_ENABLED) console.log(token.workspace.name, `client "${token.email}" force closed`)
|
||||
void sessions.close(ctx, ws, token.workspace, code, reason.toString())
|
||||
}, 10000)
|
||||
}
|
||||
})
|
||||
const b = buffer
|
||||
buffer = undefined
|
||||
for (const msg of b) {
|
||||
await handleRequest(ctx, session, ws, msg, token.workspace.name)
|
||||
}
|
||||
})
|
||||
|
||||
const server = createServer()
|
||||
|
||||
server.on('request', (request: IncomingMessage, response: ServerResponse) => {
|
||||
const url = new URL('http://localhost' + (request.url ?? ''))
|
||||
|
||||
const token = url.pathname.substring(1)
|
||||
try {
|
||||
const payload = decodeToken(token ?? '')
|
||||
console.log(payload.workspace, 'statistics request')
|
||||
|
||||
response.writeHead(200, {
|
||||
'Content-Type': 'application/json',
|
||||
'Access-Control-Allow-Origin': '*',
|
||||
'Access-Control-Allow-Methods': 'GET, OPTIONS',
|
||||
'Access-Control-Allow-Headers': 'Content-Type'
|
||||
})
|
||||
const data = {
|
||||
metrics: metricsAggregate((ctx as any).metrics),
|
||||
activeSessions: {}
|
||||
}
|
||||
for (const [k, v] of sessions.workspaces) {
|
||||
;(data.activeSessions as any)[k] = v.sessions.length
|
||||
}
|
||||
const json = JSON.stringify(data)
|
||||
response.end(json)
|
||||
} catch (err) {
|
||||
response.writeHead(404, {})
|
||||
response.end()
|
||||
}
|
||||
})
|
||||
|
||||
server.on('upgrade', (request: IncomingMessage, socket: any, head: Buffer) => {
|
||||
const url = new URL('http://localhost' + (request.url ?? ''))
|
||||
const token = url.pathname.substring(1)
|
||||
|
||||
try {
|
||||
const payload = decodeToken(token ?? '')
|
||||
const sessionId = url.searchParams.get('sessionId')
|
||||
if (LOGGING_ENABLED) console.log(payload.workspace.name, 'client connected with payload', payload, sessionId)
|
||||
|
||||
if (payload.workspace.productId !== productId) {
|
||||
throw new Error('Invalid workspace product')
|
||||
}
|
||||
|
||||
wss.handleUpgrade(request, socket, head, (ws) => wss.emit('connection', ws, request, payload, sessionId))
|
||||
} catch (err) {
|
||||
if (LOGGING_ENABLED) console.error('invalid token', err)
|
||||
wss.handleUpgrade(request, socket, head, (ws) => {
|
||||
const resp: Response<any> = {
|
||||
id: -1,
|
||||
error: UNAUTHORIZED,
|
||||
result: 'hello'
|
||||
}
|
||||
ws.send(serialize(resp))
|
||||
ws.onmessage = (msg) => {
|
||||
const resp: Response<any> = {
|
||||
error: UNAUTHORIZED
|
||||
}
|
||||
ws.send(serialize(resp))
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
server.listen(port, host)
|
||||
return async () => {
|
||||
server.close()
|
||||
await sessions.closeWorkspaces(ctx)
|
||||
opt: {
|
||||
port: number
|
||||
pipelineFactory: PipelineFactory
|
||||
sessionFactory: (token: Token, pipeline: Pipeline, broadcast: BroadcastCall) => Session
|
||||
productId: string
|
||||
serverFactory: ServerFactory
|
||||
chunking: number // 25
|
||||
}
|
||||
): () => Promise<void> {
|
||||
const sessions = new TSessionManager(ctx, opt.sessionFactory)
|
||||
return opt.serverFactory(
|
||||
sessions,
|
||||
(rctx, service, ws, msg, workspace) => handleRequest(rctx, service, ws, msg, workspace, opt.chunking),
|
||||
ctx,
|
||||
opt.pipelineFactory,
|
||||
opt.port,
|
||||
opt.productId
|
||||
)
|
||||
}
|
||||
|
184
server/ws/src/server_http.ts
Normal file
184
server/ws/src/server_http.ts
Normal file
@ -0,0 +1,184 @@
|
||||
//
|
||||
// Copyright © 2023 Hardcore Engineering Inc.
|
||||
//
|
||||
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License. You may
|
||||
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
//
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
import { MeasureContext, generateId } from '@hcengineering/core'
|
||||
import { Response, UNAUTHORIZED, serialize } from '@hcengineering/platform'
|
||||
import { Token, decodeToken } from '@hcengineering/server-token'
|
||||
import { IncomingMessage, ServerResponse, createServer } from 'http'
|
||||
import { RawData, WebSocket, WebSocketServer } from 'ws'
|
||||
import { getStatistics } from './stats'
|
||||
import { ConnectionSocket, HandleRequestFunction, LOGGING_ENABLED, PipelineFactory, SessionManager } from './types'
|
||||
/**
|
||||
* @public
|
||||
* @param sessionFactory -
|
||||
* @param port -
|
||||
* @param host -
|
||||
*/
|
||||
export function startHttpServer (
|
||||
sessions: SessionManager,
|
||||
handleRequest: HandleRequestFunction,
|
||||
ctx: MeasureContext,
|
||||
pipelineFactory: PipelineFactory,
|
||||
port: number,
|
||||
productId: string
|
||||
): () => Promise<void> {
|
||||
if (LOGGING_ENABLED) console.log(`starting server on port ${port} ...`)
|
||||
|
||||
const wss = new WebSocketServer({
|
||||
noServer: true,
|
||||
perMessageDeflate: {
|
||||
zlibDeflateOptions: {
|
||||
// See zlib defaults.
|
||||
chunkSize: 16 * 1024,
|
||||
level: 6
|
||||
},
|
||||
zlibInflateOptions: {
|
||||
chunkSize: 16 * 1024,
|
||||
level: 6
|
||||
},
|
||||
threshold: 1024 // Size (in bytes) below which messages, should not be compressed if context takeover is disabled.
|
||||
},
|
||||
skipUTF8Validation: true
|
||||
})
|
||||
// eslint-disable-next-line @typescript-eslint/no-misused-promises
|
||||
wss.on('connection', async (ws: WebSocket, request: any, token: Token, sessionId?: string) => {
|
||||
let buffer: string[] | undefined = []
|
||||
|
||||
const cs: ConnectionSocket = {
|
||||
id: generateId(),
|
||||
close: () => ws.close(),
|
||||
send: async (ctx: MeasureContext, msg) => {
|
||||
if (ws.readyState !== ws.OPEN) {
|
||||
return
|
||||
}
|
||||
const smsg = await ctx.with('serialize', {}, async () => serialize(msg))
|
||||
|
||||
ctx.measure('send-data', smsg.length)
|
||||
|
||||
return await ctx.with(
|
||||
'socket-send',
|
||||
{},
|
||||
async (ctx) =>
|
||||
await new Promise((resolve, reject) => {
|
||||
ws.send(smsg, (err) => {
|
||||
if (err != null) {
|
||||
reject(err)
|
||||
} else {
|
||||
resolve()
|
||||
}
|
||||
})
|
||||
})
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
ws.on('message', (msg: string) => {
|
||||
buffer?.push(msg)
|
||||
})
|
||||
const session = await sessions.addSession(ctx, cs, token, pipelineFactory, productId, sessionId)
|
||||
// eslint-disable-next-line @typescript-eslint/no-misused-promises
|
||||
ws.on('message', (msg: RawData) => {
|
||||
let msgStr = ''
|
||||
if (typeof msg === 'string') {
|
||||
msgStr = msg
|
||||
} else if (msg instanceof Buffer) {
|
||||
msgStr = msg.toString()
|
||||
} else if (Array.isArray(msg)) {
|
||||
msgStr = Buffer.concat(msg).toString()
|
||||
}
|
||||
void handleRequest(ctx, session, cs, msgStr, token.workspace.name)
|
||||
})
|
||||
// eslint-disable-next-line @typescript-eslint/no-misused-promises
|
||||
ws.on('close', (code: number, reason: Buffer) => {
|
||||
if (session.workspaceClosed ?? false) {
|
||||
return
|
||||
}
|
||||
// remove session after 1seconds, give a time to reconnect.
|
||||
// if (LOGGING_ENABLED) console.log(token.workspace.name, `client "${token.email}" closed ${code === 1000 ? 'normally' : 'abnormally'}`)
|
||||
void sessions.close(ctx, cs, token.workspace, code, reason.toString())
|
||||
})
|
||||
const b = buffer
|
||||
buffer = undefined
|
||||
for (const msg of b) {
|
||||
await handleRequest(ctx, session, cs, msg, token.workspace.name)
|
||||
}
|
||||
})
|
||||
|
||||
const server = createServer()
|
||||
|
||||
server.on('request', (request: IncomingMessage, response: ServerResponse) => {
|
||||
const url = new URL('http://localhost' + (request.url ?? ''))
|
||||
|
||||
const token = url.pathname.substring(1)
|
||||
try {
|
||||
const payload = decodeToken(token ?? '')
|
||||
console.log(payload.workspace, 'statistics request')
|
||||
|
||||
response.writeHead(200, {
|
||||
'Content-Type': 'application/json',
|
||||
'Access-Control-Allow-Origin': '*',
|
||||
'Access-Control-Allow-Methods': 'GET, OPTIONS',
|
||||
'Access-Control-Allow-Headers': 'Content-Type'
|
||||
})
|
||||
const json = JSON.stringify(getStatistics(ctx, sessions))
|
||||
response.end(json)
|
||||
} catch (err) {
|
||||
response.writeHead(404, {})
|
||||
response.end()
|
||||
}
|
||||
})
|
||||
|
||||
server.on('upgrade', (request: IncomingMessage, socket: any, head: Buffer) => {
|
||||
const url = new URL('http://localhost' + (request.url ?? ''))
|
||||
const token = url.pathname.substring(1)
|
||||
|
||||
try {
|
||||
const payload = decodeToken(token ?? '')
|
||||
const sessionId = url.searchParams.get('sessionId')
|
||||
// if (LOGGING_ENABLED) console.log(payload.workspace.name, 'client connected with payload', payload, sessionId)
|
||||
|
||||
if (payload.workspace.productId !== productId) {
|
||||
throw new Error('Invalid workspace product')
|
||||
}
|
||||
|
||||
wss.handleUpgrade(request, socket, head, (ws) => wss.emit('connection', ws, request, payload, sessionId))
|
||||
} catch (err) {
|
||||
if (LOGGING_ENABLED) console.error('invalid token', err)
|
||||
wss.handleUpgrade(request, socket, head, (ws) => {
|
||||
const resp: Response<any> = {
|
||||
id: -1,
|
||||
error: UNAUTHORIZED,
|
||||
result: 'hello'
|
||||
}
|
||||
ws.send(serialize(resp))
|
||||
ws.onmessage = (msg) => {
|
||||
const resp: Response<any> = {
|
||||
error: UNAUTHORIZED
|
||||
}
|
||||
ws.send(serialize(resp))
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
server.on('error', () => {
|
||||
if (LOGGING_ENABLED) console.error('server error')
|
||||
})
|
||||
|
||||
server.listen(port)
|
||||
return async () => {
|
||||
server.close()
|
||||
await sessions.closeWorkspaces(ctx)
|
||||
}
|
||||
}
|
208
server/ws/src/server_u.ts_
Normal file
208
server/ws/src/server_u.ts_
Normal file
@ -0,0 +1,208 @@
|
||||
//
|
||||
// Copyright © 2023 Hardcore Engineering Inc.
|
||||
//
|
||||
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License. You may
|
||||
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
//
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
// uWebSockets.js
|
||||
// Import should be added: "uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.23.0"
|
||||
|
||||
import { MeasureContext, generateId } from '@hcengineering/core'
|
||||
import { Token, decodeToken } from '@hcengineering/server-token'
|
||||
|
||||
import { serialize } from '@hcengineering/platform'
|
||||
import uWebSockets, { SHARED_COMPRESSOR, WebSocket } from 'uWebSockets.js'
|
||||
import { getStatistics } from './stats'
|
||||
import {
|
||||
ConnectionSocket,
|
||||
HandleRequestFunction,
|
||||
LOGGING_ENABLED,
|
||||
PipelineFactory,
|
||||
Session,
|
||||
SessionManager
|
||||
} from './types'
|
||||
|
||||
interface WebsocketUserData {
|
||||
wrapper?: ConnectionSocket
|
||||
payload: Token
|
||||
session?: Promise<Session>
|
||||
backPressure?: Promise<void>
|
||||
backPressureResolve?: () => void
|
||||
unsendMsg: any[]
|
||||
}
|
||||
|
||||
/**
|
||||
* @public
|
||||
* @param port -
|
||||
* @param host -
|
||||
*/
|
||||
export function startUWebsocketServer (
|
||||
sessions: SessionManager,
|
||||
handleRequest: HandleRequestFunction,
|
||||
ctx: MeasureContext,
|
||||
pipelineFactory: PipelineFactory,
|
||||
port: number,
|
||||
productId: string
|
||||
): () => Promise<void> {
|
||||
if (LOGGING_ENABLED) console.log(`starting U server on port ${port} ...`)
|
||||
|
||||
const uAPP = uWebSockets.App()
|
||||
|
||||
uAPP
|
||||
.ws<WebsocketUserData>('/*', {
|
||||
/* There are many common helper features */
|
||||
// idleTimeout: 32,
|
||||
// maxBackpressure: 1024,
|
||||
maxPayloadLength: 50 * 1024 * 1024,
|
||||
compression: SHARED_COMPRESSOR,
|
||||
maxLifetime: 0,
|
||||
sendPingsAutomatically: true,
|
||||
|
||||
upgrade (res, req, context) {
|
||||
const url = new URL('http://localhost' + (req.getUrl() ?? ''))
|
||||
const token = url.pathname.substring(1)
|
||||
|
||||
try {
|
||||
const payload = decodeToken(token ?? '')
|
||||
|
||||
if (payload.workspace.productId !== productId) {
|
||||
throw new Error('Invalid workspace product')
|
||||
}
|
||||
|
||||
/* This immediately calls open handler, you must not use res after this call */
|
||||
res.upgrade<WebsocketUserData>(
|
||||
{
|
||||
payload,
|
||||
session: undefined,
|
||||
unsendMsg: []
|
||||
},
|
||||
/* Spell these correctly */
|
||||
req.getHeader('sec-websocket-key'),
|
||||
req.getHeader('sec-websocket-protocol'),
|
||||
req.getHeader('sec-websocket-extensions'),
|
||||
context
|
||||
)
|
||||
} catch (err) {
|
||||
if (LOGGING_ENABLED) console.error('invalid token', err)
|
||||
res.writeStatus('401 Unauthorized').end()
|
||||
}
|
||||
},
|
||||
open: (ws: WebSocket<WebsocketUserData>) => {
|
||||
const data = ws.getUserData()
|
||||
data.wrapper = {
|
||||
id: generateId(),
|
||||
close: () => {
|
||||
try {
|
||||
ws.close()
|
||||
} catch (err) {
|
||||
// Ignore closed
|
||||
}
|
||||
},
|
||||
send: async (ctx, msg): Promise<void> => {
|
||||
await ctx.with('backpressure', {}, async () => await data.backPressure)
|
||||
const serialized = await ctx.with('serialize', {}, async () => serialize(msg))
|
||||
ctx.measure('send-data', serialized.length)
|
||||
try {
|
||||
const sendR = await ctx.with('backpressure', {}, async () =>
|
||||
ws.send(serialized, false, Array.isArray(msg.result))
|
||||
)
|
||||
if (sendR === 2) {
|
||||
data.backPressure = new Promise((resolve) => {
|
||||
data.backPressureResolve = resolve
|
||||
})
|
||||
data.unsendMsg.push(msg)
|
||||
}
|
||||
} catch (err: any) {
|
||||
if (!((err.message ?? '') as string).includes('Invalid access of closed')) {
|
||||
console.error(err)
|
||||
}
|
||||
// Ignore socket is closed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
data.session = sessions.addSession(
|
||||
ctx,
|
||||
data.wrapper,
|
||||
ws.getUserData().payload,
|
||||
pipelineFactory,
|
||||
productId,
|
||||
undefined
|
||||
)
|
||||
},
|
||||
message: (ws, message, isBinary) => {
|
||||
const data = ws.getUserData()
|
||||
const enc = new TextDecoder('utf-8')
|
||||
const tmsg = enc.decode(message)
|
||||
|
||||
void data.session?.then((s) => {
|
||||
void handleRequest(ctx, s, data.wrapper as ConnectionSocket, tmsg, data.payload.workspace.name)
|
||||
})
|
||||
},
|
||||
drain: (ws) => {
|
||||
console.log(`WebSocket backpressure: ${ws.getBufferedAmount()}`)
|
||||
const data = ws.getUserData()
|
||||
while (data.unsendMsg.length > 0) {
|
||||
if (ws.send(data.unsendMsg[0]) !== 1) {
|
||||
data.unsendMsg.shift()
|
||||
} else {
|
||||
// Wait for next drain.
|
||||
return
|
||||
}
|
||||
}
|
||||
data.backPressureResolve?.()
|
||||
data.backPressure = undefined
|
||||
},
|
||||
close: (ws, code, message) => {
|
||||
const enc = new TextDecoder('utf-8')
|
||||
const data = ws.getUserData()
|
||||
try {
|
||||
const tmsg = enc.decode(message)
|
||||
if (tmsg !== undefined && tmsg !== '') {
|
||||
console.error(tmsg)
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(err)
|
||||
}
|
||||
void data.session?.then((s) => {
|
||||
void sessions.close(ctx, data.wrapper as ConnectionSocket, data.payload.workspace, code, '')
|
||||
})
|
||||
}
|
||||
})
|
||||
.any('/*', (response, request) => {
|
||||
const url = new URL('http://localhost' + (request.getUrl() ?? ''))
|
||||
|
||||
const token = url.pathname.substring(1)
|
||||
try {
|
||||
const payload = decodeToken(token ?? '')
|
||||
console.log(payload.workspace, 'statistics request')
|
||||
|
||||
const json = JSON.stringify(getStatistics(ctx, sessions))
|
||||
|
||||
response
|
||||
.writeStatus('200 OK')
|
||||
.writeHeader('Content-Type', 'application/json')
|
||||
.writeHeader('Access-Control-Allow-Origin', '*')
|
||||
.writeHeader('Access-Control-Allow-Methods', 'GET, OPTIONS')
|
||||
.writeHeader('Access-Control-Allow-Headers', 'Content-Type')
|
||||
.end(json)
|
||||
} catch (err) {
|
||||
response.writeHead(404, {})
|
||||
response.end()
|
||||
}
|
||||
})
|
||||
.listen(port, (s) => {})
|
||||
|
||||
return async () => {
|
||||
await sessions.closeWorkspaces(ctx)
|
||||
}
|
||||
}
|
26
server/ws/src/stats.ts
Normal file
26
server/ws/src/stats.ts
Normal file
@ -0,0 +1,26 @@
|
||||
import { MeasureContext, metricsAggregate } from '@hcengineering/core'
|
||||
import { SessionManager } from './types'
|
||||
import os from 'os'
|
||||
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export function getStatistics (ctx: MeasureContext, sessions: SessionManager): any {
|
||||
const data: Record<string, any> = {
|
||||
metrics: metricsAggregate((ctx as any).metrics),
|
||||
statistics: {
|
||||
activeSessions: {}
|
||||
}
|
||||
}
|
||||
for (const [k, v] of sessions.workspaces) {
|
||||
data.statistics.activeSessions[k] = v.sessions.length
|
||||
}
|
||||
|
||||
data.statistics.memoryUsed = Math.round((process.memoryUsage().heapUsed / 1024 / 1024) * 100) / 100
|
||||
data.statistics.memoryTotal = Math.round((process.memoryUsage().heapTotal / 1024 / 1024) * 100) / 100
|
||||
data.statistics.cpuUsage = Math.round(os.loadavg()[0] * 100) / 100
|
||||
data.statistics.freeMem = Math.round((os.freemem() / 1024 / 1024) * 100) / 100
|
||||
data.statistics.totalMem = Math.round((os.totalmem() / 1024 / 1024) * 100) / 100
|
||||
|
||||
return data
|
||||
}
|
@ -12,6 +12,16 @@ import {
|
||||
} from '@hcengineering/core'
|
||||
import { Response } from '@hcengineering/platform'
|
||||
import { Pipeline } from '@hcengineering/server-core'
|
||||
import { Token } from '@hcengineering/server-token'
|
||||
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export interface SessionRequest {
|
||||
id: string
|
||||
params: any
|
||||
start: number
|
||||
}
|
||||
|
||||
/**
|
||||
* @public
|
||||
@ -33,6 +43,8 @@ export interface Session {
|
||||
sessionInstanceId?: string
|
||||
closeTimeout?: any
|
||||
workspaceClosed?: boolean
|
||||
|
||||
requests: Map<string, SessionRequest>
|
||||
}
|
||||
|
||||
/**
|
||||
@ -54,3 +66,99 @@ export type PipelineFactory = (
|
||||
upgrade: boolean,
|
||||
broadcast: (tx: Tx[]) => void
|
||||
) => Promise<Pipeline>
|
||||
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export interface ConnectionSocket {
|
||||
id: string
|
||||
close: () => void
|
||||
send: (ctx: MeasureContext, msg: Response<any>) => Promise<void>
|
||||
}
|
||||
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export let LOGGING_ENABLED = true
|
||||
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export function disableLogging (): void {
|
||||
LOGGING_ENABLED = false
|
||||
}
|
||||
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export interface Workspace {
|
||||
id: string
|
||||
pipeline: Promise<Pipeline>
|
||||
sessions: [Session, ConnectionSocket][]
|
||||
upgrade: boolean
|
||||
closing?: Promise<void>
|
||||
}
|
||||
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export interface SessionManager {
|
||||
workspaces: Map<string, Workspace>
|
||||
|
||||
createSession: (token: Token, pipeline: Pipeline) => Session
|
||||
|
||||
addSession: (
|
||||
ctx: MeasureContext,
|
||||
ws: ConnectionSocket,
|
||||
token: Token,
|
||||
pipelineFactory: PipelineFactory,
|
||||
productId: string,
|
||||
sessionId?: string
|
||||
) => Promise<Session>
|
||||
|
||||
broadcastAll: (workspace: Workspace, tx: Tx[]) => void
|
||||
|
||||
close: (
|
||||
ctx: MeasureContext,
|
||||
ws: ConnectionSocket,
|
||||
workspaceId: WorkspaceId,
|
||||
code: number,
|
||||
reason: string
|
||||
) => Promise<void>
|
||||
|
||||
closeAll: (
|
||||
ctx: MeasureContext,
|
||||
wsId: string,
|
||||
workspace: Workspace,
|
||||
code: number,
|
||||
reason: 'upgrade' | 'shutdown'
|
||||
) => Promise<void>
|
||||
|
||||
closeWorkspaces: (ctx: MeasureContext) => Promise<void>
|
||||
|
||||
broadcast: (from: Session | null, workspaceId: WorkspaceId, resp: Response<any>, target?: string[]) => void
|
||||
}
|
||||
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export type HandleRequestFunction = <S extends Session>(
|
||||
rctx: MeasureContext,
|
||||
service: S,
|
||||
ws: ConnectionSocket,
|
||||
msg: string,
|
||||
workspace: string
|
||||
) => Promise<void>
|
||||
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
|
||||
export type ServerFactory = (
|
||||
sessions: SessionManager,
|
||||
handleRequest: HandleRequestFunction,
|
||||
ctx: MeasureContext,
|
||||
pipelineFactory: PipelineFactory,
|
||||
port: number,
|
||||
productId: string
|
||||
) => () => Promise<void>
|
||||
|
Loading…
Reference in New Issue
Block a user