39 lines
1.6 KiB
Python
39 lines
1.6 KiB
Python
from datetime import datetime, timedelta, timezone
|
|
from typing import Annotated
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import OAuth2PasswordBearer
|
|
from jose import jwt, JWTError
|
|
from passlib.context import CryptContext
|
|
from sqlalchemy.orm import Session
|
|
from app.core.config import settings
|
|
from app.core.database import get_db
|
|
from app.models.admin_user import AdminUser
|
|
|
|
pwd_ctx = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
|
oauth2_scheme_admin = OAuth2PasswordBearer(tokenUrl=f"{settings.admin_api_prefix}/auth/login")
|
|
|
|
def verify_password(plain, hashed): return pwd_ctx.verify(plain, hashed)
|
|
def hash_password(pw): return pwd_ctx.hash(pw)
|
|
|
|
def create_admin_access_token(sub: str, minutes: int = 60):
|
|
exp = datetime.now(timezone.utc) + timedelta(minutes=minutes)
|
|
payload = {"sub": sub, "role": "admin", "exp": exp}
|
|
return jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm)
|
|
|
|
def get_current_admin(
|
|
token: Annotated[str, Depends(oauth2_scheme_admin)],
|
|
db: Annotated[Session, Depends(get_db)],
|
|
) -> AdminUser:
|
|
cred_exc = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid admin token")
|
|
try:
|
|
payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
|
|
if payload.get("role") != "admin":
|
|
raise cred_exc
|
|
username = payload.get("sub")
|
|
except JWTError:
|
|
raise cred_exc
|
|
user = db.query(AdminUser).filter(AdminUser.username == username, AdminUser.is_active == True).first()
|
|
if not user:
|
|
raise cred_exc
|
|
return user
|