quivr/backend/core/tests/fixture_chunks.py
Jacopo Chevallard ef90e8e672
feat: introducing configurable retrieval workflows (#3227)
# Description

Major PR which, among other things, makes the retrieval workflows easy
to customize. Workflows are based on LangGraph and can be customized by
editing a [YAML configuration
file](core/tests/test_llm_endpoint.py) and adding the implementation of
the node logic to
[quivr_rag_langgraph.py](1a0c98437a/backend/core/quivr_core/quivr_rag_langgraph.py).

This is a first, simple implementation that will significantly evolve in
the coming weeks to enable more complex workflows (for instance, with
conditional nodes). We also plan to adopt a similar approach for the
ingestion part, i.e. to enable users to easily customize the ingestion
pipeline.
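
The idea of a workflow assembled from configuration can be sketched in a few lines of plain Python. This is only an illustration under stated assumptions, not the LangGraph-based implementation in this PR: the node functions, `NODE_REGISTRY`, `WORKFLOW_CONFIG`, and `build_workflow` are all hypothetical names, and the dict stands in for a parsed YAML file.

```python
from typing import Any, Callable, Dict

# Hypothetical state type: a plain dict handed from node to node.
State = Dict[str, Any]
Node = Callable[[State], State]


def rewrite(state: State) -> State:
    """Hypothetical node: normalise the user question."""
    return {**state, "question": state["question"].strip().lower()}


def retrieve(state: State) -> State:
    """Hypothetical node: keep documents sharing a word with the question."""
    words = set(state["question"].split())
    docs = [d for d in state["corpus"] if words & set(d.lower().split())]
    return {**state, "docs": docs}


def generate(state: State) -> State:
    """Hypothetical node: produce a placeholder answer (no LLM call)."""
    return {**state, "answer": f"Found {len(state['docs'])} document(s)."}


# Maps the node names used in the configuration to their implementations.
NODE_REGISTRY: Dict[str, Node] = {
    "rewrite": rewrite,
    "retrieve": retrieve,
    "generate": generate,
}

# Stand-in for a parsed YAML configuration (assumed layout, not Quivr's schema).
WORKFLOW_CONFIG = {"nodes": ["rewrite", "retrieve", "generate"]}


def build_workflow(config: Dict[str, Any]) -> Node:
    """Chain the configured nodes in order into a single callable."""
    steps = [NODE_REGISTRY[name] for name in config["nodes"]]

    def run(state: State) -> State:
        for step in steps:
            state = step(state)
        return state

    return run


if __name__ == "__main__":
    workflow = build_workflow(WORKFLOW_CONFIG)
    result = workflow(
        {"question": "  What is NLP ", "corpus": ["NLP studies language", "Cats sleep"]}
    )
    print(result["answer"])
```

Changing the order or the set of names under `nodes` changes the workflow without touching the node code, which is the property the configuration-driven design is after.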

Closes CORE-195, CORE-203, CORE-204

## Checklist before requesting a review


- [X] My code follows the style guidelines of this project
- [X] I have performed a self-review of my code
- [X] I have commented hard-to-understand areas
- [X] I have ideally added tests that prove my fix is effective or that
my feature works
- [X] New and existing unit tests pass locally with my changes
- [X] Any dependent changes have been merged

2024-09-23 09:11:06 -07:00

53 lines
1.8 KiB
Python

import asyncio
import json
from uuid import uuid4

from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.messages.ai import AIMessageChunk
from langchain_core.vectorstores import InMemoryVectorStore
from quivr_core.chat import ChatHistory
from quivr_core.config import LLMEndpointConfig, RetrievalConfig
from quivr_core.llm import LLMEndpoint
from quivr_core.quivr_rag_langgraph import QuivrQARAGLangGraph


async def main():
    retrieval_config = RetrievalConfig(llm_config=LLMEndpointConfig(model="gpt-4o"))
    # Fake embeddings and an empty in-memory store: no real documents are indexed.
    embedder = DeterministicFakeEmbedding(size=20)
    vec = InMemoryVectorStore(embedder)

    llm = LLMEndpoint.from_config(retrieval_config.llm_config)
    chat_history = ChatHistory(uuid4(), uuid4())
    rag_pipeline = QuivrQARAGLangGraph(
        retrieval_config=retrieval_config, llm=llm, vector_store=vec
    )

    conversational_qa_chain = rag_pipeline.build_chain()

    # Record the streamed chunks as JSON Lines, one chunk per line.
    with open("response.jsonl", "w") as f:
        async for event in conversational_qa_chain.astream_events(
            {
                "messages": [
                    ("user", "What is NLP, give a very long detailed answer"),
                ],
                "chat_history": chat_history,
                "custom_personality": None,
            },
            version="v1",
            config={"metadata": {}},
        ):
            kind = event["event"]
            # Keep only chat-model stream events emitted by the "generate" node.
            if (
                kind == "on_chat_model_stream"
                and event["metadata"]["langgraph_node"] == "generate"
            ):
                chunk = event["data"]["chunk"]
                # Convert any AIMessageChunk values to dicts so they serialize.
                dict_chunk = {
                    k: v.dict() if isinstance(v, AIMessageChunk) else v
                    for k, v in chunk.items()
                }
                f.write(json.dumps(dict_chunk) + "\n")


asyncio.run(main())
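
The event-filtering step in the script above can be exercised without an API key by replacing the chain with a stub. The following is a minimal sketch under stated assumptions: `fake_events` and `dump_generate_chunks` are hypothetical helpers, the event dicts are hand-written to mimic only the keys the script reads (`event`, `metadata`, `data`), and the node name `other_node` is made up. Unlike the script, it uses `.get` so that events without a `langgraph_node` entry are skipped instead of raising `KeyError`.

```python
import asyncio
import io
import json
from typing import Any, AsyncIterator, Dict, TextIO


async def fake_events() -> AsyncIterator[Dict[str, Any]]:
    """Hypothetical stand-in for an event stream: yields hand-written events."""
    events = [
        {"event": "on_chain_start", "metadata": {}, "data": {}},
        {
            "event": "on_chat_model_stream",
            "metadata": {"langgraph_node": "other_node"},  # made-up node name
            "data": {"chunk": {"content": "ignored"}},
        },
        {
            "event": "on_chat_model_stream",
            "metadata": {"langgraph_node": "generate"},
            "data": {"chunk": {"content": "NLP is"}},
        },
        {
            "event": "on_chat_model_stream",
            "metadata": {"langgraph_node": "generate"},
            "data": {"chunk": {"content": " a field."}},
        },
    ]
    for event in events:
        yield event


async def dump_generate_chunks(
    events: AsyncIterator[Dict[str, Any]], out: TextIO, node: str = "generate"
) -> int:
    """Write chunks streamed by `node` as JSON Lines; return how many were written."""
    written = 0
    async for event in events:
        if (
            event["event"] == "on_chat_model_stream"
            and event["metadata"].get("langgraph_node") == node
        ):
            out.write(json.dumps(event["data"]["chunk"]) + "\n")
            written += 1
    return written


if __name__ == "__main__":
    buffer = io.StringIO()
    count = asyncio.run(dump_generate_chunks(fake_events(), buffer))
    print(count, "chunk(s) written")
```

Writing to any text stream rather than a fixed file name keeps the filter easy to check in a unit test.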