From 489077befcc1ac4e1aa4505fa3f27f1a6528da48 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Fri, 12 Jul 2024 15:25:54 -0700 Subject: [PATCH] Extract a BufferStore object from Project (#14037) This is a ~small~ pure refactor that's a step toward SSH remoting. I've extracted the Project's buffer state management into a smaller, separate struct called `BufferStore`, currently in the same crate. I did this as a separate PR to reduce conflicts between main and `remoting-over-ssh`. The idea is to make use of this struct (and other smaller structs that make up `Project`) in a dedicated, simpler `HeadlessProject` type that we will use in the SSH server to model the remote end of a project. With this approach, as we develop the headless project, we can avoid adding more conditional logic to `Project` itself (which is already very complex), and actually make `Project` a bit smaller by extracting out helper objects. Release Notes: - N/A --- Cargo.lock | 1 + crates/client/src/client.rs | 25 +- crates/collab/src/tests/integration_tests.rs | 8 +- .../random_project_collaboration_tests.rs | 6 +- crates/editor/src/editor.rs | 2 +- crates/editor/src/items.rs | 2 +- crates/outline_panel/src/outline_panel.rs | 4 +- crates/project/src/buffer_store.rs | 928 +++++++++++ crates/project/src/lsp_command.rs | 14 +- crates/project/src/project.rs | 1397 ++++------------- crates/project/src/project_tests.rs | 11 +- crates/proto/Cargo.toml | 1 + crates/proto/src/proto.rs | 58 +- crates/rpc/src/peer.rs | 8 + crates/worktree/src/worktree.rs | 44 +- 15 files changed, 1394 insertions(+), 1115 deletions(-) create mode 100644 crates/project/src/buffer_store.rs diff --git a/Cargo.lock b/Cargo.lock index fb37a5159d..1f11a62c85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8199,6 +8199,7 @@ version = "0.1.0" dependencies = [ "anyhow", "collections", + "futures 0.3.28", "prost", "prost-build", "serde", diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index ed2f108860..99d05cb146 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -13,8 +13,9 @@ use async_tungstenite::tungstenite::{ use clock::SystemClock; use collections::HashMap; use futures::{ - channel::oneshot, future::LocalBoxFuture, AsyncReadExt, FutureExt, SinkExt, Stream, StreamExt, - TryFutureExt as _, TryStreamExt, + channel::oneshot, + future::{BoxFuture, LocalBoxFuture}, + AsyncReadExt, FutureExt, SinkExt, Stream, StreamExt, TryFutureExt as _, TryStreamExt, }; use gpui::{ actions, AnyModel, AnyWeakModel, AppContext, AsyncAppContext, Global, Model, Task, WeakModel, @@ -23,6 +24,7 @@ use http::{HttpClient, HttpClientWithUrl}; use lazy_static::lazy_static; use parking_lot::RwLock; use postage::watch; +use proto::ProtoClient; use rand::prelude::*; use release_channel::{AppVersion, ReleaseChannel}; use rpc::proto::{AnyTypedEnvelope, EntityMessage, EnvelopedMessage, PeerId, RequestMessage}; @@ -1408,6 +1410,11 @@ impl Client { self.peer.send(self.connection_id()?, message) } + fn send_dynamic(&self, envelope: proto::Envelope) -> Result<()> { + let connection_id = self.connection_id()?; + self.peer.send_dynamic(connection_id, envelope) + } + pub fn request( &self, request: T, @@ -1606,6 +1613,20 @@ impl Client { } } +impl ProtoClient for Client { + fn request( + &self, + envelope: proto::Envelope, + request_type: &'static str, + ) -> BoxFuture<'static, Result> { + self.request_dynamic(envelope, request_type).boxed() + } + + fn send(&self, envelope: proto::Envelope) -> Result<()> { + self.send_dynamic(envelope) + } +} + #[derive(Serialize, Deserialize)] struct DevelopmentCredentials { user_id: u64, diff --git a/crates/collab/src/tests/integration_tests.rs b/crates/collab/src/tests/integration_tests.rs index c4f44e4dad..01d5d1b8ac 100644 --- a/crates/collab/src/tests/integration_tests.rs +++ b/crates/collab/src/tests/integration_tests.rs @@ -3327,7 +3327,7 @@ async fn test_local_settings( let store = cx.global::(); assert_eq!( store - .local_settings(worktree_b.read(cx).id().to_usize()) + .local_settings(worktree_b.entity_id().as_u64() as _) .collect::>(), &[ (Path::new("").into(), r#"{"tab_size":2}"#.to_string()), @@ -3346,7 +3346,7 @@ async fn test_local_settings( let store = cx.global::(); assert_eq!( store - .local_settings(worktree_b.read(cx).id().to_usize()) + .local_settings(worktree_b.entity_id().as_u64() as _) .collect::>(), &[ (Path::new("").into(), r#"{}"#.to_string()), @@ -3375,7 +3375,7 @@ async fn test_local_settings( let store = cx.global::(); assert_eq!( store - .local_settings(worktree_b.read(cx).id().to_usize()) + .local_settings(worktree_b.entity_id().as_u64() as _) .collect::>(), &[ (Path::new("a").into(), r#"{"tab_size":8}"#.to_string()), @@ -3407,7 +3407,7 @@ async fn test_local_settings( let store = cx.global::(); assert_eq!( store - .local_settings(worktree_b.read(cx).id().to_usize()) + .local_settings(worktree_b.entity_id().as_u64() as _) .collect::>(), &[(Path::new("a").into(), r#"{"hard_tabs":true}"#.to_string()),] ) diff --git a/crates/collab/src/tests/random_project_collaboration_tests.rs b/crates/collab/src/tests/random_project_collaboration_tests.rs index ff052c550d..97dd93e138 100644 --- a/crates/collab/src/tests/random_project_collaboration_tests.rs +++ b/crates/collab/src/tests/random_project_collaboration_tests.rs @@ -1237,7 +1237,7 @@ impl RandomizedTest for ProjectCollaborationTest { } } - for buffer in guest_project.opened_buffers() { + for buffer in guest_project.opened_buffers(cx) { let buffer = buffer.read(cx); assert_eq!( buffer.deferred_ops_len(), @@ -1287,8 +1287,8 @@ impl RandomizedTest for ProjectCollaborationTest { for guest_buffer in guest_buffers { let buffer_id = guest_buffer.read_with(client_cx, |buffer, _| buffer.remote_id()); - let host_buffer = host_project.read_with(host_cx, |project, _| { - project.buffer_for_id(buffer_id).unwrap_or_else(|| { + let host_buffer = host_project.read_with(host_cx, |project, cx| { + project.buffer_for_id(buffer_id, cx).unwrap_or_else(|| { panic!( "host does not have buffer for guest:{}, peer:{:?}, id:{}", client.username, diff --git a/crates/editor/src/editor.rs b/crates/editor/src/editor.rs index 595593e843..c9f25d8a4e 100644 --- a/crates/editor/src/editor.rs +++ b/crates/editor/src/editor.rs @@ -8529,7 +8529,7 @@ impl Editor { ) -> Vec<(TaskSourceKind, TaskTemplate)> { let (inventory, worktree_id, file) = project.read_with(cx, |project, cx| { let (worktree_id, file) = project - .buffer_for_id(runnable.buffer) + .buffer_for_id(runnable.buffer, cx) .and_then(|buffer| buffer.read(cx).file()) .map(|file| (WorktreeId::from_usize(file.worktree_id()), file.clone())) .unzip(); diff --git a/crates/editor/src/items.rs b/crates/editor/src/items.rs index 9c70a75e79..5289923c04 100644 --- a/crates/editor/src/items.rs +++ b/crates/editor/src/items.rs @@ -371,7 +371,7 @@ async fn update_editor_from_message( continue; }; let buffer_id = BufferId::new(excerpt.buffer_id)?; - let Some(buffer) = project.read(cx).buffer_for_id(buffer_id) else { + let Some(buffer) = project.read(cx).buffer_for_id(buffer_id, cx) else { continue; }; diff --git a/crates/outline_panel/src/outline_panel.rs b/crates/outline_panel/src/outline_panel.rs index 6d87cbd706..feb0e186e3 100644 --- a/crates/outline_panel/src/outline_panel.rs +++ b/crates/outline_panel/src/outline_panel.rs @@ -1129,7 +1129,7 @@ impl OutlinePanel { EntryOwned::Entry(FsEntry::File(worktree_id, _, buffer_id, _)) => { let project = self.project.read(cx); let entry_id = project - .buffer_for_id(buffer_id) + .buffer_for_id(buffer_id, cx) .and_then(|buffer| buffer.read(cx).entry_id(cx)); project .worktree_for_id(worktree_id, cx) @@ -1147,7 +1147,7 @@ impl OutlinePanel { .remove(&CollapsedEntry::Excerpt(buffer_id, excerpt_id)); let project = self.project.read(cx); let entry_id = project - .buffer_for_id(buffer_id) + .buffer_for_id(buffer_id, cx) .and_then(|buffer| buffer.read(cx).entry_id(cx)); entry_id.and_then(|entry_id| { diff --git a/crates/project/src/buffer_store.rs b/crates/project/src/buffer_store.rs new file mode 100644 index 0000000000..9e4e7a278b --- /dev/null +++ b/crates/project/src/buffer_store.rs @@ -0,0 +1,928 @@ +use crate::ProjectPath; +use anyhow::{anyhow, Context as _, Result}; +use collections::{hash_map, HashMap}; +use futures::{channel::oneshot, StreamExt as _}; +use gpui::{ + AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel, +}; +use language::{ + proto::{deserialize_version, serialize_version, split_operations}, + Buffer, Capability, Language, Operation, +}; +use rpc::{ + proto::{self, AnyProtoClient, PeerId}, + ErrorExt as _, TypedEnvelope, +}; +use std::{io, path::Path, sync::Arc}; +use text::BufferId; +use util::{debug_panic, maybe, ResultExt as _}; +use worktree::{File, ProjectEntryId, RemoteWorktree, Worktree}; + +/// A set of open buffers. +pub struct BufferStore { + retain_buffers: bool, + opened_buffers: HashMap, + local_buffer_ids_by_path: HashMap, + local_buffer_ids_by_entry_id: HashMap, + #[allow(clippy::type_complexity)] + loading_buffers_by_path: HashMap< + ProjectPath, + postage::watch::Receiver, Arc>>>, + >, + loading_remote_buffers_by_id: HashMap>, + remote_buffer_listeners: + HashMap, anyhow::Error>>>>, +} + +enum OpenBuffer { + Strong(Model), + Weak(WeakModel), + Operations(Vec), +} + +pub enum BufferStoreEvent { + BufferAdded(Model), + BufferChangedFilePath { + buffer: Model, + old_file: Option>, + }, + BufferSaved { + buffer: Model, + has_changed_file: bool, + saved_version: clock::Global, + }, +} + +impl EventEmitter for BufferStore {} + +impl BufferStore { + /// Creates a buffer store, optionally retaining its buffers. + /// + /// If `retain_buffers` is `true`, then buffers are owned by the buffer store + /// and won't be released unless they are explicitly removed, or `retain_buffers` + /// is set to `false` via `set_retain_buffers`. Otherwise, buffers are stored as + /// weak handles. + pub fn new(retain_buffers: bool) -> Self { + Self { + retain_buffers, + opened_buffers: Default::default(), + remote_buffer_listeners: Default::default(), + loading_remote_buffers_by_id: Default::default(), + local_buffer_ids_by_path: Default::default(), + local_buffer_ids_by_entry_id: Default::default(), + loading_buffers_by_path: Default::default(), + } + } + + pub fn open_buffer( + &mut self, + project_path: ProjectPath, + worktree: Model, + cx: &mut ModelContext, + ) -> Task>> { + let existing_buffer = self.get_by_path(&project_path, cx); + if let Some(existing_buffer) = existing_buffer { + return Task::ready(Ok(existing_buffer)); + } + + let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) { + // If the given path is already being loaded, then wait for that existing + // task to complete and return the same buffer. + hash_map::Entry::Occupied(e) => e.get().clone(), + + // Otherwise, record the fact that this path is now being loaded. + hash_map::Entry::Vacant(entry) => { + let (mut tx, rx) = postage::watch::channel(); + entry.insert(rx.clone()); + + let project_path = project_path.clone(); + let load_buffer = match worktree.read(cx) { + Worktree::Local(_) => { + self.open_local_buffer_internal(project_path.path.clone(), worktree, cx) + } + Worktree::Remote(tree) => { + self.open_remote_buffer_internal(&project_path.path, tree, cx) + } + }; + + cx.spawn(move |this, mut cx| async move { + let load_result = load_buffer.await; + *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| { + // Record the fact that the buffer is no longer loading. + this.loading_buffers_by_path.remove(&project_path); + let buffer = load_result.map_err(Arc::new)?; + Ok(buffer) + })?); + anyhow::Ok(()) + }) + .detach(); + rx + } + }; + + cx.background_executor().spawn(async move { + Self::wait_for_loading_buffer(loading_watch) + .await + .map_err(|e| e.cloned()) + }) + } + + fn open_local_buffer_internal( + &mut self, + path: Arc, + worktree: Model, + cx: &mut ModelContext, + ) -> Task>> { + let load_buffer = worktree.update(cx, |worktree, cx| { + let load_file = worktree.load_file(path.as_ref(), cx); + let reservation = cx.reserve_model(); + let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64()); + cx.spawn(move |_, mut cx| async move { + let loaded = load_file.await?; + let text_buffer = cx + .background_executor() + .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) }) + .await; + cx.insert_model(reservation, |_| { + Buffer::build( + text_buffer, + loaded.diff_base, + Some(loaded.file), + Capability::ReadWrite, + ) + }) + }) + }); + + cx.spawn(move |this, mut cx| async move { + let buffer = match load_buffer.await { + Ok(buffer) => Ok(buffer), + Err(error) if is_not_found_error(&error) => cx.new_model(|cx| { + let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64()); + let text_buffer = text::Buffer::new(0, buffer_id, "".into()); + Buffer::build( + text_buffer, + None, + Some(Arc::new(File { + worktree, + path, + mtime: None, + entry_id: None, + is_local: true, + is_deleted: false, + is_private: false, + })), + Capability::ReadWrite, + ) + }), + Err(e) => Err(e), + }?; + this.update(&mut cx, |this, cx| { + this.add_buffer(buffer.clone(), cx).log_err(); + })?; + Ok(buffer) + }) + } + + fn open_remote_buffer_internal( + &self, + path: &Arc, + worktree: &RemoteWorktree, + cx: &ModelContext, + ) -> Task>> { + let worktree_id = worktree.id().to_proto(); + let project_id = worktree.project_id(); + let client = worktree.client(); + let path_string = path.clone().to_string_lossy().to_string(); + cx.spawn(move |this, mut cx| async move { + let response = client + .request(proto::OpenBufferByPath { + project_id, + worktree_id, + path: path_string, + }) + .await?; + let buffer_id = BufferId::new(response.buffer_id)?; + this.update(&mut cx, |this, cx| { + this.wait_for_remote_buffer(buffer_id, cx) + })? + .await + }) + } + + pub fn create_buffer( + &mut self, + remote_client: Option<(AnyProtoClient, u64)>, + cx: &mut ModelContext, + ) -> Task>> { + if let Some((remote_client, project_id)) = remote_client { + let create = remote_client.request(proto::OpenNewBuffer { project_id }); + cx.spawn(|this, mut cx| async move { + let response = create.await?; + let buffer_id = BufferId::new(response.buffer_id)?; + + this.update(&mut cx, |this, cx| { + this.wait_for_remote_buffer(buffer_id, cx) + })? + .await + }) + } else { + Task::ready(Ok(self.create_local_buffer("", None, cx))) + } + } + + pub fn create_local_buffer( + &mut self, + text: &str, + language: Option>, + cx: &mut ModelContext, + ) -> Model { + let buffer = cx.new_model(|cx| { + Buffer::local(text, cx) + .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx) + }); + self.add_buffer(buffer.clone(), cx).log_err(); + buffer + } + + pub fn save_buffer( + &mut self, + buffer: Model, + cx: &mut ModelContext, + ) -> Task> { + let Some(file) = File::from_dyn(buffer.read(cx).file()) else { + return Task::ready(Err(anyhow!("buffer doesn't have a file"))); + }; + match file.worktree.read(cx) { + Worktree::Local(_) => { + self.save_local_buffer(file.worktree.clone(), buffer, file.path.clone(), false, cx) + } + Worktree::Remote(tree) => self.save_remote_buffer(buffer, None, tree, cx), + } + } + + pub fn save_buffer_as( + &mut self, + buffer: Model, + path: ProjectPath, + worktree: Model, + cx: &mut ModelContext, + ) -> Task> { + let old_file = File::from_dyn(buffer.read(cx).file()) + .cloned() + .map(Arc::new); + + let task = match worktree.read(cx) { + Worktree::Local(_) => { + self.save_local_buffer(worktree, buffer.clone(), path.path, true, cx) + } + Worktree::Remote(tree) => { + self.save_remote_buffer(buffer.clone(), Some(path.to_proto()), tree, cx) + } + }; + cx.spawn(|this, mut cx| async move { + task.await?; + this.update(&mut cx, |_, cx| { + cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file }); + }) + }) + } + + fn save_local_buffer( + &self, + worktree: Model, + buffer_handle: Model, + path: Arc, + mut has_changed_file: bool, + cx: &mut ModelContext, + ) -> Task> { + let buffer = buffer_handle.read(cx); + let text = buffer.as_rope().clone(); + let line_ending = buffer.line_ending(); + let version = buffer.version(); + if buffer.file().is_some_and(|file| !file.is_created()) { + has_changed_file = true; + } + + let save = worktree.update(cx, |worktree, cx| { + worktree.write_file(path.as_ref(), text, line_ending, cx) + }); + + cx.spawn(move |this, mut cx| async move { + let new_file = save.await?; + let mtime = new_file.mtime; + buffer_handle.update(&mut cx, |buffer, cx| { + if has_changed_file { + buffer.file_updated(new_file, cx); + } + buffer.did_save(version.clone(), mtime, cx); + })?; + this.update(&mut cx, |_, cx| { + cx.emit(BufferStoreEvent::BufferSaved { + buffer: buffer_handle, + has_changed_file, + saved_version: version, + }) + })?; + Ok(()) + }) + } + + fn save_remote_buffer( + &self, + buffer_handle: Model, + new_path: Option, + tree: &RemoteWorktree, + cx: &ModelContext, + ) -> Task> { + let buffer = buffer_handle.read(cx); + let buffer_id = buffer.remote_id().into(); + let version = buffer.version(); + let rpc = tree.client(); + let project_id = tree.project_id(); + cx.spawn(move |_, mut cx| async move { + let response = rpc + .request(proto::SaveBuffer { + project_id, + buffer_id, + new_path, + version: serialize_version(&version), + }) + .await?; + let version = deserialize_version(&response.version); + let mtime = response.mtime.map(|mtime| mtime.into()); + + buffer_handle.update(&mut cx, |buffer, cx| { + buffer.did_save(version.clone(), mtime, cx); + })?; + + Ok(()) + }) + } + + fn add_buffer(&mut self, buffer: Model, cx: &mut ModelContext) -> Result<()> { + let remote_id = buffer.read(cx).remote_id(); + let is_remote = buffer.read(cx).replica_id() != 0; + let open_buffer = if self.retain_buffers { + OpenBuffer::Strong(buffer.clone()) + } else { + OpenBuffer::Weak(buffer.downgrade()) + }; + + match self.opened_buffers.entry(remote_id) { + hash_map::Entry::Vacant(entry) => { + entry.insert(open_buffer); + } + hash_map::Entry::Occupied(mut entry) => { + if let OpenBuffer::Operations(operations) = entry.get_mut() { + buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx))?; + } else if entry.get().upgrade().is_some() { + if is_remote { + return Ok(()); + } else { + debug_panic!("buffer {} was already registered", remote_id); + Err(anyhow!("buffer {} was already registered", remote_id))?; + } + } + entry.insert(open_buffer); + } + } + + if let Some(senders) = self.remote_buffer_listeners.remove(&remote_id) { + for sender in senders { + sender.send(Ok(buffer.clone())).ok(); + } + } + + if let Some(file) = File::from_dyn(buffer.read(cx).file()) { + if file.is_local { + self.local_buffer_ids_by_path.insert( + ProjectPath { + worktree_id: file.worktree_id(cx), + path: file.path.clone(), + }, + remote_id, + ); + + if let Some(entry_id) = file.entry_id { + self.local_buffer_ids_by_entry_id + .insert(entry_id, remote_id); + } + } + } + + cx.emit(BufferStoreEvent::BufferAdded(buffer)); + Ok(()) + } + + pub fn buffers(&self) -> impl '_ + Iterator> { + self.opened_buffers + .values() + .filter_map(|buffer| buffer.upgrade()) + } + + pub fn loading_buffers( + &self, + ) -> impl Iterator< + Item = ( + &ProjectPath, + postage::watch::Receiver, Arc>>>, + ), + > { + self.loading_buffers_by_path + .iter() + .map(|(path, rx)| (path, rx.clone())) + } + + pub fn get_by_path(&self, path: &ProjectPath, cx: &AppContext) -> Option> { + self.buffers().find_map(|buffer| { + let file = File::from_dyn(buffer.read(cx).file())?; + if file.worktree_id(cx) == path.worktree_id && &file.path == &path.path { + Some(buffer) + } else { + None + } + }) + } + + pub fn get(&self, buffer_id: BufferId) -> Option> { + self.opened_buffers + .get(&buffer_id) + .and_then(|buffer| buffer.upgrade()) + } + + pub fn get_existing(&self, buffer_id: BufferId) -> Result> { + self.get(buffer_id) + .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id)) + } + + pub fn get_possibly_incomplete(&self, buffer_id: BufferId) -> Option> { + self.get(buffer_id) + .or_else(|| self.loading_remote_buffers_by_id.get(&buffer_id).cloned()) + } + + fn get_or_remove_by_path( + &mut self, + entry_id: ProjectEntryId, + project_path: &ProjectPath, + ) -> Option<(BufferId, Model)> { + let buffer_id = match self.local_buffer_ids_by_entry_id.get(&entry_id) { + Some(&buffer_id) => buffer_id, + None => match self.local_buffer_ids_by_path.get(project_path) { + Some(&buffer_id) => buffer_id, + None => { + return None; + } + }, + }; + let buffer = if let Some(buffer) = self.get(buffer_id) { + buffer + } else { + self.opened_buffers.remove(&buffer_id); + self.local_buffer_ids_by_path.remove(project_path); + self.local_buffer_ids_by_entry_id.remove(&entry_id); + return None; + }; + Some((buffer_id, buffer)) + } + + pub fn wait_for_remote_buffer( + &mut self, + id: BufferId, + cx: &mut AppContext, + ) -> Task>> { + let buffer = self.get(id); + if let Some(buffer) = buffer { + return Task::ready(Ok(buffer)); + } + let (tx, rx) = oneshot::channel(); + self.remote_buffer_listeners.entry(id).or_default().push(tx); + cx.background_executor().spawn(async move { rx.await? }) + } + + pub fn buffer_version_info( + &self, + cx: &AppContext, + ) -> (Vec, Vec) { + let buffers = self + .buffers() + .map(|buffer| { + let buffer = buffer.read(cx); + proto::BufferVersion { + id: buffer.remote_id().into(), + version: language::proto::serialize_version(&buffer.version), + } + }) + .collect(); + let incomplete_buffer_ids = self + .loading_remote_buffers_by_id + .keys() + .copied() + .collect::>(); + (buffers, incomplete_buffer_ids) + } + + pub fn disconnected_from_host(&mut self, cx: &mut AppContext) { + self.set_retain_buffers(false, cx); + + for buffer in self.buffers() { + buffer.update(cx, |buffer, cx| { + buffer.set_capability(Capability::ReadOnly, cx) + }); + } + + // Wake up all futures currently waiting on a buffer to get opened, + // to give them a chance to fail now that we've disconnected. + self.remote_buffer_listeners.clear(); + } + + pub fn set_retain_buffers(&mut self, retain_buffers: bool, cx: &mut AppContext) { + self.retain_buffers = retain_buffers; + for open_buffer in self.opened_buffers.values_mut() { + if retain_buffers { + if let OpenBuffer::Weak(buffer) = open_buffer { + if let Some(buffer) = buffer.upgrade() { + *open_buffer = OpenBuffer::Strong(buffer); + } + } + } else { + if let Some(buffer) = open_buffer.upgrade() { + buffer.update(cx, |buffer, _| buffer.give_up_waiting()); + } + if let OpenBuffer::Strong(buffer) = open_buffer { + *open_buffer = OpenBuffer::Weak(buffer.downgrade()); + } + } + } + } + + pub fn discard_incomplete(&mut self) { + self.opened_buffers + .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_))); + } + + pub fn file_changed( + &mut self, + path: Arc, + entry_id: ProjectEntryId, + worktree_handle: &Model, + snapshot: &worktree::Snapshot, + cx: &mut ModelContext, + ) -> Option<(Model, Arc, Arc)> { + let (buffer_id, buffer) = self.get_or_remove_by_path( + entry_id, + &ProjectPath { + worktree_id: snapshot.id(), + path, + }, + )?; + + let result = buffer.update(cx, |buffer, cx| { + let old_file = File::from_dyn(buffer.file())?; + if old_file.worktree != *worktree_handle { + return None; + } + + let new_file = if let Some(entry) = old_file + .entry_id + .and_then(|entry_id| snapshot.entry_for_id(entry_id)) + { + File { + is_local: true, + entry_id: Some(entry.id), + mtime: entry.mtime, + path: entry.path.clone(), + worktree: worktree_handle.clone(), + is_deleted: false, + is_private: entry.is_private, + } + } else if let Some(entry) = snapshot.entry_for_path(old_file.path.as_ref()) { + File { + is_local: true, + entry_id: Some(entry.id), + mtime: entry.mtime, + path: entry.path.clone(), + worktree: worktree_handle.clone(), + is_deleted: false, + is_private: entry.is_private, + } + } else { + File { + is_local: true, + entry_id: old_file.entry_id, + path: old_file.path.clone(), + mtime: old_file.mtime, + worktree: worktree_handle.clone(), + is_deleted: true, + is_private: old_file.is_private, + } + }; + + if new_file == *old_file { + return None; + } + + let old_file = Arc::new(old_file.clone()); + let new_file = Arc::new(new_file); + buffer.file_updated(new_file.clone(), cx); + Some((cx.handle(), old_file, new_file)) + }); + + if let Some((buffer, old_file, new_file)) = &result { + if new_file.path != old_file.path { + self.local_buffer_ids_by_path.remove(&ProjectPath { + path: old_file.path.clone(), + worktree_id: old_file.worktree_id(cx), + }); + self.local_buffer_ids_by_path.insert( + ProjectPath { + worktree_id: new_file.worktree_id(cx), + path: new_file.path.clone(), + }, + buffer_id, + ); + cx.emit(BufferStoreEvent::BufferChangedFilePath { + buffer: buffer.clone(), + old_file: Some(old_file.clone()), + }); + } + + if new_file.entry_id != old_file.entry_id { + if let Some(entry_id) = old_file.entry_id { + self.local_buffer_ids_by_entry_id.remove(&entry_id); + } + if let Some(entry_id) = new_file.entry_id { + self.local_buffer_ids_by_entry_id + .insert(entry_id, buffer_id); + } + } + } + + result + } + + pub fn buffer_changed_file( + &mut self, + buffer: Model, + cx: &mut AppContext, + ) -> Option<()> { + let file = File::from_dyn(buffer.read(cx).file())?; + + let remote_id = buffer.read(cx).remote_id(); + if let Some(entry_id) = file.entry_id { + match self.local_buffer_ids_by_entry_id.get(&entry_id) { + Some(_) => { + return None; + } + None => { + self.local_buffer_ids_by_entry_id + .insert(entry_id, remote_id); + } + } + }; + self.local_buffer_ids_by_path.insert( + ProjectPath { + worktree_id: file.worktree_id(cx), + path: file.path.clone(), + }, + remote_id, + ); + + Some(()) + } + + pub async fn create_buffer_for_peer( + this: Model, + peer_id: PeerId, + buffer_id: BufferId, + project_id: u64, + client: AnyProtoClient, + cx: &mut AsyncAppContext, + ) -> Result<()> { + let Some(buffer) = this.update(cx, |this, _| this.get(buffer_id))? else { + return Ok(()); + }; + + let operations = buffer.update(cx, |b, cx| b.serialize_ops(None, cx))?; + let operations = operations.await; + let state = buffer.update(cx, |buffer, _| buffer.to_proto())?; + + let initial_state = proto::CreateBufferForPeer { + project_id, + peer_id: Some(peer_id), + variant: Some(proto::create_buffer_for_peer::Variant::State(state)), + }; + + if client.send(initial_state).log_err().is_some() { + let client = client.clone(); + cx.background_executor() + .spawn(async move { + let mut chunks = split_operations(operations).peekable(); + while let Some(chunk) = chunks.next() { + let is_last = chunks.peek().is_none(); + client.send(proto::CreateBufferForPeer { + project_id, + peer_id: Some(peer_id), + variant: Some(proto::create_buffer_for_peer::Variant::Chunk( + proto::BufferChunk { + buffer_id: buffer_id.into(), + operations: chunk, + is_last, + }, + )), + })?; + } + anyhow::Ok(()) + }) + .await + .log_err(); + } + Ok(()) + } + + pub fn handle_update_buffer( + &mut self, + envelope: TypedEnvelope, + is_remote: bool, + cx: &mut AppContext, + ) -> Result { + let payload = envelope.payload.clone(); + let buffer_id = BufferId::new(payload.buffer_id)?; + let ops = payload + .operations + .into_iter() + .map(language::proto::deserialize_operation) + .collect::, _>>()?; + match self.opened_buffers.entry(buffer_id) { + hash_map::Entry::Occupied(mut e) => match e.get_mut() { + OpenBuffer::Strong(buffer) => { + buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?; + } + OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops), + OpenBuffer::Weak(_) => {} + }, + hash_map::Entry::Vacant(e) => { + if !is_remote { + debug_panic!( + "received buffer update from {:?}", + envelope.original_sender_id + ); + return Err(anyhow!("received buffer update for non-remote project")); + } + e.insert(OpenBuffer::Operations(ops)); + } + } + Ok(proto::Ack {}) + } + + pub fn handle_create_buffer_for_peer( + &mut self, + envelope: TypedEnvelope, + mut worktrees: impl Iterator>, + replica_id: u16, + capability: Capability, + cx: &mut ModelContext, + ) -> Result<()> { + match envelope + .payload + .variant + .ok_or_else(|| anyhow!("missing variant"))? + { + proto::create_buffer_for_peer::Variant::State(mut state) => { + let buffer_id = BufferId::new(state.id)?; + + let buffer_result = maybe!({ + let mut buffer_file = None; + if let Some(file) = state.file.take() { + let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id); + let worktree = worktrees + .find(|worktree| worktree.read(cx).id() == worktree_id) + .ok_or_else(|| { + anyhow!("no worktree found for id {}", file.worktree_id) + })?; + buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?) + as Arc); + } + Buffer::from_proto(replica_id, capability, state, buffer_file) + }); + + match buffer_result { + Ok(buffer) => { + let buffer = cx.new_model(|_| buffer); + self.loading_remote_buffers_by_id.insert(buffer_id, buffer); + } + Err(error) => { + if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) { + for listener in listeners { + listener.send(Err(anyhow!(error.cloned()))).ok(); + } + } + } + } + } + proto::create_buffer_for_peer::Variant::Chunk(chunk) => { + let buffer_id = BufferId::new(chunk.buffer_id)?; + let buffer = self + .loading_remote_buffers_by_id + .get(&buffer_id) + .cloned() + .ok_or_else(|| { + anyhow!( + "received chunk for buffer {} without initial state", + chunk.buffer_id + ) + })?; + + let result = maybe!({ + let operations = chunk + .operations + .into_iter() + .map(language::proto::deserialize_operation) + .collect::>>()?; + buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx)) + }); + + if let Err(error) = result { + self.loading_remote_buffers_by_id.remove(&buffer_id); + if let Some(listeners) = self.remote_buffer_listeners.remove(&buffer_id) { + for listener in listeners { + listener.send(Err(error.cloned())).ok(); + } + } + } else if chunk.is_last { + self.loading_remote_buffers_by_id.remove(&buffer_id); + self.add_buffer(buffer, cx)?; + } + } + } + + Ok(()) + } + + pub async fn handle_save_buffer( + this: Model, + project_id: u64, + worktree: Option>, + envelope: TypedEnvelope, + mut cx: AsyncAppContext, + ) -> Result { + let buffer_id = BufferId::new(envelope.payload.buffer_id)?; + let buffer = this.update(&mut cx, |this, _| this.get_existing(buffer_id))??; + buffer + .update(&mut cx, |buffer, _| { + buffer.wait_for_version(deserialize_version(&envelope.payload.version)) + })? + .await?; + let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?; + + if let Some(new_path) = envelope.payload.new_path { + let worktree = worktree.context("no such worktree")?; + let new_path = ProjectPath::from_proto(new_path); + this.update(&mut cx, |this, cx| { + this.save_buffer_as(buffer.clone(), new_path, worktree, cx) + })? + .await?; + } else { + this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))? + .await?; + } + + buffer.update(&mut cx, |buffer, _| proto::BufferSaved { + project_id, + buffer_id: buffer_id.into(), + version: serialize_version(buffer.saved_version()), + mtime: buffer.saved_mtime().map(|time| time.into()), + }) + } + + pub async fn wait_for_loading_buffer( + mut receiver: postage::watch::Receiver, Arc>>>, + ) -> Result, Arc> { + loop { + if let Some(result) = receiver.borrow().as_ref() { + match result { + Ok(buffer) => return Ok(buffer.to_owned()), + Err(e) => return Err(e.to_owned()), + } + } + receiver.next().await; + } + } +} + +impl OpenBuffer { + fn upgrade(&self) -> Option> { + match self { + OpenBuffer::Strong(handle) => Some(handle.clone()), + OpenBuffer::Weak(handle) => handle.upgrade(), + OpenBuffer::Operations(_) => None, + } + } +} + +fn is_not_found_error(error: &anyhow::Error) -> bool { + error + .root_cause() + .downcast_ref::() + .is_some_and(|err| err.kind() == io::ErrorKind::NotFound) +} diff --git a/crates/project/src/lsp_command.rs b/crates/project/src/lsp_command.rs index a13de4a7d9..a06af063ac 100644 --- a/crates/project/src/lsp_command.rs +++ b/crates/project/src/lsp_command.rs @@ -410,16 +410,18 @@ impl LspCommand for PerformRename { message: proto::PerformRenameResponse, project: Model, _: Model, - mut cx: AsyncAppContext, + cx: AsyncAppContext, ) -> Result { let message = message .transaction .ok_or_else(|| anyhow!("missing transaction"))?; - project - .update(&mut cx, |project, cx| { - project.deserialize_project_transaction(message, self.push_to_history, cx) - })? - .await + Project::deserialize_project_transaction( + project.downgrade(), + message, + self.push_to_history, + cx, + ) + .await } fn buffer_id_from_proto(message: &proto::PerformRename) -> Result { diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 15db5d3bb1..69c31c8929 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -1,3 +1,4 @@ +pub mod buffer_store; pub mod connection_manager; pub mod debounced_delay; pub mod lsp_command; @@ -15,20 +16,17 @@ mod yarn; use anyhow::{anyhow, bail, Context as _, Result}; use async_trait::async_trait; +use buffer_store::{BufferStore, BufferStoreEvent}; use client::{ proto, Client, Collaborator, DevServerProjectId, PendingEntitySubscription, ProjectId, TypedEnvelope, UserStore, }; use clock::ReplicaId; -use collections::{btree_map, hash_map, BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}; +use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}; use debounced_delay::DebouncedDelay; use futures::{ - channel::{ - mpsc::{self, UnboundedReceiver}, - oneshot, - }, + channel::mpsc::{self, UnboundedReceiver}, future::{join_all, try_join_all, Shared}, - prelude::future::BoxFuture, select, stream::FuturesUnordered, AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt, @@ -54,8 +52,8 @@ use language::{ range_from_lsp, Bias, Buffer, BufferSnapshot, CachedLspAdapter, Capability, CodeLabel, ContextProvider, Diagnostic, DiagnosticEntry, DiagnosticSet, Diff, Documentation, Event as BufferEvent, File as _, Language, LanguageRegistry, LanguageServerName, LocalFile, - LspAdapterDelegate, Operation, Patch, PendingLanguageServer, PointUtf16, TextBufferSnapshot, - ToOffset, ToPointUtf16, Transaction, Unclipped, + LspAdapterDelegate, Patch, PendingLanguageServer, PointUtf16, TextBufferSnapshot, ToOffset, + ToPointUtf16, Transaction, Unclipped, }; use log::error; use lsp::{ @@ -75,7 +73,7 @@ use postage::watch; use prettier_support::{DefaultPrettier, PrettierInstance}; use project_settings::{DirenvSettings, LspSettings, ProjectSettings}; use rand::prelude::*; -use rpc::{ErrorCode, ErrorExt as _}; +use rpc::ErrorCode; use search::SearchQuery; use search_history::SearchHistory; use serde::Serialize; @@ -93,7 +91,7 @@ use std::{ env, ffi::OsStr, hash::Hash, - io, iter, mem, + iter, mem, num::NonZeroU32, ops::Range, path::{self, Component, Path, PathBuf}, @@ -116,7 +114,7 @@ use util::{ debug_panic, defer, maybe, merge_json_value_into, parse_env_output, post_inc, NumericPrefixWithSuffix, ResultExt, TryFutureExt as _, }; -use worktree::{CreatedEntry, RemoteWorktreeClient, Snapshot, Traversal}; +use worktree::{CreatedEntry, Snapshot, Traversal}; use yarn::YarnPathStore; pub use fs::*; @@ -200,21 +198,12 @@ pub struct Project { client_state: ProjectClientState, collaborators: HashMap, client_subscriptions: Vec, + buffer_store: Model, _subscriptions: Vec, - loading_buffers: HashMap, anyhow::Error>>>>, - incomplete_remote_buffers: HashMap>, shared_buffers: HashMap>, #[allow(clippy::type_complexity)] - loading_buffers_by_path: HashMap< - ProjectPath, - postage::watch::Receiver, Arc>>>, - >, - #[allow(clippy::type_complexity)] loading_local_worktrees: HashMap, Shared, Arc>>>>, - opened_buffers: HashMap, - local_buffer_ids_by_path: HashMap, - local_buffer_ids_by_entry_id: HashMap, buffer_snapshots: HashMap>>, // buffer_id -> server_id -> vec of snapshots buffers_being_formatted: HashSet, buffers_needing_diff: HashSet>, @@ -269,12 +258,6 @@ enum LocalProjectUpdate { }, } -enum OpenBuffer { - Strong(Model), - Weak(WeakModel), - Operations(Vec), -} - #[derive(Clone)] enum WorktreeHandle { Strong(Model), @@ -731,23 +714,24 @@ impl Project { let global_snippets_dir = paths::config_dir().join("snippets"); let snippets = SnippetProvider::new(fs.clone(), BTreeSet::from_iter([global_snippets_dir]), cx); + + let buffer_store = cx.new_model(|_| BufferStore::new(false)); + cx.subscribe(&buffer_store, Self::on_buffer_store_event) + .detach(); + let yarn = YarnPathStore::new(fs.clone(), cx); + Self { worktrees: Vec::new(), worktrees_reordered: false, buffer_ordered_messages_tx: tx, collaborators: Default::default(), - opened_buffers: Default::default(), + buffer_store, shared_buffers: Default::default(), - loading_buffers_by_path: Default::default(), loading_local_worktrees: Default::default(), - local_buffer_ids_by_path: Default::default(), - local_buffer_ids_by_entry_id: Default::default(), buffer_snapshots: Default::default(), join_project_response_message_id: 0, client_state: ProjectClientState::Local, - loading_buffers: HashMap::default(), - incomplete_remote_buffers: HashMap::default(), client_subscriptions: Vec::new(), _subscriptions: vec![ cx.observe_global::(Self::on_settings_changed), @@ -864,13 +848,8 @@ impl Project { // That's because Worktree's identifier is entity id, which should probably be changed. let mut worktrees = Vec::new(); for worktree in response.payload.worktrees { - let worktree = Worktree::remote( - remote_id, - replica_id, - worktree, - Box::new(CollabRemoteWorktreeClient(client.clone())), - cx, - ); + let worktree = + Worktree::remote(remote_id, replica_id, worktree, client.clone().into(), cx); worktrees.push(worktree); } @@ -878,17 +857,17 @@ impl Project { cx.spawn(move |this, cx| Self::send_buffer_ordered_messages(this, rx, cx)) .detach(); + let buffer_store = cx.new_model(|_| BufferStore::new(true)); + cx.subscribe(&buffer_store, Self::on_buffer_store_event) + .detach(); + let mut this = Self { worktrees: Vec::new(), worktrees_reordered: false, buffer_ordered_messages_tx: tx, - loading_buffers_by_path: Default::default(), - loading_buffers: Default::default(), + buffer_store, shared_buffers: Default::default(), - incomplete_remote_buffers: Default::default(), loading_local_worktrees: Default::default(), - local_buffer_ids_by_path: Default::default(), - local_buffer_ids_by_entry_id: Default::default(), active_entry: None, collaborators: Default::default(), join_project_response_message_id: response.message_id, @@ -939,7 +918,6 @@ impl Project { last_workspace_edits_by_language_server: Default::default(), language_server_watched_paths: HashMap::default(), language_server_watcher_registrations: HashMap::default(), - opened_buffers: Default::default(), buffers_being_formatted: Default::default(), buffers_needing_diff: Default::default(), git_diff_debouncer: DebouncedDelay::new(), @@ -1142,22 +1120,20 @@ impl Project { fn on_settings_changed(&mut self, cx: &mut ModelContext) { let mut language_servers_to_start = Vec::new(); let mut language_formatters_to_check = Vec::new(); - for buffer in self.opened_buffers.values() { - if let Some(buffer) = buffer.upgrade() { - let buffer = buffer.read(cx); - let buffer_file = File::from_dyn(buffer.file()); - let buffer_language = buffer.language(); - let settings = language_settings(buffer_language, buffer.file(), cx); - if let Some(language) = buffer_language { - if settings.enable_language_server { - if let Some(file) = buffer_file { - language_servers_to_start - .push((file.worktree.clone(), Arc::clone(language))); - } + for buffer in self.buffer_store.read(cx).buffers() { + let buffer = buffer.read(cx); + let buffer_file = File::from_dyn(buffer.file()); + let buffer_language = buffer.language(); + let settings = language_settings(buffer_language, buffer.file(), cx); + if let Some(language) = buffer_language { + if settings.enable_language_server { + if let Some(file) = buffer_file { + language_servers_to_start + .push((file.worktree.clone(), Arc::clone(language))); } - language_formatters_to_check - .push((buffer_file.map(|f| f.worktree_id(cx)), settings.clone())); } + language_formatters_to_check + .push((buffer_file.map(|f| f.worktree_id(cx)), settings.clone())); } } @@ -1243,10 +1219,8 @@ impl Project { cx.notify(); } - pub fn buffer_for_id(&self, remote_id: BufferId) -> Option> { - self.opened_buffers - .get(&remote_id) - .and_then(|buffer| buffer.upgrade()) + pub fn buffer_for_id(&self, remote_id: BufferId, cx: &AppContext) -> Option> { + self.buffer_store.read(cx).get(remote_id) } pub fn languages(&self) -> &Arc { @@ -1265,30 +1239,16 @@ impl Project { self.node.as_ref() } - pub fn opened_buffers(&self) -> Vec> { - self.opened_buffers - .values() - .filter_map(|b| b.upgrade()) - .collect() + pub fn opened_buffers(&self, cx: &AppContext) -> Vec> { + self.buffer_store.read(cx).buffers().collect() } #[cfg(any(test, feature = "test-support"))] pub fn has_open_buffer(&self, path: impl Into, cx: &AppContext) -> bool { - let path = path.into(); - if let Some(worktree) = self.worktree_for_id(path.worktree_id, cx) { - self.opened_buffers.iter().any(|(_, buffer)| { - if let Some(buffer) = buffer.upgrade() { - if let Some(file) = File::from_dyn(buffer.read(cx).file()) { - if file.worktree == worktree && file.path() == &path.path { - return true; - } - } - } - false - }) - } else { - false - } + self.buffer_store + .read(cx) + .get_by_path(&path.into(), cx) + .is_some() } pub fn fs(&self) -> &Arc { @@ -1544,17 +1504,9 @@ impl Project { .set_model(&cx.handle(), &mut cx.to_async()), ); - for open_buffer in self.opened_buffers.values_mut() { - match open_buffer { - OpenBuffer::Strong(_) => {} - OpenBuffer::Weak(buffer) => { - if let Some(buffer) = buffer.upgrade() { - *open_buffer = OpenBuffer::Strong(buffer); - } - } - OpenBuffer::Operations(_) => unreachable!(), - } - } + self.buffer_store.update(cx, |buffer_store, cx| { + buffer_store.set_retain_buffers(true, cx) + }); for worktree_handle in self.worktrees.iter_mut() { match worktree_handle { @@ -1654,58 +1606,30 @@ impl Project { })??; } LocalProjectUpdate::CreateBufferForPeer { peer_id, buffer_id } => { - let buffer = this.update(&mut cx, |this, _| { - let buffer = this.opened_buffers.get(&buffer_id).unwrap(); - let shared_buffers = - this.shared_buffers.entry(peer_id).or_default(); - if shared_buffers.insert(buffer_id) { - if let OpenBuffer::Strong(buffer) = buffer { - Some(buffer.clone()) - } else { - None - } + let Some(buffer_store) = this.update(&mut cx, |this, _| { + if this + .shared_buffers + .entry(peer_id) + .or_default() + .insert(buffer_id) + { + Some(this.buffer_store.clone()) } else { None } - })?; - - let Some(buffer) = buffer else { continue }; - let operations = - buffer.update(&mut cx, |b, cx| b.serialize_ops(None, cx))?; - let operations = operations.await; - let state = buffer.update(&mut cx, |buffer, _| buffer.to_proto())?; - - let initial_state = proto::CreateBufferForPeer { - project_id, - peer_id: Some(peer_id), - variant: Some(proto::create_buffer_for_peer::Variant::State(state)), + })? + else { + continue; }; - if client.send(initial_state).log_err().is_some() { - let client = client.clone(); - cx.background_executor() - .spawn(async move { - let mut chunks = split_operations(operations).peekable(); - while let Some(chunk) = chunks.next() { - let is_last = chunks.peek().is_none(); - client.send(proto::CreateBufferForPeer { - project_id, - peer_id: Some(peer_id), - variant: Some( - proto::create_buffer_for_peer::Variant::Chunk( - proto::BufferChunk { - buffer_id: buffer_id.into(), - operations: chunk, - is_last, - }, - ), - ), - })?; - } - anyhow::Ok(()) - }) - .await - .log_err(); - } + BufferStore::create_buffer_for_peer( + buffer_store, + peer_id, + buffer_id, + project_id, + client.clone().into(), + &mut cx, + ) + .await?; } } } @@ -1807,16 +1731,9 @@ impl Project { } } - for open_buffer in self.opened_buffers.values_mut() { - // Wake up any tasks waiting for peers' edits to this buffer. - if let Some(buffer) = open_buffer.upgrade() { - buffer.update(cx, |buffer, _| buffer.give_up_waiting()); - } - - if let OpenBuffer::Strong(buffer) = open_buffer { - *open_buffer = OpenBuffer::Weak(buffer.downgrade()); - } - } + self.buffer_store.update(cx, |buffer_store, cx| { + buffer_store.set_retain_buffers(false, cx) + }); self.client .send(proto::UnshareProject { @@ -1852,7 +1769,7 @@ impl Project { } *capability = new_capability; - for buffer in self.opened_buffers() { + for buffer in self.opened_buffers(cx) { buffer.update(cx, |buffer, cx| buffer.set_capability(new_capability, cx)); } } @@ -1878,24 +1795,9 @@ impl Project { } } - for open_buffer in self.opened_buffers.values_mut() { - // Wake up any tasks waiting for peers' edits to this buffer. - if let Some(buffer) = open_buffer.upgrade() { - buffer.update(cx, |buffer, cx| { - buffer.give_up_waiting(); - buffer.set_capability(Capability::ReadOnly, cx) - }); - } - - if let OpenBuffer::Strong(buffer) = open_buffer { - *open_buffer = OpenBuffer::Weak(buffer.downgrade()); - } - } - - // Wake up all futures currently waiting on a buffer to get opened, - // to give them a chance to fail now that we've disconnected. - self.loading_buffers.clear(); - // self.opened_buffer.send(OpenedBufferEvent::Disconnected); + self.buffer_store.update(cx, |buffer_store, cx| { + buffer_store.disconnected_from_host(cx) + }); } } @@ -1936,22 +1838,16 @@ impl Project { } pub fn create_buffer(&mut self, cx: &mut ModelContext) -> Task>> { - if self.is_remote() { - let create = self.client.request(proto::OpenNewBuffer { - project_id: self.remote_id().unwrap(), - }); - cx.spawn(|this, mut cx| async move { - let response = create.await?; - let buffer_id = BufferId::new(response.buffer_id)?; - - this.update(&mut cx, |this, cx| { - this.wait_for_remote_buffer(buffer_id, cx) - })? - .await - }) - } else { - Task::ready(Ok(self.create_local_buffer("", None, cx))) - } + self.buffer_store.update(cx, |buffer_store, cx| { + buffer_store.create_buffer( + if self.is_remote() { + Some((self.client.clone().into(), self.remote_id().unwrap())) + } else { + None + }, + cx, + ) + }) } pub fn create_local_buffer( @@ -1963,13 +1859,9 @@ impl Project { if self.is_remote() { panic!("called create_local_buffer on a remote project") } - let buffer = cx.new_model(|cx| { - Buffer::local(text, cx) - .with_language(language.unwrap_or_else(|| language::PLAIN_TEXT.clone()), cx) - }); - self.register_buffer(&buffer, cx) - .expect("creating local buffers always succeeds"); - buffer + self.buffer_store.update(cx, |buffer_store, cx| { + buffer_store.create_local_buffer(text, language, cx) + }) } pub fn open_path( @@ -2037,133 +1929,12 @@ impl Project { return Task::ready(Err(anyhow!("no such worktree"))); }; - // If there is already a buffer for the given path, then return it. - let existing_buffer = self.get_open_buffer(&project_path, cx); - if let Some(existing_buffer) = existing_buffer { - return Task::ready(Ok(existing_buffer)); - } - - let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) { - // If the given path is already being loaded, then wait for that existing - // task to complete and return the same buffer. - hash_map::Entry::Occupied(e) => e.get().clone(), - - // Otherwise, record the fact that this path is now being loaded. - hash_map::Entry::Vacant(entry) => { - let (mut tx, rx) = postage::watch::channel(); - entry.insert(rx.clone()); - - let project_path = project_path.clone(); - let load_buffer = if worktree.read(cx).is_local() { - self.open_local_buffer_internal(project_path.path.clone(), worktree, cx) - } else { - self.open_remote_buffer_internal(&project_path.path, &worktree, cx) - }; - - cx.spawn(move |this, mut cx| async move { - let load_result = load_buffer.await; - *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| { - // Record the fact that the buffer is no longer loading. - this.loading_buffers_by_path.remove(&project_path); - let buffer = load_result.map_err(Arc::new)?; - Ok(buffer) - })?); - anyhow::Ok(()) - }) - .detach(); - rx - } - }; - - cx.background_executor().spawn(async move { - wait_for_loading_buffer(loading_watch) - .await - .map_err(|e| e.cloned()) - }) - } - - fn open_local_buffer_internal( - &mut self, - path: Arc, - worktree: Model, - cx: &mut ModelContext, - ) -> Task>> { - let load_buffer = worktree.update(cx, |worktree, cx| { - let load_file = worktree.load_file(path.as_ref(), cx); - let reservation = cx.reserve_model(); - let buffer_id = BufferId::from(reservation.entity_id().as_non_zero_u64()); - cx.spawn(move |_, mut cx| async move { - let loaded = load_file.await?; - let text_buffer = cx - .background_executor() - .spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) }) - .await; - cx.insert_model(reservation, |_| { - Buffer::build( - text_buffer, - loaded.diff_base, - Some(loaded.file), - Capability::ReadWrite, - ) - }) - }) - }); - - cx.spawn(move |this, mut cx| async move { - let buffer = match load_buffer.await { - Ok(buffer) => Ok(buffer), - Err(error) if is_not_found_error(&error) => cx.new_model(|cx| { - let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64()); - let text_buffer = text::Buffer::new(0, buffer_id, "".into()); - Buffer::build( - text_buffer, - None, - Some(Arc::new(File { - worktree, - path, - mtime: None, - entry_id: None, - is_local: true, - is_deleted: false, - is_private: false, - })), - Capability::ReadWrite, - ) - }), - Err(e) => Err(e), - }?; - this.update(&mut cx, |this, cx| this.register_buffer(&buffer, cx))??; - Ok(buffer) - }) - } - - fn open_remote_buffer_internal( - &mut self, - path: &Arc, - worktree: &Model, - cx: &mut ModelContext, - ) -> Task>> { - let rpc = self.client.clone(); - let project_id = self.remote_id().unwrap(); - let remote_worktree_id = worktree.read(cx).id(); - let path = path.clone(); - let path_string = path.to_string_lossy().to_string(); - if self.is_disconnected() { + if self.is_remote() && self.is_disconnected() { return Task::ready(Err(anyhow!(ErrorCode::Disconnected))); } - cx.spawn(move |this, mut cx| async move { - let response = rpc - .request(proto::OpenBufferByPath { - project_id, - worktree_id: remote_worktree_id.to_proto(), - path: path_string, - }) - .await?; - let buffer_id = BufferId::new(response.buffer_id)?; - this.update(&mut cx, |this, cx| { - this.wait_for_remote_buffer(buffer_id, cx) - })? - .await + + self.buffer_store.update(cx, |buffer_store, cx| { + buffer_store.open_buffer(project_path, worktree, cx) }) } @@ -2246,7 +2017,7 @@ impl Project { id: BufferId, cx: &mut ModelContext, ) -> Task>> { - if let Some(buffer) = self.buffer_for_id(id) { + if let Some(buffer) = self.buffer_for_id(id, cx) { Task::ready(Ok(buffer)) } else if self.is_local() { Task::ready(Err(anyhow!("buffer {} does not exist", id))) @@ -2287,16 +2058,8 @@ impl Project { buffer: Model, cx: &mut ModelContext, ) -> Task> { - let Some(file) = File::from_dyn(buffer.read(cx).file()) else { - return Task::ready(Err(anyhow!("buffer doesn't have a file"))); - }; - let worktree = file.worktree.clone(); - let path = file.path.clone(); - if self.is_local() { - self.save_local_buffer(worktree, buffer, path, false, cx) - } else { - self.save_remote_buffer(buffer, None, cx) - } + self.buffer_store + .update(cx, |buffer_store, cx| buffer_store.save_buffer(buffer, cx)) } pub fn save_buffer_as( @@ -2305,121 +2068,11 @@ impl Project { path: ProjectPath, cx: &mut ModelContext, ) -> Task> { - let old_file = File::from_dyn(buffer.read(cx).file()).cloned(); let Some(worktree) = self.worktree_for_id(path.worktree_id, cx) else { return Task::ready(Err(anyhow!("worktree does not exist"))); }; - - cx.spawn(move |this, mut cx| async move { - this.update(&mut cx, |this, cx| { - if this.is_local() { - if let Some(old_file) = &old_file { - this.unregister_buffer_from_language_servers(&buffer, old_file, cx); - } - this.save_local_buffer(worktree, buffer.clone(), path.path, true, cx) - } else { - this.save_remote_buffer(buffer.clone(), Some(path.to_proto()), cx) - } - })? - .await?; - - this.update(&mut cx, |this, cx| { - this.detect_language_for_buffer(&buffer, cx); - this.register_buffer_with_language_servers(&buffer, cx); - })?; - Ok(()) - }) - } - - pub fn save_local_buffer( - &self, - worktree: Model, - buffer_handle: Model, - path: Arc, - mut has_changed_file: bool, - cx: &mut ModelContext, - ) -> Task> { - let buffer = buffer_handle.read(cx); - let buffer_id = buffer.remote_id(); - let text = buffer.as_rope().clone(); - let line_ending = buffer.line_ending(); - let version = buffer.version(); - if buffer.file().is_some_and(|file| !file.is_created()) { - has_changed_file = true; - } - - let save = worktree.update(cx, |worktree, cx| { - worktree.write_file(path.as_ref(), text, line_ending, cx) - }); - - let client = self.client.clone(); - let project_id = self.remote_id(); - cx.spawn(move |_, mut cx| async move { - let new_file = save.await?; - let mtime = new_file.mtime; - if has_changed_file { - if let Some(project_id) = project_id { - client - .send(proto::UpdateBufferFile { - project_id, - buffer_id: buffer_id.into(), - file: Some(new_file.to_proto()), - }) - .log_err(); - } - - buffer_handle.update(&mut cx, |buffer, cx| { - if has_changed_file { - buffer.file_updated(new_file, cx); - } - })?; - } - - if let Some(project_id) = project_id { - client.send(proto::BufferSaved { - project_id, - buffer_id: buffer_id.into(), - version: serialize_version(&version), - mtime: mtime.map(|time| time.into()), - })?; - } - - buffer_handle.update(&mut cx, |buffer, cx| { - buffer.did_save(version.clone(), mtime, cx); - })?; - - Ok(()) - }) - } - - pub fn save_remote_buffer( - &self, - buffer_handle: Model, - new_path: Option, - cx: &mut ModelContext, - ) -> Task> { - let buffer = buffer_handle.read(cx); - let buffer_id = buffer.remote_id().into(); - let version = buffer.version(); - let rpc = self.client.clone(); - let project_id = self.remote_id(); - cx.spawn(move |_, mut cx| async move { - let response = rpc - .request(proto::SaveBuffer { - project_id: project_id.ok_or_else(|| anyhow!("project_id is not set"))?, - buffer_id, - new_path, - version: serialize_version(&version), - }) - .await?; - let version = deserialize_version(&response.version); - let mtime = response.mtime.map(|mtime| mtime.into()); - - buffer_handle.update(&mut cx, |buffer, cx| { - buffer.did_save(version.clone(), mtime, cx); - })?; - - Ok(()) + self.buffer_store.update(cx, |buffer_store, cx| { + buffer_store.save_buffer_as(buffer.clone(), path, worktree, cx) }) } @@ -2428,16 +2081,7 @@ impl Project { path: &ProjectPath, cx: &mut ModelContext, ) -> Option> { - let worktree = self.worktree_for_id(path.worktree_id, cx)?; - self.opened_buffers.values().find_map(|buffer| { - let buffer = buffer.upgrade()?; - let file = File::from_dyn(buffer.read(cx).file())?; - if file.worktree == worktree && file.path() == &path.path { - Some(buffer) - } else { - None - } - }) + self.buffer_store.read(cx).get_by_path(path, cx) } fn register_buffer( @@ -2450,54 +2094,11 @@ impl Project { buffer.set_language_registry(self.languages.clone()) }); - let remote_id = buffer.read(cx).remote_id(); - let is_remote = self.is_remote(); - let open_buffer = if is_remote || self.is_shared() { - OpenBuffer::Strong(buffer.clone()) - } else { - OpenBuffer::Weak(buffer.downgrade()) - }; - - match self.opened_buffers.entry(remote_id) { - hash_map::Entry::Vacant(entry) => { - entry.insert(open_buffer); - } - hash_map::Entry::Occupied(mut entry) => { - if let OpenBuffer::Operations(operations) = entry.get_mut() { - buffer.update(cx, |b, cx| b.apply_ops(operations.drain(..), cx))?; - } else if entry.get().upgrade().is_some() { - if is_remote { - return Ok(()); - } else { - debug_panic!("buffer {} was already registered", remote_id); - Err(anyhow!("buffer {} was already registered", remote_id))?; - } - } - entry.insert(open_buffer); - } - } cx.subscribe(buffer, |this, buffer, event, cx| { this.on_buffer_event(buffer, event, cx); }) .detach(); - if let Some(file) = File::from_dyn(buffer.read(cx).file()) { - if file.is_local { - self.local_buffer_ids_by_path.insert( - ProjectPath { - worktree_id: file.worktree_id(cx), - path: file.path.clone(), - }, - remote_id, - ); - - if let Some(entry_id) = file.entry_id { - self.local_buffer_ids_by_entry_id - .insert(entry_id, remote_id); - } - } - } - self.detect_language_for_buffer(buffer, cx); self.register_buffer_with_language_servers(buffer, cx); cx.observe_release(buffer, |this, buffer, cx| { @@ -2519,11 +2120,6 @@ impl Project { }) .detach(); - if let Some(senders) = self.loading_buffers.remove(&remote_id) { - for sender in senders { - sender.send(Ok(buffer.clone())).ok(); - } - } Ok(()) } @@ -2617,7 +2213,7 @@ impl Project { &mut self, buffer: &Model, old_file: &File, - cx: &mut ModelContext, + cx: &mut AppContext, ) { let old_path = match old_file.as_local() { Some(local) => local.abs_path(cx), @@ -2758,6 +2354,57 @@ impl Project { Ok(()) } + fn on_buffer_store_event( + &mut self, + _: Model, + event: &BufferStoreEvent, + cx: &mut ModelContext, + ) { + match event { + BufferStoreEvent::BufferAdded(buffer) => { + self.register_buffer(buffer, cx).log_err(); + } + BufferStoreEvent::BufferChangedFilePath { buffer, old_file } => { + if let Some(old_file) = &old_file { + self.unregister_buffer_from_language_servers(&buffer, old_file, cx); + } + + self.detect_language_for_buffer(&buffer, cx); + self.register_buffer_with_language_servers(&buffer, cx); + } + BufferStoreEvent::BufferSaved { + buffer: buffer_handle, + has_changed_file, + saved_version, + } => { + let buffer = buffer_handle.read(cx); + let buffer_id = buffer.remote_id(); + let Some(new_file) = buffer.file() else { + return; + }; + if let Some(project_id) = self.remote_id() { + self.client + .send(proto::BufferSaved { + project_id, + buffer_id: buffer_id.into(), + version: serialize_version(&saved_version), + mtime: new_file.mtime().map(|time| time.into()), + }) + .log_err(); + if *has_changed_file { + self.client + .send(proto::UpdateBufferFile { + project_id, + buffer_id: buffer_id.into(), + file: Some(new_file.to_proto()), + }) + .log_err(); + } + } + } + } + } + fn on_buffer_event( &mut self, buffer: Model, @@ -2916,30 +2563,11 @@ impl Project { self.simulate_disk_based_diagnostics_events_if_needed(language_server_id, cx); } } - BufferEvent::FileHandleChanged => { - let Some(file) = File::from_dyn(buffer.read(cx).file()) else { - return None; - }; - let remote_id = buffer.read(cx).remote_id(); - if let Some(entry_id) = file.entry_id { - match self.local_buffer_ids_by_entry_id.get(&entry_id) { - Some(_) => { - return None; - } - None => { - self.local_buffer_ids_by_entry_id - .insert(entry_id, remote_id); - } - } - }; - self.local_buffer_ids_by_path.insert( - ProjectPath { - worktree_id: file.worktree_id(cx), - path: file.path.clone(), - }, - remote_id, - ); + BufferEvent::FileHandleChanged => { + self.buffer_store.update(cx, |buffer_store, cx| { + buffer_store.buffer_changed_file(buffer, cx) + })?; } _ => {} } @@ -3103,21 +2731,20 @@ impl Project { prev_reload_count = reload_count; project .update(&mut cx, |this, cx| { - let buffers = this - .opened_buffers - .values() - .filter_map(|b| b.upgrade()) - .collect::>(); - for buffer in buffers { - if let Some(f) = File::from_dyn(buffer.read(cx).file()).cloned() - { - this.unregister_buffer_from_language_servers( - &buffer, &f, cx, - ); - buffer - .update(cx, |buffer, cx| buffer.set_language(None, cx)); + this.buffer_store.clone().update(cx, |buffer_store, cx| { + for buffer in buffer_store.buffers() { + if let Some(f) = + File::from_dyn(buffer.read(cx).file()).cloned() + { + this.unregister_buffer_from_language_servers( + &buffer, &f, cx, + ); + buffer.update(cx, |buffer, cx| { + buffer.set_language(None, cx) + }); + } } - } + }); }) .ok(); } @@ -3126,16 +2753,14 @@ impl Project { .update(&mut cx, |project, cx| { let mut plain_text_buffers = Vec::new(); let mut buffers_with_unknown_injections = Vec::new(); - for buffer in project.opened_buffers.values() { - if let Some(handle) = buffer.upgrade() { - let buffer = &handle.read(cx); - if buffer.language().is_none() - || buffer.language() == Some(&*language::PLAIN_TEXT) - { - plain_text_buffers.push(handle); - } else if buffer.contains_unknown_injections() { - buffers_with_unknown_injections.push(handle); - } + for handle in project.buffer_store.read(cx).buffers() { + let buffer = handle.read(cx); + if buffer.language().is_none() + || buffer.language() == Some(&*language::PLAIN_TEXT) + { + plain_text_buffers.push(handle); + } else if buffer.contains_unknown_injections() { + buffers_with_unknown_injections.push(handle); } } @@ -3918,8 +3543,8 @@ impl Project { } // Tell the language server about every open buffer in the worktree that matches the language. - for buffer in self.opened_buffers.values() { - if let Some(buffer_handle) = buffer.upgrade() { + self.buffer_store.update(cx, |buffer_store, cx| { + for buffer_handle in buffer_store.buffers() { let buffer = buffer_handle.read(cx); let file = match File::from_dyn(buffer.file()) { Some(file) => file, @@ -3984,7 +3609,8 @@ impl Project { ) }); } - } + anyhow::Ok(()) + })?; cx.notify(); Ok(()) @@ -4013,13 +3639,13 @@ impl Project { } } - for buffer in self.opened_buffers.values() { - if let Some(buffer) = buffer.upgrade() { + self.buffer_store.update(cx, |buffer_store, cx| { + for buffer in buffer_store.buffers() { buffer.update(cx, |buffer, cx| { buffer.update_diagnostics(server_id, Default::default(), cx); }); } - } + }); let project_id = self.remote_id(); for (worktree_id, summaries) in self.diagnostic_summaries.iter_mut() { @@ -4120,7 +3746,9 @@ impl Project { .payload .buffer_ids .into_iter() - .flat_map(|buffer_id| project.buffer_for_id(BufferId::new(buffer_id).log_err()?)) + .flat_map(|buffer_id| { + project.buffer_for_id(BufferId::new(buffer_id).log_err()?, cx) + }) .collect(); project.restart_language_servers_for_buffers(buffers, cx) })?; @@ -5025,10 +4653,7 @@ impl Project { .await? .transaction .ok_or_else(|| anyhow!("missing transaction"))?; - project_transaction = this - .update(&mut cx, |this, cx| { - this.deserialize_project_transaction(response, push_to_history, cx) - })? + Self::deserialize_project_transaction(this, response, push_to_history, cx.clone()) .await?; } @@ -5091,7 +4716,6 @@ impl Project { let remote_id = self.remote_id(); let client = self.client.clone(); cx.spawn(move |this, mut cx| async move { - let mut project_transaction = ProjectTransaction::default(); if let Some(project_id) = remote_id { let response = client .request(proto::FormatBuffers { @@ -5107,13 +4731,10 @@ impl Project { .await? .transaction .ok_or_else(|| anyhow!("missing transaction"))?; - project_transaction = this - .update(&mut cx, |this, cx| { - this.deserialize_project_transaction(response, push_to_history, cx) - })? - .await?; + Self::deserialize_project_transaction(this, response, push_to_history, cx).await + } else { + Ok(ProjectTransaction::default()) } - Ok(project_transaction) }) } } @@ -6626,16 +6247,13 @@ impl Project { buffer_id: buffer_handle.read(cx).remote_id().into(), action: Some(Self::serialize_code_action(&action)), }; - cx.spawn(move |this, mut cx| async move { + cx.spawn(move |this, cx| async move { let response = client .request(request) .await? .transaction .ok_or_else(|| anyhow!("missing transaction"))?; - this.update(&mut cx, |this, cx| { - this.deserialize_project_transaction(response, push_to_history, cx) - })? - .await + Self::deserialize_project_transaction(this, response, push_to_history, cx).await }) } else { Task::ready(Err(anyhow!("project does not have a remote id"))) @@ -7290,38 +6908,38 @@ impl Project { let workers = background.num_cpus().min(path_count); let (matching_paths_tx, matching_paths_rx) = smol::channel::bounded(1024); let mut unnamed_files = vec![]; - let opened_buffers = self - .opened_buffers - .iter() - .filter_map(|(_, b)| { - let buffer = b.upgrade()?; - let (is_ignored, snapshot) = buffer.update(cx, |buffer, cx| { - let is_ignored = buffer - .project_path(cx) - .and_then(|path| self.entry_for_path(&path, cx)) - .map_or(false, |entry| entry.is_ignored); - (is_ignored, buffer.snapshot()) - }); - if is_ignored && !query.include_ignored() { - return None; - } else if let Some(file) = snapshot.file() { - let matched_path = if include_root { - query.file_matches(Some(&file.full_path(cx))) - } else { - query.file_matches(Some(file.path())) - }; + let opened_buffers = self.buffer_store.update(cx, |buffer_store, cx| { + buffer_store + .buffers() + .filter_map(|buffer| { + let (is_ignored, snapshot) = buffer.update(cx, |buffer, cx| { + let is_ignored = buffer + .project_path(cx) + .and_then(|path| self.entry_for_path(&path, cx)) + .map_or(false, |entry| entry.is_ignored); + (is_ignored, buffer.snapshot()) + }); + if is_ignored && !query.include_ignored() { + return None; + } else if let Some(file) = snapshot.file() { + let matched_path = if include_root { + query.file_matches(Some(&file.full_path(cx))) + } else { + query.file_matches(Some(file.path())) + }; - if matched_path { - Some((file.path().clone(), (buffer, snapshot))) + if matched_path { + Some((file.path().clone(), (buffer, snapshot))) + } else { + None + } } else { + unnamed_files.push(buffer); None } - } else { - unnamed_files.push(buffer); - None - } - }) - .collect(); + }) + .collect() + }); cx.background_executor() .spawn(Self::background_search( unnamed_files, @@ -8002,119 +7620,27 @@ impl Project { cx: &mut ModelContext, ) { let snapshot = worktree_handle.read(cx).snapshot(); - - let mut renamed_buffers = Vec::new(); - for (path, entry_id, _) in changes { - let worktree_id = worktree_handle.read(cx).id(); - let project_path = ProjectPath { - worktree_id, - path: path.clone(), - }; - - let buffer_id = match self.local_buffer_ids_by_entry_id.get(entry_id) { - Some(&buffer_id) => buffer_id, - None => match self.local_buffer_ids_by_path.get(&project_path) { - Some(&buffer_id) => buffer_id, - None => { - continue; - } - }, - }; - - let open_buffer = self.opened_buffers.get(&buffer_id); - let buffer = if let Some(buffer) = open_buffer.and_then(|buffer| buffer.upgrade()) { - buffer - } else { - self.opened_buffers.remove(&buffer_id); - self.local_buffer_ids_by_path.remove(&project_path); - self.local_buffer_ids_by_entry_id.remove(entry_id); - continue; - }; - - buffer.update(cx, |buffer, cx| { - if let Some(old_file) = File::from_dyn(buffer.file()) { - if old_file.worktree != *worktree_handle { - return; - } - - let new_file = if let Some(entry) = old_file - .entry_id - .and_then(|entry_id| snapshot.entry_for_id(entry_id)) - { - File { - is_local: true, - entry_id: Some(entry.id), - mtime: entry.mtime, - path: entry.path.clone(), - worktree: worktree_handle.clone(), - is_deleted: false, - is_private: entry.is_private, - } - } else if let Some(entry) = snapshot.entry_for_path(old_file.path().as_ref()) { - File { - is_local: true, - entry_id: Some(entry.id), - mtime: entry.mtime, - path: entry.path.clone(), - worktree: worktree_handle.clone(), - is_deleted: false, - is_private: entry.is_private, - } - } else { - File { - is_local: true, - entry_id: old_file.entry_id, - path: old_file.path().clone(), - mtime: old_file.mtime(), - worktree: worktree_handle.clone(), - is_deleted: true, - is_private: old_file.is_private, - } - }; - - let old_path = old_file.abs_path(cx); - if new_file.abs_path(cx) != old_path { - renamed_buffers.push((cx.handle(), old_file.clone())); - self.local_buffer_ids_by_path.remove(&project_path); - self.local_buffer_ids_by_path.insert( - ProjectPath { - worktree_id, - path: path.clone(), - }, - buffer_id, - ); - } - - if new_file.entry_id != Some(*entry_id) { - self.local_buffer_ids_by_entry_id.remove(entry_id); - if let Some(entry_id) = new_file.entry_id { - self.local_buffer_ids_by_entry_id - .insert(entry_id, buffer_id); - } - } - - if new_file != *old_file { - if let Some(project_id) = self.remote_id() { - self.client - .send(proto::UpdateBufferFile { - project_id, - buffer_id: buffer_id.into(), - file: Some(new_file.to_proto()), - }) - .log_err(); - } - - buffer.file_updated(Arc::new(new_file), cx); + self.buffer_store.clone().update(cx, |buffer_store, cx| { + for (path, entry_id, _) in changes { + if let Some((buffer, _, new_file)) = buffer_store.file_changed( + path.clone(), + *entry_id, + worktree_handle, + &snapshot, + cx, + ) { + if let Some(project_id) = self.remote_id() { + self.client + .send(proto::UpdateBufferFile { + project_id, + buffer_id: buffer.read(cx).remote_id().into(), + file: Some(new_file.to_proto()), + }) + .log_err(); } } - }); - } - - for (buffer, old_file) in renamed_buffers { - self.unregister_buffer_from_language_servers(&buffer, &old_file, cx); - self.detect_language_for_buffer(&buffer, cx); - self.register_buffer_with_language_servers(&buffer, cx); - } + } + }); } fn update_local_worktree_language_servers( @@ -8189,8 +7715,9 @@ impl Project { // Identify the loading buffers whose containing repository that has changed. let future_buffers = self - .loading_buffers_by_path - .iter() + .buffer_store + .read(cx) + .loading_buffers() .filter_map(|(project_path, receiver)| { if project_path.worktree_id != worktree_handle.read(cx).id() { return None; @@ -8199,11 +7726,10 @@ impl Project { changed_repos .iter() .find(|(work_dir, _)| path.starts_with(work_dir))?; - let receiver = receiver.clone(); let path = path.clone(); let abs_path = worktree_handle.read(cx).absolutize(&path).ok()?; Some(async move { - wait_for_loading_buffer(receiver) + BufferStore::wait_for_loading_buffer(receiver) .await .ok() .map(|buffer| (buffer, path, abs_path)) @@ -8213,10 +7739,10 @@ impl Project { // Identify the current buffers whose containing repository has changed. let current_buffers = self - .opened_buffers - .values() + .buffer_store + .read(cx) + .buffers() .filter_map(|buffer| { - let buffer = buffer.upgrade()?; let file = File::from_dyn(buffer.read(cx).file())?; if file.worktree != worktree_handle { return None; @@ -8708,11 +8234,8 @@ impl Project { let buffer_id = BufferId::new(envelope.payload.buffer_id)?; let version = deserialize_version(&envelope.payload.version); - let buffer = this.update(&mut cx, |this, _cx| { - this.opened_buffers - .get(&buffer_id) - .and_then(|buffer| buffer.upgrade()) - .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id)) + let buffer = this.update(&mut cx, |this, cx| { + this.buffer_store.read(cx).get_existing(buffer_id) })??; buffer @@ -8738,12 +8261,8 @@ impl Project { let sender_id = envelope.original_sender_id()?; let buffer_id = BufferId::new(envelope.payload.buffer_id)?; let version = deserialize_version(&envelope.payload.version); - let buffer = project.update(&mut cx, |project, _cx| { - project - .opened_buffers - .get(&buffer_id) - .and_then(|buffer| buffer.upgrade()) - .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id)) + let buffer = project.update(&mut cx, |project, cx| { + project.buffer_store.read(cx).get_existing(buffer_id) })??; buffer .update(&mut cx, |buffer, _| { @@ -8912,8 +8431,8 @@ impl Project { } if is_host { - this.opened_buffers - .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_))); + this.buffer_store + .update(cx, |buffer_store, _| buffer_store.discard_incomplete()); this.enqueue_buffer_ordered_message(BufferOrderedMessage::Resync) .unwrap(); cx.emit(Event::HostReshared); @@ -8943,11 +8462,11 @@ impl Project { .remove(&peer_id) .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))? .replica_id; - for buffer in this.opened_buffers.values() { - if let Some(buffer) = buffer.upgrade() { + this.buffer_store.update(cx, |buffer_store, cx| { + for buffer in buffer_store.buffers() { buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx)); } - } + }); this.shared_buffers.remove(&peer_id); cx.emit(Event::CollaboratorLeft(peer_id)); @@ -9214,34 +8733,9 @@ impl Project { mut cx: AsyncAppContext, ) -> Result { this.update(&mut cx, |this, cx| { - let payload = envelope.payload.clone(); - let buffer_id = BufferId::new(payload.buffer_id)?; - let ops = payload - .operations - .into_iter() - .map(language::proto::deserialize_operation) - .collect::, _>>()?; - let is_remote = this.is_remote(); - match this.opened_buffers.entry(buffer_id) { - hash_map::Entry::Occupied(mut e) => match e.get_mut() { - OpenBuffer::Strong(buffer) => { - buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?; - } - OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops), - OpenBuffer::Weak(_) => {} - }, - hash_map::Entry::Vacant(e) => { - if !is_remote { - debug_panic!( - "received buffer update from {:?}", - envelope.original_sender_id - ); - return Err(anyhow!("received buffer update for non-remote project")); - } - e.insert(OpenBuffer::Operations(ops)); - } - } - Ok(proto::Ack {}) + this.buffer_store.update(cx, |buffer_store, cx| { + buffer_store.handle_update_buffer(envelope, this.is_remote(), cx) + }) })? } @@ -9251,82 +8745,15 @@ impl Project { mut cx: AsyncAppContext, ) -> Result<()> { this.update(&mut cx, |this, cx| { - match envelope - .payload - .variant - .ok_or_else(|| anyhow!("missing variant"))? - { - proto::create_buffer_for_peer::Variant::State(mut state) => { - let buffer_id = BufferId::new(state.id)?; - - let buffer_result = maybe!({ - let mut buffer_file = None; - if let Some(file) = state.file.take() { - let worktree_id = WorktreeId::from_proto(file.worktree_id); - let worktree = - this.worktree_for_id(worktree_id, cx).ok_or_else(|| { - anyhow!("no worktree found for id {}", file.worktree_id) - })?; - buffer_file = - Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?) - as Arc); - } - Buffer::from_proto(this.replica_id(), this.capability(), state, buffer_file) - }); - - match buffer_result { - Ok(buffer) => { - let buffer = cx.new_model(|_| buffer); - this.incomplete_remote_buffers.insert(buffer_id, buffer); - } - Err(error) => { - if let Some(listeners) = this.loading_buffers.remove(&buffer_id) { - for listener in listeners { - listener.send(Err(anyhow!(error.cloned()))).ok(); - } - } - } - }; - } - proto::create_buffer_for_peer::Variant::Chunk(chunk) => { - let buffer_id = BufferId::new(chunk.buffer_id)?; - let buffer = this - .incomplete_remote_buffers - .get(&buffer_id) - .cloned() - .ok_or_else(|| { - anyhow!( - "received chunk for buffer {} without initial state", - chunk.buffer_id - ) - })?; - - let result = maybe!({ - let operations = chunk - .operations - .into_iter() - .map(language::proto::deserialize_operation) - .collect::>>()?; - buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx)) - }); - - if let Err(error) = result { - this.incomplete_remote_buffers.remove(&buffer_id); - if let Some(listeners) = this.loading_buffers.remove(&buffer_id) { - for listener in listeners { - listener.send(Err(error.cloned())).ok(); - } - } - } else { - if chunk.is_last { - this.incomplete_remote_buffers.remove(&buffer_id); - this.register_buffer(&buffer, cx)?; - } - } - } - } - - Ok(()) + this.buffer_store.update(cx, |buffer_store, cx| { + buffer_store.handle_create_buffer_for_peer( + envelope, + this.worktrees(), + this.replica_id(), + this.capability(), + cx, + ) + }) })? } @@ -9339,10 +8766,9 @@ impl Project { let buffer_id = envelope.payload.buffer_id; let buffer_id = BufferId::new(buffer_id)?; if let Some(buffer) = this - .opened_buffers - .get_mut(&buffer_id) - .and_then(|b| b.upgrade()) - .or_else(|| this.incomplete_remote_buffers.get(&buffer_id).cloned()) + .buffer_store + .read(cx) + .get_possibly_incomplete(buffer_id) { buffer.update(cx, |buffer, cx| { buffer.set_diff_base(envelope.payload.diff_base, cx) @@ -9363,10 +8789,9 @@ impl Project { this.update(&mut cx, |this, cx| { let payload = envelope.payload.clone(); if let Some(buffer) = this - .opened_buffers - .get(&buffer_id) - .and_then(|b| b.upgrade()) - .or_else(|| this.incomplete_remote_buffers.get(&buffer_id).cloned()) + .buffer_store + .read(cx) + .get_possibly_incomplete(buffer_id) { let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?; let worktree = this @@ -9387,40 +8812,20 @@ impl Project { envelope: TypedEnvelope, mut cx: AsyncAppContext, ) -> Result { - let buffer_id = BufferId::new(envelope.payload.buffer_id)?; - let (project_id, buffer) = this.update(&mut cx, |this, _cx| { - let project_id = this.remote_id().ok_or_else(|| anyhow!("not connected"))?; - let buffer = this - .opened_buffers - .get(&buffer_id) - .and_then(|buffer| buffer.upgrade()) - .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?; - anyhow::Ok((project_id, buffer)) + let (buffer_store, worktree, project_id) = this.update(&mut cx, |this, cx| { + let buffer_store = this.buffer_store.clone(); + let project_id = this.remote_id().context("not connected")?; + let worktree = if let Some(path) = &envelope.payload.new_path { + Some( + this.worktree_for_id(WorktreeId::from_proto(path.worktree_id), cx) + .context("worktree does not exist")?, + ) + } else { + None + }; + anyhow::Ok((buffer_store, worktree, project_id)) })??; - buffer - .update(&mut cx, |buffer, _| { - buffer.wait_for_version(deserialize_version(&envelope.payload.version)) - })? - .await?; - let buffer_id = buffer.update(&mut cx, |buffer, _| buffer.remote_id())?; - - if let Some(new_path) = envelope.payload.new_path { - let new_path = ProjectPath::from_proto(new_path); - this.update(&mut cx, |this, cx| { - this.save_buffer_as(buffer.clone(), new_path, cx) - })? - .await?; - } else { - this.update(&mut cx, |this, cx| this.save_buffer(buffer.clone(), cx))? - .await?; - } - - buffer.update(&mut cx, |buffer, _| proto::BufferSaved { - project_id, - buffer_id: buffer_id.into(), - version: serialize_version(buffer.saved_version()), - mtime: buffer.saved_mtime().map(|time| time.into()), - }) + BufferStore::handle_save_buffer(buffer_store, project_id, worktree, envelope, cx).await } async fn handle_reload_buffers( @@ -9433,12 +8838,7 @@ impl Project { let mut buffers = HashSet::default(); for buffer_id in &envelope.payload.buffer_ids { let buffer_id = BufferId::new(*buffer_id)?; - buffers.insert( - this.opened_buffers - .get(&buffer_id) - .and_then(|buffer| buffer.upgrade()) - .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?, - ); + buffers.insert(this.buffer_store.read(cx).get_existing(buffer_id)?); } Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx)) })??; @@ -9472,7 +8872,7 @@ impl Project { for buffer in envelope.payload.buffers { let buffer_id = BufferId::new(buffer.id)?; let remote_version = language::proto::deserialize_version(&buffer.version); - if let Some(buffer) = this.buffer_for_id(buffer_id) { + if let Some(buffer) = this.buffer_for_id(buffer_id, cx) { this.shared_buffers .entry(guest_id) .or_default() @@ -9552,12 +8952,7 @@ impl Project { let mut buffers = HashSet::default(); for buffer_id in &envelope.payload.buffer_ids { let buffer_id = BufferId::new(*buffer_id)?; - buffers.insert( - this.opened_buffers - .get(&buffer_id) - .and_then(|buffer| buffer.upgrade()) - .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?, - ); + buffers.insert(this.buffer_store.read(cx).get_existing(buffer_id)?); } let trigger = FormatTrigger::from_proto(envelope.payload.trigger); Ok::<_, anyhow::Error>(this.format(buffers, false, trigger, cx)) @@ -9577,13 +8972,9 @@ impl Project { envelope: TypedEnvelope, mut cx: AsyncAppContext, ) -> Result { - let (buffer, completion) = this.update(&mut cx, |this, _| { + let (buffer, completion) = this.update(&mut cx, |this, cx| { let buffer_id = BufferId::new(envelope.payload.buffer_id)?; - let buffer = this - .opened_buffers - .get(&buffer_id) - .and_then(|buffer| buffer.upgrade()) - .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?; + let buffer = this.buffer_store.read(cx).get_existing(buffer_id)?; let completion = Self::deserialize_completion( envelope .payload @@ -9660,11 +9051,7 @@ impl Project { let mut new_text = String::default(); if let Ok(buffer_id) = BufferId::new(envelope.payload.buffer_id) { let buffer_snapshot = this.update(&mut cx, |this, cx| { - let buffer = this - .opened_buffers - .get(&buffer_id) - .and_then(|buffer| buffer.upgrade()) - .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?; + let buffer = this.buffer_store.read(cx).get_existing(buffer_id)?; anyhow::Ok(buffer.read(cx).snapshot()) })??; @@ -9704,12 +9091,8 @@ impl Project { )?; let apply_code_action = this.update(&mut cx, |this, cx| { let buffer_id = BufferId::new(envelope.payload.buffer_id)?; - let buffer = this - .opened_buffers - .get(&buffer_id) - .and_then(|buffer| buffer.upgrade()) - .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?; - Ok::<_, anyhow::Error>(this.apply_code_action(buffer, action, false, cx)) + let buffer = this.buffer_store.read(cx).get_existing(buffer_id)?; + anyhow::Ok(this.apply_code_action(buffer, action, false, cx)) })??; let project_transaction = apply_code_action.await?; @@ -9728,11 +9111,7 @@ impl Project { ) -> Result { let on_type_formatting = this.update(&mut cx, |this, cx| { let buffer_id = BufferId::new(envelope.payload.buffer_id)?; - let buffer = this - .opened_buffers - .get(&buffer_id) - .and_then(|buffer| buffer.upgrade()) - .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?; + let buffer = this.buffer_store.read(cx).get_existing(buffer_id)?; let position = envelope .payload .position @@ -9760,11 +9139,8 @@ impl Project { ) -> Result { let sender_id = envelope.original_sender_id()?; let buffer_id = BufferId::new(envelope.payload.buffer_id)?; - let buffer = this.update(&mut cx, |this, _| { - this.opened_buffers - .get(&buffer_id) - .and_then(|buffer| buffer.upgrade()) - .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id)) + let buffer = this.update(&mut cx, |this, cx| { + this.buffer_store.read(cx).get_existing(buffer_id) })??; buffer .update(&mut cx, |buffer, _| { @@ -9812,12 +9188,9 @@ impl Project { .expect("incorrect protobuf resolve inlay hint message: missing the inlay hint"); let hint = InlayHints::proto_to_project_hint(proto_hint) .context("resolved proto inlay hint conversion")?; - let buffer = this.update(&mut cx, |this, _cx| { + let buffer = this.update(&mut cx, |this, cx| { let buffer_id = BufferId::new(envelope.payload.buffer_id)?; - this.opened_buffers - .get(&buffer_id) - .and_then(|buffer| buffer.upgrade()) - .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id)) + this.buffer_store.read(cx).get_existing(buffer_id) })??; let response_hint = this .update(&mut cx, |project, cx| { @@ -9956,12 +9329,8 @@ impl Project { ) -> Result { let sender_id = envelope.original_sender_id()?; let buffer_id = BufferId::new(envelope.payload.buffer_id)?; - let buffer = project.update(&mut cx, |project, _| { - project - .opened_buffers - .get(&buffer_id) - .and_then(|buffer| buffer.upgrade()) - .with_context(|| format!("unknown buffer id {}", envelope.payload.buffer_id)) + let buffer = project.update(&mut cx, |project, cx| { + project.buffer_store.read(cx).get_existing(buffer_id) })??; let response = GetSignatureHelp::from_proto( envelope.payload.clone(), @@ -10101,11 +9470,8 @@ impl Project { { let sender_id = envelope.original_sender_id()?; let buffer_id = T::buffer_id_from_proto(&envelope.payload)?; - let buffer_handle = this.update(&mut cx, |this, _cx| { - this.opened_buffers - .get(&buffer_id) - .and_then(|buffer| buffer.upgrade()) - .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id)) + let buffer_handle = this.update(&mut cx, |this, cx| { + this.buffer_store.read(cx).get_existing(buffer_id) })??; let request = T::from_proto( envelope.payload, @@ -10343,42 +9709,39 @@ impl Project { serialized_transaction } - fn deserialize_project_transaction( - &mut self, + async fn deserialize_project_transaction( + this: WeakModel, message: proto::ProjectTransaction, push_to_history: bool, - cx: &mut ModelContext, - ) -> Task> { - cx.spawn(move |this, mut cx| async move { - let mut project_transaction = ProjectTransaction::default(); - for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions) - { - let buffer_id = BufferId::new(buffer_id)?; - let buffer = this - .update(&mut cx, |this, cx| { - this.wait_for_remote_buffer(buffer_id, cx) - })? - .await?; - let transaction = language::proto::deserialize_transaction(transaction)?; - project_transaction.0.insert(buffer, transaction); + mut cx: AsyncAppContext, + ) -> Result { + let mut project_transaction = ProjectTransaction::default(); + for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions) { + let buffer_id = BufferId::new(buffer_id)?; + let buffer = this + .update(&mut cx, |this, cx| { + this.wait_for_remote_buffer(buffer_id, cx) + })? + .await?; + let transaction = language::proto::deserialize_transaction(transaction)?; + project_transaction.0.insert(buffer, transaction); + } + + for (buffer, transaction) in &project_transaction.0 { + buffer + .update(&mut cx, |buffer, _| { + buffer.wait_for_edits(transaction.edit_ids.iter().copied()) + })? + .await?; + + if push_to_history { + buffer.update(&mut cx, |buffer, _| { + buffer.push_transaction(transaction.clone(), Instant::now()); + })?; } + } - for (buffer, transaction) in &project_transaction.0 { - buffer - .update(&mut cx, |buffer, _| { - buffer.wait_for_edits(transaction.edit_ids.iter().copied()) - })? - .await?; - - if push_to_history { - buffer.update(&mut cx, |buffer, _| { - buffer.push_transaction(transaction.clone(), Instant::now()); - })?; - } - } - - Ok(project_transaction) - }) + Ok(project_transaction) } fn create_buffer_for_peer( @@ -10401,19 +9764,9 @@ impl Project { id: BufferId, cx: &mut ModelContext, ) -> Task>> { - let buffer = self - .opened_buffers - .get(&id) - .and_then(|buffer| buffer.upgrade()); - - if let Some(buffer) = buffer { - return Task::ready(Ok(buffer)); - } - - let (tx, rx) = oneshot::channel(); - self.loading_buffers.entry(id).or_default().push(tx); - - cx.background_executor().spawn(async move { rx.await? }) + self.buffer_store.update(cx, |buffer_store, cx| { + buffer_store.wait_for_remote_buffer(id, cx) + }) } fn synchronize_remote_buffers(&mut self, cx: &mut ModelContext) -> Task> { @@ -10441,24 +9794,7 @@ impl Project { let client = self.client.clone(); cx.spawn(move |this, mut cx| async move { let (buffers, incomplete_buffer_ids) = this.update(&mut cx, |this, cx| { - let buffers = this - .opened_buffers - .iter() - .filter_map(|(id, buffer)| { - let buffer = buffer.upgrade()?; - Some(proto::BufferVersion { - id: (*id).into(), - version: language::proto::serialize_version(&buffer.read(cx).version), - }) - }) - .collect(); - let incomplete_buffer_ids = this - .incomplete_remote_buffers - .keys() - .copied() - .collect::>(); - - (buffers, incomplete_buffer_ids) + this.buffer_store.read(cx).buffer_version_info(cx) })?; let response = client .request(proto::SynchronizeBuffers { @@ -10480,7 +9816,7 @@ impl Project { } }; let remote_version = language::proto::deserialize_version(&buffer.version); - if let Some(buffer) = this.buffer_for_id(buffer_id) { + if let Some(buffer) = this.buffer_for_id(buffer_id, cx) { let operations = buffer.read(cx).serialize_ops(Some(remote_version), cx); cx.background_executor().spawn(async move { @@ -10563,7 +9899,7 @@ impl Project { remote_id, replica_id, worktree, - Box::new(CollabRemoteWorktreeClient(self.client.clone())), + self.client.clone().into(), cx, ), cx, @@ -10694,10 +10030,9 @@ impl Project { this.update(&mut cx, |this, cx| { let buffer = this - .opened_buffers - .get(&buffer_id) - .and_then(|buffer| buffer.upgrade()) - .or_else(|| this.incomplete_remote_buffers.get(&buffer_id).cloned()); + .buffer_store + .read(cx) + .get_possibly_incomplete(buffer_id); if let Some(buffer) = buffer { buffer.update(cx, |buffer, cx| { buffer.did_save(version, mtime, cx); @@ -10721,12 +10056,11 @@ impl Project { let mtime = payload.mtime.map(|time| time.into()); let buffer_id = BufferId::new(payload.buffer_id)?; this.update(&mut cx, |this, cx| { - let buffer = this - .opened_buffers - .get(&buffer_id) - .and_then(|buffer| buffer.upgrade()) - .or_else(|| this.incomplete_remote_buffers.get(&buffer_id).cloned()); - if let Some(buffer) = buffer { + if let Some(buffer) = this + .buffer_store + .read(cx) + .get_possibly_incomplete(buffer_id) + { buffer.update(cx, |buffer, cx| { buffer.did_reload(version, line_ending, mtime, cx); }); @@ -11517,28 +10851,6 @@ impl WorktreeHandle { } } -impl OpenBuffer { - pub fn upgrade(&self) -> Option> { - match self { - OpenBuffer::Strong(handle) => Some(handle.clone()), - OpenBuffer::Weak(handle) => handle.upgrade(), - OpenBuffer::Operations(_) => None, - } - } -} - -pub struct CollabRemoteWorktreeClient(Arc); - -impl RemoteWorktreeClient for CollabRemoteWorktreeClient { - fn request( - &self, - envelope: proto::Envelope, - request_type: &'static str, - ) -> BoxFuture<'static, Result> { - self.0.request_dynamic(envelope, request_type).boxed() - } -} - pub struct PathMatchCandidateSet { pub snapshot: Snapshot, pub include_ignored: bool, @@ -11840,27 +11152,6 @@ impl Completion { } } -async fn wait_for_loading_buffer( - mut receiver: postage::watch::Receiver, Arc>>>, -) -> Result, Arc> { - loop { - if let Some(result) = receiver.borrow().as_ref() { - match result { - Ok(buffer) => return Ok(buffer.to_owned()), - Err(e) => return Err(e.to_owned()), - } - } - receiver.next().await; - } -} - -fn is_not_found_error(error: &anyhow::Error) -> bool { - error - .root_cause() - .downcast_ref::() - .is_some_and(|err| err.kind() == io::ErrorKind::NotFound) -} - fn include_text(server: &lsp::LanguageServer) -> bool { server .capabilities() diff --git a/crates/project/src/project_tests.rs b/crates/project/src/project_tests.rs index 4945d89ce7..1c64c08345 100644 --- a/crates/project/src/project_tests.rs +++ b/crates/project/src/project_tests.rs @@ -3056,15 +3056,8 @@ async fn test_rescan_and_remote_updates(cx: &mut gpui::TestAppContext) { }); }); - let remote = cx.update(|cx| { - Worktree::remote( - 0, - 1, - metadata, - Box::new(CollabRemoteWorktreeClient(project.read(cx).client())), - cx, - ) - }); + let remote = + cx.update(|cx| Worktree::remote(0, 1, metadata, project.read(cx).client().into(), cx)); cx.executor().run_until_parked(); diff --git a/crates/proto/Cargo.toml b/crates/proto/Cargo.toml index eca020a92d..5ee2e60aaa 100644 --- a/crates/proto/Cargo.toml +++ b/crates/proto/Cargo.toml @@ -19,6 +19,7 @@ doctest = false [dependencies] anyhow.workspace = true collections.workspace = true +futures.workspace = true prost.workspace = true serde.workspace = true diff --git a/crates/proto/src/proto.rs b/crates/proto/src/proto.rs index 047a072792..a90ea3edb2 100644 --- a/crates/proto/src/proto.rs +++ b/crates/proto/src/proto.rs @@ -7,18 +7,19 @@ mod typed_envelope; pub use error::*; pub use typed_envelope::*; +use anyhow::anyhow; use collections::HashMap; +use futures::{future::BoxFuture, Future}; pub use prost::{DecodeError, Message}; use serde::Serialize; -use std::any::{Any, TypeId}; -use std::time::Instant; use std::{ + any::{Any, TypeId}, cmp, - fmt::Debug, - iter, - time::{Duration, SystemTime, UNIX_EPOCH}, + fmt::{self, Debug}, + iter, mem, + sync::Arc, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; -use std::{fmt, mem}; include!(concat!(env!("OUT_DIR"), "/zed.messages.rs")); @@ -59,6 +60,51 @@ pub enum MessagePriority { Background, } +pub trait ProtoClient: Send + Sync { + fn request( + &self, + envelope: Envelope, + request_type: &'static str, + ) -> BoxFuture<'static, anyhow::Result>; + + fn send(&self, envelope: Envelope) -> anyhow::Result<()>; +} + +#[derive(Clone)] +pub struct AnyProtoClient(Arc); + +impl From> for AnyProtoClient +where + T: ProtoClient + 'static, +{ + fn from(client: Arc) -> Self { + Self(client) + } +} + +impl AnyProtoClient { + pub fn new(client: Arc) -> Self { + Self(client) + } + + pub fn request( + &self, + request: T, + ) -> impl Future> { + let envelope = request.into_envelope(0, None, None); + let response = self.0.request(envelope, T::NAME); + async move { + T::Response::from_envelope(response.await?) + .ok_or_else(|| anyhow!("received response of the wrong type")) + } + } + + pub fn send(&self, request: T) -> anyhow::Result<()> { + let envelope = request.into_envelope(0, None, None); + self.0.send(envelope) + } +} + impl AnyTypedEnvelope for TypedEnvelope { fn payload_type_id(&self) -> TypeId { TypeId::of::() diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index 0b4af7e9c5..e0d38f9f37 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -557,6 +557,14 @@ impl Peer { Ok(()) } + pub fn send_dynamic(&self, receiver_id: ConnectionId, message: proto::Envelope) -> Result<()> { + let connection = self.connection_state(receiver_id)?; + connection + .outgoing_tx + .unbounded_send(proto::Message::Envelope(message))?; + Ok(()) + } + pub fn forward_send( &self, sender_id: ConnectionId, diff --git a/crates/worktree/src/worktree.rs b/crates/worktree/src/worktree.rs index 1672d9f8da..4406f261df 100644 --- a/crates/worktree/src/worktree.rs +++ b/crates/worktree/src/worktree.rs @@ -37,7 +37,7 @@ use postage::{ prelude::{Sink as _, Stream as _}, watch, }; -use rpc::proto::{self, EnvelopedMessage as _, RequestMessage}; +use rpc::proto::{self, AnyProtoClient}; use settings::{Settings, SettingsLocation, SettingsStore}; use smol::channel::{self, Sender}; use std::{ @@ -131,7 +131,7 @@ pub struct RemoteWorktree { snapshot: Snapshot, background_snapshot: Arc>, project_id: u64, - client: Box, + client: AnyProtoClient, updates_tx: Option>, update_observer: Arc< Mutex BoxFuture<'static, bool>>>>, @@ -142,14 +142,6 @@ pub struct RemoteWorktree { disconnected: bool, } -pub trait RemoteWorktreeClient { - fn request( - &self, - envelope: proto::Envelope, - request_type: &'static str, - ) -> BoxFuture<'static, Result>; -} - #[derive(Clone)] pub struct Snapshot { id: WorktreeId, @@ -461,7 +453,7 @@ impl Worktree { project_id: u64, replica_id: ReplicaId, worktree: proto::WorktreeMetadata, - client: Box, + client: AnyProtoClient, cx: &mut AppContext, ) -> Model { cx.new_model(|cx: &mut ModelContext| { @@ -706,7 +698,7 @@ impl Worktree { Worktree::Local(this) => this.create_entry(path, is_directory, cx), Worktree::Remote(this) => { let project_id = this.project_id; - let request = this.rpc_request(proto::CreateProjectEntry { + let request = this.client.request(proto::CreateProjectEntry { worktree_id: worktree_id.to_proto(), project_id, path: path.to_string_lossy().into(), @@ -748,7 +740,7 @@ impl Worktree { match self { Worktree::Local(this) => this.delete_entry(entry_id, trash, cx), Worktree::Remote(this) => { - let response = this.rpc_request(proto::DeleteProjectEntry { + let response = this.client.request(proto::DeleteProjectEntry { project_id: this.project_id, entry_id: entry_id.to_proto(), use_trash: trash, @@ -778,7 +770,7 @@ impl Worktree { match self { Worktree::Local(this) => this.rename_entry(entry_id, new_path, cx), Worktree::Remote(this) => { - let response = this.rpc_request(proto::RenameProjectEntry { + let response = this.client.request(proto::RenameProjectEntry { project_id: this.project_id, entry_id: entry_id.to_proto(), new_path: new_path.to_string_lossy().into(), @@ -820,7 +812,7 @@ impl Worktree { match self { Worktree::Local(this) => this.copy_entry(entry_id, new_path, cx), Worktree::Remote(this) => { - let response = this.rpc_request(proto::CopyProjectEntry { + let response = this.client.request(proto::CopyProjectEntry { project_id: this.project_id, entry_id: entry_id.to_proto(), new_path: new_path.to_string_lossy().into(), @@ -870,7 +862,7 @@ impl Worktree { match self { Worktree::Local(this) => this.expand_entry(entry_id, cx), Worktree::Remote(this) => { - let response = this.rpc_request(proto::ExpandProjectEntry { + let response = this.client.request(proto::ExpandProjectEntry { project_id: this.project_id, entry_id: entry_id.to_proto(), }); @@ -1811,24 +1803,20 @@ impl LocalWorktree { } impl RemoteWorktree { + pub fn project_id(&self) -> u64 { + self.project_id + } + + pub fn client(&self) -> AnyProtoClient { + self.client.clone() + } + pub fn disconnected_from_host(&mut self) { self.updates_tx.take(); self.snapshot_subscriptions.clear(); self.disconnected = true; } - fn rpc_request( - &self, - request: T, - ) -> impl Future> { - let envelope = request.into_envelope(0, None, None); - let response = self.client.request(envelope, T::NAME); - async move { - T::Response::from_envelope(response.await?) - .ok_or_else(|| anyhow!("received response of the wrong type")) - } - } - pub fn update_from_remote(&mut self, update: proto::UpdateWorktree) { if let Some(updates_tx) = &self.updates_tx { updates_tx