Databricks DLT 流架构

问题描述 投票:0回答:1

我正在尝试从具有两个数组 [NewData、OldData] 的 JSON 消息创建增量实时表。我在 readStream 代码中传递我的架构,并仅选择 NewData.* 以获取 NewData 数组中的字段。但是,我注意到该表最终创建了 NewData 和 OldData 列以及 NewData 字段。

新数据 旧数据 新数据_字段1 新数据_字段2 ...
1 2 ...

有没有办法只获取我的Delta直播流表中的NewData记录?

代码如下。

schema = StructType([
    StructField("NewData", StructType([
        StructField(...
    ]), True),
    StructField("OldData", StructType([
        StructField(...


@dlt.table(
    name="newdata_raw",
    table_properties={"quality": "bronze"},
    temporary=False,
)
def create_table():
    query = (
        spark.readStream.format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "json")
        .option("checkpointLocation", sink_dir +"checkpoint/")
        .load(sink_dir)
        .select("NewData.*")
        .withColumn("load_date", to_timestamp(current_timestamp()))
    )
    return query
databricks delta-live-tables
1个回答
0
投票

已解决:我最终需要对桌子进行全面刷新。由于它最初在没有 select 语句的情况下运行,因此创建了两个顶级数组,然后保留在后续运行中。

© www.soinside.com 2019 - 2024. All rights reserved.