Files
sentinel/app/models/db.py
chy88 ab1bd90c65 feat(core): 初始化 Key-IP Sentinel 服务与部署骨架
- 搭建 FastAPI、Redis、PostgreSQL、Nginx 与 Docker Compose 基础结构
- 实现反向代理、首用绑定、拦截告警、归档任务和管理接口
- 提供 Vue3 管理后台初版,以及 uv/requirements 双依赖配置
2026-03-04 00:18:33 +08:00

49 lines
1.2 KiB
Python

from __future__ import annotations
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase
from app.config import Settings
class Base(DeclarativeBase):
pass
_engine: AsyncEngine | None = None
_session_factory: async_sessionmaker[AsyncSession] | None = None
def init_db(settings: Settings) -> None:
global _engine, _session_factory
if _engine is not None and _session_factory is not None:
return
_engine = create_async_engine(
settings.pg_dsn,
pool_pre_ping=True,
pool_size=20,
max_overflow=40,
)
_session_factory = async_sessionmaker(_engine, expire_on_commit=False)
def get_engine() -> AsyncEngine:
if _engine is None:
raise RuntimeError("Database engine has not been initialized.")
return _engine
def get_session_factory() -> async_sessionmaker[AsyncSession]:
if _session_factory is None:
raise RuntimeError("Database session factory has not been initialized.")
return _session_factory
async def close_db() -> None:
global _engine, _session_factory
if _engine is not None:
await _engine.dispose()
_engine = None
_session_factory = None