quivr/backend/api/quivr_api/utils/telemetry.py
Stan Girard 380cf82706
feat: quivr core 0.1 (#2970)
# Description


# Testing backend 

## Docker setup
1. Copy `.env.example` to `.env`. Some env variables were added :
EMBEDDING_DIM
2. Apply supabase migratrions : 
```sh
supabase stop
supabase db reset
supabase start
```
3. Start backend containers
```
make dev
```
## Local setup 
You can also run backend without docker.
1. Install [`rye`](https://rye.astral.sh/guide/installation/). Choose
the managed python version and set the version to 3.11
2. Run the following: 
```
cd quivr/backend
rye sync
```
3. Source `.venv` virtual env : `source .venv/bin/activate`
4. Run the backend, make sure you are running redis and supabase
API: 
```
LOG_LEVEL=debug uvicorn quivr_api.main:app --log-level debug --reload --host 0.0.0.0 --port 5050 --workers 1
```
Worker: 
```
LOG_LEVEL=debug celery -A quivr_worker.celery_worker worker -l info -E --concurrency 1
```
Notifier: 
```
LOG_LEVEL=debug python worker/quivr_worker/celery_monitor.py
```

---------

Co-authored-by: chloedia <chloedaems0@gmail.com>
Co-authored-by: aminediro <aminedirhoussi1@gmail.com>
Co-authored-by: Antoine Dewez <44063631+Zewed@users.noreply.github.com>
Co-authored-by: Chloé Daems <73901882+chloedia@users.noreply.github.com>
Co-authored-by: Zewed <dewez.antoine2@gmail.com>
2024-09-02 10:20:53 +02:00

62 lines
1.6 KiB
Python

import hashlib
import json
import os
import httpx
from fastapi import Request
from quivr_api.logger import get_logger
logger = get_logger(__name__)
# Assume these are your Supabase Function endpoint and any necessary headers
TELEMETRY_URL = "https://ovbvcnwemowuuuaebizd.supabase.co/functions/v1/telemetry"
HEADERS = {
"Content-Type": "application/json",
}
def generate_machine_key():
# Get the OpenAI API key from the environment variables
seed = os.getenv("OPENAI_API_KEY") or ""
# Use SHA-256 hash to generate a unique key from the seed
unique_key = hashlib.sha256(seed.encode()).hexdigest()
return unique_key
def send_telemetry(event_name: str, event_data: dict, request: Request | None = None):
# Generate a unique machine key
machine_key = generate_machine_key()
domain = None
if request:
domain = request.url.hostname
logger.info(f"Domain: {domain}")
event_data = {**event_data, "domain": domain}
# Prepare the payload
payload = json.dumps(
{
"anonymous_identifier": machine_key,
"event_name": event_name,
"event_data": event_data,
}
)
# TODO: client should only live once
# Send the telemetry data
with httpx.Client() as client:
_ = client.post(TELEMETRY_URL, headers=HEADERS, data=payload)
def maybe_send_telemetry(
event_name: str,
event_data: dict,
request: Request | None = None,
):
enable_telemetry = os.getenv("TELEMETRY_ENABLED", "false")
if enable_telemetry.lower() == "false":
return
send_telemetry(event_name, event_data, request)