from __future__ import annotations import hashlib import hmac from datetime import UTC, datetime, timedelta from typing import Mapping from fastapi import HTTPException, status from jose import JWTError, jwt from redis.asyncio import Redis from app.config import Settings ALGORITHM = "HS256" def mask_token(token: str) -> str: if not token: return "unknown" if len(token) <= 8: return f"{token[:2]}...{token[-2:]}" return f"{token[:4]}...{token[-4:]}"[:20] def hash_token(token: str, secret: str) -> str: return hmac.new(secret.encode("utf-8"), token.encode("utf-8"), hashlib.sha256).hexdigest() def extract_bearer_token(authorization: str | None) -> str | None: if not authorization: return None scheme, _, token = authorization.partition(" ") if scheme.lower() != "bearer" or not token: return None return token.strip() def extract_request_token(headers: Mapping[str, str]) -> tuple[str | None, str | None]: bearer_token = extract_bearer_token(headers.get("authorization")) if bearer_token: return bearer_token, "authorization" for header_name in ("x-api-key", "api-key"): header_value = headers.get(header_name) if header_value and header_value.strip(): return header_value.strip(), header_name return None, None def verify_admin_password(password: str, settings: Settings) -> bool: return hmac.compare_digest(password, settings.admin_password) def create_admin_jwt(settings: Settings) -> tuple[str, int]: expires_in = settings.admin_jwt_expire_hours * 3600 now = datetime.now(UTC) payload = { "sub": "admin", "iat": int(now.timestamp()), "exp": int((now + timedelta(seconds=expires_in)).timestamp()), } token = jwt.encode(payload, settings.admin_jwt_secret, algorithm=ALGORITHM) return token, expires_in def decode_admin_jwt(token: str, settings: Settings) -> dict: try: payload = jwt.decode(token, settings.admin_jwt_secret, algorithms=[ALGORITHM]) except JWTError as exc: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired admin token.", ) from exc if payload.get("sub") != "admin": raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid admin token subject.", ) return payload def login_failure_key(client_ip: str) -> str: return f"sentinel:login:fail:{client_ip}" async def ensure_login_allowed(redis: Redis, client_ip: str, settings: Settings) -> None: try: current = await redis.get(login_failure_key(client_ip)) if current is None: return if int(current) >= settings.login_lockout_threshold: ttl = await redis.ttl(login_failure_key(client_ip)) retry_after = max(ttl, 0) raise HTTPException( status_code=status.HTTP_429_TOO_MANY_REQUESTS, detail=f"Too many failed login attempts. Retry after {retry_after} seconds.", ) except HTTPException: raise except Exception as exc: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Login lock service is unavailable.", ) from exc async def register_login_failure(redis: Redis, client_ip: str, settings: Settings) -> None: key = login_failure_key(client_ip) async with redis.pipeline(transaction=True) as pipeline: pipeline.incr(key) pipeline.expire(key, settings.login_lockout_seconds) await pipeline.execute() async def clear_login_failures(redis: Redis, client_ip: str) -> None: await redis.delete(login_failure_key(client_ip))