"""Process-wide async SQLAlchemy engine and session-factory management.

Call ``init_db`` once at startup, use ``get_engine`` /
``get_session_factory`` to access the shared objects, and await
``close_db`` at shutdown.
"""

from __future__ import annotations

from sqlalchemy.ext.asyncio import (
    AsyncEngine,
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase

from app.config import Settings


class Base(DeclarativeBase):
    """Declarative base class for ORM models."""


# Module-level singletons; both stay ``None`` until ``init_db`` succeeds
# and are reset to ``None`` by ``close_db``.
_engine: AsyncEngine | None = None
_session_factory: async_sessionmaker[AsyncSession] | None = None


def init_db(settings: Settings) -> None:
    """Create the shared async engine and session factory.

    Does nothing when both objects already exist, so repeated calls are
    safe.

    Args:
        settings: Application settings; ``settings.pg_dsn`` is passed to
            ``create_async_engine`` as the database URL.
    """
    global _engine, _session_factory

    if _engine is not None and _session_factory is not None:
        return

    # Build both objects in locals first: if constructing the session
    # factory raises, the module is not left half-initialized with an
    # engine that a later init_db() call would silently replace.
    engine = create_async_engine(
        settings.pg_dsn,
        pool_pre_ping=True,
        pool_size=20,
        max_overflow=40,
    )
    session_factory = async_sessionmaker(engine, expire_on_commit=False)

    _engine = engine
    _session_factory = session_factory


def get_engine() -> AsyncEngine:
    """Return the shared engine.

    Raises:
        RuntimeError: If no engine is currently initialized.
    """
    if _engine is None:
        raise RuntimeError("Database engine has not been initialized.")
    return _engine


def get_session_factory() -> async_sessionmaker[AsyncSession]:
    """Return the shared session factory.

    Raises:
        RuntimeError: If no session factory is currently initialized.
    """
    if _session_factory is None:
        raise RuntimeError("Database session factory has not been initialized.")
    return _session_factory


async def close_db() -> None:
    """Dispose of the shared engine and reset the module state.

    Safe to call when the database was never initialized. Any exception
    raised by ``AsyncEngine.dispose`` propagates to the caller, but the
    module state has already been reset by then.
    """
    global _engine, _session_factory

    # Detach the globals before awaiting, so a failing dispose() cannot
    # leave the module pointing at an engine that init_db() would then
    # treat as live.
    engine = _engine
    _engine = None
    _session_factory = None

    if engine is not None:
        await engine.dispose()