Files
sentinel/app/api/logs.py
chy88 0a1eeb9ddf refactor(admin): 收敛后台接口封装与页面状态逻辑
- 简化绑定和日志接口的查询、序列化与前端数据请求路径
- 统一登录流程与前端 API 调用层,补充后台图标依赖
- 抽取通用异步状态处理,减少多个管理页面的重复逻辑
2026-03-04 00:18:47 +08:00

113 lines
3.7 KiB
Python

from __future__ import annotations
import csv
import io
from datetime import datetime
from fastapi import APIRouter, Depends, Query
from fastapi.responses import StreamingResponse
from sqlalchemy import String, cast, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.dependencies import get_db_session, require_admin
from app.models.intercept_log import InterceptLog
from app.schemas.log import InterceptLogItem, LogListResponse
router = APIRouter(prefix="/admin/api/logs", tags=["logs"], dependencies=[Depends(require_admin)])
def apply_log_filters(
statement,
token: str | None,
attempt_ip: str | None,
start_time: datetime | None,
end_time: datetime | None,
):
if token:
statement = statement.where(InterceptLog.token_display.ilike(f"%{token}%"))
if attempt_ip:
statement = statement.where(cast(InterceptLog.attempt_ip, String).ilike(f"%{attempt_ip}%"))
if start_time:
statement = statement.where(InterceptLog.intercepted_at >= start_time)
if end_time:
statement = statement.where(InterceptLog.intercepted_at <= end_time)
return statement
def to_log_item(item: InterceptLog) -> InterceptLogItem:
return InterceptLogItem(
id=item.id,
token_display=item.token_display,
bound_ip=str(item.bound_ip),
attempt_ip=str(item.attempt_ip),
alerted=item.alerted,
intercepted_at=item.intercepted_at,
)
def write_log_csv(buffer: io.StringIO, logs: list[InterceptLog]) -> None:
writer = csv.writer(buffer)
writer.writerow(["id", "token_display", "bound_ip", "attempt_ip", "alerted", "intercepted_at"])
for item in logs:
writer.writerow(
[
item.id,
item.token_display,
str(item.bound_ip),
str(item.attempt_ip),
item.alerted,
item.intercepted_at.isoformat(),
]
)
@router.get("", response_model=LogListResponse)
async def list_logs(
page: int = Query(default=1, ge=1),
page_size: int = Query(default=20, ge=1, le=200),
token: str | None = Query(default=None),
attempt_ip: str | None = Query(default=None),
start_time: datetime | None = Query(default=None),
end_time: datetime | None = Query(default=None),
session: AsyncSession = Depends(get_db_session),
) -> LogListResponse:
statement = apply_log_filters(select(InterceptLog), token, attempt_ip, start_time, end_time)
total_result = await session.execute(select(func.count()).select_from(statement.subquery()))
total = int(total_result.scalar_one())
logs = (
await session.scalars(
statement.order_by(InterceptLog.intercepted_at.desc()).offset((page - 1) * page_size).limit(page_size)
)
).all()
return LogListResponse(
items=[to_log_item(item) for item in logs],
total=total,
page=page,
page_size=page_size,
)
@router.get("/export")
async def export_logs(
token: str | None = Query(default=None),
attempt_ip: str | None = Query(default=None),
start_time: datetime | None = Query(default=None),
end_time: datetime | None = Query(default=None),
session: AsyncSession = Depends(get_db_session),
):
statement = apply_log_filters(select(InterceptLog), token, attempt_ip, start_time, end_time).order_by(
InterceptLog.intercepted_at.desc()
)
logs = (await session.scalars(statement)).all()
buffer = io.StringIO()
write_log_csv(buffer, logs)
filename = f"sentinel-logs-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}.csv"
return StreamingResponse(
iter([buffer.getvalue()]),
media_type="text/csv",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)