mirror of
https://github.com/QuivrHQ/quivr.git
synced 2024-12-15 09:32:22 +03:00
ec29f30f32
* fix: edge cases on migration scripts * chore: remove unused deps. * refactor: user_routes * refactor: chat_routes * refactor: upload_routes * refactor: explore_routes * refactor: crawl_routes * chore(refactor): get current user * refactor: more dead dependencies * bug: wrap email in credentials dict. --------- Co-authored-by: Stan Girard <girard.stanislas@gmail.com>
35 lines
1.3 KiB
Python
35 lines
1.3 KiB
Python
import os
|
|
from typing import Optional
|
|
|
|
from fastapi import HTTPException, Request, Depends
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
from models.users import User
|
|
from .auth_handler import decode_access_token
|
|
|
|
|
|
class JWTBearer(HTTPBearer):
    """Bearer-token dependency that validates a JWT and yields its decoded payload.

    Calling an instance with a request extracts the ``Authorization``
    credentials through ``HTTPBearer`` and then checks the token with
    ``decode_access_token``.
    """

    def __init__(self, auto_error: bool = True):
        """Forward ``auto_error`` unchanged to ``HTTPBearer``."""
        super().__init__(auto_error=auto_error)

    async def __call__(self, request: Request):
        """Authenticate ``request`` and return the decoded token payload.

        Returns:
            ``True`` when the ``AUTHENTICATE`` environment variable is the
            string ``"false"`` (token validation is skipped); otherwise the
            value produced by ``verify_jwt`` for the bearer token.

        Raises:
            HTTPException: 403 when no credentials are available, 402 when
                the scheme is not ``Bearer`` or the token does not verify.
        """
        # The parent call runs first, so whatever it does for a missing header
        # (see HTTPBearer's ``auto_error``) still applies when auth is disabled.
        credentials: Optional[HTTPAuthorizationCredentials] = await super().__call__(request)

        if os.environ.get("AUTHENTICATE") == "false":
            # NOTE: callers receive a bare boolean here, not a payload mapping.
            return True

        if not credentials:
            raise HTTPException(status_code=403, detail="Invalid authorization code.")

        # NOTE(review): 402 is "Payment Required"; 401 would be the conventional
        # code, but the existing values are kept so clients see no change.
        if credentials.scheme != "Bearer":
            raise HTTPException(status_code=402, detail="Invalid authorization scheme.")

        # Decode exactly once: the value that passes the check is the value
        # returned, so the two can never disagree (e.g. expiry between calls).
        payload = self.verify_jwt(credentials.credentials)
        if not payload:
            raise HTTPException(status_code=402, detail="Invalid token or expired token.")
        return payload

    def verify_jwt(self, jwtoken: str):
        """Return the result of ``decode_access_token`` for ``jwtoken``.

        Presumably a falsy result means the token is invalid or expired --
        confirm against ``auth_handler.decode_access_token``.
        """
        return decode_access_token(jwtoken)
|
|
|
|
|
|
def get_current_user(credentials: dict = Depends(JWTBearer())) -> User:
    """Build the current ``User`` from the value produced by ``JWTBearer``.

    Args:
        credentials: Result of the ``JWTBearer`` dependency. Normally the
            decoded token payload, but ``JWTBearer`` returns the bare boolean
            ``True`` when the ``AUTHENTICATE`` environment variable is
            ``"false"``.

    Returns:
        A ``User`` whose email is the payload's ``'email'`` entry, or the
        placeholder ``'none'`` when that entry (or the payload itself) is absent.
    """
    # A bool has no ``.get``; without this guard every request made while
    # authentication is disabled would fail with AttributeError.
    if isinstance(credentials, bool):
        return User(email='none')
    return User(email=credentials.get('email', 'none'))
|