From fe46b89500eb4c8128d415bc728175439d1e080b Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Mon, 14 Feb 2022 10:51:12 -0800 Subject: [PATCH] Remove logic for preserving RPC message order between peers * On the server, spawn a separate task for each incoming message * In the peer, eliminate the barrier that was used to enforce ordering of responses with respect to other incoming messages Co-Authored-By: Antonio Scandurra --- crates/rpc/src/peer.rs | 17 +++++----------- crates/server/src/rpc.rs | 43 +++++++++++++++++++++++++++++++--------- 2 files changed, 39 insertions(+), 21 deletions(-) diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index 0a614e0bed..082bff1634 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -5,7 +5,7 @@ use futures::stream::BoxStream; use futures::{FutureExt as _, StreamExt}; use parking_lot::{Mutex, RwLock}; use postage::{ - barrier, mpsc, + mpsc, prelude::{Sink as _, Stream as _}, }; use smol_timeout::TimeoutExt as _; @@ -91,8 +91,7 @@ pub struct Peer { pub struct ConnectionState { outgoing_tx: futures::channel::mpsc::UnboundedSender, next_message_id: Arc, - response_channels: - Arc>>>>, + response_channels: Arc>>>>, } const WRITE_TIMEOUT: Duration = Duration::from_secs(10); @@ -178,18 +177,12 @@ impl Peer { if let Some(responding_to) = incoming.responding_to { let channel = response_channels.lock().as_mut()?.remove(&responding_to); if let Some(mut tx) = channel { - let mut requester_resumed = barrier::channel(); - if let Err(error) = tx.send((incoming, requester_resumed.0)).await { + if let Err(error) = tx.send(incoming).await { log::debug!( "received RPC but request future was dropped {:?}", - error.0 .0 + error.0 ); } - // Drop response channel before awaiting on the barrier. This allows the - // barrier to get dropped even if the request's future is dropped before it - // has a chance to observe the response. - drop(tx); - requester_resumed.1.recv().await; } else { log::warn!("received RPC response to unknown request {}", responding_to); } @@ -260,7 +253,7 @@ impl Peer { }); async move { send?; - let (response, _barrier) = rx + let response = rx .recv() .await .ok_or_else(|| anyhow!("connection was closed"))?; diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index c1d36ef3c6..34ee52b099 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -41,6 +41,12 @@ pub struct Server { notifications: Option>, } +pub trait Executor { + fn spawn_detached>(&self, future: F); +} + +pub struct RealExecutor; + const MESSAGE_COUNT_PER_PAGE: usize = 100; const MAX_MESSAGE_LEN: usize = 1024; @@ -144,12 +150,13 @@ impl Server { }) } - pub fn handle_connection( + pub fn handle_connection( self: &Arc, connection: Connection, addr: String, user_id: UserId, mut send_connection_id: Option>, + executor: E, ) -> impl Future { let mut this = self.clone(); async move { @@ -183,12 +190,14 @@ impl Server { let type_name = message.payload_type_name(); log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name); if let Some(handler) = this.handlers.get(&message.payload_type_id()) { - if let Err(err) = (handler)(this.clone(), message).await { - log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err); - } else { - log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed()); - } - + let handle_message = (handler)(this.clone(), message); + executor.spawn_detached(async move { + if let Err(err) = handle_message.await { + log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err); + } else { + log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed()); + } + }); if let Some(mut notifications) = this.notifications.clone() { let _ = notifications.send(()).await; } @@ -966,6 +975,12 @@ impl Server { } } +impl Executor for RealExecutor { + fn spawn_detached>(&self, future: F) { + task::spawn(future); + } +} + fn broadcast( sender_id: ConnectionId, receiver_ids: Vec, @@ -1032,6 +1047,7 @@ pub fn add_routes(app: &mut tide::Server>, rpc: &Arc) { addr, user_id, None, + RealExecutor, ) .await; } @@ -1778,10 +1794,12 @@ mod tests { let buffer_b = cx_b .background() .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))); - task::yield_now().await; // Edit the buffer as client A while client B is still opening it. - buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx)); + cx_b.background().simulate_random_delay().await; + buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx)); + cx_b.background().simulate_random_delay().await; + buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx)); let text = buffer_a.read_with(&cx_a, |buf, _| buf.text()); let buffer_b = buffer_b.await.unwrap(); @@ -3598,6 +3616,7 @@ mod tests { client_name, user_id, Some(connection_id_tx), + cx.background(), )) .detach(); Ok(client_conn) @@ -3701,6 +3720,12 @@ mod tests { } } + impl Executor for Arc { + fn spawn_detached>(&self, future: F) { + self.spawn(future).detach(); + } + } + fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> { channel .messages()