From 55834365b2c19f163d78df7bc871b3dce3010eaf Mon Sep 17 00:00:00 2001
From: Stan Girard
Date: Tue, 7 May 2024 18:12:31 +0200
Subject: [PATCH] feat(celery): moved assistant summary to celery (#2557)

This pull request moves the assistant summary functionality to the celery module for better organization and separation of concerns.
---
 backend/celery_config.py                 |   3 +
 backend/modules/assistant/ito/ito.py     |  77 ++++++----
 backend/modules/assistant/ito/summary.py | 182 +++++++++++++----------
 backend/modules/assistant/ito/tasks.py   |  42 ++++++
 4 files changed, 194 insertions(+), 110 deletions(-)
 create mode 100644 backend/modules/assistant/ito/tasks.py

diff --git a/backend/celery_config.py b/backend/celery_config.py
index 971594de4..561be493c 100644
--- a/backend/celery_config.py
+++ b/backend/celery_config.py
@@ -8,6 +8,7 @@
 CELERY_BROKER_QUEUE_NAME = os.getenv("CELERY_BROKER_QUEUE_NAME", "quivr")
 
 celery = Celery(__name__)
+
 if CELERY_BROKER_URL.startswith("sqs"):
     broker_transport_options = {
         CELERY_BROKER_QUEUE_NAME: {
@@ -36,3 +37,5 @@ elif CELERY_BROKER_URL.startswith("redis"):
     )
 else:
     raise ValueError(f"Unsupported broker URL: {CELERY_BROKER_URL}")
+
+celery.autodiscover_tasks(["modules.assistant.ito"])
diff --git a/backend/modules/assistant/ito/ito.py b/backend/modules/assistant/ito/ito.py
index 0a92fbb5f..877a64737 100644
--- a/backend/modules/assistant/ito/ito.py
+++ b/backend/modules/assistant/ito/ito.py
@@ -9,13 +9,13 @@ from typing import List, Optional
 
 from fastapi import UploadFile
 from logger import get_logger
-from modules.user.service.user_usage import UserUsage
 from modules.assistant.dto.inputs import InputAssistant
 from modules.assistant.ito.utils.pdf_generator import PDFGenerator, PDFModel
 from modules.chat.controller.chat.utils import update_user_usage
 from modules.contact_support.controller.settings import ContactsSettings
 from modules.upload.controller.upload_routes import upload_file
 from modules.user.entity.user_identity import UserIdentity
+from modules.user.service.user_usage import UserUsage
 from packages.emails.send_email import send_email
 from pydantic import BaseModel
 from unidecode import unidecode
@@ -62,31 +62,36 @@ class ITO(BaseModel):
     def calculate_pricing(self):
         return 20
 
-    def generate_pdf(self, filename: str, title: str, content: str):
-        pdf_model = PDFModel(title=title, content=content)
-        pdf = PDFGenerator(pdf_model)
-        pdf.print_pdf()
-        pdf.output(filename, "F")
-
     @abstractmethod
     async def process_assistant(self):
         pass
 
+
+async def uploadfile_to_file(uploadFile: UploadFile):
+    # Transform the UploadFile object to a file object with same name and content
+    tmp_file = NamedTemporaryFile(delete=False)
+    tmp_file.write(uploadFile.file.read())
+    tmp_file.flush()  # Make sure all data is written to disk
+    return tmp_file
+
+
+class OutputHandler(BaseModel):
     async def send_output_by_email(
         self,
-        file: UploadFile,
         filename: str,
+        file: UploadFile,
         task_name: str,
         custom_message: str,
         brain_id: str = None,
+        user_email: str = None,
     ):
         settings = ContactsSettings()
-        file = await self.uploadfile_to_file(file)
+        file = await uploadfile_to_file(file)
         domain_quivr = os.getenv("QUIVR_DOMAIN", "https://chat.quivr.app/")
 
         with open(file.name, "rb") as f:
             mail_from = settings.resend_contact_sales_from
-            mail_to = self.current_user.email
+            mail_to = user_email
             body = f"""
             [email HTML context lines stripped in extraction; surviving text: "Quivr Logo"]
@@ -116,20 +121,34 @@ class ITO(BaseModel):
                 "subject": "Quivr Ingestion Processed",
                 "reply_to": "no-reply@quivr.app",
                 "html": body,
-                "attachments": [{"filename": filename, "content": list(f.read())}],
+                "attachments": [
+                    {
+                        "filename": filename,
+                        "content": list(f.read()),
+                        "type": "application/pdf",
+                    }
+                ],
             }
             logger.info(f"Sending email to {mail_to} with file {filename}")
             send_email(params)
 
-    async def uploadfile_to_file(self, uploadFile: UploadFile):
-        # Transform the UploadFile object to a file object with same name and content
-        tmp_file = NamedTemporaryFile(delete=False)
-        tmp_file.write(uploadFile.file.read())
-        tmp_file.flush()  # Make sure all data is written to disk
-        return tmp_file
+    def generate_pdf(self, filename: str, title: str, content: str):
+        pdf_model = PDFModel(title=title, content=content)
+        pdf = PDFGenerator(pdf_model)
+        pdf.print_pdf()
+        pdf.output(filename, "F")
 
     async def create_and_upload_processed_file(
-        self, processed_content: str, original_filename: str, file_description: str
+        self,
+        processed_content: str,
+        original_filename: str,
+        file_description: str,
+        content: str,
+        task_name: str,
+        custom_message: str,
+        brain_id: str = None,
+        email_activated: bool = False,
+        current_user: UserIdentity = None,
     ) -> dict:
         """Handles creation and uploading of the processed file."""
         # remove any special characters from the filename that aren't http safe
@@ -164,29 +183,25 @@ class ITO(BaseModel):
             headers={"content-type": "application/pdf"},
         )
 
-        if self.input.outputs.email.activated:
+        logger.info(f"current_user: {current_user}")
+        if email_activated:
             await self.send_output_by_email(
-                file_to_upload,
                 new_filename,
+                file_to_upload,
                 "Summary",
                 f"{file_description} of {original_filename}",
-                brain_id=(
-                    self.input.outputs.brain.value
-                    if (
-                        self.input.outputs.brain.activated
-                        and self.input.outputs.brain.value
-                    )
-                    else None
-                ),
+                brain_id=brain_id,
+                user_email=current_user["email"],
             )
 
         # Reset to start of file before upload
         file_to_upload.file.seek(0)
 
-        if self.input.outputs.brain.activated:
+        UserIdentity(**current_user)
+        if brain_id:
             await upload_file(
                 uploadFile=file_to_upload,
-                brain_id=self.input.outputs.brain.value,
-                current_user=self.current_user,
+                brain_id=brain_id,
+                current_user=current_user,
                 chat_id=None,
             )
diff --git a/backend/modules/assistant/ito/summary.py b/backend/modules/assistant/ito/summary.py
index c97740313..cbd363339 100644
--- a/backend/modules/assistant/ito/summary.py
+++ b/backend/modules/assistant/ito/summary.py
@@ -1,6 +1,7 @@
 import tempfile
 from typing import List
 
+from celery_config import celery
 from fastapi import UploadFile
 from langchain.chains import (
     MapReduceDocumentsChain,
@@ -23,9 +24,12 @@ from modules.assistant.dto.outputs import (
     Outputs,
 )
 from modules.assistant.ito.ito import ITO
+from modules.notification.dto.inputs import CreateNotification
+from modules.notification.service.notification_service import NotificationService
 from modules.user.entity.user_identity import UserIdentity
 
 logger = get_logger(__name__)
+notification_service = NotificationService()
 
 
 class SummaryAssistant(ITO):
@@ -69,97 +73,117 @@ class SummaryAssistant(ITO):
         return True
 
     async def process_assistant(self):
-
         try:
-            self.increase_usage_user()
+            notification_service.add_notification(
+                CreateNotification(
+                    user_id=self.current_user.id,
+                    status="info",
+                    title=f"Creating Summary for {self.files[0].filename}",
+                )
+            )
+            # Create a temporary file with the uploaded file as a temporary file and then pass it to the loader
+            tmp_file = tempfile.NamedTemporaryFile(delete=False)
+
+            # Write the file to the temporary file
+            tmp_file.write(self.files[0].file.read())
+
+            # Now pass the path of the temporary file to the loader
+
+            loader = UnstructuredPDFLoader(tmp_file.name)
+
+            tmp_file.close()
+
+            data = loader.load()
+
+            text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
+                chunk_size=1000, chunk_overlap=100
+            )
+            split_docs = text_splitter.split_documents(data)
+            logger.info(f"Split {len(split_docs)} documents")
+            # Jsonify the split docs
+            split_docs = [doc.to_json() for doc in split_docs]
+            ## Turn this into a task
+            brain_id = (
+                self.input.outputs.brain.id
+                if self.input.outputs.brain.activated
+                else None
+            )
+            email_activated = self.input.outputs.email.activated
+            celery.send_task(
+                name="task_summary",
+                args=(
+                    split_docs,
+                    self.files[0].filename,
+                    brain_id,
+                    email_activated,
+                    self.current_user.model_dump(mode="json"),
+                ),
+            )
         except Exception as e:
-            logger.error(f"Error increasing usage: {e}")
-            return {"error": str(e)}
+            logger.error(f"Error processing summary: {e}")
 
-        # Create a temporary file with the uploaded file as a temporary file and then pass it to the loader
-        tmp_file = tempfile.NamedTemporaryFile(delete=False)
-
-        # Write the file to the temporary file
-        tmp_file.write(self.files[0].file.read())
-
-        # Now pass the path of the temporary file to the loader
-
-        loader = UnstructuredPDFLoader(tmp_file.name)
-
-        tmp_file.close()
-
-        data = loader.load()
-
-        llm = ChatLiteLLM(model="gpt-3.5-turbo", max_tokens=2000)
-
-        map_template = """The following is a document that has been divided into multiple sections:
-        {docs}
-
-        Please carefully analyze each section and identify the following:
-
-        1. Main Themes: What are the overarching ideas or topics in this section?
-        2. Key Points: What are the most important facts, arguments, or ideas presented in this section?
-        3. Important Information: Are there any crucial details that stand out? This could include data, quotes, specific events, entity, or other relevant information.
-        4. People: Who are the key individuals mentioned in this section? What roles do they play?
-        5. Reasoning: What logic or arguments are used to support the key points?
-        6. Chapters: If the document is divided into chapters, what is the main focus of each chapter?
-
-        Remember to consider the language and context of the document. This will help in understanding the nuances and subtleties of the text."""
-        map_prompt = PromptTemplate.from_template(map_template)
-        map_chain = LLMChain(llm=llm, prompt=map_prompt)
-
-        # Reduce
-        reduce_template = """The following is a set of summaries for parts of the document:
-        {docs}
-        Take these and distill it into a final, consolidated summary of the document. Make sure to include the main themes, key points, and important information such as data, quotes,people and specific events.
-        Use markdown such as bold, italics, underlined. For example, **bold**, *italics*, and _underlined_ to highlight key points.
-        Please provide the final summary with sections using bold headers.
-        Sections should always be Summary and Key Points, but feel free to add more sections as needed.
-        Always use bold text for the sections headers.
-        Keep the same language as the documents.
-        Answer:"""
-        reduce_prompt = PromptTemplate.from_template(reduce_template)
-
-        # Run chain
-        reduce_chain = LLMChain(llm=llm, prompt=reduce_prompt)
-
-        # Takes a list of documents, combines them into a single string, and passes this to an LLMChain
-        combine_documents_chain = StuffDocumentsChain(
-            llm_chain=reduce_chain, document_variable_name="docs"
-        )
-
-        # Combines and iteratively reduces the mapped documents
-        reduce_documents_chain = ReduceDocumentsChain(
-            # This is final chain that is called.
-            combine_documents_chain=combine_documents_chain,
-            # If documents exceed context for `StuffDocumentsChain`
-            collapse_documents_chain=combine_documents_chain,
-            # The maximum number of tokens to group documents into.
-            token_max=4000,
-        )
-
-        # Combining documents by mapping a chain over them, then combining results
-        map_reduce_chain = MapReduceDocumentsChain(
-            # Map chain
-            llm_chain=map_chain,
-            # Reduce chain
-            reduce_documents_chain=reduce_documents_chain,
-            # The variable name in the llm_chain to put the documents in
-            document_variable_name="docs",
-            # Return the results of the map steps in the output
-            return_intermediate_steps=False,
-        )
-
-        text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
-            chunk_size=1000, chunk_overlap=100
-        )
-        split_docs = text_splitter.split_documents(data)
-
-        content = map_reduce_chain.run(split_docs)
-
-        return await self.create_and_upload_processed_file(
-            content, self.files[0].filename, "Summary"
-        )
+
+
+def map_reduce_chain():
+    llm = ChatLiteLLM(model="gpt-3.5-turbo", max_tokens=2000)
+
+    map_template = """The following is a document that has been divided into multiple sections:
+    {docs}
+
+    Please carefully analyze each section and identify the following:
+
+    1. Main Themes: What are the overarching ideas or topics in this section?
+    2. Key Points: What are the most important facts, arguments, or ideas presented in this section?
+    3. Important Information: Are there any crucial details that stand out? This could include data, quotes, specific events, entity, or other relevant information.
+    4. People: Who are the key individuals mentioned in this section? What roles do they play?
+    5. Reasoning: What logic or arguments are used to support the key points?
+    6. Chapters: If the document is divided into chapters, what is the main focus of each chapter?
+
+    Remember to consider the language and context of the document. This will help in understanding the nuances and subtleties of the text."""
+    map_prompt = PromptTemplate.from_template(map_template)
+    map_chain = LLMChain(llm=llm, prompt=map_prompt)
+
+    # Reduce
+    reduce_template = """The following is a set of summaries for parts of the document :
+    {docs}
+    Take these and distill it into a final, consolidated summary of the document. Make sure to include the main themes, key points, and important information such as data, quotes,people and specific events.
+    Use markdown such as bold, italics, underlined. For example, **bold**, *italics*, and _underlined_ to highlight key points.
+    Please provide the final summary with sections using bold headers.
+    Sections should always be Summary and Key Points, but feel free to add more sections as needed.
+    Always use bold text for the sections headers.
+    Keep the same language as the documents.
+    Answer:"""
+    reduce_prompt = PromptTemplate.from_template(reduce_template)
+
+    # Run chain
+    reduce_chain = LLMChain(llm=llm, prompt=reduce_prompt)
+
+    # Takes a list of documents, combines them into a single string, and passes this to an LLMChain
+    combine_documents_chain = StuffDocumentsChain(
+        llm_chain=reduce_chain, document_variable_name="docs"
+    )
+
+    # Combines and iteratively reduces the mapped documents
+    reduce_documents_chain = ReduceDocumentsChain(
+        # This is final chain that is called.
+        combine_documents_chain=combine_documents_chain,
+        # If documents exceed context for `StuffDocumentsChain`
+        collapse_documents_chain=combine_documents_chain,
+        # The maximum number of tokens to group documents into.
+        token_max=4000,
+    )
+
+    # Combining documents by mapping a chain over them, then combining results
+    map_reduce_chain = MapReduceDocumentsChain(
+        # Map chain
+        llm_chain=map_chain,
+        # Reduce chain
+        reduce_documents_chain=reduce_documents_chain,
+        # The variable name in the llm_chain to put the documents in
+        document_variable_name="docs",
+        # Return the results of the map steps in the output
+        return_intermediate_steps=False,
+    )
+    return map_reduce_chain
 
 
 def summary_inputs():
diff --git a/backend/modules/assistant/ito/tasks.py b/backend/modules/assistant/ito/tasks.py
new file mode 100644
index 000000000..08f190828
--- /dev/null
+++ b/backend/modules/assistant/ito/tasks.py
@@ -0,0 +1,42 @@
+import asyncio
+
+from celery_config import celery
+from langchain_core.documents import Document
+from logger import get_logger
+
+from .ito import OutputHandler
+from .summary import map_reduce_chain
+
+logger = get_logger(__name__)
+
+
+@celery.task(name="task_summary")
+def task_summary(split_docs, filename, brain_id, email_activated, current_user):
+    loop = asyncio.get_event_loop()
+    # turn split_docs into a list of Document objects
+    logger.info("split_docs: %s", split_docs)
+    split_docs = [
+        Document(
+            page_content=doc["kwargs"]["page_content"],
+            metadata=doc["kwargs"]["metadata"],
+        )
+        for doc in split_docs
+        if "kwargs" in doc
+        and "page_content" in doc["kwargs"]
+        and "metadata" in doc["kwargs"]
+    ]
+    content = map_reduce_chain().run(split_docs)
+    output_handler = OutputHandler()
+    return loop.run_until_complete(
+        output_handler.create_and_upload_processed_file(
+            content,
+            filename,
+            "Summary",
+            content,
+            "Summary",
+            "Summary",
+            brain_id,
+            email_activated,
+            current_user,
+        )
+    )
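
For readers who want the producer/worker round trip introduced by this patch in isolation, here is a minimal sketch of the same pattern: the caller serializes LangChain documents with doc.to_json() and dispatches the task by name, and the worker rebuilds Document objects from the JSON payload before using them. The broker URL and the task name below are illustrative placeholders, not Quivr's configuration.

from celery import Celery
from langchain_core.documents import Document

app = Celery(__name__, broker="redis://localhost:6379/0")  # assumed local broker, not Quivr config


@app.task(name="task_summary_example")  # hypothetical task name, for illustration only
def task_summary_example(split_docs):
    # Rebuild Document objects from the dicts produced by doc.to_json()
    docs = [
        Document(
            page_content=doc["kwargs"]["page_content"],
            metadata=doc["kwargs"]["metadata"],
        )
        for doc in split_docs
        if "kwargs" in doc
    ]
    return len(docs)


if __name__ == "__main__":
    # Producer side: pass only JSON-serializable arguments and dispatch by task
    # name, as process_assistant() does with celery.send_task in the patch above.
    payload = [Document(page_content="hello", metadata={"page": 1}).to_json()]
    app.send_task(name="task_summary_example", args=(payload,))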