From 09bb3ddeb88a250f743502849d4f18bd9b423d97 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Thu, 30 Jun 2022 14:06:41 +0200 Subject: [PATCH] Split worktree updates and only send 256 entries at a time --- crates/collab/src/rpc.rs | 2 + crates/collab/src/rpc/store.rs | 3 + crates/project/src/project.rs | 1 + crates/project/src/worktree.rs | 150 ++++++++++++++++++--------------- crates/rpc/proto/zed.proto | 2 + 5 files changed, 88 insertions(+), 70 deletions(-) diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 79c1e53a0b..e5a1f4dd1a 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -804,6 +804,7 @@ impl Server { .collect(), visible: worktree.visible, scan_id: shared_worktree.scan_id, + is_complete: worktree.is_complete, }) }) .collect::>(); @@ -963,6 +964,7 @@ impl Server { &request.payload.removed_entries, &request.payload.updated_entries, request.payload.scan_id, + request.payload.is_last_update, )?; (connection_ids, metadata_changed, extension_counts.clone()) }; diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index d1eb4a3be6..1d36e971e2 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -62,6 +62,7 @@ pub struct Worktree { #[serde(skip)] pub diagnostic_summaries: BTreeMap, pub scan_id: u64, + pub is_complete: bool, } #[derive(Default)] @@ -615,6 +616,7 @@ impl Store { removed_entries: &[u64], updated_entries: &[proto::Entry], scan_id: u64, + is_last_update: bool, ) -> Result<(Vec, bool, HashMap)> { let project = self.write_project(project_id, connection_id)?; let connection_ids = project.connection_ids(); @@ -657,6 +659,7 @@ impl Store { } worktree.scan_id = scan_id; + worktree.is_complete = is_last_update; Ok(( connection_ids, metadata_changed, diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 8ce07a6abd..97b8b3a525 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -4447,6 +4447,7 @@ impl Project { diagnostic_summaries: Default::default(), visible: worktree.visible, scan_id: 0, + is_complete: false, }; let (worktree, load_task) = Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx); diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 8af81f6d1a..5d105df446 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -83,7 +83,7 @@ pub struct RemoteWorktree { project_id: u64, client: Arc, updates_tx: Option>, - last_scan_id_rx: watch::Receiver, + snapshot_updated_rx: watch::Receiver<()>, replica_id: ReplicaId, diagnostic_summaries: TreeMap, visible: bool, @@ -97,6 +97,7 @@ pub struct Snapshot { entries_by_path: SumTree, entries_by_id: SumTree, scan_id: usize, + is_complete: bool, } #[derive(Clone)] @@ -191,12 +192,12 @@ impl Worktree { entries_by_path: Default::default(), entries_by_id: Default::default(), scan_id: worktree.scan_id as usize, + is_complete: worktree.is_complete, }; let (updates_tx, mut updates_rx) = mpsc::unbounded(); let background_snapshot = Arc::new(Mutex::new(snapshot.clone())); let (mut snapshot_updated_tx, mut snapshot_updated_rx) = watch::channel(); - let (mut last_scan_id_tx, last_scan_id_rx) = watch::channel_with(worktree.scan_id as usize); let worktree_handle = cx.add_model(|_: &mut ModelContext| { Worktree::Remote(RemoteWorktree { project_id: project_remote_id, @@ -204,7 +205,7 @@ impl Worktree { snapshot: snapshot.clone(), background_snapshot: background_snapshot.clone(), updates_tx: Some(updates_tx), - last_scan_id_rx, + snapshot_updated_rx: snapshot_updated_rx.clone(), client: client.clone(), diagnostic_summaries: TreeMap::from_ordered_entries( worktree.diagnostic_summaries.into_iter().map(|summary| { @@ -279,11 +280,7 @@ impl Worktree { async move { while let Some(_) = snapshot_updated_rx.recv().await { if let Some(this) = this.upgrade(&cx) { - this.update(&mut cx, |this, cx| { - this.poll_snapshot(cx); - let this = this.as_remote_mut().unwrap(); - *last_scan_id_tx.borrow_mut() = this.snapshot.scan_id; - }); + this.update(&mut cx, |this, cx| this.poll_snapshot(cx)); } else { break; } @@ -450,6 +447,7 @@ impl LocalWorktree { entries_by_path: Default::default(), entries_by_id: Default::default(), scan_id: 0, + is_complete: true, }, }; if let Some(metadata) = metadata { @@ -910,22 +908,20 @@ impl LocalWorktree { async move { let mut prev_snapshot = match snapshots_to_send_rx.recv().await { Ok(snapshot) => { - if let Err(error) = rpc - .request(proto::UpdateWorktree { - project_id, - worktree_id, - root_name: snapshot.root_name().to_string(), - updated_entries: snapshot - .entries_by_path - .iter() - .filter(|e| !e.is_ignored) - .map(Into::into) - .collect(), - removed_entries: Default::default(), - scan_id: snapshot.scan_id as u64, - }) - .await - { + let update = proto::UpdateWorktree { + project_id, + worktree_id, + root_name: snapshot.root_name().to_string(), + updated_entries: snapshot + .entries_by_path + .iter() + .map(Into::into) + .collect(), + removed_entries: Default::default(), + scan_id: snapshot.scan_id as u64, + is_last_update: true, + }; + if let Err(error) = send_worktree_update(&rpc, update).await { let _ = share_tx.send(Err(error)); return Err(anyhow!("failed to send initial update worktree")); } else { @@ -947,48 +943,16 @@ impl LocalWorktree { })?; } - // Stream ignored entries in chunks. - { - let mut ignored_entries = prev_snapshot - .entries_by_path - .iter() - .filter(|e| e.is_ignored); - let mut ignored_entries_to_send = Vec::new(); - loop { - #[cfg(any(test, feature = "test-support"))] - const CHUNK_SIZE: usize = 2; - #[cfg(not(any(test, feature = "test-support")))] - const CHUNK_SIZE: usize = 256; - - let entry = ignored_entries.next(); - if ignored_entries_to_send.len() >= CHUNK_SIZE || entry.is_none() { - rpc.request(proto::UpdateWorktree { - project_id, - worktree_id, - root_name: prev_snapshot.root_name().to_string(), - updated_entries: mem::take(&mut ignored_entries_to_send), - removed_entries: Default::default(), - scan_id: prev_snapshot.scan_id as u64, - }) - .await?; - } - - if let Some(entry) = entry { - ignored_entries_to_send.push(entry.into()); - } else { - break; - } - } - } - while let Ok(mut snapshot) = snapshots_to_send_rx.recv().await { while let Ok(newer_snapshot) = snapshots_to_send_rx.try_recv() { snapshot = newer_snapshot; } - let message = - snapshot.build_update(&prev_snapshot, project_id, worktree_id, true); - rpc.request(message).await?; + send_worktree_update( + &rpc, + snapshot.build_update(&prev_snapshot, project_id, worktree_id, true), + ) + .await?; prev_snapshot = snapshot; } @@ -1063,15 +1027,25 @@ impl RemoteWorktree { Ok(()) } - fn wait_for_snapshot(&self, scan_id: usize) -> impl Future { - let mut rx = self.last_scan_id_rx.clone(); - async move { - while let Some(applied_scan_id) = rx.next().await { - if applied_scan_id >= scan_id { - return; + fn wait_for_snapshot( + &self, + scan_id: usize, + cx: &mut ModelContext, + ) -> Task> { + let mut rx = self.snapshot_updated_rx.clone(); + cx.spawn_weak(|worktree, cx| async move { + while rx.recv().await.is_some() { + let snapshot = worktree + .upgrade(&cx)? + .read_with(&cx, |worktree, _| worktree.snapshot()); + if snapshot.scan_id > scan_id + || (snapshot.scan_id == scan_id && snapshot.is_complete) + { + break; } } - } + None + }) } pub fn update_diagnostic_summary( @@ -1098,7 +1072,7 @@ impl RemoteWorktree { scan_id: usize, cx: &mut ModelContext, ) -> Task> { - let wait_for_snapshot = self.wait_for_snapshot(scan_id); + let wait_for_snapshot = self.wait_for_snapshot(scan_id, cx); cx.spawn(|this, mut cx| async move { wait_for_snapshot.await; this.update(&mut cx, |worktree, _| { @@ -1117,7 +1091,7 @@ impl RemoteWorktree { scan_id: usize, cx: &mut ModelContext, ) -> Task> { - let wait_for_snapshot = self.wait_for_snapshot(scan_id); + let wait_for_snapshot = self.wait_for_snapshot(scan_id, cx); cx.spawn(|this, mut cx| async move { wait_for_snapshot.await; this.update(&mut cx, |worktree, _| { @@ -1210,6 +1184,7 @@ impl Snapshot { self.entries_by_path.edit(entries_by_path_edits, &()); self.entries_by_id.edit(entries_by_id_edits, &()); self.scan_id = update.scan_id as usize; + self.is_complete = update.is_last_update; Ok(()) } @@ -1352,6 +1327,7 @@ impl LocalSnapshot { .collect(), visible, scan_id: self.scan_id as u64, + is_complete: true, } } @@ -1418,6 +1394,7 @@ impl LocalSnapshot { updated_entries, removed_entries, scan_id: self.scan_id as u64, + is_last_update: true, } } @@ -2732,6 +2709,38 @@ impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry { } } +async fn send_worktree_update( + client: &Arc, + mut update: proto::UpdateWorktree, +) -> Result<()> { + #[cfg(any(test, feature = "test-support"))] + const MAX_CHUNK_SIZE: usize = 2; + #[cfg(not(any(test, feature = "test-support")))] + const MAX_CHUNK_SIZE: usize = 256; + + loop { + let chunk_size = cmp::min(update.updated_entries.len(), MAX_CHUNK_SIZE); + let updated_entries = update.updated_entries.drain(..chunk_size).collect(); + let is_last_update = update.updated_entries.is_empty(); + client + .request(proto::UpdateWorktree { + project_id: update.project_id, + worktree_id: update.worktree_id, + root_name: update.root_name.clone(), + updated_entries, + removed_entries: mem::take(&mut update.removed_entries), + scan_id: update.scan_id, + is_last_update, + }) + .await?; + if is_last_update { + break; + } + } + + Ok(()) +} + #[cfg(test)] mod tests { use super::*; @@ -2941,6 +2950,7 @@ mod tests { root_name: Default::default(), root_char_bag: Default::default(), scan_id: 0, + is_complete: true, }, }; initial_snapshot.insert_entry( diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 69ccae1704..8291b8ac98 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -195,6 +195,7 @@ message UpdateWorktree { repeated Entry updated_entries = 4; repeated uint64 removed_entries = 5; uint64 scan_id = 6; + bool is_last_update = 7; } message CreateProjectEntry { @@ -772,6 +773,7 @@ message Worktree { repeated DiagnosticSummary diagnostic_summaries = 4; bool visible = 5; uint64 scan_id = 6; + bool is_complete = 7; } message File {