我正在处理一个大型 Pandas 数据框(大约 3050 万行),我需要按多列进行分组并应用不同的自定义聚合函数。但目前性能是瓶颈。
这是我当前方法的简化版本:
import pandas as pd
df = pd.DataFrame({
'category': ['A', 'B', 'A', 'B'] * 10**6,
'subcategory': ['X', 'Y', 'X', 'Y'] * 10**6,
'value': [1, 2, 3, 4] * 10**6,
'quantity': [10, 20, 30, 40] * 10**6
})
agg_functions = {
'value': ['sum', 'mean'],
'quantity': [lambda x: x.sum(), lambda x: (x > 20).mean()]
}
result = df.groupby(['category', 'subcategory']).agg(agg_functions)
问题:
1.数据帧太大,上面的代码在处理实际数据时会耗尽内存。
2.一些自定义聚合函数(例如用于数量的 lambda 函数)速度很慢,并且妨碍了矢量化运算的有效使用。
3.我想以更节省内存的方式应用这些聚合,而不需要对数据进行分块。
问题:
1.如何优化 groupby 和 agg 操作以更有效地处理大型数据集,特别是在应用自定义函数时?
2.是否有任何先进的技术(例如使用 numba、cython 或并行处理)来加速 Pandas groupby 中的自定义聚合?
3.在这种情况下,将数据帧转换为不同的结构(例如使用 Polars、Dask 或 PySpark)是否会提供显着的性能改进,或者是否有办法将其保留在 Pandas 中并仍然优化性能?
我正在寻找内存管理和速度优化的性能技巧,同时保持自定义聚合的灵活性。任何有关最佳实践或先进技术的指导将不胜感激!
我尝试过的:
1。使用 Pandas 进行基本 GroupBy: 我开始使用 Pandas 的 groupby() 函数和自定义聚合函数(例如 lambda 函数),但对于拥有数百万行的数据集来说,它太慢了。 lambda 函数,尤其是条件聚合,显着降低了性能。
2。优化内存使用:我尝试通过将列转换为更高效的数据类型(分类数据为类别,整数为 int32)来减少内存消耗,但虽然它有助于内存使用,但 groupby 和聚合的执行时间仍然非常慢。
3.分块数据: 我尝试使用 pd.read_csv() 和 chunksize 来处理块中的数据,但是管理块并跨块执行 groupby 操作变得很困难,并且合并结果增加了更多开销。
4。使用 joblib 进行多线程: 我使用 joblib 并行化自定义聚合函数的应用程序,但性能增益很小,可能是由于自定义 lambda 函数的性质和管理多个线程的开销。
5。 Dask DataFrame: 我尝试使用 Dask 在多个核心之间分配工作负载,但管理自定义功能并确保有效分配所有操作会增加复杂性,而不会显着提高性能。
import pandas as pd # pip install pandas
import numpy as np
from numba import njit # pip install numba
class DataFrameOptimizer:
def __init__(self, dataframe):
self.df = dataframe
self._optimize_memory_usage()
def _optimize_memory_usage(self):
"""
Data type conversion to reduce memory consumption.
"""
for col in self.df.select_dtypes(include=['int', 'float']).columns:
self.df[col] = pd.to_numeric(self.df[col], downcast='unsigned')
for col in self.df.select_dtypes(include=['object']).columns:
num_unique_values = len(self.df[col].unique())
num_total_values = len(self.df[col])
if num_unique_values / num_total_values < 0.5:
self.df[col] = self.df[col].astype('category')
@staticmethod
@njit
def _conditional_mean_numba(values):
"""
Speeding up conditional average calculation using Numba.
"""
count = 0
total = 0
for val in values:
if val > 20:
total += 1
count += 1
return total / count if count > 0 else 0
def optimized_groupby_agg(self):
"""
Optimized groupby method using vectorization and Numba.
"""
# Defining standard aggregations for the 'value' column
agg_value = self.df['value'].agg(['sum', 'mean'])
# Using a custom function with Numba for 'quantity'
quantity_values = self.df['quantity'].values
conditional_mean = DataFrameOptimizer._conditional_mean_numba(
quantity_values)
quantity_sum = np.sum(quantity_values)
result = {
'value_sum': agg_value['sum'],
'value_mean': agg_value['mean'],
'quantity_sum': quantity_sum,
'quantity_conditional_mean': conditional_mean
}
return result
if __name__ == "__main__":
df = pd.DataFrame({
'category': ['A', 'B', 'A', 'B'] * 10**6,
'subcategory': ['X', 'Y', 'X', 'Y'] * 10**6,
'value': [1, 2, 3, 4] * 10**6,
'quantity': [10, 20, 30, 40] * 10**6
})
optimizer = DataFrameOptimizer(df)
print(optimizer.optimized_groupby_agg())