I am using the Python Azure ServiceBus package and have the following code snippet:
# receiver.py
import logging

from azure.servicebus import ServiceBusClient, ServiceBusReceiveMode

logger = logging.getLogger("test")

def receive_message(
    connection_str,
    queue_name
) -> None:
    """
    Call Azure ServiceBus API to retrieve message from a queue
    """
    with ServiceBusClient.from_connection_string(
        connection_str, logging_enable=True
    ) as servicebus_client:
        with servicebus_client.get_queue_receiver(
            queue_name=queue_name,
            max_wait_time=20,
            receive_mode=ServiceBusReceiveMode.PEEK_LOCK,
        ) as receiver:
            for message in receiver:
                logger.debug(f"Received message {message}")
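I am trying to write a unit test for this function and would like to be able to mock the receiver. Here is my attempt at the unit test, but it fails because I don't know how to get the test into the for message in receiver block.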
# test_receiver.py
from unittest.mock import patch

from receiver import receive_message

@patch("receiver.ServiceBusClient")
@patch("receiver.logger")
def test_receive_message(mock_logger, mock_svcbus_client):
    # Figure out how to mock
    mock_svcbus_client.from_connection_string.return_value.get_queue_receiver.return_value = iter(["message"])
    receive_message("mock_connection_str", "mock_q_name")
    # Assertion fails
    mock_logger.return_value.debug.assert_called_once()
You can try from mocks import MockReceivedMessage, MockReceiver to mock the receiver.
Example 1:
class MockReceivedMessage(ServiceBusReceivedMessage):
    def __init__(self, prevent_renew_lock=False, exception_on_renew_lock=False, **kwargs):
        self._lock_duration = kwargs.get("lock_duration", 2)
        self._raw_amqp_message = None
        self._received_timestamp_utc = utc_now()
        self.locked_until_utc = self._received_timestamp_utc + timedelta(seconds=self._lock_duration)
        self._settled = False
        self._receiver = MockReceiver()
        self._prevent_renew_lock = prevent_renew_lock
        self._exception_on_renew_lock = exception_on_renew_lock
Example 2:
def test_queue_message_receive_and_delete(self, servicebus_namespace_connection_string, servicebus_queue, **kwargs):
    with ServiceBusClient.from_connection_string(
        servicebus_namespace_connection_string, logging_enable=False) as sb_client:
        with sb_client.get_queue_sender(servicebus_queue.name) as sender:
            message = ServiceBusMessage("Receive and delete test")
            sender.send_messages(message)
        with sb_client.get_queue_receiver(servicebus_queue.name,
                                          receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE) as receiver:
            messages = receiver.receive_messages(max_wait_time=10)
            assert len(messages) == 1
            message = messages[0]
            print_message(_logger, message)
            with pytest.raises(ValueError):
                receiver.complete_message(message)
            with pytest.raises(ValueError):
                receiver.abandon_message(message)
            with pytest.raises(ValueError):
                receiver.defer_message(message)
            with pytest.raises(ValueError):
                receiver.dead_letter_message(message)
            with pytest.raises(ValueError):
                receiver.renew_message_lock(message)
        time.sleep(10)
        with sb_client.get_queue_receiver(servicebus_queue.name) as receiver:
            messages = receiver.receive_messages(max_wait_time=10)
            for m in messages:
                print_message(_logger, m)
            assert len(messages) == 0
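For reference, see mocks.py and test_queues.py.
If you still have questions, you can open an issue on GitHub: azure-sdk-for-python
You can mock the receiver returned by the get_queue_receiver method. My example uses the async version of the Service Bus client, but it is easy to adapt to the sync version if needed. Start by installing the following packages: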
pip install azure-servicebus pytest pytest-asyncio
First, you need to inject the ServiceBusClient instance so that it can be mocked in the test code. Create a class whose constructor takes a ServiceBusClient instance:
# azure_servicebus.py
from azure.servicebus.aio import ServiceBusClient

class AzureServiceBus():
    def __init__(self, client: ServiceBusClient) -> None:
        self._client = client

    async def receive_events_async(self, queue_name: str) -> None:
        async with self._client.get_queue_receiver(queue_name) as receiver:
            messages = await receiver.receive_messages()
            for message in messages:
                print(str(message))
                await receiver.complete_message(message)
In the test file, the AsyncMock class and the @pytest.mark.asyncio decorator can be used to test async/await code. The focus here is on patching the get_queue_receiver call:
# azure_servicebus_test.py
from unittest.mock import AsyncMock, patch

import pytest
from azure.servicebus.aio import ServiceBusClient

# Adjust this import to wherever azure_servicebus.py lives in your project.
from azure_servicebus import AzureServiceBus

@pytest.mark.asyncio
async def test_receive_message():
    mock_receiver = AsyncMock()
    # `async with` binds the value returned by __aenter__ to `receiver`.
    mock_receiver.__aenter__.return_value = mock_receiver
    mock_receiver.__aexit__.return_value = False
    mock_receiver.receive_messages.return_value = ["message 1", "message 2"]
    mock_receiver.complete_message.return_value = None
    with patch("azure.servicebus.aio.ServiceBusClient.get_queue_receiver", return_value=mock_receiver):
        connstr = "Endpoint=sb://service-bus-hostname-test.servicebus.windows.net/;SharedAccessKeyName=shared-access-key-test;SharedAccessKey=sharedaccesskeytest=;EntityPath=topic-path-test"
        async with ServiceBusClient.from_connection_string(connstr) as client:
            service_bus = AzureServiceBus(client)
            await service_bus.receive_events_async("my_queue_name")
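The __aenter__ and __aexit__ methods need to be set because we are using an async context manager. Specifically, the async with statement calls these methods, and whatever __aenter__ returns is bound to the name after as.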
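To see that protocol in isolation, here is a minimal sketch that uses only the standard library, with no Azure package involved. The names consume and fake_receiver are made up for illustration and are not part of azure-servicebus.

```python
import asyncio
from unittest.mock import AsyncMock


async def consume(receiver_cm):
    # `async with` awaits __aenter__ and binds its return value to `receiver`.
    async with receiver_cm as receiver:
        return await receiver.receive_messages()


fake_receiver = AsyncMock()
# Without this line, `receiver` would be an auto-created child mock
# rather than fake_receiver itself, and the configured return value
# below would never be reached.
fake_receiver.__aenter__.return_value = fake_receiver
fake_receiver.receive_messages.return_value = ["message 1", "message 2"]

result = asyncio.run(consume(fake_receiver))

# Both halves of the context manager protocol were awaited exactly once.
fake_receiver.__aenter__.assert_awaited_once()
fake_receiver.__aexit__.assert_awaited_once()
```

Here result holds the two configured strings, which shows that the object inside the async with block is the same mock that was configured outside it.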
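Another important detail is the format of the connection string. If it is malformed, initialization raises an exception. You do not need a valid connection string, only a well-formed one.
The test output should look like this: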
$ pytest <path_to_test>/azure_servicebus_test.py -vs
=================================================================================================== test session starts ====================================================================================================
platform win32 -- Python 3.10.11, pytest-7.4.4, pluggy-1.5.0 -- C:\<path-to-virtual_env>\.venv\Scripts\python.exe
cachedir: .pytest_cache
rootdir: C:\<path_to_root_dir>
configfile: pyproject.toml
plugins: anyio-4.4.0, asyncio-0.23.7, azurepipelines-1.0.5, cov-4.1.0, mock-3.14.0, nunit-1.0.7
asyncio: mode=strict
collected 1 item
<path_to_test>/azure_servicebus_test.py::test_receive_message message 1
message 2
PASSED
Both messages were received from the mocked receiver instance.
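Rather than reading the printed output, the test can also assert on the mock directly. The following is a sketch under assumptions: receive_events is a hypothetical stand-in that mirrors receive_events_async, and the client is a plain MagicMock instead of a real ServiceBusClient, so it runs without the Azure package.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock, call


async def receive_events(client, queue_name):
    # Hypothetical stand-in with the same shape as receive_events_async.
    async with client.get_queue_receiver(queue_name) as receiver:
        for message in await receiver.receive_messages():
            await receiver.complete_message(message)


mock_receiver = AsyncMock()
mock_receiver.__aenter__.return_value = mock_receiver
mock_receiver.receive_messages.return_value = ["message 1", "message 2"]

# get_queue_receiver is called without await, so the client is a MagicMock.
mock_client = MagicMock()
mock_client.get_queue_receiver.return_value = mock_receiver

asyncio.run(receive_events(mock_client, "my_queue_name"))

# Verify the interactions instead of relying on printed output.
mock_client.get_queue_receiver.assert_called_once_with("my_queue_name")
mock_receiver.complete_message.assert_has_awaits(
    [call("message 1"), call("message 2")]
)
```

Assertions such as assert_has_awaits fail the test when a message is never completed, which printed output alone would not catch.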
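Note: for the sync version, you do not need the pytest-asyncio plugin, the decorator, or the async/await keywords. Also, use the MagicMock class instead of AsyncMock, and set up the sync context manager (that is, the __enter__ and __exit__ methods).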
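A sync variant might look like the sketch below. It is an illustration under assumptions, not the library's own test helper: receive_all is a hypothetical function that takes an injected client and iterates the receiver the way the question's for message in receiver loop does, and only the standard library is used.

```python
from unittest.mock import MagicMock


def receive_all(client, queue_name):
    # Hypothetical sync function: iterates the receiver directly.
    received = []
    with client.get_queue_receiver(queue_name=queue_name) as receiver:
        for message in receiver:
            received.append(message)
    return received


mock_receiver = MagicMock()
# `with ... as receiver` binds the return value of __enter__.
mock_receiver.__enter__.return_value = mock_receiver
# Iterating the mock yields these items; any iterable is accepted here.
mock_receiver.__iter__.return_value = ["message 1", "message 2"]

mock_client = MagicMock()
mock_client.get_queue_receiver.return_value = mock_receiver

received = receive_all(mock_client, "mock_q_name")

mock_client.get_queue_receiver.assert_called_once_with(queue_name="mock_q_name")
mock_receiver.__exit__.assert_called_once()
```

If the client is created inside the function with a with statement, as in the question, the patched class needs the same treatment one level up: the as target receives from_connection_string.return_value.__enter__.return_value, so get_queue_receiver has to be configured on that object. Note also that when receiver.logger is patched, the call to check is mock_logger.debug, since the code calls logger.debug on the patched object itself.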