我经常在 Spark 数据集行上使用
map
函数来在 Scala 中对类型对象进行转换。我通常的模式是转换从数据框转换(withColumn
、groupBy
等)创建的中间结果,并创建中间结果的类型化数据集,以便我可以使用map
。
这效果很好,但会导致大量“临时”案例类用于中间结果或笨重的元组类型。
另一种方法是在数据帧上运行
map
并使用 getAs[T]
从行中检索键入的字段,但如果 spark.implicits
是一个案例类,则这似乎不适用于 T
。
例如这给出了错误
ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to Person
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{round => colRound, min => colMin, max => colMax, _}
import org.apache.spark.sql.expressions.Window
import spark.implicits._
final case class Person(name: String, age: Integer)
val people = Seq(Person("Alice", 20), Person("Bob", 30), Person("Charlie", 40)).toDS
val df = people.alias("p")
.select($"p.name", struct($"p.*").alias("person"))
val ds = df.map(row => {
val name = row.getAs[String]("name")
val person = row.getAs[Person]("person")
(name, person)
})
display(ds)
虽然这工作正常:
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{round => colRound, min => colMin, max => colMax, _}
import org.apache.spark.sql.expressions.Window
import spark.implicits._
final case class Person(name: String, age: Integer)
val people = Seq(Person("Alice", 20), Person("Bob", 30), Person("Charlie", 40)).toDS
val df = people.alias("p")
.select($"p.name", struct($"p.*").alias("person"))
.as[Tuple2[String, Person]]
val ds = df.map(row => {
val name = row._1
val person = row._2
(name, person)
})
display(ds)
因此,spark 很高兴在第二个示例中将数据帧 person 结构转换为
Person
案例类,但在第一个示例中不会这样做。有谁知道解决这个问题的简单方法吗?
谢谢,
大卫
“简单”,可能是:),但很大程度上使用了可能会发生变化(并且已经完成)的内部API。 此代码在 Spark 4 上也无法按原样运行。
作为一种方法,它也可能比您提供的使用元组的第二个示例慢,因为 Spark 代码在输入地图代码之前从 InternalRow 转换为用户区域行。 然后,下面的代码在调用解码器之前转换回InternalRow。
resolveAndBind 在此类示例中通常没问题,但也不能保证在所有情况下都有效,因为字段名称等的解析通常需要作为查询计划完整分析的一部分进行。
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{round => colRound, min => colMin, max => colMax, _}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import spark.implicits._
implicit val pEnc = implicitly[Encoder[Person]].asInstanceOf[ExpressionEncoder[Person]]
val decoder = pEnc.resolveAndBind().objDeserializer
val people = Seq(Person("Alice", 20), Person("Bob", 30), Person("Charlie", 40)).toDS
val df = people.alias("p")
.select($"p.name", struct($"p.*").alias("person"))
val ds = df.map(row => {
val name = row.getAs[String]("name")
val personRow = row.getAs[Row]("person")
val person = decoder.eval(CatalystTypeConverters.convertToCatalyst(personRow).asInstanceOf[InternalRow]).asInstanceOf[Person]
(name, person)
})
ds.show
总而言之,您最好尽可能使用元组包装器和内置编码,它更快,并且经过设计和测试以这种方式工作。