I have a dataframe containing data for multiple companies and countries, and I am trying to transform that data in parallel with a function. The data looks like this, but larger and with many more clients:
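(A made-up frame with the same shape; the column names are the ones my code uses, and 'Value' stands in for whatever measure columns the real data has:)

import pandas as pd

# Made-up example rows; the real frame has many more clients and dates
df = pd.DataFrame({
    'CompanyGroupName': ['Acme', 'Acme', 'Globex', 'Globex'],
    'Country': ['US', 'US', 'DE', 'DE'],
    'Date': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-01', '2024-01-02']),
    'Value': [10.0, 12.5, 7.3, 8.1],  # hypothetical measure column
})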
I made a dask dataframe that combines the company group name and the country into one combined column, which I am trying to use as the index for partitioning.
When building the dask dataframe I don't want to use npartitions > 1, because (as far as I understand it) that can randomly put half of a client's data in one partition and the other half in another.
Here is what I have tried. My question is: how do I correctly split the dataframe into the intended partitions, so that each partition is processed in parallel by my transformation function?
Many thanks in advance for any help.
import dask.dataframe as dd
import pandas as pd

def calculate_predictions(df):
    df['CompanyCountryCombined'] = df['CompanyGroupName'].astype(str) + '-' + df['Country']
    # Convert pandas DataFrame to Dask DataFrame with one partition
    ddf = dd.from_pandas(df, npartitions=1)
    # Here I am trying to repartition the data across the combined index
    ddf = ddf.set_index('CompanyCountryCombined', sorted=True).repartition(partition_size="10MB")
    list_of_output_columns = []  # defined elsewhere
    # Define meta DataFrame to specify the output format
    meta = pd.DataFrame(columns=list_of_output_columns)
    ddf = ddf.sort_values(by=['CompanyCountryCombined', 'Date'])
    models = {}  # defined elsewhere too
    # Processing the groups through the process_forecast_group function, which is also defined elsewhere
    final_results = ddf.groupby('CompanyCountryCombined').apply(
        lambda partition: process_forecast_group(partition, models),
        meta=meta
    ).compute()
    return final_results
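For reference, this is how I was checking where each group actually landed after the size-based repartition: the divisions are the index values at the partition boundaries, and a per-partition row count shows whether a group straddles a boundary. A minimal sketch with made-up data:

import dask.dataframe as dd
import pandas as pd

# Made-up frame; in the real data each group is far larger
df = pd.DataFrame({
    'CompanyGroupName': ['A', 'A', 'B', 'B'],
    'Country': ['US', 'US', 'DE', 'DE'],
    'Date': pd.to_datetime(['2024-01-01', '2024-01-02'] * 2),
})
df['CompanyCountryCombined'] = df['CompanyGroupName'] + '-' + df['Country']
df = df.sort_values('CompanyCountryCombined')

ddf = dd.from_pandas(df, npartitions=1)
ddf = ddf.set_index('CompanyCountryCombined', sorted=True).repartition(partition_size="10MB")

# Print the index values at the partition boundaries and the number of rows
# per partition to see how the groups were distributed
print(ddf.divisions)
print(ddf.map_partitions(len).compute())  # rows per partition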
I eventually solved this by getting the sorted list of unique combined values I wanted one partition each for, and then repartitioning on those values as divisions:
import dask.dataframe as dd
import pandas as pd

def calculate_predictions(df):
    def create_dask_df(pandas_df, columns_to_combine: list):
        # Create the column the data will be partitioned on from the desired column list
        pandas_df['combined'] = pandas_df[columns_to_combine].astype(str).agg('-'.join, axis=1)
        # set_index(..., sorted=True) assumes the data is already ordered by that column
        pandas_df = pandas_df.sort_values('combined')
        # Dask requires the data to be partitioned; start with all of it in one partition
        ddf = dd.from_pandas(pandas_df, npartitions=1)
        # Set the index of the dask dataframe to the combined column
        ddf = ddf.set_index('combined', sorted=True)
        # Collect the unique combined values, ready to repartition on them
        list_of_uniques = sorted(pandas_df['combined'].unique().tolist())
        # Dask needs one more division than partitions, and only the last division is
        # inclusive, so duplicate the final value to give the last group its own partition
        division_list = list_of_uniques + [list_of_uniques[-1]]
        # Repartition so each unique combined value gets exactly one partition
        ddf = ddf.repartition(divisions=division_list)
        return ddf

    columns_to_partition_over = ['CompanyGroupName', 'Country']
    ddf = create_dask_df(df, columns_to_partition_over)
    list_of_output_columns = []  # defined elsewhere
    # Define meta DataFrame to specify the output format
    meta = pd.DataFrame(columns=list_of_output_columns)
    models = {}  # defined elsewhere too
    # Processing the partitions through the process_forecast_group function, which is also defined elsewhere
    final_results = ddf.map_partitions(
        lambda partition: process_forecast_group(partition.sort_values('Date'), models),
        meta=meta
    ).compute()
    return final_results
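To sanity-check the partitioning, I run the same steps on a tiny made-up frame with a stub in place of process_forecast_group (stub_forecast below is just a placeholder, not the real model code):

import dask.dataframe as dd
import pandas as pd

def stub_forecast(partition, models):
    # Placeholder for process_forecast_group: one summary row per partition
    return pd.DataFrame({'combined': [partition.index[0]], 'rows': [len(partition)]})

df = pd.DataFrame({
    'CompanyGroupName': ['Acme', 'Acme', 'Globex', 'Globex', 'Initech'],
    'Country': ['US', 'US', 'DE', 'DE', 'FR'],
    'Date': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-01', '2024-01-02', '2024-01-01']),
})
df['combined'] = df[['CompanyGroupName', 'Country']].astype(str).agg('-'.join, axis=1)
df = df.sort_values('combined')

ddf = dd.from_pandas(df, npartitions=1).set_index('combined', sorted=True)
uniques = sorted(df['combined'].unique().tolist())
ddf = ddf.repartition(divisions=uniques + [uniques[-1]])

print(ddf.npartitions)  # 3: one partition per combined key
print(ddf.divisions)    # ('Acme-US', 'Globex-DE', 'Initech-FR', 'Initech-FR')

meta = pd.DataFrame({'combined': pd.Series(dtype='object'), 'rows': pd.Series(dtype='int64')})
print(ddf.map_partitions(stub_forecast, models={}, meta=meta).compute())
# Each output row should report exactly one group, confirming no group was split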