From 993b620fe240c1833c69cfd13803b3d340ca97e5 Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Fri, 24 Mar 2023 11:53:12 +0700 Subject: [PATCH] TSK-825: Client proper reconnection (#2797) Signed-off-by: Andrey Sobolev --- packages/core/src/client.ts | 149 ++++++++++++------ packages/platform/src/event.ts | 1 + packages/presentation/src/utils.ts | 9 ++ packages/query/src/index.ts | 9 ++ plugins/client-resources/readme.md | 2 +- plugins/client-resources/src/connection.ts | 145 ++++++++++++----- plugins/client-resources/src/index.ts | 69 ++++---- plugins/client/src/index.ts | 5 +- .../components/issues/edit/EditIssue.svelte | 2 +- plugins/workbench-resources/src/connect.ts | 33 +++- server/core/src/indexer/indexer.ts | 15 +- server/core/src/types.ts | 1 + server/server/src/metrics.ts | 2 +- server/tool/src/connect.ts | 11 +- server/ws/src/__tests__/server.test.ts | 113 ++++++++++++- server/ws/src/server.ts | 108 ++++++++----- server/ws/src/types.ts | 3 +- 17 files changed, 482 insertions(+), 195 deletions(-) diff --git a/packages/core/src/client.ts b/packages/core/src/client.ts index 820ea629d7..90b3da54d0 100644 --- a/packages/core/src/client.ts +++ b/packages/core/src/client.ts @@ -25,6 +25,8 @@ import { SortingOrder } from './storage' import { Tx, TxCreateDoc, TxProcessor, TxUpdateDoc } from './tx' import { toFindResult } from './utils' +const transactionThreshold = 3000 + /** * @public */ @@ -50,6 +52,7 @@ export interface Client extends Storage { */ export interface ClientConnection extends Storage, BackupClient { close: () => Promise + onConnect?: () => Promise } class ClientImpl implements Client, BackupClient { @@ -151,11 +154,16 @@ export async function createClient ( allowedPlugins?: Plugin[] ): Promise { let client: ClientImpl | null = null + + // Temporal buffer, while we apply model let txBuffer: Tx[] | undefined = [] + const loadedTxIds = new Set>() const hierarchy = new Hierarchy() const model = new ModelDb(hierarchy) + let lastTx: number + function txHandler (tx: Tx): void { if (client === null) { txBuffer?.push(tx) @@ -163,63 +171,83 @@ export async function createClient ( // eslint-disable-next-line @typescript-eslint/no-floating-promises client.updateFromRemote(tx) } + lastTx = tx.modifiedOn } + const configs = new Map, PluginConfiguration>() const conn = await connect(txHandler) + + await loadModel(conn, loadedTxIds, allowedPlugins, configs, hierarchy, model) + + txBuffer = txBuffer.filter((tx) => !loadedTxIds.has(tx._id)) + + client = new ClientImpl(hierarchy, model, conn) + + for (const tx of txBuffer) { + txHandler(tx) + loadedTxIds.add(tx._id) + } + txBuffer = undefined + + const oldOnConnect: (() => void) | undefined = conn.onConnect + conn.onConnect = async () => { + // Find all new transactions and apply + await loadModel(conn, loadedTxIds, allowedPlugins, configs, hierarchy, model) + + // We need to look for last 1000 transactions and if it is more since lastTx one we receive, we need to perform full refresh. + const atxes = await conn.findAll( + core.class.Tx, + { modifiedOn: { $gt: lastTx } }, + { sort: { _id: SortingOrder.Ascending }, limit: transactionThreshold } + ) + if (atxes.total < transactionThreshold) { + console.log('applying input transactions', atxes.length) + for (const tx of atxes) { + txHandler(tx) + } + } else { + // We need to trigger full refresh on queries, etc. + await oldOnConnect?.() + } + } + + return client +} +async function loadModel ( + conn: ClientConnection, + processedTx: Set>, + allowedPlugins: Plugin[] | undefined, + configs: Map, PluginConfiguration>, + hierarchy: Hierarchy, + model: ModelDb +): Promise { const t = Date.now() + const atxes = await conn.findAll( core.class.Tx, - { objectSpace: core.space.Model }, + { objectSpace: core.space.Model, _id: { $nin: Array.from(processedTx.values()) } }, { sort: { _id: SortingOrder.Ascending } } ) - console.log('find model', atxes.length, Date.now() - t) let systemTx: Tx[] = [] const userTx: Tx[] = [] + console.log('find' + (processedTx.size === 0 ? 'full model' : 'model diff'), atxes.length, Date.now() - t) atxes.forEach((tx) => (tx.modifiedBy === core.account.System ? systemTx : userTx).push(tx)) if (allowedPlugins !== undefined) { - // Filter system transactions - const configs = new Map, PluginConfiguration>() - for (const t of systemTx) { - if (t._class === core.class.TxCreateDoc) { - const ct = t as TxCreateDoc - if (ct.objectClass === core.class.PluginConfiguration) { - configs.set(ct.objectId as Ref, TxProcessor.createDoc2Doc(ct) as PluginConfiguration) - } - } else if (t._class === core.class.TxUpdateDoc) { - const ut = t as TxUpdateDoc - if (ut.objectClass === core.class.PluginConfiguration) { - const c = configs.get(ut.objectId as Ref) - if (c !== undefined) { - TxProcessor.updateDoc2Doc(c, ut) - } - } - } - } - + fillConfiguration(systemTx, configs) const excludedPlugins = Array.from(configs.values()).filter((it) => !allowedPlugins.includes(it.pluginId as Plugin)) - for (const a of excludedPlugins) { - for (const c of configs.values()) { - if (a.pluginId === c.pluginId) { - const excluded = new Set>() - for (const id of c.transactions) { - excluded.add(id as Ref) - } - const exclude = systemTx.filter((t) => excluded.has(t._id)) - console.log('exclude plugin', c.pluginId, exclude.length) - systemTx = systemTx.filter((t) => !excluded.has(t._id)) - } - } - } + systemTx = pluginFilterTx(excludedPlugins, configs, systemTx) } const txes = systemTx.concat(userTx) - const txMap = new Map, Ref>() - for (const tx of txes) txMap.set(tx._id, tx._id) + for (const tx of txes) { + processedTx.add(tx._id) + } + for (const tx of txes) { try { hierarchy.tx(tx) @@ -234,13 +262,44 @@ export async function createClient ( console.error('failed to apply model transaction, skipping', JSON.stringify(tx), err) } } - - txBuffer = txBuffer.filter((tx) => txMap.get(tx._id) === undefined) - - client = new ClientImpl(hierarchy, model, conn) - - for (const tx of txBuffer) txHandler(tx) - txBuffer = undefined - - return client +} + +function fillConfiguration (systemTx: Tx[], configs: Map, PluginConfiguration>): void { + for (const t of systemTx) { + if (t._class === core.class.TxCreateDoc) { + const ct = t as TxCreateDoc + if (ct.objectClass === core.class.PluginConfiguration) { + configs.set(ct.objectId as Ref, TxProcessor.createDoc2Doc(ct) as PluginConfiguration) + } + } else if (t._class === core.class.TxUpdateDoc) { + const ut = t as TxUpdateDoc + if (ut.objectClass === core.class.PluginConfiguration) { + const c = configs.get(ut.objectId as Ref) + if (c !== undefined) { + TxProcessor.updateDoc2Doc(c, ut) + } + } + } + } +} + +function pluginFilterTx ( + excludedPlugins: PluginConfiguration[], + configs: Map, PluginConfiguration>, + systemTx: Tx[] +): Tx[] { + for (const a of excludedPlugins) { + for (const c of configs.values()) { + if (a.pluginId === c.pluginId) { + const excluded = new Set>() + for (const id of c.transactions) { + excluded.add(id as Ref) + } + const exclude = systemTx.filter((t) => excluded.has(t._id)) + console.log('exclude plugin', c.pluginId, exclude.length) + systemTx = systemTx.filter((t) => !excluded.has(t._id)) + } + } + } + return systemTx } diff --git a/packages/platform/src/event.ts b/packages/platform/src/event.ts index 62c32882f8..015e899034 100644 --- a/packages/platform/src/event.ts +++ b/packages/platform/src/event.ts @@ -88,6 +88,7 @@ export async function monitor (status: Status, promise: Promise): Promise< return result } catch (err) { void setPlatformStatus(unknownError(err)) // eslint-disable-line no-void + console.error(err) throw err } } diff --git a/packages/presentation/src/utils.ts b/packages/presentation/src/utils.ts index e25703c931..35381225bf 100644 --- a/packages/presentation/src/utils.ts +++ b/packages/presentation/src/utils.ts @@ -113,6 +113,15 @@ export function setClient (_client: Client): void { } } +/** + * @public + */ +export function refreshClient (): void { + if (liveQuery !== undefined) { + void liveQuery.refreshConnect() + } +} + /** * @public */ diff --git a/packages/query/src/index.ts b/packages/query/src/index.ts index 5309ced0e0..3e927fa418 100644 --- a/packages/query/src/index.ts +++ b/packages/query/src/index.ts @@ -85,6 +85,15 @@ export class LiveQuery extends TxProcessor implements Client { return this.client.getModel() } + // Perform refresh of content since connection established. + async refreshConnect (): Promise { + for (const q of [...this.queue]) { + if (!(await this.removeFromQueue(q))) { + await this.refresh(q) + } + } + } + private match (q: Query, doc: Doc): boolean { if (!this.getHierarchy().isDerived(doc._class, q._class)) { // Check if it is not a mixin and not match class diff --git a/plugins/client-resources/readme.md b/plugins/client-resources/readme.md index 35b4a6f718..64370999be 100644 --- a/plugins/client-resources/readme.md +++ b/plugins/client-resources/readme.md @@ -22,7 +22,7 @@ Package allow to create a client to interact with running platform. ## Node JS -For NodeJS enviornment it is required to configure ClientSocketFactory using 'ws' package. +For NodeJS environment it is required to configure ClientSocketFactory using 'ws' package. ```ts // We need to override default WebSocket factory with 'ws' one. diff --git a/plugins/client-resources/src/connection.ts b/plugins/client-resources/src/connection.ts index 7d02cd7df7..279ced9efd 100644 --- a/plugins/client-resources/src/connection.ts +++ b/plugins/client-resources/src/connection.ts @@ -27,15 +27,29 @@ import core, { generateId, Ref, Tx, + TxApplyIf, TxHandler, TxResult } from '@hcengineering/core' -import { getMetadata, PlatformError, readResponse, ReqId, serialize, UNAUTHORIZED } from '@hcengineering/platform' +import { + getMetadata, + PlatformError, + readResponse, + ReqId, + serialize, + UNAUTHORIZED, + unknownError +} from '@hcengineering/platform' -class DeferredPromise { +const SECOND = 1000 +const pingTimeout = 10 * SECOND +const dialTimeout = 20 * SECOND + +class RequestPromise { readonly promise: Promise resolve!: (value?: any) => void reject!: (reason?: any) => void + reconnect?: () => void constructor () { this.promise = new Promise((resolve, reject) => { this.resolve = resolve @@ -46,7 +60,7 @@ class DeferredPromise { class Connection implements ClientConnection { private websocket: ClientSocket | Promise | null = null - private readonly requests = new Map() + private readonly requests = new Map() private lastId = 0 private readonly interval: number private readonly sessionId = generateId() as string @@ -55,22 +69,25 @@ class Connection implements ClientConnection { private readonly url: string, private readonly handler: TxHandler, private readonly onUpgrade?: () => void, - private readonly onUnauthorized?: () => void + private readonly onUnauthorized?: () => void, + readonly onConnect?: () => Promise ) { console.log('connection created') this.interval = setInterval(() => { // eslint-disable-next-line @typescript-eslint/no-floating-promises - this.sendRequest('ping') - }, 10000) + this.sendRequest({ method: 'ping', params: [] }) + }, pingTimeout) } async close (): Promise { clearInterval(this.interval) if (this.websocket !== null) { if (this.websocket instanceof Promise) { - this.websocket = await this.websocket + await this.websocket.then((ws) => ws.close()) + } else { + this.websocket.close(1000) } - this.websocket.close() + this.websocket = null } } @@ -83,7 +100,7 @@ class Connection implements ClientConnection { return conn } catch (err: any) { console.log('failed to connect', err) - if (err.code === UNAUTHORIZED.code) { + if (err?.code === UNAUTHORIZED.code) { this.onUnauthorized?.() throw err } @@ -94,7 +111,7 @@ class Connection implements ClientConnection { if (this.delay !== 15) { this.delay++ } - }, this.delay * 1000) + }, this.delay * SECOND) }) } } @@ -109,7 +126,16 @@ class Connection implements ClientConnection { getMetadata(client.metadata.ClientSocketFactory) ?? ((url: string) => new WebSocket(url) as ClientSocket) const websocket = clientSocketFactory(this.url + `?sessionId=${this.sessionId}`) + const opened = false const socketId = this.sockets++ + + const dialTimer = setTimeout(() => { + if (!opened) { + websocket.close() + reject(new PlatformError(unknownError('timeout'))) + } + }, dialTimeout) + websocket.onmessage = (event: MessageEvent) => { const resp = readResponse(event.data) if (resp.id === -1 && resp.result === 'hello') { @@ -117,7 +143,12 @@ class Connection implements ClientConnection { reject(resp.error) return } + for (const [, v] of this.requests.entries()) { + v.reconnect?.() + } resolve(websocket) + void this.onConnect?.() + return } if (resp.id !== undefined) { @@ -144,13 +175,14 @@ class Connection implements ClientConnection { }) ) this.onUpgrade?.() + return } this.handler(tx) } } - websocket.onclose = () => { - console.log('client websocket closed', socketId) - // clearInterval(interval) + websocket.onclose = (ev) => { + console.log('client websocket closed', socketId, ev?.reason) + if (!(this.websocket instanceof Promise)) { this.websocket = null } @@ -158,6 +190,7 @@ class Connection implements ClientConnection { } websocket.onopen = () => { console.log('connection opened...', socketId) + clearTimeout(dialTimer) websocket.send( serialize({ method: 'hello', @@ -167,30 +200,47 @@ class Connection implements ClientConnection { ) } websocket.onerror = (event: any) => { - console.log('client websocket error:', socketId, JSON.stringify(event)) + console.error('client websocket error:', socketId, event) reject(new Error(`websocket error:${socketId}`)) } }) } - private async sendRequest (method: string, ...params: any[]): Promise { - if (this.websocket instanceof Promise) { - this.websocket = await this.websocket - } - if (this.websocket === null) { - this.websocket = this.waitOpenConnection() - this.websocket = await this.websocket - } + private async sendRequest (data: { + method: string + params: any[] + // If not defined, on reconnect with timeout, will retry automatically. + retry?: () => Promise + }): Promise { const id = this.lastId++ - this.websocket.send( - serialize({ - method, - params, - id - }) - ) - const promise = new DeferredPromise() - this.requests.set(id, promise) + const promise = new RequestPromise() + + const sendData = async (): Promise => { + if (this.websocket instanceof Promise) { + this.websocket = await this.websocket + } + if (this.websocket === null) { + this.websocket = this.waitOpenConnection() + this.websocket = await this.websocket + } + this.requests.set(id, promise) + this.websocket.send( + serialize({ + method: data.method, + params: data.params, + id + }) + ) + } + promise.reconnect = () => { + setTimeout(async () => { + // In case we don't have response yet. + if (this.requests.has(id) && ((await data.retry?.()) ?? true)) { + await sendData() + } + }, 500) + } + await sendData() return await promise.promise } @@ -199,31 +249,43 @@ class Connection implements ClientConnection { query: DocumentQuery, options?: FindOptions ): Promise> { - return this.sendRequest('findAll', _class, query, options) + return this.sendRequest({ method: 'findAll', params: [_class, query, options] }) } tx (tx: Tx): Promise { - return this.sendRequest('tx', tx) + return this.sendRequest({ + method: 'tx', + params: [tx], + retry: async () => { + if (tx._class === core.class.TxApplyIf) { + return ( + (await (await this.findAll(core.class.Tx, { _id: (tx as TxApplyIf).txes[0]._id }, { limit: 1 })).length) === + 0 + ) + } + return (await (await this.findAll(core.class.Tx, { _id: tx._id }, { limit: 1 })).length) === 0 + } + }) } loadChunk (domain: Domain, idx?: number): Promise { - return this.sendRequest('loadChunk', domain, idx) + return this.sendRequest({ method: 'loadChunk', params: [domain, idx] }) } closeChunk (idx: number): Promise { - return this.sendRequest('closeChunk', idx) + return this.sendRequest({ method: 'closeChunk', params: [idx] }) } loadDocs (domain: Domain, docs: Ref[]): Promise { - return this.sendRequest('loadDocs', domain, docs) + return this.sendRequest({ method: 'loadDocs', params: [domain, docs] }) } upload (domain: Domain, docs: Doc[]): Promise { - return this.sendRequest('upload', domain, docs) + return this.sendRequest({ method: 'upload', params: [domain, docs] }) } clean (domain: Domain, docs: Ref[]): Promise { - return this.sendRequest('clean', domain, docs) + return this.sendRequest({ method: 'clean', params: [domain, docs] }) } } @@ -234,7 +296,10 @@ export async function connect ( url: string, handler: TxHandler, onUpgrade?: () => void, - onUnauthorized?: () => void + onUnauthorized?: () => void, + onConnect?: () => void ): Promise { - return new Connection(url, handler, onUpgrade, onUnauthorized) + return new Connection(url, handler, onUpgrade, onUnauthorized, async () => { + onConnect?.() + }) } diff --git a/plugins/client-resources/src/index.ts b/plugins/client-resources/src/index.ts index c38ee33d15..80a04a9a89 100644 --- a/plugins/client-resources/src/index.ts +++ b/plugins/client-resources/src/index.ts @@ -20,59 +20,46 @@ import { connect } from './connection' export { connect } -/*! - * Anticrm Platform™ Client Plugin - * © 2020, 2021 Anticrm Platform Contributors. All Rights Reserved. - * Licensed under the Eclipse Public License, Version 2.0 - */ // eslint-disable-next-line @typescript-eslint/explicit-function-return-type export default async () => { - let _token: string | undefined - let client: Promise | Client | undefined - return { function: { GetClient: async ( token: string, endpoint: string, onUpgrade?: () => void, - onUnauthorized?: () => void + onUnauthorized?: () => void, + onConnect?: () => void ): Promise => { - if (client instanceof Promise) { - client = await client - } - if (token !== _token && client !== undefined) { - await client.close() - client = undefined - } - if (client === undefined) { - const filterModel = getMetadata(clientPlugin.metadata.FilterModel) ?? false - client = createClient( - (handler: TxHandler) => { - const url = new URL(`/${token}`, endpoint) - console.log('connecting to', url.href) - return connect(url.href, handler, onUpgrade, onUnauthorized) - }, - filterModel ? getPlugins() : undefined - ) - _token = token + const filterModel = getMetadata(clientPlugin.metadata.FilterModel) ?? false - // Check if we had dev hook for client. - const hook = getMetadata(clientPlugin.metadata.ClientHook) - if (hook !== undefined) { - const hookProc = await getResource(hook) - const _client = client - client = new Promise((resolve, reject) => { - _client - .then((res) => { - resolve(hookProc(res)) - }) - .catch((err) => reject(err)) - }) - } - } + let client = createClient( + (handler: TxHandler) => { + const url = new URL(`/${token}`, endpoint) + console.log('connecting to', url.href) + return connect(url.href, handler, onUpgrade, onUnauthorized, onConnect) + }, + filterModel ? getPlugins() : undefined + ) + // Check if we had dev hook for client. + client = hookClient(client) return await client } } } } +async function hookClient (client: Promise): Promise { + const hook = getMetadata(clientPlugin.metadata.ClientHook) + if (hook !== undefined) { + const hookProc = await getResource(hook) + const _client = client + client = new Promise((resolve, reject) => { + _client + .then((res) => { + resolve(hookProc(res)) + }) + .catch((err) => reject(err)) + }) + } + return await client +} diff --git a/plugins/client/src/index.ts b/plugins/client/src/index.ts index f605ff6709..cc634b1683 100644 --- a/plugins/client/src/index.ts +++ b/plugins/client/src/index.ts @@ -46,7 +46,7 @@ export interface ClientSocket { send: (data: string | ArrayBufferLike | Blob | ArrayBufferView) => void - close: () => void + close: (code?: number) => void } /** @@ -56,7 +56,8 @@ export type ClientFactory = ( token: string, endpoint: string, onUpgrade?: () => void, - onUnauthorized?: () => void + onUnauthorized?: () => void, + onConnect?: () => void ) => Promise export default plugin(clientId, { diff --git a/plugins/tracker-resources/src/components/issues/edit/EditIssue.svelte b/plugins/tracker-resources/src/components/issues/edit/EditIssue.svelte index 276426e08f..7fa0bbc6ca 100644 --- a/plugins/tracker-resources/src/components/issues/edit/EditIssue.svelte +++ b/plugins/tracker-resources/src/components/issues/edit/EditIssue.svelte @@ -192,7 +192,7 @@ _class={tracker.class.Issue} space={issue.space} alwaysEdit - shouldSaveDraft={false} + shouldSaveDraft={true} on:attached={save} on:detached={save} showButtons diff --git a/plugins/workbench-resources/src/connect.ts b/plugins/workbench-resources/src/connect.ts index e87ca980fe..205a34c1d2 100644 --- a/plugins/workbench-resources/src/connect.ts +++ b/plugins/workbench-resources/src/connect.ts @@ -3,11 +3,14 @@ import contact from '@hcengineering/contact' import core, { Client, setCurrentAccount, Version } from '@hcengineering/core' import login, { loginId } from '@hcengineering/login' import { getMetadata, getResource, setMetadata } from '@hcengineering/platform' -import presentation, { setClient } from '@hcengineering/presentation' +import presentation, { refreshClient, setClient } from '@hcengineering/presentation' import { fetchMetadataLocalStorage, getCurrentLocation, navigate, setMetadataLocalStorage } from '@hcengineering/ui' export let versionError: string | undefined = '' +let _token: string | undefined +let _client: Client | undefined + export async function connect (title: string): Promise { const loc = getCurrentLocation() const ws = loc.path[1] @@ -27,8 +30,17 @@ export async function connect (title: string): Promise { return } - const getClient = await getResource(client.function.GetClient) - const instance = await getClient( + if (_token !== token && _client !== undefined) { + await _client.close() + _client = undefined + } + if (_client !== undefined) { + return _client + } + _token = token + + const clientFactory = await getResource(client.function.GetClient) + _client = await clientFactory( token, endpoint, () => { @@ -40,11 +52,13 @@ export async function connect (title: string): Promise { path: [loginId], query: {} }) - } + }, + // We need to refresh all active live queries and clear old queries. + refreshClient ) console.log('logging in as', email) - const me = await instance.findOne(contact.class.EmployeeAccount, { email }) + const me = await _client.findOne(contact.class.EmployeeAccount, { email }) if (me !== undefined) { console.log('login: employee account', me) setCurrentAccount(me) @@ -55,11 +69,14 @@ export async function connect (title: string): Promise { path: [loginId], query: { navigateUrl: encodeURIComponent(JSON.stringify(getCurrentLocation())) } }) + + // Update on connect, so it will be triggered + setClient(_client) return } try { - const version = await instance.findOne(core.class.Version, {}) + const version = await _client.findOne(core.class.Version, {}) console.log('Model version', version) const requirdVersion = getMetadata(presentation.metadata.RequiredVersion) @@ -84,9 +101,9 @@ export async function connect (title: string): Promise { // Update window title document.title = [ws, title].filter((it) => it).join(' - ') + setClient(_client) - setClient(instance) - return instance + return _client } function clearMetadata (ws: string): void { const tokens = fetchMetadataLocalStorage(login.metadata.LoginTokens) diff --git a/server/core/src/indexer/indexer.ts b/server/core/src/indexer/indexer.ts index 6eae7057b1..306fca7f40 100644 --- a/server/core/src/indexer/indexer.ts +++ b/server/core/src/indexer/indexer.ts @@ -92,7 +92,7 @@ export class FullTextIndexPipeline implements FullTextPipeline { async cancel (): Promise { console.log('Cancel indexing', this.indexId, this.workspace) this.cancelling = true - clearTimeout(this.waitTimeout) + clearTimeout(this.skippedReiterationTimeout) this.triggerIndexing() await this.indexing await this.flush(true) @@ -232,7 +232,7 @@ export class FullTextIndexPipeline implements FullTextPipeline { } triggerIndexing = (): void => {} - waitTimeout: any + skippedReiterationTimeout: any stats: Record = {} private async stageUpdate (udoc: DocIndexState, update: DocumentUpdate): Promise { @@ -270,23 +270,20 @@ export class FullTextIndexPipeline implements FullTextPipeline { if (this.toIndex.size === 0 || this.stageChanged === 0) { if (this.toIndex.size === 0) { - console.log(`${this.workspace.name} Indexing complete, waiting changes`, this.indexId) - } else { - console.log(`${this.workspace.name} Partial Indexing complete, waiting changes`, this.indexId) + console.log(`${this.workspace.name} Indexing complete`, this.indexId) } if (!this.cancelling) { await new Promise((resolve) => { this.triggerIndexing = () => { resolve(null) - clearTimeout(this.waitTimeout) + clearTimeout(this.skippedReiterationTimeout) } - this.waitTimeout = setTimeout(() => { + this.skippedReiterationTimeout = setTimeout(() => { // Force skipped reiteration, just decrease by -1 for (const [s, v] of Array.from(this.skipped.entries())) { this.skipped.set(s, v - 1) } - resolve(null) - }, 30000) + }, 60000) }) } } diff --git a/server/core/src/types.ts b/server/core/src/types.ts index 1bcb6bd37c..dff08737dd 100644 --- a/server/core/src/types.ts +++ b/server/core/src/types.ts @@ -46,6 +46,7 @@ import { Readable } from 'stream' */ export interface SessionContext extends MeasureContext { userEmail: string + sessionId: string } /** diff --git a/server/server/src/metrics.ts b/server/server/src/metrics.ts index ab8e8fe633..a11db54cf5 100644 --- a/server/server/src/metrics.ts +++ b/server/server/src/metrics.ts @@ -6,7 +6,7 @@ const apmUrl = process.env.APM_SERVER_URL const metricsFile = process.env.METRICS_FILE const metricsConsole = (process.env.METRICS_CONSOLE ?? 'false') === 'true' -const METRICS_UPDATE_INTERVAL = 30000 +const METRICS_UPDATE_INTERVAL = 60000 /** * @public diff --git a/server/tool/src/connect.ts b/server/tool/src/connect.ts index 1acc31accb..e605cdeb6b 100644 --- a/server/tool/src/connect.ts +++ b/server/tool/src/connect.ts @@ -14,10 +14,9 @@ // limitations under the License. // -import client from '@hcengineering/client' -import clientResources from '@hcengineering/client-resources' +import client, { clientId } from '@hcengineering/client' import { Client, WorkspaceId } from '@hcengineering/core' -import { setMetadata } from '@hcengineering/platform' +import { addLocation, getResource, setMetadata } from '@hcengineering/platform' import { generateToken } from '@hcengineering/server-token' /** @@ -36,5 +35,9 @@ export async function connect ( const WebSocket = require('ws') setMetadata(client.metadata.ClientSocketFactory, (url) => new WebSocket(url)) - return await (await clientResources()).function.GetClient(token, transactorUrl) + addLocation(clientId, () => import('@hcengineering/client-resources')) + + return await ( + await getResource(client.function.GetClient) + )(token, transactorUrl) } diff --git a/server/ws/src/__tests__/server.test.ts b/server/ws/src/__tests__/server.test.ts index c6854f2741..41c008abc4 100644 --- a/server/ws/src/__tests__/server.test.ts +++ b/server/ws/src/__tests__/server.test.ts @@ -20,6 +20,7 @@ import WebSocket from 'ws' import { disableLogging, start } from '../server' import { + Account, Class, Doc, DocumentQuery, @@ -32,6 +33,7 @@ import { ModelDb, Ref, ServerStorage, + Space, toFindResult, Tx, TxResult @@ -95,7 +97,7 @@ describe('server', () => { it('should connect to server', (done) => { const conn = connect() conn.on('open', () => { - conn.close() + conn.close(1000) }) conn.on('close', () => { done() @@ -105,13 +107,13 @@ describe('server', () => { it('should not connect to server without token', (done) => { const conn = new WebSocket('ws://localhost:3335/xyz') conn.on('error', () => { - conn.close() + conn.close(1000) }) conn.on('message', (msg: string) => { const resp = readResponse(msg) expect(resp.result === 'hello') expect(resp.error?.code).toBe(UNAUTHORIZED.code) - conn.close() + conn.close(1000) }) conn.on('close', () => { done() @@ -132,11 +134,114 @@ describe('server', () => { readResponse(msg) if (++received === total) { // console.log('resp:', resp, ' Time: ', Date.now() - start) - conn.close() + conn.close(1000) } }) conn.on('close', () => { done() }) }) + + it('reconnect', async () => { + const cancelOp = start( + new MeasureMetricsContext('test', {}), + async () => ({ + modelDb: await getModelDb(), + findAll: async ( + ctx: SessionContext, + _class: Ref>, + query: DocumentQuery, + options?: FindOptions + ): Promise> => { + const d: Doc & { sessionId: string } = { + _class: 'result' as Ref>, + _id: '1' as Ref, + space: '' as Ref, + modifiedBy: '' as Ref, + modifiedOn: Date.now(), + sessionId: ctx.sessionId + } + return toFindResult([d as unknown as T]) + }, + tx: async (ctx: SessionContext, tx: Tx): Promise<[TxResult, Tx[], string[] | undefined]> => [{}, [], undefined], + close: async () => {}, + storage: {} as unknown as ServerStorage, + domains: async () => [], + find: (domain: Domain) => ({ + next: async () => undefined, + close: async () => {} + }), + load: async (domain: Domain, docs: Ref[]) => [], + upload: async (domain: Domain, docs: Doc[]) => {}, + clean: async (domain: Domain, docs: Ref[]) => {} + }), + (token, pipeline, broadcast) => new ClientSession(broadcast, token, pipeline), + 3336, + '' + ) + + async function findClose (token: string, timeoutPromise: Promise, code: number): Promise { + const newConn = new WebSocket(`ws://localhost:3336/${token}?sessionId=s1`) + + await Promise.race([ + timeoutPromise, + new Promise((resolve) => { + newConn.on('open', () => { + newConn.send(serialize({ method: 'hello', params: [], id: -1 })) + newConn.send(serialize({ method: 'findAll', params: [], id: -1 })) + resolve(null) + }) + }) + ]) + + let helloReceived = false + + let responseMsg: any = {} + + await Promise.race([ + timeoutPromise, + new Promise((resolve) => { + newConn.on('message', (msg: Buffer) => { + try { + console.log('resp:', msg.toString()) + const parsedMsg = readResponse(msg.toString()) // Hello + if (!helloReceived) { + expect(parsedMsg.result === 'hello') + helloReceived = true + return + } + responseMsg = readResponse(msg.toString()) // our message + resolve(null) + } catch (err: any) { + console.error(err) + } + }) + }) + ]) + + if (code === 1005) { + newConn.close() + } else { + newConn.close(code) + } + return responseMsg.result[0].sessionId + } + + try { + // + const token: string = generateToken('my@email.com', getWorkspaceId('latest', '')) + const timeoutPromise = new Promise((resolve) => { + setTimeout(resolve, 4000) + }) + const t1 = await findClose(token, timeoutPromise, 1005) + const t2 = await findClose(token, timeoutPromise, 1000) + + expect(t1).toBe(t2) + } catch (err: any) { + console.error(err) + } finally { + console.log('calling shutdown') + await cancelOp() + } + }) }) diff --git a/server/ws/src/server.ts b/server/ws/src/server.ts index f3c1a5d6ad..e839c7ae11 100644 --- a/server/ws/src/server.ts +++ b/server/ws/src/server.ts @@ -24,10 +24,10 @@ import core, { WorkspaceId } from '@hcengineering/core' import { readRequest, Response, serialize, UNAUTHORIZED, unknownError } from '@hcengineering/platform' -import type { Pipeline } from '@hcengineering/server-core' +import type { Pipeline, SessionContext } from '@hcengineering/server-core' import { decodeToken, Token } from '@hcengineering/server-token' import { createServer, IncomingMessage } from 'http' -import WebSocket, { WebSocketServer } from 'ws' +import WebSocket, { RawData, WebSocketServer } from 'ws' import { BroadcastCall, PipelineFactory, Session } from './types' let LOGGING_ENABLED = true @@ -59,6 +59,8 @@ class SessionManager { return this.sessionFactory(token, pipeline, this.broadcast.bind(this)) } + upgradeId: string | undefined + async addSession ( ctx: MeasureContext, ws: WebSocket, @@ -79,6 +81,7 @@ class SessionManager { if (token.extra?.model === 'upgrade') { console.log('reloading workspace', JSON.stringify(token)) + this.upgradeId = sessionId // If upgrade client is used. // Drop all existing clients await this.closeAll(ctx, wsString, workspace, 0, 'upgrade') @@ -101,7 +104,7 @@ class SessionManager { return session } - if (workspace.upgrade) { + if (workspace.upgrade && sessionId !== this.upgradeId) { ws.close() throw new Error('Upgrade in progress....') } @@ -112,7 +115,14 @@ class SessionManager { // try restore session const existingSession = workspace.sessions.find((it) => it[0].sessionId === sessionId) if (existingSession !== undefined) { - if (LOGGING_ENABLED) console.log('found existing session', token.email, existingSession[0].sessionId) + if (LOGGING_ENABLED) { + console.log( + 'found existing session', + token.email, + existingSession[0].sessionId, + existingSession[0].sessionInstanceId + ) + } // Update websocket clearTimeout(existingSession[0].closeTimeout) existingSession[0].closeTimeout = undefined @@ -123,6 +133,7 @@ class SessionManager { const session = this.createSession(token, pipeline) session.sessionId = sessionId + session.sessionInstanceId = generateId() workspace.sessions.push([session, ws]) await this.setStatus(ctx, session, true) return session @@ -230,7 +241,13 @@ class SessionManager { } } - async closeAll (ctx: MeasureContext, wsId: string, workspace: Workspace, code: number, reason: string): Promise { + async closeAll ( + ctx: MeasureContext, + wsId: string, + workspace: Workspace, + code: number, + reason: 'upgrade' | 'shutdown' + ): Promise { console.log(`closing workspace ${wsId} - ${workspace.id}, code: ${code}, reason: ${reason}`) const sessions = Array.from(workspace.sessions) @@ -238,21 +255,24 @@ class SessionManager { const closeS = async (s: Session, webSocket: WebSocket): Promise => { clearTimeout(s.closeTimeout) - // await for message to go to client. - await new Promise((resolve) => { - // Override message handler, to wait for upgrading response from clients. - webSocket.on('close', () => { - resolve(null) - }) - webSocket.send( - serialize({ - result: { - _class: core.class.TxModelUpgrade - } + s.workspaceClosed = true + if (reason === 'upgrade') { + // await for message to go to client. + await new Promise((resolve) => { + // Override message handler, to wait for upgrading response from clients. + webSocket.on('close', () => { + resolve(null) }) - ) - setTimeout(resolve, 1000) - }) + webSocket.send( + serialize({ + result: { + _class: core.class.TxModelUpgrade + } + }) + ) + setTimeout(resolve, 1000) + }) + } webSocket.close() await this.setStatus(ctx, s, false) } @@ -305,30 +325,22 @@ async function handleRequest ( ): Promise { const request = readRequest(msg) if (request.id === -1 && request.method === 'hello') { + console.log('hello happen', service.getUser()) ws.send(serialize({ id: -1, result: 'hello' })) - - // Push result buffer messages to client. - for (const r of service.resultBuffer ?? []) { - ws.send(serialize(r)) - } - service.resultBuffer = [] return } if (request.id === -1 && request.method === '#upgrade') { ws.close(0, 'upgrade') return } + const userCtx = ctx.newChild(service.getUser(), { userId: service.getUser() }) as SessionContext + userCtx.sessionId = service.sessionInstanceId ?? '' const f = (service as any)[request.method] try { - const params = [ctx, ...request.params] + const params = [userCtx, ...request.params] const result = await f.apply(service, params) const resp: Response = { id: request.id, result } - ws.send(serialize(resp), (err) => { - if (err !== undefined) { - // It seems we failed to send to client. - service.resultBuffer = [...(service.resultBuffer ?? []), resp] - } - }) + ws.send(serialize(resp)) } catch (err: any) { const resp: Response = { id: request.id, @@ -379,13 +391,33 @@ export function start ( }) const session = await sessions.addSession(ctx, ws, token, pipelineFactory, productId, sessionId) // eslint-disable-next-line @typescript-eslint/no-misused-promises - ws.on('message', async (msg: string) => await handleRequest(ctx, session, ws, msg)) + ws.on('message', async (msg: RawData) => { + let msgStr = '' + if (typeof msg === 'string') { + msgStr = msg + } else if (msg instanceof Buffer) { + msgStr = msg.toString() + } else if (Array.isArray(msg)) { + msgStr = Buffer.concat(msg).toString() + } + await handleRequest(ctx, session, ws, msgStr) + }) // eslint-disable-next-line @typescript-eslint/no-misused-promises - ws.on('close', (code: number, reason: string) => { + ws.on('close', (code: number, reason: Buffer) => { + if (session.workspaceClosed ?? false) { + return + } // remove session after 1seconds, give a time to reconnect. - session.closeTimeout = setTimeout(() => { - void sessions.close(ctx, ws, token.workspace, code, reason) - }, 1000) + if (code === 1000) { + console.log(`client "${token.email}" closed normally`) + void sessions.close(ctx, ws, token.workspace, code, reason.toString()) + } else { + console.log(`client "${token.email}" closed abnormally, waiting reconnect`, code, reason.toString()) + session.closeTimeout = setTimeout(() => { + console.log(`client "${token.email}" force closed`) + void sessions.close(ctx, ws, token.workspace, code, reason.toString()) + }, 10000) + } }) const b = buffer buffer = undefined @@ -401,8 +433,8 @@ export function start ( try { const payload = decodeToken(token ?? '') - console.log('client connected with payload', payload) const sessionId = url.searchParams.get('sessionId') + console.log('client connected with payload', payload, sessionId) if (payload.workspace.productId !== productId) { throw new Error('Invalid workspace product') diff --git a/server/ws/src/types.ts b/server/ws/src/types.ts index c4ee4c7dca..94343effbd 100644 --- a/server/ws/src/types.ts +++ b/server/ws/src/types.ts @@ -30,8 +30,9 @@ export interface Session { // Session restore information sessionId?: string - resultBuffer?: TxResult[] + sessionInstanceId?: string closeTimeout?: any + workspaceClosed?: boolean } /**