如何在Python MagicMock中模拟Azure ServiceBus接收器

问题描述 投票:0回答:2

我正在使用 Python Azure ServiceBus 包 并具有以下代码片段:

# receiver.py

import logging
logger = logging.getLogger("test")

def receive_message(
        connection_str,
        queue_name
) -> None:
    """
    Call Azure ServiceBus API to retrieve message from a queue
    """
    with ServiceBusClient.from_connection_string(
            connection_str, logging_enable=True
    ) as servicebus_client:

        with servicebus_client.get_queue_receiver(
                queue_name=queue_name,
                max_wait_time=20,
                receive_mode=ServiceBusReceiveMode.PEEK_LOCK,
        ) as receiver:
            for message in receiver:
                logger.debug(f"Received message {message}")

我正在尝试为此函数编写单元测试,并且希望能够模拟

recevier
。这是我编写单元测试的尝试,但失败了,因为我不知道如何让测试进入
for message in receiver
块。

# test_receiver.py

@patch("receiver.ServiceBusClient")
@patch("receiver.logger")
def test_receive_message(mock_logger, mock_svcbus_client):

    # Figure out how to mock
    mock_svcbus_client.from_connection_string.return_value.get_queue_receiver.return_value = iter(["message"])


    receive_message("mock_connection_str", "mock_q_name")
    
    # Assertion fails
    mock_logger.return_value.debug.assert_called_once()
python azure unit-testing pytest azureservicebus
2个回答
3
投票

你可以尝试

from mocks import MockReceivedMessage, MockReceiver
来嘲笑
receiver

示例1:

class MockReceivedMessage(ServiceBusReceivedMessage):
    def __init__(self, prevent_renew_lock=False, exception_on_renew_lock=False, **kwargs):
        self._lock_duration = kwargs.get("lock_duration", 2)
        self._raw_amqp_message = None
        self._received_timestamp_utc = utc_now()
        self.locked_until_utc = self._received_timestamp_utc + timedelta(seconds=self._lock_duration)
        self._settled = False
        self._receiver = MockReceiver()

        self._prevent_renew_lock = prevent_renew_lock
        self._exception_on_renew_lock = exception_on_renew_lock

示例2

 def test_queue_message_receive_and_delete(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
        
        with ServiceBusClient.from_connection_string(
            servicebus_namespace_connection_string, logging_enable=False) as sb_client:
                
            with sb_client.get_queue_sender(servicebus_queue.name) as sender:
                message = ServiceBusMessage("Receive and delete test")
                sender.send_messages(message)
    
            with sb_client.get_queue_receiver(servicebus_queue.name,
                                                 receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE) as receiver:
                messages = receiver.receive_messages(max_wait_time=10)
                assert len(messages) == 1
                message = messages[0]
                print_message(_logger, message)
                with pytest.raises(ValueError):
                    receiver.complete_message(message)
                with pytest.raises(ValueError):
                    receiver.abandon_message(message)
                with pytest.raises(ValueError):
                    receiver.defer_message(message)
                with pytest.raises(ValueError):
                    receiver.dead_letter_message(message)
                with pytest.raises(ValueError):
                    receiver.renew_message_lock(message)
    
            time.sleep(10)
    
            with sb_client.get_queue_receiver(servicebus_queue.name) as receiver:
                messages = receiver.receive_messages(max_wait_time=10)
                for m in messages:
                    print_message(_logger, m)
                assert len(messages) == 0

可以参考mocks.pytest_queues.py

如果仍有疑问,可以在 GitHub 上提出问题:azure-sdk-for-python


0
投票

可以模拟从

get_queue_receiver
方法返回的接收器。我有一个服务总线客户端
async
版本的示例,但如果需要,很容易将其调整为同步版本。因此,请安装以下软件包:

pip install azure-servicebus pytest pytest-asyncio

首先,您需要注入

ServiceBusClient
实例以在测试代码中模拟它。创建一个类,其中构造函数采用
ServiceBusClient
实例:

# azure_servicebus.py
from azure.servicebus.aio import ServiceBusClient


class AzureServiceBus():
    def __init__(self, client: ServiceBusClient) -> None:
        self._client = client

    async def receive_events_async(self, queue_name: str) -> None:
        async with self._client.get_queue_receiver(queue_name) as receiver:
            messages = await receiver.receive_messages()
            for message in messages:
                print(str(message))
                await receiver.complete_message(message)

在测试文件中,

AsyncMock
类和
@pytest.mark.asyncio
装饰器可用于测试async/await代码。我们的重点是修补
get_queue_receiver
语句:

# azure_servicebus_test.py
from unittest.mock import AsyncMock, patch

import pytest
from azure.servicebus.aio import ServiceBusClient
from module.path.for.azure_servicebus import AzureServiceBus


@pytest.mark.asyncio
async def test_receive_message():
    mock_receiver = AsyncMock()
    mock_receiver.__aenter__.return_value = mock_receiver
    mock_receiver.__aexit__.return_value = False
    mock_receiver.receive_messages.return_value= ["message 1", "message 2"]
    mock_receiver.complete_message.return_value = None

    with patch("azure.servicebus.aio.ServiceBusClient.get_queue_receiver", return_value=mock_receiver):
        connstr = "Endpoint=sb://service-bus-hostname-test.servicebus.windows.net/;SharedAccessKeyName=shared-access-key-test;SharedAccessKey=sharedaccesskeytest=;EntityPath=topic-path-test"
        async with ServiceBusClient.from_connection_string(connstr) as client:
            service_bus = AzureServiceBus(client)
            await service_bus.receive_events_async("my_queue_name")

需要设置

__aenter__
__aexit__
方法,因为我们正在使用异步上下文管理器。具体来说,
async with
将调用这些方法。

另一个重要的部分是连接字符串的形式。如果格式不正确,则初始化将引发异常。您不需要有效的连接字符串,而是格式正确的连接字符串。

测试的输出应如下所示:

$ pytest <path_to_test>/azure_servicebus_test.py -vs
=================================================================================================== test session starts ====================================================================================================
platform win32 -- Python 3.10.11, pytest-7.4.4, pluggy-1.5.0 -- C:\<path-to-virtual_env>\.venv\Scripts\python.exe
cachedir: .pytest_cache
rootdir: C:\<path_to_root_dir>
configfile: pyproject.toml
plugins: anyio-4.4.0, asyncio-0.23.7, azurepipelines-1.0.5, cov-4.1.0, mock-3.14.0, nunit-1.0.7
asyncio: mode=strict
collected 1 item

<path_to_test>/azure_servicebus_test.py::test_receive_message message 1
message 2
PASSED

这两条消息是从模拟的接收者实例接收的。

注意:对于同步版本,您不需要需要

pytest-asyncio
插件、装饰器和async/await关键字。此外,使用
MagicMock
而不是
AsyncMock
类,并设置同步上下文管理器(即
__enter__
__exit__
方法)。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.