Derive worktree update messages from existing change events

This commit is contained in:
Max Brunsfeld 2023-05-23 20:12:09 -07:00
parent 6628c4df28
commit 02b95ef320
3 changed files with 486 additions and 468 deletions

View File

@ -1459,7 +1459,7 @@ impl Project {
};
cx.foreground().spawn(async move {
pump_loading_buffer_reciever(loading_watch)
wait_for_loading_buffer(loading_watch)
.await
.map_err(|error| anyhow!("{}", error))
})
@ -5057,98 +5057,102 @@ impl Project {
fn update_local_worktree_buffers_git_repos(
&mut self,
worktree_handle: ModelHandle<Worktree>,
repos: &HashMap<Arc<Path>, LocalRepositoryEntry>,
changed_repos: &UpdatedGitRepositoriesSet,
cx: &mut ModelContext<Self>,
) {
debug_assert!(worktree_handle.read(cx).is_local());
// Setup the pending buffers
// Identify the loading buffers whose containing repository that has changed.
let future_buffers = self
.loading_buffers_by_path
.iter()
.filter_map(|(path, receiver)| {
let path = &path.path;
let (work_directory, repo) = repos
.filter_map(|(project_path, receiver)| {
if project_path.worktree_id != worktree_handle.read(cx).id() {
return None;
}
let path = &project_path.path;
changed_repos
.iter()
.find(|(work_directory, _)| path.starts_with(work_directory))?;
let repo_relative_path = path.strip_prefix(work_directory).log_err()?;
.find(|(work_dir, _)| path.starts_with(work_dir))?;
let receiver = receiver.clone();
let repo_ptr = repo.repo_ptr.clone();
let repo_relative_path = repo_relative_path.to_owned();
let path = path.clone();
Some(async move {
pump_loading_buffer_reciever(receiver)
wait_for_loading_buffer(receiver)
.await
.ok()
.map(|buffer| (buffer, repo_relative_path, repo_ptr))
.map(|buffer| (buffer, path))
})
})
.collect::<FuturesUnordered<_>>()
.filter_map(|result| async move {
let (buffer_handle, repo_relative_path, repo_ptr) = result?;
.collect::<FuturesUnordered<_>>();
let lock = repo_ptr.lock();
lock.load_index_text(&repo_relative_path)
.map(|diff_base| (diff_base, buffer_handle))
});
// Identify the current buffers whose containing repository has changed.
let current_buffers = self
.opened_buffers
.values()
.filter_map(|buffer| {
let buffer = buffer.upgrade(cx)?;
let file = File::from_dyn(buffer.read(cx).file())?;
if file.worktree != worktree_handle {
return None;
}
let path = file.path();
changed_repos
.iter()
.find(|(work_dir, _)| path.starts_with(work_dir))?;
Some((buffer, path.clone()))
})
.collect::<Vec<_>>();
let update_diff_base_fn = update_diff_base(self);
cx.spawn(|_, mut cx| async move {
let diff_base_tasks = cx
if future_buffers.len() + current_buffers.len() == 0 {
return;
}
let remote_id = self.remote_id();
let client = self.client.clone();
cx.spawn_weak(move |_, mut cx| async move {
// Wait for all of the buffers to load.
let future_buffers = future_buffers.collect::<Vec<_>>().await;
// Reload the diff base for every buffer whose containing git repository has changed.
let snapshot =
worktree_handle.read_with(&cx, |tree, _| tree.as_local().unwrap().snapshot());
let diff_bases_by_buffer = cx
.background()
.spawn(future_buffers.collect::<Vec<_>>())
.spawn(async move {
future_buffers
.into_iter()
.filter_map(|e| e)
.chain(current_buffers)
.filter_map(|(buffer, path)| {
let (work_directory, repo) =
snapshot.repository_and_work_directory_for_path(&path)?;
let repo = snapshot.get_local_repo(&repo)?;
let relative_path = path.strip_prefix(&work_directory).ok()?;
let base_text = repo.repo_ptr.lock().load_index_text(&relative_path);
Some((buffer, base_text))
})
.collect::<Vec<_>>()
})
.await;
for (diff_base, buffer) in diff_base_tasks.into_iter() {
update_diff_base_fn(Some(diff_base), buffer, &mut cx);
// Assign the new diff bases on all of the buffers.
for (buffer, diff_base) in diff_bases_by_buffer {
let buffer_id = buffer.update(&mut cx, |buffer, cx| {
buffer.set_diff_base(diff_base.clone(), cx);
buffer.remote_id()
});
if let Some(project_id) = remote_id {
client
.send(proto::UpdateDiffBase {
project_id,
buffer_id,
diff_base,
})
.log_err();
}
}
})
.detach();
// And the current buffers
for (_, buffer) in &self.opened_buffers {
if let Some(buffer) = buffer.upgrade(cx) {
let file = match File::from_dyn(buffer.read(cx).file()) {
Some(file) => file,
None => continue,
};
if file.worktree != worktree_handle {
continue;
}
let path = file.path().clone();
let worktree = worktree_handle.read(cx);
let (work_directory, repo) = match repos
.iter()
.find(|(work_directory, _)| path.starts_with(work_directory))
{
Some(repo) => repo.clone(),
None => continue,
};
let relative_repo = match path.strip_prefix(work_directory).log_err() {
Some(relative_repo) => relative_repo.to_owned(),
None => continue,
};
drop(worktree);
let update_diff_base_fn = update_diff_base(self);
let git_ptr = repo.repo_ptr.clone();
let diff_base_task = cx
.background()
.spawn(async move { git_ptr.lock().load_index_text(&relative_repo) });
cx.spawn(|_, mut cx| async move {
let diff_base = diff_base_task.await;
update_diff_base_fn(diff_base, buffer, &mut cx);
})
.detach();
}
}
}
pub fn set_active_path(&mut self, entry: Option<ProjectPath>, cx: &mut ModelContext<Self>) {
@ -7070,7 +7074,7 @@ impl Item for Buffer {
}
}
async fn pump_loading_buffer_reciever(
async fn wait_for_loading_buffer(
mut receiver: postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>,
) -> Result<ModelHandle<Buffer>, Arc<anyhow::Error>> {
loop {
@ -7083,26 +7087,3 @@ async fn pump_loading_buffer_reciever(
receiver.next().await;
}
}
fn update_diff_base(
project: &Project,
) -> impl Fn(Option<String>, ModelHandle<Buffer>, &mut AsyncAppContext) {
let remote_id = project.remote_id();
let client = project.client().clone();
move |diff_base, buffer, cx| {
let buffer_id = buffer.update(cx, |buffer, cx| {
buffer.set_diff_base(diff_base.clone(), cx);
buffer.remote_id()
});
if let Some(project_id) = remote_id {
client
.send(proto::UpdateDiffBase {
project_id,
buffer_id: buffer_id as u64,
diff_base,
})
.log_err();
}
}
}

View File

@ -2524,29 +2524,21 @@ async fn test_rescan_and_remote_updates(
// Create a remote copy of this worktree.
let tree = project.read_with(cx, |project, cx| project.worktrees(cx).next().unwrap());
let initial_snapshot = tree.read_with(cx, |tree, _| tree.as_local().unwrap().snapshot());
let remote = cx.update(|cx| {
Worktree::remote(
1,
1,
proto::WorktreeMetadata {
id: initial_snapshot.id().to_proto(),
root_name: initial_snapshot.root_name().into(),
abs_path: initial_snapshot
.abs_path()
.as_os_str()
.to_string_lossy()
.into(),
visible: true,
},
rpc.clone(),
cx,
)
});
remote.update(cx, |remote, _| {
let update = initial_snapshot.build_initial_update(1);
remote.as_remote_mut().unwrap().update_from_remote(update);
let metadata = tree.read_with(cx, |tree, _| tree.as_local().unwrap().metadata_proto());
let updates = Arc::new(Mutex::new(Vec::new()));
tree.update(cx, |tree, cx| {
let _ = tree.as_local_mut().unwrap().observe_updates(0, cx, {
let updates = updates.clone();
move |update| {
updates.lock().push(update);
async { true }
}
});
});
let remote = cx.update(|cx| Worktree::remote(1, 1, metadata, rpc.clone(), cx));
deterministic.run_until_parked();
cx.read(|cx| {
@ -2612,14 +2604,11 @@ async fn test_rescan_and_remote_updates(
// Update the remote worktree. Check that it becomes consistent with the
// local worktree.
remote.update(cx, |remote, cx| {
let update = tree.read(cx).as_local().unwrap().snapshot().build_update(
&initial_snapshot,
1,
1,
true,
);
remote.as_remote_mut().unwrap().update_from_remote(update);
deterministic.run_until_parked();
remote.update(cx, |remote, _| {
for update in updates.lock().drain(..) {
remote.as_remote_mut().unwrap().update_from_remote(update);
}
});
deterministic.run_until_parked();
remote.read_with(cx, |remote, _| {

View File

@ -17,7 +17,7 @@ use futures::{
},
select_biased,
task::Poll,
Stream, StreamExt,
FutureExt, Stream, StreamExt,
};
use fuzzy::CharBag;
use git::{DOT_GIT, GITIGNORE};
@ -55,7 +55,7 @@ use std::{
time::{Duration, SystemTime},
};
use sum_tree::{Bias, Edit, SeekTarget, SumTree, TreeMap, TreeSet};
use util::{paths::HOME, ResultExt, TakeUntilExt, TryFutureExt};
use util::{paths::HOME, ResultExt, TakeUntilExt};
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash, PartialOrd, Ord)]
pub struct WorktreeId(usize);
@ -363,7 +363,7 @@ enum ScanState {
Started,
Updated {
snapshot: LocalSnapshot,
changes: Arc<[(Arc<Path>, ProjectEntryId, PathChange)]>,
changes: UpdatedEntriesSet,
barrier: Option<barrier::Sender>,
scanning: bool,
},
@ -371,14 +371,15 @@ enum ScanState {
struct ShareState {
project_id: u64,
snapshots_tx: watch::Sender<LocalSnapshot>,
snapshots_tx:
mpsc::UnboundedSender<(LocalSnapshot, UpdatedEntriesSet, UpdatedGitRepositoriesSet)>,
resume_updates: watch::Sender<()>,
_maintain_remote_snapshot: Task<Option<()>>,
}
pub enum Event {
UpdatedEntries(Arc<[(Arc<Path>, ProjectEntryId, PathChange)]>),
UpdatedGitRepositories(HashMap<Arc<Path>, LocalRepositoryEntry>),
UpdatedEntries(UpdatedEntriesSet),
UpdatedGitRepositories(UpdatedGitRepositoriesSet),
}
impl Entity for Worktree {
@ -453,8 +454,7 @@ impl Worktree {
scanning,
} => {
*this.is_scanning.0.borrow_mut() = scanning;
this.set_snapshot(snapshot, cx);
cx.emit(Event::UpdatedEntries(changes));
this.set_snapshot(snapshot, changes, cx);
drop(barrier);
}
}
@ -820,73 +820,109 @@ impl LocalWorktree {
Ok(!old_summary.is_empty() || !new_summary.is_empty())
}
fn set_snapshot(&mut self, new_snapshot: LocalSnapshot, cx: &mut ModelContext<Worktree>) {
let updated_repos =
self.changed_repos(&self.git_repositories, &new_snapshot.git_repositories);
fn set_snapshot(
&mut self,
new_snapshot: LocalSnapshot,
entry_changes: UpdatedEntriesSet,
cx: &mut ModelContext<Worktree>,
) {
let repo_changes = self.changed_repos(&self.snapshot, &new_snapshot);
self.snapshot = new_snapshot;
if let Some(share) = self.share.as_mut() {
*share.snapshots_tx.borrow_mut() = self.snapshot.clone();
share
.snapshots_tx
.unbounded_send((
self.snapshot.clone(),
entry_changes.clone(),
repo_changes.clone(),
))
.ok();
}
if !updated_repos.is_empty() {
cx.emit(Event::UpdatedGitRepositories(updated_repos));
if !entry_changes.is_empty() {
cx.emit(Event::UpdatedEntries(entry_changes));
}
if !repo_changes.is_empty() {
cx.emit(Event::UpdatedGitRepositories(repo_changes));
}
}
fn changed_repos(
&self,
old_repos: &TreeMap<ProjectEntryId, LocalRepositoryEntry>,
new_repos: &TreeMap<ProjectEntryId, LocalRepositoryEntry>,
) -> HashMap<Arc<Path>, LocalRepositoryEntry> {
let mut diff = HashMap::default();
let mut old_repos = old_repos.iter().peekable();
let mut new_repos = new_repos.iter().peekable();
old_snapshot: &LocalSnapshot,
new_snapshot: &LocalSnapshot,
) -> UpdatedGitRepositoriesSet {
let mut changes = Vec::new();
let mut old_repos = old_snapshot.git_repositories.iter().peekable();
let mut new_repos = new_snapshot.git_repositories.iter().peekable();
loop {
match (old_repos.peek(), new_repos.peek()) {
(Some((old_entry_id, old_repo)), Some((new_entry_id, new_repo))) => {
match Ord::cmp(old_entry_id, new_entry_id) {
match (new_repos.peek().map(clone), old_repos.peek().map(clone)) {
(Some((new_entry_id, new_repo)), Some((old_entry_id, old_repo))) => {
match Ord::cmp(&new_entry_id, &old_entry_id) {
Ordering::Less => {
if let Some(entry) = self.entry_for_id(**old_entry_id) {
diff.insert(entry.path.clone(), (*old_repo).clone());
if let Some(entry) = new_snapshot.entry_for_id(new_entry_id) {
changes.push((entry.path.clone(), None));
}
old_repos.next();
new_repos.next();
}
Ordering::Equal => {
if old_repo.git_dir_scan_id != new_repo.git_dir_scan_id {
if let Some(entry) = self.entry_for_id(**new_entry_id) {
diff.insert(entry.path.clone(), (*new_repo).clone());
if new_repo.git_dir_scan_id != old_repo.git_dir_scan_id {
if let Some(entry) = new_snapshot.entry_for_id(new_entry_id) {
changes.push((
entry.path.clone(),
old_snapshot
.repository_entries
.get(&RepositoryWorkDirectory(entry.path.clone()))
.cloned(),
));
}
}
old_repos.next();
new_repos.next();
old_repos.next();
}
Ordering::Greater => {
if let Some(entry) = self.entry_for_id(**new_entry_id) {
diff.insert(entry.path.clone(), (*new_repo).clone());
if let Some(entry) = old_snapshot.entry_for_id(old_entry_id) {
changes.push((
entry.path.clone(),
old_snapshot
.repository_entries
.get(&RepositoryWorkDirectory(entry.path.clone()))
.cloned(),
));
}
new_repos.next();
old_repos.next();
}
}
}
(Some((old_entry_id, old_repo)), None) => {
if let Some(entry) = self.entry_for_id(**old_entry_id) {
diff.insert(entry.path.clone(), (*old_repo).clone());
}
old_repos.next();
}
(None, Some((new_entry_id, new_repo))) => {
if let Some(entry) = self.entry_for_id(**new_entry_id) {
diff.insert(entry.path.clone(), (*new_repo).clone());
(Some((entry_id, _)), None) => {
if let Some(entry) = new_snapshot.entry_for_id(entry_id) {
changes.push((entry.path.clone(), None));
}
new_repos.next();
}
(None, Some((entry_id, _))) => {
if let Some(entry) = old_snapshot.entry_for_id(entry_id) {
changes.push((
entry.path.clone(),
old_snapshot
.repository_entries
.get(&RepositoryWorkDirectory(entry.path.clone()))
.cloned(),
));
}
old_repos.next();
}
(None, None) => break,
}
}
diff
fn clone<T: Clone, U: Clone>(value: &(&T, &U)) -> (T, U) {
(value.0.clone(), value.1.clone())
}
changes.into()
}
pub fn scan_complete(&self) -> impl Future<Output = ()> {
@ -1227,89 +1263,97 @@ impl LocalWorktree {
})
}
pub fn share(&mut self, project_id: u64, cx: &mut ModelContext<Worktree>) -> Task<Result<()>> {
pub fn observe_updates<F, Fut>(
&mut self,
project_id: u64,
cx: &mut ModelContext<Worktree>,
callback: F,
) -> oneshot::Receiver<()>
where
F: 'static + Send + Fn(proto::UpdateWorktree) -> Fut,
Fut: Send + Future<Output = bool>,
{
#[cfg(any(test, feature = "test-support"))]
const MAX_CHUNK_SIZE: usize = 2;
#[cfg(not(any(test, feature = "test-support")))]
const MAX_CHUNK_SIZE: usize = 256;
let (share_tx, share_rx) = oneshot::channel();
if let Some(share) = self.share.as_mut() {
let _ = share_tx.send(());
share_tx.send(()).ok();
*share.resume_updates.borrow_mut() = ();
} else {
let (snapshots_tx, mut snapshots_rx) = watch::channel_with(self.snapshot());
let (resume_updates_tx, mut resume_updates_rx) = watch::channel();
let worktree_id = cx.model_id() as u64;
return share_rx;
}
for (path, summaries) in &self.diagnostic_summaries {
for (&server_id, summary) in summaries {
if let Err(e) = self.client.send(proto::UpdateDiagnosticSummary {
project_id,
worktree_id,
summary: Some(summary.to_proto(server_id, &path)),
}) {
return Task::ready(Err(e));
let (resume_updates_tx, mut resume_updates_rx) = watch::channel::<()>();
let (snapshots_tx, mut snapshots_rx) =
mpsc::unbounded::<(LocalSnapshot, UpdatedEntriesSet, UpdatedGitRepositoriesSet)>();
snapshots_tx
.unbounded_send((self.snapshot(), Arc::from([]), Arc::from([])))
.ok();
let worktree_id = cx.model_id() as u64;
let _maintain_remote_snapshot = cx.background().spawn(async move {
let mut is_first = true;
while let Some((snapshot, entry_changes, repo_changes)) = snapshots_rx.next().await {
let update;
if is_first {
update = snapshot.build_initial_update(project_id, worktree_id);
is_first = false;
} else {
update =
snapshot.build_update(project_id, worktree_id, entry_changes, repo_changes);
}
for update in proto::split_worktree_update(update, MAX_CHUNK_SIZE) {
let _ = resume_updates_rx.try_recv();
loop {
let result = callback(update.clone());
if result.await {
break;
} else {
log::info!("waiting to resume updates");
if resume_updates_rx.next().await.is_none() {
return Some(());
}
}
}
}
}
share_tx.send(()).ok();
Some(())
});
let _maintain_remote_snapshot = cx.background().spawn({
let client = self.client.clone();
async move {
let mut share_tx = Some(share_tx);
let mut prev_snapshot = LocalSnapshot {
ignores_by_parent_abs_path: Default::default(),
git_repositories: Default::default(),
snapshot: Snapshot {
id: WorktreeId(worktree_id as usize),
abs_path: Path::new("").into(),
root_name: Default::default(),
root_char_bag: Default::default(),
entries_by_path: Default::default(),
entries_by_id: Default::default(),
repository_entries: Default::default(),
scan_id: 0,
completed_scan_id: 0,
},
};
while let Some(snapshot) = snapshots_rx.recv().await {
#[cfg(any(test, feature = "test-support"))]
const MAX_CHUNK_SIZE: usize = 2;
#[cfg(not(any(test, feature = "test-support")))]
const MAX_CHUNK_SIZE: usize = 256;
self.share = Some(ShareState {
project_id,
snapshots_tx,
resume_updates: resume_updates_tx,
_maintain_remote_snapshot,
});
share_rx
}
let update =
snapshot.build_update(&prev_snapshot, project_id, worktree_id, true);
for update in proto::split_worktree_update(update, MAX_CHUNK_SIZE) {
let _ = resume_updates_rx.try_recv();
while let Err(error) = client.request(update.clone()).await {
log::error!("failed to send worktree update: {}", error);
log::info!("waiting to resume updates");
if resume_updates_rx.next().await.is_none() {
return Ok(());
}
}
}
pub fn share(&mut self, project_id: u64, cx: &mut ModelContext<Worktree>) -> Task<Result<()>> {
let client = self.client.clone();
if let Some(share_tx) = share_tx.take() {
let _ = share_tx.send(());
}
prev_snapshot = snapshot;
}
Ok::<_, anyhow::Error>(())
for (path, summaries) in &self.diagnostic_summaries {
for (&server_id, summary) in summaries {
if let Err(e) = self.client.send(proto::UpdateDiagnosticSummary {
project_id,
worktree_id: cx.model_id() as u64,
summary: Some(summary.to_proto(server_id, &path)),
}) {
return Task::ready(Err(e));
}
.log_err()
});
self.share = Some(ShareState {
project_id,
snapshots_tx,
resume_updates: resume_updates_tx,
_maintain_remote_snapshot,
});
}
}
let rx = self.observe_updates(project_id, cx, move |update| {
client.request(update).map(|result| result.is_ok())
});
cx.foreground()
.spawn(async move { share_rx.await.map_err(|_| anyhow!("share ended")) })
.spawn(async move { rx.await.map_err(|_| anyhow!("share ended")) })
}
pub fn unshare(&mut self) {
@ -1518,10 +1562,12 @@ impl Snapshot {
pub(crate) fn apply_remote_update(&mut self, mut update: proto::UpdateWorktree) -> Result<()> {
let mut entries_by_path_edits = Vec::new();
let mut entries_by_id_edits = Vec::new();
for entry_id in update.removed_entries {
if let Some(entry) = self.entry_for_id(ProjectEntryId::from_proto(entry_id)) {
let entry_id = ProjectEntryId::from_proto(entry_id);
entries_by_id_edits.push(Edit::Remove(entry_id));
if let Some(entry) = self.entry_for_id(entry_id) {
entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
entries_by_id_edits.push(Edit::Remove(entry.id));
}
}
@ -1530,6 +1576,11 @@ impl Snapshot {
if let Some(PathEntry { path, .. }) = self.entries_by_id.get(&entry.id, &()) {
entries_by_path_edits.push(Edit::Remove(PathKey(path.clone())));
}
if let Some(old_entry) = self.entries_by_path.get(&PathKey(entry.path.clone()), &()) {
if old_entry.id != entry.id {
entries_by_id_edits.push(Edit::Remove(old_entry.id));
}
}
entries_by_id_edits.push(Edit::Insert(PathEntry {
id: entry.id,
path: entry.path.clone(),
@ -1672,20 +1723,19 @@ impl Snapshot {
/// Get the repository whose work directory contains the given path.
pub fn repository_for_path(&self, path: &Path) -> Option<RepositoryEntry> {
let mut max_len = 0;
let mut current_candidate = None;
for (work_directory, repo) in (&self.repository_entries).iter() {
if path.starts_with(&work_directory.0) {
if work_directory.0.as_os_str().len() >= max_len {
current_candidate = Some(repo);
max_len = work_directory.0.as_os_str().len();
} else {
break;
}
}
}
self.repository_and_work_directory_for_path(path)
.map(|e| e.1)
}
current_candidate.cloned()
pub fn repository_and_work_directory_for_path(
&self,
path: &Path,
) -> Option<(RepositoryWorkDirectory, RepositoryEntry)> {
self.repository_entries
.iter()
.filter(|(workdir_path, _)| path.starts_with(workdir_path))
.last()
.map(|(path, repo)| (path.clone(), repo.clone()))
}
/// Given an ordered iterator of entries, returns an iterator of those entries,
@ -1821,116 +1871,50 @@ impl LocalSnapshot {
.find(|(_, repo)| repo.in_dot_git(path))
}
#[cfg(test)]
pub(crate) fn build_initial_update(&self, project_id: u64) -> proto::UpdateWorktree {
let root_name = self.root_name.clone();
proto::UpdateWorktree {
project_id,
worktree_id: self.id().to_proto(),
abs_path: self.abs_path().to_string_lossy().into(),
root_name,
updated_entries: self.entries_by_path.iter().map(Into::into).collect(),
removed_entries: Default::default(),
scan_id: self.scan_id as u64,
is_last_update: true,
updated_repositories: self.repository_entries.values().map(Into::into).collect(),
removed_repositories: Default::default(),
}
}
pub(crate) fn build_update(
fn build_update(
&self,
other: &Self,
project_id: u64,
worktree_id: u64,
include_ignored: bool,
entry_changes: UpdatedEntriesSet,
repo_changes: UpdatedGitRepositoriesSet,
) -> proto::UpdateWorktree {
let mut updated_entries = Vec::new();
let mut removed_entries = Vec::new();
let mut self_entries = self
.entries_by_id
.cursor::<()>()
.filter(|e| include_ignored || !e.is_ignored)
.peekable();
let mut other_entries = other
.entries_by_id
.cursor::<()>()
.filter(|e| include_ignored || !e.is_ignored)
.peekable();
loop {
match (self_entries.peek(), other_entries.peek()) {
(Some(self_entry), Some(other_entry)) => {
match Ord::cmp(&self_entry.id, &other_entry.id) {
Ordering::Less => {
let entry = self.entry_for_id(self_entry.id).unwrap().into();
updated_entries.push(entry);
self_entries.next();
}
Ordering::Equal => {
if self_entry.scan_id != other_entry.scan_id {
let entry = self.entry_for_id(self_entry.id).unwrap().into();
updated_entries.push(entry);
}
self_entries.next();
other_entries.next();
}
Ordering::Greater => {
removed_entries.push(other_entry.id.to_proto());
other_entries.next();
}
}
}
(Some(self_entry), None) => {
let entry = self.entry_for_id(self_entry.id).unwrap().into();
updated_entries.push(entry);
self_entries.next();
}
(None, Some(other_entry)) => {
removed_entries.push(other_entry.id.to_proto());
other_entries.next();
}
(None, None) => break,
}
}
let mut updated_repositories: Vec<proto::RepositoryEntry> = Vec::new();
let mut updated_repositories = Vec::new();
let mut removed_repositories = Vec::new();
let mut self_repos = self.snapshot.repository_entries.iter().peekable();
let mut other_repos = other.snapshot.repository_entries.iter().peekable();
loop {
match (self_repos.peek(), other_repos.peek()) {
(Some((self_work_dir, self_repo)), Some((other_work_dir, other_repo))) => {
match Ord::cmp(self_work_dir, other_work_dir) {
Ordering::Less => {
updated_repositories.push((*self_repo).into());
self_repos.next();
}
Ordering::Equal => {
if self_repo != other_repo {
updated_repositories.push(self_repo.build_update(other_repo));
}
self_repos.next();
other_repos.next();
}
Ordering::Greater => {
removed_repositories.push(other_repo.work_directory.to_proto());
other_repos.next();
}
}
}
(Some((_, self_repo)), None) => {
updated_repositories.push((*self_repo).into());
self_repos.next();
}
(None, Some((_, other_repo))) => {
removed_repositories.push(other_repo.work_directory.to_proto());
other_repos.next();
}
(None, None) => break,
for (_, entry_id, path_change) in entry_changes.iter() {
if let PathChange::Removed = path_change {
removed_entries.push(entry_id.0 as u64);
} else if let Some(entry) = self.entry_for_id(*entry_id) {
updated_entries.push(proto::Entry::from(entry));
}
}
for (work_dir_path, old_repo) in repo_changes.iter() {
let new_repo = self
.repository_entries
.get(&RepositoryWorkDirectory(work_dir_path.clone()));
match (old_repo, new_repo) {
(Some(old_repo), Some(new_repo)) => {
updated_repositories.push(new_repo.build_update(old_repo));
}
(None, Some(new_repo)) => {
updated_repositories.push(proto::RepositoryEntry::from(new_repo));
}
(Some(old_repo), None) => {
removed_repositories.push(old_repo.work_directory.0.to_proto());
}
_ => {}
}
}
removed_entries.sort_unstable();
updated_entries.sort_unstable_by_key(|e| e.id);
removed_repositories.sort_unstable();
updated_repositories.sort_unstable_by_key(|e| e.work_directory_id);
// TODO - optimize, knowing that removed_entries are sorted.
removed_entries.retain(|id| updated_entries.binary_search_by_key(id, |e| e.id).is_err());
proto::UpdateWorktree {
project_id,
@ -1946,6 +1930,35 @@ impl LocalSnapshot {
}
}
fn build_initial_update(&self, project_id: u64, worktree_id: u64) -> proto::UpdateWorktree {
let mut updated_entries = self
.entries_by_path
.iter()
.map(proto::Entry::from)
.collect::<Vec<_>>();
updated_entries.sort_unstable_by_key(|e| e.id);
let mut updated_repositories = self
.repository_entries
.values()
.map(proto::RepositoryEntry::from)
.collect::<Vec<_>>();
updated_repositories.sort_unstable_by_key(|e| e.work_directory_id);
proto::UpdateWorktree {
project_id,
worktree_id,
abs_path: self.abs_path().to_string_lossy().into(),
root_name: self.root_name().to_string(),
updated_entries,
removed_entries: Vec::new(),
scan_id: self.scan_id as u64,
is_last_update: self.completed_scan_id == self.scan_id,
updated_repositories,
removed_repositories: Vec::new(),
}
}
fn insert_entry(&mut self, mut entry: Entry, fs: &dyn Fs) -> Entry {
if entry.is_file() && entry.path.file_name() == Some(&GITIGNORE) {
let abs_path = self.abs_path.join(&entry.path);
@ -2481,6 +2494,9 @@ pub enum PathChange {
Loaded,
}
pub type UpdatedEntriesSet = Arc<[(Arc<Path>, ProjectEntryId, PathChange)]>;
pub type UpdatedGitRepositoriesSet = Arc<[(Arc<Path>, Option<RepositoryEntry>)]>;
impl Entry {
fn new(
path: Arc<Path>,
@ -2896,11 +2912,13 @@ impl BackgroundScanner {
fn send_status_update(&self, scanning: bool, barrier: Option<barrier::Sender>) -> bool {
let mut state = self.state.lock();
if state.changed_paths.is_empty() && scanning {
return true;
}
let new_snapshot = state.snapshot.clone();
let old_snapshot = mem::replace(&mut state.prev_snapshot, new_snapshot.snapshot.clone());
let changes =
self.build_change_set(&old_snapshot, &new_snapshot.snapshot, &state.changed_paths);
let changes = self.build_change_set(&old_snapshot, &new_snapshot, &state.changed_paths);
state.changed_paths.clear();
self.status_updates_tx
@ -3386,9 +3404,20 @@ impl BackgroundScanner {
}
}
let snapshot = &mut self.state.lock().snapshot;
snapshot.entries_by_path.edit(entries_by_path_edits, &());
snapshot.entries_by_id.edit(entries_by_id_edits, &());
let state = &mut self.state.lock();
for edit in &entries_by_path_edits {
if let Edit::Insert(entry) = edit {
if let Err(ix) = state.changed_paths.binary_search(&entry.path) {
state.changed_paths.insert(ix, entry.path.clone());
}
}
}
state
.snapshot
.entries_by_path
.edit(entries_by_path_edits, &());
state.snapshot.entries_by_id.edit(entries_by_id_edits, &());
}
fn build_change_set(
@ -3396,16 +3425,17 @@ impl BackgroundScanner {
old_snapshot: &Snapshot,
new_snapshot: &Snapshot,
event_paths: &[Arc<Path>],
) -> Arc<[(Arc<Path>, ProjectEntryId, PathChange)]> {
) -> UpdatedEntriesSet {
use BackgroundScannerPhase::*;
use PathChange::{Added, AddedOrUpdated, Loaded, Removed, Updated};
// Identify which paths have changed. Use the known set of changed
// parent paths to optimize the search.
let mut changes = Vec::new();
let mut old_paths = old_snapshot.entries_by_path.cursor::<PathKey>();
let mut new_paths = new_snapshot.entries_by_path.cursor::<PathKey>();
old_paths.next(&());
new_paths.next(&());
let mut changes = Vec::new();
for path in event_paths {
let path = PathKey(path.clone());
if old_paths.item().map_or(false, |e| e.path < path.0) {
@ -3441,7 +3471,10 @@ impl BackgroundScanner {
new_entry.id,
AddedOrUpdated,
));
} else if old_entry.mtime != new_entry.mtime {
} else if old_entry.id != new_entry.id {
changes.push((old_entry.path.clone(), old_entry.id, Removed));
changes.push((new_entry.path.clone(), new_entry.id, Added));
} else if old_entry != new_entry {
changes.push((new_entry.path.clone(), new_entry.id, Updated));
}
old_paths.next(&());
@ -3543,8 +3576,6 @@ impl WorktreeHandle for ModelHandle<Worktree> {
&self,
cx: &'a gpui::TestAppContext,
) -> futures::future::LocalBoxFuture<'a, ()> {
use smol::future::FutureExt;
let filename = "fs-event-sentinel";
let tree = self.clone();
let (fs, root_path) = self.read_with(cx, |tree, _| {
@ -4207,7 +4238,18 @@ mod tests {
.await
.unwrap();
let mut snapshot1 = tree.update(cx, |tree, _| tree.as_local().unwrap().snapshot());
let snapshot1 = tree.update(cx, |tree, cx| {
let tree = tree.as_local_mut().unwrap();
let snapshot = Arc::new(Mutex::new(tree.snapshot()));
let _ = tree.observe_updates(0, cx, {
let snapshot = snapshot.clone();
move |update| {
snapshot.lock().apply_remote_update(update).unwrap();
async { true }
}
});
snapshot
});
let entry = tree
.update(cx, |tree, cx| {
@ -4225,9 +4267,10 @@ mod tests {
});
let snapshot2 = tree.update(cx, |tree, _| tree.as_local().unwrap().snapshot());
let update = snapshot2.build_update(&snapshot1, 0, 0, true);
snapshot1.apply_remote_update(update).unwrap();
assert_eq!(snapshot1.to_vec(true), snapshot2.to_vec(true),);
assert_eq!(
snapshot1.lock().entries(true).collect::<Vec<_>>(),
snapshot2.entries(true).collect::<Vec<_>>()
);
}
#[gpui::test(iterations = 100)]
@ -4262,7 +4305,20 @@ mod tests {
.await
.unwrap();
let mut snapshot = worktree.update(cx, |tree, _| tree.as_local().unwrap().snapshot());
let mut snapshots =
vec![worktree.read_with(cx, |tree, _| tree.as_local().unwrap().snapshot())];
let updates = Arc::new(Mutex::new(Vec::new()));
worktree.update(cx, |tree, cx| {
check_worktree_change_events(tree, cx);
let _ = tree.as_local_mut().unwrap().observe_updates(0, cx, {
let updates = updates.clone();
move |update| {
updates.lock().push(update);
async { true }
}
});
});
for _ in 0..operations {
worktree
@ -4276,35 +4332,39 @@ mod tests {
});
if rng.gen_bool(0.6) {
let new_snapshot =
worktree.read_with(cx, |tree, _| tree.as_local().unwrap().snapshot());
let update = new_snapshot.build_update(&snapshot, 0, 0, true);
snapshot.apply_remote_update(update.clone()).unwrap();
assert_eq!(
snapshot.to_vec(true),
new_snapshot.to_vec(true),
"incorrect snapshot after update {:?}",
update
);
snapshots
.push(worktree.read_with(cx, |tree, _| tree.as_local().unwrap().snapshot()));
}
}
worktree
.update(cx, |tree, _| tree.as_local_mut().unwrap().scan_complete())
.await;
worktree.read_with(cx, |tree, _| {
tree.as_local().unwrap().snapshot.check_invariants()
cx.foreground().run_until_parked();
let final_snapshot = worktree.read_with(cx, |tree, _| {
let tree = tree.as_local().unwrap();
tree.snapshot.check_invariants();
tree.snapshot()
});
let new_snapshot = worktree.read_with(cx, |tree, _| tree.as_local().unwrap().snapshot());
let update = new_snapshot.build_update(&snapshot, 0, 0, true);
snapshot.apply_remote_update(update.clone()).unwrap();
assert_eq!(
snapshot.to_vec(true),
new_snapshot.to_vec(true),
"incorrect snapshot after update {:?}",
update
);
for (i, snapshot) in snapshots.into_iter().enumerate().rev() {
let mut updated_snapshot = snapshot.clone();
for update in updates.lock().iter() {
if update.scan_id >= updated_snapshot.scan_id() as u64 {
updated_snapshot
.apply_remote_update(update.clone())
.unwrap();
}
}
assert_eq!(
updated_snapshot.entries(true).collect::<Vec<_>>(),
final_snapshot.entries(true).collect::<Vec<_>>(),
"wrong updates after snapshot {i}: {snapshot:#?} {updates:#?}",
);
}
}
#[gpui::test(iterations = 100)]
@ -4336,55 +4396,17 @@ mod tests {
.await
.unwrap();
// The worktree's `UpdatedEntries` event can be used to follow along with
// all changes to the worktree's snapshot.
let updates = Arc::new(Mutex::new(Vec::new()));
worktree.update(cx, |tree, cx| {
let mut paths = tree
.entries(true)
.map(|e| (e.path.clone(), e.mtime))
.collect::<Vec<_>>();
check_worktree_change_events(tree, cx);
cx.subscribe(&worktree, move |tree, _, event, _| {
if let Event::UpdatedEntries(changes) = event {
for (path, _, change_type) in changes.iter() {
let mtime = tree.entry_for_path(&path).map(|e| e.mtime);
let path = path.clone();
let ix = match paths.binary_search_by_key(&&path, |e| &e.0) {
Ok(ix) | Err(ix) => ix,
};
match change_type {
PathChange::Loaded => {
paths.insert(ix, (path, mtime.unwrap()));
}
PathChange::Added => {
paths.insert(ix, (path, mtime.unwrap()));
}
PathChange::Removed => {
paths.remove(ix);
}
PathChange::Updated => {
let entry = paths.get_mut(ix).unwrap();
assert_eq!(entry.0, path);
entry.1 = mtime.unwrap();
}
PathChange::AddedOrUpdated => {
if paths.get(ix).map(|e| &e.0) == Some(&path) {
paths.get_mut(ix).unwrap().1 = mtime.unwrap();
} else {
paths.insert(ix, (path, mtime.unwrap()));
}
}
}
}
let new_paths = tree
.entries(true)
.map(|e| (e.path.clone(), e.mtime))
.collect::<Vec<_>>();
assert_eq!(paths, new_paths, "incorrect changes: {:?}", changes);
let _ = tree.as_local_mut().unwrap().observe_updates(0, cx, {
let updates = updates.clone();
move |update| {
updates.lock().push(update);
async { true }
}
})
.detach();
});
});
worktree
@ -4447,38 +4469,64 @@ mod tests {
.await;
let new_snapshot =
new_worktree.read_with(cx, |tree, _| tree.as_local().unwrap().snapshot());
assert_eq!(snapshot.to_vec(true), new_snapshot.to_vec(true));
}
for (i, mut prev_snapshot) in snapshots.into_iter().enumerate() {
let include_ignored = rng.gen::<bool>();
if !include_ignored {
let mut entries_by_path_edits = Vec::new();
let mut entries_by_id_edits = Vec::new();
for entry in prev_snapshot
.entries_by_id
.cursor::<()>()
.filter(|e| e.is_ignored)
{
entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
entries_by_id_edits.push(Edit::Remove(entry.id));
}
prev_snapshot
.entries_by_path
.edit(entries_by_path_edits, &());
prev_snapshot.entries_by_id.edit(entries_by_id_edits, &());
}
let update = snapshot.build_update(&prev_snapshot, 0, 0, include_ignored);
prev_snapshot.apply_remote_update(update.clone()).unwrap();
assert_eq!(
prev_snapshot.to_vec(include_ignored),
snapshot.to_vec(include_ignored),
"wrong update for snapshot {i}. update: {:?}",
update
snapshot.entries_without_ids(true),
new_snapshot.entries_without_ids(true)
);
}
for (i, mut prev_snapshot) in snapshots.into_iter().enumerate().rev() {
for update in updates.lock().iter() {
if update.scan_id >= prev_snapshot.scan_id() as u64 {
prev_snapshot.apply_remote_update(update.clone()).unwrap();
}
}
assert_eq!(
prev_snapshot.entries(true).collect::<Vec<_>>(),
snapshot.entries(true).collect::<Vec<_>>(),
"wrong updates after snapshot {i}: {updates:#?}",
);
}
}
// The worktree's `UpdatedEntries` event can be used to follow along with
// all changes to the worktree's snapshot.
fn check_worktree_change_events(tree: &mut Worktree, cx: &mut ModelContext<Worktree>) {
let mut entries = tree.entries(true).cloned().collect::<Vec<_>>();
cx.subscribe(&cx.handle(), move |tree, _, event, _| {
if let Event::UpdatedEntries(changes) = event {
for (path, _, change_type) in changes.iter() {
let entry = tree.entry_for_path(&path).cloned();
let ix = match entries.binary_search_by_key(&path, |e| &e.path) {
Ok(ix) | Err(ix) => ix,
};
match change_type {
PathChange::Loaded => entries.insert(ix, entry.unwrap()),
PathChange::Added => entries.insert(ix, entry.unwrap()),
PathChange::Removed => drop(entries.remove(ix)),
PathChange::Updated => {
let entry = entry.unwrap();
let existing_entry = entries.get_mut(ix).unwrap();
assert_eq!(existing_entry.path, entry.path);
*existing_entry = entry;
}
PathChange::AddedOrUpdated => {
let entry = entry.unwrap();
if entries.get(ix).map(|e| &e.path) == Some(&entry.path) {
*entries.get_mut(ix).unwrap() = entry;
} else {
entries.insert(ix, entry);
}
}
}
}
let new_entries = tree.entries(true).cloned().collect::<Vec<_>>();
assert_eq!(entries, new_entries, "incorrect changes: {:?}", changes);
}
})
.detach();
}
fn randomly_mutate_worktree(
@ -4772,7 +4820,7 @@ mod tests {
}
}
fn to_vec(&self, include_ignored: bool) -> Vec<(&Path, u64, bool)> {
fn entries_without_ids(&self, include_ignored: bool) -> Vec<(&Path, u64, bool)> {
let mut paths = Vec::new();
for entry in self.entries_by_path.cursor::<()>() {
if include_ignored || !entry.is_ignored {
@ -4964,8 +5012,8 @@ mod tests {
assert_eq!(
repo_update_events.lock()[0]
.keys()
.cloned()
.iter()
.map(|e| e.0.clone())
.collect::<Vec<Arc<Path>>>(),
vec![Path::new("dir1").into()]
);