diff --git a/crates/client/src/test.rs b/crates/client/src/test.rs index b91a943002..a40d1ee3a7 100644 --- a/crates/client/src/test.rs +++ b/crates/client/src/test.rs @@ -2,7 +2,7 @@ use super::Client; use super::*; use crate::http::{HttpClient, Request, Response, ServerResponse}; use futures::{future::BoxFuture, Future}; -use gpui::TestAppContext; +use gpui::{ModelHandle, TestAppContext}; use parking_lot::Mutex; use postage::{mpsc, prelude::Stream}; use rpc::{proto, ConnectionId, Peer, Receipt, TypedEnvelope}; @@ -155,6 +155,24 @@ impl FakeServer { fn connection_id(&self) -> ConnectionId { self.connection_id.lock().expect("not connected") } + + pub async fn build_user_store( + &self, + client: Arc, + cx: &mut TestAppContext, + ) -> ModelHandle { + let http_client = FakeHttpClient::with_404_response(); + let user_store = cx.add_model(|cx| UserStore::new(client, http_client, cx)); + assert_eq!( + self.receive::() + .await + .unwrap() + .payload + .user_ids, + &[self.user_id] + ); + user_store + } } pub struct FakeHttpClient { diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index f21abf06de..0fec2d4209 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -69,6 +69,26 @@ pub struct Collaborator { pub replica_id: ReplicaId, } +impl Collaborator { + fn from_proto( + message: proto::Collaborator, + user_store: &ModelHandle, + cx: &mut AsyncAppContext, + ) -> impl Future> { + let user = user_store.update(cx, |user_store, cx| { + user_store.fetch_user(message.user_id, cx) + }); + + async move { + Ok(Self { + peer_id: PeerId(message.peer_id), + user: user.await?, + replica_id: message.replica_id as ReplicaId, + }) + } + } +} + impl Entity for Worktree { type Event = Event; @@ -174,7 +194,6 @@ impl Worktree { let remote_id = worktree.id; let replica_id = join_response.replica_id as ReplicaId; - let peers = join_response.peers; let root_char_bag: CharBag = worktree .root_name .chars() @@ -209,24 +228,18 @@ impl Worktree { }) .await; - let user_ids = peers.iter().map(|peer| peer.user_id).collect(); + let user_ids = join_response + .collaborators + .iter() + .map(|peer| peer.user_id) + .collect(); user_store .update(cx, |user_store, cx| user_store.load_users(user_ids, cx)) .await?; - let mut collaborators = HashMap::with_capacity(peers.len()); - for peer in &peers { - let peer_id = PeerId(peer.peer_id); - let user = user_store - .update(cx, |user_store, cx| user_store.fetch_user(peer.user_id, cx)) - .await?; - collaborators.insert( - peer_id, - Collaborator { - peer_id, - user, - replica_id: peer.replica_id as ReplicaId, - }, - ); + let mut collaborators = HashMap::with_capacity(join_response.collaborators.len()); + for message in join_response.collaborators { + let collaborator = Collaborator::from_proto(message, &user_store, cx).await?; + collaborators.insert(collaborator.peer_id, collaborator); } let worktree = cx.update(|cx| { @@ -274,8 +287,8 @@ impl Worktree { } let _subscriptions = vec![ - client.subscribe_to_entity(remote_id, cx, Self::handle_add_peer), - client.subscribe_to_entity(remote_id, cx, Self::handle_remove_peer), + client.subscribe_to_entity(remote_id, cx, Self::handle_add_collaborator), + client.subscribe_to_entity(remote_id, cx, Self::handle_remove_collaborator), client.subscribe_to_entity(remote_id, cx, Self::handle_update), client.subscribe_to_entity(remote_id, cx, Self::handle_update_buffer), client.subscribe_to_entity(remote_id, cx, Self::handle_buffer_saved), @@ -347,27 +360,52 @@ impl Worktree { } } - pub fn handle_add_peer( - &mut self, - envelope: TypedEnvelope, - _: Arc, - cx: &mut ModelContext, - ) -> Result<()> { + pub fn user_store(&self) -> &ModelHandle { match self { - Worktree::Local(worktree) => worktree.add_peer(envelope, cx), - Worktree::Remote(worktree) => worktree.add_peer(envelope, cx), + Worktree::Local(worktree) => &worktree.user_store, + Worktree::Remote(worktree) => &worktree.user_store, } } - pub fn handle_remove_peer( + pub fn handle_add_collaborator( &mut self, - envelope: TypedEnvelope, + mut envelope: TypedEnvelope, + _: Arc, + cx: &mut ModelContext, + ) -> Result<()> { + let user_store = self.user_store().clone(); + let collaborator = envelope + .payload + .collaborator + .take() + .ok_or_else(|| anyhow!("empty collaborator"))?; + + cx.spawn(|this, mut cx| { + async move { + let collaborator = + Collaborator::from_proto(collaborator, &user_store, &mut cx).await?; + this.update(&mut cx, |this, cx| match this { + Worktree::Local(worktree) => worktree.add_collaborator(collaborator, cx), + Worktree::Remote(worktree) => worktree.add_collaborator(collaborator, cx), + }); + Ok(()) + } + .log_err() + }) + .detach(); + + Ok(()) + } + + pub fn handle_remove_collaborator( + &mut self, + envelope: TypedEnvelope, _: Arc, cx: &mut ModelContext, ) -> Result<()> { match self { - Worktree::Local(worktree) => worktree.remove_peer(envelope, cx), - Worktree::Remote(worktree) => worktree.remove_peer(envelope, cx), + Worktree::Local(worktree) => worktree.remove_collaborator(envelope, cx), + Worktree::Remote(worktree) => worktree.remove_collaborator(envelope, cx), } } @@ -1107,33 +1145,19 @@ impl LocalWorktree { Ok(()) } - pub fn add_peer( + pub fn add_collaborator( &mut self, - envelope: TypedEnvelope, + collaborator: Collaborator, cx: &mut ModelContext, - ) -> Result<()> { - let peer = envelope - .payload - .peer - .as_ref() - .ok_or_else(|| anyhow!("empty peer"))?; - let peer_id = PeerId(peer.peer_id); - self.collaborators.insert( - peer_id, - Collaborator { - peer_id, - user: todo!(), - replica_id: peer.replica_id as ReplicaId, - }, - ); + ) { + self.collaborators + .insert(collaborator.peer_id, collaborator); cx.notify(); - - Ok(()) } - pub fn remove_peer( + pub fn remove_collaborator( &mut self, - envelope: TypedEnvelope, + envelope: TypedEnvelope, cx: &mut ModelContext, ) -> Result<()> { let peer_id = PeerId(envelope.payload.peer_id); @@ -1316,8 +1340,8 @@ impl LocalWorktree { this.update(&mut cx, |worktree, cx| { let _subscriptions = vec![ - rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_add_peer), - rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_remove_peer), + rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_add_collaborator), + rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_remove_collaborator), rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_open_buffer), rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_close_buffer), rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_update_buffer), @@ -1532,32 +1556,19 @@ impl RemoteWorktree { Ok(()) } - pub fn add_peer( + pub fn add_collaborator( &mut self, - envelope: TypedEnvelope, + collaborator: Collaborator, cx: &mut ModelContext, - ) -> Result<()> { - let peer = envelope - .payload - .peer - .as_ref() - .ok_or_else(|| anyhow!("empty peer"))?; - let peer_id = PeerId(peer.peer_id); - self.collaborators.insert( - peer_id, - Collaborator { - peer_id, - user: todo!(), - replica_id: peer.replica_id as ReplicaId, - }, - ); + ) { + self.collaborators + .insert(collaborator.peer_id, collaborator); cx.notify(); - Ok(()) } - pub fn remove_peer( + pub fn remove_collaborator( &mut self, - envelope: TypedEnvelope, + envelope: TypedEnvelope, cx: &mut ModelContext, ) -> Result<()> { let peer_id = PeerId(envelope.payload.peer_id); @@ -3146,9 +3157,8 @@ mod tests { let user_id = 5; let mut client = Client::new(); - let http_client = FakeHttpClient::with_404_response(); - let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx)); let server = FakeServer::for_client(user_id, &mut client, &cx).await; + let user_store = server.build_user_store(client.clone(), &mut cx).await; let tree = Worktree::open_local( client, user_store.clone(), @@ -3203,7 +3213,7 @@ mod tests { proto::JoinWorktreeResponse { worktree: share_request.await.unwrap().worktree, replica_id: 1, - peers: Vec::new(), + collaborators: Vec::new(), }, Client::new(), user_store, @@ -3355,8 +3365,7 @@ mod tests { let user_id = 100; let mut client = Client::new(); let server = FakeServer::for_client(user_id, &mut client, &cx).await; - let http_client = FakeHttpClient::with_404_response(); - let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx)); + let user_store = server.build_user_store(client.clone(), &mut cx).await; let fs = Arc::new(FakeFs::new()); fs.insert_tree( @@ -3383,11 +3392,6 @@ mod tests { .await .unwrap(); - { - let cx = cx.to_async(); - client.authenticate_and_connect(&cx).await.unwrap(); - } - let open_worktree = server.receive::().await.unwrap(); assert_eq!( open_worktree.payload, diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 5b899ae916..775f94d595 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -21,8 +21,8 @@ message Envelope { UpdateBuffer update_buffer = 16; SaveBuffer save_buffer = 17; BufferSaved buffer_saved = 18; - AddPeer add_peer = 19; - RemovePeer remove_peer = 20; + AddCollaborator add_collaborator = 19; + RemoveCollaborator remove_collaborator = 20; GetChannels get_channels = 21; GetChannelsResponse get_channels_response = 22; GetUsers get_users = 23; @@ -83,7 +83,7 @@ message LeaveWorktree { message JoinWorktreeResponse { Worktree worktree = 2; uint32 replica_id = 3; - repeated Peer peers = 4; + repeated Collaborator collaborators = 4; } message UpdateWorktree { @@ -96,12 +96,12 @@ message CloseWorktree { uint64 worktree_id = 1; } -message AddPeer { +message AddCollaborator { uint64 worktree_id = 1; - Peer peer = 2; + Collaborator collaborator = 2; } -message RemovePeer { +message RemoveCollaborator { uint64 worktree_id = 1; uint32 peer_id = 2; } @@ -196,7 +196,7 @@ message UpdateContacts { // Entities -message Peer { +message Collaborator { uint32 peer_id = 1; uint32 replica_id = 2; uint64 user_id = 3; diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index 8299952c79..bfdce85b77 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -121,7 +121,7 @@ macro_rules! entity_messages { messages!( Ack, - AddPeer, + AddCollaborator, BufferSaved, ChannelMessageSent, CloseBuffer, @@ -145,7 +145,7 @@ messages!( OpenWorktree, OpenWorktreeResponse, Ping, - RemovePeer, + RemoveCollaborator, SaveBuffer, SendChannelMessage, SendChannelMessageResponse, @@ -174,13 +174,13 @@ request_messages!( entity_messages!( worktree_id, - AddPeer, + AddCollaborator, BufferSaved, CloseBuffer, CloseWorktree, OpenBuffer, JoinWorktree, - RemovePeer, + RemoveCollaborator, SaveBuffer, UnshareWorktree, UpdateBuffer, diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 9f4c724770..a4c21b4ea4 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -187,7 +187,7 @@ impl Server { broadcast(connection_id, peer_ids, |conn_id| { self.peer.send( conn_id, - proto::RemovePeer { + proto::RemoveCollaborator { worktree_id, peer_id: connection_id.0, }, @@ -341,15 +341,15 @@ impl Server { .and_then(|joined| { let share = joined.worktree.share()?; let peer_count = share.guests.len(); - let mut peers = Vec::with_capacity(peer_count); - peers.push(proto::Peer { + let mut collaborators = Vec::with_capacity(peer_count); + collaborators.push(proto::Collaborator { peer_id: joined.worktree.host_connection_id.0, replica_id: 0, user_id: joined.worktree.host_user_id.to_proto(), }); for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests { if *peer_conn_id != request.sender_id { - peers.push(proto::Peer { + collaborators.push(proto::Collaborator { peer_id: peer_conn_id.0, replica_id: *peer_replica_id as u32, user_id: peer_user_id.to_proto(), @@ -363,7 +363,7 @@ impl Server { entries: share.entries.values().cloned().collect(), }), replica_id: joined.replica_id as u32, - peers, + collaborators, }; let connection_ids = joined.worktree.connection_ids(); let contact_user_ids = joined.worktree.authorized_user_ids.clone(); @@ -375,9 +375,9 @@ impl Server { broadcast(request.sender_id, connection_ids, |conn_id| { self.peer.send( conn_id, - proto::AddPeer { + proto::AddCollaborator { worktree_id, - peer: Some(proto::Peer { + collaborator: Some(proto::Collaborator { peer_id: request.sender_id.0, replica_id: response.replica_id, user_id: user_id.to_proto(), @@ -415,7 +415,7 @@ impl Server { broadcast(sender_id, worktree.connection_ids, |conn_id| { self.peer.send( conn_id, - proto::RemovePeer { + proto::RemoveCollaborator { worktree_id, peer_id: sender_id.0, }, @@ -960,7 +960,7 @@ mod tests { // Connect to a server as 2 clients. let mut server = TestServer::start().await; - let (client_a, _) = server.create_client(&mut cx_a, "user_a").await; + let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await; cx_a.foreground().forbid_parking(); @@ -978,6 +978,7 @@ mod tests { .await; let worktree_a = Worktree::open_local( client_a.clone(), + user_store_a, "/a".as_ref(), fs, lang_registry.clone(), @@ -1052,7 +1053,7 @@ mod tests { .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx)) .await; - // Dropping the worktree removes client B from client A's peers. + // Dropping the worktree removes client B from client A's collaborators. cx_b.update(move |_| drop(worktree_b)); worktree_a .condition(&cx_a, |tree, _| tree.collaborators().is_empty()) @@ -1089,6 +1090,7 @@ mod tests { .await; let worktree_a = Worktree::open_local( app_state_a.client.clone(), + app_state_a.user_store.clone(), "/a".as_ref(), fs, app_state_a.languages.clone(), @@ -1163,7 +1165,7 @@ mod tests { // Connect to a server as 3 clients. let mut server = TestServer::start().await; - let (client_a, _) = server.create_client(&mut cx_a, "user_a").await; + let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await; let (client_c, user_store_c) = server.create_client(&mut cx_c, "user_c").await; @@ -1182,6 +1184,7 @@ mod tests { let worktree_a = Worktree::open_local( client_a.clone(), + user_store_a, "/a".as_ref(), fs.clone(), lang_registry.clone(), @@ -1304,7 +1307,7 @@ mod tests { // Connect to a server as 2 clients. let mut server = TestServer::start().await; - let (client_a, _) = server.create_client(&mut cx_a, "user_a").await; + let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await; // Share a local worktree as client A @@ -1320,6 +1323,7 @@ mod tests { let worktree_a = Worktree::open_local( client_a.clone(), + user_store_a, "/dir".as_ref(), fs, lang_registry.clone(), @@ -1390,7 +1394,7 @@ mod tests { // Connect to a server as 2 clients. let mut server = TestServer::start().await; - let (client_a, _) = server.create_client(&mut cx_a, "user_a").await; + let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await; // Share a local worktree as client A @@ -1405,6 +1409,7 @@ mod tests { .await; let worktree_a = Worktree::open_local( client_a.clone(), + user_store_a, "/dir".as_ref(), fs, lang_registry.clone(), @@ -1457,7 +1462,7 @@ mod tests { // Connect to a server as 2 clients. let mut server = TestServer::start().await; - let (client_a, _) = server.create_client(&mut cx_a, "user_a").await; + let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await; // Share a local worktree as client A @@ -1472,6 +1477,7 @@ mod tests { .await; let worktree_a = Worktree::open_local( client_a.clone(), + user_store_a, "/dir".as_ref(), fs, lang_registry.clone(), @@ -1512,14 +1518,14 @@ mod tests { } #[gpui::test] - async fn test_peer_disconnection(mut cx_a: TestAppContext, cx_b: TestAppContext) { + async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) { cx_a.foreground().forbid_parking(); let lang_registry = Arc::new(LanguageRegistry::new()); // Connect to a server as 2 clients. let mut server = TestServer::start().await; - let (client_a, _) = server.create_client(&mut cx_a, "user_a").await; - let (client_b, user_store_b) = server.create_client(&mut cx_a, "user_b").await; + let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; + let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await; // Share a local worktree as client A let fs = Arc::new(FakeFs::new()); @@ -1534,6 +1540,7 @@ mod tests { .await; let worktree_a = Worktree::open_local( client_a.clone(), + user_store_a, "/a".as_ref(), fs, lang_registry.clone(), @@ -1593,8 +1600,8 @@ mod tests { // Connect to a server as 2 clients. let mut server = TestServer::start().await; - let (client_a, _) = server.create_client(&mut cx_a, "user_a").await; - let (client_b, user_store_b) = server.create_client(&mut cx_a, "user_b").await; + let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; + let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await; // Share a local worktree as client A let fs = Arc::new(FakeFs::new()); @@ -1609,6 +1616,7 @@ mod tests { .await; let worktree_a = Worktree::open_local( client_a.clone(), + user_store_a, "/a".as_ref(), fs, lang_registry.clone(), @@ -2140,6 +2148,7 @@ mod tests { let worktree_a = Worktree::open_local( client_a.clone(), + user_store_a.clone(), "/a".as_ref(), fs.clone(), lang_registry.clone(),