我正在尝试从具有两个数组 [NewData、OldData] 的 JSON 消息创建增量实时表。我在 readStream 代码中传递我的架构,并仅选择 NewData.* 以获取 NewData 数组中的字段。但是,我注意到该表最终创建了 NewData 和 OldData 列以及 NewData 字段。
新数据 | 旧数据 | 新数据_字段1 | 新数据_字段2 | ... |
---|---|---|---|---|
空 | 空 | 1 | 2 | ... |
有没有办法只获取我的Delta直播流表中的NewData记录?
代码如下。
schema = StructType([
StructField("NewData", StructType([
StructField(...
]), True),
StructField("OldData", StructType([
StructField(...
@dlt.table(
name="newdata_raw",
table_properties={"quality": "bronze"},
temporary=False,
)
def create_table():
query = (
spark.readStream.format("cloudFiles")
.schema(schema)
.option("cloudFiles.format", "json")
.option("checkpointLocation", sink_dir +"checkpoint/")
.load(sink_dir)
.select("NewData.*")
.withColumn("load_date", to_timestamp(current_timestamp()))
)
return query
已解决:我最终需要对桌子进行全面刷新。由于它最初在没有 select 语句的情况下运行,因此创建了两个顶级数组,然后保留在后续运行中。