diff --git a/Cargo.lock b/Cargo.lock index c012316eaa..483b56d999 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6960,6 +6960,7 @@ dependencies = [ name = "picker" version = "0.1.0" dependencies = [ + "anyhow", "ctor", "editor", "env_logger", diff --git a/crates/collab/src/tests/following_tests.rs b/crates/collab/src/tests/following_tests.rs index 57e5388045..0dff43e41b 100644 --- a/crates/collab/src/tests/following_tests.rs +++ b/crates/collab/src/tests/following_tests.rs @@ -373,8 +373,10 @@ async fn test_basic_following( editor_a1.update(cx_a, |editor, cx| { editor.change_selections(None, cx, |s| s.select_ranges([1..1, 2..2])); }); + executor.advance_clock(workspace::item::LEADER_UPDATE_THROTTLE); executor.run_until_parked(); cx_b.background_executor.run_until_parked(); + editor_b1.update(cx_b, |editor, cx| { assert_eq!(editor.selections.ranges(cx), &[1..1, 2..2]); }); @@ -387,6 +389,7 @@ async fn test_basic_following( editor.change_selections(None, cx, |s| s.select_ranges([3..3])); editor.set_scroll_position(point(0., 100.), cx); }); + executor.advance_clock(workspace::item::LEADER_UPDATE_THROTTLE); executor.run_until_parked(); editor_b1.update(cx_b, |editor, cx| { assert_eq!(editor.selections.ranges(cx), &[3..3]); @@ -1598,6 +1601,8 @@ async fn test_following_stops_on_unshare(cx_a: &mut TestAppContext, cx_b: &mut T editor_a.update(cx_a, |editor, cx| { editor.change_selections(None, cx, |s| s.select_ranges([1..1])) }); + cx_a.executor() + .advance_clock(workspace::item::LEADER_UPDATE_THROTTLE); cx_a.run_until_parked(); editor_b.update(cx_b, |editor, cx| { assert_eq!(editor.selections.ranges(cx), vec![1..1]) @@ -1616,6 +1621,8 @@ async fn test_following_stops_on_unshare(cx_a: &mut TestAppContext, cx_b: &mut T editor_a.update(cx_a, |editor, cx| { editor.change_selections(None, cx, |s| s.select_ranges([2..2])) }); + cx_a.executor() + .advance_clock(workspace::item::LEADER_UPDATE_THROTTLE); cx_a.run_until_parked(); editor_b.update(cx_b, |editor, cx| { assert_eq!(editor.selections.ranges(cx), vec![1..1]) @@ -1720,6 +1727,7 @@ async fn test_following_into_excluded_file( // When client B starts following client A, currently visible file is replicated workspace_b.update(cx_b, |workspace, cx| workspace.follow(peer_id_a, cx)); + executor.advance_clock(workspace::item::LEADER_UPDATE_THROTTLE); executor.run_until_parked(); let editor_for_excluded_b = workspace_b.update(cx_b, |workspace, cx| { @@ -1741,6 +1749,7 @@ async fn test_following_into_excluded_file( editor_for_excluded_a.update(cx_a, |editor, cx| { editor.select_right(&Default::default(), cx); }); + executor.advance_clock(workspace::item::LEADER_UPDATE_THROTTLE); executor.run_until_parked(); // Changes from B to the excluded file are replicated in A's editor diff --git a/crates/gpui/src/test.rs b/crates/gpui/src/test.rs index eab11e7b4c..3f267542f8 100644 --- a/crates/gpui/src/test.rs +++ b/crates/gpui/src/test.rs @@ -46,10 +46,10 @@ pub fn run_test( let starting_seed = env::var("SEED") .map(|seed| seed.parse().expect("invalid SEED variable")) .unwrap_or(0); - let is_randomized = num_iterations > 1; if let Ok(iterations) = env::var("ITERATIONS") { num_iterations = iterations.parse().expect("invalid ITERATIONS variable"); } + let is_randomized = num_iterations > 1; for seed in starting_seed..starting_seed + num_iterations { let mut retry = 0; diff --git a/crates/picker/Cargo.toml b/crates/picker/Cargo.toml index 79a8ed70d8..435e4459e9 100644 --- a/crates/picker/Cargo.toml +++ b/crates/picker/Cargo.toml @@ -13,6 +13,7 @@ path = "src/picker.rs" doctest = false [dependencies] +anyhow.workspace = true editor.workspace = true gpui.workspace = true menu.workspace = true diff --git a/crates/picker/src/picker.rs b/crates/picker/src/picker.rs index 11c2458ee1..6ff9f62e0f 100644 --- a/crates/picker/src/picker.rs +++ b/crates/picker/src/picker.rs @@ -1,3 +1,4 @@ +use anyhow::Result; use editor::Editor; use gpui::{ div, list, prelude::*, uniform_list, AnyElement, AppContext, ClickEvent, DismissEvent, @@ -15,11 +16,16 @@ enum ElementContainer { UniformList(UniformListScrollHandle), } +struct PendingUpdateMatches { + delegate_update_matches: Option>, + _task: Task>, +} + pub struct Picker { pub delegate: D, element_container: ElementContainer, editor: View, - pending_update_matches: Option>, + pending_update_matches: Option, confirm_on_update: Option, width: Option, max_height: Option, @@ -281,15 +287,32 @@ impl Picker { } pub fn update_matches(&mut self, query: String, cx: &mut ViewContext) { - let update = self.delegate.update_matches(query, cx); + let delegate_pending_update_matches = self.delegate.update_matches(query, cx); + self.matches_updated(cx); - self.pending_update_matches = Some(cx.spawn(|this, mut cx| async move { - update.await; - this.update(&mut cx, |this, cx| { - this.matches_updated(cx); - }) - .ok(); - })); + // This struct ensures that we can synchronously drop the task returned by the + // delegate's `update_matches` method and the task that the picker is spawning. + // If we simply capture the delegate's task into the picker's task, when the picker's + // task gets synchronously dropped, the delegate's task would keep running until + // the picker's task has a chance of being scheduled, because dropping a task happens + // asynchronously. + self.pending_update_matches = Some(PendingUpdateMatches { + delegate_update_matches: Some(delegate_pending_update_matches), + _task: cx.spawn(|this, mut cx| async move { + let delegate_pending_update_matches = this.update(&mut cx, |this, _| { + this.pending_update_matches + .as_mut() + .unwrap() + .delegate_update_matches + .take() + .unwrap() + })?; + delegate_pending_update_matches.await; + this.update(&mut cx, |this, cx| { + this.matches_updated(cx); + }) + }), + }); } fn matches_updated(&mut self, cx: &mut ViewContext) { diff --git a/crates/workspace/src/item.rs b/crates/workspace/src/item.rs index 25d9f5ed89..07ca2c06a7 100644 --- a/crates/workspace/src/item.rs +++ b/crates/workspace/src/item.rs @@ -11,6 +11,7 @@ use client::{ proto::{self, PeerId}, Client, }; +use futures::{channel::mpsc, StreamExt}; use gpui::{ AnyElement, AnyView, AppContext, Entity, EntityId, EventEmitter, FocusHandle, FocusableView, HighlightStyle, Model, Pixels, Point, SharedString, Task, View, ViewContext, WeakView, @@ -27,14 +28,13 @@ use std::{ ops::Range, path::PathBuf, rc::Rc, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, + sync::Arc, time::Duration, }; use theme::Theme; +pub const LEADER_UPDATE_THROTTLE: Duration = Duration::from_millis(200); + #[derive(Deserialize)] pub struct ItemSettings { pub git_status: bool, @@ -415,7 +415,7 @@ impl ItemHandle for View { followed_item.is_project_item(cx), proto::update_followers::Variant::CreateView(proto::View { id: followed_item - .remote_id(&workspace.app_state.client, cx) + .remote_id(&workspace.client(), cx) .map(|id| id.to_proto()), variant: Some(message), leader_id: workspace.leader_for_pane(&pane), @@ -431,8 +431,46 @@ impl ItemHandle for View { .is_none() { let mut pending_autosave = DelayedDebouncedEditAction::new(); + let (pending_update_tx, mut pending_update_rx) = mpsc::unbounded(); let pending_update = Rc::new(RefCell::new(None)); - let pending_update_scheduled = Arc::new(AtomicBool::new(false)); + + let mut send_follower_updates = None; + if let Some(item) = self.to_followable_item_handle(cx) { + let is_project_item = item.is_project_item(cx); + let item = item.downgrade(); + + send_follower_updates = Some(cx.spawn({ + let pending_update = pending_update.clone(); + |workspace, mut cx| async move { + while let Some(mut leader_id) = pending_update_rx.next().await { + while let Ok(Some(id)) = pending_update_rx.try_next() { + leader_id = id; + } + + workspace.update(&mut cx, |workspace, cx| { + let item = item.upgrade().expect( + "item to be alive, otherwise task would have been dropped", + ); + workspace.update_followers( + is_project_item, + proto::update_followers::Variant::UpdateView( + proto::UpdateView { + id: item + .remote_id(workspace.client(), cx) + .map(|id| id.to_proto()), + variant: pending_update.borrow_mut().take(), + leader_id, + }, + ), + cx, + ); + })?; + cx.background_executor().timer(LEADER_UPDATE_THROTTLE).await; + } + anyhow::Ok(()) + } + })); + } let mut event_subscription = Some(cx.subscribe(self, move |workspace, item, event, cx| { @@ -448,9 +486,7 @@ impl ItemHandle for View { }; if let Some(item) = item.to_followable_item_handle(cx) { - let is_project_item = item.is_project_item(cx); let leader_id = workspace.leader_for_pane(&pane); - let follow_event = item.to_follow_event(event); if leader_id.is_some() && matches!(follow_event, Some(FollowEvent::Unfollow)) @@ -458,35 +494,13 @@ impl ItemHandle for View { workspace.unfollow(&pane, cx); } - if item.focus_handle(cx).contains_focused(cx) - && item.add_event_to_update_proto( + if item.focus_handle(cx).contains_focused(cx) { + item.add_event_to_update_proto( event, &mut pending_update.borrow_mut(), cx, - ) - && !pending_update_scheduled.load(Ordering::SeqCst) - { - pending_update_scheduled.store(true, Ordering::SeqCst); - cx.defer({ - let pending_update = pending_update.clone(); - let pending_update_scheduled = pending_update_scheduled.clone(); - move |this, cx| { - pending_update_scheduled.store(false, Ordering::SeqCst); - this.update_followers( - is_project_item, - proto::update_followers::Variant::UpdateView( - proto::UpdateView { - id: item - .remote_id(&this.app_state.client, cx) - .map(|id| id.to_proto()), - variant: pending_update.borrow_mut().take(), - leader_id, - }, - ), - cx, - ); - } - }); + ); + pending_update_tx.unbounded_send(leader_id).ok(); } } @@ -535,6 +549,7 @@ impl ItemHandle for View { cx.observe_release(self, move |workspace, _, _| { workspace.panes_by_item.remove(&item_id); event_subscription.take(); + send_follower_updates.take(); }) .detach(); } @@ -715,6 +730,7 @@ pub trait FollowableItem: Item { pub trait FollowableItemHandle: ItemHandle { fn remote_id(&self, client: &Arc, cx: &WindowContext) -> Option; + fn downgrade(&self) -> Box; fn set_leader_peer_id(&self, leader_peer_id: Option, cx: &mut WindowContext); fn to_state_proto(&self, cx: &WindowContext) -> Option; fn add_event_to_update_proto( @@ -743,6 +759,10 @@ impl FollowableItemHandle for View { }) } + fn downgrade(&self) -> Box { + Box::new(self.downgrade()) + } + fn set_leader_peer_id(&self, leader_peer_id: Option, cx: &mut WindowContext) { self.update(cx, |this, cx| this.set_leader_peer_id(leader_peer_id, cx)) } @@ -782,6 +802,16 @@ impl FollowableItemHandle for View { } } +pub trait WeakFollowableItemHandle: Send + Sync { + fn upgrade(&self) -> Option>; +} + +impl WeakFollowableItemHandle for WeakView { + fn upgrade(&self) -> Option> { + Some(Box::new(self.upgrade()?)) + } +} + #[cfg(any(test, feature = "test-support"))] pub mod test { use super::{Item, ItemEvent}; diff --git a/crates/workspace/src/workspace.rs b/crates/workspace/src/workspace.rs index b4cb71c45a..50631ba736 100644 --- a/crates/workspace/src/workspace.rs +++ b/crates/workspace/src/workspace.rs @@ -1108,7 +1108,7 @@ impl Workspace { ) } - pub fn client(&self) -> &Client { + pub fn client(&self) -> &Arc { &self.app_state.client }