UBERF-6643: Fix few connection related exceptions (#5412)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2024-04-20 01:48:46 +07:00 committed by GitHub
parent 857d56bb0b
commit c1bb39898e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 76 additions and 56 deletions

View File

@ -95,12 +95,9 @@ class Connection implements ClientConnection {
private schedulePing (): void {
clearInterval(this.interval)
this.pingResponse = Date.now()
this.interval = setInterval(() => {
if (this.upgrading) {
// no need to check while upgrade waiting
return
}
if (this.pingResponse !== 0 && Date.now() - this.pingResponse > hangTimeout) {
if (!this.upgrading && this.pingResponse !== 0 && Date.now() - this.pingResponse > hangTimeout) {
// No ping response from server.
const s = this.websocket
@ -125,7 +122,9 @@ class Connection implements ClientConnection {
if (!this.closed) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
void this.sendRequest({ method: 'ping', params: [] })
void this.sendRequest({ method: 'ping', params: [] }).then((result) => {
this.pingResponse = Date.now()
})
} else {
clearInterval(this.interval)
}
@ -240,47 +239,48 @@ class Connection implements ClientConnection {
}, dialTimeout)
websocket.onmessage = (event: MessageEvent) => {
this.pingResponse = Date.now()
const resp = readResponse<any>(event.data, binaryResponse)
if (resp.id === -1 && resp.result.state === 'upgrading') {
void this.onConnect?.(ClientConnectEvent.Maintenance, resp.result.stats)
this.upgrading = true
return
}
if (resp.id === -1 && resp.result === 'hello') {
if (this.upgrading) {
// We need to call upgrade since connection is upgraded
this.onUpgrade?.()
}
console.log('connection established', this.workspace, this.email)
// Ok we connected, let's schedule ping
this.schedulePing()
this.upgrading = false
if ((resp as HelloResponse).alreadyConnected === true) {
this.sessionId = generateId()
if (typeof sessionStorage !== 'undefined') {
sessionStorage.setItem('session.id.' + this.url, this.sessionId)
}
reject(new Error('alreadyConnected'))
}
if ((resp as HelloResponse).binary) {
binaryResponse = true
}
if (resp.error !== undefined) {
reject(resp.error)
if (resp.id === -1) {
if (resp.result?.state === 'upgrading') {
void this.onConnect?.(ClientConnectEvent.Maintenance, resp.result.stats)
this.upgrading = true
return
}
for (const [, v] of this.requests.entries()) {
v.reconnect?.()
}
resolve(websocket)
if (resp.result === 'hello') {
if (this.upgrading) {
// We need to call upgrade since connection is upgraded
this.onUpgrade?.()
}
void this.onConnect?.(
(resp as HelloResponse).reconnect === true ? ClientConnectEvent.Reconnected : ClientConnectEvent.Connected
)
console.log('connection established', this.workspace, this.email)
this.upgrading = false
if ((resp as HelloResponse).alreadyConnected === true) {
this.sessionId = generateId()
if (typeof sessionStorage !== 'undefined') {
sessionStorage.setItem('session.id.' + this.url, this.sessionId)
}
reject(new Error('alreadyConnected'))
}
if ((resp as HelloResponse).binary) {
binaryResponse = true
}
if (resp.error !== undefined) {
reject(resp.error)
return
}
for (const [, v] of this.requests.entries()) {
v.reconnect?.()
}
resolve(websocket)
void this.onConnect?.(
(resp as HelloResponse).reconnect === true ? ClientConnectEvent.Reconnected : ClientConnectEvent.Connected
)
return
} else {
Analytics.handleError(new Error(`unexpected response: ${JSON.stringify(resp)}`))
}
return
}
if (resp.result === 'ping') {
@ -399,6 +399,8 @@ class Connection implements ClientConnection {
broadcast: true
}
websocket.send(serialize(helloRequest, false))
// Ok we connected, let's schedule ping
this.schedulePing()
}
websocket.onerror = (event: any) => {
console.error('client websocket error:', socketId, event, this.workspace, this.email)

View File

@ -151,7 +151,7 @@ export async function connect (title: string): Promise<Client | undefined> {
(event: ClientConnectEvent, data: any) => {
console.log('WorkbenchClient: onConnect', event)
if (event === ClientConnectEvent.Maintenance) {
if (data !== undefined && data.total !== 0) {
if (data != null && data.total !== 0) {
versionError.set(`Maintenance ${Math.floor((100 / data.total) * (data.total - data.toProcess))}%`)
} else {
versionError.set('Maintenance...')

View File

@ -965,21 +965,14 @@ export async function upgradeWorkspace (
}
const versionStr = versionToString(version)
if (ws?.version !== undefined && !forceUpdate && versionStr === versionToString(ws.version)) {
return versionStr
}
await ctx.info('upgrading', {
force: forceUpdate,
currentVersion: ws?.version !== undefined ? versionToString(ws.version) : '',
toVersion: versionStr
})
if (ws?.version !== undefined && !forceUpdate && versionStr === versionToString(ws.version)) {
return versionStr
}
await db.collection(WORKSPACE_COLLECTION).updateOne(
{ _id: ws._id },
{
$set: { version }
}
)
await (
await upgradeModel(
ctx,
@ -992,6 +985,13 @@ export async function upgradeWorkspace (
async (value) => {}
)
).close()
await db.collection(WORKSPACE_COLLECTION).updateOne(
{ _id: ws._id },
{
$set: { version }
}
)
return versionStr
}

View File

@ -71,11 +71,18 @@ class BackupWorker {
async backup (ctx: MeasureContext): Promise<void> {
const workspaces = await getWorkspaces(this.config.AccountsURL, this.config.Token)
workspaces.sort((a, b) => b.lastVisit - a.lastVisit)
let index = 0
for (const ws of workspaces) {
if (this.canceled) {
return
}
await ctx.info('\n\nBACKUP WORKSPACE ', { workspace: ws.workspace, productId: ws.productId })
index++
await ctx.info('\n\nBACKUP WORKSPACE ', {
workspace: ws.workspace,
productId: ws.productId,
index,
total: workspaces.length
})
try {
const storage = await createStorageBackupStorage(
ctx,

View File

@ -40,6 +40,7 @@ export class BackupClientSession extends ClientSession implements BackupSession
chunkInfo = new Map<number, ChunkInfo>()
async loadChunk (ctx: MeasureContext, domain: Domain, idx?: number): Promise<DocChunk> {
this.lastRequest = Date.now()
return await ctx.with('load-chunk', { domain }, async (ctx) => {
idx = idx ?? this.idIndex++
let chunk: ChunkInfo | undefined = this.chunkInfo.get(idx)
@ -79,6 +80,7 @@ export class BackupClientSession extends ClientSession implements BackupSession
}
async closeChunk (ctx: MeasureContext, idx: number): Promise<void> {
this.lastRequest = Date.now()
await ctx.with('close-chunk', {}, async () => {
const chunk = this.chunkInfo.get(idx)
this.chunkInfo.delete(idx)
@ -89,14 +91,17 @@ export class BackupClientSession extends ClientSession implements BackupSession
}
async loadDocs (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
this.lastRequest = Date.now()
return await this._pipeline.storage.load(ctx, domain, docs)
}
async upload (ctx: MeasureContext, domain: Domain, docs: Doc[]): Promise<void> {
this.lastRequest = Date.now()
await this._pipeline.storage.upload(ctx, domain, docs)
}
async clean (ctx: MeasureContext, domain: Domain, docs: Ref<Doc>[]): Promise<void> {
this.lastRequest = Date.now()
await this._pipeline.storage.clean(ctx, domain, docs)
}
}

View File

@ -155,7 +155,13 @@ class TSessionManager implements SessionManager {
}
const now = Date.now()
const diff = now - s[1].session.lastRequest
if (diff > 60000 && this.ticks % 10 === 0) {
let timeout = 60000
if (s[1].session.getUser() === systemAccountEmail) {
timeout = timeout * 10
}
if (diff > timeout && this.ticks % 10 === 0) {
void this.ctx.error('session hang, closing...', { sessionId: h[0], user: s[1].session.getUser() })
void this.close(s[1].socket, h[1].workspaceId, 1001, 'CLIENT_HANGOUT')
continue