在 Scala Spark 中使用 withColumn 和 UDF 高效地将多个列添加到 DataFrame

问题描述 投票:0回答:2

我有一个包含三列的 DataFrame:Data、Col1 和 Col2。我想使用 scala 中的 withColumn 函数根据以下逻辑创建 300 个附加列:

val outDataFrame = inputDataFrame;
var i = 0;
while (i < myObjList.size) {
    val x = myObjList.get(i)
    val stInd = x.getStartSplitIndex + 1
    val length = x.getEndSplitIndex - x.getStartSplitIndex
    val convertDataType = x.getDataType()
    val outColName = x.getName()

    // The following line is not working
    outDataFrame .withColumn(outColName,col(myFunc(convertDataType,substring(col("Data"),stInd,length).toString())))

    i += 1
}

在此代码片段中,

myObjList
是一个长度为300的对象列表,每个对象代表创建新列的规范。
myFunc
函数执行数据转换,采用两个参数:
convertDataType
(字符串)和
dataToConvert
(字符串)。

stInd
length
值源自当前对象
x
,提供有关我应该使用数据列的哪个切片来生成相应的新列的信息。

我正在寻求有关实现此任务的优化方法的指导。输入DataFrame包含数十亿记录,因此性能优化至关重要

谢谢你。

scala apache-spark user-defined-functions
2个回答
0
投票

不要使用 withColumn,而是使用 select 和您在单个投影中构建的 Seq[Column] 。 Spark 并不总是优化预测,这最终会导致严重的性能损失。

Wrt。 myFunc 尽可能使用内置 Spark 函数,UDF 的速度要慢得多。 IF 在执行自定义转换逻辑时,您必须挤出每一滴性能,然后直接使用表达式并实现代码生成,但请注意,接口是 Spark 内部的,可以更改每个版本。

它可以像(取自质量)一样简单:

mport org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression}
import org.apache.spark.sql.types.{DataType, StringType}
import org.apache.spark.unsafe.types.UTF8String

object AsUUID {
  def apply(lower: Column, higher: Column): Column =
    new Column(AsUUID(lower.expr, higher.expr))
}

/**
 * Converts a lower and higher pair of longs into a uuid string
 * @param left
 * @param right
 */
case class AsUUID(left: Expression, right: Expression) extends BinaryExpression {
  protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): Expression = copy(left = newLeft, right = newRight)

  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
    defineCodeGen(ctx, ev, (lower, higher) => s"org.apache.spark.unsafe.types.UTF8String.fromString(new java.util.UUID($higher, $lower).toString())")

  override def dataType: DataType = StringType

  override protected def nullSafeEval(lower: Any, higher: Any): Any =
    UTF8String.fromString(new java.util.UUID(higher.asInstanceOf[Long], lower.asInstanceOf[Long]).toString)
}

允许您使用:

df.select(AsUUID(col("lower"), col("higher")))

但是此代码将与其他 Spark 代码生成内联执行,并且不会因转换为 Scala 类型并通过编码器再次返回 Catalyst 而造成性能损失。仅当可以带来显着节省时才采用此方法,例如如果对于 10 亿行,使用 UDF 需要 1 小时,使用表达式需要 40 分钟,那么节省 20 分钟对您来说可能是值得的(示例数字,但 UDF 通常要慢 2 倍)。除非您在其他地方使用这些表达式,否则使用内部 api 的学习曲线和障碍可能不值得在性能上获得一点小进步。


0
投票

.withColumn
返回一个 new 数据帧,您基本上会丢弃该数据帧,并不断向原始数据帧添加列(并丢弃)。

您应该努力编写更惯用的代码,这将有助于避免将来出现此类问题和许多其他问题。

val newDF = myObjList.foldLeft(oldDF) { case (df, x) => 
 val start = x.getStartSplitIndex + 1
 val length = x.getEndSplitIndex - start + 1
 df.withColumn(
   x.getName, 
   col(
     myFunc(x.getDataType, substring(col("Data"), start, length))
   )
 )
}
© www.soinside.com 2019 - 2024. All rights reserved.