AWS Glue并更新重复数据

问题描述 投票:1回答:3

我正在使用AWS Glue将多个文件从S3移动到RDS实例。每天我都会在S3中获得一个新文件,其中可能包含新数据,但也可以包含我已经保存的记录和一些更新值。如果我多次运行该作业,我当然会在数据库中获得重复记录。而不是插入多个记录,我希望Glue尝试更新该记录,如果它注意到字段已更改,则每个记录都有唯一的ID。这可能吗?

python pyspark etl aws-glue
3个回答
3
投票

不幸的是,使用Glue没有优雅的方法。如果你写信给Redshift你可以use postactions to implement Redshift merge操作。但是,其他jdbc接收器(afaik)是不可能的。

或者,在ETL脚本中,您可以从数据库加载现有数据,以在保存之前过滤掉现有记录。但是,如果您的数据库表很大,那么该作业可能需要一段时间来处理它。

另一种方法是先写入模式'覆盖'的临时表(替换现有的临时数据),然后通过API调用数据库,只将新记录复制到最终表中。


1
投票

我采用了类似的方法,Yuriy建议作为第二选择。获取现有数据以及新数据,然后进行一些处理以合并它们并使用ovewrite模式进行写入。以下代码将帮助您了解如何解决此问题。

sc = SparkContext()
glueContext = GlueContext(sc)

#get your source data 
src_data = create_dynamic_frame.from_catalog(database = src_db, table_name = src_tbl)
src_df =  src_data.toDF()


#get your destination data 
dst_data = create_dynamic_frame.from_catalog(database = dst_db, table_name = dst_tbl)
dst_df =  dst_data.toDF()

#Now merge two data frames to remove duplicates
merged_df = dst_df.union(src_df)

#Finally save data to destination with OVERWRITE mode
merged_df.write.format('jdbc').options(   url = dest_jdbc_url, 
                                          user = dest_user_name,
                                          password = dest_password,
                                          dbtable = dest_tbl ).mode("overwrite").save()

0
投票

我已经使用INSERT进入表.... ON DUPLICATE KEY ..用于UPSERT进入运行mysql引擎的Aurora RDS。也许这将是您的用例的参考。我们不能使用JDBC,因为我们目前只支持APPEND,OVERWRITE和ERROR模式。

我不确定您使用的RDS数据库引擎,以下是mysql UPSERTS的示例。

请参阅此参考资料,其中我使用INSERT INTO TABLE..ON DUPLICATE KEY为mysql发布了一个解决方案:

Error while using INSERT INTO table ON DUPLICATE KEY, using a for loop array

© www.soinside.com 2019 - 2024. All rights reserved.