所以我在 SQL Server 数据库中将此表作为源表:
客户
身份证 | 姓名 | 地址 | 城市 | 状态 | 拉链 | 账单地址 | 账单城市 | 账单状态 | 帐单邮编 |
---|---|---|---|---|---|---|---|---|---|
1 | 客户A | 123海洋大道 | 迈阿密 | FL | 32001 | 001 鲁斯塔维利大道 | 格鲁吉亚 | GA | 010801 |
2 | 客户B | 024 比斯坎大道 | 迈阿密 | FL | 32005 | 001 鲁斯塔维利大道 | 格鲁吉亚 | GA | 010801 |
我必须将记录移动到具有以下结构的另一个数据库中。
客户
身份证 | 姓名 | 地址ID | 账单地址Id |
---|---|---|---|
1 | 客户A | 1 | 2 |
2 | 客户B | 3 | 4 |
地址
身份证 | 地址 | 城市 | 状态 | 拉链 |
---|---|---|---|---|
1 | 123海洋大道 | 迈阿密 | FL | 32001 |
2 | 001 鲁斯塔维利大道 | 格鲁吉亚 | GA | 010801 |
3 | 024 比斯坎大道 | 迈阿密 | FL | 32005 |
4 | 001 鲁斯塔维利大道 | 格鲁吉亚 | GA | 010801 |
请注意,无论 CustomerAddresses 表中是否存在该地址,每个地址都必须单独填写。
我得到的最接近的是在第一个数据流中填充客户和客户地址表,在第二个数据流中我尝试更新AddressId和BillingAddressId,在源客户表和目标客户表之间进行联接(两个ID相同)但是对于 AddressId 并且没有相同的引用,我可以使用街道、城市、州和邮政编码进行另一个连接,但由于存在重复的记录,这对我来说是一个问题。
你将如何前进?
Addresses
Customers
。分别选择2个分行的地址和帐单地址栏count(1)
Addresses
桌子原始 json
{
"name": "CreateAddresses",
"properties": {
"type": "MappingDataFlow",
"typeProperties": {
"sources": [
{
"dataset": {
"referenceName": "Customers",
"type": "DatasetReference"
},
"name": "Customers"
}
],
"sinks": [
{
"dataset": {
"referenceName": "Addresses",
"type": "DatasetReference"
},
"name": "CreateAddresses"
}
],
"transformations": [
{
"name": "address"
},
{
"name": "billingAddress"
},
{
"name": "union"
},
{
"name": "dedupe"
},
{
"name": "CreateUUID"
},
{
"name": "CreateId"
}
],
"scriptLines": [
"source(output(",
" Id as integer,",
" Name as string,",
" Address as string,",
" City as string,",
" State as string,",
" Zip as string,",
" BillingAddress as string,",
" BillingCity as string,",
" BillingState as string,",
" BillingZip as string",
" ),",
" allowSchemaDrift: true,",
" validateSchema: false,",
" isolationLevel: 'READ_UNCOMMITTED',",
" format: 'table') ~> Customers",
"Customers select(mapColumn(",
" Address,",
" City,",
" State,",
" Zip",
" ),",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true) ~> address",
"Customers select(mapColumn(",
" Address = BillingAddress,",
" City = BillingCity,",
" State = BillingState,",
" Zip = BillingZip",
" ),",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true) ~> billingAddress",
"address, billingAddress union(byName: true)~> union",
"union aggregate(groupBy(Address,",
" City,",
" State,",
" Zip),",
" Id = count(1)) ~> dedupe",
"dedupe derive(uuid = uuid()) ~> CreateUUID",
"CreateUUID rank(asc(uuid, true),",
" output(uuid as long)) ~> CreateId",
"CreateId sink(allowSchemaDrift: true,",
" validateSchema: false,",
" input(",
" Id as integer,",
" Address as string,",
" City as string,",
" State as string,",
" Zip as string",
" ),",
" deletable:false,",
" insertable:true,",
" updateable:false,",
" upsertable:false,",
" format: 'table',",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true,",
" errorHandlingOption: 'stopOnFirstError') ~> CreateAddresses"
]
}
}
}
Addresses
查找 Customers
数据集。该表应该是管道中设置的先决条件。Addresses
中选择 ID,仅保留必要的列:Id、名称、Addresses.Id、4 个账单列。原始 json
{
"name": "UpdateCustomers",
"properties": {
"type": "MappingDataFlow",
"typeProperties": {
"sources": [
{
"dataset": {
"referenceName": "Customers",
"type": "DatasetReference"
},
"name": "Customers"
},
{
"dataset": {
"referenceName": "Addresses",
"type": "DatasetReference"
},
"name": "Addresses"
}
],
"sinks": [
{
"dataset": {
"referenceName": "CustomersOutput",
"type": "DatasetReference"
},
"name": "UpdateCustomers"
}
],
"transformations": [
{
"name": "lookupAddress"
},
{
"name": "select1"
},
{
"name": "lookupBillingAddress"
},
{
"name": "select2"
}
],
"scriptLines": [
"source(output(",
" Id as integer,",
" Name as string,",
" Address as string,",
" City as string,",
" State as string,",
" Zip as string,",
" BillingAddress as string,",
" BillingCity as string,",
" BillingState as string,",
" BillingZip as string",
" ),",
" allowSchemaDrift: true,",
" validateSchema: false,",
" isolationLevel: 'READ_UNCOMMITTED',",
" format: 'table') ~> Customers",
"source(output(",
" Id as integer,",
" Address as string,",
" City as string,",
" State as string,",
" Zip as string",
" ),",
" allowSchemaDrift: true,",
" validateSchema: false,",
" isolationLevel: 'READ_UNCOMMITTED',",
" format: 'table') ~> Addresses",
"Customers, Addresses lookup(Customers@Address == Addresses@Address",
" && Customers@City == Addresses@City",
" && Customers@State == Addresses@State",
" && Customers@Zip == Addresses@Zip,",
" multiple: false,",
" pickup: 'any',",
" broadcast: 'auto')~> lookupAddress",
"lookupAddress select(mapColumn(",
" Id = Customers@Id,",
" Name,",
" BillingAddress,",
" BillingCity,",
" BillingState,",
" BillingZip,",
" AddressId = Addresses@Id",
" ),",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true) ~> select1",
"select1, Addresses lookup(BillingAddress == Address",
" && BillingCity == City",
" && BillingState == State",
" && BillingZip == Zip,",
" multiple: false,",
" pickup: 'any',",
" broadcast: 'auto')~> lookupBillingAddress",
"lookupBillingAddress select(mapColumn(",
" Id = select1@Id,",
" Name,",
" AddressId,",
" BillingAddressId = Addresses@Id",
" ),",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true) ~> select2",
"select2 sink(allowSchemaDrift: true,",
" validateSchema: false,",
" input(",
" Id as integer,",
" Name as string,",
" Address as string,",
" City as string,",
" State as string,",
" Zip as string,",
" BillingAddress as string,",
" BillingCity as string,",
" BillingState as string,",
" BillingZip as string",
" ),",
" deletable:false,",
" insertable:true,",
" updateable:false,",
" upsertable:false,",
" format: 'table',",
" skipDuplicateMapInputs: true,",
" skipDuplicateMapOutputs: true,",
" saveOrder: 1,",
" errorHandlingOption: 'stopOnFirstError') ~> UpdateCustomers"
]
}
}
}