mirror of
https://github.com/zed-industries/zed.git
synced 2024-11-08 05:12:06 +03:00
project search: Stream search results to improve TTFB (#16923)
This is a prototype change to improve latency of local project searches. It refactors the matcher to keep paths "in-order" so that we don't need to wait for all matching files to display the first result. On a test (searching for `<` in zed.dev) it changes the time until first result from about 2s to about 50ms. The tail latency seems to increase slightly (from 5s to 7s) so we may want to do more tuning before hitting merge. Release Notes: - reduces latency for first project search result --------- Co-authored-by: Thorsten Ball <mrnugget@gmail.com> Co-authored-by: Antonio <antonio@zed.dev> Co-authored-by: Thorsten <thorsten@zed.dev>
This commit is contained in:
parent
dc889ca7f2
commit
b2f3f760ab
@ -6,7 +6,7 @@ use crate::{
|
||||
use anyhow::{anyhow, Context as _, Result};
|
||||
use collections::{hash_map, HashMap, HashSet};
|
||||
use fs::Fs;
|
||||
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt as _};
|
||||
use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt};
|
||||
use git::blame::Blame;
|
||||
use gpui::{
|
||||
AppContext, AsyncAppContext, Context as _, EventEmitter, Model, ModelContext, Task, WeakModel,
|
||||
@ -784,95 +784,57 @@ impl BufferStore {
|
||||
pub fn find_search_candidates(
|
||||
&mut self,
|
||||
query: &SearchQuery,
|
||||
limit: usize,
|
||||
mut limit: usize,
|
||||
fs: Arc<dyn Fs>,
|
||||
cx: &mut ModelContext<Self>,
|
||||
) -> Receiver<Model<Buffer>> {
|
||||
let (tx, rx) = smol::channel::unbounded();
|
||||
let open_buffers = self.find_open_search_candidates(query, cx);
|
||||
let skip_entries: HashSet<_> = open_buffers
|
||||
.iter()
|
||||
.filter_map(|buffer| buffer.read(cx).entry_id(cx))
|
||||
.collect();
|
||||
|
||||
let limit = limit.saturating_sub(open_buffers.len());
|
||||
for open_buffer in open_buffers {
|
||||
tx.send_blocking(open_buffer).ok();
|
||||
let mut open_buffers = HashSet::default();
|
||||
let mut unnamed_buffers = Vec::new();
|
||||
for handle in self.buffers() {
|
||||
let buffer = handle.read(cx);
|
||||
if let Some(entry_id) = buffer.entry_id(cx) {
|
||||
open_buffers.insert(entry_id);
|
||||
} else {
|
||||
limit = limit.saturating_sub(1);
|
||||
unnamed_buffers.push(handle)
|
||||
};
|
||||
}
|
||||
|
||||
let match_rx = self.worktree_store.update(cx, |worktree_store, cx| {
|
||||
worktree_store.find_search_candidates(query.clone(), limit, skip_entries, fs, cx)
|
||||
});
|
||||
const MAX_CONCURRENT_BUFFER_OPENS: usize = 64;
|
||||
let mut project_paths_rx = self
|
||||
.worktree_store
|
||||
.update(cx, |worktree_store, cx| {
|
||||
worktree_store.find_search_candidates(query.clone(), limit, open_buffers, fs, cx)
|
||||
})
|
||||
.chunks(MAX_CONCURRENT_BUFFER_OPENS);
|
||||
|
||||
const MAX_CONCURRENT_BUFFER_OPENS: usize = 8;
|
||||
cx.spawn(|this, mut cx| async move {
|
||||
for buffer in unnamed_buffers {
|
||||
tx.send(buffer).await.ok();
|
||||
}
|
||||
|
||||
for _ in 0..MAX_CONCURRENT_BUFFER_OPENS {
|
||||
let mut match_rx = match_rx.clone();
|
||||
let tx = tx.clone();
|
||||
cx.spawn(|this, mut cx| async move {
|
||||
while let Some(project_path) = match_rx.next().await {
|
||||
let buffer = this
|
||||
.update(&mut cx, |this, cx| this.open_buffer(project_path, cx))?
|
||||
.await
|
||||
.log_err();
|
||||
if let Some(buffer) = buffer {
|
||||
tx.send_blocking(buffer).ok();
|
||||
while let Some(project_paths) = project_paths_rx.next().await {
|
||||
let buffers = this.update(&mut cx, |this, cx| {
|
||||
project_paths
|
||||
.into_iter()
|
||||
.map(|project_path| this.open_buffer(project_path, cx))
|
||||
.collect::<Vec<_>>()
|
||||
})?;
|
||||
for buffer_task in buffers {
|
||||
if let Some(buffer) = buffer_task.await.log_err() {
|
||||
if tx.send(buffer).await.is_err() {
|
||||
return anyhow::Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
anyhow::Ok(())
|
||||
})
|
||||
.detach();
|
||||
}
|
||||
}
|
||||
anyhow::Ok(())
|
||||
})
|
||||
.detach();
|
||||
rx
|
||||
}
|
||||
|
||||
/// Returns open buffers filtered by filename
|
||||
/// Does *not* check the buffer content, the caller must do that
|
||||
fn find_open_search_candidates(
|
||||
&self,
|
||||
query: &SearchQuery,
|
||||
cx: &ModelContext<Self>,
|
||||
) -> Vec<Model<Buffer>> {
|
||||
let include_root = self
|
||||
.worktree_store
|
||||
.read(cx)
|
||||
.visible_worktrees(cx)
|
||||
.collect::<Vec<_>>()
|
||||
.len()
|
||||
> 1;
|
||||
self.buffers()
|
||||
.filter_map(|buffer| {
|
||||
let handle = buffer.clone();
|
||||
buffer.read_with(cx, |buffer, cx| {
|
||||
let worktree_store = self.worktree_store.read(cx);
|
||||
let entry_id = buffer.entry_id(cx);
|
||||
let is_ignored = entry_id
|
||||
.and_then(|entry_id| worktree_store.entry_for_id(entry_id, cx))
|
||||
.map_or(false, |entry| entry.is_ignored);
|
||||
|
||||
if is_ignored && !query.include_ignored() {
|
||||
return None;
|
||||
}
|
||||
if let Some(file) = buffer.file() {
|
||||
let matched_path = if include_root {
|
||||
query.file_matches(Some(&file.full_path(cx)))
|
||||
} else {
|
||||
query.file_matches(Some(file.path()))
|
||||
};
|
||||
|
||||
if matched_path {
|
||||
Some(handle)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
Some(handle)
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn on_buffer_event(
|
||||
&mut self,
|
||||
buffer: Model<Buffer>,
|
||||
|
@ -93,7 +93,7 @@ use snippet_provider::SnippetProvider;
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
cell::RefCell,
|
||||
cmp::{self, Ordering},
|
||||
cmp::Ordering,
|
||||
convert::TryInto,
|
||||
env,
|
||||
ffi::OsStr,
|
||||
@ -7275,51 +7275,38 @@ impl Project {
|
||||
query: SearchQuery,
|
||||
cx: &mut ModelContext<Self>,
|
||||
) -> Receiver<SearchResult> {
|
||||
let (result_tx, result_rx) = smol::channel::bounded(1024);
|
||||
let (result_tx, result_rx) = smol::channel::unbounded();
|
||||
|
||||
let matching_buffers_rx =
|
||||
self.search_for_candidate_buffers(&query, MAX_SEARCH_RESULT_FILES + 1, cx);
|
||||
|
||||
cx.spawn(|_, cx| async move {
|
||||
let mut matching_buffers = matching_buffers_rx.collect::<Vec<_>>().await;
|
||||
let mut limit_reached = if matching_buffers.len() > MAX_SEARCH_RESULT_FILES {
|
||||
matching_buffers.truncate(MAX_SEARCH_RESULT_FILES);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
};
|
||||
cx.update(|cx| {
|
||||
sort_search_matches(&mut matching_buffers, cx);
|
||||
})?;
|
||||
|
||||
let mut range_count = 0;
|
||||
let mut buffer_count = 0;
|
||||
let mut limit_reached = false;
|
||||
let query = Arc::new(query);
|
||||
let mut chunks = matching_buffers_rx.ready_chunks(64);
|
||||
|
||||
// Now that we know what paths match the query, we will load at most
|
||||
// 64 buffers at a time to avoid overwhelming the main thread. For each
|
||||
// opened buffer, we will spawn a background task that retrieves all the
|
||||
// ranges in the buffer matched by the query.
|
||||
'outer: for matching_buffer_chunk in matching_buffers.chunks(64) {
|
||||
'outer: while let Some(matching_buffer_chunk) = chunks.next().await {
|
||||
let mut chunk_results = Vec::new();
|
||||
for buffer in matching_buffer_chunk {
|
||||
let buffer = buffer.clone();
|
||||
let query = query.clone();
|
||||
chunk_results.push(cx.spawn(|cx| async move {
|
||||
let snapshot = buffer.read_with(&cx, |buffer, _| buffer.snapshot())?;
|
||||
let ranges = cx
|
||||
.background_executor()
|
||||
.spawn(async move {
|
||||
query
|
||||
.search(&snapshot, None)
|
||||
.await
|
||||
.iter()
|
||||
.map(|range| {
|
||||
snapshot.anchor_before(range.start)
|
||||
..snapshot.anchor_after(range.end)
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
let snapshot = buffer.read_with(&cx, |buffer, _| buffer.snapshot())?;
|
||||
chunk_results.push(cx.background_executor().spawn(async move {
|
||||
let ranges = query
|
||||
.search(&snapshot, None)
|
||||
.await
|
||||
.iter()
|
||||
.map(|range| {
|
||||
snapshot.anchor_before(range.start)
|
||||
..snapshot.anchor_after(range.end)
|
||||
})
|
||||
.await;
|
||||
.collect::<Vec<_>>();
|
||||
anyhow::Ok((buffer, ranges))
|
||||
}));
|
||||
}
|
||||
@ -7328,10 +7315,13 @@ impl Project {
|
||||
for result in chunk_results {
|
||||
if let Some((buffer, ranges)) = result.log_err() {
|
||||
range_count += ranges.len();
|
||||
buffer_count += 1;
|
||||
result_tx
|
||||
.send(SearchResult::Buffer { buffer, ranges })
|
||||
.await?;
|
||||
if range_count > MAX_SEARCH_RESULT_RANGES {
|
||||
if buffer_count > MAX_SEARCH_RESULT_FILES
|
||||
|| range_count > MAX_SEARCH_RESULT_RANGES
|
||||
{
|
||||
limit_reached = true;
|
||||
break 'outer;
|
||||
}
|
||||
@ -11369,19 +11359,3 @@ pub fn sort_worktree_entries(entries: &mut Vec<Entry>) {
|
||||
)
|
||||
});
|
||||
}
|
||||
|
||||
fn sort_search_matches(search_matches: &mut Vec<Model<Buffer>>, cx: &AppContext) {
|
||||
search_matches.sort_by(|buffer_a, buffer_b| {
|
||||
let path_a = buffer_a.read(cx).file().map(|file| file.path());
|
||||
let path_b = buffer_b.read(cx).file().map(|file| file.path());
|
||||
|
||||
match (path_a, path_b) {
|
||||
(None, None) => cmp::Ordering::Equal,
|
||||
(None, Some(_)) => cmp::Ordering::Less,
|
||||
(Some(_), None) => cmp::Ordering::Greater,
|
||||
(Some(path_a), Some(path_b)) => {
|
||||
compare_paths((path_a.as_ref(), true), (path_b.as_ref(), true))
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -420,6 +420,11 @@ impl SearchQuery {
|
||||
self.as_inner().files_to_exclude()
|
||||
}
|
||||
|
||||
pub fn filters_path(&self) -> bool {
|
||||
!(self.files_to_exclude().sources().is_empty()
|
||||
&& self.files_to_include().sources().is_empty())
|
||||
}
|
||||
|
||||
pub fn file_matches(&self, file_path: Option<&Path>) -> bool {
|
||||
match file_path {
|
||||
Some(file_path) => {
|
||||
|
@ -1,32 +1,35 @@
|
||||
use std::{
|
||||
cmp,
|
||||
collections::VecDeque,
|
||||
path::PathBuf,
|
||||
sync::{
|
||||
atomic::{AtomicUsize, Ordering::SeqCst},
|
||||
Arc,
|
||||
},
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use anyhow::{anyhow, Context as _, Result};
|
||||
use collections::{HashMap, HashSet};
|
||||
use fs::Fs;
|
||||
use futures::SinkExt;
|
||||
use gpui::{AppContext, AsyncAppContext, EntityId, EventEmitter, Model, ModelContext, WeakModel};
|
||||
use postage::oneshot;
|
||||
use rpc::{
|
||||
proto::{self, AnyProtoClient},
|
||||
TypedEnvelope,
|
||||
};
|
||||
use smol::{
|
||||
channel::{Receiver, Sender},
|
||||
lock::Semaphore,
|
||||
stream::StreamExt,
|
||||
};
|
||||
use text::ReplicaId;
|
||||
use util::ResultExt;
|
||||
use worktree::{Entry, ProjectEntryId, Snapshot, Worktree, WorktreeId, WorktreeSettings};
|
||||
use worktree::{Entry, ProjectEntryId, Worktree, WorktreeId, WorktreeSettings};
|
||||
|
||||
use crate::{search::SearchQuery, ProjectPath};
|
||||
|
||||
struct MatchingEntry {
|
||||
worktree_path: Arc<Path>,
|
||||
path: ProjectPath,
|
||||
respond: oneshot::Sender<ProjectPath>,
|
||||
}
|
||||
|
||||
pub struct WorktreeStore {
|
||||
is_shared: bool,
|
||||
worktrees: Vec<WorktreeHandle>,
|
||||
@ -266,17 +269,15 @@ impl WorktreeStore {
|
||||
}
|
||||
}
|
||||
|
||||
/// search over all worktrees (ignoring open buffers)
|
||||
/// the query is tested against the file on disk and matching files are returned.
|
||||
/// search over all worktrees and return buffers that *might* match the search.
|
||||
pub fn find_search_candidates(
|
||||
&self,
|
||||
query: SearchQuery,
|
||||
limit: usize,
|
||||
skip_entries: HashSet<ProjectEntryId>,
|
||||
open_entries: HashSet<ProjectEntryId>,
|
||||
fs: Arc<dyn Fs>,
|
||||
cx: &ModelContext<Self>,
|
||||
) -> Receiver<ProjectPath> {
|
||||
let (matching_paths_tx, matching_paths_rx) = smol::channel::bounded(1024);
|
||||
let snapshots = self
|
||||
.visible_worktrees(cx)
|
||||
.filter_map(|tree| {
|
||||
@ -284,200 +285,81 @@ impl WorktreeStore {
|
||||
Some((tree.snapshot(), tree.as_local()?.settings()))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
let include_root = snapshots.len() > 1;
|
||||
let path_count: usize = snapshots
|
||||
.iter()
|
||||
.map(|(snapshot, _)| {
|
||||
if query.include_ignored() {
|
||||
snapshot.file_count()
|
||||
} else {
|
||||
snapshot.visible_file_count()
|
||||
}
|
||||
})
|
||||
.sum();
|
||||
|
||||
let remaining_paths = AtomicUsize::new(limit);
|
||||
if path_count == 0 {
|
||||
return matching_paths_rx;
|
||||
}
|
||||
let workers = cx.background_executor().num_cpus().min(path_count);
|
||||
let paths_per_worker = (path_count + workers - 1) / workers;
|
||||
|
||||
let executor = cx.background_executor().clone();
|
||||
|
||||
// We want to return entries in the order they are in the worktrees, so we have one
|
||||
// thread that iterates over the worktrees (and ignored directories) as necessary,
|
||||
// and pushes a oneshot::Receiver to the output channel and a oneshot::Sender to the filter
|
||||
// channel.
|
||||
// We spawn a number of workers that take items from the filter channel and check the query
|
||||
// against the version of the file on disk.
|
||||
let (filter_tx, filter_rx) = smol::channel::bounded(64);
|
||||
let (output_tx, mut output_rx) = smol::channel::bounded(64);
|
||||
let (matching_paths_tx, matching_paths_rx) = smol::channel::unbounded();
|
||||
|
||||
let input = cx.background_executor().spawn({
|
||||
let fs = fs.clone();
|
||||
let query = query.clone();
|
||||
async move {
|
||||
Self::find_candidate_paths(
|
||||
fs,
|
||||
snapshots,
|
||||
open_entries,
|
||||
query,
|
||||
filter_tx,
|
||||
output_tx,
|
||||
)
|
||||
.await
|
||||
.log_err();
|
||||
}
|
||||
});
|
||||
const MAX_CONCURRENT_FILE_SCANS: usize = 64;
|
||||
let filters = cx.background_executor().spawn(async move {
|
||||
let fs = &fs;
|
||||
let query = &query;
|
||||
executor
|
||||
.scoped(move |scope| {
|
||||
for _ in 0..MAX_CONCURRENT_FILE_SCANS {
|
||||
let filter_rx = filter_rx.clone();
|
||||
scope.spawn(async move {
|
||||
Self::filter_paths(fs, filter_rx, query).await.log_err();
|
||||
})
|
||||
}
|
||||
})
|
||||
.await;
|
||||
});
|
||||
cx.background_executor()
|
||||
.spawn(async move {
|
||||
let fs = &fs;
|
||||
let query = &query;
|
||||
let matching_paths_tx = &matching_paths_tx;
|
||||
let snapshots = &snapshots;
|
||||
let remaining_paths = &remaining_paths;
|
||||
|
||||
executor
|
||||
.scoped(move |scope| {
|
||||
let max_concurrent_workers = Arc::new(Semaphore::new(workers));
|
||||
|
||||
for worker_ix in 0..workers {
|
||||
let snapshots = snapshots.clone();
|
||||
let worker_start_ix = worker_ix * paths_per_worker;
|
||||
let worker_end_ix = worker_start_ix + paths_per_worker;
|
||||
let skip_entries = skip_entries.clone();
|
||||
let limiter = Arc::clone(&max_concurrent_workers);
|
||||
scope.spawn(async move {
|
||||
let _guard = limiter.acquire().await;
|
||||
Self::search_snapshots(
|
||||
&snapshots,
|
||||
worker_start_ix,
|
||||
worker_end_ix,
|
||||
&query,
|
||||
remaining_paths,
|
||||
&matching_paths_tx,
|
||||
&skip_entries,
|
||||
include_root,
|
||||
fs,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
|
||||
if query.include_ignored() {
|
||||
for (snapshot, settings) in snapshots {
|
||||
for ignored_entry in
|
||||
snapshot.entries(true, 0).filter(|e| e.is_ignored)
|
||||
{
|
||||
let limiter = Arc::clone(&max_concurrent_workers);
|
||||
scope.spawn(async move {
|
||||
let _guard = limiter.acquire().await;
|
||||
if remaining_paths.load(SeqCst) == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
Self::search_ignored_entry(
|
||||
&snapshot,
|
||||
&settings,
|
||||
ignored_entry,
|
||||
&fs,
|
||||
&query,
|
||||
remaining_paths,
|
||||
&matching_paths_tx,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.await
|
||||
let mut matched = 0;
|
||||
while let Some(mut receiver) = output_rx.next().await {
|
||||
let Some(path) = receiver.next().await else {
|
||||
continue;
|
||||
};
|
||||
let Ok(_) = matching_paths_tx.send(path).await else {
|
||||
break;
|
||||
};
|
||||
matched += 1;
|
||||
if matched == limit {
|
||||
break;
|
||||
}
|
||||
}
|
||||
drop(input);
|
||||
drop(filters);
|
||||
})
|
||||
.detach();
|
||||
return matching_paths_rx;
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn search_snapshots(
|
||||
snapshots: &Vec<(worktree::Snapshot, WorktreeSettings)>,
|
||||
worker_start_ix: usize,
|
||||
worker_end_ix: usize,
|
||||
query: &SearchQuery,
|
||||
remaining_paths: &AtomicUsize,
|
||||
results_tx: &Sender<ProjectPath>,
|
||||
skip_entries: &HashSet<ProjectEntryId>,
|
||||
include_root: bool,
|
||||
fs: &Arc<dyn Fs>,
|
||||
) {
|
||||
let mut snapshot_start_ix = 0;
|
||||
let mut abs_path = PathBuf::new();
|
||||
|
||||
for (snapshot, _) in snapshots {
|
||||
let snapshot_end_ix = snapshot_start_ix
|
||||
+ if query.include_ignored() {
|
||||
snapshot.file_count()
|
||||
} else {
|
||||
snapshot.visible_file_count()
|
||||
};
|
||||
if worker_end_ix <= snapshot_start_ix {
|
||||
break;
|
||||
} else if worker_start_ix > snapshot_end_ix {
|
||||
snapshot_start_ix = snapshot_end_ix;
|
||||
continue;
|
||||
} else {
|
||||
let start_in_snapshot = worker_start_ix.saturating_sub(snapshot_start_ix);
|
||||
let end_in_snapshot = cmp::min(worker_end_ix, snapshot_end_ix) - snapshot_start_ix;
|
||||
|
||||
for entry in snapshot
|
||||
.files(false, start_in_snapshot)
|
||||
.take(end_in_snapshot - start_in_snapshot)
|
||||
{
|
||||
if results_tx.is_closed() {
|
||||
break;
|
||||
}
|
||||
if skip_entries.contains(&entry.id) {
|
||||
continue;
|
||||
}
|
||||
if entry.is_fifo {
|
||||
continue;
|
||||
}
|
||||
|
||||
let matched_path = if include_root {
|
||||
let mut full_path = PathBuf::from(snapshot.root_name());
|
||||
full_path.push(&entry.path);
|
||||
query.file_matches(Some(&full_path))
|
||||
} else {
|
||||
query.file_matches(Some(&entry.path))
|
||||
};
|
||||
|
||||
let matches = if matched_path {
|
||||
abs_path.clear();
|
||||
abs_path.push(&snapshot.abs_path());
|
||||
abs_path.push(&entry.path);
|
||||
if let Some(file) = fs.open_sync(&abs_path).await.log_err() {
|
||||
query.detect(file).unwrap_or(false)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
if matches {
|
||||
if remaining_paths
|
||||
.fetch_update(SeqCst, SeqCst, |value| {
|
||||
if value > 0 {
|
||||
Some(value - 1)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
let project_path = ProjectPath {
|
||||
worktree_id: snapshot.id(),
|
||||
path: entry.path.clone(),
|
||||
};
|
||||
if results_tx.send(project_path).await.is_err() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
snapshot_start_ix = snapshot_end_ix;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn search_ignored_entry(
|
||||
snapshot: &Snapshot,
|
||||
settings: &WorktreeSettings,
|
||||
ignored_entry: &Entry,
|
||||
async fn scan_ignored_dir(
|
||||
fs: &Arc<dyn Fs>,
|
||||
snapshot: &worktree::Snapshot,
|
||||
path: &Path,
|
||||
query: &SearchQuery,
|
||||
remaining_paths: &AtomicUsize,
|
||||
counter_tx: &Sender<ProjectPath>,
|
||||
) {
|
||||
let mut ignored_paths_to_process =
|
||||
VecDeque::from([snapshot.abs_path().join(&ignored_entry.path)]);
|
||||
filter_tx: &Sender<MatchingEntry>,
|
||||
output_tx: &Sender<oneshot::Receiver<ProjectPath>>,
|
||||
) -> Result<()> {
|
||||
let mut ignored_paths_to_process = VecDeque::from([snapshot.abs_path().join(&path)]);
|
||||
|
||||
while let Some(ignored_abs_path) = ignored_paths_to_process.pop_front() {
|
||||
let metadata = fs
|
||||
@ -487,67 +369,132 @@ impl WorktreeStore {
|
||||
.log_err()
|
||||
.flatten();
|
||||
|
||||
if let Some(fs_metadata) = metadata {
|
||||
if fs_metadata.is_dir {
|
||||
let files = fs
|
||||
.read_dir(&ignored_abs_path)
|
||||
.await
|
||||
.with_context(|| format!("listing ignored path {ignored_abs_path:?}"))
|
||||
.log_err();
|
||||
let Some(fs_metadata) = metadata else {
|
||||
continue;
|
||||
};
|
||||
if fs_metadata.is_dir {
|
||||
let files = fs
|
||||
.read_dir(&ignored_abs_path)
|
||||
.await
|
||||
.with_context(|| format!("listing ignored path {ignored_abs_path:?}"))
|
||||
.log_err();
|
||||
|
||||
if let Some(mut subfiles) = files {
|
||||
while let Some(subfile) = subfiles.next().await {
|
||||
if let Some(subfile) = subfile.log_err() {
|
||||
ignored_paths_to_process.push_back(subfile);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if !fs_metadata.is_symlink {
|
||||
if !query.file_matches(Some(&ignored_abs_path))
|
||||
|| settings.is_path_excluded(&ignored_entry.path)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
let matches = if let Some(file) = fs
|
||||
.open_sync(&ignored_abs_path)
|
||||
.await
|
||||
.with_context(|| format!("Opening ignored path {ignored_abs_path:?}"))
|
||||
.log_err()
|
||||
{
|
||||
query.detect(file).unwrap_or(false)
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
if matches {
|
||||
if remaining_paths
|
||||
.fetch_update(SeqCst, SeqCst, |value| {
|
||||
if value > 0 {
|
||||
Some(value - 1)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.is_err()
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
let project_path = ProjectPath {
|
||||
worktree_id: snapshot.id(),
|
||||
path: Arc::from(
|
||||
ignored_abs_path
|
||||
.strip_prefix(snapshot.abs_path())
|
||||
.expect("scanning worktree-related files"),
|
||||
),
|
||||
};
|
||||
if counter_tx.send(project_path).await.is_err() {
|
||||
return;
|
||||
if let Some(mut subfiles) = files {
|
||||
while let Some(subfile) = subfiles.next().await {
|
||||
if let Some(subfile) = subfile.log_err() {
|
||||
ignored_paths_to_process.push_back(subfile);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if !fs_metadata.is_symlink {
|
||||
if !query.file_matches(Some(&ignored_abs_path)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
output_tx.send(rx).await?;
|
||||
filter_tx
|
||||
.send(MatchingEntry {
|
||||
respond: tx,
|
||||
worktree_path: snapshot.abs_path().clone(),
|
||||
path: ProjectPath {
|
||||
worktree_id: snapshot.id(),
|
||||
path: Arc::from(ignored_abs_path.strip_prefix(snapshot.abs_path())?),
|
||||
},
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn find_candidate_paths(
|
||||
fs: Arc<dyn Fs>,
|
||||
snapshots: Vec<(worktree::Snapshot, WorktreeSettings)>,
|
||||
open_entries: HashSet<ProjectEntryId>,
|
||||
query: SearchQuery,
|
||||
filter_tx: Sender<MatchingEntry>,
|
||||
output_tx: Sender<oneshot::Receiver<ProjectPath>>,
|
||||
) -> Result<()> {
|
||||
let include_root = snapshots.len() > 1;
|
||||
for (snapshot, settings) in snapshots {
|
||||
for entry in snapshot.entries(query.include_ignored(), 0) {
|
||||
if entry.is_dir() && entry.is_ignored {
|
||||
if !settings.is_path_excluded(&entry.path) {
|
||||
Self::scan_ignored_dir(
|
||||
&fs,
|
||||
&snapshot,
|
||||
&entry.path,
|
||||
&query,
|
||||
&filter_tx,
|
||||
&output_tx,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if entry.is_fifo || !entry.is_file() {
|
||||
continue;
|
||||
}
|
||||
|
||||
if open_entries.contains(&entry.id) {
|
||||
let (mut tx, rx) = oneshot::channel();
|
||||
tx.send(ProjectPath {
|
||||
worktree_id: snapshot.id(),
|
||||
path: entry.path.clone(),
|
||||
})
|
||||
.await?;
|
||||
output_tx.send(rx).await?;
|
||||
continue;
|
||||
}
|
||||
|
||||
if query.filters_path() {
|
||||
let matched_path = if include_root {
|
||||
let mut full_path = PathBuf::from(snapshot.root_name());
|
||||
full_path.push(&entry.path);
|
||||
query.file_matches(Some(&full_path))
|
||||
} else {
|
||||
query.file_matches(Some(&entry.path))
|
||||
};
|
||||
if !matched_path {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
output_tx.send(rx).await?;
|
||||
filter_tx
|
||||
.send(MatchingEntry {
|
||||
respond: tx,
|
||||
worktree_path: snapshot.abs_path().clone(),
|
||||
path: ProjectPath {
|
||||
worktree_id: snapshot.id(),
|
||||
path: entry.path.clone(),
|
||||
},
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn filter_paths(
|
||||
fs: &Arc<dyn Fs>,
|
||||
mut input: Receiver<MatchingEntry>,
|
||||
query: &SearchQuery,
|
||||
) -> Result<()> {
|
||||
while let Some(mut entry) = input.next().await {
|
||||
let abs_path = entry.worktree_path.join(&entry.path.path);
|
||||
let Some(file) = fs.open_sync(&abs_path).await.log_err() else {
|
||||
continue;
|
||||
};
|
||||
if query.detect(file).unwrap_or(false) {
|
||||
entry.respond.send(entry.path).await?
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn handle_create_project_entry(
|
||||
|
@ -11,6 +11,7 @@ use editor::{
|
||||
Anchor, Editor, EditorElement, EditorEvent, EditorSettings, EditorStyle, MultiBuffer,
|
||||
MAX_TAB_TITLE_LEN,
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use gpui::{
|
||||
actions, div, Action, AnyElement, AnyView, AppContext, Context as _, EntityId, EventEmitter,
|
||||
FocusHandle, FocusableView, Global, Hsla, InteractiveElement, IntoElement, KeyContext, Model,
|
||||
@ -20,7 +21,6 @@ use gpui::{
|
||||
use menu::Confirm;
|
||||
use project::{search::SearchQuery, search_history::SearchHistoryCursor, Project, ProjectPath};
|
||||
use settings::Settings;
|
||||
use smol::stream::StreamExt;
|
||||
use std::{
|
||||
any::{Any, TypeId},
|
||||
mem,
|
||||
@ -209,7 +209,7 @@ impl ProjectSearch {
|
||||
self.active_query = Some(query);
|
||||
self.match_ranges.clear();
|
||||
self.pending_search = Some(cx.spawn(|this, mut cx| async move {
|
||||
let mut matches = search;
|
||||
let mut matches = search.ready_chunks(1024);
|
||||
let this = this.upgrade()?;
|
||||
this.update(&mut cx, |this, cx| {
|
||||
this.match_ranges.clear();
|
||||
@ -220,33 +220,35 @@ impl ProjectSearch {
|
||||
.ok()?;
|
||||
|
||||
let mut limit_reached = false;
|
||||
while let Some(result) = matches.next().await {
|
||||
match result {
|
||||
project::search::SearchResult::Buffer { buffer, ranges } => {
|
||||
let mut match_ranges = this
|
||||
.update(&mut cx, |this, cx| {
|
||||
this.excerpts.update(cx, |excerpts, cx| {
|
||||
excerpts.stream_excerpts_with_context_lines(
|
||||
buffer,
|
||||
ranges,
|
||||
editor::DEFAULT_MULTIBUFFER_CONTEXT,
|
||||
cx,
|
||||
)
|
||||
while let Some(results) = matches.next().await {
|
||||
for result in results {
|
||||
match result {
|
||||
project::search::SearchResult::Buffer { buffer, ranges } => {
|
||||
let mut match_ranges = this
|
||||
.update(&mut cx, |this, cx| {
|
||||
this.excerpts.update(cx, |excerpts, cx| {
|
||||
excerpts.stream_excerpts_with_context_lines(
|
||||
buffer,
|
||||
ranges,
|
||||
editor::DEFAULT_MULTIBUFFER_CONTEXT,
|
||||
cx,
|
||||
)
|
||||
})
|
||||
})
|
||||
})
|
||||
.ok()?;
|
||||
.ok()?;
|
||||
|
||||
while let Some(range) = match_ranges.next().await {
|
||||
this.update(&mut cx, |this, _| {
|
||||
this.no_results = Some(false);
|
||||
this.match_ranges.push(range)
|
||||
})
|
||||
.ok()?;
|
||||
while let Some(range) = match_ranges.next().await {
|
||||
this.update(&mut cx, |this, _| {
|
||||
this.no_results = Some(false);
|
||||
this.match_ranges.push(range)
|
||||
})
|
||||
.ok()?;
|
||||
}
|
||||
this.update(&mut cx, |_, cx| cx.notify()).ok()?;
|
||||
}
|
||||
project::search::SearchResult::LimitReached => {
|
||||
limit_reached = true;
|
||||
}
|
||||
this.update(&mut cx, |_, cx| cx.notify()).ok()?;
|
||||
}
|
||||
project::search::SearchResult::LimitReached => {
|
||||
limit_reached = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user