在我的单元测试中,我想简单地开始消耗、发布消息,并接收到一个响应返回,并断言响应是否是我所期望的。然而,我已经尝试做了几个小时,但没有找到解决方案。
问题是我不能在一个类中定义一个方法来停止消耗。我试着定义了一个这样的方法。
def stop(self):
self.channel.basic_cancel()
def stop(self):
self.channel.stop_consuming()
def stop(self):
self.connection.close()
但似乎没有任何效果 我读到,这是因为一旦你执行了 start_consuming()
,唯一的办法就是在发送消息后取消消耗。但如果我这样做,那么我就会修改原来的 on_request
而这对我的应用没有用,因为连接会在第一条消息后关闭。我发现 pytest-rabbitmq 但文档对我来说并不清楚,因此不知道是否可以使用这个插件来实现我想要的东西。
顺便说一下,这两者之间的区别是什么?basic_cancel
, stop_consuming
和 close
?
我对你的方案没有一个清晰的认识!。以我的理解,你可以在同一个方法中创建连接和通道,这样你就可以在需要的时候发布、消费、断言和停止消费了。
希望能帮到你
def test_rabbitmq():
from pika import BlockingConnection, ConnectionParameters, PlainCredentials
conn = BlockingConnection(ConnectionParameters(host='host', virtual_host='vhost', credentials=PlainCredentials('username', 'password')))
channel = conn.channel()
# define your consumer
def on_message(channel, method_frame, header_frame, body):
message = body.decode()
# assert your message here
# asset message == 'value'
channel.basic_cancel('test-consumer') # stops the consumer
# define your publisher
def publish_message(message):
channel.basic_publish(exchange='', routing_key='', body=message')
publish('your message')
tag = channel.basic_consume(queue='queue', on_message_callback=on_message, consumer_tag='test-consumer')
stop_consuming
- 取消所有消费者,标志着start_consuming循环的退出。
basic_cancel
- 本方法取消一个消费者。一个消费者标签将被作为输入。
close
- 关闭连接通道。