diff --git a/Cargo.lock b/Cargo.lock index 93ea32f333..a7510819df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12898,7 +12898,6 @@ dependencies = [ "gpui", "http 0.1.0", "ignore", - "itertools 0.11.0", "language", "log", "parking_lot", diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index e9608cd404..fa4b3241b1 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -1429,6 +1429,31 @@ impl Client { } } + pub fn request_dynamic( + &self, + envelope: proto::Envelope, + request_type: &'static str, + ) -> impl Future> { + let client_id = self.id(); + log::debug!( + "rpc request start. client_id:{}. name:{}", + client_id, + request_type + ); + let response = self + .connection_id() + .map(|conn_id| self.peer.request_dynamic(conn_id, envelope, request_type)); + async move { + let response = response?.await; + log::debug!( + "rpc request finish. client_id:{}. name:{}", + client_id, + request_type + ); + Ok(response?.0) + } + } + fn respond(&self, receipt: Receipt, response: T::Response) -> Result<()> { log::debug!("rpc respond. client_id:{}. name:{}", self.id(), T::NAME); self.peer.respond(receipt, response) diff --git a/crates/collab/src/tests/editor_tests.rs b/crates/collab/src/tests/editor_tests.rs index d6b099d175..739adec242 100644 --- a/crates/collab/src/tests/editor_tests.rs +++ b/crates/collab/src/tests/editor_tests.rs @@ -83,10 +83,7 @@ async fn test_host_disconnect( let project_b = client_b.build_dev_server_project(project_id, cx_b).await; cx_a.background_executor.run_until_parked(); - assert!(worktree_a.read_with(cx_a, |tree, _| tree - .as_local() - .unwrap() - .has_update_observer())); + assert!(worktree_a.read_with(cx_a, |tree, _| tree.has_update_observer())); let workspace_b = cx_b .add_window(|cx| Workspace::new(None, project_b.clone(), client_b.app_state.clone(), cx)); @@ -123,10 +120,7 @@ async fn test_host_disconnect( project_b.read_with(cx_b, |project, _| project.is_read_only()); - assert!(worktree_a.read_with(cx_a, |tree, _| !tree - .as_local() - .unwrap() - .has_update_observer())); + assert!(worktree_a.read_with(cx_a, |tree, _| !tree.has_update_observer())); // Ensure client B's edited state is reset and that the whole window is blurred. diff --git a/crates/collab/src/tests/integration_tests.rs b/crates/collab/src/tests/integration_tests.rs index 77cc9bab29..5193e84327 100644 --- a/crates/collab/src/tests/integration_tests.rs +++ b/crates/collab/src/tests/integration_tests.rs @@ -1378,10 +1378,7 @@ async fn test_unshare_project( let project_b = client_b.build_dev_server_project(project_id, cx_b).await; executor.run_until_parked(); - assert!(worktree_a.read_with(cx_a, |tree, _| tree - .as_local() - .unwrap() - .has_update_observer())); + assert!(worktree_a.read_with(cx_a, |tree, _| tree.has_update_observer())); project_b .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)) @@ -1406,10 +1403,7 @@ async fn test_unshare_project( .unwrap(); executor.run_until_parked(); - assert!(worktree_a.read_with(cx_a, |tree, _| !tree - .as_local() - .unwrap() - .has_update_observer())); + assert!(worktree_a.read_with(cx_a, |tree, _| !tree.has_update_observer())); assert!(project_c.read_with(cx_c, |project, _| project.is_disconnected())); @@ -1421,10 +1415,7 @@ async fn test_unshare_project( let project_c2 = client_c.build_dev_server_project(project_id, cx_c).await; executor.run_until_parked(); - assert!(worktree_a.read_with(cx_a, |tree, _| tree - .as_local() - .unwrap() - .has_update_observer())); + assert!(worktree_a.read_with(cx_a, |tree, _| tree.has_update_observer())); project_c2 .update(cx_c, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)) .await @@ -1531,7 +1522,7 @@ async fn test_project_reconnect( executor.run_until_parked(); let worktree1_id = worktree_a1.read_with(cx_a, |worktree, _| { - assert!(worktree.as_local().unwrap().has_update_observer()); + assert!(worktree.has_update_observer()); worktree.id() }); let (worktree_a2, _) = project_a1 @@ -1543,7 +1534,7 @@ async fn test_project_reconnect( executor.run_until_parked(); let worktree2_id = worktree_a2.read_with(cx_a, |tree, _| { - assert!(tree.as_local().unwrap().has_update_observer()); + assert!(tree.has_update_observer()); tree.id() }); executor.run_until_parked(); @@ -1576,9 +1567,7 @@ async fn test_project_reconnect( assert_eq!(project.collaborators().len(), 1); }); - worktree_a1.read_with(cx_a, |tree, _| { - assert!(tree.as_local().unwrap().has_update_observer()) - }); + worktree_a1.read_with(cx_a, |tree, _| assert!(tree.has_update_observer())); // While client A is disconnected, add and remove files from client A's project. client_a @@ -1620,7 +1609,7 @@ async fn test_project_reconnect( .await; let worktree3_id = worktree_a3.read_with(cx_a, |tree, _| { - assert!(!tree.as_local().unwrap().has_update_observer()); + assert!(!tree.has_update_observer()); tree.id() }); executor.run_until_parked(); @@ -1643,11 +1632,7 @@ async fn test_project_reconnect( project_a1.read_with(cx_a, |project, cx| { assert!(project.is_shared()); - assert!(worktree_a1 - .read(cx) - .as_local() - .unwrap() - .has_update_observer()); + assert!(worktree_a1.read(cx).has_update_observer()); assert_eq!( worktree_a1 .read(cx) @@ -1665,11 +1650,7 @@ async fn test_project_reconnect( "subdir2/i.txt" ] ); - assert!(worktree_a3 - .read(cx) - .as_local() - .unwrap() - .has_update_observer()); + assert!(worktree_a3.read(cx).has_update_observer()); assert_eq!( worktree_a3 .read(cx) @@ -1750,7 +1731,7 @@ async fn test_project_reconnect( executor.run_until_parked(); let worktree4_id = worktree_a4.read_with(cx_a, |tree, _| { - assert!(tree.as_local().unwrap().has_update_observer()); + assert!(tree.has_update_observer()); tree.id() }); project_a1.update(cx_a, |project, cx| { diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index c2cb6808a7..a70e15ac75 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -27,6 +27,7 @@ use futures::{ oneshot, }, future::{join_all, try_join_all, Shared}, + prelude::future::BoxFuture, select, stream::FuturesUnordered, AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt, @@ -38,6 +39,7 @@ use gpui::{ AnyModel, AppContext, AsyncAppContext, BackgroundExecutor, BorrowAppContext, Context, Entity, EventEmitter, Model, ModelContext, PromptLevel, SharedString, Task, WeakModel, WindowContext, }; +use http::{HttpClient, Url}; use itertools::Itertools; use language::{ language_settings::{language_settings, FormatOnSave, Formatter, InlayHintKind}, @@ -66,19 +68,16 @@ use postage::watch; use prettier_support::{DefaultPrettier, PrettierInstance}; use project_settings::{LspSettings, ProjectSettings}; use rand::prelude::*; -use search_history::SearchHistory; -use snippet::Snippet; -use worktree::{CreatedEntry, LocalSnapshot}; - -use http::{HttpClient, Url}; use rpc::{ErrorCode, ErrorExt as _}; use search::SearchQuery; +use search_history::SearchHistory; use serde::Serialize; use settings::{watch_config_file, Settings, SettingsLocation, SettingsStore}; use sha2::{Digest, Sha256}; use similar::{ChangeTag, TextDiff}; use smol::channel::{Receiver, Sender}; use smol::lock::Semaphore; +use snippet::Snippet; use std::{ borrow::Cow, cmp::{self, Ordering}, @@ -111,7 +110,7 @@ use util::{ }, post_inc, ResultExt, TryFutureExt as _, }; -use worktree::{Snapshot, Traversal}; +use worktree::{CreatedEntry, RemoteWorktreeClient, Snapshot, Traversal}; pub use fs::*; pub use language::Location; @@ -858,7 +857,13 @@ impl Project { // That's because Worktree's identifier is entity id, which should probably be changed. let mut worktrees = Vec::new(); for worktree in response.payload.worktrees { - let worktree = Worktree::remote(replica_id, worktree, cx); + let worktree = Worktree::remote( + remote_id, + replica_id, + worktree, + Box::new(CollabRemoteWorktreeClient(client.clone())), + cx, + ); worktrees.push(worktree); } @@ -1453,47 +1458,9 @@ impl Project { "No worktree for path {project_path:?}" )))); }; - if self.is_local() { - worktree.update(cx, |worktree, cx| { - worktree - .as_local_mut() - .unwrap() - .create_entry(project_path.path, is_directory, cx) - }) - } else { - let client = self.client.clone(); - let project_id = self.remote_id().unwrap(); - cx.spawn(move |_, mut cx| async move { - let response = client - .request(proto::CreateProjectEntry { - worktree_id: project_path.worktree_id.to_proto(), - project_id, - path: project_path.path.to_string_lossy().into(), - is_directory, - }) - .await?; - match response.entry { - Some(entry) => worktree - .update(&mut cx, |worktree, cx| { - worktree.as_remote_mut().unwrap().insert_entry( - entry, - response.worktree_scan_id as usize, - cx, - ) - })? - .await - .map(CreatedEntry::Included), - None => { - let abs_path = worktree.update(&mut cx, |worktree, _| { - worktree - .absolutize(&project_path.path) - .with_context(|| format!("absolutizing {project_path:?}")) - })??; - Ok(CreatedEntry::Excluded { abs_path }) - } - } - }) - } + worktree.update(cx, |worktree, cx| { + worktree.create_entry(project_path.path, is_directory, cx) + }) } pub fn copy_entry( @@ -1505,41 +1472,9 @@ impl Project { let Some(worktree) = self.worktree_for_entry(entry_id, cx) else { return Task::ready(Ok(None)); }; - let new_path = new_path.into(); - if self.is_local() { - worktree.update(cx, |worktree, cx| { - worktree - .as_local_mut() - .unwrap() - .copy_entry(entry_id, new_path, cx) - }) - } else { - let client = self.client.clone(); - let project_id = self.remote_id().unwrap(); - - cx.spawn(move |_, mut cx| async move { - let response = client - .request(proto::CopyProjectEntry { - project_id, - entry_id: entry_id.to_proto(), - new_path: new_path.to_string_lossy().into(), - }) - .await?; - match response.entry { - Some(entry) => worktree - .update(&mut cx, |worktree, cx| { - worktree.as_remote_mut().unwrap().insert_entry( - entry, - response.worktree_scan_id as usize, - cx, - ) - })? - .await - .map(Some), - None => Ok(None), - } - }) - } + worktree.update(cx, |worktree, cx| { + worktree.copy_entry(entry_id, new_path, cx) + }) } pub fn rename_entry( @@ -1551,48 +1486,9 @@ impl Project { let Some(worktree) = self.worktree_for_entry(entry_id, cx) else { return Task::ready(Err(anyhow!(format!("No worktree for entry {entry_id:?}")))); }; - let new_path = new_path.into(); - if self.is_local() { - worktree.update(cx, |worktree, cx| { - worktree - .as_local_mut() - .unwrap() - .rename_entry(entry_id, new_path, cx) - }) - } else { - let client = self.client.clone(); - let project_id = self.remote_id().unwrap(); - - cx.spawn(move |_, mut cx| async move { - let response = client - .request(proto::RenameProjectEntry { - project_id, - entry_id: entry_id.to_proto(), - new_path: new_path.to_string_lossy().into(), - }) - .await?; - match response.entry { - Some(entry) => worktree - .update(&mut cx, |worktree, cx| { - worktree.as_remote_mut().unwrap().insert_entry( - entry, - response.worktree_scan_id as usize, - cx, - ) - })? - .await - .map(CreatedEntry::Included), - None => { - let abs_path = worktree.update(&mut cx, |worktree, _| { - worktree - .absolutize(&new_path) - .with_context(|| format!("absolutizing {new_path:?}")) - })??; - Ok(CreatedEntry::Excluded { abs_path }) - } - } - }) - } + worktree.update(cx, |worktree, cx| { + worktree.rename_entry(entry_id, new_path, cx) + }) } pub fn delete_entry( @@ -1602,38 +1498,10 @@ impl Project { cx: &mut ModelContext, ) -> Option>> { let worktree = self.worktree_for_entry(entry_id, cx)?; - cx.emit(Event::DeletedEntry(entry_id)); - - if self.is_local() { - worktree.update(cx, |worktree, cx| { - worktree - .as_local_mut() - .unwrap() - .delete_entry(entry_id, trash, cx) - }) - } else { - let client = self.client.clone(); - let project_id = self.remote_id().unwrap(); - Some(cx.spawn(move |_, mut cx| async move { - let response = client - .request(proto::DeleteProjectEntry { - project_id, - entry_id: entry_id.to_proto(), - use_trash: trash, - }) - .await?; - worktree - .update(&mut cx, move |worktree, cx| { - worktree.as_remote_mut().unwrap().delete_entry( - entry_id, - response.worktree_scan_id as usize, - cx, - ) - })? - .await - })) - } + worktree.update(cx, |worktree, cx| { + worktree.delete_entry(entry_id, trash, cx) + }) } pub fn expand_entry( @@ -1643,31 +1511,7 @@ impl Project { cx: &mut ModelContext, ) -> Option>> { let worktree = self.worktree_for_id(worktree_id, cx)?; - if self.is_local() { - worktree.update(cx, |worktree, cx| { - worktree.as_local_mut().unwrap().expand_entry(entry_id, cx) - }) - } else { - let worktree = worktree.downgrade(); - let request = self.client.request(proto::ExpandProjectEntry { - project_id: self.remote_id().unwrap(), - entry_id: entry_id.to_proto(), - }); - Some(cx.spawn(move |_, mut cx| async move { - let response = request.await?; - if let Some(worktree) = worktree.upgrade() { - worktree - .update(&mut cx, |worktree, _| { - worktree - .as_remote_mut() - .unwrap() - .wait_for_snapshot(response.worktree_scan_id as usize) - })? - .await?; - } - Ok(()) - })) - } + worktree.update(cx, |worktree, cx| worktree.expand_entry(entry_id, cx)) } pub fn shared(&mut self, project_id: u64, cx: &mut ModelContext) -> Result<()> { @@ -1785,18 +1629,12 @@ impl Project { } } - worktree.as_local_mut().unwrap().observe_updates( - project_id, - cx, - { - let client = client.clone(); - move |update| { - client - .request(update) - .map(|result| result.is_ok()) - } - }, - ); + worktree.observe_updates(project_id, cx, { + let client = client.clone(); + move |update| { + client.request(update).map(|result| result.is_ok()) + } + }); anyhow::Ok(()) })?; @@ -1947,7 +1785,7 @@ impl Project { for worktree_handle in self.worktrees.iter_mut() { if let WorktreeHandle::Strong(worktree) = worktree_handle { let is_visible = worktree.update(cx, |worktree, _| { - worktree.as_local_mut().unwrap().stop_observing_updates(); + worktree.stop_observing_updates(); worktree.is_visible() }); if !is_visible { @@ -2230,21 +2068,20 @@ impl Project { cx: &mut ModelContext, ) -> Task>> { let load_buffer = worktree.update(cx, |worktree, cx| { - let worktree = worktree.as_local_mut().unwrap(); - let file = worktree.load_file(path.as_ref(), cx); + let load_file = worktree.load_file(path.as_ref(), cx); let reservation = cx.reserve_model(); let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64()); cx.spawn(move |_, mut cx| async move { - let (file, contents, diff_base) = file.await?; + let loaded = load_file.await?; let text_buffer = cx .background_executor() - .spawn(async move { text::Buffer::new(0, buffer_id, contents) }) + .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) }) .await; cx.insert_model(reservation, |_| { Buffer::build( text_buffer, - diff_base, - Some(Arc::new(file)), + loaded.diff_base, + Some(loaded.file), Capability::ReadWrite, ) }) @@ -2398,10 +2235,11 @@ impl Project { }; let worktree = file.worktree.clone(); let path = file.path.clone(); - worktree.update(cx, |worktree, cx| match worktree { - Worktree::Local(worktree) => self.save_local_buffer(&worktree, buffer, path, false, cx), - Worktree::Remote(_) => self.save_remote_buffer(buffer, None, cx), - }) + if self.is_local() { + self.save_local_buffer(worktree, buffer, path, false, cx) + } else { + self.save_remote_buffer(buffer, None, cx) + } } pub fn save_buffer_as( @@ -2410,26 +2248,21 @@ impl Project { path: ProjectPath, cx: &mut ModelContext, ) -> Task> { - let old_file = File::from_dyn(buffer.read(cx).file()) - .filter(|f| f.is_local()) - .cloned(); + let old_file = File::from_dyn(buffer.read(cx).file()).cloned(); let Some(worktree) = self.worktree_for_id(path.worktree_id, cx) else { return Task::ready(Err(anyhow!("worktree does not exist"))); }; cx.spawn(move |this, mut cx| async move { this.update(&mut cx, |this, cx| { - if let Some(old_file) = &old_file { - this.unregister_buffer_from_language_servers(&buffer, old_file, cx); + if this.is_local() { + if let Some(old_file) = &old_file { + this.unregister_buffer_from_language_servers(&buffer, old_file, cx); + } + this.save_local_buffer(worktree, buffer.clone(), path.path, true, cx) + } else { + this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx) } - worktree.update(cx, |worktree, cx| match worktree { - Worktree::Local(worktree) => { - this.save_local_buffer(worktree, buffer.clone(), path.path, true, cx) - } - Worktree::Remote(_) => { - this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx) - } - }) })? .await?; @@ -2443,70 +2276,39 @@ impl Project { pub fn save_local_buffer( &self, - worktree: &LocalWorktree, + worktree: Model, buffer_handle: Model, path: Arc, mut has_changed_file: bool, - cx: &mut ModelContext, + cx: &mut ModelContext, ) -> Task> { let buffer = buffer_handle.read(cx); - - let rpc = self.client.clone(); - let buffer_id: u64 = buffer.remote_id().into(); - let project_id = self.remote_id(); - + let buffer_id = buffer.remote_id(); + let text = buffer.as_rope().clone(); + let line_ending = buffer.line_ending(); + let version = buffer.version(); if buffer.file().is_some_and(|file| !file.is_created()) { has_changed_file = true; } - let text = buffer.as_rope().clone(); - let version = buffer.version(); - let save = worktree.write_file(path.as_ref(), text, buffer.line_ending(), cx); - let fs = Arc::clone(&self.fs); - let abs_path = worktree.absolutize(&path); - let is_private = worktree.is_path_private(&path); - - cx.spawn(move |this, mut cx| async move { - let entry = save.await?; - let abs_path = abs_path?; - let this = this.upgrade().context("worktree dropped")?; - - let (entry_id, mtime, path, is_private) = match entry { - Some(entry) => (Some(entry.id), entry.mtime, entry.path, entry.is_private), - None => { - let metadata = fs - .metadata(&abs_path) - .await - .with_context(|| { - format!( - "Fetching metadata after saving the excluded buffer {abs_path:?}" - ) - })? - .with_context(|| { - format!("Excluded buffer {path:?} got removed during saving") - })?; - (None, Some(metadata.mtime), path, is_private) - } - }; + let save = worktree.update(cx, |worktree, cx| { + worktree.write_file(path.as_ref(), text, line_ending, cx) + }); + let client = self.client.clone(); + let project_id = self.remote_id(); + cx.spawn(move |_, mut cx| async move { + let new_file = save.await?; + let mtime = new_file.mtime; if has_changed_file { - let new_file = Arc::new(File { - entry_id, - worktree: this, - path, - mtime, - is_local: true, - is_deleted: false, - is_private, - }); - if let Some(project_id) = project_id { - rpc.send(proto::UpdateBufferFile { - project_id, - buffer_id, - file: Some(new_file.to_proto()), - }) - .log_err(); + client + .send(proto::UpdateBufferFile { + project_id, + buffer_id: buffer_id.into(), + file: Some(new_file.to_proto()), + }) + .log_err(); } buffer_handle.update(&mut cx, |buffer, cx| { @@ -2517,9 +2319,9 @@ impl Project { } if let Some(project_id) = project_id { - rpc.send(proto::BufferSaved { + client.send(proto::BufferSaved { project_id, - buffer_id, + buffer_id: buffer_id.into(), version: serialize_version(&version), mtime: mtime.map(|time| time.into()), })?; @@ -2537,7 +2339,7 @@ impl Project { &self, buffer_handle: Model, new_path: Option, - cx: &mut ModelContext, + cx: &mut ModelContext, ) -> Task> { let buffer = buffer_handle.read(cx); let buffer_id = buffer.remote_id().into(); @@ -2641,7 +2443,6 @@ impl Project { self.detect_language_for_buffer(buffer, cx); self.register_buffer_with_language_servers(buffer, cx); - // self.register_buffer_with_copilot(buffer, cx); cx.observe_release(buffer, |this, buffer, cx| { if let Some(file) = File::from_dyn(buffer.file()) { if file.is_local() { @@ -2791,16 +2592,6 @@ impl Project { }); } - // fn register_buffer_with_copilot( - // &self, - // buffer_handle: &Model, - // cx: &mut ModelContext, - // ) { - // if let Some(copilot) = Copilot::global(cx) { - // copilot.update(cx, |copilot, cx| copilot.register_buffer(buffer_handle, cx)); - // } - // } - async fn send_buffer_ordered_messages( this: WeakModel, rx: UnboundedReceiver, @@ -5521,7 +5312,7 @@ impl Project { ) -> Result> { let working_dir_path = buffer.update(cx, |buffer, cx| { let file = File::from_dyn(buffer.file())?; - let worktree = file.worktree.read(cx).as_local()?; + let worktree = file.worktree.read(cx); let mut worktree_path = worktree.abs_path().to_path_buf(); if worktree.root_entry()?.is_file() { worktree_path.pop(); @@ -5708,9 +5499,6 @@ impl Project { if !worktree.is_visible() { continue; } - let Some(worktree) = worktree.as_local() else { - continue; - }; let worktree_abs_path = worktree.abs_path().clone(); let (adapter, language, server) = match self.language_servers.get(server_id) { @@ -5874,15 +5662,14 @@ impl Project { let worktree_abs_path = if let Some(worktree_abs_path) = self .worktree_for_id(symbol.path.worktree_id, cx) - .and_then(|worktree| worktree.read(cx).as_local()) - .map(|local_worktree| local_worktree.abs_path()) + .map(|worktree| worktree.read(cx).abs_path()) { worktree_abs_path } else { return Task::ready(Err(anyhow!("worktree not found for symbol"))); }; - let symbol_abs_path = resolve_path(worktree_abs_path, &symbol.path.path); + let symbol_abs_path = resolve_path(&worktree_abs_path, &symbol.path.path); let symbol_uri = if let Ok(uri) = lsp::Url::from_file_path(symbol_abs_path) { uri } else { @@ -7234,8 +7021,8 @@ impl Project { let snapshots = self .visible_worktrees(cx) .filter_map(|tree| { - let tree = tree.read(cx).as_local()?; - Some(tree.snapshot()) + let tree = tree.read(cx); + Some((tree.snapshot(), tree.as_local()?.settings())) }) .collect::>(); let include_root = snapshots.len() > 1; @@ -7243,11 +7030,11 @@ impl Project { let background = cx.background_executor().clone(); let path_count: usize = snapshots .iter() - .map(|s| { + .map(|(snapshot, _)| { if query.include_ignored() { - s.file_count() + snapshot.file_count() } else { - s.visible_file_count() + snapshot.visible_file_count() } }) .sum(); @@ -7403,7 +7190,7 @@ impl Project { query: SearchQuery, include_root: bool, path_count: usize, - snapshots: Vec, + snapshots: Vec<(Snapshot, WorktreeSettings)>, matching_paths_tx: Sender, ) { let fs = &fs; @@ -7459,13 +7246,14 @@ impl Project { } if query.include_ignored() { - for snapshot in snapshots { + for (snapshot, settings) in snapshots { for ignored_entry in snapshot.entries(true).filter(|e| e.is_ignored) { let limiter = Arc::clone(&max_concurrent_workers); scope.spawn(async move { let _guard = limiter.acquire().await; search_ignored_entry( snapshot, + settings, ignored_entry, fs, query, @@ -8302,7 +8090,7 @@ impl Project { changes: &UpdatedEntriesSet, cx: &mut ModelContext, ) { - if worktree.read(cx).as_local().is_none() { + if worktree.read(cx).is_remote() { return; } let project_id = self.remote_id(); @@ -8607,14 +8395,12 @@ impl Project { self.worktree_for_id(project_path.worktree_id, cx)? .read(cx) .as_local()? - .snapshot() .local_git_repo(&project_path.path) } pub fn get_first_worktree_root_repo(&self, cx: &AppContext) -> Option> { let worktree = self.visible_worktrees(cx).next()?.read(cx).as_local()?; let root_entry = worktree.root_git_entry()?; - worktree.get_local_repo(&root_entry)?.repo().clone().into() } @@ -9016,21 +8802,7 @@ impl Project { this.worktree_for_id(worktree_id, cx) .ok_or_else(|| anyhow!("worktree not found")) })??; - let worktree_scan_id = worktree.update(&mut cx, |worktree, _| worktree.scan_id())?; - let entry = worktree - .update(&mut cx, |worktree, cx| { - let worktree = worktree.as_local_mut().unwrap(); - let path = PathBuf::from(envelope.payload.path); - worktree.create_entry(path, envelope.payload.is_directory, cx) - })? - .await?; - Ok(proto::ProjectEntryResponse { - entry: match &entry { - CreatedEntry::Included(entry) => Some(entry.into()), - CreatedEntry::Excluded { .. } => None, - }, - worktree_scan_id: worktree_scan_id as u64, - }) + Worktree::handle_create_entry(worktree, envelope.payload, cx).await } async fn handle_rename_project_entry( @@ -9044,23 +8816,7 @@ impl Project { this.worktree_for_entry(entry_id, cx) .ok_or_else(|| anyhow!("worktree not found")) })??; - let worktree_scan_id = worktree.update(&mut cx, |worktree, _| worktree.scan_id())?; - let entry = worktree - .update(&mut cx, |worktree, cx| { - let new_path = PathBuf::from(envelope.payload.new_path); - worktree - .as_local_mut() - .unwrap() - .rename_entry(entry_id, new_path, cx) - })? - .await?; - Ok(proto::ProjectEntryResponse { - entry: match &entry { - CreatedEntry::Included(entry) => Some(entry.into()), - CreatedEntry::Excluded { .. } => None, - }, - worktree_scan_id: worktree_scan_id as u64, - }) + Worktree::handle_rename_entry(worktree, envelope.payload, cx).await } async fn handle_copy_project_entry( @@ -9074,20 +8830,7 @@ impl Project { this.worktree_for_entry(entry_id, cx) .ok_or_else(|| anyhow!("worktree not found")) })??; - let worktree_scan_id = worktree.update(&mut cx, |worktree, _| worktree.scan_id())?; - let entry = worktree - .update(&mut cx, |worktree, cx| { - let new_path = PathBuf::from(envelope.payload.new_path); - worktree - .as_local_mut() - .unwrap() - .copy_entry(entry_id, new_path, cx) - })? - .await?; - Ok(proto::ProjectEntryResponse { - entry: entry.as_ref().map(|e| e.into()), - worktree_scan_id: worktree_scan_id as u64, - }) + Worktree::handle_copy_entry(worktree, envelope.payload, cx).await } async fn handle_delete_project_entry( @@ -9097,28 +8840,12 @@ impl Project { mut cx: AsyncAppContext, ) -> Result { let entry_id = ProjectEntryId::from_proto(envelope.payload.entry_id); - let trash = envelope.payload.use_trash; - - this.update(&mut cx, |_, cx| cx.emit(Event::DeletedEntry(entry_id)))?; - let worktree = this.update(&mut cx, |this, cx| { this.worktree_for_entry(entry_id, cx) .ok_or_else(|| anyhow!("worktree not found")) })??; - let worktree_scan_id = worktree.update(&mut cx, |worktree, _| worktree.scan_id())?; - worktree - .update(&mut cx, |worktree, cx| { - worktree - .as_local_mut() - .unwrap() - .delete_entry(entry_id, trash, cx) - .ok_or_else(|| anyhow!("invalid entry")) - })?? - .await?; - Ok(proto::ProjectEntryResponse { - entry: None, - worktree_scan_id: worktree_scan_id as u64, - }) + this.update(&mut cx, |_, cx| cx.emit(Event::DeletedEntry(entry_id)))?; + Worktree::handle_delete_entry(worktree, envelope.payload, cx).await } async fn handle_expand_project_entry( @@ -9131,17 +8858,7 @@ impl Project { let worktree = this .update(&mut cx, |this, cx| this.worktree_for_entry(entry_id, cx))? .ok_or_else(|| anyhow!("invalid request"))?; - worktree - .update(&mut cx, |worktree, cx| { - worktree - .as_local_mut() - .unwrap() - .expand_entry(entry_id, cx) - .ok_or_else(|| anyhow!("invalid entry")) - })?? - .await?; - let worktree_scan_id = worktree.update(&mut cx, |worktree, _| worktree.scan_id())? as u64; - Ok(proto::ExpandProjectEntryResponse { worktree_scan_id }) + Worktree::handle_expand_entry(worktree, envelope.payload, cx).await } async fn handle_update_diagnostic_summary( @@ -10594,6 +10311,7 @@ impl Project { cx: &mut ModelContext, ) -> Result<()> { let replica_id = self.replica_id(); + let remote_id = self.remote_id().ok_or_else(|| anyhow!("invalid project"))?; let mut old_worktrees_by_id = self .worktrees @@ -10610,8 +10328,16 @@ impl Project { { self.worktrees.push(WorktreeHandle::Strong(old_worktree)); } else { - let worktree = Worktree::remote(replica_id, worktree, cx); - let _ = self.add_worktree(&worktree, cx); + self.add_worktree( + &Worktree::remote( + remote_id, + replica_id, + worktree, + Box::new(CollabRemoteWorktreeClient(self.client.clone())), + cx, + ), + cx, + ); } } @@ -11374,7 +11100,7 @@ fn deserialize_code_actions(code_actions: &HashMap) -> Vec, + snapshots: &Vec<(Snapshot, WorktreeSettings)>, worker_start_ix: usize, worker_end_ix: usize, query: &SearchQuery, @@ -11386,7 +11112,7 @@ async fn search_snapshots( let mut snapshot_start_ix = 0; let mut abs_path = PathBuf::new(); - for snapshot in snapshots { + for (snapshot, _) in snapshots { let snapshot_end_ix = snapshot_start_ix + if query.include_ignored() { snapshot.file_count() @@ -11452,7 +11178,8 @@ async fn search_snapshots( } async fn search_ignored_entry( - snapshot: &LocalSnapshot, + snapshot: &Snapshot, + settings: &WorktreeSettings, ignored_entry: &Entry, fs: &Arc, query: &SearchQuery, @@ -11486,7 +11213,7 @@ async fn search_ignored_entry( } } else if !fs_metadata.is_symlink { if !query.file_matches(Some(&ignored_abs_path)) - || snapshot.is_path_excluded(&ignored_entry.path) + || settings.is_path_excluded(&ignored_entry.path) { continue; } @@ -11562,6 +11289,18 @@ impl OpenBuffer { } } +pub struct CollabRemoteWorktreeClient(Arc); + +impl RemoteWorktreeClient for CollabRemoteWorktreeClient { + fn request( + &self, + envelope: proto::Envelope, + request_type: &'static str, + ) -> BoxFuture<'static, Result> { + self.0.request_dynamic(envelope, request_type).boxed() + } +} + pub struct PathMatchCandidateSet { pub snapshot: Snapshot, pub include_ignored: bool, diff --git a/crates/project/src/project_tests.rs b/crates/project/src/project_tests.rs index d5096c78a2..d619a8f80f 100644 --- a/crates/project/src/project_tests.rs +++ b/crates/project/src/project_tests.rs @@ -2981,21 +2981,26 @@ async fn test_rescan_and_remote_updates(cx: &mut gpui::TestAppContext) { // Create a remote copy of this worktree. let tree = project.update(cx, |project, _| project.worktrees().next().unwrap()); - - let metadata = tree.update(cx, |tree, _| tree.as_local().unwrap().metadata_proto()); + let metadata = tree.update(cx, |tree, _| tree.metadata_proto()); let updates = Arc::new(Mutex::new(Vec::new())); tree.update(cx, |tree, cx| { - tree.as_local_mut().unwrap().observe_updates(0, cx, { - let updates = updates.clone(); - move |update| { - updates.lock().push(update); - async { true } - } + let updates = updates.clone(); + tree.observe_updates(0, cx, move |update| { + updates.lock().push(update); + async { true } }); }); - let remote = cx.update(|cx| Worktree::remote(1, metadata, cx)); + let remote = cx.update(|cx| { + Worktree::remote( + 0, + 1, + metadata, + Box::new(CollabRemoteWorktreeClient(project.read(cx).client())), + cx, + ) + }); cx.executor().run_until_parked(); diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index 8e026953c1..c90a1cb35c 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -435,6 +435,7 @@ impl Peer { self.connections.write().clear(); } + /// Make a request and wait for a response. pub fn request( &self, receiver_id: ConnectionId, @@ -462,28 +463,50 @@ impl Peer { .map_ok(|envelope| envelope.payload) } - pub fn request_internal( + fn request_internal( &self, original_sender_id: Option, receiver_id: ConnectionId, request: T, ) -> impl Future>> { + let envelope = request.into_envelope(0, None, original_sender_id.map(Into::into)); + let response = self.request_dynamic(receiver_id, envelope, T::NAME); + async move { + let (response, received_at) = response.await?; + Ok(TypedEnvelope { + message_id: response.id, + sender_id: receiver_id, + original_sender_id: response.original_sender_id, + payload: T::Response::from_envelope(response) + .ok_or_else(|| anyhow!("received response of the wrong type"))?, + received_at, + }) + } + } + + /// Make a request and wait for a response. + /// + /// The caller must make sure to deserialize the response into the request's + /// response type. This interface is only useful in trait objects, where + /// generics can't be used. If you have a concrete type, use `request`. + pub fn request_dynamic( + &self, + receiver_id: ConnectionId, + mut envelope: proto::Envelope, + type_name: &'static str, + ) -> impl Future> { let (tx, rx) = oneshot::channel(); let send = self.connection_state(receiver_id).and_then(|connection| { - let message_id = connection.next_message_id.fetch_add(1, SeqCst); + envelope.id = connection.next_message_id.fetch_add(1, SeqCst); connection .response_channels .lock() .as_mut() .ok_or_else(|| anyhow!("connection was closed"))? - .insert(message_id, tx); + .insert(envelope.id, tx); connection .outgoing_tx - .unbounded_send(proto::Message::Envelope(request.into_envelope( - message_id, - None, - original_sender_id.map(Into::into), - ))) + .unbounded_send(proto::Message::Envelope(envelope)) .map_err(|_| anyhow!("connection was closed"))?; Ok(()) }); @@ -491,19 +514,10 @@ impl Peer { send?; let (response, received_at, _barrier) = rx.await.map_err(|_| anyhow!("connection was closed"))?; - if let Some(proto::envelope::Payload::Error(error)) = &response.payload { - Err(RpcError::from_proto(&error, T::NAME)) - } else { - Ok(TypedEnvelope { - message_id: response.id, - sender_id: receiver_id, - original_sender_id: response.original_sender_id, - payload: T::Response::from_envelope(response) - .ok_or_else(|| anyhow!("received response of the wrong type"))?, - received_at, - }) + return Err(RpcError::from_proto(&error, type_name)); } + Ok((response, received_at)) } } diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs index 3d1bba04fb..7a29f3be25 100644 --- a/crates/semantic_index/src/semantic_index.rs +++ b/crates/semantic_index/src/semantic_index.rs @@ -30,7 +30,7 @@ use std::{ time::{Duration, SystemTime}, }; use util::ResultExt; -use worktree::LocalSnapshot; +use worktree::Snapshot; pub use project_index_debug_view::ProjectIndexDebugView; @@ -583,9 +583,9 @@ impl WorktreeIndex { } fn index_entries_changed_on_disk(&self, cx: &AppContext) -> impl Future> { - let worktree = self.worktree.read(cx).as_local().unwrap().snapshot(); + let worktree = self.worktree.read(cx).snapshot(); let worktree_abs_path = worktree.abs_path().clone(); - let scan = self.scan_entries(worktree.clone(), cx); + let scan = self.scan_entries(worktree, cx); let chunk = self.chunk_files(worktree_abs_path, scan.updated_entries, cx); let embed = Self::embed_files(self.embedding_provider.clone(), chunk.files, cx); let persist = self.persist_embeddings(scan.deleted_entry_ranges, embed.files, cx); @@ -600,7 +600,7 @@ impl WorktreeIndex { updated_entries: UpdatedEntriesSet, cx: &AppContext, ) -> impl Future> { - let worktree = self.worktree.read(cx).as_local().unwrap().snapshot(); + let worktree = self.worktree.read(cx).snapshot(); let worktree_abs_path = worktree.abs_path().clone(); let scan = self.scan_updated_entries(worktree, updated_entries.clone(), cx); let chunk = self.chunk_files(worktree_abs_path, scan.updated_entries, cx); @@ -612,7 +612,7 @@ impl WorktreeIndex { } } - fn scan_entries(&self, worktree: LocalSnapshot, cx: &AppContext) -> ScanEntries { + fn scan_entries(&self, worktree: Snapshot, cx: &AppContext) -> ScanEntries { let (updated_entries_tx, updated_entries_rx) = channel::bounded(512); let (deleted_entry_ranges_tx, deleted_entry_ranges_rx) = channel::bounded(128); let db_connection = self.db_connection.clone(); @@ -692,7 +692,7 @@ impl WorktreeIndex { fn scan_updated_entries( &self, - worktree: LocalSnapshot, + worktree: Snapshot, updated_entries: UpdatedEntriesSet, cx: &AppContext, ) -> ScanEntries { diff --git a/crates/settings/src/settings_store.rs b/crates/settings/src/settings_store.rs index 6c9448f444..1384f8ddb6 100644 --- a/crates/settings/src/settings_store.rs +++ b/crates/settings/src/settings_store.rs @@ -632,8 +632,6 @@ impl SettingsStore { } // If the global settings file changed, reload the global value for the field. - project_settings_stack.clear(); - paths_stack.clear(); if changed_local_path.is_none() { if let Some(value) = setting_value .load_setting( @@ -653,6 +651,8 @@ impl SettingsStore { } // Reload the local values for the setting. + paths_stack.clear(); + project_settings_stack.clear(); for ((root_id, path), local_settings) in &self.raw_local_settings { // Build a stack of all of the local values for that setting. while let Some(prev_entry) = paths_stack.last() { diff --git a/crates/terminal_view/src/terminal_view.rs b/crates/terminal_view/src/terminal_view.rs index 0a2f5d863a..fda2a8b59e 100644 --- a/crates/terminal_view/src/terminal_view.rs +++ b/crates/terminal_view/src/terminal_view.rs @@ -1301,13 +1301,7 @@ mod tests { .unwrap(); let entry = cx - .update(|cx| { - wt.update(cx, |wt, cx| { - wt.as_local() - .unwrap() - .create_entry(Path::new(""), is_dir, cx) - }) - }) + .update(|cx| wt.update(cx, |wt, cx| wt.create_entry(Path::new(""), is_dir, cx))) .await .unwrap() .to_included() diff --git a/crates/util/src/paths.rs b/crates/util/src/paths.rs index f0fd8dc5b9..14aa1480ac 100644 --- a/crates/util/src/paths.rs +++ b/crates/util/src/paths.rs @@ -343,36 +343,40 @@ impl

PathLikeWithPosition

{ #[derive(Clone, Debug)] pub struct PathMatcher { - maybe_path: PathBuf, + source: String, glob: GlobMatcher, } impl std::fmt::Display for PathMatcher { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.maybe_path.to_string_lossy().fmt(f) + self.source.fmt(f) } } impl PartialEq for PathMatcher { fn eq(&self, other: &Self) -> bool { - self.maybe_path.eq(&other.maybe_path) + self.source.eq(&other.source) } } impl Eq for PathMatcher {} impl PathMatcher { - pub fn new(maybe_glob: &str) -> Result { + pub fn new(source: &str) -> Result { Ok(PathMatcher { - glob: Glob::new(maybe_glob)?.compile_matcher(), - maybe_path: PathBuf::from(maybe_glob), + glob: Glob::new(source)?.compile_matcher(), + source: String::from(source), }) } + pub fn source(&self) -> &str { + &self.source + } + pub fn is_match>(&self, other: P) -> bool { let other_path = other.as_ref(); - other_path.starts_with(&self.maybe_path) - || other_path.ends_with(&self.maybe_path) + other_path.starts_with(Path::new(&self.source)) + || other_path.ends_with(Path::new(&self.source)) || self.glob.is_match(other_path) || self.check_with_end_separator(other_path) } diff --git a/crates/worktree/Cargo.toml b/crates/worktree/Cargo.toml index fb6bad9b29..75136352fc 100644 --- a/crates/worktree/Cargo.toml +++ b/crates/worktree/Cargo.toml @@ -31,7 +31,6 @@ fuzzy.workspace = true git.workspace = true gpui.workspace = true ignore.workspace = true -itertools.workspace = true language.workspace = true log.workspace = true parking_lot.workspace = true diff --git a/crates/worktree/src/worktree.rs b/crates/worktree/src/worktree.rs index e24782bac6..2a03e4be59 100644 --- a/crates/worktree/src/worktree.rs +++ b/crates/worktree/src/worktree.rs @@ -13,6 +13,7 @@ use futures::{ mpsc::{self, UnboundedSender}, oneshot, }, + future::BoxFuture, select_biased, stream::select, task::Poll, @@ -29,14 +30,13 @@ use gpui::{ Task, }; use ignore::IgnoreStack; -use itertools::Itertools; use parking_lot::Mutex; use postage::{ barrier, prelude::{Sink as _, Stream as _}, watch, }; -use rpc::proto; +use rpc::proto::{self, EnvelopedMessage as _, RequestMessage}; use settings::{Settings, SettingsLocation, SettingsStore}; use smol::channel::{self, Sender}; use std::{ @@ -58,11 +58,7 @@ use std::{ }; use sum_tree::{Bias, Edit, SeekTarget, SumTree, TreeMap, TreeSet}; use text::{LineEnding, Rope}; -use util::{ - paths::{PathMatcher, HOME}, - ResultExt, -}; - +use util::{paths::HOME, ResultExt}; pub use worktree_settings::WorktreeSettings; #[cfg(feature = "test-support")] @@ -98,17 +94,25 @@ pub enum CreatedEntry { Excluded { abs_path: PathBuf }, } +pub struct LoadedFile { + pub file: Arc, + pub text: String, + pub diff_base: Option, +} + pub struct LocalWorktree { snapshot: LocalSnapshot, scan_requests_tx: channel::Sender, path_prefixes_to_scan_tx: channel::Sender>, is_scanning: (watch::Sender, watch::Receiver), _background_scanner_tasks: Vec>, - update_observer: Option, + update_observer: Option, fs: Arc, fs_case_sensitive: bool, visible: bool, next_entry_id: Arc, + settings: WorktreeSettings, + share_private_files: bool, } struct ScanRequest { @@ -119,13 +123,26 @@ struct ScanRequest { pub struct RemoteWorktree { snapshot: Snapshot, background_snapshot: Arc>, + project_id: u64, + client: Box, updates_tx: Option>, + update_observer: Arc< + Mutex BoxFuture<'static, bool>>>>, + >, snapshot_subscriptions: VecDeque<(usize, oneshot::Sender<()>)>, replica_id: ReplicaId, visible: bool, disconnected: bool, } +pub trait RemoteWorktreeClient { + fn request( + &self, + envelope: proto::Envelope, + request_type: &'static str, + ) -> BoxFuture<'static, Result>; +} + #[derive(Clone)] pub struct Snapshot { id: WorktreeId, @@ -275,9 +292,6 @@ pub struct LocalSnapshot { /// All of the git repositories in the worktree, indexed by the project entry /// id of their parent directory. git_repositories: TreeMap, - file_scan_exclusions: Vec, - private_files: Vec, - share_private_files: bool, } struct BackgroundScannerState { @@ -333,7 +347,7 @@ enum ScanState { }, } -struct ShareState { +struct UpdateObservationState { snapshots_tx: mpsc::UnboundedSender<(LocalSnapshot, UpdatedEntriesSet, UpdatedGitRepositoriesSet)>, resume_updates: watch::Sender<()>, @@ -346,6 +360,8 @@ pub enum Event { UpdatedGitRepositories(UpdatedGitRepositoriesSet), } +static EMPTY_PATH: &str = ""; + impl EventEmitter for Worktree {} impl Worktree { @@ -356,10 +372,7 @@ impl Worktree { next_entry_id: Arc, cx: &mut AsyncAppContext, ) -> Result> { - // After determining whether the root entry is a file or a directory, populate the - // snapshot's "root name", which will be used for the purpose of fuzzy matching. let abs_path = path.into(); - let metadata = fs .metadata(&abs_path) .await @@ -373,80 +386,34 @@ impl Worktree { }); cx.new_model(move |cx: &mut ModelContext| { + let worktree_id = cx.handle().entity_id().as_u64(); + let settings_location = Some(SettingsLocation { + worktree_id: worktree_id as usize, + path: Path::new(EMPTY_PATH), + }); + + let settings = WorktreeSettings::get(settings_location, cx).clone(); cx.observe_global::(move |this, cx| { if let Self::Local(this) = this { - let new_file_scan_exclusions = path_matchers( - WorktreeSettings::get_global(cx) - .file_scan_exclusions - .as_deref(), - "file_scan_exclusions", - ); - let new_private_files = path_matchers( - WorktreeSettings::get(Some(settings::SettingsLocation { - worktree_id: cx.handle().entity_id().as_u64() as usize, - path: Path::new("") - }), cx).private_files.as_deref(), - "private_files", - ); - - if new_file_scan_exclusions != this.snapshot.file_scan_exclusions - || new_private_files != this.snapshot.private_files - { - this.snapshot.file_scan_exclusions = new_file_scan_exclusions; - this.snapshot.private_files = new_private_files; - - log::info!( - "Re-scanning directories, new scan exclude files: {:?}, new dotenv files: {:?}", - this.snapshot - .file_scan_exclusions - .iter() - .map(ToString::to_string) - .collect::>(), - this.snapshot - .private_files - .iter() - .map(ToString::to_string) - .collect::>() - ); - + let settings = WorktreeSettings::get(settings_location, cx).clone(); + if settings != this.settings { + this.settings = settings; this.restart_background_scanners(cx); } } }) .detach(); - let root_name = abs_path - .file_name() - .map_or(String::new(), |f| f.to_string_lossy().to_string()); - let mut snapshot = LocalSnapshot { - file_scan_exclusions: path_matchers( - WorktreeSettings::get_global(cx) - .file_scan_exclusions - .as_deref(), - "file_scan_exclusions", - ), - private_files: path_matchers( - WorktreeSettings::get(Some(SettingsLocation { - worktree_id: cx.handle().entity_id().as_u64() as usize, - path: Path::new(""), - }), cx).private_files.as_deref(), - "private_files", - ), - share_private_files: false, ignores_by_parent_abs_path: Default::default(), git_repositories: Default::default(), - snapshot: Snapshot { - id: WorktreeId::from_usize(cx.entity_id().as_u64() as usize), - abs_path: abs_path.to_path_buf().into(), - root_name: root_name.clone(), - root_char_bag: root_name.chars().map(|c| c.to_ascii_lowercase()).collect(), - entries_by_path: Default::default(), - entries_by_id: Default::default(), - repository_entries: Default::default(), - scan_id: 1, - completed_scan_id: 0, - }, + snapshot: Snapshot::new( + cx.entity_id().as_u64(), + abs_path + .file_name() + .map_or(String::new(), |f| f.to_string_lossy().to_string()), + abs_path, + ), }; if let Some(metadata) = metadata { @@ -456,7 +423,7 @@ impl Worktree { &metadata, &next_entry_id, snapshot.root_char_bag, - None + None, ), fs.as_ref(), ); @@ -464,68 +431,71 @@ impl Worktree { let (scan_requests_tx, scan_requests_rx) = channel::unbounded(); let (path_prefixes_to_scan_tx, path_prefixes_to_scan_rx) = channel::unbounded(); - let task_snapshot = snapshot.clone(); - Worktree::Local(LocalWorktree { - next_entry_id: Arc::clone(&next_entry_id), + let mut worktree = LocalWorktree { + share_private_files: false, + next_entry_id, snapshot, is_scanning: watch::channel_with(true), update_observer: None, scan_requests_tx, path_prefixes_to_scan_tx, - _background_scanner_tasks: start_background_scan_tasks( - &abs_path, - task_snapshot, - scan_requests_rx, - path_prefixes_to_scan_rx, - Arc::clone(&next_entry_id), - Arc::clone(&fs), - cx, - ), + _background_scanner_tasks: Vec::new(), fs, fs_case_sensitive, visible, - }) + settings, + }; + worktree.start_background_scanner(scan_requests_rx, path_prefixes_to_scan_rx, cx); + Worktree::Local(worktree) }) } pub fn remote( + project_id: u64, replica_id: ReplicaId, worktree: proto::WorktreeMetadata, + client: Box, cx: &mut AppContext, ) -> Model { cx.new_model(|cx: &mut ModelContext| { - let snapshot = Snapshot { - id: WorktreeId(worktree.id as usize), - abs_path: Arc::from(PathBuf::from(worktree.abs_path)), - root_name: worktree.root_name.clone(), - root_char_bag: worktree - .root_name - .chars() - .map(|c| c.to_ascii_lowercase()) - .collect(), - entries_by_path: Default::default(), - entries_by_id: Default::default(), - repository_entries: Default::default(), - scan_id: 1, - completed_scan_id: 0, - }; + let snapshot = Snapshot::new( + worktree.id, + worktree.root_name, + Arc::from(PathBuf::from(worktree.abs_path)), + ); let (updates_tx, mut updates_rx) = mpsc::unbounded(); let background_snapshot = Arc::new(Mutex::new(snapshot.clone())); let (mut snapshot_updated_tx, mut snapshot_updated_rx) = watch::channel(); + let update_observer = Arc::new(Mutex::new(None)); + + let worktree = RemoteWorktree { + client, + project_id, + replica_id, + snapshot, + background_snapshot: background_snapshot.clone(), + update_observer: update_observer.clone(), + updates_tx: Some(updates_tx), + snapshot_subscriptions: Default::default(), + visible: worktree.visible, + disconnected: false, + }; cx.background_executor() - .spawn({ - let background_snapshot = background_snapshot.clone(); - async move { - while let Some(update) = updates_rx.next().await { - if let Err(error) = - background_snapshot.lock().apply_remote_update(update) - { - log::error!("error applying worktree update: {}", error); - } - snapshot_updated_tx.send(()).await.ok(); + .spawn(async move { + while let Some(update) = updates_rx.next().await { + let call = update_observer + .lock() + .as_mut() + .map(|observer| (observer)(update.clone())); + if let Some(call) = call { + call.await; } + if let Err(error) = background_snapshot.lock().apply_remote_update(update) { + log::error!("error applying worktree update: {}", error); + } + snapshot_updated_tx.send(()).await.ok(); } }) .detach(); @@ -551,15 +521,7 @@ impl Worktree { }) .detach(); - Worktree::Remote(RemoteWorktree { - replica_id, - snapshot: snapshot.clone(), - background_snapshot, - updates_tx: Some(updates_tx), - snapshot_subscriptions: Default::default(), - visible: worktree.visible, - disconnected: false, - }) + Worktree::Remote(worktree) }) } @@ -605,8 +567,8 @@ impl Worktree { pub fn snapshot(&self) -> Snapshot { match self { - Worktree::Local(worktree) => worktree.snapshot().snapshot, - Worktree::Remote(worktree) => worktree.snapshot(), + Worktree::Local(worktree) => worktree.snapshot.snapshot.clone(), + Worktree::Remote(worktree) => worktree.snapshot.clone(), } } @@ -617,6 +579,15 @@ impl Worktree { } } + pub fn metadata_proto(&self) -> proto::WorktreeMetadata { + proto::WorktreeMetadata { + id: self.id().to_proto(), + root_name: self.root_name().to_string(), + visible: self.is_visible(), + abs_path: self.abs_path().as_os_str().to_string_lossy().into(), + } + } + pub fn completed_scan_id(&self) -> usize { match self { Worktree::Local(worktree) => worktree.snapshot.completed_scan_id, @@ -649,101 +620,351 @@ impl Worktree { let entry = self.root_entry()?; Some(File::for_entry(entry.clone(), cx.handle())) } -} -fn start_background_scan_tasks( - abs_path: &Path, - snapshot: LocalSnapshot, - scan_requests_rx: channel::Receiver, - path_prefixes_to_scan_rx: channel::Receiver>, - next_entry_id: Arc, - fs: Arc, - cx: &mut ModelContext<'_, Worktree>, -) -> Vec> { - let (scan_states_tx, mut scan_states_rx) = mpsc::unbounded(); - let background_scanner = cx.background_executor().spawn({ - let abs_path = if cfg!(target_os = "windows") { - abs_path - .canonicalize() - .unwrap_or_else(|_| abs_path.to_path_buf()) - } else { - abs_path.to_path_buf() - }; - let background = cx.background_executor().clone(); - async move { - let (events, watcher) = fs.watch(&abs_path, FS_WATCH_LATENCY).await; - let case_sensitive = fs.is_case_sensitive().await.unwrap_or_else(|e| { - log::error!("Failed to determine whether filesystem is case sensitive: {e:#}"); - true - }); - - let mut scanner = BackgroundScanner { - fs, - fs_case_sensitive: case_sensitive, - status_updates_tx: scan_states_tx, - executor: background, - scan_requests_rx, - path_prefixes_to_scan_rx, - next_entry_id, - state: Mutex::new(BackgroundScannerState { - prev_snapshot: snapshot.snapshot.clone(), - snapshot, - scanned_dirs: Default::default(), - path_prefixes_to_scan: Default::default(), - paths_to_scan: Default::default(), - removed_entry_ids: Default::default(), - changed_paths: Default::default(), - }), - phase: BackgroundScannerPhase::InitialScan, - watcher, - }; - - scanner.run(events).await; + pub fn observe_updates( + &mut self, + project_id: u64, + cx: &mut ModelContext, + callback: F, + ) where + F: 'static + Send + Fn(proto::UpdateWorktree) -> Fut, + Fut: 'static + Send + Future, + { + match self { + Worktree::Local(this) => this.observe_updates(project_id, cx, callback), + Worktree::Remote(this) => { + this.update_observer + .lock() + .replace(Box::new(move |update| callback(update).boxed())); + } } - }); - let scan_state_updater = cx.spawn(|this, mut cx| async move { - while let Some((state, this)) = scan_states_rx.next().await.zip(this.upgrade()) { - this.update(&mut cx, |this, cx| { - let this = this.as_local_mut().unwrap(); - match state { - ScanState::Started => { - *this.is_scanning.0.borrow_mut() = true; - } - ScanState::Updated { - snapshot, - changes, - barrier, - scanning, - } => { - *this.is_scanning.0.borrow_mut() = scanning; - this.set_snapshot(snapshot, changes, cx); - drop(barrier); - } - } - cx.notify(); - }) - .ok(); - } - }); - vec![background_scanner, scan_state_updater] -} + } -fn path_matchers(values: Option<&[String]>, context: &'static str) -> Vec { - values - .unwrap_or(&[]) - .iter() - .sorted() - .filter_map(|pattern| { - PathMatcher::new(pattern) - .map(Some) - .unwrap_or_else(|e| { - log::error!( - "Skipping pattern {pattern} in `{}` project settings due to parsing error: {e:#}", context - ); - None + pub fn stop_observing_updates(&mut self) { + match self { + Worktree::Local(this) => { + this.update_observer.take(); + } + Worktree::Remote(this) => { + this.update_observer.lock().take(); + } + } + } + + #[cfg(any(test, feature = "test-support"))] + pub fn has_update_observer(&self) -> bool { + match self { + Worktree::Local(this) => this.update_observer.is_some(), + Worktree::Remote(this) => this.update_observer.lock().is_some(), + } + } + + pub fn load_file( + &self, + path: &Path, + cx: &mut ModelContext, + ) -> Task> { + match self { + Worktree::Local(this) => this.load_file(path, cx), + Worktree::Remote(_) => { + Task::ready(Err(anyhow!("remote worktrees can't yet load files"))) + } + } + } + + pub fn write_file( + &self, + path: &Path, + text: Rope, + line_ending: LineEnding, + cx: &mut ModelContext, + ) -> Task>> { + match self { + Worktree::Local(this) => this.write_file(path, text, line_ending, cx), + Worktree::Remote(_) => { + Task::ready(Err(anyhow!("remote worktree can't yet write files"))) + } + } + } + + pub fn create_entry( + &mut self, + path: impl Into>, + is_directory: bool, + cx: &mut ModelContext, + ) -> Task> { + let path = path.into(); + let worktree_id = self.id(); + match self { + Worktree::Local(this) => this.create_entry(path, is_directory, cx), + Worktree::Remote(this) => { + let project_id = this.project_id; + let request = this.rpc_request(proto::CreateProjectEntry { + worktree_id: worktree_id.to_proto(), + project_id, + path: path.to_string_lossy().into(), + is_directory, + }); + cx.spawn(move |this, mut cx| async move { + let response = request.await?; + match response.entry { + Some(entry) => this + .update(&mut cx, |worktree, cx| { + worktree.as_remote_mut().unwrap().insert_entry( + entry, + response.worktree_scan_id as usize, + cx, + ) + })? + .await + .map(CreatedEntry::Included), + None => { + let abs_path = this.update(&mut cx, |worktree, _| { + worktree + .absolutize(&path) + .with_context(|| format!("absolutizing {path:?}")) + })??; + Ok(CreatedEntry::Excluded { abs_path }) + } + } }) + } + } + } + + pub fn delete_entry( + &mut self, + entry_id: ProjectEntryId, + trash: bool, + cx: &mut ModelContext, + ) -> Option>> { + match self { + Worktree::Local(this) => this.delete_entry(entry_id, trash, cx), + Worktree::Remote(this) => { + let response = this.rpc_request(proto::DeleteProjectEntry { + project_id: this.project_id, + entry_id: entry_id.to_proto(), + use_trash: trash, + }); + Some(cx.spawn(move |this, mut cx| async move { + let response = response.await?; + this.update(&mut cx, move |worktree, cx| { + worktree.as_remote_mut().unwrap().delete_entry( + entry_id, + response.worktree_scan_id as usize, + cx, + ) + })? + .await + })) + } + } + } + + pub fn rename_entry( + &mut self, + entry_id: ProjectEntryId, + new_path: impl Into>, + cx: &mut ModelContext, + ) -> Task> { + let new_path = new_path.into(); + match self { + Worktree::Local(this) => this.rename_entry(entry_id, new_path, cx), + Worktree::Remote(this) => { + let response = this.rpc_request(proto::RenameProjectEntry { + project_id: this.project_id, + entry_id: entry_id.to_proto(), + new_path: new_path.to_string_lossy().into(), + }); + cx.spawn(move |this, mut cx| async move { + let response = response.await?; + match response.entry { + Some(entry) => this + .update(&mut cx, |this, cx| { + this.as_remote_mut().unwrap().insert_entry( + entry, + response.worktree_scan_id as usize, + cx, + ) + })? + .await + .map(CreatedEntry::Included), + None => { + let abs_path = this.update(&mut cx, |worktree, _| { + worktree + .absolutize(&new_path) + .with_context(|| format!("absolutizing {new_path:?}")) + })??; + Ok(CreatedEntry::Excluded { abs_path }) + } + } + }) + } + } + } + + pub fn copy_entry( + &mut self, + entry_id: ProjectEntryId, + new_path: impl Into>, + cx: &mut ModelContext, + ) -> Task>> { + let new_path = new_path.into(); + match self { + Worktree::Local(this) => this.copy_entry(entry_id, new_path, cx), + Worktree::Remote(this) => { + let response = this.rpc_request(proto::CopyProjectEntry { + project_id: this.project_id, + entry_id: entry_id.to_proto(), + new_path: new_path.to_string_lossy().into(), + }); + cx.spawn(move |this, mut cx| async move { + let response = response.await?; + match response.entry { + Some(entry) => this + .update(&mut cx, |worktree, cx| { + worktree.as_remote_mut().unwrap().insert_entry( + entry, + response.worktree_scan_id as usize, + cx, + ) + })? + .await + .map(Some), + None => Ok(None), + } + }) + } + } + } + + pub fn expand_entry( + &mut self, + entry_id: ProjectEntryId, + cx: &mut ModelContext, + ) -> Option>> { + match self { + Worktree::Local(this) => this.expand_entry(entry_id, cx), + Worktree::Remote(this) => { + let response = this.rpc_request(proto::ExpandProjectEntry { + project_id: this.project_id, + entry_id: entry_id.to_proto(), + }); + Some(cx.spawn(move |this, mut cx| async move { + let response = response.await?; + this.update(&mut cx, |this, _| { + this.as_remote_mut() + .unwrap() + .wait_for_snapshot(response.worktree_scan_id as usize) + })? + .await?; + Ok(()) + })) + } + } + } + + pub async fn handle_create_entry( + this: Model, + request: proto::CreateProjectEntry, + mut cx: AsyncAppContext, + ) -> Result { + let (scan_id, entry) = this.update(&mut cx, |this, cx| { + ( + this.scan_id(), + this.create_entry(PathBuf::from(request.path), request.is_directory, cx), + ) + })?; + Ok(proto::ProjectEntryResponse { + entry: match &entry.await? { + CreatedEntry::Included(entry) => Some(entry.into()), + CreatedEntry::Excluded { .. } => None, + }, + worktree_scan_id: scan_id as u64, }) - .collect() + } + + pub async fn handle_delete_entry( + this: Model, + request: proto::DeleteProjectEntry, + mut cx: AsyncAppContext, + ) -> Result { + let (scan_id, task) = this.update(&mut cx, |this, cx| { + ( + this.scan_id(), + this.delete_entry( + ProjectEntryId::from_proto(request.entry_id), + request.use_trash, + cx, + ), + ) + })?; + task.ok_or_else(|| anyhow!("invalid entry"))?.await?; + Ok(proto::ProjectEntryResponse { + entry: None, + worktree_scan_id: scan_id as u64, + }) + } + + pub async fn handle_expand_entry( + this: Model, + request: proto::ExpandProjectEntry, + mut cx: AsyncAppContext, + ) -> Result { + let task = this.update(&mut cx, |this, cx| { + this.expand_entry(ProjectEntryId::from_proto(request.entry_id), cx) + })?; + task.ok_or_else(|| anyhow!("no such entry"))?.await?; + let scan_id = this.read_with(&cx, |this, _| this.scan_id())?; + Ok(proto::ExpandProjectEntryResponse { + worktree_scan_id: scan_id as u64, + }) + } + + pub async fn handle_rename_entry( + this: Model, + request: proto::RenameProjectEntry, + mut cx: AsyncAppContext, + ) -> Result { + let (scan_id, task) = this.update(&mut cx, |this, cx| { + ( + this.scan_id(), + this.rename_entry( + ProjectEntryId::from_proto(request.entry_id), + PathBuf::from(request.new_path), + cx, + ), + ) + })?; + Ok(proto::ProjectEntryResponse { + entry: match &task.await? { + CreatedEntry::Included(entry) => Some(entry.into()), + CreatedEntry::Excluded { .. } => None, + }, + worktree_scan_id: scan_id as u64, + }) + } + + pub async fn handle_copy_entry( + this: Model, + request: proto::CopyProjectEntry, + mut cx: AsyncAppContext, + ) -> Result { + let (scan_id, task) = this.update(&mut cx, |this, cx| { + ( + this.scan_id(), + this.copy_entry( + ProjectEntryId::from_proto(request.entry_id), + PathBuf::from(request.new_path), + cx, + ), + ) + })?; + Ok(proto::ProjectEntryResponse { + entry: task.await?.as_ref().map(|e| e.into()), + worktree_scan_id: scan_id as u64, + }) + } } impl LocalWorktree { @@ -751,32 +972,108 @@ impl LocalWorktree { path.starts_with(&self.abs_path) } + pub fn is_path_private(&self, path: &Path) -> bool { + !self.share_private_files && self.settings.is_path_private(path) + } + fn restart_background_scanners(&mut self, cx: &mut ModelContext) { let (scan_requests_tx, scan_requests_rx) = channel::unbounded(); let (path_prefixes_to_scan_tx, path_prefixes_to_scan_rx) = channel::unbounded(); self.scan_requests_tx = scan_requests_tx; self.path_prefixes_to_scan_tx = path_prefixes_to_scan_tx; - self._background_scanner_tasks = start_background_scan_tasks( - &self.snapshot.abs_path, - self.snapshot(), - scan_requests_rx, - path_prefixes_to_scan_rx, - Arc::clone(&self.next_entry_id), - Arc::clone(&self.fs), - cx, - ); + self.start_background_scanner(scan_requests_rx, path_prefixes_to_scan_rx, cx); + } + + fn start_background_scanner( + &mut self, + scan_requests_rx: channel::Receiver, + path_prefixes_to_scan_rx: channel::Receiver>, + cx: &mut ModelContext, + ) { + let snapshot = self.snapshot(); + let share_private_files = self.share_private_files; + let next_entry_id = self.next_entry_id.clone(); + let fs = self.fs.clone(); + let settings = self.settings.clone(); + let (scan_states_tx, mut scan_states_rx) = mpsc::unbounded(); + let background_scanner = cx.background_executor().spawn({ + let abs_path = &snapshot.abs_path; + let abs_path = if cfg!(target_os = "windows") { + abs_path + .canonicalize() + .unwrap_or_else(|_| abs_path.to_path_buf()) + } else { + abs_path.to_path_buf() + }; + let background = cx.background_executor().clone(); + async move { + let (events, watcher) = fs.watch(&abs_path, FS_WATCH_LATENCY).await; + let fs_case_sensitive = fs.is_case_sensitive().await.unwrap_or_else(|e| { + log::error!("Failed to determine whether filesystem is case sensitive: {e:#}"); + true + }); + + let mut scanner = BackgroundScanner { + fs, + fs_case_sensitive, + status_updates_tx: scan_states_tx, + executor: background, + scan_requests_rx, + path_prefixes_to_scan_rx, + next_entry_id, + state: Mutex::new(BackgroundScannerState { + prev_snapshot: snapshot.snapshot.clone(), + snapshot, + scanned_dirs: Default::default(), + path_prefixes_to_scan: Default::default(), + paths_to_scan: Default::default(), + removed_entry_ids: Default::default(), + changed_paths: Default::default(), + }), + phase: BackgroundScannerPhase::InitialScan, + share_private_files, + settings, + watcher, + }; + + scanner.run(events).await; + } + }); + let scan_state_updater = cx.spawn(|this, mut cx| async move { + while let Some((state, this)) = scan_states_rx.next().await.zip(this.upgrade()) { + this.update(&mut cx, |this, cx| { + let this = this.as_local_mut().unwrap(); + match state { + ScanState::Started => { + *this.is_scanning.0.borrow_mut() = true; + } + ScanState::Updated { + snapshot, + changes, + barrier, + scanning, + } => { + *this.is_scanning.0.borrow_mut() = scanning; + this.set_snapshot(snapshot, changes, cx); + drop(barrier); + } + } + cx.notify(); + }) + .ok(); + } + }); + self._background_scanner_tasks = vec![background_scanner, scan_state_updater]; self.is_scanning = watch::channel_with(true); } fn set_snapshot( &mut self, - mut new_snapshot: LocalSnapshot, + new_snapshot: LocalSnapshot, entry_changes: UpdatedEntriesSet, cx: &mut ModelContext, ) { let repo_changes = self.changed_repos(&self.snapshot, &new_snapshot); - - new_snapshot.share_private_files = self.snapshot.share_private_files; self.snapshot = new_snapshot; if let Some(share) = self.update_observer.as_mut() { @@ -911,24 +1208,25 @@ impl LocalWorktree { self.snapshot.clone() } - pub fn metadata_proto(&self) -> proto::WorktreeMetadata { - proto::WorktreeMetadata { - id: self.id().to_proto(), - root_name: self.root_name().to_string(), - visible: self.visible, - abs_path: self.abs_path().as_os_str().to_string_lossy().into(), - } + pub fn settings(&self) -> WorktreeSettings { + self.settings.clone() } - pub fn load_file( - &self, - path: &Path, - cx: &mut ModelContext, - ) -> Task)>> { + pub fn local_git_repo(&self, path: &Path) -> Option> { + self.repo_for_path(path) + .map(|(_, entry)| entry.repo_ptr.clone()) + } + + pub fn get_local_repo(&self, repo: &RepositoryEntry) -> Option<&LocalRepositoryEntry> { + self.git_repositories.get(&repo.work_directory.0) + } + + fn load_file(&self, path: &Path, cx: &mut ModelContext) -> Task> { let path = Arc::from(path); let abs_path = self.absolutize(&path); let fs = self.fs.clone(); let entry = self.refresh_entry(path.clone(), None, cx); + let is_private = self.is_path_private(path.as_ref()); cx.spawn(|this, mut cx| async move { let abs_path = abs_path?; @@ -943,7 +1241,7 @@ impl LocalWorktree { let fs = fs.clone(); let abs_path = abs_path.clone(); async move { - let abs_path_metadata = fs + let metadata = fs .metadata(&abs_path) .await .with_context(|| { @@ -951,7 +1249,7 @@ impl LocalWorktree { }) .log_err() .flatten()?; - if abs_path_metadata.is_dir || abs_path_metadata.is_symlink { + if metadata.is_dir || metadata.is_symlink { None } else { git_repo.load_index_text(&repo_path) @@ -971,20 +1269,8 @@ impl LocalWorktree { let worktree = this .upgrade() .ok_or_else(|| anyhow!("worktree was dropped"))?; - match entry.await? { - Some(entry) => Ok(( - File { - entry_id: Some(entry.id), - worktree, - path: entry.path, - mtime: entry.mtime, - is_local: true, - is_deleted: false, - is_private: entry.is_private, - }, - text, - diff_base, - )), + let file = match entry.await? { + Some(entry) => File::for_entry(entry, worktree), None => { let metadata = fs .metadata(&abs_path) @@ -995,22 +1281,23 @@ impl LocalWorktree { .with_context(|| { format!("Excluded file {abs_path:?} got removed during loading") })?; - let is_private = snapshot.is_path_private(path.as_ref()); - Ok(( - File { - entry_id: None, - worktree, - path, - mtime: Some(metadata.mtime), - is_local: true, - is_deleted: false, - is_private, - }, - text, - diff_base, - )) + Arc::new(File { + entry_id: None, + worktree, + path, + mtime: Some(metadata.mtime), + is_local: true, + is_deleted: false, + is_private, + }) } - } + }; + + Ok(LoadedFile { + file, + text, + diff_base, + }) }) } @@ -1027,7 +1314,7 @@ impl LocalWorktree { lowest_ancestor.unwrap_or_else(|| PathBuf::from("")) } - pub fn create_entry( + fn create_entry( &self, path: impl Into>, is_dir: bool, @@ -1038,7 +1325,7 @@ impl LocalWorktree { Ok(path) => path, Err(e) => return Task::ready(Err(e.context(format!("absolutizing path {path:?}")))), }; - let path_excluded = self.is_path_excluded(&abs_path); + let path_excluded = self.settings.is_path_excluded(&abs_path); let fs = self.fs.clone(); let task_abs_path = abs_path.clone(); let write = cx.background_executor().spawn(async move { @@ -1091,30 +1378,62 @@ impl LocalWorktree { }) } - pub fn write_file( + fn write_file( &self, path: impl Into>, text: Rope, line_ending: LineEnding, cx: &mut ModelContext, - ) -> Task>> { - let path: Arc = path.into(); - let abs_path = self.absolutize(&path); + ) -> Task>> { + let path = path.into(); let fs = self.fs.clone(); - let write = cx - .background_executor() - .spawn(async move { fs.save(&abs_path?, &text, line_ending).await }); + let is_private = self.is_path_private(&path); + let Ok(abs_path) = self.absolutize(&path) else { + return Task::ready(Err(anyhow!("invalid path {path:?}"))); + }; - cx.spawn(|this, mut cx| async move { + let write = cx.background_executor().spawn({ + let fs = fs.clone(); + let abs_path = abs_path.clone(); + async move { fs.save(&abs_path, &text, line_ending).await } + }); + + cx.spawn(move |this, mut cx| async move { write.await?; - this.update(&mut cx, |this, cx| { - this.as_local_mut().unwrap().refresh_entry(path, None, cx) - })? - .await + let entry = this + .update(&mut cx, |this, cx| { + this.as_local_mut() + .unwrap() + .refresh_entry(path.clone(), None, cx) + })? + .await?; + let worktree = this.upgrade().ok_or_else(|| anyhow!("worktree dropped"))?; + if let Some(entry) = entry { + Ok(File::for_entry(entry, worktree)) + } else { + let metadata = fs + .metadata(&abs_path) + .await + .with_context(|| { + format!("Fetching metadata after saving the excluded buffer {abs_path:?}") + })? + .with_context(|| { + format!("Excluded buffer {path:?} got removed during saving") + })?; + Ok(Arc::new(File { + worktree, + path, + mtime: Some(metadata.mtime), + entry_id: None, + is_local: true, + is_deleted: false, + is_private, + })) + } }) } - pub fn delete_entry( + fn delete_entry( &self, entry_id: ProjectEntryId, trash: bool, @@ -1168,7 +1487,7 @@ impl LocalWorktree { })) } - pub fn rename_entry( + fn rename_entry( &self, entry_id: ProjectEntryId, new_path: impl Into>, @@ -1225,7 +1544,7 @@ impl LocalWorktree { }) } - pub fn copy_entry( + fn copy_entry( &self, entry_id: ProjectEntryId, new_path: impl Into>, @@ -1260,7 +1579,7 @@ impl LocalWorktree { }) } - pub fn expand_entry( + fn expand_entry( &mut self, entry_id: ProjectEntryId, cx: &mut ModelContext, @@ -1273,7 +1592,7 @@ impl LocalWorktree { })) } - pub fn refresh_entries_for_paths(&self, paths: Vec>) -> barrier::Receiver { + fn refresh_entries_for_paths(&self, paths: Vec>) -> barrier::Receiver { let (tx, rx) = barrier::channel(); self.scan_requests_tx .try_send(ScanRequest { @@ -1294,7 +1613,7 @@ impl LocalWorktree { old_path: Option>, cx: &mut ModelContext, ) -> Task>> { - if self.is_path_excluded(&path) { + if self.settings.is_path_excluded(&path) { return Task::ready(Ok(None)); } let paths = if let Some(old_path) = old_path.as_ref() { @@ -1316,7 +1635,7 @@ impl LocalWorktree { }) } - pub fn observe_updates( + fn observe_updates( &mut self, project_id: u64, cx: &mut ModelContext, @@ -1373,39 +1692,38 @@ impl LocalWorktree { Some(()) }); - self.update_observer = Some(ShareState { + self.update_observer = Some(UpdateObservationState { snapshots_tx, resume_updates: resume_updates_tx, _maintain_remote_snapshot, }); } - pub fn stop_observing_updates(&mut self) { - self.update_observer.take(); - } - - #[cfg(any(test, feature = "test-support"))] - pub fn has_update_observer(&self) -> bool { - self.update_observer.is_some() - } - pub fn share_private_files(&mut self, cx: &mut ModelContext) { - self.snapshot.share_private_files = true; + self.share_private_files = true; self.restart_background_scanners(cx); } } impl RemoteWorktree { - fn snapshot(&self) -> Snapshot { - self.snapshot.clone() - } - pub fn disconnected_from_host(&mut self) { self.updates_tx.take(); self.snapshot_subscriptions.clear(); self.disconnected = true; } + fn rpc_request( + &self, + request: T, + ) -> impl Future> { + let envelope = request.into_envelope(0, None, None); + let response = self.client.request(envelope, T::NAME); + async move { + T::Response::from_envelope(response.await?) + .ok_or_else(|| anyhow!("received response of the wrong type")) + } + } + pub fn update_from_remote(&mut self, update: proto::UpdateWorktree) { if let Some(updates_tx) = &self.updates_tx { updates_tx @@ -1439,7 +1757,7 @@ impl RemoteWorktree { } } - pub fn insert_entry( + fn insert_entry( &mut self, entry: proto::Entry, scan_id: usize, @@ -1458,7 +1776,7 @@ impl RemoteWorktree { }) } - pub fn delete_entry( + fn delete_entry( &mut self, id: ProjectEntryId, scan_id: usize, @@ -1479,6 +1797,20 @@ impl RemoteWorktree { } impl Snapshot { + pub fn new(id: u64, root_name: String, abs_path: Arc) -> Self { + Snapshot { + id: WorktreeId::from_usize(id as usize), + abs_path, + root_char_bag: root_name.chars().map(|c| c.to_ascii_lowercase()).collect(), + root_name, + entries_by_path: Default::default(), + entries_by_id: Default::default(), + repository_entries: Default::default(), + scan_id: 1, + completed_scan_id: 0, + } + } + pub fn id(&self) -> WorktreeId { self.id } @@ -1881,21 +2213,12 @@ impl Snapshot { } impl LocalSnapshot { - pub fn get_local_repo(&self, repo: &RepositoryEntry) -> Option<&LocalRepositoryEntry> { - self.git_repositories.get(&repo.work_directory.0) - } - pub fn repo_for_path(&self, path: &Path) -> Option<(RepositoryEntry, &LocalRepositoryEntry)> { let (_, repo_entry) = self.repository_and_work_directory_for_path(path)?; let work_directory_id = repo_entry.work_directory_id(); Some((repo_entry, self.git_repositories.get(&work_directory_id)?)) } - pub fn local_git_repo(&self, path: &Path) -> Option> { - self.repo_for_path(path) - .map(|(_, entry)| entry.repo_ptr.clone()) - } - fn build_update( &self, project_id: u64, @@ -2162,25 +2485,6 @@ impl LocalSnapshot { paths.sort_by(|a, b| a.0.cmp(b.0)); paths } - - pub fn is_path_private(&self, path: &Path) -> bool { - if self.share_private_files { - return false; - } - path.ancestors().any(|ancestor| { - self.private_files - .iter() - .any(|exclude_matcher| exclude_matcher.is_match(&ancestor)) - }) - } - - pub fn is_path_excluded(&self, path: &Path) -> bool { - path.ancestors().any(|path| { - self.file_scan_exclusions - .iter() - .any(|exclude_matcher| exclude_matcher.is_match(&path)) - }) - } } impl BackgroundScannerState { @@ -2962,6 +3266,8 @@ struct BackgroundScanner { next_entry_id: Arc, phase: BackgroundScannerPhase, watcher: Arc, + settings: WorktreeSettings, + share_private_files: bool, } #[derive(PartialEq)] @@ -3205,7 +3511,7 @@ impl BackgroundScanner { return false; } - if snapshot.is_path_excluded(&relative_path) { + if self.settings.is_path_excluded(&relative_path) { if !is_git_related { log::debug!("ignoring FS event for excluded path {relative_path:?}"); } @@ -3377,7 +3683,7 @@ impl BackgroundScanner { let root_char_bag; { let snapshot = &self.state.lock().snapshot; - if snapshot.is_path_excluded(&job.path) { + if self.settings.is_path_excluded(&job.path) { log::error!("skipping excluded directory {:?}", job.path); return Ok(()); } @@ -3462,13 +3768,10 @@ impl BackgroundScanner { } } - { - let mut state = self.state.lock(); - if state.snapshot.is_path_excluded(&child_path) { - log::debug!("skipping excluded child entry {child_path:?}"); - state.remove_path(&child_path); - continue; - } + if self.settings.is_path_excluded(&child_path) { + log::debug!("skipping excluded child entry {child_path:?}"); + self.state.lock().remove_path(&child_path); + continue; } let child_metadata = match self.fs.metadata(&child_abs_path).await { @@ -3561,12 +3864,10 @@ impl BackgroundScanner { { let relative_path = job.path.join(child_name); - let state = self.state.lock(); - if state.snapshot.is_path_private(&relative_path) { + if self.is_path_private(&relative_path) { log::debug!("detected private file: {relative_path:?}"); child_entry.is_private = true; } - drop(state) } new_entries.push(child_entry); @@ -3676,7 +3977,7 @@ impl BackgroundScanner { let is_dir = fs_entry.is_dir(); fs_entry.is_ignored = ignore_stack.is_abs_path_ignored(&abs_path, is_dir); fs_entry.is_external = !canonical_path.starts_with(&root_canonical_path); - fs_entry.is_private = state.snapshot.is_path_private(path); + fs_entry.is_private = self.is_path_private(path); if !is_dir && !fs_entry.is_ignored && !fs_entry.is_external { if let Some((repo_entry, repo)) = state.snapshot.repo_for_path(path) { @@ -3971,7 +4272,7 @@ impl BackgroundScanner { ids_to_preserve.insert(work_directory_id); } else { let git_dir_abs_path = snapshot.abs_path().join(&entry.git_dir_path); - let git_dir_excluded = snapshot.is_path_excluded(&entry.git_dir_path); + let git_dir_excluded = self.settings.is_path_excluded(&entry.git_dir_path); if git_dir_excluded && !matches!( smol::block_on(self.fs.metadata(&git_dir_abs_path)), @@ -4209,6 +4510,10 @@ impl BackgroundScanner { smol::Timer::after(FS_WATCH_LATENCY).await; } + + fn is_path_private(&self, path: &Path) -> bool { + !self.share_private_files && self.settings.is_path_private(path) + } } fn char_bag_for_path(root_char_bag: CharBag, path: &Path) -> CharBag { diff --git a/crates/worktree/src/worktree_settings.rs b/crates/worktree/src/worktree_settings.rs index 3941dc0ffb..eeb225818e 100644 --- a/crates/worktree/src/worktree_settings.rs +++ b/crates/worktree/src/worktree_settings.rs @@ -1,10 +1,37 @@ +use std::{path::Path, sync::Arc}; + use gpui::AppContext; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use settings::{Settings, SettingsSources}; +use util::paths::PathMatcher; + +#[derive(Clone, PartialEq, Eq)] +pub struct WorktreeSettings { + pub file_scan_exclusions: Arc<[PathMatcher]>, + pub private_files: Arc<[PathMatcher]>, +} + +impl WorktreeSettings { + pub fn is_path_private(&self, path: &Path) -> bool { + path.ancestors().any(|ancestor| { + self.private_files + .iter() + .any(|matcher| matcher.is_match(&ancestor)) + }) + } + + pub fn is_path_excluded(&self, path: &Path) -> bool { + path.ancestors().any(|ancestor| { + self.file_scan_exclusions + .iter() + .any(|matcher| matcher.is_match(&ancestor)) + }) + } +} #[derive(Clone, Default, Serialize, Deserialize, JsonSchema)] -pub struct WorktreeSettings { +pub struct WorktreeSettingsContent { /// Completely ignore files matching globs from `file_scan_exclusions` /// /// Default: [ @@ -28,12 +55,37 @@ pub struct WorktreeSettings { impl Settings for WorktreeSettings { const KEY: Option<&'static str> = None; - type FileContent = Self; + type FileContent = WorktreeSettingsContent; fn load( sources: SettingsSources, _: &mut AppContext, ) -> anyhow::Result { - sources.json_merge() + let result: WorktreeSettingsContent = sources.json_merge()?; + let mut file_scan_exclusions = result.file_scan_exclusions.unwrap_or_default(); + let mut private_files = result.private_files.unwrap_or_default(); + file_scan_exclusions.sort(); + private_files.sort(); + Ok(Self { + file_scan_exclusions: path_matchers(&file_scan_exclusions, "file_scan_exclusions"), + private_files: path_matchers(&private_files, "private_files"), + }) } } + +fn path_matchers(values: &[String], context: &'static str) -> Arc<[PathMatcher]> { + values + .iter() + .filter_map(|pattern| { + PathMatcher::new(pattern) + .map(Some) + .unwrap_or_else(|e| { + log::error!( + "Skipping pattern {pattern} in `{}` project settings due to parsing error: {e:#}", context + ); + None + }) + }) + .collect::>() + .into() +} diff --git a/crates/worktree/src/worktree_tests.rs b/crates/worktree/src/worktree_tests.rs index 14948eca82..35b1cd043c 100644 --- a/crates/worktree/src/worktree_tests.rs +++ b/crates/worktree/src/worktree_tests.rs @@ -453,11 +453,9 @@ async fn test_open_gitignored_files(cx: &mut TestAppContext) { // Open a file that is nested inside of a gitignored directory that // has not yet been expanded. let prev_read_dir_count = fs.read_dir_call_count(); - let (file, _, _) = tree + let loaded = tree .update(cx, |tree, cx| { - tree.as_local_mut() - .unwrap() - .load_file("one/node_modules/b/b1.js".as_ref(), cx) + tree.load_file("one/node_modules/b/b1.js".as_ref(), cx) }) .await .unwrap(); @@ -483,7 +481,10 @@ async fn test_open_gitignored_files(cx: &mut TestAppContext) { ] ); - assert_eq!(file.path.as_ref(), Path::new("one/node_modules/b/b1.js")); + assert_eq!( + loaded.file.path.as_ref(), + Path::new("one/node_modules/b/b1.js") + ); // Only the newly-expanded directories are scanned. assert_eq!(fs.read_dir_call_count() - prev_read_dir_count, 2); @@ -492,11 +493,9 @@ async fn test_open_gitignored_files(cx: &mut TestAppContext) { // Open another file in a different subdirectory of the same // gitignored directory. let prev_read_dir_count = fs.read_dir_call_count(); - let (file, _, _) = tree + let loaded = tree .update(cx, |tree, cx| { - tree.as_local_mut() - .unwrap() - .load_file("one/node_modules/a/a2.js".as_ref(), cx) + tree.load_file("one/node_modules/a/a2.js".as_ref(), cx) }) .await .unwrap(); @@ -524,7 +523,10 @@ async fn test_open_gitignored_files(cx: &mut TestAppContext) { ] ); - assert_eq!(file.path.as_ref(), Path::new("one/node_modules/a/a2.js")); + assert_eq!( + loaded.file.path.as_ref(), + Path::new("one/node_modules/a/a2.js") + ); // Only the newly-expanded directory is scanned. assert_eq!(fs.read_dir_call_count() - prev_read_dir_count, 1); @@ -844,7 +846,7 @@ async fn test_write_file(cx: &mut TestAppContext) { tree.flush_fs_events(cx).await; tree.update(cx, |tree, cx| { - tree.as_local().unwrap().write_file( + tree.write_file( Path::new("tracked-dir/file.txt"), "hello".into(), Default::default(), @@ -854,7 +856,7 @@ async fn test_write_file(cx: &mut TestAppContext) { .await .unwrap(); tree.update(cx, |tree, cx| { - tree.as_local().unwrap().write_file( + tree.write_file( Path::new("ignored-dir/file.txt"), "world".into(), Default::default(),