UBERF-7790: Fix connection timeout issue (#6301)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-08-09 17:39:38 +07:00 committed by GitHub
parent 87dcee12ab
commit a1fe33b2be
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 53 additions and 38 deletions

View File

@ -91,6 +91,8 @@ class Connection implements ClientConnection {
private pingResponse: number = Date.now()
private helloRecieved: boolean = false
rpcHandler = new RPCHandler()
constructor (
@ -173,7 +175,7 @@ class Connection implements ClientConnection {
}
isConnected (): boolean {
return this.websocket != null && this.websocket.readyState === ClientSocketReadyState.OPEN
return this.websocket != null && this.websocket.readyState === ClientSocketReadyState.OPEN && this.helloRecieved
}
delay = 0
@ -240,6 +242,10 @@ class Connection implements ClientConnection {
return
}
if (resp.result === 'hello') {
// We need to clear dial timer, since we recieve hello response.
clearTimeout(this.dialTimer)
this.dialTimer = null
this.helloRecieved = true
if (this.upgrading) {
// We need to call upgrade since connection is upgraded
this.opt?.onUpgrade?.()
@ -394,12 +400,15 @@ class Connection implements ClientConnection {
this.websocket = wsocket
const opened = false
this.dialTimer = setTimeout(() => {
if (!opened && !this.closed) {
void this.opt?.onDialTimeout?.()
this.scheduleOpen(true)
}
}, dialTimeout)
if (this.dialTimer != null) {
this.dialTimer = setTimeout(() => {
this.dialTimer = null
if (!opened && !this.closed) {
void this.opt?.onDialTimeout?.()
this.scheduleOpen(true)
}
}, dialTimeout)
}
wsocket.onmessage = (event: MessageEvent) => {
if (this.closed) {
@ -419,10 +428,8 @@ class Connection implements ClientConnection {
}
}
wsocket.onclose = (ev) => {
clearTimeout(this.dialTimer)
if (this.websocket !== wsocket) {
wsocket.close()
clearTimeout(this.dialTimer)
return
}
// console.log('client websocket closed', socketId, ev?.reason)
@ -435,7 +442,7 @@ class Connection implements ClientConnection {
}
const useBinary = getMetadata(client.metadata.UseBinaryProtocol) ?? true
const useCompression = getMetadata(client.metadata.UseProtocolCompression) ?? false
clearTimeout(this.dialTimer)
this.helloRecieved = false
const helloRequest: HelloRequest = {
method: 'hello',
params: [],
@ -447,7 +454,6 @@ class Connection implements ClientConnection {
}
wsocket.onerror = (event: any) => {
clearTimeout(this.dialTimer)
if (this.websocket !== wsocket) {
return
}

View File

@ -287,33 +287,19 @@ class TSessionManager implements SessionManager {
accountsUrl !== '' ? await this.getWorkspaceInfo(ctx, accountsUrl, rawToken) : this.wsFromToken(token)
} catch (err: any) {
this.updateConnectErrorInfo(token)
// No connection to account service, retry from client.
await new Promise<void>((resolve) => {
setTimeout(resolve, 1000)
})
return { error: err }
}
if (workspaceInfo === undefined) {
this.updateConnectErrorInfo(token)
// No connection to account service, retry from client.
await new Promise<void>((resolve) => {
setTimeout(resolve, 1000)
})
return { upgrade: true }
}
if (workspaceInfo?.creating === true && token.email !== systemAccountEmail) {
await new Promise<void>((resolve) => {
setTimeout(resolve, 1000)
})
// No access to workspace for token.
return { error: new Error(`Workspace during creation phase ${token.email} ${token.workspace.name}`) }
}
if (workspaceInfo === undefined && token.extra?.admin !== 'true') {
await new Promise<void>((resolve) => {
setTimeout(resolve, 5000)
})
this.updateConnectErrorInfo(token)
// No access to workspace for token.
return { error: new Error(`No access to workspace for token ${token.email} ${token.workspace.name}`) }

View File

@ -15,7 +15,7 @@
import { Analytics } from '@hcengineering/analytics'
import { generateId, toWorkspaceString, type MeasureContext } from '@hcengineering/core'
import { UNAUTHORIZED } from '@hcengineering/platform'
import { UNAUTHORIZED, unknownStatus } from '@hcengineering/platform'
import { RPCHandler, type Response } from '@hcengineering/rpc'
import { decodeToken, type Token } from '@hcengineering/server-token'
import cors from 'cors'
@ -277,13 +277,22 @@ export function startHttpServer (
if (webSocketData.session instanceof Promise) {
void webSocketData.session.then((s) => {
if ('error' in s) {
cs.close()
void cs
.send(ctx, { id: -1, error: unknownStatus(s.error.message ?? 'Unknown error') }, false, false)
.then(() => {
// No connection to account service, retry from client.
setTimeout(() => {
cs.close()
}, 1000)
})
}
if ('upgrade' in s) {
void cs
.send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false)
.then(() => {
cs.close()
setTimeout(() => {
cs.close()
}, 5000)
})
}
})

View File

@ -29,6 +29,7 @@ import uWebSockets, { DISABLED, SHARED_COMPRESSOR, type HttpResponse, type WebSo
import { Readable } from 'stream'
import { getFile, getFileRange, type BlobResponse } from './blobs'
import { doSessionOp, processRequest, type WebsocketData } from './utils'
import { unknownStatus } from '@hcengineering/platform'
const rpcHandler = new RPCHandler()
@ -139,13 +140,22 @@ export function startUWebsocketServer (
if (data.session instanceof Promise) {
void data.session.then((s) => {
if ('error' in s) {
ctx.error('error', { error: s.error?.message, stack: s.error?.stack })
void cs
.send(ctx, { id: -1, error: unknownStatus(s.error.message ?? 'Unknown error') }, false, false)
.then(() => {
// No connection to account service, retry from client.
setTimeout(() => {
cs.close()
}, 1000)
})
}
if ('upgrade' in s) {
void cs
.send(ctx, { id: -1, result: { state: 'upgrading', stats: (s as any).upgradeInfo } }, false, false)
.then(() => {
cs.close()
setTimeout(() => {
cs.close()
}, 5000)
})
}
})

View File

@ -739,6 +739,9 @@ export class PlatformWorker {
}
}
}
this.ctx.info('************************* Check workspaces done ************************* ', {
workspaces: this.clients.size
})
return errors > 0
}

View File

@ -35,13 +35,6 @@ import core, {
type Blob,
type MigrationState
} from '@hcengineering/core'
import { LiveQuery } from '@hcengineering/query'
import { StorageAdapter } from '@hcengineering/server-core'
import { getPublicLinkUrl } from '@hcengineering/server-guest-resources'
import task, { ProjectType, TaskType } from '@hcengineering/task'
import { MarkupNode, jsonToMarkup, isMarkdownsEquals } from '@hcengineering/text'
import tracker from '@hcengineering/tracker'
import { User } from '@octokit/webhooks-types'
import github, {
DocSyncInfo,
GithubAuthentication,
@ -53,6 +46,13 @@ import github, {
GithubUserInfo,
githubId
} from '@hcengineering/github'
import { LiveQuery } from '@hcengineering/query'
import { StorageAdapter } from '@hcengineering/server-core'
import { getPublicLinkUrl } from '@hcengineering/server-guest-resources'
import task, { ProjectType, TaskType } from '@hcengineering/task'
import { MarkupNode, isMarkdownsEquals, jsonToMarkup } from '@hcengineering/text'
import tracker from '@hcengineering/tracker'
import { User } from '@octokit/webhooks-types'
import { App, Octokit } from 'octokit'
import { createPlatformClient } from './client'
import { createCollaboratorClient } from './collaborator'
@ -1487,7 +1487,7 @@ export class GithubWorker implements IntegrationManager {
ctx.info('Connecting to', { workspace: workspace.workspaceUrl, workspaceId: workspace.workspaceName })
let client: Client | undefined
try {
client = await createPlatformClient(workspace.name, workspace.productId, 10000, (event: ClientConnectEvent) => {
client = await createPlatformClient(workspace.name, workspace.productId, 30000, (event: ClientConnectEvent) => {
reconnect(workspace.name, event)
})
@ -1506,6 +1506,7 @@ export class GithubWorker implements IntegrationManager {
return worker
}
} catch (err: any) {
ctx.error('timeout during to connect', { workspace, error: err })
await client?.close()
}
}