我正在处理一个大型数据集,其中包含由日期和组 ID 标识的多个唯一数据组。每个组包含多个 ID,每个 ID 具有多个属性。这是我的数据的简化结构:
| date | group_id | inner_id | attr_a | attr_b | attr_c |
|------------|----------|----------|--------|--------|--------|
| 2023-06-01 | A1 | 001 | val | val | val |
| 2023-06-01 | A1 | 002 | val | val | val |
...
此外,对于每个日期,我都有一个与之关联的大矩阵:
| date | matrix |
|------------|--------------|
| 2023-06-01 | [[...], ...] |
...
我需要为每个日期和 group_id 应用一个函数,使用组属性和与该日期关联的矩阵来处理数据。该函数如下所示:
def run(group_data: pd.DataFrame, matrix) -> pd.DataFrame:
# process data
return processed_data
这里,
group_data
包含特定组的属性:
| inner_id | attr_a | attr_b | attr_c |
|----------|--------|--------|--------|
| 001 | val | val | val |
...
这是我当前的实现,它有效,但我一次只能运行约 200 个日期,因为我将所有数据广播给所有工作人员(我有约 2k 日期,每个日期约 100 个组,每组约 150 个内部元素)
def calculate_metrics(data: DataFrame, matrices: DataFrame) -> DataFrame:
# Convert matrices to a dictionary mapping dates to matrix
date_matrices = matrices.rdd.collectAsMap()
# Broadcast the matrices
broadcasted_matrices = spark_context.broadcast(date_matrices)
# Function to apply calculations
def apply_calculation(group_key: Tuple[str, str], data_group: pd.DataFrame) -> pd.DataFrame:
date = group_key[1]
return custom_calculation_function(broadcasted_matrices.value[date], data_group)
# Apply the function to each group
return data.groupby('group_id', 'date').applyInPandas(apply_calculation, schema_of_result)
如何优化此计算以有效地并行处理,确保矩阵不会不必要地多余地加载到内存中?
您似乎不想将所有矩阵广播给所有工作人员,从而导致相当大的开销。 这个答案似乎也解决了类似的问题,即在函数范围之外访问数据会导致大量计算。
我自己还不完全熟悉 pySpark,但我认为 Spark 可以相当高效地处理
join
(基本上是您的查找 broadcasted_matrices.value[date]
)和 groupby
。也许你可以尝试这样的事情:
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import DataFrame
from typing import Tuple
# this was just used for my type hinting
spark_context = SparkContext()
schema = None
def custom_calculation_function(group_key: Tuple[str, str], data_group: pd.DataFram) -> pd.DataFrame:
matrix = data_group["matrix"]
rest_of_data = data_group.loc[:, data_group.columns != 'matrix']
... # whatever you want to do
def calculate_metrics(data: DataFrame, matrices: DataFrame) -> DataFrame:
return (data
.join(matrices, on="date", how="left")
.groupby('date', 'group_id', "inner_id")
.applyInPandas(custom_calculation_function, schema)
)
# maybe some additional work / intermediate schema is needed to construct your final schema