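- Set up the base structure with FastAPI, Redis, PostgreSQL, Nginx, and Docker Compose
- Implement the reverse proxy, first-use binding, interception alerts, archive tasks, and admin API
- Provide an initial Vue3 admin dashboard, plus dual dependency configuration for uv and requirements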
54 lines
1.6 KiB
Python
from __future__ import annotations

from collections.abc import AsyncIterator

from fastapi import Depends, HTTPException, Request, status
from redis.asyncio import Redis
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import RuntimeSettings, Settings
from app.core.security import decode_admin_jwt, extract_bearer_token
from app.services.alert_service import AlertService
from app.services.archive_service import ArchiveService
from app.services.binding_service import BindingService


def get_settings(request: Request) -> Settings:
    """Return the Settings object stored on the application state."""
    return request.app.state.settings


def get_redis(request: Request) -> Redis | None:
    """Return the Redis client stored on the application state (may be None)."""
    return request.app.state.redis


async def get_db_session(request: Request) -> AsyncIterator[AsyncSession]:
    """Yield a database session; it is closed when the context manager exits."""
    session_factory = request.app.state.session_factory
    async with session_factory() as session:
        yield session


def get_binding_service(request: Request) -> BindingService:
    """Return the BindingService stored on the application state."""
    return request.app.state.binding_service


def get_alert_service(request: Request) -> AlertService:
    """Return the AlertService stored on the application state."""
    return request.app.state.alert_service


def get_archive_service(request: Request) -> ArchiveService:
    """Return the ArchiveService stored on the application state."""
    return request.app.state.archive_service


def get_runtime_settings(request: Request) -> RuntimeSettings:
    """Return the RuntimeSettings object stored on the application state."""
    return request.app.state.runtime_settings


async def require_admin(request: Request, settings: Settings = Depends(get_settings)) -> dict:
    """Reject requests without a bearer token (401); otherwise decode the admin JWT."""
    token = extract_bearer_token(request.headers.get("authorization"))
    if token is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing admin bearer token.",
        )
    return decode_admin_jwt(token, settings)
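`require_admin` relies on `extract_bearer_token` returning `None` when no usable token is present, but the helper itself lives in `app.core.security` and is not shown here. The following is a minimal sketch of what such a helper might look like, using only the standard library. The body is an assumption, not the project's actual implementation: in particular, matching the `Bearer` scheme case-insensitively and treating an empty token as missing are illustrative choices.

```python
from __future__ import annotations


def extract_bearer_token(header_value: str | None) -> str | None:
    """Hypothetical stand-in for app.core.security.extract_bearer_token.

    Returns the token from an "Authorization: Bearer <token>" header value,
    or None when the header is absent, uses another scheme, or has no token.
    """
    if not header_value:
        return None
    # Split once on the first space: "<scheme> <credentials>".
    scheme, _, token = header_value.strip().partition(" ")
    if scheme.lower() != "bearer":
        return None
    token = token.strip()
    # An empty token is treated the same as a missing header (assumption).
    return token or None
```

With a helper shaped like this, `require_admin` raises its 401 for a missing header, a non-bearer scheme, and a bare `Bearer` with no credentials alike, and only well-formed headers reach `decode_admin_jwt`.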