From a9441879c3ee55e27bf685750054dad1ab6817e9 Mon Sep 17 00:00:00 2001 From: Marshall Bowers Date: Fri, 16 Aug 2024 14:03:34 -0400 Subject: [PATCH] collab: Fix writing LLM rate limit events to Clickhouse (#16367) This PR fixes the writing of LLM rate limit events to Clickhouse. We had a table in the table name: `llm_rate_limits` instead of `llm_rate_limit_events`. I also extracted a helper function to write to Clickhouse so we can use it anywhere we need to. Release Notes: - N/A --- crates/collab/src/api/events.rs | 51 ++++++++---------------------- crates/collab/src/clickhouse.rs | 28 ++++++++++++++++ crates/collab/src/lib.rs | 7 ++-- crates/collab/src/llm/telemetry.rs | 18 +++++++---- 4 files changed, 56 insertions(+), 48 deletions(-) create mode 100644 crates/collab/src/clickhouse.rs diff --git a/crates/collab/src/api/events.rs b/crates/collab/src/api/events.rs index a6afc98bfc..9331107259 100644 --- a/crates/collab/src/api/events.rs +++ b/crates/collab/src/api/events.rs @@ -1,5 +1,6 @@ use super::ips_file::IpsFile; use crate::api::CloudflareIpCountryHeader; +use crate::clickhouse::write_to_table; use crate::{api::slack, AppState, Error, Result}; use anyhow::{anyhow, Context}; use aws_sdk_s3::primitives::ByteStream; @@ -529,12 +530,12 @@ struct ToUpload { impl ToUpload { pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> { const EDITOR_EVENTS_TABLE: &str = "editor_events"; - Self::upload_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client) + write_to_table(EDITOR_EVENTS_TABLE, &self.editor_events, clickhouse_client) .await .with_context(|| format!("failed to upload to table '{EDITOR_EVENTS_TABLE}'"))?; const INLINE_COMPLETION_EVENTS_TABLE: &str = "inline_completion_events"; - Self::upload_to_table( + write_to_table( INLINE_COMPLETION_EVENTS_TABLE, &self.inline_completion_events, clickhouse_client, @@ -543,7 +544,7 @@ impl ToUpload { .with_context(|| format!("failed to upload to table '{INLINE_COMPLETION_EVENTS_TABLE}'"))?; const ASSISTANT_EVENTS_TABLE: &str = "assistant_events"; - Self::upload_to_table( + write_to_table( ASSISTANT_EVENTS_TABLE, &self.assistant_events, clickhouse_client, @@ -552,27 +553,27 @@ impl ToUpload { .with_context(|| format!("failed to upload to table '{ASSISTANT_EVENTS_TABLE}'"))?; const CALL_EVENTS_TABLE: &str = "call_events"; - Self::upload_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client) + write_to_table(CALL_EVENTS_TABLE, &self.call_events, clickhouse_client) .await .with_context(|| format!("failed to upload to table '{CALL_EVENTS_TABLE}'"))?; const CPU_EVENTS_TABLE: &str = "cpu_events"; - Self::upload_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client) + write_to_table(CPU_EVENTS_TABLE, &self.cpu_events, clickhouse_client) .await .with_context(|| format!("failed to upload to table '{CPU_EVENTS_TABLE}'"))?; const MEMORY_EVENTS_TABLE: &str = "memory_events"; - Self::upload_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client) + write_to_table(MEMORY_EVENTS_TABLE, &self.memory_events, clickhouse_client) .await .with_context(|| format!("failed to upload to table '{MEMORY_EVENTS_TABLE}'"))?; const APP_EVENTS_TABLE: &str = "app_events"; - Self::upload_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client) + write_to_table(APP_EVENTS_TABLE, &self.app_events, clickhouse_client) .await .with_context(|| format!("failed to upload to table '{APP_EVENTS_TABLE}'"))?; const SETTING_EVENTS_TABLE: &str = "setting_events"; - Self::upload_to_table( + write_to_table( SETTING_EVENTS_TABLE, &self.setting_events, clickhouse_client, @@ -581,7 +582,7 @@ impl ToUpload { .with_context(|| format!("failed to upload to table '{SETTING_EVENTS_TABLE}'"))?; const EXTENSION_EVENTS_TABLE: &str = "extension_events"; - Self::upload_to_table( + write_to_table( EXTENSION_EVENTS_TABLE, &self.extension_events, clickhouse_client, @@ -590,48 +591,22 @@ impl ToUpload { .with_context(|| format!("failed to upload to table '{EXTENSION_EVENTS_TABLE}'"))?; const EDIT_EVENTS_TABLE: &str = "edit_events"; - Self::upload_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client) + write_to_table(EDIT_EVENTS_TABLE, &self.edit_events, clickhouse_client) .await .with_context(|| format!("failed to upload to table '{EDIT_EVENTS_TABLE}'"))?; const ACTION_EVENTS_TABLE: &str = "action_events"; - Self::upload_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client) + write_to_table(ACTION_EVENTS_TABLE, &self.action_events, clickhouse_client) .await .with_context(|| format!("failed to upload to table '{ACTION_EVENTS_TABLE}'"))?; const REPL_EVENTS_TABLE: &str = "repl_events"; - Self::upload_to_table(REPL_EVENTS_TABLE, &self.repl_events, clickhouse_client) + write_to_table(REPL_EVENTS_TABLE, &self.repl_events, clickhouse_client) .await .with_context(|| format!("failed to upload to table '{REPL_EVENTS_TABLE}'"))?; Ok(()) } - - async fn upload_to_table( - table: &str, - rows: &[T], - clickhouse_client: &clickhouse::Client, - ) -> anyhow::Result<()> { - if rows.is_empty() { - return Ok(()); - } - - let mut insert = clickhouse_client.insert(table)?; - - for event in rows { - insert.write(event).await?; - } - - insert.end().await?; - - let event_count = rows.len(); - log::info!( - "wrote {event_count} {event_specifier} to '{table}'", - event_specifier = if event_count == 1 { "event" } else { "events" } - ); - - Ok(()) - } } pub fn serialize_country_code(country_code: &str, serializer: S) -> Result diff --git a/crates/collab/src/clickhouse.rs b/crates/collab/src/clickhouse.rs new file mode 100644 index 0000000000..2937116bad --- /dev/null +++ b/crates/collab/src/clickhouse.rs @@ -0,0 +1,28 @@ +use serde::Serialize; + +/// Writes the given rows to the specified Clickhouse table. +pub async fn write_to_table( + table: &str, + rows: &[T], + clickhouse_client: &clickhouse::Client, +) -> anyhow::Result<()> { + if rows.is_empty() { + return Ok(()); + } + + let mut insert = clickhouse_client.insert(table)?; + + for event in rows { + insert.write(event).await?; + } + + insert.end().await?; + + let event_count = rows.len(); + log::info!( + "wrote {event_count} {event_specifier} to '{table}'", + event_specifier = if event_count == 1 { "event" } else { "events" } + ); + + Ok(()) +} diff --git a/crates/collab/src/lib.rs b/crates/collab/src/lib.rs index 9cae7713dc..81cc334c43 100644 --- a/crates/collab/src/lib.rs +++ b/crates/collab/src/lib.rs @@ -1,5 +1,6 @@ pub mod api; pub mod auth; +pub mod clickhouse; pub mod db; pub mod env; pub mod executor; @@ -267,7 +268,7 @@ pub struct AppState { pub stripe_client: Option>, pub rate_limiter: Arc, pub executor: Executor, - pub clickhouse_client: Option, + pub clickhouse_client: Option<::clickhouse::Client>, pub config: Config, } @@ -358,8 +359,8 @@ async fn build_blob_store_client(config: &Config) -> anyhow::Result anyhow::Result { - Ok(clickhouse::Client::default() +fn build_clickhouse_client(config: &Config) -> anyhow::Result<::clickhouse::Client> { + Ok(::clickhouse::Client::default() .with_url( config .clickhouse_url diff --git a/crates/collab/src/llm/telemetry.rs b/crates/collab/src/llm/telemetry.rs index ac90bd265a..17a2cb9cd3 100644 --- a/crates/collab/src/llm/telemetry.rs +++ b/crates/collab/src/llm/telemetry.rs @@ -1,6 +1,8 @@ -use anyhow::Result; +use anyhow::{Context, Result}; use serde::Serialize; +use crate::clickhouse::write_to_table; + #[derive(Serialize, Debug, clickhouse::Row)] pub struct LlmUsageEventRow { pub time: i64, @@ -40,9 +42,10 @@ pub struct LlmRateLimitEventRow { } pub async fn report_llm_usage(client: &clickhouse::Client, row: LlmUsageEventRow) -> Result<()> { - let mut insert = client.insert("llm_usage_events")?; - insert.write(&row).await?; - insert.end().await?; + const LLM_USAGE_EVENTS_TABLE: &str = "llm_usage_events"; + write_to_table(LLM_USAGE_EVENTS_TABLE, &[row], client) + .await + .with_context(|| format!("failed to upload to table '{LLM_USAGE_EVENTS_TABLE}'"))?; Ok(()) } @@ -50,8 +53,9 @@ pub async fn report_llm_rate_limit( client: &clickhouse::Client, row: LlmRateLimitEventRow, ) -> Result<()> { - let mut insert = client.insert("llm_rate_limits")?; - insert.write(&row).await?; - insert.end().await?; + const LLM_RATE_LIMIT_EVENTS_TABLE: &str = "llm_rate_limit_events"; + write_to_table(LLM_RATE_LIMIT_EVENTS_TABLE, &[row], client) + .await + .with_context(|| format!("failed to upload to table '{LLM_RATE_LIMIT_EVENTS_TABLE}'"))?; Ok(()) }