Problem creating a DataFrame from a dataset with nested sequences in Scala Spark

Problem description

I am trying to create a DataFrame from a sequence that contains nested sequences, but I get a scala.MatchError.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val data = Seq(("Java", Seq(Seq(true, 5L), 0, 7.5)), ("Python", Seq(Seq(true, 10L), 1, 8.5)), ("Scala", Seq(Seq(false, 8L), 2, 9.0)))

val rdd = spark.sparkContext.parallelize(data).map {
  case (l, s) => Row(l, Row.fromSeq(s.map{
    case (s: Seq[Any], i, d) => Row(Row.fromSeq(s), i, d)})) }

val schema = StructType(Seq(
  StructField("language", StringType, true),
  StructField("stats", StructType(Seq(
    StructField("users", StructType(Seq(
      StructField("active", BooleanType, true),
      StructField("level", LongType, true)
    ))),
    StructField("difficulty", IntegerType, true),
    StructField("average_review", DoubleType, true)
  )))
))

val ds = spark.createDataFrame(rdd, schema)
ds.show()

The pipeline breaks when I call

ds.show()

and gives the error

scala.MatchError: List(true, 5) (of class scala.collection.immutable.$colon$colon)

I suspect the problem is in the

spark.sparkContext.parallelize(data).map...

part of my code, but I cannot figure out what is wrong.

1 Answer

Try one of the variants below. The root cause of the error: the inner s.map visits the elements of stats one at a time, so each element is a single Any (List(true, 5), then 0, then 7.5) and can never match the Tuple3 pattern (s: Seq[Any], i, d); that mismatch is what throws the scala.MatchError.
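
The failure can be reproduced without Spark. A minimal sketch using the same element types and the same pattern as in the question:

// Each element of stats is one Any (List(true, 5), then 0, then 7.5),
// so the Tuple3 pattern below never matches any of them.
val stats: Seq[Any] = Seq(Seq(true, 5L), 0, 7.5)
stats.map {
  case (s: Seq[Any], i, d) => (s, i, d)
}
// scala.MatchError: List(true, 5) (of class scala.collection.immutable.$colon$colon)

Matching the whole sequence in one pattern, instead of mapping over its elements, fixes it: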

import org.apache.spark.rdd.RDD

// Option 1: destructure the full nested shape
// Seq(Seq(active, level), difficulty, average_review) in one pattern.
val rdd: RDD[Row] = spark.sparkContext.parallelize(data).map {
  case (language, Seq(Seq(active, level), difficulty, average_review)) =>
    Row(language, Row(Row(active, level), difficulty, average_review))
}
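
This RDD matches the schema from the question, so the original createDataFrame call works unchanged (df is a hypothetical name for the result):

val df = spark.createDataFrame(rdd, schema)
df.show()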

//+--------+--------------------+
//|language|               stats|
//+--------+--------------------+
//|    Java| {{true, 5}, 0, 7.5}|
//|  Python|{{true, 10}, 1, 8.5}|
//|   Scala|{{false, 8}, 2, 9.0}|
//+--------+--------------------+ 

// Option 2: match only the nested users sequence explicitly and keep the
// other two stats elements as generic bindings.
val rdd: RDD[Row] = spark.sparkContext.parallelize(data).map {
  case (language, Seq(users: Seq[Any], difficulty, average_review)) =>
    Row(language, Row(Row.fromSeq(users), difficulty, average_review))
}

// Option 3: fully generic. Wrap every nested sequence inside stats in a
// Row and pass scalar elements through untouched.
val rdd: RDD[Row] = spark.sparkContext.parallelize(data).map {
  case (language, stats) =>
    Row(language, Row.fromSeq(stats.map {
      case users: Seq[Any] => Row.fromSeq(users)
      case x               => x
    }))
}
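
Either of these two variants can replace the first one; all three produce the same RDD[Row], so the same createDataFrame(rdd, schema) call and output apply.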

// Alternatively, skip Row and the handwritten schema entirely: model the
// records with case classes and let Spark's encoder derive the nested schema.
case class Data(language: String, stats: Stats)
case class Stats(users: Users, difficulty: Int, average_review: Double)
case class Users(active: Boolean, level: Long)

// Convert the untyped tuples into the typed model first.
val data1 = data.map {
  case (language, Seq(Seq(active: Boolean, level: Long), difficulty: Int, average_review: Double)) =>
    Data(language, Stats(Users(active, level), difficulty, average_review))
}

import org.apache.spark.sql.Dataset
import spark.implicits._

val ds: Dataset[Data] = data1.toDS()
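
As a quick check (assuming the same SparkSession is in scope), show() prints the same table as above, and the encoder-derived schema should look like this sketch:

ds.printSchema()
// root
//  |-- language: string (nullable = true)
//  |-- stats: struct (nullable = true)
//  |    |-- users: struct (nullable = true)
//  |    |    |-- active: boolean (nullable = false)
//  |    |    |-- level: long (nullable = false)
//  |    |-- difficulty: integer (nullable = false)
//  |    |-- average_review: double (nullable = false)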