Spark 不支持的反序列化器:需要一个 "ARRAY" 字段,但得到的是 "MAP<STRING, STRING>"

问题描述 投票:0回答:1

最近我们已迁移到

dataproc image 2.2
版本并支持
scala 2.12.18
spark 3.5 version

package test

import org.apache.spark.sql.SparkSession
import test.Model._

object readbigquery {

  /**
   * Entry point: builds a SparkSession (optionally bound to a local master for
   * out-of-cluster runs) and prints the configuration rows read from BigQuery.
   */
  def main(args: Array[String]): Unit = {

    val runLocally = true
    val jobName = "Test Spark Logging Case"

    // No need to round-trip through Some(...).map(...).get — a plain
    // conditional on the builder expresses the same thing directly.
    val builder = SparkSession.builder.appName(jobName)
    implicit val spark: SparkSession =
      (if (runLocally) builder.master("local[2]") else builder).getOrCreate()

    val datasetConfigurationRowsRaw = getDatasetConfigurations(
      spark,
      confProjectId = "test-project",
      mappingsDatasetName = "MAPPINGS",
      datasetConfigurationsTableName = "dataset_configurations"
    )
    println(s"datasetConfigurationRowsRaw:$datasetConfigurationRowsRaw")
  }

  /**
   * Reads the `dataset_configurations` table from BigQuery and decodes each
   * row into a [[DatasetConfigurationRow]].
   *
   * @param spark                          active session (also supplies the encoder implicits)
   * @param confProjectId                  GCP project holding the mappings dataset
   * @param mappingsDatasetName            BigQuery dataset name
   * @param datasetConfigurationsTableName table name within the dataset
   * @return all configuration rows, collected to the driver
   */
  def getDatasetConfigurations(
      spark: SparkSession,
      confProjectId: String,
      mappingsDatasetName: String,
      datasetConfigurationsTableName: String
  ): Seq[DatasetConfigurationRow] = {

    import org.apache.spark.sql.functions.col
    import spark.implicits._
    spark.read
      .format("bigquery")
      .option("table", s"$confProjectId.$mappingsDatasetName.$datasetConfigurationsTableName")
      .option("project", confProjectId)
      .load()
      // Select exactly the columns DatasetConfigurationRow declares. The
      // original also selected source_name / temporal_unit / regional_unit /
      // metadata_column_mappings, which appear neither in the case class nor
      // in the table DDL shown in this post — selecting nonexistent columns
      // fails analysis before the deserializer is even reached.
      .select(
        col("technology"),
        col("name"),
        col("identifier_column_names"),
        col("column_mappings"),
        col("timestamp_column_name"))
      .as[DatasetConfigurationRow]
      .collect()
      .toSeq // be explicit instead of relying on the Array -> Seq implicit widening
  }
}

package test

object Model {

  /** One row of the `MAPPINGS.dataset_configurations` BigQuery table. */
  case class DatasetConfigurationRow(
      technology: String,
      name: String,
      identifier_column_names: Seq[String],
      column_mappings: Seq[ColumnMapping],
      timestamp_column_name: String
  )

  /** One element of the `column_mappings` ARRAY<STRUCT<...>> column. */
  case class ColumnMapping(
      mapping_type: String,
      source_column_name: String,
      column_name: String,
      name: String,
      display_name: String,
      description: String,
      keep_source_column: Boolean,
      formula: String,
      functions: Functions
  )

  // Retained for source compatibility; no longer used by Functions below,
  // since the connector surfaces key/value pairs as MAP, not ARRAY<STRUCT>.
  case class DataUnitFunction(key: String, value: String)

  /**
   * Nested `functions` struct. The BigQuery DDL declares
   * `fun_*_unit ARRAY<STRUCT<key STRING, value STRING>>`, but the Spark 3.5
   * DataFrame schema surfaces those columns as `MAP<STRING, STRING>`, so the
   * case class must use Map — a Seq[DataUnitFunction] field triggers
   * UNSUPPORTED_DESERIALIZER.DATA_TYPE_MISMATCH. Defaults of None keep the
   * fields optional.
   */
  case class Functions(
      fun_temporal: String,
      fun_regional: String,
      fun_temporal_unit: Option[Map[String, String]] = None,
      fun_regional_unit: Option[Map[String, String]] = None
  )
}

上面是实际代码的片段,与

dataproc image 2.0
scala 2.12.16
spark 3.4 version
一起工作得很好。

在这里,我们读取 BigQuery 表,并尝试将其加载到上面定义的这组案例类中。现在我们遇到了以下问题

Exception in thread "main" org.apache.spark.sql.AnalysisException: [UNSUPPORTED_DESERIALIZER.DATA_TYPE_MISMATCH] The deserializer is not supported: need a(n) "ARRAY" field but got "MAP<STRING, STRING>".

我确实尝试过使用一些 map 转换并修改案例类的数据类型,但没有成功。看起来是在加载

DataUnitFunction
时失败。

我也尝试过先将其读入 DataFrame,用 map 做转换,再加载到案例类中。但考虑到我们 BigQuery 表的复杂性和模式(schema),这种方法需要对我们的代码做大量改动。

添加给定

bigquery
表的模式

-- BigQuery DDL for the table read by getDatasetConfigurations.
CREATE TABLE `test-project.MAPPINGS.dataset_configurations`
(
  technology STRING,
  name STRING,
  -- decoded to Seq[String] in DatasetConfigurationRow
  identifier_column_names ARRAY<STRING>,
  -- Nested struct array. Note fun_temporal_unit / fun_regional_unit are
  -- ARRAY<STRUCT<key, value>> here, yet the printed Spark 3.5 schema shows
  -- them as MAP<STRING, STRING> — the root of the deserializer mismatch.
  column_mappings ARRAY<STRUCT<mapping_type STRING, source_column_name STRING, column_name STRING, name STRING, display_name STRING, description STRING, keep_source_column BOOL, formula STRING, functions STRUCT<fun_temporal STRING, fun_regional STRING, fun_temporal_unit ARRAY<STRUCT<key STRING, value STRING>>, fun_regional_unit ARRAY<STRUCT<key STRING, value STRING>>>>>,
  timestamp_column_name STRING
)
;
scala apache-spark google-bigquery google-cloud-dataproc
1个回答
0
投票

我必须更改

case class Functions
的架构才能解决这个问题。

  /**
   * Schema-compatible shape for the nested `functions` struct: the unit
   * columns are read as MAP<STRING, STRING>, so they are modelled as optional
   * maps defaulting to None.
   */
  case class Functions(
      fun_temporal: String,
      fun_regional: String,
      fun_temporal_unit: Option[Map[String, String]] = None,
      fun_regional_unit: Option[Map[String, String]] = None
  )

下面是修改后能够成功加载到案例类中的 DataFrame 模式(schema)。

|-- technology: string (nullable = true)
 |-- name: string (nullable = true)
 |-- identifier_column_names: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- timestamp_column_name: string (nullable = true)
 |-- column_mappings: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- mapping_type: string (nullable = true)
 |    |    |-- source_column_name: string (nullable = true)
 |    |    |-- column_name: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- display_name: string (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- keep_source_column: boolean (nullable = true)
 |    |    |-- formula: string (nullable = true)
 |    |    |-- functions: struct (nullable = true)
 |    |    |    |-- fun_temporal: string (nullable = true)
 |    |    |    |-- fun_regional: string (nullable = true)
 |    |    |    |-- fun_temporal_unit: map (nullable = false)
 |    |    |    |    |-- key: string
 |    |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |    |-- fun_regional_unit: map (nullable = false)
 |    |    |    |    |-- key: string
 |    |    |    |    |-- value: string (valueContainsNull = true)
© www.soinside.com 2019 - 2024. All rights reserved.