我有一个包含三列的 DataFrame:Data、Col1 和 Col2。我想使用 scala 中的 withColumn 函数根据以下逻辑创建 300 个附加列:
val outDataFrame = inputDataFrame;
var i = 0;
while (i < myObjList.size) {
val x = myObjList.get(i)
val stInd = x.getStartSplitIndex + 1
val length = x.getEndSplitIndex - x.getStartSplitIndex
val convertDataType = x.getDataType()
val outColName = x.getName()
// The following line is not working
outDataFrame .withColumn(outColName,col(myFunc(convertDataType,substring(col("Data"),stInd,length).toString())))
i += 1
}
在此代码片段中,
myObjList
是一个长度为300的对象列表,每个对象代表创建新列的规范。 myFunc
函数执行数据转换,采用两个参数:convertDataType
(字符串)和 dataToConvert
(字符串)。
stInd
和length
值源自当前对象x
,提供有关我应该使用数据列的哪个切片来生成相应的新列的信息。
我正在寻求有关实现此任务的优化方法的指导。输入DataFrame包含数十亿记录,因此性能优化至关重要。
谢谢你。
不要使用 withColumn,而是使用 select 和您在单个投影中构建的 Seq[Column] 。 Spark 并不总是优化预测,这最终会导致严重的性能损失。
Wrt。 myFunc 尽可能使用内置 Spark 函数,UDF 的速度要慢得多。 IF 在执行自定义转换逻辑时,您必须挤出每一滴性能,然后直接使用表达式并实现代码生成,但请注意,接口是 Spark 内部的,可以更改每个版本。
它可以像(取自质量)一样简单:
mport org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression}
import org.apache.spark.sql.types.{DataType, StringType}
import org.apache.spark.unsafe.types.UTF8String
object AsUUID {
def apply(lower: Column, higher: Column): Column =
new Column(AsUUID(lower.expr, higher.expr))
}
/**
* Converts a lower and higher pair of longs into a uuid string
* @param left
* @param right
*/
case class AsUUID(left: Expression, right: Expression) extends BinaryExpression {
protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): Expression = copy(left = newLeft, right = newRight)
override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
defineCodeGen(ctx, ev, (lower, higher) => s"org.apache.spark.unsafe.types.UTF8String.fromString(new java.util.UUID($higher, $lower).toString())")
override def dataType: DataType = StringType
override protected def nullSafeEval(lower: Any, higher: Any): Any =
UTF8String.fromString(new java.util.UUID(higher.asInstanceOf[Long], lower.asInstanceOf[Long]).toString)
}
允许您使用:
df.select(AsUUID(col("lower"), col("higher")))
但是此代码将与其他 Spark 代码生成内联执行,并且不会因转换为 Scala 类型并通过编码器再次返回 Catalyst 而造成性能损失。仅当可以带来显着节省时才采用此方法,例如如果对于 10 亿行,使用 UDF 需要 1 小时,使用表达式需要 40 分钟,那么节省 20 分钟对您来说可能是值得的(示例数字,但 UDF 通常要慢 2 倍)。除非您在其他地方使用这些表达式,否则使用内部 api 的学习曲线和障碍可能不值得在性能上获得一点小进步。
.withColumn
返回一个 new 数据帧,您基本上会丢弃该数据帧,并不断向原始数据帧添加列(并丢弃)。
您应该努力编写更惯用的代码,这将有助于避免将来出现此类问题和许多其他问题。
val newDF = myObjList.foldLeft(oldDF) { case (df, x) =>
val start = x.getStartSplitIndex + 1
val length = x.getEndSplitIndex - start + 1
df.withColumn(
x.getName,
col(
myFunc(x.getDataType, substring(col("Data"), start, length))
)
)
}