diff --git a/.aws/task_definition_preview_beat.json b/.aws/task_definition_preview_beat.json
new file mode 100644
index 000000000..685b5d8fc
--- /dev/null
+++ b/.aws/task_definition_preview_beat.json
@@ -0,0 +1,83 @@
+{
+  "taskDefinitionArn": "arn:aws:ecs:eu-west-3:253053805092:task-definition/quivr-preview-beat:1",
+  "containerDefinitions": [
+    {
+      "name": "quivr-beat",
+      "image": "253053805092.dkr.ecr.eu-west-3.amazonaws.com/quivr:600ff1ede02741c66853cc3e4e7f5001aaba3bc2",
+      "cpu": 2048,
+      "memory": 4096,
+      "essential": true,
+      "command": ["celery", "-A", "celery_worker", "beat", "-l", "info"],
+      "environment": [],
+      "environmentFiles": [
+        {
+          "value": "arn:aws:s3:::quivr-env-variables/preview.env",
+          "type": "s3"
+        }
+      ],
+      "mountPoints": [],
+      "volumesFrom": [],
+      "logConfiguration": {
+        "logDriver": "awslogs",
+        "options": {
+          "awslogs-create-group": "true",
+          "awslogs-group": "/ecs/quivr-preview-beat",
+          "awslogs-region": "eu-west-3",
+          "awslogs-stream-prefix": "ecs"
+        }
+      }
+    }
+  ],
+  "family": "quivr-preview-beat",
+  "taskRoleArn": "arn:aws:iam::253053805092:role/ecsTaskExecutionRole",
+  "executionRoleArn": "arn:aws:iam::253053805092:role/ecsTaskExecutionRole",
+  "networkMode": "awsvpc",
+  "revision": 1,
+  "volumes": [],
+  "status": "ACTIVE",
+  "requiresAttributes": [
+    {
+      "name": "com.amazonaws.ecs.capability.logging-driver.awslogs"
+    },
+    {
+      "name": "ecs.capability.execution-role-awslogs"
+    },
+    {
+      "name": "com.amazonaws.ecs.capability.ecr-auth"
+    },
+    {
+      "name": "com.amazonaws.ecs.capability.docker-remote-api.1.19"
+    },
+    {
+      "name": "ecs.capability.env-files.s3"
+    },
+    {
+      "name": "com.amazonaws.ecs.capability.task-iam-role"
+    },
+    {
+      "name": "ecs.capability.execution-role-ecr-pull"
+    },
+    {
+      "name": "com.amazonaws.ecs.capability.docker-remote-api.1.18"
+    },
+    {
+      "name": "ecs.capability.task-eni"
+    },
+    {
+      "name": "com.amazonaws.ecs.capability.docker-remote-api.1.29"
+    }
+  ],
+  "placementConstraints": [],
+  "compatibilities": ["EC2", "FARGATE"],
+  "requiresCompatibilities": ["FARGATE"],
+  "cpu": "2048",
+  "memory": "4096",
+  "runtimePlatform": {
+    "cpuArchitecture": "X86_64",
+    "operatingSystemFamily": "LINUX"
+  },
+  "registeredAt": "2023-08-18T09:01:56.187Z",
+  "registeredBy": "arn:aws:iam::253053805092:root",
+  "tags": []
+}
diff --git a/.github/workflows/aws-preview.yml b/.github/workflows/aws-preview.yml
index 8d583975a..fc208846d 100644
--- a/.github/workflows/aws-preview.yml
+++ b/.github/workflows/aws-preview.yml
@@ -85,6 +85,10 @@ jobs:
             service: "preview-worker"
             task_definition: ".aws/task_definition_preview_worker.json"
            container: "quivr-chat"
+          - name: "quivr-beat"
+            service: "preview-beat"
+            task_definition: ".aws/task_definition_preview_beat.json"
+            container: "quivr-beat"
 
     steps:
       - name: Checkout
diff --git a/backend/.dockerignore b/backend/.dockerignore
index 0bd3f40a9..9020943f7 100644
--- a/backend/.dockerignore
+++ b/backend/.dockerignore
@@ -7,4 +7,5 @@
 **/.next/
 **/build/
 **/.docusaurus/
-**/node_modules/
\ No newline at end of file
+**/node_modules/
+**/.venv/
\ No newline at end of file
diff --git a/backend/celery_worker.py b/backend/celery_worker.py
index ae1d68602..7d51c0811 100644
--- a/backend/celery_worker.py
+++ b/backend/celery_worker.py
@@ -1,6 +1,7 @@
 import asyncio
 import io
 import os
+from datetime import datetime, timedelta, timezone
 
 from celery.schedules import crontab
 from celery_config import celery
@@ -9,7 +10,9 @@ from logger import get_logger
 from models.files import File
 from models.settings import get_supabase_client
 from modules.brain.integrations.Notion.Notion_connector import NotionConnector
+from modules.brain.repository.integration_brains import IntegrationBrain
 from modules.brain.service.brain_service import BrainService
+from modules.brain.service.brain_vector_service import BrainVectorService
 from modules.notification.dto.inputs import NotificationUpdatableProperties
 from modules.notification.entity.notification import NotificationsStatusEnum
 from modules.notification.service.notification_service import NotificationService
@@ -32,6 +35,7 @@ def process_file_and_notify(
     brain_id,
     notification_id=None,
     integration=None,
+    delete_file=False,
 ):
     try:
         supabase_client = get_supabase_client()
@@ -50,6 +54,11 @@ def process_file_and_notify(
         file_instance = File(file=upload_file)
         loop = asyncio.get_event_loop()
+        brain_vector_service = BrainVectorService(brain_id)
+        if delete_file:  # TODO fix bug
+            brain_vector_service.delete_file_from_brain(
+                file_original_name, only_vectors=True
+            )
         message = loop.run_until_complete(
             filter_file(
                 file=file_instance,
@@ -156,18 +165,48 @@ def remove_onboarding_more_than_x_days_task():
     onboardingService.remove_onboarding_more_than_x_days(7)
 
 
+@celery.task(name="NotionConnectorLoad")
+def process_integration_brain_created_initial_load(brain_id, user_id):
+    notion_connector = NotionConnector(brain_id=brain_id, user_id=user_id)
+
+    pages = notion_connector.load()
+
+    logger.info("pages: %s", len(pages))
+
+
+@celery.task
+def process_integration_brain_sync_user_brain(brain_id, user_id):
+    notion_connector = NotionConnector(brain_id=brain_id, user_id=user_id)
+
+    notion_connector.poll()
+
+
+@celery.task
+def process_integration_brain_sync():
+    integration = IntegrationBrain()
+    integrations = integration.get_integration_brain_by_type_integration("notion")
+
+    now = datetime.now(timezone.utc)  # timezone-aware, like `last_synced`
+    # `last_synced` is stored as a timestamptz string in the database; only
+    # re-enqueue a per-brain sync if more than 12 hours have passed since it
+    for integration in integrations:
+        last_synced = datetime.strptime(
+            integration.last_synced, "%Y-%m-%dT%H:%M:%S.%f%z"
+        )
+        if last_synced < now - timedelta(hours=12):
+            process_integration_brain_sync_user_brain.delay(
+                brain_id=integration.brain_id, user_id=integration.user_id
+            )
+
+
 celery.conf.beat_schedule = {
     "remove_onboarding_more_than_x_days_task": {
         "task": f"{__name__}.remove_onboarding_more_than_x_days_task",
         "schedule": crontab(minute="0", hour="0"),
     },
+    "process_integration_brain_sync": {
+        "task": f"{__name__}.process_integration_brain_sync",
+        "schedule": crontab(minute="*/5", hour="*"),
+    },
 }
-
-
-@celery.task(name="NotionConnectorLoad")
-def process_integration_brain_created_initial_load(brain_id, user_id):
-    notion_connector = NotionConnector(brain_id=brain_id, user_id=user_id)
-
-    pages = notion_connector.compile_all_pages()
-
-    print("pages: ", len(pages))
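Reviewer note: the beat schedule fires `process_integration_brain_sync` every 5 minutes, but it only fans out per-brain syncs whose `last_synced` is older than 12 hours. A minimal sketch of that staleness check, assuming `last_synced` is an ISO-8601 string with microseconds and a UTC offset (the format the task parses); `needs_sync` and `hours_ago` are hypothetical helpers, not part of this PR:

```python
from datetime import datetime, timedelta, timezone

def needs_sync(last_synced: str, max_age_hours: int = 12) -> bool:
    # Parse the stored timestamptz string into a timezone-aware datetime...
    parsed = datetime.strptime(last_synced, "%Y-%m-%dT%H:%M:%S.%f%z")
    # ...so it can be compared against an aware "now" without a TypeError.
    return parsed < datetime.now(timezone.utc) - timedelta(hours=max_age_hours)

def hours_ago(h: int) -> str:
    # Produce a timestamp string in the same format the database stores.
    return (datetime.now(timezone.utc) - timedelta(hours=h)).strftime(
        "%Y-%m-%dT%H:%M:%S.%f%z"
    )

# A brain synced 13 hours ago is due; one synced an hour ago is not.
assert needs_sync(hours_ago(13)) and not needs_sync(hours_ago(1))
```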
diff --git a/backend/modules/brain/entity/integration_brain.py b/backend/modules/brain/entity/integration_brain.py
index 3a9a364e0..a1df9b9f6 100644
--- a/backend/modules/brain/entity/integration_brain.py
+++ b/backend/modules/brain/entity/integration_brain.py
@@ -27,3 +27,4 @@ class IntegrationEntity(BaseModel):
     integration_id: str
     settings: Optional[dict] = None
     credentials: Optional[dict] = None
+    last_synced: str
diff --git a/backend/modules/brain/integrations/Notion/Notion_connector.py b/backend/modules/brain/integrations/Notion/Notion_connector.py
index de144289f..243687980 100644
--- a/backend/modules/brain/integrations/Notion/Notion_connector.py
+++ b/backend/modules/brain/integrations/Notion/Notion_connector.py
@@ -1,5 +1,6 @@
 import os
 import tempfile
+import time
 from io import BytesIO
 from typing import Any, List, Optional
 
@@ -8,7 +9,7 @@
 from celery_config import celery
 from fastapi import UploadFile
 from logger import get_logger
 from modules.brain.entity.integration_brain import IntegrationEntity
-from modules.brain.repository.integration_brains import IntegrationBrain
+from modules.brain.repository.integration_brains import Integration, IntegrationBrain
 from modules.knowledge.dto.inputs import CreateKnowledgeProperties
 from modules.knowledge.repository.knowledge_interface import KnowledgeInterface
 from modules.knowledge.service.knowledge_service import KnowledgeService
@@ -37,7 +38,7 @@ class NotionSearchResponse(BaseModel):
     has_more: bool = False
 
 
-class NotionConnector(IntegrationBrain):
+class NotionConnector(IntegrationBrain, Integration):
     """A class to interact with the Notion API"""
 
     credentials: dict[str, str] = None
@@ -219,6 +220,24 @@ class NotionConnector(IntegrationBrain):
         page_url = self._read_page_url(page)
         return page_title, page_content, child_pages, page_url
 
+    def _filter_pages_by_time(
+        self,
+        pages: list[dict[str, Any]],
+        start: str,
+        filter_field: str = "last_edited_time",
+    ) -> list[NotionPage]:
+        filtered_pages: list[NotionPage] = []
+        # convert the ISO timestamps to float timestamps so they can be compared
+        start_time = time.mktime(time.strptime(start, "%Y-%m-%dT%H:%M:%S.%f%z"))
+        for page in pages:
+            compare_time = time.mktime(
+                time.strptime(page[filter_field], "%Y-%m-%dT%H:%M:%S.%f%z")
+            )
+            if compare_time > start_time:
+                filtered_pages.append(NotionPage(**page))
+        return filtered_pages
+
     def get_all_pages(self) -> list[NotionPage]:
         """
         Get all the pages from Notion.
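Reviewer note: `_filter_pages_by_time` compares timestamps via `time.mktime`, which interprets the parsed `struct_time` as local time and ignores the UTC offset; that stays consistent here only because both strings go through the same conversion. A sketch of an equivalent, offset-aware check using `datetime` (the `edited_after` helper is illustrative, not part of the PR):

```python
from datetime import datetime

NOTION_TS = "%Y-%m-%dT%H:%M:%S.%f%z"  # format Notion uses for last_edited_time

def edited_after(page_time: str, start: str) -> bool:
    # Aware datetimes compare correctly even across differing UTC offsets.
    return datetime.strptime(page_time, NOTION_TS) > datetime.strptime(start, NOTION_TS)

# Example with Notion-style timestamps:
assert edited_after("2024-02-20T10:00:00.000+0000", "2024-02-20T09:30:00.000+0000")
```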
@@ -248,6 +267,7 @@ class NotionConnector(IntegrationBrain):
         """
         Add a file to the knowledge base
         """
+        logger.info(f"Adding file to knowledge: {page_name}")
         filename_with_brain_id = (
             str(self.brain_id) + "/" + str(page_name) + "_notion.txt"
         )
@@ -269,7 +289,9 @@ class NotionConnector(IntegrationBrain):
             temp_file_path = temp_file.name
 
             # Upload the temporary file to the knowledge base
-            response = upload_file_storage(temp_file_path, filename_with_brain_id)
+            response = upload_file_storage(
+                temp_file_path, filename_with_brain_id, "true"
+            )
             logger.info(f"File {response} uploaded successfully")
 
             # Delete the temporary file
@@ -292,12 +314,13 @@ class NotionConnector(IntegrationBrain):
                     "file_name": filename_with_brain_id,
                     "file_original_name": page_name + "_notion.txt",
                     "brain_id": self.brain_id,
+                    "delete_file": True,
                 },
             )
         except Exception:
             logger.error("Error adding knowledge")
 
-    def compile_all_pages(self):
+    def load(self):
         """
         Get all the pages, blocks, databases from Notion into a single document per page
         """
@@ -316,18 +339,52 @@ class NotionConnector(IntegrationBrain):
             self.add_file_to_knowledge(page_content, page_title, page_url)
         return documents
 
+    def poll(self):
+        """
+        Update all the brains with the latest data from Notion
+        """
+        integration = self.get_integration_brain(self.brain_id, self.user_id)
+        last_synced = integration.last_synced
+
+        query_dict = {
+            "page_size": self.max_pages,
+            "sort": {"timestamp": "last_edited_time", "direction": "descending"},
+            "filter": {"property": "object", "value": "page"},
+        }
+        documents = []
+
+        while True:
+            db_res = self._search_notion(query_dict)
+            pages = self._filter_pages_by_time(
+                db_res.results, last_synced, filter_field="last_edited_time"
+            )
+            for page in pages:
+                logger.info(f"Reading page: {page.id}")
+                page_title, page_content, child_pages, page_url = self._read_page(
+                    page.id
+                )
+                document = {
+                    "page_title": page_title,
+                    "page_content": page_content,
+                    "child_pages": child_pages,
+                    "page_url": page_url,
+                }
+                documents.append(document)
+                self.add_file_to_knowledge(page_content, page_title, page_url)
+            if not db_res.has_more:
+                break
+            query_dict["start_cursor"] = db_res.next_cursor
+        logger.info(
+            f"last synced: {self.update_last_synced(self.brain_id, self.user_id)}"
+        )
+        return documents
+
 
 if __name__ == "__main__":
     notion = NotionConnector(
-        brain_id="b3ab23c5-9e13-4dd8-8883-106d613e3de8",
+        brain_id="73f7d092-d596-4fd0-b24f-24031e9b53cd",
         user_id="39418e3b-0258-4452-af60-7acfcc1263ff",
     )
 
-    celery.send_task(
-        "NotionConnectorLoad",
-        kwargs={
-            "brain_id": "b3ab23c5-9e13-4dd8-8883-106d613e3de8",
-            "user_id": "39418e3b-0258-4452-af60-7acfcc1263ff",
-        },
-    )
+    print(notion.poll())
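Reviewer note: with `load`/`poll` in place, an incremental sync for a single brain can be kicked off through the Celery task added in `celery_worker.py`; the IDs below are the sample values from the `__main__` block:

```python
from celery_worker import process_integration_brain_sync_user_brain

# Enqueue an incremental Notion sync for one brain; the worker will call
# NotionConnector(brain_id=..., user_id=...).poll() as defined above.
process_integration_brain_sync_user_brain.delay(
    brain_id="73f7d092-d596-4fd0-b24f-24031e9b53cd",
    user_id="39418e3b-0258-4452-af60-7acfcc1263ff",
)
```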
diff --git a/backend/modules/brain/knowledge_brain_qa.py b/backend/modules/brain/knowledge_brain_qa.py
index aef2fe072..a7fd7a679 100644
--- a/backend/modules/brain/knowledge_brain_qa.py
+++ b/backend/modules/brain/knowledge_brain_qa.py
@@ -86,9 +86,11 @@ def generate_source(source_documents, brain_id):
             if file_path in generated_urls:
                 source_url = generated_urls[file_path]
             else:
-                source_url = generate_file_signed_url(file_path).get(
-                    "signedURL", ""
-                )
+                generated_url = generate_file_signed_url(file_path)
+                if generated_url is not None:
+                    source_url = generated_url.get("signedURL", "")
+                else:
+                    source_url = ""
                 # Store the generated URL
                 generated_urls[file_path] = source_url
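Reviewer note: the guard above avoids an `AttributeError` when `generate_file_signed_url` returns `None`. The same logic can be condensed with an or-fallback; `pick_signed_url` is a hypothetical helper shown only to illustrate the pattern:

```python
from typing import Optional

def pick_signed_url(generated_url: Optional[dict]) -> str:
    # Treat a None response as an empty dict before looking up the key.
    return (generated_url or {}).get("signedURL", "")

assert pick_signed_url(None) == ""
assert pick_signed_url({"signedURL": "https://example/f.txt"}) == "https://example/f.txt"
```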
diff --git a/backend/modules/brain/repository/brains_vectors.py b/backend/modules/brain/repository/brains_vectors.py
index 984c2ab68..a937652a1 100644
--- a/backend/modules/brain/repository/brains_vectors.py
+++ b/backend/modules/brain/repository/brains_vectors.py
@@ -57,6 +57,7 @@ class BrainsVectors(BrainsVectorsInterface):
 
     def delete_file_from_brain(self, brain_id, file_name: str):
         # First, get the vector_ids associated with the file_name
+        # TODO: filter by brain_id
         file_vectors = (
             self.db.table("vectors")
             .select("id")
diff --git a/backend/modules/brain/repository/integration_brains.py b/backend/modules/brain/repository/integration_brains.py
index 6da928b7a..c13d119f6 100644
--- a/backend/modules/brain/repository/integration_brains.py
+++ b/backend/modules/brain/repository/integration_brains.py
@@ -1,3 +1,6 @@
+from abc import ABC, abstractmethod
+from typing import List
+
 from models.settings import get_supabase_client
 from modules.brain.entity.integration_brain import (
     IntegrationDescriptionEntity,
@@ -9,6 +12,17 @@ from modules.brain.repository.interfaces.integration_brains_interface import (
 )
 
 
+class Integration(ABC):
+    @abstractmethod
+    def load(self):
+        pass
+
+    @abstractmethod
+    def poll(self):
+        pass
+
+
 class IntegrationBrain(IntegrationBrainInterface):
     """This is all the methods to interact with the integration brain.
 
@@ -32,6 +46,18 @@ class IntegrationBrain(IntegrationBrainInterface):
 
         return IntegrationEntity(**response.data[0])
 
+    def update_last_synced(self, brain_id, user_id):
+        response = (
+            self.db.table("integrations_user")
+            .update({"last_synced": "now()"})
+            .filter("brain_id", "eq", str(brain_id))
+            .filter("user_id", "eq", str(user_id))
+            .execute()
+        )
+        if len(response.data) == 0:
+            return None
+        return IntegrationEntity(**response.data[0])
+
     def add_integration_brain(self, brain_id, user_id, integration_id, settings):
 
         response = (
@@ -70,6 +96,20 @@ class IntegrationBrain(IntegrationBrainInterface):
         ).filter("user_id", "eq", str(user_id)).execute()
         return None
 
+    def get_integration_brain_by_type_integration(
+        self, integration_name
+    ) -> List[IntegrationEntity]:
+        response = (
+            self.db.table("integrations_user")
+            .select("*, integrations ()")
+            .filter("integrations.integration_name", "eq", integration_name)
+            .execute()
+        )
+        if len(response.data) == 0:
+            return []
+
+        return [IntegrationEntity(**data) for data in response.data]
+
 
 class IntegrationDescription(IntegrationDescriptionInterface):
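Reviewer note: the `Integration` ABC pins down the contract every integration connector must satisfy — `load` for the initial import, `poll` for incremental syncs. A minimal sketch of a future connector against that contract (the `GitHubConnector` class is hypothetical, not part of this PR):

```python
from abc import ABC, abstractmethod

class Integration(ABC):
    @abstractmethod
    def load(self): ...

    @abstractmethod
    def poll(self): ...

class GitHubConnector(Integration):
    def load(self):
        return []  # full initial import

    def poll(self):
        return []  # incremental sync since last_synced

GitHubConnector()  # instantiates fine; omitting poll() would raise TypeError
```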
self.db.from_("knowledge") + .select("*") + .filter("brain_id", "eq", knowledge.brain_id) + .filter("file_name", "eq", knowledge.file_name) + .execute() + ).data + + if knowledge_exists: + return Knowledge(**knowledge_exists[0]) # TODO fix this + response = (self.db.from_("knowledge").insert(knowledge.dict()).execute()).data return Knowledge(**response[0]) diff --git a/backend/repository/files/upload_file.py b/backend/repository/files/upload_file.py index 19eb58c77..98bc8d1b6 100644 --- a/backend/repository/files/upload_file.py +++ b/backend/repository/files/upload_file.py @@ -36,7 +36,7 @@ mime_types = { } -def upload_file_storage(file, file_identifier: str): +def upload_file_storage(file, file_identifier: str, upsert: str = "false"): supabase_client: Client = get_supabase_client() response = None @@ -48,13 +48,29 @@ def upload_file_storage(file, file_identifier: str): mime_type = mime_types.get(file_extension, "text/html") response = supabase_client.storage.from_("quivr").upload( - file_identifier, file, file_options={"content-type": mime_type} + file_identifier, + file, + file_options={ + "content-type": mime_type, + "upsert": upsert, + "cache-control": "3600", + }, ) return response except Exception as e: - logger.error(e) - raise e + if "The resource already exists" in str(e) and upsert == "true": + response = supabase_client.storage.from_("quivr").update( + file_identifier, + file, + file_options={ + "content-type": mime_type, + "upsert": upsert, + "cache-control": "3600", + }, + ) + else: + raise e class DocumentSerializable(Document):