I am trying to measure the execution time of code that uses Python's multiprocessing library (pool.starmap) to run the same task in parallel on the same Pandas DataFrame, but with different call arguments.
When I run this code on a small subset of the DataFrame with 10 jobs, everything works fine. However, when I run it on the full 100,000,000-row dataset with 63 jobs (on a cluster machine with 64 CPU cores), the code just... freezes. It keeps running but does nothing (I know this because, every 10,000 tasks or so, the code prints that it is still alive).
I searched the internet and found similar questions, but none of the answers applied to my specific case, so here I am.
I put together a minimal, self-contained example that reproduces the problem. For simplicity, let's say my DataFrame has 2 columns; one is "stores" and the other is "price". I want to retrieve the mean_price for each store. Of course, for this particular problem one would simply group the DataFrame by stores and aggregate price (a one-line reference version of that is shown right after the example), but this is a simplification; let's assume the task can only be done one store at a time (which is my actual situation). Here is a minimal example:
#minimal example
import time
import pandas as pd
import random as rd
import multiprocessing

def create_datafile(nrows):
    """
    create a random pandas dataframe file
    To visualize this rather simple example,
    let's say that we are looking at a pool of 0.1*nrows products across different stores,
    that can have different values of the attribute "price"
    (in the list "stores").
    """
    price = [rd.randint(0,300) for i in range(nrows)]
    stores = [i%(0.1*nrows) for i in range(nrows)]
    data = zip(stores, price)
    return pd.DataFrame(data=data, columns=["stores", "price"])

def task(data, store, dic):
    """
    the task we want to accomplish: compute mean price
    for each store in the dataframe.
    """
    if rd.randint(1,10000) == 1:
        print('I am alive!')
    product_df = data.groupby('stores', as_index=False).agg(mean_price=("price", 'mean'))
    selected_store = product_df[product_df['stores'] == store] #select the mean for a given store
    #print(selected_store)
    #time.sleep(1) #sleep for 1 second to simulate a "long" task...
    dic[store] = selected_store['mean_price']
    return(dic)
if __name__ == "__main__":
    ##
    nrows = 100000000
    nb_jobs = 63

    print('Creating data...')
    data_df = create_datafile(nrows)
    print('Data created.')
    #print(data_df)

    #create parameters for multiprocessing task
    stores_list = [i%(0.1*nrows) for i in range(nrows)]
    dics_stores = [{} for _ in stores_list]
    parameters = [(data_df, stores_list[i], dics_stores[i]) for i in range(nrows)]

    #launch multiprocessing tasks with starmap
    #from multiprocessing import set_start_method
    #set_start_method("spawn")
    tic = time.time()
    print(f'Max number of jobs: {multiprocessing.cpu_count() - 1}')
    if nb_jobs > multiprocessing.cpu_count() - 1:
        print(f'Running: {multiprocessing.cpu_count() - 1} jobs...')
        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count() - 1)
    else:
        print(f'Running: {nb_jobs} jobs...')
        pool = multiprocessing.Pool(processes=nb_jobs)
    results = pool.starmap(task, parameters)
    pool.close()
    pool.join()
    toc = time.time()
    print(f'Processed data in {round((toc-tic)/60,1)} minutes (rounded to 0.1).')

    dics_mean = {}
    for dic in results:
        dics_mean.update(dic) #update dictionaries
    #dic_means now contains all the means computed by each program.
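For reference, the direct single-process version of this simplified computation mentioned above would just be the one-liner below, applied to the data_df built in the example (shown for comparison only; as said, the real task cannot be written this way):

#direct (non-parallel) reference computation: mean price per store
mean_price_per_store = data_df.groupby('stores')['price'].mean().to_dict()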
I am using Python 3.9.2.
If you launch this code with nrows = 10000 and nb_jobs = 10, you should not run into any problem. With nrows = 100000000 and nb_jobs = 63, the issue I described should occur. I am fairly new to Python multiprocessing, so any tips are welcome! Thanks in advance.
You are having each task return a dictionary, which grows larger and larger as you process more and more rows. That is not something you want to do. Have the task return just the store and its result instead:
def task(data, store):
    .....
    return store, <result>
Then:
<change parameters not to include the dict>
with multiprocessing.Pool() as pool:
    for store, result in pool.starmap(task, parameters):
        put results into dictionary here
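A minimal, runnable sketch of that pattern (names, sizes, and the worker count are illustrative only; the DataFrame is still pickled once per task here, which the initializer trick below avoids):

#sketch: the task returns (store, result); the dictionary is built in the parent
import multiprocessing
import random as rd
import pandas as pd

def task(data, store):
    #return the store together with its result instead of a growing dict
    mean_price = data.loc[data['stores'] == store, 'price'].mean()
    return store, mean_price

if __name__ == "__main__":
    nrows = 10_000
    data_df = pd.DataFrame({
        "stores": [i % (nrows // 10) for i in range(nrows)],
        "price": [rd.randint(0, 300) for _ in range(nrows)],
    })
    parameters = [(data_df, s) for s in data_df["stores"].unique()]
    dic_mean = {}
    with multiprocessing.Pool(processes=4) as pool:
        #collect results in the parent process, not in the workers
        for store, result in pool.starmap(task, parameters):
            dic_mean[store] = result
    print(len(dic_mean), 'stores processed')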
You can eliminate some of the pickling by using a pool initializer.
Even so, 100M rows in the DataFrame are still going to take a significant amount of CPU time.
Here is a revised version of the implementation demonstrating the technique:
import pandas as pd
import random as rd
import multiprocessing as mp

def create_datafile(nrows):
    price = [rd.randint(0, 300) for i in range(nrows)]
    stores = [i % (0.1 * nrows) for i in range(nrows)]
    return pd.DataFrame(zip(stores, price), columns=["stores", "price"])

def task(store):
    #"data" is the per-worker global set up by the pool initializer
    global data
    product_df = data.groupby('stores', as_index=False).agg(mean_price=("price", 'mean'))
    selected_store = product_df[product_df['stores'] == store]
    return {store: selected_store['mean_price']}

def pinit(_df):
    #pool initializer: runs once per worker process, so the DataFrame is
    #pickled once per worker instead of once per task
    global data
    data = _df

def main():
    nrows = 1_000_000
    df = create_datafile(nrows)
    stores_list = [(i % (0.1 * nrows),) for i in range(nrows)]
    dic_mean = {}
    with mp.Pool(initializer=pinit, initargs=(df,)) as pool:
        #build the result dictionary in the parent process
        for result in pool.starmap_async(task, stores_list).get():
            dic_mean.update(result)
    #print(dic_mean)

if __name__ == "__main__":
    main()