Throttle the sending of UpdateFollowers messages (#8918)

## Problem

We're trying to figure out why we sometimes see high latency when
collaborating, even though the collab server logs indicate that messages
are not taking long to process.

We think that high volumes of certain types of messages, including
`UpdateFollowers` may cause a lot of messages to queue up, causing
delays before collab sees certain messages.

## Fix

This PR reduces the number of `UpdateFollowers` messages that clients
send to collab when scrolling around or moving the cursor, using a
time-based throttle.

The downside of this change is that scrolling will not be as smooth when
following someone. The advantage is that it will be much easier to keep
up with the stream of updates, since they will be sent much less
frequently.

## Release Notes:

- Fixed slowness that could occur when collaborating due to excessive
messages being sent to support following.

---------

Co-authored-by: Nathan <nathan@zed.dev>
Co-authored-by: Conrad <conrad@zed.dev>
Co-authored-by: Antonio Scandurra <me@as-cii.com>
Co-authored-by: Thorsten <thorsten@zed.dev>
Co-authored-by: Thorsten Ball <mrnugget@gmail.com>
This commit is contained in:
Max Brunsfeld 2024-03-06 05:58:41 -08:00 committed by GitHub
parent c8383e3b18
commit 6036830049
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 109 additions and 45 deletions

1
Cargo.lock generated
View File

@ -6960,6 +6960,7 @@ dependencies = [
name = "picker" name = "picker"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow",
"ctor", "ctor",
"editor", "editor",
"env_logger", "env_logger",

View File

@ -373,8 +373,10 @@ async fn test_basic_following(
editor_a1.update(cx_a, |editor, cx| { editor_a1.update(cx_a, |editor, cx| {
editor.change_selections(None, cx, |s| s.select_ranges([1..1, 2..2])); editor.change_selections(None, cx, |s| s.select_ranges([1..1, 2..2]));
}); });
executor.advance_clock(workspace::item::LEADER_UPDATE_THROTTLE);
executor.run_until_parked(); executor.run_until_parked();
cx_b.background_executor.run_until_parked(); cx_b.background_executor.run_until_parked();
editor_b1.update(cx_b, |editor, cx| { editor_b1.update(cx_b, |editor, cx| {
assert_eq!(editor.selections.ranges(cx), &[1..1, 2..2]); assert_eq!(editor.selections.ranges(cx), &[1..1, 2..2]);
}); });
@ -387,6 +389,7 @@ async fn test_basic_following(
editor.change_selections(None, cx, |s| s.select_ranges([3..3])); editor.change_selections(None, cx, |s| s.select_ranges([3..3]));
editor.set_scroll_position(point(0., 100.), cx); editor.set_scroll_position(point(0., 100.), cx);
}); });
executor.advance_clock(workspace::item::LEADER_UPDATE_THROTTLE);
executor.run_until_parked(); executor.run_until_parked();
editor_b1.update(cx_b, |editor, cx| { editor_b1.update(cx_b, |editor, cx| {
assert_eq!(editor.selections.ranges(cx), &[3..3]); assert_eq!(editor.selections.ranges(cx), &[3..3]);
@ -1598,6 +1601,8 @@ async fn test_following_stops_on_unshare(cx_a: &mut TestAppContext, cx_b: &mut T
editor_a.update(cx_a, |editor, cx| { editor_a.update(cx_a, |editor, cx| {
editor.change_selections(None, cx, |s| s.select_ranges([1..1])) editor.change_selections(None, cx, |s| s.select_ranges([1..1]))
}); });
cx_a.executor()
.advance_clock(workspace::item::LEADER_UPDATE_THROTTLE);
cx_a.run_until_parked(); cx_a.run_until_parked();
editor_b.update(cx_b, |editor, cx| { editor_b.update(cx_b, |editor, cx| {
assert_eq!(editor.selections.ranges(cx), vec![1..1]) assert_eq!(editor.selections.ranges(cx), vec![1..1])
@ -1616,6 +1621,8 @@ async fn test_following_stops_on_unshare(cx_a: &mut TestAppContext, cx_b: &mut T
editor_a.update(cx_a, |editor, cx| { editor_a.update(cx_a, |editor, cx| {
editor.change_selections(None, cx, |s| s.select_ranges([2..2])) editor.change_selections(None, cx, |s| s.select_ranges([2..2]))
}); });
cx_a.executor()
.advance_clock(workspace::item::LEADER_UPDATE_THROTTLE);
cx_a.run_until_parked(); cx_a.run_until_parked();
editor_b.update(cx_b, |editor, cx| { editor_b.update(cx_b, |editor, cx| {
assert_eq!(editor.selections.ranges(cx), vec![1..1]) assert_eq!(editor.selections.ranges(cx), vec![1..1])
@ -1720,6 +1727,7 @@ async fn test_following_into_excluded_file(
// When client B starts following client A, currently visible file is replicated // When client B starts following client A, currently visible file is replicated
workspace_b.update(cx_b, |workspace, cx| workspace.follow(peer_id_a, cx)); workspace_b.update(cx_b, |workspace, cx| workspace.follow(peer_id_a, cx));
executor.advance_clock(workspace::item::LEADER_UPDATE_THROTTLE);
executor.run_until_parked(); executor.run_until_parked();
let editor_for_excluded_b = workspace_b.update(cx_b, |workspace, cx| { let editor_for_excluded_b = workspace_b.update(cx_b, |workspace, cx| {
@ -1741,6 +1749,7 @@ async fn test_following_into_excluded_file(
editor_for_excluded_a.update(cx_a, |editor, cx| { editor_for_excluded_a.update(cx_a, |editor, cx| {
editor.select_right(&Default::default(), cx); editor.select_right(&Default::default(), cx);
}); });
executor.advance_clock(workspace::item::LEADER_UPDATE_THROTTLE);
executor.run_until_parked(); executor.run_until_parked();
// Changes from B to the excluded file are replicated in A's editor // Changes from B to the excluded file are replicated in A's editor

View File

@ -46,10 +46,10 @@ pub fn run_test(
let starting_seed = env::var("SEED") let starting_seed = env::var("SEED")
.map(|seed| seed.parse().expect("invalid SEED variable")) .map(|seed| seed.parse().expect("invalid SEED variable"))
.unwrap_or(0); .unwrap_or(0);
let is_randomized = num_iterations > 1;
if let Ok(iterations) = env::var("ITERATIONS") { if let Ok(iterations) = env::var("ITERATIONS") {
num_iterations = iterations.parse().expect("invalid ITERATIONS variable"); num_iterations = iterations.parse().expect("invalid ITERATIONS variable");
} }
let is_randomized = num_iterations > 1;
for seed in starting_seed..starting_seed + num_iterations { for seed in starting_seed..starting_seed + num_iterations {
let mut retry = 0; let mut retry = 0;

View File

@ -13,6 +13,7 @@ path = "src/picker.rs"
doctest = false doctest = false
[dependencies] [dependencies]
anyhow.workspace = true
editor.workspace = true editor.workspace = true
gpui.workspace = true gpui.workspace = true
menu.workspace = true menu.workspace = true

View File

@ -1,3 +1,4 @@
use anyhow::Result;
use editor::Editor; use editor::Editor;
use gpui::{ use gpui::{
div, list, prelude::*, uniform_list, AnyElement, AppContext, ClickEvent, DismissEvent, div, list, prelude::*, uniform_list, AnyElement, AppContext, ClickEvent, DismissEvent,
@ -15,11 +16,16 @@ enum ElementContainer {
UniformList(UniformListScrollHandle), UniformList(UniformListScrollHandle),
} }
struct PendingUpdateMatches {
delegate_update_matches: Option<Task<()>>,
_task: Task<Result<()>>,
}
pub struct Picker<D: PickerDelegate> { pub struct Picker<D: PickerDelegate> {
pub delegate: D, pub delegate: D,
element_container: ElementContainer, element_container: ElementContainer,
editor: View<Editor>, editor: View<Editor>,
pending_update_matches: Option<Task<()>>, pending_update_matches: Option<PendingUpdateMatches>,
confirm_on_update: Option<bool>, confirm_on_update: Option<bool>,
width: Option<Length>, width: Option<Length>,
max_height: Option<Length>, max_height: Option<Length>,
@ -281,15 +287,32 @@ impl<D: PickerDelegate> Picker<D> {
} }
pub fn update_matches(&mut self, query: String, cx: &mut ViewContext<Self>) { pub fn update_matches(&mut self, query: String, cx: &mut ViewContext<Self>) {
let update = self.delegate.update_matches(query, cx); let delegate_pending_update_matches = self.delegate.update_matches(query, cx);
self.matches_updated(cx); self.matches_updated(cx);
self.pending_update_matches = Some(cx.spawn(|this, mut cx| async move { // This struct ensures that we can synchronously drop the task returned by the
update.await; // delegate's `update_matches` method and the task that the picker is spawning.
// If we simply capture the delegate's task into the picker's task, when the picker's
// task gets synchronously dropped, the delegate's task would keep running until
// the picker's task has a chance of being scheduled, because dropping a task happens
// asynchronously.
self.pending_update_matches = Some(PendingUpdateMatches {
delegate_update_matches: Some(delegate_pending_update_matches),
_task: cx.spawn(|this, mut cx| async move {
let delegate_pending_update_matches = this.update(&mut cx, |this, _| {
this.pending_update_matches
.as_mut()
.unwrap()
.delegate_update_matches
.take()
.unwrap()
})?;
delegate_pending_update_matches.await;
this.update(&mut cx, |this, cx| { this.update(&mut cx, |this, cx| {
this.matches_updated(cx); this.matches_updated(cx);
}) })
.ok(); }),
})); });
} }
fn matches_updated(&mut self, cx: &mut ViewContext<Self>) { fn matches_updated(&mut self, cx: &mut ViewContext<Self>) {

View File

@ -11,6 +11,7 @@ use client::{
proto::{self, PeerId}, proto::{self, PeerId},
Client, Client,
}; };
use futures::{channel::mpsc, StreamExt};
use gpui::{ use gpui::{
AnyElement, AnyView, AppContext, Entity, EntityId, EventEmitter, FocusHandle, FocusableView, AnyElement, AnyView, AppContext, Entity, EntityId, EventEmitter, FocusHandle, FocusableView,
HighlightStyle, Model, Pixels, Point, SharedString, Task, View, ViewContext, WeakView, HighlightStyle, Model, Pixels, Point, SharedString, Task, View, ViewContext, WeakView,
@ -27,14 +28,13 @@ use std::{
ops::Range, ops::Range,
path::PathBuf, path::PathBuf,
rc::Rc, rc::Rc,
sync::{ sync::Arc,
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration, time::Duration,
}; };
use theme::Theme; use theme::Theme;
pub const LEADER_UPDATE_THROTTLE: Duration = Duration::from_millis(200);
#[derive(Deserialize)] #[derive(Deserialize)]
pub struct ItemSettings { pub struct ItemSettings {
pub git_status: bool, pub git_status: bool,
@ -415,7 +415,7 @@ impl<T: Item> ItemHandle for View<T> {
followed_item.is_project_item(cx), followed_item.is_project_item(cx),
proto::update_followers::Variant::CreateView(proto::View { proto::update_followers::Variant::CreateView(proto::View {
id: followed_item id: followed_item
.remote_id(&workspace.app_state.client, cx) .remote_id(&workspace.client(), cx)
.map(|id| id.to_proto()), .map(|id| id.to_proto()),
variant: Some(message), variant: Some(message),
leader_id: workspace.leader_for_pane(&pane), leader_id: workspace.leader_for_pane(&pane),
@ -431,8 +431,46 @@ impl<T: Item> ItemHandle for View<T> {
.is_none() .is_none()
{ {
let mut pending_autosave = DelayedDebouncedEditAction::new(); let mut pending_autosave = DelayedDebouncedEditAction::new();
let (pending_update_tx, mut pending_update_rx) = mpsc::unbounded();
let pending_update = Rc::new(RefCell::new(None)); let pending_update = Rc::new(RefCell::new(None));
let pending_update_scheduled = Arc::new(AtomicBool::new(false));
let mut send_follower_updates = None;
if let Some(item) = self.to_followable_item_handle(cx) {
let is_project_item = item.is_project_item(cx);
let item = item.downgrade();
send_follower_updates = Some(cx.spawn({
let pending_update = pending_update.clone();
|workspace, mut cx| async move {
while let Some(mut leader_id) = pending_update_rx.next().await {
while let Ok(Some(id)) = pending_update_rx.try_next() {
leader_id = id;
}
workspace.update(&mut cx, |workspace, cx| {
let item = item.upgrade().expect(
"item to be alive, otherwise task would have been dropped",
);
workspace.update_followers(
is_project_item,
proto::update_followers::Variant::UpdateView(
proto::UpdateView {
id: item
.remote_id(workspace.client(), cx)
.map(|id| id.to_proto()),
variant: pending_update.borrow_mut().take(),
leader_id,
},
),
cx,
);
})?;
cx.background_executor().timer(LEADER_UPDATE_THROTTLE).await;
}
anyhow::Ok(())
}
}));
}
let mut event_subscription = let mut event_subscription =
Some(cx.subscribe(self, move |workspace, item, event, cx| { Some(cx.subscribe(self, move |workspace, item, event, cx| {
@ -448,9 +486,7 @@ impl<T: Item> ItemHandle for View<T> {
}; };
if let Some(item) = item.to_followable_item_handle(cx) { if let Some(item) = item.to_followable_item_handle(cx) {
let is_project_item = item.is_project_item(cx);
let leader_id = workspace.leader_for_pane(&pane); let leader_id = workspace.leader_for_pane(&pane);
let follow_event = item.to_follow_event(event); let follow_event = item.to_follow_event(event);
if leader_id.is_some() if leader_id.is_some()
&& matches!(follow_event, Some(FollowEvent::Unfollow)) && matches!(follow_event, Some(FollowEvent::Unfollow))
@ -458,35 +494,13 @@ impl<T: Item> ItemHandle for View<T> {
workspace.unfollow(&pane, cx); workspace.unfollow(&pane, cx);
} }
if item.focus_handle(cx).contains_focused(cx) if item.focus_handle(cx).contains_focused(cx) {
&& item.add_event_to_update_proto( item.add_event_to_update_proto(
event, event,
&mut pending_update.borrow_mut(), &mut pending_update.borrow_mut(),
cx, cx,
)
&& !pending_update_scheduled.load(Ordering::SeqCst)
{
pending_update_scheduled.store(true, Ordering::SeqCst);
cx.defer({
let pending_update = pending_update.clone();
let pending_update_scheduled = pending_update_scheduled.clone();
move |this, cx| {
pending_update_scheduled.store(false, Ordering::SeqCst);
this.update_followers(
is_project_item,
proto::update_followers::Variant::UpdateView(
proto::UpdateView {
id: item
.remote_id(&this.app_state.client, cx)
.map(|id| id.to_proto()),
variant: pending_update.borrow_mut().take(),
leader_id,
},
),
cx,
); );
} pending_update_tx.unbounded_send(leader_id).ok();
});
} }
} }
@ -535,6 +549,7 @@ impl<T: Item> ItemHandle for View<T> {
cx.observe_release(self, move |workspace, _, _| { cx.observe_release(self, move |workspace, _, _| {
workspace.panes_by_item.remove(&item_id); workspace.panes_by_item.remove(&item_id);
event_subscription.take(); event_subscription.take();
send_follower_updates.take();
}) })
.detach(); .detach();
} }
@ -715,6 +730,7 @@ pub trait FollowableItem: Item {
pub trait FollowableItemHandle: ItemHandle { pub trait FollowableItemHandle: ItemHandle {
fn remote_id(&self, client: &Arc<Client>, cx: &WindowContext) -> Option<ViewId>; fn remote_id(&self, client: &Arc<Client>, cx: &WindowContext) -> Option<ViewId>;
fn downgrade(&self) -> Box<dyn WeakFollowableItemHandle>;
fn set_leader_peer_id(&self, leader_peer_id: Option<PeerId>, cx: &mut WindowContext); fn set_leader_peer_id(&self, leader_peer_id: Option<PeerId>, cx: &mut WindowContext);
fn to_state_proto(&self, cx: &WindowContext) -> Option<proto::view::Variant>; fn to_state_proto(&self, cx: &WindowContext) -> Option<proto::view::Variant>;
fn add_event_to_update_proto( fn add_event_to_update_proto(
@ -743,6 +759,10 @@ impl<T: FollowableItem> FollowableItemHandle for View<T> {
}) })
} }
fn downgrade(&self) -> Box<dyn WeakFollowableItemHandle> {
Box::new(self.downgrade())
}
fn set_leader_peer_id(&self, leader_peer_id: Option<PeerId>, cx: &mut WindowContext) { fn set_leader_peer_id(&self, leader_peer_id: Option<PeerId>, cx: &mut WindowContext) {
self.update(cx, |this, cx| this.set_leader_peer_id(leader_peer_id, cx)) self.update(cx, |this, cx| this.set_leader_peer_id(leader_peer_id, cx))
} }
@ -782,6 +802,16 @@ impl<T: FollowableItem> FollowableItemHandle for View<T> {
} }
} }
pub trait WeakFollowableItemHandle: Send + Sync {
fn upgrade(&self) -> Option<Box<dyn FollowableItemHandle>>;
}
impl<T: FollowableItem> WeakFollowableItemHandle for WeakView<T> {
fn upgrade(&self) -> Option<Box<dyn FollowableItemHandle>> {
Some(Box::new(self.upgrade()?))
}
}
#[cfg(any(test, feature = "test-support"))] #[cfg(any(test, feature = "test-support"))]
pub mod test { pub mod test {
use super::{Item, ItemEvent}; use super::{Item, ItemEvent};

View File

@ -1108,7 +1108,7 @@ impl Workspace {
) )
} }
pub fn client(&self) -> &Client { pub fn client(&self) -> &Arc<Client> {
&self.app_state.client &self.app_state.client
} }