我有一个简单的 Delta Live Tables 管道,它执行从 cloudFiles(s3 存储)到发布到 hive metastore 的 delta 表的多个 csv 文件的流式读取。
我有两个要求使我的情况更加复杂/独特:
skipRows
参数用于 autoLoader。这需要使用 Databricks 运行时的预览通道(撰写本文时为 v11.3)。 来源columnMapping.mode
属性设置为 name
,因为 csv 数据在列名称中包含 Delta / Parquet 本身不允许的字符。 来源以上两个似乎都是预览/测试版功能,所以我观察到的行为可能是一个错误。
我的流水线定义如下:
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
s3_url = "s3://<path_to_csvs>"
@dlt.table(
comment="...",
table_properties={
'delta.minReaderVersion' : '2',
'delta.minWriterVersion' : '5',
'delta.columnMapping.mode' : 'name',
'quality': 'bronze'
}
)
def bronze_my_csv_data_raw():
return (
spark.readStream.format("cloudFiles")
.option("skipRows", 1)
.option("header", "true")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.format", "csv")
.option("cloudFiles.schemaEvolutionMode", "addNewColumns")
.option("pathGlobFilter", "*.csv")
.load(s3_url)
)
这在管道首次设置和运行时按预期工作,但即使没有进行任何代码更改,重新运行管道的“全部刷新”(刷新所有数据)也会出现以下错误:
com.databricks.sql.transaction.tahoe.DeltaColumnMappingUnsupportedException:
Schema change is detected:
old schema:
root
new schema:
root
|-- TIMESTAMP: string (nullable = true)
|-- RECORD: string (nullable = true)
|-- Samples_Max: string (nullable = true)
...
Schema changes are not allowed during the change of column mapping mode.
即使我更改目标表名称以创建一个新的空表,也会发生这种情况。一旦发生,即使在常规(不是完全刷新)运行中也会出现同样的错误。
任何帮助将不胜感激
我遇到了完全相同的问题。但是我的列名中唯一的特殊字符是空格,所以我只是替换了它们:
@dlt.table(name=table_name)
def table():
df = spark.readStream.format("cloudFiles").options(**cloud_files_options).load(source)
# Replace all spaces with underscores
df = df.select([col(c).alias(c.replace(" ", "_")) for c in df.columns])
return df
这显然不理想,因为我的原始表不再是“原始的”,但至少我现在可以从头开始重新计算我的整个管道。
编辑 - 如果您决定走这条路,请不要忘记从您的表格属性中删除
columnMapping.mode
。