feat(megaparse): add sdk (#3462)

What it does:

Adds the MegaParse API call so that files are parsed through the MegaParse SDK.
This commit is contained in:
Chloé Daems 2024-11-08 14:36:54 +01:00 committed by GitHub
parent 1356d87098
commit 190d971bd7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 153 additions and 97 deletions

View File

@ -22,15 +22,12 @@ dependencies = [
"transformers[sentencepiece]>=4.44.2",
"faiss-cpu>=1.8.0.post1",
"rapidfuzz>=3.10.1",
"megaparse-sdk>=0.1.2",
"markupsafe>=2.1.5",
]
readme = "README.md"
requires-python = ">= 3.11"
[project.optional-dependencies]
all = [
"unstructured[epub,docx,odt,doc,pptx,ppt,xlsx,md]>=0.15.5",
"docx2txt>=0.8",
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

View File

@ -4,12 +4,22 @@ import yaml
from pydantic import BaseModel
class PdfParser(str, Enum):
LLAMA_PARSE = "llama_parse"
class ParserType(str, Enum):
    """String-valued enumeration of the available parser types."""

    # Mixing in ``str`` makes each member compare equal to its raw value.
    UNSTRUCTURED = "unstructured"
    LLAMA_PARSER = "llama_parser"
    MEGAPARSE_VISION = "megaparse_vision"
class StrategyEnum(str, Enum):
    """String-valued enumeration of the conversion methods."""

    # Mixing in ``str`` makes each member compare equal to its raw value.
    FAST = "fast"
    AUTO = "auto"
    HI_RES = "hi_res"
class MegaparseBaseConfig(BaseModel):
@classmethod
def from_yaml(cls, file_path: str):
@ -22,6 +32,8 @@ class MegaparseBaseConfig(BaseModel):
class MegaparseConfig(MegaparseBaseConfig):
# Parsing options; `method`, `strategy`, `check_table`, `parsing_instruction`
# and `model_name` are the values MegaparseProcessor passes to the MegaParse
# upload call.
# NOTE(review): this hunk shows pre-change and post-change fields together.
# `strategy` is declared twice; in a class body the later declaration
# (StrategyEnum, default AUTO) replaces the earlier `str` one.
strategy: str = "fast"
llama_parse_api_key: str | None = None
# NOTE(review): the PdfParser enum visible in this file only defines
# LLAMA_PARSE -- confirm that PdfParser.UNSTRUCTURED exists, otherwise this
# default fails when the class body is evaluated.
pdf_parser: PdfParser = PdfParser.UNSTRUCTURED
method: ParserType = ParserType.UNSTRUCTURED
strategy: StrategyEnum = StrategyEnum.AUTO
check_table: bool = False
parsing_instruction: str | None = None
model_name: str = "gpt-4o"

View File

@ -1,9 +1,10 @@
import logging
import os
import tiktoken
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter, TextSplitter
from megaparse import MegaParse
from megaparse_sdk import MegaParseSDK
from quivr_core.config import MegaparseConfig
from quivr_core.files.file import QuivrFile
@ -29,7 +30,24 @@ class MegaparseProcessor(ProcessorBase):
"""
supported_extensions = [FileExtension.pdf]
supported_extensions = [
FileExtension.pdf,
FileExtension.docx,
FileExtension.doc,
FileExtension.pptx,
FileExtension.xls,
FileExtension.xlsx,
FileExtension.csv,
FileExtension.epub,
FileExtension.bib,
FileExtension.odt,
FileExtension.html,
FileExtension.py,
FileExtension.markdown,
FileExtension.md,
FileExtension.mdx,
FileExtension.ipynb,
]
def __init__(
self,
@ -56,9 +74,24 @@ class MegaparseProcessor(ProcessorBase):
}
async def process_file_inner(self, file: QuivrFile) -> list[Document]:
# Parse `file` into a Document and split it into chunks when it exceeds the
# configured chunk size.
# NOTE(review): this hunk interleaves the pre-change in-process MegaParse call
# (the next three lines) with its SDK-based replacement, and the end of the
# function is not visible here.
mega_parse = MegaParse(file_path=file.path, config=self.megaparse_config) # type: ignore
document: Document = await mega_parse.aload()
if len(document.page_content) > self.splitter_config.chunk_size:
# NOTE(review): str(os.getenv(...)) yields the literal string "None" when
# MEGAPARSE_API_KEY is unset, so a missing key is handed to MegaParseSDK
# instead of being detected here -- consider failing fast.
api_key = str(os.getenv("MEGAPARSE_API_KEY"))
megaparse = MegaParseSDK(api_key)
logger.info(f"Uploading file {file.path} to MegaParse")
# Parsing options taken from the processor's MegaparseConfig.
data = {
"method": self.megaparse_config.method,
"strategy": self.megaparse_config.strategy,
"check_table": self.megaparse_config.check_table,
"parsing_instruction": self.megaparse_config.parsing_instruction,
"model_name": self.megaparse_config.model_name,
}
response = await megaparse.file.upload(
file_path=str(file.path),
**data,
)
# The parsed text is read from the "result" entry of the upload response.
document = Document(
page_content=response["result"],
)
# NOTE(review): len(response) measures the response object, not the parsed
# text. The pre-change condition used len(document.page_content), which is
# presumably what should be compared with chunk_size -- confirm.
if len(response) > self.splitter_config.chunk_size:
docs = self.text_splitter.split_documents([document])
for doc in docs:
# Record each chunk's length in tokens as produced by self.enc.
doc.metadata = {"chunk_size": len(self.enc.encode(doc.page_content))}

View File

@ -3,7 +3,7 @@ import logging
import types
from dataclasses import dataclass, field
from heapq import heappop, heappush
from typing import Type, TypeAlias
from typing import List, Type, TypeAlias
from quivr_core.files.file import FileExtension
@ -49,37 +49,41 @@ base_processors: ProcMapping = {
def _append_proc_mapping(
    mapping: ProcMapping,
    file_exts: List[FileExtension] | List[str],
    cls_mod: str,
    errtxt: str,
    priority: int | None,
):
    """Register the processor ``cls_mod`` for every extension in ``file_exts``.

    Args:
        mapping: Extension -> heap of ``ProcEntry``; mutated in place.
        file_exts: Extensions the processor should be registered for.
        cls_mod: Dotted path of the processor class.
        errtxt: Message stored on the entry as ``err``.
        priority: Explicit priority for the new entry. When ``None``, the
            entry gets the priority of the entry currently at the top of the
            extension's heap minus one, or ``_LOWEST_PRIORITY`` when the
            extension has no entry yet.
    """
    for file_ext in file_exts:
        if file_ext not in mapping:
            # First processor for this extension: start a new heap.
            mapping[file_ext] = [
                ProcEntry(
                    priority=priority if priority is not None else _LOWEST_PRIORITY,
                    cls_mod=cls_mod,
                    err=errtxt,
                )
            ]
            continue

        heap = mapping[file_ext]
        try:
            # Only the pop can raise IndexError, so keep the try this narrow.
            prev_proc = heappop(heap)
        except IndexError:
            # The extension is known but its heap is empty.
            fallback_priority = _LOWEST_PRIORITY
        else:
            # The entry was popped only to read its priority; push it back
            # before adding the new one.
            heappush(heap, prev_proc)
            fallback_priority = prev_proc.priority - 1

        heappush(
            heap,
            ProcEntry(
                priority=priority if priority is not None else fallback_priority,
                cls_mod=cls_mod,
                err=errtxt,
            ),
        )
def defaults_to_proc_entries(
@ -109,7 +113,7 @@ def defaults_to_proc_entries(
ext_str = ext.value if isinstance(ext, FileExtension) else ext
_append_proc_mapping(
mapping=base_processors,
file_ext=ext,
file_exts=[ext],
cls_mod=f"quivr_core.processor.implementations.default.{processor_name}",
errtxt=f"can't import {processor_name}. Please install quivr-core[{ext_str}] to access {processor_name}",
priority=None,
@ -117,13 +121,30 @@ def defaults_to_proc_entries(
# TODO(@aminediro): Megaparse should register itself
# Append Megaparse
# _append_proc_mapping(
# mapping=base_processors,
# file_ext=FileExtension.pdf,
# cls_mod="quivr_core.processor.implementations.megaparse_processor.MegaparseProcessor",
# errtxt=f"can't import MegaparseProcessor. Please install quivr-core[{ext_str}] to access MegaparseProcessor",
# priority=None,
# )
# Register MegaparseProcessor for each of the listed extensions.
# NOTE(review): `ext_str` is not set for this call; it is whatever value the
# earlier per-extension registration code left behind (and presumably unbound
# if that code never ran), so the install hint in `errtxt` names an arbitrary
# extra -- confirm and replace it with a fixed extra name.
_append_proc_mapping(
mapping=base_processors,
file_exts=[
FileExtension.pdf,
FileExtension.docx,
FileExtension.doc,
FileExtension.pptx,
FileExtension.xls,
FileExtension.xlsx,
FileExtension.csv,
FileExtension.epub,
FileExtension.bib,
FileExtension.odt,
FileExtension.html,
FileExtension.py,
FileExtension.markdown,
FileExtension.md,
FileExtension.mdx,
FileExtension.ipynb,
],
cls_mod="quivr_core.processor.implementations.megaparse_processor.MegaparseProcessor",
errtxt=f"can't import MegaparseProcessor. Please install quivr-core[{ext_str}] to access MegaparseProcessor",
# priority=None lets _append_proc_mapping choose the priority itself.
priority=None,
)
return base_processors
@ -181,7 +202,7 @@ def register_processor(
if all(proc_cls != proc.cls_mod for proc in known_processors[file_ext]):
_append_proc_mapping(
known_processors,
file_ext=file_ext,
file_exts=[file_ext],
cls_mod=proc_cls,
errtxt=errtxt
or f"{proc_cls} import failed for processor of {file_ext}",

View File

@ -1,51 +0,0 @@
from pathlib import Path
from uuid import uuid4
import pytest
from quivr_core.files.file import FileExtension, QuivrFile
from quivr_core.processor.implementations.megaparse_processor import MegaparseProcessor
from quivr_core.processor.registry import get_processor_class
all_but_pdf = list(filter(lambda ext: ext != ".pdf", list(FileExtension)))
unstructured = pytest.importorskip("unstructured")
def test_get_default_processors_megaparse():
    """The registry resolves the PDF extension to MegaparseProcessor."""
    resolved = get_processor_class(FileExtension.pdf)
    assert resolved == MegaparseProcessor
@pytest.mark.asyncio
async def test_megaparse_pdf_processor():
    """Processing the sample PDF yields at least one document."""
    sample = Path("./tests/processor/pdf/sample.pdf")
    quivr_file = QuivrFile(
        id=uuid4(),
        brain_id=uuid4(),
        original_filename=sample.stem,
        path=sample,
        file_extension=FileExtension.pdf,
        file_sha1="123",
    )
    docs = await MegaparseProcessor().process_file(quivr_file)
    assert len(docs) > 0
    # FIXME: @chloedia once move to megaparse api
    # assert len(docs[0].page_content) > 0
@pytest.mark.parametrize("ext", all_but_pdf)
@pytest.mark.asyncio
async def test_megaparse_fail(ext):
    """Every extension in ``all_but_pdf`` makes process_file raise ValueError."""
    sample = Path("./tests/processor/pdf/sample.pdf")
    quivr_file = QuivrFile(
        id=uuid4(),
        brain_id=uuid4(),
        original_filename=sample.stem,
        path=sample,
        file_extension=ext,
        file_sha1="123",
    )
    megaparse_processor = MegaparseProcessor()
    with pytest.raises(ValueError):
        await megaparse_processor.process_file(quivr_file)

View File

@ -0,0 +1,44 @@
import os
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from quivr_core import Brain
from quivr_core.llm.llm_endpoint import LLMEndpoint
from quivr_core.rag.entities.config import LLMEndpointConfig
from rich.console import Console
from rich.panel import Panel
from rich.prompt import Prompt
if __name__ == "__main__":
    # Interactive demo: build a brain from one DOCX file and answer questions
    # typed at the prompt until the user enters "exit".

    # Fail fast when the key is missing: str(os.getenv(...)) would otherwise
    # pass the literal string "None" to the client as the API key.
    openai_api_key = os.getenv("OPENAI_API_KEY")
    if not openai_api_key:
        raise SystemExit("OPENAI_API_KEY environment variable is not set")

    brain = Brain.from_files(
        name="test_brain",
        file_paths=["./tests/processor/docx/demo.docx"],
        llm=LLMEndpoint(
            llm_config=LLMEndpointConfig(model="gpt-4o"),
            llm=ChatOpenAI(model="gpt-4o", api_key=openai_api_key),
        ),
    )
    # NOTE(review): this embedder is constructed but never handed to the
    # brain -- confirm whether it was meant to be supplied when building it.
    embedder = embeddings = OpenAIEmbeddings(
        model="text-embedding-3-large",
    )
    # Check brain info
    brain.print_info()

    console = Console()
    console.print(Panel.fit("Ask your brain !", style="bold magenta"))

    while True:
        # Get user input
        question = Prompt.ask("[bold cyan]Question[/bold cyan]")

        # Check if user wants to exit
        if question.lower() == "exit":
            console.print(Panel("Goodbye!", style="bold yellow"))
            break

        answer = brain.ask(question)
        # Print the answer, followed by a full-width separator line
        console.print(f"[bold green]Quivr Assistant[/bold green]: {answer.answer}")
        console.print("-" * console.width)

    brain.print_info()