我正在与 PySpark 作战。我有一本关于 Python 的字典,名为 data,例如
{
"key1": "xx:xx:xx",
"key2": "xxxxxxxxxxx",
"key9": "xxxxxxxxxxx",
"key10": [
{
"key2": "xxxx-xx-xxTxx:xx:xxZ",
"key3": "xxx",
"key4": "xxxx",
"key5": null,
"key6": "xxxx",
"key7": "xxxxxxxxxxxxxxxxxx"
}
],
"key11": [],
"key12": "x.xx",
"key13": "xxxxxxxx",
"key14": "xxxx-xx-xxTxx:xx:xx.xxx"
}
我正在尝试使用以下代码从中创建一个数据框:
schema = StructType([
StructField("key1", StringType(), True),
StructField("key2", StringType(), True),
StructField("key9", StringType(), True),
StructField("key10", StringType(), True),
StructField("key11", StringType(), True),
StructField("key12", StringType(), True),
StructField("key13", StringType(), True),
StructField("key14", StringType(), True)
])
df = spark.createDataFrame(data, schema=schema)
我面临的问题是,在创建数据帧时,key10 值(它是一个 JSON 字符串)被重构。列值看起来像这样重构:
[
{
key2=xxxx-xx-xxTxx:xx:xxZ,
key3=xxx,
key4=xxxx,
key5=null,
key6=xxxx,
key7=xxxxxxxxxxxxxxxxxx
}
]
':' 被替换为 '='...key10 需要是一个通用字符串,而不是 MapType、ArrayType 或 StructType,因为 JSON 的结构可能会有所不同,所以我无法将 key10 的模式定义为以下任何一种这些类型。
我尝试过使用 to_json() 方法,它适用于第一级 JSON 键,但第二级、第三级等 JSON 级的值也被重构,用“=”修改“:”。
稍后读取数据帧列时,我无法将“=”替换为“:”,因为其中一个值包含可能包含“=”的 Base64 图像。
我已经找到了解决方案。它可以改进,因为这是同步执行的,当您有大量数据需要迭代时,这可以利用性能问题,但这是它对我有用的唯一解决方案。
在创建 DataFrame 之前,迭代嵌套字典和 json.dump 将键 'key10' 的值转储到格式良好的字符串中,而不是在创建 DataFrame 后使用 pyspark 的 to_json 方法。
data = [
{
"key1": "xx:xx:xx",
"key2": "xxxxxxxxxxx",
"key9": "xxxxxxxxxxx",
"key10": [
{
"key2": "xxxx-xx-xxTxx:xx:xxZ",
"key3": "xxx",
"key4": "xxxx",
"key5": null,
"key6": "xxxx",
"key7": "xxxxxxxxxxxxxxxxxx"
}
],
"key11": [],
"key12": "x.xx",
"key13": "xxxxxxxx",
"key14": "xxxx-xx-xxTxx:xx:xx.xxx"
},
{
"key1": "xx:xx:xx",
"key2": "xxxxxxxxxxx",
"key9": "xxxxxxxxxxx",
"key10": [
{
"key2": "xxxx-xx-xxTxx:xx:xxZ",
"key3": "xxx",
"key4": "xxxx",
"key5": null,
"key6": "xxxx",
"key7": "xxxxxxxxxxxxxxxxxx"
}
],
"key11": [],
"key12": "x.xx",
"key13": "xxxxxxxx",
"key14": "xxxx-xx-xxTxx:xx:xx.xxx"
}
]
for item in data:
item['key10'] = json.dumps(item['key10'])
df = spark.createDataFrame(data)