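- Set up the base structure with FastAPI, Redis, PostgreSQL, Nginx, and Docker Compose
- Implement the reverse proxy, first-use binding, interception alerts, archive tasks, and admin API
- Provide an initial Vue3 admin console, plus dual dependency configuration for uv and requirements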
105 lines
3.2 KiB
Python
from __future__ import annotations

import hashlib
import hmac
from datetime import UTC, datetime, timedelta

from fastapi import HTTPException, status
from jose import JWTError, jwt
from redis.asyncio import Redis

from app.config import Settings

ALGORITHM = "HS256"


def mask_token(token: str) -> str:
    if not token:
        return "unknown"
    if len(token) <= 8:
        return f"{token[:2]}...{token[-2:]}"
    return f"{token[:4]}...{token[-4:]}"[:20]


def hash_token(token: str, secret: str) -> str:
    return hmac.new(secret.encode("utf-8"), token.encode("utf-8"), hashlib.sha256).hexdigest()


def extract_bearer_token(authorization: str | None) -> str | None:
    if not authorization:
        return None
    scheme, _, token = authorization.partition(" ")
    if scheme.lower() != "bearer" or not token:
        return None
    return token.strip()


def verify_admin_password(password: str, settings: Settings) -> bool:
    # Compare bytes: compare_digest raises TypeError on non-ASCII str input.
    return hmac.compare_digest(password.encode("utf-8"), settings.admin_password.encode("utf-8"))


def create_admin_jwt(settings: Settings) -> tuple[str, int]:
    expires_in = settings.admin_jwt_expire_hours * 3600
    now = datetime.now(UTC)
    payload = {
        "sub": "admin",
        "iat": int(now.timestamp()),
        "exp": int((now + timedelta(seconds=expires_in)).timestamp()),
    }
    token = jwt.encode(payload, settings.admin_jwt_secret, algorithm=ALGORITHM)
    return token, expires_in


def decode_admin_jwt(token: str, settings: Settings) -> dict:
    try:
        payload = jwt.decode(token, settings.admin_jwt_secret, algorithms=[ALGORITHM])
    except JWTError as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired admin token.",
        ) from exc

    if payload.get("sub") != "admin":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid admin token subject.",
        )
    return payload


def login_failure_key(client_ip: str) -> str:
    return f"sentinel:login:fail:{client_ip}"


async def ensure_login_allowed(redis: Redis, client_ip: str, settings: Settings) -> None:
    try:
        current = await redis.get(login_failure_key(client_ip))
        if current is None:
            return
        if int(current) >= settings.login_lockout_threshold:
            ttl = await redis.ttl(login_failure_key(client_ip))
            # TTL is negative when the key has no expiry or no longer exists.
            retry_after = max(ttl, 0)
            raise HTTPException(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                detail=f"Too many failed login attempts. Retry after {retry_after} seconds.",
            )
    except HTTPException:
        # Let the 429 above propagate unchanged.
        raise
    except Exception as exc:
        # Fail closed: refuse logins when the lockout state cannot be read.
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Login lock service is unavailable.",
        ) from exc


async def register_login_failure(redis: Redis, client_ip: str, settings: Settings) -> None:
    key = login_failure_key(client_ip)
    async with redis.pipeline(transaction=True) as pipeline:
        # Commands are buffered and sent together by execute().
        pipeline.incr(key)
        # Each failure resets the expiry, so the lockout window slides.
        pipeline.expire(key, settings.login_lockout_seconds)
        await pipeline.execute()


async def clear_login_failures(redis: Redis, client_ip: str) -> None:
    await redis.delete(login_failure_key(client_ip))