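I have a pandas data frame on which I want to apply a row-wise operation for several columns. The function that returns the pandas Series I need looks like this: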
def get_info_previous_flight(row, original_column_name, new_column_name):
    if row['n_flight_of_day'] == 1:
        return 0
    else:
        flight_date = row['dep_sched_date']
        ac_registration = row['ac_registration']
        previous_flight_number = row['n_flight_of_day'] - 1
        value_previous_flight = flight_info_df.loc[
            (flight_info_df['ac_registration'] == ac_registration) &
            (flight_info_df['dep_sched_date'] == flight_date) &
            (flight_info_df['n_flight_of_day'] == previous_flight_number)
        ][['leg_no', original_column_name]]
        output = pd.Series({'leg_no': value_previous_flight['leg_no'], new_column_name: value_previous_flight[original_column_name]})
        return output
The function I use to run this task for several columns is:
def perform_task(parameters):
    return parameters[2].apply(lambda row: get_info_previous_flight(row, parameters[0],
                                                                    parameters[1]), axis=1)
If I create a pool like this, it does not seem to work:
previous_flight_info = {'change_reason_code_previous_flight': 'change_reason_code',
                        'dep_delay_previous_flight': 'dep_delay',
                        'act_trans_time_previous_flight': 'trans_time',
                        'sched_trans_time_previous_flight': 'sched_trans_time',
                        'act_groundtime_previous_flight': 'Act Groundtime',
                        'sched_groundtime_previous_flight': 'Sched Groundtime'}
tasks = [(key, value, flight_info_df) for key, value in previous_flight_info.items()]
pool = mp.Pool()
results = pool.map(perform_task, tasks)
pool.close()
pool.join()
print(results)
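The problem is that running this .apply() sequentially for every key/value pair in previous_flight_info takes about 3 minutes. With the pool example above, I see no increase in CPU usage, and my kernel eventually times out.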
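multiprocessing does not help here, because each process gets its own copy of the data frame, and changes made in a worker do not propagate back to the parent process.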
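The copy semantics can be seen without starting any processes. The following is only a rough sketch: it assumes that a pickle round trip is a fair stand-in for what Pool.map does when it ships each task argument to a worker, and the two-row frame and the names parent_df and worker_df are made up for illustration.

```python
import pickle

import pandas as pd

# Small stand-in for the real flight_info_df (illustrative data only).
parent_df = pd.DataFrame({"n_flight_of_day": [1, 2], "dep_delay": [7.0, 3.0]})

# Pool.map pickles every task argument before handing it to a worker.
# A pickle round trip reproduces that copy without spawning a process.
worker_df = pickle.loads(pickle.dumps(parent_df))

# Whatever the "worker" adds lives only on its private copy.
worker_df["prev_dep_delay"] = worker_df["dep_delay"].shift(1)

print(list(parent_df.columns))
print(list(worker_df.columns))
```

The parent frame keeps its original columns, so any result a worker computes has to be returned explicitly and reassembled in the parent, which adds serialization cost on top of the row-wise apply.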
Instead of doing things with apply, you can do most of the work with groupby.
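The idea is to sort the original data frame so that the flights are in order, then group by the pair used to find the previous flight (the (dep_sched_date, ac_registration) tuple). After that you can simply assign the shifted values from each group: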
import pandas as pd
pd.options.display.width = 0
flight_info_df = pd.DataFrame(
    {
        "dep_sched_date": ["2020-01-01", "2020-01-01", "2020-01-02", "2020-01-02", "2020-01-02", "2020-01-02", "2020-01-02"],
        "ac_registration": ["B-1234", "B-1234", "B-1234", "B-1234", "B-2345", "B-2345", "B-2345"],
        "n_flight_of_day": [1, 2, 1, 2, 1, 2, 3],
        "dep_delay": [7, 3, 5, 2, 0, 0.6, -8],
        "meals": ["beef", "chicken", "vegetarian", "just cheese", "soup", "tart", "beef"],
    }
)
# Order the flights of each aircraft within each day.
flight_info_df.sort_values(
    ["dep_sched_date", "ac_registration", "n_flight_of_day"], inplace=True
)
# One group per (date, aircraft); shift(1) takes the value from the previous row in the group.
grouped = flight_info_df.groupby(["dep_sched_date", "ac_registration"])
flight_info_df["prev_dep_delay"] = grouped["dep_delay"].shift(1)
flight_info_df["prev_meals"] = grouped["meals"].shift(1)
print(flight_info_df)
This prints:
   dep_sched_date  ac_registration  n_flight_of_day  dep_delay        meals  prev_dep_delay  prev_meals
0      2020-01-01           B-1234                1        7.0         beef             NaN         NaN
1      2020-01-01           B-1234                2        3.0      chicken             7.0        beef
2      2020-01-02           B-1234                1        5.0   vegetarian             NaN         NaN
3      2020-01-02           B-1234                2        2.0  just cheese             5.0  vegetarian
4      2020-01-02           B-2345                1        0.0         soup             NaN         NaN
5      2020-01-02           B-2345                2        0.6         tart             0.0        soup
6      2020-01-02           B-2345                3       -8.0         beef             0.6        tart
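Since the question maps several new column names to source columns in one dict, the same shift can probably be done for all of them in a single pass. This is a minimal sketch, not a drop-in replacement: it reuses the dict layout of previous_flight_info from the question, but only dep_delay_previous_flight is a name taken from there. meals_previous_flight, source_columns, rename_map and previous are hypothetical names, and the sample frame is shortened.

```python
import pandas as pd

flight_info_df = pd.DataFrame(
    {
        "dep_sched_date": ["2020-01-01", "2020-01-01", "2020-01-02", "2020-01-02", "2020-01-02"],
        "ac_registration": ["B-1234", "B-1234", "B-2345", "B-2345", "B-2345"],
        "n_flight_of_day": [1, 2, 1, 2, 3],
        "dep_delay": [7, 3, 0, 0.6, -8],
        "meals": ["beef", "chicken", "soup", "tart", "beef"],
    }
)

# New column name -> source column, same layout as the dict in the question.
previous_flight_info = {
    "dep_delay_previous_flight": "dep_delay",
    "meals_previous_flight": "meals",  # hypothetical name for the sample column
}

flight_info_df = flight_info_df.sort_values(
    ["dep_sched_date", "ac_registration", "n_flight_of_day"]
)
grouped = flight_info_df.groupby(["dep_sched_date", "ac_registration"])

# Shift every source column at once, then rename to the target names.
source_columns = list(previous_flight_info.values())
rename_map = {source: new for new, source in previous_flight_info.items()}
previous = grouped[source_columns].shift(1).rename(columns=rename_map)

# The shifted frame keeps the original index, so join aligns row by row.
flight_info_df = flight_info_df.join(previous)
print(flight_info_df)
```

Two differences from the function in the question are worth keeping in mind. The first flight of the day gets NaN here rather than 0, and shift(1) picks the preceding row in the group, which matches flight n - 1 only when no flight numbers are missing within a day.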