mirror of
https://github.com/StanGirard/quivr.git
synced 2024-11-24 05:55:13 +03:00
327074c5d4
* feat(auth): backend authentification verification * feat(auth): added to all endpoints * feat(auth): added to all endpoints * feat(auth): redirect if not connected * chore(print): removed * feat(login): redirect * feat(icon): added * chore(yarn): removed lock * chore(gitignore): removed
31 lines
1.2 KiB
Python
31 lines
1.2 KiB
Python
from fastapi import Request, HTTPException
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from typing import Optional
|
|
import os
|
|
|
|
from auth_handler import decode_access_token
|
|
|
|
class JWTBearer(HTTPBearer):
|
|
def __init__(self, auto_error: bool = True):
|
|
super().__init__(auto_error=auto_error)
|
|
|
|
async def __call__(self, request: Request):
|
|
credentials: Optional[HTTPAuthorizationCredentials] = await super().__call__(request)
|
|
if os.environ.get("AUTHENTICATE") == "false":
|
|
return True
|
|
if credentials:
|
|
if not credentials.scheme == "Bearer":
|
|
raise HTTPException(status_code=402, detail="Invalid authorization scheme.")
|
|
token = credentials.credentials
|
|
if not self.verify_jwt(token):
|
|
raise HTTPException(status_code=402, detail="Invalid token or expired token.")
|
|
return credentials.credentials
|
|
else:
|
|
raise HTTPException(status_code=403, detail="Invalid authorization code.")
|
|
|
|
def verify_jwt(self, jwtoken: str) -> bool:
|
|
isTokenValid: bool = False
|
|
payload = decode_access_token(jwtoken)
|
|
if payload:
|
|
isTokenValid = True
|
|
return isTokenValid |