我有一个像这样的 pyspark 数据框
id | trx_假期 | trx_外卖 | trx_宠物 | 最大值 | 最大 |
---|---|---|---|---|---|
1 | 12.5 | 5.5 | 9.5 | 12.5 | trx_假期 |
2 | 3.0 | 14.0 | 6.7 | 14.0 | trx_外卖 |
我想创建第二列“MAX_2”,其中包含客户花费第二多的类别。
我想调整下面的代码(请参阅:如何获取 pyspark 数据框中具有最大值的列的名称),方法是从创建“max_value”列的 WithColumn 语句中排除“MAX”列中的 column_name。
cond = "psf.when" + ".when".join(["(psf.col('" + c + "') == psf.col('max_value'), psf.lit('" + c + "'))" for c in df.columns])
df = df.withColumn("max_value", psf.greatest(*[c for c in columns])) \
.withColumn("MAX", when(cond, 1).otherwise(0))
理想情况下,新的数据框看起来像这样。 |身份证号 | trx_holiday |trx_takeout |trx_pet |max_value |MAX |max_value_2 |MAX_2 | | ----| ------------|------------|----------|------------| ------------|------------|--------| | 1 | 12.5 |5.5 |9.5 |12.5 |trx_holiday |9.5 |trx_pet| | 2 | 3.0 |14.0 |6.7 |14.0 |trx_takeout |6.7 |trx_pet|
为了实现所需的输出,您应该识别最大值和相应的列名称,并通过排除步骤中找到的最大列来识别第二个最大值...如下所示:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, array, sort_array, struct
# Initialize SparkSession
spark = SparkSession.builder \
.appName("Max and Second Max Columns") \
.getOrCreate()
# Sample data
data = [
(1, 12.5, 5.5, 9.5),
(2, 3.0, 14.0, 6.7)
]
# Define schema
columns = ["id", "trx_holiday", "trx_takeout", "trx_pet"]
# Create DataFrame
df = spark.createDataFrame(data, columns)
# Step 1: Identify max value and corresponding column
df = df.withColumn(
"sorted_array",
sort_array(array(
struct(col("trx_holiday"), col("trx_holiday").alias("trx_holiday")),
struct(col("trx_takeout"), col("trx_takeout").alias("trx_takeout")),
struct(col("trx_pet"), col("trx_pet").alias("trx_pet"))
), asc=False)
).withColumn("max_value", col("sorted_array")[0][0]) \
.withColumn("MAX", col("sorted_array")[0][1]) \
.withColumn("max_value_2", col("sorted_array")[1][0]) \
.withColumn("MAX_2", col("sorted_array")[1][1]) \
.drop("sorted_array")
# Show the result
df.show()