fix(core): disconnect ws when user logout (#8188)

This commit is contained in:
EYHN 2024-09-11 07:55:42 +00:00
parent d93c3b3719
commit 85aa73bcf6
No known key found for this signature in database
GPG Key ID: 46C9E26A75AB276C
12 changed files with 96 additions and 50 deletions

View File

@ -70,5 +70,7 @@ export class WorkspaceEngine extends Entity<{
override dispose(): void {
this.forceStop();
this.doc.dispose();
this.awareness.dispose();
}
}

View File

@ -16,4 +16,10 @@ export class WorkspaceEngineService extends Service {
constructor(private readonly workspaceScope: WorkspaceScope) {
super();
}
override dispose(): void {
this._engine?.dispose();
this._engine = null;
super.dispose();
}
}

View File

@ -3,6 +3,7 @@ import type { Awareness } from 'y-protocols/awareness.js';
export interface AwarenessConnection {
connect(awareness: Awareness): void;
disconnect(): void;
dispose?(): void;
}
export class AwarenessEngine {
@ -15,4 +16,8 @@ export class AwarenessEngine {
disconnect() {
this.connections.forEach(connection => connection.disconnect());
}
dispose() {
this.connections.forEach(connection => connection.dispose?.());
}
}

View File

@ -219,4 +219,9 @@ export class DocEngine {
});
});
}
dispose() {
this.stop();
this.server?.dispose?.();
}
}

View File

@ -23,4 +23,6 @@ export interface DocServer {
waitForConnectingServer(signal: AbortSignal): Promise<void>;
disconnectServer(): void;
onInterrupted(cb: (reason: string) => void): void;
dispose?(): void;
}

View File

@ -59,7 +59,7 @@ export function configureCloudModule(framework: Framework) {
framework
.service(FetchService)
.service(GraphQLService, [FetchService])
.service(WebSocketService)
.service(WebSocketService, [AuthService])
.service(ServerConfigService)
.entity(ServerConfig, [ServerConfigStore])
.store(ServerConfigStore, [GraphQLService])

View File

@ -1,37 +1,47 @@
import { OnEvent, Service } from '@toeverything/infra';
import type { Socket } from 'socket.io-client';
import { ApplicationStarted, OnEvent, Service } from '@toeverything/infra';
import { Manager } from 'socket.io-client';
import { getAffineCloudBaseUrl } from '../services/fetch';
import type { AuthService } from './auth';
import { AccountChanged } from './auth';
@OnEvent(AccountChanged, e => e.reconnect)
@OnEvent(AccountChanged, e => e.update)
@OnEvent(ApplicationStarted, e => e.update)
export class WebSocketService extends Service {
ioManager: Manager = new Manager(`${getAffineCloudBaseUrl()}/`, {
autoConnect: false,
transports: ['websocket'],
secure: location.protocol === 'https:',
});
sockets: Set<Socket> = new Set();
socket = this.ioManager.socket('/');
refCount = 0;
constructor() {
constructor(private readonly authService: AuthService) {
super();
}
newSocket(): Socket {
const socket = this.ioManager.socket('/');
this.sockets.add(socket);
return socket;
/**
* Connect socket, with automatic connect and reconnect logic.
* External code should not call `socket.connect()` or `socket.disconnect()` manually.
* When socket is no longer needed, call `dispose()` to clean up resources.
*/
connect() {
this.refCount++;
this.update();
return {
socket: this.socket,
dispose: () => {
this.refCount--;
this.update();
},
};
}
reconnect(): void {
for (const socket of this.sockets) {
socket.disconnect();
}
for (const socket of this.sockets) {
socket.connect();
update(): void {
if (this.authService.session.account$.value && this.refCount > 0) {
this.socket.connect();
} else {
this.socket.disconnect();
}
}
}

View File

@ -8,10 +8,9 @@ export class UserDBEngine extends Entity<{
userId: string;
}> {
private readonly userId = this.props.userId;
private readonly socket = this.websocketService.newSocket();
readonly docEngine = new DocEngine(
this.userspaceStorageProvider.getDocStorage('affine-cloud:' + this.userId),
new UserDBDocServer(this.userId, this.socket)
new UserDBDocServer(this.userId, this.websocketService)
);
canGracefulStop() {
@ -29,6 +28,5 @@ export class UserDBEngine extends Entity<{
override dispose() {
this.docEngine.stop();
this.socket.close();
}
}

View File

@ -7,6 +7,7 @@ import {
import { type DocServer, throwIfAborted } from '@toeverything/infra';
import type { Socket } from 'socket.io-client';
import type { WebSocketService } from '../../cloud';
import {
base64ToUint8Array,
uint8ArrayToBase64,
@ -19,10 +20,17 @@ export class UserDBDocServer implements DocServer {
interruptCb: ((reason: string) => void) | null = null;
SEND_TIMEOUT = 30000;
socket: Socket;
disposeSocket: () => void;
constructor(
private readonly userId: string,
private readonly socket: Socket
) {}
webSocketService: WebSocketService
) {
const { socket, dispose } = webSocketService.connect();
this.socket = socket;
this.disposeSocket = dispose;
}
private async clientHandShake() {
await this.socket.emitWithAck('space:join', {
@ -154,7 +162,6 @@ export class UserDBDocServer implements DocServer {
if (this.socket.connected) {
await this.clientHandShake();
} else {
this.socket.connect();
await new Promise<void>((resolve, reject) => {
this.socket.on('connect', () => {
resolve();
@ -168,17 +175,12 @@ export class UserDBDocServer implements DocServer {
}
}
disconnectServer(): void {
if (!this.socket) {
return;
}
this.socket.emit('space:leave', {
spaceType: 'userspace',
spaceId: this.userId,
});
this.socket.off('server-version-rejected', this.handleVersionRejected);
this.socket.off('disconnect', this.handleDisconnect);
this.socket.disconnect();
}
onInterrupted = (cb: (reason: string) => void) => {
this.interruptCb = cb;
@ -192,4 +194,9 @@ export class UserDBDocServer implements DocServer {
handleVersionRejected = () => {
this.interruptCb?.('Client version rejected');
};
dispose(): void {
this.disconnectServer();
this.disposeSocket();
}
}

View File

@ -243,17 +243,11 @@ export class CloudWorkspaceFlavourProviderService
getAwarenessConnections: () => {
return [
new BroadcastChannelAwarenessConnection(workspaceId),
new CloudAwarenessConnection(
workspaceId,
this.webSocketService.newSocket()
),
new CloudAwarenessConnection(workspaceId, this.webSocketService),
];
},
getDocServer: () => {
return new CloudDocEngineServer(
workspaceId,
this.webSocketService.newSocket()
);
return new CloudDocEngineServer(workspaceId, this.webSocketService);
},
getDocStorage: () => {
return this.storageProvider.getDocStorage(workspaceId);

View File

@ -1,3 +1,4 @@
import type { WebSocketService } from '@affine/core/modules/cloud';
import { DebugLogger } from '@affine/debug';
import type { AwarenessConnection } from '@toeverything/infra';
import type { Socket } from 'socket.io-client';
@ -17,10 +18,17 @@ type AwarenessChanges = Record<'added' | 'updated' | 'removed', number[]>;
export class CloudAwarenessConnection implements AwarenessConnection {
awareness: Awareness | null = null;
socket: Socket;
disposeSocket: () => void;
constructor(
private readonly workspaceId: string,
private readonly socket: Socket
) {}
webSocketService: WebSocketService
) {
const { socket, dispose } = webSocketService.connect();
this.socket = socket;
this.disposeSocket = dispose;
}
connect(awareness: Awareness): void {
this.socket.on('space:broadcast-awareness-update', this.awarenessBroadcast);
@ -38,8 +46,6 @@ export class CloudAwarenessConnection implements AwarenessConnection {
if (this.socket.connected) {
this.handleConnect();
} else {
this.socket.connect();
}
}
@ -181,7 +187,10 @@ export class CloudAwarenessConnection implements AwarenessConnection {
handleReject = () => {
this.socket.off('server-version-rejected', this.handleReject);
this.disconnect();
this.socket.disconnect();
};
dispose() {
this.disconnect();
this.disposeSocket();
}
}

View File

@ -1,3 +1,4 @@
import type { WebSocketService } from '@affine/core/modules/cloud';
import { DebugLogger } from '@affine/debug';
import {
ErrorNames,
@ -20,10 +21,17 @@ export class CloudDocEngineServer implements DocServer {
interruptCb: ((reason: string) => void) | null = null;
SEND_TIMEOUT = 30000;
socket: Socket;
disposeSocket: () => void;
constructor(
private readonly workspaceId: string,
private readonly socket: Socket
) {}
webSocketService: WebSocketService
) {
const { socket, dispose } = webSocketService.connect();
this.socket = socket;
this.disposeSocket = dispose;
}
private async clientHandShake() {
await this.socket.emitWithAck('space:join', {
@ -169,17 +177,12 @@ export class CloudDocEngineServer implements DocServer {
}
}
disconnectServer(): void {
if (!this.socket) {
return;
}
this.socket.emit('space:leave', {
spaceType: 'workspace',
spaceId: this.workspaceId,
});
this.socket.off('server-version-rejected', this.handleVersionRejected);
this.socket.off('disconnect', this.handleDisconnect);
this.socket.disconnect();
}
onInterrupted = (cb: (reason: string) => void) => {
this.interruptCb = cb;
@ -193,4 +196,9 @@ export class CloudDocEngineServer implements DocServer {
handleVersionRejected = () => {
this.interruptCb?.('Client version rejected');
};
dispose(): void {
this.disconnectServer();
this.disposeSocket();
}
}