Dynamically combining multiple PySpark DataFrames: merging each DataFrame's static columns with per-year dynamic columns

Problem description

I have several PySpark DataFrames that I need to join or union to produce a final DataFrame with the following structure:

Input:

df1 :[colA, colB, colC, avg_salary_y2020]
df2 :[colA, colB, colC, avg_salary_y2021]
df3 :[colA, colB, colC, avg_salary_y2022]

Output:

df_final: [colA, colB, colC, avg_salary_y2020, avg_salary_y2021, avg_salary_y2022]

Is there an elegant solution for this task?
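For reference, the inputs can be reproduced with a minimal sketch like the following; the sample values are made up for illustration:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# One row of made-up data per yearly DataFrame, sharing the static columns.
df1 = spark.createDataFrame([("a", "b", "c", 50.0)], ["colA", "colB", "colC", "avg_salary_y2020"])
df2 = spark.createDataFrame([("a", "b", "c", 52.0)], ["colA", "colB", "colC", "avg_salary_y2021"])
df3 = spark.createDataFrame([("a", "b", "c", 55.0)], ["colA", "colB", "colC", "avg_salary_y2022"])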

python pandas pyspark fabric
1 Answer

I put together this solution:

from pyspark.sql import DataFrame
from pyspark.sql.functions import lit, col

def append_dfs(df1: DataFrame, df2: DataFrame) -> DataFrame:
    """Append two DataFrames, aligning their columns by adding missing columns as None."""
    cols1 = set(df1.columns)
    cols2 = set(df2.columns)
    
    # Add columns present in df2 but missing from df1 as nulls
    for missing in cols2 - cols1:
        df1 = df1.withColumn(missing, lit(None))
    
    # Add columns present in df1 but missing from df2 as nulls
    for missing in cols1 - cols2:
        df2 = df2.withColumn(missing, lit(None))
    
    return df1.unionByName(df2)
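
# On Spark 3.1+ the same alignment is built in: unionByName accepts
# allowMissingColumns=True, so the helper above could be replaced by this
# one-liner (the variant's name is made up):
def append_dfs_v2(df1: DataFrame, df2: DataFrame) -> DataFrame:
    return df1.unionByName(df2, allowMissingColumns=True)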

def create_final_dataframe(spark_dfs: dict) -> DataFrame:
    """Create the final DataFrame by concatenating all yearly DataFrames."""
    union_df = None  # Running union of the per-year DataFrames; starts as None
    years = set()  # To store unique years

    for name, df in spark_dfs.items():
        # Print schema for debugging
        print(f"Schema of {name}:")
        df.printSchema()
        df.show(truncate=False)  # Show all columns without truncation
        
        # Ensure the year column exists
        if "year" not in df.columns:
            print(f"Year column missing in DataFrame {name}.")
            continue
        
        # Get the year from the DataFrame's year column
        year = df.select("year").distinct().first()[0]
        years.add(year)  # Add the year to the set

        # Check if the required columns exist
        required_columns = ["id", "area_code", "occupation_code", "job_characteristic_code", "average_hourly_wage_($)"]
        # The loop variable must not be named `col`: that would shadow
        # pyspark.sql.functions.col, which is called in the select below
        for required in required_columns:
            if required not in df.columns:
                print(f"Column {required} is missing in DataFrame {name}.")
                return None, []  # Early exit if required columns are missing

        # Select columns without renaming static ones
        df_selected = df.select(
            "id",
            "area_code",
            "occupation_code",
            "job_characteristic_code",
            col("average_hourly_wage_($)").alias(f"avg_hourly_wage_{year}")  # Rename wage column dynamically
        )
        
        # Union the DataFrame using append_dfs
        if union_df is None:
            union_df = df_selected  # Initialize the union DataFrame
        else:
            union_df = append_dfs(union_df, df_selected)  # Use the append function to union DataFrames

    return union_df, sorted(years)  # Return both the DataFrame and the sorted list of years

# Create the final DataFrame (all_spark_dfs is a dict mapping names to the yearly DataFrames)
final_df, unique_years = create_final_dataframe(all_spark_dfs)

# Create the list of output columns based on the unique years
dynamic_output_columns = (
    ['id', 'area_code', 'occupation_code', 'job_characteristic_code'] + 
    [f'avg_hourly_wage_{year}' for year in unique_years]
)

# Check available columns in final_df before selecting
if final_df is not None:
    print("Available columns in final_df before selection:")
    print(final_df.columns)  # Print columns for debugging
    
    # Select only the required columns if they exist
    available_columns = [c for c in dynamic_output_columns if c in final_df.columns]
    
    if available_columns:
        final_df = final_df.select(available_columns).distinct()  # Use distinct to avoid duplicates
        final_df.show()
    else:
        print("No available columns to select in final_df.")
else:
    print("final_df is None or empty.")

I hope this helps someone.
