我正在寻求有关使用 Autoloader 在 Databricks 中处理满载场景的指导。请不要对我太苛刻,因为我目前缺乏实践经验。
我的场景是一个每天进行完整提取的系统。每天它都会将所有文件发送到特定位置:mnt/landing/。之后可以清理该文件夹,确保自动加载器查找新文件。
我想使用这个文件来更新(覆盖)我的青铜层。在此过程中,我想验证架构。为此,我首先获取当前的架构。请注意,下面的代码从着陆区获取数据。它更有活力一点。如果数据已经加载,我想从我的青铜层获取模式。
schema_json = spark.read.parquet("/mnt/landing/20240826/SalesLT.Address.parquet").schema.json()
ddl = spark.sparkContext._jvm.org.apache.spark.sql.types.DataType.fromJson(schema_json).toDDL()
print(ddl)
接下来,我将使用 Autoloader 读取和写入所有数据。我使用架构作为输入。同样,如果还没有数据,我将使用着陆区的架构,确保可以正确加载数据。
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "parquet")
.option("cloudFiles.schemaLocation", "/mnt/landing/_checkpoint/address_autoload/")
.schema(ddl)
.load("/mnt/landing/SalesLT.Address.parquet")
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/mnt/landing/_checkpoint/address_autoload/")
.trigger(once = True)
.option("mode", "overwrite")
.toTable("adventureworks.address"))
但是,当我检查目的地时。数据为空。
df = spark.sql("SELECT * FROM adventureworks.address")
display(df)
知道这里可能出了什么问题吗?
可以尝试使用 mergeSchema 来代替之前确定 schema 吗?
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "parquet")
.option("cloudFiles.schemaLocation", "/mnt/landing/_checkpoint/address_autoload/")
.load("/mnt/landing/SalesLT.Address.parquet")
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/mnt/landing/_checkpoint/address_autoload/")
.option("mergeSchema", "true") # Enable schema evolution
.trigger(once=True)
.option("mode", "overwrite")
.toTable("adventureworks.address"))