mirror of
https://github.com/StanGirard/quivr.git
synced 2024-12-22 10:51:46 +03:00
feat: Update pytest command in Makefile and add new test (#1893)
This commit is contained in:
parent
dd293e4ac3
commit
4fa9a03cc1
11
.vscode/settings.json
vendored
11
.vscode/settings.json
vendored
@ -7,7 +7,9 @@
|
|||||||
},
|
},
|
||||||
"python.linting.enabled": true,
|
"python.linting.enabled": true,
|
||||||
"python.linting.flake8Enabled": true,
|
"python.linting.flake8Enabled": true,
|
||||||
"python.analysis.extraPaths": ["./backend"],
|
"python.analysis.extraPaths": [
|
||||||
|
"./backend"
|
||||||
|
],
|
||||||
"editor.formatOnSave": true,
|
"editor.formatOnSave": true,
|
||||||
"[python]": {
|
"[python]": {
|
||||||
"editor.defaultFormatter": "ms-python.black-formatter",
|
"editor.defaultFormatter": "ms-python.black-formatter",
|
||||||
@ -47,4 +49,9 @@
|
|||||||
"python.defaultInterpreterPath": "python3",
|
"python.defaultInterpreterPath": "python3",
|
||||||
"python.linting.flake8CategorySeverity.W": "Error",
|
"python.linting.flake8CategorySeverity.W": "Error",
|
||||||
"json.sortOnSave.enable": true,
|
"json.sortOnSave.enable": true,
|
||||||
}
|
"python.testing.pytestArgs": [
|
||||||
|
"backend"
|
||||||
|
],
|
||||||
|
"python.testing.unittestEnabled": false,
|
||||||
|
"python.testing.pytestEnabled": true,
|
||||||
|
}
|
2
Makefile
2
Makefile
@ -1,5 +1,5 @@
|
|||||||
test:
|
test:
|
||||||
pytest backend/tests
|
pytest backend/
|
||||||
|
|
||||||
dev:
|
dev:
|
||||||
docker compose -f docker-compose.dev.yml build backend-core
|
docker compose -f docker-compose.dev.yml build backend-core
|
||||||
|
@ -49,52 +49,10 @@ if sentry_dsn:
|
|||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
# if CREATE_FIRST_USER := os.getenv("CREATE_FIRST_USER", "False").lower() == "true":
|
|
||||||
# try:
|
|
||||||
# from supabase import create_client
|
|
||||||
|
|
||||||
# supabase_client_auth = create_client(
|
|
||||||
# os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_SERVICE_KEY")
|
|
||||||
# )
|
|
||||||
# res = supabase_client_auth.from_('users').select('*').eq('email', "admin@quivr.app").execute()
|
|
||||||
# if len(res.data) == 0:
|
|
||||||
# supabase_client_auth.auth.admin.create_user({"email": "admin@quivr.app","email_confirm": True, "password": "admin"})
|
|
||||||
# logger.info("👨💻 Created first user")
|
|
||||||
# else:
|
|
||||||
# logger.info("👨💻 First user already exists")
|
|
||||||
# except Exception as e:
|
|
||||||
# logger.error("👨💻 Error while creating first user")
|
|
||||||
# logger.error(e)
|
|
||||||
|
|
||||||
|
|
||||||
# telemetry_disabled = os.getenv("TELEMETRY_DISABLED", "False").lower() == "true"
|
|
||||||
# if not telemetry_disabled:
|
|
||||||
# try:
|
|
||||||
# logger.info("👨💻 You can disable TELEMETRY by addind TELEMETRY_DISABLED=True to your env variables")
|
|
||||||
# logger.info("Telemetry is used to measure the usage of the app. No personal data is collected.")
|
|
||||||
# import os
|
|
||||||
# from supabase import create_client
|
|
||||||
# import uuid
|
|
||||||
# supabase_url = os.environ.get("SUPABASE_URL", "NOT_SET")
|
|
||||||
# supabase_client_telemetry = create_client("https://phcwncasycjransxnmbf.supabase.co","eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6InBoY3duY2FzeWNqcmFuc3hubWJmIiwicm9sZSI6ImFub24iLCJpYXQiOjE3MDE0NDM5NDEsImV4cCI6MjAxNzAxOTk0MX0.0MDz2ETHdQve9yVy_YI79iGsrlpLXX1ObrjmnzyVKSo")
|
|
||||||
# ## insert in the usage table id as uuid of supabase_url
|
|
||||||
# uuid_from_string = uuid.uuid5(uuid.NAMESPACE_DNS, supabase_url)
|
|
||||||
# supabase_client_telemetry.table("usage").insert({"id": str(uuid_from_string)}).execute()
|
|
||||||
# except Exception as e:
|
|
||||||
# logger.error("Error while sending telemetry")
|
|
||||||
|
|
||||||
|
|
||||||
app = FastAPI()
|
app = FastAPI()
|
||||||
|
|
||||||
add_cors_middleware(app)
|
add_cors_middleware(app)
|
||||||
|
|
||||||
|
|
||||||
# @app.on_event("startup")
|
|
||||||
# async def startup_event():
|
|
||||||
# if not os.path.exists(pypandoc.get_pandoc_path()):
|
|
||||||
# pypandoc.download_pandoc()
|
|
||||||
|
|
||||||
|
|
||||||
app.include_router(brain_router)
|
app.include_router(brain_router)
|
||||||
app.include_router(chat_router)
|
app.include_router(chat_router)
|
||||||
app.include_router(crawl_router)
|
app.include_router(crawl_router)
|
||||||
|
83
backend/modules/api_key/tests/test_api_key.py
Normal file
83
backend/modules/api_key/tests/test_api_key.py
Normal file
@ -0,0 +1,83 @@
|
|||||||
|
from modules.api_key.entity.api_key import ApiKey
|
||||||
|
from modules.api_key.service.api_key_service import ApiKeys
|
||||||
|
|
||||||
|
APIKeyService = ApiKeys()
|
||||||
|
|
||||||
|
|
||||||
|
def test_read_main(client, api_key):
|
||||||
|
response = client.get("/")
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json() == {"status": "OK"}
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_and_delete_api_key(client, api_key):
|
||||||
|
# First, let's create an API key
|
||||||
|
response = client.post(
|
||||||
|
"/api-key",
|
||||||
|
headers={
|
||||||
|
"Authorization": "Bearer " + api_key,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
api_key_info = response.json()
|
||||||
|
assert "api_key" in api_key_info
|
||||||
|
|
||||||
|
# Extract the created api_key from the response
|
||||||
|
api_key = api_key_info["api_key"]
|
||||||
|
|
||||||
|
# Now, let's verify the API key
|
||||||
|
verify_response = client.get(
|
||||||
|
"/user",
|
||||||
|
headers={
|
||||||
|
"Authorization": f"Bearer {api_key}",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert verify_response.status_code == 200
|
||||||
|
|
||||||
|
# Now, let's delete the API key
|
||||||
|
assert "key_id" in api_key_info
|
||||||
|
key_id = api_key_info["key_id"]
|
||||||
|
|
||||||
|
delete_response = client.delete(
|
||||||
|
f"/api-key/{key_id}", headers={"Authorization": f"Bearer {api_key}"}
|
||||||
|
)
|
||||||
|
assert delete_response.status_code == 200
|
||||||
|
assert delete_response.json() == {"message": "API key deleted."}
|
||||||
|
|
||||||
|
|
||||||
|
def test_api_key_model():
|
||||||
|
api_key_data = {
|
||||||
|
"api_key": "1234567890",
|
||||||
|
"key_id": "abcd1234",
|
||||||
|
"days": 7,
|
||||||
|
"only_chat": False,
|
||||||
|
"name": "Test API Key",
|
||||||
|
"creation_time": "2022-01-01T00:00:00Z",
|
||||||
|
"is_active": True,
|
||||||
|
}
|
||||||
|
api_key = ApiKey(**api_key_data)
|
||||||
|
assert api_key.api_key == "1234567890"
|
||||||
|
assert api_key.key_id == "abcd1234"
|
||||||
|
assert api_key.days == 7
|
||||||
|
assert api_key.only_chat == False
|
||||||
|
assert api_key.name == "Test API Key"
|
||||||
|
assert api_key.creation_time == "2022-01-01T00:00:00Z"
|
||||||
|
assert api_key.is_active == True
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_user_from_api_key(client, api_key):
|
||||||
|
# Call the function with a test API key
|
||||||
|
user = APIKeyService.get_user_id_by_api_key(api_key)
|
||||||
|
|
||||||
|
# Use an assertion to check the returned user
|
||||||
|
assert user is not None, "User should not be None"
|
||||||
|
|
||||||
|
|
||||||
|
def test_verify_api_key(client, api_key):
|
||||||
|
# Call the function with a test API key
|
||||||
|
user = APIKeyService.get_user_id_by_api_key(api_key).data[0]["user_id"]
|
||||||
|
|
||||||
|
user_api_keys = APIKeyService.get_user_api_keys(user)
|
||||||
|
# Use an assertion to check the returned user
|
||||||
|
assert user_api_keys is not None, "User should not be None"
|
||||||
|
assert len(user_api_keys) > 0, "User should have at least one API key"
|
@ -1,39 +0,0 @@
|
|||||||
def test_read_main(client, api_key):
|
|
||||||
response = client.get("/")
|
|
||||||
assert response.status_code == 200
|
|
||||||
assert response.json() == {"status": "OK"}
|
|
||||||
|
|
||||||
|
|
||||||
def test_create_and_delete_api_key(client, api_key):
|
|
||||||
# First, let's create an API key
|
|
||||||
response = client.post(
|
|
||||||
"/api-key",
|
|
||||||
headers={
|
|
||||||
"Authorization": "Bearer " + api_key,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
assert response.status_code == 200
|
|
||||||
api_key_info = response.json()
|
|
||||||
assert "api_key" in api_key_info
|
|
||||||
|
|
||||||
# Extract the created api_key from the response
|
|
||||||
api_key = api_key_info["api_key"]
|
|
||||||
|
|
||||||
# Now, let's verify the API key
|
|
||||||
verify_response = client.get(
|
|
||||||
"/user",
|
|
||||||
headers={
|
|
||||||
"Authorization": f"Bearer {api_key}",
|
|
||||||
},
|
|
||||||
)
|
|
||||||
assert verify_response.status_code == 200
|
|
||||||
|
|
||||||
# Now, let's delete the API key
|
|
||||||
assert "key_id" in api_key_info
|
|
||||||
key_id = api_key_info["key_id"]
|
|
||||||
|
|
||||||
delete_response = client.delete(
|
|
||||||
f"/api-key/{key_id}", headers={"Authorization": f"Bearer {api_key}"}
|
|
||||||
)
|
|
||||||
assert delete_response.status_code == 200
|
|
||||||
assert delete_response.json() == {"message": "API key deleted."}
|
|
Loading…
Reference in New Issue
Block a user