在pyspark(python)中将json字符串扩展到多列

问题描述 投票:0回答:1

我需要将 Json 对象(b 列)扩展到多列。

从这张表来看,

A 栏 B 栏
id1 [{a:1,b:2}]
id2 [{a:1,b:2,c:3,d:4}]

到这张桌子,

A 栏 a b c d
id1 1 2
id2 1 2 3 4

我尝试过从本地和 Spark 转换数据帧,但都没有成功。

从本地,我通过多次循环提取了B列中的kv(这一步成功了)。

但是当我尝试将提取的kv(在字典结构中)转换为数据帧时,发生了此错误:“ValueError:所有数组必须具有相同的长度”。

因为Json对象中的键c和键d是空的。所以没能做到。

本案例中提到了以下答案。

将包含 JSON 对象的 Dataframe 扩展为更大的 Dataframe


从spark,当将pandas数据帧转换为spark数据帧时,我得到了类型错误(像longtype和stringtype之类的东西无法被识别)。

因此,我将 pandas 数据框转换为字符串类型

df.astype(str)
,然后我可以将其转换为 Spark 数据框。

def func(df):    
    spark = (
        SparkSession.builder.appName("data")
        .enableHiveSupport()
        .getOrCreate()
    )
    

    df1 = spark.createDataFrame(df)

现在,当我尝试扩展它时......

for i in df.columns:
 if i == 'a_column':
    # Since the rows became string instead of list.
    # I need to remove the first and last characters which are the [ and ].
    # But I get error here: failed due to error Column is not iterable
    df.withColumn(i,substring(i,2,length(i)))
    df.withColumn(i,substring(i,1,length(i)-1))
    
    # transform each row (json string) to json object
    # But I get error here: ValueError: 'json' is not in list ; AttributeError: json
    # I assume the x.json means convert row to json obejct?
    df = df.map(lambda x:x.json)
    print(df.take(10))

本案例中提到了以下答案。 我无法对模式进行硬编码,因为有很多不同的 JSON 列。

Pyspark:将列中的 json 分解为多列

Pyspark:解析一列 json 字符串

请有人帮忙。你能告诉我如何从本地和火花中做到这一点吗?

每一个欣赏。

python json apache-spark transform
1个回答
0
投票

杰克9406。您可以仅使用 Spark 中的内置 SQL 函数来实现相同的功能,如下所示。

from pyspark.sql.functions import col, expr, map_entries

# Sample data
data = [
    ("id1", {"a": 1, "b": 2}),
    ("id2", {"a": 1, "b": 2, "c": 3, "d": 4}),
]

# Make Dataframe
df = spark.createDataFrame(data, ["id", "B"])

df = df.withColumn("entries", map_entries(col("B")))
keys = df.selectExpr("explode(entries.key)").distinct().rdd.flatMap(lambda x: x).collect()

for key in keys:
    df = df.withColumn(key, expr(f"transform(entries, x -> if(x.key = '{key}', x.value, NULL))"))
    df = df.withColumn(key, expr(f"get(filter({key}, x -> x is not NULL), 0)"))

df = df.drop(col("entries"))

df.display()

上面的代码主要由三个部分组成:

  1. 将 JSON 转换为可迭代的数组类型。
  2. 从 JSON 中提取所有键值以创建列。
  3. 使用
    transform
    filter
    提取与创建的列匹配的数据。

但是,这种方法可能不是最好的选择。在 Scala 中编写 UDF 可以使代码更简单、运行更高效。

如果您在编写本地代码(使用 pandas)时需要其他帮助,请发表评论。

希望我的回答对你有帮助。

谢谢你。

© www.soinside.com 2019 - 2024. All rights reserved.