所以,我在Sql Server中有一个带有Id列的表,它是一个标识列。我面临的问题是,当我尝试将数据框推入其中时,它会抱怨identity_insert设置为'off'。现在,我使用jdbc明确地将其设置为'on'但是因为这是sql server端的会话变量,所以在命中dataframe push命令时它会变回'off',因为两者都是sql的不同会话服务器。
有没有办法在'on'时转动并在同一个会话中推送数据帧?
一些代码--Sql Server表
create table dbo.testtable
(
[Id] int identity,
[Name] varchar(100),
[Address] varchar(100),
[ExtraColumn] int,
[Age] int
)
我的数据框架 -
case class TestClass(Id: Int, Name: String, Address: String, ExtraColumn:
Int, Age: Int)
val seqClass = Seq(TestClass(1, "kv", "riata", 2, 30),
TestClass(2, "xyz", "xyz's place", 2, 31),
TestClass(3, "abc", "abc's place", 2, 32))
val sparkSession = createSparkSession //creating through some method
val df = sparkSession.sqlContext.createDataFrame(seqClass)
JDBCUtils.setIdentityInsertOn(conn, JDBC.SQL_SERVER.TYPE,
"testdb1.dbo.testtable", None) //my method to turn on identity_insert
//code to push data frame to sql server
df.coalesce(1).write.mode("append").jdbc(jdbcUrl,"testdb1.dbo.testtable",
getConnectionProperties(username,password, dbType))
//getConnectionProperties is my own method that provides connection
//properties for jdbc.
请注意,如果我从数据框中删除Id列,则上述工作正常。所以代码整体工作,只是我需要能够在数据帧上维护Id并将其推送到testtable。为什么我不能简单地使用testtable中的身份生成?因为上面的代码是很复杂的工作流程的一部分,我需要在数据框中生成如上所述的Id列。
任何帮助表示赞赏!
谢谢
能够在联系已经实施解决方案的Vaibhav离线后解决此问题。我在这里张贴相同的内容供其他人使用。
在下面的位置从JDBCUtils.java创建SaveTable()和相关函数的本地副本 - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
更新SavePartition函数以插入下面的代码行 -
if(Identity_Insert_Off) {
val sql = "set IDENTITY_INSERT " + table + " ON";
val statement = conn.createStatement()
statement.execute(sql)
}
在第640行循环之前。
while (iterator.hasNext) {...}
如果条件基于场景更新(我只为SqlServer使用此代码,因此只检查传递给函数的Identity Insert Flag)
以下查询可用于检查特定表的身份是开还是关 -
SELECT OBJECTPROPERTY(OBJECT_ID('<TableName>'), 'TableHasIdentity');