我有一大堆的是把他们的结果,并将其张贴到RabbitMQ的消息队列芹菜任务。该贴得到的结果可能会变得非常大(可达几MEG)。意见混合,以把大量的数据在RabbitMQ的消息是否是一个好主意,但我已经看到了这个工作,在其他情况下只要记忆保留在控制之下,它似乎工作。
但是,我现在的任务集,兔似乎只是删除,似乎是过大的消息。我已经缩小它归结为一个相当简单的测试案例:
#!/usr/bin/env python
import string
import random
import pika
import os
qname='examplequeue'
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='mq.example.com'))
channel = connection.channel()
channel.queue_declare(queue=qname,durable=True)
N=100000
body = ''.join(random.choice(string.ascii_uppercase) for x in range(N))
promise = channel.basic_publish(exchange='', routing_key=qname, body=body, mandatory=0, immediate=0, properties=pika.BasicProperties(content_type="text/plain",delivery_mode=2))
print " [x] Sent 'Hello World!'"
connection.close()
我有一个3节点的RabbitMQ群集,mq.example.com
圆罗宾斯到每个节点。客户端在Ubuntu 12.04使用鼠兔0.9.5和RabbitMQ的集群上二郎R14B04运行的RabbitMQ 2.8.7。
执行该脚本打印打印语句,并没有任何异常退出被提出。该消息不会出现在RabbitMQ的。
更改N
到10000
使其正常工作。
为什么?
我想你必须在RabbitMQ的TCP-背压机制问题。你可以阅读有关qazxsw POI。我看到两个办法来解决这个问题:
http://www.rabbitmq.com/memory.html
这是我做的发送和接收的数据包。这是有点比hexlify更有效,因为BASE64可使用一个字节,其中两个字节被hexlify需要来表示一个字符。
def compress(s):
return binascii.hexlify(zlib.compress(s))
def decompress(s):
return zlib.decompress(binascii.unhexlify(s))