184 lines
6.3 KiB
Python
184 lines
6.3 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
|
|
from sqlalchemy import String, cast, func, select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.config import Settings
|
|
from app.core.ip_utils import extract_client_ip
|
|
from app.dependencies import get_binding_service, get_db_session, get_settings, require_admin
|
|
from app.models.token_binding import STATUS_ACTIVE, STATUS_BANNED, TokenBinding
|
|
from app.schemas.binding import (
|
|
BindingActionRequest,
|
|
BindingIPUpdateRequest,
|
|
BindingItem,
|
|
BindingListResponse,
|
|
)
|
|
from app.services.binding_service import BindingService
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(prefix="/admin/api/bindings", tags=["bindings"], dependencies=[Depends(require_admin)])
|
|
|
|
|
|
def to_binding_item(binding: TokenBinding, binding_service: BindingService) -> BindingItem:
|
|
return BindingItem(
|
|
id=binding.id,
|
|
token_display=binding.token_display,
|
|
bound_ip=str(binding.bound_ip),
|
|
binding_mode=binding.binding_mode,
|
|
allowed_ips=[str(item) for item in binding.allowed_ips],
|
|
status=binding.status,
|
|
status_label=binding_service.status_label(binding.status),
|
|
first_used_at=binding.first_used_at,
|
|
last_used_at=binding.last_used_at,
|
|
created_at=binding.created_at,
|
|
)
|
|
|
|
|
|
def apply_binding_filters(
|
|
statement,
|
|
token_suffix: str | None,
|
|
ip: str | None,
|
|
status_filter: int | None,
|
|
):
|
|
if token_suffix:
|
|
statement = statement.where(TokenBinding.token_display.ilike(f"%{token_suffix}%"))
|
|
if ip:
|
|
statement = statement.where(cast(TokenBinding.bound_ip, String).ilike(f"%{ip}%"))
|
|
if status_filter in {STATUS_ACTIVE, STATUS_BANNED}:
|
|
statement = statement.where(TokenBinding.status == status_filter)
|
|
return statement
|
|
|
|
|
|
async def get_binding_or_404(session: AsyncSession, binding_id: int) -> TokenBinding:
|
|
binding = await session.get(TokenBinding, binding_id)
|
|
if binding is None:
|
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Binding was not found.")
|
|
return binding
|
|
|
|
|
|
def log_admin_action(request: Request, settings: Settings, action: str, binding_id: int) -> None:
|
|
logger.info(
|
|
"Admin binding action.",
|
|
extra={
|
|
"client_ip": extract_client_ip(request, settings),
|
|
"action": action,
|
|
"binding_id": binding_id,
|
|
},
|
|
)
|
|
|
|
|
|
async def commit_binding_cache(binding: TokenBinding, binding_service: BindingService) -> None:
|
|
await binding_service.sync_binding_cache(
|
|
binding.token_hash,
|
|
str(binding.bound_ip),
|
|
binding.binding_mode,
|
|
[str(item) for item in binding.allowed_ips],
|
|
binding.status,
|
|
)
|
|
|
|
|
|
async def update_binding_status(
|
|
session: AsyncSession,
|
|
binding: TokenBinding,
|
|
status_code: int,
|
|
binding_service: BindingService,
|
|
) -> None:
|
|
binding.status = status_code
|
|
await session.commit()
|
|
await commit_binding_cache(binding, binding_service)
|
|
|
|
|
|
@router.get("", response_model=BindingListResponse)
|
|
async def list_bindings(
|
|
page: int = Query(default=1, ge=1),
|
|
page_size: int = Query(default=20, ge=1, le=200),
|
|
token_suffix: str | None = Query(default=None),
|
|
ip: str | None = Query(default=None),
|
|
status_filter: int | None = Query(default=None, alias="status"),
|
|
session: AsyncSession = Depends(get_db_session),
|
|
binding_service: BindingService = Depends(get_binding_service),
|
|
) -> BindingListResponse:
|
|
statement = apply_binding_filters(select(TokenBinding), token_suffix, ip, status_filter)
|
|
|
|
total_result = await session.execute(select(func.count()).select_from(statement.subquery()))
|
|
total = int(total_result.scalar_one())
|
|
bindings = (
|
|
await session.scalars(
|
|
statement.order_by(TokenBinding.last_used_at.desc()).offset((page - 1) * page_size).limit(page_size)
|
|
)
|
|
).all()
|
|
|
|
return BindingListResponse(
|
|
items=[to_binding_item(item, binding_service) for item in bindings],
|
|
total=total,
|
|
page=page,
|
|
page_size=page_size,
|
|
)
|
|
|
|
|
|
@router.post("/unbind")
|
|
async def unbind_token(
|
|
payload: BindingActionRequest,
|
|
request: Request,
|
|
settings: Settings = Depends(get_settings),
|
|
session: AsyncSession = Depends(get_db_session),
|
|
binding_service: BindingService = Depends(get_binding_service),
|
|
):
|
|
binding = await get_binding_or_404(session, payload.id)
|
|
token_hash = binding.token_hash
|
|
await session.delete(binding)
|
|
await session.commit()
|
|
await binding_service.invalidate_binding_cache(token_hash)
|
|
log_admin_action(request, settings, "unbind", payload.id)
|
|
return {"success": True}
|
|
|
|
|
|
@router.put("/ip")
|
|
async def update_bound_ip(
|
|
payload: BindingIPUpdateRequest,
|
|
request: Request,
|
|
settings: Settings = Depends(get_settings),
|
|
session: AsyncSession = Depends(get_db_session),
|
|
binding_service: BindingService = Depends(get_binding_service),
|
|
):
|
|
binding = await get_binding_or_404(session, payload.id)
|
|
binding.binding_mode = payload.binding_mode
|
|
binding.allowed_ips = payload.allowed_ips
|
|
binding.bound_ip = binding_service.build_bound_ip_display(payload.binding_mode, payload.allowed_ips)
|
|
await session.commit()
|
|
await commit_binding_cache(binding, binding_service)
|
|
log_admin_action(request, settings, "update_ip", payload.id)
|
|
return {"success": True}
|
|
|
|
|
|
@router.post("/ban")
|
|
async def ban_token(
|
|
payload: BindingActionRequest,
|
|
request: Request,
|
|
settings: Settings = Depends(get_settings),
|
|
session: AsyncSession = Depends(get_db_session),
|
|
binding_service: BindingService = Depends(get_binding_service),
|
|
):
|
|
binding = await get_binding_or_404(session, payload.id)
|
|
await update_binding_status(session, binding, STATUS_BANNED, binding_service)
|
|
log_admin_action(request, settings, "ban", payload.id)
|
|
return {"success": True}
|
|
|
|
|
|
@router.post("/unban")
|
|
async def unban_token(
|
|
payload: BindingActionRequest,
|
|
request: Request,
|
|
settings: Settings = Depends(get_settings),
|
|
session: AsyncSession = Depends(get_db_session),
|
|
binding_service: BindingService = Depends(get_binding_service),
|
|
):
|
|
binding = await get_binding_or_404(session, payload.id)
|
|
await update_binding_status(session, binding, STATUS_ACTIVE, binding_service)
|
|
log_admin_action(request, settings, "unban", payload.id)
|
|
return {"success": True}
|