我正在致力于实现 CDC(更改数据捕获),它将使用 Azure Sql 作为源,使用 Azure Databricks delta Lake 作为目标。 Azew Sql 创建具有非常具体的列名的 CDC 表,其中包括特殊字符:“__$start_lsn”就是一个例子。 我在执行以下 PySpark / Delta“MERGE”语句时遇到问题:
MERGE INTO poc_table_2 AS target
USING cdc_data AS source
ON target.last_processed_lsn = source.__$start_lsn
WHEN MATCHED AND source.__$operation = 1 THEN DELETE
WHEN MATCHED AND source.__$operation IN (3, 4) THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
在此上下文中,“源”指的是 Azure Sql cdc 表,我无法更改它。 当我运行此语句时,我收到此错误:
[PARSE_SYNTAX_ERROR] Syntax error at or near '$'. SQLSTATE: 42601
File <command-1987778677957359>, line 40
30 merge_query = """
31 MERGE INTO poc_table_2 AS target
32 USING cdc_data AS source
(...)
36 WHEN NOT MATCHED THEN INSERT *
37 """
39 # Execute the merge query
---> 40 spark.sql(merge_query)
42 # Update the last processed LSN
43 last_lsn = cdc_data.agg({"__$start_lsn": "max"}).collect()[0][0]
我尝试在列名称周围使用单引号和双引号,但它不起作用。当我使用引号时,我得到一个略有不同的错误:
[PARSE_SYNTAX_ERROR] Syntax error at or near '"__$start_lsn"'. SQLSTATE: 42601
如何解决这个问题?
我遇到了同样的错误:
您收到的错误是由于在列名称中使用特殊字符 (特别是 $) 造成的。 在 PySpark 中,当列名包含特殊字符时,应使用 反引号 () 括住列名。 这告诉 PySpark 按字面解释所包含的字符串,包括它可能包含的任何特殊字符。
我尝试过以下方法:
from pyspark.sql.functions import lit
from delta.tables import DeltaTable
deltaTablePath = "/FileStore/tables/d01"
if not DeltaTable.isDeltaTable(spark, deltaTablePath):
(sampleTableDF.withColumn("ID", lit(None).cast("int"))
.withColumn("`__$start_lsn`", lit(None).cast("string"))
.withColumn("Name", lit(None).cast("string"))
.write
.format("delta")
.save(deltaTablePath))
deltaTable = DeltaTable.forPath(spark, deltaTablePath)
deltaTable.alias("target") \
.merge(
sampleTableDF.alias("source"),
"target.ID = source.ID"
) \
.whenMatchedUpdate(set = {
"ID": "source.ID",
"`__$start_lsn`": "source.`__$start_lsn`",
"Name": "source.Name"
}) \
.whenNotMatchedInsert(values = {
"ID": "source.ID",
"`__$start_lsn`": "source.`__$start_lsn`",
"Name": "source.Name"
}) \
.execute()
结果:
1 00000021:00000001:0001 Alice
2 00000022:00000002:0002 Bob
3 00000023:00000003:0003 Charlie