在这里,我尝试模拟 Hudi 数据集的更新和删除,并希望看到 Athena 表中反映的状态。我们使用AWS的EMR、S3和Athena服务。
withdrawalID_mutate = 10382495
updateDF = final_df.filter(col("withdrawalID") == withdrawalID_mutate) \
.withColumn("accountHolderName", lit("Hudi_Updated"))
updateDF.write.format("hudi") \
.options(**hudi_options) \
.mode("append") \
.save(tablePath)
hudiDF = spark.read \
.format("hudi") \
.load(tablePath).filter(col("withdrawalID") == withdrawalID_mutate).show()
显示更新的记录,但它实际上附加在 Athena 表中。可能与胶水目录有关?
deleteDF = updateDF #deleting the updated record above
deleteDF.write.format("hudi") \
.option('hoodie.datasource.write.operation', 'upsert') \
.option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \
.options(**hudi_options) \
.mode("append") \
.save(tablePath)
仍然反映Athena表中已删除的记录
还尝试使用
mode("overwrite")
,但正如预期的那样,它会删除较旧的分区并仅保留最新的分区。
在您的deleteDF.write调用中,您将写入操作设置为
upsert
,这将更新插入记录而不是删除它。
尝试改变这个
.option('hoodie.datasource.write.operation', 'upsert')
到
.option('hoodie.datasource.write.operation', 'delete')
这将删除该记录。