diff --git a/Cargo.lock b/Cargo.lock index 5c9408f013..0a2cf5ad04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3456,6 +3456,7 @@ dependencies = [ "async-trait", "client", "clock", + "collections", "fsevent", "futures", "fuzzy", diff --git a/crates/project/Cargo.toml b/crates/project/Cargo.toml index fce637c8e7..6d5e2790ef 100644 --- a/crates/project/Cargo.toml +++ b/crates/project/Cargo.toml @@ -13,6 +13,7 @@ test-support = ["language/test-support", "text/test-support"] text = { path = "../text" } client = { path = "../client" } clock = { path = "../clock" } +collections = { path = "../collections" } fsevent = { path = "../fsevent" } fuzzy = { path = "../fuzzy" } gpui = { path = "../gpui" } @@ -37,6 +38,7 @@ toml = "0.5" [dev-dependencies] client = { path = "../client", features = ["test-support"] } +collections = { path = "../collections", features = ["test-support"] } gpui = { path = "../gpui", features = ["test-support"] } language = { path = "../language", features = ["test-support"] } lsp = { path = "../lsp", features = ["test-support"] } diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index fa6f6a0f39..4a646cb17b 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -7,6 +7,7 @@ use ::ignore::gitignore::{Gitignore, GitignoreBuilder}; use anyhow::{anyhow, Context, Result}; use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore}; use clock::ReplicaId; +use collections::{hash_map, HashMap}; use futures::{Stream, StreamExt}; use fuzzy::CharBag; use gpui::{ @@ -14,8 +15,8 @@ use gpui::{ Task, UpgradeModelHandle, WeakModelHandle, }; use language::{ - Buffer, Diagnostic, DiagnosticEntry, DiagnosticSeverity, Language, LanguageRegistry, Operation, - PointUtf16, Rope, + Buffer, Diagnostic, DiagnosticEntry, DiagnosticSeverity, File as _, Language, LanguageRegistry, + Operation, PointUtf16, Rope, }; use lazy_static::lazy_static; use lsp::LanguageServer; @@ -29,7 +30,6 @@ use smol::channel::{self, Sender}; use std::{ any::Any, cmp::{self, Ordering}, - collections::HashMap, convert::{TryFrom, TryInto}, ffi::{OsStr, OsString}, fmt, @@ -240,7 +240,7 @@ impl Worktree { user_store .update(cx, |user_store, cx| user_store.load_users(user_ids, cx)) .await?; - let mut collaborators = HashMap::with_capacity(join_response.collaborators.len()); + let mut collaborators = HashMap::default(); for message in join_response.collaborators { let collaborator = Collaborator::from_proto(message, &user_store, cx).await?; collaborators.insert(collaborator.peer_id, collaborator); @@ -306,8 +306,9 @@ impl Worktree { snapshot_rx, updates_tx, client: client.clone(), + loading_buffers: Default::default(), open_buffers: Default::default(), - diagnostics: Vec::new(), + diagnostic_summaries: HashMap::default(), collaborators, queued_operations: Default::default(), languages, @@ -476,15 +477,65 @@ impl Worktree { std::iter::empty() } + pub fn loading_buffers<'a>(&'a mut self) -> &'a mut LoadingBuffers { + match self { + Worktree::Local(worktree) => &mut worktree.loading_buffers, + Worktree::Remote(worktree) => &mut worktree.loading_buffers, + } + } + pub fn open_buffer( &mut self, path: impl AsRef, cx: &mut ModelContext, ) -> Task>> { - match self { - Worktree::Local(worktree) => worktree.open_buffer(path.as_ref(), cx), - Worktree::Remote(worktree) => worktree.open_buffer(path.as_ref(), cx), + let path = path.as_ref(); + + // If there is already a buffer for the given path, then return it. + let existing_buffer = match self { + Worktree::Local(worktree) => worktree.get_open_buffer(path, cx), + Worktree::Remote(worktree) => worktree.get_open_buffer(path, cx), + }; + if let Some(existing_buffer) = existing_buffer { + return cx.spawn(move |_, _| async move { Ok(existing_buffer) }); } + + let path: Arc = Arc::from(path); + let mut loading_watch = match self.loading_buffers().entry(path.clone()) { + // If the given path is already being loaded, then wait for that existing + // task to complete and return the same buffer. + hash_map::Entry::Occupied(e) => e.get().clone(), + + // Otherwise, record the fact that this path is now being loaded. + hash_map::Entry::Vacant(entry) => { + let (mut tx, rx) = postage::watch::channel(); + entry.insert(rx.clone()); + + let load_buffer = match self { + Worktree::Local(worktree) => worktree.open_buffer(&path, cx), + Worktree::Remote(worktree) => worktree.open_buffer(&path, cx), + }; + cx.spawn(move |this, mut cx| async move { + let result = load_buffer.await; + + // After the buffer loads, record the fact that it is no longer + // loading. + this.update(&mut cx, |this, _| this.loading_buffers().remove(&path)); + *tx.borrow_mut() = Some(result.map_err(|e| Arc::new(e))); + }) + .detach(); + rx + } + }; + + cx.spawn(|_, _| async move { + loop { + if let Some(result) = loading_watch.borrow().as_ref() { + return result.clone().map_err(|e| anyhow!("{}", e)); + } + loading_watch.recv().await; + } + }) } #[cfg(feature = "test-support")] @@ -769,8 +820,8 @@ impl Worktree { .context("path is not within worktree")?, ); - let mut group_ids_by_diagnostic_range = HashMap::new(); - let mut diagnostics_by_group_id = HashMap::new(); + let mut group_ids_by_diagnostic_range = HashMap::default(); + let mut diagnostics_by_group_id = HashMap::default(); let mut next_group_id = 0; for diagnostic in ¶ms.diagnostics { let source = diagnostic.source.as_ref(); @@ -878,15 +929,18 @@ impl Worktree { } } -impl Deref for Worktree { - type Target = Snapshot; - - fn deref(&self) -> &Self::Target { - match self { - Worktree::Local(worktree) => &worktree.snapshot, - Worktree::Remote(worktree) => &worktree.snapshot, - } - } +#[derive(Clone)] +pub struct Snapshot { + id: usize, + scan_id: usize, + abs_path: Arc, + root_name: String, + root_char_bag: CharBag, + ignores: HashMap, (Arc, usize)>, + entries_by_path: SumTree, + entries_by_id: SumTree, + removed_entry_ids: HashMap, + next_entry_id: Arc, } pub struct LocalWorktree { @@ -899,9 +953,11 @@ pub struct LocalWorktree { poll_task: Option>, remote_id: watch::Receiver>, share: Option, + loading_buffers: LoadingBuffers, open_buffers: HashMap>, shared_buffers: HashMap>>, diagnostics: HashMap, Vec>>, + diagnostic_summaries: HashMap, DiagnosticSummary>, collaborators: HashMap, queued_operations: Vec<(u64, Operation)>, languages: Arc, @@ -911,6 +967,33 @@ pub struct LocalWorktree { language_servers: HashMap>, } +struct ShareState { + snapshots_tx: Sender, + _subscriptions: Vec, +} + +pub struct RemoteWorktree { + remote_id: u64, + snapshot: Snapshot, + snapshot_rx: watch::Receiver, + client: Arc, + updates_tx: postage::mpsc::Sender, + replica_id: ReplicaId, + loading_buffers: LoadingBuffers, + open_buffers: HashMap, + collaborators: HashMap, + diagnostic_summaries: HashMap, DiagnosticSummary>, + languages: Arc, + user_store: ModelHandle, + queued_operations: Vec<(u64, Operation)>, + _subscriptions: Vec, +} + +type LoadingBuffers = HashMap< + Arc, + postage::watch::Receiver, Arc>>>, +>; + #[derive(Default, Deserialize)] struct WorktreeConfig { collaborators: Vec, @@ -1015,9 +1098,11 @@ impl LocalWorktree { _maintain_remote_id_task, share: None, poll_task: None, + loading_buffers: Default::default(), open_buffers: Default::default(), shared_buffers: Default::default(), diagnostics: Default::default(), + diagnostic_summaries: Default::default(), queued_operations: Default::default(), collaborators: Default::default(), languages, @@ -1105,20 +1190,18 @@ impl LocalWorktree { } } - pub fn open_buffer( + fn get_open_buffer( &mut self, path: &Path, cx: &mut ModelContext, - ) -> Task>> { + ) -> Option> { let handle = cx.handle(); - - // If there is already a buffer for the given path, then return it. - let mut existing_buffer = None; + let mut result = None; self.open_buffers.retain(|_buffer_id, buffer| { if let Some(buffer) = buffer.upgrade(cx.as_ref()) { if let Some(file) = buffer.read(cx.as_ref()).file() { if file.worktree_id() == handle.id() && file.path().as_ref() == path { - existing_buffer = Some(buffer); + result = Some(buffer); } } true @@ -1126,45 +1209,45 @@ impl LocalWorktree { false } }); + result + } + fn open_buffer( + &mut self, + path: &Path, + cx: &mut ModelContext, + ) -> Task>> { let path = Arc::from(path); - cx.spawn(|this, mut cx| async move { - if let Some(existing_buffer) = existing_buffer { - Ok(existing_buffer) - } else { - let (file, contents) = this - .update(&mut cx, |this, cx| this.as_local().unwrap().load(&path, cx)) - .await?; - let language = this.read_with(&cx, |this, _| { - use language::File; - this.languages().select_language(file.full_path()).cloned() - }); - let (diagnostics, language_server) = this.update(&mut cx, |this, cx| { - let this = this.as_local_mut().unwrap(); - ( - this.diagnostics.remove(path.as_ref()), - language - .as_ref() - .and_then(|language| this.ensure_language_server(language, cx)), - ) - }); - let buffer = cx.add_model(|cx| { - let mut buffer = Buffer::from_file(0, contents, Box::new(file), cx); - buffer.set_language(language, language_server, cx); - if let Some(diagnostics) = diagnostics { - buffer.update_diagnostics(None, diagnostics, cx).unwrap(); - } - buffer - }); - this.update(&mut cx, |this, _| { - let this = this - .as_local_mut() - .ok_or_else(|| anyhow!("must be a local worktree"))?; + cx.spawn(move |this, mut cx| async move { + let (file, contents) = this + .update(&mut cx, |t, cx| t.as_local().unwrap().load(&path, cx)) + .await?; - this.open_buffers.insert(buffer.id(), buffer.downgrade()); - Ok(buffer) - }) - } + let (diagnostics, language, language_server) = this.update(&mut cx, |this, cx| { + let this = this.as_local_mut().unwrap(); + let diagnostics = this.diagnostics.remove(&path); + let language = this.languages.select_language(file.full_path()).cloned(); + let server = language + .as_ref() + .and_then(|language| this.ensure_language_server(language, cx)); + (diagnostics, language, server) + }); + + let buffer = cx.add_model(|cx| { + let mut buffer = Buffer::from_file(0, contents, Box::new(file), cx); + buffer.set_language(language, language_server, cx); + if let Some(diagnostics) = diagnostics { + buffer.update_diagnostics(None, diagnostics, cx).unwrap(); + } + buffer + }); + + this.update(&mut cx, |this, _| { + let this = this.as_local_mut().unwrap(); + this.open_buffers.insert(buffer.id(), buffer.downgrade()); + }); + + Ok(buffer) }) } @@ -1173,13 +1256,12 @@ impl LocalWorktree { envelope: TypedEnvelope, cx: &mut ModelContext, ) -> Task> { - let peer_id = envelope.original_sender_id(); - let path = Path::new(&envelope.payload.path); - - let buffer = self.open_buffer(path, cx); - cx.spawn(|this, mut cx| async move { - let buffer = buffer.await?; + let peer_id = envelope.original_sender_id(); + let path = Path::new(&envelope.payload.path); + let buffer = this + .update(&mut cx, |this, cx| this.open_buffer(path, cx)) + .await?; this.update(&mut cx, |this, cx| { this.as_local_mut() .unwrap() @@ -1473,6 +1555,17 @@ fn build_gitignore(abs_path: &Path, fs: &dyn Fs) -> Result { Ok(builder.build()?) } +impl Deref for Worktree { + type Target = Snapshot; + + fn deref(&self) -> &Self::Target { + match self { + Worktree::Local(worktree) => &worktree.snapshot, + Worktree::Remote(worktree) => &worktree.snapshot, + } + } +} + impl Deref for LocalWorktree { type Target = Snapshot; @@ -1487,38 +1580,18 @@ impl fmt::Debug for LocalWorktree { } } -struct ShareState { - snapshots_tx: Sender, - _subscriptions: Vec, -} - -pub struct RemoteWorktree { - remote_id: u64, - snapshot: Snapshot, - snapshot_rx: watch::Receiver, - client: Arc, - updates_tx: postage::mpsc::Sender, - replica_id: ReplicaId, - open_buffers: HashMap, - collaborators: HashMap, - diagnostics: Vec, - languages: Arc, - user_store: ModelHandle, - queued_operations: Vec<(u64, Operation)>, - _subscriptions: Vec, -} - impl RemoteWorktree { - pub fn open_buffer( + fn get_open_buffer( &mut self, path: &Path, cx: &mut ModelContext, - ) -> Task>> { + ) -> Option> { + let handle = cx.handle(); let mut existing_buffer = None; self.open_buffers.retain(|_buffer_id, buffer| { if let Some(buffer) = buffer.upgrade(cx.as_ref()) { if let Some(file) = buffer.read(cx.as_ref()).file() { - if file.worktree_id() == cx.model_id() && file.path().as_ref() == path { + if file.worktree_id() == handle.id() && file.path().as_ref() == path { existing_buffer = Some(buffer); } } @@ -1527,62 +1600,65 @@ impl RemoteWorktree { false } }); + existing_buffer + } + fn open_buffer( + &mut self, + path: &Path, + cx: &mut ModelContext, + ) -> Task>> { let rpc = self.client.clone(); let replica_id = self.replica_id; let remote_worktree_id = self.remote_id; let root_path = self.snapshot.abs_path.clone(); - let path = path.to_string_lossy().to_string(); - cx.spawn_weak(|this, mut cx| async move { - if let Some(existing_buffer) = existing_buffer { - Ok(existing_buffer) - } else { - let entry = this - .upgrade(&cx) - .ok_or_else(|| anyhow!("worktree was closed"))? - .read_with(&cx, |tree, _| tree.entry_for_path(&path).cloned()) - .ok_or_else(|| anyhow!("file does not exist"))?; - let response = rpc - .request(proto::OpenBuffer { - worktree_id: remote_worktree_id as u64, - path, - }) - .await?; + let path: Arc = Arc::from(path); + let path_string = path.to_string_lossy().to_string(); + cx.spawn_weak(move |this, mut cx| async move { + let entry = this + .upgrade(&cx) + .ok_or_else(|| anyhow!("worktree was closed"))? + .read_with(&cx, |tree, _| tree.entry_for_path(&path).cloned()) + .ok_or_else(|| anyhow!("file does not exist"))?; + let response = rpc + .request(proto::OpenBuffer { + worktree_id: remote_worktree_id as u64, + path: path_string, + }) + .await?; - let this = this - .upgrade(&cx) - .ok_or_else(|| anyhow!("worktree was closed"))?; - let file = File { - entry_id: Some(entry.id), - worktree: this.clone(), - worktree_path: root_path, - path: entry.path, - mtime: entry.mtime, - is_local: false, - }; - let language = this.read_with(&cx, |this, _| { - use language::File; - this.languages().select_language(file.full_path()).cloned() - }); - let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?; - let buffer_id = remote_buffer.id as usize; - let buffer = cx.add_model(|cx| { - Buffer::from_proto(replica_id, remote_buffer, Some(Box::new(file)), cx) - .unwrap() - .with_language(language, None, cx) - }); - this.update(&mut cx, |this, cx| { - let this = this.as_remote_mut().unwrap(); - if let Some(RemoteBuffer::Operations(pending_ops)) = this - .open_buffers - .insert(buffer_id, RemoteBuffer::Loaded(buffer.downgrade())) - { - buffer.update(cx, |buf, cx| buf.apply_ops(pending_ops, cx))?; - } - Result::<_, anyhow::Error>::Ok(()) - })?; - Ok(buffer) - } + let this = this + .upgrade(&cx) + .ok_or_else(|| anyhow!("worktree was closed"))?; + let file = File { + entry_id: Some(entry.id), + worktree: this.clone(), + worktree_path: root_path, + path: entry.path, + mtime: entry.mtime, + is_local: false, + }; + let language = this.read_with(&cx, |this, _| { + use language::File; + this.languages().select_language(file.full_path()).cloned() + }); + let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?; + let buffer_id = remote_buffer.id as usize; + let buffer = cx.add_model(|cx| { + Buffer::from_proto(replica_id, remote_buffer, Some(Box::new(file)), cx) + .unwrap() + .with_language(language, None, cx) + }); + this.update(&mut cx, move |this, cx| { + let this = this.as_remote_mut().unwrap(); + if let Some(RemoteBuffer::Operations(pending_ops)) = this + .open_buffers + .insert(buffer_id, RemoteBuffer::Loaded(buffer.downgrade())) + { + buffer.update(cx, |buf, cx| buf.apply_ops(pending_ops, cx))?; + } + Result::<_, anyhow::Error>::Ok(buffer) + }) }) } @@ -1665,20 +1741,6 @@ impl RemoteBuffer { } } -#[derive(Clone)] -pub struct Snapshot { - id: usize, - scan_id: usize, - abs_path: Arc, - root_name: String, - root_char_bag: CharBag, - ignores: HashMap, (Arc, usize)>, - entries_by_path: SumTree, - entries_by_id: SumTree, - removed_entry_ids: HashMap, - next_entry_id: Arc, -} - impl Snapshot { pub fn id(&self) -> usize { self.id @@ -3519,6 +3581,64 @@ mod tests { server.receive::().await.unwrap(); } + #[gpui::test] + async fn test_buffer_deduping(mut cx: gpui::TestAppContext) { + let user_id = 100; + let mut client = Client::new(); + let server = FakeServer::for_client(user_id, &mut client, &cx).await; + let user_store = server.build_user_store(client.clone(), &mut cx).await; + + let fs = Arc::new(FakeFs::new()); + fs.insert_tree( + "/the-dir", + json!({ + "a.txt": "a-contents", + "b.txt": "b-contents", + }), + ) + .await; + + let worktree = Worktree::open_local( + client.clone(), + user_store, + "/the-dir".as_ref(), + fs, + Default::default(), + &mut cx.to_async(), + ) + .await + .unwrap(); + + // Spawn multiple tasks to open paths, repeating some paths. + let (buffer_a_1, buffer_b, buffer_a_2) = worktree.update(&mut cx, |worktree, cx| { + ( + worktree.open_buffer("a.txt", cx), + worktree.open_buffer("b.txt", cx), + worktree.open_buffer("a.txt", cx), + ) + }); + + let buffer_a_1 = buffer_a_1.await.unwrap(); + let buffer_a_2 = buffer_a_2.await.unwrap(); + let buffer_b = buffer_b.await.unwrap(); + assert_eq!(buffer_a_1.read_with(&cx, |b, _| b.text()), "a-contents"); + assert_eq!(buffer_b.read_with(&cx, |b, _| b.text()), "b-contents"); + + // There is only one buffer per path. + let buffer_a_id = buffer_a_1.id(); + assert_eq!(buffer_a_2.id(), buffer_a_id); + + // Open the same path again while it is still open. + drop(buffer_a_1); + let buffer_a_3 = worktree + .update(&mut cx, |worktree, cx| worktree.open_buffer("a.txt", cx)) + .await + .unwrap(); + + // There's still only one buffer per path. + assert_eq!(buffer_a_3.id(), buffer_a_id); + } + #[gpui::test] async fn test_buffer_is_dirty(mut cx: gpui::TestAppContext) { use std::fs; @@ -3985,7 +4105,7 @@ mod tests { worktree .update(&mut cx, |tree, cx| tree.update_diagnostics(message, cx)) .unwrap(); - let buffer = buffer.read_with(&cx, |buffer, cx| buffer.snapshot()); + let buffer = buffer.read_with(&cx, |buffer, _| buffer.snapshot()); assert_eq!( buffer diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index e06db2a273..c46e631c8b 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -1064,7 +1064,7 @@ mod tests { // TODO // // Remove the selection set as client B, see those selections disappear as client A. - // cx_b.update(move |_| drop(editor_b)); + cx_b.update(move |_| drop(editor_b)); // buffer_a // .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0) // .await; diff --git a/crates/workspace/src/workspace.rs b/crates/workspace/src/workspace.rs index 1f11f11693..e49f81efa7 100644 --- a/crates/workspace/src/workspace.rs +++ b/crates/workspace/src/workspace.rs @@ -4,7 +4,7 @@ pub mod settings; pub mod sidebar; mod status_bar; -use anyhow::{anyhow, Result}; +use anyhow::Result; use client::{Authenticate, ChannelList, Client, User, UserStore}; use gpui::{ action, @@ -28,7 +28,6 @@ use sidebar::{Side, Sidebar, SidebarItemId, ToggleSidebarItem, ToggleSidebarItem use status_bar::StatusBar; pub use status_bar::StatusItemView; use std::{ - collections::{hash_map::Entry, HashMap}, future::Future, path::{Path, PathBuf}, sync::Arc, @@ -342,10 +341,6 @@ pub struct Workspace { project: ModelHandle, entry_openers: Arc<[Box]>, items: Vec>, - loading_items: HashMap< - ProjectPath, - postage::watch::Receiver, Arc>>>, - >, _observe_current_user: Task<()>, } @@ -408,7 +403,6 @@ impl Workspace { project, entry_openers: params.entry_openers.clone(), items: Default::default(), - loading_items: Default::default(), _observe_current_user, } } @@ -606,43 +600,22 @@ impl Workspace { } }; - if let Entry::Vacant(entry) = self.loading_items.entry(project_path.clone()) { - let (mut tx, rx) = postage::watch::channel(); - entry.insert(rx); - - let project_path = project_path.clone(); - let entry_openers = self.entry_openers.clone(); - cx.as_mut() - .spawn(|mut cx| async move { - let item = worktree.update(&mut cx, move |worktree, cx| { - for opener in entry_openers.iter() { - if let Some(task) = opener.open(worktree, project_path.clone(), cx) { - return task; - } - } - - cx.spawn(|_, _| async move { - Err(anyhow!("no opener for path {:?} found", project_path)) - }) - }); - *tx.borrow_mut() = Some(item.await.map_err(Arc::new)); - }) - .detach(); - } + let project_path = project_path.clone(); + let entry_openers = self.entry_openers.clone(); + let task = worktree.update(cx, |worktree, cx| { + for opener in entry_openers.iter() { + if let Some(task) = opener.open(worktree, project_path.clone(), cx) { + return Some(task); + } + } + log::error!("no opener for path {:?} found", project_path); + None + })?; let pane = pane.downgrade(); - let mut watch = self.loading_items.get(&project_path).unwrap().clone(); - Some(cx.spawn(|this, mut cx| async move { - let load_result = loop { - if let Some(load_result) = watch.borrow().as_ref() { - break load_result.clone(); - } - watch.recv().await; - }; - + let load_result = task.await; this.update(&mut cx, |this, cx| { - this.loading_items.remove(&project_path); if let Some(pane) = pane.upgrade(&cx) { match load_result { Ok(item) => {