Files
sentinel/app/core/security.py

105 lines
3.2 KiB
Python
Raw Normal View History

from __future__ import annotations
import hashlib
import hmac
from datetime import UTC, datetime, timedelta
from fastapi import HTTPException, status
from jose import JWTError, jwt
from redis.asyncio import Redis
from app.config import Settings
# Signing algorithm handed to jose both when encoding and when decoding admin JWTs.
ALGORITHM = "HS256"
def mask_token(token: str) -> str:
    """Return a shortened rendering of *token* that is safe to show in logs.

    Args:
        token: The secret value to abbreviate.

    Returns:
        ``"unknown"`` for an empty token, ``"****"`` for tokens of four
        characters or fewer, ``"ab...yz"`` (first two / last two characters)
        for tokens of five to eight characters, and ``"abcd...wxyz"`` (first
        four / last four characters) for anything longer.
    """
    if not token:
        return "unknown"
    if len(token) <= 4:
        # A two-character head plus a two-character tail would reproduce the
        # entire token at this length, so reveal nothing at all.
        return "****"
    if len(token) <= 8:
        return f"{token[:2]}...{token[-2:]}"
    # Always exactly 11 characters (4 + 3 + 4); no further truncation needed.
    return f"{token[:4]}...{token[-4:]}"
def hash_token(token: str, secret: str) -> str:
    """Return the hex-encoded HMAC-SHA256 of *token*, keyed with *secret*.

    Both arguments are encoded as UTF-8 before being fed to the MAC.
    """
    key_bytes = secret.encode("utf-8")
    message_bytes = token.encode("utf-8")
    mac = hmac.new(key_bytes, message_bytes, hashlib.sha256)
    return mac.hexdigest()
def extract_bearer_token(authorization: str | None) -> str | None:
    """Pull the credential out of a ``Bearer`` Authorization header value.

    Args:
        authorization: Raw header value, or ``None`` when the header is absent.

    Returns:
        The token with surrounding whitespace removed, or ``None`` when the
        header is missing, uses a scheme other than ``bearer``
        (case-insensitive), or carries no token.
    """
    if not authorization:
        return None
    scheme, _, credentials = authorization.partition(" ")
    # Strip before the emptiness check so a header such as "Bearer   " is
    # rejected instead of yielding an empty-string token.
    token = credentials.strip()
    if scheme.lower() != "bearer" or not token:
        return None
    return token
def verify_admin_password(password: str, settings: Settings) -> bool:
    """Check *password* against ``settings.admin_password`` in constant time.

    Args:
        password: The candidate password supplied by the caller.
        settings: Settings object exposing ``admin_password``.

    Returns:
        ``True`` when the two passwords are equal, ``False`` otherwise.
    """
    # hmac.compare_digest() raises TypeError for str arguments that contain
    # non-ASCII characters, so compare the UTF-8 encodings instead.
    # "surrogatepass" keeps lone surrogates in the candidate from raising
    # during encoding; they simply fail to match.
    supplied = password.encode("utf-8", "surrogatepass")
    expected = settings.admin_password.encode("utf-8", "surrogatepass")
    return hmac.compare_digest(supplied, expected)
def create_admin_jwt(settings: Settings) -> tuple[str, int]:
    """Issue a signed JWT for the admin subject.

    The lifetime is ``settings.admin_jwt_expire_hours`` converted to seconds;
    the token is signed with ``settings.admin_jwt_secret`` using ``ALGORITHM``.

    Returns:
        A ``(token, lifetime_in_seconds)`` pair.
    """
    lifetime_seconds = settings.admin_jwt_expire_hours * 3600
    issued_at = datetime.now(UTC)
    expires_at = issued_at + timedelta(seconds=lifetime_seconds)
    claims = {
        "sub": "admin",
        "iat": int(issued_at.timestamp()),
        "exp": int(expires_at.timestamp()),
    }
    encoded = jwt.encode(claims, settings.admin_jwt_secret, algorithm=ALGORITHM)
    return encoded, lifetime_seconds
def decode_admin_jwt(token: str, settings: Settings) -> dict:
    """Decode an admin JWT and return its claims.

    The token is decoded with ``settings.admin_jwt_secret`` and ``ALGORITHM``.

    Raises:
        HTTPException: 401 when decoding raises ``JWTError``, or when the
            ``sub`` claim is not ``"admin"``.
    """
    try:
        claims = jwt.decode(
            token,
            settings.admin_jwt_secret,
            algorithms=[ALGORITHM],
        )
    except JWTError as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired admin token.",
        ) from exc

    # Only tokens issued for the admin subject are accepted.
    if payload_subject_is_admin := (claims.get("sub") == "admin"):
        return claims
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid admin token subject.",
    )
def login_failure_key(client_ip: str) -> str:
    """Build the Redis key under which failed logins for *client_ip* are counted."""
    return "sentinel:login:fail:{}".format(client_ip)
async def ensure_login_allowed(redis: Redis, client_ip: str, settings: Settings) -> None:
    """Reject the login attempt when *client_ip* has reached the failure threshold.

    Reads the failure counter for the client from Redis and compares it with
    ``settings.login_lockout_threshold``.

    Raises:
        HTTPException: 429 when the counter has reached the threshold (the
            message carries the key's remaining TTL, floored at zero), or 503
            when the lookup itself fails for any other reason.
    """
    try:
        key = login_failure_key(client_ip)
        failures = await redis.get(key)
        locked_out = (
            failures is not None
            and int(failures) >= settings.login_lockout_threshold
        )
        # The TTL is only fetched when the client is actually locked out.
        # Negative TTL replies are clamped to zero.
        retry_after = max(await redis.ttl(key), 0) if locked_out else 0
    except HTTPException:
        raise
    except Exception as exc:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Login lock service is unavailable.",
        ) from exc

    if locked_out:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail=f"Too many failed login attempts. Retry after {retry_after} seconds.",
        )
async def register_login_failure(redis: Redis, client_ip: str, settings: Settings) -> None:
    """Record one failed login for *client_ip*.

    Increments the client's failure counter and (re)sets its expiry to
    ``settings.login_lockout_seconds``, both queued on one transactional
    pipeline.
    """
    counter_key = login_failure_key(client_ip)
    async with redis.pipeline(transaction=True) as txn:
        # Commands are queued here and only sent by execute().
        txn.incr(counter_key)
        txn.expire(counter_key, settings.login_lockout_seconds)
        await txn.execute()
async def clear_login_failures(redis: Redis, client_ip: str) -> None:
    """Delete the failed-login counter stored for *client_ip*."""
    counter_key = login_failure_key(client_ip)
    await redis.delete(counter_key)