diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index 2a6cb1aefe..e22cd7cba9 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -11,11 +11,12 @@ use async_tungstenite::tungstenite::{ error::Error as WebsocketError, http::{Request, StatusCode}, }; +use futures::StreamExt; use gpui::{action, AsyncAppContext, Entity, ModelContext, MutableAppContext, Task}; use http::HttpClient; use lazy_static::lazy_static; use parking_lot::RwLock; -use postage::{prelude::Stream, watch}; +use postage::watch; use rand::prelude::*; use rpc::proto::{AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage}; use std::{ @@ -436,7 +437,7 @@ impl Client { let mut cx = cx.clone(); let this = self.clone(); async move { - while let Some(message) = incoming.recv().await { + while let Some(message) = incoming.next().await { let mut state = this.state.write(); let payload_type_id = message.payload_type_id(); let entity_id = if let Some(extract_entity_id) = @@ -777,23 +778,23 @@ mod tests { let server = FakeServer::for_client(user_id, &mut client, &cx).await; let mut status = client.status(); assert!(matches!( - status.recv().await, + status.next().await, Some(Status::Connected { .. }) )); assert_eq!(server.auth_count(), 1); server.forbid_connections(); server.disconnect(); - while !matches!(status.recv().await, Some(Status::ReconnectionError { .. })) {} + while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {} server.allow_connections(); cx.foreground().advance_clock(Duration::from_secs(10)); - while !matches!(status.recv().await, Some(Status::Connected { .. })) {} + while !matches!(status.next().await, Some(Status::Connected { .. })) {} assert_eq!(server.auth_count(), 1); // Client reused the cached credentials when reconnecting server.forbid_connections(); server.disconnect(); - while !matches!(status.recv().await, Some(Status::ReconnectionError { .. })) {} + while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {} // Clear cached credentials after authentication fails server.roll_access_token(); @@ -801,7 +802,7 @@ mod tests { cx.foreground().advance_clock(Duration::from_secs(10)); assert_eq!(server.auth_count(), 1); cx.foreground().advance_clock(Duration::from_secs(10)); - while !matches!(status.recv().await, Some(Status::Connected { .. })) {} + while !matches!(status.next().await, Some(Status::Connected { .. })) {} assert_eq!(server.auth_count(), 2); // Client re-authenticated due to an invalid token } @@ -861,8 +862,8 @@ mod tests { server.send(proto::UnshareProject { project_id: 1 }).await; server.send(proto::UnshareProject { project_id: 2 }).await; - done_rx1.recv().await.unwrap(); - done_rx2.recv().await.unwrap(); + done_rx1.next().await.unwrap(); + done_rx2.next().await.unwrap(); } #[gpui::test] @@ -890,7 +891,7 @@ mod tests { }) }); server.send(proto::Ping {}).await; - done_rx2.recv().await.unwrap(); + done_rx2.next().await.unwrap(); } #[gpui::test] @@ -914,7 +915,7 @@ mod tests { )); }); server.send(proto::Ping {}).await; - done_rx.recv().await.unwrap(); + done_rx.next().await.unwrap(); } struct Model { diff --git a/crates/client/src/test.rs b/crates/client/src/test.rs index 6339d025f1..1630a454b7 100644 --- a/crates/client/src/test.rs +++ b/crates/client/src/test.rs @@ -1,10 +1,9 @@ use super::Client; use super::*; use crate::http::{HttpClient, Request, Response, ServerResponse}; -use futures::{future::BoxFuture, Future}; +use futures::{future::BoxFuture, stream::BoxStream, Future, StreamExt}; use gpui::{ModelHandle, TestAppContext}; use parking_lot::Mutex; -use postage::{mpsc, prelude::Stream}; use rpc::{proto, ConnectionId, Peer, Receipt, TypedEnvelope}; use std::fmt; use std::sync::atomic::Ordering::SeqCst; @@ -15,7 +14,7 @@ use std::sync::{ pub struct FakeServer { peer: Arc, - incoming: Mutex>>>, + incoming: Mutex>>>, connection_id: Mutex>, forbid_connections: AtomicBool, auth_count: AtomicUsize, @@ -129,7 +128,7 @@ impl FakeServer { .lock() .as_mut() .expect("not connected") - .recv() + .next() .await .ok_or_else(|| anyhow!("other half hung up"))?; let type_name = message.payload_type_name(); diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 3d924b3310..a9ee86268d 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -600,7 +600,6 @@ impl Worktree { // We spawn here in order to enqueue the sending of `Ack` *after* transmission of edits // associated with formatting. cx.spawn(|_| async move { - dbg!("responding"); match format { Ok(()) => rpc.respond(receipt, proto::Ack {}).await?, Err(error) => { @@ -923,7 +922,6 @@ impl Worktree { )), } { cx.spawn(|worktree, mut cx| async move { - dbg!(&operation); if let Err(error) = rpc .request(proto::UpdateBuffer { project_id, diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index 848ae44402..2f1ac2a249 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -1,7 +1,8 @@ use super::proto::{self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, RequestMessage}; use super::Connection; use anyhow::{anyhow, Context, Result}; -use futures::FutureExt as _; +use futures::stream::BoxStream; +use futures::{FutureExt as _, StreamExt}; use parking_lot::{Mutex, RwLock}; use postage::{ mpsc, @@ -109,7 +110,7 @@ impl Peer { ) -> ( ConnectionId, impl Future> + Send, - mpsc::Receiver>, + BoxStream<'static, Box>, ) { let connection_id = ConnectionId(self.next_connection_id.fetch_add(1, SeqCst)); let (mut incoming_tx, incoming_rx) = mpsc::channel(64); @@ -132,23 +133,9 @@ impl Peer { futures::select_biased! { incoming = read_message => match incoming { Ok(incoming) => { - if let Some(responding_to) = incoming.responding_to { - let channel = response_channels.lock().as_mut().unwrap().remove(&responding_to); - if let Some(mut tx) = channel { - tx.send(incoming).await.ok(); - } else { - log::warn!("received RPC response to unknown request {}", responding_to); - } - } else { - if let Some(envelope) = proto::build_typed_envelope(connection_id, incoming) { - if incoming_tx.send(envelope).await.is_err() { - break 'outer Ok(()) - } - } else { - log::error!("unable to construct a typed envelope"); - } + if incoming_tx.send(incoming).await.is_err() { + break 'outer Ok(()); } - break; } Err(error) => { @@ -174,11 +161,38 @@ impl Peer { result }; + let response_channels = connection_state.response_channels.clone(); self.connections .write() .insert(connection_id, connection_state); - (connection_id, handle_io, incoming_rx) + let incoming_rx = incoming_rx.filter_map(move |incoming| { + let response_channels = response_channels.clone(); + async move { + if let Some(responding_to) = incoming.responding_to { + let channel = response_channels + .lock() + .as_mut() + .unwrap() + .remove(&responding_to); + if let Some(mut tx) = channel { + tx.send(incoming).await.ok(); + } else { + log::warn!("received RPC response to unknown request {}", responding_to); + } + + None + } else { + if let Some(envelope) = proto::build_typed_envelope(connection_id, incoming) { + Some(envelope) + } else { + log::error!("unable to construct a typed envelope"); + None + } + } + } + }); + (connection_id, handle_io, incoming_rx.boxed()) } pub fn disconnect(&self, connection_id: ConnectionId) { @@ -332,7 +346,6 @@ mod tests { use super::*; use crate::TypedEnvelope; use async_tungstenite::tungstenite::Message as WebSocketMessage; - use futures::StreamExt as _; #[test] fn test_request_response() { @@ -421,7 +434,7 @@ mod tests { client2.disconnect(client1_conn_id); async fn handle_messages( - mut messages: mpsc::Receiver>, + mut messages: BoxStream<'static, Box>, peer: Arc, ) -> Result<()> { while let Some(envelope) = messages.next().await { diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 8248dfa103..40a3f956bb 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -9,9 +9,9 @@ use anyhow::anyhow; use async_std::task; use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream}; use collections::{HashMap, HashSet}; -use futures::{future::BoxFuture, FutureExt}; +use futures::{future::BoxFuture, FutureExt, StreamExt}; use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; -use postage::{mpsc, prelude::Sink as _, prelude::Stream as _}; +use postage::{mpsc, prelude::Sink as _}; use rpc::{ proto::{self, AnyTypedEnvelope, EnvelopedMessage}, Connection, ConnectionId, Peer, TypedEnvelope, @@ -133,7 +133,7 @@ impl Server { let handle_io = handle_io.fuse(); futures::pin_mut!(handle_io); loop { - let next_message = incoming_rx.recv().fuse(); + let next_message = incoming_rx.next().fuse(); futures::pin_mut!(next_message); futures::select_biased! { message = next_message => { @@ -2026,7 +2026,7 @@ mod tests { }); } - #[gpui::test(iterations = 1, seed = 2)] + #[gpui::test] async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) { cx_a.foreground().forbid_parking(); let mut lang_registry = Arc::new(LanguageRegistry::new()); @@ -2425,7 +2425,7 @@ mod tests { server.forbid_connections(); server.disconnect_client(client_b.current_user_id(&cx_b)); while !matches!( - status_b.recv().await, + status_b.next().await, Some(client::Status::ReconnectionError { .. }) ) {} @@ -2769,11 +2769,11 @@ mod tests { .await .unwrap(); - let peer_id = PeerId(connection_id_rx.recv().await.unwrap().0); + let peer_id = PeerId(connection_id_rx.next().await.unwrap().0); let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx)); let mut authed_user = user_store.read_with(cx, |user_store, _| user_store.watch_current_user()); - while authed_user.recv().await.unwrap().is_none() {} + while authed_user.next().await.unwrap().is_none() {} TestClient { client, @@ -2822,7 +2822,7 @@ mod tests { async_std::future::timeout(Duration::from_millis(500), async { while !(predicate)(&*self.server.store.read()) { self.foreground.start_waiting(); - self.notifications.recv().await; + self.notifications.next().await; self.foreground.finish_waiting(); } })