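I have a PySpark DataFrame schema in which the quantity field is declared as IntegerType. However, when the JSON data contains the string representation of a number (for example, "30"), the record is moved to the corrupt-record column (corrupt_json in the code below).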
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StringType, StructType, StructField, IntegerType

spark = SparkSession.builder.getOrCreate()
data = [
("{'fruit':'Apple', 'quantity':10}",),
("{'fruit':'Banana', 'quantity':20}",),
("{'fruit':'Cherry', 'quantity':'30'}",),
("{'fruit':'Date', 'quantity':'40'}",),
("{'fruit':'Elderberry', 'quantity':'50'}",)
]
# Define schema
schema = StructType([
    StructField("json_column", StringType(), True)
])
# Create DataFrame
input_df = spark.createDataFrame(data, schema)
input_df.show(truncate=False)
+---------------------------------------+
|json_column |
+---------------------------------------+
|{'fruit':'Apple', 'quantity':10} |
|{'fruit':'Banana', 'quantity':20} |
|{'fruit':'Cherry', 'quantity':'30'} |
|{'fruit':'Date', 'quantity':'40'} |
|{'fruit':'Elderberry', 'quantity':'50'}|
+---------------------------------------+
json_options = {"columnNameOfCorruptRecord":"corrupt_json"}
json_schema = StructType([
    StructField('fruit', StringType(), True),
    StructField('quantity', IntegerType(), True),
    StructField('corrupt_json', StringType(), True),
])
json_df = input_df.select(from_json("json_column", schema=json_schema, options=json_options).alias("data"))
df = json_df.select("data.*")
df.show(truncate=False)
+----------+--------+---------------------------------------+
|fruit |quantity|corrupt_json |
+----------+--------+---------------------------------------+
|Apple |10 |NULL |
|Banana |20 |NULL |
|Cherry |NULL |{'fruit':'Cherry', 'quantity':'30'} |
|Date |NULL |{'fruit':'Date', 'quantity':'40'} |
|Elderberry|NULL |{'fruit':'Elderberry', 'quantity':'50'}|
+----------+--------+---------------------------------------+
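Is there a way to make the from_json function implicitly convert numeric strings to integers, so that records containing them are not moved to the corrupt-record column?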
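Since you cannot declare more than one data type for the same field, one way to achieve this is to read the field as StringType first and then cast it to IntegerType: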
json_options = {"columnNameOfCorruptRecord":"corrupt_json"}
json_schema = StructType([
    StructField('fruit', StringType(), True),
    StructField('quantity', StringType(), True),  # read as a string so both 10 and '30' parse
    StructField('corrupt_json', StringType(), True),
])
json_df = input_df.select(from_json("json_column", schema=json_schema, options=json_options).alias("data"))
df = json_df.select("data.*")
# Convert the string values to integers after parsing
df = df.withColumn("quantity", col("quantity").cast(IntegerType()))
df.show(truncate=False)
+----------+--------+------------+
|fruit |quantity|corrupt_json|
+----------+--------+------------+
|Apple |10 |NULL |
|Banana |20 |NULL |
|Cherry |30 |NULL |
|Date |40 |NULL |
|Elderberry|50 |NULL |
+----------+--------+------------+