feat(celery): moved assistant summary to celery (#2557)

This pull request moves the assistant summary generation out of the request path and into a
Celery background task, for better organization and separation of concerns.
Stan Girard 2024-05-07 18:12:31 +02:00 committed by GitHub
parent 748733df2d
commit 55834365b2
4 changed files with 194 additions and 110 deletions
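
At a high level, the change follows the standard Celery producer/worker split: the request-facing assistant serializes its inputs and enqueues a named task, and a worker process picks it up and does the slow summarization work. A minimal sketch of that pattern (names mirror the commit, but this is an illustration, not the shipped code):

    # producer side: enqueue and return immediately
    from celery_config import celery

    def enqueue_summary(split_docs: list, filename: str) -> None:
        # args must be JSON-serializable, which is why the documents are
        # converted to plain dicts before dispatch
        celery.send_task(name="task_summary", args=(split_docs, filename))

    # worker side: a task registered under the same name does the heavy lifting
    @celery.task(name="task_summary")
    def task_summary(split_docs: list, filename: str):
        ...  # run the map-reduce summary chain and deliver the result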

View File

@@ -8,6 +8,7 @@ CELERY_BROKER_QUEUE_NAME = os.getenv("CELERY_BROKER_QUEUE_NAME", "quivr")
 celery = Celery(__name__)

 if CELERY_BROKER_URL.startswith("sqs"):
     broker_transport_options = {
         CELERY_BROKER_QUEUE_NAME: {
@@ -36,3 +37,5 @@ elif CELERY_BROKER_URL.startswith("redis"):
     )
 else:
     raise ValueError(f"Unsupported broker URL: {CELERY_BROKER_URL}")
+
+celery.autodiscover_tasks(["modules.assistant.ito"])
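
autodiscover_tasks makes the worker import a tasks module from each listed package (Celery's default related_name is "tasks"), which is how the task file added later in this commit gets registered at worker startup. A sketch of the minimum such a module needs, assuming it lives under modules/assistant/ito/ (the exact filename is not shown in this diff):

    # hypothetical modules/assistant/ito/tasks.py
    from celery_config import celery

    @celery.task(name="task_summary")
    def task_summary(*args):
        # registered at import time; celery.send_task("task_summary") resolves here
        ...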

View File

@@ -9,13 +9,13 @@ from typing import List, Optional
 from fastapi import UploadFile
 from logger import get_logger
-from modules.user.service.user_usage import UserUsage
 from modules.assistant.dto.inputs import InputAssistant
 from modules.assistant.ito.utils.pdf_generator import PDFGenerator, PDFModel
 from modules.chat.controller.chat.utils import update_user_usage
 from modules.contact_support.controller.settings import ContactsSettings
 from modules.upload.controller.upload_routes import upload_file
 from modules.user.entity.user_identity import UserIdentity
+from modules.user.service.user_usage import UserUsage
 from packages.emails.send_email import send_email
 from pydantic import BaseModel
 from unidecode import unidecode
@@ -62,31 +62,36 @@ class ITO(BaseModel):
     def calculate_pricing(self):
         return 20

-    def generate_pdf(self, filename: str, title: str, content: str):
-        pdf_model = PDFModel(title=title, content=content)
-        pdf = PDFGenerator(pdf_model)
-        pdf.print_pdf()
-        pdf.output(filename, "F")

     @abstractmethod
     async def process_assistant(self):
         pass

+async def uploadfile_to_file(uploadFile: UploadFile):
+    # Transform the UploadFile object to a file object with same name and content
+    tmp_file = NamedTemporaryFile(delete=False)
+    tmp_file.write(uploadFile.file.read())
+    tmp_file.flush()  # Make sure all data is written to disk
+    return tmp_file

+class OutputHandler(BaseModel):
     async def send_output_by_email(
         self,
-        file: UploadFile,
         filename: str,
+        file: UploadFile,
         task_name: str,
         custom_message: str,
+        brain_id: str = None,
+        user_email: str = None,
     ):
         settings = ContactsSettings()
-        file = await self.uploadfile_to_file(file)
+        file = await uploadfile_to_file(file)
         domain_quivr = os.getenv("QUIVR_DOMAIN", "https://chat.quivr.app/")
         with open(file.name, "rb") as f:
             mail_from = settings.resend_contact_sales_from
-            mail_to = self.current_user.email
+            mail_to = user_email
             body = f"""
             <div style="text-align: center;">
                 <img src="https://quivr-cms.s3.eu-west-3.amazonaws.com/logo_quivr_white_7e3c72620f.png" alt="Quivr Logo" style="width: 100px; height: 100px; border-radius: 50%; margin: 0 auto; display: block;">
@@ -116,20 +121,34 @@ class ITO(BaseModel):
                 "subject": "Quivr Ingestion Processed",
                 "reply_to": "no-reply@quivr.app",
                 "html": body,
-                "attachments": [{"filename": filename, "content": list(f.read())}],
+                "attachments": [
+                    {
+                        "filename": filename,
+                        "content": list(f.read()),
+                        "type": "application/pdf",
+                    }
+                ],
             }
             logger.info(f"Sending email to {mail_to} with file {filename}")
             send_email(params)

-    async def uploadfile_to_file(self, uploadFile: UploadFile):
-        # Transform the UploadFile object to a file object with same name and content
-        tmp_file = NamedTemporaryFile(delete=False)
-        tmp_file.write(uploadFile.file.read())
-        tmp_file.flush()  # Make sure all data is written to disk
-        return tmp_file

+    def generate_pdf(self, filename: str, title: str, content: str):
+        pdf_model = PDFModel(title=title, content=content)
+        pdf = PDFGenerator(pdf_model)
+        pdf.print_pdf()
+        pdf.output(filename, "F")

     async def create_and_upload_processed_file(
-        self, processed_content: str, original_filename: str, file_description: str
+        self,
+        processed_content: str,
+        original_filename: str,
+        file_description: str,
+        content: str,
+        task_name: str,
+        custom_message: str,
+        brain_id: str = None,
+        email_activated: bool = False,
+        current_user: UserIdentity = None,
     ) -> dict:
         """Handles creation and uploading of the processed file."""
         # remove any special characters from the filename that aren't http safe
@@ -164,29 +183,25 @@ class ITO(BaseModel):
             headers={"content-type": "application/pdf"},
         )

-        if self.input.outputs.email.activated:
+        logger.info(f"current_user: {current_user}")
+        if email_activated:
             await self.send_output_by_email(
-                file_to_upload,
                 new_filename,
+                file_to_upload,
                 "Summary",
                 f"{file_description} of {original_filename}",
-                brain_id=(
-                    self.input.outputs.brain.value
-                    if (
-                        self.input.outputs.brain.activated
-                        and self.input.outputs.brain.value
-                    )
-                    else None
-                ),
+                brain_id=brain_id,
+                user_email=current_user["email"],
             )

         # Reset to start of file before upload
         file_to_upload.file.seek(0)

-        if self.input.outputs.brain.activated:
+        UserIdentity(**current_user)
+        if brain_id:
             await upload_file(
                 uploadFile=file_to_upload,
-                brain_id=self.input.outputs.brain.value,
-                current_user=self.current_user,
+                brain_id=brain_id,
+                current_user=current_user,
                 chat_id=None,
             )
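
The net effect of these hunks is that OutputHandler no longer reads self.input or self.current_user: everything arrives as explicit, JSON-friendly arguments, which is what makes the method callable from a Celery worker. A hypothetical invocation with the new signature (argument values are illustrative only):

    import asyncio

    handler = OutputHandler()
    asyncio.run(
        handler.create_and_upload_processed_file(
            processed_content="...summary text...",
            original_filename="report.pdf",
            file_description="Summary",
            content="...summary text...",
            task_name="Summary",
            custom_message="Summary",
            brain_id=None,  # skip the brain upload
            email_activated=True,  # email the generated PDF
            current_user={"email": "user@example.com"},  # plain dict, as a worker would pass
        )
    )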

View File

@@ -1,6 +1,7 @@
 import tempfile
 from typing import List

+from celery_config import celery
 from fastapi import UploadFile
 from langchain.chains import (
     MapReduceDocumentsChain,
@@ -23,9 +24,12 @@ from modules.assistant.dto.outputs import (
     Outputs,
 )
 from modules.assistant.ito.ito import ITO
+from modules.notification.dto.inputs import CreateNotification
+from modules.notification.service.notification_service import NotificationService
 from modules.user.entity.user_identity import UserIdentity

 logger = get_logger(__name__)
+notification_service = NotificationService()

 class SummaryAssistant(ITO):
@@ -69,97 +73,117 @@ class SummaryAssistant(ITO):
         return True

     async def process_assistant(self):
         try:
             self.increase_usage_user()
+            notification_service.add_notification(
+                CreateNotification(
+                    user_id=self.current_user.id,
+                    status="info",
+                    title=f"Creating Summary for {self.files[0].filename}",
+                )
+            )
+            # Create a temporary file with the uploaded file as a temporary file and then pass it to the loader
+            tmp_file = tempfile.NamedTemporaryFile(delete=False)
+            # Write the file to the temporary file
+            tmp_file.write(self.files[0].file.read())
+            # Now pass the path of the temporary file to the loader
+            loader = UnstructuredPDFLoader(tmp_file.name)
+            tmp_file.close()
+            data = loader.load()
+            text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
+                chunk_size=1000, chunk_overlap=100
+            )
+            split_docs = text_splitter.split_documents(data)
+            logger.info(f"Split {len(split_docs)} documents")
+            # Jsonify the split docs
+            split_docs = [doc.to_json() for doc in split_docs]
+            ## Turn this into a task
+            brain_id = (
+                self.input.outputs.brain.id
+                if self.input.outputs.brain.activated
+                else None
+            )
+            email_activated = self.input.outputs.email.activated
+            celery.send_task(
+                name="task_summary",
+                args=(
+                    split_docs,
+                    self.files[0].filename,
+                    brain_id,
+                    email_activated,
+                    self.current_user.model_dump(mode="json"),
+                ),
+            )
         except Exception as e:
-            logger.error(f"Error increasing usage: {e}")
+            logger.error(f"Error processing summary: {e}")
             return {"error": str(e)}

-        # Create a temporary file with the uploaded file as a temporary file and then pass it to the loader
-        tmp_file = tempfile.NamedTemporaryFile(delete=False)
-        # Write the file to the temporary file
-        tmp_file.write(self.files[0].file.read())
-        # Now pass the path of the temporary file to the loader
-        loader = UnstructuredPDFLoader(tmp_file.name)
-        tmp_file.close()
-        data = loader.load()
-        llm = ChatLiteLLM(model="gpt-3.5-turbo", max_tokens=2000)
-        map_template = """The following is a document that has been divided into multiple sections:
-        {docs}
-        Please carefully analyze each section and identify the following:
-        1. Main Themes: What are the overarching ideas or topics in this section?
-        2. Key Points: What are the most important facts, arguments, or ideas presented in this section?
-        3. Important Information: Are there any crucial details that stand out? This could include data, quotes, specific events, entities, or other relevant information.
-        4. People: Who are the key individuals mentioned in this section? What roles do they play?
-        5. Reasoning: What logic or arguments are used to support the key points?
-        6. Chapters: If the document is divided into chapters, what is the main focus of each chapter?
-        Remember to consider the language and context of the document. This will help in understanding the nuances and subtleties of the text."""
-        map_prompt = PromptTemplate.from_template(map_template)
-        map_chain = LLMChain(llm=llm, prompt=map_prompt)
-        # Reduce
-        reduce_template = """The following is a set of summaries for parts of the document:
-        {docs}
-        Take these and distill them into a final, consolidated summary of the document. Make sure to include the main themes, key points, and important information such as data, quotes, people, and specific events.
-        Use markdown such as bold, italics, underlined. For example, **bold**, *italics*, and _underlined_ to highlight key points.
-        Please provide the final summary with sections using bold headers.
-        Sections should always be Summary and Key Points, but feel free to add more sections as needed.
-        Always use bold text for the section headers.
-        Keep the same language as the documents.
-        Answer:"""
-        reduce_prompt = PromptTemplate.from_template(reduce_template)
-        # Run chain
-        reduce_chain = LLMChain(llm=llm, prompt=reduce_prompt)
-        # Takes a list of documents, combines them into a single string, and passes this to an LLMChain
-        combine_documents_chain = StuffDocumentsChain(
-            llm_chain=reduce_chain, document_variable_name="docs"
-        )
-        # Combines and iteratively reduces the mapped documents
-        reduce_documents_chain = ReduceDocumentsChain(
-            # This is the final chain that is called.
-            combine_documents_chain=combine_documents_chain,
-            # If documents exceed context for `StuffDocumentsChain`
-            collapse_documents_chain=combine_documents_chain,
-            # The maximum number of tokens to group documents into.
-            token_max=4000,
-        )
-        # Combining documents by mapping a chain over them, then combining results
-        map_reduce_chain = MapReduceDocumentsChain(
-            # Map chain
-            llm_chain=map_chain,
-            # Reduce chain
-            reduce_documents_chain=reduce_documents_chain,
-            # The variable name in the llm_chain to put the documents in
-            document_variable_name="docs",
-            # Return the results of the map steps in the output
-            return_intermediate_steps=False,
-        )
-        text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
-            chunk_size=1000, chunk_overlap=100
-        )
-        split_docs = text_splitter.split_documents(data)
-        content = map_reduce_chain.run(split_docs)
-        return await self.create_and_upload_processed_file(
-            content, self.files[0].filename, "Summary"
-        )

+def map_reduce_chain():
+    llm = ChatLiteLLM(model="gpt-3.5-turbo", max_tokens=2000)
+    map_template = """The following is a document that has been divided into multiple sections:
+    {docs}
+    Please carefully analyze each section and identify the following:
+    1. Main Themes: What are the overarching ideas or topics in this section?
+    2. Key Points: What are the most important facts, arguments, or ideas presented in this section?
+    3. Important Information: Are there any crucial details that stand out? This could include data, quotes, specific events, entities, or other relevant information.
+    4. People: Who are the key individuals mentioned in this section? What roles do they play?
+    5. Reasoning: What logic or arguments are used to support the key points?
+    6. Chapters: If the document is divided into chapters, what is the main focus of each chapter?
+    Remember to consider the language and context of the document. This will help in understanding the nuances and subtleties of the text."""
+    map_prompt = PromptTemplate.from_template(map_template)
+    map_chain = LLMChain(llm=llm, prompt=map_prompt)
+    # Reduce
+    reduce_template = """The following is a set of summaries for parts of the document:
+    {docs}
+    Take these and distill them into a final, consolidated summary of the document. Make sure to include the main themes, key points, and important information such as data, quotes, people, and specific events.
+    Use markdown such as bold, italics, underlined. For example, **bold**, *italics*, and _underlined_ to highlight key points.
+    Please provide the final summary with sections using bold headers.
+    Sections should always be Summary and Key Points, but feel free to add more sections as needed.
+    Always use bold text for the section headers.
+    Keep the same language as the documents.
+    Answer:"""
+    reduce_prompt = PromptTemplate.from_template(reduce_template)
+    # Run chain
+    reduce_chain = LLMChain(llm=llm, prompt=reduce_prompt)
+    # Takes a list of documents, combines them into a single string, and passes this to an LLMChain
+    combine_documents_chain = StuffDocumentsChain(
+        llm_chain=reduce_chain, document_variable_name="docs"
+    )
+    # Combines and iteratively reduces the mapped documents
+    reduce_documents_chain = ReduceDocumentsChain(
+        # This is the final chain that is called.
+        combine_documents_chain=combine_documents_chain,
+        # If documents exceed context for `StuffDocumentsChain`
+        collapse_documents_chain=combine_documents_chain,
+        # The maximum number of tokens to group documents into.
+        token_max=4000,
+    )
+    # Combining documents by mapping a chain over them, then combining results
+    map_reduce_chain = MapReduceDocumentsChain(
+        # Map chain
+        llm_chain=map_chain,
+        # Reduce chain
+        reduce_documents_chain=reduce_documents_chain,
+        # The variable name in the llm_chain to put the documents in
+        document_variable_name="docs",
+        # Return the results of the map steps in the output
+        return_intermediate_steps=False,
+    )
+    return map_reduce_chain

 def summary_inputs():
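
Because the chain construction is now a standalone builder rather than inline method code, it can be smoke-tested in isolation; a minimal sketch, assuming the module path implied by the imports in this commit and configured model credentials:

    from langchain_core.documents import Document
    from modules.assistant.ito.summary import map_reduce_chain

    chain = map_reduce_chain()
    docs = [
        Document(page_content="Section one of the document..."),
        Document(page_content="Section two of the document..."),
    ]
    print(chain.run(docs))  # same legacy Chain.run call the task below uses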

View File

@@ -0,0 +1,42 @@
+import asyncio
+
+from celery_config import celery
+from langchain_core.documents import Document
+from logger import get_logger
+
+from .ito import OutputHandler
+from .summary import map_reduce_chain
+
+logger = get_logger(__name__)
+
+
+@celery.task(name="task_summary")
+def task_summary(split_docs, filename, brain_id, email_activated, current_user):
+    loop = asyncio.get_event_loop()
+    # Turn split_docs back into a list of Document objects
+    logger.info("split_docs: %s", split_docs)
+    split_docs = [
+        Document(
+            page_content=doc["kwargs"]["page_content"],
+            metadata=doc["kwargs"]["metadata"],
+        )
+        for doc in split_docs
+        if "kwargs" in doc
+        and "page_content" in doc["kwargs"]
+        and "metadata" in doc["kwargs"]
+    ]
+    content = map_reduce_chain().run(split_docs)
+    output_handler = OutputHandler()
+    return loop.run_until_complete(
+        output_handler.create_and_upload_processed_file(
+            content,
+            filename,
+            "Summary",
+            content,
+            "Summary",
+            "Summary",
+            brain_id,
+            email_activated,
+            current_user,
+        )
+    )
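
One wrinkle worth noting: Celery task functions are synchronous, so the coroutine returned by create_and_upload_processed_file has to be driven manually; the commit does this with asyncio.get_event_loop() inside the task, which emits a DeprecationWarning on Python 3.10+ when no loop is already running. A possible variant using asyncio.run (a sketch, not what the commit ships; rebuild_documents stands in for the list comprehension above):

    import asyncio

    @celery.task(name="task_summary_v2")  # hypothetical alternative registration
    def task_summary_v2(split_docs, filename, brain_id, email_activated, current_user):
        documents = rebuild_documents(split_docs)  # hypothetical helper, same filtering as above
        content = map_reduce_chain().run(documents)
        # asyncio.run creates a fresh event loop and closes it when the coroutine
        # finishes, which avoids reusing a stale loop across task invocations
        return asyncio.run(
            OutputHandler().create_and_upload_processed_file(
                content, filename, "Summary", content,
                "Summary", "Summary", brain_id, email_activated, current_user,
            )
        )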