diff --git a/packages/common/infra/src/modules/workspace/entities/engine.ts b/packages/common/infra/src/modules/workspace/entities/engine.ts index d13ec6356c..39e5d94bc5 100644 --- a/packages/common/infra/src/modules/workspace/entities/engine.ts +++ b/packages/common/infra/src/modules/workspace/entities/engine.ts @@ -70,5 +70,7 @@ export class WorkspaceEngine extends Entity<{ override dispose(): void { this.forceStop(); + this.doc.dispose(); + this.awareness.dispose(); } } diff --git a/packages/common/infra/src/modules/workspace/services/engine.ts b/packages/common/infra/src/modules/workspace/services/engine.ts index 8dcf8ec6dd..93eba3e04a 100644 --- a/packages/common/infra/src/modules/workspace/services/engine.ts +++ b/packages/common/infra/src/modules/workspace/services/engine.ts @@ -16,4 +16,10 @@ export class WorkspaceEngineService extends Service { constructor(private readonly workspaceScope: WorkspaceScope) { super(); } + + override dispose(): void { + this._engine?.dispose(); + this._engine = null; + super.dispose(); + } } diff --git a/packages/common/infra/src/sync/awareness.ts b/packages/common/infra/src/sync/awareness.ts index c1a2d946c5..395de22ea8 100644 --- a/packages/common/infra/src/sync/awareness.ts +++ b/packages/common/infra/src/sync/awareness.ts @@ -3,6 +3,7 @@ import type { Awareness } from 'y-protocols/awareness.js'; export interface AwarenessConnection { connect(awareness: Awareness): void; disconnect(): void; + dispose?(): void; } export class AwarenessEngine { @@ -15,4 +16,8 @@ export class AwarenessEngine { disconnect() { this.connections.forEach(connection => connection.disconnect()); } + + dispose() { + this.connections.forEach(connection => connection.dispose?.()); + } } diff --git a/packages/common/infra/src/sync/doc/index.ts b/packages/common/infra/src/sync/doc/index.ts index 3737d2e7a1..3e1530a2be 100644 --- a/packages/common/infra/src/sync/doc/index.ts +++ b/packages/common/infra/src/sync/doc/index.ts @@ -219,4 +219,9 @@ export class DocEngine { }); }); } + + dispose() { + this.stop(); + this.server?.dispose?.(); + } } diff --git a/packages/common/infra/src/sync/doc/server.ts b/packages/common/infra/src/sync/doc/server.ts index fb73fdf6fd..9fdade3e77 100644 --- a/packages/common/infra/src/sync/doc/server.ts +++ b/packages/common/infra/src/sync/doc/server.ts @@ -23,4 +23,6 @@ export interface DocServer { waitForConnectingServer(signal: AbortSignal): Promise; disconnectServer(): void; onInterrupted(cb: (reason: string) => void): void; + + dispose?(): void; } diff --git a/packages/frontend/core/src/modules/cloud/index.ts b/packages/frontend/core/src/modules/cloud/index.ts index 8360ef699e..b0225dc0fc 100644 --- a/packages/frontend/core/src/modules/cloud/index.ts +++ b/packages/frontend/core/src/modules/cloud/index.ts @@ -59,7 +59,7 @@ export function configureCloudModule(framework: Framework) { framework .service(FetchService) .service(GraphQLService, [FetchService]) - .service(WebSocketService) + .service(WebSocketService, [AuthService]) .service(ServerConfigService) .entity(ServerConfig, [ServerConfigStore]) .store(ServerConfigStore, [GraphQLService]) diff --git a/packages/frontend/core/src/modules/cloud/services/websocket.ts b/packages/frontend/core/src/modules/cloud/services/websocket.ts index 7c9bb1515e..7fa87b5d91 100644 --- a/packages/frontend/core/src/modules/cloud/services/websocket.ts +++ b/packages/frontend/core/src/modules/cloud/services/websocket.ts @@ -1,37 +1,47 @@ -import { OnEvent, Service } from '@toeverything/infra'; -import type { Socket } from 'socket.io-client'; +import { ApplicationStarted, OnEvent, Service } from '@toeverything/infra'; import { Manager } from 'socket.io-client'; import { getAffineCloudBaseUrl } from '../services/fetch'; +import type { AuthService } from './auth'; import { AccountChanged } from './auth'; -@OnEvent(AccountChanged, e => e.reconnect) +@OnEvent(AccountChanged, e => e.update) +@OnEvent(ApplicationStarted, e => e.update) export class WebSocketService extends Service { ioManager: Manager = new Manager(`${getAffineCloudBaseUrl()}/`, { autoConnect: false, transports: ['websocket'], secure: location.protocol === 'https:', }); - sockets: Set = new Set(); + socket = this.ioManager.socket('/'); + refCount = 0; - constructor() { + constructor(private readonly authService: AuthService) { super(); } - newSocket(): Socket { - const socket = this.ioManager.socket('/'); - this.sockets.add(socket); - - return socket; + /** + * Connect socket, with automatic connect and reconnect logic. + * External code should not call `socket.connect()` or `socket.disconnect()` manually. + * When socket is no longer needed, call `dispose()` to clean up resources. + */ + connect() { + this.refCount++; + this.update(); + return { + socket: this.socket, + dispose: () => { + this.refCount--; + this.update(); + }, + }; } - reconnect(): void { - for (const socket of this.sockets) { - socket.disconnect(); - } - - for (const socket of this.sockets) { - socket.connect(); + update(): void { + if (this.authService.session.account$.value && this.refCount > 0) { + this.socket.connect(); + } else { + this.socket.disconnect(); } } } diff --git a/packages/frontend/core/src/modules/userspace/entities/user-db-engine.ts b/packages/frontend/core/src/modules/userspace/entities/user-db-engine.ts index 0023761273..eb927ccd64 100644 --- a/packages/frontend/core/src/modules/userspace/entities/user-db-engine.ts +++ b/packages/frontend/core/src/modules/userspace/entities/user-db-engine.ts @@ -8,10 +8,9 @@ export class UserDBEngine extends Entity<{ userId: string; }> { private readonly userId = this.props.userId; - private readonly socket = this.websocketService.newSocket(); readonly docEngine = new DocEngine( this.userspaceStorageProvider.getDocStorage('affine-cloud:' + this.userId), - new UserDBDocServer(this.userId, this.socket) + new UserDBDocServer(this.userId, this.websocketService) ); canGracefulStop() { @@ -29,6 +28,5 @@ export class UserDBEngine extends Entity<{ override dispose() { this.docEngine.stop(); - this.socket.close(); } } diff --git a/packages/frontend/core/src/modules/userspace/impls/user-db-doc-server.ts b/packages/frontend/core/src/modules/userspace/impls/user-db-doc-server.ts index 18fa89bd75..766cc193ef 100644 --- a/packages/frontend/core/src/modules/userspace/impls/user-db-doc-server.ts +++ b/packages/frontend/core/src/modules/userspace/impls/user-db-doc-server.ts @@ -7,6 +7,7 @@ import { import { type DocServer, throwIfAborted } from '@toeverything/infra'; import type { Socket } from 'socket.io-client'; +import type { WebSocketService } from '../../cloud'; import { base64ToUint8Array, uint8ArrayToBase64, @@ -19,10 +20,17 @@ export class UserDBDocServer implements DocServer { interruptCb: ((reason: string) => void) | null = null; SEND_TIMEOUT = 30000; + socket: Socket; + disposeSocket: () => void; + constructor( private readonly userId: string, - private readonly socket: Socket - ) {} + webSocketService: WebSocketService + ) { + const { socket, dispose } = webSocketService.connect(); + this.socket = socket; + this.disposeSocket = dispose; + } private async clientHandShake() { await this.socket.emitWithAck('space:join', { @@ -154,7 +162,6 @@ export class UserDBDocServer implements DocServer { if (this.socket.connected) { await this.clientHandShake(); } else { - this.socket.connect(); await new Promise((resolve, reject) => { this.socket.on('connect', () => { resolve(); @@ -168,17 +175,12 @@ export class UserDBDocServer implements DocServer { } } disconnectServer(): void { - if (!this.socket) { - return; - } - this.socket.emit('space:leave', { spaceType: 'userspace', spaceId: this.userId, }); this.socket.off('server-version-rejected', this.handleVersionRejected); this.socket.off('disconnect', this.handleDisconnect); - this.socket.disconnect(); } onInterrupted = (cb: (reason: string) => void) => { this.interruptCb = cb; @@ -192,4 +194,9 @@ export class UserDBDocServer implements DocServer { handleVersionRejected = () => { this.interruptCb?.('Client version rejected'); }; + + dispose(): void { + this.disconnectServer(); + this.disposeSocket(); + } } diff --git a/packages/frontend/core/src/modules/workspace-engine/impls/cloud.ts b/packages/frontend/core/src/modules/workspace-engine/impls/cloud.ts index f04ce6c665..e66672aad8 100644 --- a/packages/frontend/core/src/modules/workspace-engine/impls/cloud.ts +++ b/packages/frontend/core/src/modules/workspace-engine/impls/cloud.ts @@ -243,17 +243,11 @@ export class CloudWorkspaceFlavourProviderService getAwarenessConnections: () => { return [ new BroadcastChannelAwarenessConnection(workspaceId), - new CloudAwarenessConnection( - workspaceId, - this.webSocketService.newSocket() - ), + new CloudAwarenessConnection(workspaceId, this.webSocketService), ]; }, getDocServer: () => { - return new CloudDocEngineServer( - workspaceId, - this.webSocketService.newSocket() - ); + return new CloudDocEngineServer(workspaceId, this.webSocketService); }, getDocStorage: () => { return this.storageProvider.getDocStorage(workspaceId); diff --git a/packages/frontend/core/src/modules/workspace-engine/impls/engine/awareness-cloud.ts b/packages/frontend/core/src/modules/workspace-engine/impls/engine/awareness-cloud.ts index 90de4599ef..824ad84a34 100644 --- a/packages/frontend/core/src/modules/workspace-engine/impls/engine/awareness-cloud.ts +++ b/packages/frontend/core/src/modules/workspace-engine/impls/engine/awareness-cloud.ts @@ -1,3 +1,4 @@ +import type { WebSocketService } from '@affine/core/modules/cloud'; import { DebugLogger } from '@affine/debug'; import type { AwarenessConnection } from '@toeverything/infra'; import type { Socket } from 'socket.io-client'; @@ -17,10 +18,17 @@ type AwarenessChanges = Record<'added' | 'updated' | 'removed', number[]>; export class CloudAwarenessConnection implements AwarenessConnection { awareness: Awareness | null = null; + socket: Socket; + disposeSocket: () => void; + constructor( private readonly workspaceId: string, - private readonly socket: Socket - ) {} + webSocketService: WebSocketService + ) { + const { socket, dispose } = webSocketService.connect(); + this.socket = socket; + this.disposeSocket = dispose; + } connect(awareness: Awareness): void { this.socket.on('space:broadcast-awareness-update', this.awarenessBroadcast); @@ -38,8 +46,6 @@ export class CloudAwarenessConnection implements AwarenessConnection { if (this.socket.connected) { this.handleConnect(); - } else { - this.socket.connect(); } } @@ -181,7 +187,10 @@ export class CloudAwarenessConnection implements AwarenessConnection { handleReject = () => { this.socket.off('server-version-rejected', this.handleReject); - this.disconnect(); - this.socket.disconnect(); }; + + dispose() { + this.disconnect(); + this.disposeSocket(); + } } diff --git a/packages/frontend/core/src/modules/workspace-engine/impls/engine/doc-cloud.ts b/packages/frontend/core/src/modules/workspace-engine/impls/engine/doc-cloud.ts index 0dadc2f847..5773164db8 100644 --- a/packages/frontend/core/src/modules/workspace-engine/impls/engine/doc-cloud.ts +++ b/packages/frontend/core/src/modules/workspace-engine/impls/engine/doc-cloud.ts @@ -1,3 +1,4 @@ +import type { WebSocketService } from '@affine/core/modules/cloud'; import { DebugLogger } from '@affine/debug'; import { ErrorNames, @@ -20,10 +21,17 @@ export class CloudDocEngineServer implements DocServer { interruptCb: ((reason: string) => void) | null = null; SEND_TIMEOUT = 30000; + socket: Socket; + disposeSocket: () => void; + constructor( private readonly workspaceId: string, - private readonly socket: Socket - ) {} + webSocketService: WebSocketService + ) { + const { socket, dispose } = webSocketService.connect(); + this.socket = socket; + this.disposeSocket = dispose; + } private async clientHandShake() { await this.socket.emitWithAck('space:join', { @@ -169,17 +177,12 @@ export class CloudDocEngineServer implements DocServer { } } disconnectServer(): void { - if (!this.socket) { - return; - } - this.socket.emit('space:leave', { spaceType: 'workspace', spaceId: this.workspaceId, }); this.socket.off('server-version-rejected', this.handleVersionRejected); this.socket.off('disconnect', this.handleDisconnect); - this.socket.disconnect(); } onInterrupted = (cb: (reason: string) => void) => { this.interruptCb = cb; @@ -193,4 +196,9 @@ export class CloudDocEngineServer implements DocServer { handleVersionRejected = () => { this.interruptCb?.('Client version rejected'); }; + + dispose(): void { + this.disconnectServer(); + this.disposeSocket(); + } }