如何使用spark/jdbc从azure databricks更新postgresql表?

问题描述 投票:0回答:2

我能够使用以下语句连接到在 Azure Databricks 群集上安装的 JDBC 驱动程序:

remote_table = (spark.read
  .format("jdbc")
  .option("driver", driver)
  .option("url", url)
  .option("dbtable", table)
  .option("user", user)
  .option("password", password)
  .load()
  )

我能够成功加载此数据框并使用以下代码更新某些行:

remote_table = remote_table.withColumn("status", when(remote_table.transactionKey == transaction_key, "sucess").otherwise(remote_table.status))

现在我尝试更新(使用覆盖模式保存)数据库中的表,但是当我重新加载它时,我要么得到一个空表,要么得到一个未更改的表。为了保存表格,我尝试了这两个代码:

remote_table.write \
  .format("jdbc") \
  .option("url", url) \
  .option("dbtable", table) \
  .option("user", user) \
  .option("password", password) \
  .mode("overwrite") \
  .save()

remote_table.write.mode("overwrite").saveAsTable("remote")

有什么关于为什么这不起作用的提示吗?我没有收到错误消息,只是没有得到我期望的结果。提前非常感谢!

编辑:我最终使用了不同的方法来解决这个问题,我使用了 psycopg2 和以下代码并且它正在工作:

import psycopg2
from psycopg2 import sql

def update_table(transaction_key):
""" update status of request based on the transaction key """
query = sql.SQL("update {table} set {column}='success' where {key} = %s").format(
table=sql.Identifier('table_name'),
column=sql.Identifier('status'),
key=sql.Identifier('transactionKey'))

conn = None
updated_rows = 0
try:
    # read database configuration (hard coded for now)
    params = {"host": "...", "port": "5432", "dbname": "...", "user": "...", "password": "..."}
    # connect to the PostgreSQL database
    conn = psycopg2.connect(**params)
    # create a new cursor
    cur = conn.cursor()
    # execute the UPDATE  statement
    cur.execute(query, (transaction_key,))
    # get the number of updated rows
    updated_rows = cur.rowcount
    # Commit the changes to the database
    conn.commit()
    # Close communication with the PostgreSQL database
    cur.close()
except (Exception, psycopg2.DatabaseError) as error:
    print(error)
finally:
    if conn is not None:
        conn.close()

return updated_rows

我从我发现的另一个代码片段中得到了启发,但我再也找不到该网站了!

postgresql azure jdbc databricks
2个回答
0
投票

我尝试在我的环境中重现相同的场景,并在从 databricks 数据帧在 Postgres sql 中写入数据时遇到类似的问题。

它只是创建表,但不向其中插入行。有了这个文档,您可以使用

jdbc
通过
SQL

插入数据

您可以尝试的解决方法是使用 Databricks SQL 查询写入数据。

  • 首先使用以下语法为数据框创建一个临时视图
remote_table1.createOrReplaceTempView("temp_view_name")

enter image description here

  • 然后使用以下代码将该临时视图数据写入 Postgres SQL 表:
%sql
CREATE  TABLE  new_employees_table
  USING  JDBC
OPTIONS  (
  url  "<jdbc_url>",
  dbtable  "<table_name>",
  user  '<username>',
  password  '<password>'
)  AS
SELECT  *  FROM  employees_table_vw

enter image description here

输出插入数据

enter image description here

对于 Spark 或 Scala,它不会插入数据。检查您的数据框是否有值,如果问题仍然存在,您可以向 Microsoft 提出支持票以进行更深入的调查


0
投票

您能否分享一下您如何使用您提到的其他库?

© www.soinside.com 2019 - 2024. All rights reserved.