session.py 578 B

123456789101112131415161718
  1. """
  2. SQLAlchemy async engine and session tools
  3. https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html
  4. """
  5. from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
  6. from app.core import config
  # Choose the database URI for the current environment: the test database
  # URI when the configured environment is "PYTEST", the default one otherwise.
  sqlalchemy_database_uri = (
      config.settings.TEST_SQLALCHEMY_DATABASE_URI
      if config.settings.ENVIRONMENT == "PYTEST"
      else config.settings.DEFAULT_SQLALCHEMY_DATABASE_URI
  )

  # Module-level async engine for the selected URI. pool_pre_ping=True asks the
  # connection pool to test a connection for liveness on checkout (see the
  # SQLAlchemy pooling documentation).
  async_engine = create_async_engine(
      sqlalchemy_database_uri,
      pool_pre_ping=True,
  )

  # Session factory bound to the engine above. expire_on_commit=False keeps
  # already-loaded attributes from being expired when a session commits.
  async_session = async_sessionmaker(
      async_engine,
      expire_on_commit=False,
  )