feat(celery): add retry logic for dcos (#2862)

# Description

Please include a summary of the changes and the related issue. Please
also include relevant motivation and context.

## Checklist before requesting a review

Please delete options that are not relevant.

- [ ] My code follows the style guidelines of this project
- [ ] I have performed a self-review of my code
- [ ] I have commented hard-to-understand areas
- [ ] I have ideally added tests that prove my fix is effective or that
my feature works
- [ ] New and existing unit tests pass locally with my changes
- [ ] Any dependent changes have been merged

## Screenshots (if appropriate):

---------

Co-authored-by: Stan Girard <stan@quivr.app>
This commit is contained in:
Stan Girard 2024-07-15 12:32:00 +02:00 committed by GitHub
parent 37793d549a
commit c8dd70affc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -3,6 +3,7 @@ from datetime import datetime, timedelta
from tempfile import NamedTemporaryFile
from uuid import UUID
from celery.exceptions import MaxRetriesExceededError
from celery.schedules import crontab
from pytz import timezone
from quivr_api.celery_config import celery
@ -34,8 +35,15 @@ brain_service = BrainService()
auth_bearer = AuthBearer()
@celery.task(name="process_file_and_notify")
@celery.task(
bind=True,
retries=3,
default_retry_delay=1,
name="process_file_and_notify",
autoretry_for=(Exception,),
)
def process_file_and_notify(
self,
file_name: str,
file_original_name: str,
brain_id,
@ -45,63 +53,69 @@ def process_file_and_notify(
knowledge_id: UUID = None,
):
try:
supabase_client = get_supabase_client()
tmp_name = file_name.replace("/", "_")
base_file_name = os.path.basename(file_name)
_, file_extension = os.path.splitext(base_file_name)
try:
supabase_client = get_supabase_client()
tmp_name = file_name.replace("/", "_")
base_file_name = os.path.basename(file_name)
_, file_extension = os.path.splitext(base_file_name)
with NamedTemporaryFile(
suffix="_" + tmp_name, # pyright: ignore reportPrivateUsage=none
) as tmp_file:
res = supabase_client.storage.from_("quivr").download(file_name)
tmp_file.write(res)
tmp_file.flush()
file_instance = File(
file_name=base_file_name,
tmp_file_path=tmp_file.name,
bytes_content=res,
file_size=len(res),
file_extension=file_extension,
)
brain_vector_service = BrainVectorService(brain_id)
knowledge_service = KnowledgeService()
if delete_file: # TODO fix bug
brain_vector_service.delete_file_from_brain(
file_original_name, only_vectors=True
with NamedTemporaryFile(
suffix="_" + tmp_name, # pyright: ignore reportPrivateUsage=none
) as tmp_file:
res = supabase_client.storage.from_("quivr").download(file_name)
tmp_file.write(res)
tmp_file.flush()
file_instance = File(
file_name=base_file_name,
tmp_file_path=tmp_file.name,
bytes_content=res,
file_size=len(res),
file_extension=file_extension,
)
brain_vector_service = BrainVectorService(brain_id)
knowledge_service = KnowledgeService()
if delete_file: # TODO fix bug
brain_vector_service.delete_file_from_brain(
file_original_name, only_vectors=True
)
filter_file(
file=file_instance,
brain_id=brain_id,
original_file_name=file_original_name,
)
filter_file(
file=file_instance,
brain_id=brain_id,
original_file_name=file_original_name,
)
if notification_id:
notification_service.update_notification_by_id(
notification_id,
NotificationUpdatableProperties(
status=NotificationsStatusEnum.SUCCESS,
description="Your file has been properly uploaded!",
),
)
if knowledge_id:
knowledge_service.update_status_knowledge(
knowledge_id, KnowledgeStatus.UPLOADED
)
brain_service.update_brain_last_update_time(brain_id)
if notification_id:
notification_service.update_notification_by_id(
notification_id,
NotificationUpdatableProperties(
status=NotificationsStatusEnum.SUCCESS,
description="Your file has been properly uploaded!",
),
)
if knowledge_id:
knowledge_service.update_status_knowledge(
knowledge_id, KnowledgeStatus.UPLOADED
)
brain_service.update_brain_last_update_time(brain_id)
return True
except TimeoutError:
logger.error("TimeoutError")
self.retry()
return True
except Exception as e:
logger.exception(e)
self.retry()
except TimeoutError:
logger.error("TimeoutError")
except Exception as e:
logger.exception(e)
except MaxRetriesExceededError:
logger.error("MaxRetriesExceededError")
knowledge_service = KnowledgeService()
notification_service.update_notification_by_id(
notification_id,
NotificationUpdatableProperties(
status=NotificationsStatusEnum.ERROR,
description=f"An error occurred while processing the file: {e}",
description=f"An error occurred while processing the file",
),
)
if knowledge_id: