需要解决的问题
需要从delta表中提取json字符串并最终解析它。
show
函数可以用来查看数据,但需要将其提取到地图或案例类中进行处理。
数据从json文件插入到delta表中。表中的列的类型为
String
。在此过程中,仅数据存储在增量表列中,而不是 json 数组中的键值。所以 select
查询会返回 partial
数据。这是增量表的默认行为吗?
例如
要存储的数据 { "name" : "Sample"} 实际存储的内容:{“Sample”}
背景
已经参考了以下SO问题并按照那里的说明进行操作,但找不到解决方案
数据已从 json 文件插入到增量表中
其中一个字段是 json 数组,它应该按原样插入
val jsonData = spark.read
.option("multiLine", true)
.option("mode",
"PERMISSIVE"
).option(
"dropFieldIfAllNull",
false
).json(FILE_PATH)
.createOrReplaceTempView("data")
val data = spark.sql("""SELECT * from Data""")
val inputDataFrame = data
.select(
col("Id"),
col("Version"),
col("StartDate"),
col("EndDate"),
explode(col("Configuration")).alias(
"Config"
)
)
.withColumn("ConfigId", col("Config.Id"))
.
.
.
.toDF
val deltaTable = DeltaTable.forPath(.....)
deltaTable
.as("Config")
.merge(
inputDataFrame.as("input"),
)
.whenMatched
.update(
Map(
"ConfigData" -> col("input.Config"),
)
)
.whenNotMatched
.insert(
Map(
"ConfigData" -> col("input.Config"),
)
)
.execute()
}
运行
select
查询后,数据显示没有键,仅插入了 json 字符串中的值
方法一
val arrayOffStringSchema = ArrayType(StringType)
var configData = df.select("Config").as[String]
configData.show(false) // shows Config column and data without keys
var desiredData = configData.withColumn("Config",from_json(col("Config"), arrayofStringSchema))
desiredData.show(false) // shows null
方法2
val json_schema = spark.read
.option("multiLine", true)
.option(
"mode",
"PERMISSIVE"
)
.option(
"dropFieldIfAllNull",
false
).json(df.select("Config").as[String]).schema
var config = configData.withColumn("Config", from_json(col("Config"),json_schema))
config.show(false) // shows Config column and data without keys
config.printSchema() // Shows following
/*
root
|-- Config: struct (nullable = true)
| |-- _corrupt_record: string (nullable = true)
*/
平台
Scala 2.12.18
阿帕奇火花3.5.1
这里缺少什么?插入有问题还是有其他方法可以实现此目的?
编辑1 显然,当存储数据时,会自动推断模式,并且不会将其存储在增量表中。因此,在获取数据时,不知道模式来自哪里。基本上需要一个 Map,它将能够从存储为增量表中列的 json 字符串中获取字段。
让我简单地解释一下这个问题
正在读取 json 文件中的数据并将其插入到 Delta Table 列中。
该列的类型为 String
据观察,当包含 json 数据的数据帧插入 Delta 表列时,它会自动推断架构,并且 json 结构中的所有键都会被删除,只存储值
解析数据变得困难,因为数据是稍后从增量表列中读取的。
解决方案
to_json:用于将数据帧列转换为json并存储到增量表列中
from_json:用于将数据帧列转换为结构化json
val json_schema = spark.read
.option("multiLine", true)
.option(
"mode",
"PERMISSIVE"
)
.option(
"dropFieldIfAllNull",
false
).json(df.select("Config").as[String]).schema
var struct_json = levers.withColumn("StructuredConfig", from_json(col("Config"),json_schema))
struct_json.printSchema // shows desired schema
现在可以从数据框中选择
StructuredConfig.name
或StructuredConfig.id
,这在以前是不可能的