我长期运行RabbitMQ使用者时遇到问题。我的几封邮件最终都处于未确认状态。
我的RabbitMQ版本:3.6.15皮卡版本:0.11.0b
import pika
import time
import sys
import threading
from Queue import Queue
rabbitmq_server = "<SERVER>"
queue = "<QUEUE>"
connection = None
def check_acknowledge(channel, connection, ack_queue):
delivery_tag = None
while(True):
try:
delivery_tag = ack_queue.get_nowait()
channel.basic_nack(delivery_tag=delivery_tag)
break
except:
connection.process_data_events()
time.sleep(1)
def process_message(body, delivery_tag, ack_queue):
print "Received %s" % (body)
print "Waiting for 600 seconds before receiving next ID\n"
start = time.time()
elapsed = 0
while elapsed < 10:
elapsed = time.time() - start
print "loop cycle time: %f, seconds count: %02d" %(time.clock(), elapsed)
time.sleep(1)
ack_queue.put(delivery_tag)
def callback(ch, method, properties, body):
global connection
ack_queue = Queue()
t = threading.Thread(target=process_message, args=(body, method.delivery_tag, ack_queue))
t.start()
check_acknowledge(ch, connection, ack_queue)
while True:
try:
connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_server))
channel = connection.channel()
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue=queue)
channel.start_consuming()
except KeyboardInterrupt:
break
channel.close()
connection.close()
exit(0)
我在这里想念什么吗?
我使用以下多线程使用者来解决此问题。
import pika
import time
import sys
import threading
from Queue import Queue
rabbitmq_server = "<RABBITMQ_SERVER_IP>"
queue = "hello1"
connection = None
def check_acknowledge(channel, connection, ack_queue):
delivery_tag = None
while(True):
try:
delivery_tag = ack_queue.get_nowait()
channel.basic_ack(delivery_tag=delivery_tag)
break
except:
connection.process_data_events()
time.sleep(1)
def process_message(body, delivery_tag, ack_queue):
print "Received %s" % (body)
print "Waiting for 600 seconds before receiving next ID\n"
start = time.time()
elapsed = 0
while elapsed < 300:
elapsed = time.time() - start
print "loop cycle time: %f, seconds count: %02d" %(time.clock(), elapsed)
time.sleep(1)
ack_queue.put(delivery_tag)
def callback(ch, method, properties, body):
global connection
ack_queue = Queue()
t = threading.Thread(target=process_message, args=(body, method.delivery_tag, ack_queue))
t.start()
check_acknowledge(ch, connection, ack_queue)
while True:
try:
connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_server))
channel = connection.channel()
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue=queue)
channel.start_consuming()
except KeyboardInterrupt:
break
channel.close()
connection.close()
exit(0)
callback
函数将在主线程本身中触发一个单独的函数check_acknowledge
。因此,连接和通道对象保留在同一线程中。请注意,Pika不是线程安全的,因此我们需要在同一线程中维护这些对象。process_message
处理完成后,将delivery_tag
放入队列。
check_acknowledge
无限循环,直到找到delivery_tag
放入队列中的process_message
。一旦找到,它将acks
消息返回。
我已经通过sleep
运行此使用者5分钟,10分钟,30分钟和1小时来测试此实现。这对我来说很好用。