通过多个自定义函数优化 Pandas GroupBy 和大型数据集的聚合

问题描述 投票:0回答:1

我正在处理一个大型 Pandas 数据框(大约 3050 万行),我需要按多列进行分组并应用不同的自定义聚合函数。但目前性能是瓶颈。

这是我当前方法的简化版本:

import pandas as pd
 
df = pd.DataFrame({
    'category': ['A', 'B', 'A', 'B'] * 10**6,
    'subcategory': ['X', 'Y', 'X', 'Y'] * 10**6,
    'value': [1, 2, 3, 4] * 10**6,
    'quantity': [10, 20, 30, 40] * 10**6
})
 
agg_functions = {
    'value': ['sum', 'mean'],
    'quantity': [lambda x: x.sum(), lambda x: (x > 20).mean()]
}

result = df.groupby(['category', 'subcategory']).agg(agg_functions)

问题:

1.数据帧太大,上面的代码在处理实际数据时会耗尽内存。

2.一些自定义聚合函数(例如用于数量的 lambda 函数)速度很慢,并且妨碍了矢量化运算的有效使用。

3.我想以更节省内存的方式应用这些聚合,而不需要对数据进行分块。

问题:

1.如何优化 groupby 和 agg 操作以更有效地处理大型数据集,特别是在应用自定义函数时?

2.是否有任何先进的技术(例如使用 numba、cython 或并行处理)来加速 Pandas groupby 中的自定义聚合?

3.在这种情况下,将数据帧转换为不同的结构(例如使用 Polars、Dask 或 PySpark)是否会提供显着的性能改进,或者是否有办法将其保留在 Pandas 中并仍然优化性能?

我正在寻找内存管理和速度优化的性能技巧,同时保持自定义聚合的灵活性。任何有关最佳实践或先进技术的指导将不胜感激!

我尝试过的:

1。使用 Pandas 进行基本 GroupBy: 我开始使用 Pandas 的 groupby() 函数和自定义聚合函数(例如 lambda 函数),但对于拥有数百万行的数据集来说,它太慢了。 lambda 函数,尤其是条件聚合,显着降低了性能。

2。优化内存使用:我尝试通过将列转换为更高效的数据类型(分类数据为类别,整数为 int32)来减少内存消耗,但虽然它有助于内存使用,但 groupby 和聚合的执行时间仍然非常慢。

3.分块数据: 我尝试使用 pd.read_csv() 和 chunksize 来处理块中的数据,但是管理块并跨块执行 groupby 操作变得很困难,并且合并结果增加了更多开销。

4。使用 joblib 进行多线程: 我使用 joblib 并行化自定义聚合函数的应用程序,但性能增益很小,可能是由于自定义 lambda 函数的性质和管理多个线程的开销。

5。 Dask DataFrame: 我尝试使用 Dask 在多个核心之间分配工作负载,但管理自定义功能并确保有效分配所有操作会增加复杂性,而不会显着提高性能。

python pandas numpy group-by dask
1个回答
0
投票
import pandas as pd  # pip install  pandas
import numpy as np
from numba import njit  # pip install numba


class DataFrameOptimizer:
    def __init__(self, dataframe):
        self.df = dataframe
        self._optimize_memory_usage()

    def _optimize_memory_usage(self):
        """
        Data type conversion to reduce memory consumption.
        """
        for col in self.df.select_dtypes(include=['int', 'float']).columns:
            self.df[col] = pd.to_numeric(self.df[col], downcast='unsigned')
        for col in self.df.select_dtypes(include=['object']).columns:
            num_unique_values = len(self.df[col].unique())
            num_total_values = len(self.df[col])
            if num_unique_values / num_total_values < 0.5:
                self.df[col] = self.df[col].astype('category')

    @staticmethod
    @njit
    def _conditional_mean_numba(values):
        """
        Speeding up conditional average calculation using Numba.
        """
        count = 0
        total = 0
        for val in values:
            if val > 20:
                total += 1
            count += 1
        return total / count if count > 0 else 0

    def optimized_groupby_agg(self):
        """
        Optimized groupby method using vectorization and Numba.
        """
        # Defining standard aggregations for the 'value' column
        agg_value = self.df['value'].agg(['sum', 'mean'])

        # Using a custom function with Numba for 'quantity'
        quantity_values = self.df['quantity'].values
        conditional_mean = DataFrameOptimizer._conditional_mean_numba(
            quantity_values)
        quantity_sum = np.sum(quantity_values)

        result = {
            'value_sum': agg_value['sum'],
            'value_mean': agg_value['mean'],
            'quantity_sum': quantity_sum,
            'quantity_conditional_mean': conditional_mean
        }
        return result


if __name__ == "__main__":
    df = pd.DataFrame({
        'category': ['A', 'B', 'A', 'B'] * 10**6,
        'subcategory': ['X', 'Y', 'X', 'Y'] * 10**6,
        'value': [1, 2, 3, 4] * 10**6,
        'quantity': [10, 20, 30, 40] * 10**6
    })

    optimizer = DataFrameOptimizer(df)
    print(optimizer.optimized_groupby_agg())

© www.soinside.com 2019 - 2024. All rights reserved.