Databricks Autoloader 批处理模式

问题描述 投票:0回答:1

我正在寻求有关使用 Autoloader 在 Databricks 中处理满载场景的指导。请不要对我太苛刻,因为我目前缺乏实践经验。

我的场景是一个每天进行完整提取的系统。每天它都会将所有文件发送到特定位置:mnt/landing/。之后可以清理该文件夹,确保自动加载器查找新文件。

我想使用这个文件来更新(覆盖)我的青铜层。在此过程中,我想验证架构。为此,我首先获取当前的架构。请注意,下面的代码从着陆区获取数据。它更有活力一点。如果数据已经加载,我想从我的青铜层获取模式。

schema_json = spark.read.parquet("/mnt/landing/20240826/SalesLT.Address.parquet").schema.json()
ddl = spark.sparkContext._jvm.org.apache.spark.sql.types.DataType.fromJson(schema_json).toDDL()
print(ddl)

接下来,我将使用 Autoloader 读取和写入所有数据。我使用架构作为输入。同样,如果还没有数据,我将使用着陆区的架构,确保可以正确加载数据。

(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "parquet")
 .option("cloudFiles.schemaLocation", "/mnt/landing/_checkpoint/address_autoload/")
 .schema(ddl)
 .load("/mnt/landing/SalesLT.Address.parquet")
 .writeStream
 .format("delta")
 .outputMode("append")
 .option("checkpointLocation", "/mnt/landing/_checkpoint/address_autoload/")
 .trigger(once = True)
 .option("mode", "overwrite")
 .toTable("adventureworks.address"))

但是,当我检查目的地时。数据为空。

df = spark.sql("SELECT * FROM adventureworks.address")
display(df)

知道这里可能出了什么问题吗?

apache-spark databricks databricks-autoloader
1个回答
0
投票

可以尝试使用 mergeSchema 来代替之前确定 schema 吗?

(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "parquet")
 .option("cloudFiles.schemaLocation", "/mnt/landing/_checkpoint/address_autoload/")
 .load("/mnt/landing/SalesLT.Address.parquet")
 .writeStream
 .format("delta")
 .outputMode("append")
 .option("checkpointLocation", "/mnt/landing/_checkpoint/address_autoload/")
 .option("mergeSchema", "true")  # Enable schema evolution
 .trigger(once=True)
 .option("mode", "overwrite")
 .toTable("adventureworks.address"))
© www.soinside.com 2019 - 2024. All rights reserved.