I have a JSON like the one below, where "reports" contains multiple objects with different names. I need to turn each of those objects into a database row using Azure Data Factory.
{
    "reports": {
        "Z1-B5(CR)-L5-M": {
            "location_id": 1580,
            "date": "2023-05-24",
            "h19": 0,
            "h20": 0,
            "h21": 0,
            "h22": 0,
            "h23": 0,
            "peak_hour": 10,
            "total": 6,
            "location_name": "Z1-B5(CR)-L5-M"
        },
        "Z2-B22(FC)-L1-M": {
            "location_id": 1589,
            "date": "2023-05-24",
            "h19": 0,
            "h20": 0,
            "h21": 0,
            "h22": 0,
            "h23": 0,
            "peak_hour": 14,
            "total": 212,
            "location_name": "Z2-B22(FC)-L1-F"
        }
    }
}
Expected output: one database row per object under reports.
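For the sample above, the target rows would look roughly like this (columns inferred from the object fields):

location_id | date       | h19 | h20 | h21 | h22 | h23 | peak_hour | total | location_name
----------- | ---------- | --- | --- | --- | --- | --- | --------- | ----- | ---------------
1580        | 2023-05-24 | 0   | 0   | 0   | 0   | 0   | 10        | 6     | Z1-B5(CR)-L5-M
1589        | 2023-05-24 | 0   | 0   | 0   | 0   | 0   | 14        | 212   | Z2-B22(FC)-L1-F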
reports is a single object whose keys are the report names, not an array, so you have to use a combination of Azure data flows and ADF pipeline activities. The idea is to first extract the keys inside the reports property, then use them to build an array of objects where each object represents one row. Here is the data flow JSON:

{
"name": "dataflow1",
"properties": {
"type": "MappingDataFlow",
"typeProperties": {
"sources": [
{
"dataset": {
"referenceName": "Json1",
"type": "DatasetReference"
},
"name": "source1"
}
],
"sinks": [
{
"dataset": {
"referenceName": "DelimitedText1",
"type": "DatasetReference"
},
"name": "sink1"
}
],
"transformations": [
{
"name": "select1"
}
],
"scriptLines": [
"source(output(",
" reports as ({Z1-B5(CR)-L5-M} as (location_id as integer, date as string, h19 as integer, h20 as integer, h21 as integer, h22 as integer, h23 as integer, peak_hour as integer, total as integer, location_name as string), {Z2-B22(FC)-L1-M} as (location_id as integer, date as string, h19 as integer, h20 as integer, h21 as integer, h22 as integer, h23 as integer, peak_hour as integer, total as integer, location_name as string))",
" ),",
" allowSchemaDrift: true,",
" validateSchema: false,",
" ignoreNoFilesFound: false,",
" documentForm: 'singleDocument') ~> source1",
"source1 select(mapColumn(",
" each(reports,match(true()))",
" ),",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true) ~> select1",
"select1 sink(allowSchemaDrift: true,",
" validateSchema: false,",
" partitionFileNames:['op.csv'],",
" umask: 0022,",
" preCommands: [],",
" postCommands: [],",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true,",
" saveOrder: 1,",
" partitionBy('hash', 1)) ~> sink1"
]
}
}
}
As you can see, after this transformation the keys of the reports property are extracted into the first row of the output file: the rule-based mapping each(reports,match(true())) in select1 promotes every sub-object of reports to its own column, and the sink writes those column names out as the first row.
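With the two sample objects above, the first row of op.csv should therefore look something like:

Z1-B5(CR)-L5-M,Z2-B22(FC)-L1-M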
Now run this data flow, then read the file it produces with a lookup (Lookup1) whose dataset is configured as shown in the image below:
Next, split that first-row string using , as the delimiter and iterate over the resulting array with a ForEach loop. Use a second lookup (Lookup2) against the original source file to extract the data.
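With the sample file above, the ForEach items expression (it also appears in the pipeline JSON further down) is:

@split(activity('Lookup1').output.firstRow['Prop_0'], ',')

which evaluates to the array of report keys, e.g. ["Z1-B5(CR)-L5-M", "Z2-B22(FC)-L1-M"].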
Inside the ForEach, use an Append variable activity to pick out the required object for each key, like this:
@activity('Lookup2').output.value[0].reports[item()]
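On the first iteration, for example, item() would be Z1-B5(CR)-L5-M, so the value appended to the final_data array would be the matching object:

{"location_id": 1580, "date": "2023-05-24", "h19": 0, "h20": 0, "h21": 0, "h22": 0, "h23": 0, "peak_hour": 10, "total": 6, "location_name": "Z1-B5(CR)-L5-M"}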
If your target is a SQL database, you can consume this variable's data in the query with openjson (a minimal sketch follows after the pipeline JSON). If it is any other format, write this variable to a new file and copy it into the database table with a Copy data activity. Below is the pipeline JSON for reference:

{
"name": "pipeline1",
"properties": {
"activities": [
{
"name": "Data flow1",
"type": "ExecuteDataFlow",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"dataflow": {
"referenceName": "dataflow1",
"type": "DataFlowReference"
},
"compute": {
"coreCount": 8,
"computeType": "General"
},
"traceLevel": "None",
"cacheSinks": {
"firstRowOnly": false
}
}
},
{
"name": "Lookup1",
"type": "Lookup",
"dependsOn": [
{
"activity": "Data flow1",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "DelimitedTextSource",
"storeSettings": {
"type": "AzureBlobFSReadSettings",
"recursive": true,
"enablePartitionDiscovery": false
},
"formatSettings": {
"type": "DelimitedTextReadSettings"
}
},
"dataset": {
"referenceName": "DelimitedText2",
"type": "DatasetReference"
}
}
},
{
"name": "ForEach1",
"type": "ForEach",
"dependsOn": [
{
"activity": "Lookup1",
"dependencyConditions": [
"Succeeded"
]
},
{
"activity": "Lookup2",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"items": {
"value": "@split(activity('Lookup1').output.firstRow['Prop_0'],',')",
"type": "Expression"
},
"isSequential": true,
"activities": [
{
"name": "Append variable1",
"type": "AppendVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"variableName": "final_data",
"value": {
"value": "@activity('Lookup2').output.value[0].reports[item()]",
"type": "Expression"
}
}
}
]
}
},
{
"name": "Lookup2",
"type": "Lookup",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "JsonSource",
"storeSettings": {
"type": "AzureBlobFSReadSettings",
"recursive": true,
"enablePartitionDiscovery": false
},
"formatSettings": {
"type": "JsonReadSettings"
}
},
"dataset": {
"referenceName": "Json1",
"type": "DatasetReference"
},
"firstRowOnly": false
}
},
{
"name": "Set variable1",
"type": "SetVariable",
"dependsOn": [
{
"activity": "ForEach1",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"variableName": "final",
"value": {
"value": "@variables('final_data')",
"type": "Expression"
}
}
}
],
"variables": {
"tp": {
"type": "String"
},
"final_data": {
"type": "Array"
},
"final": {
"type": "Array"
}
},
"annotations": []
}
}
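As mentioned above, here is a minimal sketch of an openjson query that shreds the collected array into rows. The target table dbo.reports and the @json parameter are hypothetical; you would pass the final variable into the query yourself, for example as a Script or stored-procedure activity parameter:

-- @json is assumed to hold the pipeline's final array of report objects
INSERT INTO dbo.reports (location_id, [date], h19, h20, h21, h22, h23, peak_hour, total, location_name)
SELECT location_id, [date], h19, h20, h21, h22, h23, peak_hour, total, location_name
FROM OPENJSON(@json)
WITH (
    location_id   int           '$.location_id',
    [date]        date          '$.date',
    h19           int           '$.h19',
    h20           int           '$.h20',
    h21           int           '$.h21',
    h22           int           '$.h22',
    h23           int           '$.h23',
    peak_hour     int           '$.peak_hour',
    total         int           '$.total',
    location_name nvarchar(100) '$.location_name'
);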
This really helped with my project, since the requirement is the same as the one on this page. But I am facing another problem: in my case the API returns around 500 objects, so the Append variable activity runs 500 times, which is not good. Is there a way to read all 500 objects at once without using Append variable?