Dask 数据帧并行运行并按列分区

问题描述 投票:0回答:1

我有一个包含多个公司和国家/地区数据的数据框,我正在尝试使用函数并行转换这些数据。数据采用这样的格式,但更大并且有更多的客户端:

enter image description here

我制作了一个 dask 数据框,它结合了公司组名称和国家/地区来创建一个混合列,我试图将其用作分区的索引。

在制作 dask 数据帧时,我不想使用 npartitions > 1,因为这样可以随机地将客户的一半数据放入一个分区,然后将另一半放入另一个分区(据我所知)

这是我尝试过的,我的问题是如何正确地将数据帧分割成指定的分区,以便通过我的转换函数并行处理每个分区?

提前非常感谢您的帮助

def calculate_predictions(df):

    df['CompanyCountryCombined'] = df['CompanyGroupName'].astype(str) + '-' + df['Country']

    # Convert pandas DataFrame to Dask DataFrame with one partition
    ddf = dd.from_pandas(df, npartitions=1)
    ddf = ddf.set_index('CompanyCountryCombined', sorted=True).repartition(partition_size="10MB")  # Here I am trying to repartition the data across the combined index
    list_of_output_columns = [] # defined elsewhere

    # Define meta DataFrame to specify the output format
    meta = pd.DataFrame(columns=[list_of_output_columns])

    ddf = ddf.sort_values(by=['CompanyCountryCombined','Date'])

    models = {} # defined elsewhere too

    # Processing the partitions through the process_forecast_group function which is also defined elsewhere
    final_results = ddf.groupby('CompanyCountryCombined').apply(
        lambda partition: process_forecast_group(partition, models), 
        meta=meta
    ).compute()

    return final_results
python parallel-processing multiprocessing dask dask-dataframe
1个回答
0
投票

我最终通过获取所需分区的唯一列表然后对这些分区重新分区来解决这个问题:

import dask.dataframe as dd
import pandas as pd

def calculate_predictions(df):

    def create_dask_df(pandas_df, columns_to_combine=list):
        # Creates a column that will be used to partition the data based on desired column list
        pandas_df['combined'] = pandas_df[columns_to_combine].astype(str).agg('-'.join, axis=1)
        # Dask requires the data to be partitioned, we set it to 1 initially to create the ddf with all the data in one dataframe
        ddf = dd.from_pandas(df, npartitions=1)
        # Sets the index of the dask dataframe to the combined column 
        ddf = ddf.set_index('combined', sorted=True)
        # Gets the list of unique combined values into a list, ready to repartition by these values
        list_of_uniques = sorted(df['combined'].unique().tolist())
        # The range of dask repartitioning cuts the list short by 1 so we need to increase it intentionally with a dupe
        division_list = list_of_uniques + [list_of_uniques[-1]]
        # Repartitions the dask dataframe based on the list of unique combined column
        ddf = ddf.repartition(divisions=division_list)
        return ddf
    
    columns_to_partition_over = ['CompanyGroupName', 'Country']
    ddf = create_dask_df(df, columns_to_partition_over)

    list_of_output_columns = [] # defined elsewhere
    meta = pd.DataFrame(columns=[list_of_output_columns]) # Define meta DataFrame to specify the output format
    models = {} # defined elsewhere too

    # Processing the partitions through the process_forecast_group function which is also defined elsewhere
    final_results = ddf.map_partitions(
        lambda partition: process_forecast_group(partition.sort_values('Date'), models), 
        meta=meta
    ).compute()

    return final_results
© www.soinside.com 2019 - 2024. All rights reserved.