How to handle numbers stored as strings in from_json with implicit type conversion in PySpark?


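I have a PySpark DataFrame schema in which the quantity field is specified as IntegerType. However, when the JSON data contains a string representation of a number (for example, "30"), the record is moved to the corrupt-records column (corrupt_json in the code below):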
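from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StringType, StructType, StructField, IntegerType

# Reuse the active session (e.g. in a notebook) or start a local one
spark = SparkSession.builder.getOrCreate()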

data = [
    ("{'fruit':'Apple', 'quantity':10}",),
    ("{'fruit':'Banana', 'quantity':20}",),
    ("{'fruit':'Cherry', 'quantity':'30'}",),
    ("{'fruit':'Date', 'quantity':'40'}",),
    ("{'fruit':'Elderberry', 'quantity':'50'}",)
]

# Define schema
schema = StructType([
    StructField("json_column", StringType(), True)
])

# Create DataFrame
input_df = spark.createDataFrame(data, schema)
input_df.show(truncate=False)
+---------------------------------------+
|json_column                            |
+---------------------------------------+
|{'fruit':'Apple', 'quantity':10}       |
|{'fruit':'Banana', 'quantity':20}      |
|{'fruit':'Cherry', 'quantity':'30'}    |
|{'fruit':'Date', 'quantity':'40'}      |
|{'fruit':'Elderberry', 'quantity':'50'}|
+---------------------------------------+

# Target schema: quantity is IntegerType, plus a column that receives corrupt records
json_options = {"columnNameOfCorruptRecord": "corrupt_json"}
json_schema = StructType([
    StructField("fruit", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("corrupt_json", StringType(), True),
])

json_df = input_df.select(from_json("json_column", schema=json_schema, options=json_options).alias("data"))
df = json_df.select("data.*")
df.show(truncate=False)

+----------+--------+---------------------------------------+
|fruit     |quantity|corrupt_json                           |
+----------+--------+---------------------------------------+
|Apple     |10      |NULL                                   |
|Banana    |20      |NULL                                   |
|Cherry    |NULL    |{'fruit':'Cherry', 'quantity':'30'}    |
|Date      |NULL    |{'fruit':'Date', 'quantity':'40'}      |
|Elderberry|NULL    |{'fruit':'Elderberry', 'quantity':'50'}|
+----------+--------+---------------------------------------+

Is there a way to make from_json implicitly convert numeric strings to integers, so that records containing them are not moved to corrupt_records?
Answer

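Since a field in the schema can only have one data type, one way to achieve this is to declare the affected fields as StringType in the from_json schema and cast them to IntegerType afterwards. As the output below shows, a bare JSON number such as 10 is read into a StringType field as the text "10", so both the quoted and unquoted values end up as strings that the cast handles uniformly: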

json_options = {"columnNameOfCorruptRecord": "corrupt_json"}
# Declare quantity as StringType so that both 10 and '30' parse without errors
json_schema = StructType([
    StructField("fruit", StringType(), True),
    StructField("quantity", StringType(), True),
    StructField("corrupt_json", StringType(), True),
])

json_df = input_df.select(from_json("json_column", schema=json_schema, options=json_options).alias("data"))
df = json_df.select("data.*")
# Cast the parsed strings to integers after parsing
df = df.withColumn("quantity", col("quantity").cast(IntegerType()))
df.show(truncate=False)

+----------+--------+------------+
|fruit     |quantity|corrupt_json|
+----------+--------+------------+
|Apple     |10      |NULL        |
|Banana    |20      |NULL        |
|Cherry    |30      |NULL        |
|Date      |40      |NULL        |
|Elderberry|50      |NULL        |
+----------+--------+------------+