简单的问题,但 Google 或 Pika 开源代码没有帮助。有没有办法在 Pika 中查询当前队列大小(项目计数器)?
我知道这个问题有点老了,但这里有一个使用鼠兔执行此操作的示例。
对于 AMQP 和 RabbitMQ,如果您已经声明了队列,则可以在启用 passive flag 的情况下重新声明队列,并保持所有其他队列参数相同。对此声明的响应declare-ok将包括队列中的消息数量。
这是 pika 0.9.5 的示例:
import pika
def on_callback(msg):
print msg
params = pika.ConnectionParameters(
host='localhost',
port=5672,
credentials=pika.credentials.PlainCredentials('guest', 'guest'),
)
# Open a connection to RabbitMQ on localhost using all default parameters
connection = pika.BlockingConnection(parameters=params)
# Open the channel
channel = connection.channel()
# Declare the queue
channel.queue_declare(
callback=on_callback,
queue="test",
durable=True,
exclusive=False,
auto_delete=False
)
# ...
# Re-declare the queue with passive flag
res = channel.queue_declare(
callback=on_callback,
queue="test",
durable=True,
exclusive=False,
auto_delete=False,
passive=True
)
print 'Messages in queue %d' % res.method.message_count
这将打印以下内容:
<Method(['frame_type=1', 'channel_number=1', "method=<Queue.DeclareOk(['queue=test', 'message_count=0', 'consumer_count=0'])>"])>
<Method(['frame_type=1', 'channel_number=1', "method=<Queue.DeclareOk(['queue=test', 'message_count=0', 'consumer_count=0'])>"])>
Messages in queue 0
您会收到来自
message_count
会员的消息数量。
以下是如何使用 pika 获取队列长度(考虑到您在本地主机上使用默认用户和密码) 将 q_name 替换为您的队列名称。
import pika
connection = pika.BlockingConnection()
channel = connection.channel()
q = channel.queue_declare(q_name)
q_len = q.method.message_count
AMQP协议中有两种获取队列大小的方法。您可以使用 Queue.Declare 或 Basic.Get。
如果您使用 Basic.Consume 在消息到达时使用它们,那么您将无法获取此信息,除非您断开连接(超时)并重新声明队列,或者获取一条消息但不确认它。在较新版本的 AMQP 中,您可以主动重新排队消息。
至于 Pika,我不知道具体情况,但 AMQP 的 Python 客户端一直是我的眼中钉。通常,您需要对类进行猴子补丁才能获取所需的信息,或者允许队列使用者超时,以便您可以定期执行其他操作,例如记录统计信息或找出队列中有多少消息。
解决此问题的另一种方法是放弃,并使用 Pipe 类来运行
sudo rabbitmqctl list_queues -p my_vhost
。然后解析输出以查找所有队列的大小。如果您这样做,您将需要配置 /etc/sudoers
不要求通常的 sudo 密码。
我祈祷有更多 Pika 经验的其他人通过指出如何做我提到的所有事情来回答这个问题,在这种情况下,我将下载 Pika 并踢轮胎。但如果这种情况没有发生,并且您在对 Pika 代码进行猴子修补时遇到困难,那么请查看
haigha
。我发现他们的代码比其他 Python AMQP 客户端库更简单,因为它们更接近 AMQP 协议。
发布此内容以防其他人遇到此讨论。得票最多的答案,即:
# Re-declare the queue with passive flag
res = channel.queue_declare(
callback=on_callback,
queue="test",
durable=True,
exclusive=False,
auto_delete=False,
passive=True
)
对我非常有帮助,但它有一个严重的警告。根据 pika 文档,
passive
标志用于“仅检查队列是否存在”。因此,人们会想象您可以使用带有 passive
标志的queue_declare 函数来检查在队列可能从未声明过的情况下是否存在队列。根据我的测试,如果您使用 passive
标志调用此函数并且队列不存在,则 api 不仅会抛出异常,还会引发异常。它还会导致代理断开您的通道,因此即使您优雅地捕获异常,您也会失去与代理的连接。我使用 2 个不同的 python 脚本针对在 minikube 中运行的普通 RabbitMQ 容器进行了测试。我已经多次运行此测试,每次都会得到相同的行为。
我的测试代码:
import logging
import pika
logging.basicConfig(level="INFO")
logger = logging.getLogger(__name__)
logging.getLogger("pika").setLevel(logging.WARNING)
def on_callback(msg):
logger.info(f"Callback msg: {msg}")
queue_name = "testy"
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(
pika.ConnectionParameters(host="localhost", port=5672, credentials=credentials)
)
logger.info("Connection established")
channel = connection.channel()
logger.info("Channel created")
channel.exchange_declare(exchange="svc-exchange", exchange_type="direct", durable=True)
response = channel.queue_declare(
queue=queue_name, durable=True, exclusive=False, auto_delete=False, passive=True
)
logger.info(f"queue_declare response: {response}")
channel.queue_delete(queue=queue_name)
connection.close()
输出:
INFO:__main__:Connection established
INFO:__main__:Channel created
WARNING:pika.channel:Received remote Channel.Close (404): "NOT_FOUND - no queue 'testy' in vhost '/'" on <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x1047e2700> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>
Traceback (most recent call last):
File "check_queue_len.py", line 29, in <module>
response = channel.queue_declare(
File "/Users/dbailey/dev/asc-service-deployment/venv/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2521, in queue_declare
self._flush_output(declare_ok_result.is_ready)
File "/Users/dbailey/dev/asc-service-deployment/venv/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1354, in _flush_output
raise self._closing_reason # pylint: disable=E0702
pika.exceptions.ChannelClosedByBroker: (404, "NOT_FOUND - no queue 'testy' in vhost '/'")
当我将
passive
设置为 False 时:
scripts % python check_queue_len.py
INFO:__main__:Connection established
INFO:__main__:Channel created
INFO:__main__:queue_declare response: <METHOD(['channel_number=1', 'frame_type=1', "method=<Queue.DeclareOk(['consumer_count=0', 'message_count=0', 'queue=testy'])>"])>
如果我在这里遗漏了什么,请告诉我。
我迟到了,但这是一个使用 pyrabbit 或 pyrabbit2 从 AWS AmazonMQ 通过 HTTPS 获取队列计数的示例,也应该适用于 RabbitMQ:
from pyrabbit2.api import Client
cl = Client('b-xxxxxx.mq.ap-southeast-1.amazonaws.com', 'user', 'password', scheme='https')
if not cl.is_alive():
raise Exception("Failed to connect to rabbitmq")
for i in cl.get_all_vhosts():
print(i['name'])
queues = [q['name'] for q in cl.get_queues('/')]
print(queues)
itemCount = cl.get_queue_depth('/', 'event.stream.my-api')
print(itemCount)
如果您使用 aio-pika ver. 9.0.5(或兼容)这将起作用:
import aio_pika
from ..some_path import settings
connection = await aio_pika.connect(settings.RABBIT_MQ_URL)
async with connection:
channel = await connection.channel()
queue = await channel.declare_queue(
settings.MY_QUEUE_NAME,
durable=True,
)
print(queue.declaration_result.message_count)
declaration_result
未在文档中描述,但可在 源代码 中找到。因此,这个对象可能在 pika
和 aio-pika
的所有版本中以不同的名称存在,您只需检查 Queue
版本的类 pika
的源代码,或者只执行 dir()
并尝试弄清楚这个属性叫什么:)