如何在 Spark 中动态应用数组列类型

问题描述 投票:0回答:1

我有一个 PySpark DataFrame,其中包含一个字符串列,其中包含结构为对象数组的 JSON 数据。但是,这些 JSON 对象的架构可能因行而异。

这是 DataFrame 中两行的示例:

+---------------------------------------------------------------------------------------------------+
| column                                                                                       |
+---------------------------------------------------------------------------------------------------+
| [{"_t":"TypeA","id":"123","value":"100","details":{"key1":"val1","key2":"val2"}}]                 |
| [{"_t":"TypeB","id":"456","extra_field":"info","other_details":{"key3":"val3","key4":"val4"}}]    |
+---------------------------------------------------------------------------------------------------+
  • 第一行:包含带有字段 _t、id、value 和 a 的 JSON 对象 嵌套细节对象。
  • 第二行:包含带有字段的 JSON 对象 _t、id、extra_field 和嵌套的 other_details 对象。

我需要将列从字符串类型转换为数组类型,并动态推断架构以适应 JSON 结构中的所有变化。目前,我的方法仅适用于第一行,不考虑架构变化。

这是我迄今为止尝试过的:

from pyspark.sql.functions import schema_of_json, from_json, col

json_sample = df.select("column").head()[0]  # Sample JSON from the first row
inferred_schema = schema_of_json(json_sample)    # Infer schema from the sample

# Convert the column to array type using the inferred schema
df = df.withColumn("column", from_json(col("column"), inferred_schema))

问题: 推断的架构仅与第一行的 JSON 结构匹配。因此,具有不同模式的行(例如第二行)无法正确解析。

我正在寻找一种方法: 动态推断列的架构,以处理行间 JSON 对象的所有变化。 应用推断的架构将列转换为数组类型,而不会丢失任何数据。

python apache-spark pyspark apache-spark-sql spark-streaming
1个回答
0
投票

我认为唯一的方法是合并数据框中所有行的模式,并在创建新列时将其用作推断的模式。它效率不高,但如果你的数据框中只有这样的异构数据,我看不到任何其他选择。所以这不是一个理想的情况,但你可以让它发挥作用。这里有一个解决方案

样本数据

data = [
    ('[{"_t":"TypeA","id":"123","value":"100","details":{"key1":"val1","key2":"val2"}}]',),
    ('[{"_t":"TypeB","id":"456","extra_field":"info","other_details":{"key3":"val3","key4":"val4"}}]',)
]

df = spark.createDataFrame(data, ["column"])

从数据框中收集所有架构行

json_rows = df.select("column").rdd.flatMap(lambda row: row).collect()

用于从行推断模式并合并所有行的模式的实用函数

def infer_schema(json_strings):
    schemas = []
    for json_string in json_strings:
        parsed = json.loads(json_string)
        df = spark.createDataFrame(parsed)
        schemas.append(df.schema)
    return schemas

def merge_schemas(schemas):
    base_schema = StructType()
    for schema in schemas:
        for field in schema:
            if field.name not in [f.name for f in base_schema.fields]:
                base_schema.add(field)
    return ArrayType(base_schema)

获取单个合并模式

merged_schema = merge_schemas(infer_schema(json_rows))

创建新列时使用合并的架构

df_parsed = df.withColumn("column", from_json(col("column"), merged_schema))

df_parsed.show(truncate=False)

df_parsed.printSchema()

产品

+--------------------------------------------------------------+
|column                                                        |
+--------------------------------------------------------------+
|[{TypeA, {key1 -> val1, key2 -> val2}, 123, 100, NULL, NULL}] |
|[{TypeB, NULL, 456, NULL, info, {key3 -> val3, key4 -> val4}}]|
+--------------------------------------------------------------+

root
 |-- column: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _t: string (nullable = true)
 |    |    |-- details: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |    |    |-- extra_field: string (nullable = true)
 |    |    |-- other_details: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
© www.soinside.com 2019 - 2024. All rights reserved.