Azure Data Flow: parsing multiple JSON objects of the same type into SQL

Question (0 votes, 2 answers)

I have a JSON like the one below, where "reports" contains multiple objects with different names. I need to convert each of these objects into a database row using Azure Data Factory.

{
"reports": {
    "Z1-B5(CR)-L5-M": {
        "location_id": 1580,
        "date": "2023-05-24",
        "h19": 0,
        "h20": 0,
        "h21": 0,
        "h22": 0,
        "h23": 0,
        "peak_hour": 10,
        "total": 6,
        "location_name": "Z1-B5(CR)-L5-M"
    },
    "Z2-B22(FC)-L1-M": {
        "location_id": 1589,
        "date": "2023-05-24",
        "h19": 0,
        "h20": 0,
        "h21": 0,
        "h22": 0,
        "h23": 0,
        "peak_hour": 14,
        "total": 212,
        "location_name": "Z2-B22(FC)-L1-F"
    }
}

}

Expected output:
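The expected-output screenshot is not available here, but the flattening the question asks for can be sketched in Python: each value under reports becomes one row (the report_key column name is my own addition for illustration).

```python
# Sample document from the question (abridged to a few fields per report)
doc = {
    "reports": {
        "Z1-B5(CR)-L5-M": {"location_id": 1580, "date": "2023-05-24", "total": 6},
        "Z2-B22(FC)-L1-M": {"location_id": 1589, "date": "2023-05-24", "total": 212},
    }
}

# Each key under "reports" names one object; each object should become one row.
rows = [dict(report_key=key, **obj) for key, obj in doc["reports"].items()]
for row in rows:
    print(row)
```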

Tags: json, azure, azure-data-factory, transform
2 Answers

Answer 1 (score 0):
  • Since reports is a single object whose properties are the individual report objects, you have to combine an Azure data flow with ADF pipeline activities. I need to extract the keys inside the reports property and use them to build an array of objects in which each element represents one row.
  • I took a sample JSON file (containing the data given in the question) as the source of the data flow.


  • Using a select transformation and a CSV sink (no changes in the select; "First row as header" selected in the sink), I wrote out the data. Here is the data flow JSON:
{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "Json1",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "DelimitedText1",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "select1"
                }
            ],
            "scriptLines": [
                "source(output(",
                "          reports as ({Z1-B5(CR)-L5-M} as (location_id as integer, date as string, h19 as integer, h20 as integer, h21 as integer, h22 as integer, h23 as integer, peak_hour as integer, total as integer, location_name as string), {Z2-B22(FC)-L1-M} as (location_id as integer, date as string, h19 as integer, h20 as integer, h21 as integer, h22 as integer, h23 as integer, peak_hour as integer, total as integer, location_name as string))",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     ignoreNoFilesFound: false,",
                "     documentForm: 'singleDocument') ~> source1",
                "source1 select(mapColumn(",
                "          each(reports,match(true()))",
                "     ),",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true) ~> select1",
                "select1 sink(allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     partitionFileNames:['op.csv'],",
                "     umask: 0022,",
                "     preCommands: [],",
                "     postCommands: [],",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     saveOrder: 1,",
                "     partitionBy('hash', 1)) ~> sink1"
            ]
        }
    }
}


  • As you can see, after the transformation, the reports property keys are extracted into the first row.

  • Now run this data flow in a pipeline, then use a Lookup activity (Lookup1) on the file written above, with a matching delimited-text dataset configuration:


  • The result of this lookup is the first row of the CSV, i.e. the report keys as a comma-separated string.


  • Now split that string on the , delimiter and iterate over the resulting array with a ForEach activity. Use a second Lookup (Lookup2) on the original source JSON file to extract the data.

  • Inside the ForEach, use an Append variable activity to collect each required object, like this:

@activity('Lookup2').output.value[0].reports[item()]
  • I used a sample Set variable activity to display the result of the Append variable activity.
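Taken together, the Lookup, split, ForEach and Append variable steps amount to the following control flow, sketched here in Python purely as an illustration (the literal values mimic the activity outputs described above):

```python
# Output of Lookup1: the CSV's first row, i.e. the report keys joined by commas
lookup1_first_row = "Z1-B5(CR)-L5-M,Z2-B22(FC)-L1-M"

# Document that Lookup2 returns from the original source file (abridged)
lookup2_value = [{
    "reports": {
        "Z1-B5(CR)-L5-M": {"location_id": 1580, "total": 6},
        "Z2-B22(FC)-L1-M": {"location_id": 1589, "total": 212},
    }
}]

final_data = []
# ForEach1 iterates over split(firstRow, ','); Append variable1 evaluates
# activity('Lookup2').output.value[0].reports[item()] for each key
for item in lookup1_first_row.split(","):
    final_data.append(lookup2_value[0]["reports"][item])

print(final_data)
```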


  • If your database is SQL Server, you can use openjson in a query to consume the data held in the variable above. For any other target, write the variable to a new file and copy it into the database table with a Copy data activity. Here is the pipeline JSON for reference:
{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "Data flow1",
                "type": "ExecuteDataFlow",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "dataflow": {
                        "referenceName": "dataflow1",
                        "type": "DataFlowReference"
                    },
                    "compute": {
                        "coreCount": 8,
                        "computeType": "General"
                    },
                    "traceLevel": "None",
                    "cacheSinks": {
                        "firstRowOnly": false
                    }
                }
            },
            {
                "name": "Lookup1",
                "type": "Lookup",
                "dependsOn": [
                    {
                        "activity": "Data flow1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "DelimitedTextSource",
                        "storeSettings": {
                            "type": "AzureBlobFSReadSettings",
                            "recursive": true,
                            "enablePartitionDiscovery": false
                        },
                        "formatSettings": {
                            "type": "DelimitedTextReadSettings"
                        }
                    },
                    "dataset": {
                        "referenceName": "DelimitedText2",
                        "type": "DatasetReference"
                    }
                }
            },
            {
                "name": "ForEach1",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "Lookup1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    },
                    {
                        "activity": "Lookup2",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@split(activity('Lookup1').output.firstRow['Prop_0'],',')",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "Append variable1",
                            "type": "AppendVariable",
                            "dependsOn": [],
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "final_data",
                                "value": {
                                    "value": "@activity('Lookup2').output.value[0].reports[item()]",
                                    "type": "Expression"
                                }
                            }
                        }
                    ]
                }
            },
            {
                "name": "Lookup2",
                "type": "Lookup",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "JsonSource",
                        "storeSettings": {
                            "type": "AzureBlobFSReadSettings",
                            "recursive": true,
                            "enablePartitionDiscovery": false
                        },
                        "formatSettings": {
                            "type": "JsonReadSettings"
                        }
                    },
                    "dataset": {
                        "referenceName": "Json1",
                        "type": "DatasetReference"
                    },
                    "firstRowOnly": false
                }
            },
            {
                "name": "Set variable1",
                "type": "SetVariable",
                "dependsOn": [
                    {
                        "activity": "ForEach1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "variableName": "final",
                    "value": {
                        "value": "@variables('final_data')",
                        "type": "Expression"
                    }
                }
            }
        ],
        "variables": {
            "tp": {
                "type": "String"
            },
            "final_data": {
                "type": "Array"
            },
            "final": {
                "type": "Array"
            }
        },
        "annotations": []
    }
}
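As a sketch of the openjson route mentioned above: a query of roughly this shape could consume the final_data variable. It is built here in Python only to show the statement; the dbo.Reports table and its columns are assumptions for illustration, not from the original answer.

```python
import json

# Stand-in for the pipeline's final_data array variable (abridged)
final_data = [
    {"location_id": 1580, "date": "2023-05-24", "peak_hour": 10, "total": 6,
     "location_name": "Z1-B5(CR)-L5-M"},
]

# Hypothetical INSERT ... SELECT ... FROM OPENJSON statement; table and
# column names (dbo.Reports etc.) are assumptions, adjust to your schema.
query = f"""
INSERT INTO dbo.Reports (location_id, [date], peak_hour, total, location_name)
SELECT location_id, [date], peak_hour, total, location_name
FROM OPENJSON(N'{json.dumps(final_data)}')
WITH (
    location_id   int            '$.location_id',
    [date]        date           '$.date',
    peak_hour     int            '$.peak_hour',
    total         int            '$.total',
    location_name nvarchar(100)  '$.location_name'
);
"""
print(query)
```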

Answer 2 (score 0):

This really helped my project, since the requirement is the same as on this page. But I am facing another issue: in my case the API returns around 500 objects, so the Append variable activity runs 500 times, which is not good. Can we read all 500 objects at once without using Append variable?
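For what it's worth, the single-pass read being asked about amounts to taking all values of reports in one expression instead of appending one object per iteration; in Python terms this looks as follows (illustrative only, not an ADF expression):

```python
# Abridged stand-in for a source document with many report objects
doc = {"reports": {"Z1": {"total": 6}, "Z2": {"total": 212}}}

# One expression yields every report object at once; no per-item append loop
final_data = list(doc["reports"].values())
print(len(final_data))
```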

© www.soinside.com 2019 - 2024. All rights reserved.