TSK-825: Client proper reconnection (#2797)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2023-03-24 11:53:12 +07:00 committed by GitHub
parent 7af7fd5cb4
commit 993b620fe2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 482 additions and 195 deletions

View File

@ -25,6 +25,8 @@ import { SortingOrder } from './storage'
import { Tx, TxCreateDoc, TxProcessor, TxUpdateDoc } from './tx'
import { toFindResult } from './utils'
const transactionThreshold = 3000
/**
* @public
*/
@ -50,6 +52,7 @@ export interface Client extends Storage {
*/
export interface ClientConnection extends Storage, BackupClient {
close: () => Promise<void>
onConnect?: () => Promise<void>
}
class ClientImpl implements Client, BackupClient {
@ -151,11 +154,16 @@ export async function createClient (
allowedPlugins?: Plugin[]
): Promise<Client> {
let client: ClientImpl | null = null
// Temporal buffer, while we apply model
let txBuffer: Tx[] | undefined = []
const loadedTxIds = new Set<Ref<Tx>>()
const hierarchy = new Hierarchy()
const model = new ModelDb(hierarchy)
let lastTx: number
function txHandler (tx: Tx): void {
if (client === null) {
txBuffer?.push(tx)
@ -163,63 +171,83 @@ export async function createClient (
// eslint-disable-next-line @typescript-eslint/no-floating-promises
client.updateFromRemote(tx)
}
lastTx = tx.modifiedOn
}
const configs = new Map<Ref<PluginConfiguration>, PluginConfiguration>()
const conn = await connect(txHandler)
await loadModel(conn, loadedTxIds, allowedPlugins, configs, hierarchy, model)
txBuffer = txBuffer.filter((tx) => !loadedTxIds.has(tx._id))
client = new ClientImpl(hierarchy, model, conn)
for (const tx of txBuffer) {
txHandler(tx)
loadedTxIds.add(tx._id)
}
txBuffer = undefined
const oldOnConnect: (() => void) | undefined = conn.onConnect
conn.onConnect = async () => {
// Find all new transactions and apply
await loadModel(conn, loadedTxIds, allowedPlugins, configs, hierarchy, model)
// We need to look for last 1000 transactions and if it is more since lastTx one we receive, we need to perform full refresh.
const atxes = await conn.findAll(
core.class.Tx,
{ modifiedOn: { $gt: lastTx } },
{ sort: { _id: SortingOrder.Ascending }, limit: transactionThreshold }
)
if (atxes.total < transactionThreshold) {
console.log('applying input transactions', atxes.length)
for (const tx of atxes) {
txHandler(tx)
}
} else {
// We need to trigger full refresh on queries, etc.
await oldOnConnect?.()
}
}
return client
}
async function loadModel (
conn: ClientConnection,
processedTx: Set<Ref<Tx>>,
allowedPlugins: Plugin[] | undefined,
configs: Map<Ref<PluginConfiguration>, PluginConfiguration>,
hierarchy: Hierarchy,
model: ModelDb
): Promise<void> {
const t = Date.now()
const atxes = await conn.findAll(
core.class.Tx,
{ objectSpace: core.space.Model },
{ objectSpace: core.space.Model, _id: { $nin: Array.from(processedTx.values()) } },
{ sort: { _id: SortingOrder.Ascending } }
)
console.log('find model', atxes.length, Date.now() - t)
let systemTx: Tx[] = []
const userTx: Tx[] = []
console.log('find' + (processedTx.size === 0 ? 'full model' : 'model diff'), atxes.length, Date.now() - t)
atxes.forEach((tx) => (tx.modifiedBy === core.account.System ? systemTx : userTx).push(tx))
if (allowedPlugins !== undefined) {
// Filter system transactions
const configs = new Map<Ref<PluginConfiguration>, PluginConfiguration>()
for (const t of systemTx) {
if (t._class === core.class.TxCreateDoc) {
const ct = t as TxCreateDoc<Doc>
if (ct.objectClass === core.class.PluginConfiguration) {
configs.set(ct.objectId as Ref<PluginConfiguration>, TxProcessor.createDoc2Doc(ct) as PluginConfiguration)
}
} else if (t._class === core.class.TxUpdateDoc) {
const ut = t as TxUpdateDoc<Doc>
if (ut.objectClass === core.class.PluginConfiguration) {
const c = configs.get(ut.objectId as Ref<PluginConfiguration>)
if (c !== undefined) {
TxProcessor.updateDoc2Doc(c, ut)
}
}
}
}
fillConfiguration(systemTx, configs)
const excludedPlugins = Array.from(configs.values()).filter((it) => !allowedPlugins.includes(it.pluginId as Plugin))
for (const a of excludedPlugins) {
for (const c of configs.values()) {
if (a.pluginId === c.pluginId) {
const excluded = new Set<Ref<Tx>>()
for (const id of c.transactions) {
excluded.add(id as Ref<Tx>)
}
const exclude = systemTx.filter((t) => excluded.has(t._id))
console.log('exclude plugin', c.pluginId, exclude.length)
systemTx = systemTx.filter((t) => !excluded.has(t._id))
}
}
}
systemTx = pluginFilterTx(excludedPlugins, configs, systemTx)
}
const txes = systemTx.concat(userTx)
const txMap = new Map<Ref<Tx>, Ref<Tx>>()
for (const tx of txes) txMap.set(tx._id, tx._id)
for (const tx of txes) {
processedTx.add(tx._id)
}
for (const tx of txes) {
try {
hierarchy.tx(tx)
@ -234,13 +262,44 @@ export async function createClient (
console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err)
}
}
txBuffer = txBuffer.filter((tx) => txMap.get(tx._id) === undefined)
client = new ClientImpl(hierarchy, model, conn)
for (const tx of txBuffer) txHandler(tx)
txBuffer = undefined
return client
}
function fillConfiguration (systemTx: Tx[], configs: Map<Ref<PluginConfiguration>, PluginConfiguration>): void {
for (const t of systemTx) {
if (t._class === core.class.TxCreateDoc) {
const ct = t as TxCreateDoc<Doc>
if (ct.objectClass === core.class.PluginConfiguration) {
configs.set(ct.objectId as Ref<PluginConfiguration>, TxProcessor.createDoc2Doc(ct) as PluginConfiguration)
}
} else if (t._class === core.class.TxUpdateDoc) {
const ut = t as TxUpdateDoc<Doc>
if (ut.objectClass === core.class.PluginConfiguration) {
const c = configs.get(ut.objectId as Ref<PluginConfiguration>)
if (c !== undefined) {
TxProcessor.updateDoc2Doc(c, ut)
}
}
}
}
}
function pluginFilterTx (
excludedPlugins: PluginConfiguration[],
configs: Map<Ref<PluginConfiguration>, PluginConfiguration>,
systemTx: Tx[]
): Tx[] {
for (const a of excludedPlugins) {
for (const c of configs.values()) {
if (a.pluginId === c.pluginId) {
const excluded = new Set<Ref<Tx>>()
for (const id of c.transactions) {
excluded.add(id as Ref<Tx>)
}
const exclude = systemTx.filter((t) => excluded.has(t._id))
console.log('exclude plugin', c.pluginId, exclude.length)
systemTx = systemTx.filter((t) => !excluded.has(t._id))
}
}
}
return systemTx
}

View File

@ -88,6 +88,7 @@ export async function monitor<T> (status: Status, promise: Promise<T>): Promise<
return result
} catch (err) {
void setPlatformStatus(unknownError(err)) // eslint-disable-line no-void
console.error(err)
throw err
}
}

View File

@ -113,6 +113,15 @@ export function setClient (_client: Client): void {
}
}
/**
* @public
*/
export function refreshClient (): void {
if (liveQuery !== undefined) {
void liveQuery.refreshConnect()
}
}
/**
* @public
*/

View File

@ -85,6 +85,15 @@ export class LiveQuery extends TxProcessor implements Client {
return this.client.getModel()
}
// Perform refresh of content since connection established.
async refreshConnect (): Promise<void> {
for (const q of [...this.queue]) {
if (!(await this.removeFromQueue(q))) {
await this.refresh(q)
}
}
}
private match (q: Query, doc: Doc): boolean {
if (!this.getHierarchy().isDerived(doc._class, q._class)) {
// Check if it is not a mixin and not match class

View File

@ -22,7 +22,7 @@ Package allow to create a client to interact with running platform.
## Node JS
For NodeJS enviornment it is required to configure ClientSocketFactory using 'ws' package.
For NodeJS environment it is required to configure ClientSocketFactory using 'ws' package.
```ts
// We need to override default WebSocket factory with 'ws' one.

View File

@ -27,15 +27,29 @@ import core, {
generateId,
Ref,
Tx,
TxApplyIf,
TxHandler,
TxResult
} from '@hcengineering/core'
import { getMetadata, PlatformError, readResponse, ReqId, serialize, UNAUTHORIZED } from '@hcengineering/platform'
import {
getMetadata,
PlatformError,
readResponse,
ReqId,
serialize,
UNAUTHORIZED,
unknownError
} from '@hcengineering/platform'
class DeferredPromise {
const SECOND = 1000
const pingTimeout = 10 * SECOND
const dialTimeout = 20 * SECOND
class RequestPromise {
readonly promise: Promise<any>
resolve!: (value?: any) => void
reject!: (reason?: any) => void
reconnect?: () => void
constructor () {
this.promise = new Promise((resolve, reject) => {
this.resolve = resolve
@ -46,7 +60,7 @@ class DeferredPromise {
class Connection implements ClientConnection {
private websocket: ClientSocket | Promise<ClientSocket> | null = null
private readonly requests = new Map<ReqId, DeferredPromise>()
private readonly requests = new Map<ReqId, RequestPromise>()
private lastId = 0
private readonly interval: number
private readonly sessionId = generateId() as string
@ -55,22 +69,25 @@ class Connection implements ClientConnection {
private readonly url: string,
private readonly handler: TxHandler,
private readonly onUpgrade?: () => void,
private readonly onUnauthorized?: () => void
private readonly onUnauthorized?: () => void,
readonly onConnect?: () => Promise<void>
) {
console.log('connection created')
this.interval = setInterval(() => {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
this.sendRequest('ping')
}, 10000)
this.sendRequest({ method: 'ping', params: [] })
}, pingTimeout)
}
async close (): Promise<void> {
clearInterval(this.interval)
if (this.websocket !== null) {
if (this.websocket instanceof Promise) {
this.websocket = await this.websocket
await this.websocket.then((ws) => ws.close())
} else {
this.websocket.close(1000)
}
this.websocket.close()
this.websocket = null
}
}
@ -83,7 +100,7 @@ class Connection implements ClientConnection {
return conn
} catch (err: any) {
console.log('failed to connect', err)
if (err.code === UNAUTHORIZED.code) {
if (err?.code === UNAUTHORIZED.code) {
this.onUnauthorized?.()
throw err
}
@ -94,7 +111,7 @@ class Connection implements ClientConnection {
if (this.delay !== 15) {
this.delay++
}
}, this.delay * 1000)
}, this.delay * SECOND)
})
}
}
@ -109,7 +126,16 @@ class Connection implements ClientConnection {
getMetadata(client.metadata.ClientSocketFactory) ?? ((url: string) => new WebSocket(url) as ClientSocket)
const websocket = clientSocketFactory(this.url + `?sessionId=${this.sessionId}`)
const opened = false
const socketId = this.sockets++
const dialTimer = setTimeout(() => {
if (!opened) {
websocket.close()
reject(new PlatformError(unknownError('timeout')))
}
}, dialTimeout)
websocket.onmessage = (event: MessageEvent) => {
const resp = readResponse(event.data)
if (resp.id === -1 && resp.result === 'hello') {
@ -117,7 +143,12 @@ class Connection implements ClientConnection {
reject(resp.error)
return
}
for (const [, v] of this.requests.entries()) {
v.reconnect?.()
}
resolve(websocket)
void this.onConnect?.()
return
}
if (resp.id !== undefined) {
@ -144,13 +175,14 @@ class Connection implements ClientConnection {
})
)
this.onUpgrade?.()
return
}
this.handler(tx)
}
}
websocket.onclose = () => {
console.log('client websocket closed', socketId)
// clearInterval(interval)
websocket.onclose = (ev) => {
console.log('client websocket closed', socketId, ev?.reason)
if (!(this.websocket instanceof Promise)) {
this.websocket = null
}
@ -158,6 +190,7 @@ class Connection implements ClientConnection {
}
websocket.onopen = () => {
console.log('connection opened...', socketId)
clearTimeout(dialTimer)
websocket.send(
serialize({
method: 'hello',
@ -167,30 +200,47 @@ class Connection implements ClientConnection {
)
}
websocket.onerror = (event: any) => {
console.log('client websocket error:', socketId, JSON.stringify(event))
console.error('client websocket error:', socketId, event)
reject(new Error(`websocket error:${socketId}`))
}
})
}
private async sendRequest (method: string, ...params: any[]): Promise<any> {
if (this.websocket instanceof Promise) {
this.websocket = await this.websocket
}
if (this.websocket === null) {
this.websocket = this.waitOpenConnection()
this.websocket = await this.websocket
}
private async sendRequest (data: {
method: string
params: any[]
// If not defined, on reconnect with timeout, will retry automatically.
retry?: () => Promise<boolean>
}): Promise<any> {
const id = this.lastId++
this.websocket.send(
serialize({
method,
params,
id
})
)
const promise = new DeferredPromise()
this.requests.set(id, promise)
const promise = new RequestPromise()
const sendData = async (): Promise<void> => {
if (this.websocket instanceof Promise) {
this.websocket = await this.websocket
}
if (this.websocket === null) {
this.websocket = this.waitOpenConnection()
this.websocket = await this.websocket
}
this.requests.set(id, promise)
this.websocket.send(
serialize({
method: data.method,
params: data.params,
id
})
)
}
promise.reconnect = () => {
setTimeout(async () => {
// In case we don't have response yet.
if (this.requests.has(id) && ((await data.retry?.()) ?? true)) {
await sendData()
}
}, 500)
}
await sendData()
return await promise.promise
}
@ -199,31 +249,43 @@ class Connection implements ClientConnection {
query: DocumentQuery<T>,
options?: FindOptions<T>
): Promise<FindResult<T>> {
return this.sendRequest('findAll', _class, query, options)
return this.sendRequest({ method: 'findAll', params: [_class, query, options] })
}
tx (tx: Tx): Promise<TxResult> {
return this.sendRequest('tx', tx)
return this.sendRequest({
method: 'tx',
params: [tx],
retry: async () => {
if (tx._class === core.class.TxApplyIf) {
return (
(await (await this.findAll(core.class.Tx, { _id: (tx as TxApplyIf).txes[0]._id }, { limit: 1 })).length) ===
0
)
}
return (await (await this.findAll(core.class.Tx, { _id: tx._id }, { limit: 1 })).length) === 0
}
})
}
loadChunk (domain: Domain, idx?: number): Promise<DocChunk> {
return this.sendRequest('loadChunk', domain, idx)
return this.sendRequest({ method: 'loadChunk', params: [domain, idx] })
}
closeChunk (idx: number): Promise<void> {
return this.sendRequest('closeChunk', idx)
return this.sendRequest({ method: 'closeChunk', params: [idx] })
}
loadDocs (domain: Domain, docs: Ref<Doc>[]): Promise<Doc[]> {
return this.sendRequest('loadDocs', domain, docs)
return this.sendRequest({ method: 'loadDocs', params: [domain, docs] })
}
upload (domain: Domain, docs: Doc[]): Promise<void> {
return this.sendRequest('upload', domain, docs)
return this.sendRequest({ method: 'upload', params: [domain, docs] })
}
clean (domain: Domain, docs: Ref<Doc>[]): Promise<void> {
return this.sendRequest('clean', domain, docs)
return this.sendRequest({ method: 'clean', params: [domain, docs] })
}
}
@ -234,7 +296,10 @@ export async function connect (
url: string,
handler: TxHandler,
onUpgrade?: () => void,
onUnauthorized?: () => void
onUnauthorized?: () => void,
onConnect?: () => void
): Promise<ClientConnection> {
return new Connection(url, handler, onUpgrade, onUnauthorized)
return new Connection(url, handler, onUpgrade, onUnauthorized, async () => {
onConnect?.()
})
}

View File

@ -20,59 +20,46 @@ import { connect } from './connection'
export { connect }
/*!
* Anticrm Platform Client Plugin
* © 2020, 2021 Anticrm Platform Contributors. All Rights Reserved.
* Licensed under the Eclipse Public License, Version 2.0
*/
// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
export default async () => {
let _token: string | undefined
let client: Promise<Client> | Client | undefined
return {
function: {
GetClient: async (
token: string,
endpoint: string,
onUpgrade?: () => void,
onUnauthorized?: () => void
onUnauthorized?: () => void,
onConnect?: () => void
): Promise<Client> => {
if (client instanceof Promise) {
client = await client
}
if (token !== _token && client !== undefined) {
await client.close()
client = undefined
}
if (client === undefined) {
const filterModel = getMetadata(clientPlugin.metadata.FilterModel) ?? false
client = createClient(
(handler: TxHandler) => {
const url = new URL(`/${token}`, endpoint)
console.log('connecting to', url.href)
return connect(url.href, handler, onUpgrade, onUnauthorized)
},
filterModel ? getPlugins() : undefined
)
_token = token
const filterModel = getMetadata(clientPlugin.metadata.FilterModel) ?? false
// Check if we had dev hook for client.
const hook = getMetadata(clientPlugin.metadata.ClientHook)
if (hook !== undefined) {
const hookProc = await getResource(hook)
const _client = client
client = new Promise((resolve, reject) => {
_client
.then((res) => {
resolve(hookProc(res))
})
.catch((err) => reject(err))
})
}
}
let client = createClient(
(handler: TxHandler) => {
const url = new URL(`/${token}`, endpoint)
console.log('connecting to', url.href)
return connect(url.href, handler, onUpgrade, onUnauthorized, onConnect)
},
filterModel ? getPlugins() : undefined
)
// Check if we had dev hook for client.
client = hookClient(client)
return await client
}
}
}
}
async function hookClient (client: Promise<Client>): Promise<Client> {
const hook = getMetadata(clientPlugin.metadata.ClientHook)
if (hook !== undefined) {
const hookProc = await getResource(hook)
const _client = client
client = new Promise((resolve, reject) => {
_client
.then((res) => {
resolve(hookProc(res))
})
.catch((err) => reject(err))
})
}
return await client
}

View File

@ -46,7 +46,7 @@ export interface ClientSocket {
send: (data: string | ArrayBufferLike | Blob | ArrayBufferView) => void
close: () => void
close: (code?: number) => void
}
/**
@ -56,7 +56,8 @@ export type ClientFactory = (
token: string,
endpoint: string,
onUpgrade?: () => void,
onUnauthorized?: () => void
onUnauthorized?: () => void,
onConnect?: () => void
) => Promise<Client>
export default plugin(clientId, {

View File

@ -192,7 +192,7 @@
_class={tracker.class.Issue}
space={issue.space}
alwaysEdit
shouldSaveDraft={false}
shouldSaveDraft={true}
on:attached={save}
on:detached={save}
showButtons

View File

@ -3,11 +3,14 @@ import contact from '@hcengineering/contact'
import core, { Client, setCurrentAccount, Version } from '@hcengineering/core'
import login, { loginId } from '@hcengineering/login'
import { getMetadata, getResource, setMetadata } from '@hcengineering/platform'
import presentation, { setClient } from '@hcengineering/presentation'
import presentation, { refreshClient, setClient } from '@hcengineering/presentation'
import { fetchMetadataLocalStorage, getCurrentLocation, navigate, setMetadataLocalStorage } from '@hcengineering/ui'
export let versionError: string | undefined = ''
let _token: string | undefined
let _client: Client | undefined
export async function connect (title: string): Promise<Client | undefined> {
const loc = getCurrentLocation()
const ws = loc.path[1]
@ -27,8 +30,17 @@ export async function connect (title: string): Promise<Client | undefined> {
return
}
const getClient = await getResource(client.function.GetClient)
const instance = await getClient(
if (_token !== token && _client !== undefined) {
await _client.close()
_client = undefined
}
if (_client !== undefined) {
return _client
}
_token = token
const clientFactory = await getResource(client.function.GetClient)
_client = await clientFactory(
token,
endpoint,
() => {
@ -40,11 +52,13 @@ export async function connect (title: string): Promise<Client | undefined> {
path: [loginId],
query: {}
})
}
},
// We need to refresh all active live queries and clear old queries.
refreshClient
)
console.log('logging in as', email)
const me = await instance.findOne(contact.class.EmployeeAccount, { email })
const me = await _client.findOne(contact.class.EmployeeAccount, { email })
if (me !== undefined) {
console.log('login: employee account', me)
setCurrentAccount(me)
@ -55,11 +69,14 @@ export async function connect (title: string): Promise<Client | undefined> {
path: [loginId],
query: { navigateUrl: encodeURIComponent(JSON.stringify(getCurrentLocation())) }
})
// Update on connect, so it will be triggered
setClient(_client)
return
}
try {
const version = await instance.findOne<Version>(core.class.Version, {})
const version = await _client.findOne<Version>(core.class.Version, {})
console.log('Model version', version)
const requirdVersion = getMetadata(presentation.metadata.RequiredVersion)
@ -84,9 +101,9 @@ export async function connect (title: string): Promise<Client | undefined> {
// Update window title
document.title = [ws, title].filter((it) => it).join(' - ')
setClient(_client)
setClient(instance)
return instance
return _client
}
function clearMetadata (ws: string): void {
const tokens = fetchMetadataLocalStorage(login.metadata.LoginTokens)

View File

@ -92,7 +92,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
async cancel (): Promise<void> {
console.log('Cancel indexing', this.indexId, this.workspace)
this.cancelling = true
clearTimeout(this.waitTimeout)
clearTimeout(this.skippedReiterationTimeout)
this.triggerIndexing()
await this.indexing
await this.flush(true)
@ -232,7 +232,7 @@ export class FullTextIndexPipeline implements FullTextPipeline {
}
triggerIndexing = (): void => {}
waitTimeout: any
skippedReiterationTimeout: any
stats: Record<string, number> = {}
private async stageUpdate (udoc: DocIndexState, update: DocumentUpdate<DocIndexState>): Promise<void> {
@ -270,23 +270,20 @@ export class FullTextIndexPipeline implements FullTextPipeline {
if (this.toIndex.size === 0 || this.stageChanged === 0) {
if (this.toIndex.size === 0) {
console.log(`${this.workspace.name} Indexing complete, waiting changes`, this.indexId)
} else {
console.log(`${this.workspace.name} Partial Indexing complete, waiting changes`, this.indexId)
console.log(`${this.workspace.name} Indexing complete`, this.indexId)
}
if (!this.cancelling) {
await new Promise((resolve) => {
this.triggerIndexing = () => {
resolve(null)
clearTimeout(this.waitTimeout)
clearTimeout(this.skippedReiterationTimeout)
}
this.waitTimeout = setTimeout(() => {
this.skippedReiterationTimeout = setTimeout(() => {
// Force skipped reiteration, just decrease by -1
for (const [s, v] of Array.from(this.skipped.entries())) {
this.skipped.set(s, v - 1)
}
resolve(null)
}, 30000)
}, 60000)
})
}
}

View File

@ -46,6 +46,7 @@ import { Readable } from 'stream'
*/
export interface SessionContext extends MeasureContext {
userEmail: string
sessionId: string
}
/**

View File

@ -6,7 +6,7 @@ const apmUrl = process.env.APM_SERVER_URL
const metricsFile = process.env.METRICS_FILE
const metricsConsole = (process.env.METRICS_CONSOLE ?? 'false') === 'true'
const METRICS_UPDATE_INTERVAL = 30000
const METRICS_UPDATE_INTERVAL = 60000
/**
* @public

View File

@ -14,10 +14,9 @@
// limitations under the License.
//
import client from '@hcengineering/client'
import clientResources from '@hcengineering/client-resources'
import client, { clientId } from '@hcengineering/client'
import { Client, WorkspaceId } from '@hcengineering/core'
import { setMetadata } from '@hcengineering/platform'
import { addLocation, getResource, setMetadata } from '@hcengineering/platform'
import { generateToken } from '@hcengineering/server-token'
/**
@ -36,5 +35,9 @@ export async function connect (
const WebSocket = require('ws')
setMetadata(client.metadata.ClientSocketFactory, (url) => new WebSocket(url))
return await (await clientResources()).function.GetClient(token, transactorUrl)
addLocation(clientId, () => import('@hcengineering/client-resources'))
return await (
await getResource(client.function.GetClient)
)(token, transactorUrl)
}

View File

@ -20,6 +20,7 @@ import WebSocket from 'ws'
import { disableLogging, start } from '../server'
import {
Account,
Class,
Doc,
DocumentQuery,
@ -32,6 +33,7 @@ import {
ModelDb,
Ref,
ServerStorage,
Space,
toFindResult,
Tx,
TxResult
@ -95,7 +97,7 @@ describe('server', () => {
it('should connect to server', (done) => {
const conn = connect()
conn.on('open', () => {
conn.close()
conn.close(1000)
})
conn.on('close', () => {
done()
@ -105,13 +107,13 @@ describe('server', () => {
it('should not connect to server without token', (done) => {
const conn = new WebSocket('ws://localhost:3335/xyz')
conn.on('error', () => {
conn.close()
conn.close(1000)
})
conn.on('message', (msg: string) => {
const resp = readResponse(msg)
expect(resp.result === 'hello')
expect(resp.error?.code).toBe(UNAUTHORIZED.code)
conn.close()
conn.close(1000)
})
conn.on('close', () => {
done()
@ -132,11 +134,114 @@ describe('server', () => {
readResponse(msg)
if (++received === total) {
// console.log('resp:', resp, ' Time: ', Date.now() - start)
conn.close()
conn.close(1000)
}
})
conn.on('close', () => {
done()
})
})
it('reconnect', async () => {
const cancelOp = start(
new MeasureMetricsContext('test', {}),
async () => ({
modelDb: await getModelDb(),
findAll: async <T extends Doc>(
ctx: SessionContext,
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
): Promise<FindResult<T>> => {
const d: Doc & { sessionId: string } = {
_class: 'result' as Ref<Class<Doc>>,
_id: '1' as Ref<Doc & { sessionId: string }>,
space: '' as Ref<Space>,
modifiedBy: '' as Ref<Account>,
modifiedOn: Date.now(),
sessionId: ctx.sessionId
}
return toFindResult([d as unknown as T])
},
tx: async (ctx: SessionContext, tx: Tx): Promise<[TxResult, Tx[], string[] | undefined]> => [{}, [], undefined],
close: async () => {},
storage: {} as unknown as ServerStorage,
domains: async () => [],
find: (domain: Domain) => ({
next: async () => undefined,
close: async () => {}
}),
load: async (domain: Domain, docs: Ref<Doc>[]) => [],
upload: async (domain: Domain, docs: Doc[]) => {},
clean: async (domain: Domain, docs: Ref<Doc>[]) => {}
}),
(token, pipeline, broadcast) => new ClientSession(broadcast, token, pipeline),
3336,
''
)
async function findClose (token: string, timeoutPromise: Promise<void>, code: number): Promise<string> {
const newConn = new WebSocket(`ws://localhost:3336/${token}?sessionId=s1`)
await Promise.race([
timeoutPromise,
new Promise((resolve) => {
newConn.on('open', () => {
newConn.send(serialize({ method: 'hello', params: [], id: -1 }))
newConn.send(serialize({ method: 'findAll', params: [], id: -1 }))
resolve(null)
})
})
])
let helloReceived = false
let responseMsg: any = {}
await Promise.race([
timeoutPromise,
new Promise((resolve) => {
newConn.on('message', (msg: Buffer) => {
try {
console.log('resp:', msg.toString())
const parsedMsg = readResponse(msg.toString()) // Hello
if (!helloReceived) {
expect(parsedMsg.result === 'hello')
helloReceived = true
return
}
responseMsg = readResponse(msg.toString()) // our message
resolve(null)
} catch (err: any) {
console.error(err)
}
})
})
])
if (code === 1005) {
newConn.close()
} else {
newConn.close(code)
}
return responseMsg.result[0].sessionId
}
try {
//
const token: string = generateToken('my@email.com', getWorkspaceId('latest', ''))
const timeoutPromise = new Promise<void>((resolve) => {
setTimeout(resolve, 4000)
})
const t1 = await findClose(token, timeoutPromise, 1005)
const t2 = await findClose(token, timeoutPromise, 1000)
expect(t1).toBe(t2)
} catch (err: any) {
console.error(err)
} finally {
console.log('calling shutdown')
await cancelOp()
}
})
})

View File

@ -24,10 +24,10 @@ import core, {
WorkspaceId
} from '@hcengineering/core'
import { readRequest, Response, serialize, UNAUTHORIZED, unknownError } from '@hcengineering/platform'
import type { Pipeline } from '@hcengineering/server-core'
import type { Pipeline, SessionContext } from '@hcengineering/server-core'
import { decodeToken, Token } from '@hcengineering/server-token'
import { createServer, IncomingMessage } from 'http'
import WebSocket, { WebSocketServer } from 'ws'
import WebSocket, { RawData, WebSocketServer } from 'ws'
import { BroadcastCall, PipelineFactory, Session } from './types'
let LOGGING_ENABLED = true
@ -59,6 +59,8 @@ class SessionManager {
return this.sessionFactory(token, pipeline, this.broadcast.bind(this))
}
upgradeId: string | undefined
async addSession (
ctx: MeasureContext,
ws: WebSocket,
@ -79,6 +81,7 @@ class SessionManager {
if (token.extra?.model === 'upgrade') {
console.log('reloading workspace', JSON.stringify(token))
this.upgradeId = sessionId
// If upgrade client is used.
// Drop all existing clients
await this.closeAll(ctx, wsString, workspace, 0, 'upgrade')
@ -101,7 +104,7 @@ class SessionManager {
return session
}
if (workspace.upgrade) {
if (workspace.upgrade && sessionId !== this.upgradeId) {
ws.close()
throw new Error('Upgrade in progress....')
}
@ -112,7 +115,14 @@ class SessionManager {
// try restore session
const existingSession = workspace.sessions.find((it) => it[0].sessionId === sessionId)
if (existingSession !== undefined) {
if (LOGGING_ENABLED) console.log('found existing session', token.email, existingSession[0].sessionId)
if (LOGGING_ENABLED) {
console.log(
'found existing session',
token.email,
existingSession[0].sessionId,
existingSession[0].sessionInstanceId
)
}
// Update websocket
clearTimeout(existingSession[0].closeTimeout)
existingSession[0].closeTimeout = undefined
@ -123,6 +133,7 @@ class SessionManager {
const session = this.createSession(token, pipeline)
session.sessionId = sessionId
session.sessionInstanceId = generateId()
workspace.sessions.push([session, ws])
await this.setStatus(ctx, session, true)
return session
@ -230,7 +241,13 @@ class SessionManager {
}
}
async closeAll (ctx: MeasureContext, wsId: string, workspace: Workspace, code: number, reason: string): Promise<void> {
async closeAll (
ctx: MeasureContext,
wsId: string,
workspace: Workspace,
code: number,
reason: 'upgrade' | 'shutdown'
): Promise<void> {
console.log(`closing workspace ${wsId} - ${workspace.id}, code: ${code}, reason: ${reason}`)
const sessions = Array.from(workspace.sessions)
@ -238,21 +255,24 @@ class SessionManager {
const closeS = async (s: Session, webSocket: WebSocket): Promise<void> => {
clearTimeout(s.closeTimeout)
// await for message to go to client.
await new Promise((resolve) => {
// Override message handler, to wait for upgrading response from clients.
webSocket.on('close', () => {
resolve(null)
})
webSocket.send(
serialize({
result: {
_class: core.class.TxModelUpgrade
}
s.workspaceClosed = true
if (reason === 'upgrade') {
// await for message to go to client.
await new Promise((resolve) => {
// Override message handler, to wait for upgrading response from clients.
webSocket.on('close', () => {
resolve(null)
})
)
setTimeout(resolve, 1000)
})
webSocket.send(
serialize({
result: {
_class: core.class.TxModelUpgrade
}
})
)
setTimeout(resolve, 1000)
})
}
webSocket.close()
await this.setStatus(ctx, s, false)
}
@ -305,30 +325,22 @@ async function handleRequest<S extends Session> (
): Promise<void> {
const request = readRequest(msg)
if (request.id === -1 && request.method === 'hello') {
console.log('hello happen', service.getUser())
ws.send(serialize({ id: -1, result: 'hello' }))
// Push result buffer messages to client.
for (const r of service.resultBuffer ?? []) {
ws.send(serialize(r))
}
service.resultBuffer = []
return
}
if (request.id === -1 && request.method === '#upgrade') {
ws.close(0, 'upgrade')
return
}
const userCtx = ctx.newChild(service.getUser(), { userId: service.getUser() }) as SessionContext
userCtx.sessionId = service.sessionInstanceId ?? ''
const f = (service as any)[request.method]
try {
const params = [ctx, ...request.params]
const params = [userCtx, ...request.params]
const result = await f.apply(service, params)
const resp: Response<any> = { id: request.id, result }
ws.send(serialize(resp), (err) => {
if (err !== undefined) {
// It seems we failed to send to client.
service.resultBuffer = [...(service.resultBuffer ?? []), resp]
}
})
ws.send(serialize(resp))
} catch (err: any) {
const resp: Response<any> = {
id: request.id,
@ -379,13 +391,33 @@ export function start (
})
const session = await sessions.addSession(ctx, ws, token, pipelineFactory, productId, sessionId)
// eslint-disable-next-line @typescript-eslint/no-misused-promises
ws.on('message', async (msg: string) => await handleRequest(ctx, session, ws, msg))
ws.on('message', async (msg: RawData) => {
let msgStr = ''
if (typeof msg === 'string') {
msgStr = msg
} else if (msg instanceof Buffer) {
msgStr = msg.toString()
} else if (Array.isArray(msg)) {
msgStr = Buffer.concat(msg).toString()
}
await handleRequest(ctx, session, ws, msgStr)
})
// eslint-disable-next-line @typescript-eslint/no-misused-promises
ws.on('close', (code: number, reason: string) => {
ws.on('close', (code: number, reason: Buffer) => {
if (session.workspaceClosed ?? false) {
return
}
// remove session after 1seconds, give a time to reconnect.
session.closeTimeout = setTimeout(() => {
void sessions.close(ctx, ws, token.workspace, code, reason)
}, 1000)
if (code === 1000) {
console.log(`client "${token.email}" closed normally`)
void sessions.close(ctx, ws, token.workspace, code, reason.toString())
} else {
console.log(`client "${token.email}" closed abnormally, waiting reconnect`, code, reason.toString())
session.closeTimeout = setTimeout(() => {
console.log(`client "${token.email}" force closed`)
void sessions.close(ctx, ws, token.workspace, code, reason.toString())
}, 10000)
}
})
const b = buffer
buffer = undefined
@ -401,8 +433,8 @@ export function start (
try {
const payload = decodeToken(token ?? '')
console.log('client connected with payload', payload)
const sessionId = url.searchParams.get('sessionId')
console.log('client connected with payload', payload, sessionId)
if (payload.workspace.productId !== productId) {
throw new Error('Invalid workspace product')

View File

@ -30,8 +30,9 @@ export interface Session {
// Session restore information
sessionId?: string
resultBuffer?: TxResult[]
sessionInstanceId?: string
closeTimeout?: any
workspaceClosed?: boolean
}
/**