给定一个数据集,如下面的代码(df
)所示,我的要求是能够添加派生列(DerivedCol
)。此列的值对于idcol
行组是常量,并且通过在另一列的值(此处为filter
)上应用a)谓词,然后b)在匹配组上的聚合函数(此处使用max
)派生。
val df = Seq(("id1","k1","7"),("id2","k1","5"),("id1","k3","2"),("id3","k1","4"),("id2","k5","1"),("id4","k5","1"))
.toDF("idcol","keycol","valcol")
val aggDf = df.filter($"keycol" === "k1")
.select($"idcol",$"valcol")
.groupBy($"idcol")
.agg(max($"valcol".cast(IntegerType)).cast(StringType).as("DerivedCol"))
.withColumnRenamed("idcol", "newidcol")
df.join(aggDf, df("idcol") === aggDf("newidcol"), "left_outer")
.drop(aggDf("newidcol"))
我正在使用left outer join
。我的数据集非常庞大(数百万行)。我有以下问题:
idcol
专栏的基数非常高。 Spark版本是2.1.1。
有没有其他方法来实现这一目标?
有 - 窗口功能。
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.expressions.Window
df.withColumn(
"derivedcol",
max($"valcol".cast(IntegerType)).over(Window.partitionBy($"idcol")
)
取决于:
这可能比聚合后加入更好或更差。
我应该使用什么分区逻辑来减少混乱?
可能没有。至少有两个原因: