我有一个处理各种表的Azure Databricks集群,然后作为最后一步,我将这些表推送到Azure SQL Server以供其他一些进程使用。我在databricks中有一个单元格,看起来像这样:
def generate_connection():
jdbcUsername = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlUserName")
jdbcPassword = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlPassword")
connectionProperties = {
"user" : jdbcUsername,
"password" : jdbcPassword,
"driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
return connectionProperties
def generate_url():
jdbcHostname = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlHostName")
jdbcDatabase = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlDatabase")
jdbcPort = 1433
return "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
def persist_table(table, sql_table, mode):
jdbcUrl = generate_url();
connectionProperties = generate_connection()
table.write.jdbc(jdbcUrl, sql_table, properties=connectionProperties, mode=mode)
persist_table(spark.table("Sales.OpenOrders"), "Sales.OpenOrders", "overwrite")
persist_table(spark.table("Sales.Orders"), "Sales.Orders", "overwrite")
这按预期工作。我遇到的问题是Orders表非常大,每天只有一小部分行可能会改变,所以我想做的是将覆盖模式更改为追加模式并将数据框更改为整个表只是可以更改的行。所有这些我知道如何轻松做到,但我想要做的是对Azure SQL数据库运行一个简单的SQL语句来删除已经存在的行,以便它们可能更改的行将被插回。
我想对Azure SQL数据库运行一个SQL语句,如
Delete From Sales.Orders Where CreateDate >= '01/01/2019'
您需要使用pyodbc库。您可以连接并使用sql语句。
import pyodbc
conn = pyodbc.connect( 'DRIVER={ODBC Driver 17 for SQL Server};'
'SERVER=mydatabe.database.azure.net;'
'DATABASE=AdventureWorks;UID=jonnyFast;'
'PWD=MyPassword')
# Example doing a simple execute
conn.execute('INSERT INTO Bob (Bob1, Bob2) VALUES (?, ?)', ('A', 'B'))
不幸的是,让它在数据库上工作有点痛苦。我不久前写了一篇博文,应该有所帮助。 https://datathirst.net/blog/2018/10/12/executing-sql-server-stored-procedures-on-databricks-pyspark