From bf43f93197e9149ef7625ed7b61f368edca79e82 Mon Sep 17 00:00:00 2001 From: KCaverly Date: Fri, 8 Sep 2023 15:04:50 -0400 Subject: [PATCH] updated semantic_index reset status to leverage target reset system time as opposed to duration --- crates/search/src/project_search.rs | 22 +++--- crates/semantic_index/src/embedding.rs | 67 +++++++++---------- crates/semantic_index/src/semantic_index.rs | 10 +-- .../src/semantic_index_tests.rs | 6 +- 4 files changed, 50 insertions(+), 55 deletions(-) diff --git a/crates/search/src/project_search.rs b/crates/search/src/project_search.rs index 977ead8c9e..6b5ebd56d4 100644 --- a/crates/search/src/project_search.rs +++ b/crates/search/src/project_search.rs @@ -34,7 +34,7 @@ use std::{ ops::{Not, Range}, path::PathBuf, sync::Arc, - time::Duration, + time::SystemTime, }; use util::ResultExt as _; use workspace::{ @@ -322,17 +322,23 @@ impl View for ProjectSearchView { SemanticIndexStatus::Indexed => Some("Indexing complete".to_string()), SemanticIndexStatus::Indexing { remaining_files, - rate_limiting, + rate_limit_expiration_time, } => { if remaining_files == 0 { Some(format!("Indexing...")) } else { - if rate_limiting > Duration::ZERO { - Some(format!( - "Remaining files to index (rate limit resets in {}s): {}", - rate_limiting.as_secs(), - remaining_files - )) + if let Some(rate_limit_expiration_time) = rate_limit_expiration_time { + if let Ok(remaining_seconds) = + rate_limit_expiration_time.duration_since(SystemTime::now()) + { + Some(format!( + "Remaining files to index(rate limit resets in {}s): {}", + remaining_seconds.as_secs(), + remaining_files + )) + } else { + Some(format!("Remaining files to index: {}", remaining_files)) + } } else { Some(format!("Remaining files to index: {}", remaining_files)) } diff --git a/crates/semantic_index/src/embedding.rs b/crates/semantic_index/src/embedding.rs index 6affac2556..148b354794 100644 --- a/crates/semantic_index/src/embedding.rs +++ b/crates/semantic_index/src/embedding.rs @@ -14,8 +14,9 @@ use rusqlite::types::{FromSql, FromSqlResult, ToSqlOutput, ValueRef}; use rusqlite::ToSql; use serde::{Deserialize, Serialize}; use std::env; +use std::ops::Add; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, SystemTime}; use tiktoken_rs::{cl100k_base, CoreBPE}; use util::http::{HttpClient, Request}; @@ -84,8 +85,8 @@ impl ToSql for Embedding { pub struct OpenAIEmbeddings { pub client: Arc, pub executor: Arc, - rate_limit_count_rx: watch::Receiver<(Duration, usize)>, - rate_limit_count_tx: Arc>>, + rate_limit_count_rx: watch::Receiver<(Option, usize)>, + rate_limit_count_tx: Arc, usize)>>>, } #[derive(Serialize)] @@ -118,15 +119,15 @@ pub trait EmbeddingProvider: Sync + Send { async fn embed_batch(&self, spans: Vec) -> Result>; fn max_tokens_per_batch(&self) -> usize; fn truncate(&self, span: &str) -> (String, usize); - fn rate_limit_expiration(&self) -> Duration; + fn rate_limit_expiration(&self) -> Option; } pub struct DummyEmbeddings {} #[async_trait] impl EmbeddingProvider for DummyEmbeddings { - fn rate_limit_expiration(&self) -> Duration { - Duration::ZERO + fn rate_limit_expiration(&self) -> Option { + None } async fn embed_batch(&self, spans: Vec) -> Result> { // 1024 is the OpenAI Embeddings size for ada models. @@ -158,7 +159,7 @@ const OPENAI_INPUT_LIMIT: usize = 8190; impl OpenAIEmbeddings { pub fn new(client: Arc, executor: Arc) -> Self { - let (rate_limit_count_tx, rate_limit_count_rx) = watch::channel_with((Duration::ZERO, 0)); + let (rate_limit_count_tx, rate_limit_count_rx) = watch::channel_with((None, 0)); let rate_limit_count_tx = Arc::new(Mutex::new(rate_limit_count_tx)); OpenAIEmbeddings { @@ -170,39 +171,32 @@ impl OpenAIEmbeddings { } fn resolve_rate_limit(&self) { - let (current_delay, delay_count) = *self.rate_limit_count_tx.lock().borrow(); + let (reset_time, delay_count) = *self.rate_limit_count_tx.lock().borrow(); let updated_count = delay_count - 1; - let updated_duration = if updated_count == 0 { - Duration::ZERO - } else { - current_delay - }; + let updated_time = if updated_count == 0 { None } else { reset_time }; - log::trace!( - "resolving rate limit: Count: {:?} Duration: {:?}", - updated_count, - updated_duration - ); + log::trace!("resolving rate limit: Count: {:?}", updated_count); - *self.rate_limit_count_tx.lock().borrow_mut() = (updated_duration, updated_count); + *self.rate_limit_count_tx.lock().borrow_mut() = (updated_time, updated_count); } - fn update_rate_limit(&self, delay_duration: Duration, count_increase: usize) { - let (current_delay, delay_count) = *self.rate_limit_count_tx.lock().borrow(); - let updated_count = delay_count + count_increase; - let updated_duration = if current_delay < delay_duration { - delay_duration + fn update_rate_limit(&self, reset_time: SystemTime, count_increase: usize) { + let (original_time, original_count) = *self.rate_limit_count_tx.lock().borrow(); + let updated_count = original_count + count_increase; + + let updated_time = if let Some(original_time) = original_time { + if reset_time < original_time { + Some(reset_time) + } else { + Some(original_time) + } } else { - current_delay + Some(reset_time) }; - log::trace!( - "updating rate limit: Count: {:?} Duration: {:?}", - updated_count, - updated_duration - ); + log::trace!("updating rate limit: Count: {:?}", updated_count); - *self.rate_limit_count_tx.lock().borrow_mut() = (updated_duration, updated_count); + *self.rate_limit_count_tx.lock().borrow_mut() = (updated_time, updated_count); } async fn send_request( &self, @@ -234,9 +228,9 @@ impl EmbeddingProvider for OpenAIEmbeddings { 50000 } - fn rate_limit_expiration(&self) -> Duration { - let (duration, _) = *self.rate_limit_count_rx.borrow(); - duration + fn rate_limit_expiration(&self) -> Option { + let (expiration_time, _) = *self.rate_limit_count_rx.borrow(); + expiration_time } fn truncate(&self, span: &str) -> (String, usize) { let mut tokens = OPENAI_BPE_TOKENIZER.encode_with_special_tokens(span); @@ -321,10 +315,11 @@ impl EmbeddingProvider for OpenAIEmbeddings { }; // If we've previously rate limited, increment the duration but not the count + let reset_time = SystemTime::now().add(delay_duration); if rate_limiting { - self.update_rate_limit(delay_duration, 0); + self.update_rate_limit(reset_time, 0); } else { - self.update_rate_limit(delay_duration, 1); + self.update_rate_limit(reset_time, 1); } rate_limiting = true; diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs index 8fba7de0f0..b60d697b43 100644 --- a/crates/semantic_index/src/semantic_index.rs +++ b/crates/semantic_index/src/semantic_index.rs @@ -112,7 +112,7 @@ pub enum SemanticIndexStatus { Indexed, Indexing { remaining_files: usize, - rate_limiting: Duration, + rate_limit_expiration_time: Option, }, } @@ -132,8 +132,6 @@ struct ProjectState { pending_file_count_rx: watch::Receiver, pending_file_count_tx: Arc>>, pending_index: usize, - rate_limiting_count_rx: watch::Receiver, - rate_limiting_count_tx: Arc>>, _subscription: gpui::Subscription, _observe_pending_file_count: Task<()>, } @@ -225,15 +223,11 @@ impl ProjectState { fn new(subscription: gpui::Subscription, cx: &mut ModelContext) -> Self { let (pending_file_count_tx, pending_file_count_rx) = watch::channel_with(0); let pending_file_count_tx = Arc::new(Mutex::new(pending_file_count_tx)); - let (rate_limiting_count_tx, rate_limiting_count_rx) = watch::channel_with(0); - let rate_limiting_count_tx = Arc::new(Mutex::new(rate_limiting_count_tx)); Self { worktrees: Default::default(), pending_file_count_rx: pending_file_count_rx.clone(), pending_file_count_tx, pending_index: 0, - rate_limiting_count_rx: rate_limiting_count_rx.clone(), - rate_limiting_count_tx, _subscription: subscription, _observe_pending_file_count: cx.spawn_weak({ let mut pending_file_count_rx = pending_file_count_rx.clone(); @@ -299,7 +293,7 @@ impl SemanticIndex { } else { SemanticIndexStatus::Indexing { remaining_files: project_state.pending_file_count_rx.borrow().clone(), - rate_limiting: self.embedding_provider.rate_limit_expiration(), + rate_limit_expiration_time: self.embedding_provider.rate_limit_expiration(), } } } else { diff --git a/crates/semantic_index/src/semantic_index_tests.rs b/crates/semantic_index/src/semantic_index_tests.rs index 09c94b9a94..4bc95bec62 100644 --- a/crates/semantic_index/src/semantic_index_tests.rs +++ b/crates/semantic_index/src/semantic_index_tests.rs @@ -21,7 +21,7 @@ use std::{ atomic::{self, AtomicUsize}, Arc, }, - time::{Duration, SystemTime}, + time::SystemTime, }; use unindent::Unindent; use util::RandomCharIter; @@ -1275,8 +1275,8 @@ impl EmbeddingProvider for FakeEmbeddingProvider { 200 } - fn rate_limit_expiration(&self) -> Duration { - Duration::ZERO + fn rate_limit_expiration(&self) -> Option { + None } async fn embed_batch(&self, spans: Vec) -> Result> {