diff --git a/crates/vector_store/src/vector_store.rs b/crates/vector_store/src/vector_store.rs index 5141451e64..57277e39af 100644 --- a/crates/vector_store/src/vector_store.rs +++ b/crates/vector_store/src/vector_store.rs @@ -18,11 +18,13 @@ use modal::{SemanticSearch, SemanticSearchDelegate, Toggle}; use project::{Fs, Project, WorktreeId}; use smol::channel; use std::{ + cell::RefCell, cmp::Ordering, collections::HashMap, path::{Path, PathBuf}, + rc::Rc, sync::Arc, - time::{Instant, SystemTime}, + time::{Duration, Instant, SystemTime}, }; use tree_sitter::{Parser, QueryCursor}; use util::{ @@ -33,7 +35,7 @@ use util::{ }; use workspace::{Workspace, WorkspaceCreated}; -const REINDEXING_DELAY: u64 = 30; +const REINDEXING_DELAY_SECONDS: u64 = 3; const EMBEDDINGS_BATCH_SIZE: usize = 150; #[derive(Debug, Clone)] @@ -124,20 +126,62 @@ pub struct VectorStore { embedding_provider: Arc, language_registry: Arc, db_update_tx: channel::Sender, - // embed_batch_tx: channel::Sender)>>, - parsing_files_tx: channel::Sender<(i64, PathBuf, Arc, SystemTime)>, + parsing_files_tx: channel::Sender, _db_update_task: Task<()>, _embed_batch_task: Vec>, _batch_files_task: Task<()>, _parsing_files_tasks: Vec>, - projects: HashMap, ProjectState>, + projects: HashMap, Rc>>, } struct ProjectState { worktree_db_ids: Vec<(WorktreeId, i64)>, + pending_files: HashMap, _subscription: gpui::Subscription, } +impl ProjectState { + fn update_pending_files(&mut self, pending_file: PendingFile, indexing_time: SystemTime) { + // If Pending File Already Exists, Replace it with the new one + // but keep the old indexing time + if let Some(old_file) = self.pending_files.remove(&pending_file.path.clone()) { + self.pending_files + .insert(pending_file.path.clone(), (pending_file, old_file.1)); + } else { + self.pending_files + .insert(pending_file.path.clone(), (pending_file, indexing_time)); + }; + } + + fn get_outstanding_files(&mut self) -> Vec { + let mut outstanding_files = vec![]; + let mut remove_keys = vec![]; + for key in self.pending_files.keys().into_iter() { + if let Some(pending_details) = self.pending_files.get(key) { + let (pending_file, index_time) = pending_details; + if index_time <= &SystemTime::now() { + outstanding_files.push(pending_file.clone()); + remove_keys.push(key.clone()); + } + } + } + + for key in remove_keys.iter() { + self.pending_files.remove(key); + } + + return outstanding_files; + } +} + +#[derive(Clone, Debug)] +struct PendingFile { + worktree_db_id: i64, + path: PathBuf, + language: Arc, + modified_time: SystemTime, +} + #[derive(Debug, Clone)] pub struct SearchResult { pub worktree_id: WorktreeId, @@ -293,8 +337,7 @@ impl VectorStore { }); // parsing_files_tx/rx: Parsing Files to Embeddable Documents - let (parsing_files_tx, parsing_files_rx) = - channel::unbounded::<(i64, PathBuf, Arc, SystemTime)>(); + let (parsing_files_tx, parsing_files_rx) = channel::unbounded::(); let mut _parsing_files_tasks = Vec::new(); for _ in 0..cx.background().num_cpus() { @@ -304,23 +347,25 @@ impl VectorStore { _parsing_files_tasks.push(cx.background().spawn(async move { let mut parser = Parser::new(); let mut cursor = QueryCursor::new(); - while let Ok((worktree_id, file_path, language, mtime)) = - parsing_files_rx.recv().await - { - log::info!("Parsing File: {:?}", &file_path); + while let Ok(pending_file) = parsing_files_rx.recv().await { + log::info!("Parsing File: {:?}", &pending_file.path); if let Some((indexed_file, document_spans)) = Self::index_file( &mut cursor, &mut parser, &fs, - language, - file_path.clone(), - mtime, + pending_file.language, + pending_file.path.clone(), + pending_file.modified_time, ) .await .log_err() { batch_files_tx - .try_send((worktree_id, indexed_file, document_spans)) + .try_send(( + pending_file.worktree_db_id, + indexed_file, + document_spans, + )) .unwrap(); } } @@ -516,12 +561,13 @@ impl VectorStore { if !already_stored { parsing_files_tx - .try_send(( - db_ids_by_worktree_id[&worktree.id()], - path_buf, + .try_send(PendingFile { + worktree_db_id: db_ids_by_worktree_id + [&worktree.id()], + path: path_buf, language, - file.mtime, - )) + modified_time: file.mtime, + }) .unwrap(); } } @@ -543,54 +589,82 @@ impl VectorStore { }) .detach(); + // let mut pending_files: Vec<(PathBuf, ((i64, PathBuf, Arc, SystemTime), SystemTime))> = vec![]; this.update(&mut cx, |this, cx| { // The below is managing for updated on save // Currently each time a file is saved, this code is run, and for all the files that were changed, if the current time is // greater than the previous embedded time by the REINDEXING_DELAY variable, we will send the file off to be indexed. - let _subscription = cx.subscribe(&project, |this, project, event, _cx| { + let _subscription = cx.subscribe(&project, |this, project, event, cx| { if let Some(project_state) = this.projects.get(&project.downgrade()) { + let mut project_state = project_state.borrow_mut(); let worktree_db_ids = project_state.worktree_db_ids.clone(); if let project::Event::WorktreeUpdatedEntries(worktree_id, changes) = event { - // Iterate through changes - let language_registry = this.language_registry.clone(); - - let db = - VectorDatabase::new(this.database_url.to_string_lossy().into()); - if db.is_err() { + // Get Worktree Object + let worktree = + project.read(cx).worktree_for_id(worktree_id.clone(), cx); + if worktree.is_none() { return; } - let db = db.unwrap(); + let worktree = worktree.unwrap(); - let worktree_db_id: Option = { - let mut found_db_id = None; - for (w_id, db_id) in worktree_db_ids.into_iter() { - if &w_id == worktree_id { - found_db_id = Some(db_id); + // Get Database + let db_values = { + if let Ok(db) = + VectorDatabase::new(this.database_url.to_string_lossy().into()) + { + let worktree_db_id: Option = { + let mut found_db_id = None; + for (w_id, db_id) in worktree_db_ids.into_iter() { + if &w_id == &worktree.read(cx).id() { + found_db_id = Some(db_id) + } + } + found_db_id + }; + if worktree_db_id.is_none() { + return; } - } + let worktree_db_id = worktree_db_id.unwrap(); - found_db_id + let file_mtimes = db.get_file_mtimes(worktree_db_id); + if file_mtimes.is_err() { + return; + } + + let file_mtimes = file_mtimes.unwrap(); + Some((file_mtimes, worktree_db_id)) + } else { + return; + } }; - if worktree_db_id.is_none() { - return; - } - let worktree_db_id = worktree_db_id.unwrap(); - - let file_mtimes = db.get_file_mtimes(worktree_db_id); - if file_mtimes.is_err() { + if db_values.is_none() { return; } - let file_mtimes = file_mtimes.unwrap(); + let (file_mtimes, worktree_db_id) = db_values.unwrap(); + + // Iterate Through Changes + let language_registry = this.language_registry.clone(); let parsing_files_tx = this.parsing_files_tx.clone(); smol::block_on(async move { for change in changes.into_iter() { let change_path = change.0.clone(); - log::info!("Change: {:?}", &change_path); + // Skip if git ignored or symlink + if let Some(entry) = worktree.read(cx).entry_for_id(change.1) { + if entry.is_ignored || entry.is_symlink { + continue; + } else { + log::info!( + "Testing for Reindexing: {:?}", + &change_path + ); + } + }; + if let Ok(language) = language_registry .language_for_file(&change_path.to_path_buf(), None) .await @@ -603,47 +677,59 @@ impl VectorStore { continue; } - // TODO: Make this a bit more defensive - let modified_time = - change_path.metadata().unwrap().modified().unwrap(); - let existing_time = - file_mtimes.get(&change_path.to_path_buf()); - let already_stored = - existing_time.map_or(false, |existing_time| { - if &modified_time != existing_time - && existing_time.elapsed().unwrap().as_secs() - > REINDEXING_DELAY - { - false + if let Some(modified_time) = { + let metadata = change_path.metadata(); + if metadata.is_err() { + None + } else { + let mtime = metadata.unwrap().modified(); + if mtime.is_err() { + None } else { - true + Some(mtime.unwrap()) } - }); + } + } { + let existing_time = + file_mtimes.get(&change_path.to_path_buf()); + let already_stored = existing_time + .map_or(false, |existing_time| { + &modified_time != existing_time + }); - if !already_stored { - log::info!("Need to reindex: {:?}", &change_path); - parsing_files_tx - .try_send(( - worktree_db_id, - change_path.to_path_buf(), - language, - modified_time, - )) - .unwrap(); + let reindex_time = modified_time + + Duration::from_secs(REINDEXING_DELAY_SECONDS); + + if !already_stored { + project_state.update_pending_files( + PendingFile { + path: change_path.to_path_buf(), + modified_time, + worktree_db_id, + language: language.clone(), + }, + reindex_time, + ); + + for file in project_state.get_outstanding_files() { + parsing_files_tx.try_send(file).unwrap(); + } + } } } } - }) - } + }); + }; } }); this.projects.insert( project.downgrade(), - ProjectState { + Rc::new(RefCell::new(ProjectState { + pending_files: HashMap::new(), worktree_db_ids: db_ids_by_worktree_id.into_iter().collect(), _subscription, - }, + })), ); }); @@ -659,7 +745,7 @@ impl VectorStore { cx: &mut ModelContext, ) -> Task>> { let project_state = if let Some(state) = self.projects.get(&project.downgrade()) { - state + state.borrow() } else { return Task::ready(Err(anyhow!("project not added"))); }; @@ -717,7 +803,7 @@ impl VectorStore { this.read_with(&cx, |this, _| { let project_state = if let Some(state) = this.projects.get(&project.downgrade()) { - state + state.borrow() } else { return Err(anyhow!("project not added")); };