我想使用 Databricks 中的 AutoLoader 处理一些镶木地板文件(使用快速压缩)。其中许多文件都是空的或仅包含一条记录。另外,我无法更改它们的创建方式,也无法压缩它们。
以下是我迄今为止尝试过的一些方法:
我设置了以下 AutoLoader 配置:
我使用以下 readStream 配置:
spark.readStream.format("cloudFiles")
.options(**CLOUDFILE_CONFIG)
.option("cloudFiles.format", "parquet")
.option("pathGlobFilter", "*.snappy")
.option("recursiveFileLookup", True)
.schema(schema)
.option("locale", "de-DE")
.option("dateFormat", "dd.MM.yyyy")
.option("timestampFormat", "MM/dd/yyyy HH:mm:ss")
.load(<path-to-source>)
以及以下 writeStream 配置:
df.writeStream.format("delta")
.outputMode("append")
.option("checkpointLocation", <path_to_checkpoint>)
.queryName(<processed_table_name>)
.partitionBy(<partition-key>)
.option("mergeSchema", True)
.trigger(once=True)
.start(<path-to-target>)
我首选的解决方案是使用 DBX,但我不知道为什么作业会成功,我只在目标位置看到空文件夹。这是非常奇怪的行为,因为我认为 AutoLoader 一段时间后仅读取空文件超时!
附注当我使用 Parquet Spark Streaming 而不是 AutoLoader 时,也会发生同样的情况。
您知道发生这种情况的原因吗?我该如何克服这个问题?
您是否指定流读取的架构? (抱歉,还不能添加评论)
如果您正在读取使用快速压缩的 parquet 写入的文件,则文件扩展名是“.snappy.parquet”。您可以尝试更改 pathGlobFilter 以匹配 *.snappy.parquet。我的第二个疑问是关于 cloudFiles.allowOverwrites": True ,您可以在没有此选项的情况下尝试一下。