Files
ssii-cai-3/backend/app/routes/auth.py
2025-11-10 19:49:26 +01:00

121 lines
3.9 KiB
Python

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from app.db.database import get_db
from app.models.user import UserModel
from app.schemas.user import UserLogin, User2FA, Token2FA
from app.core.security import verify_password, generate_jwt, verify_jwt
from datetime import timedelta
import hashlib
import pyotp
from app.core.config import settings
import qrcode
import io
import base64
router = APIRouter()
def get_digest_function(algorithm: str):
algo = algorithm.upper()
if algo == "SHA256":
return hashlib.sha256
elif algo == "SHA512":
return hashlib.sha512
else:
return hashlib.sha1
@router.post("/login")
def login(user: UserLogin, db: Session = Depends(get_db)):
existing_user = db.query(UserModel).filter(UserModel.user_name == user.user_name).first()
if not existing_user:
raise HTTPException(status_code=404, detail="El usuario no existe")
if not verify_password(user.password, existing_user.password):
raise HTTPException(status_code=401, detail="Credenciales inválidas")
pre_auth_token = generate_jwt(
data={"user_id": existing_user.user_id, "scope": "pre_auth"},
expires=timedelta(minutes=2)
)
return {"pre_auth_token": pre_auth_token}
@router.post("/2fa")
def two_factor_auth(user_2fa: User2FA, db: Session = Depends(get_db)):
token = verify_jwt(user_2fa.pre_auth_token)
user_id = token.get("user_id")
if not user_id:
raise HTTPException(status_code=401, detail="Token inválido o expirado")
user = db.query(UserModel).filter(UserModel.user_id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="Usuario no encontrado")
digest_func = get_digest_function(settings.TOTP_ALGORITHM)
totp = pyotp.TOTP(
s=user.totp_secret,
digits=settings.TOTP_DIGITS,
digest=digest_func,
interval=settings.TOTP_INTERVAL,
issuer=settings.TOTP_ISSUER,
name=f"{settings.TOTP_NAME_PREFIX}:{user.user_name}"
)
if not totp.verify(user_2fa.totp_code):
raise HTTPException(status_code=401, detail="Código 2FA inválido")
expires = timedelta(seconds=settings.EXPIRATION)
token = generate_jwt(
data={"user_id": user.user_id, "user_name": user.user_name, "scope": "2fa_authenticated"},
expires=expires
)
return {
"access_token": token,
"token_type": "bearer",
"message": "Autenticación 2FA exitosa"
}
@router.post("/enable-2fa")
def enable_2fa(token2fa: Token2FA, db: Session = Depends(get_db)):
token = verify_jwt(token2fa.token)
user_id = token.get("user_id")
if not user_id:
raise HTTPException(status_code=401, detail="Token inválido o expirado")
user = db.query(UserModel).filter(UserModel.user_id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="Usuario no encontrado")
secret = pyotp.random_base32()
user.totp_secret = secret
db.commit()
digest_func = get_digest_function(settings.TOTP_ALGORITHM)
totp = pyotp.TOTP(
s=secret,
digits=settings.TOTP_DIGITS,
digest=digest_func,
interval=settings.TOTP_INTERVAL,
issuer=settings.TOTP_ISSUER,
name=f"{settings.TOTP_NAME_PREFIX}:{user.user_name}"
)
otpauth_url = totp.provisioning_uri(
name=f"{settings.TOTP_NAME_PREFIX}:{user.user_name}",
issuer_name=settings.TOTP_ISSUER
)
qr = qrcode.make(otpauth_url)
buf = io.BytesIO()
qr.save(buf, format="PNG")
img_base64 = base64.b64encode(buf.getvalue()).decode("utf-8")
return {
"user_id": user.user_id,
"user_name": user.user_name,
"totp_secret": secret,
"otpauth_url": otpauth_url,
"qr_base64": img_base64,
"message": "2FA habilitado correctamente. Escanea el QR o introduce el código en tu app de autenticación."
}