我有一个具有这种结构的数据湖。不幸的是,正如您在第二张图片中看到的那样,数据中存在错误,因此我的未来和过去的年份毫无意义,并且它们具有虚拟数据,例如带有数据的year = 720或year = 2050。该数据每 n-1 天更新一次,并用新数据重写分区。
使用自动加载器如何预先过滤数据,以便列表不是很密集。我不想列出所有内容然后进行过滤,我只是希望自动加载器不查看某些分区。
要使用 Databricks Autoloader 过滤分区并排除无效数据(例如,超出有效范围的年份),您可以使用 pathGlobFilter 选项来限制其处理的分区。
示例代码:
from pyspark.sql.functions import col
valid_years = "(2000|2001|2002|2003|2004|2005|2006|2007|2008|2009|2010|2011|2012|2013|2014|2015|2016|2017|2018|2019|2020|2021|2022|2023|2024)"
autoloader_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "parquet")
.option("pathGlobFilter", f"/year={valid_years}/*")
.option("cloudFiles.useNotifications", "true") # For optimal performance
.load("dbfs:/mnt/eredes/sgtes/gg/cons/dcgroup.xchanneldata/")
)
filtered_df = autoloader_df.filter(
(col("year").cast("int") >= 2000) & (col("year").cast("int") <= 2024)
)
(filtered_df.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "dbfs:/mnt/your-checkpoint-path/")
.start("dbfs:/mnt/your-target-path/"))
祝你好运!