quivr/backend/repository/brain_subscription/subscription_invitation_service.py

100 lines
3.6 KiB
Python
Raw Normal View History

from logger import get_logger
from models import BrainSubscription, get_supabase_client
from repository.brain import get_brain_for_user
from repository.user.get_user_id_by_user_email import get_user_id_by_user_email
# Module-level logger obtained from the project's logging helper and keyed by
# this module's name; used by SubscriptionInvitationService below.
logger = get_logger(__name__)
class SubscriptionInvitationService:
    """Create, read, update and delete brain subscription invitations.

    Invitations live in the ``brain_subscription_invitations`` Supabase table
    and are addressed by the pair (``brain_id``, ``email``); each row also
    stores a ``rights`` value.
    """

    # Single source of truth for the table every query in this class targets.
    _TABLE_NAME = "brain_subscription_invitations"

    def __init__(self):
        """Obtain the Supabase client used by every query of this service."""
        self.supabase_client = get_supabase_client()

    def _invitations_table(self):
        """Return a query builder bound to the invitations table."""
        return self.supabase_client.table(self._TABLE_NAME)

    def _matching_invitations(self, subscription):
        """Return the rows whose brain_id and email match *subscription*."""
        response = (
            self._invitations_table()
            .select("*")
            .eq("brain_id", str(subscription.brain_id))
            .eq("email", subscription.email)
            .execute()
        )
        return response.data

    def create_subscription_invitation(self, brain_subscription: BrainSubscription):
        """Insert a new invitation row built from *brain_subscription*.

        Returns the ``data`` payload of the Supabase response.
        """
        logger.info("Creating subscription invitation")
        new_row = {
            "brain_id": str(brain_subscription.brain_id),
            "email": brain_subscription.email,
            "rights": brain_subscription.rights,
        }
        response = self._invitations_table().insert(new_row).execute()
        return response.data

    def update_subscription_invitation(self, brain_subscription: BrainSubscription):
        """Overwrite ``rights`` on the invitation matching brain_id and email.

        Returns the ``data`` payload of the Supabase response.
        """
        logger.info("Updating subscription invitation")
        response = (
            self._invitations_table()
            .update({"rights": brain_subscription.rights})
            .eq("brain_id", str(brain_subscription.brain_id))
            .eq("email", brain_subscription.email)
            .execute()
        )
        return response.data

    def create_or_update_subscription_invitation(
        self,
        brain_subscription: BrainSubscription,
    ) -> bool:
        """
        Creates a subscription invitation if it does not exist, otherwise updates it.
        Returns True if the invitation was created or updated and False if user already has access.
        """
        # An invitation already exists for this (brain, email): refresh it.
        if self._matching_invitations(brain_subscription):
            self.update_subscription_invitation(brain_subscription)
            return True

        user_id = get_user_id_by_user_email(brain_subscription.email)
        # Only query brain membership when the email resolves to a user id.
        # NOTE(review): assumes the lookup yields None for an unknown email --
        # in that case nobody can already have access, so the membership query
        # is skipped instead of being issued with user_id=None.
        if user_id is not None:
            brain_user = get_brain_for_user(user_id, brain_subscription.brain_id)
            if brain_user is not None:
                # The user is already attached to the brain: nothing to invite.
                return False

        self.create_subscription_invitation(brain_subscription)
        return True

    def fetch_invitation(self, subscription: BrainSubscription):
        """Return the first invitation matching *subscription*, or None."""
        logger.info("Fetching subscription invitation")
        rows = self._matching_invitations(subscription)
        if rows:
            return rows[0]  # return the first matching invitation
        return None

    def remove_invitation(self, subscription: BrainSubscription):
        """Delete the invitation(s) matching *subscription*'s brain_id and email.

        Returns the ``data`` payload of the Supabase response.
        """
        logger.info(
            f"Removing subscription invitation for email {subscription.email} and brain {subscription.brain_id}"
        )
        response = (
            self._invitations_table()
            .delete()
            .eq("brain_id", str(subscription.brain_id))
            .eq("email", subscription.email)
            .execute()
        )
        logger.info(
            f"Removed subscription invitation for email {subscription.email} and brain {subscription.brain_id}"
        )
        # Log the raw response object as well, to ease debugging of deletions.
        logger.info(response)
        return response.data