feat(server): independent websocket room for block and awareness (#5285)

This commit is contained in:
EYHN 2023-12-13 10:31:06 +00:00
parent 77a5552dcd
commit 8aac1e09e2
No known key found for this signature in database
GPG Key ID: 46C9E26A75AB276C

View File

@ -114,8 +114,8 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
}
@Auth()
@SubscribeMessage('client-handshake')
async handleClientHandShake(
@SubscribeMessage('client-handshake-sync')
async handleClientHandshakeSync(
@CurrentUser() user: UserType,
@MessageBody() workspaceId: string,
@ConnectedSocket() client: Socket
@ -127,7 +127,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
);
if (canWrite) {
await client.join(workspaceId);
await client.join(`${workspaceId}:sync`);
return {
data: {
clientId: client.id,
@ -140,13 +140,71 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
}
}
@SubscribeMessage('client-leave')
async handleClientLeave(
@Auth()
@SubscribeMessage('client-handshake-awareness')
async handleClientHandshakeAwareness(
@CurrentUser() user: UserType,
@MessageBody() workspaceId: string,
@ConnectedSocket() client: Socket
): Promise<EventResponse<{ clientId: string }>> {
const canWrite = await this.permissions.tryCheckWorkspace(
workspaceId,
user.id,
Permission.Write
);
if (canWrite) {
await client.join(`${workspaceId}:awareness`);
return {
data: {
clientId: client.id,
},
};
} else {
return {
error: new AccessDeniedError(workspaceId),
};
}
}
/**
* @deprecated use `client-handshake-sync` and `client-handshake-awareness` instead
*/
@Auth()
@SubscribeMessage('client-handshake')
async handleClientHandShake(
@CurrentUser() user: UserType,
@MessageBody()
workspaceId: string,
@ConnectedSocket() client: Socket
): Promise<EventResponse<{ clientId: string }>> {
const canWrite = await this.permissions.tryCheckWorkspace(
workspaceId,
user.id,
Permission.Write
);
if (canWrite) {
await client.join([`${workspaceId}:sync`, `${workspaceId}:awareness`]);
return {
data: {
clientId: client.id,
},
};
} else {
return {
error: new AccessDeniedError(workspaceId),
};
}
}
@SubscribeMessage('client-leave-sync')
async handleLeaveSync(
@MessageBody() workspaceId: string,
@ConnectedSocket() client: Socket
): Promise<EventResponse> {
if (client.rooms.has(workspaceId)) {
await client.leave(workspaceId);
if (client.rooms.has(`${workspaceId}:sync`)) {
await client.leave(`${workspaceId}:sync`);
return {};
} else {
return {
@ -155,6 +213,38 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
}
}
@SubscribeMessage('client-leave-awareness')
async handleLeaveAwareness(
@MessageBody() workspaceId: string,
@ConnectedSocket() client: Socket
): Promise<EventResponse> {
if (client.rooms.has(`${workspaceId}:awareness`)) {
await client.leave(`${workspaceId}:awareness`);
return {};
} else {
return {
error: new NotInWorkspaceError(workspaceId),
};
}
}
/**
* @deprecated use `client-leave-sync` and `client-leave-awareness` instead
*/
@SubscribeMessage('client-leave')
async handleClientLeave(
@MessageBody() workspaceId: string,
@ConnectedSocket() client: Socket
): Promise<EventResponse> {
if (client.rooms.has(`${workspaceId}:sync`)) {
await client.leave(`${workspaceId}:sync`);
}
if (client.rooms.has(`${workspaceId}:awareness`)) {
await client.leave(`${workspaceId}:awareness`);
}
return {};
}
/**
* This is the old version of the `client-update` event without any data protocol.
* It only exists for backwards compatibility to adapt older clients.
@ -175,7 +265,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
},
@ConnectedSocket() client: Socket
) {
if (!client.rooms.has(workspaceId)) {
if (!client.rooms.has(`${workspaceId}:sync`)) {
this.logger.verbose(
`Client ${client.id} tried to push update to workspace ${workspaceId} without joining it first`
);
@ -185,12 +275,12 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
const docId = new DocID(guid, workspaceId);
client
.to(docId.workspace)
.to(`${docId.workspace}:sync`)
.emit('server-update', { workspaceId, guid, update });
// broadcast to all clients with newer version that only listen to `server-updates`
client
.to(docId.workspace)
.to(`${docId.workspace}:sync`)
.emit('server-updates', { workspaceId, guid, updates: [update] });
const buf = Buffer.from(update, 'base64');
@ -219,7 +309,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
stateVector?: string;
}
): Promise<{ missing: string; state?: string } | false> {
if (!client.rooms.has(workspaceId)) {
if (!client.rooms.has(`${workspaceId}:sync`)) {
const canRead = await this.permissions.tryCheckWorkspace(
workspaceId,
user.id
@ -264,7 +354,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
},
@ConnectedSocket() client: Socket
): Promise<EventResponse<{ accepted: true }>> {
if (!client.rooms.has(workspaceId)) {
if (!client.rooms.has(`${workspaceId}:sync`)) {
return {
error: new NotInWorkspaceError(workspaceId),
};
@ -272,7 +362,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
const docId = new DocID(guid, workspaceId);
client
.to(docId.workspace)
.to(`${docId.workspace}:sync`)
.emit('server-updates', { workspaceId, guid, updates });
const buffers = updates.map(update => Buffer.from(update, 'base64'));
@ -301,7 +391,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
stateVector?: string;
}
): Promise<EventResponse<{ missing: string; state?: string }>> {
if (!client.rooms.has(workspaceId)) {
if (!client.rooms.has(`${workspaceId}:sync`)) {
const canRead = await this.permissions.tryCheckWorkspace(
workspaceId,
user.id
@ -343,8 +433,8 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
@MessageBody() workspaceId: string,
@ConnectedSocket() client: Socket
): Promise<EventResponse<{ clientId: string }>> {
if (client.rooms.has(workspaceId)) {
client.to(workspaceId).emit('new-client-awareness-init');
if (client.rooms.has(`${workspaceId}:awareness`)) {
client.to(`${workspaceId}:awareness`).emit('new-client-awareness-init');
return {
data: {
clientId: client.id,
@ -362,9 +452,9 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
@MessageBody() message: { workspaceId: string; awarenessUpdate: string },
@ConnectedSocket() client: Socket
): Promise<EventResponse> {
if (client.rooms.has(message.workspaceId)) {
if (client.rooms.has(`${message.workspaceId}:awareness`)) {
client
.to(message.workspaceId)
.to(`${message.workspaceId}:awareness`)
.emit('server-awareness-broadcast', message);
return {};
} else {