From 524533cfb227dffba93adfec461fee722c73ba4d Mon Sep 17 00:00:00 2001 From: KCaverly Date: Fri, 1 Sep 2023 11:24:08 -0400 Subject: [PATCH] flush embeddings queue when no files are parsed for 250 milliseconds Co-authored-by: Antonio --- crates/semantic_index/src/semantic_index.rs | 50 ++++++++++--------- .../src/semantic_index_tests.rs | 12 ++--- 2 files changed, 33 insertions(+), 29 deletions(-) diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs index 6d140931d6..a8518ce695 100644 --- a/crates/semantic_index/src/semantic_index.rs +++ b/crates/semantic_index/src/semantic_index.rs @@ -12,6 +12,7 @@ use anyhow::{anyhow, Result}; use db::VectorDatabase; use embedding::{Embedding, EmbeddingProvider, OpenAIEmbeddings}; use embedding_queue::{EmbeddingQueue, FileToEmbed}; +use futures::{FutureExt, StreamExt}; use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle}; use language::{Anchor, Buffer, Language, LanguageRegistry}; use parking_lot::Mutex; @@ -39,6 +40,7 @@ use workspace::WorkspaceCreated; const SEMANTIC_INDEX_VERSION: usize = 8; const BACKGROUND_INDEXING_DELAY: Duration = Duration::from_secs(600); +const EMBEDDING_QUEUE_FLUSH_TIMEOUT: Duration = Duration::from_millis(250); pub fn init( fs: Arc, @@ -253,24 +255,34 @@ impl SemanticIndex { let mut _parsing_files_tasks = Vec::new(); for _ in 0..cx.background().num_cpus() { let fs = fs.clone(); - let parsing_files_rx = parsing_files_rx.clone(); + let mut parsing_files_rx = parsing_files_rx.clone(); let embedding_provider = embedding_provider.clone(); let embedding_queue = embedding_queue.clone(); - let db = db.clone(); + let background = cx.background().clone(); _parsing_files_tasks.push(cx.background().spawn(async move { let mut retriever = CodeContextRetriever::new(embedding_provider.clone()); - while let Ok((embeddings_for_digest, pending_file)) = - parsing_files_rx.recv().await - { - Self::parse_file( - &fs, - pending_file, - &mut retriever, - &embedding_queue, - &parsing_files_rx, - &embeddings_for_digest, - ) - .await; + loop { + let mut timer = background.timer(EMBEDDING_QUEUE_FLUSH_TIMEOUT).fuse(); + let mut next_file_to_parse = parsing_files_rx.next().fuse(); + futures::select_biased! { + next_file_to_parse = next_file_to_parse => { + if let Some((embeddings_for_digest, pending_file)) = next_file_to_parse { + Self::parse_file( + &fs, + pending_file, + &mut retriever, + &embedding_queue, + &embeddings_for_digest, + ) + .await + } else { + break; + } + }, + _ = timer => { + embedding_queue.lock().flush(); + } + } } })); } @@ -297,10 +309,6 @@ impl SemanticIndex { pending_file: PendingFile, retriever: &mut CodeContextRetriever, embedding_queue: &Arc>, - parsing_files_rx: &channel::Receiver<( - Arc>, - PendingFile, - )>, embeddings_for_digest: &HashMap, ) { let Some(language) = pending_file.language else { @@ -333,10 +341,6 @@ impl SemanticIndex { }); } } - - if parsing_files_rx.len() == 0 { - embedding_queue.lock().flush(); - } } pub fn project_previously_indexed( @@ -581,7 +585,7 @@ impl SemanticIndex { cx: &mut ModelContext, ) -> Task)>> { cx.spawn(|this, mut cx| async move { - let embeddings_for_digest = this.read_with(&cx, |this, cx| { + let embeddings_for_digest = this.read_with(&cx, |this, _| { if let Some(state) = this.projects.get(&project.downgrade()) { let mut worktree_id_file_paths = HashMap::default(); for (path, _) in &state.changed_paths { diff --git a/crates/semantic_index/src/semantic_index_tests.rs b/crates/semantic_index/src/semantic_index_tests.rs index 01f34a2b1d..f549e68e04 100644 --- a/crates/semantic_index/src/semantic_index_tests.rs +++ b/crates/semantic_index/src/semantic_index_tests.rs @@ -3,11 +3,11 @@ use crate::{ embedding_queue::EmbeddingQueue, parsing::{subtract_ranges, CodeContextRetriever, Document, DocumentDigest}, semantic_index_settings::SemanticIndexSettings, - FileToEmbed, JobHandle, SearchResult, SemanticIndex, + FileToEmbed, JobHandle, SearchResult, SemanticIndex, EMBEDDING_QUEUE_FLUSH_TIMEOUT, }; use anyhow::Result; use async_trait::async_trait; -use gpui::{Task, TestAppContext}; +use gpui::{executor::Deterministic, Task, TestAppContext}; use language::{Language, LanguageConfig, LanguageRegistry, ToOffset}; use parking_lot::Mutex; use pretty_assertions::assert_eq; @@ -34,7 +34,7 @@ fn init_logger() { } #[gpui::test] -async fn test_semantic_index(cx: &mut TestAppContext) { +async fn test_semantic_index(deterministic: Arc, cx: &mut TestAppContext) { init_test(cx); let fs = FakeFs::new(cx.background()); @@ -98,7 +98,7 @@ async fn test_semantic_index(cx: &mut TestAppContext) { .await .unwrap(); assert_eq!(file_count, 3); - cx.foreground().run_until_parked(); + deterministic.advance_clock(EMBEDDING_QUEUE_FLUSH_TIMEOUT); assert_eq!(*outstanding_file_count.borrow(), 0); let search_results = semantic_index @@ -188,7 +188,7 @@ async fn test_semantic_index(cx: &mut TestAppContext) { .await .unwrap(); - cx.foreground().run_until_parked(); + deterministic.advance_clock(EMBEDDING_QUEUE_FLUSH_TIMEOUT); let prev_embedding_count = embedding_provider.embedding_count(); let (file_count, outstanding_file_count) = semantic_index @@ -197,7 +197,7 @@ async fn test_semantic_index(cx: &mut TestAppContext) { .unwrap(); assert_eq!(file_count, 1); - cx.foreground().run_until_parked(); + deterministic.advance_clock(EMBEDDING_QUEUE_FLUSH_TIMEOUT); assert_eq!(*outstanding_file_count.borrow(), 0); assert_eq!(