From 096bfacad786b397eedacc4490e6cfeaced61a47 Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Mon, 6 May 2024 16:15:39 +0700 Subject: [PATCH] UBERF-6778: Add Support to uWebSocket.js library (#5503) Signed-off-by: Andrey Sobolev --- .github/workflows/main.yml | 72 +++++ packages/platform-rig/bin/format.js | 5 + plugins/client-resources/src/connection.ts | 337 +++++++++++---------- pods/server/Dockerfile | 4 + pods/server/package.json | 3 +- pods/server/src/__start.ts | 7 +- server/rpc/src/rpc.ts | 3 +- server/server/src/starter.ts | 6 +- server/ws/.gitignore | 2 + server/ws/package.json | 10 +- server/ws/src/factories.ts | 36 +++ server/ws/src/index.ts | 16 +- server/ws/src/server.ts | 124 ++------ server/ws/src/server_http.ts | 150 +++++---- server/ws/src/server_u.ts | 312 +++++++++++++++++++ server/ws/src/server_u.ts_ | 208 ------------- server/ws/src/types.ts | 16 +- server/ws/src/utils.ts | 105 +++++++ server/ws/uws.sh | 8 + tests/docker-compose.yaml | 1 + 20 files changed, 841 insertions(+), 584 deletions(-) create mode 100644 server/ws/.gitignore create mode 100644 server/ws/src/factories.ts create mode 100644 server/ws/src/server_u.ts delete mode 100644 server/ws/src/server_u.ts_ create mode 100644 server/ws/src/utils.ts create mode 100755 server/ws/uws.sh diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 530499cda0..a12a2827d9 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -282,6 +282,78 @@ jobs: # with: # name: db-snapshot # path: ./tests/db_dump + uitest-uweb: + runs-on: ubuntu-latest + timeout-minutes: 60 + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + filter: tree:0 + - uses: actions/setup-node@v4 + with: + node-version-file: '.nvmrc' + - name: Cache node modules + uses: actions/cache@v4 + env: + cache-name: cache-node-platform + with: + path: | + common/temp + key: ${{ runner.os }}-build-${{ env.cache-name }}-${{ hashFiles('**/pnpm-lock.yaml') }} + restore-keys: | + ${{ runner.os }}-build-${{ env.cache-name }}-${{ hashFiles('**/pnpm-lock.yaml') }} + + - name: Checking for mis-matching dependencies... + run: node common/scripts/install-run-rush.js check + + - name: Installing... + run: node common/scripts/install-run-rush.js install + + - name: Docker Build + run: node common/scripts/install-run-rush.js docker:build -p 20 + env: + DOCKER_CLI_HINTS: false + - name: Prepare server + env: + SERVER_PROVIDER: uweb + run: | + cd ./tests + ./prepare.sh + - name: Install Playwright + run: | + cd ./tests/sanity + node ../../common/scripts/install-run-rushx.js ci + - name: Run UI tests + run: | + cd ./tests/sanity + node ../../common/scripts/install-run-rushx.js uitest + - name: "Store docker logs" + if: always() + run: | + cd ./tests/sanity + mkdir logs + docker logs $(docker ps | grep transactor | cut -f 1 -d ' ') > logs/uweb-transactor.log + docker logs $(docker ps | grep account | cut -f 1 -d ' ') > logs/uweb-account.log + docker logs $(docker ps | grep front | cut -f 1 -d ' ') > logs/uweb-front.log + - name: Upload test results + if: always() + uses: actions/upload-artifact@v4 + with: + name: playwright-results-uweb + path: ./tests/sanity/playwright-report/ + - name: Upload Logs + if: always() + uses: actions/upload-artifact@v4 + with: + name: docker-logs-uweb + path: ./tests/sanity/logs + # - name: Upload DB snapshot + # if: always() + # uses: actions/upload-artifact@v3 + # with: + # name: db-snapshot + # path: ./tests/db_dump docker-build: needs: [build, test, svelte-check, uitest] runs-on: ubuntu-latest diff --git a/packages/platform-rig/bin/format.js b/packages/platform-rig/bin/format.js index 6fb9a011b8..2bd7894f0f 100755 --- a/packages/platform-rig/bin/format.js +++ b/packages/platform-rig/bin/format.js @@ -46,6 +46,10 @@ function calcHash(source, msg, addCheck) { if( !ext.endsWith('.ts') && !ext.endsWith('.js') && !ext.endsWith('.svelte')) { continue } + if( sourceFile.endsWith('.d.ts') ) { + // Skip declaration files + continue + } calcFileHash(sourceFile, msg, addCheck) } } @@ -76,6 +80,7 @@ if( process.argv.includes('-f') || process.argv.includes('--force')) { filesToCheck = allFiles } + if( filesToCheck.length > 0 ) { console.info(`running prettier ${filesToCheck.length}`) // Changes detected. diff --git a/plugins/client-resources/src/connection.ts b/plugins/client-resources/src/connection.ts index 8d6f47b4e3..8a966c6aba 100644 --- a/plugins/client-resources/src/connection.ts +++ b/plugins/client-resources/src/connection.ts @@ -46,7 +46,7 @@ import core, { } from '@hcengineering/core' import { PlatformError, UNAUTHORIZED, broadcastEvent, getMetadata, unknownError } from '@hcengineering/platform' -import { HelloRequest, HelloResponse, ReqId, readResponse, serialize } from '@hcengineering/rpc' +import { HelloRequest, HelloResponse, ReqId, readResponse, serialize, type Response } from '@hcengineering/rpc' const SECOND = 1000 const pingTimeout = 10 * SECOND @@ -55,7 +55,7 @@ const dialTimeout = 30 * SECOND class RequestPromise { startTime: number = Date.now() - handleTime?: (diff: number, result: any, serverTime: number, queue: number) => void + handleTime?: (diff: number, result: any, serverTime: number, queue: number, toRecieve: number) => void readonly promise: Promise resolve!: (value?: any) => void reject!: (reason?: any) => void @@ -198,6 +198,170 @@ class Connection implements ClientConnection { } } + handleMsg (socketId: number, resp: Response): void { + if (resp.error !== undefined) { + if (resp.error?.code === UNAUTHORIZED.code) { + Analytics.handleError(new PlatformError(resp.error)) + this.closed = true + this.websocket?.close() + this.onUnauthorized?.() + } + console.error(resp.error) + return + } + + if (resp.id === -1) { + this.delay = 0 + if (resp.result?.state === 'upgrading') { + void this.onConnect?.(ClientConnectEvent.Maintenance, resp.result.stats) + this.upgrading = true + this.delay = 3 + return + } + if (resp.result === 'hello') { + if (this.upgrading) { + // We need to call upgrade since connection is upgraded + this.onUpgrade?.() + } + + this.upgrading = false + if ((resp as HelloResponse).alreadyConnected === true) { + this.sessionId = generateId() + if (typeof sessionStorage !== 'undefined') { + sessionStorage.setItem('session.id.' + this.url, this.sessionId) + } + console.log('Connection: alreadyConnected, reconnect with new Id') + clearTimeout(dialTimeout) + this.scheduleOpen(true) + return + } + if ((resp as HelloResponse).binary) { + this.binaryMode = true + } + // Notify all waiting connection listeners + const handlers = this.onConnectHandlers.splice(0, this.onConnectHandlers.length) + for (const h of handlers) { + h() + } + + for (const [, v] of this.requests.entries()) { + v.reconnect?.() + } + + void this.onConnect?.( + (resp as HelloResponse).reconnect === true ? ClientConnectEvent.Reconnected : ClientConnectEvent.Connected + ) + this.schedulePing(socketId) + return + } else { + Analytics.handleError(new Error(`unexpected response: ${JSON.stringify(resp)}`)) + } + return + } + if (resp.result === 'ping') { + void this.sendRequest({ method: 'ping', params: [] }) + return + } + if (resp.id !== undefined) { + const promise = this.requests.get(resp.id) + if (promise === undefined) { + console.error( + new Error(`unknown response id: ${resp.id as string} ${this.workspace} ${this.email}`), + JSON.stringify(this.requests) + ) + return + } + + if (resp.chunk !== undefined) { + promise.chunks = [ + ...(promise.chunks ?? []), + { + index: resp.chunk.index, + data: resp.result as FindResult + } + ] + // console.log(socketId, 'chunk', promise.method, promise.params, promise.chunks.length, (resp.result as []).length) + if (resp.chunk.final) { + promise.chunks.sort((a, b) => a.index - b.index) + let result: any[] = [] + let total = -1 + let lookupMap: Record | undefined + + for (const c of promise.chunks) { + if (c.data.total !== 0) { + total = c.data.total + } + if (c.data.lookupMap !== undefined) { + lookupMap = c.data.lookupMap + } + result = result.concat(c.data) + } + resp.result = toFindResult(result, total, lookupMap) + resp.chunk = undefined + } else { + // Not all chunks are available yet. + return + } + } + + const request = this.requests.get(resp.id) + promise.handleTime?.( + Date.now() - promise.startTime, + resp.result, + resp.time ?? 0, + resp.queue ?? 0, + Date.now() - (resp.bfst ?? 0) + ) + this.requests.delete(resp.id) + if (resp.error !== undefined) { + console.log( + 'ERROR', + 'request:', + request?.method, + 'response-id:', + resp.id, + 'error: ', + resp.error, + 'result: ', + resp.result, + this.workspace, + this.email + ) + promise.reject(new PlatformError(resp.error)) + } else { + if (request?.handleResult !== undefined) { + void request.handleResult(resp.result).then(() => { + promise.resolve(resp.result) + }) + } else { + promise.resolve(resp.result) + } + } + void broadcastEvent(client.event.NetworkRequests, this.requests.size) + } else { + const txArr = Array.isArray(resp.result) ? (resp.result as Tx[]) : [resp.result as Tx] + + for (const tx of txArr) { + if ( + (tx?._class === core.class.TxWorkspaceEvent && (tx as TxWorkspaceEvent).event === WorkspaceEvent.Upgrade) || + tx?._class === core.class.TxModelUpgrade + ) { + console.log('Processing upgrade', this.workspace, this.email) + this.onUpgrade?.() + return + } + } + this.handler(...txArr) + + clearTimeout(this.incomingTimer) + void broadcastEvent(client.event.NetworkRequests, this.requests.size + 1) + + this.incomingTimer = setTimeout(() => { + void broadcastEvent(client.event.NetworkRequests, this.requests.size) + }, 500) + } + } + private openConnection (socketId: number): void { this.binaryMode = false // Use defined factory or browser default one. @@ -205,7 +369,7 @@ class Connection implements ClientConnection { getMetadata(client.metadata.ClientSocketFactory) ?? ((url: string) => { const s = new WebSocket(url) - s.binaryType = 'arraybuffer' + // s.binaryType = 'arraybuffer' return s as ClientSocket }) @@ -245,162 +409,14 @@ class Connection implements ClientConnection { if (this.websocket !== wsocket) { return } - const resp = readResponse(event.data, this.binaryMode) - - if (resp.error !== undefined) { - if (resp.error?.code === UNAUTHORIZED.code) { - Analytics.handleError(new PlatformError(resp.error)) - this.closed = true - this.websocket.close() - this.onUnauthorized?.() - } - console.error(resp.error) - return - } - - if (resp.id === -1) { - this.delay = 0 - if (resp.result?.state === 'upgrading') { - void this.onConnect?.(ClientConnectEvent.Maintenance, resp.result.stats) - this.upgrading = true - this.delay = 3 - return - } - if (resp.result === 'hello') { - if (this.upgrading) { - // We need to call upgrade since connection is upgraded - this.onUpgrade?.() - } - - this.upgrading = false - if ((resp as HelloResponse).alreadyConnected === true) { - this.sessionId = generateId() - if (typeof sessionStorage !== 'undefined') { - sessionStorage.setItem('session.id.' + this.url, this.sessionId) - } - console.log('Connection: alreadyConnected, reconnect with new Id') - clearTimeout(dialTimeout) - this.scheduleOpen(true) - return - } - if ((resp as HelloResponse).binary) { - this.binaryMode = true - } - // Notify all waiting connection listeners - const handlers = this.onConnectHandlers.splice(0, this.onConnectHandlers.length) - for (const h of handlers) { - h() - } - - for (const [, v] of this.requests.entries()) { - v.reconnect?.() - } - - void this.onConnect?.( - (resp as HelloResponse).reconnect === true ? ClientConnectEvent.Reconnected : ClientConnectEvent.Connected - ) - this.schedulePing(socketId) - return - } else { - Analytics.handleError(new Error(`unexpected response: ${JSON.stringify(resp)}`)) - } - return - } - if (resp.result === 'ping') { - void this.sendRequest({ method: 'ping', params: [] }) - return - } - if (resp.id !== undefined) { - const promise = this.requests.get(resp.id) - if (promise === undefined) { - console.error( - new Error(`unknown response id: ${resp.id as string} ${this.workspace} ${this.email}`), - JSON.stringify(this.requests) - ) - return - } - - if (resp.chunk !== undefined) { - promise.chunks = [ - ...(promise.chunks ?? []), - { - index: resp.chunk.index, - data: resp.result as FindResult - } - ] - // console.log(socketId, 'chunk', promise.method, promise.params, promise.chunks.length, (resp.result as []).length) - if (resp.chunk.final) { - promise.chunks.sort((a, b) => a.index - b.index) - let result: any[] = [] - let total = -1 - let lookupMap: Record | undefined - - for (const c of promise.chunks) { - if (c.data.total !== 0) { - total = c.data.total - } - if (c.data.lookupMap !== undefined) { - lookupMap = c.data.lookupMap - } - result = result.concat(c.data) - } - resp.result = toFindResult(result, total, lookupMap) - resp.chunk = undefined - } else { - // Not all chunks are available yet. - return - } - } - - const request = this.requests.get(resp.id) - promise.handleTime?.(Date.now() - promise.startTime, resp.result, resp.time ?? 0, resp.queue ?? 0) - this.requests.delete(resp.id) - if (resp.error !== undefined) { - console.log( - 'ERROR', - 'request:', - request?.method, - 'response-id:', - resp.id, - 'error: ', - resp.error, - 'result: ', - resp.result, - this.workspace, - this.email - ) - promise.reject(new PlatformError(resp.error)) - } else { - if (request?.handleResult !== undefined) { - void request.handleResult(resp.result).then(() => { - promise.resolve(resp.result) - }) - } else { - promise.resolve(resp.result) - } - } - void broadcastEvent(client.event.NetworkRequests, this.requests.size) + if (event.data instanceof Blob) { + void event.data.arrayBuffer().then((data) => { + const resp = readResponse(data, this.binaryMode) + this.handleMsg(socketId, resp) + }) } else { - const txArr = Array.isArray(resp.result) ? (resp.result as Tx[]) : [resp.result as Tx] - - for (const tx of txArr) { - if ( - (tx?._class === core.class.TxWorkspaceEvent && (tx as TxWorkspaceEvent).event === WorkspaceEvent.Upgrade) || - tx?._class === core.class.TxModelUpgrade - ) { - console.log('Processing upgrade', this.workspace, this.email) - this.onUpgrade?.() - return - } - } - this.handler(...txArr) - - clearTimeout(this.incomingTimer) - void broadcastEvent(client.event.NetworkRequests, this.requests.size + 1) - - this.incomingTimer = setTimeout(() => { - void broadcastEvent(client.event.NetworkRequests, this.requests.size) - }, 500) + const resp = readResponse(event.data, this.binaryMode) + this.handleMsg(socketId, resp) } } wsocket.onclose = (ev) => { @@ -454,7 +470,7 @@ class Connection implements ClientConnection { retry?: () => Promise handleResult?: (result: any) => Promise once?: boolean // Require handleResult to retrieve result - measure?: (time: number, result: any, serverTime: number, queue: number) => void + measure?: (time: number, result: any, serverTime: number, queue: number, toRecieve: number) => void allowReconnect?: boolean }): Promise { if (this.closed) { @@ -547,12 +563,13 @@ class Connection implements ClientConnection { const result = await this.sendRequest({ method: 'findAll', params: [_class, query, options], - measure: (time, result, serverTime, queue) => { + measure: (time, result, serverTime, queue, toReceive) => { if (typeof window !== 'undefined' && (time > 1000 || serverTime > 500)) { console.error( 'measure slow findAll', time, serverTime, + toReceive, queue, _class, query, diff --git a/pods/server/Dockerfile b/pods/server/Dockerfile index 477fcbfa66..8b5dcfa780 100644 --- a/pods/server/Dockerfile +++ b/pods/server/Dockerfile @@ -4,8 +4,12 @@ ENV NODE_ENV production WORKDIR /app RUN npm install --ignore-scripts=false --verbose bufferutil utf-8-validate @mongodb-js/zstd --unsafe-perm +RUN npm install --ignore-scripts=false --verbose uNetworking/uWebSockets.js#v20.43.0 COPY bundle/bundle.js ./ +RUN mv node_modules/uWebSockets.js/*.node . EXPOSE 8080 + +ENV UWS_HTTP_MAX_HEADERS_SIZE 32768 CMD [ "node", "./bundle.js" ] diff --git a/pods/server/package.json b/pods/server/package.json index fb17a5bd0d..be4480a8ae 100644 --- a/pods/server/package.json +++ b/pods/server/package.json @@ -13,8 +13,7 @@ "_phase:bundle": "rushx bundle", "_phase:docker-build": "rushx docker:build", "_phase:docker-staging": "rushx docker:staging", - "bundle": "mkdir -p bundle && esbuild src/__start.ts --bundle --sourcemap=inline --minify --platform=node --external:bufferutil --external:utf-8-validate --define:process.env.MODEL_VERSION=$(node ../../common/scripts/show_version.js) --define:process.env.GIT_REVISION=$(../../common/scripts/git_version.sh) > bundle/bundle.js", - "bundle:u": "mkdir -p bundle && esbuild src/__start.ts --bundle --sourcemap=inline --minify --platform=node > bundle/bundle.js && mkdir -p ./dist && cp -r ./node_modules/uWebSockets.js/*.node ./dist", + "bundle": "mkdir -p bundle && esbuild src/__start.ts --bundle --external:*.node --sourcemap=inline --minify --platform=node --external:bufferutil --external:utf-8-validate --define:process.env.MODEL_VERSION=$(node ../../common/scripts/show_version.js) --define:process.env.GIT_REVISION=$(../../common/scripts/git_version.sh) > bundle/bundle.js", "docker:build": "../../common/scripts/docker_build.sh hardcoreeng/transactor", "docker:staging": "../../common/scripts/docker_tag.sh hardcoreeng/transactor staging", "docker:push": "../../common/scripts/docker_tag.sh hardcoreeng/transactor", diff --git a/pods/server/src/__start.ts b/pods/server/src/__start.ts index b6c4ff2685..0e4a26298d 100644 --- a/pods/server/src/__start.ts +++ b/pods/server/src/__start.ts @@ -16,15 +16,18 @@ // Add this to the VERY top of the first file loaded in your app import contactPlugin from '@hcengineering/contact' +import notification from '@hcengineering/notification' import { setMetadata } from '@hcengineering/platform' import { serverConfigFromEnv, storageConfigFromEnv } from '@hcengineering/server' import serverCore, { type StorageConfiguration } from '@hcengineering/server-core' import serverNotification from '@hcengineering/server-notification' import serverToken from '@hcengineering/server-token' import { start } from '.' -import notification from '@hcengineering/notification' +import { serverFactories } from '@hcengineering/server-ws/src/factories' +const serverFactory = serverFactories[(process.env.SERVER_PROVIDER as string) ?? 'ws'] ?? serverFactories.ws const config = serverConfigFromEnv() + const storageConfig: StorageConfiguration = storageConfigFromEnv() const cursorMaxTime = process.env.SERVER_CURSOR_MAXTIMEMS @@ -53,7 +56,7 @@ const shutdown = start(config.url, { storageConfig, rekoniUrl: config.rekoniUrl, port: config.serverPort, - serverFactory: config.serverFactory, + serverFactory, indexParallel: 2, indexProcessing: 50, productId: '', diff --git a/server/rpc/src/rpc.ts b/server/rpc/src/rpc.ts index 77807cd841..b2448f1201 100644 --- a/server/rpc/src/rpc.ts +++ b/server/rpc/src/rpc.ts @@ -66,6 +66,7 @@ export interface Response { final: boolean } time?: number // Server time to perform operation + bfst?: number // Server time to perform operation queue?: number } @@ -95,7 +96,7 @@ export function protoDeserialize (data: any, binary: boolean): any { } return JSON.parse(_data.toString(), receiver) } - return packr.unpack(new Uint8Array(replacer('', data))) + return packr.unpack(new Uint8Array(data)) } /** diff --git a/server/server/src/starter.ts b/server/server/src/starter.ts index 959a1ad47e..6188675d87 100644 --- a/server/server/src/starter.ts +++ b/server/server/src/starter.ts @@ -1,7 +1,6 @@ import { MinioConfig, MinioService } from '@hcengineering/minio' import { createRawMongoDBAdapter } from '@hcengineering/mongo' -import { buildStorage, StorageAdapter, StorageConfiguration } from '@hcengineering/server-core' -import { serverFactories, ServerFactory } from '@hcengineering/server-ws' +import { StorageAdapter, StorageConfiguration, buildStorage } from '@hcengineering/server-core' export function storageConfigFromEnv (): StorageConfiguration { const storageConfig: StorageConfiguration = JSON.parse( @@ -60,7 +59,6 @@ export interface ServerEnv { sesUrl: string | undefined accountsUrl: string serverPort: number - serverFactory: ServerFactory enableCompression: boolean elasticIndexName: string pushPublicKey: string | undefined @@ -70,7 +68,6 @@ export interface ServerEnv { export function serverConfigFromEnv (): ServerEnv { const serverPort = parseInt(process.env.SERVER_PORT ?? '3333') - const serverFactory = serverFactories[(process.env.SERVER_PROVIDER as string) ?? 'ws'] ?? serverFactories.ws const enableCompression = (process.env.ENABLE_COMPRESSION ?? 'true') === 'true' const url = process.env.MONGO_URL @@ -136,7 +133,6 @@ export function serverConfigFromEnv (): ServerEnv { sesUrl, accountsUrl, serverPort, - serverFactory, enableCompression, pushPublicKey, pushPrivateKey, diff --git a/server/ws/.gitignore b/server/ws/.gitignore new file mode 100644 index 0000000000..3054b64f07 --- /dev/null +++ b/server/ws/.gitignore @@ -0,0 +1,2 @@ +v20.43.0.zip +src/uws \ No newline at end of file diff --git a/server/ws/package.json b/server/ws/package.json index c7125fc870..56d8ce8d44 100644 --- a/server/ws/package.json +++ b/server/ws/package.json @@ -5,17 +5,17 @@ "svelte": "src/index.ts", "types": "types/index.d.ts", "author": "Anticrm Platform Contributors", - "template": "@hcengineering/node-package", + "template": "@hcengineering/node-package-ws", "license": "EPL-2.0", "scripts": { - "build": "compile", - "build:watch": "compile", + "build": "./uws.sh && compile && cp ./src/uws/*.node ./lib/uws", + "build:watch": "./uws.sh && compile && cp ./src/uws/*.node ./lib/uws", "test": "jest --passWithNoTests --silent --forceExit", "format": "format src", - "_phase:build": "compile transpile src", + "_phase:build": "./uws.sh && compile transpile src && cp ./src/uws/*.node ./lib/uws", "_phase:test": "jest --passWithNoTests --silent --forceExit", "_phase:format": "format src", - "_phase:validate": "compile validate" + "_phase:validate": "./uws.sh && compile validate" }, "devDependencies": { "@hcengineering/platform-rig": "^0.6.0", diff --git a/server/ws/src/factories.ts b/server/ws/src/factories.ts new file mode 100644 index 0000000000..6522713023 --- /dev/null +++ b/server/ws/src/factories.ts @@ -0,0 +1,36 @@ +import { startHttpServer } from './server_http' +import { type ServerFactory } from './types' +/** + * @public + */ +export const serverFactories: Record = { + ws: startHttpServer, + uweb: (sessions, handleRequest, ctx, pipelineFactory, port, productId, enableCompression, accountsUrl) => { + try { + // eslint-disable-next-line @typescript-eslint/no-var-requires + const serverHttp = require('./server_u') + return serverHttp.startUWebsocketServer( + sessions, + handleRequest, + ctx, + pipelineFactory, + port, + productId, + enableCompression, + accountsUrl + ) + } catch (err: any) { + console.error('uwebsocket.js is not supported, switcg back to nodejs ws') + return startHttpServer( + sessions, + handleRequest, + ctx, + pipelineFactory, + port, + productId, + enableCompression, + accountsUrl + ) + } + } +} diff --git a/server/ws/src/index.ts b/server/ws/src/index.ts index b797e26063..e37fd94beb 100644 --- a/server/ws/src/index.ts +++ b/server/ws/src/index.ts @@ -14,19 +14,7 @@ // limitations under the License. // -import { startHttpServer } from './server_http' -// import { startUWebsocketServer } from './server_u' -import { type ServerFactory } from './types' - -export { start } from './server' -export * from './types' export * from './client' +export { start } from './server' export * from './server_http' - -/** - * @public - */ -export const serverFactories: Record = { - ws: startHttpServer - // uweb: startUWebsocketServer -} +export * from './types' diff --git a/server/ws/src/server.ts b/server/ws/src/server.ts index 1b0151fd90..e0ac1e6888 100644 --- a/server/ws/src/server.ts +++ b/server/ws/src/server.ts @@ -19,19 +19,17 @@ import core, { WorkspaceEvent, generateId, systemAccountEmail, - toFindResult, toWorkspaceString, versionToString, withContext, type BaseWorkspaceInfo, - type FindResult, type MeasureContext, type Tx, type TxWorkspaceEvent, type WorkspaceId } from '@hcengineering/core' import { unknownError } from '@hcengineering/platform' -import { readRequest, type HelloRequest, type HelloResponse, type Request, type Response } from '@hcengineering/rpc' +import { type HelloRequest, type HelloResponse, type Request, type Response } from '@hcengineering/rpc' import type { Pipeline, SessionContext } from '@hcengineering/server-core' import { type Token } from '@hcengineering/server-token' @@ -45,6 +43,7 @@ import { type SessionManager, type Workspace } from './types' +import { sendResponse } from './utils' interface WorkspaceLoginInfo extends Omit { upgrade?: { @@ -72,12 +71,6 @@ function onNextTick (op: () => void): void { setImmediate(op) } -function waitNextTick (): Promise { - return new Promise((resolve) => { - setImmediate(resolve) - }) -} - /** * @public */ @@ -450,24 +443,19 @@ class TSessionManager implements SessionManager { return } const ctx = this.ctx.newChild('📬 broadcast-all', {}) - const sessions = [...workspace.sessions.values()] + const sessions = [...workspace.sessions.values()].filter((it) => { + return it !== undefined && (targets === undefined || targets.includes(it.session.getUser())) + }) function send (): void { - for (const session of sessions.splice(0, 1)) { - if (targets !== undefined && !targets.includes(session.session.getUser())) continue - for (const _tx of tx) { - try { - void session.socket.send(ctx, { result: _tx }, session.session.binaryMode, session.session.useCompression) - } catch (err: any) { - Analytics.handleError(err) - ctx.error('error during send', { error: err }) - } + for (const session of sessions) { + try { + sendResponse(ctx, session.session, session.socket, { result: tx }) + } catch (err: any) { + Analytics.handleError(err) + ctx.error('error during send', { error: err }) } } - if (sessions.length > 0) { - onNextTick(send) - } else { - ctx.end() - } + ctx.end() } if (sessions.length > 0) { // We need to send broadcast after our client response so put it after all IO @@ -740,18 +728,14 @@ class TSessionManager implements SessionManager { const sessions = [...workspace.sessions.values()] const ctx = this.ctx.newChild('📭 broadcast', {}) function send (): void { - for (const sessionRef of sessions.splice(0, 1)) { - if (sessionRef.session.sessionId !== from?.sessionId) { + for (const sessionRef of sessions) { + if (sessionRef !== undefined && sessionRef.session.sessionId !== from?.sessionId) { if (target === undefined || target.includes(sessionRef.session.getUser())) { - void sessionRef.socket.send(ctx, resp, sessionRef.session.binaryMode, sessionRef.session.useCompression) + sendResponse(ctx, sessionRef.session, sessionRef.socket, resp) } } } - if (sessions.length > 0) { - onNextTick(send) - } else { - ctx.end() - } + ctx.end() } if (sessions.length > 0) { // We need to send broadcast after our client response so put it after all IO @@ -765,9 +749,9 @@ class TSessionManager implements SessionManager { requestCtx: MeasureContext, service: S, ws: ConnectionSocket, - msg: any, + request: Request, workspace: string - ): Promise { + ): Promise | undefined> { const userCtx = requestCtx.newChild('📞 client', { workspace: '🧲 ' + workspace }) as SessionContext @@ -779,9 +763,7 @@ class TSessionManager implements SessionManager { const st = Date.now() try { const backupMode = 'loadChunk' in service - await userCtx.with(`🧭 ${backupMode ? 'handleBackup' : 'handleRequest'}`, {}, async (ctx) => { - const request = readRequest(msg, service.binaryMode) - + return await userCtx.with(`🧭 ${backupMode ? 'handleBackup' : 'handleRequest'}`, {}, async (ctx) => { if (request.time != null) { const delta = Date.now() - request.time userCtx.measure('receive msg', delta) @@ -837,8 +819,7 @@ class TSessionManager implements SessionManager { return } if (request.method === 'measure' || request.method === 'measure-done') { - await this.handleMeasure(service, request, ctx, ws) - return + return await this.handleMeasure(service, request, ctx, ws) } service.requests.set(reqId, { id: reqId, @@ -859,25 +840,23 @@ class TSessionManager implements SessionManager { ? await f.apply(service, [service.measureCtx?.ctx, ...params]) : await ctx.with('🧨 process', {}, async (callTx) => f.apply(service, [callTx, ...params])) - const resp: Response = { + return { id: request.id, result, time: Date.now() - st, + bfst: Date.now(), queue: service.requests.size } - - await handleSend(ctx, ws, resp, 32 * 1024, service.binaryMode, service.useCompression) } catch (err: any) { Analytics.handleError(err) if (LOGGING_ENABLED) { this.ctx.error('error handle request', { error: err, request }) } - const resp: Response = { + return { id: request.id, error: unknownError(err), result: JSON.parse(JSON.stringify(err?.stack)) } - await ws.send(ctx, resp, service.binaryMode, service.useCompression) } }) } finally { @@ -891,7 +870,7 @@ class TSessionManager implements SessionManager { request: Request, ctx: MeasureContext, ws: ConnectionSocket - ): Promise { + ): Promise | undefined> { let serverTime = 0 if (request.method === 'measure') { service.measureCtx = { ctx: ctx.newChild('📶 ' + request.params[0], {}), time: Date.now() } @@ -902,72 +881,21 @@ class TSessionManager implements SessionManager { } } try { - const resp: Response = { id: request.id, result: request.method === 'measure' ? 'started' : serverTime } - - await handleSend(ctx, ws, resp, 32 * 1024, service.binaryMode, service.useCompression) + return { id: request.id, result: request.method === 'measure' ? 'started' : serverTime } } catch (err: any) { Analytics.handleError(err) if (LOGGING_ENABLED) { ctx.error('error handle measure', { error: err, request }) } - const resp: Response = { + return { id: request.id, error: unknownError(err), result: JSON.parse(JSON.stringify(err?.stack)) } - await ws.send(ctx, resp, service.binaryMode, service.useCompression) } } } -async function handleSend ( - ctx: MeasureContext, - ws: ConnectionSocket, - msg: Response, - chunkLimit: number, - useBinary: boolean, - useCompression: boolean -): Promise { - // ws.send(msg) - if (Array.isArray(msg.result) && msg.result.length > 1 && chunkLimit > 0) { - // Split and send by chunks - const data = [...msg.result] - - let cid = 1 - const dataSize = JSON.stringify(data).length - const avg = Math.round(dataSize / data.length) - const itemChunk = Math.round(chunkLimit / avg) + 1 - - while (data.length > 0 && !ws.isClosed) { - let itemChunkCurrent = itemChunk - if (data.length - itemChunk < itemChunk / 2) { - itemChunkCurrent = data.length - } - const chunk: FindResult = toFindResult(data.splice(0, itemChunkCurrent)) - if (data.length === 0) { - const orig = msg.result as FindResult - chunk.total = orig.total ?? 0 - chunk.lookupMap = orig.lookupMap - } - if (chunk !== undefined) { - await ws.send( - ctx, - { ...msg, result: chunk, chunk: { index: cid, final: data.length === 0 } }, - useBinary, - useCompression - ) - } - cid++ - - if (data.length > 0 && !ws.isClosed) { - await waitNextTick() - } - } - } else { - await ws.send(ctx, msg, useBinary, useCompression) - } -} - /** * @public */ diff --git a/server/ws/src/server_http.ts b/server/ws/src/server_http.ts index a13100bd4a..eca6f46079 100644 --- a/server/ws/src/server_http.ts +++ b/server/ws/src/server_http.ts @@ -27,7 +27,6 @@ import { WebSocketServer, type RawData, type WebSocket } from 'ws' import { getStatistics, wipeStatistics } from './stats' import { LOGGING_ENABLED, - type AddSessionResponse, type ConnectionSocket, type HandleRequestFunction, type PipelineFactory, @@ -36,6 +35,7 @@ import { import 'bufferutil' import 'utf-8-validate' +import { doSessionOp, processRequest, type WebsocketData } from './utils' /** * @public @@ -204,62 +204,31 @@ export function startHttpServer ( mode: token.extra?.mode, model: token.extra?.model } - const cs: ConnectionSocket = { - id: generateId(), - isClosed: false, - close: () => { - cs.isClosed = true - ws.close() - }, - data: () => data, - send: async (ctx: MeasureContext, msg, binary, compression) => { - if (ws.readyState !== ws.OPEN && !cs.isClosed) { - return 0 - } - const smsg = serialize(msg, binary) + const cs: ConnectionSocket = createWebsocketClientSocket(ws, data) - ctx.measure('send-data', smsg.length) - - while (ws.bufferedAmount > 128 && ws.readyState === ws.OPEN) { - await new Promise((resolve) => { - setImmediate(resolve) - }) - } - await new Promise((resolve, reject) => { - ws.send(smsg, { binary: true, compress: compression }, (err) => { - if (err != null) { - reject(err) - } - resolve() - }) - }) - return smsg.length - } + const webSocketData: WebsocketData = { + connectionSocket: cs, + payload: token, + token: rawToken, + session: sessions.addSession(ctx, cs, token, rawToken, pipelineFactory, productId, sessionId, accountsUrl), + url: '' } - let session: AddSessionResponse | Promise = sessions.addSession( - ctx, - cs, - token, - rawToken, - pipelineFactory, - productId, - sessionId, - accountsUrl - ) - - void session.then((s) => { - if ('upgrade' in s || 'error' in s) { - if ('error' in s) { - ctx.error('error', { error: s.error?.message, stack: s.error?.stack }) + if (webSocketData.session instanceof Promise) { + void webSocketData.session.then((s) => { + if ('upgrade' in s || 'error' in s) { + if ('error' in s) { + ctx.error('error', { error: s.error?.message, stack: s.error?.stack }) + } + void cs + .send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false) + .then(() => { + cs.close() + }) } - void cs - .send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false) - .then(() => { - cs.close() - }) - } - }) + }) + } + // eslint-disable-next-line @typescript-eslint/no-misused-promises ws.on('message', (msg: RawData) => { try { @@ -270,18 +239,9 @@ export function startHttpServer ( buff = Buffer.concat(msg) } if (buff !== undefined) { - if (session instanceof Promise) { - void session.then((_session) => { - session = _session - if ('session' in _session) { - void handleRequest(_session.context, _session.session, cs, buff, _session.workspaceId) - } - }) - } else { - if ('session' in session) { - void handleRequest(session.context, session.session, cs, buff, session.workspaceId) - } - } + doSessionOp(webSocketData, (s) => { + processRequest(s.session, cs, s.context, s.workspaceId, buff, handleRequest) + }) } } catch (err: any) { Analytics.handleError(err) @@ -292,28 +252,18 @@ export function startHttpServer ( }) // eslint-disable-next-line @typescript-eslint/no-misused-promises ws.on('close', async (code: number, reason: Buffer) => { - if (session instanceof Promise) { - session = await session - } - if ('session' in session) { - if (!(session.session.workspaceClosed ?? false)) { + doSessionOp(webSocketData, (s) => { + if (!(s.session.workspaceClosed ?? false)) { // remove session after 1seconds, give a time to reconnect. void sessions.close(cs, toWorkspaceString(token.workspace)) } - } + }) }) ws.on('error', (err) => { - if (session instanceof Promise) { - void session.then((s) => { - if ('session' in session) { - console.error(session.session.getUser(), 'error', err) - } - }) - } - if ('session' in session) { - console.error(session.session.getUser(), 'error', err) - } + doSessionOp(webSocketData, (s) => { + console.error(s.session.getUser(), 'error', err) + }) }) } wss.on('connection', handleConnection as any) @@ -335,7 +285,6 @@ export function startHttpServer ( wss.handleUpgrade(request, socket, head, (ws) => { void handleConnection(ws, request, payload, token, sessionId ?? undefined) - // wss.emit('connection', ws, request, payload, token, sessionId) }) } catch (err: any) { Analytics.handleError(err) @@ -385,3 +334,40 @@ export function startHttpServer ( ) } } +function createWebsocketClientSocket ( + ws: WebSocket, + data: { remoteAddress: string, userAgent: string, language: string, email: string, mode: any, model: any } +): ConnectionSocket { + const cs: ConnectionSocket = { + id: generateId(), + isClosed: false, + close: () => { + cs.isClosed = true + ws.close() + }, + data: () => data, + send: async (ctx: MeasureContext, msg, binary, compression) => { + if (ws.readyState !== ws.OPEN && !cs.isClosed) { + return 0 + } + const smsg = serialize(msg, binary) + + while (ws.bufferedAmount > 128 && ws.readyState === ws.OPEN) { + await new Promise((resolve) => { + setImmediate(resolve) + }) + } + ctx.measure('send-data', smsg.length) + await new Promise((resolve, reject) => { + ws.send(smsg, { binary: true, compress: compression }, (err) => { + if (err != null) { + reject(err) + } + resolve() + }) + }) + return smsg.length + } + } + return cs +} diff --git a/server/ws/src/server_u.ts b/server/ws/src/server_u.ts new file mode 100644 index 0000000000..0a8ae512c9 --- /dev/null +++ b/server/ws/src/server_u.ts @@ -0,0 +1,312 @@ +// +// Copyright © 2023 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// uWebSockets.js +// Import should be added: "uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.23.0" + +import { generateId, toWorkspaceString, type MeasureContext } from '@hcengineering/core' +import { decodeToken } from '@hcengineering/server-token' + +import { Analytics } from '@hcengineering/analytics' +import { serialize } from '@hcengineering/rpc' +import { getStatistics, wipeStatistics } from './stats' +import { + LOGGING_ENABLED, + type ConnectionSocket, + type HandleRequestFunction, + type PipelineFactory, + type SessionManager +} from './types' + +import { doSessionOp, processRequest, type WebsocketData } from './utils' +import uWebSockets, { DISABLED, SHARED_COMPRESSOR, type HttpResponse, type WebSocket } from './uws' + +interface WebsocketUserData extends WebsocketData { + backPressure?: Promise + backPressureResolve?: () => void + unsendMsg: { data: any, binary: boolean, compression: boolean }[] +} +/** + * @public + * @param port - + * @param host - + */ +export function startUWebsocketServer ( + sessions: SessionManager, + handleRequest: HandleRequestFunction, + ctx: MeasureContext, + pipelineFactory: PipelineFactory, + port: number, + productId: string, + enableCompression: boolean, + accountsUrl: string +): () => Promise { + if (LOGGING_ENABLED) console.log(`starting U server on port ${port} ...`) + + const uAPP = uWebSockets.App() + + const writeStatus = (response: HttpResponse, status: string): HttpResponse => { + return response + .writeStatus(status) + .writeHeader('Access-Control-Allow-Origin', '*') + .writeHeader('Access-Control-Allow-Methods', 'GET, OPTIONS, PUT') + .writeHeader('Access-Control-Allow-Headers', 'Content-Type') + } + + uAPP + .trace('/*', (res, req) => { + console.log(req.getUrl(), req.getMethod()) + }) + .ws('/*', { + /* There are many common helper features */ + // idleTimeout: 32, + maxBackpressure: 256 * 1024, + maxPayloadLength: 50 * 1024 * 1024, + compression: enableCompression ? SHARED_COMPRESSOR : DISABLED, + idleTimeout: 0, + maxLifetime: 0, + sendPingsAutomatically: true, + + upgrade (res, req, context) { + const url = new URL('http://localhost' + (req.getUrl() ?? '')) + const token = url.pathname.substring(1) + + try { + const payload = decodeToken(token ?? '') + + if (payload.workspace.productId !== productId) { + throw new Error('Invalid workspace product') + } + + /* You MUST copy data out of req here, as req is only valid within this immediate callback */ + const url = req.getUrl() + const secWebSocketKey = req.getHeader('sec-websocket-key') + const secWebSocketProtocol = req.getHeader('sec-websocket-protocol') + const secWebSocketExtensions = req.getHeader('sec-websocket-extensions') + + /* This immediately calls open handler, you must not use res after this call */ + res.upgrade( + { + payload, + token, + session: undefined, + unsendMsg: [], + url + }, + /* Spell these correctly */ + secWebSocketKey, + secWebSocketProtocol, + secWebSocketExtensions, + context + ) + } catch (err) { + if (LOGGING_ENABLED) console.error('invalid token', err) + writeStatus(res, '401 Unauthorized').end() + } + }, + open: (ws: WebSocket) => { + const data = ws.getUserData() + + const wrData = { + remoteAddress: ws.getRemoteAddressAsText() ?? '', + userAgent: '', + language: '', + email: data.payload.email, + mode: data.payload.extra?.mode, + model: data.payload.extra?.model + } + data.connectionSocket = createWebSocketClientSocket(wrData, ws, data) + + data.session = sessions.addSession( + ctx, + data.connectionSocket, + ws.getUserData().payload, + ws.getUserData().token, + pipelineFactory, + productId, + undefined, + accountsUrl + ) + }, + message: (ws, message, isBinary) => { + const data = ws.getUserData() + const msg = Buffer.copyBytesFrom(Buffer.from(message)) + + doSessionOp(data, (s) => { + processRequest( + s.session, + data.connectionSocket as ConnectionSocket, + s.context, + s.workspaceId, + msg, + handleRequest + ) + }) + }, + drain: (ws) => { + console.log(`WebSocket backpressure: ${ws.getBufferedAmount()}`) + const data = ws.getUserData() + while (data.unsendMsg.length > 0) { + if (ws.send(data.unsendMsg[0].data, data.unsendMsg[0].binary, data.unsendMsg[0].compression) !== 1) { + ctx.measure('send-data', data.unsendMsg[0].data.length) + data.unsendMsg.shift() + + // Ok we drained one item, let's unhold send + data.backPressureResolve?.() + data.backPressure = undefined + } else { + // Wait for next drain. + return + } + } + }, + close: (ws, code, message) => { + const data = ws.getUserData() + doSessionOp(data, (s) => { + if (!(s.session.workspaceClosed ?? false)) { + // remove session after 1seconds, give a time to reconnect. + void sessions.close(data.connectionSocket as ConnectionSocket, toWorkspaceString(data.payload.workspace)) + } + }) + } + }) + .any('/api/v1/statistics', (response, request) => { + const getUsers = (): any => Array.from(sessions.sessions.entries()).map(([k, v]) => v.session.getUser()) + + const token = request.getQuery('token') ?? '' + try { + const payload = decodeToken(token ?? '') + const admin = payload.extra?.admin === 'true' + + const json = JSON.stringify({ + ...getStatistics(ctx, sessions, admin), + users: getUsers, + admin + }) + + writeStatus(response, '200 OK').writeHeader('Content-Type', 'application/json').end(json) + } catch (err: any) { + Analytics.handleError(err) + writeStatus(response, '404 ERROR').end() + } + }) + .any('/api/v1/version', (response, request) => { + try { + writeStatus(response, '200 OK') + .writeHeader('Content-Type', 'application/json') + .end( + JSON.stringify({ + version: process.env.MODEL_VERSION + }) + ) + } catch (err: any) { + Analytics.handleError(err) + writeStatus(response, '404 ERROR').writeHeader('Content-Type', 'application/json').end() + } + }) + .any('/api/v1/manage', (res, req) => { + try { + const token = req.getQuery('token') as string + const payload = decodeToken(token) + if (payload.extra?.admin !== 'true') { + writeStatus(res, '404 ERROR').writeHeader('Content-Type', 'application/json').end() + return + } + + const operation = req.getQuery('operation') + + switch (operation) { + case 'maintenance': { + const timeMinutes = parseInt(req.getQuery('timeout' as string) ?? '5') + sessions.scheduleMaintenance(timeMinutes) + + writeStatus(res, '200 OK').end() + return + } + case 'wipe-statistics': { + wipeStatistics(ctx) + + writeStatus(res, '200 OK').end() + return + } + case 'force-close': { + const wsId = req.getQuery('wsId') as string + void sessions.forceClose(wsId) + writeStatus(res, '200 OK').end() + return + } + case 'reboot': { + process.exit(0) + } + } + + writeStatus(res, '404 ERROR').end() + } catch (err: any) { + Analytics.handleError(err) + console.error(err) + writeStatus(res, '404 ERROR').end() + } + }) + .any('/*', (res, req) => { + res.end('') + }) + + .listen(port, (s) => {}) + + return async () => { + await sessions.closeWorkspaces(ctx) + } +} +function createWebSocketClientSocket ( + wrData: { remoteAddress: ArrayBuffer, userAgent: string, language: string, email: string, mode: any, model: any }, + ws: uWebSockets.WebSocket, + data: WebsocketUserData +): ConnectionSocket { + const cs: ConnectionSocket = { + id: generateId(), + isClosed: false, + data: () => wrData, + close: () => { + cs.isClosed = true + try { + ws.close() + } catch (err) { + // Ignore closed + } + }, + send: async (ctx, msg, binary, compression): Promise => { + await data.backPressure + const serialized = serialize(msg, binary) + try { + const sendR = ws.send(serialized, binary, compression) + if (sendR === 2) { + data.backPressure = new Promise((resolve) => { + data.backPressureResolve = resolve + }) + data.unsendMsg.push({ data: serialized, binary, compression }) + } else { + ctx.measure('send-data', serialized.length) + } + } catch (err: any) { + if (!((err.message ?? '') as string).includes('Invalid access of closed')) { + console.error(err) + } + // Ignore socket is closed + } + return serialized.length + } + } + return cs +} diff --git a/server/ws/src/server_u.ts_ b/server/ws/src/server_u.ts_ deleted file mode 100644 index 59fb8614d2..0000000000 --- a/server/ws/src/server_u.ts_ +++ /dev/null @@ -1,208 +0,0 @@ -// -// Copyright © 2023 Hardcore Engineering Inc. -// -// Licensed under the Eclipse Public License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. You may -// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// -// See the License for the specific language governing permissions and -// limitations under the License. -// - -// uWebSockets.js -// Import should be added: "uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.23.0" - -import { MeasureContext, generateId } from '@hcengineering/core' -import { Token, decodeToken } from '@hcengineering/server-token' - -import { serialize } from '@hcengineering/rpc' -import uWebSockets, { SHARED_COMPRESSOR, WebSocket } from 'uWebSockets.js' -import { getStatistics } from './stats' -import { - ConnectionSocket, - HandleRequestFunction, - LOGGING_ENABLED, - PipelineFactory, - Session, - SessionManager -} from './types' - -interface WebsocketUserData { - wrapper?: ConnectionSocket - payload: Token - session?: Promise - backPressure?: Promise - backPressureResolve?: () => void - unsendMsg: any[] -} - -/** - * @public - * @param port - - * @param host - - */ -export function startUWebsocketServer ( - sessions: SessionManager, - handleRequest: HandleRequestFunction, - ctx: MeasureContext, - pipelineFactory: PipelineFactory, - port: number, - productId: string -): () => Promise { - if (LOGGING_ENABLED) console.log(`starting U server on port ${port} ...`) - - const uAPP = uWebSockets.App() - - uAPP - .ws('/*', { - /* There are many common helper features */ - // idleTimeout: 32, - // maxBackpressure: 1024, - maxPayloadLength: 50 * 1024 * 1024, - compression: SHARED_COMPRESSOR, - maxLifetime: 0, - sendPingsAutomatically: true, - - upgrade (res, req, context) { - const url = new URL('http://localhost' + (req.getUrl() ?? '')) - const token = url.pathname.substring(1) - - try { - const payload = decodeToken(token ?? '') - - if (payload.workspace.productId !== productId) { - throw new Error('Invalid workspace product') - } - - /* This immediately calls open handler, you must not use res after this call */ - res.upgrade( - { - payload, - session: undefined, - unsendMsg: [] - }, - /* Spell these correctly */ - req.getHeader('sec-websocket-key'), - req.getHeader('sec-websocket-protocol'), - req.getHeader('sec-websocket-extensions'), - context - ) - } catch (err) { - if (LOGGING_ENABLED) console.error('invalid token', err) - res.writeStatus('401 Unauthorized').end() - } - }, - open: (ws: WebSocket) => { - const data = ws.getUserData() - data.wrapper = { - id: generateId(), - close: () => { - try { - ws.close() - } catch (err) { - // Ignore closed - } - }, - send: async (ctx, msg): Promise => { - await ctx.with('backpressure', {}, async () => await data.backPressure) - const serialized = await ctx.with('serialize', {}, async () => serialize(msg)) - ctx.measure('send-data', serialized.length) - try { - const sendR = await ctx.with('backpressure', {}, async () => - ws.send(serialized, false, Array.isArray(msg.result)) - ) - if (sendR === 2) { - data.backPressure = new Promise((resolve) => { - data.backPressureResolve = resolve - }) - data.unsendMsg.push(msg) - } - } catch (err: any) { - if (!((err.message ?? '') as string).includes('Invalid access of closed')) { - console.error(err) - } - // Ignore socket is closed - } - } - } - - data.session = sessions.addSession( - ctx, - data.wrapper, - ws.getUserData().payload, - pipelineFactory, - productId, - undefined - ) - }, - message: (ws, message, isBinary) => { - const data = ws.getUserData() - const enc = new TextDecoder('utf-8') - const tmsg = enc.decode(message) - - void data.session?.then((s) => { - void handleRequest(ctx, s, data.wrapper as ConnectionSocket, tmsg, data.payload.workspace.name) - }) - }, - drain: (ws) => { - console.log(`WebSocket backpressure: ${ws.getBufferedAmount()}`) - const data = ws.getUserData() - while (data.unsendMsg.length > 0) { - if (ws.send(data.unsendMsg[0]) !== 1) { - data.unsendMsg.shift() - } else { - // Wait for next drain. - return - } - } - data.backPressureResolve?.() - data.backPressure = undefined - }, - close: (ws, code, message) => { - const enc = new TextDecoder('utf-8') - const data = ws.getUserData() - try { - const tmsg = enc.decode(message) - if (tmsg !== undefined && tmsg !== '') { - console.error(tmsg) - } - } catch (err) { - console.error(err) - } - void data.session?.then((s) => { - void sessions.close(ctx, data.wrapper as ConnectionSocket, data.payload.workspace, code, '') - }) - } - }) - .any('/*', (response, request) => { - const url = new URL('http://localhost' + (request.getUrl() ?? '')) - - const token = url.pathname.substring(1) - try { - const payload = decodeToken(token ?? '') - console.log(payload.workspace, 'statistics request') - - const json = JSON.stringify(getStatistics(ctx, sessions)) - - response - .writeStatus('200 OK') - .writeHeader('Content-Type', 'application/json') - .writeHeader('Access-Control-Allow-Origin', '*') - .writeHeader('Access-Control-Allow-Methods', 'GET, OPTIONS') - .writeHeader('Access-Control-Allow-Headers', 'Content-Type') - .end(json) - } catch (err) { - response.writeHead(404, {}) - response.end() - } - }) - .listen(port, (s) => {}) - - return async () => { - await sessions.closeWorkspaces(ctx) - } -} diff --git a/server/ws/src/types.ts b/server/ws/src/types.ts index 75380fbe04..f1d8ff28b4 100644 --- a/server/ws/src/types.ts +++ b/server/ws/src/types.ts @@ -11,7 +11,7 @@ import { type WorkspaceId, type WorkspaceIdWithUrl } from '@hcengineering/core' -import { type Response } from '@hcengineering/rpc' +import { type Request, type Response } from '@hcengineering/rpc' import { type BroadcastFunc, type Pipeline } from '@hcengineering/server-core' import { type Token } from '@hcengineering/server-token' @@ -132,10 +132,12 @@ export interface Workspace { workspaceName: string } -export type AddSessionResponse = - | { session: Session, context: MeasureContext, workspaceId: string } - | { upgrade: true } - | { error: any } +export interface AddSessionActive { + session: Session + context: MeasureContext + workspaceId: string +} +export type AddSessionResponse = AddSessionActive | { upgrade: true } | { error: any } /** * @public @@ -185,9 +187,9 @@ export type HandleRequestFunction = ( rctx: MeasureContext, service: S, ws: ConnectionSocket, - msg: Buffer, + msg: Request, workspaceId: string -) => Promise +) => Promise | undefined> /** * @public diff --git a/server/ws/src/utils.ts b/server/ws/src/utils.ts new file mode 100644 index 0000000000..98580f2825 --- /dev/null +++ b/server/ws/src/utils.ts @@ -0,0 +1,105 @@ +import { toFindResult, type FindResult, type MeasureContext } from '@hcengineering/core' +import { readRequest, type Response } from '@hcengineering/rpc' +import type { Token } from '@hcengineering/server-token' +import type { AddSessionActive, AddSessionResponse, ConnectionSocket, HandleRequestFunction, Session } from './types' + +export interface WebsocketData { + connectionSocket?: ConnectionSocket + payload: Token + token: string + session: Promise | AddSessionResponse | undefined + url: string +} + +export function doSessionOp (data: WebsocketData, op: (session: AddSessionActive) => void): void { + if (data.session instanceof Promise) { + void data.session.then((_session) => { + data.session = _session + if ('session' in _session) { + op(_session) + } + }) + } else { + if (data.session !== undefined && 'session' in data.session) { + op(data.session) + } + } +} + +export function processRequest ( + session: Session, + cs: ConnectionSocket, + context: MeasureContext, + workspaceId: string, + buff: any, + handleRequest: HandleRequestFunction +): void { + const request = readRequest(buff, session.binaryMode) + void handleRequest(context, session, cs, request, workspaceId).then((resp) => { + if (resp !== undefined) { + void handleSend(context, cs, resp, 32 * 1024, session.binaryMode, session.useCompression) + } + }) +} + +export function sendResponse ( + ctx: MeasureContext, + session: Session, + socket: ConnectionSocket, + resp: Response +): void { + void handleSend(ctx, socket, resp, 32 * 1024, session.binaryMode, session.useCompression) +} + +function waitNextTick (): Promise | undefined { + return new Promise((resolve) => { + setImmediate(resolve) + }) +} +export async function handleSend ( + ctx: MeasureContext, + ws: ConnectionSocket, + msg: Response, + chunkLimit: number, + useBinary: boolean, + useCompression: boolean +): Promise { + // ws.send(msg) + if (Array.isArray(msg.result) && msg.result.length > 1 && chunkLimit > 0) { + // Split and send by chunks + const data = [...msg.result] + + let cid = 1 + const dataSize = JSON.stringify(data).length + const avg = Math.round(dataSize / data.length) + const itemChunk = Math.round(chunkLimit / avg) + 1 + + while (data.length > 0 && !ws.isClosed) { + let itemChunkCurrent = itemChunk + if (data.length - itemChunk < itemChunk / 2) { + itemChunkCurrent = data.length + } + const chunk: FindResult = toFindResult(data.splice(0, itemChunkCurrent)) + if (data.length === 0) { + const orig = msg.result as FindResult + chunk.total = orig.total ?? 0 + chunk.lookupMap = orig.lookupMap + } + if (chunk !== undefined) { + await ws.send( + ctx, + { ...msg, result: chunk, chunk: { index: cid, final: data.length === 0 } }, + useBinary, + useCompression + ) + } + cid++ + + if (data.length > 0 && !ws.isClosed) { + await waitNextTick() + } + } + } else { + await ws.send(ctx, msg, useBinary, useCompression) + } +} diff --git a/server/ws/uws.sh b/server/ws/uws.sh new file mode 100755 index 0000000000..c8d7445199 --- /dev/null +++ b/server/ws/uws.sh @@ -0,0 +1,8 @@ +if test -f ./src/server_u.ts; then + if ! test -f ./v20.43.0.zip; then + wget --quiet https://github.com/uNetworking/uWebSockets.js/archive/refs/tags/v20.43.0.zip + fi + if ! test -f ./src/uws/uws.js; then + unzip -qq -j ./v20.43.0.zip -d ./src/uws/ + fi +fi \ No newline at end of file diff --git a/tests/docker-compose.yaml b/tests/docker-compose.yaml index 3e19cbe31b..c14bffab86 100644 --- a/tests/docker-compose.yaml +++ b/tests/docker-compose.yaml @@ -91,6 +91,7 @@ services: ports: - 3334:3334 environment: + - SERVER_PROVIDER=${SERVER_PROVIDER} - SERVER_PORT=3334 - SERVER_SECRET=secret - ELASTIC_URL=http://elastic:9200