I need advice, as I don't have much experience with Python.

I'm looking for a better-performing way (if one exists) to add multiple LOOKUPVALUE-style columns to the same table in PySpark. I currently have the following function, which I reuse eight times to add a looked-up value to a table with filtering:
```python
from pyspark.sql import functions as F


def get_column_value_in_question_by_pk(df, df1, filter, result_column_name, alt_value):
    """Filter df1 on "column_filter" == filter, rename the lookup column to
    result_column_name, left-join it onto df by "pk", and fall back to
    alt_value where no match was found."""
    df1 = df1.filter(F.col("column_filter") == filter).withColumnRenamed(
        "lookupvalue_column", result_column_name
    )
    df = df.join(df1, df1["pk"] == df["pk"], "left").select(
        df["*"], df1[result_column_name]
    )
    df = df.withColumn(
        result_column_name,
        F.when(F.col(result_column_name).isNull(), alt_value).otherwise(
            F.col(result_column_name)
        ),
    )
    return df
```
It works fine, but reusing it eight times to build one table (because I need those extra columns) means eight separate joins, which I suspect is inefficient. Is there a better way?
**Input data:**
| pk | column_filter | lookupvalue_column |
| -------- | -------------- | -------------- |
| 123acb | Location | City1 |
| 456bca | Location | City2 |
| 123acb | Question1 | Unhappy |
| 456bca | Question1 | Disappointed |
| 123acb | Question2 | Happy |
| 456bca | Question2 | Very happy |
**Expected output:**
| pk | Name | result_column1 (filter on Question1) | result_column2 (filter on Question2) | result_column_name3 (filter on Location) |
| -------- | ------------- | --------------------------- | ----------------------- | --------------------------|
| 123acb | Name1 | Unhappy | Happy | City1 |
| 456bca | Name2 | Disappointed | Very happy | City2 |
I call the function like this to add a new column to the `df` DataFrame:

```python
get_column_value_in_question_by_pk(df, df1, "Location", "result_column_name3", "None")
```
You can build all the lookup columns in a single pass with `pivot`. Note that if `pk` has duplicate rows, this picks the first occurrence.
```python
from pyspark.sql import functions as f

data1 = [
    ("123acb", "Location", "City1"),
    ("456bca", "Location", "City2"),
    ("123acb", "Question1", "Unhappy"),
    ("456bca", "Question1", "Disappointed"),
    ("123acb", "Question2", "Happy"),
    ("456bca", "Question2", "Very Happy"),
]
columns1 = ["pk", "column_filter", "lookupvalue_column"]
df1 = spark.createDataFrame(data1, columns1)

data2 = [
    ("123acb", "Name1"),
    ("456bca", "Name2"),
]
columns2 = ["pk", "Name"]
df2 = spark.createDataFrame(data2, columns2)

# One pivot over df1 builds every lookup column at once, replacing eight joins.
df_pivot = (
    df1.withColumn("pivot_col", f.concat(f.lit("result_"), f.col("column_filter")))
    .groupBy("pk")
    .pivot("pivot_col")
    .agg(f.first(f.col("lookupvalue_column")))
)

df_result = df2.join(df_pivot, ["pk"], "left")
```
Result of `df_pivot`:

```
+------+---------------+----------------+----------------+
|    pk|result_Location|result_Question1|result_Question2|
+------+---------------+----------------+----------------+
|123acb|          City1|         Unhappy|           Happy|
|456bca|          City2|    Disappointed|      Very Happy|
+------+---------------+----------------+----------------+
```
Result of `df_result`:

```
+------+-----+---------------+----------------+----------------+
|    pk| Name|result_Location|result_Question1|result_Question2|
+------+-----+---------------+----------------+----------------+
|123acb|Name1|          City1|         Unhappy|           Happy|
|456bca|Name2|          City2|    Disappointed|      Very Happy|
+------+-----+---------------+----------------+----------------+
```