UBERF-6712: Rework connection logic (#5455)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-04-25 22:15:11 +07:00 committed by GitHub
parent 96d4b0ec23
commit 1d95c39d62
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
37 changed files with 909 additions and 693 deletions

View File

@ -92,7 +92,10 @@ export async function benchmark (
os.cpus().forEach(() => {
/* Spawn a new thread running this source file */
const worker = new Worker(__filename)
console.error('__filename', __filename)
const worker = new Worker(__filename, {
argv: ['benchmarkWorker']
})
workers.push(worker)
worker.on('message', (data: Msg) => {
if (data === undefined) {
@ -319,89 +322,91 @@ function randNum (value = 2): number {
return Math.round(Math.random() * value) % value
}
if (!isMainThread) {
parentPort?.on('message', (msg: StartMessage) => {
console.log('starting worker', msg.workId)
void perform(msg)
})
}
async function perform (msg: StartMessage): Promise<void> {
let connection: Client | undefined
try {
setMetadata(client.metadata.UseBinaryProtocol, msg.binary)
setMetadata(client.metadata.UseProtocolCompression, msg.compression)
console.log('connecting to', msg.workspaceId)
connection = await connect(msg.transactorUrl, msg.workspaceId, undefined)
const opt = new TxOperations(connection, (core.account.System + '_benchmark') as Ref<Account>)
parentPort?.postMessage({
type: 'operate',
workId: msg.workId
export function benchmarkWorker (): void {
if (!isMainThread) {
parentPort?.on('message', (msg: StartMessage) => {
console.log('starting worker', msg.workId)
void perform(msg)
})
}
const h = connection.getHierarchy()
const allClasses = await connection.getModel().findAll(core.class.Class, {})
const classes = allClasses.filter((it) => it.kind === ClassifierKind.CLASS && h.findDomain(it._id) !== undefined)
while (msg.options.readRequests + msg.options.modelRequests > 0) {
if (msg.options.modelRequests > 0) {
await connection?.findAll(core.class.Tx, {}, { sort: { _id: -1 } })
msg.options.modelRequests--
}
let doc: Doc | undefined
if (msg.options.readRequests > 0) {
const cl = classes[randNum(classes.length - 1)]
if (cl !== undefined) {
const docs = await connection?.findAll(
cl._id,
{},
{
sort: { _id: -1 },
limit: msg.options.limit.min + randNum(msg.options.limit.rand)
}
)
if (docs.length > 0) {
doc = docs[randNum(docs.length - 1)]
}
msg.options.readRequests--
async function perform (msg: StartMessage): Promise<void> {
let connection: Client | undefined
try {
setMetadata(client.metadata.UseBinaryProtocol, msg.binary)
setMetadata(client.metadata.UseProtocolCompression, msg.compression)
console.log('connecting to', msg.workspaceId)
connection = await connect(msg.transactorUrl, msg.workspaceId, undefined)
const opt = new TxOperations(connection, (core.account.System + '_benchmark') as Ref<Account>)
parentPort?.postMessage({
type: 'operate',
workId: msg.workId
})
const h = connection.getHierarchy()
const allClasses = await connection.getModel().findAll(core.class.Class, {})
const classes = allClasses.filter((it) => it.kind === ClassifierKind.CLASS && h.findDomain(it._id) !== undefined)
while (msg.options.readRequests + msg.options.modelRequests > 0) {
if (msg.options.modelRequests > 0) {
await connection?.findAll(core.class.Tx, {}, { sort: { _id: -1 } })
msg.options.modelRequests--
}
if (msg.options.write && doc !== undefined) {
const attrs = connection.getHierarchy().getAllAttributes(doc._class)
const upd: DocumentUpdate<Doc> = {}
for (const [key, value] of attrs.entries()) {
if (value.type._class === core.class.TypeString || value.type._class === core.class.TypeBoolean) {
if (
key !== '_id' &&
key !== '_class' &&
key !== 'space' &&
key !== 'attachedTo' &&
key !== 'attachedToClass'
) {
const v = (doc as any)[key]
if (v != null) {
;(upd as any)[key] = v
let doc: Doc | undefined
if (msg.options.readRequests > 0) {
const cl = classes[randNum(classes.length - 1)]
if (cl !== undefined) {
const docs = await connection?.findAll(
cl._id,
{},
{
sort: { _id: -1 },
limit: msg.options.limit.min + randNum(msg.options.limit.rand)
}
)
if (docs.length > 0) {
doc = docs[randNum(docs.length - 1)]
}
msg.options.readRequests--
}
if (msg.options.write && doc !== undefined) {
const attrs = connection.getHierarchy().getAllAttributes(doc._class)
const upd: DocumentUpdate<Doc> = {}
for (const [key, value] of attrs.entries()) {
if (value.type._class === core.class.TypeString || value.type._class === core.class.TypeBoolean) {
if (
key !== '_id' &&
key !== '_class' &&
key !== 'space' &&
key !== 'attachedTo' &&
key !== 'attachedToClass'
) {
const v = (doc as any)[key]
if (v != null) {
;(upd as any)[key] = v
}
}
}
}
}
if (Object.keys(upd).length > 0) {
await opt.update(doc, upd)
if (Object.keys(upd).length > 0) {
await opt.update(doc, upd)
}
}
}
if (msg.options.sleep > 0) {
await new Promise((resolve) => setTimeout(resolve, randNum(msg.options.sleep)))
}
}
if (msg.options.sleep > 0) {
await new Promise((resolve) => setTimeout(resolve, randNum(msg.options.sleep)))
}
//
// console.log(`${msg.idd} perform complete`)
} catch (err: any) {
console.error(msg.workspaceId, err)
} finally {
await connection?.close()
}
//
// console.log(`${msg.idd} perform complete`)
} catch (err: any) {
console.error(msg.workspaceId, err)
} finally {
await connection?.close()
parentPort?.postMessage({
type: 'complete',
workId: msg.workId
})
}
parentPort?.postMessage({
type: 'complete',
workId: msg.workId
})
}

View File

@ -66,7 +66,7 @@ import { getMongoClient, getWorkspaceDB } from '@hcengineering/mongo'
import { openAIConfigDefaults } from '@hcengineering/openai'
import { type StorageAdapter } from '@hcengineering/server-core'
import { deepEqual } from 'fast-equals'
import { benchmark } from './benchmark'
import { benchmark, benchmarkWorker } from './benchmark'
import {
cleanArchivedSpaces,
cleanRemovedTransactions,
@ -332,25 +332,36 @@ export function devTool (
.option('-p|--parallel <parallel>', 'Parallel upgrade', '0')
.option('-l|--logs <logs>', 'Default logs folder', './logs')
.option('-r|--retry <retry>', 'Number of apply retries', '0')
.option('-i|--ignore [ignore]', 'Ignore workspaces', '')
.option(
'-c|--console',
'Display all information into console(default will create logs folder with {workspace}.log files',
false
)
.option('-f|--force [force]', 'Force update', false)
.action(async (cmd: { parallel: string, logs: string, retry: string, force: boolean, console: boolean }) => {
const { mongodbUri, version, txes, migrateOperations } = prepareTools()
await withDatabase(mongodbUri, async (db, client) => {
const worker = new UpgradeWorker(db, client, version, txes, migrateOperations, productId)
await worker.upgradeAll(toolCtx, {
errorHandler: async (ws, err) => {},
force: cmd.force,
console: cmd.console,
logs: cmd.logs,
parallel: parseInt(cmd.parallel ?? '1')
.action(
async (cmd: {
parallel: string
logs: string
retry: string
force: boolean
console: boolean
ignore: string
}) => {
const { mongodbUri, version, txes, migrateOperations } = prepareTools()
await withDatabase(mongodbUri, async (db, client) => {
const worker = new UpgradeWorker(db, client, version, txes, migrateOperations, productId)
await worker.upgradeAll(toolCtx, {
errorHandler: async (ws, err) => {},
force: cmd.force,
console: cmd.console,
logs: cmd.logs,
parallel: parseInt(cmd.parallel ?? '1'),
ignore: cmd.ignore
})
})
})
})
}
)
program
.command('remove-unused-workspaces')
@ -826,6 +837,13 @@ export function devTool (
)
}
)
program
.command('benchmarkWorker')
.description('benchmarkWorker')
.action(async (cmd: any) => {
console.log(JSON.stringify(cmd))
benchmarkWorker()
})
program
.command('fix-skills <workspace> <step>')

View File

@ -522,6 +522,7 @@ function pluginFilterTx (
): Tx[] {
const stx = toIdMap(systemTx)
const totalExcluded = new Set<Ref<Tx>>()
let msg = ''
for (const a of excludedPlugins) {
for (const c of configs.values()) {
if (a.pluginId === c.pluginId) {
@ -543,10 +544,11 @@ function pluginFilterTx (
totalExcluded.add(id as Ref<Tx>)
}
}
console.log('exclude plugin', c.pluginId, c.transactions.length)
msg += ` ${c.pluginId}:${c.transactions.length}`
}
}
}
console.log('exclude plugin', msg)
systemTx = systemTx.filter((t) => !totalExcluded.has(t._id))
return systemTx
}

View File

@ -18,7 +18,8 @@ export class MeasureMetricsContext implements MeasureContext {
params: ParamsType,
fullParams: FullParamsType = {},
metrics: Metrics = newMetrics(),
logger?: MeasureLogger
logger?: MeasureLogger,
readonly parent?: MeasureContext
) {
this.name = name
this.params = params
@ -43,13 +44,16 @@ export class MeasureMetricsContext implements MeasureContext {
error: (msg, args) => {
console.error(msg, ...Object.entries(args ?? {}).map((it) => `${it[0]}=${JSON.stringify(replacer(it[1]))}`))
},
warn: (msg, args) => {
console.warn(msg, ...Object.entries(args ?? {}).map((it) => `${it[0]}=${JSON.stringify(replacer(it[1]))}`))
},
close: async () => {},
logOperation: (operation, time, params) => {}
}
}
measure (name: string, value: number): void {
const c = new MeasureMetricsContext('#' + name, {}, {}, childMetrics(this.metrics, ['#' + name]), this.logger)
const c = new MeasureMetricsContext('#' + name, {}, {}, childMetrics(this.metrics, ['#' + name]), this.logger, this)
c.done(value)
}
@ -59,7 +63,8 @@ export class MeasureMetricsContext implements MeasureContext {
params,
fullParams ?? {},
childMetrics(this.metrics, [name]),
logger ?? this.logger
logger ?? this.logger,
this
)
}
@ -78,7 +83,7 @@ export class MeasureMetricsContext implements MeasureContext {
c.end()
return value
} catch (err: any) {
await c.error('Error during:' + name, { err })
c.error('Error during:' + name, { err })
throw err
}
}
@ -95,15 +100,37 @@ export class MeasureMetricsContext implements MeasureContext {
return r
}
async error (message: string, args?: Record<string, any>): Promise<void> {
error (message: string, args?: Record<string, any>): void {
this.logger.error(message, { ...this.params, ...args })
}
async info (message: string, args?: Record<string, any>): Promise<void> {
info (message: string, args?: Record<string, any>): void {
this.logger.info(message, { ...this.params, ...args })
}
warn (message: string, args?: Record<string, any>): void {
this.logger.warn(message, { ...this.params, ...args })
}
end (): void {
this.done()
}
}
/**
* Allow to use decorator for context enabled functions
*/
export function withContext (name: string, params: ParamsType = {}): any {
return (target: any, propertyKey: string, descriptor: PropertyDescriptor): PropertyDescriptor => {
const originalMethod = descriptor.value
descriptor.value = async function (...args: any[]): Promise<any> {
const ctx = args[0] as MeasureContext
return await ctx.with(
name,
params,
async (ctx) => await (originalMethod.apply(this, [ctx, ...args.slice(1)]) as Promise<any>)
)
}
return descriptor
}
}

View File

@ -40,6 +40,8 @@ export interface MeasureLogger {
info: (message: string, obj?: Record<string, any>) => void
error: (message: string, obj?: Record<string, any>) => void
warn: (message: string, obj?: Record<string, any>) => void
logOperation: (operation: string, time: number, params: ParamsType) => void
childLogger?: (name: string, params: Record<string, any>) => MeasureLogger
@ -69,11 +71,14 @@ export interface MeasureContext {
logger: MeasureLogger
parent?: MeasureContext
measure: (name: string, value: number) => void
// Capture error
error: (message: string, obj?: Record<string, any>) => Promise<void>
info: (message: string, obj?: Record<string, any>) => Promise<void>
error: (message: string, obj?: Record<string, any>) => void
info: (message: string, obj?: Record<string, any>) => void
warn: (message: string, obj?: Record<string, any>) => void
// Mark current context as complete
// If no value is passed, time difference will be used.

View File

@ -299,7 +299,7 @@ export class ModelDb extends MemDb {
if (doc !== undefined) {
TxProcessor.updateDoc2Doc(doc, cud)
} else {
void ctx.error('no document found, failed to apply model transaction, skipping', {
ctx.error('no document found, failed to apply model transaction, skipping', {
_id: tx._id,
_class: tx._class,
objectId: cud.objectId
@ -311,7 +311,7 @@ export class ModelDb extends MemDb {
try {
this.delDoc((tx as TxRemoveDoc<Doc>).objectId)
} catch (err: any) {
void ctx.error('no document found, failed to apply model transaction, skipping', {
ctx.error('no document found, failed to apply model transaction, skipping', {
_id: tx._id,
_class: tx._class,
objectId: (tx as TxRemoveDoc<Doc>).objectId
@ -324,7 +324,7 @@ export class ModelDb extends MemDb {
if (obj !== undefined) {
TxProcessor.updateMixin4Doc(obj, mix)
} else {
void ctx.error('no document found, failed to apply model transaction, skipping', {
ctx.error('no document found, failed to apply model transaction, skipping', {
_id: tx._id,
_class: tx._class,
objectId: mix.objectId

View File

@ -50,9 +50,11 @@ import { HelloRequest, HelloResponse, ReqId, readResponse, serialize } from '@hc
const SECOND = 1000
const pingTimeout = 10 * SECOND
const hangTimeout = 5 * 60 * SECOND
const dialTimeout = 60 * SECOND
const dialTimeout = 30 * SECOND
class RequestPromise {
startTime: number = Date.now()
handleTime?: (diff: number, result: any, serverTime: number, queue: number) => void
readonly promise: Promise<any>
resolve!: (value?: any) => void
reject!: (reason?: any) => void
@ -72,7 +74,7 @@ class RequestPromise {
}
class Connection implements ClientConnection {
private websocket: ClientSocket | Promise<ClientSocket> | null = null
private websocket: ClientSocket | null = null
private readonly requests = new Map<ReqId, RequestPromise>()
private lastId = 0
private interval: number | undefined
@ -93,42 +95,43 @@ class Connection implements ClientConnection {
readonly onConnect?: (event: ClientConnectEvent, data?: any) => Promise<void>
) {}
private schedulePing (): void {
private schedulePing (socketId: number): void {
clearInterval(this.interval)
this.pingResponse = Date.now()
this.interval = setInterval(() => {
const wsocket = this.websocket
const interval = setInterval(() => {
if (wsocket !== this.websocket) {
clearInterval(interval)
return
}
if (!this.upgrading && this.pingResponse !== 0 && Date.now() - this.pingResponse > hangTimeout) {
// No ping response from server.
const s = this.websocket
if (!(s instanceof Promise)) {
console.log(
'no ping response from server. Closing socket.',
this.workspace,
this.email,
s,
(s as any)?.readyState
)
// Trying to close connection and re-establish it.
s?.close(1000)
} else {
console.log('no ping response from server. Closing socket.', this.workspace, this.email, s)
void s.then((s) => {
s.close(1000)
})
if (this.websocket !== null) {
console.log('no ping response from server. Closing socket.', socketId, this.workspace, this.email)
clearInterval(this.interval)
this.websocket.close(1000)
return
}
this.websocket = null
}
if (!this.closed) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
void this.sendRequest({ method: 'ping', params: [] }).then((result) => {
this.pingResponse = Date.now()
void this.sendRequest({
method: 'ping',
params: [],
once: true,
handleResult: async (result) => {
if (this.websocket === wsocket) {
this.pingResponse = Date.now()
}
}
})
} else {
clearInterval(this.interval)
}
}, pingTimeout)
this.interval = interval
}
async close (): Promise<void> {
@ -147,257 +150,284 @@ class Connection implements ClientConnection {
}
delay = 0
pending: Promise<ClientSocket> | undefined
onConnectHandlers: (() => void)[] = []
private async waitOpenConnection (): Promise<ClientSocket> {
while (true) {
try {
const socket = await this.pending
if (socket != null && socket.readyState === ClientSocketReadyState.OPEN) {
return socket
}
this.pending = this.openConnection()
await this.pending
this.delay = 0
return await this.pending
} catch (err: any) {
if (this.closed) {
throw new Error('connection closed + ' + this.workspace + ' user: ' + this.email)
}
this.pending = undefined
if (!this.upgrading) {
console.log('connection: failed to connect', `requests: ${this.lastId}`, this.workspace, this.email)
} else {
console.log('connection: workspace during upgrade', `requests: ${this.lastId}`, this.workspace, this.email)
}
if (err?.code === UNAUTHORIZED.code) {
Analytics.handleError(err)
this.onUnauthorized?.()
throw err
}
await new Promise((resolve) => {
setTimeout(() => {
if (!this.upgrading) {
console.log(`delay ${this.delay} second`, this.workspace, this.email)
}
resolve(null)
if (this.delay < 5) {
this.delay++
}
}, this.delay * SECOND)
})
}
private waitOpenConnection (): Promise<void> | undefined {
if (this.websocket != null && this.websocket.readyState === ClientSocketReadyState.OPEN) {
return undefined
}
return new Promise((resolve) => {
this.onConnectHandlers.push(() => {
resolve()
})
// Websocket is null for first time
this.scheduleOpen(false)
})
}
sockets = 0
incomingTimer: any
private openConnection (): Promise<ClientSocket> {
return new Promise((resolve, reject) => {
// Use defined factory or browser default one.
const clientSocketFactory =
getMetadata(client.metadata.ClientSocketFactory) ??
((url: string) => {
const s = new WebSocket(url)
s.binaryType = 'arraybuffer'
return s as ClientSocket
})
openAction: any
if (this.sessionId === undefined) {
// Find local session id in session storage.
this.sessionId =
typeof sessionStorage !== 'undefined'
? sessionStorage.getItem('session.id.' + this.url) ?? undefined
: undefined
this.sessionId = this.sessionId ?? generateId()
if (typeof sessionStorage !== 'undefined') {
sessionStorage.setItem('session.id.' + this.url, this.sessionId)
}
scheduleOpen (force: boolean): void {
if (force) {
if (this.websocket !== null) {
this.websocket.close()
this.websocket = null
}
const websocket = clientSocketFactory(this.url + `?sessionId=${this.sessionId}`)
const opened = false
const socketId = this.sockets++
let binaryResponse = false
const dialTimer = setTimeout(() => {
if (!opened) {
websocket.close()
reject(new PlatformError(unknownError('timeout')))
}
}, dialTimeout)
websocket.onmessage = (event: MessageEvent) => {
const resp = readResponse<any>(event.data, binaryResponse)
if (resp.id === -1) {
if (resp.result?.state === 'upgrading') {
void this.onConnect?.(ClientConnectEvent.Maintenance, resp.result.stats)
this.upgrading = true
return
}
if (resp.result === 'hello') {
if (this.upgrading) {
// We need to call upgrade since connection is upgraded
this.onUpgrade?.()
}
console.log('connection established', this.workspace, this.email)
this.upgrading = false
if ((resp as HelloResponse).alreadyConnected === true) {
this.sessionId = generateId()
if (typeof sessionStorage !== 'undefined') {
sessionStorage.setItem('session.id.' + this.url, this.sessionId)
}
reject(new Error('alreadyConnected'))
}
if ((resp as HelloResponse).binary) {
binaryResponse = true
}
if (resp.error !== undefined) {
reject(resp.error)
return
}
for (const [, v] of this.requests.entries()) {
v.reconnect?.()
}
resolve(websocket)
void this.onConnect?.(
(resp as HelloResponse).reconnect === true ? ClientConnectEvent.Reconnected : ClientConnectEvent.Connected
)
return
} else {
Analytics.handleError(new Error(`unexpected response: ${JSON.stringify(resp)}`))
}
return
}
if (resp.result === 'ping') {
void this.sendRequest({ method: 'ping', params: [] })
return
}
if (resp.id !== undefined) {
const promise = this.requests.get(resp.id)
if (promise === undefined) {
throw new Error(`unknown response id: ${resp.id as string} ${this.workspace} ${this.email}`)
}
if (resp.chunk !== undefined) {
promise.chunks = [
...(promise.chunks ?? []),
{
index: resp.chunk.index,
data: resp.result as []
}
]
// console.log(socketId, 'chunk', promise.chunks.length, resp.chunk.total)
if (resp.chunk.final) {
promise.chunks.sort((a, b) => a.index - b.index)
let result: any[] = []
for (const c of promise.chunks) {
result = result.concat(c.data)
}
resp.result = result
resp.chunk = undefined
} else {
// Not all chunks are available yet.
return
}
}
const request = this.requests.get(resp.id)
this.requests.delete(resp.id)
if (resp.error !== undefined) {
console.log(
'ERROR',
'request:',
request?.method,
'response-id:',
resp.id,
'error: ',
resp.error,
'result: ',
resp.result,
this.workspace,
this.email
)
promise.reject(new PlatformError(resp.error))
} else {
if (request?.handleResult !== undefined) {
void request.handleResult(resp.result).then(() => {
promise.resolve(resp.result)
})
} else {
promise.resolve(resp.result)
}
}
void broadcastEvent(client.event.NetworkRequests, this.requests.size)
clearTimeout(this.openAction)
this.openAction = undefined
}
if (!this.closed && this.openAction === undefined) {
if (this.websocket === null) {
const socketId = ++this.sockets
// Re create socket in case of error, if not closed
if (this.delay === 0) {
this.openConnection(socketId)
} else {
const txArr = Array.isArray(resp.result) ? (resp.result as Tx[]) : [resp.result as Tx]
this.openAction = setTimeout(() => {
this.openAction = undefined
this.openConnection(socketId)
}, this.delay * 1000)
}
}
}
}
for (const tx of txArr) {
if (
(tx?._class === core.class.TxWorkspaceEvent &&
(tx as TxWorkspaceEvent).event === WorkspaceEvent.Upgrade) ||
tx?._class === core.class.TxModelUpgrade
) {
console.log('Processing upgrade', this.workspace, this.email)
websocket.send(
serialize(
{
method: '#upgrading',
params: [],
id: -1
},
false
)
)
this.onUpgrade?.()
return
}
private openConnection (socketId: number): void {
// Use defined factory or browser default one.
const clientSocketFactory =
getMetadata(client.metadata.ClientSocketFactory) ??
((url: string) => {
const s = new WebSocket(url)
s.binaryType = 'arraybuffer'
return s as ClientSocket
})
if (this.sessionId === undefined) {
// Find local session id in session storage.
this.sessionId =
typeof sessionStorage !== 'undefined'
? sessionStorage.getItem('session.id.' + this.url) ?? undefined
: undefined
this.sessionId = this.sessionId ?? generateId()
if (typeof sessionStorage !== 'undefined') {
sessionStorage.setItem('session.id.' + this.url, this.sessionId)
}
}
if (socketId !== this.sockets) {
return
}
const wsocket = clientSocketFactory(this.url + `?sessionId=${this.sessionId}`)
if (socketId !== this.sockets) {
wsocket.close()
return
}
this.websocket = wsocket
const opened = false
let binaryResponse = false
const dialTimer = setTimeout(() => {
if (!opened && !this.closed) {
this.scheduleOpen(true)
}
}, dialTimeout)
wsocket.onmessage = (event: MessageEvent) => {
if (this.websocket !== wsocket) {
return
}
const resp = readResponse<any>(event.data, binaryResponse)
if (resp.error !== undefined) {
if (resp.error?.code === UNAUTHORIZED.code) {
Analytics.handleError(new PlatformError(resp.error))
this.closed = true
this.websocket.close()
this.onUnauthorized?.()
}
console.error(resp.error)
return
}
if (resp.id === -1) {
this.delay = 0
if (resp.result?.state === 'upgrading') {
void this.onConnect?.(ClientConnectEvent.Maintenance, resp.result.stats)
this.upgrading = true
this.delay = 3
return
}
if (resp.result === 'hello') {
if (this.upgrading) {
// We need to call upgrade since connection is upgraded
this.onUpgrade?.()
}
this.handler(...txArr)
clearTimeout(this.incomingTimer)
void broadcastEvent(client.event.NetworkRequests, this.requests.size + 1)
this.upgrading = false
if ((resp as HelloResponse).alreadyConnected === true) {
this.sessionId = generateId()
if (typeof sessionStorage !== 'undefined') {
sessionStorage.setItem('session.id.' + this.url, this.sessionId)
}
console.log('Connection: alreadyConnected, reconnect with new Id')
clearTimeout(dialTimeout)
this.scheduleOpen(true)
return
}
if ((resp as HelloResponse).binary) {
binaryResponse = true
}
// Notify all waiting connection listeners
const handlers = this.onConnectHandlers.splice(0, this.onConnectHandlers.length)
for (const h of handlers) {
h()
}
this.incomingTimer = setTimeout(() => {
void broadcastEvent(client.event.NetworkRequests, this.requests.size)
}, 500)
for (const [, v] of this.requests.entries()) {
v.reconnect?.()
}
void this.onConnect?.(
(resp as HelloResponse).reconnect === true ? ClientConnectEvent.Reconnected : ClientConnectEvent.Connected
)
this.schedulePing(socketId)
return
} else {
Analytics.handleError(new Error(`unexpected response: ${JSON.stringify(resp)}`))
}
return
}
websocket.onclose = (ev) => {
// console.log('client websocket closed', socketId, ev?.reason)
if (!(this.websocket instanceof Promise)) {
this.websocket = null
if (resp.result === 'ping') {
void this.sendRequest({ method: 'ping', params: [] })
return
}
if (resp.id !== undefined) {
const promise = this.requests.get(resp.id)
if (promise === undefined) {
throw new Error(`unknown response id: ${resp.id as string} ${this.workspace} ${this.email}`)
}
void broadcastEvent(client.event.NetworkRequests, -1)
reject(new Error('websocket error'))
if (resp.chunk !== undefined) {
promise.chunks = [
...(promise.chunks ?? []),
{
index: resp.chunk.index,
data: resp.result as []
}
]
// console.log(socketId, 'chunk', promise.chunks.length, resp.chunk.total)
if (resp.chunk.final) {
promise.chunks.sort((a, b) => a.index - b.index)
let result: any[] = []
for (const c of promise.chunks) {
result = result.concat(c.data)
}
resp.result = result
resp.chunk = undefined
} else {
// Not all chunks are available yet.
return
}
}
const request = this.requests.get(resp.id)
promise.handleTime?.(Date.now() - promise.startTime, resp.result, resp.time ?? 0, resp.queue ?? 0)
this.requests.delete(resp.id)
if (resp.error !== undefined) {
console.log(
'ERROR',
'request:',
request?.method,
'response-id:',
resp.id,
'error: ',
resp.error,
'result: ',
resp.result,
this.workspace,
this.email
)
promise.reject(new PlatformError(resp.error))
} else {
if (request?.handleResult !== undefined) {
void request.handleResult(resp.result).then(() => {
promise.resolve(resp.result)
})
} else {
promise.resolve(resp.result)
}
}
void broadcastEvent(client.event.NetworkRequests, this.requests.size)
} else {
const txArr = Array.isArray(resp.result) ? (resp.result as Tx[]) : [resp.result as Tx]
for (const tx of txArr) {
if (
(tx?._class === core.class.TxWorkspaceEvent && (tx as TxWorkspaceEvent).event === WorkspaceEvent.Upgrade) ||
tx?._class === core.class.TxModelUpgrade
) {
console.log('Processing upgrade', this.workspace, this.email)
this.onUpgrade?.()
return
}
}
this.handler(...txArr)
clearTimeout(this.incomingTimer)
void broadcastEvent(client.event.NetworkRequests, this.requests.size + 1)
this.incomingTimer = setTimeout(() => {
void broadcastEvent(client.event.NetworkRequests, this.requests.size)
}, 500)
}
websocket.onopen = () => {
const useBinary = getMetadata(client.metadata.UseBinaryProtocol) ?? true
const useCompression = getMetadata(client.metadata.UseProtocolCompression) ?? false
}
wsocket.onclose = (ev) => {
clearTimeout(dialTimer)
if (this.websocket !== wsocket) {
wsocket.close()
clearTimeout(dialTimer)
const helloRequest: HelloRequest = {
method: 'hello',
params: [],
id: -1,
binary: useBinary,
compression: useCompression,
broadcast: true
}
websocket.send(serialize(helloRequest, false))
// Ok we connected, let's schedule ping
this.schedulePing()
return
}
websocket.onerror = (event: any) => {
console.error('client websocket error:', socketId, event, this.workspace, this.email)
void broadcastEvent(client.event.NetworkRequests, -1)
reject(new Error(`websocket error:${socketId}`))
// console.log('client websocket closed', socketId, ev?.reason)
void broadcastEvent(client.event.NetworkRequests, -1)
this.scheduleOpen(true)
}
wsocket.onopen = () => {
if (this.websocket !== wsocket) {
return
}
})
const useBinary = getMetadata(client.metadata.UseBinaryProtocol) ?? true
const useCompression = getMetadata(client.metadata.UseProtocolCompression) ?? false
clearTimeout(dialTimer)
const helloRequest: HelloRequest = {
method: 'hello',
params: [],
id: -1,
binary: useBinary,
compression: useCompression,
broadcast: true
}
this.websocket?.send(serialize(helloRequest, false))
}
wsocket.onerror = (event: any) => {
clearTimeout(dialTimer)
if (this.websocket !== wsocket) {
return
}
if (this.delay < 3) {
this.delay += 1
}
if (opened) {
console.error('client websocket error:', socketId, this.url, this.workspace, this.email)
}
void broadcastEvent(client.event.NetworkRequests, -1)
}
}
private async sendRequest (data: {
@ -406,43 +436,56 @@ class Connection implements ClientConnection {
// If not defined, on reconnect with timeout, will retry automatically.
retry?: () => Promise<boolean>
handleResult?: (result: any) => Promise<void>
once?: boolean // Require handleResult to retrieve result
measure?: (time: number, result: any, serverTime: number, queue: number) => void
}): Promise<any> {
if (this.closed) {
throw new PlatformError(unknownError('connection closed'))
}
if (data.once === true) {
// Check if has same request already then skip
for (const [, v] of this.requests) {
if (v.method === data.method && JSON.stringify(v.params) === JSON.stringify(data.params)) {
// We have same unanswered, do not add one more.
return
}
}
}
const id = this.lastId++
const promise = new RequestPromise(data.method, data.params, data.handleResult)
promise.handleTime = data.measure
const sendData = async (): Promise<void> => {
if (this.websocket instanceof Promise) {
this.websocket = await this.websocket
}
if (this.websocket === null) {
this.websocket = this.waitOpenConnection()
this.websocket = await this.websocket
}
this.requests.set(id, promise)
this.websocket.send(
serialize(
{
method: data.method,
params: data.params,
id
},
false
const w = this.waitOpenConnection()
if (w instanceof Promise) {
await w
}
this.requests.set(id, promise)
const sendData = (): void => {
if (this.websocket?.readyState === ClientSocketReadyState.OPEN) {
promise.startTime = Date.now()
this.websocket?.send(
serialize(
{
method: data.method,
params: data.params,
id
},
false
)
)
)
}
}
promise.reconnect = () => {
setTimeout(async () => {
// In case we don't have response yet.
if (this.requests.has(id) && ((await data.retry?.()) ?? true)) {
await sendData()
sendData()
}
}, 500)
}
await sendData()
sendData()
void broadcastEvent(client.event.NetworkRequests, this.requests.size)
return await promise.promise
}
@ -480,11 +523,15 @@ class Connection implements ClientConnection {
query: DocumentQuery<T>,
options?: FindOptions<T>
): Promise<FindResult<T>> {
const st = Date.now()
const result = await this.sendRequest({ method: 'findAll', params: [_class, query, options] })
if (Date.now() - st > 1000) {
console.error('measure slow findAll', Date.now() - st, _class, query, options, result)
}
const result = await this.sendRequest({
method: 'findAll',
params: [_class, query, options],
measure: (time, result, serverTime, queue) => {
if (typeof window !== 'undefined' && time > 1000) {
console.error('measure slow findAll', time, serverTime, queue, _class, query, options, result)
}
}
})
return result
}

View File

@ -46,6 +46,8 @@ export interface ClientSocket {
close: (code?: number) => void
readyState: ClientSocketReadyState
bufferedAmount?: number
}
/**

View File

@ -24,7 +24,8 @@ import {
selectWorkspace,
sendInvite,
getEnpoint,
fetchWorkspace
fetchWorkspace,
createMissingEmployee
} from './utils'
/*!
* Anticrm Platform Login Plugin
@ -43,6 +44,7 @@ export default async () => ({
ChangePassword: changePassword,
SelectWorkspace: selectWorkspace,
FetchWorkspace: fetchWorkspace,
CreateEmployee: createMissingEmployee,
GetWorkspaces: getWorkspaces,
SendInvite: sendInvite,
GetEndpoint: getEnpoint

View File

@ -396,6 +396,53 @@ export async function fetchWorkspace (workspace: string): Promise<[Status, Works
return [unknownError(err), undefined]
}
}
export async function createMissingEmployee (workspace: string): Promise<[Status]> {
const accountsUrl = getMetadata(login.metadata.AccountsUrl)
if (accountsUrl === undefined) {
throw new Error('accounts url not specified')
}
const overrideToken = getMetadata(login.metadata.OverrideLoginToken)
if (overrideToken !== undefined) {
const endpoint = getMetadata(login.metadata.OverrideEndpoint)
if (endpoint !== undefined) {
return [OK]
}
}
const token = getMetadata(presentation.metadata.Token)
if (token === undefined) {
return [unknownStatus('Please login')]
}
const request = {
method: 'createMissingEmployee',
params: [token]
}
try {
const response = await fetch(accountsUrl, {
method: 'POST',
headers: {
Authorization: 'Bearer ' + token,
'Content-Type': 'application/json'
},
body: JSON.stringify(request)
})
const result = await response.json()
if (result.error == null) {
Analytics.handleEvent('Create missing employee')
Analytics.setTag('workspace', workspace)
} else {
await handleStatusError('Fetch workspace error', result.error)
}
return [result.error ?? OK]
} catch (err: any) {
Analytics.handleError(err)
return [unknownError(err)]
}
}
export function setLoginInfo (loginInfo: WorkspaceLoginInfo): void {
const tokens: Record<string, string> = fetchMetadataLocalStorage(login.metadata.LoginTokens) ?? {}

View File

@ -82,6 +82,7 @@ export default plugin(loginId, {
ChangePassword: '' as Resource<(oldPassword: string, password: string) => Promise<void>>,
SelectWorkspace: '' as Resource<(workspace: string) => Promise<[Status, WorkspaceLoginInfo | undefined]>>,
FetchWorkspace: '' as Resource<(workspace: string) => Promise<[Status, WorkspaceLoginInfo | undefined]>>,
CreateEmployee: '' as Resource<(workspace: string) => Promise<[Status]>>,
GetWorkspaces: '' as Resource<() => Promise<Workspace[]>>,
GetEndpoint: '' as Resource<() => Promise<string>>
}

View File

@ -7,6 +7,7 @@ import core, {
metricsToString,
setCurrentAccount,
versionToString,
type Account,
type AccountClient,
type Client,
type Version
@ -216,7 +217,10 @@ export async function connect (title: string): Promise<Client | undefined> {
_client = newClient
console.log('logging in as', email)
const me = await ctx.with('get-account', {}, async () => await newClient.getAccount())
let me: Account | undefined = await ctx.with('get-account', {}, async () => await newClient.getAccount())
if (me === undefined) {
me = await createEmployee(ctx, ws, me, newClient)
}
if (me !== undefined) {
Analytics.setUser(me.email)
Analytics.setTag('workspace', ws)
@ -224,6 +228,7 @@ export async function connect (title: string): Promise<Client | undefined> {
setCurrentAccount(me)
} else {
console.error('WARNING: no employee account found.')
clearMetadata(ws)
navigate({
path: [loginId],
@ -303,6 +308,28 @@ export async function connect (title: string): Promise<Client | undefined> {
return newClient
}
async function createEmployee (
ctx: MeasureMetricsContext,
ws: string,
me: Account,
newClient: AccountClient
): Promise<Account | undefined> {
const createEmployee = await getResource(login.function.CreateEmployee)
await ctx.with('create-missing-employee', {}, async () => {
await createEmployee(ws)
})
for (let i = 0; i < 5; i++) {
me = await ctx.with('get-account', {}, async () => await newClient.getAccount())
if (me !== undefined) {
break
}
await new Promise((resolve) => {
setTimeout(resolve, 100)
})
}
return me
}
function clearMetadata (ws: string): void {
const tokens = fetchMetadataLocalStorage(login.metadata.LoginTokens)
if (tokens !== null) {

View File

@ -64,7 +64,7 @@ export function registerGithub (
// Successful authentication, redirect to your application
ctx.redirect(concatLink(frontUrl, '/login/auth'))
} catch (err: any) {
await measureCtx.error('failed to auth', err)
measureCtx.error('failed to auth', err)
}
}
await next()

View File

@ -62,7 +62,7 @@ export function registerGoogle (
// Successful authentication, redirect to your application
ctx.redirect(concatLink(frontUrl, '/login/auth'))
} catch (err: any) {
await measureCtx.error('failed to auth', err)
measureCtx.error('failed to auth', err)
}
}
await next()

View File

@ -97,7 +97,7 @@ export function serveAccount (
class MyStream {
write (text: string): void {
void measureCtx.info(text)
measureCtx.info(text)
}
}
@ -181,12 +181,11 @@ export function serveAccount (
}
process.on('uncaughtException', (e) => {
void measureCtx.error('uncaughtException', { error: e })
measureCtx.error('uncaughtException', { error: e })
})
process.on('unhandledRejection', (reason, promise) => {
console.error('Unhandled Rejection at:', promise, 'reason:', reason)
void measureCtx.error('Unhandled Rejection at:', { reason, promise })
measureCtx.error('Unhandled Rejection at:', { reason, promise })
})
process.on('SIGINT', close)
process.on('SIGTERM', close)

View File

@ -251,7 +251,7 @@ async function getAccountInfoByToken (
email = decodeToken(token)?.email
} catch (err: any) {
Analytics.handleError(err)
await ctx.error('Invalid token', { token })
ctx.error('Invalid token', { token })
throw new PlatformError(new Status(Severity.ERROR, platform.status.Unauthorized, {}))
}
const account = await getAccount(db, email)
@ -292,11 +292,11 @@ export async function login (
confirmed: info.confirmed ?? true,
token: generateToken(email, getWorkspaceId('', productId), getExtra(info))
}
await ctx.info('login success', { email, productId })
ctx.info('login success', { email, productId })
return result
} catch (err: any) {
Analytics.handleError(err)
await ctx.error('login failed', { email, productId, _email, err })
ctx.error('login failed', { email, productId, _email, err })
throw err
}
}
@ -349,7 +349,7 @@ export async function selectWorkspace (
if (workspaceInfo !== null) {
if (workspaceInfo.disabled === true && workspaceInfo.creating !== true) {
await ctx.error('workspace disabled', { workspaceUrl, email })
ctx.error('workspace disabled', { workspaceUrl, email })
throw new PlatformError(
new Status(Severity.ERROR, platform.status.WorkspaceNotFound, { workspace: workspaceUrl })
)
@ -371,7 +371,7 @@ export async function selectWorkspace (
}
}
}
await ctx.error('workspace error', { workspaceUrl, email })
ctx.error('workspace error', { workspaceUrl, email })
throw new PlatformError(new Status(Severity.ERROR, platform.status.Forbidden, {}))
}
@ -387,17 +387,17 @@ export async function getInvite (db: Db, inviteId: ObjectId): Promise<Invite | n
*/
export async function checkInvite (ctx: MeasureContext, invite: Invite | null, email: string): Promise<WorkspaceId> {
if (invite === null || invite.limit === 0) {
void ctx.error('invite', { email, state: 'no invite or limit exceed' })
ctx.error('invite', { email, state: 'no invite or limit exceed' })
Analytics.handleError(new Error(`no invite or invite limit exceed ${email}`))
throw new PlatformError(new Status(Severity.ERROR, platform.status.Forbidden, {}))
}
if (invite.exp < Date.now()) {
void ctx.error('invite', { email, state: 'link expired' })
ctx.error('invite', { email, state: 'link expired' })
Analytics.handleError(new Error(`invite link expired ${invite._id.toString()} ${email}`))
throw new PlatformError(new Status(Severity.ERROR, platform.status.ExpiredLink, {}))
}
if (invite.emailMask != null && invite.emailMask.trim().length > 0 && !new RegExp(invite.emailMask).test(email)) {
void ctx.error('invite', { email, state: 'mask to match', mask: invite.emailMask })
ctx.error('invite', { email, state: 'mask to match', mask: invite.emailMask })
Analytics.handleError(new Error(`invite link mask failed ${invite._id.toString()} ${email} ${invite.emailMask}`))
throw new PlatformError(new Status(Severity.ERROR, platform.status.Forbidden, {}))
}
@ -426,7 +426,7 @@ export async function join (
const email = cleanEmail(_email)
const invite = await getInvite(db, inviteId)
const workspace = await checkInvite(ctx, invite, email)
await ctx.info(`join attempt:${email}, ${workspace.name}`)
ctx.info(`join attempt:${email}, ${workspace.name}`)
const ws = await assignWorkspace(ctx, db, productId, email, workspace.name)
const token = (await login(ctx, db, productId, email, password)).token
@ -462,7 +462,7 @@ export async function confirm (ctx: MeasureContext, db: Db, productId: string, t
const decode = decodeToken(token)
const _email = decode.extra?.confirm
if (_email === undefined) {
await ctx.error('confirm email invalid', { token: decode })
ctx.error('confirm email invalid', { token: decode })
throw new PlatformError(new Status(Severity.ERROR, platform.status.AccountNotFound, { account: _email }))
}
const email = cleanEmail(_email)
@ -473,7 +473,7 @@ export async function confirm (ctx: MeasureContext, db: Db, productId: string, t
email,
token: generateToken(email, getWorkspaceId('', productId), getExtra(account))
}
await ctx.info('confirm success', { email, productId })
ctx.info('confirm success', { email, productId })
return result
}
@ -576,7 +576,7 @@ export async function createAcc (
const systemEmails = [systemAccountEmail]
if (systemEmails.includes(email)) {
await ctx.error('system email used for account', { email })
ctx.error('system email used for account', { email })
throw new PlatformError(new Status(Severity.ERROR, platform.status.AccountAlreadyExists, { account: email }))
}
@ -607,11 +607,11 @@ export async function createAcc (
if (sesURL !== undefined && sesURL !== '') {
await sendConfirmation(productId, newAccount)
} else {
await ctx.info('Please provide email service url to enable email confirmations.')
ctx.info('Please provide email service url to enable email confirmations.')
await confirmEmail(db, email)
}
}
await ctx.info('account created', { account: email })
ctx.info('account created', { account: email })
return newAccount
}
@ -865,10 +865,10 @@ export async function createWorkspace (
const childLogger = ctx.newChild('createWorkspace', { workspace: workspaceInfo.workspace })
const ctxModellogger: ModelLogger = {
log: (msg, data) => {
void childLogger.info(msg, data)
childLogger.info(msg, data)
},
error: (msg, data) => {
void childLogger.error(msg, data)
childLogger.error(msg, data)
}
}
let model: Tx[] = []
@ -961,7 +961,7 @@ export async function upgradeWorkspace (
if (ws?.version !== undefined && !forceUpdate && versionStr === versionToString(ws.version)) {
return versionStr
}
await ctx.info('upgrading', {
ctx.info('upgrading', {
force: forceUpdate,
currentVersion: ws?.version !== undefined ? versionToString(ws.version) : '',
toVersion: versionStr,
@ -995,7 +995,7 @@ export const createUserWorkspace =
async (ctx: MeasureContext, db: Db, productId: string, token: string, workspaceName: string): Promise<LoginInfo> => {
const { email } = decodeToken(token)
await ctx.info('Creating workspace', { workspaceName, email })
ctx.info('Creating workspace', { workspaceName, email })
const info = await getAccount(db, email)
@ -1041,7 +1041,7 @@ export const createUserWorkspace =
try {
await assignWorkspace(ctx, db, productId, email, workspace.workspace, shouldUpdateAccount, client)
await setRole(email, workspace.workspace, productId, AccountRole.Owner, client)
await ctx.info('Creating server side done', { workspaceName, email })
ctx.info('Creating server side done', { workspaceName, email })
} catch (err: any) {
Analytics.handleError(err)
} finally {
@ -1051,7 +1051,7 @@ export const createUserWorkspace =
)
if (err != null) {
await ctx.error('failed to create workspace', { err, workspaceName, email })
ctx.error('failed to create workspace', { err, workspaceName, email })
// We need to drop workspace, to prevent wrong data usage.
await db.collection(WORKSPACE_COLLECTION).updateOne(
@ -1083,7 +1083,7 @@ export const createUserWorkspace =
productId,
workspace: workspaceInfo.workspaceUrl
}
await ctx.info('Creating user side done', { workspaceName, email })
ctx.info('Creating user side done', { workspaceName, email })
return result
}
@ -1102,12 +1102,12 @@ export async function getInviteLink (
const { workspace, email } = decodeToken(token)
const wsPromise = await getWorkspaceById(db, productId, workspace.name)
if (wsPromise === null) {
await ctx.error('workspace not found', { workspace, email })
ctx.error('workspace not found', { workspace, email })
throw new PlatformError(
new Status(Severity.ERROR, platform.status.WorkspaceNotFound, { workspace: workspace.name })
)
}
await ctx.info('Getting invite link', { workspace: workspace.name, emailMask, limit })
ctx.info('Getting invite link', { workspace: workspace.name, emailMask, limit })
const result = await db.collection(INVITE_COLLECTION).insertOne({
workspace,
exp: Date.now() + exp,
@ -1149,7 +1149,7 @@ export async function getUserWorkspaces (
const { email } = decodeToken(token)
const account = await getAccount(db, email)
if (account === null) {
await ctx.error('account not found', { email })
ctx.error('account not found', { email })
return []
}
return (
@ -1272,6 +1272,26 @@ export async function setRole (
}
}
/**
* @public
*/
export async function createMissingEmployee (
ctx: MeasureContext,
db: Db,
productId: string,
token: string
): Promise<void> {
const { email } = decodeToken(token)
const wsInfo = await getWorkspaceInfo(ctx, db, productId, token)
const account = await getAccount(db, email)
if (account === null) {
throw new PlatformError(new Status(Severity.ERROR, platform.status.AccountNotFound, { account: email }))
}
await createPersonAccount(account, productId, wsInfo.workspaceId, true)
}
/**
* @public
*/
@ -1289,7 +1309,7 @@ export async function assignWorkspace (
const initWS = getMetadata(toolPlugin.metadata.InitWorkspace)
if (initWS !== undefined && initWS === workspaceId) {
Analytics.handleError(new Error(`assign-workspace failed ${email} ${workspaceId}`))
await ctx.error('assign-workspace failed', { email, workspaceId, reason: 'initWs === workspaceId' })
ctx.error('assign-workspace failed', { email, workspaceId, reason: 'initWs === workspaceId' })
throw new PlatformError(new Status(Severity.ERROR, platform.status.Forbidden, {}))
}
const workspaceInfo = await getWorkspaceAndAccount(ctx, db, productId, email, workspaceId)
@ -1308,7 +1328,7 @@ export async function assignWorkspace (
// Add account into workspace.
await assignWorkspaceRaw(db, workspaceInfo)
await ctx.info('assign-workspace success', { email, workspaceId })
ctx.info('assign-workspace success', { email, workspaceId })
return workspaceInfo.workspace
}
@ -1482,7 +1502,7 @@ export async function changePassword (
const hash = hashWithSalt(password, salt)
await db.collection(ACCOUNT_COLLECTION).updateOne({ _id: account._id }, { $set: { salt, hash } })
await ctx.info('change-password success', { email })
ctx.info('change-password success', { email })
}
/**
@ -1490,7 +1510,7 @@ export async function changePassword (
*/
export async function changeEmail (ctx: MeasureContext, db: Db, account: Account, newEmail: string): Promise<void> {
await db.collection<Account>(ACCOUNT_COLLECTION).updateOne({ _id: account._id }, { $set: { email: newEmail } })
await ctx.info('change-email success', { email: newEmail })
ctx.info('change-email success', { email: newEmail })
}
/**
@ -1516,7 +1536,7 @@ export async function requestPassword (ctx: MeasureContext, db: Db, productId: s
const account = await getAccount(db, email)
if (account === null) {
await ctx.info('account not found', { email })
ctx.info('account not found', { email })
throw new PlatformError(new Status(Severity.ERROR, platform.status.AccountNotFound, { account: email }))
}
@ -1556,7 +1576,7 @@ export async function requestPassword (ctx: MeasureContext, db: Db, productId: s
to
})
})
await ctx.info('recovery email sent', { email, accountEmail: account.email })
ctx.info('recovery email sent', { email, accountEmail: account.email })
}
/**
@ -1609,7 +1629,7 @@ export async function removeWorkspace (
// Add account a workspace
await db.collection(ACCOUNT_COLLECTION).updateOne({ _id: account._id }, { $pull: { workspaces: workspace._id } })
await ctx.info('Workspace removed', { email, workspace })
ctx.info('Workspace removed', { email, workspace })
}
/**
@ -1627,7 +1647,7 @@ export async function checkJoin (
const workspace = await checkInvite(ctx, invite, email)
const ws = await getWorkspaceById(db, productId, workspace.name)
if (ws === null) {
await ctx.error('workspace not found', { name: workspace.name, email, inviteId })
ctx.error('workspace not found', { name: workspace.name, email, inviteId })
throw new PlatformError(
new Status(Severity.ERROR, platform.status.WorkspaceNotFound, { workspace: workspace.name })
)
@ -1653,7 +1673,7 @@ export async function dropWorkspace (
.collection<Account>(ACCOUNT_COLLECTION)
.updateMany({ _id: { $in: ws.accounts ?? [] } }, { $pull: { workspaces: ws._id } })
await ctx.info('Workspace dropped', { workspace: ws.workspace })
ctx.info('Workspace dropped', { workspace: ws.workspace })
}
/**
@ -1680,7 +1700,7 @@ export async function dropAccount (ctx: MeasureContext, db: Db, productId: strin
await db
.collection<Workspace>(WORKSPACE_COLLECTION)
.updateMany({ _id: { $in: account.workspaces } }, { $pull: { accounts: account._id } })
await ctx.info('Account Dropped', { email, account })
ctx.info('Account Dropped', { email, account })
}
/**
@ -1718,7 +1738,7 @@ export async function leaveWorkspace (
.collection<Account>(ACCOUNT_COLLECTION)
.updateOne({ _id: account._id }, { $pull: { workspaces: workspace._id } })
}
await ctx.info('Account removed from workspace', { email, workspace })
ctx.info('Account removed from workspace', { email, workspace })
}
/**
@ -1781,7 +1801,7 @@ export async function sendInvite (
to
})
})
await ctx.info('Invite sent', { email, workspace, link })
ctx.info('Invite sent', { email, workspace, link })
}
async function deactivatePersonAccount (
@ -1803,7 +1823,7 @@ async function deactivatePersonAccount (
active: false
})
}
await ctx.info('account deactivated', { email, workspace })
ctx.info('account deactivated', { email, workspace })
}
} finally {
await connection.close()
@ -1835,9 +1855,9 @@ function wrap (
: new Status(Severity.ERROR, platform.status.InternalServerError, {})
if (status.code === platform.status.InternalServerError) {
Analytics.handleError(err)
void ctx.error('error', { status, err })
ctx.error('error', { status, err })
} else {
void ctx.error('error', { status })
ctx.error('error', { status })
}
return {
error: status
@ -1970,7 +1990,8 @@ export function getMethods (
restorePassword: wrap(restorePassword),
sendInvite: wrap(sendInvite),
confirm: wrap(confirm),
getAccountInfoByToken: wrap(getAccountInfoByToken)
getAccountInfoByToken: wrap(getAccountInfoByToken),
createMissingEmployee: wrap(createMissingEmployee)
// updateAccount: wrap(updateAccount)
}
}

View File

@ -28,6 +28,8 @@ export interface UpgradeOptions {
console: boolean
logs: string
parallel: number
ignore?: string
}
export class UpgradeWorker {
@ -61,17 +63,17 @@ export class UpgradeWorker {
}
private async _upgradeWorkspace (ctx: MeasureContext, ws: WorkspaceInfo, opt: UpgradeOptions): Promise<void> {
if (ws.disabled === true) {
if (ws.disabled === true || (opt.ignore ?? '').includes(ws.workspace)) {
return
}
const t = Date.now()
const ctxModelLogger: ModelLogger = {
log (msg: string, data: any): void {
void ctx.info(msg, data)
ctx.info(msg, data)
},
error (msg: string, data: any): void {
void ctx.error(msg, data)
ctx.error(msg, data)
}
}
@ -79,7 +81,7 @@ export class UpgradeWorker {
const avgTime = (Date.now() - this.st) / (this.total - this.toProcess + 1)
this.eta = Math.floor(avgTime * this.toProcess)
await ctx.info('----------------------------------------------------------\n---UPGRADING----', {
ctx.info('----------------------------------------------------------\n---UPGRADING----', {
pending: this.toProcess,
eta: this.eta,
workspace: ws.workspace
@ -97,7 +99,7 @@ export class UpgradeWorker {
logger,
opt.force
)
await ctx.info('---done---------', {
ctx.info('---done---------', {
pending: this.toProcess,
time: Date.now() - t,
workspace: ws.workspace,
@ -109,10 +111,10 @@ export class UpgradeWorker {
logger.log('error', err)
if (!opt.console) {
await ctx.error('error', err)
ctx.error('error', err)
}
await ctx.info('---failed---------', {
ctx.info('---failed---------', {
pending: this.toProcess,
time: Date.now() - t,
workspace: ws.workspace
@ -149,7 +151,7 @@ export class UpgradeWorker {
if (opt.parallel !== 0) {
const parallel = opt.parallel
const rateLimit = new RateLimiter(parallel)
await ctx.info('parallel upgrade', { parallel })
ctx.info('parallel upgrade', { parallel })
await Promise.all(
workspaces.map((it) =>
rateLimit.add(async () => {
@ -159,14 +161,14 @@ export class UpgradeWorker {
})
)
)
await ctx.info('Upgrade done')
ctx.info('Upgrade done')
} else {
await ctx.info('UPGRADE write logs at:', { logs: opt.logs })
ctx.info('UPGRADE write logs at:', { logs: opt.logs })
for (const ws of workspaces) {
await this._upgradeWorkspace(ctx, ws, opt)
}
if (withError.length > 0) {
await ctx.info('Failed workspaces', withError)
ctx.info('Failed workspaces', withError)
}
}
}

View File

@ -50,9 +50,9 @@ export function startBackup (ctx: MeasureContext): void {
process.on('SIGINT', shutdown)
process.on('SIGTERM', shutdown)
process.on('uncaughtException', (e) => {
void ctx.error('uncaughtException', { err: e })
ctx.error('uncaughtException', { err: e })
})
process.on('unhandledRejection', (e) => {
void ctx.error('unhandledRejection', { err: e })
ctx.error('unhandledRejection', { err: e })
})
}

View File

@ -147,7 +147,7 @@ async function loadDigest (
result.delete(k as Ref<Doc>)
}
} catch (err: any) {
await ctx.error('digest is broken, will do full backup for', { domain })
ctx.error('digest is broken, will do full backup for', { domain })
}
}
// Stop if stop date is matched and provided
@ -392,14 +392,14 @@ export async function backup (
mode: 'backup'
})) as unknown as CoreClient & BackupClient
)
await ctx.info('starting backup', { workspace: workspaceId.name })
ctx.info('starting backup', { workspace: workspaceId.name })
let canceled = false
let timer: any
if (timeout > 0) {
timer = setTimeout(() => {
void ctx.error('Timeout during backup', { workspace: workspaceId.name, timeout: timeout / 1000 })
ctx.error('Timeout during backup', { workspace: workspaceId.name, timeout: timeout / 1000 })
canceled = true
}, timeout)
}
@ -411,7 +411,7 @@ export async function backup (
.domains()
.filter((it) => it !== DOMAIN_TRANSIENT && it !== DOMAIN_MODEL && !skipDomains.includes(it))
]
await ctx.info('domains for dump', { domains: domains.length })
ctx.info('domains for dump', { domains: domains.length })
let backupInfo: BackupInfo = {
workspace: workspaceId.name,
@ -440,7 +440,7 @@ export async function backup (
)
if (lastTx !== undefined) {
if (lastTx._id === backupInfo.lastTxId && !force) {
await ctx.info('No transaction changes. Skipping backup.', { workspace: workspaceId.name })
ctx.info('No transaction changes. Skipping backup.', { workspace: workspaceId.name })
return
}
}
@ -481,7 +481,7 @@ export async function backup (
for (const { id, hash, size } of currentChunk.docs) {
processed++
if (Date.now() - st > 2500) {
await ctx.info('processed', {
ctx.info('processed', {
processed,
digest: digest.size,
time: Date.now() - st,
@ -522,7 +522,7 @@ export async function backup (
}
} catch (err: any) {
console.error(err)
await ctx.error('failed to load chunks', { error: err })
ctx.error('failed to load chunks', { error: err })
if (idx !== undefined) {
await ctx.with('loadChunk', {}, async () => {
await connection.closeChunk(idx as number)
@ -577,7 +577,7 @@ export async function backup (
)
if (needRetrieveChunks.length > 0) {
await ctx.info('dumping domain...', { workspace: workspaceId.name, domain })
ctx.info('dumping domain...', { workspace: workspaceId.name, domain })
}
while (needRetrieveChunks.length > 0) {
@ -586,7 +586,7 @@ export async function backup (
}
const needRetrieve = needRetrieveChunks.shift() as Ref<Doc>[]
await ctx.info('Retrieve chunk', {
ctx.info('Retrieve chunk', {
needRetrieve: needRetrieveChunks.reduce((v, docs) => v + docs.length, 0),
toLoad: needRetrieve.length,
workspace: workspaceId.name
@ -595,7 +595,7 @@ export async function backup (
try {
docs = await ctx.with('load-docs', {}, async (ctx) => await connection.loadDocs(domain, needRetrieve))
} catch (err: any) {
await ctx.error('error loading docs', { domain, err, workspace: workspaceId.name })
ctx.error('error loading docs', { domain, err, workspace: workspaceId.name })
// Put back.
needRetrieveChunks.push(needRetrieve)
continue
@ -631,7 +631,7 @@ export async function backup (
_pack = pack()
stIndex++
const storageFile = join(backupIndex, `${domain}-data-${snapshot.date}-${stIndex}.tar.gz`)
await ctx.info('storing from domain', { domain, storageFile, workspace: workspaceId.name })
ctx.info('storing from domain', { domain, storageFile, workspace: workspaceId.name })
domainInfo.storage = [...(domainInfo.storage ?? []), storageFile]
const dataStream = await storage.write(storageFile)
const storageZip = createGzip({ level: defaultLevel })
@ -718,9 +718,9 @@ export async function backup (
await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }))
}
} catch (err: any) {
await ctx.error('backup error', { err, workspace: workspaceId.name })
ctx.error('backup error', { err, workspace: workspaceId.name })
} finally {
await ctx.info('end backup', { workspace: workspaceId.name })
ctx.info('end backup', { workspace: workspaceId.name })
await connection.close()
ctx.end()
if (timeout !== -1) {

View File

@ -77,7 +77,7 @@ class BackupWorker {
return
}
index++
await ctx.info('\n\nBACKUP WORKSPACE ', {
ctx.info('\n\nBACKUP WORKSPACE ', {
workspace: ws.workspace,
productId: ws.productId,
index,
@ -102,7 +102,7 @@ class BackupWorker {
)
})
} catch (err: any) {
await ctx.error('\n\nFAILED to BACKUP', { workspace: ws.workspace, err })
ctx.error('\n\nFAILED to BACKUP', { workspace: ws.workspace, err })
}
}
}

View File

@ -84,7 +84,7 @@ export async function loadCollaborativeDoc (
for (const source of sources) {
const { documentId, versionId } = collaborativeDocParse(source)
await ctx.info('loading collaborative document', { source })
ctx.info('loading collaborative document', { source })
const ydoc = await loadCollaborativeDocVersion(ctx, storageAdapter, workspace, documentId, versionId)
if (ydoc !== undefined) {

View File

@ -49,7 +49,7 @@ export class StorageExtension implements Extension {
}
async onLoadDocument ({ context, documentName }: withContext<onLoadDocumentPayload>): Promise<any> {
await this.configuration.ctx.info('load document', { documentName })
this.configuration.ctx.info('load document', { documentName })
return await this.configuration.ctx.with('load-document', {}, async () => {
return await this.loadDocument(documentName as DocumentId, context)
})
@ -58,11 +58,11 @@ export class StorageExtension implements Extension {
async onStoreDocument ({ context, documentName, document }: withContext<onStoreDocumentPayload>): Promise<void> {
const { ctx } = this.configuration
await ctx.info('store document', { documentName })
ctx.info('store document', { documentName })
const collaborators = this.collaborators.get(documentName)
if (collaborators === undefined || collaborators.size === 0) {
await ctx.info('no changes for document', { documentName })
ctx.info('no changes for document', { documentName })
return
}
@ -75,7 +75,7 @@ export class StorageExtension implements Extension {
async onConnect ({ context, documentName, instance }: withContext<onConnectPayload>): Promise<any> {
const connections = instance.documents.get(documentName)?.getConnectionsCount() ?? 0
const params = { documentName, connectionId: context.connectionId, connections }
await this.configuration.ctx.info('connect to document', params)
this.configuration.ctx.info('connect to document', params)
}
async onDisconnect ({ context, documentName, document }: withContext<onDisconnectPayload>): Promise<any> {
@ -83,11 +83,11 @@ export class StorageExtension implements Extension {
const { connectionId } = context
const params = { documentName, connectionId, connections: document.getConnectionsCount() }
await ctx.info('disconnect from document', params)
ctx.info('disconnect from document', params)
const collaborators = this.collaborators.get(documentName)
if (collaborators === undefined || !collaborators.has(connectionId)) {
await ctx.info('no changes for document', { documentName })
ctx.info('no changes for document', { documentName })
return
}
@ -98,7 +98,7 @@ export class StorageExtension implements Extension {
}
async afterUnloadDocument ({ documentName }: afterUnloadDocumentPayload): Promise<any> {
await this.configuration.ctx.info('unload document', { documentName })
this.configuration.ctx.info('unload document', { documentName })
this.collaborators.delete(documentName)
}
@ -110,7 +110,7 @@ export class StorageExtension implements Extension {
return await adapter.loadDocument(ctx, documentId, context)
})
} catch (err) {
await ctx.error('failed to load document content', { documentId, error: err })
ctx.error('failed to load document content', { documentId, error: err })
return undefined
}
}
@ -123,7 +123,7 @@ export class StorageExtension implements Extension {
await adapter.saveDocument(ctx, documentId, document, context)
})
} catch (err) {
await ctx.error('failed to save document content', { documentId, error: err })
ctx.error('failed to save document content', { documentId, error: err })
return undefined
}
}

View File

@ -46,7 +46,7 @@ export async function removeDocument (
try {
await minio.remove(ctx, workspaceId, [minioDocumentId, historyDocumentId])
} catch (err) {
await ctx.error('failed to remove document', { documentId, error: err })
ctx.error('failed to remove document', { documentId, error: err })
}
return {}

View File

@ -51,7 +51,7 @@ export async function start (
): Promise<Shutdown> {
const port = config.Port
await ctx.info('Starting collaborator server', { port })
ctx.info('Starting collaborator server', { port })
const app = express()
app.use(cors())
@ -210,7 +210,7 @@ export async function start (
server.listen(port)
await ctx.info('Running collaborator server', { port })
ctx.info('Running collaborator server', { port })
return async () => {
server.close()

View File

@ -50,15 +50,15 @@ export async function startCollaborator (): Promise<void> {
void shutdown().then(() => {
void mongoClient.close()
})
void metricsContext.info('closed')
metricsContext.info('closed')
}
process.on('uncaughtException', (e) => {
void metricsContext.error('UncaughtException', { error: e })
metricsContext.error('UncaughtException', { error: e })
})
process.on('unhandledRejection', (reason, promise) => {
void metricsContext.error('Unhandled Rejection at:', { promise, reason })
metricsContext.error('Unhandled Rejection at:', { promise, reason })
})
process.on('SIGINT', close)

View File

@ -54,51 +54,51 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
try {
// try to load document content
try {
await ctx.info('load document content', { documentId })
ctx.info('load document content', { documentId })
const ydoc = await this.loadDocumentFromStorage(ctx, documentId, context)
if (ydoc !== undefined) {
return ydoc
}
} catch (err) {
await ctx.error('failed to load document content', { documentId, error: err })
ctx.error('failed to load document content', { documentId, error: err })
}
// then try to load from inital content
const { initialContentId } = context
if (initialContentId !== undefined && initialContentId.length > 0) {
try {
await ctx.info('load document initial content', { documentId, initialContentId })
ctx.info('load document initial content', { documentId, initialContentId })
const ydoc = await this.loadDocumentFromStorage(ctx, initialContentId, context)
// if document was loaded from the initial content or storage we need to save
// it to ensure the next time we load it from the ydoc document
if (ydoc !== undefined) {
await ctx.info('save document content', { documentId, initialContentId })
ctx.info('save document content', { documentId, initialContentId })
await this.saveDocumentToStorage(ctx, documentId, ydoc, context)
return ydoc
}
} catch (err) {
await ctx.error('failed to load initial document content', { documentId, initialContentId, error: err })
ctx.error('failed to load initial document content', { documentId, initialContentId, error: err })
}
}
// finally try to load from the platform
const { platformDocumentId } = context
if (platformDocumentId !== undefined) {
await ctx.info('load document platform content', { documentId, platformDocumentId })
ctx.info('load document platform content', { documentId, platformDocumentId })
const ydoc = await ctx.with('load-document', { storage: 'platform' }, async (ctx) => {
try {
return await this.loadDocumentFromPlatform(ctx, platformDocumentId, context)
} catch (err) {
await ctx.error('failed to load platform document', { documentId, platformDocumentId, error: err })
ctx.error('failed to load platform document', { documentId, platformDocumentId, error: err })
}
})
// if document was loaded from the initial content or storage we need to save
// it to ensure the next time we load it from the ydoc document
if (ydoc !== undefined) {
await ctx.info('save document content', { documentId, platformDocumentId })
ctx.info('save document content', { documentId, platformDocumentId })
await this.saveDocumentToStorage(ctx, documentId, ydoc, context)
return ydoc
}
@ -107,29 +107,29 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
// nothing found
return undefined
} catch (err) {
await ctx.error('failed to load document', { documentId, error: err })
ctx.error('failed to load document', { documentId, error: err })
}
}
async saveDocument (ctx: MeasureContext, documentId: DocumentId, document: YDoc, context: Context): Promise<void> {
let snapshot: YDocVersion | undefined
try {
await ctx.info('take document snapshot', { documentId })
ctx.info('take document snapshot', { documentId })
snapshot = await this.takeSnapshot(ctx, documentId, document, context)
} catch (err) {
await ctx.error('failed to take document snapshot', { documentId, error: err })
ctx.error('failed to take document snapshot', { documentId, error: err })
}
try {
await ctx.info('save document content', { documentId })
ctx.info('save document content', { documentId })
await this.saveDocumentToStorage(ctx, documentId, document, context)
} catch (err) {
await ctx.error('failed to save document', { documentId, error: err })
ctx.error('failed to save document', { documentId, error: err })
}
const { platformDocumentId } = context
if (platformDocumentId !== undefined) {
await ctx.info('save document content to platform', { documentId, platformDocumentId })
ctx.info('save document content to platform', { documentId, platformDocumentId })
await ctx.with('save-document', { storage: 'platform' }, async (ctx) => {
await this.saveDocumentToPlatform(ctx, documentId, platformDocumentId, document, snapshot, context)
})
@ -158,7 +158,7 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
try {
return await loadCollaborativeDoc(adapter, context.workspaceId, collaborativeDoc, ctx)
} catch (err) {
await ctx.error('failed to load storage document', { documentId, collaborativeDoc, error: err })
ctx.error('failed to load storage document', { documentId, collaborativeDoc, error: err })
return undefined
}
})
@ -249,7 +249,7 @@ export class PlatformStorageAdapter implements CollabStorageAdapter {
const attribute = client.getHierarchy().findAttribute(objectClass, objectAttr)
if (attribute === undefined) {
await ctx.info('attribute not found', { documentName, objectClass, objectAttr })
ctx.info('attribute not found', { documentName, objectClass, objectAttr })
return
}

View File

@ -97,7 +97,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
this.triggerIndexing()
await this.indexing
await this.flush(true)
await this.metrics.info('Cancel indexing', { workspace: this.workspace.name, indexId: this.indexId })
this.metrics.warn('Cancel indexing', { workspace: this.workspace.name, indexId: this.indexId })
}
async markRemove (doc: DocIndexState): Promise<void> {
@ -336,7 +336,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
try {
this.hierarchy.getClass(core.class.DocIndexState)
} catch (err: any) {
await this.metrics.info('Models is not upgraded to support indexer', {
this.metrics.warn('Models is not upgraded to support indexer', {
indexId: this.indexId,
workspace: this.workspace.name
})
@ -370,12 +370,12 @@ export class FullTextIndexPipeline implements FullTextPipeline {
_classes.forEach((it) => this.broadcastClasses.add(it))
if (this.triggerCounts > 0) {
await this.metrics.info('No wait, trigger counts', { triggerCount: this.triggerCounts })
this.metrics.info('No wait, trigger counts', { triggerCount: this.triggerCounts })
}
if (this.toIndex.size === 0 && this.stageChanged === 0 && this.triggerCounts === 0) {
if (this.toIndex.size === 0) {
await this.metrics.info('Indexing complete', { indexId: this.indexId, workspace: this.workspace.name })
this.metrics.warn('Indexing complete', { indexId: this.indexId, workspace: this.workspace.name })
}
if (!this.cancelling) {
// We need to send index update event
@ -401,7 +401,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
}
}
}
await this.metrics.info('Exit indexer', { indexId: this.indexId, workspace: this.workspace.name })
this.metrics.warn('Exit indexer', { indexId: this.indexId, workspace: this.workspace.name })
}
private async processIndex (ctx: MeasureContext): Promise<Ref<Class<Doc>>[]> {
@ -474,7 +474,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
}
if (result.length > 0) {
await this.metrics.info('Full text: Indexing', {
this.metrics.info('Full text: Indexing', {
indexId: this.indexId,
stageId: st.stageId,
workspace: this.workspace.name,
@ -531,7 +531,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
}
}
} catch (err: any) {
await this.metrics.error('error during index', { error: err })
this.metrics.error('error during index', { error: err })
}
}
})
@ -585,7 +585,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
if (toRemoveIds.length > 0) {
await this.storage.clean(this.metrics, DOMAIN_DOC_INDEX_STATE, toRemoveIds)
total += toRemoveIds.length
await this.metrics.info('indexer', {
this.metrics.info('indexer', {
_classes: Array.from(groupByArray(toIndex, (it) => it.objectClass).keys()),
total,
count: toRemoveIds.length

View File

@ -196,7 +196,7 @@ export class TServerStorage implements ServerStorage {
const txCUD = TxProcessor.extractTx(tx) as TxCUD<Doc>
if (!this.hierarchy.isDerived(txCUD._class, core.class.TxCUD)) {
// Skip unsupported tx
await ctx.error('Unsupported transaction', tx)
ctx.error('Unsupported transaction', tx)
continue
}
const domain = this.hierarchy.getDomain(txCUD.objectClass)
@ -394,7 +394,7 @@ export class TServerStorage implements ServerStorage {
{ clazz, query, options }
)
if (Date.now() - st > 1000) {
await ctx.error('FindAll', { time: Date.now() - st, clazz, query: cutObjectArray(query), options })
ctx.error('FindAll', { time: Date.now() - st, clazz, query: cutObjectArray(query), options })
}
return result
}
@ -791,7 +791,7 @@ export class TServerStorage implements ServerStorage {
await fx()
}
} catch (err: any) {
await ctx.error('error process tx', { error: err })
ctx.error('error process tx', { error: err })
throw err
} finally {
onEnds.forEach((p) => {

View File

@ -146,7 +146,7 @@ class ElasticAdapter implements FullTextAdapter {
}
if (k === 'workspaceId') {
if (va?.type !== 'keyword') {
await this.metrics().info('Force index-recreate, since wrong index type was used')
this.metrics().info('Force index-recreate, since wrong index type was used')
await this.client.indices.delete({
index: indexName
})

View File

@ -49,7 +49,7 @@ async function minioUpload (
{ file: file.name, contentType: file.mimetype }
)
await ctx.info('minio upload', resp)
ctx.info('minio upload', resp)
return id
}
@ -81,7 +81,7 @@ async function getFileRange (
): Promise<void> {
const stat = await ctx.with('stats', {}, async () => await client.stat(ctx, workspace, uuid))
if (stat === undefined) {
await ctx.error('No such key', { file: uuid })
ctx.error('No such key', { file: uuid })
res.status(404).send()
return
}
@ -127,7 +127,7 @@ async function getFileRange (
resolve()
})
dataStream.on('error', (err) => {
void ctx.error('error receive stream', { workspace: workspace.name, uuid, error: err })
ctx.error('error receive stream', { workspace: workspace.name, uuid, error: err })
Analytics.handleError(err)
res.end()
reject(err)
@ -138,12 +138,12 @@ async function getFileRange (
})
} catch (err: any) {
if (err?.code === 'NoSuchKey' || err?.code === 'NotFound') {
await ctx.info('No such key', { workspace: workspace.name, uuid })
ctx.info('No such key', { workspace: workspace.name, uuid })
res.status(404).send()
return
} else {
Analytics.handleError(err)
void ctx.error(err)
ctx.error(err)
}
res.status(500).send()
}
@ -162,7 +162,7 @@ async function getFile (
): Promise<void> {
const stat = await ctx.with('stat', {}, async () => await client.stat(ctx, workspace, uuid))
if (stat === undefined) {
await ctx.error('No such key', { file: req.query.file })
ctx.error('No such key', { file: req.query.file })
res.status(404).send()
return
}
@ -211,12 +211,12 @@ async function getFile (
dataStream.on('error', function (err) {
res.status(500).send()
Analytics.handleError(err)
void ctx.error('error', { err })
ctx.error('error', { err })
reject(err)
})
})
} catch (err: any) {
await ctx.error('get-file-error', { workspace: workspace.name, err })
ctx.error('get-file-error', { workspace: workspace.name, err })
Analytics.handleError(err)
res.status(500).send()
}
@ -261,7 +261,7 @@ export function start (
class MyStream {
write (text: string): void {
void ctx.info(text)
ctx.info(text)
}
}
@ -310,7 +310,7 @@ export function start (
})
res.end(json)
} catch (err: any) {
void ctx.error('statistics error', { err })
ctx.error('statistics error', { err })
Analytics.handleError(err)
res.writeHead(404, {})
res.end()
@ -348,7 +348,7 @@ export function start (
uuid = await getResizeID(ctx, size, uuid, config, payload)
const stat = await config.storageAdapter.stat(ctx, payload.workspace, uuid)
if (stat === undefined) {
await ctx.error('No such key', { file: req.query.file })
ctx.error('No such key', { file: req.query.file })
res.status(404).send()
return
}
@ -366,11 +366,11 @@ export function start (
res.end()
} catch (error: any) {
if (error?.code === 'NoSuchKey' || error?.code === 'NotFound') {
await ctx.error('No such key', { file: req.query.file })
ctx.error('No such key', { file: req.query.file })
res.status(404).send()
return
} else {
await ctx.error('error-handle-files', error)
ctx.error('error-handle-files', error)
}
res.status(500).send()
}
@ -447,11 +447,11 @@ export function start (
}
} catch (error: any) {
if (error?.code === 'NoSuchKey' || error?.code === 'NotFound') {
await ctx.error('No such key', { file: req.query.file })
ctx.error('No such key', { file: req.query.file })
res.status(404).send()
return
} else {
await ctx.error('error-handle-files', error)
ctx.error('error-handle-files', error)
}
res.status(500).send()
}
@ -507,7 +507,7 @@ export function start (
res.status(200).send(uuid)
} catch (error: any) {
await ctx.error('error-post-files', error)
ctx.error('error-post-files', error)
res.status(500).send()
}
},
@ -548,7 +548,7 @@ export function start (
res.status(200).send()
} catch (error: any) {
Analytics.handleError(error)
await ctx.error('failed to delete', { url: req.url })
ctx.error('failed to delete', { url: req.url })
res.status(500).send()
}
}
@ -617,25 +617,25 @@ export function start (
.catch((err: any) => {
if (err !== null) {
Analytics.handleError(err)
void ctx.error('error', { err })
ctx.error('error', { err })
res.status(500).send(err)
}
})
})
.on('error', function (err) {
Analytics.handleError(err)
void ctx.error('error', { err })
ctx.error('error', { err })
res.status(500).send(err)
})
})
.on('error', (e) => {
Analytics.handleError(e)
void ctx.error('error', { e })
ctx.error('error', { e })
res.status(500).send(e)
})
} catch (error: any) {
Analytics.handleError(error)
void ctx.error('error', { error })
ctx.error('error', { error })
res.status(500).send()
}
})
@ -695,19 +695,19 @@ export function start (
})
.catch((err: any) => {
Analytics.handleError(err)
void ctx.error('error', { err })
ctx.error('error', { err })
res.status(500).send(err)
})
})
.on('error', function (err) {
Analytics.handleError(err)
void ctx.error('error', { err })
ctx.error('error', { err })
res.status(500).send(err)
})
})
} catch (error: any) {
Analytics.handleError(error)
void ctx.error('error', { error })
ctx.error('error', { error })
res.status(500).send()
}
})

View File

@ -128,7 +128,7 @@ export class MinioService implements StorageAdapter {
version: result.versionId ?? null
}
} catch (err: any) {
await ctx.error('no object found', err)
ctx.error('no object found', { error: err, objectName, workspaceId: workspaceId.name })
}
}

View File

@ -839,7 +839,7 @@ abstract class MongoAdapterBase implements DbAdapter {
)
})
} catch (err: any) {
await ctx.error('failed on bulk write', { error: err, skip })
ctx.error('failed on bulk write', { error: err, skip })
if (skip !== 1) {
ops.push(...part)
skip = 1 // Let's update one by one, to loose only one failed variant.

View File

@ -63,6 +63,8 @@ export interface Response<R> {
index: number
final: boolean
}
time?: number // Server time to perform operation
queue?: number
}
/**

View File

@ -26,15 +26,16 @@ export function createAPMAgent (apmUrl: string): Agent {
export class APMMeasureContext implements MeasureContext {
logger: MeasureLogger
private readonly transaction?: Transaction | Span
private readonly parent?: Transaction | Span
private readonly parentTx?: Transaction | Span
constructor (
private readonly agent: Agent,
name: string,
params: Record<string, ParamType>,
parent?: Transaction | Span,
noTransaction?: boolean
parentTx?: Transaction | Span,
noTransaction?: boolean,
readonly parent?: MeasureContext
) {
this.parent = parent
this.parentTx = parentTx
this.logger = {
info: (msg, args) => {
agent.logger.info({ message: msg, ...args })
@ -42,14 +43,17 @@ export class APMMeasureContext implements MeasureContext {
error: (msg, args) => {
agent.logger.error({ message: msg, ...args })
},
warn: (msg, args) => {
agent.logger.warn({ message: msg, ...args })
},
logOperation (operation, time, params) {},
close: async () => {}
}
if (!(noTransaction ?? false)) {
if (this.parent === undefined) {
if (this.parentTx === undefined) {
this.transaction = agent.startTransaction(name) ?? undefined
} else {
this.transaction = agent.startSpan(name, { childOf: this.parent }) ?? undefined
this.transaction = agent.startSpan(name, { childOf: this.parentTx }) ?? undefined
}
for (const [k, v] of Object.entries(params)) {
this.transaction?.setLabel(k, v)
@ -58,7 +62,7 @@ export class APMMeasureContext implements MeasureContext {
}
newChild (name: string, params: Record<string, ParamType>): MeasureContext {
return new APMMeasureContext(this.agent, name, params, this.transaction)
return new APMMeasureContext(this.agent, name, params, this.transaction, undefined, this)
}
measure (name: string, value: number): void {}
@ -78,7 +82,7 @@ export class APMMeasureContext implements MeasureContext {
c.end()
return value
} catch (err: any) {
await c.error(err)
c.error(err)
throw err
}
}
@ -95,20 +99,19 @@ export class APMMeasureContext implements MeasureContext {
return r
}
async error (message: string, ...args: any[]): Promise<void> {
error (message: string, ...args: any[]): void {
this.logger.error(message, args)
await new Promise<void>((resolve) => {
this.agent.captureError({ message, params: args }, () => {
resolve()
})
})
this.agent.captureError({ message, params: args })
}
async info (message: string, ...args: any[]): Promise<void> {
info (message: string, ...args: any[]): void {
this.logger.info(message, args)
}
warn (message: string, ...args: any[]): void {
this.logger.warn(message, args)
}
end (): void {
this.transaction?.end()
}

View File

@ -91,7 +91,7 @@ class StorageBlobAdapter implements DbAdapter {
for (const item of docs) {
const stat = await this.client.stat(this.ctx, this.workspaceId, item)
if (stat === undefined) {
await ctx.error('Could not find blob', { domain, item, allDocs: cutObjectArray(docs) })
ctx.error('Could not find blob', { domain, item, allDocs: cutObjectArray(docs) })
continue
}
const chunks: Buffer[] = await this.client.read(this.ctx, this.workspaceId, item)

View File

@ -21,6 +21,7 @@ import core, {
systemAccountEmail,
toWorkspaceString,
versionToString,
withContext,
type BaseWorkspaceInfo,
type MeasureContext,
type Tx,
@ -160,7 +161,7 @@ class TSessionManager implements SessionManager {
}
if (diff > timeout && this.ticks % 10 === 0) {
void this.ctx.error('session hang, closing...', { wsId, user: s[1].session.getUser() })
this.ctx.warn('session hang, closing...', { wsId, user: s[1].session.getUser() })
// Force close workspace if only one client and it hang.
void this.close(s[1].socket, workspace.workspaceId)
@ -177,7 +178,7 @@ class TSessionManager implements SessionManager {
for (const r of s[1].session.requests.values()) {
if (now - r.start > 30000) {
void this.ctx.info('request hang found, 30sec', {
this.ctx.warn('request hang found, 30sec', {
wsId,
user: s[1].session.getUser(),
...r.params
@ -190,7 +191,7 @@ class TSessionManager implements SessionManager {
if (workspace.sessions.size === 0 && workspace.closing === undefined) {
workspace.softShutdown--
if (workspace.softShutdown <= 0) {
void this.ctx.info('closing workspace, no users', {
this.ctx.warn('closing workspace, no users', {
workspace: workspace.workspaceId.name,
wsId,
upgrade: workspace.upgrade,
@ -209,7 +210,8 @@ class TSessionManager implements SessionManager {
return this.sessionFactory(token, pipeline, this.broadcast.bind(this))
}
async getWorkspaceInfo (accounts: string, token: string): Promise<WorkspaceLoginInfo> {
@withContext('get-workspace-info')
async getWorkspaceInfo (ctx: MeasureContext, accounts: string, token: string): Promise<WorkspaceLoginInfo> {
const userInfo = await (
await fetch(accounts, {
method: 'POST',
@ -224,15 +226,16 @@ class TSessionManager implements SessionManager {
})
).json()
if (userInfo.error !== undefined) {
await this.ctx.error('Error response from account service', { error: JSON.stringify(userInfo) })
ctx.error('Error response from account service', { error: JSON.stringify(userInfo) })
throw new Error(JSON.stringify(userInfo.error))
}
return { ...userInfo.result, upgrade: userInfo.upgrade }
}
@withContext('📲 add-session')
async addSession (
baseCtx: MeasureContext,
ctx: MeasureContext,
ws: ConnectionSocket,
token: Token,
rawToken: string,
@ -245,134 +248,127 @@ class TSessionManager implements SessionManager {
| { upgrade: true, upgradeInfo?: WorkspaceLoginInfo['upgrade'] }
| { error: any }
> {
return await baseCtx.with('📲 add-session', {}, async (ctx) => {
const wsString = toWorkspaceString(token.workspace, '@')
const wsString = toWorkspaceString(token.workspace, '@')
let workspaceInfo = await ctx.with('check-token', {}, async (ctx) =>
accountsUrl !== '' ? await this.getWorkspaceInfo(accountsUrl, rawToken) : this.wsFromToken(token)
let workspaceInfo =
accountsUrl !== '' ? await this.getWorkspaceInfo(ctx, accountsUrl, rawToken) : this.wsFromToken(token)
if (workspaceInfo?.creating === true && token.email !== systemAccountEmail) {
// No access to workspace for token.
return { error: new Error(`Workspace during creation phase ${token.email} ${token.workspace.name}`) }
}
if (workspaceInfo === undefined && token.extra?.admin !== 'true') {
// No access to workspace for token.
return { error: new Error(`No access to workspace for token ${token.email} ${token.workspace.name}`) }
} else if (workspaceInfo === undefined) {
workspaceInfo = this.wsFromToken(token)
}
if (
this.modelVersion !== '' &&
workspaceInfo.version !== undefined &&
this.modelVersion !== versionToString(workspaceInfo.version) &&
token.extra?.model !== 'upgrade' &&
token.extra?.mode !== 'backup'
) {
ctx.warn('model version mismatch', {
version: this.modelVersion,
workspaceVersion: versionToString(workspaceInfo.version)
})
// Version mismatch, return upgrading.
return { upgrade: true, upgradeInfo: workspaceInfo.upgrade }
}
let workspace = this.workspaces.get(wsString)
if (workspace?.closing !== undefined) {
await workspace?.closing
}
workspace = this.workspaces.get(wsString)
if (sessionId !== undefined && workspace?.sessions?.has(sessionId) === true) {
const helloResponse: HelloResponse = {
id: -1,
result: 'hello',
binary: false,
reconnect: false,
alreadyConnected: true
}
await ws.send(ctx, helloResponse, false, false)
return { error: new Error('Session already exists') }
}
const workspaceName = workspaceInfo.workspaceName ?? workspaceInfo.workspaceUrl ?? workspaceInfo.workspaceId
if (workspace === undefined) {
ctx.warn('open workspace', {
email: token.email,
workspace: workspaceInfo.workspaceId,
wsUrl: workspaceInfo.workspaceUrl,
...token.extra
})
workspace = this.createWorkspace(
ctx.parent ?? ctx,
pipelineFactory,
token,
workspaceInfo.workspaceUrl ?? workspaceInfo.workspaceId,
workspaceName
)
if (workspaceInfo?.creating === true && token.email !== systemAccountEmail) {
// No access to workspace for token.
return { error: new Error(`Workspace during creation phase ${token.email} ${token.workspace.name}`) }
}
if (workspaceInfo === undefined && token.extra?.admin !== 'true') {
// No access to workspace for token.
return { error: new Error(`No access to workspace for token ${token.email} ${token.workspace.name}`) }
} else if (workspaceInfo === undefined) {
workspaceInfo = this.wsFromToken(token)
}
}
if (
this.modelVersion !== '' &&
workspaceInfo.version !== undefined &&
this.modelVersion !== versionToString(workspaceInfo.version) &&
token.extra?.model !== 'upgrade' &&
token.extra?.mode !== 'backup'
) {
await ctx.info('model version mismatch', {
version: this.modelVersion,
workspaceVersion: versionToString(workspaceInfo.version)
})
// Version mismatch, return upgrading.
return { upgrade: true, upgradeInfo: workspaceInfo.upgrade }
}
let workspace = this.workspaces.get(wsString)
if (workspace?.closing !== undefined) {
await workspace?.closing
}
workspace = this.workspaces.get(wsString)
if (sessionId !== undefined && workspace?.sessions?.has(sessionId) === true) {
const helloResponse: HelloResponse = {
id: -1,
result: 'hello',
binary: false,
reconnect: false,
alreadyConnected: true
}
await ws.send(ctx, helloResponse, false, false)
return { error: new Error('Session already exists') }
}
const workspaceName = workspaceInfo.workspaceName ?? workspaceInfo.workspaceUrl ?? workspaceInfo.workspaceId
if (workspace === undefined) {
await ctx.info('open workspace', {
let pipeline: Pipeline
if (token.extra?.model === 'upgrade') {
if (workspace.upgrade) {
ctx.warn('reconnect workspace in upgrade', {
email: token.email,
workspace: workspaceInfo.workspaceId,
wsUrl: workspaceInfo.workspaceUrl,
...token.extra
wsUrl: workspaceInfo.workspaceUrl
})
workspace = this.createWorkspace(
baseCtx,
pipelineFactory,
pipeline = await ctx.with('💤 wait', { workspaceName }, async () => await (workspace as Workspace).pipeline)
} else {
ctx.warn('reconnect workspace in upgrade switch', {
email: token.email,
workspace: workspaceInfo.workspaceId,
wsUrl: workspaceInfo.workspaceUrl
})
// We need to wait in case previous upgeade connection is already closing.
pipeline = await this.switchToUpgradeSession(
token,
sessionId,
ctx.parent ?? ctx,
wsString,
workspace,
pipelineFactory,
ws,
workspaceInfo.workspaceUrl ?? workspaceInfo.workspaceId,
workspaceName
)
}
let pipeline: Pipeline
if (token.extra?.model === 'upgrade') {
if (workspace.upgrade) {
await ctx.info('reconnect workspace in upgrade', {
email: token.email,
workspace: workspaceInfo.workspaceId,
wsUrl: workspaceInfo.workspaceUrl
})
pipeline = await ctx.with('💤 wait', { workspaceName }, async () => await (workspace as Workspace).pipeline)
} else {
await ctx.info('reconnect workspace in upgrade switch', {
email: token.email,
workspace: workspaceInfo.workspaceId,
wsUrl: workspaceInfo.workspaceUrl
})
// We need to wait in case previous upgeade connection is already closing.
pipeline = await this.switchToUpgradeSession(
token,
sessionId,
ctx,
wsString,
workspace,
pipelineFactory,
ws,
workspaceInfo.workspaceUrl ?? workspaceInfo.workspaceId,
workspaceName
)
}
} else {
if (workspace.upgrade) {
await ctx.info('connect during upgrade', {
email: token.email,
workspace: workspace.workspaceId.name,
sessionUsers: Array.from(workspace.sessions.values()).map((it) => it.session.getUser()),
sessionData: Array.from(workspace.sessions.values()).map((it) => it.socket.data())
})
return { upgrade: true }
}
pipeline = await ctx.with('💤 wait', { workspaceName }, async () => await (workspace as Workspace).pipeline)
} else {
if (workspace.upgrade) {
ctx.warn('connect during upgrade', {
email: token.email,
workspace: workspace.workspaceId.name,
sessionUsers: Array.from(workspace.sessions.values()).map((it) => it.session.getUser()),
sessionData: Array.from(workspace.sessions.values()).map((it) => it.socket.data())
})
return { upgrade: true }
}
pipeline = await ctx.with('💤 wait', { workspaceName }, async () => await (workspace as Workspace).pipeline)
}
const session = this.createSession(token, pipeline)
const session = this.createSession(token, pipeline)
session.sessionId = sessionId !== undefined && (sessionId ?? '').trim().length > 0 ? sessionId : generateId()
session.sessionInstanceId = generateId()
this.sessions.set(ws.id, { session, socket: ws })
// We need to delete previous session with Id if found.
workspace.sessions.set(session.sessionId, { session, socket: ws })
session.sessionId = sessionId !== undefined && (sessionId ?? '').trim().length > 0 ? sessionId : generateId()
session.sessionInstanceId = generateId()
this.sessions.set(ws.id, { session, socket: ws })
// We need to delete previous session with Id if found.
workspace.sessions.set(session.sessionId, { session, socket: ws })
// We do not need to wait for set-status, just return session to client
void ctx.with('set-status', {}, (ctx) => this.trySetStatus(ctx, session, true))
// We do not need to wait for set-status, just return session to client
void ctx.with('set-status', {}, (ctx) => this.trySetStatus(ctx, session, true))
if (this.timeMinutes > 0) {
void ws.send(
ctx,
{ result: this.createMaintenanceWarning() },
session.binaryResponseMode,
session.useCompression
)
}
return { session, context: workspace.context, workspaceId: wsString }
})
if (this.timeMinutes > 0) {
void ws.send(ctx, { result: this.createMaintenanceWarning() }, session.binaryResponseMode, session.useCompression)
}
return { session, context: workspace.context, workspaceId: wsString }
}
private wsFromToken (token: Token): WorkspaceLoginInfo {
@ -402,7 +398,7 @@ class TSessionManager implements SessionManager {
workspaceName: string
): Promise<Pipeline> {
if (LOGGING_ENABLED) {
await ctx.info('reloading workspace', { workspaceName, token: JSON.stringify(token) })
ctx.info('reloading workspace', { workspaceName, token: JSON.stringify(token) })
}
// Mark as upgrade, to prevent any new clients to connect during close
@ -450,7 +446,7 @@ class TSessionManager implements SessionManager {
)
} catch (err: any) {
Analytics.handleError(err)
void ctx.error('error during send', { error: err })
ctx.error('error during send', { error: err })
}
}
}
@ -539,13 +535,13 @@ class TSessionManager implements SessionManager {
async close (ws: ConnectionSocket, workspaceId: WorkspaceId): Promise<void> {
const wsid = toWorkspaceString(workspaceId)
const workspace = this.workspaces.get(wsid)
if (workspace === undefined) {
return
}
const sessionRef = this.sessions.get(ws.id)
if (sessionRef !== undefined) {
this.sessions.delete(ws.id)
workspace.sessions.delete(sessionRef.session.sessionId)
if (workspace !== undefined) {
workspace.sessions.delete(sessionRef.session.sessionId)
}
this.reconnectIds.add(sessionRef.session.sessionId)
setTimeout(() => {
@ -557,9 +553,11 @@ class TSessionManager implements SessionManager {
// Ignore if closed
}
const user = sessionRef.session.getUser()
const another = Array.from(workspace.sessions.values()).findIndex((p) => p.session.getUser() === user)
if (another === -1 && !workspace.upgrade) {
await this.trySetStatus(workspace.context, sessionRef.session, false)
if (workspace !== undefined) {
const another = Array.from(workspace.sessions.values()).findIndex((p) => p.session.getUser() === user)
if (another === -1 && !workspace.upgrade) {
await this.trySetStatus(workspace.context, sessionRef.session, false)
}
}
}
}
@ -582,7 +580,7 @@ class TSessionManager implements SessionManager {
ignoreSocket?: ConnectionSocket
): Promise<void> {
if (LOGGING_ENABLED) {
await this.ctx.info('closing workspace', {
this.ctx.warn('closing workspace', {
workspace: workspace.id,
wsName: workspace.workspaceName,
code,
@ -604,7 +602,7 @@ class TSessionManager implements SessionManager {
}
if (LOGGING_ENABLED) {
await this.ctx.info('Clients disconnected. Closing Workspace...', {
this.ctx.warn('Clients disconnected. Closing Workspace...', {
wsId,
workspace: workspace.id,
wsName: workspace.workspaceName
@ -621,14 +619,14 @@ class TSessionManager implements SessionManager {
})
} catch (err: any) {
Analytics.handleError(err)
await this.ctx.error('close-pipeline-error', { error: err })
this.ctx.error('close-pipeline-error', { error: err })
}
}
await this.ctx.with('closing', {}, async () => {
await Promise.race([closePipeline(), timeoutPromise(120000)])
})
if (LOGGING_ENABLED) {
await this.ctx.info('Workspace closed...', { workspace: workspace.id, wsId, wsName: workspace.workspaceName })
this.ctx.warn('Workspace closed...', { workspace: workspace.id, wsId, wsName: workspace.workspaceName })
}
}
@ -663,7 +661,7 @@ class TSessionManager implements SessionManager {
const logParams = { wsid, workspace: workspace.id, wsName: workspaceId.name }
if (workspace.sessions.size === 0) {
if (LOGGING_ENABLED) {
await this.ctx.info('no sessions for workspace', logParams)
this.ctx.warn('no sessions for workspace', logParams)
}
try {
if (workspace.sessions.size === 0) {
@ -676,19 +674,19 @@ class TSessionManager implements SessionManager {
}
workspace.context.end()
if (LOGGING_ENABLED) {
await this.ctx.info('Closed workspace', logParams)
this.ctx.warn('Closed workspace', logParams)
}
}
} catch (err: any) {
Analytics.handleError(err)
this.workspaces.delete(wsid)
if (LOGGING_ENABLED) {
await this.ctx.error('failed', { ...logParams, error: err })
this.ctx.error('failed', { ...logParams, error: err })
}
}
} else {
if (LOGGING_ENABLED) {
await this.ctx.info('few sessions for workspace, close skipped', {
this.ctx.info('few sessions for workspace, close skipped', {
...logParams,
sessions: workspace.sessions.size
})
@ -699,7 +697,7 @@ class TSessionManager implements SessionManager {
broadcast (from: Session | null, workspaceId: WorkspaceId, resp: Response<any>, target?: string[]): void {
const workspace = this.workspaces.get(toWorkspaceString(workspaceId))
if (workspace === undefined) {
void this.ctx.error('internal: cannot find sessions', {
this.ctx.error('internal: cannot find sessions', {
workspaceId: workspaceId.name,
target,
userId: from?.getUser() ?? '$unknown'
@ -710,7 +708,7 @@ class TSessionManager implements SessionManager {
return
}
if (LOGGING_ENABLED) {
void this.ctx.info('server broadcasting to clients...', {
this.ctx.info('server broadcasting to clients...', {
workspace: workspaceId.name,
count: workspace.sessions.size
})
@ -792,7 +790,7 @@ class TSessionManager implements SessionManager {
service.useBroadcast = hello.broadcast ?? false
if (LOGGING_ENABLED) {
await ctx.info('hello happen', {
ctx.info('hello happen', {
workspace,
user: service.getUser(),
binary: service.binaryResponseMode,
@ -834,7 +832,12 @@ class TSessionManager implements SessionManager {
? await f.apply(service, [service.measureCtx?.ctx, ...params])
: await ctx.with('🧨 process', {}, async (callTx) => f.apply(service, [callTx, ...params]))
const resp: Response<any> = { id: request.id, result }
const resp: Response<any> = {
id: request.id,
result,
time: Date.now() - st,
queue: service.requests.size
}
await handleSend(
ctx,
@ -847,7 +850,7 @@ class TSessionManager implements SessionManager {
} catch (err: any) {
Analytics.handleError(err)
if (LOGGING_ENABLED) {
await this.ctx.error('error handle request', { error: err, request })
this.ctx.error('error handle request', { error: err, request })
}
const resp: Response<any> = {
id: request.id,
@ -892,7 +895,7 @@ class TSessionManager implements SessionManager {
} catch (err: any) {
Analytics.handleError(err)
if (LOGGING_ENABLED) {
await ctx.error('error handle measure', { error: err, request })
ctx.error('error handle measure', { error: err, request })
}
const resp: Response<any> = {
id: request.id,

View File

@ -49,7 +49,7 @@ export function startHttpServer (
accountsUrl: string
): () => Promise<void> {
if (LOGGING_ENABLED) {
void ctx.info('starting server on', { port, productId, enableCompression, accountsUrl })
ctx.info('starting server on', { port, productId, enableCompression, accountsUrl })
}
const app = express()
@ -157,15 +157,18 @@ export function startHttpServer (
? {
zlibDeflateOptions: {
// See zlib defaults.
chunkSize: 16 * 1024,
level: 6
chunkSize: 10 * 1024,
memLevel: 7,
level: 3
},
zlibInflateOptions: {
chunkSize: 16 * 1024,
level: 6
chunkSize: 10 * 1024,
level: 3
},
threshold: 1024, // Size (in bytes) below which messages, should not be compressed if context takeover is disabled.
concurrencyLimit: 100
// Below options specified as default values.
concurrencyLimit: 20, // Limits zlib concurrency for perf.
threshold: 1024 // Size (in bytes) below which messages
// should not be compressed if context takeover is disabled.
}
: false,
skipUTF8Validation: true
@ -231,7 +234,7 @@ export function startHttpServer (
)
if ('upgrade' in session || 'error' in session) {
if ('error' in session) {
void ctx.error('error', { error: session.error?.message, stack: session.error?.stack })
ctx.error('error', { error: session.error?.message, stack: session.error?.stack })
}
await cs.send(ctx, { id: -1, result: { state: 'upgrading', stats: (session as any).upgradeInfo } }, false, false)
cs.close()
@ -252,7 +255,7 @@ export function startHttpServer (
} catch (err: any) {
Analytics.handleError(err)
if (LOGGING_ENABLED) {
void ctx.error('message error', err)
ctx.error('message error', err)
}
}
})
@ -282,7 +285,7 @@ export function startHttpServer (
if (payload.workspace.productId !== productId) {
if (LOGGING_ENABLED) {
void ctx.error('invalid product', { required: payload.workspace.productId, productId })
ctx.error('invalid product', { required: payload.workspace.productId, productId })
}
throw new Error('Invalid workspace product')
}
@ -291,7 +294,7 @@ export function startHttpServer (
} catch (err: any) {
Analytics.handleError(err)
if (LOGGING_ENABLED) {
void ctx.error('invalid token', err)
ctx.error('invalid token', err)
}
wss.handleUpgrade(request, socket, head, (ws) => {
const resp: Response<any> = {
@ -311,7 +314,7 @@ export function startHttpServer (
})
httpServer.on('error', (err) => {
if (LOGGING_ENABLED) {
void ctx.error('server error', err)
ctx.error('server error', err)
}
})