在 PySpark 中进行分区时,Delta 表覆盖无法按预期工作

问题描述 投票:0回答:1

我正在处理一个大型数据集,这就是为什么我需要按特定的

id
进行分区。

我有两个笔记本可以分阶段转换数据,我确信问题出在第一个笔记本上。当我手动从第一个笔记本写入的目录中删除文件时,问题就解决了。

数据转换流程

正常流程中:

  1. Input 1
    转换为
    Output 2
  2. Output 2
    然后转换为
    Output 3

但是,当我从输入 1 中删除一行时,它仍然出现在输出 2 中,即使以下代码应覆盖输出 2 中的数据:

df1.repartition('id').write.format("delta").option("overwriteSchema", "True").mode("overwrite").save('/transformed/output2')

之后,使用以下方法生成输出 3:

df2.write.format("delta").option("overwriteSchema", "True").mode("overwrite").save('/transformed/output3')
问题

当我手动删除

output2
目录时,一切正常(不再有旧数据),但旧数据不应该被自动覆盖吗?大多数时候它都按预期工作,但在这种情况下,旧数据似乎仍然存在。

我尝试过的事情
  • 调查了是否是partitionBy导致了这个问题。
  • 分离各个转换步骤以观察任何差异。

使用分区的主要原因是为了提高性能,但我不确定它是否会导致问题。

任何有关为什么会发生这种情况的见解将不胜感激!

apache-spark pyspark partitioning overwrite delta-lake
1个回答
0
投票

感谢@AlexanderPavlov 评论中的答案。


问题是我无意中读取了所有数据文件,包括旧版本,而不仅仅是最新版本。

我通过更新代码解决了这个问题:

# Old code (incorrect)
df_subscriptions = spark.read.load(f'/transformed/output2')

正确的解决方案:

# New code (correct)
df_subscriptions = spark.read.format('delta').load(f'/transformed/output2')

这可确保仅读取最新版本的数据。

© www.soinside.com 2019 - 2024. All rights reserved.