用于合并青铜层和银层的合并语句

问题描述 投票:0回答:1

我正在尝试使用 Merge 语句实现构建缓慢变化的维度的逻辑。在我的青铜层或暂存层中,我每天都会收到完整的原始提取物。请参阅下面的示例。

CREATE TABLE IF NOT EXISTS bronze_customers (
    Id INT, 
    CustomerName STRING, 
    ContactName STRING,
    Address STRING,
    City STRING,
    PostalCode STRING,
    Country STRING,
    loading_date DATE
) USING DELTA;

在银层中,我想跟踪添加、更改或删除的记录。我想为此使用以下表结构(已添加 start_date、end_date 和 is_current)。

CREATE TABLE IF NOT EXISTS silver_customers (
    Id INT, 
    CustomerName STRING, 
    ContactName STRING,
    Address STRING,
    City STRING,
    PostalCode STRING,
    Country STRING,
    start_date DATE,
    end_date DATE,
    is_current INT
) USING DELTA;

对于合并,我使用以下语句:

%%sql
MERGE INTO silver_customers AS c
USING
(  
    SELECT cc.Id AS mergekey,
           cc.Id, cc.CustomerName, cc.ContactName, cc.Address, cc.City, cc.PostalCode, cc.Country, cc.loading_date as start_date,
           '' as end_date,
           1 as is_current
    FROM bronze_customers cc  
    UNION ALL  
    SELECT NULL as mergekey,  
           cc.Id, cc.CustomerName, cc.ContactName, cc.Address, cc.City, cc.PostalCode, cc.Country, cc.loading_date as start_date,
           '' as enddate,
           1 as is_current
    FROM bronze_customers cc
) ud
ON c.id = ud.mergekey
AND c.is_current = 1
WHEN MATCHED
    THEN UPDATE SET is_current = 0,
                    end_date = ud.start_date
WHEN NOT MATCHED
AND mergekey is null
    THEN INSERT *

但是,问题是当源系统中没有更改记录时,我收到此错误:

无法执行合并,因为多个源行匹配并尝试 修改Delta表中的相同目标行可能会发生冲突 方法。

这是预期的吗?合并是解决这个问题的最佳方法吗?

sql merge
1个回答
0
投票

该问题是由于 MERGE 语句尝试使用相同的 Id 和 is_current 值更新目标表(“silver_customers”)中的多行。

解决方案:处理源表数据中的重复键

可能的方法是:

  1. 对源数据进行重复数据删除:每次加载时仅对每个 ID 进行唯一记录。
  2. 过滤未更改的行:排除与目标表中 is_current = 1 行相比未更改的行
  3. 使用 ROW_NUMBER 来标识最近的更改
WITH ranked_updates AS (
    SELECT
        cc.Id AS mergekey,
        cc.Id, 
        cc.CustomerName, 
        cc.ContactName, 
        cc.Address, 
        cc.City, 
        cc.PostalCode, 
        cc.Country, 
        cc.loading_date as start_date,
        '' as end_date,
        1 as is_current,
        ROW_NUMBER() OVER (PARTITION BY cc.Id ORDER BY cc.loading_date DESC) as rn
    FROM bronze_customers cc
)
MERGE INTO silver_customers AS c
USING (
    SELECT *
    FROM ranked_updates
    WHERE rn = 1 -- Only consider the most recent record for each Id
) ud
ON c.id = ud.mergekey
AND c.is_current = 1
WHEN MATCHED
    THEN UPDATE SET is_current = 0,
                    end_date = ud.start_date
WHEN NOT MATCHED
AND mergekey is null
    THEN INSERT (Id, CustomerName, ContactName, Address, City, PostalCode, Country, start_date, end_date, is_current)
         VALUES (ud.Id, ud.CustomerName, ud.ContactName, ud.Address, ud.City, ud.PostalCode, ud.Country, ud.start_date, ud.end_date, ud.is_current);

合并是最好的方法吗?

是的,MERGE 是满足 SCD 要求的标准方法,特别是对于 SCD 类型 2 变更。挑战是确保源表没有重复的条目。

© www.soinside.com 2019 - 2024. All rights reserved.