当我逐一运行它们时,我的测试可以正确运行,但如果我在测试套件中运行它们,则无法相互干扰。
/postgresql/asyncpg.py", line 789, in _handle_exception
raise translated_error from error
sqlalchemy.exc.InterfaceError: (sqlalchemy.dialects.postgresql.asyncpg.InterfaceError) <class 'asyncpg.exceptions._base.InterfaceError'>: cannot perform operation: another operation is in progress
我尝试简化场景并编写了一个存储库,它将 CRUD 操作封装到数据库中。
我编写了一个测试套件,通过测试覆盖了这个存储库。
但是第一次启动显示了异常,我还没有弄清楚。
async def create_inventory(self, inventory_data: dict) -> None:
> async with self.database.get_async_session() as session:
E AttributeError: 'async_generator' object has no attribute 'get_async_session'
@pytest.fixture
async def test_database() -> Database:
"""Fixture for setting up a test database."""
# Initialize the Database
db = Database()
# Create the database tables if needed
async with db.get_async_session() as session:
async with session.begin():
# Here you might want to create your tables for tests
await session.run_sync(Base.metadata.create_all)
yield db # This provides the db instance to the tests
# Teardown can go here if necessary
async with db.get_async_session() as session:
async with session.begin():
await session.run_sync(Base.metadata.drop_all)
@pytest.mark.asyncio
async def test_create_inventory(test_database: Database):
"""Test creating a new Inventory record."""
repo = InventoryRepository(test_database)
inventory_data = {
"time": 1234567890,
"flight": "FL456",
"departure": 9876543211,
"flight_booking_class": "Economy",
"idle_seats_count": 15
}
await repo.create_inventory(inventory_data)
new_inventory = await repo.get_inventory(
flight=inventory_data["flight"],
flight_booking_class=inventory_data["flight_booking_class"]
)
assert new_inventory.flight == inventory_data["flight"]
assert new_inventory.idle_seats_count == inventory_data["idle_seats_count"]
存储库和数据库层:
class Database:
def __init__(self) -> None:
# Database credentials
self.db_username = os.getenv('POSTGRES_USER', "myuser")
self.db_password = os.getenv('POSTGRES_PASSWORD', "mypassword")
self.db_name = os.getenv('POSTGRES_DB', "mydatabase")
self.db_host = os.getenv('POSTGRES_HOST')
self.db_port = os.getenv('POSTGRES_PORT', 5432)
# Ensure all necessary environment variables are set
if not all([self.db_username, self.db_password, self.db_name, self.db_host]):
raise ValueError("Missing one or more environment variables for source or sink database.")
# Database URLs
self.sqlalchemy_database_url = (
f"postgresql+asyncpg://{self.db_username}:{self.db_password}@"
f"{self.db_host}:{self.db_port}/{self.db_name}"
)
# Create async engines
self.engine = create_async_engine(self.sqlalchemy_database_url, echo=True)
# Async session makers for each database
self.AsyncSourceSessionLocal = async_scoped_session(
sessionmaker(
bind=self.engine,
class_=AsyncSession,
autocommit=False,
autoflush=False,
), scopefunc=current_task
)
async def get_async_session(self) -> AsyncGenerator[AsyncSession, None]:
return await self.AsyncSourceSessionLocal()
class InventoryRepository:
def __init__(self, database: Database) -> None:
self.database = database
async def create_inventory(self, inventory_data: dict) -> None:
async with self.database.get_async_session() as session:
async with session.begin():
stmt = insert(Inventory).values(**inventory_data)
await session.execute(stmt)
async def get_inventory(self, flight: str, flight_booking_class: str) -> Optional[Inventory]:
async with self.database.get_async_session() as session:
stmt = select(Inventory).where(
Inventory.flight == flight,
Inventory.flight_booking_class == flight_booking_class
)
result = await session.execute(stmt)
return result.scalar_one_or_none()
您能否请我弄清楚如何克服这个问题,或者给我一个正确的例子,如何通过可以作为测试套件运行的测试来覆盖这个场景?
同性恋!
首先,我犯了一个错误,留下了重命名建议而没有解释原因(我认为
pytest.fixture
不允许修补名称以test_
开头的函数)
其次,通过提供包含导入的完整示例来节省帮助者的时间是一种礼貌,以便其他人可以将其复制到文件并运行
python xxx.py
来重现问题。
第三,看到这些话我很生气,很不高兴:
Shall we move politics out of this platform...
最后,正如 document 所说,您可能需要使用
@pytest_asyncio.fixture
而不是 @pytest.fixture