quivr/backend/modules/assistant/ito/ito.py
Stan Girard 1931848262
feat(assistants): Add user usage update and pricing calculation to ITO assistant (#2433)
This pull request adds functionality to update user usage and calculate
pricing in the ITO assistant. It includes a new method
`increase_usage_user()` that raises an error if the user has consumed
all of their credits, and a new method `calculate_pricing()` that
returns a fixed pricing value of 20.

<!--
ELLIPSIS_HIDDEN
-->
----

| <a href="https://ellipsis.dev" target="_blank"><img
src="https://avatars.githubusercontent.com/u/80834858?s=400&u=31e596315b0d8f7465b3ee670f25cea677299c96&v=4"
alt="Ellipsis" width="30px" height="30px"/></a> | 🚀 This PR
description was created by [Ellipsis](https://www.ellipsis.dev) for
commit ee5fdf70f6. |
|--------|--------|

### Summary:
This PR adds functionality to update user usage and calculate pricing in
the ITO assistant, with new methods in the `ITO` class and an update to
where the `increase_usage_user()` method is called.

**Key points**:
- Added `increase_usage_user()` and `calculate_pricing()` methods to
`ITO` class in `/backend/modules/assistant/ito/ito.py`
- `increase_usage_user()` updates user usage and raises an error if all
credits are consumed
- `calculate_pricing()` returns a fixed pricing value of 20
- `increase_usage_user()` is now called in the `ITO` class constructor


----
Generated with ❤️ by [ellipsis.dev](https://www.ellipsis.dev)

<!--
ELLIPSIS_HIDDEN
-->
2024-04-16 09:14:38 -07:00

135 lines
4.3 KiB
Python

import random
from abc import abstractmethod
from io import BytesIO
from tempfile import NamedTemporaryFile
from typing import List, Optional
from fastapi import UploadFile
from logger import get_logger
from models.user_usage import UserUsage
from modules.assistant.dto.inputs import InputAssistant
from modules.chat.controller.chat.utils import update_user_usage
from modules.contact_support.controller.settings import ContactsSettings
from modules.upload.controller.upload_routes import upload_file
from modules.user.entity.user_identity import UserIdentity
from packages.emails.send_email import send_email
from pydantic import BaseModel
logger = get_logger(__name__)
class ITO(BaseModel):
input: InputAssistant
files: List[UploadFile]
current_user: UserIdentity
user_usage: Optional[UserUsage] = None
user_settings: Optional[dict] = None
def __init__(
self,
input: InputAssistant,
files: List[UploadFile] = None,
current_user: UserIdentity = None,
**kwargs,
):
super().__init__(
input=input,
files=files,
current_user=current_user,
**kwargs,
)
self.user_usage = UserUsage(
id=current_user.id,
email=current_user.email,
)
self.user_settings = self.user_usage.get_user_settings()
self.increase_usage_user()
def increase_usage_user(self):
# Raises an error if the user has consumed all of of his credits
update_user_usage(
usage=self.user_usage,
user_settings=self.user_settings,
cost=self.calculate_pricing(),
)
def calculate_pricing(self):
return 20
@abstractmethod
async def process_assistant(self):
pass
async def send_output_by_email(
self, file: UploadFile, name: str, custom_message: str = None
):
settings = ContactsSettings()
file = await self.uploadfile_to_file(file)
with open(file.name, "rb") as f:
mail_from = settings.resend_contact_sales_from
mail_to = self.current_user.email
body = f"""
<p>{custom_message}</p>
"""
params = {
"from": mail_from,
"to": mail_to,
"subject": "Quivr Ingestion Processed",
"reply_to": "no-reply@quivr.app",
"html": body,
"attachments": [{"filename": name, "content": list(f.read())}],
}
logger.info(f"Sending email to {mail_to} with file {name}")
send_email(params)
async def uploadfile_to_file(self, uploadFile: UploadFile):
# Transform the UploadFile object to a file object with same name and content
tmp_file = NamedTemporaryFile(delete=False)
tmp_file.write(uploadFile.file.read())
tmp_file.flush() # Make sure all data is written to disk
return tmp_file
async def create_and_upload_processed_file(
self, processed_content: str, original_filename: str, file_description: str
) -> dict:
"""Handles creation and uploading of the processed file."""
content_io = BytesIO(processed_content.encode("utf-8"))
content_io.seek(0)
new_filename = (
original_filename.split(".")[0]
+ "_"
+ file_description.lower().replace(" ", "_")
+ "_"
+ str(random.randint(1000, 9999))
+ ".txt"
)
file_to_upload = UploadFile(
filename=new_filename,
file=content_io,
headers={"content-type": "text/plain"},
)
if self.input.outputs.email.activated:
await self.send_output_by_email(
file_to_upload,
new_filename,
f"{file_description} of {original_filename}",
)
# Reset to start of file before upload
file_to_upload.file.seek(0)
if self.input.outputs.brain.activated:
await upload_file(
uploadFile=file_to_upload,
brain_id=self.input.outputs.brain.value,
current_user=self.current_user,
chat_id=None,
)
return {"message": f"{file_description} generated successfully"}