我们正在使用 Azure Databricks 笔记本,在其中连接 Azure 数据仓库服务以从表中获取记录并将该表的数据复制到沙盒架构中的另一个表中。
这是代码,
jdbcHostname = "sqldwserver.database.windows.net"
jdbcPort = 1433
jdbcDatabase = "sqldwdb"
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
connectProperties = {
"user" : dbutils.secrets.get(scope='DWH_Scope_IT_Data_Users', key='sql_username'),
"password" : dbutils.secrets.get(scope='DWH_Scope_IT_Data_Users', key='sql_password'),
"driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
Customers_query = "(SELECT TOP 10 * FROM RAW_IT_NO_CLOSED.PT) demoalias"
df = spark.read.jdbc(url=jdbcUrl, table=Customers_query, properties=connectProperties)
# Define the SCHEMA and target table
target_table = "Sandbox_IT.Analysis1"
# SQL query to create the target table if it does not already exist
create_table_query = """
IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'Sandbox_IT' AND TABLE_NAME = 'Testing1')
BEGIN
CREATE TABLE Sandbox_IT.Testing1 (
PROCEDURE_CD VARCHAR(10) NOT NULL,
PROCEDURE_DESCRIPTION VARCHAR(512) NULL,
PROCESSING_DATE DATE NOT NULL
)
END
"""
# Function to execute a SQL query using the Spark JDBC connection
def execute_query(query):
# Establish JDBC connection to execute SQL commands
conn = None
stmt = None
try:
conn = spark._sc._gateway.jvm.java.sql.DriverManager.getConnection(
jdbcUrl,
connectProperties["user"],
connectProperties["password"]
)
stmt = conn.createStatement()
stmt.execute(query)
finally:
if stmt is not None:
stmt.close()
if conn is not None:
conn.close()
# Execute the create table query
execute_query(create_table_query)
# Write the DataFrame to the target table
df.write.jdbc(
url=jdbcUrl,
table=target_table,
mode="append", # Use "overwrite" if you want to replace the existing data
properties=connectProperties
)
上面的代码显示了结果,但无法在沙盒模式中创建新表,并且我们得到了列“PROCEDURE_CD”的数据类型无法参与列存储索引错误。
最后添加了完整的错误。
我尝试将数据类型更改为 Char、Nvarchar 等,还增加了变量的大小,甚至保持最大,但仍然不起作用。
任何人都可以告诉我如何修复此错误吗?
完全错误--
com.microsoft.sqlserver.jdbc.SQLServerException: The statement failed. Column 'PROCEDURE_CD' has a data type that cannot participate in a columnstore index.
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
File <command-1365204618373826>, line 53
50 execute_query(create_table_query)
52 # Write the DataFrame to the target table
---> 53 df.write.jdbc(
54 url=jdbcUrl,
55 table=target_table,
56 mode="append", # Use "overwrite" if you want to replace the existing data
57 properties=connectProperties
58 )
File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
46 start = time.perf_counter()
47 try:
---> 48 res = func(*args, **kwargs)
49 logger.log_success(
50 module_name, class_name, function_name, time.perf_counter() - start, signature
51 )
52 return res
File /databricks/spark/python/pyspark/sql/readwriter.py:1984, in DataFrameWriter.jdbc(self, url, table, mode, properties)
1982 for k in properties:
1983 jprop.setProperty(k, properties[k])
-> 1984 self.mode(mode)._jwrite.jdbc(url, table, jprop)
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
1349 command = proto.CALL_COMMAND_NAME +\
1350 self.command_header +\
1351 args_command +\
1352 proto.END_COMMAND_PART
1354 answer = self.gateway_client.send_command(command)
-> 1355 return_value = get_return_value(
1356 answer, self.gateway_client, self.target_id, self.name)
1358 for temp_arg in temp_args:
1359 if hasattr(temp_arg, "_detach"):
File /databricks/spark/python/pyspark/errors/exceptions/captured.py:188, in capture_sql_exception.<locals>.deco(*a, **kw)
186 def deco(*a: Any, **kw: Any) -> Any:
187 try:
--> 188 return f(*a, **kw)
189 except Py4JJavaError as e:
190 converted = convert_exception(e.java_exception)
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o407.jdbc.
: com.microsoft.sqlserver.jdbc.SQLServerException: The statement failed. Column 'PROCEDURE_CD' has a data type that cannot participate in a columnstore index.
at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:265)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1676)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:907)
at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:802)
at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7627)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:3916)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:268)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:242)
at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeUpdate(SQLServerStatement.java:742)
at org.apache.spark.sql.jdbc.JdbcDialect.createTable(JdbcDialects.scala:180)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:933)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.saveTableToJDBC(JdbcRelationProvider.scala:87)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:100)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:49)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.$anonfun$sideEffectResult$1(commands.scala:82)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:80)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:79)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:91)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$3(QueryExecution.scala:286)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:166)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$2(QueryExecution.scala:286)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$9(SQLExecution.scala:303)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:533)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:226)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1148)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:155)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:482)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:285)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$withMVTagsIfNecessary(QueryExecution.scala:259)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:280)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:265)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:465)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:69)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:465)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:39)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:339)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:335)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:441)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:265)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:395)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:265)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:217)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:214)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:356)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:956)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:424)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:391)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:258)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:853)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
at py4j.Gateway.invoke(Gateway.java:306)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
at java.lang.Thread.run(Thread.java:750)
您没有在
Sandbox_IT.Analysis1
中创建目标表 create_table_query
。
您刚刚创建
CREATE TABLE Sandbox_IT.Testing1
。
您可以在写入操作中将目标表指定为
Sandbox_IT.Testing1
,或者创建表Sandbox_IT.Analysis1
,如下所示。
df.write.jdbc(
url=jdbcUrl,
table="Sandbox_IT.Testing1",
mode="append",
properties=connectProperties
)
或创建下表
create_table_query = """
IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'Sandbox_IT' AND TABLE_NAME = 'Analysis1')
BEGIN
CREATE TABLE Sandbox_IT.Analysis1 (
PROCEDURE_CD VARCHAR(10) NOT NULL,
PROCEDURE_DESCRIPTION VARCHAR(512) NULL,
PROCESSING_DATE DATE NOT NULL
)
END
"""