mirror of
https://github.com/QuivrHQ/quivr.git
synced 2025-01-05 23:03:53 +03:00
2a25f442e5
This pull request adds a new telemetry ping task to the celery worker and main.py files. The ping task sends a ping message to the telemetry system.
227 lines
7.6 KiB
Python
227 lines
7.6 KiB
Python
import asyncio
|
|
import io
|
|
import os
|
|
from datetime import datetime, timezone
|
|
|
|
from celery.schedules import crontab
|
|
from celery_config import celery
|
|
from fastapi import UploadFile
|
|
from logger import get_logger
|
|
from models.files import File
|
|
from models.settings import get_supabase_client
|
|
from modules.brain.integrations.Notion.Notion_connector import NotionConnector
|
|
from modules.brain.repository.integration_brains import IntegrationBrain
|
|
from modules.brain.service.brain_service import BrainService
|
|
from modules.brain.service.brain_vector_service import BrainVectorService
|
|
from modules.notification.dto.inputs import NotificationUpdatableProperties
|
|
from modules.notification.entity.notification import NotificationsStatusEnum
|
|
from modules.notification.service.notification_service import NotificationService
|
|
from modules.onboarding.service.onboarding_service import OnboardingService
|
|
from packages.files.crawl.crawler import CrawlWebsite
|
|
from packages.files.parsers.github import process_github
|
|
from packages.files.processors import filter_file
|
|
from packages.utils.telemetry import maybe_send_telemetry
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
onboardingService = OnboardingService()
|
|
notification_service = NotificationService()
|
|
brain_service = BrainService()
|
|
|
|
|
|
@celery.task(name="process_file_and_notify")
|
|
def process_file_and_notify(
|
|
file_name: str,
|
|
file_original_name: str,
|
|
brain_id,
|
|
notification_id=None,
|
|
integration=None,
|
|
delete_file=False,
|
|
):
|
|
try:
|
|
supabase_client = get_supabase_client()
|
|
tmp_file_name = "tmp-file-" + file_name
|
|
tmp_file_name = tmp_file_name.replace("/", "_")
|
|
|
|
with open(tmp_file_name, "wb+") as f:
|
|
res = supabase_client.storage.from_("quivr").download(file_name)
|
|
f.write(res)
|
|
f.seek(0)
|
|
file_content = f.read()
|
|
|
|
upload_file = UploadFile(
|
|
file=f, filename=file_name.split("/")[-1], size=len(file_content)
|
|
)
|
|
|
|
file_instance = File(file=upload_file)
|
|
loop = asyncio.get_event_loop()
|
|
brain_vector_service = BrainVectorService(brain_id)
|
|
if delete_file: # TODO fix bug
|
|
brain_vector_service.delete_file_from_brain(
|
|
file_original_name, only_vectors=True
|
|
)
|
|
message = loop.run_until_complete(
|
|
filter_file(
|
|
file=file_instance,
|
|
brain_id=brain_id,
|
|
original_file_name=file_original_name,
|
|
)
|
|
)
|
|
|
|
f.close()
|
|
os.remove(tmp_file_name)
|
|
|
|
if notification_id:
|
|
notification_message = {
|
|
"status": message["type"],
|
|
"message": message["message"],
|
|
"name": file_instance.file.filename if file_instance.file else "",
|
|
}
|
|
notification_service.update_notification_by_id(
|
|
notification_id,
|
|
NotificationUpdatableProperties(
|
|
status=NotificationsStatusEnum.Done,
|
|
message=str(notification_message),
|
|
),
|
|
)
|
|
brain_service.update_brain_last_update_time(brain_id)
|
|
|
|
return True
|
|
except TimeoutError:
|
|
logger.error("TimeoutError")
|
|
|
|
except Exception as e:
|
|
notification_message = {
|
|
"status": "error",
|
|
"message": "There was an error uploading the file. Please check the file and try again. If the issue persist, please open an issue on Github",
|
|
"name": file_instance.file.filename if file_instance.file else "",
|
|
}
|
|
notification_service.update_notification_by_id(
|
|
notification_id,
|
|
NotificationUpdatableProperties(
|
|
status=NotificationsStatusEnum.Done,
|
|
message=str(notification_message),
|
|
),
|
|
)
|
|
raise e
|
|
|
|
|
|
@celery.task(name="process_crawl_and_notify")
|
|
def process_crawl_and_notify(
|
|
crawl_website_url,
|
|
brain_id,
|
|
notification_id=None,
|
|
):
|
|
crawl_website = CrawlWebsite(url=crawl_website_url)
|
|
|
|
if not crawl_website.checkGithub():
|
|
file_path, file_name = crawl_website.process()
|
|
|
|
with open(file_path, "rb") as f:
|
|
file_content = f.read()
|
|
|
|
# Create a file-like object in memory using BytesIO
|
|
file_object = io.BytesIO(file_content)
|
|
upload_file = UploadFile(
|
|
file=file_object, filename=file_name, size=len(file_content)
|
|
)
|
|
file_instance = File(file=upload_file)
|
|
|
|
loop = asyncio.get_event_loop()
|
|
message = loop.run_until_complete(
|
|
filter_file(
|
|
file=file_instance,
|
|
brain_id=brain_id,
|
|
original_file_name=crawl_website_url,
|
|
)
|
|
)
|
|
else:
|
|
loop = asyncio.get_event_loop()
|
|
message = loop.run_until_complete(
|
|
process_github(
|
|
repo=crawl_website.url,
|
|
brain_id=brain_id,
|
|
)
|
|
)
|
|
|
|
if notification_id:
|
|
notification_message = {
|
|
"status": message["type"],
|
|
"message": message["message"],
|
|
"name": crawl_website_url,
|
|
}
|
|
notification_service.update_notification_by_id(
|
|
notification_id,
|
|
NotificationUpdatableProperties(
|
|
status=NotificationsStatusEnum.Done,
|
|
message=str(notification_message),
|
|
),
|
|
)
|
|
brain_service.update_brain_last_update_time(brain_id)
|
|
return True
|
|
|
|
|
|
@celery.task
|
|
def remove_onboarding_more_than_x_days_task():
|
|
onboardingService.remove_onboarding_more_than_x_days(7)
|
|
|
|
|
|
@celery.task(name="NotionConnectorLoad")
|
|
def process_integration_brain_created_initial_load(brain_id, user_id):
|
|
notion_connector = NotionConnector(brain_id=brain_id, user_id=user_id)
|
|
|
|
pages = notion_connector.load()
|
|
|
|
print("pages: ", len(pages))
|
|
|
|
|
|
@celery.task
|
|
def process_integration_brain_sync_user_brain(brain_id, user_id):
|
|
notion_connector = NotionConnector(brain_id=brain_id, user_id=user_id)
|
|
|
|
notion_connector.poll()
|
|
|
|
|
|
@celery.task
|
|
def ping_telemetry():
|
|
maybe_send_telemetry("ping", {"ping": "pong"})
|
|
|
|
|
|
@celery.task
|
|
def process_integration_brain_sync():
|
|
integration = IntegrationBrain()
|
|
integrations = integration.get_integration_brain_by_type_integration("notion")
|
|
|
|
time = datetime.now(timezone.utc) # Make `time` timezone-aware
|
|
# last_synced is a string that represents a timestampz in the database
|
|
# only call process_integration_brain_sync_user_brain if more than 1 day has passed since the last sync
|
|
if not integrations:
|
|
return
|
|
# TODO fix this
|
|
# for integration in integrations:
|
|
# print(f"last_synced: {integration.last_synced}")
|
|
# print(f"Integration Name: {integration.name}")
|
|
# last_synced = datetime.strptime(
|
|
# integration.last_synced, "%Y-%m-%dT%H:%M:%S.%f%z"
|
|
# )
|
|
# if last_synced < time - timedelta(hours=12) and integration.name == "notion":
|
|
# process_integration_brain_sync_user_brain.delay(
|
|
# brain_id=integration.brain_id, user_id=integration.user_id
|
|
# )
|
|
|
|
|
|
celery.conf.beat_schedule = {
|
|
"remove_onboarding_more_than_x_days_task": {
|
|
"task": f"{__name__}.remove_onboarding_more_than_x_days_task",
|
|
"schedule": crontab(minute="0", hour="0"),
|
|
},
|
|
"process_integration_brain_sync": {
|
|
"task": f"{__name__}.process_integration_brain_sync",
|
|
"schedule": crontab(minute="*/5", hour="*"),
|
|
},
|
|
"ping_telemetry": {
|
|
"task": f"{__name__}.ping_telemetry",
|
|
"schedule": crontab(minute="*/30", hour="*"),
|
|
},
|
|
}
|