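I am trying to read a Spark Dataset from a table on an MS SQL 2007 Server and, after some transformations, write it to another existing table in the same database (it also fails with a new table name). The contents of the Dataset look fine when I call .show(). Whether I write with SaveMode.Append or SaveMode.Overwrite, Spark tries to create a new table and fails with the following error: com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near ';'. When debugging, the JDBC driver breaks on the following statement: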
CREATE TABLE "RMS.cacr_rms.POC_REFUND_DETAILS"; ("REFUND_ID" NVARCHAR(MAX) , "ORDER_ID" NVARCHAR(MAX) , "PROFILE_ID" NVARCHAR(MAX) , "AGENT_ID" INTEGER , "CONSUMER_ID" INTEGER , "REASON_CODE_ID" INTEGER , "REFUND_TYPE" INTEGER , "STORE_ID" INTEGER , "STATUS" NVARCHAR(MAX) , "REFUND_CREATED_DATE" DATETIME , "LAST_UPDATED_DATE" INTEGER , "ORDER_SUBMITTED_DATE" DATETIME , "SHIP_DATE" INTEGER , "ORDER_AMOUNT" INTEGER , "REFUND_AMOUNT" INTEGER , "REFUND_CHANNEL" INTEGER , "TENANT" INTEGER , "VERTICAL" INTEGER , "ATG_REFUND_ID" NVARCHAR(MAX) )
Additional info: in the JdbcRelationProvider class in Spark SQL, tableExists evaluates to false.
Full stack trace:
com.microsoft.sqlserver.jdbc.SQLServerException: Incorrect syntax near ';'.
at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1624)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:868)
at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:768)
at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7194)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:2979)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:248)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:223)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeUpdate(SQLServerStatement.java:711)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:859)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:81)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at RmsSparkSession.write(RmsSparkSession.java:27)
at MainApp.main(MainApp.java:28)
Code that triggers the error:
ds.write().mode(SaveMode.Append).format("jdbc")
        .option("url", config.getString("sqlserver.url"))
        .option("dbtable", tableName)
        .option("user", config.getString("sqlserver.username"))
        .option("password", config.getString("sqlserver.password"))
        .save();
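SaveMode.Overwrite will create the table, whereas SaveMode.Append will append to the existing table without creating it, as long as Spark's existence check actually finds the table. If that check fails, Spark falls back to CREATE TABLE in either mode, which matches the createTable frame in your stack trace.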
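To make that behaviour concrete, here is a rough sketch of the decision as I understand it. It is a simplified model written in plain Java, not Spark's actual code: the class name `JdbcWritePlan`, the `Mode` enum, and the step labels are made up for illustration, and it ignores the `truncate` option that Overwrite can use.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of the JDBC write decision; not Spark's real implementation. */
final class JdbcWritePlan {
    enum Mode { APPEND, OVERWRITE, ERROR_IF_EXISTS, IGNORE }

    /** Returns the high-level steps taken for a save mode and the result of the existence check. */
    static List<String> plan(Mode mode, boolean tableExists) {
        List<String> steps = new ArrayList<>();
        if (!tableExists) {
            // Table not detected: every mode ends up creating it before inserting.
            steps.add("CREATE TABLE");
            steps.add("INSERT");
            return steps;
        }
        switch (mode) {
            case APPEND:
                // Existing table is kept; rows are only inserted.
                steps.add("INSERT");
                break;
            case OVERWRITE:
                // Existing table is replaced (the truncate variant is left out here).
                steps.add("DROP TABLE");
                steps.add("CREATE TABLE");
                steps.add("INSERT");
                break;
            case ERROR_IF_EXISTS:
                steps.add("FAIL");
                break;
            case IGNORE:
                // Nothing is written when the table already exists.
                break;
        }
        return steps;
    }
}
```

With `tableExists` false, as reported in the question, both `APPEND` and `OVERWRITE` produce a plan that starts with CREATE TABLE, which would explain why changing the save mode made no difference.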
Try the following syntax; it worked for me.
import java.util.Properties;
import org.apache.spark.sql.SaveMode;

Properties connectionProps = new Properties();
connectionProps.put("user", config.getString("sqlserver.username"));
connectionProps.put("password", config.getString("sqlserver.password"));
// driverClassName is a placeholder: supply the class name of your JDBC driver
connectionProps.put("driver", driverClassName);
dataframe.write().mode(SaveMode.Append).jdbc(jdbcURL, tableName, connectionProps);
See https://docs.oracle.com/javase/tutorial/jdbc/basics/connecting.html for how to set the JDBC properties.
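One more thing that may be worth checking: the failing statement shows a semicolon directly after the quoted table name, which suggests the tableName value itself might carry stray quotes or a trailing ';' (for example from a config file). That is only a guess from the posted SQL. A minimal sketch of a cleanup step, assuming that is the cause; `TableNameCheck` and `normalize` are hypothetical names, not part of Spark or the JDBC driver:

```java
/** Hypothetical helper that cleans a table name before it is passed as "dbtable". */
final class TableNameCheck {

    /** Trims whitespace, drops trailing semicolons, and removes one pair of wrapping double quotes. */
    static String normalize(String raw) {
        String name = raw.trim();
        // Remove any trailing ';' characters that would end the SQL statement early.
        while (name.endsWith(";")) {
            name = name.substring(0, name.length() - 1).trim();
        }
        // Remove quotes that wrap the whole dotted name as a single identifier.
        if (name.length() >= 2 && name.startsWith("\"") && name.endsWith("\"")) {
            name = name.substring(1, name.length() - 1);
        }
        return name;
    }

    public static void main(String[] args) {
        // Input mirrors the table name as it appears in the failing CREATE TABLE statement.
        System.out.println(normalize("\"RMS.cacr_rms.POC_REFUND_DETAILS\";"));
    }
}
```

If the cleaned name differs from what you currently pass in, printing tableName right before the write call should confirm whether this is the culprit.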