I have a PySpark DataFrame that looks like this:
+--------------------------+--------------+
| score |review |
+--------------------------+--------------+
|[83.52, 81.79, 84, 75] |[P,N,P,P] |
|[86.13, 85.48] |[N,N,N,P] |
+--------------------------+--------------+
The schema of the DataFrame is:
root
 |-- score: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- review: array (nullable = true)
 |    |-- element: string (containsNull = true)
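For each row, I want to find the mean of the values in the score array and the mode of the values in the review array, and create new columns from them. The new columns should be of float and string type, respectively. Like this: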
+--------------------------+--------------+---------+----------+
| score |review |scoreMean|reviewMode|
+--------------------------+--------------+---------+----------+
|[83.52, 81.79, 84, 75] |[P,N,P,P] |81.08 |P |
|[86.13, 85.48] |[N,N,N,P] |85.81 |N |
+--------------------------+--------------+---------+----------+
I have tried this with UDFs:
import statistics
import pyspark.sql.functions as F

#define UDFs
def mean_udf(data):
    if len(data) == 0:
        return None
    data_float = [eval(i) for i in data]
    return statistics.mean(data_float)

def mode_udf(data):
    if len(data) == 0:
        return None
    return statistics.mode(data)

#register the UDFs
mean_func = F.udf(mean_udf)
mode_func = F.udf(mode_udf)

#apply UDFs
df = (df.withColumn("scoreMean", mean_func(F.col("score")))
        .withColumn("reviewMode", mode_func(F.col("review")))
     )
However, it throws a computation-failed error during evaluation (when I call show or collect).
Any help is welcome. Thanks.
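The exact error message is not shown, so the cause can only be guessed at. A few things in the UDFs above could plausibly fail at evaluation time: len(data) raises a TypeError when an array is null, eval fails on anything that is not a valid Python expression, and statistics.mode raises StatisticsError on ties in Python versions before 3.8. Also, F.udf returns StringType unless a return type is given, so scoreMean would not come back as a float. Below is a minimal sketch that guards against those cases. The names mean_of_strings, mode_of_list and add_mean_and_mode are made up for illustration, and the tie-breaking rule (first element seen wins) is an assumption, not something the question specifies.

```python
import statistics
from collections import Counter


def mean_of_strings(values):
    """Mean of an array of numeric strings; None for a null or empty array."""
    if not values:
        return None
    # float() instead of eval(): it only accepts numbers and is safe on input data.
    numbers = [float(v) for v in values if v is not None]
    return statistics.mean(numbers) if numbers else None


def mode_of_list(values):
    """Most frequent non-null element; on ties, the element seen first wins."""
    items = [v for v in (values or []) if v is not None]
    if not items:
        return None
    return Counter(items).most_common(1)[0][0]


def add_mean_and_mode(df):
    """Attach scoreMean (double) and reviewMode (string) columns to df."""
    # pyspark is imported here so the helpers above can be tried without Spark.
    from pyspark.sql import functions as F
    from pyspark.sql.types import DoubleType, StringType

    # Explicit return types: without them F.udf defaults to StringType.
    mean_func = F.udf(mean_of_strings, DoubleType())
    mode_func = F.udf(mode_of_list, StringType())
    return (df.withColumn("scoreMean", mean_func(F.col("score")))
              .withColumn("reviewMode", mode_func(F.col("review"))))
```

Calling add_mean_and_mode(df) on the DataFrame from the question should then add both columns. The two helper functions are plain Python, so they can be checked on ordinary lists before involving Spark.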
I tried a mix of explode and group by expressions to compute mean_score, plus a custom UDF to get mode_review, as follows:
from pyspark.sql.functions import avg, col, explode, udf
from pyspark.sql.types import ArrayType, DoubleType, StringType, StructField, StructType

# Define the schema for the DataFrame
schema = StructType([
    StructField("score", ArrayType(DoubleType(), True), True),
    StructField("review", ArrayType(StringType(), True), True)
])
# Define the data to be used in the DataFrame
data = [([83.52, 81.79, 84.0, 75.0], ["P", "N", "P", "P"]), ([86.13, 85.48], ["N", "N", "N", "P"])]
# Create a DataFrame with the defined schema and data (assumes an active SparkSession named spark)
df = spark.createDataFrame(data, schema)

# Define a function to calculate the mode of a list
def mode_of_list(input_list):
    return max(set(input_list), key=input_list.count)

# Register the function as a UDF (User Defined Function)
modeUDF = udf(mode_of_list, StringType())

# Create a new DataFrame with the following columns: score, review, mean score and mode review
# Note: grouping and joining on review assumes each review array is unique across rows
df_result = (df
    .withColumn("score_exploded", explode(col("score")))
    .groupBy("review")
    .agg(avg("score_exploded").alias("mean_score"))
    .withColumn("mode_review", modeUDF(col("review")))
    .join(df.alias("input"), on="review")
    .selectExpr("input.score", "input.review", "mean_score", "mode_review"))
# Display the DataFrame with the mean score and mode review
df_result.show(truncate=False)
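Grouping and joining on review would merge rows that happen to share the same review array. If that is a concern, one possible alternative is to compute the mean per row with Spark SQL's higher-order aggregate function (available since Spark 2.4), which needs no explode or join. This is a sketch of a different technique, not part of the approach above; MEAN_EXPR and with_row_mean are hypothetical names.

```python
# Spark SQL expression: sum the array with the higher-order aggregate function,
# then divide by the number of elements. The CAST also covers string elements.
MEAN_EXPR = (
    "aggregate(score, CAST(0 AS DOUBLE), (acc, x) -> acc + CAST(x AS DOUBLE))"
    " / size(score)"
)


def with_row_mean(df, out_col="mean_score"):
    """Return df with the per-row mean of the score array in out_col."""
    # Imported lazily so this snippet can be loaded without Spark installed.
    from pyspark.sql import functions as F

    return df.withColumn(out_col, F.expr(MEAN_EXPR))
```

With the df defined above, with_row_mean(df).show(truncate=False) should list one mean per original row, regardless of duplicate review arrays.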
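I solved this without using any UDFs. The approach is somewhat similar to one of the existing answers here. To compute the mean, I exploded the list in each row's column value, then used groupBy and an aggregate function.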
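# Assumes df has a unique id column identifying each row
# Explode the score array into one row per element (the exploded column is named "col" by default)
df_score_exploded = df.select(df.id, F.explode(df.score))
# Average the exploded scores per id
df_score_exploded = (
    df_score_exploded
    .groupby("id")
    .agg(F.avg("col").alias("scoreMean"))
)
# Join the means back onto the original rows
df = (
    df.join(df_score_exploded, df["id"] == df_score_exploded["id"])
    .select(df["id"], df["score"], df["review"], df_score_exploded["scoreMean"])
)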