多键GroupBy,一键共享数据

问题描述 投票:0回答:1

我正在处理一个大型数据集,其中包含由日期和组 ID 标识的多个唯一数据组。每个组包含多个 ID,每个 ID 具有多个属性。这是我的数据的简化结构:

| date       | group_id | inner_id | attr_a | attr_b | attr_c |
|------------|----------|----------|--------|--------|--------|
| 2023-06-01 | A1       | 001      | val    | val    | val    |
| 2023-06-01 | A1       | 002      | val    | val    | val    |
...

此外,对于每个日期,我都有一个与之关联的大矩阵:

| date       | matrix       |
|------------|--------------|
| 2023-06-01 | [[...], ...] |
...

我需要为每个日期和 group_id 应用一个函数,使用组属性和与该日期关联的矩阵来处理数据。该函数如下所示:

def run(group_data: pd.DataFrame, matrix) -> pd.DataFrame:
    # process data
    return processed_data

这里,

group_data
包含特定组的属性:

| inner_id | attr_a | attr_b | attr_c |
|----------|--------|--------|--------|
| 001      | val    | val    | val    |
...

这是我当前的实现,它有效,但我一次只能运行约 200 个日期,因为我将所有数据广播给所有工作人员(我有约 2k 日期,每个日期约 100 个组,每组约 150 个内部元素)

def calculate_metrics(data: DataFrame, matrices: DataFrame) -> DataFrame:
    # Convert matrices to a dictionary mapping dates to matrix
    date_matrices = matrices.rdd.collectAsMap()

    # Broadcast the matrices
    broadcasted_matrices = spark_context.broadcast(date_matrices)

    # Function to apply calculations
    def apply_calculation(group_key: Tuple[str, str], data_group: pd.DataFrame) -> pd.DataFrame:
        date = group_key[1]
        return custom_calculation_function(broadcasted_matrices.value[date], data_group)

    # Apply the function to each group
    return data.groupby('group_id', 'date').applyInPandas(apply_calculation, schema_of_result)

如何优化此计算以有效地并行处理,确保矩阵不会不必要地多余地加载到内存中?

python pandas pyspark
1个回答
0
投票

您似乎不想将所有矩阵广播给所有工作人员,从而导致相当大的开销。 这个答案似乎也解决了类似的问题,即在函数范围之外访问数据会导致大量计算。

我自己还不完全熟悉 pySpark,但我认为 Spark 可以相当高效地处理

join
(基本上是您的查找
broadcasted_matrices.value[date]
)和
groupby
。也许你可以尝试这样的事情:

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import DataFrame
from typing import Tuple

# this was just used for my type hinting
spark_context = SparkContext()
schema = None

def custom_calculation_function(group_key: Tuple[str, str], data_group: pd.DataFram) -> pd.DataFrame:
    matrix = data_group["matrix"]
    rest_of_data = data_group.loc[:, data_group.columns != 'matrix']
    ... # whatever you want to do

def calculate_metrics(data: DataFrame, matrices: DataFrame) -> DataFrame:
    return (data
            .join(matrices, on="date", how="left")
            .groupby('date', 'group_id', "inner_id")
            .applyInPandas(custom_calculation_function, schema)
            )
# maybe some additional work / intermediate schema is needed to construct your final schema
© www.soinside.com 2019 - 2024. All rights reserved.