我正在开发一个利用 Databricks 上的 Delta Live Tables 的项目,其中我需要创建一个具有缓慢变化的维度类型 2 的维度(Kimball 样式)。该维度是使用自动加载器增量加载的几个青铜表之间的联接结果。生成的银表需要流式传输或仅追加,因为我需要将其用作使用 scd 类型 2 的应用更改加载的流式传输表的源。但是,由于流式传输行为,我面临着挑战。
以下是该场景的细分:
青铜层(bronze_raw):使用自动加载器每天一次增量加载数据。
银层:此处应用业务逻辑来创建维度,我需要生成的表是流式的或仅追加的。
SCD 类型 2 处理 (silver_full_hist):银表充当流表的源,我们在其中应用更改来实现 SCD 类型 2。
问题出现在银层的连接操作过程中。由于流处理仅处理新行,因此与现有记录连接可能会导致数据丢失。例如,如果在 CRM 系统中添加一个新客户,并且我们需要将帐户表与另一个表连接起来以检索未更改的客户代表,则由于不存在现有记录,内部连接将导致没有行。另一方面,左连接最终会导致代表列的值为 null,即使存在一个值。
我正在寻求有关如何在这种情况下实现无缝流程的指导。对于在 Delta Live Tables 中实施此工作流程的任何见解或最佳实践,我们将不胜感激。
我注意到有关该主题的另一个问题:Joining Tables for Databricks Delta Live Tables SCD Type 2
得到的答复是,在加入青铜表之前,需要将 scd 类型 2 应用到每个青铜表上。但是,您需要决定从哪个青铜表中选择 __START_AT 和 __END_AT 列作为 scd 类型 2 的结果维度。这可能并非在所有情况下都很明显,并且可能会迫使您推断出非常复杂的逻辑。
下面是我迄今为止尝试创建的流式银表以用作应用更改的源:
silver_data_load_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
#Ingest files using autoloader. Customer representative are found in a table called 'user'
@dlt.table (
name = "bronze_account_raw"
)
def collect_raw_bronze():
return (
spark.readStream
.format("cloudFiles")
.options(**csv_file_options)
.load(directory_account)
@dlt.table (
name = "bronze_user_raw"
)
def collect_raw_bronze():
return (
spark.readStream
.format("cloudFiles")
.options(**csv_file_options)
.load(directory_user)
#Get the latest state of the sources
dlt.create_streaming_table(name="bronze_account_latest")
dlt.apply_changes(
target = "bronze_account_latest",
source = "bronze_account_raw",
keys = [account_id],
sequence_by ="commit_timestamp",
apply_as_deletes =F.expr(f"operation = 'delete'"),
except_column_list=['operation'],
stored_as_scd_type = 1
)
dlt.create_streaming_table(name="bronze_user_latest")
dlt.apply_changes(
target = "bronze_user_latest",
source = "bronze_user_raw",
keys = [user_id],
sequence_by ="commit_timestamp",
apply_as_deletes =F.expr(f"operation = 'delete'"),
except_column_list=['operation'],
stored_as_scd_type = 1
)
#Get dimension by joining bronze tables. Created as temporary table here since I initially it forgets about the state and reads all rows in the bronze tables and does not raise the exception about deletes and updates (not the case, it raised the error).
transform_query = f"""
SELECT a.customer_name
,u.rep_name
FROM STREAM(LIVE.bronze_account_latest) as a
INNER JOIN STREAM(LIVE.bronze_user_latest) as u on a.rep_id= u.user_id
"""
@dlt.table(
name="silver_customer",
temporary=True
)
def load_silver():
df = spark.sql(transform_query)
columns = df.columns
return (
df.withColumn("silver_commit_timestamp",F.lit(silver_data_load_timestamp))
.select(*columns, "silver_commit_timestamp")
)
#Here I expect a scd type 2 table based on the silver table. I can then choose to (1) get the latest state of the dimension by setting END_AT IS NULL or (2) get the state for a particular point in time.
dlt.create_streaming_table(name="silver_customer_scd",table_properties={"quality": "silver"},)
dlt.apply_changes(
target = "silver_customer_scd",
source = "silver_customer",
keys = ["customer_name"],
sequence_by ="silver_commit_timestamp",
stored_as_scd_type = 2
)
流表继承了 Apache Spark 结构化流的处理保证,并配置为处理来自仅追加数据源的查询,其中新行始终插入到源表中而不是修改。
一旦修改了“bronze_account_latest”、“bronze_user_latest”中的任何记录,这些表就不能用作 silver_customer_scd 的流源,除非您可以按照此处所述对“silver_customer_scd”或“skipChangeCommits”进行“完全刷新” https://docs.databricks.com/en/structured-streaming/delta-lake.html#ignore-changes
另一种方法是使“silver_customer_scd”成为物化视图,DLT 在基础表更新时负责刷新它https://docs.databricks.com/en/views/materialized-views-how-it-works.html