# sentinel/app/services/archive_service.py

from __future__ import annotations
import logging
from datetime import UTC, datetime, timedelta
from typing import Callable
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from sqlalchemy import delete, select, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncConnection, AsyncEngine, AsyncSession, async_sessionmaker
from app.config import RuntimeSettings, Settings
from app.models.token_binding import TokenBinding
from app.services.binding_service import BindingService
logger = logging.getLogger(__name__)
class ArchiveService:
    """Periodically removes token bindings that have been inactive past a cutoff.

    Exactly one worker per deployment runs the scheduler: a worker becomes the
    "leader" by taking a PostgreSQL session-level advisory lock on a dedicated
    connection, and non-leaders skip starting their local scheduler.
    """

    def __init__(
        self,
        settings: Settings,
        engine: AsyncEngine,
        session_factory: async_sessionmaker[AsyncSession],
        binding_service: BindingService,
        runtime_settings_getter: Callable[[], RuntimeSettings],
    ) -> None:
        self.settings = settings
        self.engine = engine
        self.session_factory = session_factory
        self.binding_service = binding_service
        # Called on every run so archive_days can change without a restart.
        self.runtime_settings_getter = runtime_settings_getter
        self.scheduler = AsyncIOScheduler(timezone="UTC")
        # Dedicated connection that holds the advisory lock while we lead;
        # None means this worker is not the leader.
        self._leader_connection: AsyncConnection | None = None

    async def start(self) -> None:
        """Start the interval archive job on this worker if it wins leader election.

        No-op when the scheduler is already running or when another worker
        holds the leader lock.
        """
        if self.scheduler.running:
            return
        if not await self._acquire_leader_lock():
            logger.info("Archive scheduler leader lock not acquired; skipping local scheduler start.")
            return
        try:
            self.scheduler.add_job(
                self.archive_inactive_bindings,
                trigger="interval",
                minutes=self.settings.archive_job_interval_minutes,
                id="archive-inactive-bindings",
                replace_existing=True,
                max_instances=1,  # never run two archive passes concurrently
                coalesce=True,  # collapse missed runs into a single run
            )
            self.scheduler.start()
        except Exception:
            # Don't sit on the advisory lock if we failed to become an
            # active leader — let another worker take over.
            await self._release_leader_lock()
            raise
        logger.info("Archive scheduler started on current worker.")

    async def stop(self) -> None:
        """Shut down the scheduler (if running) and always release the leader lock."""
        try:
            if self.scheduler.running:
                self.scheduler.shutdown(wait=False)
        finally:
            # Release even if shutdown raised, so leadership isn't orphaned
            # until this process's DB connection finally drops.
            await self._release_leader_lock()

    async def archive_inactive_bindings(self) -> int:
        """Delete bindings unused for more than ``archive_days`` days, in batches.

        Returns:
            Total number of bindings removed in this run.

        A database error aborts the run without raising (it is logged), so
        the next scheduled run can retry the remaining rows.
        """
        runtime_settings = self.runtime_settings_getter()
        cutoff = datetime.now(UTC) - timedelta(days=runtime_settings.archive_days)
        total_archived = 0
        while True:
            async with self.session_factory() as session:
                try:
                    # Oldest-first keeps each batch deterministic and drains
                    # the backlog in age order.
                    result = await session.execute(
                        select(TokenBinding.token_hash)
                        .where(TokenBinding.last_used_at < cutoff)
                        .order_by(TokenBinding.last_used_at.asc())
                        .limit(self.settings.archive_batch_size)
                    )
                    token_hashes = list(result.scalars())
                    if not token_hashes:
                        break
                    await session.execute(delete(TokenBinding).where(TokenBinding.token_hash.in_(token_hashes)))
                    await session.commit()
                except SQLAlchemyError:
                    await session.rollback()
                    logger.exception("Failed to archive inactive bindings.")
                    break
                # Invalidate caches only after the delete has committed.
                await self.binding_service.invalidate_many(token_hashes)
                total_archived += len(token_hashes)
                if len(token_hashes) < self.settings.archive_batch_size:
                    # A short batch means everything past the cutoff is gone.
                    break
        if total_archived:
            logger.info("Archived inactive bindings.", extra={"count": total_archived})
        return total_archived

    async def _acquire_leader_lock(self) -> bool:
        """Try to take the PostgreSQL advisory lock that elects the scheduler leader.

        On success the dedicated connection is kept open in
        ``self._leader_connection``; the lock is session-level, so it is held
        for as long as that connection stays open. Returns True when this
        worker holds (or already held) the lock.
        """
        if self._leader_connection is not None:
            return True  # already leading
        connection = await self.engine.connect()
        try:
            acquired = bool(
                await connection.scalar(
                    text("SELECT pg_try_advisory_lock(:lock_key)"),
                    {"lock_key": self.settings.archive_scheduler_lock_key},
                )
            )
        except Exception:
            await connection.close()
            logger.exception("Failed to acquire archive scheduler leader lock.")
            return False
        if not acquired:
            # Another worker is the leader; don't keep an idle connection.
            await connection.close()
            return False
        self._leader_connection = connection
        return True

    async def _release_leader_lock(self) -> None:
        """Release the advisory lock and close its connection (best effort)."""
        if self._leader_connection is None:
            return
        connection = self._leader_connection
        self._leader_connection = None
        try:
            await connection.execute(
                text("SELECT pg_advisory_unlock(:lock_key)"),
                {"lock_key": self.settings.archive_scheduler_lock_key},
            )
        except Exception:
            # Closing the connection below drops the session-level lock anyway,
            # so a failed explicit unlock is only worth a warning.
            logger.warning("Failed to release archive scheduler leader lock cleanly.")
        finally:
            await connection.close()