我们正在开发一个项目,该项目使用rabbitMQ作为消息代理,并使用pika作为主干。我们使用在同一网络上的计算机上运行的发布者每 0.5 秒发布一条消息。消费者在同一网络中的另一个系统上运行来消费消息。不幸的是,在连续运行发布者和订阅者超过 10 分钟后,我在将消息发布到 MQ 时面临延迟。向 MQ 发布消息有时需要几秒钟(甚至有几次需要 30 秒)。我使用小脚本来发布和订阅来隔离这个问题。 观察
发布者代码:
import pika
import json
import time
PUBLISH_FREQUENCY = 2.0
PUBLISH_TIMEOUT = 0.5 #Seconds
MQ_IP_ADDRESS = #MQ IP ADDRESS
EXCHANGE = "test3"
EXCHANGE_TYPE = "topic"
ROUTING_KEY = "ag.ack"
QUEUE = ROUTING_KEY
feedback = {"task": {"start_time": 1655203648.916807, "id": 135, "end_time": None, "marker_scanned_a": None, "name_user": "Z_Z", "actual_duration": None, "status_work": "Progress", "message": None}, "consumer_identifier": "W1", "worker_availability": "In_progress", "worker_battery_parameter": {"battery_capacity": 1.0, "temp": 12.586999893188477}}
params = pika.ConnectionParameters(
heartbeat=60, blocked_connection_timeout=60, host=MQ_IP_ADDRESS
)
print('Connecting to MQ at IP '+MQ_IP_ADDRESS +' '+ ROUTING_KEY)
connection = pika.BlockingConnection(params)
channel = connection.channel()
# channel.queue_declare(queue=QUEUE)
channel.exchange_declare(
exchange=EXCHANGE,
exchange_type=EXCHANGE_TYPE
)
# channel.queue_bind(exchange=EXCHANGE, queue=QUEUE,routing_key=ROUTING_KEY)
channel.confirm_delivery()
print('Connected to MQ at IP '+MQ_IP_ADDRESS+' '+ ROUTING_KEY)
count = 0
while True:
feedback['task']['id'] = count
message = json.dumps(feedback)
count = count + 1
start_time = time.time()
a= channel.basic_publish(
exchange=EXCHANGE,
routing_key=ROUTING_KEY,
body=message
)
end_time = time.time()
duration = end_time - start_time
if end_time-start_time > PUBLISH_TIMEOUT:
print('Time taken to send the message is '+str(duration)+'. For message number '+str(count) + ' . Resetting the count to zero.')
count = 0
time.sleep(1/PUBLISH_FREQUENCY)
channel.close()
消费者代码
import pika
import time
import json
MQ_IP_ADDRESS = '127.0.0.1'
EXCHANGE = "test3"
EXCHANGE_TYPE = "topic"
ROUTING_KEY = "ag.ack"
QUEUE = ROUTING_KEY
params = pika.ConnectionParameters(
heartbeat=600, blocked_connection_timeout=300, host=MQ_IP_ADDRESS
)
try:
connection = pika.BlockingConnection(params)
print('connected')
except Exception as e:
print(e)
time.sleep(20)
print("waiting to reconnect")
connection = pika.BlockingConnection(params)
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange=EXCHANGE, exchange_type=EXCHANGE_TYPE)
channel.queue_declare(queue=QUEUE, durable=False)
channel.queue_bind(exchange=EXCHANGE, queue=QUEUE,routing_key=ROUTING_KEY)
print(' Dummy AOS receiver...')
print('[*] Waiting for messages from Agent. To exit press CTRL+C')
def callback(ch, method, properties, body):
payload = body.decode()
message = json.loads(payload)
msg_counter = message["task"]["id"]
time.sleep(0.5)
ch.basic_ack(delivery_tag=method.delivery_tag)
print('received '+str(msg_counter))
channel.basic_consume(
queue=QUEUE, on_message_callback=callback,)
channel.start_consuming()
每当您报告任何软件(而不仅仅是 RabbitMQ)的问题时,您必须报告您正在使用的软件的版本。在这种情况下,我假设您正在使用以下版本,因为它们是最新的:
您的 Python 代码使用
time.sleep
,它会阻塞 Pika 的 I/O 循环。您应该改用 connection.sleep
方法。
你还把心跳设置为600秒,这是没有必要的。
最后,通过使用
BlockingConnection
和 confirm_delivery()
,您的代码正在使用 同步 发布者确认。相反,您应该使用 SelectConnection
并异步处理确认。您将保留未完成确认的字典,然后在调用确认回调时将这些消息标记为已确认。请注意,RabbitMQ 可能会批量确认消息。
请参阅此示例。
这是一个非常有趣的情况,如果您需要进一步的帮助,请在 GitHub 上分享您完整的、可运行的代码,并在 Pika 讨论 中发布消息,我们可以在那里继续。