根据模式读取 Spark DataFrame 中的数据,但不区分大小写

问题描述 投票:0回答:1

我已经在 Spark conf 中设置了

"spark.sql.caseSensitive","False"

这就是我创建架构的方式:

schema = {
    "type": "struct",
    "fields": [
        {"metadata": {}, "name": "firstName", "nullable": True, "type": "string"},
        {"metadata": {}, "name": "address", "nullable": True, "type": {
            "type": "struct",
            "fields": 
            [
                {"metadata": {}, "name": "addressline1", "nullable": True, "type": "string"},
                {"metadata": {}, "name": "addressline2", "nullable": True, "type": "string"}
            ]}
        }
        
    ]
}

这就是我的数据:

data = [
    {
        "name":"some name 1",
        "address": {
            "addressLine1": "some address line 1",
            "addressLine2": "some address line 2"
        }
    }
]

这就是我创建数据框的方式:

schema_as_json = StructType.fromJson(schema)

df = spark.createDataFrame(data, schema_as_json)

这不会从

addressline1
addressline2
读取数据,因为它们的情况不同。 (注意L)

我无法控制数据,因此无法将列名更改为小写。

如何从给定输入读取数据并仍将字段名称保留为小写模式?

如果我将

addressline1
更改为
addressLine1
并将
addressline2
更改为
addressLine2
,它会正确读取数据,但是当我尝试合并 Iceberg 表中的数据时,它会抛出一个错误:

AnalysisException saying Could not align Iceberg MERGE INTO

pyspark aws-glue apache-iceberg
1个回答
0
投票

即使将spark.sql.caseSensitive设置为False,模式也必须与数据结构完全匹配(如PySpark文档所示,此配置仅适用于Spark SQL)。

最简单的方法是在不修改数据的情况下动态转换模式。这是工作代码。

from pyspark.sql.types import StructType, StructField, StringType

 
schema = StructType([
    StructField("name", StringType(), True),
    StructField("address", StructType([
        StructField("addressline1", StringType(), True),
        StructField("addressline2", StringType(), True)
    ]), True)
])

 
data = [
    {
        "name": "some name 1",
        "address": {
            "addressLine1": "some address line 1",
            "addressLine2": "some address line 2"
        }
    }
]

 def transform_keys(obj):
    if isinstance(obj, dict):
        return {k.lower(): transform_keys(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [transform_keys(i) for i in obj]
    return obj

 transformed_data = [transform_keys(record) for record in data]

 
df = spark.createDataFrame(transformed_data, schema)
 
df.show(truncate=False)

如果您需要更多信息,请告诉我。

© www.soinside.com 2019 - 2024. All rights reserved.