Spark数据帧:派生列的连接方法

问题描述 投票:0回答:1

给定一个数据集,如下面的代码(df)所示,我的要求是能够添加派生列(DerivedCol)。此列的值对于idcol行组是常量,并且通过在另一列的值(此处为filter)上应用a)谓词,然后b)在匹配组上的聚合函数(此处使用max)派生。

val df = Seq(("id1","k1","7"),("id2","k1","5"),("id1","k3","2"),("id3","k1","4"),("id2","k5","1"),("id4","k5","1"))
  .toDF("idcol","keycol","valcol")

val aggDf = df.filter($"keycol" === "k1")
  .select($"idcol",$"valcol")
  .groupBy($"idcol")
  .agg(max($"valcol".cast(IntegerType)).cast(StringType).as("DerivedCol"))
  .withColumnRenamed("idcol", "newidcol")

df.join(aggDf, df("idcol") === aggDf("newidcol"), "left_outer")
  .drop(aggDf("newidcol"))

我正在使用left outer join。我的数据集非常庞大(数百万行)。我有以下问题:

  1. 有没有其他方法来实现这一目标?
  2. 我应该使用什么分区逻辑来减少混乱?

idcol专栏的基数非常高。 Spark版本是2.1.1。

scala apache-spark apache-spark-sql
1个回答
1
投票

有没有其他方法来实现这一目标?

有 - 窗口功能。

import org.apache.spark.sql.functions.max
import org.apache.spark.sql.expressions.Window

df.withColumn(
   "derivedcol",  
   max($"valcol".cast(IntegerType)).over(Window.partitionBy($"idcol")
)

取决于:

  • 基数 - 高基数很好。
  • 群体的大小分布 - 没有大的正偏差的小群体是好的。

这可能比聚合后加入更好或更差。

我应该使用什么分区逻辑来减少混乱?

可能没有。至少有两个原因:

  • 如果你有大量的小组窗口函数会很好,并且不需要额外的分区。
  • 如果您有较少数量的较大组,则应广播数据,并且只需要进行聚合以进行聚合。
  • 如果存在大量的大型组 - 您可能会考虑按ID进行预分区,但根据因素的数量,您可以同时处于松散和增益状态,并且平均无需额外的随机播放(分区)。
© www.soinside.com 2019 - 2024. All rights reserved.