Finding the mean and mode of lists in the column values of a PySpark DataFrame

Question (Votes: 0, Answers: 2)

I have a PySpark DataFrame that looks like this:

+--------------------------+--------------+
| score                    |review        |
+--------------------------+--------------+
|[83.52, 81.79, 84, 75]    |[P,N,P,P]     |
|[86.13, 85.48]            |[N,N,N,P]     |
+--------------------------+--------------+

The schema of this DataFrame is:

root
 |--score: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |--review        : array (nullable = true)
 |    |-- element: string (containsNull = true)

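I want to find the mean of the list in each value of the score column and the mode of the list in each value of the review column, and create new columns from them. The data types of these new columns should be float and string, respectively. Like this: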

+--------------------------+--------------+---------+----------+
| score                    |review        |scoreMean|reviewMode|
+--------------------------+--------------+---------+----------+
|[83.52, 81.79, 84, 75]    |[P,N,P,P]     |81.08    |P         |
|[86.13, 85.48]            |[N,N,N,P]     |85.81    |N         |
+--------------------------+--------------+---------+----------+

I have tried this using UDFs:

import statistics
import pyspark.sql.functions as F

#define UDFs
def mean_udf(data):
    if len(data) == 0:
        return None
    data_float = [eval(i) for i in data]
    return statistics.mean(data_float)

def mode_udf(data):
    if len(data) == 0:
        return None    
    return statistics.mode(data)

#register the UDFs
mean_func = F.udf(mean_udf)
mode_func = F.udf(mode_udf)

#apply UDFs
df= (df.withColumn("scoreMean", mean_func(F.col("score")))
       .withColumn("reviewMode", mode_func(F.col("review")))
    )

However, it throws a computation-failed error during evaluation (when I call show or collect).

Any help is welcome. Thanks.

python apache-spark pyspark user-defined-functions mean
2 Answers

0 votes

I used a combination of explode and a group-by aggregation to compute mean_score, and a custom UDF to get mode_review, as follows:

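from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, explode, udf
from pyspark.sql.types import ArrayType, DoubleType, StringType, StructField, StructType

# Reuse the active Spark session, or create one
spark = SparkSession.builder.getOrCreate()

# Define the schema for the DataFrame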
schema = StructType([
    StructField("score", ArrayType(DoubleType(), True), True),
    StructField("review", ArrayType(StringType(), True), True)
])

# Define the data to be used in the DataFrame
data = [([83.52, 81.79, 84.0, 75.0], ["P", "N", "P", "P"]), ([86.13, 85.48], ["N", "N", "N", "P"])]

# Create a DataFrame with the defined schema and data
df = spark.createDataFrame(data, schema)

# Define a function to calculate the mode of a list
def mode_of_list(input_list):
    return max(set(input_list), key=input_list.count)


# Register the function as a UDF (User Defined Function)
modeUDF = udf(mode_of_list, StringType())

# Create a new DataFrame with the following columns: score, review, mean score and mode review
df_result = (df
             .withColumn("score_exploded", explode(col("score")))
             .groupBy("review")
             .agg(avg("score_exploded").alias("mean_score"))
             .withColumn("mode_review", modeUDF(col("review")))
             .join(df.alias("input"), on="review")
             .selectExpr("input.score", "input.review", "mean_score", "mode_review"))

# Display the DataFrame with the mean score and mode review
df_result.show(truncate=False)
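
The sample above builds score as an array of doubles, whereas the question's schema stores the elements as strings and its arrays may be null or empty. A minimal sketch of null-safe helpers for that case follows. The names safe_mean, safe_mode and add_mean_and_mode are hypothetical, and the tie-breaking rule (the first value seen wins) is an assumption, since the question does not specify one. The return types are passed to udf explicitly because udf defaults to StringType when none is given.

```python
from collections import Counter


def safe_mean(values):
    """Mean of a list of numbers or numeric strings; None for null/empty input."""
    if not values:
        return None
    # float() parses strings such as "83.52" without resorting to eval()
    numbers = [float(v) for v in values if v is not None]
    if not numbers:
        return None
    return sum(numbers) / len(numbers)


def safe_mode(values):
    """Most frequent non-null element; None for null/empty input."""
    if not values:
        return None
    counts = Counter(v for v in values if v is not None)
    if not counts:
        return None
    # Assumption: on ties, the value encountered first is returned
    return counts.most_common(1)[0][0]


def add_mean_and_mode(df):
    """Attach scoreMean (float) and reviewMode (string) columns to df."""
    # Imported here so the helpers above stay usable without Spark installed
    from pyspark.sql import functions as F
    from pyspark.sql.types import FloatType, StringType

    mean_func = F.udf(safe_mean, FloatType())
    mode_func = F.udf(safe_mode, StringType())
    return (df.withColumn("scoreMean", mean_func(F.col("score")))
              .withColumn("reviewMode", mode_func(F.col("review"))))
```

Calling add_mean_and_mode(df).show(truncate=False) on the question's DataFrame would be the intended usage. Because both helpers work row by row, no explode, groupBy or join is needed, so duplicate review arrays across rows do not affect the result.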

0 votes

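I solved this without using any UDFs. The approach is somewhat similar to one of the answers already here. To compute the mean, I exploded the list in each column value and then used groupBy with an aggregate function. Note that the code assumes the DataFrame has a unique id column.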

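import pyspark.sql.functions as F

# One row per score element; explode() names its output column "col" by default
df_score_exploded = (
    df.select(df.id, 
              F.explode(df.score)
             )
)

# Average the exploded scores per id
df_score_exploded = (
    df_score_exploded 
    .groupby("id")
    .agg(F.avg("col").alias("scoreMean"))
)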

# Join on the shared id column by name, which avoids ambiguous self-join references
df = (
    df.join(df_score_exploded, on="id")
    .select("id", "score", "review", "scoreMean")
)
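
The code above covers only the mean. One way to get the mode in the same UDF-free style is to explode review, count occurrences per id, and keep the most frequent value with a window function. The sketch below assumes the same unique id column as the code above. The names add_review_mode and expected_mode are hypothetical, and breaking ties alphabetically is an assumption rather than something the question asks for.

```python
from collections import Counter


def expected_mode(values):
    """Plain-Python mirror of the ordering used below, for spot-checking
    small samples: highest count first, then alphabetical."""
    if not values:
        return None
    counts = Counter(values)
    return min(counts, key=lambda v: (-counts[v], v))


def add_review_mode(df, id_col="id"):
    """Attach a reviewMode column computed without a UDF."""
    # Imported here so expected_mode stays usable without Spark installed
    from pyspark.sql import Window
    from pyspark.sql import functions as F

    # One row per (id, review value) with its number of occurrences
    counts = (df.select(id_col, F.explode("review").alias("value"))
                .groupBy(id_col, "value")
                .count())

    # Rank values within each id: most frequent first, ties alphabetical
    w = Window.partitionBy(id_col).orderBy(F.desc("count"), F.asc("value"))
    modes = (counts.withColumn("rank", F.row_number().over(w))
                   .filter(F.col("rank") == 1)
                   .select(id_col, F.col("value").alias("reviewMode")))

    # Left join so rows with null or empty review arrays are kept
    return df.join(modes, on=id_col, how="left")
```

Applying add_review_mode(df) after the mean join would add reviewMode next to scoreMean. The left join matters because explode drops rows whose array is null or empty, and those rows should end up with a null mode instead of disappearing.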