我正在使用 Python 的
multiprocessing
模块来并行处理任务。我的目标是让多个进程尽可能高效地协作处理一组任务。为了避免重复工作,我使用共享字典来跟踪已完成的任务。如果一项任务已经由一个进程完成,其他进程应该跳过它并选择不同的任务。
但是,我遇到了性能瓶颈。检查共享字典中已完成任务的过程会显着减慢整体执行速度。这是我的代码的简化版本:
import multiprocessing
import random
def worker(shared_dict):
for i in range(1000000):
num = random.randint(1, 100000)
if num not in shared_dict:
shared_dict[num] = num**2
if __name__ == "__main__":
# Create a manager and a shared dictionary
manager = multiprocessing.Manager()
shared_dict = manager.dict()
# List to keep track of processes
processes = []
# Create and start 5 processes
for _ in range(5):
p = multiprocessing.Process(target=worker, args=(shared_dict,))
processes.append(p)
p.start()
# Wait for all processes to complete
for p in processes:
p.join()
print(shared_dict)
我使用 PySpy 进行分析,并注意到从共享字典发送和接收数据花费了大量时间(如所附分析图像所示)。
我正在寻找优化此数据交换的建议。有没有更有效的方法来管理Python多处理中的共享数据?如何减少与访问和修改共享字典相关的开销? 如果可以显着提高性能,我愿意接受一些重复
Python 中的多处理以及使用管理器实际创建共享字典确实可能很慢,长话短说,它涉及进程间通信,其中对特定共享字典的每次访问都需要两种方式通信(工作进程和管理进程)。
嗯,您共享的代码在这里至少没有给我一点上下文,但一般来说,在这些情况下优化速度时您会有几种方法:
multiprocessing.Value
或 multiprocessing.Array
,为什么?原因是它不需要使用管理器。此外,这对于固定大小的共享状态集合非常有效。multiprocessing.Queue
,这样您就可以将任务分配给工作进程,完成后您可以将其写入结果队列。multiprocessing.Lock
- 通过使用它,您可以控制对共享字典的访问,以避免竞争条件并确保您正在使用只有一个进程(一个进程一次可以访问共享状态)。顺便提一句。小心这个,因为如果你过度使用它,它可能会成为瓶颈。此外,我还看到人们切换架构,因为这些场景的性质太复杂 - 基本上这意味着,他们开始使用 Redis 或 RabbitMQ 等消息代理来处理通信过程。