测试套件覆盖存储库,所有测试都在 python 上同时运行

问题描述 投票:0回答:1

当我逐一运行它们时,我的测试可以正确运行,但如果我在测试套件中运行它们,则无法相互干扰。

/postgresql/asyncpg.py", line 789, in _handle_exception
    raise translated_error from error
sqlalchemy.exc.InterfaceError: (sqlalchemy.dialects.postgresql.asyncpg.InterfaceError) <class 'asyncpg.exceptions._base.InterfaceError'>: cannot perform operation: another operation is in progress

我尝试简化场景并编写了一个存储库,它将 CRUD 操作封装到数据库中。

我编写了一个测试套件,通过测试覆盖了这个存储库。

但是第一次启动显示了异常,我还没有弄清楚。

async def create_inventory(self, inventory_data: dict) -> None:
>       async with self.database.get_async_session() as session:
E       AttributeError: 'async_generator' object has no attribute 'get_async_session'
@pytest.fixture
async def test_database() -> Database:
    """Fixture for setting up a test database."""
    # Initialize the Database
    db = Database()
    
    # Create the database tables if needed
    async with db.get_async_session() as session:
        async with session.begin():
            # Here you might want to create your tables for tests
            await session.run_sync(Base.metadata.create_all)

    yield db  # This provides the db instance to the tests

    # Teardown can go here if necessary
    async with db.get_async_session() as session:
        async with session.begin():
            await session.run_sync(Base.metadata.drop_all)  
    
@pytest.mark.asyncio
async def test_create_inventory(test_database: Database):
    """Test creating a new Inventory record."""
    repo = InventoryRepository(test_database)
    inventory_data = {
        "time": 1234567890,
        "flight": "FL456",
        "departure": 9876543211,
        "flight_booking_class": "Economy",
        "idle_seats_count": 15
    }
    
    await repo.create_inventory(inventory_data)

    new_inventory = await repo.get_inventory(
        flight=inventory_data["flight"], 
        flight_booking_class=inventory_data["flight_booking_class"]
    )
    
    assert new_inventory.flight == inventory_data["flight"]
    assert new_inventory.idle_seats_count == inventory_data["idle_seats_count"]

存储库和数据库层:

class Database:
    def __init__(self) -> None:
        # Database credentials
        self.db_username = os.getenv('POSTGRES_USER', "myuser")
        self.db_password = os.getenv('POSTGRES_PASSWORD', "mypassword")
        self.db_name = os.getenv('POSTGRES_DB', "mydatabase")
        self.db_host = os.getenv('POSTGRES_HOST')
        self.db_port = os.getenv('POSTGRES_PORT', 5432)
        
        # Ensure all necessary environment variables are set
        if not all([self.db_username, self.db_password, self.db_name, self.db_host]):
            raise ValueError("Missing one or more environment variables for source or sink database.")

        # Database URLs
        self.sqlalchemy_database_url = (
            f"postgresql+asyncpg://{self.db_username}:{self.db_password}@"
            f"{self.db_host}:{self.db_port}/{self.db_name}"
        )

        # Create async engines
        self.engine = create_async_engine(self.sqlalchemy_database_url, echo=True)

        # Async session makers for each database
        self.AsyncSourceSessionLocal = async_scoped_session(
            sessionmaker(
                bind=self.engine,
                class_=AsyncSession,
                autocommit=False,
                autoflush=False,
            ), scopefunc=current_task
        )
    
    async def get_async_session(self) -> AsyncGenerator[AsyncSession, None]:
        return await self.AsyncSourceSessionLocal() 
    
class InventoryRepository:
    def __init__(self, database: Database) -> None:
        self.database = database

    async def create_inventory(self, inventory_data: dict) -> None:
        async with self.database.get_async_session() as session:
            async with session.begin():
                stmt = insert(Inventory).values(**inventory_data)
                await session.execute(stmt)

    async def get_inventory(self, flight: str, flight_booking_class: str) -> Optional[Inventory]:
        async with self.database.get_async_session() as session:
            stmt = select(Inventory).where(
                Inventory.flight == flight,
                Inventory.flight_booking_class == flight_booking_class
            )
            result = await session.execute(stmt)
            return result.scalar_one_or_none()

您能否请我弄清楚如何克服这个问题,或者给我一个正确的例子,如何通过可以作为测试套件运行的测试来覆盖这个场景?

python database testing pytest
1个回答
0
投票

同性恋!

首先,我犯了一个错误,留下了重命名建议而没有解释原因(我认为

pytest.fixture
不允许修补名称以
test_
开头的函数)

其次,通过提供包含导入的完整示例来节省帮助者的时间是一种礼貌,以便其他人可以将其复制到文件并运行

python xxx.py
来重现问题。

第三,看到这些话我很生气,很不高兴:

Shall we move politics out of this platform...

最后,正如 document 所说,您可能需要使用

@pytest_asyncio.fixture
而不是
@pytest.fixture

© www.soinside.com 2019 - 2024. All rights reserved.