从 Apache Spark Row 获取字段,该字段使用 Scala 将数组作为 Seq 包装到列表中

问题描述 投票:0回答:1

背景

  • 从delta表中获取json格式的数据
  • 使用 Apache Spark 和 Scala

数据格式

  val factories = """
      {
        "cities": {
          "name": "Sao Paulo"
          "areas": [
            {
              "code": "41939",
              "type": "downtown"
            },
            {
              "code": "48294",
              "type": "residential"
            }
          ],
        },
       
        "domains": [
            {
               "id": "19sk2nfb",
               "name" : "defense"
            }
        ]
    }

代码

这将从增量表中获取数据并创建案例类对象

fetchedData
DataFrame
使用某些条件获取

factoriesSchema
是json模式

val structuredData =
    fetchedData.withColumn(
      "StructuredFactoryJson",
      from_json(col("FactoryData"), factoriesSchema)
  )

val factories = structuredData.collect().map { row =>
      val structJson = row.getAs[Row]("StructuredFactoryJson")
      val citiesRow = structJson.getAs[Row]("cities")
      val city = City(
        citiesRow.getAs[String]("name"),
        citiesRow
          .getAs[Seq[Row]]("areas")
          .map(areaRow =>
            Area(
              area.getAs[String]("type"),
              area.getAs[String]("code")
            )
          )
      )
      val domains = structJson
        .getAs[Seq[Row]]("domains")
        .map( area ->
           Area( area.getAs
             .
             .
             .

    }


问题

效果很好,并且获得了

Seq
。但问题是,是否有办法得到
List
而不是
Seq
并按原样构建更大的对象

scala apache-spark
1个回答
0
投票

考虑到需要预先存在的类进行编码,sparkutils frameless 似乎最适合,因为它支持开箱即用的 List。

它基于优秀的无框架库,并进行了一些额外的更改(包括列表支持)。

要使用它,您必须确保不像通常那样包含 SparkSession 隐式。 请参阅此处了解示例。

© www.soinside.com 2019 - 2024. All rights reserved.