在 Pika 中获取队列大小(AMQP Python)

问题描述 投票:0回答:7

简单的问题,但 Google 或 Pika 开源代码没有帮助。有没有办法在 Pika 中查询当前队列大小(项目计数器)?

python amqp
7个回答
53
投票

我知道这个问题有点老了,但这里有一个使用鼠兔执行此操作的示例。

对于 AMQP 和 RabbitMQ,如果您已经声明了队列,则可以在启用 passive flag 的情况下重新声明队列,并保持所有其他队列参数相同。对此声明的响应declare-ok将包括队列中的消息数量。

这是 pika 0.9.5 的示例:

import pika

def on_callback(msg):
    print msg

params = pika.ConnectionParameters(
        host='localhost',
        port=5672,
        credentials=pika.credentials.PlainCredentials('guest', 'guest'),
    )

# Open a connection to RabbitMQ on localhost using all default parameters
connection = pika.BlockingConnection(parameters=params)

# Open the channel
channel = connection.channel()

# Declare the queue
channel.queue_declare(
        callback=on_callback,
        queue="test",
        durable=True,
        exclusive=False,
        auto_delete=False
    )

# ...

# Re-declare the queue with passive flag
res = channel.queue_declare(
        callback=on_callback,
        queue="test",
        durable=True,
        exclusive=False,
        auto_delete=False,
        passive=True
    )
print 'Messages in queue %d' % res.method.message_count

这将打印以下内容:

<Method(['frame_type=1', 'channel_number=1', "method=<Queue.DeclareOk(['queue=test', 'message_count=0', 'consumer_count=0'])>"])>
<Method(['frame_type=1', 'channel_number=1', "method=<Queue.DeclareOk(['queue=test', 'message_count=0', 'consumer_count=0'])>"])>
Messages in queue 0

您会收到来自

message_count
会员的消息数量。


25
投票

以下是如何使用 pika 获取队列长度(考虑到您在本地主机上使用默认用户和密码) 将 q_name 替换为您的队列名称。

import pika
connection = pika.BlockingConnection()
channel = connection.channel()
q = channel.queue_declare(q_name)
q_len = q.method.message_count

6
投票

你尝试过PyRabbit吗?它有一个

get_queue_depth()
方法,听起来就像你正在寻找的。


5
投票

AMQP协议中有两种获取队列大小的方法。您可以使用 Queue.Declare 或 Basic.Get。

如果您使用 Basic.Consume 在消息到达时使用它们,那么您将无法获取此信息,除非您断开连接(超时)并重新声明队列,或者获取一条消息但不确认它。在较新版本的 AMQP 中,您可以主动重新排队消息。

至于 Pika,我不知道具体情况,但 AMQP 的 Python 客户端一直是我的眼中钉。通常,您需要对类进行猴子补丁才能获取所需的信息,或者允许队列使用者超时,以便您可以定期执行其他操作,例如记录统计信息或找出队列中有多少消息。

解决此问题的另一种方法是放弃,并使用 Pipe 类来运行

sudo rabbitmqctl list_queues -p my_vhost
。然后解析输出以查找所有队列的大小。如果您这样做,您将需要配置
/etc/sudoers
不要求通常的 sudo 密码。

我祈祷有更多 Pika 经验的其他人通过指出如何做我提到的所有事情来回答这个问题,在这种情况下,我将下载 Pika 并踢轮胎。但如果这种情况没有发生,并且您在对 Pika 代码进行猴子修补时遇到困难,那么请查看

haigha
。我发现他们的代码比其他 Python AMQP 客户端库更简单,因为它们更接近 AMQP 协议。


1
投票

发布此内容以防其他人遇到此讨论。得票最多的答案,即:

# Re-declare the queue with passive flag
res = channel.queue_declare(
        callback=on_callback,
        queue="test",
        durable=True,
        exclusive=False,
        auto_delete=False,
        passive=True
    )

对我非常有帮助,但它有一个严重的警告。根据 pika 文档,

passive
标志用于“仅检查队列是否存在”。因此,人们会想象您可以使用带有
passive
标志的queue_declare 函数来检查在队列可能从未声明过的情况下是否存在队列。根据我的测试,如果您使用
passive
标志调用此函数并且队列不存在,则 api 不仅会抛出异常,还会引发异常。它还会导致代理断开您的通道,因此即使您优雅地捕获异常,您也会失去与代理的连接。我使用 2 个不同的 python 脚本针对在 minikube 中运行的普通 RabbitMQ 容器进行了测试。我已经多次运行此测试,每次都会得到相同的行为。

我的测试代码:

import logging
import pika

logging.basicConfig(level="INFO")
logger = logging.getLogger(__name__)
logging.getLogger("pika").setLevel(logging.WARNING)


def on_callback(msg):
    logger.info(f"Callback msg: {msg}")


queue_name = "testy"

credentials = pika.PlainCredentials("guest", "guest")

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host="localhost", port=5672, credentials=credentials)
)

logger.info("Connection established")

channel = connection.channel()

logger.info("Channel created")

channel.exchange_declare(exchange="svc-exchange", exchange_type="direct", durable=True)

response = channel.queue_declare(
    queue=queue_name, durable=True, exclusive=False, auto_delete=False, passive=True
)

logger.info(f"queue_declare response: {response}")

channel.queue_delete(queue=queue_name)

connection.close()

输出:

INFO:__main__:Connection established
INFO:__main__:Channel created
WARNING:pika.channel:Received remote Channel.Close (404): "NOT_FOUND - no queue 'testy' in vhost '/'" on <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x1047e2700> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>
Traceback (most recent call last):
  File "check_queue_len.py", line 29, in <module>
    response = channel.queue_declare(
  File "/Users/dbailey/dev/asc-service-deployment/venv/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2521, in queue_declare
    self._flush_output(declare_ok_result.is_ready)
  File "/Users/dbailey/dev/asc-service-deployment/venv/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1354, in _flush_output
    raise self._closing_reason  # pylint: disable=E0702
pika.exceptions.ChannelClosedByBroker: (404, "NOT_FOUND - no queue 'testy' in vhost '/'")

当我将

passive
设置为 False 时:

scripts % python check_queue_len.py
INFO:__main__:Connection established
INFO:__main__:Channel created
INFO:__main__:queue_declare response: <METHOD(['channel_number=1', 'frame_type=1', "method=<Queue.DeclareOk(['consumer_count=0', 'message_count=0', 'queue=testy'])>"])>

如果我在这里遗漏了什么,请告诉我。


0
投票

我迟到了,但这是一个使用 pyrabbitpyrabbit2 从 AWS AmazonMQ 通过 HTTPS 获取队列计数的示例,也应该适用于 RabbitMQ:

from pyrabbit2.api import Client

cl = Client('b-xxxxxx.mq.ap-southeast-1.amazonaws.com', 'user', 'password', scheme='https')
if not cl.is_alive():
    raise Exception("Failed to connect to rabbitmq")

for i in cl.get_all_vhosts():
    print(i['name'])

queues = [q['name'] for q in cl.get_queues('/')]
print(queues)    

itemCount = cl.get_queue_depth('/', 'event.stream.my-api')
print(itemCount)

0
投票

如果您使用 aio-pika ver. 9.0.5(或兼容)这将起作用:

import aio_pika
from ..some_path import settings


connection = await aio_pika.connect(settings.RABBIT_MQ_URL)

async with connection:
    channel = await connection.channel()
        
    queue = await channel.declare_queue(
        settings.MY_QUEUE_NAME,
        durable=True,
    )

    print(queue.declaration_result.message_count)

declaration_result
未在文档中描述,但可在 源代码 中找到。因此,这个对象可能在
pika
aio-pika
的所有版本中以不同的名称存在,您只需检查
Queue
版本的类
pika
的源代码,或者只执行
dir()
并尝试弄清楚这个属性叫什么:)

© www.soinside.com 2019 - 2024. All rights reserved.