使用自动加载器数据块过滤目录

问题描述 投票:0回答:1

我有一个具有这种结构的数据湖。不幸的是,正如您在第二张图片中看到的那样,数据中存在错误,因此我的未来和过去的年份毫无意义,并且它们具有虚拟数据,例如带有数据的year = 720或year = 2050。该数据每 n-1 天更新一次,并用新数据重写分区。

使用自动加载器如何预先过滤数据,以便列表不是很密集。我不想列出所有内容然后进行过滤,我只是希望自动加载器不查看某些分区。

enter image description here enter image description here

apache-spark pyspark databricks spark-streaming spark-structured-streaming
1个回答
0
投票

要使用 Databricks Autoloader 过滤分区并排除无效数据(例如,超出有效范围的年份),您可以使用 pathGlobFilter 选项来限制其处理的分区。

示例代码:

from pyspark.sql.functions import col

valid_years = "(2000|2001|2002|2003|2004|2005|2006|2007|2008|2009|2010|2011|2012|2013|2014|2015|2016|2017|2018|2019|2020|2021|2022|2023|2024)"
autoloader_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("pathGlobFilter", f"/year={valid_years}/*")
    .option("cloudFiles.useNotifications", "true")  # For optimal performance
    .load("dbfs:/mnt/eredes/sgtes/gg/cons/dcgroup.xchanneldata/")
)

filtered_df = autoloader_df.filter(
    (col("year").cast("int") >= 2000) & (col("year").cast("int") <= 2024)
)

(filtered_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "dbfs:/mnt/your-checkpoint-path/")
    .start("dbfs:/mnt/your-target-path/"))

祝你好运!

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.