I am trying to create a DataFrame from a Seq that contains nested Seqs, but I get a scala.MatchError.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
val data = Seq(("Java", Seq(Seq(true, 5L), 0, 7.5)), ("Python", Seq(Seq(true, 10L), 1, 8.5)), ("Scala", Seq(Seq(false, 8L), 2, 9.0)))
val rdd = spark.sparkContext.parallelize(data).map {
  case (l, s) => Row(l, Row.fromSeq(s.map {
    case (s: Seq[Any], i, d) => Row(Row.fromSeq(s), i, d)
  }))
}
val schema = StructType(Seq(
  StructField("language", StringType, true),
  StructField("stats", StructType(Seq(
    StructField("users", StructType(Seq(
      StructField("active", BooleanType, true),
      StructField("level", LongType, true)
    ))),
    StructField("difficulty", IntegerType, true),
    StructField("average_review", DoubleType, true)
  )))
))
val ds = spark.createDataFrame(rdd, schema)
ds.show()
When I call
ds.show()
the pipeline breaks and gives the error
scala.MatchError: List(true, 5) (of class scala.collection.immutable.$colon$colon)
I suspect the problem is in the
spark.sparkContext.parallelize(data).map...
part of my code, but I cannot figure out what is wrong.
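The inner s.map walks over the three elements of the stats Seq one at a time, so the Tuple3 pattern (s: Seq[Any], i, d) never matches; the very first element, Seq(true, 5L), is what raises the MatchError. The failure can be reproduced in isolation (a minimal sketch, using a local stats value that mirrors the inner Seq from data):

// Each element of stats is matched on its own, so a tuple pattern can never apply.
val stats: Seq[Any] = Seq(Seq(true, 5L), 0, 7.5)
stats.map {
  case (s: Seq[Any], i, d) => Row(Row.fromSeq(s), i, d)
}
// throws scala.MatchError: List(true, 5) (of class scala.collection.immutable.$colon$colon)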
Try
import org.apache.spark.rdd.RDD

val rdd: RDD[Row] = spark.sparkContext.parallelize(data).map {
  case (language, Seq(Seq(active, level), difficulty, average_review)) =>
    Row(language, Row(Row(active, level), difficulty, average_review))
}
//+--------+--------------------+
//|language| stats|
//+--------+--------------------+
//| Java| {{true, 5}, 0, 7.5}|
//| Python|{{true, 10}, 1, 8.5}|
//| Scala|{{false, 8}, 2, 9.0}|
//+--------+--------------------+
or
val rdd: RDD[Row] = spark.sparkContext.parallelize(data).map {
  case (language, Seq(users: Seq[Any], difficulty, average_review)) =>
    Row(language, Row(Row.fromSeq(users), difficulty, average_review))
}
or
val rdd: RDD[Row] = spark.sparkContext.parallelize(data).map {
  case (language, stats) =>
    Row(language, Row.fromSeq(stats.map {
      case users: Seq[Any] => Row.fromSeq(users)
      case x => x
    }))
}
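Any of the three RDD[Row] variants above lines up with the schema you already defined, so the original createDataFrame call from the question works unchanged (usage sketch, reusing spark, rdd and schema from above):

// Combine the fixed RDD[Row] with the hand-written StructType from the question.
val ds = spark.createDataFrame(rdd, schema)
ds.show()  // prints the table shown above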
or
case class Data(language: String, stats: Stats)
case class Stats(users: Users, difficulty: Int, average_review: Double)
case class Users(active: Boolean, level: Long)

val data1 = data.map {
  case (language, Seq(Seq(active: Boolean, level: Long), difficulty: Int, average_review: Double)) =>
    Data(language, Stats(Users(active, level), difficulty, average_review))
}

import org.apache.spark.sql.Dataset
import spark.implicits._
val ds: Dataset[Data] = data1.toDS()
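With the case-class route no hand-written schema is needed; Spark derives it from the encoders. For reference, the inferred schema should look roughly like this (note that primitive fields come out as nullable = false, unlike the StructType written in the question):

ds.printSchema()
// root
//  |-- language: string (nullable = true)
//  |-- stats: struct (nullable = true)
//  |    |-- users: struct (nullable = true)
//  |    |    |-- active: boolean (nullable = false)
//  |    |    |-- level: long (nullable = false)
//  |    |-- difficulty: integer (nullable = false)
//  |    |-- average_review: double (nullable = false)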