We recently migrated to Dataproc image 2.2, which ships with Scala 2.12.18 and Spark 3.5.
package test

import org.apache.spark.sql.SparkSession
import test.Model._

object readbigquery {

  def main(args: Array[String]): Unit = {
    val runLocally = true
    val jobName = "Test Spark Logging Case"

    implicit val spark: SparkSession = Some(SparkSession.builder.appName(jobName))
      .map(sparkSessionBuilder =>
        if (runLocally) sparkSessionBuilder.master("local[2]") else sparkSessionBuilder
      )
      .map(_.getOrCreate())
      .get

    val datasetConfigurationRowsRaw = getDatasetConfigurations(
      spark,
      confProjectId = "test-project",
      mappingsDatasetName = "MAPPINGS",
      datasetConfigurationsTableName = "dataset_configurations"
    )
    println(s"datasetConfigurationRowsRaw:$datasetConfigurationRowsRaw")
  }

  def getDatasetConfigurations(
      spark: SparkSession,
      confProjectId: String,
      mappingsDatasetName: String,
      datasetConfigurationsTableName: String
  ): Seq[DatasetConfigurationRow] = {
    import org.apache.spark.sql.functions._
    import spark.implicits._

    spark.read
      .format("bigquery")
      .option("table", s"$confProjectId.$mappingsDatasetName.$datasetConfigurationsTableName")
      .option("project", confProjectId)
      .load()
      .select(
        col("technology"),
        col("name"),
        col("source_name"),
        col("temporal_unit"),
        col("regional_unit"),
        col("identifier_column_names"),
        col("metadata_column_mappings"),
        col("column_mappings"),
        col("timestamp_column_name"))
      .as[DatasetConfigurationRow]
      .collect()
  }
}
package test

object Model {

  case class DatasetConfigurationRow(
      technology: String,
      name: String,
      identifier_column_names: Seq[String],
      column_mappings: Seq[ColumnMapping],
      timestamp_column_name: String
  )

  case class ColumnMapping(
      mapping_type: String,
      source_column_name: String,
      column_name: String,
      name: String,
      display_name: String,
      description: String,
      keep_source_column: Boolean,
      formula: String,
      functions: Functions
  )

  case class DataUnitFunction(key: String, value: String)

  case class Functions(
      fun_temporal: String,
      fun_regional: String,
      fun_temporal_unit: Seq[DataUnitFunction],
      fun_regional_unit: Seq[DataUnitFunction]
  )
}
The code above is a snippet from our actual job, and it worked fine with Dataproc image 2.0, Scala 2.12.16 and Spark 3.4. It reads a BigQuery table and loads the rows into the case classes defined above. After the migration we are now facing the following error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: [UNSUPPORTED_DESERIALIZER.DATA_TYPE_MISMATCH] The deserializer is not supported: need a(n) "ARRAY" field but got "MAP<STRING, STRING>".
I did try using Map and changing the data types of the case classes, but without success. The failure seems to happen while loading DataUnitFunction.

I also tried reading the table into a DataFrame, transforming it with map, and only then loading it into the case classes. Given the complexity of our BigQuery tables and their schemas, that approach also required significant changes to our code.
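For reference, below is a rough, untested sketch of what such a DataFrame-level transformation could look like. It is not code from our project: the function name mapsBackToEntryArrays and the variable rawDf are made up for illustration, and it assumes the two MAP columns sit under column_mappings.functions exactly as in the printed schema further down. It uses Spark's built-in transform, named_struct and map_entries SQL functions to turn each MAP&lt;STRING, STRING&gt; back into an ARRAY&lt;STRUCT&lt;key, value&gt;&gt; before calling .as[DatasetConfigurationRow].

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.expr

// Hypothetical sketch only: rebuild the key/value entry arrays from the MAP columns
// so the original case classes (with Seq[DataUnitFunction]) keep deserializing.
def mapsBackToEntryArrays(df: DataFrame): DataFrame =
  df.withColumn(
    "column_mappings",
    expr("""
      transform(column_mappings, cm -> named_struct(
        'mapping_type',       cm.mapping_type,
        'source_column_name', cm.source_column_name,
        'column_name',        cm.column_name,
        'name',               cm.name,
        'display_name',       cm.display_name,
        'description',        cm.description,
        'keep_source_column', cm.keep_source_column,
        'formula',            cm.formula,
        'functions', named_struct(
          'fun_temporal',      cm.functions.fun_temporal,
          'fun_regional',      cm.functions.fun_regional,
          'fun_temporal_unit', map_entries(cm.functions.fun_temporal_unit),
          'fun_regional_unit', map_entries(cm.functions.fun_regional_unit)
        )
      ))
    """)
  )

// Usage (sketch): mapsBackToEntryArrays(rawDf).as[DatasetConfigurationRow].collect()

As noted, with nested schemas of this size the rewrite grows quickly, which is why we were hoping to avoid it.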
Here is the schema of the BigQuery table in question:
CREATE TABLE `test-project.MAPPINGS.dataset_configurations`
(
technology STRING,
name STRING,
identifier_column_names ARRAY<STRING>,
column_mappings ARRAY<STRUCT<mapping_type STRING, source_column_name STRING, column_name STRING, name STRING, display_name STRING, description STRING, keep_source_column BOOL, formula STRING, functions STRUCT<fun_temporal STRING, fun_regional STRING, fun_temporal_unit ARRAY<STRUCT<key STRING, value STRING>>, fun_regional_unit ARRAY<STRUCT<key STRING, value STRING>>>>>,
timestamp_column_name STRING
)
;
I had to change the schema of case class Functions to resolve this issue:
case class Functions(
    fun_temporal: String,
    fun_regional: String,
    fun_temporal_unit: Option[Map[String, String]] = None,
    fun_regional_unit: Option[Map[String, String]] = None
)
Below is the DataFrame schema that now loads successfully into the case classes:
|-- technology: string (nullable = true)
|-- name: string (nullable = true)
|-- identifier_column_names: array (nullable = true)
| |-- element: string (containsNull = true)
|-- timestamp_column_name: string (nullable = true)
|-- column_mappings: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- mapping_type: string (nullable = true)
| | |-- source_column_name: string (nullable = true)
| | |-- column_name: string (nullable = true)
| | |-- name: string (nullable = true)
| | |-- display_name: string (nullable = true)
| | |-- description: string (nullable = true)
| | |-- keep_source_column: boolean (nullable = true)
| | |-- formula: string (nullable = true)
| | |-- functions: struct (nullable = true)
| | | |-- fun_temporal: string (nullable = true)
| | | |-- fun_regional: string (nullable = true)
| | | |-- fun_temporal_unit: map (nullable = false)
| | | | |-- key: string
| | | | |-- value: string (valueContainsNull = true)
| | | |-- fun_regional_unit: map (nullable = false)
| | | | |-- key: string
| | | | |-- value: string (valueContainsNull = true)