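I'm using RabbitMQ to receive messages. I have 1,000,000 messages that need to be sent and queued within 1 minute, and I'm using Python's multiprocessing to publish them. My code takes more than 5 minutes to send them all. Is it possible to send them within 1 minute from a single computer? Here is my code: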
```python
import multiprocessing
import sys
import time
from datetime import datetime

import pika


class PyPikaTest:
    def publish(self, no_message, producer):
        # One blocking connection and channel per publisher process
        c = pika.BlockingConnection(pika.ConnectionParameters(port=5672, virtual_host="test"))
        channel = c.channel()
        channel.queue_declare(queue='letterbox')
        print("start: %s" % time.ctime(time.time()))
        # range(1, n) sent only n - 1 messages; this sends exactly no_message
        for i in range(1, int(no_message) + 1):
            sendtime = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-2]
            body = 'aa cccc ' + sendtime
            _properties = pika.BasicProperties(
                content_type='text/plain',  # the body is plain text, not JSON
                content_encoding='utf-8',
                message_id=producer + "_message_no_" + str(i),
                timestamp=int(time.time()),
            )
            channel.basic_publish(
                exchange='',
                routing_key='letterbox',
                properties=_properties,
                body=body,
            )
        print("end: %s" % time.ctime(time.time()))
        c.close()

    def thread_publish(self, no_publisher, no_message):
        jobs = []
        for i in range(int(no_publisher)):
            process = multiprocessing.Process(
                target=self.publish,
                args=(no_message, "test_publisher_no_" + str(i)),
            )
            jobs.append(process)
        # Start the publisher processes
        for j in jobs:
            j.start()
        # Wait for all publisher processes to finish
        for j in jobs:
            j.join()
        print("List processing complete.")


if __name__ == "__main__":
    print('starting .. %s' % time.ctime(time.time()))
    x = PyPikaTest()
    # Usage: python script.py <no_publisher> <no_message_per_publisher>
    x.thread_publish(sys.argv[1], sys.argv[2])
```
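For scale, the target works out to roughly 16,667 messages per second, while "more than 5 minutes" means the current code manages at most about 3,333 messages per second, so at least a 5x speed-up is needed. The sketch below only does that arithmetic. Splitting the total evenly across 1, 4 or 8 processes is an illustrative assumption; in the code above each process sends `no_message` messages, so the total is `no_publisher * no_message`.

```python
# Back-of-the-envelope throughput targets for the numbers in the question.
TOTAL_MESSAGES = 1_000_000
TARGET_SECONDS = 60
OBSERVED_SECONDS = 5 * 60  # "more than 5 minutes", so the rate below is an upper bound


def rate(messages: int, seconds: float) -> float:
    """Messages per second needed to send `messages` in `seconds`."""
    return messages / seconds


def per_process_rate(messages: int, seconds: float, processes: int) -> float:
    """Rate each publisher must sustain if the work is split evenly (assumption)."""
    return rate(messages, seconds) / processes


required = rate(TOTAL_MESSAGES, TARGET_SECONDS)        # about 16,667 msg/s
observed_max = rate(TOTAL_MESSAGES, OBSERVED_SECONDS)  # at most about 3,333 msg/s
speedup_needed = required / observed_max               # at least 5x

print(f"required: {required:,.0f} msg/s")
print(f"current:  <= {observed_max:,.0f} msg/s")
print(f"speed-up needed: >= {speedup_needed:.1f}x")
for n in (1, 4, 8):
    each = per_process_rate(TOTAL_MESSAGES, TARGET_SECONDS, n)
    print(f"{n} process(es): {each:,.0f} msg/s each")
```

Even with perfect scaling across processes, each publisher still has to sustain a few thousand publishes per second, which is why the per-message cost of the publishing loop matters.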
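Have you tried one of pika's non-blocking connections? I use `AsyncioConnection`. Publishing from a single Python process, I can send 14k-17k messages per second, i.e. roughly 1 million messages in a minute. This is on a 2018 MacBook Pro i7, with RabbitMQ installed locally via Homebrew and Python 3.11.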
I wrote a script that sends 1 million messages in 61 seconds by splitting them into 10 batches of 100k asynchronous requests:
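```python
import asyncio
import logging
import time

logger = logging.getLogger(__name__)

# Hypothetical wrapper (await needs an async function); send_message/client_api come from the full example.
async def run_batches(client_api):
    total_request_time = 0
    batch_size = 100000
    number_of_batches = 10
    for i in range(number_of_batches):
        rr_start_time = time.time()
        tasks = [asyncio.create_task(send_message(client_api)) for _ in range(batch_size)]
        _ = await asyncio.wait(tasks)
        rr_taken = time.time() - rr_start_time
        total_request_time += rr_taken
        logger.info("batch %s of %s calls took %s seconds and %s calls/s", i, batch_size, rr_taken, batch_size / rr_taken)
    return total_request_time
```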
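To try the batching pattern without a broker, here is a self-contained sketch in which a hypothetical `fake_send_message` coroutine stands in for the answer's `send_message` (whose body isn't shown). It only demonstrates the scheduling: all tasks in a batch run concurrently on one thread, and the next batch starts only after the whole batch finishes. It says nothing about RabbitMQ throughput.

```python
import asyncio
import time


async def fake_send_message(sent: list) -> None:
    """Hypothetical stand-in for `send_message`: yields to the event loop once,
    as a real network call would, and records that it ran."""
    await asyncio.sleep(0)
    sent.append(1)


async def publish_in_batches(batch_size: int, number_of_batches: int) -> tuple[int, float]:
    """Run `number_of_batches` batches of `batch_size` concurrent tasks each."""
    sent: list = []
    total_request_time = 0.0
    for i in range(number_of_batches):
        start = time.perf_counter()
        # One task per message; the whole batch is awaited before the next starts.
        tasks = [asyncio.create_task(fake_send_message(sent)) for _ in range(batch_size)]
        await asyncio.wait(tasks)
        taken = time.perf_counter() - start
        total_request_time += taken
        print(f"batch {i}: {batch_size} calls in {taken:.3f}s")
    return len(sent), total_request_time


if __name__ == "__main__":
    count, seconds = asyncio.run(publish_in_batches(batch_size=1_000, number_of_batches=10))
    print(f"{count} total calls in {seconds:.3f}s")
```

One consequence of fixed batches is that each batch waits for its slowest task. Capping the number of in-flight sends with an `asyncio.Semaphore` is a common alternative, though it is not what the answer measured. Also note that `asyncio.wait` does not re-raise task exceptions, so failed sends would need to be checked on the returned task sets.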
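For full disclosure, I was running 4 consumers at the same time; I found that RabbitMQ publishes fastest when the queue doesn't grow too large.
The full example code can be found here.
Here is my program's output: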
```
I 2023-09-25 00:01:29,536 one_million_example main 17 : Client connected
I 2023-09-25 00:01:35,772 one_million_example main 41 : batch 0 of 100000 calls took 6.235662937164307 seconds and 16036.787268279675 calls/s
I 2023-09-25 00:01:41,478 one_million_example main 41 : batch 1 of 100000 calls took 5.705104827880859 seconds and 17528.161710771692 calls/s
I 2023-09-25 00:01:47,438 one_million_example main 41 : batch 2 of 100000 calls took 5.960079193115234 seconds and 16778.30054934751 calls/s
I 2023-09-25 00:01:53,282 one_million_example main 41 : batch 3 of 100000 calls took 5.84413480758667 seconds and 17111.172704329678 calls/s
I 2023-09-25 00:01:59,617 one_million_example main 41 : batch 4 of 100000 calls took 6.335207939147949 seconds and 15784.801534620101 calls/s
I 2023-09-25 00:02:05,506 one_million_example main 41 : batch 5 of 100000 calls took 5.888730049133301 seconds and 16981.590116313437 calls/s
I 2023-09-25 00:02:12,120 one_million_example main 41 : batch 6 of 100000 calls took 6.614094972610474 seconds and 15119.226502508423 calls/s
I 2023-09-25 00:02:18,071 one_million_example main 41 : batch 7 of 100000 calls took 5.950514793395996 seconds and 16805.268698933756 calls/s
I 2023-09-25 00:02:24,473 one_million_example main 41 : batch 8 of 100000 calls took 6.401550054550171 seconds and 15621.21660345697 calls/s
I 2023-09-25 00:02:30,549 one_million_example main 41 : batch 9 of 100000 calls took 6.075830936431885 seconds and 16458.654140683902 calls/s
I 2023-09-25 00:02:30,549 one_million_example main 44 : Request only performance: 1000000 total calls in 61.010910511016846 seconds @ 16390.511002444855 calls/s
```
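The last log line can be cross-checked from the per-batch lines. The durations below are copied from the output; nothing else is assumed. Because every batch has the same size, the overall rate (total calls divided by summed batch time) is the harmonic mean of the per-batch rates, so it comes out slightly below their simple average.

```python
# Per-batch durations (seconds) copied from the output above; each batch is 100,000 calls.
BATCH_SIZE = 100_000
batch_seconds = [
    6.235662937164307, 5.705104827880859, 5.960079193115234, 5.84413480758667,
    6.335207939147949, 5.888730049133301, 6.614094972610474, 5.950514793395996,
    6.401550054550171, 6.075830936431885,
]

per_batch_rates = [BATCH_SIZE / s for s in batch_seconds]
total_seconds = sum(batch_seconds)
overall_rate = BATCH_SIZE * len(batch_seconds) / total_seconds

# With equal-sized batches the overall rate is the harmonic mean of the
# per-batch rates, so it can never exceed their arithmetic mean.
mean_of_rates = sum(per_batch_rates) / len(per_batch_rates)

print(f"total: {total_seconds:.2f} s, overall: {overall_rate:,.0f} calls/s")
print(f"per-batch: min {min(per_batch_rates):,.0f}, "
      f"max {max(per_batch_rates):,.0f}, mean {mean_of_rates:,.0f} calls/s")
```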