如何在Python中修复这个多重处理?

问题描述 投票:0回答:1

我正在尝试运行模型 1000 次并将其统计信息存储在字典中。因为模型本身可能还没有使用多重处理,所以我考虑使用 Python 中的

multiprocessing
包。
这是我到目前为止的代码:

import multiprocessing

class ensemble_model:
    def __init__(self, df, k) -> None:
        ## do some calculations
        self.r2_list = [...]
        self.rmse_list = [...]    

def worker(components, dic1, dic2):
    result = ensemble_model(
        df,
        k = components
    )
    dic1[components] = result.r2_list
    dic2[components] = result.rmse_list

r2_dic = multiprocessing.Manager().dict()
rmse_dic = multiprocessing.Manager().dict()

processes = []
for n_comp in range(1,1001):
    p = multiprocessing.Process(target=worker, args=(n_comp, r2_dic, rmse_dic))
    processes.append(p)
    p.start()

for p in processes:
    p.join()

问题是我得到一个空字典,它应该包含每个不同数量的组件的列表

n_comp

我使用 ChatGPT 并阅读文档来找出问题所在,但到目前为止我无法理解。
有人知道为什么会发生这种情况以及如何解决吗?

python multiprocessing
1个回答
0
投票

您关于应该使用多少个进程的问题没有直接的答案。

如果您只有 24 个核心 并且您的工作函数 worker 是 100% CPU 限制,那么就没有理由创建超过 24 个进程。如果您混合使用 CPU 和 I/O,那么也许您可以稍微增加这个数字,或者结合使用 I/O 多线程和纯 CPU 工作的多处理。不管怎样,创建 1000 个进程似乎效率很低。我还会考虑不使用托管字典,它非常慢。

您应该创建一个多处理池(默认情况下,为池创建的子进程数将是核心数,并且这是合理的)。然后我会让你的工作函数返回到主进程键和值信息,然后更新字典。但是您的代码中有些事情不清楚,例如为什么

ensemble_model.__init__
执行的计算是在类初始值设定项而不是普通函数中完成的,以及
df
在哪里定义?

import multiprocessing, cpu_count
from functools import partial

def ensemble_model(df, k):    
    # do some calculations 
    r2_list = [...]
    rmse_list = [...]
    
    # return key and values
    return k, r2_list, rsme_list

r2_dic = {}
rmse_dic = {}

N_TASKS = 1000
pool_size = cpu_count()
chunk_size = N_TASKS // (4 * pool_size) # Good approximation

with multiprocessing.Pool(pool_size) as pool:
    worker = partial(ensemble_model, df)
    for k, r2_list, rsme_list in pool.imap_unordered(
        worker,
        range(1, N_TASKS + 1),
        chunksize=chunk_size
    ):
        r2_dic[k] = r2_list
        rsme_dic[k] = rsme_list

但是您可以使用列表代替字典:

import multiprocessing, cpu_count
from functools import partial

def ensemble_model(df, k):
    # do some calculations 
    r2_list = [...]
    rmse_list = [...]
    
    # return key and values
    return k, r2_list, rsme_list

N_TASKS = 1000
r2_list_list = [None] * N_TASKS
rmse_list_list = [None] * N_TASKS

pool_size = cpu_count()
chunk_size = N_TASKS // (4 * pool_size) # Good approximation

with multiprocessing.Pool(pool_size) as pool:
    worker = partial(ensemble_model, df)
    for k, r2_list, rsme_list in pool.imap_unordered(
        worker,
        range(1, N_TASKS + 1),
        chunksize=chunk_size
    ):
        # Origin-0 lists:
        r2_list_list[k - 1] = r2_list
        rmse_list_list[k - 1] = rsme_list
© www.soinside.com 2019 - 2024. All rights reserved.