在 Azure Synapse 管道中展平 JSON 嵌套数组

问题描述 投票:0回答:1

我正在尝试展平具有 1:M 映射嵌套数组对象的 JSON。请参阅下面的 JSON 响应。

{
    "result": [
        {
            "data": [
                {
                    "dimensions": [
                        "SYNTHETIC-1"
                    ],
                    "timestamps": [
                        17064756000,
                        17064752000,
                        17064751000,
                        17064750000
                    ],
                    "values": [
                        null,
                        null,
                        100,
                        99.6354
                    ]
                },
                {
                    "dimensions": [
                        "SYNTHETIC-2"
                    ],
                    "timestamps": [
                        17064755000,
                        17064754000,
                        17064753000,
                        17064752000
                    ],
                    "values": [
                        101,
                        null,
                        100,
                        99.6354
                    ]
                }
            ]
        }
    ]
}

上述 JSON 响应是 Azure Synapse 中 Web 活动的输出。我使用数据流来展平此 JSON 并按第一个“结果”然后按“数据”展开。这给了我维度[]、时间戳[]和值[]数组,但它没有给出正确的输出。我得到的输出如下:

dimensions    timestamps    values
SYNTHETIC-1   17064756000   null
SYNTHETIC-1   17064756000   null
SYNTHETIC-1   17064756000   100
SYNTHETIC-1   17064756000   99.6354
SYNTHETIC-1   17064752000   null
SYNTHETIC-1   17064752000   null
SYNTHETIC-1   17064752000   100
SYNTHETIC-1   17064752000   99.6354
SYNTHETIC-1   17064751000   null
SYNTHETIC-1   17064751000   null
SYNTHETIC-1   17064751000   100
SYNTHETIC-1   17064751000   99.6354
SYNTHETIC-1   17064750000   null
SYNTHETIC-1   17064750000   null
SYNTHETIC-1   17064750000   100
SYNTHETIC-1   17064750000   99.6354
SYNTHETIC-2   17064755000   101
SYNTHETIC-2   17064755000   null
SYNTHETIC-2   17064755000   100
SYNTHETIC-2   17064755000   99.6354
SYNTHETIC-2   17064754000   101
SYNTHETIC-2   17064754000   null
SYNTHETIC-2   17064754000   100
SYNTHETIC-2   17064754000   99.6354
SYNTHETIC-2   17064753000   101
SYNTHETIC-2   17064753000   null
SYNTHETIC-2   17064753000   100
SYNTHETIC-2   17064753000   99.6354
SYNTHETIC-2   17064752000   101
SYNTHETIC-2   17064752000   null
SYNTHETIC-2   17064752000   100
SYNTHETIC-2   17064752000   99.6354

我期望的输出如下:

dimensions    timestamps    values
SYNTHETIC-1   17064756000   null
SYNTHETIC-1   17064752000   null
SYNTHETIC-1   17064751000   100
SYNTHETIC-1   17064750000   99.6354
SYNTHETIC-2   17064755000   101
SYNTHETIC-2   17064754000   null
SYNTHETIC-2   17064753000   100
SYNTHETIC-2   17064752000   99.6354

有人可以帮助我实现这个目标吗?

TIA。

json azure google-cloud-dataflow flatten azure-synapse-pipeline
1个回答
0
投票

目前,Dataflow 没有任何功能可以根据您的要求展平所有嵌套数组。对于您的情况,您可以尝试使用展平和连接转换组合的以下解决方法。

由于某种原因,当类型是整数数组时,您的

timestamps
数组只给我空值。所以,我将它设置为字符串数组。您需要根据源数据集架构中的要求更改此数组。

首先,使用如下所示的展平变换来展平

data
数组并将内部数组作为列。为此使用基于规则的映射。

enter image description here

它会给出如下结果。

enter image description here

然后按照上图所示进行转换。

  • 使用

    flatten2
    变换来展平
    dimensions
    数组。在展平中,为
    Unroll by
    Unroll root 选项提供 dimension 数组。它将展平这个数组。

    enter image description here

  • 然后进行

    select2
    变换并删除
    timestamps
    列。

  • flatten2
    获取一个新分支并添加另一个选择转换
    select1
    并在此处删除
    values
    列。

  • 现在,向 flatten3 添加另一个

    展平变换
    select2
    并展平数组列
    values
    。对于 Unroll byUnroll root 选项,仅给出
    values
    列。

  • 类似地,将

    flatten4
    添加到
    select1
    ,并在此处为
    timestamps
    数组列执行此操作。

  • 之后,将 SurrogateKey 转换

    surrogateKey1
    转换为
    flatten3
    并创建一个键列
    key1
    并将步骤和起始值指定为 1。

    enter image description here

  • 类似地,对

    flatten4
    变换执行相同操作。也给出相同的键名。

  • 现在,内连接两个代理键转换使用基于公共列key1

    连接转换

    enter image description here

  • 现在,它将获得所需的结果,使用另一个选择转换并删除重复的列和额外的

    key1
    列。

现在,您将得到想要的结果。

enter image description here

使用下面的数据流脚本构建您的数据流

source(output(
        result as (data as (dimensions as string[], timestamps as string[], values as string[])[])[]
    ),
    allowSchemaDrift: true,
    validateSchema: false,
    ignoreNoFilesFound: false,
    documentForm: 'singleDocument') ~> source1
source1 foldDown(unroll(result.data, result),
    mapColumn(
        each(result.data,match(true()))
    ),
    skipDuplicateMapInputs: false,
    skipDuplicateMapOutputs: false) ~> flatten1
flatten1 foldDown(unroll(dimensions, dimensions),
    mapColumn(
        dimensions,
        timestamps,
        values
    ),
    skipDuplicateMapInputs: false,
    skipDuplicateMapOutputs: false) ~> flatten2
flatten2 select(mapColumn(
        dimensions,
        timestamps
    ),
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> select1
flatten2 select(mapColumn(
        dimensions,
        values
    ),
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> select2
select2 foldDown(unroll(values, values),
    mapColumn(
        dimensions,
        values
    ),
    skipDuplicateMapInputs: false,
    skipDuplicateMapOutputs: false) ~> flatten3
select1 foldDown(unroll(timestamps, timestamps),
    mapColumn(
        dimensions,
        timestamps
    ),
    skipDuplicateMapInputs: false,
    skipDuplicateMapOutputs: false) ~> flatten4
flatten3 keyGenerate(output(key1 as long),
    startAt: 1L,
    stepValue: 1L) ~> surrogateKey1
flatten4 keyGenerate(output(key1 as long),
    startAt: 1L,
    stepValue: 1L) ~> surrogateKey2
surrogateKey1, surrogateKey2 join(surrogateKey1@key1 == surrogateKey2@key1,
    joinType:'inner',
    matchType:'exact',
    ignoreSpaces: false,
    broadcast: 'auto')~> join1
join1 select(mapColumn(
        dimensions = flatten3@dimensions,
        values,
        dimensions = flatten4@dimensions,
        timestamps
    ),
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> select3
select3 sink(allowSchemaDrift: true,
    validateSchema: false,
    input(
        result as (data as (dimensions as string[], timestamps as string[], values as string[])[])[]
    ),
    umask: 0022,
    preCommands: [],
    postCommands: [],
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> sink1
© www.soinside.com 2019 - 2024. All rights reserved.