mirror of
https://github.com/StanGirard/quivr.git
synced 2024-12-20 01:21:50 +03:00
31 lines
1.2 KiB
Python
31 lines
1.2 KiB
Python
|
from fastapi import Request, HTTPException
|
||
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
||
|
from typing import Optional
|
||
|
import os
|
||
|
|
||
|
from auth_handler import decode_access_token
|
||
|
|
||
|
class JWTBearer(HTTPBearer):
|
||
|
def __init__(self, auto_error: bool = True):
|
||
|
super().__init__(auto_error=auto_error)
|
||
|
|
||
|
async def __call__(self, request: Request):
|
||
|
credentials: Optional[HTTPAuthorizationCredentials] = await super().__call__(request)
|
||
|
if os.environ.get("AUTHENTICATE") == "false":
|
||
|
return True
|
||
|
if credentials:
|
||
|
if not credentials.scheme == "Bearer":
|
||
|
raise HTTPException(status_code=402, detail="Invalid authorization scheme.")
|
||
|
token = credentials.credentials
|
||
|
if not self.verify_jwt(token):
|
||
|
raise HTTPException(status_code=402, detail="Invalid token or expired token.")
|
||
|
return credentials.credentials
|
||
|
else:
|
||
|
raise HTTPException(status_code=403, detail="Invalid authorization code.")
|
||
|
|
||
|
def verify_jwt(self, jwtoken: str) -> bool:
|
||
|
isTokenValid: bool = False
|
||
|
payload = decode_access_token(jwtoken)
|
||
|
if payload:
|
||
|
isTokenValid = True
|
||
|
return isTokenValid
|