我有一个 PySpark DataFrame,其中包含一个字符串列,其中包含结构为对象数组的 JSON 数据。但是,这些 JSON 对象的架构可能因行而异。
这是 DataFrame 中两行的示例:
+---------------------------------------------------------------------------------------------------+
| column |
+---------------------------------------------------------------------------------------------------+
| [{"_t":"TypeA","id":"123","value":"100","details":{"key1":"val1","key2":"val2"}}] |
| [{"_t":"TypeB","id":"456","extra_field":"info","other_details":{"key3":"val3","key4":"val4"}}] |
+---------------------------------------------------------------------------------------------------+
我需要将列从字符串类型转换为数组类型,并动态推断架构以适应 JSON 结构中的所有变化。目前,我的方法仅适用于第一行,不考虑架构变化。
这是我迄今为止尝试过的:
from pyspark.sql.functions import schema_of_json, from_json, col
json_sample = df.select("column").head()[0] # Sample JSON from the first row
inferred_schema = schema_of_json(json_sample) # Infer schema from the sample
# Convert the column to array type using the inferred schema
df = df.withColumn("column", from_json(col("column"), inferred_schema))
问题: 推断的架构仅与第一行的 JSON 结构匹配。因此,具有不同模式的行(例如第二行)无法正确解析。
我正在寻找一种方法: 动态推断列的架构,以处理行间 JSON 对象的所有变化。 应用推断的架构将列转换为数组类型,而不会丢失任何数据。
我认为唯一的方法是合并数据框中所有行的模式,并在创建新列时将其用作推断的模式。它效率不高,但如果你的数据框中只有这样的异构数据,我看不到任何其他选择。所以这不是一个理想的情况,但你可以让它发挥作用。这里有一个解决方案
样本数据
data = [
('[{"_t":"TypeA","id":"123","value":"100","details":{"key1":"val1","key2":"val2"}}]',),
('[{"_t":"TypeB","id":"456","extra_field":"info","other_details":{"key3":"val3","key4":"val4"}}]',)
]
df = spark.createDataFrame(data, ["column"])
从数据框中收集所有架构行
json_rows = df.select("column").rdd.flatMap(lambda row: row).collect()
用于从行推断模式并合并所有行的模式的实用函数
def infer_schema(json_strings):
schemas = []
for json_string in json_strings:
parsed = json.loads(json_string)
df = spark.createDataFrame(parsed)
schemas.append(df.schema)
return schemas
def merge_schemas(schemas):
base_schema = StructType()
for schema in schemas:
for field in schema:
if field.name not in [f.name for f in base_schema.fields]:
base_schema.add(field)
return ArrayType(base_schema)
获取单个合并模式
merged_schema = merge_schemas(infer_schema(json_rows))
创建新列时使用合并的架构
df_parsed = df.withColumn("column", from_json(col("column"), merged_schema))
df_parsed.show(truncate=False)
df_parsed.printSchema()
产品
+--------------------------------------------------------------+
|column |
+--------------------------------------------------------------+
|[{TypeA, {key1 -> val1, key2 -> val2}, 123, 100, NULL, NULL}] |
|[{TypeB, NULL, 456, NULL, info, {key3 -> val3, key4 -> val4}}]|
+--------------------------------------------------------------+
root
|-- column: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _t: string (nullable = true)
| | |-- details: map (nullable = true)
| | | |-- key: string
| | | |-- value: string (valueContainsNull = true)
| | |-- id: string (nullable = true)
| | |-- value: string (nullable = true)
| | |-- extra_field: string (nullable = true)
| | |-- other_details: map (nullable = true)
| | | |-- key: string
| | | |-- value: string (valueContainsNull = true)