feat: Improve efficiency of syncing stripe (#2719)

Fixes #2718

# Description

This change reworks the Stripe premium-user sync job to cut the number of database round trips. Instead of querying users, products, and settings once per customer, it now fetches customers, users (by email, in batches of 20), product features, and user settings up front, builds in-memory lookup dictionaries, and processes the active subscriptions in a single pass. Premium `user_settings` rows are upserted in batches of 10, settings for users who are no longer premium are deleted in batches of 10, and a new `last_stripe_check` column lets the job skip users that were already checked within the last hour.
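
The batched queries and writes inline the same slicing pattern in three places (user lookups by email in chunks of 20, `user_settings` upserts and deletes in chunks of 10). As a rough sketch, that pattern is equivalent to a small helper like the hypothetical `chunked` below, which is not part of this commit:

```python
from typing import Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: List[T], size: int) -> Iterator[List[T]]:
    """Yield successive slices of at most `size` items.

    Hypothetical helper, not part of this commit; the diff inlines the
    same `items[i : i + size]` slicing for email lookups (size 20) and
    for user_settings upserts/deletes (size 10).
    """
    for i in range(0, len(items), size):
        yield items[i : i + size]


# Usage sketch, mirroring the batched .in_("email", ...) queries below
# (supabase_db and customer_emails come from the surrounding job):
# users = []
# for batch in chunked(customer_emails, 20):
#     users.extend(
#         supabase_db.table("users").select("id, email").in_("email", batch).execute().data
#     )
```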

## Checklist before requesting a review


- [ ] My code follows the style guidelines of this project
- [ ] I have performed a self-review of my code
- [ ] I have commented hard-to-understand areas
- [ ] I have ideally added tests that prove my fix is effective or that
my feature works
- [ ] New and existing unit tests pass locally with my changes
- [ ] Any dependent changes have been merged


---------

Co-authored-by: Stan Girard <stan@quivr.app>
Stan Girard 2024-06-24 22:17:48 +02:00 committed by GitHub
parent 18d594493a
commit 036201d844
2 changed files with 99 additions and 109 deletions


@@ -1,5 +1,5 @@
import os
from datetime import datetime
from datetime import datetime, timedelta
from tempfile import NamedTemporaryFile
from uuid import UUID
@@ -184,130 +184,119 @@ def ping_telemetry():
def check_if_is_premium_user():
supabase = get_supabase_db()
supabase_db = supabase.db
# Get the list of active subscriptions
paris_tz = timezone("Europe/Paris")
paris_time = datetime.now(paris_tz).strftime("%Y-%m-%d %H:%M:%S.%f")
logger.debug(f"Paris time: {paris_time}")
current_time = datetime.now(paris_tz)
current_time_str = current_time.strftime("%Y-%m-%d %H:%M:%S.%f")
logger.debug(f"Current time: {current_time_str}")
# Define the memoization period (e.g., 1 hour)
memoization_period = timedelta(hours=1)
memoization_cutoff = current_time - memoization_period
# Fetch all necessary data in bulk
subscriptions = (
supabase_db.table("subscriptions")
.select("*")
.filter(
"current_period_end",
"gt",
paris_time,
)
.filter("current_period_end", "gt", current_time_str)
.execute()
).data
# Only get the subscriptions with status active
logger.info(f"Subscriptions: {subscriptions}")
if len(subscriptions) > 0:
subscriptions = [
subscription
for subscription in subscriptions
if subscription["attrs"]["status"] == "active"
]
else:
logger.info(f"No active subscriptions found")
# Get List of all customers
customers = (
supabase_db.table("customers")
.select("*")
.order("created", desc=True)
.execute()
.data
)
unique_customers = {}
for customer in customers:
if customer["email"] not in unique_customers:
unique_customers[customer["email"]] = customer
customers = list(unique_customers.values())
customers = (supabase_db.table("customers").select("*").execute()).data
# Matching Products
matching_product_settings = (
customer_emails = [customer["email"] for customer in customers]
# Split customer emails into batches of 20
email_batches = [
customer_emails[i : i + 20] for i in range(0, len(customer_emails), 20)
]
users = []
for email_batch in email_batches:
batch_users = (
supabase_db.table("users")
.select("id, email")
.in_("email", email_batch)
.execute()
).data
users.extend(batch_users)
product_features = (
supabase_db.table("product_to_features").select("*").execute()
).data
# If the customer has an active subscription, look up the user id in the users table by the customer's email and update their user_settings with is_premium = True; otherwise delete the user_settings row
user_settings = (supabase_db.table("user_settings").select("*").execute()).data
for customer in customers:
logger.debug(f"Customer: {customer['email']}")
# Find the subscription of the customer
user_id = None
matching_subscription = [
subscription
for subscription in subscriptions
if subscription["customer"] == customer["id"]
]
user_id = (
supabase_db.table("users")
.select("id")
.filter("email", "eq", customer["email"])
.execute()
).data
if len(user_id) > 0:
user_id = user_id[0]["id"]
else:
logger.debug(f"User not found for customer: {customer['email']}")
# Create lookup dictionaries for faster access
user_dict = {user["email"]: user["id"] for user in users}
customer_dict = {customer["id"]: customer for customer in customers}
product_dict = {
product["stripe_product_id"]: product for product in product_features
}
settings_dict = {setting["user_id"]: setting for setting in user_settings}
# Process subscriptions and update user settings
premium_user_ids = set()
settings_to_upsert = {}
for sub in subscriptions:
if sub["attrs"]["status"] != "active":
continue
if len(matching_subscription) > 0:
logger.debug(
f"Updating subscription for user {user_id} with subscription {matching_subscription[0]['id']}"
)
# Get the matching product from the subscription
matching_product_settings = [
product
for product in matching_product_settings
if product["stripe_product_id"]
== matching_subscription[0]["attrs"]["items"]["data"][0]["plan"][
"product"
]
]
if len(matching_product_settings) > 0:
# Update the user with the product settings
data, _ = (
supabase_db.table("user_settings")
.upsert(
{
"user_id": str(user_id),
"max_brains": matching_product_settings[0]["max_brains"],
"max_brain_size": matching_product_settings[0][
"max_brain_size"
],
"monthly_chat_credit": matching_product_settings[0][
"monthly_chat_credit"
],
"api_access": matching_product_settings[0]["api_access"],
"models": matching_product_settings[0]["models"],
"is_premium": True,
}
)
.execute()
)
assert (
len(data[1]) == 1
), "fatal, upsert user_settings didn't update a single record"
else:
logger.info(
f"No matching product settings found for customer: {customer['email']} with subscription {matching_subscription[0]['id']}"
)
else:
logger.debug(f"No subscription found for customer: {customer['email']}")
# check if user_settings is_premium is true then delete the user_settings
user_settings = (
supabase_db.table("user_settings")
.select("*")
.filter("user_id", "eq", user_id)
.filter("is_premium", "eq", True)
.execute()
).data
if len(user_settings) > 0:
supabase_db.table("user_settings").delete().match(
{"user_id": user_id}
).execute()
customer = customer_dict.get(sub["customer"])
if not customer:
continue
user_id = user_dict.get(customer["email"])
if not user_id:
continue
current_settings = settings_dict.get(user_id, {})
last_check = current_settings.get("last_stripe_check")
# Skip if the user was checked recently
if last_check and datetime.fromisoformat(last_check) > memoization_cutoff:
premium_user_ids.add(user_id)
continue
user_id = str(user_id) # Ensure user_id is a string
premium_user_ids.add(user_id)
product_id = sub["attrs"]["items"]["data"][0]["plan"]["product"]
product = product_dict.get(product_id)
if not product:
logger.warning(f"No matching product found for subscription: {sub['id']}")
continue
settings_to_upsert[user_id] = {
"user_id": user_id,
"max_brains": product["max_brains"],
"max_brain_size": product["max_brain_size"],
"monthly_chat_credit": product["monthly_chat_credit"],
"api_access": product["api_access"],
"models": product["models"],
"is_premium": True,
"last_stripe_check": current_time_str,
}
# Bulk upsert premium user settings in batches of 10
settings_list = list(settings_to_upsert.values())
for i in range(0, len(settings_list), 10):
batch = settings_list[i : i + 10]
supabase_db.table("user_settings").upsert(batch).execute()
# Delete settings for non-premium users in batches of 10
settings_to_delete = [
setting["user_id"]
for setting in user_settings
if setting["user_id"] not in premium_user_ids and setting.get("is_premium")
]
for i in range(0, len(settings_to_delete), 10):
batch = settings_to_delete[i : i + 10]
supabase_db.table("user_settings").delete().in_("user_id", batch).execute()
logger.info(
f"Updated {len(settings_to_upsert)} premium users, deleted settings for {len(settings_to_delete)} non-premium users"
)
return True


@@ -0,0 +1 @@
alter table "public"."user_settings" add column "last_stripe_check" timestamp with time zone;