2024-06-27 13:51:01 +03:00
|
|
|
import logging
|
2024-07-12 16:07:39 +03:00
|
|
|
from typing import Any, List, Tuple, no_type_check
|
2024-06-26 10:58:55 +03:00
|
|
|
|
2024-07-15 20:10:03 +03:00
|
|
|
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessage
|
2024-06-26 10:58:55 +03:00
|
|
|
from langchain_core.messages.ai import AIMessageChunk
|
2024-07-12 16:07:39 +03:00
|
|
|
from langchain_core.prompts import format_document
|
2024-08-06 15:51:27 +03:00
|
|
|
|
2024-06-27 13:51:01 +03:00
|
|
|
from quivr_core.models import (
|
2024-08-06 18:44:12 +03:00
|
|
|
ChatLLMMetadata,
|
2024-06-26 10:58:55 +03:00
|
|
|
ParsedRAGResponse,
|
2024-06-27 13:51:01 +03:00
|
|
|
QuivrKnowledge,
|
2024-06-26 10:58:55 +03:00
|
|
|
RAGResponseMetadata,
|
|
|
|
RawRAGResponse,
|
|
|
|
)
|
2024-09-23 19:11:06 +03:00
|
|
|
from quivr_core.prompts import custom_prompts
|
2024-06-27 13:51:01 +03:00
|
|
|
|
|
|
|
# TODO(@aminediro): define a types packages where we clearly define IO types
|
|
|
|
# This should be used for serialization/deseriallization later
|
|
|
|
|
2024-06-26 10:58:55 +03:00
|
|
|
|
2024-07-15 20:10:03 +03:00
|
|
|
logger = logging.getLogger("quivr_core")
|
2024-06-26 10:58:55 +03:00
|
|
|
|
|
|
|
|
|
|
|
def model_supports_function_calling(model_name: str):
|
|
|
|
models_supporting_function_calls = [
|
|
|
|
"gpt-4",
|
|
|
|
"gpt-4-1106-preview",
|
|
|
|
"gpt-4-0613",
|
2024-10-21 18:30:34 +03:00
|
|
|
"gpt-4o",
|
2024-06-26 10:58:55 +03:00
|
|
|
"gpt-3.5-turbo-1106",
|
|
|
|
"gpt-3.5-turbo-0613",
|
|
|
|
"gpt-4-0125-preview",
|
|
|
|
"gpt-3.5-turbo",
|
|
|
|
"gpt-4-turbo",
|
|
|
|
"gpt-4o",
|
2024-08-06 15:51:27 +03:00
|
|
|
"gpt-4o-mini",
|
2024-06-26 10:58:55 +03:00
|
|
|
]
|
|
|
|
return model_name in models_supporting_function_calls
|
|
|
|
|
2024-09-18 13:30:48 +03:00
|
|
|
|
2024-06-26 10:58:55 +03:00
|
|
|
def format_history_to_openai_mesages(
|
|
|
|
tuple_history: List[Tuple[str, str]], system_message: str, question: str
|
|
|
|
) -> List[BaseMessage]:
|
|
|
|
"""Format the chat history into a list of Base Messages"""
|
|
|
|
messages = []
|
|
|
|
messages.append(SystemMessage(content=system_message))
|
|
|
|
for human, ai in tuple_history:
|
|
|
|
messages.append(HumanMessage(content=human))
|
|
|
|
messages.append(AIMessage(content=ai))
|
|
|
|
messages.append(HumanMessage(content=question))
|
|
|
|
return messages
|
|
|
|
|
|
|
|
|
|
|
|
def cited_answer_filter(tool):
|
|
|
|
return tool["name"] == "cited_answer"
|
|
|
|
|
|
|
|
|
|
|
|
def get_chunk_metadata(
|
2024-09-23 19:11:06 +03:00
|
|
|
msg: AIMessageChunk, sources: list[Any] | None = None
|
2024-06-26 10:58:55 +03:00
|
|
|
) -> RAGResponseMetadata:
|
|
|
|
# Initiate the source
|
2024-09-23 19:11:06 +03:00
|
|
|
metadata = {"sources": sources} if sources else {"sources": []}
|
2024-06-26 10:58:55 +03:00
|
|
|
if msg.tool_calls:
|
|
|
|
cited_answer = next(x for x in msg.tool_calls if cited_answer_filter(x))
|
|
|
|
|
|
|
|
if "args" in cited_answer:
|
|
|
|
gathered_args = cited_answer["args"]
|
|
|
|
if "citations" in gathered_args:
|
|
|
|
citations = gathered_args["citations"]
|
|
|
|
metadata["citations"] = citations
|
|
|
|
|
|
|
|
if "followup_questions" in gathered_args:
|
|
|
|
followup_questions = gathered_args["followup_questions"]
|
|
|
|
metadata["followup_questions"] = followup_questions
|
|
|
|
|
2024-09-23 19:11:06 +03:00
|
|
|
return RAGResponseMetadata(**metadata, metadata_model=None)
|
2024-06-26 10:58:55 +03:00
|
|
|
|
|
|
|
|
2024-07-12 16:07:39 +03:00
|
|
|
def get_prev_message_str(msg: AIMessageChunk) -> str:
|
|
|
|
if msg.tool_calls:
|
|
|
|
cited_answer = next(x for x in msg.tool_calls if cited_answer_filter(x))
|
|
|
|
if "args" in cited_answer and "answer" in cited_answer["args"]:
|
|
|
|
return cited_answer["args"]["answer"]
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
2024-06-26 10:58:55 +03:00
|
|
|
# TODO: CONVOLUTED LOGIC !
|
|
|
|
# TODO(@aminediro): redo this
|
2024-07-09 18:55:14 +03:00
|
|
|
@no_type_check
|
2024-06-26 10:58:55 +03:00
|
|
|
def parse_chunk_response(
|
2024-07-12 16:07:39 +03:00
|
|
|
rolling_msg: AIMessageChunk,
|
2024-06-26 10:58:55 +03:00
|
|
|
raw_chunk: dict[str, Any],
|
|
|
|
supports_func_calling: bool,
|
2024-07-12 16:07:39 +03:00
|
|
|
) -> Tuple[AIMessageChunk, str]:
|
2024-06-26 10:58:55 +03:00
|
|
|
# Init with sources
|
|
|
|
answer_str = ""
|
|
|
|
|
2024-09-03 16:23:23 +03:00
|
|
|
if "answer" in raw_chunk:
|
|
|
|
answer = raw_chunk["answer"]
|
|
|
|
else:
|
|
|
|
answer = raw_chunk
|
|
|
|
|
|
|
|
rolling_msg += answer
|
2024-09-23 19:11:06 +03:00
|
|
|
if supports_func_calling and rolling_msg.tool_calls:
|
|
|
|
cited_answer = next(x for x in rolling_msg.tool_calls if cited_answer_filter(x))
|
|
|
|
if "args" in cited_answer and "answer" in cited_answer["args"]:
|
|
|
|
gathered_args = cited_answer["args"]
|
|
|
|
# Only send the difference between answer and response_tokens which was the previous answer
|
|
|
|
answer_str = gathered_args["answer"]
|
|
|
|
return rolling_msg, answer_str
|
|
|
|
|
|
|
|
return rolling_msg, answer.content
|
2024-06-26 10:58:55 +03:00
|
|
|
|
|
|
|
|
2024-07-09 18:55:14 +03:00
|
|
|
@no_type_check
|
2024-06-26 10:58:55 +03:00
|
|
|
def parse_response(raw_response: RawRAGResponse, model_name: str) -> ParsedRAGResponse:
|
2024-09-03 16:23:23 +03:00
|
|
|
answer = ""
|
2024-09-23 19:11:06 +03:00
|
|
|
sources = raw_response["docs"] if "docs" in raw_response else []
|
2024-06-26 10:58:55 +03:00
|
|
|
|
2024-08-06 18:44:12 +03:00
|
|
|
metadata = RAGResponseMetadata(
|
|
|
|
sources=sources, metadata_model=ChatLLMMetadata(name=model_name)
|
|
|
|
)
|
2024-06-26 10:58:55 +03:00
|
|
|
|
2024-09-23 19:11:06 +03:00
|
|
|
if (
|
|
|
|
model_supports_function_calling(model_name)
|
|
|
|
and "tool_calls" in raw_response["answer"]
|
|
|
|
and raw_response["answer"].tool_calls
|
|
|
|
):
|
|
|
|
if "citations" in raw_response["answer"].tool_calls[-1]["args"]:
|
2024-06-26 10:58:55 +03:00
|
|
|
citations = raw_response["answer"].tool_calls[-1]["args"]["citations"]
|
2024-08-06 18:44:12 +03:00
|
|
|
metadata.citations = citations
|
2024-06-26 10:58:55 +03:00
|
|
|
followup_questions = raw_response["answer"].tool_calls[-1]["args"][
|
|
|
|
"followup_questions"
|
|
|
|
]
|
|
|
|
if followup_questions:
|
2024-08-06 18:44:12 +03:00
|
|
|
metadata.followup_questions = followup_questions
|
2024-09-03 16:23:23 +03:00
|
|
|
answer = raw_response["answer"].tool_calls[-1]["args"]["answer"]
|
|
|
|
else:
|
|
|
|
answer = raw_response["answer"].tool_calls[-1]["args"]["answer"]
|
|
|
|
else:
|
|
|
|
answer = raw_response["answer"].content
|
2024-06-26 10:58:55 +03:00
|
|
|
|
2024-08-06 18:44:12 +03:00
|
|
|
parsed_response = ParsedRAGResponse(answer=answer, metadata=metadata)
|
2024-06-26 10:58:55 +03:00
|
|
|
return parsed_response
|
|
|
|
|
|
|
|
|
|
|
|
def combine_documents(
|
2024-09-23 19:11:06 +03:00
|
|
|
docs,
|
|
|
|
document_prompt=custom_prompts.DEFAULT_DOCUMENT_PROMPT,
|
|
|
|
document_separator="\n\n",
|
2024-06-26 10:58:55 +03:00
|
|
|
):
|
|
|
|
# for each docs, add an index in the metadata to be able to cite the sources
|
2024-09-18 13:30:48 +03:00
|
|
|
for doc, index in zip(docs, range(len(docs)), strict=False):
|
2024-06-26 10:58:55 +03:00
|
|
|
doc.metadata["index"] = index
|
|
|
|
doc_strings = [format_document(doc, document_prompt) for doc in docs]
|
|
|
|
return document_separator.join(doc_strings)
|
|
|
|
|
|
|
|
|
2024-06-27 13:51:01 +03:00
|
|
|
def format_file_list(
|
|
|
|
list_files_array: list[QuivrKnowledge], max_files: int = 20
|
|
|
|
) -> str:
|
2024-06-26 10:58:55 +03:00
|
|
|
list_files = [file.file_name or file.url for file in list_files_array]
|
|
|
|
files: list[str] = list(filter(lambda n: n is not None, list_files)) # type: ignore
|
|
|
|
files = files[:max_files]
|
|
|
|
|
|
|
|
files_str = "\n".join(files) if list_files_array else "None"
|
|
|
|
return files_str
|