diff --git a/crates/semantic_index/src/db.rs b/crates/semantic_index/src/db.rs
index e8c929c995..e57a5d733f 100644
--- a/crates/semantic_index/src/db.rs
+++ b/crates/semantic_index/src/db.rs
@@ -156,25 +156,27 @@ impl VectorDatabase {
         mtime: SystemTime,
         documents: Vec<Document>,
     ) -> Result<()> {
-        // Write to files table, and return generated id.
-        self.db.execute(
-            "
-            DELETE FROM files WHERE worktree_id = ?1 AND relative_path = ?2;
-            ",
-            params![worktree_id, path.to_str()],
-        )?;
+        // Reuse the existing file ID when both the path and the mtime match.
         let mtime = Timestamp::from(mtime);
-        self.db.execute(
-            "
-            INSERT INTO files
-            (worktree_id, relative_path, mtime_seconds, mtime_nanos)
-            VALUES
-            (?1, ?2, $3, $4);
-            ",
-            params![worktree_id, path.to_str(), mtime.seconds, mtime.nanos],
-        )?;
-
-        let file_id = self.db.last_insert_rowid();
+        let mut existing_id_query = self.db.prepare("SELECT id FROM files WHERE worktree_id = ?1 AND relative_path = ?2 AND mtime_seconds = ?3 AND mtime_nanos = ?4")?;
+        let existing_id = existing_id_query
+            .query_row(
+                params![worktree_id, path.to_str(), mtime.seconds, mtime.nanos],
+                |row| row.get::<_, i64>(0),
+            )
+            .map_err(|err| anyhow!(err));
+        let file_id = if let Ok(id) = existing_id {
+            // The file is already indexed at this mtime; return its existing ID.
+            id
+        } else {
+            // The path is new or its mtime changed: replace any stale row.
+            self.db.execute(
+                "DELETE FROM files WHERE worktree_id = ?1 AND relative_path = ?2;",
+                params![worktree_id, path.to_str()],
+            )?;
+            self.db.execute("INSERT INTO files (worktree_id, relative_path, mtime_seconds, mtime_nanos) VALUES (?1, ?2, ?3, ?4);", params![worktree_id, path.to_str(), mtime.seconds, mtime.nanos])?;
+            self.db.last_insert_rowid()
+        };

         // Currently inserting at approximately 3400 documents a second
         // I imagine we can speed this up with a bulk insert of some kind.
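A side note on the lookup above: rusqlite reports a missing row as an error (QueryReturnedNoRows), which is why the patch routes the miss through a Result. The same pattern can avoid the error detour with rusqlite's OptionalExtension. A minimal sketch of that alternative, with a hypothetical helper name and the same assumed files schema; this is not the crate's actual code:

    use anyhow::Result;
    use rusqlite::{params, Connection, OptionalExtension};

    fn file_id_for(db: &Connection, worktree_id: i64, path: &str, secs: i64, nanos: i64) -> Result<i64> {
        // `optional()` turns QueryReturnedNoRows into Ok(None) instead of an error.
        let existing: Option<i64> = db
            .query_row(
                "SELECT id FROM files WHERE worktree_id = ?1 AND relative_path = ?2 \
                 AND mtime_seconds = ?3 AND mtime_nanos = ?4",
                params![worktree_id, path, secs, nanos],
                |row| row.get(0),
            )
            .optional()?;

        match existing {
            // Unchanged file: reuse the ID so its document rows stay valid.
            Some(id) => Ok(id),
            None => {
                // New path or changed mtime: replace the row and take the fresh rowid.
                db.execute(
                    "DELETE FROM files WHERE worktree_id = ?1 AND relative_path = ?2",
                    params![worktree_id, path],
                )?;
                db.execute(
                    "INSERT INTO files (worktree_id, relative_path, mtime_seconds, mtime_nanos) VALUES (?1, ?2, ?3, ?4)",
                    params![worktree_id, path, secs, nanos],
                )?;
                Ok(db.last_insert_rowid())
            }
        }
    }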
diff --git a/crates/semantic_index/src/semantic_index.rs b/crates/semantic_index/src/semantic_index.rs
index 8c9877b9d3..dd53215203 100644
--- a/crates/semantic_index/src/semantic_index.rs
+++ b/crates/semantic_index/src/semantic_index.rs
@@ -96,6 +96,7 @@ struct ProjectState {
     _outstanding_job_count_tx: Arc<Mutex<watch::Sender<usize>>>,
 }

+#[derive(Clone)]
 struct JobHandle {
     tx: Weak<Mutex<watch::Sender<usize>>>,
 }
@@ -389,6 +390,7 @@ impl SemanticIndex {
         embeddings_queue: &mut Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>,
         embed_batch_tx: &channel::Sender<Vec<(i64, Vec<Document>, PathBuf, SystemTime, JobHandle)>>,
     ) {
+        // Handle the edge case where a single file yields more documents than the max batch size.
         let should_flush = match job {
             EmbeddingJob::Enqueue {
                 documents,
@@ -397,9 +399,43 @@ impl SemanticIndex {
                 mtime,
                 job_handle,
             } => {
-                *queue_len += &documents.len();
-                embeddings_queue.push((worktree_id, documents, path, mtime, job_handle));
-                *queue_len >= EMBEDDINGS_BATCH_SIZE
+                // If the document count exceeds the batch size, split the job and enqueue each half recursively.
+                if documents.len() > EMBEDDINGS_BATCH_SIZE {
+                    let first_job = EmbeddingJob::Enqueue {
+                        documents: documents[..EMBEDDINGS_BATCH_SIZE].to_vec(),
+                        worktree_id,
+                        path: path.clone(),
+                        mtime,
+                        job_handle: job_handle.clone(),
+                    };
+
+                    Self::enqueue_documents_to_embed(
+                        first_job,
+                        queue_len,
+                        embeddings_queue,
+                        embed_batch_tx,
+                    );
+
+                    let second_job = EmbeddingJob::Enqueue {
+                        documents: documents[EMBEDDINGS_BATCH_SIZE..].to_vec(),
+                        worktree_id,
+                        path: path.clone(),
+                        mtime,
+                        job_handle: job_handle.clone(),
+                    };
+
+                    Self::enqueue_documents_to_embed(
+                        second_job,
+                        queue_len,
+                        embeddings_queue,
+                        embed_batch_tx,
+                    );
+                    return;
+                } else {
+                    *queue_len += documents.len();
+                    embeddings_queue.push((worktree_id, documents, path, mtime, job_handle));
+                    *queue_len >= EMBEDDINGS_BATCH_SIZE
+                }
             }
             EmbeddingJob::Flush => true,
         };
@@ -796,7 +832,10 @@ impl Drop for JobHandle {
     fn drop(&mut self) {
         if let Some(tx) = self.tx.upgrade() {
             let mut tx = tx.lock();
-            *tx.borrow_mut() -= 1;
+            // Guard against underflow: JobHandle is now cloned, so several handles may decrement this count.
+            if *tx.borrow() > 0 {
+                *tx.borrow_mut() -= 1;
+            };
         }
     }
 }
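The recursion in enqueue_documents_to_embed peels off one full batch at a time: the first sub-job is exactly EMBEDDINGS_BATCH_SIZE documents, and the remainder recurses until it fits. A self-contained sketch of that splitting behavior, with a hypothetical BATCH constant (the crate's actual EMBEDDINGS_BATCH_SIZE may differ) and plain integers standing in for Document:

    const BATCH: usize = 120; // stand-in for EMBEDDINGS_BATCH_SIZE

    // Mirrors the control flow of enqueue_documents_to_embed: oversized input is
    // split into a full front batch plus a recursively handled remainder.
    fn enqueue(documents: Vec<u32>, queue: &mut Vec<Vec<u32>>) {
        if documents.len() > BATCH {
            enqueue(documents[..BATCH].to_vec(), queue);
            enqueue(documents[BATCH..].to_vec(), queue);
            return;
        }
        queue.push(documents);
    }

    fn main() {
        let mut queue = Vec::new();
        enqueue((0..301).collect(), &mut queue);
        // 301 documents split as 120 + 120 + 61: order preserved, nothing dropped.
        assert_eq!(queue.iter().map(Vec::len).collect::<Vec<_>>(), vec![120, 120, 61]);
        assert_eq!(queue.concat(), (0..301).collect::<Vec<u32>>());
    }

Note the `return` after the two recursive calls in the patch: without it, the oversized job would also be pushed onto the queue, double-counting every document.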
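On the JobHandle changes: deriving Clone duplicates the Weak pointer, so a split job holds several handles that all decrement the same outstanding-job counter on drop, and the guard in Drop clamps the count at zero rather than panicking on underflow. An alternative worth noting, shown only as a sketch with a bare Mutex<usize> standing in for the crate's watch channel: implement Clone by hand so each clone increments the counter and every handle owns exactly one decrement.

    use std::sync::{Arc, Mutex, Weak};

    struct JobHandle {
        tx: Weak<Mutex<usize>>,
    }

    // Hand-written Clone: bump the count so clone/drop pairs stay balanced.
    impl Clone for JobHandle {
        fn clone(&self) -> Self {
            if let Some(tx) = self.tx.upgrade() {
                *tx.lock().unwrap() += 1;
            }
            JobHandle { tx: self.tx.clone() }
        }
    }

    impl Drop for JobHandle {
        fn drop(&mut self) {
            if let Some(tx) = self.tx.upgrade() {
                // Each handle accounts for exactly one job, so no underflow guard is needed.
                *tx.lock().unwrap() -= 1;
            }
        }
    }

    fn main() {
        let count = Arc::new(Mutex::new(1)); // one job outstanding at creation
        let handle = JobHandle { tx: Arc::downgrade(&count) };
        let split = handle.clone(); // splitting the job bumps the count to 2
        drop(handle);
        drop(split);
        assert_eq!(*count.lock().unwrap(), 0);
    }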