"""Async SQLAlchemy engine, session factory, and session dependency."""

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker

from alien_gateway.config import settings

# The configured URI names the pymysql driver; swap in aiomysql so the same
# setting can be used with the async engine.
_ASYNC_DATABASE_URI = settings.SQLALCHEMY_DATABASE_URI.replace(
    "mysql+pymysql",
    "mysql+aiomysql",
)

# Async database engine.
engine = create_async_engine(
    _ASYNC_DATABASE_URI,
    pool_pre_ping=True,
    echo=True,
)

# Factory producing AsyncSession objects bound to the engine above.
AsyncSessionLocal = async_sessionmaker(
    class_=AsyncSession,
    expire_on_commit=False,
    bind=engine,
)


async def get_db():
    """Yield a database session for use as a dependency.

    A new session is created from ``AsyncSessionLocal`` and yielded to the
    caller; the surrounding ``async with`` exits once the generator is
    resumed or closed.
    """
    async with AsyncSessionLocal() as db_session:
        yield db_session