我正在尝试展平具有 1:M 映射嵌套数组对象的 JSON。请参阅下面的 JSON 响应。
{
"result": [
{
"data": [
{
"dimensions": [
"SYNTHETIC-1"
],
"timestamps": [
17064756000,
17064752000,
17064751000,
17064750000
],
"values": [
null,
null,
100,
99.6354
]
},
{
"dimensions": [
"SYNTHETIC-2"
],
"timestamps": [
17064755000,
17064754000,
17064753000,
17064752000
],
"values": [
101,
null,
100,
99.6354
]
}
]
}
]
}
上述 JSON 响应是 Azure Synapse 中 Web 活动的输出。我使用数据流来展平此 JSON 并按第一个“结果”然后按“数据”展开。这给了我维度[]、时间戳[]和值[]数组,但它没有给出正确的输出。我得到的输出如下:
dimensions timestamps values
SYNTHETIC-1 17064756000 null
SYNTHETIC-1 17064756000 null
SYNTHETIC-1 17064756000 100
SYNTHETIC-1 17064756000 99.6354
SYNTHETIC-1 17064752000 null
SYNTHETIC-1 17064752000 null
SYNTHETIC-1 17064752000 100
SYNTHETIC-1 17064752000 99.6354
SYNTHETIC-1 17064751000 null
SYNTHETIC-1 17064751000 null
SYNTHETIC-1 17064751000 100
SYNTHETIC-1 17064751000 99.6354
SYNTHETIC-1 17064750000 null
SYNTHETIC-1 17064750000 null
SYNTHETIC-1 17064750000 100
SYNTHETIC-1 17064750000 99.6354
SYNTHETIC-2 17064755000 101
SYNTHETIC-2 17064755000 null
SYNTHETIC-2 17064755000 100
SYNTHETIC-2 17064755000 99.6354
SYNTHETIC-2 17064754000 101
SYNTHETIC-2 17064754000 null
SYNTHETIC-2 17064754000 100
SYNTHETIC-2 17064754000 99.6354
SYNTHETIC-2 17064753000 101
SYNTHETIC-2 17064753000 null
SYNTHETIC-2 17064753000 100
SYNTHETIC-2 17064753000 99.6354
SYNTHETIC-2 17064752000 101
SYNTHETIC-2 17064752000 null
SYNTHETIC-2 17064752000 100
SYNTHETIC-2 17064752000 99.6354
我期望的输出如下:
dimensions timestamps values
SYNTHETIC-1 17064756000 null
SYNTHETIC-1 17064752000 null
SYNTHETIC-1 17064751000 100
SYNTHETIC-1 17064750000 99.6354
SYNTHETIC-2 17064755000 101
SYNTHETIC-2 17064754000 null
SYNTHETIC-2 17064753000 100
SYNTHETIC-2 17064752000 99.6354
有人可以帮助我实现这个目标吗?
TIA。
目前,Dataflow 没有任何功能可以根据您的要求展平所有嵌套数组。对于您的情况,您可以尝试使用展平和连接转换组合的以下解决方法。
由于某种原因,当类型是整数数组时,您的
timestamps
数组只给我空值。所以,我将它设置为字符串数组。您需要根据源数据集架构中的要求更改此数组。
首先,使用如下所示的展平变换来展平
data
数组并将内部数组作为列。为此使用基于规则的映射。
它会给出如下结果。
然后按照上图所示进行转换。
使用
flatten2
变换来展平 dimensions
数组。在展平中,为 Unroll by和 Unroll root 选项提供
dimension
数组。它将展平这个数组。
然后进行
select2
变换并删除 timestamps
列。
从
flatten2
获取一个新分支并添加另一个选择转换 select1
并在此处删除 values
列。
现在,向 flatten3
添加另一个
展平变换
select2
并展平数组列 values
。对于 Unroll by 和 Unroll root 选项,仅给出 values
列。
类似地,将
flatten4
添加到 select1
,并在此处为 timestamps
数组列执行此操作。
之后,将 SurrogateKey 转换
surrogateKey1
转换为 flatten3
并创建一个键列 key1
并将步骤和起始值指定为 1。
类似地,对
flatten4
变换执行相同操作。也给出相同的键名。
现在,内连接两个代理键转换使用基于公共列key1
的
连接转换。
现在,它将获得所需的结果,使用另一个选择转换并删除重复的列和额外的
key1
列。
现在,您将得到想要的结果。
使用下面的数据流脚本构建您的数据流。
source(output(
result as (data as (dimensions as string[], timestamps as string[], values as string[])[])[]
),
allowSchemaDrift: true,
validateSchema: false,
ignoreNoFilesFound: false,
documentForm: 'singleDocument') ~> source1
source1 foldDown(unroll(result.data, result),
mapColumn(
each(result.data,match(true()))
),
skipDuplicateMapInputs: false,
skipDuplicateMapOutputs: false) ~> flatten1
flatten1 foldDown(unroll(dimensions, dimensions),
mapColumn(
dimensions,
timestamps,
values
),
skipDuplicateMapInputs: false,
skipDuplicateMapOutputs: false) ~> flatten2
flatten2 select(mapColumn(
dimensions,
timestamps
),
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> select1
flatten2 select(mapColumn(
dimensions,
values
),
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> select2
select2 foldDown(unroll(values, values),
mapColumn(
dimensions,
values
),
skipDuplicateMapInputs: false,
skipDuplicateMapOutputs: false) ~> flatten3
select1 foldDown(unroll(timestamps, timestamps),
mapColumn(
dimensions,
timestamps
),
skipDuplicateMapInputs: false,
skipDuplicateMapOutputs: false) ~> flatten4
flatten3 keyGenerate(output(key1 as long),
startAt: 1L,
stepValue: 1L) ~> surrogateKey1
flatten4 keyGenerate(output(key1 as long),
startAt: 1L,
stepValue: 1L) ~> surrogateKey2
surrogateKey1, surrogateKey2 join(surrogateKey1@key1 == surrogateKey2@key1,
joinType:'inner',
matchType:'exact',
ignoreSpaces: false,
broadcast: 'auto')~> join1
join1 select(mapColumn(
dimensions = flatten3@dimensions,
values,
dimensions = flatten4@dimensions,
timestamps
),
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> select3
select3 sink(allowSchemaDrift: true,
validateSchema: false,
input(
result as (data as (dimensions as string[], timestamps as string[], values as string[])[])[]
),
umask: 0022,
preCommands: [],
postCommands: [],
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> sink1