I have several PySpark DataFrames that need to be joined or unioned to produce a final DataFrame with the following structure:
Input:
df1 :[colA, colB, colC, avg_salary_y2020]
df2 :[colA, colB, colC, avg_salary_y2021]
df3 :[colA, colB, colC, avg_salary_y2022]
Output:
df_final: [colA, colB, colC, avg_salary_y2020, avg_salary_y2021, avg_salary_y2022]
Is there an elegant solution for this task?
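For the shape above, one concise option is to outer-join the yearly frames on the shared key columns. A minimal sketch, assuming the column names from the input/output listing and that df1, df2, df3 are already loaded:

from functools import reduce
from pyspark.sql import DataFrame

def join_by_keys(dfs: list, key_cols: list) -> DataFrame:
    # Outer-join all frames on the key columns, keeping keys that appear in any year
    return reduce(lambda left, right: left.join(right, on=key_cols, how="outer"), dfs)

df_final = join_by_keys([df1, df2, df3], ["colA", "colB", "colC"])

This yields [colA, colB, colC, avg_salary_y2020, avg_salary_y2021, avg_salary_y2022] in one pass, but it assumes the key columns line up across years and each frame has exactly one salary column.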
I built this solution, which also detects the year dynamically and aligns columns across frames:
from pyspark.sql import DataFrame
from pyspark.sql.functions import lit, col
def append_dfs(df1: DataFrame, df2: DataFrame) -> DataFrame:
    """Append two DataFrames, aligning their columns by adding missing columns as None."""
    cols1 = set(df1.columns)
    cols2 = set(df2.columns)
    # Add columns missing from df1
    for c in cols2 - cols1:
        df1 = df1.withColumn(c, lit(None))
    # Add columns missing from df2
    for c in cols1 - cols2:
        df2 = df2.withColumn(c, lit(None))
    return df1.unionByName(df2)
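# Usage sketch (hypothetical toy frames; assumes an active SparkSession `spark`):
#   left  = spark.createDataFrame([(1, 10.0)], ["id", "avg_hourly_wage_2020"])
#   right = spark.createDataFrame([(1, 11.0)], ["id", "avg_hourly_wage_2021"])
#   append_dfs(left, right).show()
# Each input row is kept; the column missing from the other frame comes back as null.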
def create_final_dataframe(spark_dfs: dict) -> tuple:
    """Create the final DataFrame by concatenating all yearly DataFrames."""
    union_df = None   # Running union of the yearly DataFrames
    years = set()     # Unique years seen so far

    for name, df in spark_dfs.items():
        # Print schema for debugging
        print(f"Schema of {name}:")
        df.printSchema()
        df.show(truncate=False)  # Show all columns without truncation

        # Ensure the year column exists
        if "year" not in df.columns:
            print(f"Year column missing in DataFrame {name}.")
            continue

        # Get the year from the DataFrame's year column
        year = df.select("year").distinct().first()[0]
        years.add(year)  # Add the year to the set

        # Check that the required columns exist
        required_columns = ["id", "area_code", "occupation_code", "job_characteristic_code", "average_hourly_wage_($)"]
        for required_col in required_columns:  # don't shadow pyspark's col()
            if required_col not in df.columns:
                print(f"Column {required_col} is missing in DataFrame {name}.")
                return None, []  # Early exit if a required column is missing

        # Select the key columns and rename the wage column per year
        df_selected = df.select(
            "id",
            "area_code",
            "occupation_code",
            "job_characteristic_code",
            col("average_hourly_wage_($)").alias(f"avg_hourly_wage_{year}"),  # Rename wage column dynamically
        )

        # Union with the running result, aligning columns via append_dfs
        if union_df is None:
            union_df = df_selected  # Initialize the union DataFrame
        else:
            union_df = append_dfs(union_df, df_selected)

    return union_df, sorted(years)  # Return both the DataFrame and the sorted list of years
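# `spark_dfs` maps a name to one yearly frame, e.g. (hypothetical names):
#   {"wages_2020": df_2020, "wages_2021": df_2021, "wages_2022": df_2022}
# where each frame carries a literal "year" column plus the columns in `required_columns`.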
# Create the final DataFrame
final_df, unique_years = create_final_dataframe(all_spark_dfs)

# Build the list of output columns from the unique years
dynamic_output_columns = (
    ['id', 'area_code', 'occupation_code', 'job_characteristic_code'] +
    [f'avg_hourly_wage_{year}' for year in unique_years]
)

# Check the available columns in final_df before selecting
if final_df is not None:
    print("Available columns in final_df before selection:")
    print(final_df.columns)  # Print columns for debugging

    # Select only the required columns that actually exist
    available_columns = [c for c in dynamic_output_columns if c in final_df.columns]
    if available_columns:
        final_df = final_df.select(available_columns).distinct()  # Use distinct to avoid duplicates
        final_df.show()
    else:
        print("No available columns to select in final_df.")
else:
    print("final_df is None.")
I hope this helps someone.