在可读队列python中关闭连接

问题描述 投票:0回答:1

当微服务从 RabbitMQ 获取消息并且处理数据很长时间,并且与窃听上的队列关闭连接时

Traceback (most recent call last):
  File "/home/saturn/Logic/MAIN_1.py", line 200, in <module>
    channel.start_consuming()
  File "/usr/local/lib/python3.5/dist-packages/pika/adapters/blocking_connection.py", line 1780, in start_consuming
    self.connection.process_data_events(time_limit=None)
  File "/usr/local/lib/python3.5/dist-packages/pika/adapters/blocking_connection.py", line 707, in process_data_events
    self._flush_output(common_terminator)
  File "/usr/local/lib/python3.5/dist-packages/pika/adapters/blocking_connection.py", line 474, in _flush_output
result.reason_text)
pika.exceptions.ConnectionClosed: (-1, "ConnectionResetError(104, 'Connection reset by peer')") 

现在任务处理时间接近 5 分钟。主要代码如 -

credentials = pika.PlainCredentials(username='NAME',password='PASSWORD')
ConnParr = pika.ConnectionParameters(host='HOST', credentials=credentials)
connection = pika.BlockingConnection(ConnParr)
channel = connection.channel()

def callback(ch, method, properties, body):
    in_data = json.loads(body.decode('utf-8'))
    main(in_data)
    ch.basic_ack(delivery_tag=method.delivery_tag)

def main(in_data):
    time.sleep(300)

channel.queue_declare(queue=IN_QUEUE)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue=IN_QUEUE)
channel.start_consuming()     
python rabbitmq pika
1个回答
0
投票

发生这种情况是因为

time.sleep
调用会阻塞您的主线程并阻止 Pika 从 RabbitMQ 发送和接收心跳消息。您有几种方法可以解决此问题:

  • 升级到 Pika

    0.12.0
    ,在单独的线程中运行
    main
    方法,并在该线程中使用
    add_callback_threadsafe
    在通道
    docs
    上调用 basic_ack

  • 使用异步消费者示例作为代码的起点。

要记住的重要一点是,您不能阻止 Pika 的内部事件循环并期望连接保持活动状态。

© www.soinside.com 2019 - 2024. All rights reserved.