feat: quivr api send notification (#2865)

# Description

- Quivr notification service: listens to celery events and updates
knowledge status.
- The process-file notification waits for retried tasks.
This commit is contained in:
AmineDiro 2024-07-15 15:37:42 +02:00 committed by GitHub
parent c8dd70affc
commit a90a01ccf2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 160 additions and 75 deletions

View File

@ -25,6 +25,7 @@ NEXT_PUBLIC_AUTH_MODES=password
# BACKEND
########
LOG_LEVEL=INFO
SUPABASE_URL=http://host.docker.internal:54321
SUPABASE_SERVICE_KEY=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZS1kZW1vIiwicm9sZSI6InNlcnZpY2Vfcm9sZSIsImV4cCI6MTk4MzgxMjk5Nn0.EGIM96RAZx35lJzdJsyH-qQwv8Hdp7fsn3W0YpN81IU
PG_DATABASE_URL=postgresql://postgres:postgres@host.docker.internal:54322/postgres

2
.gitignore vendored
View File

@ -93,3 +93,5 @@ backend/core/examples/chatbot/.files/*
backend/core/examples/chatbot/.python-version
backend/core/examples/chatbot/.chainlit/config.toml
backend/core/examples/chatbot/.chainlit/translations/en-US.json
*.log

View File

@ -36,6 +36,8 @@ elif CELERY_BROKER_URL.startswith("redis"):
task_concurrency=4,
worker_prefetch_multiplier=2,
task_serializer="json",
result_extended=True,
task_send_sent_event=True,
)
else:
raise ValueError(f"Unsupported broker URL: {CELERY_BROKER_URL}")

View File

@ -0,0 +1,83 @@
from celery.result import AsyncResult
from quivr_api.celery_config import celery
from quivr_api.logger import get_logger
from quivr_api.modules.knowledge.dto.inputs import KnowledgeStatus
from quivr_api.modules.knowledge.service.knowledge_service import KnowledgeService
from quivr_api.modules.notification.dto.inputs import NotificationUpdatableProperties
from quivr_api.modules.notification.entity.notification import NotificationsStatusEnum
from quivr_api.modules.notification.service.notification_service import (
NotificationService,
)
logger = get_logger("notifier_service", "notifier_service.log")
notification_service = NotificationService()
knowledge_service = KnowledgeService()
def notifier(app):
    """Consume celery task events and sync notification/knowledge status.

    Listens on the broker for ``task-failed`` and ``task-succeeded``
    events emitted by the ``process_file_and_notify`` task, then updates
    the matching notification row and knowledge status (ERROR on
    failure, SUCCESS/UPLOADED on success). Blocks forever capturing
    events; exceptions raised while handling a single event are logged
    and swallowed so the receiver keeps running.

    Args:
        app: The celery application whose events are monitored.
    """
    state = app.events.State()

    def handle_task_event(event):
        try:
            # Feed the event into celery's in-memory state so the task
            # uuid can be resolved to a task object.
            state.event(event)
            task = state.tasks.get(event["uuid"])
            # name/kwargs come from the result backend; requires
            # result_extended=True in the celery config.
            task_result = AsyncResult(task.id, app=app)
            task_name, task_kwargs = task_result.name, task_result.kwargs
            if task_name != "process_file_and_notify":
                return
            notification_id = task_kwargs["notification_id"]
            knowledge_id = task_kwargs["knowledge_id"]
            if event["type"] == "task-failed":
                logger.error(
                    f"task {task.id} process_file_and_notify {task_kwargs} failed. Sending notification {notification_id}"
                )
                notification_service.update_notification_by_id(
                    notification_id,
                    NotificationUpdatableProperties(
                        status=NotificationsStatusEnum.ERROR,
                        description=f"An error occurred while processing the file: {event['exception']}",
                    ),
                )
                knowledge_service.update_status_knowledge(
                    knowledge_id, KnowledgeStatus.ERROR
                )
                logger.error(
                    f"task {task.id} process_file_and_notify {task_kwargs} failed. Updating knowledge {knowledge_id} to Error"
                )
            elif event["type"] == "task-succeeded":
                logger.info(
                    f"task {task.id} process_file_and_notify {task_kwargs} succeeded. Sending notification {notification_id}"
                )
                notification_service.update_notification_by_id(
                    notification_id,
                    NotificationUpdatableProperties(
                        status=NotificationsStatusEnum.SUCCESS,
                        description="Your file has been properly uploaded!",
                    ),
                )
                # TODO(@aminediro): implement retry if failure
                knowledge_service.update_status_knowledge(
                    knowledge_id, KnowledgeStatus.UPLOADED
                )
                # Bug fix: this log previously said "failed" on the
                # success path.
                logger.info(
                    f"task {task.id} process_file_and_notify {task_kwargs} succeeded. Updating knowledge {knowledge_id} to UPLOADED"
                )
        except Exception as e:
            # Never let a handler error kill the receiver loop.
            logger.exception(f"handling event {event} raised exception: {e}")

    with app.connection() as connection:
        recv = app.events.Receiver(
            connection,
            handlers={
                "task-failed": handle_task_event,
                "task-succeeded": handle_task_event,
            },
        )
        # Block forever; wakeup=True asks workers to emit heartbeats so
        # the state machine is primed.
        recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == "__main__":
    # Entry point for the quivr-notifier container: blocks forever
    # consuming celery events (see notifier()).
    logger.info("Started quivr-notifier notification...")
    notifier(celery)

View File

@ -3,9 +3,9 @@ from datetime import datetime, timedelta
from tempfile import NamedTemporaryFile
from uuid import UUID
from celery.exceptions import MaxRetriesExceededError
from celery.schedules import crontab
from pytz import timezone
from quivr_api.celery_config import celery
from quivr_api.logger import get_logger
from quivr_api.middlewares.auth.auth_bearer import AuthBearer
@ -14,8 +14,6 @@ from quivr_api.models.settings import get_supabase_client, get_supabase_db
from quivr_api.modules.brain.integrations.Notion.Notion_connector import NotionConnector
from quivr_api.modules.brain.service.brain_service import BrainService
from quivr_api.modules.brain.service.brain_vector_service import BrainVectorService
from quivr_api.modules.knowledge.dto.inputs import KnowledgeStatus
from quivr_api.modules.knowledge.service.knowledge_service import KnowledgeService
from quivr_api.modules.notification.dto.inputs import NotificationUpdatableProperties
from quivr_api.modules.notification.entity.notification import NotificationsStatusEnum
from quivr_api.modules.notification.service.notification_service import (
@ -36,93 +34,55 @@ auth_bearer = AuthBearer()
@celery.task(
bind=True,
retries=3,
default_retry_delay=1,
name="process_file_and_notify",
autoretry_for=(Exception,),
)
def process_file_and_notify(
self,
file_name: str,
file_original_name: str,
brain_id,
notification_id=None,
notification_id: UUID,
knowledge_id: UUID,
integration=None,
delete_file=False,
knowledge_id: UUID = None,
):
try:
try:
supabase_client = get_supabase_client()
tmp_name = file_name.replace("/", "_")
base_file_name = os.path.basename(file_name)
_, file_extension = os.path.splitext(base_file_name)
logger.debug(
f"process_file file_name={file_name}, knowledge_id={knowledge_id}, brain_id={brain_id}, notification_id={notification_id}"
)
supabase_client = get_supabase_client()
tmp_name = file_name.replace("/", "_")
base_file_name = os.path.basename(file_name)
_, file_extension = os.path.splitext(base_file_name)
with NamedTemporaryFile(
suffix="_" + tmp_name, # pyright: ignore reportPrivateUsage=none
) as tmp_file:
res = supabase_client.storage.from_("quivr").download(file_name)
tmp_file.write(res)
tmp_file.flush()
file_instance = File(
file_name=base_file_name,
tmp_file_path=tmp_file.name,
bytes_content=res,
file_size=len(res),
file_extension=file_extension,
)
brain_vector_service = BrainVectorService(brain_id)
knowledge_service = KnowledgeService()
if delete_file: # TODO fix bug
brain_vector_service.delete_file_from_brain(
file_original_name, only_vectors=True
)
filter_file(
file=file_instance,
brain_id=brain_id,
original_file_name=file_original_name,
)
if notification_id:
notification_service.update_notification_by_id(
notification_id,
NotificationUpdatableProperties(
status=NotificationsStatusEnum.SUCCESS,
description="Your file has been properly uploaded!",
),
)
if knowledge_id:
knowledge_service.update_status_knowledge(
knowledge_id, KnowledgeStatus.UPLOADED
)
brain_service.update_brain_last_update_time(brain_id)
return True
except TimeoutError:
logger.error("TimeoutError")
self.retry()
except Exception as e:
logger.exception(e)
self.retry()
except MaxRetriesExceededError:
logger.error("MaxRetriesExceededError")
knowledge_service = KnowledgeService()
notification_service.update_notification_by_id(
notification_id,
NotificationUpdatableProperties(
status=NotificationsStatusEnum.ERROR,
description=f"An error occurred while processing the file",
),
with NamedTemporaryFile(
suffix="_" + tmp_name, # pyright: ignore reportPrivateUsage=none
) as tmp_file:
res = supabase_client.storage.from_("quivr").download(file_name)
tmp_file.write(res)
tmp_file.flush()
file_instance = File(
file_name=base_file_name,
tmp_file_path=tmp_file.name,
bytes_content=res,
file_size=len(res),
file_extension=file_extension,
)
if knowledge_id:
knowledge_service.update_status_knowledge(
knowledge_id, KnowledgeStatus.ERROR
brain_vector_service = BrainVectorService(brain_id)
if delete_file: # TODO fix bug
brain_vector_service.delete_file_from_brain(
file_original_name, only_vectors=True
)
filter_file(
file=file_instance,
brain_id=brain_id,
original_file_name=file_original_name,
)
brain_service.update_brain_last_update_time(brain_id)
@celery.task(name="process_crawl_and_notify")
def process_crawl_and_notify(

View File

@ -4,6 +4,7 @@ from typing import List, Optional
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from pydantic import BaseModel
from quivr_api.logger import get_logger
from quivr_api.models.databases.supabase.supabase import SupabaseDB
from quivr_api.models.settings import get_supabase_db
@ -82,7 +83,8 @@ class File(BaseModel):
brain_id (str): Brain id
"""
response = self.supabase_db.get_brain_vectors_by_brain_id_and_file_sha1(
brain_id, self.file_sha1 # type: ignore
brain_id,
self.file_sha1, # type: ignore
)
if len(response.data) == 0:

View File

@ -2,6 +2,7 @@ from typing import Optional
from uuid import UUID
from fastapi import APIRouter, Depends, Query, Request
from quivr_api.celery_worker import process_crawl_and_notify
from quivr_api.logger import get_logger
from quivr_api.middlewares.auth import AuthBearer, get_current_user

View File

@ -42,6 +42,24 @@ services:
ports:
- 6379:6379
notifier:
pull_policy: never
image: backend-base:latest
extra_hosts:
- "host.docker.internal:host-gateway"
env_file:
- .env
container_name: notifier
volumes:
- ./backend/:/code/
command:
- "python"
- "/code/api/quivr_api/celery_monitor.py"
restart: always
depends_on:
- redis
- worker
worker:
pull_policy: never
image: backend-base:latest

View File

@ -48,6 +48,22 @@ services:
ports:
- 5050:5050
notifier:
pull_policy: never
image: backend-base:latest
extra_hosts:
- "host.docker.internal:host-gateway"
env_file:
- .env
container_name: notifier
command:
- "python"
- "/code/api/quivr_api/celery_monitor.py"
restart: always
depends_on:
- redis
- worker
redis:
image: redis:latest@sha256:a7cee7c8178ff9b5297cb109e6240f5072cdaaafd775ce6b586c3c704b06458e
container_name: redis