From ef22372f0bed460110dc5795f712ed12e1c114d2 Mon Sep 17 00:00:00 2001 From: Conrad Irwin Date: Mon, 26 Aug 2024 14:47:02 -0600 Subject: [PATCH] SSH remote search (#16915) Co-Authored-By: Max Release Notes: - ssh remoting: add project search --------- Co-authored-by: Max --- crates/collab/src/rpc.rs | 54 ++ crates/collab/src/rpc/connection_pool.rs | 4 + crates/collab/src/tests/integration_tests.rs | 4 +- .../random_project_collaboration_tests.rs | 2 +- crates/project/src/buffer_store.rs | 99 ++- crates/project/src/project.rs | 695 +++++------------- crates/project/src/search.rs | 52 +- crates/project/src/worktree_store.rs | 313 +++++++- crates/proto/proto/zed.proto | 25 +- crates/proto/src/proto.rs | 4 + crates/remote_server/src/headless_project.rs | 47 +- .../remote_server/src/remote_editing_tests.rs | 135 +++- crates/search/src/project_search.rs | 4 +- 13 files changed, 866 insertions(+), 572 deletions(-) diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index f436c02e3e..8a6867c5e0 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -478,6 +478,7 @@ impl Server { .add_request_handler(user_handler( forward_read_only_project_request::, )) + .add_request_handler(user_handler(forward_find_search_candidates_request)) .add_request_handler(user_handler( forward_read_only_project_request::, )) @@ -2943,6 +2944,59 @@ where Ok(()) } +async fn forward_find_search_candidates_request( + request: proto::FindSearchCandidates, + response: Response, + session: UserSession, +) -> Result<()> { + let project_id = ProjectId::from_proto(request.remote_entity_id()); + let host_connection_id = session + .db() + .await + .host_for_read_only_project_request(project_id, session.connection_id, session.user_id()) + .await?; + + let host_version = session + .connection_pool() + .await + .connection(host_connection_id) + .map(|c| c.zed_version); + + if host_version.is_some_and(|host_version| host_version < ZedVersion::with_search_candidates()) + { + let query = request.query.ok_or_else(|| anyhow!("missing query"))?; + let search = proto::SearchProject { + project_id: project_id.to_proto(), + query: query.query, + regex: query.regex, + whole_word: query.whole_word, + case_sensitive: query.case_sensitive, + files_to_include: query.files_to_include, + files_to_exclude: query.files_to_exclude, + include_ignored: query.include_ignored, + }; + + let payload = session + .peer + .forward_request(session.connection_id, host_connection_id, search) + .await?; + return response.send(proto::FindSearchCandidatesResponse { + buffer_ids: payload + .locations + .into_iter() + .map(|loc| loc.buffer_id) + .collect(), + }); + } + + let payload = session + .peer + .forward_request(session.connection_id, host_connection_id, request) + .await?; + response.send(payload)?; + Ok(()) +} + /// forward a project request to the dev server. Only allowed /// if it's your dev server. async fn forward_project_request_for_owner( diff --git a/crates/collab/src/rpc/connection_pool.rs b/crates/collab/src/rpc/connection_pool.rs index 6474b95f55..52d825f4e0 100644 --- a/crates/collab/src/rpc/connection_pool.rs +++ b/crates/collab/src/rpc/connection_pool.rs @@ -42,6 +42,10 @@ impl ZedVersion { pub fn with_list_directory() -> ZedVersion { ZedVersion(SemanticVersion::new(0, 145, 0)) } + + pub fn with_search_candidates() -> ZedVersion { + ZedVersion(SemanticVersion::new(0, 151, 0)) + } } pub trait VersionedMessage { diff --git a/crates/collab/src/tests/integration_tests.rs b/crates/collab/src/tests/integration_tests.rs index 3e95ca7659..cdd06f2078 100644 --- a/crates/collab/src/tests/integration_tests.rs +++ b/crates/collab/src/tests/integration_tests.rs @@ -28,8 +28,8 @@ use live_kit_client::MacOSDisplay; use lsp::LanguageServerId; use parking_lot::Mutex; use project::{ - search::SearchQuery, DiagnosticSummary, FormatTrigger, HoverBlockKind, Project, ProjectPath, - SearchResult, + search::SearchQuery, search::SearchResult, DiagnosticSummary, FormatTrigger, HoverBlockKind, + Project, ProjectPath, }; use rand::prelude::*; use serde_json::json; diff --git a/crates/collab/src/tests/random_project_collaboration_tests.rs b/crates/collab/src/tests/random_project_collaboration_tests.rs index da787a69ed..490718a9ce 100644 --- a/crates/collab/src/tests/random_project_collaboration_tests.rs +++ b/crates/collab/src/tests/random_project_collaboration_tests.rs @@ -15,7 +15,7 @@ use language::{ use lsp::FakeLanguageServer; use pretty_assertions::assert_eq; use project::{ - search::SearchQuery, Project, ProjectPath, SearchResult, DEFAULT_COMPLETION_CONTEXT, + search::SearchQuery, search::SearchResult, Project, ProjectPath, DEFAULT_COMPLETION_CONTEXT, }; use rand::{ distributions::{Alphanumeric, DistString}, diff --git a/crates/project/src/buffer_store.rs b/crates/project/src/buffer_store.rs index 2d5ed7ed84..1be7c45842 100644 --- a/crates/project/src/buffer_store.rs +++ b/crates/project/src/buffer_store.rs @@ -1,9 +1,11 @@ use crate::{ + search::SearchQuery, worktree_store::{WorktreeStore, WorktreeStoreEvent}, - NoRepositoryError, ProjectPath, + Item, NoRepositoryError, ProjectPath, }; use anyhow::{anyhow, Context as _, Result}; -use collections::{hash_map, HashMap}; +use collections::{hash_map, HashMap, HashSet}; +use fs::Fs; use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt as _}; use git::blame::Blame; use gpui::{ @@ -18,6 +20,7 @@ use rpc::{ proto::{self, AnyProtoClient, EnvelopedMessage, PeerId}, ErrorExt as _, TypedEnvelope, }; +use smol::channel::Receiver; use std::{io, path::Path, str::FromStr as _, sync::Arc}; use text::BufferId; use util::{debug_panic, maybe, ResultExt as _}; @@ -778,6 +781,98 @@ impl BufferStore { .retain(|_, buffer| !matches!(buffer, OpenBuffer::Operations(_))); } + pub fn find_search_candidates( + &mut self, + query: &SearchQuery, + limit: usize, + fs: Arc, + cx: &mut ModelContext, + ) -> Receiver> { + let (tx, rx) = smol::channel::unbounded(); + let open_buffers = self.find_open_search_candidates(query, cx); + let skip_entries: HashSet<_> = open_buffers + .iter() + .filter_map(|buffer| buffer.read(cx).entry_id(cx)) + .collect(); + + let limit = limit.saturating_sub(open_buffers.len()); + for open_buffer in open_buffers { + tx.send_blocking(open_buffer).ok(); + } + + let match_rx = self.worktree_store.update(cx, |worktree_store, cx| { + worktree_store.find_search_candidates(query.clone(), limit, skip_entries, fs, cx) + }); + + const MAX_CONCURRENT_BUFFER_OPENS: usize = 8; + + for _ in 0..MAX_CONCURRENT_BUFFER_OPENS { + let mut match_rx = match_rx.clone(); + let tx = tx.clone(); + cx.spawn(|this, mut cx| async move { + while let Some(project_path) = match_rx.next().await { + let buffer = this + .update(&mut cx, |this, cx| this.open_buffer(project_path, cx))? + .await + .log_err(); + if let Some(buffer) = buffer { + tx.send_blocking(buffer).ok(); + } + } + anyhow::Ok(()) + }) + .detach(); + } + rx + } + + /// Returns open buffers filtered by filename + /// Does *not* check the buffer content, the caller must do that + fn find_open_search_candidates( + &self, + query: &SearchQuery, + cx: &ModelContext, + ) -> Vec> { + let include_root = self + .worktree_store + .read(cx) + .visible_worktrees(cx) + .collect::>() + .len() + > 1; + self.buffers() + .filter_map(|buffer| { + let handle = buffer.clone(); + buffer.read_with(cx, |buffer, cx| { + let worktree_store = self.worktree_store.read(cx); + let entry_id = buffer.entry_id(cx); + let is_ignored = entry_id + .and_then(|entry_id| worktree_store.entry_for_id(entry_id, cx)) + .map_or(false, |entry| entry.is_ignored); + + if is_ignored && !query.include_ignored() { + return None; + } + if let Some(file) = buffer.file() { + let matched_path = if include_root { + query.file_matches(Some(&file.full_path(cx))) + } else { + query.file_matches(Some(file.path())) + }; + + if matched_path { + Some(handle) + } else { + None + } + } else { + Some(handle) + } + }) + }) + .collect() + } + fn on_buffer_event( &mut self, buffer: Model, diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 81a9169227..24fa4e3ba8 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -24,7 +24,7 @@ use client::{ TypedEnvelope, UserStore, }; use clock::ReplicaId; -use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}; +use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet}; use debounced_delay::DebouncedDelay; use futures::{ channel::mpsc::{self, UnboundedReceiver}, @@ -37,8 +37,8 @@ use futures::{ use git::{blame::Blame, repository::GitRepository}; use globset::{Glob, GlobSet, GlobSetBuilder}; use gpui::{ - AnyModel, AppContext, AsyncAppContext, BackgroundExecutor, BorrowAppContext, Context, Entity, - EventEmitter, Model, ModelContext, PromptLevel, SharedString, Task, WeakModel, WindowContext, + AnyModel, AppContext, AsyncAppContext, BorrowAppContext, Context, Entity, EventEmitter, Model, + ModelContext, PromptLevel, SharedString, Task, WeakModel, WindowContext, }; use http_client::HttpClient; use itertools::Itertools; @@ -77,17 +77,17 @@ use prettier_support::{DefaultPrettier, PrettierInstance}; use project_settings::{DirenvSettings, LspSettings, ProjectSettings}; use rand::prelude::*; use remote::SshSession; -use rpc::{proto::AddWorktree, ErrorCode}; -use search::SearchQuery; +use rpc::{ + proto::{AddWorktree, AnyProtoClient}, + ErrorCode, +}; +use search::{SearchQuery, SearchResult}; use search_history::SearchHistory; use serde::Serialize; use settings::{watch_config_file, Settings, SettingsLocation, SettingsStore}; use sha2::{Digest, Sha256}; use similar::{ChangeTag, TextDiff}; -use smol::{ - channel::{Receiver, Sender}, - lock::Semaphore, -}; +use smol::channel::{Receiver, Sender}; use snippet::Snippet; use snippet_provider::SnippetProvider; use std::{ @@ -142,6 +142,8 @@ const SERVER_LAUNCHING_BEFORE_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5 pub const SERVER_PROGRESS_THROTTLE_TIMEOUT: Duration = Duration::from_millis(100); const MAX_PROJECT_SEARCH_HISTORY_SIZE: usize = 500; +const MAX_SEARCH_RESULT_FILES: usize = 5_000; +const MAX_SEARCH_RESULT_RANGES: usize = 10_000; pub trait Item { fn try_open( @@ -214,6 +216,7 @@ pub struct Project { buffers_being_formatted: HashSet, buffers_needing_diff: HashSet>, git_diff_debouncer: DebouncedDelay, + remotely_created_buffers: Arc>, nonce: u128, _maintain_buffer_languages: Task<()>, _maintain_workspace_config: Task>, @@ -232,6 +235,32 @@ pub struct Project { cached_shell_environments: HashMap>, } +#[derive(Default)] +struct RemotelyCreatedBuffers { + buffers: Vec>, + retain_count: usize, +} + +struct RemotelyCreatedBufferGuard { + remote_buffers: std::sync::Weak>, +} + +impl Drop for RemotelyCreatedBufferGuard { + fn drop(&mut self) { + if let Some(remote_buffers) = self.remote_buffers.upgrade() { + let mut remote_buffers = remote_buffers.lock(); + assert!( + remote_buffers.retain_count > 0, + "RemotelyCreatedBufferGuard dropped too many times" + ); + remote_buffers.retain_count -= 1; + if remote_buffers.retain_count == 0 { + remote_buffers.buffers.clear(); + } + } + } +} + #[derive(Debug)] pub enum LanguageServerToQuery { Primary, @@ -680,29 +709,6 @@ impl DirectoryLister { } } -#[derive(Clone, Debug, PartialEq)] -enum SearchMatchCandidate { - OpenBuffer { - buffer: Model, - // This might be an unnamed file without representation on filesystem - path: Option>, - }, - Path { - worktree_id: WorktreeId, - is_ignored: bool, - is_file: bool, - path: Arc, - }, -} - -pub enum SearchResult { - Buffer { - buffer: Model, - ranges: Vec>, - }, - LimitReached, -} - #[cfg(any(test, feature = "test-support"))] pub const DEFAULT_COMPLETION_CONTEXT: CompletionContext = CompletionContext { trigger_kind: lsp::CompletionTriggerKind::INVOKED, @@ -752,6 +758,7 @@ impl Project { client.add_model_request_handler(Self::handle_lsp_command::); client.add_model_request_handler(Self::handle_lsp_command::); client.add_model_request_handler(Self::handle_search_project); + client.add_model_request_handler(Self::handle_search_candidate_buffers); client.add_model_request_handler(Self::handle_get_project_symbols); client.add_model_request_handler(Self::handle_open_buffer_for_symbol); client.add_model_request_handler(Self::handle_open_buffer_by_id); @@ -861,6 +868,7 @@ impl Project { dev_server_project_id: None, search_history: Self::new_search_history(), cached_shell_environments: HashMap::default(), + remotely_created_buffers: Default::default(), } }) } @@ -1056,6 +1064,7 @@ impl Project { .map(|dev_server_project_id| DevServerProjectId(dev_server_project_id)), search_history: Self::new_search_history(), cached_shell_environments: HashMap::default(), + remotely_created_buffers: Arc::new(Mutex::new(RemotelyCreatedBuffers::default())), }; this.set_role(role, cx); for worktree in worktrees { @@ -1939,6 +1948,15 @@ impl Project { self.is_disconnected() || self.capability() == Capability::ReadOnly } + pub fn is_local(&self) -> bool { + match &self.client_state { + ProjectClientState::Local | ProjectClientState::Shared { .. } => { + self.ssh_session.is_none() + } + ProjectClientState::Remote { .. } => false, + } + } + pub fn is_local_or_ssh(&self) -> bool { match &self.client_state { ProjectClientState::Local | ProjectClientState::Shared { .. } => true, @@ -1947,7 +1965,10 @@ impl Project { } pub fn is_via_collab(&self) -> bool { - !self.is_local_or_ssh() + match &self.client_state { + ProjectClientState::Local | ProjectClientState::Shared { .. } => false, + ProjectClientState::Remote { .. } => true, + } } pub fn create_buffer(&mut self, cx: &mut ModelContext) -> Task>> { @@ -2167,6 +2188,13 @@ impl Project { buffer: &Model, cx: &mut ModelContext, ) -> Result<()> { + { + let mut remotely_created_buffers = self.remotely_created_buffers.lock(); + if remotely_created_buffers.retain_count > 0 { + remotely_created_buffers.buffers.push(buffer.clone()) + } + } + self.request_buffer_diff_recalculation(buffer, cx); buffer.update(cx, |buffer, _| { buffer.set_language_registry(self.languages.clone()) @@ -7242,182 +7270,26 @@ impl Project { } } - #[allow(clippy::type_complexity)] pub fn search( - &self, + &mut self, query: SearchQuery, cx: &mut ModelContext, ) -> Receiver { - if self.is_local_or_ssh() { - self.search_local(query, cx) - } else if let Some(project_id) = self.remote_id() { - let (tx, rx) = smol::channel::unbounded(); - let request = self.client.request(query.to_proto(project_id)); - cx.spawn(move |this, mut cx| async move { - let response = request.await?; - let mut result = HashMap::default(); - for location in response.locations { - let buffer_id = BufferId::new(location.buffer_id)?; - let target_buffer = this - .update(&mut cx, |this, cx| { - this.wait_for_remote_buffer(buffer_id, cx) - })? - .await?; - let start = location - .start - .and_then(deserialize_anchor) - .ok_or_else(|| anyhow!("missing target start"))?; - let end = location - .end - .and_then(deserialize_anchor) - .ok_or_else(|| anyhow!("missing target end"))?; - result - .entry(target_buffer) - .or_insert(Vec::new()) - .push(start..end) - } - for (buffer, ranges) in result { - let _ = tx.send(SearchResult::Buffer { buffer, ranges }).await; - } - - if response.limit_reached { - let _ = tx.send(SearchResult::LimitReached).await; - } - - Result::<(), anyhow::Error>::Ok(()) - }) - .detach_and_log_err(cx); - rx - } else { - unimplemented!(); - } - } - - pub fn search_local( - &self, - query: SearchQuery, - cx: &mut ModelContext, - ) -> Receiver { - // Local search is split into several phases. - // TL;DR is that we do 2 passes; initial pass to pick files which contain at least one match - // and the second phase that finds positions of all the matches found in the candidate files. - // The Receiver obtained from this function returns matches sorted by buffer path. Files without a buffer path are reported first. - // - // It gets a bit hairy though, because we must account for files that do not have a persistent representation - // on FS. Namely, if you have an untitled buffer or unsaved changes in a buffer, we want to scan that too. - // - // 1. We initialize a queue of match candidates and feed all opened buffers into it (== unsaved files / untitled buffers). - // Then, we go through a worktree and check for files that do match a predicate. If the file had an opened version, we skip the scan - // of FS version for that file altogether - after all, what we have in memory is more up-to-date than what's in FS. - // 2. At this point, we have a list of all potentially matching buffers/files. - // We sort that list by buffer path - this list is retained for later use. - // We ensure that all buffers are now opened and available in project. - // 3. We run a scan over all the candidate buffers on multiple background threads. - // We cannot assume that there will even be a match - while at least one match - // is guaranteed for files obtained from FS, the buffers we got from memory (unsaved files/unnamed buffers) might not have a match at all. - // There is also an auxiliary background thread responsible for result gathering. - // This is where the sorted list of buffers comes into play to maintain sorted order; Whenever this background thread receives a notification (buffer has/doesn't have matches), - // it keeps it around. It reports matches in sorted order, though it accepts them in unsorted order as well. - // As soon as the match info on next position in sorted order becomes available, it reports it (if it's a match) or skips to the next - // entry - which might already be available thanks to out-of-order processing. - // - // We could also report matches fully out-of-order, without maintaining a sorted list of matching paths. - // This however would mean that project search (that is the main user of this function) would have to do the sorting itself, on the go. - // This isn't as straightforward as running an insertion sort sadly, and would also mean that it would have to care about maintaining match index - // in face of constantly updating list of sorted matches. - // Meanwhile, this implementation offers index stability, since the matches are already reported in a sorted order. - let snapshots = self - .visible_worktrees(cx) - .filter_map(|tree| { - let tree = tree.read(cx); - Some((tree.snapshot(), tree.as_local()?.settings())) - }) - .collect::>(); - let include_root = snapshots.len() > 1; - - let background = cx.background_executor().clone(); - let path_count: usize = snapshots - .iter() - .map(|(snapshot, _)| { - if query.include_ignored() { - snapshot.file_count() - } else { - snapshot.visible_file_count() - } - }) - .sum(); - if path_count == 0 { - let (_, rx) = smol::channel::bounded(1024); - return rx; - } - let workers = background.num_cpus().min(path_count); - let (matching_paths_tx, matching_paths_rx) = smol::channel::bounded(1024); - let mut unnamed_files = vec![]; - let opened_buffers = self.buffer_store.update(cx, |buffer_store, cx| { - buffer_store - .buffers() - .filter_map(|buffer| { - let (is_ignored, snapshot) = buffer.update(cx, |buffer, cx| { - let is_ignored = buffer - .project_path(cx) - .and_then(|path| self.entry_for_path(&path, cx)) - .map_or(false, |entry| entry.is_ignored); - (is_ignored, buffer.snapshot()) - }); - if is_ignored && !query.include_ignored() { - return None; - } else if let Some(file) = snapshot.file() { - let matched_path = if include_root { - query.file_matches(Some(&file.full_path(cx))) - } else { - query.file_matches(Some(file.path())) - }; - - if matched_path { - Some((file.path().clone(), (buffer, snapshot))) - } else { - None - } - } else { - unnamed_files.push(buffer); - None - } - }) - .collect() - }); - cx.background_executor() - .spawn(Self::background_search( - unnamed_files, - opened_buffers, - cx.background_executor().clone(), - self.fs.clone(), - workers, - query.clone(), - include_root, - path_count, - snapshots, - matching_paths_tx, - )) - .detach(); - let (result_tx, result_rx) = smol::channel::bounded(1024); - cx.spawn(|this, mut cx| async move { - const MAX_SEARCH_RESULT_FILES: usize = 5_000; - const MAX_SEARCH_RESULT_RANGES: usize = 10_000; + let matching_buffers_rx = + self.search_for_candidate_buffers(&query, MAX_SEARCH_RESULT_FILES + 1, cx); - let mut matching_paths = matching_paths_rx - .take(MAX_SEARCH_RESULT_FILES + 1) - .collect::>() - .await; - let mut limit_reached = if matching_paths.len() > MAX_SEARCH_RESULT_FILES { - matching_paths.pop(); + cx.spawn(|_, cx| async move { + let mut matching_buffers = matching_buffers_rx.collect::>().await; + let mut limit_reached = if matching_buffers.len() > MAX_SEARCH_RESULT_FILES { + matching_buffers.truncate(MAX_SEARCH_RESULT_FILES); true } else { false }; cx.update(|cx| { - sort_search_matches(&mut matching_paths, cx); + sort_search_matches(&mut matching_buffers, cx); })?; let mut range_count = 0; @@ -7427,23 +7299,12 @@ impl Project { // 64 buffers at a time to avoid overwhelming the main thread. For each // opened buffer, we will spawn a background task that retrieves all the // ranges in the buffer matched by the query. - 'outer: for matching_paths_chunk in matching_paths.chunks(64) { + 'outer: for matching_buffer_chunk in matching_buffers.chunks(64) { let mut chunk_results = Vec::new(); - for matching_path in matching_paths_chunk { + for buffer in matching_buffer_chunk { + let buffer = buffer.clone(); let query = query.clone(); - let buffer = match matching_path { - SearchMatchCandidate::OpenBuffer { buffer, .. } => { - Task::ready(Ok(buffer.clone())) - } - SearchMatchCandidate::Path { - worktree_id, path, .. - } => this.update(&mut cx, |this, cx| { - this.open_buffer((*worktree_id, path.clone()), cx) - })?, - }; - chunk_results.push(cx.spawn(|cx| async move { - let buffer = buffer.await?; let snapshot = buffer.read_with(&cx, |buffer, _| buffer.snapshot())?; let ranges = cx .background_executor() @@ -7489,93 +7350,63 @@ impl Project { result_rx } - /// Pick paths that might potentially contain a match of a given search query. - #[allow(clippy::too_many_arguments)] - async fn background_search( - unnamed_buffers: Vec>, - opened_buffers: HashMap, (Model, BufferSnapshot)>, - executor: BackgroundExecutor, - fs: Arc, - workers: usize, - query: SearchQuery, - include_root: bool, - path_count: usize, - snapshots: Vec<(Snapshot, WorktreeSettings)>, - matching_paths_tx: Sender, - ) { - let fs = &fs; - let query = &query; - let matching_paths_tx = &matching_paths_tx; - let snapshots = &snapshots; - for buffer in unnamed_buffers { - matching_paths_tx - .send(SearchMatchCandidate::OpenBuffer { - buffer: buffer.clone(), - path: None, - }) - .await - .log_err(); - } - for (path, (buffer, _)) in opened_buffers.iter() { - matching_paths_tx - .send(SearchMatchCandidate::OpenBuffer { - buffer: buffer.clone(), - path: Some(path.clone()), - }) - .await - .log_err(); + fn search_for_candidate_buffers( + &mut self, + query: &SearchQuery, + limit: usize, + cx: &mut ModelContext, + ) -> Receiver> { + if self.is_local() { + let fs = self.fs.clone(); + return self.buffer_store.update(cx, |buffer_store, cx| { + buffer_store.find_search_candidates(query, limit, fs, cx) + }); + } else { + self.search_for_candidate_buffers_remote(query, limit, cx) } + } - let paths_per_worker = (path_count + workers - 1) / workers; + fn search_for_candidate_buffers_remote( + &mut self, + query: &SearchQuery, + limit: usize, + cx: &mut ModelContext, + ) -> Receiver> { + let (tx, rx) = smol::channel::unbounded(); - executor - .scoped(|scope| { - let max_concurrent_workers = Arc::new(Semaphore::new(workers)); + let (client, remote_id): (AnyProtoClient, _) = + if let Some(ssh_session) = self.ssh_session.clone() { + (ssh_session.into(), 0) + } else if let Some(remote_id) = self.remote_id() { + (self.client.clone().into(), remote_id) + } else { + return rx; + }; - for worker_ix in 0..workers { - let worker_start_ix = worker_ix * paths_per_worker; - let worker_end_ix = worker_start_ix + paths_per_worker; - let opened_buffers = opened_buffers.clone(); - let limiter = Arc::clone(&max_concurrent_workers); - scope.spawn({ - async move { - let _guard = limiter.acquire().await; - search_snapshots( - snapshots, - worker_start_ix, - worker_end_ix, - query, - matching_paths_tx, - &opened_buffers, - include_root, - fs, - ) - .await; - } - }); - } + let request = client.request(proto::FindSearchCandidates { + project_id: remote_id, + query: Some(query.to_proto()), + limit: limit as _, + }); + let guard = self.retain_remotely_created_buffers(); - if query.include_ignored() { - for (snapshot, settings) in snapshots { - for ignored_entry in snapshot.entries(true, 0).filter(|e| e.is_ignored) { - let limiter = Arc::clone(&max_concurrent_workers); - scope.spawn(async move { - let _guard = limiter.acquire().await; - search_ignored_entry( - snapshot, - settings, - ignored_entry, - fs, - query, - matching_paths_tx, - ) - .await; - }); - } - } - } - }) - .await; + cx.spawn(move |this, mut cx| async move { + let response = request.await?; + for buffer_id in response.buffer_ids { + let buffer_id = BufferId::new(buffer_id)?; + let buffer = this + .update(&mut cx, |this, cx| { + this.wait_for_remote_buffer(buffer_id, cx) + })? + .await?; + let _ = tx.send(buffer).await; + } + + drop(guard); + anyhow::Ok(()) + }) + .detach_and_log_err(cx); + rx } pub fn request_lsp( @@ -9075,6 +8906,13 @@ impl Project { BufferStore::handle_update_buffer(buffer_store, envelope, cx).await } + fn retain_remotely_created_buffers(&mut self) -> RemotelyCreatedBufferGuard { + self.remotely_created_buffers.lock().retain_count += 1; + RemotelyCreatedBufferGuard { + remote_buffers: Arc::downgrade(&self.remotely_created_buffers), + } + } + async fn handle_create_buffer_for_peer( this: Model, envelope: TypedEnvelope, @@ -9770,7 +9608,7 @@ impl Project { mut cx: AsyncAppContext, ) -> Result { let peer_id = envelope.original_sender_id()?; - let query = SearchQuery::from_proto(envelope.payload)?; + let query = SearchQuery::from_proto_v1(envelope.payload)?; let mut result = this.update(&mut cx, |this, cx| this.search(query, cx))?; cx.spawn(move |mut cx| async move { @@ -9798,11 +9636,42 @@ impl Project { Ok(proto::SearchProjectResponse { locations, limit_reached, + // will restart }) }) .await } + async fn handle_search_candidate_buffers( + this: Model, + envelope: TypedEnvelope, + mut cx: AsyncAppContext, + ) -> Result { + let peer_id = envelope.original_sender_id()?; + let message = envelope.payload; + let query = SearchQuery::from_proto( + message + .query + .ok_or_else(|| anyhow!("missing query field"))?, + )?; + let mut results = this.update(&mut cx, |this, cx| { + this.search_for_candidate_buffers(&query, message.limit as _, cx) + })?; + + let mut response = proto::FindSearchCandidatesResponse { + buffer_ids: Vec::new(), + }; + + while let Some(buffer) = results.next().await { + this.update(&mut cx, |this, cx| { + let buffer_id = this.create_buffer_for_peer(&buffer, peer_id, cx); + response.buffer_ids.push(buffer_id.to_proto()); + })?; + } + + Ok(response) + } + async fn handle_open_buffer_for_symbol( this: Model, envelope: TypedEnvelope, @@ -10916,162 +10785,6 @@ fn deserialize_code_actions(code_actions: &HashMap) -> Vec, - worker_start_ix: usize, - worker_end_ix: usize, - query: &SearchQuery, - results_tx: &Sender, - opened_buffers: &HashMap, (Model, BufferSnapshot)>, - include_root: bool, - fs: &Arc, -) { - let mut snapshot_start_ix = 0; - let mut abs_path = PathBuf::new(); - - for (snapshot, _) in snapshots { - let snapshot_end_ix = snapshot_start_ix - + if query.include_ignored() { - snapshot.file_count() - } else { - snapshot.visible_file_count() - }; - if worker_end_ix <= snapshot_start_ix { - break; - } else if worker_start_ix > snapshot_end_ix { - snapshot_start_ix = snapshot_end_ix; - continue; - } else { - let start_in_snapshot = worker_start_ix.saturating_sub(snapshot_start_ix); - let end_in_snapshot = cmp::min(worker_end_ix, snapshot_end_ix) - snapshot_start_ix; - - for entry in snapshot - .files(false, start_in_snapshot) - .take(end_in_snapshot - start_in_snapshot) - { - if results_tx.is_closed() { - break; - } - if opened_buffers.contains_key(&entry.path) { - continue; - } - - let matched_path = if include_root { - let mut full_path = PathBuf::from(snapshot.root_name()); - full_path.push(&entry.path); - query.file_matches(Some(&full_path)) - } else { - query.file_matches(Some(&entry.path)) - }; - - let matches = if matched_path { - abs_path.clear(); - abs_path.push(&snapshot.abs_path()); - abs_path.push(&entry.path); - - if entry.is_fifo { - false - } else { - if let Some(file) = fs.open_sync(&abs_path).await.log_err() { - query.detect(file).unwrap_or(false) - } else { - false - } - } - } else { - false - }; - - if matches { - let project_path = SearchMatchCandidate::Path { - worktree_id: snapshot.id(), - path: entry.path.clone(), - is_ignored: entry.is_ignored, - is_file: entry.is_file(), - }; - if results_tx.send(project_path).await.is_err() { - return; - } - } - } - - snapshot_start_ix = snapshot_end_ix; - } - } -} - -async fn search_ignored_entry( - snapshot: &Snapshot, - settings: &WorktreeSettings, - ignored_entry: &Entry, - fs: &Arc, - query: &SearchQuery, - counter_tx: &Sender, -) { - let mut ignored_paths_to_process = - VecDeque::from([snapshot.abs_path().join(&ignored_entry.path)]); - - while let Some(ignored_abs_path) = ignored_paths_to_process.pop_front() { - let metadata = fs - .metadata(&ignored_abs_path) - .await - .with_context(|| format!("fetching fs metadata for {ignored_abs_path:?}")) - .log_err() - .flatten(); - - if let Some(fs_metadata) = metadata { - if fs_metadata.is_dir { - let files = fs - .read_dir(&ignored_abs_path) - .await - .with_context(|| format!("listing ignored path {ignored_abs_path:?}")) - .log_err(); - - if let Some(mut subfiles) = files { - while let Some(subfile) = subfiles.next().await { - if let Some(subfile) = subfile.log_err() { - ignored_paths_to_process.push_back(subfile); - } - } - } - } else if !fs_metadata.is_symlink { - if !query.file_matches(Some(&ignored_abs_path)) - || settings.is_path_excluded(&ignored_entry.path) - { - continue; - } - let matches = if let Some(file) = fs - .open_sync(&ignored_abs_path) - .await - .with_context(|| format!("Opening ignored path {ignored_abs_path:?}")) - .log_err() - { - query.detect(file).unwrap_or(false) - } else { - false - }; - - if matches { - let project_path = SearchMatchCandidate::Path { - worktree_id: snapshot.id(), - path: Arc::from( - ignored_abs_path - .strip_prefix(snapshot.abs_path()) - .expect("scanning worktree-related files"), - ), - is_ignored: true, - is_file: ignored_entry.is_file(), - }; - if counter_tx.send(project_path).await.is_err() { - return; - } - } - } - } - } -} - fn glob_literal_prefix(glob: &str) -> &str { let mut literal_end = 0; for (i, part) in glob.split(path::MAIN_SEPARATOR).enumerate() { @@ -11657,74 +11370,18 @@ pub fn sort_worktree_entries(entries: &mut Vec) { }); } -fn sort_search_matches(search_matches: &mut Vec, cx: &AppContext) { - search_matches.sort_by(|entry_a, entry_b| match (entry_a, entry_b) { - ( - SearchMatchCandidate::OpenBuffer { - buffer: buffer_a, - path: None, - }, - SearchMatchCandidate::OpenBuffer { - buffer: buffer_b, - path: None, - }, - ) => buffer_a - .read(cx) - .remote_id() - .cmp(&buffer_b.read(cx).remote_id()), - ( - SearchMatchCandidate::OpenBuffer { path: None, .. }, - SearchMatchCandidate::Path { .. } - | SearchMatchCandidate::OpenBuffer { path: Some(_), .. }, - ) => Ordering::Less, - ( - SearchMatchCandidate::OpenBuffer { path: Some(_), .. } - | SearchMatchCandidate::Path { .. }, - SearchMatchCandidate::OpenBuffer { path: None, .. }, - ) => Ordering::Greater, - ( - SearchMatchCandidate::OpenBuffer { - path: Some(path_a), .. - }, - SearchMatchCandidate::Path { - is_file: is_file_b, - path: path_b, - .. - }, - ) => compare_paths((path_a.as_ref(), true), (path_b.as_ref(), *is_file_b)), - ( - SearchMatchCandidate::Path { - is_file: is_file_a, - path: path_a, - .. - }, - SearchMatchCandidate::OpenBuffer { - path: Some(path_b), .. - }, - ) => compare_paths((path_a.as_ref(), *is_file_a), (path_b.as_ref(), true)), - ( - SearchMatchCandidate::OpenBuffer { - path: Some(path_a), .. - }, - SearchMatchCandidate::OpenBuffer { - path: Some(path_b), .. - }, - ) => compare_paths((path_a.as_ref(), true), (path_b.as_ref(), true)), - ( - SearchMatchCandidate::Path { - worktree_id: worktree_id_a, - is_file: is_file_a, - path: path_a, - .. - }, - SearchMatchCandidate::Path { - worktree_id: worktree_id_b, - is_file: is_file_b, - path: path_b, - .. - }, - ) => worktree_id_a.cmp(&worktree_id_b).then_with(|| { - compare_paths((path_a.as_ref(), *is_file_a), (path_b.as_ref(), *is_file_b)) - }), +fn sort_search_matches(search_matches: &mut Vec>, cx: &AppContext) { + search_matches.sort_by(|buffer_a, buffer_b| { + let path_a = buffer_a.read(cx).file().map(|file| file.path()); + let path_b = buffer_b.read(cx).file().map(|file| file.path()); + + match (path_a, path_b) { + (None, None) => cmp::Ordering::Equal, + (None, Some(_)) => cmp::Ordering::Less, + (Some(_), None) => cmp::Ordering::Greater, + (Some(path_a), Some(path_b)) => { + compare_paths((path_a.as_ref(), true), (path_b.as_ref(), true)) + } + } }); } diff --git a/crates/project/src/search.rs b/crates/project/src/search.rs index 70b2ada8e4..67e7fccc63 100644 --- a/crates/project/src/search.rs +++ b/crates/project/src/search.rs @@ -1,7 +1,8 @@ use aho_corasick::{AhoCorasick, AhoCorasickBuilder}; use anyhow::Result; use client::proto; -use language::{char_kind, BufferSnapshot}; +use gpui::Model; +use language::{char_kind, Buffer, BufferSnapshot}; use regex::{Captures, Regex, RegexBuilder}; use smol::future::yield_now; use std::{ @@ -11,10 +12,19 @@ use std::{ path::Path, sync::{Arc, OnceLock}, }; +use text::Anchor; use util::paths::PathMatcher; static TEXT_REPLACEMENT_SPECIAL_CHARACTERS_REGEX: OnceLock = OnceLock::new(); +pub enum SearchResult { + Buffer { + buffer: Model, + ranges: Vec>, + }, + LimitReached, +} + #[derive(Clone, Debug)] pub struct SearchInputs { query: Arc, @@ -122,7 +132,29 @@ impl SearchQuery { }) } - pub fn from_proto(message: proto::SearchProject) -> Result { + pub fn from_proto_v1(message: proto::SearchProject) -> Result { + if message.regex { + Self::regex( + message.query, + message.whole_word, + message.case_sensitive, + message.include_ignored, + deserialize_path_matches(&message.files_to_include)?, + deserialize_path_matches(&message.files_to_exclude)?, + ) + } else { + Self::text( + message.query, + message.whole_word, + message.case_sensitive, + message.include_ignored, + deserialize_path_matches(&message.files_to_include)?, + deserialize_path_matches(&message.files_to_exclude)?, + ) + } + } + + pub fn from_proto(message: proto::SearchQuery) -> Result { if message.regex { Self::regex( message.query, @@ -158,7 +190,7 @@ impl SearchQuery { } } } - pub fn to_proto(&self, project_id: u64) -> proto::SearchProject { + pub fn to_protov1(&self, project_id: u64) -> proto::SearchProject { proto::SearchProject { project_id, query: self.as_str().to_string(), @@ -171,6 +203,18 @@ impl SearchQuery { } } + pub fn to_proto(&self) -> proto::SearchQuery { + proto::SearchQuery { + query: self.as_str().to_string(), + regex: self.is_regex(), + whole_word: self.whole_word(), + case_sensitive: self.case_sensitive(), + include_ignored: self.include_ignored(), + files_to_include: self.files_to_include().sources().join(","), + files_to_exclude: self.files_to_exclude().sources().join(","), + } + } + pub fn detect(&self, stream: T) -> Result { if self.as_str().is_empty() { return Ok(false); @@ -402,7 +446,7 @@ impl SearchQuery { } } -fn deserialize_path_matches(glob_set: &str) -> anyhow::Result { +pub fn deserialize_path_matches(glob_set: &str) -> anyhow::Result { let globs = glob_set .split(',') .map(str::trim) diff --git a/crates/project/src/worktree_store.rs b/crates/project/src/worktree_store.rs index fb79b3f8ea..a966861630 100644 --- a/crates/project/src/worktree_store.rs +++ b/crates/project/src/worktree_store.rs @@ -1,12 +1,31 @@ +use std::{ + cmp, + collections::VecDeque, + path::PathBuf, + sync::{ + atomic::{AtomicUsize, Ordering::SeqCst}, + Arc, + }, +}; + use anyhow::{anyhow, Context as _, Result}; -use collections::HashMap; +use collections::{HashMap, HashSet}; +use fs::Fs; use gpui::{AppContext, AsyncAppContext, EntityId, EventEmitter, Model, ModelContext, WeakModel}; use rpc::{ proto::{self, AnyProtoClient}, TypedEnvelope, }; +use smol::{ + channel::{Receiver, Sender}, + lock::Semaphore, + stream::StreamExt, +}; use text::ReplicaId; -use worktree::{ProjectEntryId, Worktree, WorktreeId}; +use util::ResultExt; +use worktree::{Entry, ProjectEntryId, Snapshot, Worktree, WorktreeId, WorktreeSettings}; + +use crate::{search::SearchQuery, ProjectPath}; pub struct WorktreeStore { is_shared: bool, @@ -61,6 +80,15 @@ impl WorktreeStore { .find(|worktree| worktree.read(cx).contains_entry(entry_id)) } + pub fn entry_for_id<'a>( + &'a self, + entry_id: ProjectEntryId, + cx: &'a AppContext, + ) -> Option<&'a Entry> { + self.worktrees() + .find_map(|worktree| worktree.read(cx).entry_for_id(entry_id)) + } + pub fn add(&mut self, worktree: &Model, cx: &mut ModelContext) { let push_strong_handle = self.is_shared || worktree.read(cx).is_visible(); let handle = if push_strong_handle { @@ -238,6 +266,287 @@ impl WorktreeStore { } } + /// search over all worktrees (ignoring open buffers) + /// the query is tested against the file on disk and matching files are returned. + pub fn find_search_candidates( + &self, + query: SearchQuery, + limit: usize, + skip_entries: HashSet, + fs: Arc, + cx: &ModelContext, + ) -> Receiver { + let (matching_paths_tx, matching_paths_rx) = smol::channel::bounded(1024); + let snapshots = self + .visible_worktrees(cx) + .filter_map(|tree| { + let tree = tree.read(cx); + Some((tree.snapshot(), tree.as_local()?.settings())) + }) + .collect::>(); + let include_root = snapshots.len() > 1; + let path_count: usize = snapshots + .iter() + .map(|(snapshot, _)| { + if query.include_ignored() { + snapshot.file_count() + } else { + snapshot.visible_file_count() + } + }) + .sum(); + + let remaining_paths = AtomicUsize::new(limit); + if path_count == 0 { + return matching_paths_rx; + } + let workers = cx.background_executor().num_cpus().min(path_count); + let paths_per_worker = (path_count + workers - 1) / workers; + + let executor = cx.background_executor().clone(); + cx.background_executor() + .spawn(async move { + let fs = &fs; + let query = &query; + let matching_paths_tx = &matching_paths_tx; + let snapshots = &snapshots; + let remaining_paths = &remaining_paths; + + executor + .scoped(move |scope| { + let max_concurrent_workers = Arc::new(Semaphore::new(workers)); + + for worker_ix in 0..workers { + let snapshots = snapshots.clone(); + let worker_start_ix = worker_ix * paths_per_worker; + let worker_end_ix = worker_start_ix + paths_per_worker; + let skip_entries = skip_entries.clone(); + let limiter = Arc::clone(&max_concurrent_workers); + scope.spawn(async move { + let _guard = limiter.acquire().await; + Self::search_snapshots( + &snapshots, + worker_start_ix, + worker_end_ix, + &query, + remaining_paths, + &matching_paths_tx, + &skip_entries, + include_root, + fs, + ) + .await; + }); + } + + if query.include_ignored() { + for (snapshot, settings) in snapshots { + for ignored_entry in + snapshot.entries(true, 0).filter(|e| e.is_ignored) + { + let limiter = Arc::clone(&max_concurrent_workers); + scope.spawn(async move { + let _guard = limiter.acquire().await; + if remaining_paths.load(SeqCst) == 0 { + return; + } + + Self::search_ignored_entry( + &snapshot, + &settings, + ignored_entry, + &fs, + &query, + remaining_paths, + &matching_paths_tx, + ) + .await; + }); + } + } + } + }) + .await + }) + .detach(); + return matching_paths_rx; + } + + #[allow(clippy::too_many_arguments)] + async fn search_snapshots( + snapshots: &Vec<(worktree::Snapshot, WorktreeSettings)>, + worker_start_ix: usize, + worker_end_ix: usize, + query: &SearchQuery, + remaining_paths: &AtomicUsize, + results_tx: &Sender, + skip_entries: &HashSet, + include_root: bool, + fs: &Arc, + ) { + let mut snapshot_start_ix = 0; + let mut abs_path = PathBuf::new(); + + for (snapshot, _) in snapshots { + let snapshot_end_ix = snapshot_start_ix + + if query.include_ignored() { + snapshot.file_count() + } else { + snapshot.visible_file_count() + }; + if worker_end_ix <= snapshot_start_ix { + break; + } else if worker_start_ix > snapshot_end_ix { + snapshot_start_ix = snapshot_end_ix; + continue; + } else { + let start_in_snapshot = worker_start_ix.saturating_sub(snapshot_start_ix); + let end_in_snapshot = cmp::min(worker_end_ix, snapshot_end_ix) - snapshot_start_ix; + + for entry in snapshot + .files(false, start_in_snapshot) + .take(end_in_snapshot - start_in_snapshot) + { + if results_tx.is_closed() { + break; + } + if skip_entries.contains(&entry.id) { + continue; + } + + let matched_path = if include_root { + let mut full_path = PathBuf::from(snapshot.root_name()); + full_path.push(&entry.path); + query.file_matches(Some(&full_path)) + } else { + query.file_matches(Some(&entry.path)) + }; + + let matches = if matched_path { + abs_path.clear(); + abs_path.push(&snapshot.abs_path()); + abs_path.push(&entry.path); + if let Some(file) = fs.open_sync(&abs_path).await.log_err() { + query.detect(file).unwrap_or(false) + } else { + false + } + } else { + false + }; + + if matches { + if remaining_paths + .fetch_update(SeqCst, SeqCst, |value| { + if value > 0 { + Some(value - 1) + } else { + None + } + }) + .is_err() + { + return; + } + + let project_path = ProjectPath { + worktree_id: snapshot.id(), + path: entry.path.clone(), + }; + if results_tx.send(project_path).await.is_err() { + return; + } + } + } + + snapshot_start_ix = snapshot_end_ix; + } + } + } + + async fn search_ignored_entry( + snapshot: &Snapshot, + settings: &WorktreeSettings, + ignored_entry: &Entry, + fs: &Arc, + query: &SearchQuery, + remaining_paths: &AtomicUsize, + counter_tx: &Sender, + ) { + let mut ignored_paths_to_process = + VecDeque::from([snapshot.abs_path().join(&ignored_entry.path)]); + + while let Some(ignored_abs_path) = ignored_paths_to_process.pop_front() { + let metadata = fs + .metadata(&ignored_abs_path) + .await + .with_context(|| format!("fetching fs metadata for {ignored_abs_path:?}")) + .log_err() + .flatten(); + + if let Some(fs_metadata) = metadata { + if fs_metadata.is_dir { + let files = fs + .read_dir(&ignored_abs_path) + .await + .with_context(|| format!("listing ignored path {ignored_abs_path:?}")) + .log_err(); + + if let Some(mut subfiles) = files { + while let Some(subfile) = subfiles.next().await { + if let Some(subfile) = subfile.log_err() { + ignored_paths_to_process.push_back(subfile); + } + } + } + } else if !fs_metadata.is_symlink { + if !query.file_matches(Some(&ignored_abs_path)) + || settings.is_path_excluded(&ignored_entry.path) + { + continue; + } + let matches = if let Some(file) = fs + .open_sync(&ignored_abs_path) + .await + .with_context(|| format!("Opening ignored path {ignored_abs_path:?}")) + .log_err() + { + query.detect(file).unwrap_or(false) + } else { + false + }; + + if matches { + if remaining_paths + .fetch_update(SeqCst, SeqCst, |value| { + if value > 0 { + Some(value - 1) + } else { + None + } + }) + .is_err() + { + return; + } + + let project_path = ProjectPath { + worktree_id: snapshot.id(), + path: Arc::from( + ignored_abs_path + .strip_prefix(snapshot.abs_path()) + .expect("scanning worktree-related files"), + ), + }; + if counter_tx.send(project_path).await.is_err() { + return; + } + } + } + } + } + } + pub async fn handle_create_project_entry( this: Model, envelope: TypedEnvelope, diff --git a/crates/proto/proto/zed.proto b/crates/proto/proto/zed.proto index 13a20adaa7..4cdc33cddc 100644 --- a/crates/proto/proto/zed.proto +++ b/crates/proto/proto/zed.proto @@ -275,7 +275,10 @@ message Envelope { GetLlmTokenResponse get_llm_token_response = 236; LspExtSwitchSourceHeader lsp_ext_switch_source_header = 241; - LspExtSwitchSourceHeaderResponse lsp_ext_switch_source_header_response = 242; // current max + LspExtSwitchSourceHeaderResponse lsp_ext_switch_source_header_response = 242; + + FindSearchCandidates find_search_candidates = 243; + FindSearchCandidatesResponse find_search_candidates_response = 244; // current max } reserved 158 to 161; @@ -1236,6 +1239,26 @@ message SearchProjectResponse { bool limit_reached = 2; } +message SearchQuery { + string query = 2; + bool regex = 3; + bool whole_word = 4; + bool case_sensitive = 5; + string files_to_include = 6; + string files_to_exclude = 7; + bool include_ignored = 8; +} + +message FindSearchCandidates { + uint64 project_id = 1; + SearchQuery query = 2; + uint64 limit = 3; +} + +message FindSearchCandidatesResponse { + repeated uint64 buffer_ids = 1; +} + message CodeAction { uint64 server_id = 1; Anchor start = 2; diff --git a/crates/proto/src/proto.rs b/crates/proto/src/proto.rs index 402c134c4e..38d1dfa8c7 100644 --- a/crates/proto/src/proto.rs +++ b/crates/proto/src/proto.rs @@ -410,6 +410,8 @@ messages!( (LspExtSwitchSourceHeaderResponse, Background), (AddWorktree, Foreground), (AddWorktreeResponse, Foreground), + (FindSearchCandidates, Background), + (FindSearchCandidatesResponse, Background) ); request_messages!( @@ -498,6 +500,7 @@ request_messages!( (RespondToContactRequest, Ack), (SaveBuffer, BufferSaved), (SearchProject, SearchProjectResponse), + (FindSearchCandidates, FindSearchCandidatesResponse), (SendChannelMessage, SendChannelMessageResponse), (SetChannelMemberRole, Ack), (SetChannelVisibility, Ack), @@ -547,6 +550,7 @@ entity_messages!( CreateProjectEntry, DeleteProjectEntry, ExpandProjectEntry, + FindSearchCandidates, FormatBuffers, GetCodeActions, GetCompletions, diff --git a/crates/remote_server/src/headless_project.rs b/crates/remote_server/src/headless_project.rs index d08bdc4b74..459e413742 100644 --- a/crates/remote_server/src/headless_project.rs +++ b/crates/remote_server/src/headless_project.rs @@ -1,8 +1,9 @@ -use anyhow::Result; +use anyhow::{anyhow, Result}; use fs::Fs; use gpui::{AppContext, AsyncAppContext, Context, Model, ModelContext}; use project::{ buffer_store::{BufferStore, BufferStoreEvent}, + search::SearchQuery, worktree_store::WorktreeStore, ProjectPath, WorktreeId, WorktreeSettings, }; @@ -49,6 +50,7 @@ impl HeadlessProject { session.add_request_handler(this.clone(), Self::handle_list_remote_directory); session.add_request_handler(this.clone(), Self::handle_add_worktree); session.add_request_handler(this.clone(), Self::handle_open_buffer_by_path); + session.add_request_handler(this.clone(), Self::handle_find_search_candidates); session.add_request_handler(buffer_store.downgrade(), BufferStore::handle_blame_buffer); session.add_request_handler(buffer_store.downgrade(), BufferStore::handle_update_buffer); @@ -160,6 +162,49 @@ impl HeadlessProject { }) } + pub async fn handle_find_search_candidates( + this: Model, + envelope: TypedEnvelope, + mut cx: AsyncAppContext, + ) -> Result { + let message = envelope.payload; + let query = SearchQuery::from_proto( + message + .query + .ok_or_else(|| anyhow!("missing query field"))?, + )?; + let mut results = this.update(&mut cx, |this, cx| { + this.buffer_store.update(cx, |buffer_store, cx| { + buffer_store.find_search_candidates(&query, message.limit as _, this.fs.clone(), cx) + }) + })?; + + let mut response = proto::FindSearchCandidatesResponse { + buffer_ids: Vec::new(), + }; + + let (buffer_store, client) = this.update(&mut cx, |this, _| { + (this.buffer_store.clone(), this.session.clone()) + })?; + + while let Some(buffer) = results.next().await { + let buffer_id = buffer.update(&mut cx, |this, _| this.remote_id())?; + response.buffer_ids.push(buffer_id.to_proto()); + + BufferStore::create_buffer_for_peer( + buffer_store.clone(), + PEER_ID, + buffer_id, + PROJECT_ID, + client.clone(), + &mut cx, + ) + .await?; + } + + Ok(response) + } + pub async fn handle_list_remote_directory( this: Model, envelope: TypedEnvelope, diff --git a/crates/remote_server/src/remote_editing_tests.rs b/crates/remote_server/src/remote_editing_tests.rs index c166ff650a..56f64bb50e 100644 --- a/crates/remote_server/src/remote_editing_tests.rs +++ b/crates/remote_server/src/remote_editing_tests.rs @@ -1,55 +1,24 @@ use crate::headless_project::HeadlessProject; use client::{Client, UserStore}; use clock::FakeSystemClock; -use fs::{FakeFs, Fs as _}; +use fs::{FakeFs, Fs}; use gpui::{Context, Model, TestAppContext}; use http_client::FakeHttpClient; use language::LanguageRegistry; use node_runtime::FakeNodeRuntime; -use project::Project; +use project::{ + search::{SearchQuery, SearchResult}, + Project, +}; use remote::SshSession; use serde_json::json; use settings::SettingsStore; +use smol::stream::StreamExt; use std::{path::Path, sync::Arc}; -fn init_logger() { - if std::env::var("RUST_LOG").is_ok() { - env_logger::try_init().ok(); - } -} - #[gpui::test] async fn test_remote_editing(cx: &mut TestAppContext, server_cx: &mut TestAppContext) { - let (client_ssh, server_ssh) = SshSession::fake(cx, server_cx); - init_logger(); - - let fs = FakeFs::new(server_cx.executor()); - fs.insert_tree( - "/code", - json!({ - "project1": { - ".git": {}, - "README.md": "# project 1", - "src": { - "lib.rs": "fn one() -> usize { 1 }" - } - }, - "project2": { - "README.md": "# project 2", - }, - }), - ) - .await; - fs.set_index_for_repo( - Path::new("/code/project1/.git"), - &[(Path::new("src/lib.rs"), "fn one() -> usize { 0 }".into())], - ); - - server_cx.update(HeadlessProject::init); - let _headless_project = - server_cx.new_model(|cx| HeadlessProject::new(server_ssh, fs.clone(), cx)); - - let project = build_project(client_ssh, cx); + let (project, _headless, fs) = init_test(cx, server_cx).await; let (worktree, _) = project .update(cx, |project, cx| { project.find_or_create_worktree("/code/project1", true, cx) @@ -150,6 +119,96 @@ async fn test_remote_editing(cx: &mut TestAppContext, server_cx: &mut TestAppCon }); } +#[gpui::test] +async fn test_remote_project_search(cx: &mut TestAppContext, server_cx: &mut TestAppContext) { + let (project, _, _) = init_test(cx, server_cx).await; + + project + .update(cx, |project, cx| { + project.find_or_create_worktree("/code/project1", true, cx) + }) + .await + .unwrap(); + + cx.run_until_parked(); + + let mut receiver = project.update(cx, |project, cx| { + project.search( + SearchQuery::text( + "project", + false, + true, + false, + Default::default(), + Default::default(), + ) + .unwrap(), + cx, + ) + }); + + let first_response = receiver.next().await.unwrap(); + let SearchResult::Buffer { buffer, .. } = first_response else { + panic!("incorrect result"); + }; + buffer.update(cx, |buffer, cx| { + assert_eq!( + buffer.file().unwrap().full_path(cx).to_string_lossy(), + "project1/README.md" + ) + }); + + assert!(receiver.next().await.is_none()); +} + +fn init_logger() { + if std::env::var("RUST_LOG").is_ok() { + env_logger::try_init().ok(); + } +} + +async fn init_test( + cx: &mut TestAppContext, + server_cx: &mut TestAppContext, +) -> (Model, Model, Arc) { + let (client_ssh, server_ssh) = SshSession::fake(cx, server_cx); + init_logger(); + + let fs = FakeFs::new(server_cx.executor()); + fs.insert_tree( + "/code", + json!({ + "project1": { + ".git": {}, + "README.md": "# project 1", + "src": { + "lib.rs": "fn one() -> usize { 1 }" + } + }, + "project2": { + "README.md": "# project 2", + }, + }), + ) + .await; + fs.set_index_for_repo( + Path::new("/code/project1/.git"), + &[(Path::new("src/lib.rs"), "fn one() -> usize { 0 }".into())], + ); + + server_cx.update(HeadlessProject::init); + let headless = server_cx.new_model(|cx| HeadlessProject::new(server_ssh, fs.clone(), cx)); + let project = build_project(client_ssh, cx); + + project + .update(cx, { + let headless = headless.clone(); + |_, cx| cx.on_release(|_, _| drop(headless)) + }) + .detach(); + (project, headless, fs) +} + fn build_project(ssh: Arc, cx: &mut TestAppContext) -> Model { cx.update(|cx| { let settings_store = SettingsStore::test(cx); diff --git a/crates/search/src/project_search.rs b/crates/search/src/project_search.rs index 0dcc87a5e1..216d6a248a 100644 --- a/crates/search/src/project_search.rs +++ b/crates/search/src/project_search.rs @@ -222,7 +222,7 @@ impl ProjectSearch { let mut limit_reached = false; while let Some(result) = matches.next().await { match result { - project::SearchResult::Buffer { buffer, ranges } => { + project::search::SearchResult::Buffer { buffer, ranges } => { let mut match_ranges = this .update(&mut cx, |this, cx| { this.excerpts.update(cx, |excerpts, cx| { @@ -245,7 +245,7 @@ impl ProjectSearch { } this.update(&mut cx, |_, cx| cx.notify()).ok()?; } - project::SearchResult::LimitReached => { + project::search::SearchResult::LimitReached => { limit_reached = true; } }