我有一个使用 Azure 数据工厂处理 JSON API 的业务需求,最好避免使用 Azure Functions 等替代的基于代码的函数。 API 的形式如下:
{
"parentProperty": {
"key1": {
"key2": {
"property1": x,
"property2: y
},
"key3": {
"property1": z,
"property2: a
}
},
"key4": {
"key5": {
"property1": b,
"property2: c
},
"key6": {
"property1": d,
"property2: e
}
}
}
}
这需要转换成这样的表格:
第1栏 | 第2栏 | 第3栏 | 第4栏 |
---|---|---|---|
键1 | 键2 | x | y |
键1 | 键3 | z | a |
键4 | 键5 | b | c |
键4 | 键6 | d | e |
这在函数等代码中非常容易,但 Azure 数据工厂中的运算符对于此类转换似乎很有限。有没有办法使用原生运算符来转换此 JSON?
更新 在当前测试下,源和汇是:
来源:JSON Blob 存储 接收器:分隔文本 Blob 存储
尝试转型
您可以在数据流中使用 select、衍生列和过滤器转换的组合来实现您的要求,如下所示。
但是这种方法仅适用于这种结构(3级)并且会涉及一些手动操作。
在数据流中遵循以下转换。
源 - 在此处添加您的 JSON 数据集,并确保在源的 JSON 设置中选择 单个文档。
派生 Column2 - 创建列
myjson
并使用表达式 parentProperty
存储来自 toString(parentProperty)
的 JSON 字符串。
select1 - 使用基于规则的映射提取
parentProperty
对象中的内部键作为选择转换中的新列。
派生 Column1 - 创建 2 个名为 columns
和
names
的列。
columns
列将存储提取的列名称的数组,这些列名称是第一级键名称,而
names
列将以相同的顺序存储JSON中所有键的数组。使用以下表达式。
columns - filter(columnNames(),(#item!='myjson'))
names - slice(map(split(myjson,'":{'),split(#item,'"')[size(split(#item,'"'))]),1,size(map(split(myjson,'":{'),split(#item,'"')[size(split(#item,'"'))]))-1)
select2 - 从 select1 中删除提取的列,仅保留列 myjson
、
columns
、
names
。
衍生列3 - 展开 columns
数组列并将结果存储在带有表达式
column1
的新列
toString(unfold(columns))
中。这将包含第一级键名称。
衍生列4 - 类似地展开 names
数组列,并将结果存储在带有表达式
column2
的新列
toString(unfold(columns))
中。这将是第二级键名称,但会有额外的数据。这将给出如下结果。这里,
column1
和
column2
包含第一级和第二级键,但会有重复和额外的组合。
filter1 - 这将通过比较两列来过滤掉重复的键行,并且还将删除 column2
中的第一级键名称。为此,请使用
(column1!=column2) && (!contains(columns,#item==column2))
表达式。
filter2 - 这将过滤掉第一级和第二级键的正确组合行。使用下面的表达式。
(toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column2)>0),'-')[2])>toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column1)>0),'-')[2]))&&(divide(size(names),size(columns)))>(minus(toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column2)>0),'-')[2]),toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column1)>0),'-')[2])))
它会给出如下结果。
衍生列5 - 创建新列 column3
和
column4
以获取内部值。为此,请使用以下表达式。
column3 - replace(split(split(split(myjson,column2)[2],':')[3],',')[1],'"','')
column4 - replace(replace(split(split(split(myjson,column2)[2],':')[4],',')[1],'"',''),'}','')
这将给出所需的列。
结果 csv 文件:
这是我的数据流JSON供您参考:
{
"name": "dataflow6",
"properties": {
"type": "MappingDataFlow",
"typeProperties": {
"sources": [
{
"dataset": {
"referenceName": "connstruct_json",
"type": "DatasetReference"
},
"name": "source1"
}
],
"sinks": [
{
"dataset": {
"referenceName": "construct_csv",
"type": "DatasetReference"
},
"name": "sink1"
}
],
"transformations": [
{
"name": "select1"
},
{
"name": "derivedColumn1"
},
{
"name": "derivedColumn2"
},
{
"name": "select2"
},
{
"name": "derivedColumn3"
},
{
"name": "derivedColumn4"
},
{
"name": "filter1"
},
{
"name": "filter2"
},
{
"name": "derivedColumn5"
},
{
"name": "select3"
}
],
"scriptLines": [
"source(output(",
" parentProperty as (key1 as (key2 as (property1 as string, property2 as string), key3 as (property1 as string, property2 as string)), key4 as (key5 as (property1 as string, property2 as string), key6 as (property1 as string, property2 as string)))",
" ),",
" allowSchemaDrift: true,",
" validateSchema: false,",
" ignoreNoFilesFound: false,",
" documentForm: 'singleDocument') ~> source1",
"derivedColumn2 select(mapColumn(",
" each(parentProperty,match(true())),",
" myjson",
" ),",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true) ~> select1",
"select1 derive(columns = filter(columnNames(),(#item!='myjson')),",
" names = slice(map(split(myjson,'\":{'),split(#item,'\"')[size(split(#item,'\"'))]),1,size(map(split(myjson,'\":{'),split(#item,'\"')[size(split(#item,'\"'))]))-1)) ~> derivedColumn1",
"source1 derive(myjson = toString(parentProperty)) ~> derivedColumn2",
"derivedColumn1 select(mapColumn(",
" myjson,",
" columns,",
" names",
" ),",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true) ~> select2",
"select2 derive(column1 = toString(unfold(columns))) ~> derivedColumn3",
"derivedColumn3 derive(column2 = toString(unfold(names))) ~> derivedColumn4",
"derivedColumn4 filter((column1!=column2) && (!contains(columns,#item==column2))) ~> filter1",
"filter1 filter((toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column2)>0),'-')[2])>toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column1)>0),'-')[2]))&&(divide(size(names),size(columns)))>(minus(toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column2)>0),'-')[2]),toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column1)>0),'-')[2])))) ~> filter2",
"filter2 derive(column3 = replace(split(split(split(myjson,column2)[2],':')[3],',')[1],'\"',''),",
" column4 = replace(replace(split(split(split(myjson,column2)[2],':')[4],',')[1],'\"',''),'}','')) ~> derivedColumn5",
"derivedColumn5 select(mapColumn(",
" column1,",
" column2,",
" column3,",
" column4",
" ),",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true) ~> select3",
"select3 sink(allowSchemaDrift: true,",
" validateSchema: false,",
" umask: 0022,",
" preCommands: [],",
" postCommands: [],",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true) ~> sink1"
]
}
}
}