import asyncio
import json
from uuid import uuid4

from langchain_core.embeddings import DeterministicFakeEmbedding
from langchain_core.messages.ai import AIMessageChunk
from langchain_core.vectorstores import InMemoryVectorStore

from quivr_core.chat import ChatHistory
from quivr_core.config import LLMEndpointConfig, RetrievalConfig
from quivr_core.llm import LLMEndpoint
from quivr_core.quivr_rag_langgraph import QuivrQARAGLangGraph


async def main():
    # Configure retrieval with a gpt-4o endpoint and a deterministic fake
    # embedder backed by an in-memory vector store, so no external vector
    # database is needed.
    retrieval_config = RetrievalConfig(llm_config=LLMEndpointConfig(model="gpt-4o"))
    embedder = DeterministicFakeEmbedding(size=20)
    vec = InMemoryVectorStore(embedder)

    llm = LLMEndpoint.from_config(retrieval_config.llm_config)
    chat_history = ChatHistory(uuid4(), uuid4())

    # Build the LangGraph-based RAG chain.
    rag_pipeline = QuivrQARAGLangGraph(
        retrieval_config=retrieval_config, llm=llm, vector_store=vec
    )
    conversational_qa_chain = rag_pipeline.build_chain()

    # Stream the graph's events and persist the chunks emitted by the
    # "generate" node, one JSON object per line.
    with open("response.jsonl", "w") as f:
        async for event in conversational_qa_chain.astream_events(
            {
                "messages": [
                    ("user", "What is NLP, give a very long detailed answer"),
                ],
                "chat_history": chat_history,
                "custom_personality": None,
            },
            version="v1",
            config={"metadata": {}},
        ):
            kind = event["event"]
            # Keep only chat-model token chunks produced by the "generate" node.
            if (
                kind == "on_chat_model_stream"
                and event["metadata"]["langgraph_node"] == "generate"
            ):
                # For "on_chat_model_stream" events, event["data"] is a dict of
                # the form {"chunk": AIMessageChunk(...)}. Iterate over that
                # dict (an AIMessageChunk itself has no .items()) and serialize
                # the message chunk so the record is JSON-encodable.
                chunk = event["data"]
                dict_chunk = {
                    k: v.dict() if isinstance(v, AIMessageChunk) else v
                    for k, v in chunk.items()
                }
                f.write(json.dumps(dict_chunk) + "\n")


asyncio.run(main())
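

# A minimal sketch of reading the file back, assuming the {"chunk": {...}}
# JSONL layout written above. load_chunks is a hypothetical helper for
# illustration, not part of quivr_core.
def load_chunks(path: str = "response.jsonl") -> list[AIMessageChunk]:
    chunks = []
    with open(path) as f:
        for line in f:
            record = json.loads(line)
            # Rebuild the message chunk from its serialized fields.
            chunks.append(AIMessageChunk(**record["chunk"]))
    return chunks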