我已经在 Spark conf 中设置了
"spark.sql.caseSensitive","False"
。
这就是我创建架构的方式:
schema = {
"type": "struct",
"fields": [
{"metadata": {}, "name": "firstName", "nullable": True, "type": "string"},
{"metadata": {}, "name": "address", "nullable": True, "type": {
"type": "struct",
"fields":
[
{"metadata": {}, "name": "addressline1", "nullable": True, "type": "string"},
{"metadata": {}, "name": "addressline2", "nullable": True, "type": "string"}
]}
}
]
}
这就是我的数据:
data = [
{
"name":"some name 1",
"address": {
"addressLine1": "some address line 1",
"addressLine2": "some address line 2"
}
}
]
这就是我创建数据框的方式:
schema_as_json = StructType.fromJson(schema)
df = spark.createDataFrame(data, schema_as_json)
这不会从
addressline1
和 addressline2
读取数据,因为它们的情况不同。 (注意L)
我无法控制数据,因此无法将列名更改为小写。
如何从给定输入读取数据并仍将字段名称保留为小写模式?
如果我将
addressline1
更改为 addressLine1
并将 addressline2
更改为 addressLine2
,它会正确读取数据,但是当我尝试合并 Iceberg 表中的数据时,它会抛出一个错误:
AnalysisException saying Could not align Iceberg MERGE INTO
即使将spark.sql.caseSensitive设置为False,模式也必须与数据结构完全匹配(如PySpark文档所示,此配置仅适用于Spark SQL)。
最简单的方法是在不修改数据的情况下动态转换模式。这是工作代码。
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([
StructField("name", StringType(), True),
StructField("address", StructType([
StructField("addressline1", StringType(), True),
StructField("addressline2", StringType(), True)
]), True)
])
data = [
{
"name": "some name 1",
"address": {
"addressLine1": "some address line 1",
"addressLine2": "some address line 2"
}
}
]
def transform_keys(obj):
if isinstance(obj, dict):
return {k.lower(): transform_keys(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [transform_keys(i) for i in obj]
return obj
transformed_data = [transform_keys(record) for record in data]
df = spark.createDataFrame(transformed_data, schema)
df.show(truncate=False)
如果您需要更多信息,请告诉我。