quivr/backend/modules/upload/controller/upload_routes.py
AmineDiro 675885c762
feat(upload): async improved (#2544)
# Description
Hey,

Here's a breakdown of what I've done:

- Reducing the number of opened fd and memory footprint: Previously, for
each uploaded file, we were opening a temporary NamedTemporaryFile to
write existing content read from Supabase. However, due to the
dependency on `langchain` loader classes, we couldn't use memory buffers
for the loaders. Now, with the changes made, we only open a single
temporary file for each `process_file_and_notify`, cutting down on
excessive file opening, read syscalls, and memory buffer usage. This
could cause stability issues when ingesting and processing large volumes
of documents. Unfortunately, there is still reopening of temporary files
in some code paths but this can be improved further in later work.
- Removing `UploadFile` class from File: The `UploadFile` ( a FastAPI
abstraction over a SpooledTemporaryFile for multipart upload) was
redundant in our `File` setup since we already downloaded the file from
remote storage and read it into memory + wrote the file into a temp
file. By removing this abstraction, we streamline our code and eliminate
unnecessary complexity.
- `async` function Adjustments: I've removed the async labeling from
functions where it wasn't truly asynchronous. For instance, calling
`filter_file` for processing files isn't genuinely async, ass async file
reading isn't actually asynchronous—it [uses a threadpool for reading
the
file](9f16bf5c25/starlette/datastructures.py (L458))
. Given that we're already leveraging `celery` for parallelism (one
worker per core), we need to ensure that reading and processing occur in
the same thread, or at least minimize thread spawning. Additionally,
since the rest of the code isn't inherently asynchronous, our bottleneck
lies in CPU operations rather than asynchronous processing.

These changes aim to improve performance and streamline our codebase. 
Let me know if you have any questions or suggestions for further
improvements!

## Checklist before requesting a review
- [x] My code follows the style guidelines of this project
- [x] I have performed a self-review of my code
- [x] I have ideally added tests that prove my fix is effective or that
my feature works

---------

Signed-off-by: aminediro <aminediro@github.com>
Co-authored-by: aminediro <aminediro@github.com>
Co-authored-by: Stan Girard <girard.stanislas@gmail.com>
2024-06-04 06:29:27 -07:00

116 lines
4.1 KiB
Python

import os
from typing import Optional
from uuid import UUID
from celery_worker import process_file_and_notify
from fastapi import APIRouter, Depends, HTTPException, Query, UploadFile
from logger import get_logger
from middlewares.auth import AuthBearer, get_current_user
from modules.brain.entity.brain_entity import RoleEnum
from modules.brain.service.brain_authorization_service import (
validate_brain_authorization,
)
from modules.knowledge.dto.inputs import CreateKnowledgeProperties
from modules.knowledge.service.knowledge_service import KnowledgeService
from modules.notification.dto.inputs import (
CreateNotification,
NotificationUpdatableProperties,
)
from modules.notification.entity.notification import NotificationsStatusEnum
from modules.notification.service.notification_service import NotificationService
from modules.upload.service.upload_file import upload_file_storage
from modules.user.entity.user_identity import UserIdentity
from modules.user.service.user_usage import UserUsage
from packages.files.file import convert_bytes, get_file_size
from packages.utils.telemetry import maybe_send_telemetry
logger = get_logger(__name__)
upload_router = APIRouter()
notification_service = NotificationService()
knowledge_service = KnowledgeService()
@upload_router.get("/upload/healthz", tags=["Health"])
async def healthz():
return {"status": "ok"}
@upload_router.post("/upload", dependencies=[Depends(AuthBearer())], tags=["Upload"])
async def upload_file(
uploadFile: UploadFile,
brain_id: UUID = Query(..., description="The ID of the brain"),
chat_id: Optional[UUID] = Query(None, description="The ID of the chat"),
current_user: UserIdentity = Depends(get_current_user),
):
validate_brain_authorization(
brain_id, current_user.id, [RoleEnum.Editor, RoleEnum.Owner]
)
uploadFile.file.seek(0)
user_daily_usage = UserUsage(
id=current_user.id,
email=current_user.email,
)
upload_notification = notification_service.add_notification(
CreateNotification(
user_id=current_user.id,
status=NotificationsStatusEnum.INFO,
title=f"Processing File {uploadFile.filename}",
)
)
user_settings = user_daily_usage.get_user_settings()
remaining_free_space = user_settings.get("max_brain_size", 1000000000)
maybe_send_telemetry("upload_file", {"file_name": uploadFile.filename})
file_size = get_file_size(uploadFile)
if remaining_free_space - file_size < 0:
message = f"Brain will exceed maximum capacity. Maximum file allowed is : {convert_bytes(remaining_free_space)}"
raise HTTPException(status_code=403, detail=message)
file_content = await uploadFile.read()
filename_with_brain_id = str(brain_id) + "/" + str(uploadFile.filename)
try:
upload_file_storage(file_content, filename_with_brain_id)
except Exception as e:
print(e)
notification_service.update_notification_by_id(
upload_notification.id if upload_notification else None,
NotificationUpdatableProperties(
status=NotificationsStatusEnum.ERROR,
description=f"There was an error uploading the file: {e}",
),
)
if "The resource already exists" in str(e):
raise HTTPException(
status_code=403,
detail=f"File {uploadFile.filename} already exists in storage.",
)
else:
raise HTTPException(
status_code=500, detail=f"Failed to upload file to storage. {e}"
)
knowledge_to_add = CreateKnowledgeProperties(
brain_id=brain_id,
file_name=uploadFile.filename,
extension=os.path.splitext(
uploadFile.filename # pyright: ignore reportPrivateUsage=none
)[-1].lower(),
)
knowledge_service.add_knowledge(knowledge_to_add)
process_file_and_notify.delay(
file_name=filename_with_brain_id,
file_original_name=uploadFile.filename,
brain_id=brain_id,
notification_id=upload_notification.id,
)
return {"message": "File processing has started."}