quivr/core/tests/fixture_chunks.py

53 lines
1.8 KiB
Python
Raw Permalink Normal View History

import asyncio
import json
from uuid import uuid4
from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.messages.ai import AIMessageChunk
from langchain_core.vectorstores import InMemoryVectorStore
from quivr_core.rag.entities.chat import ChatHistory
from quivr_core.rag.entities.config import LLMEndpointConfig, RetrievalConfig
from quivr_core.llm import LLMEndpoint
from quivr_core.rag.quivr_rag_langgraph import QuivrQARAGLangGraph
async def main():
retrieval_config = RetrievalConfig(llm_config=LLMEndpointConfig(model="gpt-4o"))
embedder = DeterministicFakeEmbedding(size=20)
vec = InMemoryVectorStore(embedder)
llm = LLMEndpoint.from_config(retrieval_config.llm_config)
chat_history = ChatHistory(uuid4(), uuid4())
rag_pipeline = QuivrQARAGLangGraph(
retrieval_config=retrieval_config, llm=llm, vector_store=vec
)
conversational_qa_chain = rag_pipeline.build_chain()
with open("response.jsonl", "w") as f:
async for event in conversational_qa_chain.astream_events(
{
"messages": [
("user", "What is NLP, give a very long detailed answer"),
],
"chat_history": chat_history,
"custom_personality": None,
},
version="v1",
config={"metadata": {}},
):
kind = event["event"]
if (
kind == "on_chat_model_stream"
and event["metadata"]["langgraph_node"] == "generate"
):
chunk = event["data"]["chunk"]
dict_chunk = {
k: v.dict() if isinstance(v, AIMessageChunk) else v
for k, v in chunk.items()
}
f.write(json.dumps(dict_chunk) + "\n")
asyncio.run(main())