"Data type cannot participate in a columnstore index" error in Azure Databricks when creating a new table in the data warehouse

Question

We are using an Azure Databricks notebook in which we connect to the Azure data warehouse service, fetch records from a table, and copy that table's data into another table in a sandbox schema.

Here is the code:

jdbcHostname = "sqldwserver.database.windows.net"
jdbcPort = 1433
jdbcDatabase = "sqldwdb"
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
connectProperties = {
    "user": dbutils.secrets.get(scope='DWH_Scope_IT_Data_Users', key='sql_username'),
    "password": dbutils.secrets.get(scope='DWH_Scope_IT_Data_Users', key='sql_password'),
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}



Customers_query = "(SELECT TOP 10 * FROM RAW_IT_NO_CLOSED.PT) demoalias"
df = spark.read.jdbc(url=jdbcUrl, table=Customers_query, properties=connectProperties)

# Define the SCHEMA and target table
target_table = "Sandbox_IT.Analysis1"

# SQL query to create the target table if it does not already exist
create_table_query = """
IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'Sandbox_IT' AND TABLE_NAME = 'Testing1')
BEGIN
    CREATE TABLE Sandbox_IT.Testing1 (
        PROCEDURE_CD VARCHAR(10) NOT NULL,
        PROCEDURE_DESCRIPTION VARCHAR(512) NULL,
        PROCESSING_DATE DATE NOT NULL
    )
END
"""

# Function to execute a SQL query using the Spark JDBC connection
def execute_query(query):
    # Establish JDBC connection to execute SQL commands
    conn = None
    stmt = None
    try:
        conn = spark._sc._gateway.jvm.java.sql.DriverManager.getConnection(
            jdbcUrl,
            connectProperties["user"],
            connectProperties["password"]
        )
        stmt = conn.createStatement()
        stmt.execute(query)
    finally:
        if stmt is not None:
            stmt.close()
        if conn is not None:
            conn.close()

# Execute the create table query
execute_query(create_table_query)

# Write the DataFrame to the target table
df.write.jdbc(
    url=jdbcUrl,
    table=target_table,
    mode="append",  # Use "overwrite" if you want to replace the existing data
    properties=connectProperties
)

The code above returns the query result, but it fails to create the new table in the sandbox schema, and we get the error that column 'PROCEDURE_CD' has a data type that cannot participate in a columnstore index.

The full error is added at the end.

I tried changing the data type to CHAR, NVARCHAR, etc., and also increased the column sizes, even up to the maximum, but it still does not work.
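To narrow things down, here is a quick diagnostic that can be run with the same connection (the query and the names existing_tables_query / existing_df are only illustrative and not part of the original job): it lists which tables already exist in the sandbox schema and prints the schema Spark inferred for the source data.

# Diagnostic only: list the tables that currently exist in the Sandbox_IT schema,
# reusing the jdbcUrl and connectProperties defined above.
existing_tables_query = """(
    SELECT TABLE_SCHEMA, TABLE_NAME
    FROM INFORMATION_SCHEMA.TABLES
    WHERE TABLE_SCHEMA = 'Sandbox_IT'
) existing_tables"""

existing_df = spark.read.jdbc(
    url=jdbcUrl,
    table=existing_tables_query,
    properties=connectProperties
)
existing_df.show(truncate=False)

# Print the schema Spark inferred for the source data; these are the types
# Spark falls back to if it has to create the target table itself.
df.printSchema()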

Can anyone tell me how to fix this error?

Full error --

com.microsoft.sqlserver.jdbc.SQLServerException: The statement failed. Column 'PROCEDURE_CD' has a data type that cannot participate in a columnstore index.
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
File <command-1365204618373826>, line 53
     50 execute_query(create_table_query)
     52 # Write the DataFrame to the target table
---> 53 df.write.jdbc(
     54     url=jdbcUrl,
     55     table=target_table,
     56     mode="append",  # Use "overwrite" if you want to replace the existing data
     57     properties=connectProperties
     58 )

File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     46 start = time.perf_counter()
     47 try:
---> 48     res = func(*args, **kwargs)
     49     logger.log_success(
     50         module_name, class_name, function_name, time.perf_counter() - start, signature
     51     )
     52     return res

File /databricks/spark/python/pyspark/sql/readwriter.py:1984, in DataFrameWriter.jdbc(self, url, table, mode, properties)
   1982 for k in properties:
   1983     jprop.setProperty(k, properties[k])
-> 1984 self.mode(mode)._jwrite.jdbc(url, table, jprop)

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
   1349 command = proto.CALL_COMMAND_NAME +\
   1350     self.command_header +\
   1351     args_command +\
   1352     proto.END_COMMAND_PART
   1354 answer = self.gateway_client.send_command(command)
-> 1355 return_value = get_return_value(
   1356     answer, self.gateway_client, self.target_id, self.name)
   1358 for temp_arg in temp_args:
   1359     if hasattr(temp_arg, "_detach"):

File /databricks/spark/python/pyspark/errors/exceptions/captured.py:188, in capture_sql_exception.<locals>.deco(*a, **kw)
    186 def deco(*a: Any, **kw: Any) -> Any:
    187     try:
--> 188         return f(*a, **kw)
    189     except Py4JJavaError as e:
    190         converted = convert_exception(e.java_exception)

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o407.jdbc.
: com.microsoft.sqlserver.jdbc.SQLServerException: The statement failed. Column 'PROCEDURE_CD' has a data type that cannot participate in a columnstore index.
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:265)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1676)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:907)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:802)
    at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7627)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:3916)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:268)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:242)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeUpdate(SQLServerStatement.java:742)
    at org.apache.spark.sql.jdbc.JdbcDialect.createTable(JdbcDialects.scala:180)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:933)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.saveTableToJDBC(JdbcRelationProvider.scala:87)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:100)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:49)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.$anonfun$sideEffectResult$1(commands.scala:82)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:80)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:79)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:91)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$3(QueryExecution.scala:286)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:166)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$2(QueryExecution.scala:286)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$9(SQLExecution.scala:303)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:533)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:226)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1148)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:155)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:482)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:285)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$withMVTagsIfNecessary(QueryExecution.scala:259)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:280)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:265)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:465)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:69)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:465)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:39)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:339)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:335)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:441)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:265)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:395)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:265)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:217)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:214)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:356)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:956)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:424)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:391)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:258)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:853)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
    at py4j.Gateway.invoke(Gateway.java:306)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:199)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:119)
    at java.lang.Thread.run(Thread.java:750)
jdbc azure-databricks azure-synapse data-warehouse columnstore
1 Answer

You are not creating the target table Sandbox_IT.Analysis1 in your create_table_query; you only create Sandbox_IT.Testing1 (CREATE TABLE Sandbox_IT.Testing1).

You can either specify Sandbox_IT.Testing1 as the target table in the write operation, or create the table Sandbox_IT.Analysis1, as shown below.

df.write.jdbc(
    url=jdbcUrl,
    table="Sandbox_IT.Testing1",
    mode="append",
    properties=connectProperties
)

Or create the following table instead:

create_table_query = """
IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'Sandbox_IT' AND TABLE_NAME = 'Analysis1')
BEGIN
    CREATE TABLE Sandbox_IT.Analysis1 (
        PROCEDURE_CD VARCHAR(10) NOT NULL,
        PROCEDURE_DESCRIPTION VARCHAR(512) NULL,
        PROCESSING_DATE DATE NOT NULL
    )
END
"""