Suppose we have the following JSON structure:
{
  "positions": {
    "node": "abc"
  },
  "submissions": {
    "submissionOffsets": [
      {
        "attributeName": "sample1",
        "attributeValue": 1224
      },
      {
        "attributeName": "sample2",
        "attributeValue": 1224
      },
      {
        "attributeName": "sample3",
        "attributeValue": 1224
      },
      {
        "attributeName": "sample4",
        "attributeValue": 1224
      }
    ]
  }
}
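We want to read "submissionOffsets" and, for a given attribute name such as "sample1", pull that entry's attributeName and attributeValue out as top-level fields. The expected structure is: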
{
  "positions": {
    "node": "abc"
  },
  "submissions": {
    "submissionOffsets": [
      {
        "attributeName": "sample1",
        "attributeValue": 1224
      },
      {
        "attributeName": "sample2",
        "attributeValue": 1224
      },
      {
        "attributeName": "sample3",
        "attributeValue": 1224
      },
      {
        "attributeName": "sample4",
        "attributeValue": 1224
      }
    ]
  },
  "attributeName": "sample1",
  "attributeValue": 1224
}
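This has to be done with DataFrames.
I tried exploding submissions.submissionOffsets and then checking the attribute name and value, but that produces a separate column that I would then have to join back to the original DataFrame.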
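The join is avoidable because an exploded column can be selected together with "*", so every exploded row still carries all of the original columns; filtering on the exploded column then leaves nothing to join back. The sketch below models that idea with plain Scala collections rather than Spark. The case classes Offset and Record and the helper explodeKeepingColumns are hypothetical names used only for illustration.

```scala
// Plain-Scala model (no Spark) of "explode, then filter" done without a join.
// Offset and Record are hypothetical stand-ins for the DataFrame row schema.
final case class Offset(attributeName: String, attributeValue: Long)
final case class Record(node: String, offsets: Seq[Offset])

// Explode: one output element per array entry, each paired with the full
// original record, so no column ever has to be joined back afterwards.
def explodeKeepingColumns(rows: Seq[Record]): Seq[(Record, Offset)] =
  rows.flatMap(r => r.offsets.map(o => (r, o)))

val rows = Seq(Record("abc", (1 to 4).map(i => Offset(s"sample$i", 1224L))))

// Filter on the exploded element while the original columns ride along
val result = explodeKeepingColumns(rows).filter { case (_, o) => o.attributeName == "sample1" }

result.foreach { case (r, o) => println(s"${r.node} ${o.attributeName} ${o.attributeValue}") }
```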
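Use these Spark SQL functions (the lambda form of filter requires Spark 2.4 or later):
- filter: a higher-order function that keeps only the elements of the nested JSON array that match a condition, here a given attributeName.
- inline or inline_outer: explodes an array of structs into rows, with one column per struct field. inline_outer also emits a row (with nulls) when the array is empty or null.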
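To make the difference between inline and inline_outer concrete, the following plain-Scala sketch (not Spark code) models what filter followed by inline or inline_outer does to a single input row, using Option to stand in for SQL null. Offset, InputRow, OutRow and filterAndInline are hypothetical names chosen for this illustration.

```scala
// Plain-Scala model (no Spark) of
//   inline / inline_outer(filter(submissions.submissionOffsets, i -> i.attributeName == 'sample1'))
// applied to one input row. All names here are hypothetical.
final case class Offset(attributeName: String, attributeValue: Long)
final case class InputRow(node: String, offsets: Seq[Offset])
// Original columns plus the two struct fields produced by inline(_outer);
// None stands in for SQL null.
final case class OutRow(node: String, offsets: Seq[Offset],
                        attributeName: Option[String], attributeValue: Option[Long])

def filterAndInline(row: InputRow, name: String, outer: Boolean): Seq[OutRow] = {
  // filter: keep only the array elements whose attributeName matches
  val matched = row.offsets.filter(_.attributeName == name)
  if (matched.isEmpty && outer)
    // inline_outer: nothing matched, but one row with nulls is still emitted
    Seq(OutRow(row.node, row.offsets, None, None))
  else
    // one row per matching element; plain inline yields zero rows if none match
    matched.map(o => OutRow(row.node, row.offsets, Some(o.attributeName), Some(o.attributeValue)))
}

val input = InputRow("abc", (1 to 4).map(i => Offset(s"sample$i", 1224L)))
println(filterAndInline(input, "sample1", outer = true))
```

With inline, an input row that has no matching offset disappears from the result; with inline_outer it is kept with null attributeName and attributeValue, which is usually what you want when the original rows must all survive.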
Here is sample code from a spark-shell session, where data is a String holding the input JSON shown above:
scala> val df = spark
.read
.option("multiLine", "true")
.json(Seq(data).toDS)
df: org.apache.spark.sql.DataFrame = [positions: struct<node: string>, submissions: struct<submissionOffsets: array<struct<attributeName:string,attributeValue:bigint>>>]
scala> df.show(false)
+---------+----------------------------------------------------------------------+
|positions|submissions                                                           |
+---------+----------------------------------------------------------------------+
|{abc}    |{[{sample1, 1224}, {sample2, 1224}, {sample3, 1224}, {sample4, 1224}]}|
+---------+----------------------------------------------------------------------+
scala> df.printSchema
root
|-- positions: struct (nullable = true)
| |-- node: string (nullable = true)
|-- submissions: struct (nullable = true)
| |-- submissionOffsets: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- attributeName: string (nullable = true)
| | | |-- attributeValue: long (nullable = true)
scala>
// filter: higher-order function that keeps only the matching elements of the nested array
// inline_outer: explodes the resulting array of structs into rows, one column per struct field
df
.selectExpr(
"*", // to select all columns from the dataset / dataframe
"inline_outer(filter(submissions.submissionOffsets, i -> i.attributeName == 'sample1')) as (attributeName, attributeValue)"
)
.toJSON.show(false)
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"positions":{"node":"abc"},"submissions":{"submissionOffsets":[{"attributeName":"sample1","attributeValue":1224},{"attributeName":"sample2","attributeValue":1224},{"attributeName":"sample3","attributeValue":1224},{"attributeName":"sample4","attributeValue":1224}]},"attributeName":"sample1","attributeValue":1224}|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
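Because inline_outer emits one row per matching element, an input row whose submissionOffsets contains the same attributeName more than once would come out duplicated. If exactly one row per input is required, one alternative (not part of the answer above; check its behaviour under your Spark version and ANSI settings) is to index the filtered array instead of exploding it, e.g. filter(submissions.submissionOffsets, i -> i.attributeName == 'sample1')[0], which yields null when nothing matches as long as ANSI mode is off. The plain-Scala sketch below models that "first match or null" behaviour; Offset, InputRow, OutRow and firstMatch are hypothetical names.

```scala
// Plain-Scala model (no Spark) of "first matching element, or null",
// i.e. exactly one output row per input row. All names here are hypothetical.
final case class Offset(attributeName: String, attributeValue: Long)
final case class InputRow(node: String, offsets: Seq[Offset])
final case class OutRow(node: String, offsets: Seq[Offset],
                        attributeName: Option[String], attributeValue: Option[Long])

def firstMatch(row: InputRow, name: String): OutRow = {
  // find returns the first element satisfying the predicate, or None (SQL null)
  val hit = row.offsets.find(_.attributeName == name)
  OutRow(row.node, row.offsets, hit.map(_.attributeName), hit.map(_.attributeValue))
}

// Two entries share the name "sample1": only the first one is kept
val dup = InputRow("abc", Seq(Offset("sample1", 1L), Offset("sample1", 2L)))
println(firstMatch(dup, "sample1"))
```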