RabbitMQ 发布消息时的延迟

问题描述 投票:0回答:1

我们正在开发一个项目,该项目使用rabbitMQ作为消息代理,并使用pika作为主干。我们使用在同一网络上的计算机上运行的发布者每 0.5 秒发布一条消息。消费者在同一网络中的另一个系统上运行来消费消息。不幸的是,在连续运行发布者和订阅者超过 10 分钟后,我在将消息发布到 MQ 时面临延迟。向 MQ 发布消息有时需要几秒钟(甚至有几次需要 30 秒)。我使用小脚本来发布和订阅来隔离这个问题。 观察

  1. 当发布者和订阅者在同一系统上运行时,不会观察到这种延迟。
  2. 即使未设置channel.confirm_delivery(),也不会观察到此延迟。
  3. 无论系统规格如何,都会出现此问题。我们在多个系统上进行了尝试,包括配备 corei7 cpu 的 HP OMEN。
  4. 网络连接也很好,我们能够在不同地方使用不同网络重现此问题。

发布者代码:

import pika
import json
import time



PUBLISH_FREQUENCY = 2.0
PUBLISH_TIMEOUT = 0.5 #Seconds


MQ_IP_ADDRESS = #MQ IP ADDRESS
EXCHANGE = "test3" 
EXCHANGE_TYPE = "topic"
ROUTING_KEY = "ag.ack"
QUEUE = ROUTING_KEY


feedback = {"task": {"start_time": 1655203648.916807, "id": 135, "end_time": None, "marker_scanned_a": None, "name_user": "Z_Z", "actual_duration": None, "status_work": "Progress", "message": None}, "consumer_identifier": "W1", "worker_availability": "In_progress", "worker_battery_parameter": {"battery_capacity": 1.0, "temp": 12.586999893188477}}
params = pika.ConnectionParameters(
    heartbeat=60, blocked_connection_timeout=60, host=MQ_IP_ADDRESS
)
print('Connecting to MQ at IP '+MQ_IP_ADDRESS +' '+ ROUTING_KEY)

connection = pika.BlockingConnection(params)

channel = connection.channel()


# channel.queue_declare(queue=QUEUE)
channel.exchange_declare(
    exchange=EXCHANGE,
    exchange_type=EXCHANGE_TYPE
)
# channel.queue_bind(exchange=EXCHANGE, queue=QUEUE,routing_key=ROUTING_KEY)

channel.confirm_delivery()
print('Connected to MQ at IP '+MQ_IP_ADDRESS+' '+ ROUTING_KEY)
count = 0
while True:
    feedback['task']['id'] = count
    message = json.dumps(feedback)

    count = count + 1
    start_time = time.time()    
    a= channel.basic_publish(
            exchange=EXCHANGE,
            routing_key=ROUTING_KEY,
            body=message
        )
    
    end_time = time.time()
    duration = end_time - start_time
    if end_time-start_time > PUBLISH_TIMEOUT:
        print('Time taken to send the message is '+str(duration)+'. For message number '+str(count) + ' . Resetting the count to zero.')
        count = 0
    time.sleep(1/PUBLISH_FREQUENCY)


channel.close()

消费者代码

import pika
import time
import json


MQ_IP_ADDRESS = '127.0.0.1'
EXCHANGE = "test3" 
EXCHANGE_TYPE = "topic"
ROUTING_KEY = "ag.ack"
QUEUE = ROUTING_KEY


params = pika.ConnectionParameters(
    heartbeat=600, blocked_connection_timeout=300, host=MQ_IP_ADDRESS
)
try:
    connection = pika.BlockingConnection(params)
    print('connected')
except Exception as e:
    print(e)
    time.sleep(20)
    print("waiting to reconnect")
    connection = pika.BlockingConnection(params)
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE)

channel.queue_declare(queue=QUEUE, durable=False)
channel.queue_bind(exchange=EXCHANGE, queue=QUEUE,routing_key=ROUTING_KEY)
print(' Dummy AOS receiver...')
print('[*] Waiting for messages from Agent. To exit press CTRL+C')


def callback(ch, method, properties, body):

    payload = body.decode()
    message = json.loads(payload)
    msg_counter = message["task"]["id"]
    time.sleep(0.5)

    ch.basic_ack(delivery_tag=method.delivery_tag)
    
    print('received '+str(msg_counter))

channel.basic_consume(
    queue=QUEUE, on_message_callback=callback,)
channel.start_consuming()
rabbitmq delay message latency pika
1个回答
0
投票

每当您报告任何软件(而不仅仅是 RabbitMQ)的问题时,您必须报告您正在使用的软件的版本。在这种情况下,我假设您正在使用以下版本,因为它们是最新的:

  • Erlang 25.0.2
  • RabbitMQ 3.10.5
  • 皮卡1.2.1
  • Python 3.10.5

您的 Python 代码使用

time.sleep
,它会阻塞 Pika 的 I/O 循环。您应该改用
connection.sleep
方法。

你还把心跳设置为600秒,这是没有必要的。

最后,通过使用

BlockingConnection
confirm_delivery()
,您的代码正在使用 同步 发布者确认。相反,您应该使用
SelectConnection
并异步处理确认。您将保留未完成确认的字典,然后在调用确认回调时将这些消息标记为已确认。请注意,RabbitMQ 可能会批量确认消息。

请参阅此示例

这是一个非常有趣的情况,如果您需要进一步的帮助,请在 GitHub 上分享您完整的、可运行的代码,并在 Pika 讨论 中发布消息,我们可以在那里继续。

© www.soinside.com 2019 - 2024. All rights reserved.