- You can achieve this with a derived column transformation. I have used the following as my source.

- Now, use the `associate` function to build the key-value pairs separately, creating 2 new columns with a derived column transformation:
  - A: `associate(CUST_ID_A,{SCORE A})`
  - B: `associate(CUST_ID_B,{SCORE B})`

- Now create an array from the newly created columns: `array(A,B)`.
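To illustrate what these expressions produce, consider a hypothetical source row with `CUST_ID_A = "C1"`, `{SCORE A} = "0.85"`, `CUST_ID_B = "C2"`, `{SCORE B} = "0.40"` (these sample values are assumed, not taken from the original data). `associate` builds a single-entry map, and `array` collects the two maps:

```
A          -> ["C1" -> "0.85"]          // map with one key-value pair
B          -> ["C2" -> "0.40"]
array(A,B) -> [["C1" -> "0.85"], ["C2" -> "0.40"]]   // array of maps
```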

- Now, in the sink, I choose a JSON sink file and map only the required columns, as shown below:


```json
{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "DelimitedText1",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "Json1",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "derivedColumn1"
                },
                {
                    "name": "derivedColumn2"
                }
            ],
            "scriptLines": [
                "source(output(",
                "          TRANS_ID as string,",
                "          CUST_ID_A as string,",
                "          {SCORE A} as string,",
                "          CUST_ID_B as string,",
                "          {SCORE B} as string",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     ignoreNoFilesFound: false) ~> source1",
                "source1 derive(A = associate(CUST_ID_A,{SCORE A}),",
                "          B = associate(CUST_ID_B,{SCORE B})) ~> derivedColumn1",
                "derivedColumn1 derive(cust_conf = array(A,B)) ~> derivedColumn2",
                "derivedColumn2 sink(allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     partitionFileNames:['op.json'],",
                "     umask: 0022,",
                "     preCommands: [],",
                "     postCommands: [],",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     mapColumn(",
                "          TRANS_ID,",
                "          cust_conf",
                "     ),",
                "     partitionBy('hash', 1)) ~> sink1"
            ]
        }
    }
}
```
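For a hypothetical input row with `TRANS_ID = "T001"`, `CUST_ID_A = "C1"`, `{SCORE A} = "0.85"`, `CUST_ID_B = "C2"`, `{SCORE B} = "0.40"` (sample values assumed, not from the original post), the JSON sink should serialize each map as an object and the array column as an array of those objects, so `op.json` would contain a document roughly like:

```json
{
    "TRANS_ID": "T001",
    "cust_conf": [
        { "C1": "0.85" },
        { "C2": "0.40" }
    ]
}
```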