From b14bb6bda44e1b6e56506c3d579c9e949aecc525 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Wed, 24 Jul 2024 15:50:29 -0700 Subject: [PATCH] Make git blame for SSH remote projects (#15106) This also refactors the BufferStore + WorktreeStore interfaces to make them cleaner, more fully encapsulating the RPC aspects of their functionality. Release Notes: - N/A --- crates/client/src/client.rs | 2 +- crates/project/src/buffer_store.rs | 464 ++++++++++++++---- crates/project/src/project.rs | 490 +++---------------- crates/project/src/project_tests.rs | 1 + crates/proto/src/proto.rs | 4 + crates/remote_server/src/headless_project.rs | 58 +-- 6 files changed, 452 insertions(+), 567 deletions(-) diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index 8c309f6846..e0c6690bb9 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -1410,7 +1410,7 @@ impl Client { self.peer.send(self.connection_id()?, message) } - fn send_dynamic(&self, envelope: proto::Envelope) -> Result<()> { + pub fn send_dynamic(&self, envelope: proto::Envelope) -> Result<()> { let connection_id = self.connection_id()?; self.peer.send_dynamic(connection_id, envelope) } diff --git a/crates/project/src/buffer_store.rs b/crates/project/src/buffer_store.rs index 4509b4e056..2d5ed7ed84 100644 --- a/crates/project/src/buffer_store.rs +++ b/crates/project/src/buffer_store.rs @@ -1,31 +1,34 @@ use crate::{ worktree_store::{WorktreeStore, WorktreeStoreEvent}, - ProjectPath, + NoRepositoryError, ProjectPath, }; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context as _, Result}; use collections::{hash_map, HashMap}; use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt as _}; +use git::blame::Blame; use gpui::{ AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel, }; +use http_client::Url; use language::{ proto::{deserialize_line_ending, deserialize_version, serialize_version, split_operations}, - Buffer, Capability, Event as BufferEvent, Language, Operation, + Buffer, Capability, Event as BufferEvent, File as _, Language, Operation, }; use rpc::{ - proto::{self, AnyProtoClient, PeerId}, + proto::{self, AnyProtoClient, EnvelopedMessage, PeerId}, ErrorExt as _, TypedEnvelope, }; -use std::{io, path::Path, sync::Arc}; +use std::{io, path::Path, str::FromStr as _, sync::Arc}; use text::BufferId; use util::{debug_panic, maybe, ResultExt as _}; use worktree::{ File, PathChange, ProjectEntryId, RemoteWorktree, UpdatedGitRepositoriesSet, Worktree, + WorktreeId, }; /// A set of open buffers. pub struct BufferStore { - retain_buffers: bool, + remote_id: Option, #[allow(unused)] worktree_store: Model, opened_buffers: HashMap, @@ -51,19 +54,9 @@ pub enum BufferStoreEvent { BufferAdded(Model), BufferChangedFilePath { buffer: Model, - old_file: Option>, - }, - BufferSaved { - buffer: Model, - has_changed_file: bool, - saved_version: clock::Global, - }, - LocalBufferUpdated { - buffer: Model, - }, - DiffBaseUpdated { - buffer: Model, + old_file: Option>, }, + MessageToReplicas(Box), } impl EventEmitter for BufferStore {} @@ -77,7 +70,7 @@ impl BufferStore { /// weak handles. pub fn new( worktree_store: Model, - retain_buffers: bool, + remote_id: Option, cx: &mut ModelContext, ) -> Self { cx.subscribe(&worktree_store, |this, _, event, cx| match event { @@ -89,7 +82,7 @@ impl BufferStore { .detach(); Self { - retain_buffers, + remote_id, worktree_store, opened_buffers: Default::default(), remote_buffer_listeners: Default::default(), @@ -272,13 +265,23 @@ impl BufferStore { }) .await; - this.update(&mut cx, |_, cx| { + this.update(&mut cx, |this, cx| { // Assign the new diff bases on all of the buffers. for (buffer, diff_base) in diff_bases_by_buffer { - buffer.update(cx, |buffer, cx| { + let buffer_id = buffer.update(cx, |buffer, cx| { buffer.set_diff_base(diff_base.clone(), cx); + buffer.remote_id().to_proto() }); - cx.emit(BufferStoreEvent::DiffBaseUpdated { buffer }) + if let Some(project_id) = this.remote_id { + cx.emit(BufferStoreEvent::MessageToReplicas(Box::new( + proto::UpdateDiffBase { + project_id, + buffer_id, + diff_base, + } + .into_envelope(0, None, None), + ))) + } } }) }) @@ -433,9 +436,7 @@ impl BufferStore { return Task::ready(Err(anyhow!("no such worktree"))); }; - let old_file = File::from_dyn(buffer.read(cx).file()) - .cloned() - .map(Arc::new); + let old_file = buffer.read(cx).file().cloned(); let task = match worktree.read(cx) { Worktree::Local(_) => { @@ -465,6 +466,7 @@ impl BufferStore { let text = buffer.as_rope().clone(); let line_ending = buffer.line_ending(); let version = buffer.version(); + let buffer_id = buffer.remote_id(); if buffer.file().is_some_and(|file| !file.is_created()) { has_changed_file = true; } @@ -476,20 +478,35 @@ impl BufferStore { cx.spawn(move |this, mut cx| async move { let new_file = save.await?; let mtime = new_file.mtime; + this.update(&mut cx, |this, cx| { + if let Some(project_id) = this.remote_id { + if has_changed_file { + cx.emit(BufferStoreEvent::MessageToReplicas(Box::new( + proto::UpdateBufferFile { + project_id, + buffer_id: buffer_id.to_proto(), + file: Some(language::File::to_proto(&*new_file, cx)), + } + .into_envelope(0, None, None), + ))); + } + cx.emit(BufferStoreEvent::MessageToReplicas(Box::new( + proto::BufferSaved { + project_id, + buffer_id: buffer_id.to_proto(), + version: serialize_version(&version), + mtime: mtime.map(|time| time.into()), + } + .into_envelope(0, None, None), + ))); + } + })?; buffer_handle.update(&mut cx, |buffer, cx| { if has_changed_file { buffer.file_updated(new_file, cx); } buffer.did_save(version.clone(), mtime, cx); - })?; - this.update(&mut cx, |_, cx| { - cx.emit(BufferStoreEvent::BufferSaved { - buffer: buffer_handle, - has_changed_file, - saved_version: version, - }) - })?; - Ok(()) + }) }) } @@ -525,10 +542,69 @@ impl BufferStore { }) } + pub fn blame_buffer( + &self, + buffer: &Model, + version: Option, + cx: &AppContext, + ) -> Task> { + let buffer = buffer.read(cx); + let Some(file) = File::from_dyn(buffer.file()) else { + return Task::ready(Err(anyhow!("buffer has no file"))); + }; + + match file.worktree.clone().read(cx) { + Worktree::Local(worktree) => { + let worktree = worktree.snapshot(); + let blame_params = maybe!({ + let (repo_entry, local_repo_entry) = match worktree.repo_for_path(&file.path) { + Some(repo_for_path) => repo_for_path, + None => anyhow::bail!(NoRepositoryError {}), + }; + + let relative_path = repo_entry + .relativize(&worktree, &file.path) + .context("failed to relativize buffer path")?; + + let repo = local_repo_entry.repo().clone(); + + let content = match version { + Some(version) => buffer.rope_for_version(&version).clone(), + None => buffer.as_rope().clone(), + }; + + anyhow::Ok((repo, relative_path, content)) + }); + + cx.background_executor().spawn(async move { + let (repo, relative_path, content) = blame_params?; + repo.blame(&relative_path, content) + .with_context(|| format!("Failed to blame {:?}", relative_path.0)) + }) + } + Worktree::Remote(worktree) => { + let buffer_id = buffer.remote_id(); + let version = buffer.version(); + let project_id = worktree.project_id(); + let client = worktree.client(); + cx.spawn(|_| async move { + let response = client + .request(proto::BlameBuffer { + project_id, + buffer_id: buffer_id.into(), + version: serialize_version(&version), + }) + .await?; + Ok(deserialize_blame_buffer_response(response)) + }) + } + } + } + fn add_buffer(&mut self, buffer: Model, cx: &mut ModelContext) -> Result<()> { let remote_id = buffer.read(cx).remote_id(); let is_remote = buffer.read(cx).replica_id() != 0; - let open_buffer = if self.retain_buffers { + let open_buffer = if self.remote_id.is_some() { OpenBuffer::Strong(buffer.clone()) } else { OpenBuffer::Weak(buffer.downgrade()) @@ -664,7 +740,7 @@ impl BufferStore { } pub fn disconnected_from_host(&mut self, cx: &mut AppContext) { - self.set_retain_buffers(false, cx); + self.set_remote_id(None, cx); for buffer in self.buffers() { buffer.update(cx, |buffer, cx| { @@ -677,10 +753,10 @@ impl BufferStore { self.remote_buffer_listeners.clear(); } - pub fn set_retain_buffers(&mut self, retain_buffers: bool, cx: &mut AppContext) { - self.retain_buffers = retain_buffers; + pub fn set_remote_id(&mut self, remote_id: Option, cx: &mut AppContext) { + self.remote_id = remote_id; for open_buffer in self.opened_buffers.values_mut() { - if retain_buffers { + if remote_id.is_some() { if let OpenBuffer::Weak(buffer) = open_buffer { if let Some(buffer) = buffer.upgrade() { *open_buffer = OpenBuffer::Strong(buffer); @@ -741,8 +817,9 @@ impl BufferStore { return None; }; - let (old_file, new_file) = buffer.update(cx, |buffer, cx| { - let old_file = File::from_dyn(buffer.file())?; + let events = buffer.update(cx, |buffer, cx| { + let file = buffer.file()?; + let old_file = File::from_dyn(Some(file))?; if old_file.worktree != *worktree { return None; } @@ -786,41 +863,54 @@ impl BufferStore { return None; } - let old_file = Arc::new(old_file.clone()); - let new_file = Arc::new(new_file); - buffer.file_updated(new_file.clone(), cx); - Some((old_file, new_file)) + let mut events = Vec::new(); + if new_file.path != old_file.path { + self.local_buffer_ids_by_path.remove(&ProjectPath { + path: old_file.path.clone(), + worktree_id: old_file.worktree_id(cx), + }); + self.local_buffer_ids_by_path.insert( + ProjectPath { + worktree_id: new_file.worktree_id(cx), + path: new_file.path.clone(), + }, + buffer_id, + ); + events.push(BufferStoreEvent::BufferChangedFilePath { + buffer: cx.handle(), + old_file: buffer.file().cloned(), + }); + } + + if new_file.entry_id != old_file.entry_id { + if let Some(entry_id) = old_file.entry_id { + self.local_buffer_ids_by_entry_id.remove(&entry_id); + } + if let Some(entry_id) = new_file.entry_id { + self.local_buffer_ids_by_entry_id + .insert(entry_id, buffer_id); + } + } + + if let Some(project_id) = self.remote_id { + events.push(BufferStoreEvent::MessageToReplicas(Box::new( + proto::UpdateBufferFile { + project_id, + buffer_id: buffer_id.to_proto(), + file: Some(new_file.to_proto(cx)), + } + .into_envelope(0, None, None), + ))) + } + + buffer.file_updated(Arc::new(new_file), cx); + Some(events) })?; - if new_file.path != old_file.path { - self.local_buffer_ids_by_path.remove(&ProjectPath { - path: old_file.path.clone(), - worktree_id: old_file.worktree_id(cx), - }); - self.local_buffer_ids_by_path.insert( - ProjectPath { - worktree_id: new_file.worktree_id(cx), - path: new_file.path.clone(), - }, - buffer_id, - ); - cx.emit(BufferStoreEvent::BufferChangedFilePath { - buffer: buffer.clone(), - old_file: Some(old_file.clone()), - }); + for event in events { + cx.emit(event); } - if new_file.entry_id != old_file.entry_id { - if let Some(entry_id) = old_file.entry_id { - self.local_buffer_ids_by_entry_id.remove(&entry_id); - } - if let Some(entry_id) = new_file.entry_id { - self.local_buffer_ids_by_entry_id - .insert(entry_id, buffer_id); - } - } - - cx.emit(BufferStoreEvent::LocalBufferUpdated { buffer }); None } @@ -899,11 +989,10 @@ impl BufferStore { Ok(()) } - pub fn handle_update_buffer( - &mut self, + pub async fn handle_update_buffer( + this: Model, envelope: TypedEnvelope, - is_remote: bool, - cx: &mut AppContext, + mut cx: AsyncAppContext, ) -> Result { let payload = envelope.payload.clone(); let buffer_id = BufferId::new(payload.buffer_id)?; @@ -912,32 +1001,26 @@ impl BufferStore { .into_iter() .map(language::proto::deserialize_operation) .collect::, _>>()?; - match self.opened_buffers.entry(buffer_id) { - hash_map::Entry::Occupied(mut e) => match e.get_mut() { - OpenBuffer::Strong(buffer) => { - buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?; + this.update(&mut cx, |this, cx| { + match this.opened_buffers.entry(buffer_id) { + hash_map::Entry::Occupied(mut e) => match e.get_mut() { + OpenBuffer::Strong(buffer) => { + buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?; + } + OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops), + OpenBuffer::Weak(_) => {} + }, + hash_map::Entry::Vacant(e) => { + e.insert(OpenBuffer::Operations(ops)); } - OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops), - OpenBuffer::Weak(_) => {} - }, - hash_map::Entry::Vacant(e) => { - if !is_remote { - debug_panic!( - "received buffer update from {:?}", - envelope.original_sender_id - ); - return Err(anyhow!("received buffer update for non-remote project")); - } - e.insert(OpenBuffer::Operations(ops)); } - } - Ok(proto::Ack {}) + Ok(proto::Ack {}) + })? } pub fn handle_create_buffer_for_peer( &mut self, envelope: TypedEnvelope, - mut worktrees: impl Iterator>, replica_id: u16, capability: Capability, cx: &mut ModelContext, @@ -954,8 +1037,10 @@ impl BufferStore { let mut buffer_file = None; if let Some(file) = state.file.take() { let worktree_id = worktree::WorktreeId::from_proto(file.worktree_id); - let worktree = worktrees - .find(|worktree| worktree.read(cx).id() == worktree_id) + let worktree = self + .worktree_store + .read(cx) + .worktree_for_id(worktree_id, cx) .ok_or_else(|| { anyhow!("no worktree found for id {}", file.worktree_id) })?; @@ -1018,14 +1103,74 @@ impl BufferStore { Ok(()) } + pub async fn handle_update_buffer_file( + this: Model, + envelope: TypedEnvelope, + mut cx: AsyncAppContext, + ) -> Result<()> { + let buffer_id = envelope.payload.buffer_id; + let buffer_id = BufferId::new(buffer_id)?; + + this.update(&mut cx, |this, cx| { + let payload = envelope.payload.clone(); + if let Some(buffer) = this.get_possibly_incomplete(buffer_id) { + let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?; + let worktree = this + .worktree_store + .read(cx) + .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx) + .ok_or_else(|| anyhow!("no such worktree"))?; + let file = File::from_proto(file, worktree, cx)?; + let old_file = buffer.update(cx, |buffer, cx| { + let old_file = buffer.file().cloned(); + let new_path = file.path.clone(); + buffer.file_updated(Arc::new(file), cx); + if old_file + .as_ref() + .map_or(true, |old| *old.path() != new_path) + { + Some(old_file) + } else { + None + } + }); + if let Some(old_file) = old_file { + cx.emit(BufferStoreEvent::BufferChangedFilePath { buffer, old_file }); + } + } + Ok(()) + })? + } + + pub async fn handle_update_diff_base( + this: Model, + envelope: TypedEnvelope, + mut cx: AsyncAppContext, + ) -> Result<()> { + this.update(&mut cx, |this, cx| { + let buffer_id = envelope.payload.buffer_id; + let buffer_id = BufferId::new(buffer_id)?; + if let Some(buffer) = this.get_possibly_incomplete(buffer_id) { + buffer.update(cx, |buffer, cx| { + buffer.set_diff_base(envelope.payload.diff_base, cx) + }); + } + Ok(()) + })? + } + pub async fn handle_save_buffer( this: Model, - project_id: u64, envelope: TypedEnvelope, mut cx: AsyncAppContext, ) -> Result { let buffer_id = BufferId::new(envelope.payload.buffer_id)?; - let buffer = this.update(&mut cx, |this, _| this.get_existing(buffer_id))??; + let (buffer, project_id) = this.update(&mut cx, |this, _| { + anyhow::Ok(( + this.get_existing(buffer_id)?, + this.remote_id.context("project is not shared")?, + )) + })??; buffer .update(&mut cx, |buffer, _| { buffer.wait_for_version(deserialize_version(&envelope.payload.version)) @@ -1090,6 +1235,27 @@ impl BufferStore { }) } + pub async fn handle_blame_buffer( + this: Model, + envelope: TypedEnvelope, + mut cx: AsyncAppContext, + ) -> Result { + let buffer_id = BufferId::new(envelope.payload.buffer_id)?; + let version = deserialize_version(&envelope.payload.version); + let buffer = this.read_with(&cx, |this, _| this.get_existing(buffer_id))??; + buffer + .update(&mut cx, |buffer, _| { + buffer.wait_for_version(version.clone()) + })? + .await?; + let blame = this + .update(&mut cx, |this, cx| { + this.blame_buffer(&buffer, Some(version), cx) + })? + .await?; + Ok(serialize_blame_buffer_response(blame)) + } + pub async fn wait_for_loading_buffer( mut receiver: postage::watch::Receiver, Arc>>>, ) -> Result, Arc> { @@ -1121,3 +1287,101 @@ fn is_not_found_error(error: &anyhow::Error) -> bool { .downcast_ref::() .is_some_and(|err| err.kind() == io::ErrorKind::NotFound) } + +fn serialize_blame_buffer_response(blame: git::blame::Blame) -> proto::BlameBufferResponse { + let entries = blame + .entries + .into_iter() + .map(|entry| proto::BlameEntry { + sha: entry.sha.as_bytes().into(), + start_line: entry.range.start, + end_line: entry.range.end, + original_line_number: entry.original_line_number, + author: entry.author.clone(), + author_mail: entry.author_mail.clone(), + author_time: entry.author_time, + author_tz: entry.author_tz.clone(), + committer: entry.committer.clone(), + committer_mail: entry.committer_mail.clone(), + committer_time: entry.committer_time, + committer_tz: entry.committer_tz.clone(), + summary: entry.summary.clone(), + previous: entry.previous.clone(), + filename: entry.filename.clone(), + }) + .collect::>(); + + let messages = blame + .messages + .into_iter() + .map(|(oid, message)| proto::CommitMessage { + oid: oid.as_bytes().into(), + message, + }) + .collect::>(); + + let permalinks = blame + .permalinks + .into_iter() + .map(|(oid, url)| proto::CommitPermalink { + oid: oid.as_bytes().into(), + permalink: url.to_string(), + }) + .collect::>(); + + proto::BlameBufferResponse { + entries, + messages, + permalinks, + remote_url: blame.remote_url, + } +} + +fn deserialize_blame_buffer_response(response: proto::BlameBufferResponse) -> git::blame::Blame { + let entries = response + .entries + .into_iter() + .filter_map(|entry| { + Some(git::blame::BlameEntry { + sha: git::Oid::from_bytes(&entry.sha).ok()?, + range: entry.start_line..entry.end_line, + original_line_number: entry.original_line_number, + committer: entry.committer, + committer_time: entry.committer_time, + committer_tz: entry.committer_tz, + committer_mail: entry.committer_mail, + author: entry.author, + author_mail: entry.author_mail, + author_time: entry.author_time, + author_tz: entry.author_tz, + summary: entry.summary, + previous: entry.previous, + filename: entry.filename, + }) + }) + .collect::>(); + + let messages = response + .messages + .into_iter() + .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message))) + .collect::>(); + + let permalinks = response + .permalinks + .into_iter() + .filter_map(|permalink| { + Some(( + git::Oid::from_bytes(&permalink.oid).ok()?, + Url::from_str(&permalink.permalink).ok()?, + )) + }) + .collect::>(); + + Blame { + entries, + permalinks, + messages, + remote_url: response.remote_url, + } +} diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 58762104de..c937362a50 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -39,7 +39,7 @@ use gpui::{ AnyModel, AppContext, AsyncAppContext, BackgroundExecutor, BorrowAppContext, Context, Entity, EventEmitter, Model, ModelContext, PromptLevel, SharedString, Task, WeakModel, WindowContext, }; -use http_client::{HttpClient, Url}; +use http_client::HttpClient; use itertools::Itertools; use language::{ language_settings::{ @@ -101,7 +101,7 @@ use std::{ ops::Range, path::{self, Component, Path, PathBuf}, process::Stdio, - str::{self, FromStr}, + str, sync::{ atomic::{AtomicUsize, Ordering::SeqCst}, Arc, @@ -690,23 +690,15 @@ impl Project { client.add_model_message_handler(Self::handle_add_collaborator); client.add_model_message_handler(Self::handle_update_project_collaborator); client.add_model_message_handler(Self::handle_remove_collaborator); - client.add_model_message_handler(Self::handle_buffer_reloaded); - client.add_model_message_handler(Self::handle_buffer_saved); client.add_model_message_handler(Self::handle_start_language_server); client.add_model_message_handler(Self::handle_update_language_server); client.add_model_message_handler(Self::handle_update_project); client.add_model_message_handler(Self::handle_unshare_project); client.add_model_message_handler(Self::handle_create_buffer_for_peer); - client.add_model_message_handler(Self::handle_update_buffer_file); client.add_model_request_handler(Self::handle_update_buffer); client.add_model_message_handler(Self::handle_update_diagnostic_summary); client.add_model_message_handler(Self::handle_update_worktree); client.add_model_message_handler(Self::handle_update_worktree_settings); - client.add_model_request_handler(Self::handle_create_project_entry); - client.add_model_request_handler(Self::handle_rename_project_entry); - client.add_model_request_handler(Self::handle_copy_project_entry); - client.add_model_request_handler(Self::handle_delete_project_entry); - client.add_model_request_handler(Self::handle_expand_project_entry); client.add_model_request_handler(Self::handle_apply_additional_edits_for_completion); client.add_model_request_handler(Self::handle_resolve_completion_documentation); client.add_model_request_handler(Self::handle_apply_code_action); @@ -732,15 +724,25 @@ impl Project { client.add_model_request_handler(Self::handle_open_buffer_by_id); client.add_model_request_handler(Self::handle_open_buffer_by_path); client.add_model_request_handler(Self::handle_open_new_buffer); - client.add_model_request_handler(Self::handle_save_buffer); - client.add_model_message_handler(Self::handle_update_diff_base); client.add_model_request_handler(Self::handle_lsp_command::); - client.add_model_request_handler(Self::handle_blame_buffer); client.add_model_request_handler(Self::handle_multi_lsp_query); client.add_model_request_handler(Self::handle_restart_language_servers); client.add_model_request_handler(Self::handle_task_context_for_location); client.add_model_request_handler(Self::handle_task_templates); client.add_model_request_handler(Self::handle_lsp_command::); + + client.add_model_request_handler(WorktreeStore::handle_create_project_entry); + client.add_model_request_handler(WorktreeStore::handle_rename_project_entry); + client.add_model_request_handler(WorktreeStore::handle_copy_project_entry); + client.add_model_request_handler(WorktreeStore::handle_delete_project_entry); + client.add_model_request_handler(WorktreeStore::handle_expand_project_entry); + + client.add_model_message_handler(BufferStore::handle_buffer_reloaded); + client.add_model_message_handler(BufferStore::handle_buffer_saved); + client.add_model_message_handler(BufferStore::handle_update_buffer_file); + client.add_model_message_handler(BufferStore::handle_update_diff_base); + client.add_model_request_handler(BufferStore::handle_save_buffer); + client.add_model_request_handler(BufferStore::handle_blame_buffer); } pub fn local( @@ -765,7 +767,7 @@ impl Project { .detach(); let buffer_store = - cx.new_model(|cx| BufferStore::new(worktree_store.clone(), false, cx)); + cx.new_model(|cx| BufferStore::new(worktree_store.clone(), None, cx)); cx.subscribe(&buffer_store, Self::on_buffer_store_event) .detach(); @@ -830,7 +832,7 @@ impl Project { } pub fn ssh( - ssh_session: Arc, + ssh: Arc, client: Arc, node: Arc, user_store: Model, @@ -840,11 +842,14 @@ impl Project { ) -> Model { let this = Self::local(client, node, user_store, languages, fs, cx); this.update(cx, |this, cx| { - ssh_session.add_message_handler(cx.weak_model(), Self::handle_update_worktree); - ssh_session.add_message_handler(cx.weak_model(), Self::handle_create_buffer_for_peer); - ssh_session.add_message_handler(cx.weak_model(), Self::handle_update_buffer_file); - ssh_session.add_message_handler(cx.weak_model(), Self::handle_update_diff_base); - this.ssh_session = Some(ssh_session); + let buffer_store = this.buffer_store.downgrade(); + + ssh.add_message_handler(cx.weak_model(), Self::handle_update_worktree); + ssh.add_message_handler(cx.weak_model(), Self::handle_create_buffer_for_peer); + ssh.add_message_handler(buffer_store.clone(), BufferStore::handle_update_buffer_file); + ssh.add_message_handler(buffer_store.clone(), BufferStore::handle_update_diff_base); + + this.ssh_session = Some(ssh); }); this } @@ -877,7 +882,10 @@ impl Project { ) -> Result> { client.authenticate_and_connect(true, &cx).await?; - let subscription = client.subscribe_to_entity(remote_id)?; + let subscriptions = ( + client.subscribe_to_entity::(remote_id)?, + client.subscribe_to_entity::(remote_id)?, + ); let response = client .request_envelope(proto::JoinProject { project_id: remote_id, @@ -885,7 +893,7 @@ impl Project { .await?; Self::from_join_project_response( response, - subscription, + subscriptions, client, user_store, languages, @@ -897,7 +905,10 @@ impl Project { async fn from_join_project_response( response: TypedEnvelope, - subscription: PendingEntitySubscription, + subscription: ( + PendingEntitySubscription, + PendingEntitySubscription, + ), client: Arc, user_store: Model, languages: Arc, @@ -906,6 +917,11 @@ impl Project { ) -> Result> { let remote_id = response.payload.project_id; let role = response.payload.role(); + + let worktree_store = cx.new_model(|_| WorktreeStore::new(true))?; + let buffer_store = + cx.new_model(|cx| BufferStore::new(worktree_store.clone(), Some(remote_id), cx))?; + let this = cx.new_model(|cx| { let replica_id = response.payload.replica_id as ReplicaId; let tasks = Inventory::new(cx); @@ -913,9 +929,7 @@ impl Project { let snippets = SnippetProvider::new(fs.clone(), BTreeSet::from_iter([global_snippets_dir]), cx); let yarn = YarnPathStore::new(fs.clone(), cx); - // BIG CAUTION NOTE: The order in which we initialize fields here matters and it should match what's done in Self::local. - // Otherwise, you might run into issues where worktree id on remote is different than what's on local host. - // That's because Worktree's identifier is entity id, which should probably be changed. + let mut worktrees = Vec::new(); for worktree in response.payload.worktrees { let worktree = @@ -927,16 +941,12 @@ impl Project { cx.spawn(move |this, cx| Self::send_buffer_ordered_messages(this, rx, cx)) .detach(); - let worktree_store = cx.new_model(|_| WorktreeStore::new(true)); - - let buffer_store = - cx.new_model(|cx| BufferStore::new(worktree_store.clone(), true, cx)); cx.subscribe(&buffer_store, Self::on_buffer_store_event) .detach(); let mut this = Self { buffer_ordered_messages_tx: tx, - buffer_store, + buffer_store: buffer_store.clone(), worktree_store, shared_buffers: Default::default(), loading_worktrees: Default::default(), @@ -1018,7 +1028,11 @@ impl Project { } this })?; - let subscription = subscription.set_model(&this, &mut cx); + + let subscriptions = [ + subscription.0.set_model(&this, &mut cx), + subscription.1.set_model(&buffer_store, &mut cx), + ]; let user_ids = response .payload @@ -1032,7 +1046,7 @@ impl Project { this.update(&mut cx, |this, cx| { this.set_collaborators_from_proto(response.payload.collaborators, cx)?; - this.client_subscriptions.push(subscription); + this.client_subscriptions.extend(subscriptions); anyhow::Ok(()) })??; @@ -1049,7 +1063,10 @@ impl Project { ) -> Result> { client.authenticate_and_connect(true, &cx).await?; - let subscription = client.subscribe_to_entity(remote_id.0)?; + let subscriptions = ( + client.subscribe_to_entity::(remote_id.0)?, + client.subscribe_to_entity::(remote_id.0)?, + ); let response = client .request_envelope(proto::JoinHostedProject { project_id: remote_id.0, @@ -1057,7 +1074,7 @@ impl Project { .await?; Self::from_join_project_response( response, - subscription, + subscriptions, client, user_store, languages, @@ -1572,14 +1589,20 @@ impl Project { return Err(anyhow!("project was already shared")); } } - self.client_subscriptions.push( + self.client_subscriptions.extend([ self.client .subscribe_to_entity(project_id)? .set_model(&cx.handle(), &mut cx.to_async()), - ); + self.client + .subscribe_to_entity(project_id)? + .set_model(&self.worktree_store, &mut cx.to_async()), + self.client + .subscribe_to_entity(project_id)? + .set_model(&self.buffer_store, &mut cx.to_async()), + ]); self.buffer_store.update(cx, |buffer_store, cx| { - buffer_store.set_retain_buffers(true, cx) + buffer_store.set_remote_id(Some(project_id), cx) }); self.worktree_store.update(cx, |store, cx| { store.set_shared(true, cx); @@ -1789,9 +1812,8 @@ impl Project { self.worktree_store.update(cx, |store, cx| { store.set_shared(false, cx); }); - self.buffer_store.update(cx, |buffer_store, cx| { - buffer_store.set_retain_buffers(false, cx) - }); + self.buffer_store + .update(cx, |buffer_store, cx| buffer_store.set_remote_id(None, cx)); self.client .send(proto::UnshareProject { project_id: remote_id, @@ -2377,72 +2399,15 @@ impl Project { self.register_buffer(buffer, cx).log_err(); } BufferStoreEvent::BufferChangedFilePath { buffer, old_file } => { - if let Some(old_file) = &old_file { + if let Some(old_file) = File::from_dyn(old_file.as_ref()) { self.unregister_buffer_from_language_servers(&buffer, old_file, cx); } self.detect_language_for_buffer(&buffer, cx); self.register_buffer_with_language_servers(&buffer, cx); } - BufferStoreEvent::LocalBufferUpdated { buffer } => { - let buffer = buffer.read(cx); - let buffer_id = buffer.remote_id(); - let Some(new_file) = buffer.file() else { - return; - }; - if let Some(project_id) = self.remote_id() { - self.client - .send(proto::UpdateBufferFile { - project_id, - buffer_id: buffer_id.into(), - file: Some(new_file.to_proto(cx)), - }) - .log_err(); - } - } - BufferStoreEvent::DiffBaseUpdated { buffer } => { - let buffer = buffer.read(cx); - let buffer_id = buffer.remote_id(); - let diff_base = buffer.diff_base(); - if let Some(project_id) = self.remote_id() { - self.client - .send(proto::UpdateDiffBase { - project_id, - buffer_id: buffer_id.to_proto(), - diff_base: diff_base.map(|b| b.to_string()), - }) - .log_err(); - } - } - BufferStoreEvent::BufferSaved { - buffer: buffer_handle, - has_changed_file, - saved_version, - } => { - let buffer = buffer_handle.read(cx); - let buffer_id = buffer.remote_id(); - let Some(new_file) = buffer.file() else { - return; - }; - if let Some(project_id) = self.remote_id() { - self.client - .send(proto::BufferSaved { - project_id, - buffer_id: buffer_id.into(), - version: serialize_version(&saved_version), - mtime: new_file.mtime().map(|time| time.into()), - }) - .log_err(); - if *has_changed_file { - self.client - .send(proto::UpdateBufferFile { - project_id, - buffer_id: buffer_id.into(), - file: Some(new_file.to_proto(cx)), - }) - .log_err(); - } - } + BufferStoreEvent::MessageToReplicas(message) => { + self.client.send_dynamic(message.as_ref().clone()).log_err(); } } } @@ -8428,96 +8393,11 @@ impl Project { version: Option, cx: &AppContext, ) -> Task> { - if self.is_local() { - let blame_params = maybe!({ - let buffer = buffer.read(cx); - let buffer_project_path = buffer - .project_path(cx) - .context("failed to get buffer project path")?; - - let worktree = self - .worktree_for_id(buffer_project_path.worktree_id, cx) - .context("failed to get worktree")? - .read(cx) - .as_local() - .context("worktree was not local")? - .snapshot(); - - let (repo_entry, local_repo_entry) = - match worktree.repo_for_path(&buffer_project_path.path) { - Some(repo_for_path) => repo_for_path, - None => anyhow::bail!(NoRepositoryError {}), - }; - - let relative_path = repo_entry - .relativize(&worktree, &buffer_project_path.path) - .context("failed to relativize buffer path")?; - - let repo = local_repo_entry.repo().clone(); - - let content = match version { - Some(version) => buffer.rope_for_version(&version).clone(), - None => buffer.as_rope().clone(), - }; - - anyhow::Ok((repo, relative_path, content)) - }); - - cx.background_executor().spawn(async move { - let (repo, relative_path, content) = blame_params?; - repo.blame(&relative_path, content) - .with_context(|| format!("Failed to blame {:?}", relative_path.0)) - }) - } else { - let project_id = self.remote_id(); - let buffer_id = buffer.read(cx).remote_id(); - let client = self.client.clone(); - let version = buffer.read(cx).version(); - - cx.spawn(|_| async move { - let project_id = project_id.context("unable to get project id for buffer")?; - let response = client - .request(proto::BlameBuffer { - project_id, - buffer_id: buffer_id.into(), - version: serialize_version(&version), - }) - .await?; - - Ok(deserialize_blame_buffer_response(response)) - }) - } + self.buffer_store.read(cx).blame_buffer(buffer, version, cx) } // RPC message handlers - async fn handle_blame_buffer( - this: Model, - envelope: TypedEnvelope, - mut cx: AsyncAppContext, - ) -> Result { - let buffer_id = BufferId::new(envelope.payload.buffer_id)?; - let version = deserialize_version(&envelope.payload.version); - - let buffer = this.update(&mut cx, |this, cx| { - this.buffer_store.read(cx).get_existing(buffer_id) - })??; - - buffer - .update(&mut cx, |buffer, _| { - buffer.wait_for_version(version.clone()) - })? - .await?; - - let blame = this - .update(&mut cx, |this, cx| { - this.blame_buffer(&buffer, Some(version), cx) - })? - .await?; - - Ok(serialize_blame_buffer_response(blame)) - } - async fn handle_multi_lsp_query( project: Model, envelope: TypedEnvelope, @@ -8827,51 +8707,6 @@ impl Project { })? } - async fn handle_create_project_entry( - this: Model, - envelope: TypedEnvelope, - mut cx: AsyncAppContext, - ) -> Result { - let worktree_store = this.update(&mut cx, |this, _| this.worktree_store.clone())?; - WorktreeStore::handle_create_project_entry(worktree_store, envelope, cx).await - } - - async fn handle_rename_project_entry( - this: Model, - envelope: TypedEnvelope, - mut cx: AsyncAppContext, - ) -> Result { - let worktree_store = this.update(&mut cx, |this, _| this.worktree_store.clone())?; - WorktreeStore::handle_rename_project_entry(worktree_store, envelope, cx).await - } - - async fn handle_copy_project_entry( - this: Model, - envelope: TypedEnvelope, - mut cx: AsyncAppContext, - ) -> Result { - let worktree_store = this.update(&mut cx, |this, _| this.worktree_store.clone())?; - WorktreeStore::handle_copy_project_entry(worktree_store, envelope, cx).await - } - - async fn handle_delete_project_entry( - this: Model, - envelope: TypedEnvelope, - mut cx: AsyncAppContext, - ) -> Result { - let worktree_store = this.update(&mut cx, |this, _| this.worktree_store.clone())?; - WorktreeStore::handle_delete_project_entry(worktree_store, envelope, cx).await - } - - async fn handle_expand_project_entry( - this: Model, - envelope: TypedEnvelope, - mut cx: AsyncAppContext, - ) -> Result { - let worktree_store = this.update(&mut cx, |this, _| this.worktree_store.clone())?; - WorktreeStore::handle_expand_project_entry(worktree_store, envelope, cx).await - } - async fn handle_update_diagnostic_summary( this: Model, envelope: TypedEnvelope, @@ -9008,9 +8843,9 @@ impl Project { async fn handle_update_buffer( this: Model, envelope: TypedEnvelope, - mut cx: AsyncAppContext, + cx: AsyncAppContext, ) -> Result { - this.update(&mut cx, |this, cx| { + let buffer_store = this.read_with(&cx, |this, cx| { if let Some(ssh) = &this.ssh_session { let mut payload = envelope.payload.clone(); payload.project_id = 0; @@ -9018,10 +8853,9 @@ impl Project { .spawn(ssh.request(payload)) .detach_and_log_err(cx); } - this.buffer_store.update(cx, |buffer_store, cx| { - buffer_store.handle_update_buffer(envelope, this.is_remote(), cx) - }) - })? + this.buffer_store.clone() + })?; + BufferStore::handle_update_buffer(buffer_store, envelope, cx).await } async fn handle_create_buffer_for_peer( @@ -9033,7 +8867,6 @@ impl Project { this.buffer_store.update(cx, |buffer_store, cx| { buffer_store.handle_create_buffer_for_peer( envelope, - this.worktrees(cx).collect::>().into_iter(), this.replica_id(), this.capability(), cx, @@ -9042,69 +8875,6 @@ impl Project { })? } - async fn handle_update_diff_base( - this: Model, - envelope: TypedEnvelope, - mut cx: AsyncAppContext, - ) -> Result<()> { - this.update(&mut cx, |this, cx| { - let buffer_id = envelope.payload.buffer_id; - let buffer_id = BufferId::new(buffer_id)?; - if let Some(buffer) = this - .buffer_store - .read(cx) - .get_possibly_incomplete(buffer_id) - { - buffer.update(cx, |buffer, cx| { - buffer.set_diff_base(envelope.payload.diff_base, cx) - }); - } - Ok(()) - })? - } - - async fn handle_update_buffer_file( - this: Model, - envelope: TypedEnvelope, - mut cx: AsyncAppContext, - ) -> Result<()> { - let buffer_id = envelope.payload.buffer_id; - let buffer_id = BufferId::new(buffer_id)?; - - this.update(&mut cx, |this, cx| { - let payload = envelope.payload.clone(); - if let Some(buffer) = this - .buffer_store - .read(cx) - .get_possibly_incomplete(buffer_id) - { - let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?; - let worktree = this - .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx) - .ok_or_else(|| anyhow!("no such worktree"))?; - let file = File::from_proto(file, worktree, cx)?; - buffer.update(cx, |buffer, cx| { - buffer.file_updated(Arc::new(file), cx); - }); - this.detect_language_for_buffer(&buffer, cx); - } - Ok(()) - })? - } - - async fn handle_save_buffer( - this: Model, - envelope: TypedEnvelope, - mut cx: AsyncAppContext, - ) -> Result { - let (buffer_store, project_id) = this.update(&mut cx, |this, _| { - let buffer_store = this.buffer_store.clone(); - let project_id = this.remote_id().context("not connected")?; - anyhow::Ok((buffer_store, project_id)) - })??; - BufferStore::handle_save_buffer(buffer_store, project_id, envelope, cx).await - } - async fn handle_reload_buffers( this: Model, envelope: TypedEnvelope, @@ -10252,24 +10022,6 @@ impl Project { }) } - async fn handle_buffer_saved( - this: Model, - envelope: TypedEnvelope, - mut cx: AsyncAppContext, - ) -> Result<()> { - let buffer_store = this.update(&mut cx, |this, _| this.buffer_store.clone())?; - BufferStore::handle_buffer_saved(buffer_store, envelope, cx).await - } - - async fn handle_buffer_reloaded( - this: Model, - envelope: TypedEnvelope, - mut cx: AsyncAppContext, - ) -> Result<()> { - let buffer_store = this.update(&mut cx, |this, _| this.buffer_store.clone())?; - BufferStore::handle_buffer_reloaded(buffer_store, envelope, cx).await - } - #[allow(clippy::type_complexity)] fn edits_from_lsp( &mut self, @@ -11488,104 +11240,6 @@ async fn load_shell_environment( Ok(parsed_env) } -fn serialize_blame_buffer_response(blame: git::blame::Blame) -> proto::BlameBufferResponse { - let entries = blame - .entries - .into_iter() - .map(|entry| proto::BlameEntry { - sha: entry.sha.as_bytes().into(), - start_line: entry.range.start, - end_line: entry.range.end, - original_line_number: entry.original_line_number, - author: entry.author.clone(), - author_mail: entry.author_mail.clone(), - author_time: entry.author_time, - author_tz: entry.author_tz.clone(), - committer: entry.committer.clone(), - committer_mail: entry.committer_mail.clone(), - committer_time: entry.committer_time, - committer_tz: entry.committer_tz.clone(), - summary: entry.summary.clone(), - previous: entry.previous.clone(), - filename: entry.filename.clone(), - }) - .collect::>(); - - let messages = blame - .messages - .into_iter() - .map(|(oid, message)| proto::CommitMessage { - oid: oid.as_bytes().into(), - message, - }) - .collect::>(); - - let permalinks = blame - .permalinks - .into_iter() - .map(|(oid, url)| proto::CommitPermalink { - oid: oid.as_bytes().into(), - permalink: url.to_string(), - }) - .collect::>(); - - proto::BlameBufferResponse { - entries, - messages, - permalinks, - remote_url: blame.remote_url, - } -} - -fn deserialize_blame_buffer_response(response: proto::BlameBufferResponse) -> git::blame::Blame { - let entries = response - .entries - .into_iter() - .filter_map(|entry| { - Some(git::blame::BlameEntry { - sha: git::Oid::from_bytes(&entry.sha).ok()?, - range: entry.start_line..entry.end_line, - original_line_number: entry.original_line_number, - committer: entry.committer, - committer_time: entry.committer_time, - committer_tz: entry.committer_tz, - committer_mail: entry.committer_mail, - author: entry.author, - author_mail: entry.author_mail, - author_time: entry.author_time, - author_tz: entry.author_tz, - summary: entry.summary, - previous: entry.previous, - filename: entry.filename, - }) - }) - .collect::>(); - - let messages = response - .messages - .into_iter() - .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message))) - .collect::>(); - - let permalinks = response - .permalinks - .into_iter() - .filter_map(|permalink| { - Some(( - git::Oid::from_bytes(&permalink.oid).ok()?, - Url::from_str(&permalink.permalink).ok()?, - )) - }) - .collect::>(); - - Blame { - entries, - permalinks, - messages, - remote_url: response.remote_url, - } -} - fn remove_empty_hover_blocks(mut hover: Hover) -> Option { hover .contents diff --git a/crates/project/src/project_tests.rs b/crates/project/src/project_tests.rs index 427b156bbd..fc95ef6d3c 100644 --- a/crates/project/src/project_tests.rs +++ b/crates/project/src/project_tests.rs @@ -2,6 +2,7 @@ use crate::{Event, *}; use fs::FakeFs; use futures::{future, StreamExt}; use gpui::{AppContext, SemanticVersion, UpdateGlobal}; +use http_client::Url; use language::{ language_settings::{AllLanguageSettings, LanguageSettingsContent}, tree_sitter_rust, tree_sitter_typescript, Diagnostic, FakeLspAdapter, LanguageConfig, diff --git a/crates/proto/src/proto.rs b/crates/proto/src/proto.rs index 3a0865a0b1..a205b79ecb 100644 --- a/crates/proto/src/proto.rs +++ b/crates/proto/src/proto.rs @@ -103,6 +103,10 @@ impl AnyProtoClient { let envelope = request.into_envelope(0, None, None); self.0.send(envelope) } + + pub fn send_dynamic(&self, message: Envelope) -> anyhow::Result<()> { + self.0.send(message) + } } impl AnyTypedEnvelope for TypedEnvelope { diff --git a/crates/remote_server/src/headless_project.rs b/crates/remote_server/src/headless_project.rs index 5ab35144d7..feac87a2b1 100644 --- a/crates/remote_server/src/headless_project.rs +++ b/crates/remote_server/src/headless_project.rs @@ -40,14 +40,18 @@ impl HeadlessProject { let this = cx.weak_model(); let worktree_store = cx.new_model(|_| WorktreeStore::new(true)); - let buffer_store = cx.new_model(|cx| BufferStore::new(worktree_store.clone(), true, cx)); + let buffer_store = + cx.new_model(|cx| BufferStore::new(worktree_store.clone(), Some(PROJECT_ID), cx)); cx.subscribe(&buffer_store, Self::on_buffer_store_event) .detach(); session.add_request_handler(this.clone(), Self::handle_add_worktree); session.add_request_handler(this.clone(), Self::handle_open_buffer_by_path); - session.add_request_handler(this.clone(), Self::handle_update_buffer); - session.add_request_handler(this.clone(), Self::handle_save_buffer); + + session.add_request_handler(buffer_store.downgrade(), BufferStore::handle_blame_buffer); + session.add_request_handler(buffer_store.downgrade(), BufferStore::handle_update_buffer); + session.add_request_handler(buffer_store.downgrade(), BufferStore::handle_save_buffer); + session.add_request_handler( worktree_store.downgrade(), WorktreeStore::handle_create_project_entry, @@ -112,27 +116,6 @@ impl HeadlessProject { }) } - pub async fn handle_update_buffer( - this: Model, - envelope: TypedEnvelope, - mut cx: AsyncAppContext, - ) -> Result { - this.update(&mut cx, |this, cx| { - this.buffer_store.update(cx, |buffer_store, cx| { - buffer_store.handle_update_buffer(envelope, false, cx) - }) - })? - } - - pub async fn handle_save_buffer( - this: Model, - envelope: TypedEnvelope, - mut cx: AsyncAppContext, - ) -> Result { - let buffer_store = this.update(&mut cx, |this, _| this.buffer_store.clone())?; - BufferStore::handle_save_buffer(buffer_store, PROJECT_ID, envelope, cx).await - } - pub async fn handle_open_buffer_by_path( this: Model, message: TypedEnvelope, @@ -178,33 +161,12 @@ impl HeadlessProject { &mut self, _: Model, event: &BufferStoreEvent, - cx: &mut ModelContext, + _: &mut ModelContext, ) { match event { - BufferStoreEvent::LocalBufferUpdated { buffer } => { - let buffer = buffer.read(cx); - let buffer_id = buffer.remote_id(); - let Some(new_file) = buffer.file() else { - return; - }; + BufferStoreEvent::MessageToReplicas(message) => { self.session - .send(proto::UpdateBufferFile { - project_id: 0, - buffer_id: buffer_id.into(), - file: Some(new_file.to_proto(cx)), - }) - .log_err(); - } - BufferStoreEvent::DiffBaseUpdated { buffer } => { - let buffer = buffer.read(cx); - let buffer_id = buffer.remote_id(); - let diff_base = buffer.diff_base(); - self.session - .send(proto::UpdateDiffBase { - project_id: 0, - buffer_id: buffer_id.to_proto(), - diff_base: diff_base.map(|b| b.to_string()), - }) + .send_dynamic(message.as_ref().clone()) .log_err(); } _ => {}