UBERF-6778: Add Support to uWebSocket.js library (#5503)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-05-06 16:15:39 +07:00 committed by GitHub
parent 45446a497f
commit 096bfacad7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 841 additions and 584 deletions

View File

@ -282,6 +282,78 @@ jobs:
# with:
# name: db-snapshot
# path: ./tests/db_dump
uitest-uweb:
runs-on: ubuntu-latest
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
filter: tree:0
- uses: actions/setup-node@v4
with:
node-version-file: '.nvmrc'
- name: Cache node modules
uses: actions/cache@v4
env:
cache-name: cache-node-platform
with:
path: |
common/temp
key: ${{ runner.os }}-build-${{ env.cache-name }}-${{ hashFiles('**/pnpm-lock.yaml') }}
restore-keys: |
${{ runner.os }}-build-${{ env.cache-name }}-${{ hashFiles('**/pnpm-lock.yaml') }}
- name: Checking for mis-matching dependencies...
run: node common/scripts/install-run-rush.js check
- name: Installing...
run: node common/scripts/install-run-rush.js install
- name: Docker Build
run: node common/scripts/install-run-rush.js docker:build -p 20
env:
DOCKER_CLI_HINTS: false
- name: Prepare server
env:
SERVER_PROVIDER: uweb
run: |
cd ./tests
./prepare.sh
- name: Install Playwright
run: |
cd ./tests/sanity
node ../../common/scripts/install-run-rushx.js ci
- name: Run UI tests
run: |
cd ./tests/sanity
node ../../common/scripts/install-run-rushx.js uitest
- name: "Store docker logs"
if: always()
run: |
cd ./tests/sanity
mkdir logs
docker logs $(docker ps | grep transactor | cut -f 1 -d ' ') > logs/uweb-transactor.log
docker logs $(docker ps | grep account | cut -f 1 -d ' ') > logs/uweb-account.log
docker logs $(docker ps | grep front | cut -f 1 -d ' ') > logs/uweb-front.log
- name: Upload test results
if: always()
uses: actions/upload-artifact@v4
with:
name: playwright-results-uweb
path: ./tests/sanity/playwright-report/
- name: Upload Logs
if: always()
uses: actions/upload-artifact@v4
with:
name: docker-logs-uweb
path: ./tests/sanity/logs
# - name: Upload DB snapshot
# if: always()
# uses: actions/upload-artifact@v3
# with:
# name: db-snapshot
# path: ./tests/db_dump
docker-build:
needs: [build, test, svelte-check, uitest]
runs-on: ubuntu-latest

View File

@ -46,6 +46,10 @@ function calcHash(source, msg, addCheck) {
if( !ext.endsWith('.ts') && !ext.endsWith('.js') && !ext.endsWith('.svelte')) {
continue
}
if( sourceFile.endsWith('.d.ts') ) {
// Skip declaration files
continue
}
calcFileHash(sourceFile, msg, addCheck)
}
}
@ -76,6 +80,7 @@ if( process.argv.includes('-f') || process.argv.includes('--force')) {
filesToCheck = allFiles
}
if( filesToCheck.length > 0 ) {
console.info(`running prettier ${filesToCheck.length}`)
// Changes detected.

View File

@ -46,7 +46,7 @@ import core, {
} from '@hcengineering/core'
import { PlatformError, UNAUTHORIZED, broadcastEvent, getMetadata, unknownError } from '@hcengineering/platform'
import { HelloRequest, HelloResponse, ReqId, readResponse, serialize } from '@hcengineering/rpc'
import { HelloRequest, HelloResponse, ReqId, readResponse, serialize, type Response } from '@hcengineering/rpc'
const SECOND = 1000
const pingTimeout = 10 * SECOND
@ -55,7 +55,7 @@ const dialTimeout = 30 * SECOND
class RequestPromise {
startTime: number = Date.now()
handleTime?: (diff: number, result: any, serverTime: number, queue: number) => void
handleTime?: (diff: number, result: any, serverTime: number, queue: number, toRecieve: number) => void
readonly promise: Promise<any>
resolve!: (value?: any) => void
reject!: (reason?: any) => void
@ -198,6 +198,170 @@ class Connection implements ClientConnection {
}
}
handleMsg (socketId: number, resp: Response<any>): void {
if (resp.error !== undefined) {
if (resp.error?.code === UNAUTHORIZED.code) {
Analytics.handleError(new PlatformError(resp.error))
this.closed = true
this.websocket?.close()
this.onUnauthorized?.()
}
console.error(resp.error)
return
}
if (resp.id === -1) {
this.delay = 0
if (resp.result?.state === 'upgrading') {
void this.onConnect?.(ClientConnectEvent.Maintenance, resp.result.stats)
this.upgrading = true
this.delay = 3
return
}
if (resp.result === 'hello') {
if (this.upgrading) {
// We need to call upgrade since connection is upgraded
this.onUpgrade?.()
}
this.upgrading = false
if ((resp as HelloResponse).alreadyConnected === true) {
this.sessionId = generateId()
if (typeof sessionStorage !== 'undefined') {
sessionStorage.setItem('session.id.' + this.url, this.sessionId)
}
console.log('Connection: alreadyConnected, reconnect with new Id')
clearTimeout(dialTimeout)
this.scheduleOpen(true)
return
}
if ((resp as HelloResponse).binary) {
this.binaryMode = true
}
// Notify all waiting connection listeners
const handlers = this.onConnectHandlers.splice(0, this.onConnectHandlers.length)
for (const h of handlers) {
h()
}
for (const [, v] of this.requests.entries()) {
v.reconnect?.()
}
void this.onConnect?.(
(resp as HelloResponse).reconnect === true ? ClientConnectEvent.Reconnected : ClientConnectEvent.Connected
)
this.schedulePing(socketId)
return
} else {
Analytics.handleError(new Error(`unexpected response: ${JSON.stringify(resp)}`))
}
return
}
if (resp.result === 'ping') {
void this.sendRequest({ method: 'ping', params: [] })
return
}
if (resp.id !== undefined) {
const promise = this.requests.get(resp.id)
if (promise === undefined) {
console.error(
new Error(`unknown response id: ${resp.id as string} ${this.workspace} ${this.email}`),
JSON.stringify(this.requests)
)
return
}
if (resp.chunk !== undefined) {
promise.chunks = [
...(promise.chunks ?? []),
{
index: resp.chunk.index,
data: resp.result as FindResult<any>
}
]
// console.log(socketId, 'chunk', promise.method, promise.params, promise.chunks.length, (resp.result as []).length)
if (resp.chunk.final) {
promise.chunks.sort((a, b) => a.index - b.index)
let result: any[] = []
let total = -1
let lookupMap: Record<string, Doc> | undefined
for (const c of promise.chunks) {
if (c.data.total !== 0) {
total = c.data.total
}
if (c.data.lookupMap !== undefined) {
lookupMap = c.data.lookupMap
}
result = result.concat(c.data)
}
resp.result = toFindResult(result, total, lookupMap)
resp.chunk = undefined
} else {
// Not all chunks are available yet.
return
}
}
const request = this.requests.get(resp.id)
promise.handleTime?.(
Date.now() - promise.startTime,
resp.result,
resp.time ?? 0,
resp.queue ?? 0,
Date.now() - (resp.bfst ?? 0)
)
this.requests.delete(resp.id)
if (resp.error !== undefined) {
console.log(
'ERROR',
'request:',
request?.method,
'response-id:',
resp.id,
'error: ',
resp.error,
'result: ',
resp.result,
this.workspace,
this.email
)
promise.reject(new PlatformError(resp.error))
} else {
if (request?.handleResult !== undefined) {
void request.handleResult(resp.result).then(() => {
promise.resolve(resp.result)
})
} else {
promise.resolve(resp.result)
}
}
void broadcastEvent(client.event.NetworkRequests, this.requests.size)
} else {
const txArr = Array.isArray(resp.result) ? (resp.result as Tx[]) : [resp.result as Tx]
for (const tx of txArr) {
if (
(tx?._class === core.class.TxWorkspaceEvent && (tx as TxWorkspaceEvent).event === WorkspaceEvent.Upgrade) ||
tx?._class === core.class.TxModelUpgrade
) {
console.log('Processing upgrade', this.workspace, this.email)
this.onUpgrade?.()
return
}
}
this.handler(...txArr)
clearTimeout(this.incomingTimer)
void broadcastEvent(client.event.NetworkRequests, this.requests.size + 1)
this.incomingTimer = setTimeout(() => {
void broadcastEvent(client.event.NetworkRequests, this.requests.size)
}, 500)
}
}
private openConnection (socketId: number): void {
this.binaryMode = false
// Use defined factory or browser default one.
@ -205,7 +369,7 @@ class Connection implements ClientConnection {
getMetadata(client.metadata.ClientSocketFactory) ??
((url: string) => {
const s = new WebSocket(url)
s.binaryType = 'arraybuffer'
// s.binaryType = 'arraybuffer'
return s as ClientSocket
})
@ -245,162 +409,14 @@ class Connection implements ClientConnection {
if (this.websocket !== wsocket) {
return
}
const resp = readResponse<any>(event.data, this.binaryMode)
if (resp.error !== undefined) {
if (resp.error?.code === UNAUTHORIZED.code) {
Analytics.handleError(new PlatformError(resp.error))
this.closed = true
this.websocket.close()
this.onUnauthorized?.()
}
console.error(resp.error)
return
}
if (resp.id === -1) {
this.delay = 0
if (resp.result?.state === 'upgrading') {
void this.onConnect?.(ClientConnectEvent.Maintenance, resp.result.stats)
this.upgrading = true
this.delay = 3
return
}
if (resp.result === 'hello') {
if (this.upgrading) {
// We need to call upgrade since connection is upgraded
this.onUpgrade?.()
}
this.upgrading = false
if ((resp as HelloResponse).alreadyConnected === true) {
this.sessionId = generateId()
if (typeof sessionStorage !== 'undefined') {
sessionStorage.setItem('session.id.' + this.url, this.sessionId)
}
console.log('Connection: alreadyConnected, reconnect with new Id')
clearTimeout(dialTimeout)
this.scheduleOpen(true)
return
}
if ((resp as HelloResponse).binary) {
this.binaryMode = true
}
// Notify all waiting connection listeners
const handlers = this.onConnectHandlers.splice(0, this.onConnectHandlers.length)
for (const h of handlers) {
h()
}
for (const [, v] of this.requests.entries()) {
v.reconnect?.()
}
void this.onConnect?.(
(resp as HelloResponse).reconnect === true ? ClientConnectEvent.Reconnected : ClientConnectEvent.Connected
)
this.schedulePing(socketId)
return
} else {
Analytics.handleError(new Error(`unexpected response: ${JSON.stringify(resp)}`))
}
return
}
if (resp.result === 'ping') {
void this.sendRequest({ method: 'ping', params: [] })
return
}
if (resp.id !== undefined) {
const promise = this.requests.get(resp.id)
if (promise === undefined) {
console.error(
new Error(`unknown response id: ${resp.id as string} ${this.workspace} ${this.email}`),
JSON.stringify(this.requests)
)
return
}
if (resp.chunk !== undefined) {
promise.chunks = [
...(promise.chunks ?? []),
{
index: resp.chunk.index,
data: resp.result as FindResult<any>
}
]
// console.log(socketId, 'chunk', promise.method, promise.params, promise.chunks.length, (resp.result as []).length)
if (resp.chunk.final) {
promise.chunks.sort((a, b) => a.index - b.index)
let result: any[] = []
let total = -1
let lookupMap: Record<string, Doc> | undefined
for (const c of promise.chunks) {
if (c.data.total !== 0) {
total = c.data.total
}
if (c.data.lookupMap !== undefined) {
lookupMap = c.data.lookupMap
}
result = result.concat(c.data)
}
resp.result = toFindResult(result, total, lookupMap)
resp.chunk = undefined
} else {
// Not all chunks are available yet.
return
}
}
const request = this.requests.get(resp.id)
promise.handleTime?.(Date.now() - promise.startTime, resp.result, resp.time ?? 0, resp.queue ?? 0)
this.requests.delete(resp.id)
if (resp.error !== undefined) {
console.log(
'ERROR',
'request:',
request?.method,
'response-id:',
resp.id,
'error: ',
resp.error,
'result: ',
resp.result,
this.workspace,
this.email
)
promise.reject(new PlatformError(resp.error))
} else {
if (request?.handleResult !== undefined) {
void request.handleResult(resp.result).then(() => {
promise.resolve(resp.result)
})
} else {
promise.resolve(resp.result)
}
}
void broadcastEvent(client.event.NetworkRequests, this.requests.size)
if (event.data instanceof Blob) {
void event.data.arrayBuffer().then((data) => {
const resp = readResponse<any>(data, this.binaryMode)
this.handleMsg(socketId, resp)
})
} else {
const txArr = Array.isArray(resp.result) ? (resp.result as Tx[]) : [resp.result as Tx]
for (const tx of txArr) {
if (
(tx?._class === core.class.TxWorkspaceEvent && (tx as TxWorkspaceEvent).event === WorkspaceEvent.Upgrade) ||
tx?._class === core.class.TxModelUpgrade
) {
console.log('Processing upgrade', this.workspace, this.email)
this.onUpgrade?.()
return
}
}
this.handler(...txArr)
clearTimeout(this.incomingTimer)
void broadcastEvent(client.event.NetworkRequests, this.requests.size + 1)
this.incomingTimer = setTimeout(() => {
void broadcastEvent(client.event.NetworkRequests, this.requests.size)
}, 500)
const resp = readResponse<any>(event.data, this.binaryMode)
this.handleMsg(socketId, resp)
}
}
wsocket.onclose = (ev) => {
@ -454,7 +470,7 @@ class Connection implements ClientConnection {
retry?: () => Promise<boolean>
handleResult?: (result: any) => Promise<void>
once?: boolean // Require handleResult to retrieve result
measure?: (time: number, result: any, serverTime: number, queue: number) => void
measure?: (time: number, result: any, serverTime: number, queue: number, toRecieve: number) => void
allowReconnect?: boolean
}): Promise<any> {
if (this.closed) {
@ -547,12 +563,13 @@ class Connection implements ClientConnection {
const result = await this.sendRequest({
method: 'findAll',
params: [_class, query, options],
measure: (time, result, serverTime, queue) => {
measure: (time, result, serverTime, queue, toReceive) => {
if (typeof window !== 'undefined' && (time > 1000 || serverTime > 500)) {
console.error(
'measure slow findAll',
time,
serverTime,
toReceive,
queue,
_class,
query,

View File

@ -4,8 +4,12 @@ ENV NODE_ENV production
WORKDIR /app
RUN npm install --ignore-scripts=false --verbose bufferutil utf-8-validate @mongodb-js/zstd --unsafe-perm
RUN npm install --ignore-scripts=false --verbose uNetworking/uWebSockets.js#v20.43.0
COPY bundle/bundle.js ./
RUN mv node_modules/uWebSockets.js/*.node .
EXPOSE 8080
ENV UWS_HTTP_MAX_HEADERS_SIZE 32768
CMD [ "node", "./bundle.js" ]

View File

@ -13,8 +13,7 @@
"_phase:bundle": "rushx bundle",
"_phase:docker-build": "rushx docker:build",
"_phase:docker-staging": "rushx docker:staging",
"bundle": "mkdir -p bundle && esbuild src/__start.ts --bundle --sourcemap=inline --minify --platform=node --external:bufferutil --external:utf-8-validate --define:process.env.MODEL_VERSION=$(node ../../common/scripts/show_version.js) --define:process.env.GIT_REVISION=$(../../common/scripts/git_version.sh) > bundle/bundle.js",
"bundle:u": "mkdir -p bundle && esbuild src/__start.ts --bundle --sourcemap=inline --minify --platform=node > bundle/bundle.js && mkdir -p ./dist && cp -r ./node_modules/uWebSockets.js/*.node ./dist",
"bundle": "mkdir -p bundle && esbuild src/__start.ts --bundle --external:*.node --sourcemap=inline --minify --platform=node --external:bufferutil --external:utf-8-validate --define:process.env.MODEL_VERSION=$(node ../../common/scripts/show_version.js) --define:process.env.GIT_REVISION=$(../../common/scripts/git_version.sh) > bundle/bundle.js",
"docker:build": "../../common/scripts/docker_build.sh hardcoreeng/transactor",
"docker:staging": "../../common/scripts/docker_tag.sh hardcoreeng/transactor staging",
"docker:push": "../../common/scripts/docker_tag.sh hardcoreeng/transactor",

View File

@ -16,15 +16,18 @@
// Add this to the VERY top of the first file loaded in your app
import contactPlugin from '@hcengineering/contact'
import notification from '@hcengineering/notification'
import { setMetadata } from '@hcengineering/platform'
import { serverConfigFromEnv, storageConfigFromEnv } from '@hcengineering/server'
import serverCore, { type StorageConfiguration } from '@hcengineering/server-core'
import serverNotification from '@hcengineering/server-notification'
import serverToken from '@hcengineering/server-token'
import { start } from '.'
import notification from '@hcengineering/notification'
import { serverFactories } from '@hcengineering/server-ws/src/factories'
const serverFactory = serverFactories[(process.env.SERVER_PROVIDER as string) ?? 'ws'] ?? serverFactories.ws
const config = serverConfigFromEnv()
const storageConfig: StorageConfiguration = storageConfigFromEnv()
const cursorMaxTime = process.env.SERVER_CURSOR_MAXTIMEMS
@ -53,7 +56,7 @@ const shutdown = start(config.url, {
storageConfig,
rekoniUrl: config.rekoniUrl,
port: config.serverPort,
serverFactory: config.serverFactory,
serverFactory,
indexParallel: 2,
indexProcessing: 50,
productId: '',

View File

@ -66,6 +66,7 @@ export interface Response<R> {
final: boolean
}
time?: number // Server time to perform operation
bfst?: number // Server time to perform operation
queue?: number
}
@ -95,7 +96,7 @@ export function protoDeserialize (data: any, binary: boolean): any {
}
return JSON.parse(_data.toString(), receiver)
}
return packr.unpack(new Uint8Array(replacer('', data)))
return packr.unpack(new Uint8Array(data))
}
/**

View File

@ -1,7 +1,6 @@
import { MinioConfig, MinioService } from '@hcengineering/minio'
import { createRawMongoDBAdapter } from '@hcengineering/mongo'
import { buildStorage, StorageAdapter, StorageConfiguration } from '@hcengineering/server-core'
import { serverFactories, ServerFactory } from '@hcengineering/server-ws'
import { StorageAdapter, StorageConfiguration, buildStorage } from '@hcengineering/server-core'
export function storageConfigFromEnv (): StorageConfiguration {
const storageConfig: StorageConfiguration = JSON.parse(
@ -60,7 +59,6 @@ export interface ServerEnv {
sesUrl: string | undefined
accountsUrl: string
serverPort: number
serverFactory: ServerFactory
enableCompression: boolean
elasticIndexName: string
pushPublicKey: string | undefined
@ -70,7 +68,6 @@ export interface ServerEnv {
export function serverConfigFromEnv (): ServerEnv {
const serverPort = parseInt(process.env.SERVER_PORT ?? '3333')
const serverFactory = serverFactories[(process.env.SERVER_PROVIDER as string) ?? 'ws'] ?? serverFactories.ws
const enableCompression = (process.env.ENABLE_COMPRESSION ?? 'true') === 'true'
const url = process.env.MONGO_URL
@ -136,7 +133,6 @@ export function serverConfigFromEnv (): ServerEnv {
sesUrl,
accountsUrl,
serverPort,
serverFactory,
enableCompression,
pushPublicKey,
pushPrivateKey,

2
server/ws/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
v20.43.0.zip
src/uws

View File

@ -5,17 +5,17 @@
"svelte": "src/index.ts",
"types": "types/index.d.ts",
"author": "Anticrm Platform Contributors",
"template": "@hcengineering/node-package",
"template": "@hcengineering/node-package-ws",
"license": "EPL-2.0",
"scripts": {
"build": "compile",
"build:watch": "compile",
"build": "./uws.sh && compile && cp ./src/uws/*.node ./lib/uws",
"build:watch": "./uws.sh && compile && cp ./src/uws/*.node ./lib/uws",
"test": "jest --passWithNoTests --silent --forceExit",
"format": "format src",
"_phase:build": "compile transpile src",
"_phase:build": "./uws.sh && compile transpile src && cp ./src/uws/*.node ./lib/uws",
"_phase:test": "jest --passWithNoTests --silent --forceExit",
"_phase:format": "format src",
"_phase:validate": "compile validate"
"_phase:validate": "./uws.sh && compile validate"
},
"devDependencies": {
"@hcengineering/platform-rig": "^0.6.0",

View File

@ -0,0 +1,36 @@
import { startHttpServer } from './server_http'
import { type ServerFactory } from './types'
/**
* @public
*/
export const serverFactories: Record<string, ServerFactory> = {
ws: startHttpServer,
uweb: (sessions, handleRequest, ctx, pipelineFactory, port, productId, enableCompression, accountsUrl) => {
try {
// eslint-disable-next-line @typescript-eslint/no-var-requires
const serverHttp = require('./server_u')
return serverHttp.startUWebsocketServer(
sessions,
handleRequest,
ctx,
pipelineFactory,
port,
productId,
enableCompression,
accountsUrl
)
} catch (err: any) {
console.error('uwebsocket.js is not supported, switcg back to nodejs ws')
return startHttpServer(
sessions,
handleRequest,
ctx,
pipelineFactory,
port,
productId,
enableCompression,
accountsUrl
)
}
}
}

View File

@ -14,19 +14,7 @@
// limitations under the License.
//
import { startHttpServer } from './server_http'
// import { startUWebsocketServer } from './server_u'
import { type ServerFactory } from './types'
export { start } from './server'
export * from './types'
export * from './client'
export { start } from './server'
export * from './server_http'
/**
* @public
*/
export const serverFactories: Record<string, ServerFactory> = {
ws: startHttpServer
// uweb: startUWebsocketServer
}
export * from './types'

View File

@ -19,19 +19,17 @@ import core, {
WorkspaceEvent,
generateId,
systemAccountEmail,
toFindResult,
toWorkspaceString,
versionToString,
withContext,
type BaseWorkspaceInfo,
type FindResult,
type MeasureContext,
type Tx,
type TxWorkspaceEvent,
type WorkspaceId
} from '@hcengineering/core'
import { unknownError } from '@hcengineering/platform'
import { readRequest, type HelloRequest, type HelloResponse, type Request, type Response } from '@hcengineering/rpc'
import { type HelloRequest, type HelloResponse, type Request, type Response } from '@hcengineering/rpc'
import type { Pipeline, SessionContext } from '@hcengineering/server-core'
import { type Token } from '@hcengineering/server-token'
@ -45,6 +43,7 @@ import {
type SessionManager,
type Workspace
} from './types'
import { sendResponse } from './utils'
interface WorkspaceLoginInfo extends Omit<BaseWorkspaceInfo, 'workspace'> {
upgrade?: {
@ -72,12 +71,6 @@ function onNextTick (op: () => void): void {
setImmediate(op)
}
function waitNextTick (): Promise<void> {
return new Promise<void>((resolve) => {
setImmediate(resolve)
})
}
/**
* @public
*/
@ -450,24 +443,19 @@ class TSessionManager implements SessionManager {
return
}
const ctx = this.ctx.newChild('📬 broadcast-all', {})
const sessions = [...workspace.sessions.values()]
const sessions = [...workspace.sessions.values()].filter((it) => {
return it !== undefined && (targets === undefined || targets.includes(it.session.getUser()))
})
function send (): void {
for (const session of sessions.splice(0, 1)) {
if (targets !== undefined && !targets.includes(session.session.getUser())) continue
for (const _tx of tx) {
try {
void session.socket.send(ctx, { result: _tx }, session.session.binaryMode, session.session.useCompression)
} catch (err: any) {
Analytics.handleError(err)
ctx.error('error during send', { error: err })
}
for (const session of sessions) {
try {
sendResponse(ctx, session.session, session.socket, { result: tx })
} catch (err: any) {
Analytics.handleError(err)
ctx.error('error during send', { error: err })
}
}
if (sessions.length > 0) {
onNextTick(send)
} else {
ctx.end()
}
ctx.end()
}
if (sessions.length > 0) {
// We need to send broadcast after our client response so put it after all IO
@ -740,18 +728,14 @@ class TSessionManager implements SessionManager {
const sessions = [...workspace.sessions.values()]
const ctx = this.ctx.newChild('📭 broadcast', {})
function send (): void {
for (const sessionRef of sessions.splice(0, 1)) {
if (sessionRef.session.sessionId !== from?.sessionId) {
for (const sessionRef of sessions) {
if (sessionRef !== undefined && sessionRef.session.sessionId !== from?.sessionId) {
if (target === undefined || target.includes(sessionRef.session.getUser())) {
void sessionRef.socket.send(ctx, resp, sessionRef.session.binaryMode, sessionRef.session.useCompression)
sendResponse(ctx, sessionRef.session, sessionRef.socket, resp)
}
}
}
if (sessions.length > 0) {
onNextTick(send)
} else {
ctx.end()
}
ctx.end()
}
if (sessions.length > 0) {
// We need to send broadcast after our client response so put it after all IO
@ -765,9 +749,9 @@ class TSessionManager implements SessionManager {
requestCtx: MeasureContext,
service: S,
ws: ConnectionSocket,
msg: any,
request: Request<any>,
workspace: string
): Promise<void> {
): Promise<Response<any> | undefined> {
const userCtx = requestCtx.newChild('📞 client', {
workspace: '🧲 ' + workspace
}) as SessionContext
@ -779,9 +763,7 @@ class TSessionManager implements SessionManager {
const st = Date.now()
try {
const backupMode = 'loadChunk' in service
await userCtx.with(`🧭 ${backupMode ? 'handleBackup' : 'handleRequest'}`, {}, async (ctx) => {
const request = readRequest(msg, service.binaryMode)
return await userCtx.with(`🧭 ${backupMode ? 'handleBackup' : 'handleRequest'}`, {}, async (ctx) => {
if (request.time != null) {
const delta = Date.now() - request.time
userCtx.measure('receive msg', delta)
@ -837,8 +819,7 @@ class TSessionManager implements SessionManager {
return
}
if (request.method === 'measure' || request.method === 'measure-done') {
await this.handleMeasure<S>(service, request, ctx, ws)
return
return await this.handleMeasure<S>(service, request, ctx, ws)
}
service.requests.set(reqId, {
id: reqId,
@ -859,25 +840,23 @@ class TSessionManager implements SessionManager {
? await f.apply(service, [service.measureCtx?.ctx, ...params])
: await ctx.with('🧨 process', {}, async (callTx) => f.apply(service, [callTx, ...params]))
const resp: Response<any> = {
return {
id: request.id,
result,
time: Date.now() - st,
bfst: Date.now(),
queue: service.requests.size
}
await handleSend(ctx, ws, resp, 32 * 1024, service.binaryMode, service.useCompression)
} catch (err: any) {
Analytics.handleError(err)
if (LOGGING_ENABLED) {
this.ctx.error('error handle request', { error: err, request })
}
const resp: Response<any> = {
return {
id: request.id,
error: unknownError(err),
result: JSON.parse(JSON.stringify(err?.stack))
}
await ws.send(ctx, resp, service.binaryMode, service.useCompression)
}
})
} finally {
@ -891,7 +870,7 @@ class TSessionManager implements SessionManager {
request: Request<any[]>,
ctx: MeasureContext,
ws: ConnectionSocket
): Promise<void> {
): Promise<Response<any> | undefined> {
let serverTime = 0
if (request.method === 'measure') {
service.measureCtx = { ctx: ctx.newChild('📶 ' + request.params[0], {}), time: Date.now() }
@ -902,72 +881,21 @@ class TSessionManager implements SessionManager {
}
}
try {
const resp: Response<any> = { id: request.id, result: request.method === 'measure' ? 'started' : serverTime }
await handleSend(ctx, ws, resp, 32 * 1024, service.binaryMode, service.useCompression)
return { id: request.id, result: request.method === 'measure' ? 'started' : serverTime }
} catch (err: any) {
Analytics.handleError(err)
if (LOGGING_ENABLED) {
ctx.error('error handle measure', { error: err, request })
}
const resp: Response<any> = {
return {
id: request.id,
error: unknownError(err),
result: JSON.parse(JSON.stringify(err?.stack))
}
await ws.send(ctx, resp, service.binaryMode, service.useCompression)
}
}
}
async function handleSend (
ctx: MeasureContext,
ws: ConnectionSocket,
msg: Response<any>,
chunkLimit: number,
useBinary: boolean,
useCompression: boolean
): Promise<void> {
// ws.send(msg)
if (Array.isArray(msg.result) && msg.result.length > 1 && chunkLimit > 0) {
// Split and send by chunks
const data = [...msg.result]
let cid = 1
const dataSize = JSON.stringify(data).length
const avg = Math.round(dataSize / data.length)
const itemChunk = Math.round(chunkLimit / avg) + 1
while (data.length > 0 && !ws.isClosed) {
let itemChunkCurrent = itemChunk
if (data.length - itemChunk < itemChunk / 2) {
itemChunkCurrent = data.length
}
const chunk: FindResult<any> = toFindResult(data.splice(0, itemChunkCurrent))
if (data.length === 0) {
const orig = msg.result as FindResult<any>
chunk.total = orig.total ?? 0
chunk.lookupMap = orig.lookupMap
}
if (chunk !== undefined) {
await ws.send(
ctx,
{ ...msg, result: chunk, chunk: { index: cid, final: data.length === 0 } },
useBinary,
useCompression
)
}
cid++
if (data.length > 0 && !ws.isClosed) {
await waitNextTick()
}
}
} else {
await ws.send(ctx, msg, useBinary, useCompression)
}
}
/**
* @public
*/

View File

@ -27,7 +27,6 @@ import { WebSocketServer, type RawData, type WebSocket } from 'ws'
import { getStatistics, wipeStatistics } from './stats'
import {
LOGGING_ENABLED,
type AddSessionResponse,
type ConnectionSocket,
type HandleRequestFunction,
type PipelineFactory,
@ -36,6 +35,7 @@ import {
import 'bufferutil'
import 'utf-8-validate'
import { doSessionOp, processRequest, type WebsocketData } from './utils'
/**
* @public
@ -204,62 +204,31 @@ export function startHttpServer (
mode: token.extra?.mode,
model: token.extra?.model
}
const cs: ConnectionSocket = {
id: generateId(),
isClosed: false,
close: () => {
cs.isClosed = true
ws.close()
},
data: () => data,
send: async (ctx: MeasureContext, msg, binary, compression) => {
if (ws.readyState !== ws.OPEN && !cs.isClosed) {
return 0
}
const smsg = serialize(msg, binary)
const cs: ConnectionSocket = createWebsocketClientSocket(ws, data)
ctx.measure('send-data', smsg.length)
while (ws.bufferedAmount > 128 && ws.readyState === ws.OPEN) {
await new Promise<void>((resolve) => {
setImmediate(resolve)
})
}
await new Promise<void>((resolve, reject) => {
ws.send(smsg, { binary: true, compress: compression }, (err) => {
if (err != null) {
reject(err)
}
resolve()
})
})
return smsg.length
}
const webSocketData: WebsocketData = {
connectionSocket: cs,
payload: token,
token: rawToken,
session: sessions.addSession(ctx, cs, token, rawToken, pipelineFactory, productId, sessionId, accountsUrl),
url: ''
}
let session: AddSessionResponse | Promise<AddSessionResponse> = sessions.addSession(
ctx,
cs,
token,
rawToken,
pipelineFactory,
productId,
sessionId,
accountsUrl
)
void session.then((s) => {
if ('upgrade' in s || 'error' in s) {
if ('error' in s) {
ctx.error('error', { error: s.error?.message, stack: s.error?.stack })
if (webSocketData.session instanceof Promise) {
void webSocketData.session.then((s) => {
if ('upgrade' in s || 'error' in s) {
if ('error' in s) {
ctx.error('error', { error: s.error?.message, stack: s.error?.stack })
}
void cs
.send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false)
.then(() => {
cs.close()
})
}
void cs
.send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false)
.then(() => {
cs.close()
})
}
})
})
}
// eslint-disable-next-line @typescript-eslint/no-misused-promises
ws.on('message', (msg: RawData) => {
try {
@ -270,18 +239,9 @@ export function startHttpServer (
buff = Buffer.concat(msg)
}
if (buff !== undefined) {
if (session instanceof Promise) {
void session.then((_session) => {
session = _session
if ('session' in _session) {
void handleRequest(_session.context, _session.session, cs, buff, _session.workspaceId)
}
})
} else {
if ('session' in session) {
void handleRequest(session.context, session.session, cs, buff, session.workspaceId)
}
}
doSessionOp(webSocketData, (s) => {
processRequest(s.session, cs, s.context, s.workspaceId, buff, handleRequest)
})
}
} catch (err: any) {
Analytics.handleError(err)
@ -292,28 +252,18 @@ export function startHttpServer (
})
// eslint-disable-next-line @typescript-eslint/no-misused-promises
ws.on('close', async (code: number, reason: Buffer) => {
if (session instanceof Promise) {
session = await session
}
if ('session' in session) {
if (!(session.session.workspaceClosed ?? false)) {
doSessionOp(webSocketData, (s) => {
if (!(s.session.workspaceClosed ?? false)) {
// remove session after 1seconds, give a time to reconnect.
void sessions.close(cs, toWorkspaceString(token.workspace))
}
}
})
})
ws.on('error', (err) => {
if (session instanceof Promise) {
void session.then((s) => {
if ('session' in session) {
console.error(session.session.getUser(), 'error', err)
}
})
}
if ('session' in session) {
console.error(session.session.getUser(), 'error', err)
}
doSessionOp(webSocketData, (s) => {
console.error(s.session.getUser(), 'error', err)
})
})
}
wss.on('connection', handleConnection as any)
@ -335,7 +285,6 @@ export function startHttpServer (
wss.handleUpgrade(request, socket, head, (ws) => {
void handleConnection(ws, request, payload, token, sessionId ?? undefined)
// wss.emit('connection', ws, request, payload, token, sessionId)
})
} catch (err: any) {
Analytics.handleError(err)
@ -385,3 +334,40 @@ export function startHttpServer (
)
}
}
function createWebsocketClientSocket (
ws: WebSocket,
data: { remoteAddress: string, userAgent: string, language: string, email: string, mode: any, model: any }
): ConnectionSocket {
const cs: ConnectionSocket = {
id: generateId(),
isClosed: false,
close: () => {
cs.isClosed = true
ws.close()
},
data: () => data,
send: async (ctx: MeasureContext, msg, binary, compression) => {
if (ws.readyState !== ws.OPEN && !cs.isClosed) {
return 0
}
const smsg = serialize(msg, binary)
while (ws.bufferedAmount > 128 && ws.readyState === ws.OPEN) {
await new Promise<void>((resolve) => {
setImmediate(resolve)
})
}
ctx.measure('send-data', smsg.length)
await new Promise<void>((resolve, reject) => {
ws.send(smsg, { binary: true, compress: compression }, (err) => {
if (err != null) {
reject(err)
}
resolve()
})
})
return smsg.length
}
}
return cs
}

312
server/ws/src/server_u.ts Normal file
View File

@ -0,0 +1,312 @@
//
// Copyright © 2023 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
// uWebSockets.js
// Import should be added: "uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.23.0"
import { generateId, toWorkspaceString, type MeasureContext } from '@hcengineering/core'
import { decodeToken } from '@hcengineering/server-token'
import { Analytics } from '@hcengineering/analytics'
import { serialize } from '@hcengineering/rpc'
import { getStatistics, wipeStatistics } from './stats'
import {
LOGGING_ENABLED,
type ConnectionSocket,
type HandleRequestFunction,
type PipelineFactory,
type SessionManager
} from './types'
import { doSessionOp, processRequest, type WebsocketData } from './utils'
import uWebSockets, { DISABLED, SHARED_COMPRESSOR, type HttpResponse, type WebSocket } from './uws'
interface WebsocketUserData extends WebsocketData {
backPressure?: Promise<void>
backPressureResolve?: () => void
unsendMsg: { data: any, binary: boolean, compression: boolean }[]
}
/**
* @public
* @param port -
* @param host -
*/
export function startUWebsocketServer (
sessions: SessionManager,
handleRequest: HandleRequestFunction,
ctx: MeasureContext,
pipelineFactory: PipelineFactory,
port: number,
productId: string,
enableCompression: boolean,
accountsUrl: string
): () => Promise<void> {
if (LOGGING_ENABLED) console.log(`starting U server on port ${port} ...`)
const uAPP = uWebSockets.App()
const writeStatus = (response: HttpResponse, status: string): HttpResponse => {
return response
.writeStatus(status)
.writeHeader('Access-Control-Allow-Origin', '*')
.writeHeader('Access-Control-Allow-Methods', 'GET, OPTIONS, PUT')
.writeHeader('Access-Control-Allow-Headers', 'Content-Type')
}
uAPP
.trace('/*', (res, req) => {
console.log(req.getUrl(), req.getMethod())
})
.ws<WebsocketUserData>('/*', {
/* There are many common helper features */
// idleTimeout: 32,
maxBackpressure: 256 * 1024,
maxPayloadLength: 50 * 1024 * 1024,
compression: enableCompression ? SHARED_COMPRESSOR : DISABLED,
idleTimeout: 0,
maxLifetime: 0,
sendPingsAutomatically: true,
upgrade (res, req, context) {
const url = new URL('http://localhost' + (req.getUrl() ?? ''))
const token = url.pathname.substring(1)
try {
const payload = decodeToken(token ?? '')
if (payload.workspace.productId !== productId) {
throw new Error('Invalid workspace product')
}
/* You MUST copy data out of req here, as req is only valid within this immediate callback */
const url = req.getUrl()
const secWebSocketKey = req.getHeader('sec-websocket-key')
const secWebSocketProtocol = req.getHeader('sec-websocket-protocol')
const secWebSocketExtensions = req.getHeader('sec-websocket-extensions')
/* This immediately calls open handler, you must not use res after this call */
res.upgrade<WebsocketUserData>(
{
payload,
token,
session: undefined,
unsendMsg: [],
url
},
/* Spell these correctly */
secWebSocketKey,
secWebSocketProtocol,
secWebSocketExtensions,
context
)
} catch (err) {
if (LOGGING_ENABLED) console.error('invalid token', err)
writeStatus(res, '401 Unauthorized').end()
}
},
open: (ws: WebSocket<WebsocketUserData>) => {
const data = ws.getUserData()
const wrData = {
remoteAddress: ws.getRemoteAddressAsText() ?? '',
userAgent: '',
language: '',
email: data.payload.email,
mode: data.payload.extra?.mode,
model: data.payload.extra?.model
}
data.connectionSocket = createWebSocketClientSocket(wrData, ws, data)
data.session = sessions.addSession(
ctx,
data.connectionSocket,
ws.getUserData().payload,
ws.getUserData().token,
pipelineFactory,
productId,
undefined,
accountsUrl
)
},
message: (ws, message, isBinary) => {
const data = ws.getUserData()
const msg = Buffer.copyBytesFrom(Buffer.from(message))
doSessionOp(data, (s) => {
processRequest(
s.session,
data.connectionSocket as ConnectionSocket,
s.context,
s.workspaceId,
msg,
handleRequest
)
})
},
drain: (ws) => {
console.log(`WebSocket backpressure: ${ws.getBufferedAmount()}`)
const data = ws.getUserData()
while (data.unsendMsg.length > 0) {
if (ws.send(data.unsendMsg[0].data, data.unsendMsg[0].binary, data.unsendMsg[0].compression) !== 1) {
ctx.measure('send-data', data.unsendMsg[0].data.length)
data.unsendMsg.shift()
// Ok we drained one item, let's unhold send
data.backPressureResolve?.()
data.backPressure = undefined
} else {
// Wait for next drain.
return
}
}
},
close: (ws, code, message) => {
const data = ws.getUserData()
doSessionOp(data, (s) => {
if (!(s.session.workspaceClosed ?? false)) {
// remove session after 1seconds, give a time to reconnect.
void sessions.close(data.connectionSocket as ConnectionSocket, toWorkspaceString(data.payload.workspace))
}
})
}
})
.any('/api/v1/statistics', (response, request) => {
const getUsers = (): any => Array.from(sessions.sessions.entries()).map(([k, v]) => v.session.getUser())
const token = request.getQuery('token') ?? ''
try {
const payload = decodeToken(token ?? '')
const admin = payload.extra?.admin === 'true'
const json = JSON.stringify({
...getStatistics(ctx, sessions, admin),
users: getUsers,
admin
})
writeStatus(response, '200 OK').writeHeader('Content-Type', 'application/json').end(json)
} catch (err: any) {
Analytics.handleError(err)
writeStatus(response, '404 ERROR').end()
}
})
.any('/api/v1/version', (response, request) => {
try {
writeStatus(response, '200 OK')
.writeHeader('Content-Type', 'application/json')
.end(
JSON.stringify({
version: process.env.MODEL_VERSION
})
)
} catch (err: any) {
Analytics.handleError(err)
writeStatus(response, '404 ERROR').writeHeader('Content-Type', 'application/json').end()
}
})
.any('/api/v1/manage', (res, req) => {
try {
const token = req.getQuery('token') as string
const payload = decodeToken(token)
if (payload.extra?.admin !== 'true') {
writeStatus(res, '404 ERROR').writeHeader('Content-Type', 'application/json').end()
return
}
const operation = req.getQuery('operation')
switch (operation) {
case 'maintenance': {
const timeMinutes = parseInt(req.getQuery('timeout' as string) ?? '5')
sessions.scheduleMaintenance(timeMinutes)
writeStatus(res, '200 OK').end()
return
}
case 'wipe-statistics': {
wipeStatistics(ctx)
writeStatus(res, '200 OK').end()
return
}
case 'force-close': {
const wsId = req.getQuery('wsId') as string
void sessions.forceClose(wsId)
writeStatus(res, '200 OK').end()
return
}
case 'reboot': {
process.exit(0)
}
}
writeStatus(res, '404 ERROR').end()
} catch (err: any) {
Analytics.handleError(err)
console.error(err)
writeStatus(res, '404 ERROR').end()
}
})
.any('/*', (res, req) => {
res.end('')
})
.listen(port, (s) => {})
return async () => {
await sessions.closeWorkspaces(ctx)
}
}
function createWebSocketClientSocket (
wrData: { remoteAddress: ArrayBuffer, userAgent: string, language: string, email: string, mode: any, model: any },
ws: uWebSockets.WebSocket<WebsocketUserData>,
data: WebsocketUserData
): ConnectionSocket {
const cs: ConnectionSocket = {
id: generateId(),
isClosed: false,
data: () => wrData,
close: () => {
cs.isClosed = true
try {
ws.close()
} catch (err) {
// Ignore closed
}
},
send: async (ctx, msg, binary, compression): Promise<number> => {
await data.backPressure
const serialized = serialize(msg, binary)
try {
const sendR = ws.send(serialized, binary, compression)
if (sendR === 2) {
data.backPressure = new Promise((resolve) => {
data.backPressureResolve = resolve
})
data.unsendMsg.push({ data: serialized, binary, compression })
} else {
ctx.measure('send-data', serialized.length)
}
} catch (err: any) {
if (!((err.message ?? '') as string).includes('Invalid access of closed')) {
console.error(err)
}
// Ignore socket is closed
}
return serialized.length
}
}
return cs
}

View File

@ -1,208 +0,0 @@
//
// Copyright © 2023 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
// uWebSockets.js
// Import should be added: "uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.23.0"
import { MeasureContext, generateId } from '@hcengineering/core'
import { Token, decodeToken } from '@hcengineering/server-token'
import { serialize } from '@hcengineering/rpc'
import uWebSockets, { SHARED_COMPRESSOR, WebSocket } from 'uWebSockets.js'
import { getStatistics } from './stats'
import {
ConnectionSocket,
HandleRequestFunction,
LOGGING_ENABLED,
PipelineFactory,
Session,
SessionManager
} from './types'
interface WebsocketUserData {
wrapper?: ConnectionSocket
payload: Token
session?: Promise<Session>
backPressure?: Promise<void>
backPressureResolve?: () => void
unsendMsg: any[]
}
/**
* @public
* @param port -
* @param host -
*/
export function startUWebsocketServer (
sessions: SessionManager,
handleRequest: HandleRequestFunction,
ctx: MeasureContext,
pipelineFactory: PipelineFactory,
port: number,
productId: string
): () => Promise<void> {
if (LOGGING_ENABLED) console.log(`starting U server on port ${port} ...`)
const uAPP = uWebSockets.App()
uAPP
.ws<WebsocketUserData>('/*', {
/* There are many common helper features */
// idleTimeout: 32,
// maxBackpressure: 1024,
maxPayloadLength: 50 * 1024 * 1024,
compression: SHARED_COMPRESSOR,
maxLifetime: 0,
sendPingsAutomatically: true,
upgrade (res, req, context) {
const url = new URL('http://localhost' + (req.getUrl() ?? ''))
const token = url.pathname.substring(1)
try {
const payload = decodeToken(token ?? '')
if (payload.workspace.productId !== productId) {
throw new Error('Invalid workspace product')
}
/* This immediately calls open handler, you must not use res after this call */
res.upgrade<WebsocketUserData>(
{
payload,
session: undefined,
unsendMsg: []
},
/* Spell these correctly */
req.getHeader('sec-websocket-key'),
req.getHeader('sec-websocket-protocol'),
req.getHeader('sec-websocket-extensions'),
context
)
} catch (err) {
if (LOGGING_ENABLED) console.error('invalid token', err)
res.writeStatus('401 Unauthorized').end()
}
},
open: (ws: WebSocket<WebsocketUserData>) => {
const data = ws.getUserData()
data.wrapper = {
id: generateId(),
close: () => {
try {
ws.close()
} catch (err) {
// Ignore closed
}
},
send: async (ctx, msg): Promise<void> => {
await ctx.with('backpressure', {}, async () => await data.backPressure)
const serialized = await ctx.with('serialize', {}, async () => serialize(msg))
ctx.measure('send-data', serialized.length)
try {
const sendR = await ctx.with('backpressure', {}, async () =>
ws.send(serialized, false, Array.isArray(msg.result))
)
if (sendR === 2) {
data.backPressure = new Promise((resolve) => {
data.backPressureResolve = resolve
})
data.unsendMsg.push(msg)
}
} catch (err: any) {
if (!((err.message ?? '') as string).includes('Invalid access of closed')) {
console.error(err)
}
// Ignore socket is closed
}
}
}
data.session = sessions.addSession(
ctx,
data.wrapper,
ws.getUserData().payload,
pipelineFactory,
productId,
undefined
)
},
message: (ws, message, isBinary) => {
const data = ws.getUserData()
const enc = new TextDecoder('utf-8')
const tmsg = enc.decode(message)
void data.session?.then((s) => {
void handleRequest(ctx, s, data.wrapper as ConnectionSocket, tmsg, data.payload.workspace.name)
})
},
drain: (ws) => {
console.log(`WebSocket backpressure: ${ws.getBufferedAmount()}`)
const data = ws.getUserData()
while (data.unsendMsg.length > 0) {
if (ws.send(data.unsendMsg[0]) !== 1) {
data.unsendMsg.shift()
} else {
// Wait for next drain.
return
}
}
data.backPressureResolve?.()
data.backPressure = undefined
},
close: (ws, code, message) => {
const enc = new TextDecoder('utf-8')
const data = ws.getUserData()
try {
const tmsg = enc.decode(message)
if (tmsg !== undefined && tmsg !== '') {
console.error(tmsg)
}
} catch (err) {
console.error(err)
}
void data.session?.then((s) => {
void sessions.close(ctx, data.wrapper as ConnectionSocket, data.payload.workspace, code, '')
})
}
})
.any('/*', (response, request) => {
const url = new URL('http://localhost' + (request.getUrl() ?? ''))
const token = url.pathname.substring(1)
try {
const payload = decodeToken(token ?? '')
console.log(payload.workspace, 'statistics request')
const json = JSON.stringify(getStatistics(ctx, sessions))
response
.writeStatus('200 OK')
.writeHeader('Content-Type', 'application/json')
.writeHeader('Access-Control-Allow-Origin', '*')
.writeHeader('Access-Control-Allow-Methods', 'GET, OPTIONS')
.writeHeader('Access-Control-Allow-Headers', 'Content-Type')
.end(json)
} catch (err) {
response.writeHead(404, {})
response.end()
}
})
.listen(port, (s) => {})
return async () => {
await sessions.closeWorkspaces(ctx)
}
}

View File

@ -11,7 +11,7 @@ import {
type WorkspaceId,
type WorkspaceIdWithUrl
} from '@hcengineering/core'
import { type Response } from '@hcengineering/rpc'
import { type Request, type Response } from '@hcengineering/rpc'
import { type BroadcastFunc, type Pipeline } from '@hcengineering/server-core'
import { type Token } from '@hcengineering/server-token'
@ -132,10 +132,12 @@ export interface Workspace {
workspaceName: string
}
export type AddSessionResponse =
| { session: Session, context: MeasureContext, workspaceId: string }
| { upgrade: true }
| { error: any }
export interface AddSessionActive {
session: Session
context: MeasureContext
workspaceId: string
}
export type AddSessionResponse = AddSessionActive | { upgrade: true } | { error: any }
/**
* @public
@ -185,9 +187,9 @@ export type HandleRequestFunction = <S extends Session>(
rctx: MeasureContext,
service: S,
ws: ConnectionSocket,
msg: Buffer,
msg: Request<any>,
workspaceId: string
) => Promise<void>
) => Promise<Response<any> | undefined>
/**
* @public

105
server/ws/src/utils.ts Normal file
View File

@ -0,0 +1,105 @@
import { toFindResult, type FindResult, type MeasureContext } from '@hcengineering/core'
import { readRequest, type Response } from '@hcengineering/rpc'
import type { Token } from '@hcengineering/server-token'
import type { AddSessionActive, AddSessionResponse, ConnectionSocket, HandleRequestFunction, Session } from './types'
export interface WebsocketData {
connectionSocket?: ConnectionSocket
payload: Token
token: string
session: Promise<AddSessionResponse> | AddSessionResponse | undefined
url: string
}
export function doSessionOp (data: WebsocketData, op: (session: AddSessionActive) => void): void {
if (data.session instanceof Promise) {
void data.session.then((_session) => {
data.session = _session
if ('session' in _session) {
op(_session)
}
})
} else {
if (data.session !== undefined && 'session' in data.session) {
op(data.session)
}
}
}
export function processRequest (
session: Session,
cs: ConnectionSocket,
context: MeasureContext,
workspaceId: string,
buff: any,
handleRequest: HandleRequestFunction
): void {
const request = readRequest(buff, session.binaryMode)
void handleRequest(context, session, cs, request, workspaceId).then((resp) => {
if (resp !== undefined) {
void handleSend(context, cs, resp, 32 * 1024, session.binaryMode, session.useCompression)
}
})
}
export function sendResponse (
ctx: MeasureContext,
session: Session,
socket: ConnectionSocket,
resp: Response<any>
): void {
void handleSend(ctx, socket, resp, 32 * 1024, session.binaryMode, session.useCompression)
}
function waitNextTick (): Promise<void> | undefined {
return new Promise<void>((resolve) => {
setImmediate(resolve)
})
}
export async function handleSend (
ctx: MeasureContext,
ws: ConnectionSocket,
msg: Response<any>,
chunkLimit: number,
useBinary: boolean,
useCompression: boolean
): Promise<void> {
// ws.send(msg)
if (Array.isArray(msg.result) && msg.result.length > 1 && chunkLimit > 0) {
// Split and send by chunks
const data = [...msg.result]
let cid = 1
const dataSize = JSON.stringify(data).length
const avg = Math.round(dataSize / data.length)
const itemChunk = Math.round(chunkLimit / avg) + 1
while (data.length > 0 && !ws.isClosed) {
let itemChunkCurrent = itemChunk
if (data.length - itemChunk < itemChunk / 2) {
itemChunkCurrent = data.length
}
const chunk: FindResult<any> = toFindResult(data.splice(0, itemChunkCurrent))
if (data.length === 0) {
const orig = msg.result as FindResult<any>
chunk.total = orig.total ?? 0
chunk.lookupMap = orig.lookupMap
}
if (chunk !== undefined) {
await ws.send(
ctx,
{ ...msg, result: chunk, chunk: { index: cid, final: data.length === 0 } },
useBinary,
useCompression
)
}
cid++
if (data.length > 0 && !ws.isClosed) {
await waitNextTick()
}
}
} else {
await ws.send(ctx, msg, useBinary, useCompression)
}
}

8
server/ws/uws.sh Executable file
View File

@ -0,0 +1,8 @@
if test -f ./src/server_u.ts; then
if ! test -f ./v20.43.0.zip; then
wget --quiet https://github.com/uNetworking/uWebSockets.js/archive/refs/tags/v20.43.0.zip
fi
if ! test -f ./src/uws/uws.js; then
unzip -qq -j ./v20.43.0.zip -d ./src/uws/
fi
fi

View File

@ -91,6 +91,7 @@ services:
ports:
- 3334:3334
environment:
- SERVER_PROVIDER=${SERVER_PROVIDER}
- SERVER_PORT=3334
- SERVER_SECRET=secret
- ELASTIC_URL=http://elastic:9200