我正在使用AWS Glue将多个文件从S3移动到RDS实例。每天我都会在S3中获得一个新文件,其中可能包含新数据,但也可以包含我已经保存的记录和一些更新值。如果我多次运行该作业,我当然会在数据库中获得重复记录。而不是插入多个记录,我希望Glue尝试更新该记录,如果它注意到字段已更改,则每个记录都有唯一的ID。这可能吗?
不幸的是,使用Glue没有优雅的方法。如果你写信给Redshift你可以use postactions
to implement Redshift merge操作。但是,其他jdbc接收器(afaik)是不可能的。
或者,在ETL脚本中,您可以从数据库加载现有数据,以在保存之前过滤掉现有记录。但是,如果您的数据库表很大,那么该作业可能需要一段时间来处理它。
另一种方法是先写入模式'覆盖'的临时表(替换现有的临时数据),然后通过API调用数据库,只将新记录复制到最终表中。
我采用了类似的方法,Yuriy建议作为第二选择。获取现有数据以及新数据,然后进行一些处理以合并它们并使用ovewrite模式进行写入。以下代码将帮助您了解如何解决此问题。
sc = SparkContext()
glueContext = GlueContext(sc)
#get your source data
src_data = create_dynamic_frame.from_catalog(database = src_db, table_name = src_tbl)
src_df = src_data.toDF()
#get your destination data
dst_data = create_dynamic_frame.from_catalog(database = dst_db, table_name = dst_tbl)
dst_df = dst_data.toDF()
#Now merge two data frames to remove duplicates
merged_df = dst_df.union(src_df)
#Finally save data to destination with OVERWRITE mode
merged_df.write.format('jdbc').options( url = dest_jdbc_url,
user = dest_user_name,
password = dest_password,
dbtable = dest_tbl ).mode("overwrite").save()
我已经使用INSERT进入表.... ON DUPLICATE KEY ..用于UPSERT进入运行mysql引擎的Aurora RDS。也许这将是您的用例的参考。我们不能使用JDBC,因为我们目前只支持APPEND,OVERWRITE和ERROR模式。
我不确定您使用的RDS数据库引擎,以下是mysql UPSERTS的示例。
请参阅此参考资料,其中我使用INSERT INTO TABLE..ON DUPLICATE KEY为mysql发布了一个解决方案:
Error while using INSERT INTO table ON DUPLICATE KEY, using a for loop array