From e7e45be6e141ac50db80cf66d1445afb8163d681 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Thu, 17 Nov 2022 16:57:32 +0100 Subject: [PATCH] Revert "Wait for previous `UpdateFollowers` message ack before sending new ones" This reverts commit fe93263ad450a1460ccb5edfde1ca868d132e8c6. --- crates/collab/src/integration_tests.rs | 82 +++++++++----------------- crates/collab/src/rpc.rs | 4 +- crates/rpc/src/proto.rs | 1 - crates/workspace/src/workspace.rs | 76 +++++++++--------------- 4 files changed, 57 insertions(+), 106 deletions(-) diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index 5118510024..d730b5d4e7 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -4672,7 +4672,7 @@ async fn test_following( cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, ) { - deterministic.forbid_parking(); + cx_a.foreground().forbid_parking(); cx_a.update(editor::init); cx_b.update(editor::init); @@ -4791,14 +4791,11 @@ async fn test_following( workspace_a.update(cx_a, |workspace, cx| { workspace.activate_item(&editor_a1, cx) }); - deterministic.run_until_parked(); - assert_eq!( - workspace_b.read_with(cx_b, |workspace, cx| workspace - .active_item(cx) - .unwrap() - .id()), - editor_b1.id() - ); + workspace_b + .condition(cx_b, |workspace, cx| { + workspace.active_item(cx).unwrap().id() == editor_b1.id() + }) + .await; // When client A navigates back and forth, client B does so as well. workspace_a @@ -4806,74 +4803,49 @@ async fn test_following( workspace::Pane::go_back(workspace, None, cx) }) .await; - deterministic.run_until_parked(); - assert_eq!( - workspace_b.read_with(cx_b, |workspace, cx| workspace - .active_item(cx) - .unwrap() - .id()), - editor_b2.id() - ); + workspace_b + .condition(cx_b, |workspace, cx| { + workspace.active_item(cx).unwrap().id() == editor_b2.id() + }) + .await; workspace_a .update(cx_a, |workspace, cx| { workspace::Pane::go_forward(workspace, None, cx) }) .await; - workspace_a - .update(cx_a, |workspace, cx| { - workspace::Pane::go_back(workspace, None, cx) + workspace_b + .condition(cx_b, |workspace, cx| { + workspace.active_item(cx).unwrap().id() == editor_b1.id() }) .await; - workspace_a - .update(cx_a, |workspace, cx| { - workspace::Pane::go_forward(workspace, None, cx) - }) - .await; - deterministic.run_until_parked(); - assert_eq!( - workspace_b.read_with(cx_b, |workspace, cx| workspace - .active_item(cx) - .unwrap() - .id()), - editor_b1.id() - ); // Changes to client A's editor are reflected on client B. editor_a1.update(cx_a, |editor, cx| { editor.change_selections(None, cx, |s| s.select_ranges([1..1, 2..2])); }); - deterministic.run_until_parked(); - assert_eq!( - editor_b1.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)), - vec![1..1, 2..2] - ); + editor_b1 + .condition(cx_b, |editor, cx| { + editor.selections.ranges(cx) == vec![1..1, 2..2] + }) + .await; editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx)); - deterministic.run_until_parked(); - assert_eq!( - editor_b1.read_with(cx_b, |editor, cx| editor.text(cx)), - "TWO" - ); + editor_b1 + .condition(cx_b, |editor, cx| editor.text(cx) == "TWO") + .await; editor_a1.update(cx_a, |editor, cx| { editor.change_selections(None, cx, |s| s.select_ranges([3..3])); editor.set_scroll_position(vec2f(0., 100.), cx); }); - deterministic.run_until_parked(); - assert_eq!( - editor_b1.read_with(cx_b, |editor, cx| editor.selections.ranges(cx)), - vec![3..3] - ); + editor_b1 + .condition(cx_b, |editor, cx| { + editor.selections.ranges(cx) == vec![3..3] + }) + .await; // After unfollowing, client B stops receiving updates from client A. - assert_eq!( - workspace_b.read_with(cx_b, |workspace, cx| workspace - .active_item(cx) - .unwrap() - .id()), - editor_b1.id() - ); workspace_b.update(cx_b, |workspace, cx| { workspace.unfollow(&workspace.active_pane().clone(), cx) }); diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 9e0335ef1b..4375056c9a 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -192,7 +192,7 @@ impl Server { .add_request_handler(Server::respond_to_contact_request) .add_request_handler(Server::follow) .add_message_handler(Server::unfollow) - .add_request_handler(Server::update_followers) + .add_message_handler(Server::update_followers) .add_message_handler(Server::update_diff_base) .add_request_handler(Server::get_private_user_info); @@ -1437,7 +1437,6 @@ impl Server { async fn update_followers( self: Arc, request: Message, - response: Response, ) -> Result<()> { let project_id = ProjectId::from_proto(request.payload.project_id); let project_connection_ids = self @@ -1465,7 +1464,6 @@ impl Server { )?; } } - response.send(proto::Ack {})?; Ok(()) } diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index 8a59818fa3..50f3c57f2a 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -229,7 +229,6 @@ request_messages!( (Test, Test), (UpdateBuffer, Ack), (UpdateDiagnosticSummary, Ack), - (UpdateFollowers, Ack), (UpdateParticipantLocation, Ack), (UpdateProject, Ack), (UpdateWorktree, Ack), diff --git a/crates/workspace/src/workspace.rs b/crates/workspace/src/workspace.rs index 5f14427fee..2296741ed3 100644 --- a/crates/workspace/src/workspace.rs +++ b/crates/workspace/src/workspace.rs @@ -18,10 +18,7 @@ use collections::{hash_map, HashMap, HashSet}; use dock::{DefaultItemFactory, Dock, ToggleDockButton}; use drag_and_drop::DragAndDrop; use fs::{self, Fs}; -use futures::{ - channel::{mpsc, oneshot}, - FutureExt, StreamExt, -}; +use futures::{channel::oneshot, FutureExt, StreamExt}; use gpui::{ actions, elements::*, @@ -714,13 +711,14 @@ impl ItemHandle for ViewHandle { if let Some(followed_item) = self.to_followable_item_handle(cx) { if let Some(message) = followed_item.to_state_proto(cx) { - workspace.update_followers(proto::update_followers::Variant::CreateView( - proto::View { + workspace.update_followers( + proto::update_followers::Variant::CreateView(proto::View { id: followed_item.id() as u64, variant: Some(message), leader_id: workspace.leader_for_pane(&pane).map(|id| id.0), - }, - )); + }), + cx, + ); } } @@ -764,7 +762,7 @@ impl ItemHandle for ViewHandle { cx.after_window_update({ let pending_update = pending_update.clone(); let pending_update_scheduled = pending_update_scheduled.clone(); - move |this, _| { + move |this, cx| { pending_update_scheduled.store(false, SeqCst); this.update_followers( proto::update_followers::Variant::UpdateView( @@ -774,6 +772,7 @@ impl ItemHandle for ViewHandle { leader_id: leader_id.map(|id| id.0), }, ), + cx, ); } }); @@ -1082,11 +1081,9 @@ pub struct Workspace { leader_state: LeaderState, follower_states_by_leader: FollowerStatesByLeader, last_leaders_by_pane: HashMap, PeerId>, - follower_updates: mpsc::UnboundedSender, window_edited: bool, active_call: Option<(ModelHandle, Vec)>, _observe_current_user: Task<()>, - _update_followers: Task>, } #[derive(Default)] @@ -1169,34 +1166,6 @@ impl Workspace { } }); - let (follower_updates_tx, mut follower_updates_rx) = mpsc::unbounded(); - let _update_followers = cx.spawn_weak(|this, cx| async move { - while let Some(update) = follower_updates_rx.next().await { - let this = this.upgrade(&cx)?; - let update_followers = this.read_with(&cx, |this, cx| { - if let Some(project_id) = this.project.read(cx).remote_id() { - if this.leader_state.followers.is_empty() { - None - } else { - Some(this.client.request(proto::UpdateFollowers { - project_id, - follower_ids: - this.leader_state.followers.iter().map(|f| f.0).collect(), - variant: Some(update), - })) - } - } else { - None - } - }); - - if let Some(update_followers) = update_followers { - update_followers.await.log_err(); - } - } - None - }); - let handle = cx.handle(); let weak_handle = cx.weak_handle(); @@ -1255,12 +1224,10 @@ impl Workspace { project, leader_state: Default::default(), follower_states_by_leader: Default::default(), - follower_updates: follower_updates_tx, last_leaders_by_pane: Default::default(), window_edited: false, active_call, _observe_current_user, - _update_followers, }; this.project_remote_id_changed(this.project.read(cx).remote_id(), cx); cx.defer(|this, cx| this.update_window_title(cx)); @@ -2000,12 +1967,13 @@ impl Workspace { cx.notify(); } - self.update_followers(proto::update_followers::Variant::UpdateActiveView( - proto::UpdateActiveView { + self.update_followers( + proto::update_followers::Variant::UpdateActiveView(proto::UpdateActiveView { id: self.active_item(cx).map(|item| item.id() as u64), leader_id: self.leader_for_pane(&pane).map(|id| id.0), - }, - )); + }), + cx, + ); } fn handle_pane_event( @@ -2626,8 +2594,22 @@ impl Workspace { Ok(()) } - fn update_followers(&self, update: proto::update_followers::Variant) { - let _ = self.follower_updates.unbounded_send(update); + fn update_followers( + &self, + update: proto::update_followers::Variant, + cx: &AppContext, + ) -> Option<()> { + let project_id = self.project.read(cx).remote_id()?; + if !self.leader_state.followers.is_empty() { + self.client + .send(proto::UpdateFollowers { + project_id, + follower_ids: self.leader_state.followers.iter().map(|f| f.0).collect(), + variant: Some(update), + }) + .log_err(); + } + None } pub fn leader_for_pane(&self, pane: &ViewHandle) -> Option {