从 PySpark 列的 MapType() 列中按名称提取项目

问题描述 投票:0回答:1

我有一个像这样结构的 PySpark 数据框(这是一个近似值,因为我不确定如何准确地重新创建示例数据,但它看起来像这样):

data = [
    ("Item A", "2024-12-01", {"City": "Palo Alto", "State": "CA", "Zip": "94301"}),
    ("Item B", "2024-12-02", {"State": "NY", "City": "New York", "Zip": "10001"}),
    ("Item B", "2024-12-03", {"City": "Austin", "State": "TX", "Zip": "73301"})
]

schema = StructType([
    StructField("item", StringType(), True),
    StructField("date", StringType(), True),
    StructField("geo_data", MapType(StringType(), StringType()), True)
])

sample_df = spark.createDataFrame(data, schema)

我想使用

geo_data
将“状态”值从
.withcolumn
列提取到新列中,但我遇到了问题,因为
geo_data
列中的数组排序不一致,并且在某些行中数组中没有“状态”项。这意味着我不能使用像这样简单的东西:

.withColumn("state_code", F.expr("geo_data[1]"))

我还尝试通过以下方式使用

getItem()
getField()

new_data = sample_df.withColumn(
    "state_code", 
    F.col("geo_data").getField("State")
)

每种方式的结果都是这个错误:

[DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE] Cannot resolve "geo_data[State]" due to data type mismatch: Parameter 2 requires the "INTEGRAL" type, however "State" has the type "STRING". SQLSTATE: 42K09

寻找更好的方法来做到这一点。谢谢。

arrays dictionary pyspark extract
1个回答
0
投票

这是一个字典,只需像这样提取状态即可:

sample_df= sample_df.withColumn('State' , sample_df.geo_data['State'])
© www.soinside.com 2019 - 2024. All rights reserved.