From 30225678c0e431360ba8c4342adf7c4c0be72dc0 Mon Sep 17 00:00:00 2001 From: Nathan Sobo Date: Wed, 12 Jan 2022 11:19:17 -0700 Subject: [PATCH] Test ordering of responses with respect to uni-directional messages Co-Authored-By: Max Brunsfeld Co-Authored-By: Antonio Scandurra --- Cargo.lock | 1 + crates/rpc/Cargo.toml | 1 + crates/rpc/src/peer.rs | 444 +++++++++++++++++++++++++---------------- 3 files changed, 279 insertions(+), 167 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dda7116de2..8c3174d68d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3837,6 +3837,7 @@ dependencies = [ "async-tungstenite", "base64 0.13.0", "futures", + "gpui", "log", "parking_lot", "postage", diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index f16d7f39c2..4be612eec7 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -30,5 +30,6 @@ zstd = "0.9" prost-build = "0.8" [dev-dependencies] +gpui = { path = "../gpui", features = ["test-support"] } smol = "1.2.5" tempdir = "0.3.7" diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index 30d754e97d..ce96801733 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -342,201 +342,311 @@ mod tests { use super::*; use crate::TypedEnvelope; use async_tungstenite::tungstenite::Message as WebSocketMessage; + use gpui::TestAppContext; - #[test] - fn test_request_response() { - smol::block_on(async move { - // create 2 clients connected to 1 server - let server = Peer::new(); - let client1 = Peer::new(); - let client2 = Peer::new(); + #[gpui::test(iterations = 10)] + async fn test_request_response(cx: TestAppContext) { + let executor = cx.foreground(); - let (client1_to_server_conn, server_to_client_1_conn, _) = Connection::in_memory(); - let (client1_conn_id, io_task1, client1_incoming) = - client1.add_connection(client1_to_server_conn).await; - let (_, io_task2, server_incoming1) = - server.add_connection(server_to_client_1_conn).await; + // create 2 clients connected to 1 server + let server = Peer::new(); + let client1 = Peer::new(); + let client2 = Peer::new(); - let (client2_to_server_conn, server_to_client_2_conn, _) = Connection::in_memory(); - let (client2_conn_id, io_task3, client2_incoming) = - client2.add_connection(client2_to_server_conn).await; - let (_, io_task4, server_incoming2) = - server.add_connection(server_to_client_2_conn).await; + let (client1_to_server_conn, server_to_client_1_conn, _) = Connection::in_memory(); + let (client1_conn_id, io_task1, client1_incoming) = + client1.add_connection(client1_to_server_conn).await; + let (_, io_task2, server_incoming1) = server.add_connection(server_to_client_1_conn).await; - smol::spawn(io_task1).detach(); - smol::spawn(io_task2).detach(); - smol::spawn(io_task3).detach(); - smol::spawn(io_task4).detach(); - smol::spawn(handle_messages(server_incoming1, server.clone())).detach(); - smol::spawn(handle_messages(client1_incoming, client1.clone())).detach(); - smol::spawn(handle_messages(server_incoming2, server.clone())).detach(); - smol::spawn(handle_messages(client2_incoming, client2.clone())).detach(); + let (client2_to_server_conn, server_to_client_2_conn, _) = Connection::in_memory(); + let (client2_conn_id, io_task3, client2_incoming) = + client2.add_connection(client2_to_server_conn).await; + let (_, io_task4, server_incoming2) = server.add_connection(server_to_client_2_conn).await; - assert_eq!( - client1 - .request(client1_conn_id, proto::Ping {},) - .await - .unwrap(), - proto::Ack {} - ); + executor.spawn(io_task1).detach(); + executor.spawn(io_task2).detach(); + executor.spawn(io_task3).detach(); + executor.spawn(io_task4).detach(); + executor + .spawn(handle_messages(server_incoming1, server.clone())) + .detach(); + executor + .spawn(handle_messages(client1_incoming, client1.clone())) + .detach(); + executor + .spawn(handle_messages(server_incoming2, server.clone())) + .detach(); + executor + .spawn(handle_messages(client2_incoming, client2.clone())) + .detach(); - assert_eq!( - client2 - .request(client2_conn_id, proto::Ping {},) - .await - .unwrap(), - proto::Ack {} - ); + assert_eq!( + client1 + .request(client1_conn_id, proto::Ping {},) + .await + .unwrap(), + proto::Ack {} + ); - assert_eq!( - client1 - .request( - client1_conn_id, - proto::OpenBuffer { - project_id: 0, - worktree_id: 1, - path: "path/one".to_string(), - }, - ) - .await - .unwrap(), - proto::OpenBufferResponse { - buffer: Some(proto::Buffer { - id: 101, - visible_text: "path/one content".to_string(), - ..Default::default() - }), - } - ); + assert_eq!( + client2 + .request(client2_conn_id, proto::Ping {},) + .await + .unwrap(), + proto::Ack {} + ); - assert_eq!( - client2 - .request( - client2_conn_id, - proto::OpenBuffer { - project_id: 0, - worktree_id: 2, - path: "path/two".to_string(), - }, - ) - .await - .unwrap(), - proto::OpenBufferResponse { - buffer: Some(proto::Buffer { - id: 102, - visible_text: "path/two content".to_string(), - ..Default::default() - }), - } - ); - - client1.disconnect(client1_conn_id); - client2.disconnect(client1_conn_id); - - async fn handle_messages( - mut messages: BoxStream<'static, Box>, - peer: Arc, - ) -> Result<()> { - while let Some(envelope) = messages.next().await { - let envelope = envelope.into_any(); - if let Some(envelope) = envelope.downcast_ref::>() { - let receipt = envelope.receipt(); - peer.respond(receipt, proto::Ack {}).await? - } else if let Some(envelope) = - envelope.downcast_ref::>() - { - let message = &envelope.payload; - let receipt = envelope.receipt(); - let response = match message.path.as_str() { - "path/one" => { - assert_eq!(message.worktree_id, 1); - proto::OpenBufferResponse { - buffer: Some(proto::Buffer { - id: 101, - visible_text: "path/one content".to_string(), - ..Default::default() - }), - } - } - "path/two" => { - assert_eq!(message.worktree_id, 2); - proto::OpenBufferResponse { - buffer: Some(proto::Buffer { - id: 102, - visible_text: "path/two content".to_string(), - ..Default::default() - }), - } - } - _ => { - panic!("unexpected path {}", message.path); - } - }; - - peer.respond(receipt, response).await? - } else { - panic!("unknown message type"); - } - } - - Ok(()) + assert_eq!( + client1 + .request( + client1_conn_id, + proto::OpenBuffer { + project_id: 0, + worktree_id: 1, + path: "path/one".to_string(), + }, + ) + .await + .unwrap(), + proto::OpenBufferResponse { + buffer: Some(proto::Buffer { + id: 101, + visible_text: "path/one content".to_string(), + ..Default::default() + }), } - }); + ); + + assert_eq!( + client2 + .request( + client2_conn_id, + proto::OpenBuffer { + project_id: 0, + worktree_id: 2, + path: "path/two".to_string(), + }, + ) + .await + .unwrap(), + proto::OpenBufferResponse { + buffer: Some(proto::Buffer { + id: 102, + visible_text: "path/two content".to_string(), + ..Default::default() + }), + } + ); + + client1.disconnect(client1_conn_id); + client2.disconnect(client1_conn_id); + + async fn handle_messages( + mut messages: BoxStream<'static, Box>, + peer: Arc, + ) -> Result<()> { + while let Some(envelope) = messages.next().await { + let envelope = envelope.into_any(); + if let Some(envelope) = envelope.downcast_ref::>() { + let receipt = envelope.receipt(); + peer.respond(receipt, proto::Ack {}).await? + } else if let Some(envelope) = + envelope.downcast_ref::>() + { + let message = &envelope.payload; + let receipt = envelope.receipt(); + let response = match message.path.as_str() { + "path/one" => { + assert_eq!(message.worktree_id, 1); + proto::OpenBufferResponse { + buffer: Some(proto::Buffer { + id: 101, + visible_text: "path/one content".to_string(), + ..Default::default() + }), + } + } + "path/two" => { + assert_eq!(message.worktree_id, 2); + proto::OpenBufferResponse { + buffer: Some(proto::Buffer { + id: 102, + visible_text: "path/two content".to_string(), + ..Default::default() + }), + } + } + _ => { + panic!("unexpected path {}", message.path); + } + }; + + peer.respond(receipt, response).await? + } else { + panic!("unknown message type"); + } + } + + Ok(()) + } } - #[test] - fn test_disconnect() { - smol::block_on(async move { - let (client_conn, mut server_conn, _) = Connection::in_memory(); + #[gpui::test(iterations = 10)] + async fn test_order_of_response_and_incoming(cx: TestAppContext) { + let executor = cx.foreground(); + let server = Peer::new(); + let client = Peer::new(); - let client = Peer::new(); - let (connection_id, io_handler, mut incoming) = - client.add_connection(client_conn).await; + let (client_to_server_conn, server_to_client_conn, _) = Connection::in_memory(); + let (client_to_server_conn_id, io_task1, mut client_incoming) = + client.add_connection(client_to_server_conn).await; + let (server_to_client_conn_id, io_task2, mut server_incoming) = + server.add_connection(server_to_client_conn).await; - let (mut io_ended_tx, mut io_ended_rx) = postage::barrier::channel(); - smol::spawn(async move { + executor.spawn(io_task1).detach(); + executor.spawn(io_task2).detach(); + + executor + .spawn(async move { + let request = server_incoming + .next() + .await + .unwrap() + .into_any() + .downcast::>() + .unwrap(); + + server + .send( + server_to_client_conn_id, + proto::Error { + message: "message 1".to_string(), + }, + ) + .await + .unwrap(); + server + .send( + server_to_client_conn_id, + proto::Error { + message: "message 2".to_string(), + }, + ) + .await + .unwrap(); + server + .respond(request.receipt(), proto::Ack {}) + .await + .unwrap(); + + // Prevent the connection from being dropped + server_incoming.next().await; + }) + .detach(); + + let events = Arc::new(Mutex::new(Vec::new())); + + let response = client.request(client_to_server_conn_id, proto::Ping {}); + let response_task = executor.spawn({ + let events = events.clone(); + async move { + response.await.unwrap(); + events.lock().push("response".to_string()); + } + }); + + executor + .spawn({ + let events = events.clone(); + async move { + let incoming1 = client_incoming + .next() + .await + .unwrap() + .into_any() + .downcast::>() + .unwrap(); + events.lock().push(incoming1.payload.message); + let incoming2 = client_incoming + .next() + .await + .unwrap() + .into_any() + .downcast::>() + .unwrap(); + events.lock().push(incoming2.payload.message); + + // Prevent the connection from being dropped + client_incoming.next().await; + } + }) + .detach(); + + response_task.await; + assert_eq!( + &*events.lock(), + &[ + "message 1".to_string(), + "message 2".to_string(), + "response".to_string() + ] + ); + } + + #[gpui::test(iterations = 10)] + async fn test_disconnect(cx: TestAppContext) { + let executor = cx.foreground(); + + let (client_conn, mut server_conn, _) = Connection::in_memory(); + + let client = Peer::new(); + let (connection_id, io_handler, mut incoming) = client.add_connection(client_conn).await; + + let (mut io_ended_tx, mut io_ended_rx) = postage::barrier::channel(); + executor + .spawn(async move { io_handler.await.ok(); io_ended_tx.send(()).await.unwrap(); }) .detach(); - let (mut messages_ended_tx, mut messages_ended_rx) = postage::barrier::channel(); - smol::spawn(async move { + let (mut messages_ended_tx, mut messages_ended_rx) = postage::barrier::channel(); + executor + .spawn(async move { incoming.next().await; messages_ended_tx.send(()).await.unwrap(); }) .detach(); - client.disconnect(connection_id); + client.disconnect(connection_id); - io_ended_rx.recv().await; - messages_ended_rx.recv().await; - assert!(server_conn - .send(WebSocketMessage::Binary(vec![])) - .await - .is_err()); - }); + io_ended_rx.recv().await; + messages_ended_rx.recv().await; + assert!(server_conn + .send(WebSocketMessage::Binary(vec![])) + .await + .is_err()); } - #[test] - fn test_io_error() { - smol::block_on(async move { - let (client_conn, mut server_conn, _) = Connection::in_memory(); + #[gpui::test(iterations = 10)] + async fn test_io_error(cx: TestAppContext) { + let executor = cx.foreground(); + let (client_conn, mut server_conn, _) = Connection::in_memory(); - let client = Peer::new(); - let (connection_id, io_handler, mut incoming) = - client.add_connection(client_conn).await; - smol::spawn(io_handler).detach(); - smol::spawn(async move { incoming.next().await }).detach(); + let client = Peer::new(); + let (connection_id, io_handler, mut incoming) = client.add_connection(client_conn).await; + executor.spawn(io_handler).detach(); + executor + .spawn(async move { incoming.next().await }) + .detach(); - let response = smol::spawn(client.request(connection_id, proto::Ping {})); - let _request = server_conn.rx.next().await.unwrap().unwrap(); + let response = executor.spawn(client.request(connection_id, proto::Ping {})); + let _request = server_conn.rx.next().await.unwrap().unwrap(); - drop(server_conn); - assert_eq!( - response.await.unwrap_err().to_string(), - "connection was closed" - ); - }); + drop(server_conn); + assert_eq!( + response.await.unwrap_err().to_string(), + "connection was closed" + ); } }