我有一个像这样结构的 PySpark 数据框(这是一个近似值,因为我不确定如何准确地重新创建示例数据,但它看起来像这样):
data = [
("Item A", "2024-12-01", {"City": "Palo Alto", "State": "CA", "Zip": "94301"}),
("Item B", "2024-12-02", {"State": "NY", "City": "New York", "Zip": "10001"}),
("Item B", "2024-12-03", {"City": "Austin", "State": "TX", "Zip": "73301"})
]
schema = StructType([
StructField("item", StringType(), True),
StructField("date", StringType(), True),
StructField("geo_data", MapType(StringType(), StringType()), True)
])
sample_df = spark.createDataFrame(data, schema)
我想使用
geo_data
将“状态”值从 .withcolumn
列提取到新列中,但我遇到了问题,因为 geo_data
列中的数组排序不一致,并且在某些行中数组中没有“状态”项。这意味着我不能使用像这样简单的东西:
.withColumn("state_code", F.expr("geo_data[1]"))
我还尝试通过以下方式使用
getItem()
和 getField()
:
new_data = sample_df.withColumn(
"state_code",
F.col("geo_data").getField("State")
)
每种方式的结果都是这个错误:
[DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE] Cannot resolve "geo_data[State]" due to data type mismatch: Parameter 2 requires the "INTEGRAL" type, however "State" has the type "STRING". SQLSTATE: 42K09
寻找更好的方法来做到这一点。谢谢。
这是一个字典,只需像这样提取状态即可:
sample_df= sample_df.withColumn('State' , sample_df.geo_data['State'])