我有一个 PySpark 数据框(如下所示),需要展开(explode)其中的 Items 和 Value1 列。这是我目前的代码:
# Build one [Name, Items, Value1, Value] array for every (i, j) index pair of
# the nested `op_it.objects` structure, then explode those arrays into rows.
# `df`, `F` (pyspark.sql.functions) and the JSON schema `sc` are expected to be
# defined before this snippet.
# NOTE(review): the upper bound 9 is hard-coded for every nesting level —
# confirm it matches the real maximum array sizes.
# NOTE(review): the third element indexes ['Items'][k] directly, while the
# second indexes ['Items'][j]['Items'][k] — confirm which nesting level is
# intended for `Selected`.
df_ob_exploded = df.withColumn(
    'op_it.objects',
    F.explode(F.array(*[
        F.array(
            # [0] Name of the j-th item of the i-th object.
            F.col('op_it.objects')[i]['Items'][j].Name,
            # [1] Names of the nested items, rendered as a single string.
            F.array(*[
                F.col('op_it.objects')[i]['Items'][j]['Items'][k].Name
                for k in range(9)
            ]).cast('string'),
            # [2] Parsed `Selected` values, rendered as a single string.
            F.array(*[
                F.from_json(F.col('op_it.objects')[i]['Items'][k].Selected, sc).getItem('Value')
                for k in range(9)
            ]).cast('string'),
            # [3] Parsed `Selected` value of the j-th item itself.
            F.from_json(F.col('op_it.objects')[i]['Items'][j].Selected, sc).getItem('Value'),
        )
        for i in range(9)
        for j in range(9)
    ])),
).dropDuplicates()

# Split the exploded 4-element array back into named columns and drop rows in
# which all four extracted columns are null.
# NOTE(review): the exploded column's name contains a dot; if F.col resolves
# 'op_it.objects' to the original struct field instead, reference it with
# backticks (F.col('`op_it.objects`')) — confirm.
df_obj_json = df_ob_exploded.select(
    'ID',
    F.col('op_it.objects')[0].alias('Name'),
    F.col('op_it.objects')[1].alias('Items'),
    F.col('op_it.objects')[2].alias('Value1'),
    F.col('op_it.objects')[3].alias('Value'),
).na.drop(how='all', subset=['Name', 'Items', 'Value1', 'Value'])
但我无法展开(explode)Items 和 Value1 列。当前得到的结果如下:
ID Name Items Value1 Value
1 Contact [platform,chat, , ,] [,,,,,] null
1 action [,,,,,,] [,,,,,] windows
1 cycle [,,,,,,] [,,,,,] article
我假设这两个数组的长度不同,因此直接使用 explode 函数在这里行不通。您可以先用 arrays_zip 将这两列按位置压缩(zip)在一起,再展开压缩后的列并从中取出各自的值,如下所示:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# Start (or reuse) the Spark session used by this example.
spark = SparkSession.builder.getOrCreate()

# Sample rows shaped like the question's data: `Items` and `Value1` are arrays
# whose lengths may differ from each other.
columns = ["ID", "Name", "Items", "Value1", "Value"]
data = [
    (1, "Contact", ["platform", "chat"], ["something", "something_else", "something_else_else"], None),
    (1, "action", [None], [None], "windows"),
    (1, "cycle", [None], [None], "article")
]
df_obj_json = spark.createDataFrame(data, columns)

# Pair the two arrays element by element into one array of structs, so a
# single explode keeps each Item next to the Value1 at the same position.
df_zipped = df_obj_json.withColumn(
    "zipped",
    F.arrays_zip("Items", "Value1"),
)

# One output row per zipped struct; the scalar columns are carried along.
df_exploded = df_zipped.select(
    "ID", "Name", F.explode("zipped").alias("zipped_col"), "Value"
)

# Unpack the struct fields back into ordinary columns.
zipped_entry = F.col("zipped_col")
df_final = df_exploded.select(
    "ID",
    "Name",
    zipped_entry.getField("Items").alias("Item"),
    zipped_entry.getField("Value1").alias("Value1"),
    "Value",
)
df_final.show()
# Output:-
# +---+-------+--------+-------------------+-------+
# | ID| Name| Item| Value1| Value|
# +---+-------+--------+-------------------+-------+
# | 1|Contact|platform| something| NULL|
# | 1|Contact| chat| something_else| NULL|
# | 1|Contact| NULL|something_else_else| NULL|
# | 1| action| NULL| NULL|windows|
# | 1| cycle| NULL| NULL|article|
# +---+-------+--------+-------------------+-------+