from datetime import datetime, timedelta, timezone from typing import Annotated from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from jose import jwt, JWTError from passlib.context import CryptContext from sqlalchemy.orm import Session from app.core.config import settings from app.core.database import get_db from app.models.admin_user import AdminUser pwd_ctx = CryptContext(schemes=["bcrypt"], deprecated="auto") oauth2_scheme_admin = OAuth2PasswordBearer(tokenUrl=f"{settings.admin_api_prefix}/auth/login") def verify_password(plain, hashed): return pwd_ctx.verify(plain, hashed) def hash_password(pw): return pwd_ctx.hash(pw) def create_admin_access_token(sub: str, minutes: int = 60): exp = datetime.now(timezone.utc) + timedelta(minutes=minutes) payload = {"sub": sub, "role": "admin", "exp": exp} return jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm) def get_current_admin( token: Annotated[str, Depends(oauth2_scheme_admin)], db: Annotated[Session, Depends(get_db)], ) -> AdminUser: cred_exc = HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid admin token") try: payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm]) if payload.get("role") != "admin": raise cred_exc username = payload.get("sub") except JWTError: raise cred_exc user = db.query(AdminUser).filter(AdminUser.username == username, AdminUser.is_active == True).first() if not user: raise cred_exc return user