collab: Rework Stripe event processing (#15510)

This PR reworks how we process Stripe events for reconciliation
purposes.

The previous approach in #15480 turns out to not be workable, on account
of the Stripe event IDs not being strictly in order. This meant that we
couldn't reliably compare two arbitrary event IDs and determine which
one was more recent.

This new approach leans on the guidance that Stripe provides for
webhooks events:

> Webhook endpoints might occasionally receive the same event more than
once. You can guard against duplicated event receipts by logging the
[event IDs](https://docs.stripe.com/api/events/object#event_object-id)
you’ve processed, and then not processing already-logged events.
>
> https://docs.stripe.com/webhooks#handle-duplicate-events

We now record processed Stripe events in the `processed_stripe_events`
table and use this to filter out events that have already been
processed, so we do not process them again.

When retrieving events from the Stripe events API we now buffer the
unprocessed events so that we can sort them by their `created` timestamp
and process them in (roughly) the order they occurred.

Release Notes:

- N/A
This commit is contained in:
Marshall Bowers 2024-07-30 16:35:11 -04:00 committed by GitHub
parent dca9400edf
commit 7c5f4b72fb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 242 additions and 158 deletions

View File

@ -417,25 +417,32 @@ CREATE TABLE dev_server_projects (
paths TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS billing_customers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
user_id INTEGER NOT NULL REFERENCES users(id),
stripe_customer_id TEXT NOT NULL
);
CREATE UNIQUE INDEX "uix_billing_customers_on_user_id" ON billing_customers (user_id);
CREATE UNIQUE INDEX "uix_billing_customers_on_stripe_customer_id" ON billing_customers (stripe_customer_id);
CREATE TABLE IF NOT EXISTS billing_subscriptions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
billing_customer_id INTEGER NOT NULL REFERENCES billing_customers(id),
stripe_subscription_id TEXT NOT NULL,
stripe_subscription_status TEXT NOT NULL,
last_stripe_event_id TEXT
stripe_subscription_status TEXT NOT NULL
);
CREATE INDEX "ix_billing_subscriptions_on_billing_customer_id" ON billing_subscriptions (billing_customer_id);
CREATE UNIQUE INDEX "uix_billing_subscriptions_on_stripe_subscription_id" ON billing_subscriptions (stripe_subscription_id);
CREATE TABLE IF NOT EXISTS billing_customers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
user_id INTEGER NOT NULL REFERENCES users(id),
stripe_customer_id TEXT NOT NULL,
last_stripe_event_id TEXT
CREATE TABLE IF NOT EXISTS processed_stripe_events (
stripe_event_id TEXT PRIMARY KEY,
stripe_event_type TEXT NOT NULL,
stripe_event_created_timestamp INTEGER NOT NULL,
processed_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE UNIQUE INDEX "uix_billing_customers_on_user_id" ON billing_customers (user_id);
CREATE UNIQUE INDEX "uix_billing_customers_on_stripe_customer_id" ON billing_customers (stripe_customer_id);
CREATE INDEX "ix_processed_stripe_events_on_stripe_event_created_timestamp" ON processed_stripe_events (stripe_event_created_timestamp);

View File

@ -0,0 +1,11 @@
ALTER TABLE billing_customers DROP COLUMN last_stripe_event_id;
ALTER TABLE billing_subscriptions DROP COLUMN last_stripe_event_id;
CREATE TABLE IF NOT EXISTS processed_stripe_events (
stripe_event_id TEXT PRIMARY KEY,
stripe_event_type TEXT NOT NULL,
stripe_event_created_timestamp BIGINT NOT NULL,
processed_at TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT now()
);
CREATE INDEX "ix_processed_stripe_events_on_stripe_event_created_timestamp" ON processed_stripe_events (stripe_event_created_timestamp);

View File

@ -12,7 +12,7 @@ use stripe::{
CreateBillingPortalSessionFlowData, CreateBillingPortalSessionFlowDataAfterCompletion,
CreateBillingPortalSessionFlowDataAfterCompletionRedirect,
CreateBillingPortalSessionFlowDataType, CreateCheckoutSession, CreateCheckoutSessionLineItems,
CreateCustomer, Customer, CustomerId, EventId, EventObject, EventType, Expandable, ListEvents,
CreateCustomer, Customer, CustomerId, EventObject, EventType, Expandable, ListEvents,
SubscriptionStatus,
};
use util::ResultExt;
@ -20,7 +20,8 @@ use util::ResultExt;
use crate::db::billing_subscription::StripeSubscriptionStatus;
use crate::db::{
billing_customer, BillingSubscriptionId, CreateBillingCustomerParams,
CreateBillingSubscriptionParams, UpdateBillingCustomerParams, UpdateBillingSubscriptionParams,
CreateBillingSubscriptionParams, CreateProcessedStripeEventParams, UpdateBillingCustomerParams,
UpdateBillingSubscriptionParams,
};
use crate::{AppState, Error, Result};
@ -230,23 +231,27 @@ async fn poll_stripe_events(
app: &Arc<AppState>,
stripe_client: &stripe::Client,
) -> anyhow::Result<()> {
let event_types = [
EventType::CustomerCreated.to_string(),
EventType::CustomerUpdated.to_string(),
EventType::CustomerSubscriptionCreated.to_string(),
EventType::CustomerSubscriptionUpdated.to_string(),
EventType::CustomerSubscriptionPaused.to_string(),
EventType::CustomerSubscriptionResumed.to_string(),
EventType::CustomerSubscriptionDeleted.to_string(),
]
.into_iter()
.map(|event_type| {
fn event_type_to_string(event_type: EventType) -> String {
// Calling `to_string` on `stripe::EventType` members gives us a quoted string,
// so we need to unquote it.
event_type.trim_matches('"').to_string()
})
event_type.to_string().trim_matches('"').to_string()
}
let event_types = [
EventType::CustomerCreated,
EventType::CustomerUpdated,
EventType::CustomerSubscriptionCreated,
EventType::CustomerSubscriptionUpdated,
EventType::CustomerSubscriptionPaused,
EventType::CustomerSubscriptionResumed,
EventType::CustomerSubscriptionDeleted,
]
.into_iter()
.map(event_type_to_string)
.collect::<Vec<_>>();
let mut unprocessed_events = Vec::new();
loop {
log::info!("retrieving events from Stripe: {}", event_types.join(", "));
@ -255,23 +260,27 @@ async fn poll_stripe_events(
params.limit = Some(100);
let events = stripe::Event::list(stripe_client, &params).await?;
let processed_event_ids = {
let event_ids = &events
.data
.iter()
.map(|event| event.id.as_str())
.collect::<Vec<_>>();
app.db
.get_processed_stripe_events_by_event_ids(event_ids)
.await?
.into_iter()
.map(|event| event.stripe_event_id)
.collect::<Vec<_>>()
};
for event in events.data {
match event.type_ {
EventType::CustomerCreated | EventType::CustomerUpdated => {
handle_customer_event(app, stripe_client, event)
.await
.log_err();
}
EventType::CustomerSubscriptionCreated
| EventType::CustomerSubscriptionUpdated
| EventType::CustomerSubscriptionPaused
| EventType::CustomerSubscriptionResumed
| EventType::CustomerSubscriptionDeleted => {
handle_customer_subscription_event(app, stripe_client, event)
.await
.log_err();
}
_ => {}
if processed_event_ids.contains(&event.id.to_string()) {
log::info!("Stripe event {} already processed: skipping", event.id);
} else {
unprocessed_events.push(event);
}
}
@ -280,6 +289,44 @@ async fn poll_stripe_events(
}
}
log::info!(
"unprocessed events from Stripe: {}",
unprocessed_events.len()
);
// Sort all of the unprocessed events in ascending order, so we can handle them in the order they occurred.
unprocessed_events.sort_by(|a, b| a.created.cmp(&b.created).then_with(|| a.id.cmp(&b.id)));
for event in unprocessed_events {
let processed_event_params = CreateProcessedStripeEventParams {
stripe_event_id: event.id.to_string(),
stripe_event_type: event_type_to_string(event.type_),
stripe_event_created_timestamp: event.created,
};
match event.type_ {
EventType::CustomerCreated | EventType::CustomerUpdated => {
handle_customer_event(app, stripe_client, event)
.await
.log_err();
}
EventType::CustomerSubscriptionCreated
| EventType::CustomerSubscriptionUpdated
| EventType::CustomerSubscriptionPaused
| EventType::CustomerSubscriptionResumed
| EventType::CustomerSubscriptionDeleted => {
handle_customer_subscription_event(app, stripe_client, event)
.await
.log_err();
}
_ => {}
}
app.db
.create_processed_stripe_event(&processed_event_params)
.await?;
}
Ok(())
}
@ -309,22 +356,12 @@ async fn handle_customer_event(
.get_billing_customer_by_stripe_customer_id(&customer.id)
.await?
{
if should_ignore_event(&event.id, existing_customer.last_stripe_event_id.as_deref()) {
log::info!(
"ignoring Stripe event {} based on last seen event ID",
event.id
);
return Ok(());
}
app.db
.update_billing_customer(
existing_customer.id,
&UpdateBillingCustomerParams {
// For now we just update the last event ID for the customer
// and leave the rest of the information as-is, as it is not
// For now we just leave the information as-is, as it is not
// likely to change.
last_stripe_event_id: ActiveValue::set(Some(event.id.to_string())),
..Default::default()
},
)
@ -334,7 +371,6 @@ async fn handle_customer_event(
.create_billing_customer(&CreateBillingCustomerParams {
user_id: user.id,
stripe_customer_id: customer.id.to_string(),
last_stripe_event_id: Some(event.id.to_string()),
})
.await?;
}
@ -353,37 +389,16 @@ async fn handle_customer_subscription_event(
log::info!("handling Stripe {} event: {}", event.type_, event.id);
let billing_customer = find_or_create_billing_customer(
app,
stripe_client,
// Even though we're handling a subscription event, we can still set
// the ID as the last seen event ID on the customer in the event that
// we have to create it.
//
// This is done to avoid any potential rollback in the customer's values
// if we then see an older event that pertains to the customer.
&event.id,
subscription.customer,
)
.await?
.ok_or_else(|| anyhow!("billing customer not found"))?;
let billing_customer =
find_or_create_billing_customer(app, stripe_client, subscription.customer)
.await?
.ok_or_else(|| anyhow!("billing customer not found"))?;
if let Some(existing_subscription) = app
.db
.get_billing_subscription_by_stripe_subscription_id(&subscription.id)
.await?
{
if should_ignore_event(
&event.id,
existing_subscription.last_stripe_event_id.as_deref(),
) {
log::info!(
"ignoring Stripe event {} based on last seen event ID",
event.id
);
return Ok(());
}
app.db
.update_billing_subscription(
existing_subscription.id,
@ -391,7 +406,6 @@ async fn handle_customer_subscription_event(
billing_customer_id: ActiveValue::set(billing_customer.id),
stripe_subscription_id: ActiveValue::set(subscription.id.to_string()),
stripe_subscription_status: ActiveValue::set(subscription.status.into()),
last_stripe_event_id: ActiveValue::set(Some(event.id.to_string())),
},
)
.await?;
@ -401,7 +415,6 @@ async fn handle_customer_subscription_event(
billing_customer_id: billing_customer.id,
stripe_subscription_id: subscription.id.to_string(),
stripe_subscription_status: subscription.status.into(),
last_stripe_event_id: Some(event.id.to_string()),
})
.await?;
}
@ -428,7 +441,6 @@ impl From<SubscriptionStatus> for StripeSubscriptionStatus {
async fn find_or_create_billing_customer(
app: &Arc<AppState>,
stripe_client: &stripe::Client,
event_id: &EventId,
customer_or_id: Expandable<Customer>,
) -> anyhow::Result<Option<billing_customer::Model>> {
let customer_id = match &customer_or_id {
@ -466,70 +478,8 @@ async fn find_or_create_billing_customer(
.create_billing_customer(&CreateBillingCustomerParams {
user_id: user.id,
stripe_customer_id: customer.id.to_string(),
last_stripe_event_id: Some(event_id.to_string()),
})
.await?;
Ok(Some(billing_customer))
}
/// Returns whether an [`Event`] should be ignored, based on its ID and the last
/// seen event ID for this object.
#[inline]
fn should_ignore_event(event_id: &EventId, last_event_id: Option<&str>) -> bool {
!should_apply_event(event_id, last_event_id)
}
/// Returns whether an [`Event`] should be applied, based on its ID and the last
/// seen event ID for this object.
fn should_apply_event(event_id: &EventId, last_event_id: Option<&str>) -> bool {
let Some(last_event_id) = last_event_id else {
return true;
};
event_id.as_str() < last_event_id
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_should_apply_event() {
let subscription_created_event = EventId::from_str("evt_1Pi5s9RxOf7d5PNafuZSGsmh").unwrap();
let subscription_updated_event = EventId::from_str("evt_1Pi5s9RxOf7d5PNa5UZLSsto").unwrap();
assert_eq!(
should_apply_event(
&subscription_created_event,
Some(subscription_created_event.as_str())
),
false,
"Events should not be applied when the IDs are the same."
);
assert_eq!(
should_apply_event(
&subscription_created_event,
Some(subscription_updated_event.as_str())
),
false,
"Events should not be applied when the last event ID is newer than the event ID."
);
assert_eq!(
should_apply_event(&subscription_created_event, None),
true,
"Events should be applied when we don't have a last event ID."
);
assert_eq!(
should_apply_event(
&subscription_updated_event,
Some(subscription_created_event.as_str())
),
true,
"Events should be applied when the event ID is newer than the last event ID."
);
}
}

View File

@ -50,6 +50,7 @@ pub use queries::billing_subscriptions::{
CreateBillingSubscriptionParams, UpdateBillingSubscriptionParams,
};
pub use queries::contributors::ContributorSelector;
pub use queries::processed_stripe_events::CreateProcessedStripeEventParams;
pub use sea_orm::ConnectOptions;
pub use tables::user::Model as User;
pub use tables::*;

View File

@ -14,6 +14,7 @@ pub mod extensions;
pub mod hosted_projects;
pub mod messages;
pub mod notifications;
pub mod processed_stripe_events;
pub mod projects;
pub mod rate_buckets;
pub mod rooms;

View File

@ -4,14 +4,12 @@ use super::*;
pub struct CreateBillingCustomerParams {
pub user_id: UserId,
pub stripe_customer_id: String,
pub last_stripe_event_id: Option<String>,
}
#[derive(Debug, Default)]
pub struct UpdateBillingCustomerParams {
pub user_id: ActiveValue<UserId>,
pub stripe_customer_id: ActiveValue<String>,
pub last_stripe_event_id: ActiveValue<Option<String>>,
}
impl Database {
@ -45,7 +43,6 @@ impl Database {
id: ActiveValue::set(id),
user_id: params.user_id.clone(),
stripe_customer_id: params.stripe_customer_id.clone(),
last_stripe_event_id: params.last_stripe_event_id.clone(),
..Default::default()
})
.exec(&*tx)

View File

@ -1,5 +1,3 @@
use sea_orm::IntoActiveValue;
use crate::db::billing_subscription::StripeSubscriptionStatus;
use super::*;
@ -9,7 +7,6 @@ pub struct CreateBillingSubscriptionParams {
pub billing_customer_id: BillingCustomerId,
pub stripe_subscription_id: String,
pub stripe_subscription_status: StripeSubscriptionStatus,
pub last_stripe_event_id: Option<String>,
}
#[derive(Debug, Default)]
@ -17,7 +14,6 @@ pub struct UpdateBillingSubscriptionParams {
pub billing_customer_id: ActiveValue<BillingCustomerId>,
pub stripe_subscription_id: ActiveValue<String>,
pub stripe_subscription_status: ActiveValue<StripeSubscriptionStatus>,
pub last_stripe_event_id: ActiveValue<Option<String>>,
}
impl Database {
@ -31,7 +27,6 @@ impl Database {
billing_customer_id: ActiveValue::set(params.billing_customer_id),
stripe_subscription_id: ActiveValue::set(params.stripe_subscription_id.clone()),
stripe_subscription_status: ActiveValue::set(params.stripe_subscription_status),
last_stripe_event_id: params.last_stripe_event_id.clone().into_active_value(),
..Default::default()
})
.exec_without_returning(&*tx)
@ -54,7 +49,6 @@ impl Database {
billing_customer_id: params.billing_customer_id.clone(),
stripe_subscription_id: params.stripe_subscription_id.clone(),
stripe_subscription_status: params.stripe_subscription_status.clone(),
last_stripe_event_id: params.last_stripe_event_id.clone(),
..Default::default()
})
.exec(&*tx)

View File

@ -0,0 +1,69 @@
use super::*;
#[derive(Debug)]
pub struct CreateProcessedStripeEventParams {
pub stripe_event_id: String,
pub stripe_event_type: String,
pub stripe_event_created_timestamp: i64,
}
impl Database {
/// Creates a new processed Stripe event.
pub async fn create_processed_stripe_event(
&self,
params: &CreateProcessedStripeEventParams,
) -> Result<()> {
self.transaction(|tx| async move {
processed_stripe_event::Entity::insert(processed_stripe_event::ActiveModel {
stripe_event_id: ActiveValue::set(params.stripe_event_id.clone()),
stripe_event_type: ActiveValue::set(params.stripe_event_type.clone()),
stripe_event_created_timestamp: ActiveValue::set(
params.stripe_event_created_timestamp,
),
..Default::default()
})
.exec_without_returning(&*tx)
.await?;
Ok(())
})
.await
}
/// Returns the processed Stripe event with the specified event ID.
pub async fn get_processed_stripe_event_by_event_id(
&self,
event_id: &str,
) -> Result<Option<processed_stripe_event::Model>> {
self.transaction(|tx| async move {
Ok(processed_stripe_event::Entity::find_by_id(event_id)
.one(&*tx)
.await?)
})
.await
}
/// Returns the processed Stripe events with the specified event IDs.
pub async fn get_processed_stripe_events_by_event_ids(
&self,
event_ids: &[&str],
) -> Result<Vec<processed_stripe_event::Model>> {
self.transaction(|tx| async move {
Ok(processed_stripe_event::Entity::find()
.filter(
processed_stripe_event::Column::StripeEventId.is_in(event_ids.iter().copied()),
)
.all(&*tx)
.await?)
})
.await
}
/// Returns whether the Stripe event with the specified ID has already been processed.
pub async fn already_processed_stripe_event(&self, event_id: &str) -> Result<bool> {
Ok(self
.get_processed_stripe_event_by_event_id(event_id)
.await?
.is_some())
}
}

View File

@ -25,6 +25,7 @@ pub mod notification;
pub mod notification_kind;
pub mod observed_buffer_edits;
pub mod observed_channel_messages;
pub mod processed_stripe_event;
pub mod project;
pub mod project_collaborator;
pub mod rate_buckets;

View File

@ -9,7 +9,6 @@ pub struct Model {
pub id: BillingCustomerId,
pub user_id: UserId,
pub stripe_customer_id: String,
pub last_stripe_event_id: Option<String>,
pub created_at: DateTime,
}

View File

@ -10,7 +10,6 @@ pub struct Model {
pub billing_customer_id: BillingCustomerId,
pub stripe_subscription_id: String,
pub stripe_subscription_status: StripeSubscriptionStatus,
pub last_stripe_event_id: Option<String>,
pub created_at: DateTime,
}

View File

@ -0,0 +1,16 @@
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "processed_stripe_events")]
pub struct Model {
#[sea_orm(primary_key)]
pub stripe_event_id: String,
pub stripe_event_type: String,
pub stripe_event_created_timestamp: i64,
pub processed_at: DateTime,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

View File

@ -9,6 +9,7 @@ mod embedding_tests;
mod extension_tests;
mod feature_flag_tests;
mod message_tests;
mod processed_stripe_event_tests;
use super::*;
use gpui::BackgroundExecutor;

View File

@ -29,7 +29,6 @@ async fn test_get_active_billing_subscriptions(db: &Arc<Database>) {
.create_billing_customer(&CreateBillingCustomerParams {
user_id,
stripe_customer_id: "cus_active_user".into(),
last_stripe_event_id: None,
})
.await
.unwrap();
@ -39,7 +38,6 @@ async fn test_get_active_billing_subscriptions(db: &Arc<Database>) {
billing_customer_id: customer.id,
stripe_subscription_id: "sub_active_user".into(),
stripe_subscription_status: StripeSubscriptionStatus::Active,
last_stripe_event_id: None,
})
.await
.unwrap();
@ -65,7 +63,6 @@ async fn test_get_active_billing_subscriptions(db: &Arc<Database>) {
.create_billing_customer(&CreateBillingCustomerParams {
user_id,
stripe_customer_id: "cus_past_due_user".into(),
last_stripe_event_id: None,
})
.await
.unwrap();
@ -75,7 +72,6 @@ async fn test_get_active_billing_subscriptions(db: &Arc<Database>) {
billing_customer_id: customer.id,
stripe_subscription_id: "sub_past_due_user".into(),
stripe_subscription_status: StripeSubscriptionStatus::PastDue,
last_stripe_event_id: None,
})
.await
.unwrap();

View File

@ -0,0 +1,40 @@
use std::sync::Arc;
use crate::test_both_dbs;
use super::{CreateProcessedStripeEventParams, Database};
test_both_dbs!(
test_already_processed_stripe_event,
test_already_processed_stripe_event_postgres,
test_already_processed_stripe_event_sqlite
);
async fn test_already_processed_stripe_event(db: &Arc<Database>) {
let unprocessed_event_id = "evt_1PiJOuRxOf7d5PNaw2zzWiyO".to_string();
let processed_event_id = "evt_1PiIfMRxOf7d5PNakHrAUe8P".to_string();
db.create_processed_stripe_event(&CreateProcessedStripeEventParams {
stripe_event_id: processed_event_id.clone(),
stripe_event_type: "customer.created".into(),
stripe_event_created_timestamp: 1722355968,
})
.await
.unwrap();
assert_eq!(
db.already_processed_stripe_event(&processed_event_id)
.await
.unwrap(),
true,
"Expected {processed_event_id} to already be processed"
);
assert_eq!(
db.already_processed_stripe_event(&unprocessed_event_id)
.await
.unwrap(),
false,
"Expected {unprocessed_event_id} to be unprocessed"
);
}

View File

@ -6,6 +6,8 @@ extend-exclude = [
# File suffixes aren't typos
"assets/icons/file_icons/file_types.json",
"crates/extensions_ui/src/extension_suggest.rs",
# Stripe IDs are flagged as typos.
"crates/collab/src/db/tests/processed_stripe_event_tests.rs",
# Not our typos
"crates/live_kit_server/",
# Vim makes heavy use of partial typing tables