使用 ADF 进行 JSON 重组

问题描述 投票:0回答:1

我有一个使用 Azure 数据工厂处理 JSON API 的业务需求,最好避免使用 Azure Functions 等替代的基于代码的函数。 API 的形式如下:

{ 
  "parentProperty": {
       "key1": {
           "key2": {
               "property1": x,
               "property2: y
           },
           "key3": {
               "property1": z,
               "property2: a
           }
       },
       "key4": {
           "key5": {
               "property1": b,
               "property2: c
           },
           "key6": {
               "property1": d,
               "property2: e
           }
       }
  }
}

这需要转换成这样的表格:

第1栏 第2栏 第3栏 第4栏
键1 键2 x y
键1 键3 z a
键4 键5 b c
键4 键6 d e

这在函数等代码中非常容易,但 Azure 数据工厂中的运算符对于此类转换似乎很有限。有没有办法使用原生运算符来转换此 JSON?

更新 在当前测试下,源和汇是:

来源:JSON Blob 存储 接收器:分隔文本 Blob 存储

尝试转型

  • 选择列 - 无法执行此操作,因为顶级属性的值可能会更改。这个数据结构更像是一个地图/字典,我们无法确定它的结构。
  • 派生列 - 类似地 - 这个运算符是有问题的,因为除非已知 JSON 属性结构,否则它无法工作。
json azure-data-factory
1个回答
0
投票

您可以在数据流中使用 select、衍生列和过滤器转换的组合来实现您的要求,如下所示。

但是这种方法仅适用于这种结构(3级)并且会涉及一些手动操作。

在数据流中遵循以下转换。

  • - 在此处添加您的 JSON 数据集,并确保在源的 JSON 设置中选择 单个文档

  • 派生 Column2 - 创建列

    myjson
    并使用表达式
    parentProperty
    存储来自
    toString(parentProperty)
    的 JSON 字符串。

  • select1 - 使用基于规则的映射提取

    parentProperty
    对象中的内部键作为选择转换中的新列
    enter image description here

  • 派生 Column1 - 创建 2 个名为 columns

    names
     的列。 
    columns
    列将存储提取的列名称的数组,这些列名称是第一级键名称,而
    names
    列将以相同的顺序存储JSON中所有键的数组。
    使用以下表达式。

    columns - filter(columnNames(),(#item!='myjson')) names - slice(map(split(myjson,'":{'),split(#item,'"')[size(split(#item,'"'))]),1,size(map(split(myjson,'":{'),split(#item,'"')[size(split(#item,'"'))]))-1)
    
    
  • select2 - 从 select1 中删除提取的列,仅保留列 myjson

    columns
    names

  • 衍生列3 - 展开 columns

     数组列并将结果存储在带有表达式 
    column1
     的新列 
    toString(unfold(columns))
     中。这将包含第一级键名称。

  • 衍生列4 - 类似地展开 names

     数组列,并将结果存储在带有表达式 
    column2
     的新列 
    toString(unfold(columns))
     中。这将是第二级键名称,但会有额外的数据。这将给出如下结果。这里,
    column1
    column2
    包含第一级和第二级键,但会有重复和额外的组合。
    

    enter image description here

  • filter1 - 这将通过比较两列来过滤掉重复的键行,并且还将删除 column2

     中的第一级键名称。为此,请使用 
    (column1!=column2) && (!contains(columns,#item==column2))
     表达式。

  • filter2 - 这将过滤掉第一级和第二级键的正确组合行。使用下面的表达式。

    (toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column2)>0),'-')[2])>toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column1)>0),'-')[2]))&&(divide(size(names),size(columns)))>(minus(toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column2)>0),'-')[2]),toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column1)>0),'-')[2])))
    它会给出如下结果。

    enter image description here

  • 衍生列5 - 创建新列 column3

    column4
     以获取内部值。为此,请使用以下表达式。

    column3 - replace(split(split(split(myjson,column2)[2],':')[3],',')[1],'"','') column4 - replace(replace(split(split(split(myjson,column2)[2],':')[4],',')[1],'"',''),'}','')
    这将给出所需的列。

    enter image description here


  • select3 - 删除此处多余的列。
  • sink1 - 添加分隔文本数据集并从管道执行数据流。

结果 csv 文件:

enter image description here

这是我的数据流JSON供您参考:

{ "name": "dataflow6", "properties": { "type": "MappingDataFlow", "typeProperties": { "sources": [ { "dataset": { "referenceName": "connstruct_json", "type": "DatasetReference" }, "name": "source1" } ], "sinks": [ { "dataset": { "referenceName": "construct_csv", "type": "DatasetReference" }, "name": "sink1" } ], "transformations": [ { "name": "select1" }, { "name": "derivedColumn1" }, { "name": "derivedColumn2" }, { "name": "select2" }, { "name": "derivedColumn3" }, { "name": "derivedColumn4" }, { "name": "filter1" }, { "name": "filter2" }, { "name": "derivedColumn5" }, { "name": "select3" } ], "scriptLines": [ "source(output(", " parentProperty as (key1 as (key2 as (property1 as string, property2 as string), key3 as (property1 as string, property2 as string)), key4 as (key5 as (property1 as string, property2 as string), key6 as (property1 as string, property2 as string)))", " ),", " allowSchemaDrift: true,", " validateSchema: false,", " ignoreNoFilesFound: false,", " documentForm: 'singleDocument') ~> source1", "derivedColumn2 select(mapColumn(", " each(parentProperty,match(true())),", " myjson", " ),", " skipDuplicateMapInputs: true,", " skipDuplicateMapOutputs: true) ~> select1", "select1 derive(columns = filter(columnNames(),(#item!='myjson')),", " names = slice(map(split(myjson,'\":{'),split(#item,'\"')[size(split(#item,'\"'))]),1,size(map(split(myjson,'\":{'),split(#item,'\"')[size(split(#item,'\"'))]))-1)) ~> derivedColumn1", "source1 derive(myjson = toString(parentProperty)) ~> derivedColumn2", "derivedColumn1 select(mapColumn(", " myjson,", " columns,", " names", " ),", " skipDuplicateMapInputs: true,", " skipDuplicateMapOutputs: true) ~> select2", "select2 derive(column1 = toString(unfold(columns))) ~> derivedColumn3", "derivedColumn3 derive(column2 = toString(unfold(names))) ~> derivedColumn4", "derivedColumn4 filter((column1!=column2) && (!contains(columns,#item==column2))) ~> filter1", "filter1 filter((toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column2)>0),'-')[2])>toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column1)>0),'-')[2]))&&(divide(size(names),size(columns)))>(minus(toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column2)>0),'-')[2]),toInteger(split(find(mapIndex(names,concat(#item,'-',toString(#index))),instr(#item,column1)>0),'-')[2])))) ~> filter2", "filter2 derive(column3 = replace(split(split(split(myjson,column2)[2],':')[3],',')[1],'\"',''),", " column4 = replace(replace(split(split(split(myjson,column2)[2],':')[4],',')[1],'\"',''),'}','')) ~> derivedColumn5", "derivedColumn5 select(mapColumn(", " column1,", " column2,", " column3,", " column4", " ),", " skipDuplicateMapInputs: true,", " skipDuplicateMapOutputs: true) ~> select3", "select3 sink(allowSchemaDrift: true,", " validateSchema: false,", " umask: 0022,", " preCommands: [],", " postCommands: [],", " skipDuplicateMapInputs: true,", " skipDuplicateMapOutputs: true) ~> sink1" ] } } }
    
© www.soinside.com 2019 - 2024. All rights reserved.