Azure Data Factory - Normalize rows into separate records

Question

So I have this table as the source table in a SQL Server database:

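Customers

| Id | Name | Address | City | State | Zip | BillingAddress | BillingCity | BillingState | BillingZip |
|----|------|---------|------|-------|-----|----------------|-------------|--------------|------------|
| 1 | CustomerA | 123 Ocean Avenue | Miami | FL | 32001 | 001 Rustaveli Avenue | Georgia | GA | 010801 |
| 2 | CustomerB | 024 Biscayne Avenue | Miami | FL | 32005 | 001 Rustaveli Avenue | Georgia | GA | 010801 |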

I have to move the records into another database with the following structure:

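Customers

| Id | Name | AddressId | BillingAddressId |
|----|------|-----------|------------------|
| 1 | CustomerA | 1 | 2 |
| 2 | CustomerB | 3 | 4 |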

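Addresses

| Id | Address | City | State | Zip |
|----|---------|------|-------|-----|
| 1 | 123 Ocean Avenue | Miami | FL | 32001 |
| 2 | 001 Rustaveli Avenue | Georgia | GA | 010801 |
| 3 | 024 Biscayne Avenue | Miami | FL | 32005 |
| 4 | 001 Rustaveli Avenue | Georgia | GA | 010801 |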

Note that every address must be inserted as its own row, regardless of whether the same address already exists in the CustomerAddresses table.
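
For reference, the behaviour asked for here can be modelled in plain Python: each customer contributes its own address row and its own billing address row, and each Id is assigned at the moment the row is created, so it never has to be recovered later by joining on the address columns. This is a minimal in-memory sketch, not ADF code; the Address type and the normalize helper are hypothetical names, and the sample rows are taken from the tables above.

```python
from typing import List, NamedTuple, Tuple


class Address(NamedTuple):
    street: str
    city: str
    state: str
    zip: str


# Source Customers rows from the question: (Id, Name, Address, BillingAddress).
SOURCE = [
    (1, "CustomerA",
     Address("123 Ocean Avenue", "Miami", "FL", "32001"),
     Address("001 Rustaveli Avenue", "Georgia", "GA", "010801")),
    (2, "CustomerB",
     Address("024 Biscayne Avenue", "Miami", "FL", "32005"),
     Address("001 Rustaveli Avenue", "Georgia", "GA", "010801")),
]


def normalize(source):
    """Hypothetical helper: split each source row into one customer row and
    two address rows. Every address gets its own row and Id, even when an
    identical address already exists (no deduplication)."""
    customers: List[Tuple[int, str, int, int]] = []
    addresses: List[Tuple[int, Address]] = []
    for cust_id, name, address, billing in source:
        # The Id is known when the row is created, so no join back on
        # street/city/state/zip is needed afterwards.
        address_id = len(addresses) + 1
        addresses.append((address_id, address))
        billing_id = len(addresses) + 1
        addresses.append((billing_id, billing))
        customers.append((cust_id, name, address_id, billing_id))
    return customers, addresses


customers, addresses = normalize(SOURCE)
print(customers)  # [(1, 'CustomerA', 1, 2), (2, 'CustomerB', 3, 4)]
```

The output matches the target Customers table above, and rows 2 and 4 of addresses hold the same billing address under different Ids.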

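The closest I have got is populating the Customers and CustomerAddresses tables in a first data flow. In a second data flow I tried to update AddressId and BillingAddressId by joining the source and target Customers tables (both share the same Id). For AddressId, however, there is no shared reference. I could do another join on street, city, state and zip code, but because there are duplicate records, that is a problem for me.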
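
The duplicate problem can be illustrated with a small sketch: index the target Addresses table by its four address columns and look up the shared billing address. The index_by_columns helper is hypothetical and only models what a join on those columns would see.

```python
from collections import defaultdict

# Target Addresses table from the question: Id -> (Address, City, State, Zip).
ADDRESSES = {
    1: ("123 Ocean Avenue", "Miami", "FL", "32001"),
    2: ("001 Rustaveli Avenue", "Georgia", "GA", "010801"),
    3: ("024 Biscayne Avenue", "Miami", "FL", "32005"),
    4: ("001 Rustaveli Avenue", "Georgia", "GA", "010801"),
}


def index_by_columns(addresses):
    """Hypothetical helper: map the four address columns to every Id that carries them."""
    index = defaultdict(list)
    for addr_id, columns in sorted(addresses.items()):
        index[columns].append(addr_id)
    return index


index = index_by_columns(ADDRESSES)
billing = ("001 Rustaveli Avenue", "Georgia", "GA", "010801")
# The shared billing address matches two rows, so a join on the address
# columns cannot tell which Id belongs to CustomerA and which to CustomerB.
print(index[billing])  # [2, 4]
```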

How would you go about this?

azure-data-factory etl database-normalization
1 Answer

You can create a pipeline with 2 steps (data flows):

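  1. CreateAddresses: extract the addresses and billing addresses and store them in the database table Addresses.
  2. UpdateCustomers: look up the newly created addresses and write the updated customer records.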

1. CreateAddresses


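  1. Branch off the Customers dataset and, in the 2 branches, select the address columns and the billing address columns respectively.
  2. Union the results of the 2 branches.
  3. Aggregate the result to deduplicate the address records. The aggregate function can be anything, since its output is not used; in my example I just used count(1).
  4. Create a UUID for each address record.
  5. Rank by the uuid to create the Id column of the addresses.
  6. Sink into the Addresses table.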
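
The steps above can be modelled in plain Python to show what the data flow computes. This is an illustrative sketch under my own assumptions (rows as dicts, a hypothetical create_addresses helper), not ADF code: a Python set stands in for the aggregate/group-by, and sorting on random UUIDs stands in for the rank transformation, which yields the Ids 1..N because the UUIDs are distinct.

```python
import uuid

ADDRESS_COLUMNS = ("Address", "City", "State", "Zip")

# Source Customers rows from the question.
CUSTOMERS = [
    {"Id": 1, "Name": "CustomerA",
     "Address": "123 Ocean Avenue", "City": "Miami", "State": "FL", "Zip": "32001",
     "BillingAddress": "001 Rustaveli Avenue", "BillingCity": "Georgia",
     "BillingState": "GA", "BillingZip": "010801"},
    {"Id": 2, "Name": "CustomerB",
     "Address": "024 Biscayne Avenue", "City": "Miami", "State": "FL", "Zip": "32005",
     "BillingAddress": "001 Rustaveli Avenue", "BillingCity": "Georgia",
     "BillingState": "GA", "BillingZip": "010801"},
]


def create_addresses(customers):
    """Hypothetical model of CreateAddresses: returns {Id: (Address, City, State, Zip)}."""
    # Branch "address": select the four address columns.
    address = [tuple(row[col] for col in ADDRESS_COLUMNS) for row in customers]
    # Branch "billingAddress": select the Billing* columns under the same names.
    billing = [tuple(row["Billing" + col] for col in ADDRESS_COLUMNS)
               for row in customers]
    # "union": append both branches (matched by column name in the data flow).
    union = address + billing
    # "dedupe": grouping by all four columns keeps one row per distinct address.
    dedupe = set(union)
    # "CreateUUID": attach a random UUID to every distinct address.
    with_uuid = [(str(uuid.uuid4()), columns) for columns in dedupe]
    # "CreateId": ranking by the distinct UUIDs gives consecutive Ids 1..N.
    ranked = sorted(with_uuid)
    return {rank: columns for rank, (_, columns) in enumerate(ranked, start=1)}


addresses = create_addresses(CUSTOMERS)
for addr_id, columns in sorted(addresses.items()):
    print(addr_id, columns)
```

Because of the deduplication, the two identical billing addresses collapse into one row, so three addresses are created and both customers will later reference the same billing address Id, rather than the separate rows 2 and 4 shown in the question's target table. The Id order is random from run to run.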

Raw JSON

{
    "name": "CreateAddresses",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "Customers",
                        "type": "DatasetReference"
                    },
                    "name": "Customers"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "Addresses",
                        "type": "DatasetReference"
                    },
                    "name": "CreateAddresses"
                }
            ],
            "transformations": [
                {
                    "name": "address"
                },
                {
                    "name": "billingAddress"
                },
                {
                    "name": "union"
                },
                {
                    "name": "dedupe"
                },
                {
                    "name": "CreateUUID"
                },
                {
                    "name": "CreateId"
                }
            ],
            "scriptLines": [
                "source(output(",
                "          Id as integer,",
                "          Name as string,",
                "          Address as string,",
                "          City as string,",
                "          State as string,",
                "          Zip as string,",
                "          BillingAddress as string,",
                "          BillingCity as string,",
                "          BillingState as string,",
                "          BillingZip as string",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     isolationLevel: 'READ_UNCOMMITTED',",
                "     format: 'table') ~> Customers",
                "Customers select(mapColumn(",
                "          Address,",
                "          City,",
                "          State,",
                "          Zip",
                "     ),",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true) ~> address",
                "Customers select(mapColumn(",
                "          Address = BillingAddress,",
                "          City = BillingCity,",
                "          State = BillingState,",
                "          Zip = BillingZip",
                "     ),",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true) ~> billingAddress",
                "address, billingAddress union(byName: true)~> union",
                "union aggregate(groupBy(Address,",
                "          City,",
                "          State,",
                "          Zip),",
                "     cnt = count(1)) ~> dedupe",
                "dedupe derive(uuid = uuid()) ~> CreateUUID",
                "CreateUUID rank(asc(uuid, true),",
                "     output(Id as long)) ~> CreateId",
                "CreateId sink(allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     input(",
                "          Id as integer,",
                "          Address as string,",
                "          City as string,",
                "          State as string,",
                "          Zip as string",
                "     ),",
                "     deletable:false,",
                "     insertable:true,",
                "     updateable:false,",
                "     upsertable:false,",
                "     format: 'table',",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     errorHandlingOption: 'stopOnFirstError') ~> CreateAddresses"
            ]
        }
    }
}

2. UpdateCustomers


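  1. Look up Addresses from the Customers dataset. The Addresses table must already exist at this point, i.e. it is created by the preceding step in the pipeline.
  2. Select the Id from Addresses and keep only the necessary columns: Id, Name, Addresses.Id (as AddressId), and the 4 billing columns.
  3. Repeat the lookup for the billing columns.
  4. Select only the necessary columns: Id, Name, AddressId, BillingAddressId.
  5. Sink into the output dataset (CustomersOutput), i.e. write to the other database.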
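
A similar plain-Python sketch of the two lookups, assuming the Addresses table has already been filled by the first data flow. The concrete Ids in ADDRESSES are made up for the example (the real ones depend on the UUID ranking), and update_customers is a hypothetical helper; an unmatched key yields None, mirroring the NULL columns a lookup (a left outer join) produces when nothing matches.

```python
ADDRESS_COLUMNS = ("Address", "City", "State", "Zip")

# Addresses as written by CreateAddresses. These Ids are made up for the
# example; the real ones depend on the random UUID ranking.
ADDRESSES = {
    1: ("001 Rustaveli Avenue", "Georgia", "GA", "010801"),
    2: ("024 Biscayne Avenue", "Miami", "FL", "32005"),
    3: ("123 Ocean Avenue", "Miami", "FL", "32001"),
}

# Source Customers rows from the question.
CUSTOMERS = [
    {"Id": 1, "Name": "CustomerA",
     "Address": "123 Ocean Avenue", "City": "Miami", "State": "FL", "Zip": "32001",
     "BillingAddress": "001 Rustaveli Avenue", "BillingCity": "Georgia",
     "BillingState": "GA", "BillingZip": "010801"},
    {"Id": 2, "Name": "CustomerB",
     "Address": "024 Biscayne Avenue", "City": "Miami", "State": "FL", "Zip": "32005",
     "BillingAddress": "001 Rustaveli Avenue", "BillingCity": "Georgia",
     "BillingState": "GA", "BillingZip": "010801"},
]


def update_customers(customers, addresses):
    """Hypothetical model of UpdateCustomers: two lookups, then a final select."""
    # Lookup key: the four address columns. After deduplication each key
    # maps to exactly one Id, so picking 'any' match is unambiguous.
    id_by_columns = {columns: addr_id for addr_id, columns in addresses.items()}
    result = []
    for row in customers:
        # lookupAddress: dict.get returns None when nothing matches.
        address_id = id_by_columns.get(tuple(row[col] for col in ADDRESS_COLUMNS))
        # lookupBillingAddress: the same lookup on the Billing* columns.
        billing_id = id_by_columns.get(
            tuple(row["Billing" + col] for col in ADDRESS_COLUMNS))
        # select2: keep only Id, Name, AddressId, BillingAddressId.
        result.append({"Id": row["Id"], "Name": row["Name"],
                       "AddressId": address_id, "BillingAddressId": billing_id})
    return result


for row in update_customers(CUSTOMERS, ADDRESSES):
    print(row)
```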

Raw JSON

{
    "name": "UpdateCustomers",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "Customers",
                        "type": "DatasetReference"
                    },
                    "name": "Customers"
                },
                {
                    "dataset": {
                        "referenceName": "Addresses",
                        "type": "DatasetReference"
                    },
                    "name": "Addresses"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "CustomersOutput",
                        "type": "DatasetReference"
                    },
                    "name": "UpdateCustomers"
                }
            ],
            "transformations": [
                {
                    "name": "lookupAddress"
                },
                {
                    "name": "select1"
                },
                {
                    "name": "lookupBillingAddress"
                },
                {
                    "name": "select2"
                }
            ],
            "scriptLines": [
                "source(output(",
                "          Id as integer,",
                "          Name as string,",
                "          Address as string,",
                "          City as string,",
                "          State as string,",
                "          Zip as string,",
                "          BillingAddress as string,",
                "          BillingCity as string,",
                "          BillingState as string,",
                "          BillingZip as string",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     isolationLevel: 'READ_UNCOMMITTED',",
                "     format: 'table') ~> Customers",
                "source(output(",
                "          Id as integer,",
                "          Address as string,",
                "          City as string,",
                "          State as string,",
                "          Zip as string",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     isolationLevel: 'READ_UNCOMMITTED',",
                "     format: 'table') ~> Addresses",
                "Customers, Addresses lookup(Customers@Address == Addresses@Address",
                "     && Customers@City == Addresses@City",
                "     && Customers@State == Addresses@State",
                "     && Customers@Zip == Addresses@Zip,",
                "     multiple: false,",
                "     pickup: 'any',",
                "     broadcast: 'auto')~> lookupAddress",
                "lookupAddress select(mapColumn(",
                "          Id = Customers@Id,",
                "          Name,",
                "          BillingAddress,",
                "          BillingCity,",
                "          BillingState,",
                "          BillingZip,",
                "          AddressId = Addresses@Id",
                "     ),",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true) ~> select1",
                "select1, Addresses lookup(BillingAddress == Address",
                "     && BillingCity == City",
                "     && BillingState == State",
                "     && BillingZip == Zip,",
                "     multiple: false,",
                "     pickup: 'any',",
                "     broadcast: 'auto')~> lookupBillingAddress",
                "lookupBillingAddress select(mapColumn(",
                "          Id = select1@Id,",
                "          Name,",
                "          AddressId,",
                "          BillingAddressId = Addresses@Id",
                "     ),",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true) ~> select2",
                "select2 sink(allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     input(",
                "          Id as integer,",
                "          Name as string,",
                "          AddressId as integer,",
                "          BillingAddressId as integer",
                "     ),",
                "     deletable:false,",
                "     insertable:true,",
                "     updateable:false,",
                "     upsertable:false,",
                "     format: 'table',",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     saveOrder: 1,",
                "     errorHandlingOption: 'stopOnFirstError') ~> UpdateCustomers"
            ]
        }
    }
}