Working PostgresML backend with resyncing

Silas Marvin 2024-06-18 20:03:10 -07:00
parent cbe487ca3a
commit 3e8c99b237
4 changed files with 118 additions and 13 deletions

Cargo.lock (generated)

@@ -1569,6 +1569,7 @@ dependencies = [
"llama-cpp-2",
"lsp-server",
"lsp-types",
"md5",
"minijinja",
"once_cell",
"parking_lot",


@@ -36,6 +36,7 @@ tree-sitter = "0.22"
utils-tree-sitter = { workspace = true, features = ["all"] }
splitter-tree-sitter = { workspace = true }
text-splitter = { version = "0.13.3" }
md5 = "0.7.0"
[build-dependencies]
cc="*"
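The `md5` dependency added above is what derives a stable, per-workspace collection name later in this commit. A minimal sketch of that usage (the root URI is a made-up example):

fn collection_name_for(root_uri: &str) -> String {
    // md5 0.7's `compute` returns a `Digest` that implements `LowerHex`
    format!("{:x}", md5::compute(root_uri.as_bytes()))
}

fn main() {
    // The same workspace always hashes to the same 32-character hex name
    println!("{}", collection_name_for("file:///home/user/my-project"));
}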


@@ -57,7 +57,6 @@ impl Crawl {
for result in WalkBuilder::new(&root_uri[7..]).build() {
let result = result?;
let path = result.path();
eprintln!("CRAWLING: {}", path.display());
if !path.is_dir() {
if let Some(path_str) = path.to_str() {
if self.crawl_config.all_files {
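This hunk just drops a debug `eprintln!` from the crawl loop. For context, `WalkBuilder` is presumably the `ignore` crate's directory walker (so gitignored paths are skipped), and `&root_uri[7..]` strips the leading `file://` scheme. A self-contained sketch of the same traversal:

use ignore::WalkBuilder;

fn crawlable_files(root_uri: &str) -> anyhow::Result<Vec<String>> {
    let mut files = Vec::new();
    // Strip the leading `file://` scheme, as `&root_uri[7..]` does above
    for result in WalkBuilder::new(&root_uri[7..]).build() {
        let entry = result?;
        let path = entry.path();
        // Directories are only traversed; plain files are collected
        if !path.is_dir() {
            if let Some(path_str) = path.to_str() {
                files.push(path_str.to_string());
            }
        }
    }
    Ok(files)
}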


@@ -2,6 +2,7 @@ use anyhow::Context;
use lsp_types::TextDocumentPositionParams;
use parking_lot::Mutex;
use pgml::{Collection, Pipeline};
use rand::{distributions::Alphanumeric, Rng};
use serde_json::{json, Value};
use std::{
io::Read,
@@ -26,6 +27,8 @@ use super::{
ContextAndCodePrompt, FIMPrompt, MemoryBackend, MemoryRunParams, Prompt, PromptType,
};
const RESYNC_MAX_FILE_SIZE: u64 = 10_000_000;
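// NOTE: resync (below) refuses to read files larger than this (10 MB):
// `try_get_file_contents` bails on them, and resync then deletes their
// documents from the collection rather than re-chunking them.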
fn chunk_to_document(uri: &str, chunk: Chunk) -> Value {
json!({
"id": chunk_to_id(uri, &chunk),
@@ -94,11 +97,21 @@ impl PostgresML {
let database_url = if let Some(database_url) = postgresml_config.database_url {
database_url
} else {
std::env::var("PGML_DATABASE_URL")?
std::env::var("PGML_DATABASE_URL").context("please provide either the `database_url` in the `postgresml` config, or set the `PGML_DATABASE_URL` environment variable")?
};
// TODO: Think through Collections and Pipelines
let mut collection = Collection::new("test-lsp-ai-5", Some(database_url))?;
let collection_name = match configuration.client_params.root_uri.clone() {
Some(root_uri) => format!("{:x}", md5::compute(root_uri.as_bytes())),
None => {
warn!("no root_uri provided in server configuration - generating random string for collection name");
rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(21)
.map(char::from)
.collect()
}
};
let mut collection = Collection::new(&collection_name, Some(database_url))?;
let mut pipeline = Pipeline::new(
"v1",
Some(
@@ -145,7 +158,6 @@ impl PostgresML {
if file_uris.is_empty() {
continue;
}
// Build the chunks for our changed files
let chunks: Vec<Vec<Chunk>> = match file_uris
.iter()
@@ -160,11 +172,10 @@
{
Ok(chunks) => chunks,
Err(e) => {
error!("{e}");
error!("{e:?}");
continue;
}
};
// Delete old chunks that no longer exist after the latest file changes
let delete_or_statements: Vec<Value> = file_uris
.iter()
@@ -196,10 +207,10 @@
.into(),
)
.await
.context("PGML - error deleting documents")
{
error!("PGML - Error deleting file: {e:?}");
error!("{e:?}");
}
// Prepare and upsert our new chunks
let documents: Vec<pgml::types::Json> = chunks
.into_iter()
@@ -218,7 +229,7 @@
.await
.context("PGML - Error upserting changed files")
{
error!("{e}");
error!("{e:?}");
continue;
}
@@ -237,12 +248,105 @@
splitter,
};
// Resync our Collection
let task_s = s.clone();
TOKIO_RUNTIME.spawn(async move {
if let Err(e) = task_s.resync().await {
error!("{e:?}")
}
});
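// NOTE: the resync is spawned onto the shared Tokio runtime so that backend
// construction returns immediately; a failed resync is only logged.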
if let Err(e) = s.maybe_do_crawl(None) {
error!("{e}")
error!("{e:?}")
}
Ok(s)
}
async fn resync(&self) -> anyhow::Result<()> {
let mut collection = self.collection.clone();
let documents = collection
.get_documents(Some(
json!({
"limit": 100_000_000,
"keys": ["uri"]
})
.into(),
))
.await?;
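// NOTE: the requested keys come back nested under "document", which is why
// the loop further below reads `document["document"]["uri"]`; the oversized
// limit is a blunt way to fetch every document in a single call.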
let try_get_file_contents = |path: &std::path::Path| {
// Open the file and see if it is small enough to read
let mut f = std::fs::File::open(path)?;
let metadata = f.metadata()?;
if metadata.len() > RESYNC_MAX_FILE_SIZE {
anyhow::bail!("file size is greater than: {RESYNC_MAX_FILE_SIZE}")
}
// Read the file contents
let mut contents = vec![];
f.read_to_end(&mut contents)?;
anyhow::Ok(String::from_utf8(contents)?)
};
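// NOTE: `anyhow::Ok(...)` pins the closure's otherwise-unannotated return
// type to `anyhow::Result<String>`, letting the `?` operators above convert
// both the I/O and UTF-8 error types.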
let mut documents_to_delete = vec![];
let mut chunks_to_upsert = vec![];
let mut current_chunks_bytes = 0;
for document in documents.into_iter() {
let uri = match document["document"]["uri"].as_str() {
Some(uri) => uri,
None => continue, // This should never happen, but if it does we are left with a document that has no uri and thus no easy way to delete it
};
let path = uri.replace("file://", "");
let path = std::path::Path::new(&path);
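// NOTE: a plain string replace assumes the URI is not percent-encoded; a
// path with encoded characters would not survive this conversion.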
if !path.exists() {
documents_to_delete.push(uri.to_string());
} else {
// Try to read the file; if that fails, mark its document for deletion
let contents = match try_get_file_contents(path) {
Ok(contents) => contents,
Err(e) => {
error!("{e:?}");
documents_to_delete.push(uri.to_string());
continue;
}
};
// Split the file into chunks
current_chunks_bytes += contents.len();
let chunks: Vec<pgml::types::Json> = self
.splitter
.split_file_contents(&uri, &contents)
.into_iter()
.map(|chunk| chunk_to_document(&uri, chunk).into())
.collect();
chunks_to_upsert.extend(chunks);
// If we have over 10 megabytes of chunks do the upsert
if current_chunks_bytes > 10_000_000 {
collection
.upsert_documents(chunks_to_upsert, None)
.await
.context("PGML - error upserting documents during resync")?;
// Only reset the batch once it has actually been upserted
chunks_to_upsert = vec![];
current_chunks_bytes = 0;
}
}
}
// Upsert any chunks left over from the loop above
if !chunks_to_upsert.is_empty() {
collection
.upsert_documents(chunks_to_upsert, None)
.await
.context("PGML - error upserting documents during resync")?;
}
// Delete documents
if !documents_to_delete.is_empty() {
collection
.delete_documents(
json!({
"uri": {
"$in": documents_to_delete
}
})
.into(),
)
.await
.context("PGML - error deleting documents during resync")?;
}
Ok(())
}
fn maybe_do_crawl(&self, triggered_file: Option<String>) -> anyhow::Result<()> {
if let Some(crawl) = &self.crawl {
let mut documents = vec![];
@@ -281,8 +385,8 @@ impl PostgresML {
.map(|chunk| chunk_to_document(&uri, chunk).into())
.collect();
documents.extend(chunks);
// If we have over 100 megabytes of data do the upsert
if current_bytes >= 100_000_000 || total_bytes as u64 >= config.max_crawl_memory
// If we have over 10 megabytes of data do the upsert
if current_bytes >= 10_000_000 || total_bytes as u64 >= config.max_crawl_memory
{
// Upsert the documents
let mut collection = self.collection.clone();