How to explode an ArrayType column in a PySpark dataframe

Problem description

I have a PySpark dataframe like the one below. I need to explode the Items and Value1 columns. This is my code so far:

df_ob_exploded = df.withColumn(
    'op_it.objects',
    F.explode(F.array(*[
        F.array(
            F.col('op_it.objects')[i]['Items'][j].Name,
            F.array(*[F.col('op_it.objects')[i]['Items'][j]['Items'][k].Name
                      for k in range(9)]).cast('string'),
            F.array(*[F.from_json(F.col('op_it.objects')[i]['Items'][k].Selected, sc).getItem('Value')
                      for k in range(9)]).cast('string'),
            F.from_json(F.col('op_it.objects')[i]['Items'][j].Selected, sc).getItem('Value'))
        for i in range(9) for j in range(9)]))).dropDuplicates()

df_obj_json = df_ob_exploded.select(
    'ID',
    F.col('op_it.objects')[0].alias('Name'),
    F.col('op_it.objects')[1].alias('Items'),
    F.col('op_it.objects')[2].alias('Value1'),
    F.col('op_it.objects')[3].alias('Value'))\
    .na.drop(how='all', subset=['Name', 'Items', 'Value1', 'Value'])
                                          

I can't get the Items and Value1 columns to explode.

ID      Name        Items                   Value1      Value
1       Contact     [platform,chat, , ,]    [,,,,,]     null
1       action      [,,,,,,]                [,,,,,]     windows
1       cycle       [,,,,,,]                [,,,,,]     article
python apache-spark pyspark
1 Answer

I assume the arrays have different lengths, so a simple explode won't work here. Instead, you can first zip the two columns together and then pull the values out of the zipped structs, like this:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

data = [
    (1, "Contact", ["platform", "chat"], ["something", "something_else", "something_else_else"], None),
    (1, "action", [None], [None], "windows"),
    (1, "cycle", [None], [None], "article")
]

columns = ["ID", "Name", "Items", "Value1", "Value"]

df_obj_json = spark.createDataFrame(data, columns)

df_zipped = df_obj_json.withColumn("zipped", F.arrays_zip("Items", "Value1"))

df_exploded = df_zipped.select(
    "ID",
    "Name",
    F.explode("zipped").alias("zipped_col"),
    "Value"
)

df_final = df_exploded.select(
    "ID",
    "Name",
    df_exploded.zipped_col["Items"].alias("Item"),
    df_exploded.zipped_col["Value1"].alias("Value1"),
    "Value"
)

df_final.show()

# Output:-
# +---+-------+--------+-------------------+-------+
# | ID|   Name|    Item|             Value1|  Value|
# +---+-------+--------+-------------------+-------+
# |  1|Contact|platform|          something|   NULL|
# |  1|Contact|    chat|     something_else|   NULL|
# |  1|Contact|    NULL|something_else_else|   NULL|
# |  1| action|    NULL|               NULL|windows|
# |  1|  cycle|    NULL|               NULL|article|
# +---+-------+--------+-------------------+-------+