Files
sentinel/app/api/dashboard.py

110 lines
3.6 KiB
Python

from __future__ import annotations
from datetime import UTC, datetime, time, timedelta
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.dependencies import get_binding_service, get_db_session, require_admin
from app.models.intercept_log import InterceptLog
from app.models.token_binding import STATUS_ACTIVE, STATUS_BANNED, TokenBinding
from app.schemas.log import InterceptLogItem
from app.services.binding_service import BindingService
# Router for the dashboard endpoints: every route registered on it is served
# under the /admin/api prefix and declares require_admin as a dependency.
router = APIRouter(prefix="/admin/api", tags=["dashboard"], dependencies=[Depends(require_admin)])
class MetricSummary(BaseModel):
    """Counters exposed as the ``today`` section of the dashboard response."""

    # Overall count; get_dashboard copies it from the last trend entry.
    total: int
    # Count of allowed events (presumably requests that passed the check — confirm).
    allowed: int
    # Count of intercepted events.
    intercepted: int
class BindingSummary(BaseModel):
    """Token-binding counts broken down by status."""

    # Number of TokenBinding rows whose status equals STATUS_ACTIVE.
    active: int
    # Number of TokenBinding rows whose status equals STATUS_BANNED.
    banned: int
class TrendPoint(BaseModel):
    """One day's entry in the dashboard trend series."""

    # Day label as a string; build_trend matches it against ISO-formatted
    # dates from the intercept log.
    date: str
    # When produced by build_trend this is never less than allowed + intercepted.
    total: int
    # Allowed count as reported by the metrics window for that day.
    allowed: int
    # build_trend uses the larger of the metrics value and the InterceptLog
    # row count for that day.
    intercepted: int
class DashboardResponse(BaseModel):
    """Response body returned by the ``/dashboard`` endpoint."""

    # Counters taken from the last trend entry (all zeros when the trend is empty).
    today: MetricSummary
    # Active and banned token-binding counts.
    bindings: BindingSummary
    # Per-day series produced by build_trend.
    trend: list[TrendPoint]
    # The ten most recent intercept log entries, newest first.
    recent_intercepts: list[InterceptLogItem]
async def build_trend(
session: AsyncSession,
binding_service: BindingService,
) -> list[TrendPoint]:
series = await binding_service.get_metrics_window(days=7)
start_day = datetime.combine(datetime.now(UTC).date() - timedelta(days=6), time.min, tzinfo=UTC)
intercept_counts_result = await session.execute(
select(func.date(InterceptLog.intercepted_at), func.count())
.where(InterceptLog.intercepted_at >= start_day)
.group_by(func.date(InterceptLog.intercepted_at))
)
db_intercept_counts = {
row[0].isoformat(): int(row[1])
for row in intercept_counts_result.all()
}
trend: list[TrendPoint] = []
for item in series:
day = str(item["date"])
allowed = int(item["allowed"])
intercepted = max(int(item["intercepted"]), db_intercept_counts.get(day, 0))
total = max(int(item["total"]), allowed + intercepted)
trend.append(TrendPoint(date=day, total=total, allowed=allowed, intercepted=intercepted))
return trend
async def build_recent_intercepts(session: AsyncSession) -> list[InterceptLogItem]:
    """Load the ten newest intercept log rows as API schema items.

    Rows are ordered by ``intercepted_at`` descending, so the most recent
    interception comes first in the returned list.
    """
    newest_first = select(InterceptLog).order_by(InterceptLog.intercepted_at.desc()).limit(10)
    rows = (await session.scalars(newest_first)).all()

    items: list[InterceptLogItem] = []
    for row in rows:
        items.append(
            InterceptLogItem(
                id=row.id,
                token_display=row.token_display,
                bound_ip=row.bound_ip,
                # attempt_ip is converted to text; the other columns pass through as loaded.
                attempt_ip=str(row.attempt_ip),
                alerted=row.alerted,
                intercepted_at=row.intercepted_at,
            )
        )
    return items
@router.get("/dashboard", response_model=DashboardResponse)
async def get_dashboard(
    session: AsyncSession = Depends(get_db_session),
    binding_service: BindingService = Depends(get_binding_service),
) -> DashboardResponse:
    """Assemble the dashboard payload.

    Combines the trend series, the counts of active and banned token
    bindings, and the most recent intercept log entries. The ``today``
    summary is taken from the last trend entry, or is all zeros when the
    trend is empty.
    """

    async def count_bindings(status) -> int:
        # COUNT(*) over token bindings with the given status; a missing result counts as 0.
        stmt = select(func.count()).select_from(TokenBinding).where(TokenBinding.status == status)
        found = await session.scalar(stmt)
        return int(found or 0)

    trend = await build_trend(session, binding_service)
    active_total = await count_bindings(STATUS_ACTIVE)
    banned_total = await count_bindings(STATUS_BANNED)
    recent_intercepts = await build_recent_intercepts(session)

    if trend:
        latest = trend[-1]
    else:
        # No metrics at all: report an empty entry dated today (UTC).
        latest = TrendPoint(date=datetime.now(UTC).date().isoformat(), total=0, allowed=0, intercepted=0)

    today_summary = MetricSummary(
        total=latest.total,
        allowed=latest.allowed,
        intercepted=latest.intercepted,
    )
    binding_summary = BindingSummary(active=active_total, banned=banned_total)

    return DashboardResponse(
        today=today_summary,
        bindings=binding_summary,
        trend=trend,
        recent_intercepts=recent_intercepts,
    )