Remove UWS (#6575)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-09-16 12:57:29 +07:00 committed by GitHub
parent 310f18c3ff
commit 8b8f385073
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 4 additions and 768 deletions

View File

@ -472,79 +472,6 @@ jobs:
with:
name: docker-logs-qms
path: ./qms-tests/sanity/logs
uitest-uweb:
runs-on: ubuntu-latest
timeout-minutes: 60
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
filter: tree:0
- uses: actions/setup-node@v4
with:
node-version-file: '.nvmrc'
- name: Cache node modules
uses: actions/cache@v4
env:
cache-name: cache-node-platform
with:
path: |
common/temp
key: ${{ runner.os }}-build-${{ env.cache-name }}-${{ hashFiles('**/pnpm-lock.yaml') }}
restore-keys: |
${{ runner.os }}-build-${{ env.cache-name }}-${{ hashFiles('**/pnpm-lock.yaml') }}
- name: Checking for mis-matching dependencies...
run: node common/scripts/install-run-rush.js check
- name: Installing...
run: node common/scripts/install-run-rush.js install
- name: Docker Build
run: node common/scripts/install-run-rush.js docker:build -p 20
env:
DOCKER_CLI_HINTS: false
- name: Prepare server
env:
SERVER_PROVIDER: uweb
run: |
cd ./tests
./prepare.sh
- name: Install Playwright
run: |
cd ./tests/sanity
node ../../common/scripts/install-run-rushx.js ci
- name: Run UI tests
run: |
cd ./tests/sanity
node ../../common/scripts/install-run-rushx.js uitest
- name: 'Store docker logs'
if: always()
run: |
cd ./tests/sanity
mkdir logs
docker logs $(docker ps | grep transactor | cut -f 1 -d ' ') > logs/uweb-transactor.log
docker logs $(docker ps | grep account | cut -f 1 -d ' ') > logs/uweb-account.log
docker logs $(docker ps | grep front | cut -f 1 -d ' ') > logs/uweb-front.log
docker logs $(docker ps | grep collaborator | cut -f 1 -d ' ') > logs/uweb-collaborator.log
- name: Upload test results
if: always()
uses: actions/upload-artifact@v4
with:
name: playwright-results-uweb
path: ./tests/sanity/playwright-report/
- name: Upload Logs
if: always()
uses: actions/upload-artifact@v4
with:
name: docker-logs-uweb
path: ./tests/sanity/logs
# - name: Upload DB snapshot
# if: always()
# uses: actions/upload-artifact@v3
# with:
# name: db-snapshot
# path: ./tests/db_dump
docker-build:
needs: [build, test, svelte-check, uitest]
runs-on: ubuntu-latest

View File

@ -1049,9 +1049,6 @@ dependencies:
'@rush-temp/uploader-resources':
specifier: file:./projects/uploader-resources.tgz
version: file:projects/uploader-resources.tgz(@types/node@20.11.19)(esbuild@0.20.1)(postcss-load-config@4.0.2)(postcss@8.4.35)(ts-node@10.9.2)
'@rush-temp/uws':
specifier: file:./projects/uws.tgz
version: file:projects/uws.tgz
'@rush-temp/view':
specifier: file:./projects/view.tgz
version: file:projects/view.tgz(@types/node@20.11.19)(esbuild@0.20.1)(ts-node@10.9.2)
@ -30099,7 +30096,7 @@ packages:
dev: false
file:projects/pod-server.tgz:
resolution: {integrity: sha512-ZxtZx6xD8K6aE/+YNCmfUaRxZCU8LQdOEhhPY32sdCdD/JbTI0TiooPxFTqJ7ywmFt2P/QIUgGi+KbYqzjxOlQ==, tarball: file:projects/pod-server.tgz}
resolution: {integrity: sha512-whFq8Jk23rIMTrUXr4OjzFdc5zsKYr5mOSKwwVntT2JYVi1Wr05s4+Y2ulpcS7Sjvi0rLCRjd7PQNgDopmlM2A==, tarball: file:projects/pod-server.tgz}
name: '@rush-temp/pod-server'
version: 0.0.0
dependencies:
@ -33579,7 +33576,7 @@ packages:
dev: false
file:projects/server-ws.tgz(esbuild@0.20.1)(ts-node@10.9.2):
resolution: {integrity: sha512-YrBVAtr3TNiYfwgohue5M/bOosO2VQnlmkRC2t8YWix+eviRY+RLECpxlGmBqGdrIjgG3fLF+ZE5Qv01Uf9qzw==, tarball: file:projects/server-ws.tgz}
resolution: {integrity: sha512-TkAzjnl1QK2rx7iiItSZsZHPD2Evt+OViuZWwgwxm9cDKki7j3txpo7G21rkOsTMohXiS4FcA9fKkCiVFfvERw==, tarball: file:projects/server-ws.tgz}
id: file:projects/server-ws.tgz
name: '@rush-temp/server-ws'
version: 0.0.0
@ -35303,12 +35300,6 @@ packages:
- ts-node
dev: false
file:projects/uws.tgz:
resolution: {integrity: sha512-cJKrvv/9a935gnsLADANzT1ZcSe9j47NxbDaYX93htNBgTZRhke5VQ9zJ+hViufzd4u0aZNHRkdtnES7cPEHGw==, tarball: file:projects/uws.tgz}
name: '@rush-temp/uws'
version: 0.0.0
dev: false
file:projects/view-assets.tgz(esbuild@0.20.1)(ts-node@10.9.2):
resolution: {integrity: sha512-fUlJn33jw96g/rWFmQyp1LsAitTL2nt6T9N08Rp5S3FMggO3+e6HxUQIaQrKriCE/fRaf6SlJiBgH0W7kiNHMg==, tarball: file:projects/view-assets.tgz}
id: file:projects/view-assets.tgz

View File

@ -15,5 +15,4 @@ COPY bundle/bundle.js ./
COPY bundle/bundle.js.map ./
EXPOSE 8080
ENV UWS_HTTP_MAX_HEADERS_SIZE 32768
CMD node bundle.js

View File

@ -9,7 +9,7 @@
"license": "EPL-2.0",
"scripts": {
"start": "rush bundle --to @hcengineering/pod-server && cross-env NODE_ENV=production ELASTIC_INDEX_NAME=local_storage_index MODEL_VERSION=$(node ../../common/scripts/show_version.js) ACCOUNTS_URL=http://localhost:3000 REKONI_URL=http://localhost:4004 MONGO_URL=mongodb://localhost:27017 ELASTIC_URL=http://localhost:9200 FRONT_URL=http://localhost:8087 UPLOAD_URL=/upload MINIO_ENDPOINT=localhost MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin METRICS_CONSOLE=true SERVER_SECRET=secret OPERATION_PROFILING=false node bundle/bundle.js",
"start-u": "rush bundle --to @hcengineering/pod-server && cp ./node_modules/@hcengineering/uws/lib/*.node ./bundle/ && cross-env NODE_ENV=production SERVER_PROVIDER=uweb ELASTIC_INDEX_NAME=local_storage_index MODEL_VERSION=$(node ../../common/scripts/show_version.js) ACCOUNTS_URL=http://localhost:3000 REKONI_URL=http://localhost:4004 MONGO_URL=mongodb://localhost:27017 ELASTIC_URL=http://localhost:9200 FRONT_URL=http://localhost:8087 UPLOAD_URL=/upload MINIO_ENDPOINT=localhost MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin METRICS_CONSOLE=true SERVER_SECRET=secret node bundle/bundle.js",
"start-u": "rush bundle --to @hcengineering/pod-server && ./bundle/ && cross-env NODE_ENV=production SERVER_PROVIDER=uweb ELASTIC_INDEX_NAME=local_storage_index MODEL_VERSION=$(node ../../common/scripts/show_version.js) ACCOUNTS_URL=http://localhost:3000 REKONI_URL=http://localhost:4004 MONGO_URL=mongodb://localhost:27017 ELASTIC_URL=http://localhost:9200 FRONT_URL=http://localhost:8087 UPLOAD_URL=/upload MINIO_ENDPOINT=localhost MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin METRICS_CONSOLE=true SERVER_SECRET=secret node bundle/bundle.js",
"start-flame": "rush bundle --to @hcengineering/pod-server && cross-env NODE_ENV=production ELASTIC_INDEX_NAME=local_storage_index MODEL_VERSION=$(node ../../common/scripts/show_version.js) ACCOUNTS_URL=http://localhost:3000 REKONI_URL=http://localhost:4004 MONGO_URL=mongodb://localhost:27017 ELASTIC_URL=http://localhost:9200 FRONT_URL=http://localhost:8087 UPLOAD_URL=/upload MINIO_ENDPOINT=localhost MINIO_ACCESS_KEY=minioadmin MINIO_SECRET_KEY=minioadmin METRICS_CONSOLE=true SERVER_SECRET=secret clinic flame --dest ./out -- node --nolazy -r ts-node/register --enable-source-maps src/__start.ts",
"build": "compile",
"_phase:bundle": "rushx bundle",
@ -55,7 +55,6 @@
"@hcengineering/platform": "^0.6.11",
"@hcengineering/server-core": "^0.6.1",
"@hcengineering/server-ws": "^0.6.11",
"@hcengineering/uws": "^0.6.0",
"@hcengineering/server": "^0.6.4",
"@hcengineering/server-storage": "^0.6.0",
"@hcengineering/server-pipeline": "^0.6.0",

View File

@ -486,11 +486,6 @@
"projectFolder": "server/ws",
"shouldPublish": false
},
{
"packageName": "@hcengineering/uws",
"projectFolder": "server/uws",
"shouldPublish": false
},
{
"packageName": "@hcengineering/theme",
"projectFolder": "packages/theme",

View File

@ -1,5 +0,0 @@
{
"$schema": "https://developer.microsoft.com/json-schemas/rig-package/rig.schema.json",
"rigPackageName": "@hcengineering/platform-rig",
"rigProfile": "node"
}

View File

@ -1,28 +0,0 @@
{
"name": "@hcengineering/uws",
"version": "0.6.0",
"main": "lib/uws.js",
"types": "lib/index.d.ts",
"author": "Anticrm Platform Contributors",
"template": "@hcengineering/node-package-uws",
"license": "EPL-2.0",
"exports": {
"import": "./lib/ESM_wrapper.mjs",
"require": "./lib/uws.js",
"types": "./lib/index.d.ts"
},
"scripts": {
"build": "./uws.sh",
"build:watch": "./uws.sh",
"test": "echo 'test'",
"format": "echo 'format'",
"_phase:build": "rushx build",
"_phase:test": "echo 'test'",
"_phase:format": "echo 'format'",
"_phase:validate": "./uws.sh"
},
"devDependencies": {
"@hcengineering/platform-rig": "^0.6.0"
},
"dependencies": {}
}

View File

@ -1,10 +0,0 @@
#!/usr/bin/env bash
mkdir -p ./.build
cd ./.build
if ! test -f ./v20.47.0.zip; then
wget --quiet https://github.com/uNetworking/uWebSockets.js/archive/refs/tags/v20.47.0.zip
fi
if ! test -f ../lib/uws.js; then
unzip -qq -j -o ./v20.47.0.zip -d ../lib
fi

View File

@ -39,7 +39,6 @@
"@types/body-parser": "~1.19.2"
},
"dependencies": {
"@hcengineering/uws": "^0.6.0",
"@hcengineering/analytics": "^0.6.0",
"@hcengineering/core": "^0.6.32",
"@hcengineering/platform": "^0.6.11",

View File

@ -4,33 +4,5 @@ import { type ServerFactory } from './types'
* @public
*/
export const serverFactories: Record<string, ServerFactory> = {
ws: startHttpServer,
uweb: (sessions, handleRequest, ctx, pipelineFactory, port, enableCompression, accountsUrl, externalStorage) => {
try {
// eslint-disable-next-line @typescript-eslint/no-var-requires
const serverHttp = require('./server_u')
return serverHttp.startUWebsocketServer(
sessions,
handleRequest,
ctx,
pipelineFactory,
port,
enableCompression,
accountsUrl,
externalStorage
)
} catch (err: any) {
console.error('uwebsocket.js is not supported, switcg back to nodejs ws', err)
return startHttpServer(
sessions,
handleRequest,
ctx,
pipelineFactory,
port,
enableCompression,
accountsUrl,
externalStorage
)
}
}
ws: startHttpServer
}

View File

@ -1,603 +0,0 @@
//
// Copyright © 2023 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//
// uWebSockets.js
// Import should be added: "uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.23.0"
import { generateId, toWorkspaceString, type MeasureContext } from '@hcengineering/core'
import { decodeToken } from '@hcengineering/server-token'
import { Analytics } from '@hcengineering/analytics'
import { RPCHandler } from '@hcengineering/rpc'
import { getStatistics, wipeStatistics } from './stats'
import { LOGGING_ENABLED, type ConnectionSocket, type HandleRequestFunction, type SessionManager } from './types'
import { unknownStatus } from '@hcengineering/platform'
import { type PipelineFactory, type StorageAdapter } from '@hcengineering/server-core'
import uWebSockets, { DISABLED, SHARED_COMPRESSOR, type HttpResponse, type WebSocket } from '@hcengineering/uws'
import { Readable } from 'stream'
import { getFile, getFileRange, type BlobResponse } from './blobs'
import { doSessionOp, processRequest, type WebsocketData } from './utils'
const rpcHandler = new RPCHandler()
interface WebsocketUserData extends WebsocketData {
backPressure?: Promise<void>
backPressureResolve?: () => void
unsendMsg: { data: any, binary: boolean, compression: boolean }[]
}
/**
* @public
* @param port -
* @param host -
*/
export function startUWebsocketServer (
sessions: SessionManager,
handleRequest: HandleRequestFunction,
ctx: MeasureContext,
pipelineFactory: PipelineFactory,
port: number,
enableCompression: boolean,
accountsUrl: string,
externalStorage: StorageAdapter
): () => Promise<void> {
if (LOGGING_ENABLED) console.log(`starting U server on port ${port} ...`)
let profiling = false
const uAPP = uWebSockets.App()
const writeStatus = (response: HttpResponse, status: string): HttpResponse => {
return response
.writeStatus(status)
.writeHeader('Access-Control-Allow-Origin', '*')
.writeHeader('Access-Control-Allow-Methods', 'GET, OPTIONS, PUT')
.writeHeader('Access-Control-Allow-Headers', 'Content-Type')
}
uAPP
.ws<WebsocketUserData>('/*', {
/* There are many common helper features */
maxPayloadLength: 250 * 1024 * 1024,
compression: enableCompression ? SHARED_COMPRESSOR : DISABLED,
idleTimeout: 0,
maxLifetime: 0,
sendPingsAutomatically: true,
upgrade (res, req, context) {
const url = new URL('http://localhost' + (req.getUrl() ?? ''))
const token = url.pathname.substring(1)
try {
const payload = decodeToken(token ?? '')
/* You MUST copy data out of req here, as req is only valid within this immediate callback */
const url = req.getUrl()
const secWebSocketKey = req.getHeader('sec-websocket-key')
const secWebSocketProtocol = req.getHeader('sec-websocket-protocol')
const secWebSocketExtensions = req.getHeader('sec-websocket-extensions')
/* This immediately calls open handler, you must not use res after this call */
res.upgrade<WebsocketUserData>(
{
payload,
token,
session: undefined,
unsendMsg: [],
url
},
/* Spell these correctly */
secWebSocketKey,
secWebSocketProtocol,
secWebSocketExtensions,
context
)
} catch (err) {
if (LOGGING_ENABLED) console.error('invalid token', err)
writeStatus(res, '401 Unauthorized').end()
}
},
open: (ws: WebSocket<WebsocketUserData>) => {
const data = ws.getUserData()
const wrData = {
remoteAddress: ws.getRemoteAddressAsText() ?? '',
userAgent: '',
language: '',
email: data.payload.email,
mode: data.payload.extra?.mode,
model: data.payload.extra?.model
}
const cs = createWebSocketClientSocket(wrData, ws, data)
data.connectionSocket = cs
data.session = sessions.addSession(
ctx,
data.connectionSocket,
ws.getUserData().payload,
ws.getUserData().token,
pipelineFactory,
undefined,
accountsUrl
)
if (data.session instanceof Promise) {
void data.session.then((s) => {
if ('error' in s) {
cs.send(
ctx,
{ id: -1, error: unknownStatus(s.error.message ?? 'Unknown error'), terminate: s.terminate },
false,
false
)
setTimeout(() => {
cs.close()
}, 1000)
}
if ('upgrade' in s) {
cs.send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false)
setTimeout(() => {
cs.close()
}, 5000)
}
})
}
},
message: (ws, message, isBinary) => {
const data = ws.getUserData()
const msg = Buffer.from(message)
doSessionOp(
data,
(s, msg) => {
processRequest(
s.session,
data.connectionSocket as ConnectionSocket,
s.context,
s.workspaceId,
msg,
handleRequest
)
},
msg
)
},
drain: (ws) => {
const data = ws.getUserData()
while (data.unsendMsg.length > 0) {
const sendResult = ws.send(data.unsendMsg[0].data, data.unsendMsg[0].binary, data.unsendMsg[0].compression)
if (sendResult !== 2) {
ctx.measure('send-data', data.unsendMsg[0].data.length)
data.unsendMsg.shift()
// Ok we drained one item, let's unhold send
data.backPressureResolve?.()
data.backPressure = undefined
} else {
// Wait for next drain.
return
}
}
data.backPressureResolve?.()
data.backPressure = undefined
},
close: (ws, code, message) => {
const data = ws.getUserData()
doSessionOp(
data,
(s) => {
if (!(s.session.workspaceClosed ?? false)) {
// remove session after 1seconds, give a time to reconnect.
void sessions.close(
ctx,
data.connectionSocket as ConnectionSocket,
toWorkspaceString(data.payload.workspace)
)
}
},
Buffer.from('')
)
}
})
.any('/api/v1/statistics', (response, request) => {
const getUsers = (): any => Array.from(sessions.sessions.entries()).map(([k, v]) => v.session.getUser())
const token = request.getQuery('token') ?? ''
try {
const payload = decodeToken(token ?? '')
const admin = payload.extra?.admin === 'true'
const json = JSON.stringify({
...getStatistics(ctx, sessions, admin),
users: getUsers,
admin,
profiling
})
writeStatus(response, '200 OK').writeHeader('Content-Type', 'application/json').end(json)
} catch (err: any) {
Analytics.handleError(err)
writeStatus(response, '404 ERROR').end()
}
})
.any('/api/v1/profiling', (response, request) => {
const token = request.getQuery('token') ?? ''
try {
decodeToken(token ?? '')
const json = JSON.stringify({
profiling
})
writeStatus(response, '200 OK').writeHeader('Content-Type', 'application/json').end(json)
} catch (err: any) {
Analytics.handleError(err)
writeStatus(response, '404 ERROR').end()
}
})
.any('/api/v1/version', (response, request) => {
try {
writeStatus(response, '200 OK')
.writeHeader('Content-Type', 'application/json')
.end(
JSON.stringify({
version: process.env.MODEL_VERSION
})
)
} catch (err: any) {
Analytics.handleError(err)
writeStatus(response, '404 ERROR').writeHeader('Content-Type', 'application/json').end()
}
})
.any('/api/v1/manage', (res, req) => {
try {
const token = req.getQuery('token') as string
const payload = decodeToken(token)
if (payload.extra?.admin !== 'true') {
writeStatus(res, '404 ERROR').writeHeader('Content-Type', 'application/json').end()
return
}
const operation = req.getQuery('operation')
switch (operation) {
case 'maintenance': {
const timeMinutes = parseInt(req.getQuery('timeout' as string) ?? '5')
sessions.scheduleMaintenance(timeMinutes)
writeStatus(res, '200 OK').end()
return
}
case 'profile-start': {
ctx.warn(
'---------------------------------------------PROFILING SESSION STARTED---------------------------------------------'
)
profiling = true
sessions.profiling?.start()
writeStatus(res, '200 OK').end()
return
}
case 'profile-stop': {
profiling = false
if (sessions.profiling?.stop != null) {
void sessions.profiling?.stop()?.then((profile) => {
ctx.warn(
'---------------------------------------------PROFILING SESSION STOPPED---------------------------------------------',
{ profile }
)
writeStatus(res, '200 OK')
.writeHeader('Content-Type', 'application/json')
.end(profile ?? '{ error: "no profiling" }')
})
} else {
writeStatus(res, '404 ERROR').end()
}
writeStatus(res, '200 OK').end()
return
}
case 'wipe-statistics': {
wipeStatistics(ctx)
writeStatus(res, '200 OK').end()
return
}
case 'force-close': {
const wsId = req.getQuery('wsId') as string
void sessions.forceClose(wsId)
writeStatus(res, '200 OK').end()
return
}
case 'reboot': {
process.exit(0)
}
}
writeStatus(res, '404 ERROR').end()
} catch (err: any) {
Analytics.handleError(err)
console.error(err)
writeStatus(res, '404 ERROR').end()
}
})
.get('/api/v1/blob/*', (res, req) => {
try {
const authHeader = req.getHeader('authorization')
if (authHeader === undefined) {
res.status(403).send({ error: 'Unauthorized' })
return
}
const payload = decodeToken(authHeader.split(' ')[1])
const name = req.getQuery('name') as string
const wrappedRes = wrapRes(res)
res.onAborted(() => {
wrappedRes.aborted = true
})
const range = req.getHeader('range')
if (range !== undefined) {
void ctx.with('file-range', { workspace: payload.workspace.name }, (ctx) =>
getFileRange(ctx, range, externalStorage, payload.workspace, name, wrappedRes)
)
} else {
void getFile(ctx, externalStorage, payload.workspace, name, wrappedRes)
}
} catch (err: any) {
Analytics.handleError(err)
writeStatus(res, '404 ERROR').end()
}
})
.put('/api/v1/blob/*', (res, req) => {
try {
const authHeader = req.getHeader('authorization')
if (authHeader === undefined) {
res.status(403).send({ error: 'Unauthorized' })
return
}
const payload = decodeToken(authHeader.split(' ')[1])
const name = req.getQuery('name') as string
const contentType = req.getQuery('contentType') as string
const size = parseInt((req.getQuery('size') as string) ?? '-1')
const pipe = pipeFromRequest(res)
void ctx
.with(
'storage upload',
{ workspace: payload.workspace.name },
() => externalStorage.put(ctx, payload.workspace, name, pipe, contentType, size !== -1 ? size : undefined),
{ file: name, contentType }
)
.then(() => {
res.cork(() => {
res.writeStatus('200 OK')
res.end()
})
})
} catch (err: any) {
Analytics.handleError(err)
console.error(err)
writeStatus(res, '404 ERROR').end()
}
})
.any('/*', (res, req) => {
writeStatus(res, '404 ERROR').end()
})
.listen(port, (s) => {})
return async () => {
await sessions.closeWorkspaces(ctx)
}
}
function createWebSocketClientSocket (
wrData: {
remoteAddress: ArrayBuffer
userAgent: string
language: string
email: string
mode: any
model: any
},
ws: uWebSockets.WebSocket<WebsocketUserData>,
data: WebsocketUserData
): ConnectionSocket {
const cs: ConnectionSocket = {
id: generateId(),
isClosed: false,
data: () => wrData,
close: () => {
cs.isClosed = true
try {
ws.close()
} catch (err) {
// Ignore closed
}
},
readRequest: (buffer: Buffer, binary: boolean) => {
return rpcHandler.readRequest(buffer, binary)
},
send: (ctx, msg, binary, compression) => {
void (async (): Promise<void> => {
if (data.backPressure !== undefined) {
await data.backPressure
}
const serialized = rpcHandler.serialize(msg, binary)
try {
const sendR = ws.send(serialized, binary, compression)
if (sendR === 2) {
data.backPressure = new Promise((resolve) => {
data.backPressureResolve = resolve
})
data.unsendMsg.push({ data: serialized, binary, compression })
} else {
ctx.measure('send-data', serialized.length)
}
} catch (err: any) {
if (!((err.message ?? '') as string).includes('Invalid access of closed')) {
console.error(err)
}
// Ignore socket is closed
}
})()
}
}
return cs
}
/* Helper function converting Node.js buffer to ArrayBuffer */
function toArrayBuffer (buffer: Buffer): ArrayBufferLike {
return buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength)
}
/* Either onAborted or simply finished request */
function onAbortedOrFinishedResponse (res: HttpResponse, readStream: Readable): void {
if (res.id === -1) {
console.log('ERROR! onAbortedOrFinishedResponse called twice for the same res!')
} else {
readStream.destroy()
}
/* Mark this response already accounted for */
res.id = -1
}
function pipeFromRequest (res: HttpResponse): Readable {
const readable = new Readable()
readable._read = () => {}
res.onAborted(() => {
readable.push(null)
})
res.onData((ab, isLast) => {
const chunk = Buffer.copyBytesFrom(Buffer.from(ab))
readable.push(chunk)
if (isLast) {
readable.push(null)
}
})
return readable
}
function pipeStreamOverResponse (
res: HttpResponse,
readStream: Readable,
totalSize: number,
checkAborted: () => boolean
): void {
readStream
.on('data', (chunk) => {
if (checkAborted()) {
readStream.destroy()
return
}
const ab = toArrayBuffer(chunk)
const lastOffset = res.getWriteOffset()
res.cork(() => {
const [ok, done] = res.tryEnd(ab, totalSize)
if (done) {
onAbortedOrFinishedResponse(res, readStream)
} else if (!ok) {
readStream.pause()
res.ab = ab
res.abOffset = lastOffset
res.onWritable((offset) => {
const [ok, done] = res.tryEnd(res.ab.slice(offset - res.abOffset), totalSize)
if (done) {
onAbortedOrFinishedResponse(res, readStream)
} else if (ok) {
readStream.resume()
}
return ok
})
}
})
})
.on('error', (err) => {
Analytics.handleError(err)
res.close()
})
/* If you plan to asyncronously respond later on, you MUST listen to onAborted BEFORE returning */
res.onAborted(() => {
onAbortedOrFinishedResponse(res, readStream)
})
}
function wrapRes (res: HttpResponse): BlobResponse {
const result: BlobResponse = {
aborted: false,
cork: (cb: () => void) => {
if (result.aborted || res.id === -1) {
cb()
return
}
res.cork(cb)
},
end: () => {
if (result.aborted || res.id === -1) {
return
}
res.end()
},
status: (code) => {
if (result.aborted || res.id === -1) {
return
}
switch (code) {
case 200:
res.writeStatus(`${code} OK`)
break
case 206:
res.writeStatus(`${code} Partial Content`)
break
case 304:
res.writeStatus(`${code} Not Modified`)
break
case 400:
res.writeStatus(`${code} Bad Request`)
break
case 404:
res.writeStatus(`${code} Not Found`)
break
case 416:
res.writeStatus(`${code} Range Not Satisfiable`)
break
default:
res.writeStatus(`${code} ERROR`)
break
}
},
pipeFrom: (readable, size) => {
if (result.aborted || res.id === -1) {
return
}
pipeStreamOverResponse(res, readable, size, () => result.aborted)
},
writeHead: (code, header) => {
if (result.aborted || res.id === -1) {
return
}
result.status(code)
for (const [k, v] of Object.entries(header)) {
res.writeHeader(k, `${v}`)
}
}
}
return result
}