是否可以使用扩展点在DataFrame API / SQL中添加/替换现有列表达式。
例如:假设我们注入了可以从计划中检查项目节点并检查列“名称”的解析规则,例如将其替换为upper(name)。
是否可以使用扩展点。我发现的例子大多是简单的,不会以我需要的方式操纵输入表达式。
如果可能,请告诉我。
是的,这是可能的。
让我们举个例子。假设我们要编写一个检查Project运算符的规则,如果项目是针对某个特定列(比如'column2'),那么它将它乘以2。
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.Column
import org.apache.spark.sql.types._
object DoubleColumn2OptimizationRule extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case p: Project =>
if (p.projectList.filter(_.name == "column2").size >= 1) {
val newList = p.projectList.map { case x =>
if (x.name == "column2") {
Alias(Multiply(Literal(2, IntegerType), x), "column2_doubled")()
} else {
x
}
}
p.copy(projectList = newList)
} else {
p
}
}
}
假设我们有一个表“table1”,它有两列 - column1,column2。
没有这个规则 -
> spark.sql("select column2 from table1 limit 10").collect()
Array([1], [2], [3], [4], [5], [6], [7], [8], [9], [10])
有这个规则 -
> spark.experimental.extraOptimizations = Seq(DoubleColumn2OptimizationRule)
> spark.sql("select column2 from table1 limit 10").collect()
Array([2], [4], [6], [8], [10], [12], [14], [16], [18], [20])
您也可以在DataFrame上调用explain来检查计划 -
> spark.sql("select column2 from table1 limit 10").explain == Physical Plan == CollectLimit 10 +- *(1) LocalLimit 10 +- *(1) Project [(2 * column2#213) AS column2_doubled#214] +- HiveTableScan [column2#213], HiveTableRelation `default`.`table1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [column1#212, column2#213]