From 4bd43e67ef8da054b605d7ad93ba8af65ad0b958 Mon Sep 17 00:00:00 2001 From: Nathan Sobo Date: Sat, 27 Nov 2021 12:33:25 -0700 Subject: [PATCH] Introduce a TestClient and associate it with a PeerId This makes it easier to integration test peer interactions because now we know their PeerIds. --- crates/project/src/worktree.rs | 1 + crates/rpc/src/peer.rs | 14 +- crates/server/src/rpc.rs | 237 +++++++++++++++++++++------------ 3 files changed, 156 insertions(+), 96 deletions(-) diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 0fec2d4209..50f5b2d044 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -63,6 +63,7 @@ pub enum Event { Closed, } +#[derive(Debug)] pub struct Collaborator { pub user: Arc, pub peer_id: PeerId, diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index 7752fc231a..454881fece 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -8,6 +8,7 @@ use postage::{ prelude::{Sink as _, Stream as _}, }; use smol_timeout::TimeoutExt as _; +use std::sync::atomic::Ordering::SeqCst; use std::{ collections::HashMap, fmt, @@ -81,12 +82,12 @@ impl TypedEnvelope { } pub struct Peer { - connections: RwLock>, + pub connections: RwLock>, next_connection_id: AtomicU32, } #[derive(Clone)] -struct ConnectionState { +pub struct ConnectionState { outgoing_tx: mpsc::Sender, next_message_id: Arc, response_channels: Arc>>>>, @@ -110,10 +111,7 @@ impl Peer { impl Future> + Send, mpsc::Receiver>, ) { - let connection_id = ConnectionId( - self.next_connection_id - .fetch_add(1, atomic::Ordering::SeqCst), - ); + let connection_id = ConnectionId(self.next_connection_id.fetch_add(1, SeqCst)); let (mut incoming_tx, incoming_rx) = mpsc::channel(64); let (outgoing_tx, mut outgoing_rx) = mpsc::channel(64); let connection_state = ConnectionState { @@ -219,9 +217,7 @@ impl Peer { let (tx, mut rx) = mpsc::channel(1); async move { let mut connection = this.connection_state(receiver_id).await?; - let message_id = connection - .next_message_id - .fetch_add(1, atomic::Ordering::SeqCst); + let message_id = connection.next_message_id.fetch_add(1, SeqCst); connection .response_channels .lock() diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index a4c21b4ea4..0e317028c7 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -112,11 +112,17 @@ impl Server { connection: Connection, addr: String, user_id: UserId, + mut send_connection_id: Option>, ) -> impl Future { let mut this = self.clone(); async move { let (connection_id, handle_io, mut incoming_rx) = this.peer.add_connection(connection).await; + + if let Some(send_connection_id) = send_connection_id.as_mut() { + let _ = send_connection_id.send(connection_id).await; + } + this.state_mut().add_connection(connection_id, user_id); if let Err(err) = this.update_contacts_for_users(&[user_id]).await { log::error!("error updating contacts for {:?}: {}", user_id, err); @@ -887,6 +893,7 @@ pub fn add_routes(app: &mut tide::Server>, rpc: &Arc) { ), addr, user_id, + None, ) .await; } @@ -925,9 +932,11 @@ mod tests { use gpui::{ModelHandle, TestAppContext}; use parking_lot::Mutex; use postage::{mpsc, watch}; + use rpc::PeerId; use serde_json::json; use sqlx::types::time::OffsetDateTime; use std::{ + ops::Deref, path::Path, sync::{ atomic::{AtomicBool, Ordering::SeqCst}, @@ -960,8 +969,8 @@ mod tests { // Connect to a server as 2 clients. let mut server = TestServer::start().await; - let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; - let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await; + let client_a = server.create_client(&mut cx_a, "user_a").await; + let client_b = server.create_client(&mut cx_b, "user_b").await; cx_a.foreground().forbid_parking(); @@ -978,7 +987,7 @@ mod tests { .await; let worktree_a = Worktree::open_local( client_a.clone(), - user_store_a, + client_a.user_store.clone(), "/a".as_ref(), fs, lang_registry.clone(), @@ -999,17 +1008,31 @@ mod tests { client_b.clone(), worktree_id, lang_registry.clone(), - user_store_b, + client_b.user_store.clone(), &mut cx_b.to_async(), ) .await .unwrap(); - let replica_id_b = worktree_b.read_with(&cx_b, |tree, _| tree.replica_id()); + + let replica_id_b = worktree_b.read_with(&cx_b, |tree, _| { + assert_eq!( + tree.collaborators() + .get(&client_a.peer_id) + .unwrap() + .user + .github_login, + "user_a" + ); + tree.replica_id() + }); worktree_a .condition(&cx_a, |tree, _| { tree.collaborators() - .values() - .any(|collaborator| collaborator.replica_id == replica_id_b) + .get(&client_b.peer_id) + .map_or(false, |collaborator| { + collaborator.replica_id == replica_id_b + && collaborator.user.github_login == "user_b" + }) }) .await; @@ -1068,12 +1091,12 @@ mod tests { // Connect to a server as 2 clients. let mut server = TestServer::start().await; - let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; - let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await; - Arc::get_mut(&mut app_state_a).unwrap().client = client_a; - Arc::get_mut(&mut app_state_a).unwrap().user_store = user_store_a; - Arc::get_mut(&mut app_state_b).unwrap().client = client_b; - Arc::get_mut(&mut app_state_b).unwrap().user_store = user_store_b; + let client_a = server.create_client(&mut cx_a, "user_a").await; + let client_b = server.create_client(&mut cx_b, "user_b").await; + Arc::get_mut(&mut app_state_a).unwrap().client = client_a.clone(); + Arc::get_mut(&mut app_state_a).unwrap().user_store = client_a.user_store.clone(); + Arc::get_mut(&mut app_state_b).unwrap().client = client_b.clone(); + Arc::get_mut(&mut app_state_b).unwrap().user_store = client_b.user_store.clone(); cx_a.foreground().forbid_parking(); @@ -1165,9 +1188,9 @@ mod tests { // Connect to a server as 3 clients. let mut server = TestServer::start().await; - let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; - let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await; - let (client_c, user_store_c) = server.create_client(&mut cx_c, "user_c").await; + let client_a = server.create_client(&mut cx_a, "user_a").await; + let client_b = server.create_client(&mut cx_b, "user_b").await; + let client_c = server.create_client(&mut cx_c, "user_c").await; let fs = Arc::new(FakeFs::new()); @@ -1184,7 +1207,7 @@ mod tests { let worktree_a = Worktree::open_local( client_a.clone(), - user_store_a, + client_a.user_store.clone(), "/a".as_ref(), fs.clone(), lang_registry.clone(), @@ -1205,7 +1228,7 @@ mod tests { client_b.clone(), worktree_id, lang_registry.clone(), - user_store_b, + client_b.user_store.clone(), &mut cx_b.to_async(), ) .await @@ -1214,7 +1237,7 @@ mod tests { client_c.clone(), worktree_id, lang_registry.clone(), - user_store_c, + client_c.user_store.clone(), &mut cx_c.to_async(), ) .await @@ -1307,8 +1330,8 @@ mod tests { // Connect to a server as 2 clients. let mut server = TestServer::start().await; - let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; - let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await; + let client_a = server.create_client(&mut cx_a, "user_a").await; + let client_b = server.create_client(&mut cx_b, "user_b").await; // Share a local worktree as client A let fs = Arc::new(FakeFs::new()); @@ -1323,7 +1346,7 @@ mod tests { let worktree_a = Worktree::open_local( client_a.clone(), - user_store_a, + client_a.user_store.clone(), "/dir".as_ref(), fs, lang_registry.clone(), @@ -1344,7 +1367,7 @@ mod tests { client_b.clone(), worktree_id, lang_registry.clone(), - user_store_b, + client_b.user_store.clone(), &mut cx_b.to_async(), ) .await @@ -1394,8 +1417,8 @@ mod tests { // Connect to a server as 2 clients. let mut server = TestServer::start().await; - let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; - let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await; + let client_a = server.create_client(&mut cx_a, "user_a").await; + let client_b = server.create_client(&mut cx_b, "user_b").await; // Share a local worktree as client A let fs = Arc::new(FakeFs::new()); @@ -1409,7 +1432,7 @@ mod tests { .await; let worktree_a = Worktree::open_local( client_a.clone(), - user_store_a, + client_a.user_store.clone(), "/dir".as_ref(), fs, lang_registry.clone(), @@ -1430,7 +1453,7 @@ mod tests { client_b.clone(), worktree_id, lang_registry.clone(), - user_store_b, + client_b.user_store.clone(), &mut cx_b.to_async(), ) .await @@ -1462,8 +1485,8 @@ mod tests { // Connect to a server as 2 clients. let mut server = TestServer::start().await; - let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; - let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await; + let client_a = server.create_client(&mut cx_a, "user_a").await; + let client_b = server.create_client(&mut cx_b, "user_b").await; // Share a local worktree as client A let fs = Arc::new(FakeFs::new()); @@ -1477,7 +1500,7 @@ mod tests { .await; let worktree_a = Worktree::open_local( client_a.clone(), - user_store_a, + client_a.user_store.clone(), "/dir".as_ref(), fs, lang_registry.clone(), @@ -1498,7 +1521,7 @@ mod tests { client_b.clone(), worktree_id, lang_registry.clone(), - user_store_b, + client_b.user_store.clone(), &mut cx_b.to_async(), ) .await @@ -1524,8 +1547,8 @@ mod tests { // Connect to a server as 2 clients. let mut server = TestServer::start().await; - let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; - let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await; + let client_a = server.create_client(&mut cx_a, "user_a").await; + let client_b = server.create_client(&mut cx_b, "user_b").await; // Share a local worktree as client A let fs = Arc::new(FakeFs::new()); @@ -1540,7 +1563,7 @@ mod tests { .await; let worktree_a = Worktree::open_local( client_a.clone(), - user_store_a, + client_a.user_store.clone(), "/a".as_ref(), fs, lang_registry.clone(), @@ -1561,7 +1584,7 @@ mod tests { client_b.clone(), worktree_id, lang_registry.clone(), - user_store_b, + client_b.user_store.clone(), &mut cx_b.to_async(), ) .await @@ -1600,8 +1623,8 @@ mod tests { // Connect to a server as 2 clients. let mut server = TestServer::start().await; - let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; - let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await; + let client_a = server.create_client(&mut cx_a, "user_a").await; + let client_b = server.create_client(&mut cx_b, "user_b").await; // Share a local worktree as client A let fs = Arc::new(FakeFs::new()); @@ -1616,7 +1639,7 @@ mod tests { .await; let worktree_a = Worktree::open_local( client_a.clone(), - user_store_a, + client_a.user_store.clone(), "/a".as_ref(), fs, lang_registry.clone(), @@ -1671,7 +1694,7 @@ mod tests { client_b.clone(), worktree_id, lang_registry.clone(), - user_store_b, + client_b.user_store.clone(), &mut cx_b.to_async(), ) .await @@ -1719,30 +1742,30 @@ mod tests { // Connect to a server as 2 clients. let mut server = TestServer::start().await; - let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; - let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await; + let client_a = server.create_client(&mut cx_a, "user_a").await; + let client_b = server.create_client(&mut cx_b, "user_b").await; // Create an org that includes these 2 users. let db = &server.app_state.db; let org_id = db.create_org("Test Org", "test-org").await.unwrap(); - db.add_org_member(org_id, current_user_id(&user_store_a, &cx_a), false) + db.add_org_member(org_id, client_a.current_user_id(&cx_a), false) .await .unwrap(); - db.add_org_member(org_id, current_user_id(&user_store_b, &cx_b), false) + db.add_org_member(org_id, client_b.current_user_id(&cx_b), false) .await .unwrap(); // Create a channel that includes all the users. let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap(); - db.add_channel_member(channel_id, current_user_id(&user_store_a, &cx_a), false) + db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false) .await .unwrap(); - db.add_channel_member(channel_id, current_user_id(&user_store_b, &cx_b), false) + db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false) .await .unwrap(); db.create_channel_message( channel_id, - current_user_id(&user_store_b, &cx_b), + client_b.current_user_id(&cx_b), "hello A, it's B.", OffsetDateTime::now_utc(), 1, @@ -1750,7 +1773,8 @@ mod tests { .await .unwrap(); - let channels_a = cx_a.add_model(|cx| ChannelList::new(user_store_a, client_a, cx)); + let channels_a = cx_a + .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx)); channels_a .condition(&mut cx_a, |list, _| list.available_channels().is_some()) .await; @@ -1774,7 +1798,8 @@ mod tests { }) .await; - let channels_b = cx_b.add_model(|cx| ChannelList::new(user_store_b, client_b, cx)); + let channels_b = cx_b + .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx)); channels_b .condition(&mut cx_b, |list, _| list.available_channels().is_some()) .await; @@ -1856,19 +1881,20 @@ mod tests { cx_a.foreground().forbid_parking(); let mut server = TestServer::start().await; - let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; + let client_a = server.create_client(&mut cx_a, "user_a").await; let db = &server.app_state.db; let org_id = db.create_org("Test Org", "test-org").await.unwrap(); let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap(); - db.add_org_member(org_id, current_user_id(&user_store_a, &cx_a), false) + db.add_org_member(org_id, client_a.current_user_id(&cx_a), false) .await .unwrap(); - db.add_channel_member(channel_id, current_user_id(&user_store_a, &cx_a), false) + db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false) .await .unwrap(); - let channels_a = cx_a.add_model(|cx| ChannelList::new(user_store_a, client_a, cx)); + let channels_a = cx_a + .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx)); channels_a .condition(&mut cx_a, |list, _| list.available_channels().is_some()) .await; @@ -1916,31 +1942,31 @@ mod tests { // Connect to a server as 2 clients. let mut server = TestServer::start().await; - let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; - let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await; + let client_a = server.create_client(&mut cx_a, "user_a").await; + let client_b = server.create_client(&mut cx_b, "user_b").await; let mut status_b = client_b.status(); // Create an org that includes these 2 users. let db = &server.app_state.db; let org_id = db.create_org("Test Org", "test-org").await.unwrap(); - db.add_org_member(org_id, current_user_id(&user_store_a, &cx_a), false) + db.add_org_member(org_id, client_a.current_user_id(&cx_a), false) .await .unwrap(); - db.add_org_member(org_id, current_user_id(&user_store_b, &cx_b), false) + db.add_org_member(org_id, client_b.current_user_id(&cx_b), false) .await .unwrap(); // Create a channel that includes all the users. let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap(); - db.add_channel_member(channel_id, current_user_id(&user_store_a, &cx_a), false) + db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false) .await .unwrap(); - db.add_channel_member(channel_id, current_user_id(&user_store_b, &cx_b), false) + db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false) .await .unwrap(); db.create_channel_message( channel_id, - current_user_id(&user_store_b, &cx_b), + client_b.current_user_id(&cx_b), "hello A, it's B.", OffsetDateTime::now_utc(), 2, @@ -1948,7 +1974,8 @@ mod tests { .await .unwrap(); - let channels_a = cx_a.add_model(|cx| ChannelList::new(user_store_a, client_a, cx)); + let channels_a = cx_a + .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx)); channels_a .condition(&mut cx_a, |list, _| list.available_channels().is_some()) .await; @@ -1973,7 +2000,8 @@ mod tests { }) .await; - let channels_b = cx_b.add_model(|cx| ChannelList::new(user_store_b.clone(), client_b, cx)); + let channels_b = cx_b + .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx)); channels_b .condition(&mut cx_b, |list, _| list.available_channels().is_some()) .await; @@ -2000,7 +2028,7 @@ mod tests { // Disconnect client B, ensuring we can still access its cached channel data. server.forbid_connections(); - server.disconnect_client(current_user_id(&user_store_b, &cx_b)); + server.disconnect_client(client_b.current_user_id(&cx_b)); while !matches!( status_b.recv().await, Some(client::Status::ReconnectionError { .. }) @@ -2131,9 +2159,9 @@ mod tests { // Connect to a server as 3 clients. let mut server = TestServer::start().await; - let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; - let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await; - let (_client_c, user_store_c) = server.create_client(&mut cx_c, "user_c").await; + let client_a = server.create_client(&mut cx_a, "user_a").await; + let client_b = server.create_client(&mut cx_b, "user_b").await; + let client_c = server.create_client(&mut cx_c, "user_c").await; let fs = Arc::new(FakeFs::new()); @@ -2148,7 +2176,7 @@ mod tests { let worktree_a = Worktree::open_local( client_a.clone(), - user_store_a.clone(), + client_a.user_store.clone(), "/a".as_ref(), fs.clone(), lang_registry.clone(), @@ -2157,17 +2185,20 @@ mod tests { .await .unwrap(); - user_store_a + client_a + .user_store .condition(&cx_a, |user_store, _| { contacts(user_store) == vec![("user_a", vec![("a", vec![])])] }) .await; - user_store_b + client_b + .user_store .condition(&cx_b, |user_store, _| { contacts(user_store) == vec![("user_a", vec![("a", vec![])])] }) .await; - user_store_c + client_c + .user_store .condition(&cx_c, |user_store, _| { contacts(user_store) == vec![("user_a", vec![("a", vec![])])] }) @@ -2182,36 +2213,42 @@ mod tests { client_b.clone(), worktree_id, lang_registry.clone(), - user_store_b.clone(), + client_b.user_store.clone(), &mut cx_b.to_async(), ) .await .unwrap(); - user_store_a + client_a + .user_store .condition(&cx_a, |user_store, _| { contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])] }) .await; - user_store_b + client_b + .user_store .condition(&cx_b, |user_store, _| { contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])] }) .await; - user_store_c + client_c + .user_store .condition(&cx_c, |user_store, _| { contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])] }) .await; cx_a.update(move |_| drop(worktree_a)); - user_store_a + client_a + .user_store .condition(&cx_a, |user_store, _| contacts(user_store) == vec![]) .await; - user_store_b + client_b + .user_store .condition(&cx_b, |user_store, _| contacts(user_store) == vec![]) .await; - user_store_c + client_c + .user_store .condition(&cx_c, |user_store, _| contacts(user_store) == vec![]) .await; @@ -2264,17 +2301,15 @@ mod tests { } } - async fn create_client( - &mut self, - cx: &mut TestAppContext, - name: &str, - ) -> (Arc, ModelHandle) { + async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient { let user_id = self.app_state.db.create_user(name, false).await.unwrap(); let client_name = name.to_string(); let mut client = Client::new(); let server = self.server.clone(); let connection_killers = self.connection_killers.clone(); let forbid_connections = self.forbid_connections.clone(); + let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16); + Arc::get_mut(&mut client) .unwrap() .override_authenticate(move |cx| { @@ -2294,6 +2329,7 @@ mod tests { let connection_killers = connection_killers.clone(); let forbid_connections = forbid_connections.clone(); let client_name = client_name.clone(); + let connection_id_tx = connection_id_tx.clone(); cx.spawn(move |cx| async move { if forbid_connections.load(SeqCst) { Err(EstablishConnectionError::other(anyhow!( @@ -2303,7 +2339,12 @@ mod tests { let (client_conn, server_conn, kill_conn) = Connection::in_memory(); connection_killers.lock().insert(user_id, kill_conn); cx.background() - .spawn(server.handle_connection(server_conn, client_name, user_id)) + .spawn(server.handle_connection( + server_conn, + client_name, + user_id, + Some(connection_id_tx), + )) .detach(); Ok(client_conn) } @@ -2316,12 +2357,17 @@ mod tests { .await .unwrap(); + let peer_id = PeerId(connection_id_rx.recv().await.unwrap().0); let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx)); let mut authed_user = user_store.read_with(cx, |user_store, _| user_store.watch_current_user()); while authed_user.recv().await.unwrap().is_none() {} - (client, user_store) + TestClient { + client, + peer_id, + user_store, + } } fn disconnect_client(&self, user_id: UserId) { @@ -2377,10 +2423,27 @@ mod tests { } } - fn current_user_id(user_store: &ModelHandle, cx: &TestAppContext) -> UserId { - UserId::from_proto( - user_store.read_with(cx, |user_store, _| user_store.current_user().unwrap().id), - ) + struct TestClient { + client: Arc, + pub peer_id: PeerId, + pub user_store: ModelHandle, + } + + impl Deref for TestClient { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + &self.client + } + } + + impl TestClient { + pub fn current_user_id(&self, cx: &TestAppContext) -> UserId { + UserId::from_proto( + self.user_store + .read_with(cx, |user_store, _| user_store.current_user().unwrap().id), + ) + } } fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {