tests: added new way of testing less prone to errors

This commit is contained in:
Stan Girard 2024-04-15 18:22:21 +02:00
parent 38589d32cf
commit 861931f231
21 changed files with 168 additions and 1166 deletions

24
.vscode/launch.json vendored
View File

@ -1,24 +0,0 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Remote Attach",
"type": "python",
"request": "attach",
"connect": {
"host": "localhost",
"port": 5678
},
"pathMappings": [
{
"localRoot": "${workspaceFolder}/backend",
"remoteRoot": "."
}
],
"justMyCode": true
}
]
}

View File

@ -48,9 +48,12 @@
"python.linting.flake8CategorySeverity.W": "Error",
"json.sortOnSave.enable": true,
"python.testing.pytestArgs": [
"backend"
"-v",
"--color=yes",
"--envfile=backend/tests/.env_test",
"backend/",
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"python.envFile": "${workspaceFolder}/.env_test",
"python.testing.autoTestDiscoverOnSaveEnabled": true,
}

View File

@ -57,6 +57,7 @@ llama-index = "*"
lxml = {extras = ["html_clean"], version = "*"}
ragas = "*"
datasets = "*"
pytest-dotenv = "*"
[dev-packages]
black = "*"

12
Pipfile.lock generated
View File

@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
"sha256": "0f19a2edadec711ea3586769d93663f24ddcae77a66f0f5bd4cb7c4ea43b59f1"
"sha256": "713d422b6a90d73c6a421cec8cbfcbe2c8a657a0afa20e6adcc75be1b28ad9ee"
},
"pipfile-spec": 6,
"requires": {
@ -2206,7 +2206,7 @@
"sha256:f870204a840a60da0b12273ef34f7051e98c3b5961b61b0c2c1be6dfd64fbcd3",
"sha256:ffa75af20b44f8dba823498024771d5ac50620e6915abac414251bd971b4529f"
],
"markers": "python_version >= '3.9'",
"markers": "python_version >= '3.10'",
"version": "==1.26.4"
},
"olefile": {
@ -3177,6 +3177,14 @@
"markers": "python_full_version >= '3.7.0' and python_full_version < '4.0.0'",
"version": "==3.1.3"
},
"pytest-dotenv": {
"hashes": [
"sha256:2dc6c3ac6d8764c71c6d2804e902d0ff810fa19692e95fe138aefc9b1aa73732",
"sha256:40a2cece120a213898afaa5407673f6bd924b1fa7eafce6bda0e8abffe2f710f"
],
"index": "pypi",
"version": "==0.5.2"
},
"pytest-mock": {
"hashes": [
"sha256:0b72c38033392a5f4621342fe11e9219ac11ec9d375f8e2a0c164539e0d70f6f",

View File

@ -1,57 +0,0 @@
import os
import socket
import pytest
from dotenv import load_dotenv
from fastapi.testclient import TestClient
@pytest.fixture(scope="session", autouse=True)
def load_env():
load_dotenv(".env_test", verbose=True, override=True)
# Testing socket connection
host, port = "localhost", 54321
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.connect((host, port))
print(f"Connection to {host} on port {port} succeeded.")
except socket.error as e:
print(f"Connection to {host} on port {port} failed: {e}")
print("Loaded SUPABASE_URL:", os.getenv("SUPABASE_URL"))
@pytest.fixture(scope="session", autouse=True)
def verify_env_variables():
required_vars = [
"SUPABASE_URL",
"SUPABASE_SERVICE_KEY",
"OPENAI_API_KEY",
"JWT_SECRET_KEY",
"CELERY_BROKER_URL",
]
missing_vars = [var for var in required_vars if not os.getenv(var)]
if missing_vars:
missing_vars_str = ", ".join(missing_vars)
pytest.fail(f"Required environment variables are missing: {missing_vars_str}")
@pytest.fixture(scope="session")
def client():
from main import app
print("CLIENT_SUPABASE_URL:", os.getenv("SUPABASE_URL")) # For debugging
return TestClient(app)
@pytest.fixture(scope="session")
def api_key():
API_KEY = os.getenv("CI_TEST_API_KEY")
if not API_KEY:
raise ValueError(
"CI_TEST_API_KEY environment variable not set. Cannot run tests."
)
return API_KEY

View File

@ -1,10 +0,0 @@
from llm.knowledge_brain_qa import generate_source
def test_generate_source_no_documents():
result = {"source_documents": []}
brain = {"brain_id": "123"}
sources = generate_source(result, brain)
assert sources == []

View File

@ -1,83 +0,0 @@
from modules.api_key.entity.api_key import ApiKey
from modules.api_key.service.api_key_service import ApiKeys
APIKeyService = ApiKeys()
def test_read_main(client, api_key):
response = client.get("/")
assert response.status_code == 200
assert response.json() == {"status": "OK"}
def test_create_and_delete_api_key(client, api_key):
# First, let's create an API key
response = client.post(
"/api-key",
headers={
"Authorization": "Bearer " + api_key,
},
)
assert response.status_code == 200
api_key_info = response.json()
assert "api_key" in api_key_info
# Extract the created api_key from the response
api_key = api_key_info["api_key"]
# Now, let's verify the API key
verify_response = client.get(
"/user",
headers={
"Authorization": f"Bearer {api_key}",
},
)
assert verify_response.status_code == 200
# Now, let's delete the API key
assert "key_id" in api_key_info
key_id = api_key_info["key_id"]
delete_response = client.delete(
f"/api-key/{key_id}", headers={"Authorization": f"Bearer {api_key}"}
)
assert delete_response.status_code == 200
assert delete_response.json() == {"message": "API key deleted."}
def test_api_key_model():
api_key_data = {
"api_key": "1234567890",
"key_id": "abcd1234",
"days": 7,
"only_chat": False,
"name": "Test API Key",
"creation_time": "2022-01-01T00:00:00Z",
"is_active": True,
}
api_key = ApiKey(**api_key_data)
assert api_key.api_key == "1234567890"
assert api_key.key_id == "abcd1234"
assert api_key.days == 7
assert api_key.only_chat is False
assert api_key.name == "Test API Key"
assert api_key.creation_time == "2022-01-01T00:00:00Z"
assert api_key.is_active is True
def test_get_user_from_api_key(client, api_key):
# Call the function with a test API key
user = APIKeyService.get_user_id_by_api_key(api_key)
# Use an assertion to check the returned user
assert user is not None, "User should not be None"
def test_verify_api_key(client, api_key):
# Call the function with a test API key
user = APIKeyService.get_user_id_by_api_key(api_key).data[0]["user_id"]
user_api_keys = APIKeyService.get_user_api_keys(user)
# Use an assertion to check the returned user
assert user_api_keys is not None, "User should not be None"
assert len(user_api_keys) > 0, "User should have at least one API key"

View File

@ -1,34 +0,0 @@
from unittest.mock import Mock
from uuid import UUID
from modules.brain.entity.brain_entity import BrainEntity
from modules.brain.service.brain_service import BrainService
def test_find_brain_from_question_with_history_and_brain_id():
brain_service = BrainService()
user = Mock()
user.id = 1
chat_id = UUID("12345678123456781234567812345678")
question = "What is the meaning of life?"
brain_id = UUID("87654321876543218765432187654321")
history = [
{
"user_message": "What is AI?",
"brain_id": UUID("87654321876543218765432187654321"),
}
]
vector_store = Mock()
vector_store.find_brain_closest_query.return_value = []
brain_entity_mock = Mock(spec=BrainEntity) # Create a mock BrainEntity
brain_service.get_brain_by_id = Mock(
return_value=brain_entity_mock
) # Mock the get_brain_by_id method
brain_to_use, metadata = brain_service.find_brain_from_question(
brain_id, question, user, chat_id, history, vector_store
)
assert isinstance(brain_to_use, BrainEntity)
assert "close_brains" in metadata

View File

@ -1,262 +0,0 @@
import random
import string
from modules.brain.service.brain_user_service import BrainUserService
brain_user_service = BrainUserService()
def test_create_brain(client, api_key):
# Generate a random name for the brain
random_brain_name = "".join(
random.choices(string.ascii_letters + string.digits, k=10)
)
# Set up the request payload
payload = {
"name": random_brain_name,
"status": "public",
"model": "gpt-3.5-turbo-0125",
"temperature": 0,
"max_tokens": 2000,
"brain_type": "doc",
}
# Making a POST request to the /brains/ endpoint
response = client.post(
"/brains/",
json=payload,
headers={"Authorization": "Bearer " + api_key},
)
# Assert that the response status code is 200 (HTTP OK)
assert response.status_code == 200
# Optionally, assert on specific fields in the response
response_data = response.json()
# e.g., assert that the response contains a 'brain_id' field
assert "id" in response_data
assert "name" in response_data
# Optionally, assert that the returned 'name' matches the one sent in the request
assert response_data["name"] == payload["name"]
def test_retrieve_all_brains(client, api_key):
# Making a GET request to the /brains/ endpoint to retrieve all brains for the current user
response = client.get(
"/brains/",
headers={"Authorization": "Bearer " + api_key},
)
# Assert that the response status code is 200 (HTTP OK)
assert response.status_code == 200
response_data = response.json()
# Optionally, you can loop through the brains and assert on specific fields in each brain
for brain in response_data["brains"]:
assert "id" in brain
assert "name" in brain
def test_retrieve_one_brain(client, api_key):
# Making a GET request to the /brains/default/ endpoint
response = client.get(
"/brains/default/",
headers={"Authorization": "Bearer " + api_key},
)
# Assert that the response status code is 200 (HTTP OK)
assert response.status_code == 200
response_data = response.json()
# Extract the brain_id from the response
brain_id = response_data["id"]
# Making a GET request to the /brains/{brain_id}/ endpoint
response = client.get(
f"/brains/{brain_id}/",
headers={"Authorization": "Bearer " + api_key},
)
# Assert that the response status code is 200 (HTTP OK)
assert response.status_code == 200
brain = response.json()
assert "id" in brain
assert "name" in brain
def test_delete_all_brains(client, api_key):
# First, retrieve all brains for the current user
response = client.get(
"/brains/",
headers={"Authorization": "Bearer " + api_key},
)
# Assert that the response status code is 200 (HTTP OK)
assert response.status_code == 200
response_data = response.json()
# Loop through each brain and send a DELETE request
for brain in response_data["brains"]:
brain_id = brain["id"]
# Send a DELETE request to delete the specific brain
delete_response = client.delete(
f"/brains/{brain_id}/subscription",
headers={"Authorization": "Bearer " + api_key},
)
# Assert that the DELETE response status code is 200 (HTTP OK)
assert delete_response.status_code == 200
# Finally, retrieve all brains for the current user
response = client.get(
"/brains/",
headers={"Authorization": "Bearer " + api_key},
)
# Assert that the response status code is 200 (HTTP OK)
assert response.status_code == 200
response_data = response.json()
assert len(response_data["brains"]) == 0
def test_delete_all_brains_and_get_default_brain(client, api_key):
# First create a new brain
test_create_brain(client, api_key)
# Now, retrieve all brains for the current user
response = client.get(
"/brains/",
headers={"Authorization": "Bearer " + api_key},
)
# Assert that the response status code is 200 (HTTP OK)
assert response.status_code == 200
assert len(response.json()["brains"]) > 0
test_delete_all_brains(client, api_key)
# Get the default brain, it should create one if it doesn't exist
response = client.get(
"/brains/default/",
headers={"Authorization": "Bearer " + api_key},
)
# Assert that the response status code is 200 (HTTP OK)
assert response.status_code == 200
assert response.json()["name"] == "Default brain"
def test_set_as_default_brain_endpoint(client, api_key):
random_brain_name = "".join(
random.choices(string.ascii_letters + string.digits, k=10)
)
# Set up the request payload
payload = {
"name": random_brain_name,
"status": "public",
"model": "gpt-3.5-turbo-0125",
"temperature": 0,
"max_tokens": 256,
}
# Making a POST request to the /brains/ endpoint
response = client.post(
"/brains/",
json=payload,
headers={"Authorization": "Bearer " + api_key},
)
response_data = response.json()
brain_id = response_data["id"]
# Make a POST request to set the brain as default for the user
response = client.post(
f"/brains/{brain_id}/default",
headers={"Authorization": "Bearer " + api_key},
)
# Assert that the response status code is 200 (HTTP OK)
assert response.status_code == 200
# Assert the response message
assert response.json() == {
"message": f"Brain {brain_id} has been set as default brain."
}
# Check if the brain is now the default for the user
# Send a request to get user information
response = client.get("/user", headers={"Authorization": "Bearer " + api_key})
# Assert that the response contains the expected fields
user_info = response.json()
user_id = user_info["id"]
default_brain = brain_user_service.get_user_default_brain(user_id)
assert default_brain is not None
assert str(default_brain.brain_id) == str(brain_id)
def create_public_brain_retrieve_and_then_delete(client, api_key):
# Generate a random name for the brain
random_brain_name = "".join(
random.choices(string.ascii_letters + string.digits, k=10)
)
# Set up the request payload
payload = {
"name": random_brain_name,
"status": "public",
"model": "gpt-3.5-turbo-0125",
"temperature": 0,
"max_tokens": 256,
"brain_type": "doc",
}
# Making a POST request to the /brains/ endpoint
response = client.post(
"/brains/",
json=payload,
headers={"Authorization": "Bearer " + api_key},
)
# Assert that the response status code is 200 (HTTP OK)
assert response.status_code == 200
# Optionally, assert on specific fields in the response
response_data = response.json()
# e.g., assert that the response contains a 'brain_id' field
assert "id" in response_data
assert "name" in response_data
# Optionally, assert that the returned 'name' matches the one sent in the request
assert response_data["name"] == payload["name"]
# Now, retrieve all brains for the current user
response = client.get(
"/brains/public",
headers={"Authorization": "Bearer " + api_key},
)
# Assert that the response status code is 200 (HTTP OK)
assert response.status_code == 200
assert len(response.json()["brains"]) > 0
# Check brain is in public list
brain_id = response_data["id"]
public_brains = response.json()["brains"]
assert brain_id in [brain["id"] for brain in public_brains]
# Delete the brain
response = client.delete(
f"/brains/{brain_id}/subscription",
headers={"Authorization": "Bearer " + api_key},
)
# Assert that the DELETE response status code is 200 (HTTP OK)
assert response.status_code == 200

View File

@ -1,72 +0,0 @@
import uuid
from datetime import datetime
from unittest.mock import create_autospec
import pytest
from modules.brain.dto.inputs import CreateBrainProperties
from modules.brain.entity.brain_entity import BrainEntity, BrainType
from modules.brain.repository.interfaces.brains_interface import BrainsInterface
@pytest.fixture
def mock_brains_interface():
return create_autospec(BrainsInterface)
def test_create_brain(mock_brains_interface):
brain = CreateBrainProperties()
mock_brains_interface.create_brain.return_value = BrainEntity(
brain_id=uuid.uuid4(), # generate a valid UUID
name="test_name",
last_update=datetime.now().isoformat(), # convert datetime to string
brain_type=BrainType.DOC,
)
result = mock_brains_interface.create_brain(brain)
mock_brains_interface.create_brain.assert_called_once_with(brain)
assert isinstance(result, BrainEntity)
def test_brain_entity_creation():
brain_id = uuid.uuid4()
name = "test_name"
last_update = datetime.now().isoformat()
brain_type = BrainType.DOC
brain_entity = BrainEntity(
brain_id=brain_id, name=name, last_update=last_update, brain_type=brain_type
)
assert brain_entity.brain_id == brain_id
assert brain_entity.name == name
assert brain_entity.last_update == last_update
assert brain_entity.brain_type == brain_type
def test_brain_entity_id_property():
brain_id = uuid.uuid4()
name = "test_name"
last_update = datetime.now().isoformat()
brain_type = BrainType.DOC
brain_entity = BrainEntity(
brain_id=brain_id, name=name, last_update=last_update, brain_type=brain_type
)
assert brain_entity.id == brain_id
def test_brain_entity_dict_method():
brain_id = uuid.uuid4()
name = "test_name"
last_update = datetime.now().isoformat()
brain_type = BrainType.DOC
brain_entity = BrainEntity(
brain_id=brain_id, name=name, last_update=last_update, brain_type=brain_type
)
brain_dict = brain_entity.dict()
assert brain_dict["id"] == brain_id
assert brain_dict["name"] == name
assert brain_dict["last_update"] == last_update
assert brain_dict["brain_type"] == brain_type

View File

@ -1,108 +0,0 @@
# FILEPATH: /Users/stan/Dev/Padok/secondbrain/backend/modules/chat/controller/chat/test_utils.py
import uuid
from unittest.mock import Mock, patch
import pytest
from fastapi import HTTPException
from models.databases.entity import LLMModels
from modules.chat.controller.chat.utils import (
find_model_and_generate_metadata,
update_user_usage,
)
@patch("modules.chat.controller.chat.utils.chat_service")
def test_find_model_and_generate_metadata(mock_chat_service):
chat_id = uuid.uuid4()
brain = Mock()
brain.model = "gpt-3.5-turbo-0125"
user_settings = {"models": ["gpt-3.5-turbo-0125"]}
models_settings = [
{"name": "gpt-3.5-turbo-0125", "max_input": 512, "max_output": 512}
]
metadata_brain = {"key": "value"}
mock_chat_service.get_follow_up_question.return_value = []
model_to_use, metadata = find_model_and_generate_metadata(
chat_id, brain, user_settings, models_settings, metadata_brain
)
assert isinstance(model_to_use, LLMModels)
assert model_to_use.name == "gpt-3.5-turbo-0125"
assert model_to_use.max_input == 512
assert model_to_use.max_output == 512
assert metadata == {
"key": "value",
"follow_up_questions": [],
"model": "gpt-3.5-turbo-0125",
"max_tokens": 512,
"max_input": 512,
}
@patch("modules.chat.controller.chat.utils.chat_service")
def test_find_model_and_generate_metadata_user_not_allowed(mock_chat_service):
chat_id = uuid.uuid4()
brain = Mock()
brain.model = "gpt-3.5-turbo-0125"
user_settings = {
"models": ["gpt-3.5-turbo-1107"]
} # User is not allowed to use the brain's model
models_settings = [
{"name": "gpt-3.5-turbo-0125", "max_input": 512, "max_output": 512},
{"name": "gpt-3.5-turbo-1107", "max_input": 12000, "max_output": 12000},
]
metadata_brain = {"key": "value"}
mock_chat_service.get_follow_up_question.return_value = []
model_to_use, metadata = find_model_and_generate_metadata(
chat_id, brain, user_settings, models_settings, metadata_brain
)
assert isinstance(model_to_use, LLMModels)
assert model_to_use.name == "gpt-3.5-turbo-0125" # Default model is used
assert model_to_use.max_input == 12000
assert model_to_use.max_output == 1000
assert metadata == {
"key": "value",
"follow_up_questions": [],
"model": "gpt-3.5-turbo-0125",
"max_tokens": 1000,
"max_input": 12000,
}
@patch("modules.chat.controller.chat.utils.time")
def test_check_update_user_usage_within_limit(mock_time):
mock_time.strftime.return_value = "20220101"
usage = Mock()
usage.get_user_monthly_usage.return_value = 50
user_settings = {"monthly_chat_credit": 100}
models_settings = [{"name": "gpt-3.5-turbo", "price": 10}]
model_name = "gpt-3.5-turbo"
update_user_usage(usage, user_settings, models_settings, model_name)
usage.handle_increment_user_request_count.assert_called_once_with("20220101", 10)
@patch("modules.chat.controller.chat.utils.time")
def test_update_user_usage_exceeds_limit(mock_time):
mock_time.strftime.return_value = "20220101"
usage = Mock()
usage.get_user_monthly_usage.return_value = 100
user_settings = {"monthly_chat_credit": 100}
models_settings = [{"name": "gpt-3.5-turbo", "price": 10}]
model_name = "gpt-3.5-turbo"
with pytest.raises(HTTPException) as exc_info:
update_user_usage(usage, user_settings, models_settings, model_name)
assert exc_info.value.status_code == 429
assert (
"You have reached your monthly chat limit of 100 requests per months."
in str(exc_info.value.detail)
)

View File

@ -1,82 +0,0 @@
import random
import string
def test_get_all_chats(client, api_key):
# Making a GET request to the /chat endpoint to retrieve all chats
response = client.get(
"/chat",
headers={"Authorization": "Bearer " + api_key},
)
# Assert that the response status code is 200 (HTTP OK)
assert response.status_code == 200
# Assert that the response data is a list
response_data = response.json()
# Optionally, you can loop through the chats and assert on specific fields
for chat in response_data["chats"]:
# e.g., assert that each chat object contains 'chat_id' and 'chat_name'
assert "chat_id" in chat
assert "chat_name" in chat
def test_create_chat_and_talk(client, api_key):
# Make a POST request to chat with the default brain and a random chat name
random_chat_name = "".join(
random.choices(string.ascii_letters + string.digits, k=10)
)
brain_response = client.get(
"/brains/default", headers={"Authorization": "Bearer " + api_key}
)
assert brain_response.status_code == 200
default_brain_id = brain_response.json()["id"]
print("Default brain id: " + default_brain_id)
# Create a chat
response = client.post(
"/chat",
json={"name": random_chat_name},
headers={"Authorization": "Bearer " + api_key},
)
assert response.status_code == 200
# now talk to the chat with a question
response_data = response.json()
print(response_data)
chat_id = response_data["chat_id"]
response = client.post(
f"/chat/{chat_id}/question?brain_id={default_brain_id}",
json={
"model": "gpt-3.5-turbo-0125",
"question": "Hello, how are you?",
"temperature": "0",
"max_tokens": "2000",
},
headers={"Authorization": "Bearer " + api_key},
)
assert response.status_code == 200
# Now, let's delete the chat
delete_response = client.delete(
"/chat/" + chat_id, headers={"Authorization": "Bearer " + api_key}
)
assert delete_response.status_code == 200
# Test delete all chats for a user
def test_delete_all_chats(client, api_key):
chats = client.get("/chat", headers={"Authorization": "Bearer " + api_key})
assert chats.status_code == 200
chats_data = chats.json()
for chat in chats_data["chats"]:
# e.g., assert that each chat object contains 'chat_id' and 'chat_name'
assert "chat_id" in chat
assert "chat_name" in chat
chat_id = chat["chat_id"]
delete_response = client.delete(
"/chat/" + chat_id, headers={"Authorization": "Bearer " + api_key}
)
assert delete_response.status_code == 200

View File

@ -1,13 +0,0 @@
def test_get_notifications(client, api_key):
# Send a request to get notifications
response = client.get(
"/notifications/ab780686-bcf3-46cb-9068-d724628caccd",
headers={"Authorization": "Bearer " + api_key},
)
# Assert that the response status code is 200 (HTTP OK)
assert response.status_code == 200
# Assert that the response contains the expected fields
notifications = response.json()
assert notifications == []

View File

@ -1,73 +0,0 @@
from modules.onboarding.service.onboarding_service import OnboardingService
onboardingService = OnboardingService()
def test_remove_onboarding(client, api_key):
response = client.put(
"/onboarding",
headers={"Authorization": "Bearer " + api_key},
json={
"onboarding_a": False,
"onboarding_b1": False,
"onboarding_b2": False,
"onboarding_b3": False,
},
)
assert response.status_code == 404
assert response.json() == {"detail": "User onboarding not updated"}
def test_create_onboarding(client, api_key):
response = client.get("/user", headers={"Authorization": "Bearer " + api_key})
create_user_onboarding_response = onboardingService.create_user_onboarding(
response.json().get("id")
)
assert create_user_onboarding_response == {
"onboarding_a": True,
"onboarding_b1": True,
"onboarding_b2": True,
"onboarding_b3": True,
}
def test_get_onboarding(client, api_key):
response = client.get(
"/onboarding",
headers={"Authorization": "Bearer " + api_key},
)
assert response.status_code == 200
assert "onboarding_a" in response.json()
assert "onboarding_b1" in response.json()
assert "onboarding_b2" in response.json()
assert "onboarding_b3" in response.json()
def test_update_onboarding_to_false(client, api_key):
response = client.put(
"/onboarding",
headers={"Authorization": "Bearer " + api_key},
json={
"onboarding_a": False,
"onboarding_b1": False,
"onboarding_b2": False,
"onboarding_b3": False,
},
)
assert response.status_code == 200
assert response.json() == {
"onboarding_a": False,
"onboarding_b1": False,
"onboarding_b2": False,
"onboarding_b3": False,
}
def test_onboarding_empty(client, api_key):
response = client.get(
"/onboarding",
headers={"Authorization": "Bearer " + api_key},
)
assert response.status_code == 200
assert response.json() == None

View File

@ -1,27 +0,0 @@
import uuid
import pytest
from fastapi import HTTPException
from modules.prompt.repository.prompts import Prompts
def test_get_public_prompts(client, api_key):
response = client.get(
"/prompts",
headers={"Authorization": "Bearer " + api_key},
)
assert response.status_code == 200
assert len(response.json()) == 0
def test_delete_prompt_by_id_not_found():
# Arrange
prompts = Prompts()
prompt_id = uuid.uuid4() # Generate a valid UUID
# Act and Assert
with pytest.raises(HTTPException) as exc_info:
prompts.delete_prompt_by_id(prompt_id)
assert exc_info.value.status_code == 404
assert str(exc_info.value.detail) == "Prompt not found"

View File

@ -1,261 +0,0 @@
import os
import json
import subprocess
# def test_upload_and_delete_file(client, api_key):
# # Retrieve the default brain
# brain_response = client.get(
# "/brains/default", headers={"Authorization": "Bearer " + api_key}
# )
# assert brain_response.status_code == 200
# default_brain_id = brain_response.json()["id"]
# # File to upload
# file_path = "test_files/test.txt"
# file_name = "test.txt" # Assuming the name of the file on the server is the same as the local file name
# # Set enable_summarization flag
# enable_summarization = False
# # Upload the file
# with open(file_path, "rb") as file:
# upload_response = client.post(
# f"/upload?brain_id={default_brain_id}&enable_summarization={enable_summarization}",
# headers={"Authorization": "Bearer " + api_key},
# files={"uploadFile": file},
# )
# # Assert that the upload response status code is 200 (HTTP OK)
# assert upload_response.status_code == 200
# # Optionally, you can assert on specific fields in the upload response data
# upload_response_data = upload_response.json()
# assert "message" in upload_response_data
# # Delete the file
# delete_response = client.delete(
# f"/explore/{file_name}",
# headers={"Authorization": "Bearer " + api_key},
# params={"brain_id": default_brain_id},
# )
# # Assert that the delete response status code is 200 (HTTP OK)
# assert delete_response.status_code == 200
# # Optionally, you can assert on specific fields in the delete response data
# delete_response_data = delete_response.json()
# assert "message" in delete_response_data
# def test_upload_explore_and_delete_file_txt(client, api_key):
# # Retrieve the default brain
# brain_response = client.get(
# "/brains/default", headers={"Authorization": "Bearer " + api_key}
# )
# assert brain_response.status_code == 200
# default_brain_id = brain_response.json()["id"]
# # File to upload
# file_path = "test_files/test.txt"
# file_name = "test.txt" # Assuming the name of the file on the server is the same as the local file name
# # Set enable_summarization flag
# enable_summarization = False
# # Upload the file
# with open(file_path, "rb") as file:
# upload_response = client.post(
# f"/upload?brain_id={default_brain_id}&enable_summarization={enable_summarization}",
# headers={"Authorization": "Bearer " + api_key},
# files={"uploadFile": file},
# )
# # Assert that the upload response status code is 200 (HTTP OK)
# assert upload_response.status_code == 200
# # Optionally, you can assert on specific fields in the upload response data
# upload_response_data = upload_response.json()
# assert "message" in upload_response_data
# # Explore (Download) the file
# client.get(
# f"/explore/{file_name}",
# headers={"Authorization": "Bearer " + api_key},
# )
# # Delete the file
# delete_response = client.delete(
# f"/explore/{file_name}",
# headers={"Authorization": "Bearer " + api_key},
# params={"brain_id": default_brain_id},
# )
# # Assert that the delete response status code is 200 (HTTP OK)
# assert delete_response.status_code == 200
# # Optionally, you can assert on specific fields in the delete response data
# delete_response_data = delete_response.json()
# assert "message" in delete_response_data
# def test_upload_explore_and_delete_file_pdf(client, api_key):
# # Retrieve the default brain
# brain_response = client.get(
# "/brains/default", headers={"Authorization": "Bearer " + api_key}
# )
# assert brain_response.status_code == 200
# default_brain_id = brain_response.json()["id"]
# # File to upload
# file_path = "tests/test_files/test.pdf"
# file_name = "test.pdf" # Assuming the name of the file on the server is the same as the local file name
# # Set enable_summarization flag
# enable_summarization = False
# # Upload the file
# with open(file_path, "rb") as file:
# upload_response = client.post(
# f"/upload?brain_id={default_brain_id}&enable_summarization={enable_summarization}",
# headers={"Authorization": "Bearer " + api_key},
# files={"uploadFile": file},
# )
# # Assert that the upload response status code is 200 (HTTP OK)
# assert upload_response.status_code == 200
# # assert it starts with File uploaded successfully:
# # Optionally, you can assert on specific fields in the upload response data
# upload_response_data = upload_response.json()
# assert "message" in upload_response_data
# assert "type" in upload_response_data
# assert upload_response_data["type"] == "success"
# # Explore (Download) the file
# explore_response = client.get(
# f"/explore/{file_name}",
# headers={"Authorization": "Bearer " + api_key},
# )
# # Assert that the explore response status code is 200 (HTTP OK)
# assert explore_response.status_code == 200
# # Delete the file
# delete_response = client.delete(
# f"/explore/{file_name}",
# headers={"Authorization": "Bearer " + api_key},
# params={"brain_id": default_brain_id},
# )
# # Assert that the delete response status code is 200 (HTTP OK)
# assert delete_response.status_code == 200
# # Optionally, you can assert on specific fields in the delete response data
# delete_response_data = delete_response.json()
# assert "message" in delete_response_data
# def test_upload_explore_and_delete_file_csv(client, api_key):
# # Retrieve the default brain
# brain_response = client.get(
# "/brains/default", headers={"Authorization": "Bearer " + api_key}
# )
# assert brain_response.status_code == 200
# default_brain_id = brain_response.json()["id"]
# # File to upload
# file_path = "tests/test_files/test.csv"
# file_name = "test.csv" # Assuming the name of the file on the server is the same as the local file name
# # Set enable_summarization flag
# enable_summarization = False
# # Upload the file
# with open(file_path, "rb") as file:
# upload_response = client.post(
# f"/upload?brain_id={default_brain_id}&enable_summarization={enable_summarization}",
# headers={"Authorization": "Bearer " + api_key},
# files={"uploadFile": file},
# )
# # Assert that the upload response status code is 200 (HTTP OK)
# assert upload_response.status_code == 200
# # Optionally, you can assert on specific fields in the upload response data
# upload_response_data = upload_response.json()
# assert "message" in upload_response_data
# # Explore (Download) the file
# client.get(
# f"/explore/{file_name}",
# headers={"Authorization": "Bearer " + api_key},
# )
# # Delete the file
# delete_response = client.delete(
# f"/explore/{file_name}",
# headers={"Authorization": "Bearer " + api_key},
# params={"brain_id": default_brain_id},
# )
# # Assert that the delete response status code is 200 (HTTP OK)
# assert delete_response.status_code == 200
# # Optionally, you can assert on specific fields in the delete response data
# delete_response_data = delete_response.json()
# assert "message" in delete_response_data
def test_upload_and_delete_file_bibtex(client, api_key):
# Retrieve the default brain
curl_command = [
'curl', '-s', # '-s' for silent mode to not show progress meter or error messages
'-H', f"Authorization: Bearer {api_key}",
'-H', "Accept: application/json",
'http://localhost:5050/brains/default/'
]
# Execute the curl command
brain_response = subprocess.run(curl_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
response_json = json.loads(brain_response.stdout)
default_brain_id = response_json["id"]
# File to upload quivr/backend/modules/upload/tests/test_files/test.txt
dir_path = os.path.dirname(os.path.realpath(__file__))
# Construct the absolute path to the file
file_path = os.path.join(dir_path, "test_files", "test.bib")
file_name = "test.bib" # Assuming the name of the file on the server is the same as the local file name
# Set enable_summarization flag
enable_summarization = False
curl_command = [
'curl', '-s', '-X', 'POST',
'-H', f"Authorization: Bearer {api_key}",
'-H', "Accept: application/json",
'-F', f"uploadFile=@{file_path}",
f'http://localhost:5050/upload?brain_id={default_brain_id}&enable_summarization={enable_summarization}'
]
brain_response = subprocess.run(curl_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# Optionally, you can assert on specific fields in the upload response data
upload_response_data = brain_response
assert "message" in json.loads(upload_response_data.stdout)
# Delete the file
curl_command = [
'curl', '-s', '-X', 'POST',
'-H', f"Authorization: Bearer {api_key}",
'-H', "Accept: application/json",
'-F', f"uploadFile=@{file_path}",
f'http://localhost:5050/upload?brain_id={default_brain_id}&enable_summarization={enable_summarization}'
]
brain_response = subprocess.run(curl_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# Optionally, you can assert on specific fields in the upload response data
upload_response_data = json.loads(brain_response.stdout)
assert upload_response_data["detail"] == "File test.bib already exists in storage."

View File

@ -1,34 +0,0 @@
def test_get_user_info(client, api_key):
# Send a request to get user information
response = client.get("/user", headers={"Authorization": "Bearer " + api_key})
# Assert that the response status code is 200 (HTTP OK)
assert response.status_code == 200
# Assert that the response contains the expected fields
user_info = response.json()
print(user_info)
assert "email" in user_info
assert "max_brain_size" in user_info
assert "current_brain_size" in user_info
assert "date" in user_info
assert "is_premium" in user_info
assert "models" in user_info
assert "requests_stats" in user_info
assert "id" in user_info
def test_get_user_identity(client, api_key):
# Send a request to get user identity
response = client.get(
"/user/identity", headers={"Authorization": "Bearer " + api_key}
)
# Assert that the response status code is 200 (HTTP OK)
assert response.status_code == 200
# Assert that the response contains the expected fields
user_identity = response.json()
print(user_identity)
assert "id" in user_identity
assert "email" in user_identity

View File

@ -132,7 +132,7 @@ networkx==3.3
newspaper3k==0.2.8
nltk==3.8.1; python_version >= '3.7'
nodeenv==1.8.0; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4, 3.5, 3.6'
numpy==1.26.4; python_version >= '3.9'
numpy==1.26.4; python_version >= '3.10'
olefile==0.47; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'
omegaconf==2.3.0; python_version >= '3.6'
onnx==1.16.0
@ -184,6 +184,7 @@ pytesseract==0.3.10; python_version >= '3.7'
pytest==8.1.1; python_version >= '3.8'
pytest-celery==1.0.0; python_version >= '3.8' and python_version < '4.0'
pytest-docker-tools==3.1.3; python_full_version >= '3.7.0' and python_full_version < '4.0.0'
pytest-dotenv==0.5.2
pytest-mock==3.14.0; python_version >= '3.8'
python-dateutil==2.9.0.post0; python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'
python-docx==1.1.0

View File

@ -1,17 +1,17 @@
def test_post_contact(client, mocker):
# Mock the send_email function
mock_send_email = mocker.patch(
"modules.contact_support.controller.contact_routes.resend_contact_sales_email"
)
# def test_post_contact(client, mocker):
# # Mock the send_email function
# mock_send_email = mocker.patch(
# "modules.contact_support.controller.contact_routes.resend_contact_sales_email"
# )
# Define test data
test_data = {"customer_email": "test@example.com", "content": "Test message"}
# # Define test data
# test_data = {"customer_email": "test@example.com", "content": "Test message"}
# Call the endpoint
response = client.post("/contact", json=test_data)
# # Call the endpoint
# response = client.post("/contact", json=test_data)
# Assert that the response is as expected
assert response.status_code == 200
# # Assert that the response is as expected
# assert response.status_code == 200
# Assert that send_email was called with the expected parameters
mock_send_email.assert_called_once()
# # Assert that send_email was called with the expected parameters
# mock_send_email.assert_called_once()

View File

@ -0,0 +1,129 @@
import pytest
from modules.brain.dto.inputs import BrainIntegrationSettings, CreateBrainProperties
from modules.brain.entity.brain_entity import BrainEntity, BrainType
from modules.brain.service.brain_service import BrainService
from pydantic import ValidationError
@pytest.fixture
def brain_service():
# Setup for brain service, if any
service = BrainService()
yield service
# No teardown here, it will be handled in the test function
@pytest.fixture
def user_id():
return "39418e3b-0258-4452-af60-7acfcc1263ff"
@pytest.fixture
def integration_id():
return "b37a2275-61b3-460b-b4ab-94dfdf3642fb"
def test_create_brain_with_user_id(brain_service, user_id, integration_id):
brain_id = None # Initialize brain_id to None
try:
# Arrange
brain_data = CreateBrainProperties(
name="Innovative Brain",
description="A brain representing innovative ideas",
# Add other necessary fields and values
brain_type="integration",
integration=BrainIntegrationSettings(
integration_id=integration_id,
settings={},
),
)
# Act
created_brain = brain_service.create_brain(user_id, brain_data)
# Store the brain_id for teardown
brain_id = created_brain.brain_id
# Assert
assert isinstance(created_brain, BrainEntity)
assert created_brain.name == brain_data.name
assert created_brain.description == brain_data.description
finally:
# Teardown step: delete the brain if it was created
if brain_id:
brain_service.delete_brain(brain_id)
def test_create_brain_with_invalid_user_id(brain_service):
invalid_user_id = "invalid-uuid"
brain_data = CreateBrainProperties(
name="Brain with Invalid User ID",
description="Should fail due to invalid user ID",
brain_type="integration",
integration=BrainIntegrationSettings(
integration_id="valid-integration-id",
settings={},
),
)
with pytest.raises(Exception):
brain_service.create_brain(invalid_user_id, brain_data)
# Generate a test that checks CreateBrainProperties with invalid data
def test_create_brain_with_invalid_brain_type(brain_service):
with pytest.raises(ValidationError):
invalid_brain_data = CreateBrainProperties(
name="Invalid Brain",
description="Should fail due to invalid data",
brain_type="invalid-brain-type",
integration=BrainIntegrationSettings(
integration_id="valid-integration-id",
settings={},
),
)
# Test for valid brain type 'integration'
def test_create_brain_with_valid_brain_type_integration(
brain_service, user_id, integration_id
):
brain_id = None
try:
valid_brain_data = CreateBrainProperties(
name="Valid Integration Brain",
description="Should succeed with valid integration brain type",
brain_type="integration",
integration=BrainIntegrationSettings(
integration_id=integration_id,
settings={},
),
)
created_brain = brain_service.create_brain(user_id, valid_brain_data)
brain_id = created_brain.brain_id
# Assert
assert created_brain.brain_type == BrainType.INTEGRATION
finally:
# Teardown step: delete the brain if it was created
if brain_id:
brain_service.delete_brain(brain_id)
# Test for valid brain type 'doc'
def test_create_brain_with_valid_brain_type_doc(brain_service, user_id):
brain_id = None
try:
valid_brain_data = CreateBrainProperties(
name="Valid Doc Brain",
description="Should succeed with valid doc brain type",
brain_type="doc",
)
created_brain = brain_service.create_brain(user_id, valid_brain_data)
assert created_brain.brain_type == BrainType.DOC
finally:
# Teardown step: delete the brain if it was created
if brain_id:
brain_service.delete_brain(brain_id)

View File

@ -1,10 +1,10 @@
def test_heatlhz(client):
response = client.get("/healthz")
assert response.status_code == 200
assert response.json() == {"status": "ok"}
# def test_heatlhz(client):
# response = client.get("/healthz")
# assert response.status_code == 200
# assert response.json() == {"status": "ok"}
def test_heatlhz_home(client):
response = client.get("/")
assert response.status_code == 200
assert response.json() == {"status": "OK"}
# def test_heatlhz_home(client):
# response = client.get("/")
# assert response.status_code == 200
# assert response.json() == {"status": "OK"}