From 05e99eb67ed804a54a421fa1a447df869d3324b5 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 14 Dec 2022 15:55:56 +0100 Subject: [PATCH] Introduce an epoch to `ConnectionId` and `PeerId` --- crates/call/src/room.rs | 18 +- crates/client/src/client.rs | 18 +- crates/client/src/test.rs | 4 +- .../20221109000000_test_schema.sql | 8 +- ...4346_change_epoch_from_uuid_to_integer.sql | 9 + crates/collab/src/db.rs | 357 +++++++++++++----- crates/collab/src/db/project.rs | 2 +- crates/collab/src/db/project_collaborator.rs | 2 +- crates/collab/src/db/room_participant.rs | 4 +- crates/collab/src/db/tests.rs | 26 +- crates/collab/src/integration_tests.rs | 10 +- crates/collab/src/rpc.rs | 135 ++++--- crates/collab_ui/src/collab_titlebar_item.rs | 36 +- crates/collab_ui/src/contact_list.rs | 141 +++---- crates/project/src/lsp_command.rs | 2 +- crates/project/src/project.rs | 38 +- crates/rpc/proto/zed.proto | 29 +- crates/rpc/src/macros.rs | 7 +- crates/rpc/src/peer.rs | 132 ++++--- crates/rpc/src/proto.rs | 93 ++++- crates/rpc/src/rpc.rs | 2 +- crates/workspace/src/item.rs | 4 +- crates/workspace/src/shared_screen.rs | 2 +- crates/workspace/src/workspace.rs | 17 +- 24 files changed, 714 insertions(+), 382 deletions(-) create mode 100644 crates/collab/migrations/20221214144346_change_epoch_from_uuid_to_integer.sql diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index b26a8fcbfe..1d22fe50f1 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -3,7 +3,7 @@ use crate::{ IncomingCall, }; use anyhow::{anyhow, Result}; -use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore}; +use client::{proto, Client, TypedEnvelope, User, UserStore}; use collections::{BTreeMap, HashSet}; use futures::{FutureExt, StreamExt}; use gpui::{ @@ -20,10 +20,10 @@ pub const RECONNECT_TIMEOUT: Duration = client::RECEIVE_TIMEOUT; #[derive(Clone, Debug, PartialEq, Eq)] pub enum Event { ParticipantLocationChanged { - participant_id: PeerId, + participant_id: proto::PeerId, }, RemoteVideoTracksChanged { - participant_id: PeerId, + participant_id: proto::PeerId, }, RemoteProjectShared { owner: Arc, @@ -41,7 +41,7 @@ pub struct Room { live_kit: Option, status: RoomStatus, local_participant: LocalParticipant, - remote_participants: BTreeMap, + remote_participants: BTreeMap, pending_participants: Vec>, participant_user_ids: HashSet, pending_call_count: usize, @@ -349,7 +349,7 @@ impl Room { &self.local_participant } - pub fn remote_participants(&self) -> &BTreeMap { + pub fn remote_participants(&self) -> &BTreeMap { &self.remote_participants } @@ -419,7 +419,7 @@ impl Room { if let Some(participants) = remote_participants.log_err() { let mut participant_peer_ids = HashSet::default(); for (participant, user) in room.participants.into_iter().zip(participants) { - let peer_id = PeerId(participant.peer_id); + let Some(peer_id) = participant.peer_id else { continue }; this.participant_user_ids.insert(participant.user_id); participant_peer_ids.insert(peer_id); @@ -476,7 +476,7 @@ impl Room { if let Some(live_kit) = this.live_kit.as_ref() { let tracks = - live_kit.room.remote_video_tracks(&peer_id.0.to_string()); + live_kit.room.remote_video_tracks(&peer_id.to_string()); for track in tracks { this.remote_video_track_updated( RemoteVideoTrackUpdate::Subscribed(track), @@ -531,7 +531,7 @@ impl Room { ) -> Result<()> { match change { RemoteVideoTrackUpdate::Subscribed(track) => { - let peer_id = PeerId(track.publisher_id().parse()?); + let peer_id = track.publisher_id().parse()?; let track_id = track.sid().to_string(); let participant = self .remote_participants @@ -551,7 +551,7 @@ impl Room { publisher_id, track_id, } => { - let peer_id = PeerId(publisher_id.parse()?); + let peer_id = publisher_id.parse()?; let participant = self .remote_participants .get_mut(&peer_id) diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index 5e10f9ea8f..876b83b11a 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -23,7 +23,7 @@ use lazy_static::lazy_static; use parking_lot::RwLock; use postage::watch; use rand::prelude::*; -use rpc::proto::{AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage}; +use rpc::proto::{AnyTypedEnvelope, EntityMessage, EnvelopedMessage, PeerId, RequestMessage}; use serde::Deserialize; use std::{ any::TypeId, @@ -140,7 +140,7 @@ impl EstablishConnectionError { } } -#[derive(Copy, Clone, Debug, Eq, PartialEq)] +#[derive(Copy, Clone, Debug, PartialEq)] pub enum Status { SignedOut, UpgradeRequired, @@ -306,7 +306,7 @@ impl Client { pub fn new(http: Arc, cx: &AppContext) -> Arc { Arc::new(Self { id: 0, - peer: Peer::new(), + peer: Peer::new(0), telemetry: Telemetry::new(http.clone(), cx), http, state: Default::default(), @@ -810,7 +810,11 @@ impl Client { hello_message_type_name ) })?; - Ok(PeerId(hello.payload.peer_id)) + let peer_id = hello + .payload + .peer_id + .ok_or_else(|| anyhow!("invalid peer id"))?; + Ok(peer_id) }; let peer_id = match peer_id.await { @@ -822,7 +826,7 @@ impl Client { }; log::info!( - "set status to connected (connection id: {}, peer id: {})", + "set status to connected (connection id: {:?}, peer id: {:?})", connection_id, peer_id ); @@ -853,7 +857,7 @@ impl Client { .spawn(async move { match handle_io.await { Ok(()) => { - if *this.status().borrow() + if this.status().borrow().clone() == (Status::Connected { connection_id, peer_id, @@ -1194,7 +1198,7 @@ impl Client { let mut state = self.state.write(); let type_name = message.payload_type_name(); let payload_type_id = message.payload_type_id(); - let sender_id = message.original_sender_id().map(|id| id.0); + let sender_id = message.original_sender_id(); let mut subscriber = None; diff --git a/crates/client/src/test.rs b/crates/client/src/test.rs index 3cfba3b184..db9e0d8c48 100644 --- a/crates/client/src/test.rs +++ b/crates/client/src/test.rs @@ -35,7 +35,7 @@ impl FakeServer { cx: &TestAppContext, ) -> Self { let server = Self { - peer: Peer::new(), + peer: Peer::new(0), state: Default::default(), user_id: client_user_id, executor: cx.foreground(), @@ -92,7 +92,7 @@ impl FakeServer { peer.send( connection_id, proto::Hello { - peer_id: connection_id.0, + peer_id: Some(connection_id.into()), }, ) .unwrap(); diff --git a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql index c0cc5b3457..b96b07d977 100644 --- a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql +++ b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql @@ -44,7 +44,7 @@ CREATE TABLE "projects" ( "room_id" INTEGER REFERENCES rooms (id) NOT NULL, "host_user_id" INTEGER REFERENCES users (id) NOT NULL, "host_connection_id" INTEGER NOT NULL, - "host_connection_epoch" TEXT NOT NULL, + "host_connection_epoch" INTEGER NOT NULL, "unregistered" BOOLEAN NOT NULL DEFAULT FALSE ); CREATE INDEX "index_projects_on_host_connection_epoch" ON "projects" ("host_connection_epoch"); @@ -103,7 +103,7 @@ CREATE TABLE "project_collaborators" ( "id" INTEGER PRIMARY KEY AUTOINCREMENT, "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE, "connection_id" INTEGER NOT NULL, - "connection_epoch" TEXT NOT NULL, + "connection_epoch" INTEGER NOT NULL, "user_id" INTEGER NOT NULL, "replica_id" INTEGER NOT NULL, "is_host" BOOLEAN NOT NULL @@ -119,14 +119,14 @@ CREATE TABLE "room_participants" ( "room_id" INTEGER NOT NULL REFERENCES rooms (id), "user_id" INTEGER NOT NULL REFERENCES users (id), "answering_connection_id" INTEGER, - "answering_connection_epoch" TEXT, + "answering_connection_epoch" INTEGER, "answering_connection_lost" BOOLEAN NOT NULL, "location_kind" INTEGER, "location_project_id" INTEGER, "initial_project_id" INTEGER, "calling_user_id" INTEGER NOT NULL REFERENCES users (id), "calling_connection_id" INTEGER NOT NULL, - "calling_connection_epoch" TEXT NOT NULL + "calling_connection_epoch" INTEGER NOT NULL ); CREATE UNIQUE INDEX "index_room_participants_on_user_id" ON "room_participants" ("user_id"); CREATE INDEX "index_room_participants_on_room_id" ON "room_participants" ("room_id"); diff --git a/crates/collab/migrations/20221214144346_change_epoch_from_uuid_to_integer.sql b/crates/collab/migrations/20221214144346_change_epoch_from_uuid_to_integer.sql new file mode 100644 index 0000000000..ed294502a5 --- /dev/null +++ b/crates/collab/migrations/20221214144346_change_epoch_from_uuid_to_integer.sql @@ -0,0 +1,9 @@ +ALTER TABLE "projects" + ALTER COLUMN "host_connection_epoch" TYPE INTEGER USING 0; + +ALTER TABLE "project_collaborators" + ALTER COLUMN "connection_epoch" TYPE INTEGER USING 0; + +ALTER TABLE "room_participants" + ALTER COLUMN "answering_connection_epoch" TYPE INTEGER USING 0, + ALTER COLUMN "calling_connection_epoch" TYPE INTEGER USING 0; diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 892ea4ccd6..6237cc4f79 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -1076,7 +1076,7 @@ impl Database { pub async fn create_room( &self, user_id: UserId, - connection_id: ConnectionId, + connection: ConnectionId, live_kit_room: &str, ) -> Result> { self.room_transaction(|tx| async move { @@ -1091,12 +1091,12 @@ impl Database { room_participant::ActiveModel { room_id: ActiveValue::set(room_id), user_id: ActiveValue::set(user_id), - answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)), - answering_connection_epoch: ActiveValue::set(Some(self.epoch())), + answering_connection_id: ActiveValue::set(Some(connection.id as i32)), + answering_connection_epoch: ActiveValue::set(Some(connection.epoch as i32)), answering_connection_lost: ActiveValue::set(false), calling_user_id: ActiveValue::set(user_id), - calling_connection_id: ActiveValue::set(connection_id.0 as i32), - calling_connection_epoch: ActiveValue::set(self.epoch()), + calling_connection_id: ActiveValue::set(connection.id as i32), + calling_connection_epoch: ActiveValue::set(connection.epoch as i32), ..Default::default() } .insert(&*tx) @@ -1112,7 +1112,7 @@ impl Database { &self, room_id: RoomId, calling_user_id: UserId, - calling_connection_id: ConnectionId, + calling_connection: ConnectionId, called_user_id: UserId, initial_project_id: Option, ) -> Result> { @@ -1122,8 +1122,8 @@ impl Database { user_id: ActiveValue::set(called_user_id), answering_connection_lost: ActiveValue::set(false), calling_user_id: ActiveValue::set(calling_user_id), - calling_connection_id: ActiveValue::set(calling_connection_id.0 as i32), - calling_connection_epoch: ActiveValue::set(self.epoch()), + calling_connection_id: ActiveValue::set(calling_connection.id as i32), + calling_connection_epoch: ActiveValue::set(calling_connection.epoch as i32), initial_project_id: ActiveValue::set(initial_project_id), ..Default::default() } @@ -1192,19 +1192,23 @@ impl Database { pub async fn cancel_call( &self, expected_room_id: Option, - calling_connection_id: ConnectionId, + calling_connection: ConnectionId, called_user_id: UserId, ) -> Result> { self.room_transaction(|tx| async move { let participant = room_participant::Entity::find() .filter( - room_participant::Column::UserId - .eq(called_user_id) - .and( + Condition::all() + .add(room_participant::Column::UserId.eq(called_user_id)) + .add( room_participant::Column::CallingConnectionId - .eq(calling_connection_id.0 as i32), + .eq(calling_connection.id as i32), ) - .and(room_participant::Column::AnsweringConnectionId.is_null()), + .add( + room_participant::Column::CallingConnectionEpoch + .eq(calling_connection.epoch as i32), + ) + .add(room_participant::Column::AnsweringConnectionId.is_null()), ) .one(&*tx) .await? @@ -1228,7 +1232,7 @@ impl Database { &self, room_id: RoomId, user_id: UserId, - connection_id: ConnectionId, + connection: ConnectionId, ) -> Result> { self.room_transaction(|tx| async move { let result = room_participant::Entity::update_many() @@ -1242,13 +1246,13 @@ impl Database { .add(room_participant::Column::AnsweringConnectionLost.eq(true)) .add( room_participant::Column::AnsweringConnectionEpoch - .ne(self.epoch()), + .ne(connection.epoch as i32), ), ), ) .set(room_participant::ActiveModel { - answering_connection_id: ActiveValue::set(Some(connection_id.0 as i32)), - answering_connection_epoch: ActiveValue::set(Some(self.epoch())), + answering_connection_id: ActiveValue::set(Some(connection.id as i32)), + answering_connection_epoch: ActiveValue::set(Some(connection.epoch as i32)), answering_connection_lost: ActiveValue::set(false), ..Default::default() }) @@ -1264,10 +1268,20 @@ impl Database { .await } - pub async fn leave_room(&self, connection_id: ConnectionId) -> Result> { + pub async fn leave_room(&self, connection: ConnectionId) -> Result> { self.room_transaction(|tx| async move { let leaving_participant = room_participant::Entity::find() - .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32)) + .filter( + Condition::all() + .add( + room_participant::Column::AnsweringConnectionId + .eq(connection.id as i32), + ) + .add( + room_participant::Column::AnsweringConnectionEpoch + .eq(connection.epoch as i32), + ), + ) .one(&*tx) .await?; @@ -1281,9 +1295,16 @@ impl Database { // Cancel pending calls initiated by the leaving user. let called_participants = room_participant::Entity::find() .filter( - room_participant::Column::CallingConnectionId - .eq(connection_id.0) - .and(room_participant::Column::AnsweringConnectionId.is_null()), + Condition::all() + .add( + room_participant::Column::CallingConnectionId + .eq(connection.id as i32), + ) + .add( + room_participant::Column::CallingConnectionEpoch + .eq(connection.epoch as i32), + ) + .add(room_participant::Column::AnsweringConnectionId.is_null()), ) .all(&*tx) .await?; @@ -1310,7 +1331,16 @@ impl Database { project_collaborator::Column::ProjectId, QueryProjectIds::ProjectId, ) - .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32)) + .filter( + Condition::all() + .add( + project_collaborator::Column::ConnectionId.eq(connection.id as i32), + ) + .add( + project_collaborator::Column::ConnectionEpoch + .eq(connection.epoch as i32), + ), + ) .into_values::<_, QueryProjectIds>() .all(&*tx) .await?; @@ -1331,32 +1361,43 @@ impl Database { host_connection_id: Default::default(), }); - let collaborator_connection_id = - ConnectionId(collaborator.connection_id as u32); - if collaborator_connection_id != connection_id { + let collaborator_connection_id = ConnectionId { + epoch: collaborator.connection_epoch as u32, + id: collaborator.connection_id as u32, + }; + if collaborator_connection_id != connection { left_project.connection_ids.push(collaborator_connection_id); } if collaborator.is_host { left_project.host_user_id = collaborator.user_id; - left_project.host_connection_id = - ConnectionId(collaborator.connection_id as u32); + left_project.host_connection_id = collaborator_connection_id; } } drop(collaborators); // Leave projects. project_collaborator::Entity::delete_many() - .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32)) + .filter( + Condition::all() + .add( + project_collaborator::Column::ConnectionId.eq(connection.id as i32), + ) + .add( + project_collaborator::Column::ConnectionEpoch + .eq(connection.epoch as i32), + ), + ) .exec(&*tx) .await?; // Unshare projects. project::Entity::delete_many() .filter( - project::Column::RoomId - .eq(room_id) - .and(project::Column::HostConnectionId.eq(connection_id.0 as i32)), + Condition::all() + .add(project::Column::RoomId.eq(room_id)) + .add(project::Column::HostConnectionId.eq(connection.id as i32)) + .add(project::Column::HostConnectionEpoch.eq(connection.epoch as i32)), ) .exec(&*tx) .await?; @@ -1387,7 +1428,7 @@ impl Database { pub async fn update_room_participant_location( &self, room_id: RoomId, - connection_id: ConnectionId, + connection: ConnectionId, location: proto::ParticipantLocation, ) -> Result> { self.room_transaction(|tx| async { @@ -1414,9 +1455,18 @@ impl Database { } let result = room_participant::Entity::update_many() - .filter(room_participant::Column::RoomId.eq(room_id).and( - room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32), - )) + .filter( + Condition::all() + .add(room_participant::Column::RoomId.eq(room_id)) + .add( + room_participant::Column::AnsweringConnectionId + .eq(connection.id as i32), + ) + .add( + room_participant::Column::AnsweringConnectionEpoch + .eq(connection.epoch as i32), + ), + ) .set(room_participant::ActiveModel { location_kind: ActiveValue::set(Some(location_kind)), location_project_id: ActiveValue::set(location_project_id), @@ -1437,11 +1487,21 @@ impl Database { pub async fn connection_lost( &self, - connection_id: ConnectionId, + connection: ConnectionId, ) -> Result>> { self.room_transaction(|tx| async move { let participant = room_participant::Entity::find() - .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32)) + .filter( + Condition::all() + .add( + room_participant::Column::AnsweringConnectionId + .eq(connection.id as i32), + ) + .add( + room_participant::Column::AnsweringConnectionEpoch + .eq(connection.epoch as i32), + ), + ) .one(&*tx) .await? .ok_or_else(|| anyhow!("not a participant in any room"))?; @@ -1456,11 +1516,25 @@ impl Database { let collaborator_on_projects = project_collaborator::Entity::find() .find_also_related(project::Entity) - .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32)) + .filter( + Condition::all() + .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32)) + .add( + project_collaborator::Column::ConnectionEpoch + .eq(connection.epoch as i32), + ), + ) .all(&*tx) .await?; project_collaborator::Entity::delete_many() - .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32)) + .filter( + Condition::all() + .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32)) + .add( + project_collaborator::Column::ConnectionEpoch + .eq(connection.epoch as i32), + ), + ) .exec(&*tx) .await?; @@ -1473,20 +1547,30 @@ impl Database { .await?; let connection_ids = collaborators .into_iter() - .map(|collaborator| ConnectionId(collaborator.connection_id as u32)) + .map(|collaborator| ConnectionId { + id: collaborator.connection_id as u32, + epoch: collaborator.connection_epoch as u32, + }) .collect(); left_projects.push(LeftProject { id: project.id, host_user_id: project.host_user_id, - host_connection_id: ConnectionId(project.host_connection_id as u32), + host_connection_id: ConnectionId { + id: project.host_connection_id as u32, + epoch: project.host_connection_epoch as u32, + }, connection_ids, }); } } project::Entity::delete_many() - .filter(project::Column::HostConnectionId.eq(connection_id.0 as i32)) + .filter( + Condition::all() + .add(project::Column::HostConnectionId.eq(connection.id as i32)) + .add(project::Column::HostConnectionEpoch.eq(connection.epoch as i32)), + ) .exec(&*tx) .await?; @@ -1537,7 +1621,10 @@ impl Database { let mut pending_participants = Vec::new(); while let Some(db_participant) = db_participants.next().await { let db_participant = db_participant?; - if let Some(answering_connection_id) = db_participant.answering_connection_id { + if let Some((answering_connection_id, answering_connection_epoch)) = db_participant + .answering_connection_id + .zip(db_participant.answering_connection_epoch) + { let location = match ( db_participant.location_kind, db_participant.location_project_id, @@ -1560,7 +1647,10 @@ impl Database { answering_connection_id, proto::Participant { user_id: db_participant.user_id.to_proto(), - peer_id: answering_connection_id as u32, + peer_id: Some(proto::PeerId { + epoch: answering_connection_epoch as u32, + id: answering_connection_id as u32, + }), projects: Default::default(), location: Some(proto::ParticipantLocation { variant: location }), }, @@ -1637,12 +1727,22 @@ impl Database { pub async fn share_project( &self, room_id: RoomId, - connection_id: ConnectionId, + connection: ConnectionId, worktrees: &[proto::WorktreeMetadata], ) -> Result> { self.room_transaction(|tx| async move { let participant = room_participant::Entity::find() - .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32)) + .filter( + Condition::all() + .add( + room_participant::Column::AnsweringConnectionId + .eq(connection.id as i32), + ) + .add( + room_participant::Column::AnsweringConnectionEpoch + .eq(connection.epoch as i32), + ), + ) .one(&*tx) .await? .ok_or_else(|| anyhow!("could not find participant"))?; @@ -1653,8 +1753,8 @@ impl Database { let project = project::ActiveModel { room_id: ActiveValue::set(participant.room_id), host_user_id: ActiveValue::set(participant.user_id), - host_connection_id: ActiveValue::set(connection_id.0 as i32), - host_connection_epoch: ActiveValue::set(self.epoch()), + host_connection_id: ActiveValue::set(connection.id as i32), + host_connection_epoch: ActiveValue::set(connection.epoch as i32), ..Default::default() } .insert(&*tx) @@ -1678,8 +1778,8 @@ impl Database { project_collaborator::ActiveModel { project_id: ActiveValue::set(project.id), - connection_id: ActiveValue::set(connection_id.0 as i32), - connection_epoch: ActiveValue::set(self.epoch()), + connection_id: ActiveValue::set(connection.id as i32), + connection_epoch: ActiveValue::set(connection.epoch as i32), user_id: ActiveValue::set(participant.user_id), replica_id: ActiveValue::set(ReplicaId(0)), is_host: ActiveValue::set(true), @@ -1697,7 +1797,7 @@ impl Database { pub async fn unshare_project( &self, project_id: ProjectId, - connection_id: ConnectionId, + connection: ConnectionId, ) -> Result)>> { self.room_transaction(|tx| async move { let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?; @@ -1706,7 +1806,11 @@ impl Database { .one(&*tx) .await? .ok_or_else(|| anyhow!("project not found"))?; - if project.host_connection_id == connection_id.0 as i32 { + let host_connection = ConnectionId { + epoch: project.host_connection_epoch as u32, + id: project.host_connection_id as u32, + }; + if host_connection == connection { let room_id = project.room_id; project::Entity::delete(project.into_active_model()) .exec(&*tx) @@ -1723,12 +1827,16 @@ impl Database { pub async fn update_project( &self, project_id: ProjectId, - connection_id: ConnectionId, + connection: ConnectionId, worktrees: &[proto::WorktreeMetadata], ) -> Result)>> { self.room_transaction(|tx| async move { let project = project::Entity::find_by_id(project_id) - .filter(project::Column::HostConnectionId.eq(connection_id.0 as i32)) + .filter( + Condition::all() + .add(project::Column::HostConnectionId.eq(connection.id as i32)) + .add(project::Column::HostConnectionEpoch.eq(connection.epoch as i32)), + ) .one(&*tx) .await? .ok_or_else(|| anyhow!("no such project"))?; @@ -1774,7 +1882,7 @@ impl Database { pub async fn update_worktree( &self, update: &proto::UpdateWorktree, - connection_id: ConnectionId, + connection: ConnectionId, ) -> Result>> { self.room_transaction(|tx| async move { let project_id = ProjectId::from_proto(update.project_id); @@ -1782,7 +1890,11 @@ impl Database { // Ensure the update comes from the host. let project = project::Entity::find_by_id(project_id) - .filter(project::Column::HostConnectionId.eq(connection_id.0 as i32)) + .filter( + Condition::all() + .add(project::Column::HostConnectionId.eq(connection.id as i32)) + .add(project::Column::HostConnectionEpoch.eq(connection.epoch as i32)), + ) .one(&*tx) .await? .ok_or_else(|| anyhow!("no such project"))?; @@ -1862,7 +1974,7 @@ impl Database { pub async fn update_diagnostic_summary( &self, update: &proto::UpdateDiagnosticSummary, - connection_id: ConnectionId, + connection: ConnectionId, ) -> Result>> { self.room_transaction(|tx| async move { let project_id = ProjectId::from_proto(update.project_id); @@ -1877,7 +1989,11 @@ impl Database { .one(&*tx) .await? .ok_or_else(|| anyhow!("no such project"))?; - if project.host_connection_id != connection_id.0 as i32 { + let host_connection = ConnectionId { + epoch: project.host_connection_epoch as u32, + id: project.host_connection_id as u32, + }; + if host_connection != connection { return Err(anyhow!("can't update a project hosted by someone else"))?; } @@ -1916,7 +2032,7 @@ impl Database { pub async fn start_language_server( &self, update: &proto::StartLanguageServer, - connection_id: ConnectionId, + connection: ConnectionId, ) -> Result>> { self.room_transaction(|tx| async move { let project_id = ProjectId::from_proto(update.project_id); @@ -1930,7 +2046,11 @@ impl Database { .one(&*tx) .await? .ok_or_else(|| anyhow!("no such project"))?; - if project.host_connection_id != connection_id.0 as i32 { + let host_connection = ConnectionId { + epoch: project.host_connection_epoch as u32, + id: project.host_connection_id as u32, + }; + if host_connection != connection { return Err(anyhow!("can't update a project hosted by someone else"))?; } @@ -1961,11 +2081,21 @@ impl Database { pub async fn join_project( &self, project_id: ProjectId, - connection_id: ConnectionId, + connection: ConnectionId, ) -> Result> { self.room_transaction(|tx| async move { let participant = room_participant::Entity::find() - .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0 as i32)) + .filter( + Condition::all() + .add( + room_participant::Column::AnsweringConnectionId + .eq(connection.id as i32), + ) + .add( + room_participant::Column::AnsweringConnectionEpoch + .eq(connection.epoch as i32), + ), + ) .one(&*tx) .await? .ok_or_else(|| anyhow!("must join a room first"))?; @@ -1992,8 +2122,8 @@ impl Database { } let new_collaborator = project_collaborator::ActiveModel { project_id: ActiveValue::set(project_id), - connection_id: ActiveValue::set(connection_id.0 as i32), - connection_epoch: ActiveValue::set(self.epoch()), + connection_id: ActiveValue::set(connection.id as i32), + connection_epoch: ActiveValue::set(connection.epoch as i32), user_id: ActiveValue::set(participant.user_id), replica_id: ActiveValue::set(replica_id), is_host: ActiveValue::set(false), @@ -2095,14 +2225,18 @@ impl Database { pub async fn leave_project( &self, project_id: ProjectId, - connection_id: ConnectionId, + connection: ConnectionId, ) -> Result> { self.room_transaction(|tx| async move { let result = project_collaborator::Entity::delete_many() .filter( - project_collaborator::Column::ProjectId - .eq(project_id) - .and(project_collaborator::Column::ConnectionId.eq(connection_id.0 as i32)), + Condition::all() + .add(project_collaborator::Column::ProjectId.eq(project_id)) + .add(project_collaborator::Column::ConnectionId.eq(connection.id as i32)) + .add( + project_collaborator::Column::ConnectionEpoch + .eq(connection.epoch as i32), + ), ) .exec(&*tx) .await?; @@ -2120,13 +2254,19 @@ impl Database { .await?; let connection_ids = collaborators .into_iter() - .map(|collaborator| ConnectionId(collaborator.connection_id as u32)) + .map(|collaborator| ConnectionId { + epoch: collaborator.connection_epoch as u32, + id: collaborator.connection_id as u32, + }) .collect(); let left_project = LeftProject { id: project_id, host_user_id: project.host_user_id, - host_connection_id: ConnectionId(project.host_connection_id as u32), + host_connection_id: ConnectionId { + epoch: project.host_connection_epoch as u32, + id: project.host_connection_id as u32, + }, connection_ids, }; Ok((project.room_id, left_project)) @@ -2137,7 +2277,7 @@ impl Database { pub async fn project_collaborators( &self, project_id: ProjectId, - connection_id: ConnectionId, + connection: ConnectionId, ) -> Result>> { self.room_transaction(|tx| async move { let project = project::Entity::find_by_id(project_id) @@ -2149,10 +2289,13 @@ impl Database { .all(&*tx) .await?; - if collaborators - .iter() - .any(|collaborator| collaborator.connection_id == connection_id.0 as i32) - { + if collaborators.iter().any(|collaborator| { + let collaborator_connection = ConnectionId { + epoch: collaborator.connection_epoch as u32, + id: collaborator.connection_id as u32, + }; + collaborator_connection == connection + }) { Ok((project.room_id, collaborators)) } else { Err(anyhow!("no such project"))? @@ -2166,30 +2309,42 @@ impl Database { project_id: ProjectId, connection_id: ConnectionId, ) -> Result>> { - self.room_transaction(|tx| async move { - #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] - enum QueryAs { - ConnectionId, - } + #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] + enum QueryAs { + Epoch, + Id, + } + #[derive(Debug, FromQueryResult)] + struct ProjectConnection { + epoch: i32, + id: i32, + } + + self.room_transaction(|tx| async move { let project = project::Entity::find_by_id(project_id) .one(&*tx) .await? .ok_or_else(|| anyhow!("no such project"))?; - let mut db_connection_ids = project_collaborator::Entity::find() + let mut db_connections = project_collaborator::Entity::find() .select_only() + .column_as(project_collaborator::Column::ConnectionId, QueryAs::Id) .column_as( - project_collaborator::Column::ConnectionId, - QueryAs::ConnectionId, + project_collaborator::Column::ConnectionEpoch, + QueryAs::Epoch, ) .filter(project_collaborator::Column::ProjectId.eq(project_id)) - .into_values::() + .into_model::() .stream(&*tx) .await?; let mut connection_ids = HashSet::default(); - while let Some(connection_id) = db_connection_ids.next().await { - connection_ids.insert(ConnectionId(connection_id? as u32)); + while let Some(connection) = db_connections.next().await { + let connection = connection?; + connection_ids.insert(ConnectionId { + epoch: connection.epoch as u32, + id: connection.id as u32, + }); } if connection_ids.contains(&connection_id) { @@ -2208,27 +2363,39 @@ impl Database { ) -> Result> { #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] enum QueryAs { - ConnectionId, + Epoch, + Id, } - let mut db_guest_connection_ids = project_collaborator::Entity::find() + #[derive(Debug, FromQueryResult)] + struct ProjectConnection { + epoch: i32, + id: i32, + } + + let mut db_guest_connections = project_collaborator::Entity::find() .select_only() + .column_as(project_collaborator::Column::ConnectionId, QueryAs::Id) .column_as( - project_collaborator::Column::ConnectionId, - QueryAs::ConnectionId, + project_collaborator::Column::ConnectionEpoch, + QueryAs::Epoch, ) .filter( project_collaborator::Column::ProjectId .eq(project_id) .and(project_collaborator::Column::IsHost.eq(false)), ) - .into_values::() + .into_model::() .stream(tx) .await?; let mut guest_connection_ids = Vec::new(); - while let Some(connection_id) = db_guest_connection_ids.next().await { - guest_connection_ids.push(ConnectionId(connection_id? as u32)); + while let Some(connection) = db_guest_connections.next().await { + let connection = connection?; + guest_connection_ids.push(ConnectionId { + epoch: connection.epoch as u32, + id: connection.id as u32, + }); } Ok(guest_connection_ids) } diff --git a/crates/collab/src/db/project.rs b/crates/collab/src/db/project.rs index 971a8fcefb..b8cf321e51 100644 --- a/crates/collab/src/db/project.rs +++ b/crates/collab/src/db/project.rs @@ -9,7 +9,7 @@ pub struct Model { pub room_id: RoomId, pub host_user_id: UserId, pub host_connection_id: i32, - pub host_connection_epoch: Uuid, + pub host_connection_epoch: i32, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/crates/collab/src/db/project_collaborator.rs b/crates/collab/src/db/project_collaborator.rs index 5db307f5df..e12b113ff9 100644 --- a/crates/collab/src/db/project_collaborator.rs +++ b/crates/collab/src/db/project_collaborator.rs @@ -8,7 +8,7 @@ pub struct Model { pub id: ProjectCollaboratorId, pub project_id: ProjectId, pub connection_id: i32, - pub connection_epoch: Uuid, + pub connection_epoch: i32, pub user_id: UserId, pub replica_id: ReplicaId, pub is_host: bool, diff --git a/crates/collab/src/db/room_participant.rs b/crates/collab/src/db/room_participant.rs index c80c10c1ba..265febd545 100644 --- a/crates/collab/src/db/room_participant.rs +++ b/crates/collab/src/db/room_participant.rs @@ -9,14 +9,14 @@ pub struct Model { pub room_id: RoomId, pub user_id: UserId, pub answering_connection_id: Option, - pub answering_connection_epoch: Option, + pub answering_connection_epoch: Option, pub answering_connection_lost: bool, pub location_kind: Option, pub location_project_id: Option, pub initial_project_id: Option, pub calling_user_id: UserId, pub calling_connection_id: i32, - pub calling_connection_epoch: Uuid, + pub calling_connection_epoch: i32, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/crates/collab/src/db/tests.rs b/crates/collab/src/db/tests.rs index 2d254c2e37..b6b8a780a7 100644 --- a/crates/collab/src/db/tests.rs +++ b/crates/collab/src/db/tests.rs @@ -436,36 +436,44 @@ test_both_dbs!( .unwrap(); let room_id = RoomId::from_proto( - db.create_room(user1.user_id, ConnectionId(0), "") + db.create_room(user1.user_id, ConnectionId { epoch: 0, id: 0 }, "") .await .unwrap() .id, ); - db.call(room_id, user1.user_id, ConnectionId(0), user2.user_id, None) - .await - .unwrap(); - db.join_room(room_id, user2.user_id, ConnectionId(1)) + db.call( + room_id, + user1.user_id, + ConnectionId { epoch: 0, id: 0 }, + user2.user_id, + None, + ) + .await + .unwrap(); + db.join_room(room_id, user2.user_id, ConnectionId { epoch: 0, id: 1 }) .await .unwrap(); assert_eq!(db.project_count_excluding_admins().await.unwrap(), 0); - db.share_project(room_id, ConnectionId(1), &[]) + db.share_project(room_id, ConnectionId { epoch: 0, id: 1 }, &[]) .await .unwrap(); assert_eq!(db.project_count_excluding_admins().await.unwrap(), 1); - db.share_project(room_id, ConnectionId(1), &[]) + db.share_project(room_id, ConnectionId { epoch: 0, id: 1 }, &[]) .await .unwrap(); assert_eq!(db.project_count_excluding_admins().await.unwrap(), 2); // Projects shared by admins aren't counted. - db.share_project(room_id, ConnectionId(0), &[]) + db.share_project(room_id, ConnectionId { epoch: 0, id: 0 }, &[]) .await .unwrap(); assert_eq!(db.project_count_excluding_admins().await.unwrap(), 2); - db.leave_room(ConnectionId(1)).await.unwrap(); + db.leave_room(ConnectionId { epoch: 0, id: 1 }) + .await + .unwrap(); assert_eq!(db.project_count_excluding_admins().await.unwrap(), 0); } ); diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index bee8f9a34f..2c2a301733 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -7,8 +7,8 @@ use crate::{ use anyhow::anyhow; use call::{room, ActiveCall, ParticipantLocation, Room}; use client::{ - self, test::FakeHttpClient, Client, Connection, Credentials, EstablishConnectionError, PeerId, - User, UserStore, RECEIVE_TIMEOUT, + self, proto::PeerId, test::FakeHttpClient, Client, Connection, Credentials, + EstablishConnectionError, User, UserStore, RECEIVE_TIMEOUT, }; use collections::{BTreeMap, HashMap, HashSet}; use editor::{ @@ -6066,7 +6066,7 @@ async fn test_random_collaboration( .user_connection_ids(removed_guest_id) .collect::>(); assert_eq!(user_connection_ids.len(), 1); - let removed_peer_id = PeerId(user_connection_ids[0].0); + let removed_peer_id = user_connection_ids[0].into(); let guest = clients.remove(guest_ix); op_start_signals.remove(guest_ix); server.forbid_connections(); @@ -6115,7 +6115,7 @@ async fn test_random_collaboration( .user_connection_ids(user_id) .collect::>(); assert_eq!(user_connection_ids.len(), 1); - let peer_id = PeerId(user_connection_ids[0].0); + let peer_id = user_connection_ids[0].into(); server.disconnect_client(peer_id); deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); operations += 1; @@ -6429,7 +6429,7 @@ impl TestServer { let connection_id = connection_id_rx.await.unwrap(); connection_killers .lock() - .insert(PeerId(connection_id.0), killed); + .insert(connection_id.into(), killed); Ok(client_conn) } }) diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 70802fa431..856eb36fb1 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -170,7 +170,7 @@ where impl Server { pub fn new(app_state: Arc, executor: Executor) -> Arc { let mut server = Self { - peer: Peer::new(), + peer: Peer::new(0), app_state, executor, connection_pool: Default::default(), @@ -457,9 +457,9 @@ impl Server { move |duration| executor.sleep(duration) }); - tracing::info!(%user_id, %login, %connection_id, %address, "connection opened"); - this.peer.send(connection_id, proto::Hello { peer_id: connection_id.0 })?; - tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message"); + tracing::info!(%user_id, %login, ?connection_id, %address, "connection opened"); + this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?; + tracing::info!(%user_id, %login, ?connection_id, %address, "sent hello message"); if let Some(send_connection_id) = send_connection_id.take() { let _ = send_connection_id.send(connection_id); @@ -521,7 +521,7 @@ impl Server { _ = teardown.changed().fuse() => return Ok(()), result = handle_io => { if let Err(error) = result { - tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O"); + tracing::error!(?error, %user_id, %login, ?connection_id, %address, "error handling I/O"); } break; } @@ -529,7 +529,7 @@ impl Server { message = next_message => { if let Some(message) = message { let type_name = message.payload_type_name(); - let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name); + let span = tracing::info_span!("receive message", %user_id, %login, ?connection_id, %address, type_name); let span_enter = span.enter(); if let Some(handler) = this.handlers.get(&message.payload_type_id()) { let is_background = message.is_background(); @@ -543,10 +543,10 @@ impl Server { foreground_message_handlers.push(handle_message); } } else { - tracing::error!(%user_id, %login, %connection_id, %address, "no message handler"); + tracing::error!(%user_id, %login, ?connection_id, %address, "no message handler"); } } else { - tracing::info!(%user_id, %login, %connection_id, %address, "connection closed"); + tracing::info!(%user_id, %login, ?connection_id, %address, "connection closed"); break; } } @@ -554,9 +554,9 @@ impl Server { } drop(foreground_message_handlers); - tracing::info!(%user_id, %login, %connection_id, %address, "signing out"); + tracing::info!(%user_id, %login, ?connection_id, %address, "signing out"); if let Err(error) = sign_out(session, teardown, executor).await { - tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out"); + tracing::error!(%user_id, %login, ?connection_id, %address, ?error, "error signing out"); } Ok(()) @@ -1128,12 +1128,18 @@ async fn join_project( let collaborators = project .collaborators .iter() - .filter(|collaborator| collaborator.connection_id != session.connection_id.0 as i32) - .map(|collaborator| proto::Collaborator { - peer_id: collaborator.connection_id as u32, - replica_id: collaborator.replica_id.0 as u32, - user_id: collaborator.user_id.to_proto(), + .map(|collaborator| { + let peer_id = proto::PeerId { + epoch: collaborator.connection_epoch as u32, + id: collaborator.connection_id as u32, + }; + proto::Collaborator { + peer_id: Some(peer_id), + replica_id: collaborator.replica_id.0 as u32, + user_id: collaborator.user_id.to_proto(), + } }) + .filter(|collaborator| collaborator.peer_id != Some(session.connection_id.into())) .collect::>(); let worktrees = project .worktrees @@ -1150,11 +1156,11 @@ async fn join_project( session .peer .send( - ConnectionId(collaborator.peer_id), + collaborator.peer_id.unwrap().into(), proto::AddProjectCollaborator { project_id: project_id.to_proto(), collaborator: Some(proto::Collaborator { - peer_id: session.connection_id.0, + peer_id: Some(session.connection_id.into()), replica_id: replica_id.0 as u32, user_id: guest_user_id.to_proto(), }), @@ -1375,13 +1381,14 @@ where .await .project_collaborators(project_id, session.connection_id) .await?; - ConnectionId( - collaborators - .iter() - .find(|collaborator| collaborator.is_host) - .ok_or_else(|| anyhow!("host not found"))? - .connection_id as u32, - ) + let host = collaborators + .iter() + .find(|collaborator| collaborator.is_host) + .ok_or_else(|| anyhow!("host not found"))?; + ConnectionId { + epoch: host.connection_epoch as u32, + id: host.connection_id as u32, + } }; let payload = session @@ -1409,7 +1416,10 @@ async fn save_buffer( .iter() .find(|collaborator| collaborator.is_host) .ok_or_else(|| anyhow!("host not found"))?; - ConnectionId(host.connection_id as u32) + ConnectionId { + epoch: host.connection_epoch as u32, + id: host.connection_id as u32, + } }; let response_payload = session .peer @@ -1421,11 +1431,17 @@ async fn save_buffer( .await .project_collaborators(project_id, session.connection_id) .await?; - collaborators - .retain(|collaborator| collaborator.connection_id != session.connection_id.0 as i32); - let project_connection_ids = collaborators - .iter() - .map(|collaborator| ConnectionId(collaborator.connection_id as u32)); + collaborators.retain(|collaborator| { + let collaborator_connection = ConnectionId { + epoch: collaborator.connection_epoch as u32, + id: collaborator.connection_id as u32, + }; + collaborator_connection != session.connection_id + }); + let project_connection_ids = collaborators.iter().map(|collaborator| ConnectionId { + epoch: collaborator.connection_epoch as u32, + id: collaborator.connection_id as u32, + }); broadcast(host_connection_id, project_connection_ids, |conn_id| { session .peer @@ -1439,11 +1455,10 @@ async fn create_buffer_for_peer( request: proto::CreateBufferForPeer, session: Session, ) -> Result<()> { - session.peer.forward_send( - session.connection_id, - ConnectionId(request.peer_id), - request, - )?; + let peer_id = request.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?; + session + .peer + .forward_send(session.connection_id, peer_id.into(), request)?; Ok(()) } @@ -1536,7 +1551,10 @@ async fn follow( session: Session, ) -> Result<()> { let project_id = ProjectId::from_proto(request.project_id); - let leader_id = ConnectionId(request.leader_id); + let leader_id = request + .leader_id + .ok_or_else(|| anyhow!("invalid leader id"))? + .into(); let follower_id = session.connection_id; { let project_connection_ids = session @@ -1556,14 +1574,17 @@ async fn follow( .await?; response_payload .views - .retain(|view| view.leader_id != Some(follower_id.0)); + .retain(|view| view.leader_id != Some(follower_id.into())); response.send(response_payload)?; Ok(()) } async fn unfollow(request: proto::Unfollow, session: Session) -> Result<()> { let project_id = ProjectId::from_proto(request.project_id); - let leader_id = ConnectionId(request.leader_id); + let leader_id = request + .leader_id + .ok_or_else(|| anyhow!("invalid leader id"))? + .into(); let project_connection_ids = session .db() .await @@ -1592,12 +1613,16 @@ async fn update_followers(request: proto::UpdateFollowers, session: Session) -> proto::update_followers::Variant::UpdateView(payload) => payload.leader_id, proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id, }); - for follower_id in &request.follower_ids { - let follower_id = ConnectionId(*follower_id); - if project_connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id { - session - .peer - .forward_send(session.connection_id, follower_id, request.clone())?; + for follower_peer_id in request.follower_ids.iter().copied() { + let follower_connection_id = follower_peer_id.into(); + if project_connection_ids.contains(&follower_connection_id) + && Some(follower_peer_id) != leader_id + { + session.peer.forward_send( + session.connection_id, + follower_connection_id, + request.clone(), + )?; } } Ok(()) @@ -1912,13 +1937,19 @@ fn contact_for_user( fn room_updated(room: &proto::Room, peer: &Peer) { for participant in &room.participants { - peer.send( - ConnectionId(participant.peer_id), - proto::RoomUpdated { - room: Some(room.clone()), - }, - ) - .trace_err(); + if let Some(peer_id) = participant + .peer_id + .ok_or_else(|| anyhow!("invalid participant peer id")) + .trace_err() + { + peer.send( + peer_id.into(), + proto::RoomUpdated { + room: Some(room.clone()), + }, + ) + .trace_err(); + } } } @@ -2033,7 +2064,7 @@ fn project_left(project: &db::LeftProject, session: &Session) { *connection_id, proto::RemoveProjectCollaborator { project_id: project.id.to_proto(), - peer_id: session.connection_id.0, + peer_id: Some(session.connection_id.into()), }, ) .trace_err(); diff --git a/crates/collab_ui/src/collab_titlebar_item.rs b/crates/collab_ui/src/collab_titlebar_item.rs index ab414e051b..2288f77cd3 100644 --- a/crates/collab_ui/src/collab_titlebar_item.rs +++ b/crates/collab_ui/src/collab_titlebar_item.rs @@ -1,6 +1,6 @@ use crate::{contact_notification::ContactNotification, contacts_popover}; use call::{ActiveCall, ParticipantLocation}; -use client::{Authenticate, ContactEventKind, PeerId, User, UserStore}; +use client::{proto::PeerId, Authenticate, ContactEventKind, User, UserStore}; use clock::ReplicaId; use contacts_popover::ContactsPopover; use gpui::{ @@ -474,7 +474,7 @@ impl CollabTitlebarItem { cx.dispatch_action(ToggleFollow(peer_id)) }) .with_tooltip::( - peer_id.0 as usize, + peer_id.as_u64() as usize, if is_followed { format!("Unfollow {}", peer_github_login) } else { @@ -487,22 +487,24 @@ impl CollabTitlebarItem { .boxed() } else if let ParticipantLocation::SharedProject { project_id } = location { let user_id = user.id; - MouseEventHandler::::new(peer_id.0 as usize, cx, move |_, _| content) - .with_cursor_style(CursorStyle::PointingHand) - .on_click(MouseButton::Left, move |_, cx| { - cx.dispatch_action(JoinProject { - project_id, - follow_user_id: user_id, - }) + MouseEventHandler::::new(peer_id.as_u64() as usize, cx, move |_, _| { + content + }) + .with_cursor_style(CursorStyle::PointingHand) + .on_click(MouseButton::Left, move |_, cx| { + cx.dispatch_action(JoinProject { + project_id, + follow_user_id: user_id, }) - .with_tooltip::( - peer_id.0 as usize, - format!("Follow {} into external project", peer_github_login), - Some(Box::new(FollowNextCollaborator)), - theme.tooltip.clone(), - cx, - ) - .boxed() + }) + .with_tooltip::( + peer_id.as_u64() as usize, + format!("Follow {} into external project", peer_github_login), + Some(Box::new(FollowNextCollaborator)), + theme.tooltip.clone(), + cx, + ) + .boxed() } else { content } diff --git a/crates/collab_ui/src/contact_list.rs b/crates/collab_ui/src/contact_list.rs index bc8b2947c4..48a4d1a2b5 100644 --- a/crates/collab_ui/src/contact_list.rs +++ b/crates/collab_ui/src/contact_list.rs @@ -2,7 +2,7 @@ use std::{mem, sync::Arc}; use crate::contacts_popover; use call::ActiveCall; -use client::{Contact, PeerId, User, UserStore}; +use client::{proto::PeerId, Contact, User, UserStore}; use editor::{Cancel, Editor}; use fuzzy::{match_strings, StringMatchCandidate}; use gpui::{ @@ -465,7 +465,7 @@ impl ContactList { room.remote_participants() .iter() .map(|(peer_id, participant)| StringMatchCandidate { - id: peer_id.0 as usize, + id: peer_id.as_u64() as usize, string: participant.user.github_login.clone(), char_bag: participant.user.github_login.chars().collect(), }), @@ -479,7 +479,7 @@ impl ContactList { executor.clone(), )); for mat in matches { - let peer_id = PeerId(mat.candidate_id as u32); + let peer_id = PeerId::from_u64(mat.candidate_id as u64); let participant = &room.remote_participants()[&peer_id]; participant_entries.push(ContactEntry::CallParticipant { user: participant.user.clone(), @@ -881,75 +881,80 @@ impl ContactList { let baseline_offset = row.name.text.baseline_offset(font_cache) + (theme.row_height - line_height) / 2.; - MouseEventHandler::::new(peer_id.0 as usize, cx, |mouse_state, _| { - let tree_branch = *tree_branch.style_for(mouse_state, is_selected); - let row = theme.project_row.style_for(mouse_state, is_selected); + MouseEventHandler::::new( + peer_id.as_u64() as usize, + cx, + |mouse_state, _| { + let tree_branch = *tree_branch.style_for(mouse_state, is_selected); + let row = theme.project_row.style_for(mouse_state, is_selected); - Flex::row() - .with_child( - Stack::new() - .with_child( - Canvas::new(move |bounds, _, cx| { - let start_x = bounds.min_x() + (bounds.width() / 2.) - - (tree_branch.width / 2.); - let end_x = bounds.max_x(); - let start_y = bounds.min_y(); - let end_y = bounds.min_y() + baseline_offset - (cap_height / 2.); + Flex::row() + .with_child( + Stack::new() + .with_child( + Canvas::new(move |bounds, _, cx| { + let start_x = bounds.min_x() + (bounds.width() / 2.) + - (tree_branch.width / 2.); + let end_x = bounds.max_x(); + let start_y = bounds.min_y(); + let end_y = + bounds.min_y() + baseline_offset - (cap_height / 2.); - cx.scene.push_quad(gpui::Quad { - bounds: RectF::from_points( - vec2f(start_x, start_y), - vec2f( - start_x + tree_branch.width, - if is_last { end_y } else { bounds.max_y() }, + cx.scene.push_quad(gpui::Quad { + bounds: RectF::from_points( + vec2f(start_x, start_y), + vec2f( + start_x + tree_branch.width, + if is_last { end_y } else { bounds.max_y() }, + ), ), - ), - background: Some(tree_branch.color), - border: gpui::Border::default(), - corner_radius: 0., - }); - cx.scene.push_quad(gpui::Quad { - bounds: RectF::from_points( - vec2f(start_x, end_y), - vec2f(end_x, end_y + tree_branch.width), - ), - background: Some(tree_branch.color), - border: gpui::Border::default(), - corner_radius: 0., - }); - }) + background: Some(tree_branch.color), + border: gpui::Border::default(), + corner_radius: 0., + }); + cx.scene.push_quad(gpui::Quad { + bounds: RectF::from_points( + vec2f(start_x, end_y), + vec2f(end_x, end_y + tree_branch.width), + ), + background: Some(tree_branch.color), + border: gpui::Border::default(), + corner_radius: 0., + }); + }) + .boxed(), + ) + .constrained() + .with_width(host_avatar_height) .boxed(), - ) - .constrained() - .with_width(host_avatar_height) - .boxed(), - ) - .with_child( - Svg::new("icons/disable_screen_sharing_12.svg") - .with_color(row.icon.color) - .constrained() - .with_width(row.icon.width) - .aligned() - .left() - .contained() - .with_style(row.icon.container) - .boxed(), - ) - .with_child( - Label::new("Screen".into(), row.name.text.clone()) - .aligned() - .left() - .contained() - .with_style(row.name.container) - .flex(1., false) - .boxed(), - ) - .constrained() - .with_height(theme.row_height) - .contained() - .with_style(row.container) - .boxed() - }) + ) + .with_child( + Svg::new("icons/disable_screen_sharing_12.svg") + .with_color(row.icon.color) + .constrained() + .with_width(row.icon.width) + .aligned() + .left() + .contained() + .with_style(row.icon.container) + .boxed(), + ) + .with_child( + Label::new("Screen".into(), row.name.text.clone()) + .aligned() + .left() + .contained() + .with_style(row.name.container) + .flex(1., false) + .boxed(), + ) + .constrained() + .with_height(theme.row_height) + .contained() + .with_style(row.container) + .boxed() + }, + ) .with_cursor_style(CursorStyle::PointingHand) .on_click(MouseButton::Left, move |_, cx| { cx.dispatch_action(OpenSharedScreen { peer_id }); diff --git a/crates/project/src/lsp_command.rs b/crates/project/src/lsp_command.rs index 3ea1261735..a0eb845581 100644 --- a/crates/project/src/lsp_command.rs +++ b/crates/project/src/lsp_command.rs @@ -3,7 +3,7 @@ use crate::{ }; use anyhow::{anyhow, Result}; use async_trait::async_trait; -use client::{proto, PeerId}; +use client::proto::{self, PeerId}; use gpui::{AppContext, AsyncAppContext, ModelHandle}; use language::{ point_from_lsp, point_to_lsp, diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 9b4a163af4..7f2fcb516f 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -7,7 +7,7 @@ pub mod worktree; mod project_tests; use anyhow::{anyhow, Context, Result}; -use client::{proto, Client, PeerId, TypedEnvelope, UserStore}; +use client::{proto, Client, TypedEnvelope, UserStore}; use clock::ReplicaId; use collections::{hash_map, BTreeMap, HashMap, HashSet}; use futures::{ @@ -15,7 +15,6 @@ use futures::{ future::Shared, AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt, }; - use gpui::{ AnyModelHandle, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, UpgradeModelHandle, WeakModelHandle, @@ -103,11 +102,11 @@ pub struct Project { user_store: ModelHandle, fs: Arc, client_state: Option, - collaborators: HashMap, + collaborators: HashMap, client_subscriptions: Vec, _subscriptions: Vec, opened_buffer: (watch::Sender<()>, watch::Receiver<()>), - shared_buffers: HashMap>, + shared_buffers: HashMap>, #[allow(clippy::type_complexity)] loading_buffers: HashMap< ProjectPath, @@ -164,7 +163,7 @@ enum ProjectClientState { #[derive(Clone, Debug)] pub struct Collaborator { - pub peer_id: PeerId, + pub peer_id: proto::PeerId, pub replica_id: ReplicaId, } @@ -185,7 +184,7 @@ pub enum Event { }, RemoteIdChanged(Option), DisconnectedFromHost, - CollaboratorLeft(PeerId), + CollaboratorLeft(proto::PeerId), } pub enum LanguageServerState { @@ -555,7 +554,7 @@ impl Project { .await?; let mut collaborators = HashMap::default(); for message in response.collaborators { - let collaborator = Collaborator::from_proto(message); + let collaborator = Collaborator::from_proto(message)?; collaborators.insert(collaborator.peer_id, collaborator); } @@ -754,7 +753,7 @@ impl Project { } } - pub fn collaborators(&self) -> &HashMap { + pub fn collaborators(&self) -> &HashMap { &self.collaborators } @@ -4605,7 +4604,7 @@ impl Project { .take() .ok_or_else(|| anyhow!("empty collaborator"))?; - let collaborator = Collaborator::from_proto(collaborator); + let collaborator = Collaborator::from_proto(collaborator)?; this.update(&mut cx, |this, cx| { this.collaborators .insert(collaborator.peer_id, collaborator); @@ -4622,7 +4621,10 @@ impl Project { mut cx: AsyncAppContext, ) -> Result<()> { this.update(&mut cx, |this, cx| { - let peer_id = PeerId(envelope.payload.peer_id); + let peer_id = envelope + .payload + .peer_id + .ok_or_else(|| anyhow!("invalid peer id"))?; let replica_id = this .collaborators .remove(&peer_id) @@ -5489,7 +5491,7 @@ impl Project { fn serialize_project_transaction_for_peer( &mut self, project_transaction: ProjectTransaction, - peer_id: PeerId, + peer_id: proto::PeerId, cx: &AppContext, ) -> proto::ProjectTransaction { let mut serialized_transaction = proto::ProjectTransaction { @@ -5545,7 +5547,7 @@ impl Project { fn create_buffer_for_peer( &mut self, buffer: &ModelHandle, - peer_id: PeerId, + peer_id: proto::PeerId, cx: &AppContext, ) -> u64 { let buffer_id = buffer.read(cx).remote_id(); @@ -5563,7 +5565,7 @@ impl Project { client.send(proto::CreateBufferForPeer { project_id, - peer_id: peer_id.0, + peer_id: Some(peer_id), variant: Some(proto::create_buffer_for_peer::Variant::State(state)), })?; @@ -5580,7 +5582,7 @@ impl Project { let is_last = operations.is_empty(); client.send(proto::CreateBufferForPeer { project_id, - peer_id: peer_id.0, + peer_id: Some(peer_id), variant: Some(proto::create_buffer_for_peer::Variant::Chunk( proto::BufferChunk { buffer_id, @@ -6036,11 +6038,11 @@ impl Entity for Project { } impl Collaborator { - fn from_proto(message: proto::Collaborator) -> Self { - Self { - peer_id: PeerId(message.peer_id), + fn from_proto(message: proto::Collaborator) -> Result { + Ok(Self { + peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?, replica_id: message.replica_id as ReplicaId, - } + }) } } diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index cf58adfe0b..064bf6f50a 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -1,10 +1,15 @@ syntax = "proto3"; package zed.messages; +message PeerId { + uint32 epoch = 1; + uint32 id = 2; +} + message Envelope { uint32 id = 1; optional uint32 responding_to = 2; - optional uint32 original_sender_id = 3; + PeerId original_sender_id = 3; oneof payload { Hello hello = 4; Ack ack = 5; @@ -125,7 +130,7 @@ message Envelope { // Messages message Hello { - uint32 peer_id = 1; + PeerId peer_id = 1; } message Ping {} @@ -167,7 +172,7 @@ message Room { message Participant { uint64 user_id = 1; - uint32 peer_id = 2; + PeerId peer_id = 2; repeated ParticipantProject projects = 3; ParticipantLocation location = 4; } @@ -319,7 +324,7 @@ message AddProjectCollaborator { message RemoveProjectCollaborator { uint64 project_id = 1; - uint32 peer_id = 2; + PeerId peer_id = 2; } message GetDefinition { @@ -438,7 +443,7 @@ message OpenBufferResponse { message CreateBufferForPeer { uint64 project_id = 1; - uint32 peer_id = 2; + PeerId peer_id = 2; oneof variant { BufferState state = 3; BufferChunk chunk = 4; @@ -794,7 +799,7 @@ message UpdateDiagnostics { message Follow { uint64 project_id = 1; - uint32 leader_id = 2; + PeerId leader_id = 2; } message FollowResponse { @@ -804,7 +809,7 @@ message FollowResponse { message UpdateFollowers { uint64 project_id = 1; - repeated uint32 follower_ids = 2; + repeated PeerId follower_ids = 2; oneof variant { UpdateActiveView update_active_view = 3; View create_view = 4; @@ -814,7 +819,7 @@ message UpdateFollowers { message Unfollow { uint64 project_id = 1; - uint32 leader_id = 2; + PeerId leader_id = 2; } message GetPrivateUserInfo {} @@ -828,12 +833,12 @@ message GetPrivateUserInfoResponse { message UpdateActiveView { optional uint64 id = 1; - optional uint32 leader_id = 2; + optional PeerId leader_id = 2; } message UpdateView { uint64 id = 1; - optional uint32 leader_id = 2; + optional PeerId leader_id = 2; oneof variant { Editor editor = 3; @@ -849,7 +854,7 @@ message UpdateView { message View { uint64 id = 1; - optional uint32 leader_id = 2; + optional PeerId leader_id = 2; oneof variant { Editor editor = 3; @@ -865,7 +870,7 @@ message View { } message Collaborator { - uint32 peer_id = 1; + PeerId peer_id = 1; uint32 replica_id = 2; uint64 user_id = 3; } diff --git a/crates/rpc/src/macros.rs b/crates/rpc/src/macros.rs index 38d35893ee..ebed976968 100644 --- a/crates/rpc/src/macros.rs +++ b/crates/rpc/src/macros.rs @@ -6,7 +6,10 @@ macro_rules! messages { $(Some(envelope::Payload::$name(payload)) => { Some(Box::new(TypedEnvelope { sender_id, - original_sender_id: envelope.original_sender_id.map(PeerId), + original_sender_id: envelope.original_sender_id.map(|original_sender| PeerId { + epoch: original_sender.epoch, + id: original_sender.id + }), message_id: envelope.id, payload, })) @@ -24,7 +27,7 @@ macro_rules! messages { self, id: u32, responding_to: Option, - original_sender_id: Option, + original_sender_id: Option, ) -> Envelope { Envelope { id, diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index 66ba6a4029..43e4dfbde8 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -1,5 +1,5 @@ use super::{ - proto::{self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, RequestMessage}, + proto::{self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, PeerId, RequestMessage}, Connection, }; use anyhow::{anyhow, Context, Result}; @@ -11,9 +11,8 @@ use futures::{ }; use parking_lot::{Mutex, RwLock}; use serde::{ser::SerializeStruct, Serialize}; -use std::sync::atomic::Ordering::SeqCst; +use std::{fmt, sync::atomic::Ordering::SeqCst}; use std::{ - fmt, future::Future, marker::PhantomData, sync::{ @@ -25,20 +24,32 @@ use std::{ use tracing::instrument; #[derive(Clone, Copy, Default, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Serialize)] -pub struct ConnectionId(pub u32); +pub struct ConnectionId { + pub epoch: u32, + pub id: u32, +} -impl fmt::Display for ConnectionId { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) +impl Into for ConnectionId { + fn into(self) -> PeerId { + PeerId { + epoch: self.epoch, + id: self.id, + } } } -#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)] -pub struct PeerId(pub u32); +impl From for ConnectionId { + fn from(peer_id: PeerId) -> Self { + Self { + epoch: peer_id.epoch, + id: peer_id.id, + } + } +} -impl fmt::Display for PeerId { +impl fmt::Display for ConnectionId { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) + write!(f, "{}/{}", self.epoch, self.id) } } @@ -70,6 +81,7 @@ pub struct TypedEnvelope { impl TypedEnvelope { pub fn original_sender_id(&self) -> Result { self.original_sender_id + .clone() .ok_or_else(|| anyhow!("missing original_sender_id")) } } @@ -85,6 +97,7 @@ impl TypedEnvelope { } pub struct Peer { + epoch: u32, pub connections: RwLock>, next_connection_id: AtomicU32, } @@ -105,8 +118,9 @@ const WRITE_TIMEOUT: Duration = Duration::from_secs(2); pub const RECEIVE_TIMEOUT: Duration = Duration::from_secs(5); impl Peer { - pub fn new() -> Arc { + pub fn new(epoch: u32) -> Arc { Arc::new(Self { + epoch, connections: Default::default(), next_connection_id: Default::default(), }) @@ -138,7 +152,10 @@ impl Peer { let (mut incoming_tx, incoming_rx) = mpsc::channel(INCOMING_BUFFER_SIZE); let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded(); - let connection_id = ConnectionId(self.next_connection_id.fetch_add(1, SeqCst)); + let connection_id = ConnectionId { + epoch: self.epoch, + id: self.next_connection_id.fetch_add(1, SeqCst), + }; let connection_state = ConnectionState { outgoing_tx, next_message_id: Default::default(), @@ -150,12 +167,12 @@ impl Peer { let this = self.clone(); let response_channels = connection_state.response_channels.clone(); let handle_io = async move { - tracing::debug!(%connection_id, "handle io future: start"); + tracing::debug!(?connection_id, "handle io future: start"); let _end_connection = util::defer(|| { response_channels.lock().take(); this.connections.write().remove(&connection_id); - tracing::debug!(%connection_id, "handle io future: end"); + tracing::debug!(?connection_id, "handle io future: end"); }); // Send messages on this frequency so the connection isn't closed. @@ -167,68 +184,68 @@ impl Peer { futures::pin_mut!(receive_timeout); loop { - tracing::debug!(%connection_id, "outer loop iteration start"); + tracing::debug!(?connection_id, "outer loop iteration start"); let read_message = reader.read().fuse(); futures::pin_mut!(read_message); loop { - tracing::debug!(%connection_id, "inner loop iteration start"); + tracing::debug!(?connection_id, "inner loop iteration start"); futures::select_biased! { outgoing = outgoing_rx.next().fuse() => match outgoing { Some(outgoing) => { - tracing::debug!(%connection_id, "outgoing rpc message: writing"); + tracing::debug!(?connection_id, "outgoing rpc message: writing"); futures::select_biased! { result = writer.write(outgoing).fuse() => { - tracing::debug!(%connection_id, "outgoing rpc message: done writing"); + tracing::debug!(?connection_id, "outgoing rpc message: done writing"); result.context("failed to write RPC message")?; - tracing::debug!(%connection_id, "keepalive interval: resetting after sending message"); + tracing::debug!(?connection_id, "keepalive interval: resetting after sending message"); keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse()); } _ = create_timer(WRITE_TIMEOUT).fuse() => { - tracing::debug!(%connection_id, "outgoing rpc message: writing timed out"); + tracing::debug!(?connection_id, "outgoing rpc message: writing timed out"); Err(anyhow!("timed out writing message"))?; } } } None => { - tracing::debug!(%connection_id, "outgoing rpc message: channel closed"); + tracing::debug!(?connection_id, "outgoing rpc message: channel closed"); return Ok(()) }, }, _ = keepalive_timer => { - tracing::debug!(%connection_id, "keepalive interval: pinging"); + tracing::debug!(?connection_id, "keepalive interval: pinging"); futures::select_biased! { result = writer.write(proto::Message::Ping).fuse() => { - tracing::debug!(%connection_id, "keepalive interval: done pinging"); + tracing::debug!(?connection_id, "keepalive interval: done pinging"); result.context("failed to send keepalive")?; - tracing::debug!(%connection_id, "keepalive interval: resetting after pinging"); + tracing::debug!(?connection_id, "keepalive interval: resetting after pinging"); keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse()); } _ = create_timer(WRITE_TIMEOUT).fuse() => { - tracing::debug!(%connection_id, "keepalive interval: pinging timed out"); + tracing::debug!(?connection_id, "keepalive interval: pinging timed out"); Err(anyhow!("timed out sending keepalive"))?; } } } incoming = read_message => { let incoming = incoming.context("error reading rpc message from socket")?; - tracing::debug!(%connection_id, "incoming rpc message: received"); - tracing::debug!(%connection_id, "receive timeout: resetting"); + tracing::debug!(?connection_id, "incoming rpc message: received"); + tracing::debug!(?connection_id, "receive timeout: resetting"); receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse()); if let proto::Message::Envelope(incoming) = incoming { - tracing::debug!(%connection_id, "incoming rpc message: processing"); + tracing::debug!(?connection_id, "incoming rpc message: processing"); futures::select_biased! { result = incoming_tx.send(incoming).fuse() => match result { Ok(_) => { - tracing::debug!(%connection_id, "incoming rpc message: processed"); + tracing::debug!(?connection_id, "incoming rpc message: processed"); } Err(_) => { - tracing::debug!(%connection_id, "incoming rpc message: channel closed"); + tracing::debug!(?connection_id, "incoming rpc message: channel closed"); return Ok(()) } }, _ = create_timer(WRITE_TIMEOUT).fuse() => { - tracing::debug!(%connection_id, "incoming rpc message: processing timed out"); + tracing::debug!(?connection_id, "incoming rpc message: processing timed out"); Err(anyhow!("timed out processing incoming message"))? } } @@ -236,7 +253,7 @@ impl Peer { break; }, _ = receive_timeout => { - tracing::debug!(%connection_id, "receive timeout: delay between messages too long"); + tracing::debug!(?connection_id, "receive timeout: delay between messages too long"); Err(anyhow!("delay between messages too long"))? } } @@ -255,16 +272,12 @@ impl Peer { let message_id = incoming.id; tracing::debug!(?incoming, "incoming message future: start"); let _end = util::defer(move || { - tracing::debug!( - %connection_id, - message_id, - "incoming message future: end" - ); + tracing::debug!(?connection_id, message_id, "incoming message future: end"); }); if let Some(responding_to) = incoming.responding_to { tracing::debug!( - %connection_id, + ?connection_id, message_id, responding_to, "incoming response: received" @@ -274,7 +287,7 @@ impl Peer { let requester_resumed = oneshot::channel(); if let Err(error) = tx.send((incoming, requester_resumed.0)) { tracing::debug!( - %connection_id, + ?connection_id, message_id, responding_to = responding_to, ?error, @@ -283,21 +296,21 @@ impl Peer { } tracing::debug!( - %connection_id, + ?connection_id, message_id, responding_to, "incoming response: waiting to resume requester" ); let _ = requester_resumed.1.await; tracing::debug!( - %connection_id, + ?connection_id, message_id, responding_to, "incoming response: requester resumed" ); } else { tracing::warn!( - %connection_id, + ?connection_id, message_id, responding_to, "incoming response: unknown request" @@ -306,14 +319,10 @@ impl Peer { None } else { - tracing::debug!( - %connection_id, - message_id, - "incoming message: received" - ); + tracing::debug!(?connection_id, message_id, "incoming message: received"); proto::build_typed_envelope(connection_id, incoming).or_else(|| { tracing::error!( - %connection_id, + ?connection_id, message_id, "unable to construct a typed envelope" ); @@ -345,6 +354,7 @@ impl Peer { pub fn reset(&self) { self.connections.write().clear(); + self.next_connection_id.store(0, SeqCst); } pub fn request( @@ -384,7 +394,7 @@ impl Peer { .unbounded_send(proto::Message::Envelope(request.into_envelope( message_id, None, - original_sender_id.map(|id| id.0), + original_sender_id.map(Into::into), ))) .map_err(|_| anyhow!("connection was closed"))?; Ok(()) @@ -433,7 +443,7 @@ impl Peer { .unbounded_send(proto::Message::Envelope(message.into_envelope( message_id, None, - Some(sender_id.0), + Some(sender_id.into()), )))?; Ok(()) } @@ -480,7 +490,7 @@ impl Peer { let connections = self.connections.read(); let connection = connections .get(&connection_id) - .ok_or_else(|| anyhow!("no such connection: {}", connection_id))?; + .ok_or_else(|| anyhow!("no such connection: {:?}", connection_id))?; Ok(connection.clone()) } } @@ -515,9 +525,9 @@ mod tests { let executor = cx.foreground(); // create 2 clients connected to 1 server - let server = Peer::new(); - let client1 = Peer::new(); - let client2 = Peer::new(); + let server = Peer::new(0); + let client1 = Peer::new(0); + let client2 = Peer::new(0); let (client1_to_server_conn, server_to_client_1_conn, _kill) = Connection::in_memory(cx.background()); @@ -609,8 +619,8 @@ mod tests { #[gpui::test(iterations = 50)] async fn test_order_of_response_and_incoming(cx: &mut TestAppContext) { let executor = cx.foreground(); - let server = Peer::new(); - let client = Peer::new(); + let server = Peer::new(0); + let client = Peer::new(0); let (client_to_server_conn, server_to_client_conn, _kill) = Connection::in_memory(cx.background()); @@ -707,8 +717,8 @@ mod tests { #[gpui::test(iterations = 50)] async fn test_dropping_request_before_completion(cx: &mut TestAppContext) { let executor = cx.foreground(); - let server = Peer::new(); - let client = Peer::new(); + let server = Peer::new(0); + let client = Peer::new(0); let (client_to_server_conn, server_to_client_conn, _kill) = Connection::in_memory(cx.background()); @@ -822,7 +832,7 @@ mod tests { let (client_conn, mut server_conn, _kill) = Connection::in_memory(cx.background()); - let client = Peer::new(); + let client = Peer::new(0); let (connection_id, io_handler, mut incoming) = client.add_test_connection(client_conn, cx.background()); @@ -857,7 +867,7 @@ mod tests { let executor = cx.foreground(); let (client_conn, mut server_conn, _kill) = Connection::in_memory(cx.background()); - let client = Peer::new(); + let client = Peer::new(0); let (connection_id, io_handler, mut incoming) = client.add_test_connection(client_conn, cx.background()); executor.spawn(io_handler).detach(); diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index 6d9bc9a0aa..77d82ec308 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -1,14 +1,16 @@ -use super::{entity_messages, messages, request_messages, ConnectionId, PeerId, TypedEnvelope}; +use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope}; use anyhow::{anyhow, Result}; use async_tungstenite::tungstenite::Message as WebSocketMessage; use futures::{SinkExt as _, StreamExt as _}; use prost::Message as _; use serde::Serialize; use std::any::{Any, TypeId}; -use std::{cmp, iter, mem}; +use std::fmt; +use std::str::FromStr; use std::{ + cmp, fmt::Debug, - io, + io, iter, mem, time::{Duration, SystemTime, UNIX_EPOCH}, }; @@ -21,7 +23,7 @@ pub trait EnvelopedMessage: Clone + Debug + Serialize + Sized + Send + Sync + 's self, id: u32, responding_to: Option, - original_sender_id: Option, + original_sender_id: Option, ) -> Envelope; fn from_envelope(envelope: Envelope) -> Option; } @@ -70,7 +72,67 @@ impl AnyTypedEnvelope for TypedEnvelope { } fn original_sender_id(&self) -> Option { - self.original_sender_id + self.original_sender_id.clone() + } +} + +impl PeerId { + pub fn from_u64(peer_id: u64) -> Self { + let epoch = (peer_id >> 32) as u32; + let id = peer_id as u32; + Self { epoch, id } + } + + pub fn as_u64(self) -> u64 { + ((self.epoch as u64) << 32) | (self.id as u64) + } +} + +impl Copy for PeerId {} + +impl Eq for PeerId {} + +impl Ord for PeerId { + fn cmp(&self, other: &Self) -> cmp::Ordering { + self.epoch + .cmp(&other.epoch) + .then_with(|| self.id.cmp(&other.id)) + } +} + +impl PartialOrd for PeerId { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl std::hash::Hash for PeerId { + fn hash(&self, state: &mut H) { + self.epoch.hash(state); + self.id.hash(state); + } +} + +impl fmt::Display for PeerId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}/{}", self.epoch, self.id) + } +} + +impl FromStr for PeerId { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + let mut components = s.split('/'); + let epoch = components + .next() + .ok_or_else(|| anyhow!("invalid peer id {:?}", s))? + .parse()?; + let id = components + .next() + .ok_or_else(|| anyhow!("invalid peer id {:?}", s))? + .parse()?; + Ok(PeerId { epoch, id }) } } @@ -477,4 +539,25 @@ mod tests { stream.read().await.unwrap(); assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN); } + + #[gpui::test] + fn test_converting_peer_id_from_and_to_u64() { + let peer_id = PeerId { epoch: 10, id: 3 }; + assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id); + let peer_id = PeerId { + epoch: u32::MAX, + id: 3, + }; + assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id); + let peer_id = PeerId { + epoch: 10, + id: u32::MAX, + }; + assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id); + let peer_id = PeerId { + epoch: u32::MAX, + id: u32::MAX, + }; + assert_eq!(PeerId::from_u64(peer_id.as_u64()), peer_id); + } } diff --git a/crates/rpc/src/rpc.rs b/crates/rpc/src/rpc.rs index 1d4a4496d0..d06f1c1c5c 100644 --- a/crates/rpc/src/rpc.rs +++ b/crates/rpc/src/rpc.rs @@ -6,4 +6,4 @@ pub use conn::Connection; pub use peer::*; mod macros; -pub const PROTOCOL_VERSION: u32 = 41; +pub const PROTOCOL_VERSION: u32 = 42; diff --git a/crates/workspace/src/item.rs b/crates/workspace/src/item.rs index 14f847fd54..bd00272c94 100644 --- a/crates/workspace/src/item.rs +++ b/crates/workspace/src/item.rs @@ -280,7 +280,7 @@ impl ItemHandle for ViewHandle { proto::update_followers::Variant::CreateView(proto::View { id: followed_item.id() as u64, variant: Some(message), - leader_id: workspace.leader_for_pane(&pane).map(|id| id.0), + leader_id: workspace.leader_for_pane(&pane), }), cx, ); @@ -334,7 +334,7 @@ impl ItemHandle for ViewHandle { proto::UpdateView { id: item.id() as u64, variant: pending_update.borrow_mut().take(), - leader_id: leader_id.map(|id| id.0), + leader_id, }, ), cx, diff --git a/crates/workspace/src/shared_screen.rs b/crates/workspace/src/shared_screen.rs index 7dee642423..d292ece3d5 100644 --- a/crates/workspace/src/shared_screen.rs +++ b/crates/workspace/src/shared_screen.rs @@ -3,7 +3,7 @@ use crate::{ }; use anyhow::{anyhow, Result}; use call::participant::{Frame, RemoteVideoTrack}; -use client::{PeerId, User}; +use client::{proto::PeerId, User}; use futures::StreamExt; use gpui::{ elements::*, diff --git a/crates/workspace/src/workspace.rs b/crates/workspace/src/workspace.rs index 387a18006a..46e790b892 100644 --- a/crates/workspace/src/workspace.rs +++ b/crates/workspace/src/workspace.rs @@ -25,7 +25,10 @@ use std::{ use anyhow::{anyhow, Context, Result}; use call::ActiveCall; -use client::{proto, Client, PeerId, TypedEnvelope, UserStore}; +use client::{ + proto::{self, PeerId}, + Client, TypedEnvelope, UserStore, +}; use collections::{hash_map, HashMap, HashSet}; use dock::{Dock, DockDefaultItemFactory, ToggleDockButton}; use drag_and_drop::DragAndDrop; @@ -1441,7 +1444,7 @@ impl Workspace { self.update_followers( proto::update_followers::Variant::UpdateActiveView(proto::UpdateActiveView { id: self.active_item(cx).map(|item| item.id() as u64), - leader_id: self.leader_for_pane(&pane).map(|id| id.0), + leader_id: self.leader_for_pane(&pane), }), cx, ); @@ -1620,7 +1623,7 @@ impl Workspace { let project_id = self.project.read(cx).remote_id()?; let request = self.client.request(proto::Follow { project_id, - leader_id: leader_id.0, + leader_id: Some(leader_id), }); Some(cx.spawn_weak(|this, mut cx| async move { let response = request.await?; @@ -1692,7 +1695,7 @@ impl Workspace { self.client .send(proto::Unfollow { project_id, - leader_id: leader_id.0, + leader_id: Some(leader_id), }) .log_err(); } @@ -1888,7 +1891,7 @@ impl Workspace { .panes() .iter() .flat_map(|pane| { - let leader_id = this.leader_for_pane(pane).map(|id| id.0); + let leader_id = this.leader_for_pane(pane); pane.read(cx).items().filter_map({ let cx = &cx; move |item| { @@ -1997,7 +2000,7 @@ impl Workspace { .get(&leader_id) .map(|c| c.replica_id) }) - .ok_or_else(|| anyhow!("no such collaborator {}", leader_id))?; + .ok_or_else(|| anyhow!("no such collaborator {:?}", leader_id))?; let item_builders = cx.update(|cx| { cx.default_global::() @@ -2077,7 +2080,7 @@ impl Workspace { self.client .send(proto::UpdateFollowers { project_id, - follower_ids: self.leader_state.followers.iter().map(|f| f.0).collect(), + follower_ids: self.leader_state.followers.iter().copied().collect(), variant: Some(update), }) .log_err();