我目前正在使用 AWS Glue 和 PySpark。我正在尝试手动创建 Spark 模式并将其应用到数据框以修复某些列的一些问题。
我当前的问题是具有结构类型的列。我的数据只有通过 Glue 读取的 JSON 中提供的 6 个字段,但将来可能有 8 个字段。如果我使用到目前为止收到的 6 个字段构建架构,它工作正常,但如果我使用应该获得的 8 个字段构建架构,则会收到以下错误: ValueError: field name_struct: Length of object (6)与字段长度 (8) 不匹配。
应用更新后的架构的最佳方法是什么?我认为我应该将一个函数应用于我的结构列,并创建具有空值的缺失键,以便在我的数据中包含预期的 8 个键。关于如何做到这一点有什么想法吗?如果我这样做,我认为对于所有结构类型列,我应该解析架构,比较列中的数据(asDict()?)和架构之间的键,最后添加缺少的键。我感觉 Spark 有更好的方法,或者可能还有一些我尚未找到的参数可以在创建数据帧时自动修复该问题?
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
spark = SparkSession.builder.master("local[1]") \
.appName('test_schema') \
.getOrCreate()
data = [
(32, ("James", "Smith"), ("street", 42)),
(18, ("Nina", "Smith"), ("street 2", 12))
]
schema = StructType([
StructField('age', IntegerType(), True),
StructField('name_struct', StructType([
StructField('first_name', StringType(), True),
StructField('last_name', StringType(), True)
])),
StructField('address_struct', StructType([
StructField('street', StringType(), True),
StructField('number', IntegerType(), True)
]))
])
df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
df.show()
schema2 = StructType([
StructField('age', IntegerType(), True),
StructField('name_struct', StructType([
StructField('first_name', StringType(), True),
StructField('last_name', StringType(), True),
StructField('middle_name', StringType(), True)
])),
StructField('address_struct', StructType([
StructField('street', StringType(), True),
StructField('number', IntegerType(), True),
StructField('zip_code', IntegerType(), True)
]))
])
# fix data here?
df2 = spark.createDataFrame(data=data, schema=schema2)
# error here: ValueError: field name_struct: Length of object (2) does not match with length of fields (3)
df2.printSchema()
df2.show()
谢谢!
尝试此代码,请将 rdd 替换为您的 rdd,在定义模式之前提及此行
#
data_class
=[line.strip().split(",")[:5] for line in data1]
data_class2
=[line.strip().split(",")[:5] for line in data2]