Introduce a TestClient and associate it with a PeerId

This makes it easier to integration test peer interactions because now we know their PeerIds.
This commit is contained in:
Nathan Sobo 2021-11-27 12:33:25 -07:00
parent b307a7e91d
commit 4bd43e67ef
3 changed files with 156 additions and 96 deletions

View File

@ -63,6 +63,7 @@ pub enum Event {
Closed,
}
#[derive(Debug)]
pub struct Collaborator {
pub user: Arc<User>,
pub peer_id: PeerId,

View File

@ -8,6 +8,7 @@ use postage::{
prelude::{Sink as _, Stream as _},
};
use smol_timeout::TimeoutExt as _;
use std::sync::atomic::Ordering::SeqCst;
use std::{
collections::HashMap,
fmt,
@ -81,12 +82,12 @@ impl<T: RequestMessage> TypedEnvelope<T> {
}
pub struct Peer {
connections: RwLock<HashMap<ConnectionId, ConnectionState>>,
pub connections: RwLock<HashMap<ConnectionId, ConnectionState>>,
next_connection_id: AtomicU32,
}
#[derive(Clone)]
struct ConnectionState {
pub struct ConnectionState {
outgoing_tx: mpsc::Sender<proto::Envelope>,
next_message_id: Arc<AtomicU32>,
response_channels: Arc<Mutex<Option<HashMap<u32, mpsc::Sender<proto::Envelope>>>>>,
@ -110,10 +111,7 @@ impl Peer {
impl Future<Output = anyhow::Result<()>> + Send,
mpsc::Receiver<Box<dyn AnyTypedEnvelope>>,
) {
let connection_id = ConnectionId(
self.next_connection_id
.fetch_add(1, atomic::Ordering::SeqCst),
);
let connection_id = ConnectionId(self.next_connection_id.fetch_add(1, SeqCst));
let (mut incoming_tx, incoming_rx) = mpsc::channel(64);
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(64);
let connection_state = ConnectionState {
@ -219,9 +217,7 @@ impl Peer {
let (tx, mut rx) = mpsc::channel(1);
async move {
let mut connection = this.connection_state(receiver_id).await?;
let message_id = connection
.next_message_id
.fetch_add(1, atomic::Ordering::SeqCst);
let message_id = connection.next_message_id.fetch_add(1, SeqCst);
connection
.response_channels
.lock()

View File

@ -112,11 +112,17 @@ impl Server {
connection: Connection,
addr: String,
user_id: UserId,
mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
) -> impl Future<Output = ()> {
let mut this = self.clone();
async move {
let (connection_id, handle_io, mut incoming_rx) =
this.peer.add_connection(connection).await;
if let Some(send_connection_id) = send_connection_id.as_mut() {
let _ = send_connection_id.send(connection_id).await;
}
this.state_mut().add_connection(connection_id, user_id);
if let Err(err) = this.update_contacts_for_users(&[user_id]).await {
log::error!("error updating contacts for {:?}: {}", user_id, err);
@ -887,6 +893,7 @@ pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
),
addr,
user_id,
None,
)
.await;
}
@ -925,9 +932,11 @@ mod tests {
use gpui::{ModelHandle, TestAppContext};
use parking_lot::Mutex;
use postage::{mpsc, watch};
use rpc::PeerId;
use serde_json::json;
use sqlx::types::time::OffsetDateTime;
use std::{
ops::Deref,
path::Path,
sync::{
atomic::{AtomicBool, Ordering::SeqCst},
@ -960,8 +969,8 @@ mod tests {
// Connect to a server as 2 clients.
let mut server = TestServer::start().await;
let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;
let client_a = server.create_client(&mut cx_a, "user_a").await;
let client_b = server.create_client(&mut cx_b, "user_b").await;
cx_a.foreground().forbid_parking();
@ -978,7 +987,7 @@ mod tests {
.await;
let worktree_a = Worktree::open_local(
client_a.clone(),
user_store_a,
client_a.user_store.clone(),
"/a".as_ref(),
fs,
lang_registry.clone(),
@ -999,17 +1008,31 @@ mod tests {
client_b.clone(),
worktree_id,
lang_registry.clone(),
user_store_b,
client_b.user_store.clone(),
&mut cx_b.to_async(),
)
.await
.unwrap();
let replica_id_b = worktree_b.read_with(&cx_b, |tree, _| tree.replica_id());
let replica_id_b = worktree_b.read_with(&cx_b, |tree, _| {
assert_eq!(
tree.collaborators()
.get(&client_a.peer_id)
.unwrap()
.user
.github_login,
"user_a"
);
tree.replica_id()
});
worktree_a
.condition(&cx_a, |tree, _| {
tree.collaborators()
.values()
.any(|collaborator| collaborator.replica_id == replica_id_b)
.get(&client_b.peer_id)
.map_or(false, |collaborator| {
collaborator.replica_id == replica_id_b
&& collaborator.user.github_login == "user_b"
})
})
.await;
@ -1068,12 +1091,12 @@ mod tests {
// Connect to a server as 2 clients.
let mut server = TestServer::start().await;
let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;
Arc::get_mut(&mut app_state_a).unwrap().client = client_a;
Arc::get_mut(&mut app_state_a).unwrap().user_store = user_store_a;
Arc::get_mut(&mut app_state_b).unwrap().client = client_b;
Arc::get_mut(&mut app_state_b).unwrap().user_store = user_store_b;
let client_a = server.create_client(&mut cx_a, "user_a").await;
let client_b = server.create_client(&mut cx_b, "user_b").await;
Arc::get_mut(&mut app_state_a).unwrap().client = client_a.clone();
Arc::get_mut(&mut app_state_a).unwrap().user_store = client_a.user_store.clone();
Arc::get_mut(&mut app_state_b).unwrap().client = client_b.clone();
Arc::get_mut(&mut app_state_b).unwrap().user_store = client_b.user_store.clone();
cx_a.foreground().forbid_parking();
@ -1165,9 +1188,9 @@ mod tests {
// Connect to a server as 3 clients.
let mut server = TestServer::start().await;
let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;
let (client_c, user_store_c) = server.create_client(&mut cx_c, "user_c").await;
let client_a = server.create_client(&mut cx_a, "user_a").await;
let client_b = server.create_client(&mut cx_b, "user_b").await;
let client_c = server.create_client(&mut cx_c, "user_c").await;
let fs = Arc::new(FakeFs::new());
@ -1184,7 +1207,7 @@ mod tests {
let worktree_a = Worktree::open_local(
client_a.clone(),
user_store_a,
client_a.user_store.clone(),
"/a".as_ref(),
fs.clone(),
lang_registry.clone(),
@ -1205,7 +1228,7 @@ mod tests {
client_b.clone(),
worktree_id,
lang_registry.clone(),
user_store_b,
client_b.user_store.clone(),
&mut cx_b.to_async(),
)
.await
@ -1214,7 +1237,7 @@ mod tests {
client_c.clone(),
worktree_id,
lang_registry.clone(),
user_store_c,
client_c.user_store.clone(),
&mut cx_c.to_async(),
)
.await
@ -1307,8 +1330,8 @@ mod tests {
// Connect to a server as 2 clients.
let mut server = TestServer::start().await;
let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;
let client_a = server.create_client(&mut cx_a, "user_a").await;
let client_b = server.create_client(&mut cx_b, "user_b").await;
// Share a local worktree as client A
let fs = Arc::new(FakeFs::new());
@ -1323,7 +1346,7 @@ mod tests {
let worktree_a = Worktree::open_local(
client_a.clone(),
user_store_a,
client_a.user_store.clone(),
"/dir".as_ref(),
fs,
lang_registry.clone(),
@ -1344,7 +1367,7 @@ mod tests {
client_b.clone(),
worktree_id,
lang_registry.clone(),
user_store_b,
client_b.user_store.clone(),
&mut cx_b.to_async(),
)
.await
@ -1394,8 +1417,8 @@ mod tests {
// Connect to a server as 2 clients.
let mut server = TestServer::start().await;
let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;
let client_a = server.create_client(&mut cx_a, "user_a").await;
let client_b = server.create_client(&mut cx_b, "user_b").await;
// Share a local worktree as client A
let fs = Arc::new(FakeFs::new());
@ -1409,7 +1432,7 @@ mod tests {
.await;
let worktree_a = Worktree::open_local(
client_a.clone(),
user_store_a,
client_a.user_store.clone(),
"/dir".as_ref(),
fs,
lang_registry.clone(),
@ -1430,7 +1453,7 @@ mod tests {
client_b.clone(),
worktree_id,
lang_registry.clone(),
user_store_b,
client_b.user_store.clone(),
&mut cx_b.to_async(),
)
.await
@ -1462,8 +1485,8 @@ mod tests {
// Connect to a server as 2 clients.
let mut server = TestServer::start().await;
let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;
let client_a = server.create_client(&mut cx_a, "user_a").await;
let client_b = server.create_client(&mut cx_b, "user_b").await;
// Share a local worktree as client A
let fs = Arc::new(FakeFs::new());
@ -1477,7 +1500,7 @@ mod tests {
.await;
let worktree_a = Worktree::open_local(
client_a.clone(),
user_store_a,
client_a.user_store.clone(),
"/dir".as_ref(),
fs,
lang_registry.clone(),
@ -1498,7 +1521,7 @@ mod tests {
client_b.clone(),
worktree_id,
lang_registry.clone(),
user_store_b,
client_b.user_store.clone(),
&mut cx_b.to_async(),
)
.await
@ -1524,8 +1547,8 @@ mod tests {
// Connect to a server as 2 clients.
let mut server = TestServer::start().await;
let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;
let client_a = server.create_client(&mut cx_a, "user_a").await;
let client_b = server.create_client(&mut cx_b, "user_b").await;
// Share a local worktree as client A
let fs = Arc::new(FakeFs::new());
@ -1540,7 +1563,7 @@ mod tests {
.await;
let worktree_a = Worktree::open_local(
client_a.clone(),
user_store_a,
client_a.user_store.clone(),
"/a".as_ref(),
fs,
lang_registry.clone(),
@ -1561,7 +1584,7 @@ mod tests {
client_b.clone(),
worktree_id,
lang_registry.clone(),
user_store_b,
client_b.user_store.clone(),
&mut cx_b.to_async(),
)
.await
@ -1600,8 +1623,8 @@ mod tests {
// Connect to a server as 2 clients.
let mut server = TestServer::start().await;
let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;
let client_a = server.create_client(&mut cx_a, "user_a").await;
let client_b = server.create_client(&mut cx_b, "user_b").await;
// Share a local worktree as client A
let fs = Arc::new(FakeFs::new());
@ -1616,7 +1639,7 @@ mod tests {
.await;
let worktree_a = Worktree::open_local(
client_a.clone(),
user_store_a,
client_a.user_store.clone(),
"/a".as_ref(),
fs,
lang_registry.clone(),
@ -1671,7 +1694,7 @@ mod tests {
client_b.clone(),
worktree_id,
lang_registry.clone(),
user_store_b,
client_b.user_store.clone(),
&mut cx_b.to_async(),
)
.await
@ -1719,30 +1742,30 @@ mod tests {
// Connect to a server as 2 clients.
let mut server = TestServer::start().await;
let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;
let client_a = server.create_client(&mut cx_a, "user_a").await;
let client_b = server.create_client(&mut cx_b, "user_b").await;
// Create an org that includes these 2 users.
let db = &server.app_state.db;
let org_id = db.create_org("Test Org", "test-org").await.unwrap();
db.add_org_member(org_id, current_user_id(&user_store_a, &cx_a), false)
db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
.await
.unwrap();
db.add_org_member(org_id, current_user_id(&user_store_b, &cx_b), false)
db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
.await
.unwrap();
// Create a channel that includes all the users.
let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
db.add_channel_member(channel_id, current_user_id(&user_store_a, &cx_a), false)
db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
.await
.unwrap();
db.add_channel_member(channel_id, current_user_id(&user_store_b, &cx_b), false)
db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
.await
.unwrap();
db.create_channel_message(
channel_id,
current_user_id(&user_store_b, &cx_b),
client_b.current_user_id(&cx_b),
"hello A, it's B.",
OffsetDateTime::now_utc(),
1,
@ -1750,7 +1773,8 @@ mod tests {
.await
.unwrap();
let channels_a = cx_a.add_model(|cx| ChannelList::new(user_store_a, client_a, cx));
let channels_a = cx_a
.add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
channels_a
.condition(&mut cx_a, |list, _| list.available_channels().is_some())
.await;
@ -1774,7 +1798,8 @@ mod tests {
})
.await;
let channels_b = cx_b.add_model(|cx| ChannelList::new(user_store_b, client_b, cx));
let channels_b = cx_b
.add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
channels_b
.condition(&mut cx_b, |list, _| list.available_channels().is_some())
.await;
@ -1856,19 +1881,20 @@ mod tests {
cx_a.foreground().forbid_parking();
let mut server = TestServer::start().await;
let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
let client_a = server.create_client(&mut cx_a, "user_a").await;
let db = &server.app_state.db;
let org_id = db.create_org("Test Org", "test-org").await.unwrap();
let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
db.add_org_member(org_id, current_user_id(&user_store_a, &cx_a), false)
db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
.await
.unwrap();
db.add_channel_member(channel_id, current_user_id(&user_store_a, &cx_a), false)
db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
.await
.unwrap();
let channels_a = cx_a.add_model(|cx| ChannelList::new(user_store_a, client_a, cx));
let channels_a = cx_a
.add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
channels_a
.condition(&mut cx_a, |list, _| list.available_channels().is_some())
.await;
@ -1916,31 +1942,31 @@ mod tests {
// Connect to a server as 2 clients.
let mut server = TestServer::start().await;
let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;
let client_a = server.create_client(&mut cx_a, "user_a").await;
let client_b = server.create_client(&mut cx_b, "user_b").await;
let mut status_b = client_b.status();
// Create an org that includes these 2 users.
let db = &server.app_state.db;
let org_id = db.create_org("Test Org", "test-org").await.unwrap();
db.add_org_member(org_id, current_user_id(&user_store_a, &cx_a), false)
db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
.await
.unwrap();
db.add_org_member(org_id, current_user_id(&user_store_b, &cx_b), false)
db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
.await
.unwrap();
// Create a channel that includes all the users.
let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
db.add_channel_member(channel_id, current_user_id(&user_store_a, &cx_a), false)
db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
.await
.unwrap();
db.add_channel_member(channel_id, current_user_id(&user_store_b, &cx_b), false)
db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
.await
.unwrap();
db.create_channel_message(
channel_id,
current_user_id(&user_store_b, &cx_b),
client_b.current_user_id(&cx_b),
"hello A, it's B.",
OffsetDateTime::now_utc(),
2,
@ -1948,7 +1974,8 @@ mod tests {
.await
.unwrap();
let channels_a = cx_a.add_model(|cx| ChannelList::new(user_store_a, client_a, cx));
let channels_a = cx_a
.add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
channels_a
.condition(&mut cx_a, |list, _| list.available_channels().is_some())
.await;
@ -1973,7 +2000,8 @@ mod tests {
})
.await;
let channels_b = cx_b.add_model(|cx| ChannelList::new(user_store_b.clone(), client_b, cx));
let channels_b = cx_b
.add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
channels_b
.condition(&mut cx_b, |list, _| list.available_channels().is_some())
.await;
@ -2000,7 +2028,7 @@ mod tests {
// Disconnect client B, ensuring we can still access its cached channel data.
server.forbid_connections();
server.disconnect_client(current_user_id(&user_store_b, &cx_b));
server.disconnect_client(client_b.current_user_id(&cx_b));
while !matches!(
status_b.recv().await,
Some(client::Status::ReconnectionError { .. })
@ -2131,9 +2159,9 @@ mod tests {
// Connect to a server as 3 clients.
let mut server = TestServer::start().await;
let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await;
let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await;
let (_client_c, user_store_c) = server.create_client(&mut cx_c, "user_c").await;
let client_a = server.create_client(&mut cx_a, "user_a").await;
let client_b = server.create_client(&mut cx_b, "user_b").await;
let client_c = server.create_client(&mut cx_c, "user_c").await;
let fs = Arc::new(FakeFs::new());
@ -2148,7 +2176,7 @@ mod tests {
let worktree_a = Worktree::open_local(
client_a.clone(),
user_store_a.clone(),
client_a.user_store.clone(),
"/a".as_ref(),
fs.clone(),
lang_registry.clone(),
@ -2157,17 +2185,20 @@ mod tests {
.await
.unwrap();
user_store_a
client_a
.user_store
.condition(&cx_a, |user_store, _| {
contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
})
.await;
user_store_b
client_b
.user_store
.condition(&cx_b, |user_store, _| {
contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
})
.await;
user_store_c
client_c
.user_store
.condition(&cx_c, |user_store, _| {
contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
})
@ -2182,36 +2213,42 @@ mod tests {
client_b.clone(),
worktree_id,
lang_registry.clone(),
user_store_b.clone(),
client_b.user_store.clone(),
&mut cx_b.to_async(),
)
.await
.unwrap();
user_store_a
client_a
.user_store
.condition(&cx_a, |user_store, _| {
contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
})
.await;
user_store_b
client_b
.user_store
.condition(&cx_b, |user_store, _| {
contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
})
.await;
user_store_c
client_c
.user_store
.condition(&cx_c, |user_store, _| {
contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
})
.await;
cx_a.update(move |_| drop(worktree_a));
user_store_a
client_a
.user_store
.condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
.await;
user_store_b
client_b
.user_store
.condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
.await;
user_store_c
client_c
.user_store
.condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
.await;
@ -2264,17 +2301,15 @@ mod tests {
}
}
async fn create_client(
&mut self,
cx: &mut TestAppContext,
name: &str,
) -> (Arc<Client>, ModelHandle<UserStore>) {
async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
let user_id = self.app_state.db.create_user(name, false).await.unwrap();
let client_name = name.to_string();
let mut client = Client::new();
let server = self.server.clone();
let connection_killers = self.connection_killers.clone();
let forbid_connections = self.forbid_connections.clone();
let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
Arc::get_mut(&mut client)
.unwrap()
.override_authenticate(move |cx| {
@ -2294,6 +2329,7 @@ mod tests {
let connection_killers = connection_killers.clone();
let forbid_connections = forbid_connections.clone();
let client_name = client_name.clone();
let connection_id_tx = connection_id_tx.clone();
cx.spawn(move |cx| async move {
if forbid_connections.load(SeqCst) {
Err(EstablishConnectionError::other(anyhow!(
@ -2303,7 +2339,12 @@ mod tests {
let (client_conn, server_conn, kill_conn) = Connection::in_memory();
connection_killers.lock().insert(user_id, kill_conn);
cx.background()
.spawn(server.handle_connection(server_conn, client_name, user_id))
.spawn(server.handle_connection(
server_conn,
client_name,
user_id,
Some(connection_id_tx),
))
.detach();
Ok(client_conn)
}
@ -2316,12 +2357,17 @@ mod tests {
.await
.unwrap();
let peer_id = PeerId(connection_id_rx.recv().await.unwrap().0);
let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
let mut authed_user =
user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
while authed_user.recv().await.unwrap().is_none() {}
(client, user_store)
TestClient {
client,
peer_id,
user_store,
}
}
fn disconnect_client(&self, user_id: UserId) {
@ -2377,10 +2423,27 @@ mod tests {
}
}
fn current_user_id(user_store: &ModelHandle<UserStore>, cx: &TestAppContext) -> UserId {
UserId::from_proto(
user_store.read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
)
struct TestClient {
client: Arc<Client>,
pub peer_id: PeerId,
pub user_store: ModelHandle<UserStore>,
}
impl Deref for TestClient {
type Target = Arc<Client>;
fn deref(&self) -> &Self::Target {
&self.client
}
}
impl TestClient {
pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
UserId::from_proto(
self.user_store
.read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
)
}
}
fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {