由于表名称中的特殊字符而执行 PySpark / Delta MERGE 语句时出现问题

问题描述 投票:0回答:1

我正在致力于实现 CDC(更改数据捕获),它将使用 Azure Sql 作为源,使用 Azure Databricks delta Lake 作为目标。 Azew Sql 创建具有非常具体的列名的 CDC 表,其中包括特殊字符:“__$start_lsn”就是一个例子。 我在执行以下 PySpark / Delta“MERGE”语句时遇到问题:

MERGE INTO poc_table_2 AS target
USING cdc_data AS source
ON target.last_processed_lsn = source.__$start_lsn
WHEN MATCHED AND source.__$operation = 1 THEN DELETE
WHEN MATCHED AND source.__$operation IN (3, 4) THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

在此上下文中,“源”指的是 Azure Sql cdc 表,我无法更改它。 当我运行此语句时,我收到此错误:

[PARSE_SYNTAX_ERROR] Syntax error at or near '$'. SQLSTATE: 42601
File <command-1987778677957359>, line 40
     30 merge_query = """
     31 MERGE INTO poc_table_2 AS target
     32 USING cdc_data AS source
   (...)
     36 WHEN NOT MATCHED THEN INSERT *
     37 """
     39 # Execute the merge query
---> 40 spark.sql(merge_query)
     42 # Update the last processed LSN
     43 last_lsn = cdc_data.agg({"__$start_lsn": "max"}).collect()[0][0]

我尝试在列名称周围使用单引号和双引号,但它不起作用。当我使用引号时,我得到一个略有不同的错误:

[PARSE_SYNTAX_ERROR] Syntax error at or near '"__$start_lsn"'. SQLSTATE: 42601

如何解决这个问题?

pyspark azure-databricks delta-lake cdc
1个回答
0
投票

我遇到了同样的错误:

enter image description here

您收到的错误是由于在列名称中使用特殊字符 (特别是 $) 造成的。 在 PySpark 中,当列名包含特殊字符时,应使用 反引号 () 括住列名。 这告诉 PySpark 按字面解释所包含的字符串,包括它可能包含的任何特殊字符。

我尝试过以下方法:

from pyspark.sql.functions import lit
from delta.tables import DeltaTable
deltaTablePath = "/FileStore/tables/d01"
if not DeltaTable.isDeltaTable(spark, deltaTablePath):
    (sampleTableDF.withColumn("ID", lit(None).cast("int"))
                  .withColumn("`__$start_lsn`", lit(None).cast("string"))  
                  .withColumn("Name", lit(None).cast("string"))
                  .write
                  .format("delta")
                  .save(deltaTablePath))
deltaTable = DeltaTable.forPath(spark, deltaTablePath)
deltaTable.alias("target") \
    .merge(
        sampleTableDF.alias("source"),
        "target.ID = source.ID"
    ) \
    .whenMatchedUpdate(set = {
        "ID": "source.ID",
        "`__$start_lsn`": "source.`__$start_lsn`",  
        "Name": "source.Name"
    }) \
    .whenNotMatchedInsert(values = {
        "ID": "source.ID",
        "`__$start_lsn`": "source.`__$start_lsn`",  
        "Name": "source.Name"
    }) \
    .execute()

结果:

1   00000021:00000001:0001  Alice
2   00000022:00000002:0002  Bob
3   00000023:00000003:0003  Charlie
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.