Multiprocessing doesn't seem to work with df.apply()


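I have a pandas DataFrame on which I want to apply a row-wise operation for several columns. The function that returns the pandas Series I need looks like this: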

import pandas as pd

def get_info_previous_flight(row, original_column_name, new_column_name):
    # The first flight of the day has no previous flight
    if row['n_flight_of_day'] == 1:
        return 0
    else:
        flight_date = row['dep_sched_date']
        ac_registration = row['ac_registration']
        previous_flight_number = row['n_flight_of_day'] - 1
        # Find the previous flight of the same aircraft on the same day.
        # flight_info_df is a global DataFrame; this mask scans all of it for every row.
        value_previous_flight = flight_info_df.loc[
            (flight_info_df['ac_registration'] == ac_registration) &
            (flight_info_df['dep_sched_date'] == flight_date) &
            (flight_info_df['n_flight_of_day'] == previous_flight_number)
        ][['leg_no', original_column_name]]
        # Note: both values below are (usually one-element) Series, not scalars
        output = pd.Series({'leg_no': value_previous_flight['leg_no'],
                            new_column_name: value_previous_flight[original_column_name]})
        return output

The function I use to perform this task for multiple columns is:

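def perform_task(parameters):
    # parameters = (new_column_name, original_column_name, df), in the order built by tasks
    new_column_name, original_column_name, df = parameters
    return df.apply(
        lambda row: get_info_previous_flight(row, original_column_name, new_column_name),
        axis=1,
    )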

If I create a pool like this, it doesn't seem to work:

import multiprocessing as mp

previous_flight_info = {'change_reason_code_previous_flight': 'change_reason_code',
                        'dep_delay_previous_flight': 'dep_delay',
                        'act_trans_time_previous_flight': 'trans_time',
                        'sched_trans_time_previous_flight': 'sched_trans_time',
                        'act_groundtime_previous_flight': 'Act Groundtime',
                        'sched_groundtime_previous_flight': 'Sched Groundtime'}

# One task per (new column name, original column name, DataFrame) triple
tasks = [(key, value, flight_info_df) for key, value in previous_flight_info.items()]

pool = mp.Pool()
results = pool.map(perform_task, tasks)
pool.close()
pool.join()

print(results)

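The problem is this: if I run this .apply() sequentially for every key/value pair in previous_flight_info, it takes about 3 minutes. With the pool version above I don't see any increase in CPU usage, and my kernel eventually times out.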
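For context on the 3-minute runtime: each call to get_info_previous_flight builds three boolean masks over the whole of flight_info_df, so apply(axis=1) does work roughly proportional to rows × rows, and spreading six such passes over several processes would at best divide that by the number of cores. The sketch that follows does the same lookup (previous flight = same ac_registration and dep_sched_date, n_flight_of_day minus one) once for all rows as a self-merge. The helper name previous_flight_lookup, the prev_leg_no column and the toy data are hypothetical, and it assumes the three key columns uniquely identify a flight. Unlike the original function, first flights of the day get NaN rather than 0.

```python
import pandas as pd

# Columns that identify one flight (assumed unique per row)
KEYS = ["ac_registration", "dep_sched_date", "n_flight_of_day"]


def previous_flight_lookup(df, original_column_name, new_column_name):
    # Hypothetical helper: the lookup done by get_info_previous_flight,
    # but as a single join instead of one boolean-mask scan per row.
    prev = df[KEYS + ["leg_no", original_column_name]].copy()
    # Renumber each flight k as k + 1 so flight k - 1 lines up with flight k.
    prev["n_flight_of_day"] += 1
    prev = prev.rename(columns={"leg_no": "prev_leg_no",
                                original_column_name: new_column_name})
    # Left join keeps every row of df; first flights of the day get NaN.
    return df.merge(prev, on=KEYS, how="left")


# Toy data standing in for flight_info_df
flights = pd.DataFrame({
    "ac_registration": ["B-1234", "B-1234", "B-1234"],
    "dep_sched_date": ["2020-01-01", "2020-01-01", "2020-01-01"],
    "n_flight_of_day": [1, 2, 3],
    "leg_no": [101, 102, 103],
    "dep_delay": [7, 3, 5],
})
print(previous_flight_lookup(flights, "dep_delay", "dep_delay_previous_flight"))
```

The merge returns a new frame with a fresh RangeIndex, so combine it with the original by position or by merging, not by relying on the original index labels.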

python pandas multithreading
1 answer

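multiprocessing doesn't help here: each process gets its own copy of the DataFrame, and changes made to that copy are not propagated back to the parent process.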
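A minimal sketch of what this means in practice, using a hypothetical toy DataFrame and worker function: each worker receives a pickled copy of its argument, so a column added inside the worker never appears in the parent's DataFrame, while the worker's return value is pickled back and collected by pool.map. The if __name__ == "__main__": guard is needed when processes are started with the spawn method (the default on Windows and macOS). With spawn, child processes also have to import the worker function by name, which generally fails for functions defined interactively in a notebook; that is one plausible reason, not confirmed for this question, why a pool can appear to hang in a Jupyter kernel.

```python
import multiprocessing as mp

import pandas as pd


def add_column_in_worker(df):
    # Runs in a child process on an unpickled copy of the parent's DataFrame,
    # so this new column is never seen by the parent.
    df["added_in_worker"] = 1
    # The return value is pickled and sent back to the parent.
    return int(df["x"].sum())


def main():
    df = pd.DataFrame({"x": [1, 2, 3]})
    with mp.Pool(processes=2) as pool:
        sums = pool.map(add_column_in_worker, [df, df])
    print(sums)                             # [6, 6]: return values come back
    print("added_in_worker" in df.columns)  # False: the mutation stayed in the workers
    return sums, df


if __name__ == "__main__":
    main()
```

Applied to the question, this means perform_task has to return its result (as it already does) and the parent has to assign the returned Series to flight_info_df itself.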

Instead of doing a lot of work in apply, you can do this with groupby.

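The idea is to sort the original DataFrame so that each aircraft's flights are in order, then group by the pair that identifies the previous flight (the (dep_sched_date, ac_registration) tuple). Within each group, the previous flight's values are just the column shifted down by one row, so you can assign them directly: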

import pandas as pd

pd.options.display.width = 0

flight_info_df = pd.DataFrame(
    {
        "dep_sched_date": ["2020-01-01", "2020-01-01", "2020-01-02", "2020-01-02", "2020-01-02", "2020-01-02", "2020-01-02"],
        "ac_registration": ["B-1234", "B-1234", "B-1234", "B-1234", "B-2345", "B-2345", "B-2345"],
        "n_flight_of_day": [1, 2, 1, 2, 1, 2, 3],
        "dep_delay": [7, 3, 5, 2, 0, 0.6, -8],
        "meals": ["beef", "chicken", "vegetarian", "just cheese", "soup", "tart", "beef"],
    }
)

flight_info_df.sort_values(
    ["dep_sched_date", "ac_registration", "n_flight_of_day"], inplace=True
)
grouped = flight_info_df.groupby(["dep_sched_date", "ac_registration"])

flight_info_df["prev_dep_delay"] = grouped["dep_delay"].shift(1)
flight_info_df["prev_meals"] = grouped["meals"].shift(1)

print(flight_info_df)

This prints:

  dep_sched_date ac_registration  n_flight_of_day  dep_delay        meals  prev_dep_delay  prev_meals
0     2020-01-01          B-1234                1        7.0         beef             NaN         NaN
1     2020-01-01          B-1234                2        3.0      chicken             7.0        beef
2     2020-01-02          B-1234                1        5.0   vegetarian             NaN         NaN
3     2020-01-02          B-1234                2        2.0  just cheese             5.0  vegetarian
4     2020-01-02          B-2345                1        0.0         soup             NaN         NaN
5     2020-01-02          B-2345                2        0.6         tart             0.0        soup
6     2020-01-02          B-2345                3       -8.0         beef             0.6        tart
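The same pattern extends to every column in the question's previous_flight_info mapping: select all source columns from the groupby at once, shift them, and rename the result to the new column names. A sketch, using a hypothetical two-entry mapping and toy data in place of the real columns:

```python
import pandas as pd

# Hypothetical subset of the question's mapping: new column name -> source column
previous_flight_info = {
    "dep_delay_previous_flight": "dep_delay",
    "meals_previous_flight": "meals",
}

flight_info_df = pd.DataFrame({
    "dep_sched_date": ["2020-01-01", "2020-01-01", "2020-01-02", "2020-01-02"],
    "ac_registration": ["B-1234", "B-1234", "B-2345", "B-2345"],
    "n_flight_of_day": [2, 1, 1, 2],
    "dep_delay": [3, 7, 0, 0.6],
    "meals": ["chicken", "beef", "soup", "tart"],
})

# Put each aircraft's flights of the day in order before shifting
flight_info_df = flight_info_df.sort_values(
    ["dep_sched_date", "ac_registration", "n_flight_of_day"]
)
grouped = flight_info_df.groupby(["dep_sched_date", "ac_registration"])

# Shift all source columns in one call, then rename them to the new names
# (dict keys and values iterate in the same order)
source_columns = list(previous_flight_info.values())
shifted = grouped[source_columns].shift(1)
shifted.columns = list(previous_flight_info.keys())

# shift keeps the original index labels, so join aligns rows correctly
flight_info_df = flight_info_df.join(shifted)
print(flight_info_df)
```

Unlike the row-by-row lookup in the question, shift(1) takes the previous row in sorted order, so it matches n_flight_of_day - 1 only when flight numbers within a (date, aircraft) group have no gaps.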
© www.soinside.com 2019 - 2024. All rights reserved.