quivr/backend/main.py
AmineDiro 675885c762
feat(upload): async improved (#2544)
# Description
Hey,

Here's a breakdown of what I've done:

- Reducing the number of opened fd and memory footprint: Previously, for
each uploaded file, we were opening a temporary NamedTemporaryFile to
write existing content read from Supabase. However, due to the
dependency on `langchain` loader classes, we couldn't use memory buffers
for the loaders. Now, with the changes made, we only open a single
temporary file for each `process_file_and_notify`, cutting down on
excessive file opening, read syscalls, and memory buffer usage. This
could cause stability issues when ingesting and processing large volumes
of documents. Unfortunately, there is still reopening of temporary files
in some code paths but this can be improved further in later work.
- Removing `UploadFile` class from File: The `UploadFile` ( a FastAPI
abstraction over a SpooledTemporaryFile for multipart upload) was
redundant in our `File` setup since we already downloaded the file from
remote storage and read it into memory + wrote the file into a temp
file. By removing this abstraction, we streamline our code and eliminate
unnecessary complexity.
- `async` function Adjustments: I've removed the async labeling from
functions where it wasn't truly asynchronous. For instance, calling
`filter_file` for processing files isn't genuinely async, ass async file
reading isn't actually asynchronous—it [uses a threadpool for reading
the
file](9f16bf5c25/starlette/datastructures.py (L458))
. Given that we're already leveraging `celery` for parallelism (one
worker per core), we need to ensure that reading and processing occur in
the same thread, or at least minimize thread spawning. Additionally,
since the rest of the code isn't inherently asynchronous, our bottleneck
lies in CPU operations rather than asynchronous processing.

These changes aim to improve performance and streamline our codebase. 
Let me know if you have any questions or suggestions for further
improvements!

## Checklist before requesting a review
- [x] My code follows the style guidelines of this project
- [x] I have performed a self-review of my code
- [x] I have ideally added tests that prove my fix is effective or that
my feature works

---------

Signed-off-by: aminediro <aminediro@github.com>
Co-authored-by: aminediro <aminediro@github.com>
Co-authored-by: Stan Girard <girard.stanislas@gmail.com>
2024-06-04 06:29:27 -07:00

133 lines
4.1 KiB
Python

import logging
import os
import litellm
import sentry_sdk
from dotenv import load_dotenv # type: ignore
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse
from logger import get_logger
from middlewares.cors import add_cors_middleware
from modules.analytics.controller.analytics_routes import analytics_router
from modules.api_key.controller import api_key_router
from modules.assistant.controller import assistant_router
from modules.brain.controller import brain_router
from modules.chat.controller import chat_router
from modules.contact_support.controller import contact_router
from modules.knowledge.controller import knowledge_router
from modules.misc.controller import misc_router
from modules.onboarding.controller import onboarding_router
from modules.prompt.controller import prompt_router
from modules.sync.controller import sync_router
from modules.upload.controller import upload_router
from modules.user.controller import user_router
from packages.utils import handle_request_validation_error
from packages.utils.telemetry import maybe_send_telemetry
from pyinstrument import Profiler
from routes.crawl_routes import crawl_router
from routes.subscription_routes import subscription_router
from sentry_sdk.integrations.fastapi import FastApiIntegration
from sentry_sdk.integrations.starlette import StarletteIntegration
load_dotenv()
# Set the logging level for all loggers to WARNING
logging.basicConfig(level=logging.INFO)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("LiteLLM").setLevel(logging.WARNING)
logging.getLogger("litellm").setLevel(logging.WARNING)
litellm.set_verbose = False
logger = get_logger(__name__)
def before_send(event, hint):
# If this is a transaction event
if event["type"] == "transaction":
# And the transaction name contains 'healthz'
if "healthz" in event["transaction"]:
# Drop the event by returning None
return None
# For other events, return them as is
return event
sentry_dsn = os.getenv("SENTRY_DSN")
if sentry_dsn:
sentry_sdk.init(
dsn=sentry_dsn,
sample_rate=0.1,
enable_tracing=True,
traces_sample_rate=0.1,
integrations=[
StarletteIntegration(transaction_style="url"),
FastApiIntegration(transaction_style="url"),
],
before_send=before_send,
)
app = FastAPI()
add_cors_middleware(app)
app.include_router(brain_router)
app.include_router(chat_router)
app.include_router(crawl_router)
app.include_router(assistant_router)
app.include_router(sync_router)
app.include_router(onboarding_router)
app.include_router(misc_router)
app.include_router(analytics_router)
app.include_router(upload_router)
app.include_router(user_router)
app.include_router(api_key_router)
app.include_router(subscription_router)
app.include_router(prompt_router)
app.include_router(knowledge_router)
app.include_router(contact_router)
PROFILING = os.getenv("PROFILING", "false").lower() == "true"
if PROFILING:
@app.middleware("http")
async def profile_request(request: Request, call_next):
profiling = request.query_params.get("profile", False)
if profiling:
profiler = Profiler()
profiler.start()
await call_next(request)
profiler.stop()
return HTMLResponse(profiler.output_html())
else:
return await call_next(request)
@app.exception_handler(HTTPException)
async def http_exception_handler(_, exc):
return JSONResponse(
status_code=exc.status_code,
content={"detail": exc.detail},
)
handle_request_validation_error(app)
if os.getenv("TELEMETRY_ENABLED") == "true":
logger.info("Telemetry enabled, we use telemetry to collect anonymous usage data.")
logger.info(
"To disable telemetry, set the TELEMETRY_ENABLED environment variable to false."
)
maybe_send_telemetry("booting", {"status": "ok"})
maybe_send_telemetry("ping", {"ping": "pong"})
if __name__ == "__main__":
# run main.py to debug backend
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=5050, log_level="warning", access_log=False)