我正在使用 PySpark 从按 DATE_KEY 分区的 HDFS 位置读取镶木地板文件。以下代码始终从 MAX(DATE_KEY) 分区读取文件并转换为 Polars 数据帧。
def hdfs_fetch_latest_parquet_file(parquet_file_name):
sc = SparkSession.builder.appName("hdfs-spark").config("spark.sql.execution.arrow.pyspark.enabled", "true").master("local[*]").config("spark.executor.memory", "70g").config("spark.driver.memory", "50g").config("spark.memory.offHeap.enabled", "true").config("spark.memory.offHeap.size", "16g").config("spark.driver.maxResultSize", "4g").config("spark.sql.parquet.int96RebaseModeInRead", "CORRECTED").config("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED").config("spark.sql.parquet.datetimeRebaseModeInRead", "CORRECTED").config("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED").getOrCreate()
spark_df = sc.read.parquet(config.HDFS_DEV_URL + config.PARQUET_FILE_REFINED_PATH + parquet_file_name + "/").createOrReplaceTempView("allDateKeysParquet")
spark_df_latest_date_key = sc.sql('select * from allDateKeysParquet where DATE_KEY = (select MAX(DATE_KEY) from allDateKeysParquet)')
df = pl.from_arrow(pa.Table.from_batches(spark_df_latest_date_key._collect_as_arrow())).drop('DATE_KEY')
return df
如果其中没有任何子分区,则此方法可以很好地找到最大 DATE_KEY,即:
但挑战是,如果 DATE_KEY 进一步具有基于 BASE_FEED 的子分区,则代码会失败。
我的目标是读取 MAX(DATE_KEY) 内的镶木地板文件,如果其中包含子文件夹,则也读取其中的所有内容。
我尝试使用下面的代码,但它有例外
spark_df = sc.read.parquet(config.HDFS_DEV_URL + config.PARQUET_FILE_REFINED_PATH + parquet_file_name + "/*").createOrReplaceTempView("allDateKeysParquet")
检测到冲突的目录结构。可疑路径
有没有其他方法可以解决这个问题,以便代码始终找到 MAX(DATE_KEY) 分区并读取其中的所有镶木地板文件,而不管 date_key 包含更多分区(即本例中的 BASE_FEED)?
有人可以帮我吗?
“/*”位于镶木地板的路径上,您应该只访问目录而不是单个文件,在本例中为 REFERENCE_DATA.parquet