Python 多处理:当我在一个巨大的 pandas 数据帧上启动许多进程时,程序会卡住

问题描述 投票:0回答:2

我正在尝试使用 python 的多处理库(

pool_starmap
)在同一 Pandas DataFrame 上并行执行同一任务的代码上获得执行时间,但使用不同的调用参数。

当我在数据帧的一小部分和 10 个作业上执行此代码时,一切正常。然而,当我将整个 100 000 000 行数据集与 63 个作业(使用具有 64 个 CPU 核心的集群计算机)放在一起时,代码就……冻结了。它正在运行,但没有执行任何操作(我知道这一点是因为,每执行 10 000 个任务,代码就会打印出它还活着)。

我在互联网上搜索并发现了类似的问题,但没有一个答案适用于我的具体情况,所以我在这里。

最小示例

我做了一个最小的、自我维持的例子来重现这个问题。 假设为了简化,我的数据框有 2 列;第一个是“stores”,另一个是“price”。我想恢复每个商店的mean_price。 当然,在这个特定问题中,人们只需对stores上的数据框进行分组并聚合price,但这是一种简化;让我们假设该任务只能完成一次一个商店(这是我的情况)。这是一个最小的示例:

#minimal example

import time
import pandas as pd
import random as rd
import multiprocessing

def create_datafile(nrows):
    """
    create a random pandas dataframe file
    To visualize this rather simple example,
    let's say that we are looking at a pool of 0.1*nrows products across different stores,
    that can have different values of the attribute "price"
    (in the list "stores").
    """
    
    price = [rd.randint(0,300) for i in range(nrows)]
    stores = [i%(0.1*nrows) for i in range(nrows)]

    data=zip(stores,price)
 
    return pd.DataFrame(data=data, columns=["stores", "price"])

            
def task(data, store, dic):
    """
    the task we want to accomplish: compute mean price
    for each store in the dataframe.
    """

    if rd.randint(1,10000)==1:
        print('I am alive!')

    product_df = data.groupby('stores', as_index = False).agg(mean_price = ("price", 'mean'))

    selected_store = product_df[product_df['stores'] == store] #select the mean for a given store

    #print(selected_store)
    #time.sleep(1) #sleep for 1 second to simulate a "long" task...

    dic[store] = selected_store['mean_price']
    return(dic)



if __name__ == "__main__":
    ##
    nrows=100000000
    nb_jobs= 63

    print('Creating data...')
    data_df = create_datafile(nrows)
    print('Data created.')
    #print(data_df)

    #create parameters for multiprocessing task
    stores_list = [i%(0.1*nrows) for i in range(nrows)]
    dics_stores=[{} for _ in stores_list]
    parameters = [(data_df, stores_list[i], dics_stores[i]) for i in range(nrows)]

    #launch multiprocessing tasks with starmap
    #from multiprocessing import set_start_method
    #set_start_method("spawn")
    tic=time.time()
    print(f'Max number of jobs: {multiprocessing.cpu_count() - 1}')
    if nb_jobs > multiprocessing.cpu_count() - 1:
        print(f'Running: {multiprocessing.cpu_count() - 1} jobs...')
        pool = multiprocessing.Pool(processes = multiprocessing.cpu_count() - 1)
    else:
        print(f'Running: {nb_jobs} jobs...')
        pool = multiprocessing.Pool(processes = nb_jobs)
    results = pool.starmap(task, parameters)
    pool.close()
    pool.join()
    toc=time.time()
    print(f'Processed data in {round((toc-tic)/60,1)} minutes (rounded to 0.1).') 

    dics_mean = {}
    for dic in results:
        dics_mean.update(dic) #update dictionaries

    #dic_means now contains all the means computed by each program.

我正在使用Python 3.9.2。

如果您使用以下命令启动此代码:

  • nrows = 10000
    nb_jobs = 10
    ,你应该不会遇到任何问题。
  • 但是使用
    nrows=100000000
    nb_jobs=63
    ,我提到的问题应该会发生。

我对 Python 多重处理相当陌生,所以欢迎任何提示!预先感谢。

python pandas multiprocessing large-data
2个回答
1
投票

您让每个任务返回一个字典,随着您处理越来越多的行,该字典会变得越来越大。这不是你想做的事。

def task(data, store):
   .....
   return store, <result>

然后:

<change parameters not to include the dict>

with pool as multiprocessing.pool():
    for store, result in pool.startmap(task, parameters)
        put results into dictionary here.

0
投票

您可以通过使用池初始值设定项来消除一些酸洗。

即便如此,DataFrame 中的 100m 行仍将占用大量 CPU 时间。

这是演示该技术的实现修订版:

import pandas as pd
import random as rd
import multiprocessing as mp


def create_datafile(nrows):
    price = [rd.randint(0, 300) for i in range(nrows)]
    stores = [i % (0.1 * nrows) for i in range(nrows)]
    return pd.DataFrame(zip(stores, price), columns=["stores", "price"])


def task(store):
    global data 
    product_df = data.groupby('stores', as_index = False).agg(mean_price = ("price", 'mean'))
    selected_store = product_df[product_df['stores'] == store]
    return {store: selected_store['mean_price']}

def pinit(_df):
    global data 
    data = _df

def main():
    nrows = 1_000_000
    df = create_datafile(nrows)
    stores_list = [(i % (0.1 * nrows),) for i in range(nrows)]
    dic_mean = {}
    with mp.Pool(initializer=pinit, initargs=(df,)) as pool:
        for result in pool.starmap_async(task, stores_list).get():
            dic_mean.update(result)
    #print(dic_mean)

if __name__ == "__main__":
    main()
© www.soinside.com 2019 - 2024. All rights reserved.