From ec58935d9b7eeb757022ea98cf554a0314e21799 Mon Sep 17 00:00:00 2001 From: Stan Girard Date: Thu, 13 Jun 2024 10:14:12 +0200 Subject: [PATCH] feat: Add premium user check in celery task (#2668) "This pull request adds a new celery task called `check_if_is_premium_user` that checks if a user is a premium user based on their subscription status. The task retrieves the list of active subscriptions and the list of customers from the Supabase database. It then matches the subscriptions with the customers and updates the user settings with the corresponding premium features if a match is found. If a user is not found or their subscription is expired, the user settings are deleted. This task will run periodically to keep the user settings up to date with the subscription status. --------- Co-authored-by: Stan Girard --- backend/celery_worker.py | 132 +++++++++++---- .../models/databases/supabase/user_usage.py | 150 +----------------- 2 files changed, 106 insertions(+), 176 deletions(-) diff --git a/backend/celery_worker.py b/backend/celery_worker.py index d948c0f36..df81b2e02 100644 --- a/backend/celery_worker.py +++ b/backend/celery_worker.py @@ -1,5 +1,5 @@ import os -from datetime import datetime, timezone +from datetime import datetime from tempfile import NamedTemporaryFile from uuid import UUID @@ -8,9 +8,8 @@ from celery_config import celery from logger import get_logger from middlewares.auth.auth_bearer import AuthBearer from models.files import File -from models.settings import get_supabase_client +from models.settings import get_supabase_client, get_supabase_db from modules.brain.integrations.Notion.Notion_connector import NotionConnector -from modules.brain.repository.integration_brains import IntegrationBrain from modules.brain.service.brain_service import BrainService from modules.brain.service.brain_vector_service import BrainVectorService from modules.notification.dto.inputs import NotificationUpdatableProperties @@ -180,27 +179,106 @@ def ping_telemetry(): maybe_send_telemetry("ping", {"ping": "pong"}) -@celery.task -def process_integration_brain_sync(): - integration = IntegrationBrain() - integrations = integration.get_integration_brain_by_type_integration("notion") +@celery.task(name="check_if_is_premium_user") +def check_if_is_premium_user(): + supabase = get_supabase_db() + supabase_db = supabase.db + # Get the list of subscription active + subscriptions = ( + supabase_db.table("subscriptions") + .select("*") + .filter( + "current_period_end", + "gt", + datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"), + ) + .execute() + ).data + logger.debug(f"Subscriptions: {subscriptions}") - time = datetime.now(timezone.utc) # Make `time` timezone-aware - # last_synced is a string that represents a timestampz in the database - # only call process_integration_brain_sync_user_brain if more than 1 day has passed since the last sync - if not integrations: - return - # TODO fix this - # for integration in integrations: - # print(f"last_synced: {integration.last_synced}") - # print(f"Integration Name: {integration.name}") - # last_synced = datetime.strptime( - # integration.last_synced, "%Y-%m-%dT%H:%M:%S.%f%z" - # ) - # if last_synced < time - timedelta(hours=12) and integration.name == "notion": - # process_integration_brain_sync_user_brain.delay( - # brain_id=integration.brain_id, user_id=integration.user_id - # ) + # Get List of all customers + customers = ( + supabase_db.table("customers") + .select("*") + .order("created", desc=True) + .execute() + .data + ) + unique_customers = {} + for customer in customers: + if customer["email"] not in unique_customers: + unique_customers[customer["email"]] = customer + customers = list(unique_customers.values()) + logger.debug(f"Unique Customers with latest created date: {customers}") + + # Matching Products + matching_product_settings = ( + supabase_db.table("product_to_features").select("*").execute() + ).data + logger.debug(f"Matching product settings: {matching_product_settings}") + + # if customer.id in subscriptions.customer then find the user id in the table users where email = customer.email and then update the user_settings with is_premium = True else delete the user_settings + + for customer in customers: + logger.debug(f"Customer: {customer}") + # Find the subscription of the customer + user_id = None + matching_subscription = [ + subscription + for subscription in subscriptions + if subscription["customer"] == customer["id"] + ] + logger.debug(f"Matching subscription: {matching_subscription}") + user_id = ( + supabase_db.table("users") + .select("id") + .filter("email", "eq", customer["email"]) + .execute() + ).data + if len(user_id) > 0: + user_id = user_id[0]["id"] + else: + logger.debug(f"User not found for customer: {customer}") + continue + if len(matching_subscription) > 0: + + # Get the matching product from the subscription + matching_product_settings = [ + product + for product in matching_product_settings + if product["stripe_product_id"] + == matching_subscription[0]["attrs"]["items"]["data"][0]["plan"][ + "product" + ] + ] + # Update the user with the product settings + supabase_db.table("user_settings").update( + { + "max_brains": matching_product_settings[0]["max_brains"], + "max_brain_size": matching_product_settings[0]["max_brain_size"], + "monthly_chat_credit": matching_product_settings[0][ + "monthly_chat_credit" + ], + "api_access": matching_product_settings[0]["api_access"], + "models": matching_product_settings[0]["models"], + "is_premium": True, + } + ).match({"user_id": str(user_id)}).execute() + else: + # check if user_settings is_premium is true then delete the user_settings + user_settings = ( + supabase_db.table("user_settings") + .select("*") + .filter("user_id", "eq", user_id) + .filter("is_premium", "eq", True) + .execute() + ).data + if len(user_settings) > 0: + supabase_db.table("user_settings").delete().match( + {"user_id": user_id} + ).execute() + + return True celery.conf.beat_schedule = { @@ -208,10 +286,6 @@ celery.conf.beat_schedule = { "task": f"{__name__}.remove_onboarding_more_than_x_days_task", "schedule": crontab(minute="0", hour="0"), }, - "process_integration_brain_sync": { - "task": f"{__name__}.process_integration_brain_sync", - "schedule": crontab(minute="*/5", hour="*"), - }, "ping_telemetry": { "task": f"{__name__}.ping_telemetry", "schedule": crontab(minute="*/30", hour="*"), @@ -220,4 +294,8 @@ celery.conf.beat_schedule = { "task": "process_sync_active", "schedule": crontab(minute="*/1", hour="*"), }, + "process_premium_users": { + "task": "check_if_is_premium_user", + "schedule": crontab(minute="*/1", hour="*"), + }, } diff --git a/backend/models/databases/supabase/user_usage.py b/backend/models/databases/supabase/user_usage.py index 9e69038ec..b335107ce 100644 --- a/backend/models/databases/supabase/user_usage.py +++ b/backend/models/databases/supabase/user_usage.py @@ -7,7 +7,7 @@ from models.databases.repository import Repository logger = get_logger(__name__) -#TODO: change the name of this class because another one already exists +# TODO: change the name of this class because another one already exists class UserUsage(Repository): def __init__(self, supabase_client): self.db = supabase_client @@ -28,142 +28,6 @@ class UserUsage(Repository): .execute() ) - def check_subscription_validity(self, customer_id: str) -> bool: - """ - Check if the subscription of the user is still valid - """ - now = datetime.now() - - # Format the datetime object as a string in the appropriate format for your Supabase database - now_str = now.strftime("%Y-%m-%d %H:%M:%S.%f") - subscription_still_valid = ( - self.db.from_("subscriptions") - .select("*") - .filter( - "customer", "eq", customer_id - ) # then check if current_period_end is greater than now with timestamp format - .filter("current_period_end", "gt", now_str) - .execute() - ).data - - if len(subscription_still_valid) > 0: - return True - - def check_user_is_customer(self, user_id: UUID) -> (bool, str): - """ - Check if the user is a customer and return the customer id - """ - user_email_customer = ( - self.db.from_("users") - .select("*") - .filter("id", "eq", str(user_id)) - .execute() - ).data - - if len(user_email_customer) == 0: - return False, None - - matching_customers = ( - self.db.table("customers") - .select("email,id") - .filter("email", "eq", user_email_customer[0]["email"]) - .execute() - ).data - - if len(matching_customers) == 0: - return False, None - - return True, matching_customers[0]["id"] - - def update_customer_settings_with_product_settings( - self, user_id: UUID, customer_id: str - ): - """ - Check if the user is a customer and return the customer id - """ - - matching_products = ( - self.db.table("subscriptions") - .select("attrs") - .filter("customer", "eq", customer_id) - .execute() - ).data - - # Output object - # {"id":"sub_1OUZOgJglvQxkJ1H98TSY9bv","plan":{"id":"price_1NwMsXJglvQxkJ1Hbzs5JkTs","active":true,"amount":1900,"object":"plan","created":1696156081,"product":"prod_OjqZPhbBQwmsB8","currency":"usd","interval":"month","livemode":false,"metadata":{},"nickname":null,"tiers_mode":null,"usage_type":"licensed","amount_decimal":"1900","billing_scheme":"per_unit","interval_count":1,"aggregate_usage":null,"transform_usage":null,"trial_period_days":null},"items":{"url":"/v1/subscription_items?subscription=sub_1OUZOgJglvQxkJ1H98TSY9bv","data":[{"id":"si_PJBm1ciQlpaOA4","plan":{"id":"price_1NwMsXJglvQxkJ1Hbzs5JkTs","active":true,"amount":1900,"object":"plan","created":1696156081,"product":"prod_OjqZPhbBQwmsB8","currency":"usd","interval":"month","livemode":false,"metadata":{},"nickname":null,"tiers_mode":null,"usage_type":"licensed","amount_decimal":"1900","billing_scheme":"per_unit","interval_count":1,"aggregate_usage":null,"transform_usage":null,"trial_period_days":null},"price":{"id":"price_1NwMsXJglvQxkJ1Hbzs5JkTs","type":"recurring","active":true,"object":"price","created":1696156081,"product":"prod_OjqZPhbBQwmsB8","currency":"usd","livemode":false,"metadata":{},"nickname":null,"recurring":{"interval":"month","usage_type":"licensed","interval_count":1,"aggregate_usage":null,"trial_period_days":null},"lookup_key":null,"tiers_mode":null,"unit_amount":1900,"tax_behavior":"unspecified","billing_scheme":"per_unit","custom_unit_amount":null,"transform_quantity":null,"unit_amount_decimal":"1900"},"object":"subscription_item","created":1704307355,"metadata":{},"quantity":1,"tax_rates":[],"subscription":"sub_1OUZOgJglvQxkJ1H98TSY9bv","billing_thresholds":null}],"object":"list","has_more":false,"total_count":1},"object":"subscription","status":"active","created":1704307354,"currency":"usd","customer":"cus_PJBmxGOKfQgYDN","discount":null,"ended_at":null,"livemode":false,"metadata":{},"quantity":1,"schedule":null,"cancel_at":null,"trial_end":null,"start_date":1704307354,"test_clock":null,"application":null,"canceled_at":null,"description":null,"trial_start":null,"on_behalf_of":null,"automatic_tax":{"enabled":true},"transfer_data":null,"days_until_due":null,"default_source":null,"latest_invoice":"in_1OUZOgJglvQxkJ1HysujPh0b","pending_update":null,"trial_settings":{"end_behavior":{"missing_payment_method":"create_invoice"}},"pause_collection":null,"payment_settings":{"payment_method_types":null,"payment_method_options":null,"save_default_payment_method":"off"},"collection_method":"charge_automatically","default_tax_rates":[],"billing_thresholds":null,"current_period_end":1706985754,"billing_cycle_anchor":1704307354,"cancel_at_period_end":false,"cancellation_details":{"reason":null,"comment":null,"feedback":null},"current_period_start":1704307354,"pending_setup_intent":null,"default_payment_method":"pm_1OUZOfJglvQxkJ1HSHU0TTWW","application_fee_percent":null,"pending_invoice_item_interval":null,"next_pending_invoice_item_invoice":null} - - # Now extract the product id from the object - - if len(matching_products) == 0: - logger.info("No matching products found") - return - - product_id = matching_products[0]["attrs"]["items"]["data"][0]["plan"][ - "product" - ] - - # Now fetch the product settings - - matching_product_settings = ( - self.db.table("product_to_features") - .select("*") - .filter("stripe_product_id", "eq", product_id) - .execute() - ).data - - if len(matching_product_settings) == 0: - logger.info("No matching product settings found") - return - - product_settings = matching_product_settings[0] - - # Now update the user settings with the product settings - try: - self.db.table("user_settings").update( - { - "max_brains": product_settings["max_brains"], - "max_brain_size": product_settings["max_brain_size"], - "monthly_chat_credit": product_settings["monthly_chat_credit"], - "api_access": product_settings["api_access"], - "models": product_settings["models"], - } - ).match({"user_id": str(user_id)}).execute() - - except Exception as e: - logger.error(e) - logger.error("Error while updating user settings with product settings") - - def check_if_is_premium_user(self, user_id: UUID): - """ - Check if the user is a premium user - """ - matching_customers = None - try: - user_is_customer, user_customer_id = self.check_user_is_customer(user_id) - - if user_is_customer: - self.db.table("user_settings").update({"is_premium": True}).match( - {"user_id": str(user_id)} - ).execute() - - if user_is_customer and self.check_subscription_validity(user_customer_id): - logger.info("User is a premium user") - self.update_customer_settings_with_product_settings( - user_id, user_customer_id - ) - return True, False - else: - self.db.table("user_settings").update({"is_premium": False}).match( - {"user_id": str(user_id)} - ).execute() - return False, False - - except Exception as e: - logger.info( - "Stripe needs to be configured if you want to have the premium features" - ) - return False, True - def get_user_settings(self, user_id): """ Fetch the user settings from the database @@ -189,18 +53,6 @@ class UserUsage(Repository): user_settings = user_settings_response[0] - check_is_premium, error = self.check_if_is_premium_user(user_id) - - if check_is_premium and not error: - # get the possibly updated user settings - user_settings_response = ( - self.db.from_("user_settings") - .select("*") - .filter("user_id", "eq", str(user_id)) - .execute() - ).data - return user_settings_response[0] - return user_settings def get_model_settings(self):