我正在开发一个项目,需要使用 PySpark 更新 Oracle 数据库中现有表中的记录。然而,我正在使用的方法面临一些挑战。
这是我的问题的摘要:
我尝试使用spark.sql()方法对表执行更新查询,但我发现该方法不适合直接更新外部数据库(如Oracle)中的表。 我的表已经存在,并且在写回数据时无法使用覆盖模式,因为这会导致删除现有表,这在我的情况下是不允许的。
使用spark.sql():我尝试直接使用spark.sql()执行更新查询,但这会导致错误,因为spark.sql()主要用于处理在Spark中注册的临时表或视图。
用write.jdbc写入:我考虑过用write.jdbc来插入记录,确实,使用append模式可以正常插入。这是我的代码片段:
if ids_not_in_target.count() > 0:
ids_not_in_target.write.format("jdbc").mode("append").option("dbtable", target_table).option("url", target_jdbc_url).options(**target_oracle_properties).save()
else:
update_sql = f"""
UPDATE target_table
SET column1 = value1,
column2 = value2
WHERE condition
"""
spark.sql(update_sql) # This does not work as expected
读取数据或写入数据库时出错:
[TABLE_OR_VIEW_NOT_FOUND] The table or view `<schema_name>`.`<table_name>` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; line 2 pos 19;
+- 'UnresolvedRelation [<schema>, <table_name>], [], false
注意:我的表存在于数据库中,我可以在表中读取和追加。
问题: 使用 PySpark 对 Oracle 表执行 UPDATE 操作的最佳方法是什么?
如何实现一种有效的方法来更新记录而不使用覆盖? 如果无法使用spark.sql()或write.jdbc直接更新,是否有办法从表中读取数据,在DataFrame中应用更新,然后仅将更改写回数据库?
spark.sql()
将在该 SparkSession 的当前目录/模式上执行 sql。您可以使用目录界面获取当前会话中所有目录/模式/表的所有信息。您可以通过以下方式找到:print(spark.catalog.currentCatalog())
print(spark.catalog.currentDatabase())