Merged origin/master into Add new components

This commit is contained in:
Pavel Laptev 2023-12-06 16:37:53 +01:00 committed by GitButler
commit f53c3ca66d
6 changed files with 189 additions and 48 deletions

View File

@ -40,7 +40,7 @@ impl Event {
}
}
fn into_posthog_event(self, user: &User) -> Result<posthog::Event, posthog::Error> {
fn into_posthog_event(self, user: &User) -> posthog::Event {
match self {
Event::HeadChange {
project_id,
@ -48,9 +48,9 @@ impl Event {
} => {
let mut event =
posthog::Event::new("git::head_changed", &format!("user_{}", user.id));
event.insert_prop("project_id", format!("project_{}", project_id))?;
event.insert_prop("reference", reference)?;
Ok(event)
event.insert_prop("project_id", format!("project_{}", project_id));
event.insert_prop("reference", reference);
event
}
}
}
@ -65,11 +65,13 @@ impl Client {
pub fn new(app_handle: &AppHandle, config: &Config) -> Self {
let client: Box<dyn posthog::Client + Sync + Send> =
if let Some(posthog_token) = config.posthog_token {
Box::new(posthog::real::Client::new(posthog::real::ClientOptions {
let real = posthog::real::Client::new(posthog::real::ClientOptions {
api_key: posthog_token.to_string(),
app_name: app_handle.package_info().name.clone(),
app_version: app_handle.package_info().version.to_string(),
}))
});
let real_with_retry = posthog::retry::Client::new(real);
Box::new(real_with_retry)
} else {
Box::<posthog::mock::Client>::default()
};
@ -78,10 +80,13 @@ impl Client {
}
}
pub async fn send(&self, user: &User, event: &Event) -> Result<(), posthog::Error> {
self.client
.capture(event.clone().into_posthog_event(user)?)
.await?;
Ok(())
pub async fn send(&self, user: &User, event: &Event) {
if let Err(error) = self
.client
.capture(&[event.clone().into_posthog_event(user)])
.await
{
tracing::warn!(?error, "failed to send analytics");
}
}
}

View File

@ -1,5 +1,6 @@
pub mod mock;
pub mod real;
pub mod retry;
use std::collections::HashMap;
@ -9,18 +10,20 @@ use serde::Serialize;
#[async_trait]
pub trait Client {
async fn capture(&self, event: Event) -> Result<(), Error>;
async fn capture(&self, events: &[Event]) -> Result<(), Error>;
}
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("{code}: {message}")]
BadRequest { code: u16, message: String },
#[error("Connection error: {0}")]
Connection(#[from] reqwest::Error),
#[error("Serialization error: {0}")]
Serialization(#[from] serde_json::Error),
}
#[derive(Serialize, Debug, PartialEq, Eq)]
#[derive(Serialize, Debug, PartialEq, Eq, Clone)]
pub struct Event {
event: String,
properties: Properties,
@ -41,10 +44,10 @@ impl Properties {
}
}
pub fn insert<K: Into<String>, P: Serialize>(&mut self, key: K, prop: P) -> Result<(), Error> {
let as_json = serde_json::to_value(prop)?;
pub fn insert<K: Into<String>, P: Serialize>(&mut self, key: K, prop: P) {
let as_json =
serde_json::to_value(prop).expect("safe serialization of a analytics property");
let _ = self.props.insert(key.into(), as_json);
Ok(())
}
}
@ -58,12 +61,7 @@ impl Event {
}
/// Errors if `prop` fails to serialize
pub fn insert_prop<K: Into<String>, P: Serialize>(
&mut self,
key: K,
prop: P,
) -> Result<(), Error> {
self.properties.insert(key, prop)?;
Ok(())
pub fn insert_prop<K: Into<String>, P: Serialize>(&mut self, key: K, prop: P) {
self.properties.insert(key, prop);
}
}

View File

@ -7,7 +7,7 @@ pub struct Client;
#[async_trait]
impl super::Client for Client {
#[instrument(skip(self), level = "debug")]
async fn capture(&self, _event: super::Event) -> Result<(), super::Error> {
async fn capture(&self, _events: &[super::Event]) -> Result<(), super::Error> {
Ok(())
}
}

View File

@ -3,12 +3,11 @@ use std::time::Duration;
use async_trait::async_trait;
use chrono::NaiveDateTime;
use reqwest::{header::CONTENT_TYPE, Client as HttpClient};
use serde::Serialize;
use serde_json;
use tracing::instrument;
const API_ENDPOINT: &str = "https://eu.posthog.com/capture/";
const API_ENDPOINT: &str = "https://eu.posthog.com/batch/";
const TIMEOUT: &Duration = &Duration::from_millis(800);
pub struct ClientOptions {
@ -35,39 +34,61 @@ impl Client {
#[async_trait]
impl super::Client for Client {
#[instrument(skip(self), level = "debug")]
async fn capture(&self, event: super::Event) -> Result<(), super::Error> {
let mut event = event;
event
.properties
.insert("appName", self.options.app_name.clone())?;
event
.properties
.insert("appVersion", self.options.app_version.clone())?;
let inner_event = InnerEvent::new(&event, self.options.api_key.clone());
let _res = self
async fn capture(&self, events: &[super::Event]) -> Result<(), super::Error> {
let events = events
.iter()
.map(|event| {
let event = &mut event.clone();
event
.properties
.insert("appName", self.options.app_name.clone());
event
.properties
.insert("appVersion", self.options.app_version.clone());
Event::from(event)
})
.collect::<Vec<_>>();
let batch = Batch {
api_key: &self.options.api_key,
batch: events.as_slice(),
};
let response = self
.client
.post(API_ENDPOINT)
.header(CONTENT_TYPE, "application/json")
.body(serde_json::to_string(&inner_event).expect("unwrap here is safe"))
.body(serde_json::to_string(&batch)?)
.send()
.await?;
Ok(())
if response.status().is_success() {
Ok(())
} else {
Err(super::Error::BadRequest {
code: response.status().as_u16(),
message: response.text().await.unwrap_or_default(),
})
}
}
}
// This exists so that the client doesn't have to specify the API key over and over
#[derive(Serialize)]
struct InnerEvent {
api_key: String,
struct Batch<'a> {
api_key: &'a str,
batch: &'a [Event],
}
#[derive(Serialize)]
struct Event {
event: String,
properties: super::Properties,
timestamp: Option<NaiveDateTime>,
}
impl InnerEvent {
fn new(event: &super::Event, api_key: String) -> Self {
impl From<&mut super::Event> for Event {
fn from(event: &mut super::Event) -> Self {
Self {
api_key,
event: event.event.clone(),
properties: event.properties.clone(),
timestamp: event.timestamp,

View File

@ -0,0 +1,120 @@
use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::Mutex;
use tracing::instrument;
#[derive(Clone)]
pub struct Client<T: super::Client + Sync> {
inner: T,
/// Events that failed to be sent
/// and are waiting to be retried.
batch: Arc<Mutex<Vec<super::Event>>>,
}
impl<T: super::Client + Sync> Client<T> {
pub fn new(inner: T) -> Self {
Client {
inner,
batch: Arc::new(Mutex::new(Vec::new())),
}
}
}
#[async_trait]
impl<T: super::Client + Sync> super::Client for Client<T> {
#[instrument(skip(self), level = "debug")]
async fn capture(&self, events: &[super::Event]) -> Result<(), super::Error> {
let mut batch = self.batch.lock().await;
batch.extend_from_slice(events);
if let Err(error) = self.inner.capture(&batch).await {
tracing::warn!("Failed to send analytics: {}", error);
} else {
batch.clear();
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use super::super::Client;
#[derive(Clone)]
struct MockClient {
sent: Arc<AtomicUsize>,
is_failing: Arc<AtomicBool>,
}
impl MockClient {
fn new() -> Self {
MockClient {
sent: Arc::new(AtomicUsize::new(0)),
is_failing: Arc::new(AtomicBool::new(false)),
}
}
fn set_failing(&self, is_failing: bool) {
self.is_failing.store(is_failing, Ordering::SeqCst);
}
fn get_sent(&self) -> usize {
self.sent.load(Ordering::SeqCst)
}
}
#[async_trait]
impl super::super::Client for MockClient {
async fn capture(&self, events: &[super::super::Event]) -> Result<(), super::super::Error> {
if self.is_failing.load(Ordering::SeqCst) {
Err(super::super::Error::BadRequest {
code: 400,
message: "Bad request".to_string(),
})
} else {
self.sent.fetch_add(events.len(), Ordering::SeqCst);
Ok(())
}
}
}
#[tokio::test]
async fn test_retry() {
let inner_client = MockClient::new();
let retry_client = super::Client::new(inner_client.clone());
inner_client.set_failing(true);
retry_client
.capture(&[super::super::Event::new("test", "test")])
.await
.unwrap();
assert_eq!(inner_client.get_sent(), 0);
retry_client
.capture(&[super::super::Event::new("test", "test")])
.await
.unwrap();
assert_eq!(inner_client.get_sent(), 0);
inner_client.set_failing(false);
retry_client
.capture(&[super::super::Event::new("test", "test")])
.await
.unwrap();
assert_eq!(inner_client.get_sent(), 3);
retry_client
.capture(&[super::super::Event::new("test", "test")])
.await
.unwrap();
assert_eq!(inner_client.get_sent(), 4);
}
}

View File

@ -23,10 +23,7 @@ impl From<&AppHandle> for Handler {
impl Handler {
pub async fn handle(&self, event: &analytics::Event) -> Result<Vec<events::Event>> {
if let Some(user) = self.users.get_user().context("failed to get user")? {
self.client
.send(&user, event)
.await
.context("failed to send event")?;
self.client.send(&user, event).await;
}
Ok(vec![])
}