How to convert an RDD to nested JSON in PySpark


I am new to PySpark, and I have data in the following format:

Category,Subcategory,Name

Food,Thai,Restaurant A
Food,Thai,Restaurant B
Food, Chinese, Restaurant C
Lodging, Hotel, Hotel A

I would like the data in the following format:

{Category : Food , Subcategories : [ {subcategory : Thai , names : [Restaurant A , Restaurant B] }, {subcategory : Chinese , names : [Restaurant C]}]}

{Category : Lodging , Subcategories : [ {subcategory : Hotel , names : [Hotel A] }]}

Could someone please help me achieve this using a PySpark RDD?

Thanks!

json apache-spark pyspark apache-spark-sql rdd
1 Answer

Here is a working solution:

Create a window function partitioned by Category and Subcategory to collect the names:

    from pyspark.sql import functions as F
    from pyspark.sql import Window

    groupByCateWind = Window.partitionBy("Category", "Subcategory")

    finalDf = df.withColumn("names", F.collect_list("Name").over(groupByCateWind)) \
        .withColumn("Subcategories", F.struct("Subcategory", "names")) \
        .groupBy("Category").agg(F.collect_set("Subcategories").alias("Subcategories")) \
        .toJSON()
  1. Collect the names over the Window function defined above, one list per (Category, Subcategory) partition.
  2. Create a Subcategories column of struct type from the Subcategory and names columns.
  3. Group by Category again and collect the distinct Subcategories struct values.
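The three steps above amount to a two-level grouping. Independent of Spark, the same nesting logic can be sketched in plain Python (the sample rows from the question are assumed, with the stray whitespace trimmed):

```python
import json
from collections import defaultdict

# Sample rows mirroring the question's CSV (whitespace trimmed).
rows = [
    ("Food", "Thai", "Restaurant A"),
    ("Food", "Thai", "Restaurant B"),
    ("Food", "Chinese", "Restaurant C"),
    ("Lodging", "Hotel", "Hotel A"),
]

# Two-level grouping: Category -> Subcategory -> [names]
tree = defaultdict(lambda: defaultdict(list))
for category, subcategory, name in rows:
    tree[category][subcategory].append(name)

# One JSON document per category, with a list of subcategory structs.
docs = [
    json.dumps({
        "Category": category,
        "Subcategories": [
            {"subcategory": sub, "names": names}
            for sub, names in subs.items()
        ],
    })
    for category, subs in tree.items()
]
for d in docs:
    print(d)
```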

The output looks like this:

+---------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"Category":"Food","Subcategories":[{"Subcategory":"Thai","names":["Restaurant A","Restaurant B"]},{"Subcategory":" Chinese","names":[" Restaurant C"]}]}|
|{"Category":"Lodging","Subcategories":[{"Subcategory":" Hotel","names":[" Hotel A"]}]}                                                                   |
+---------------------------------------------------------------------------------------------------------------------------------------------------------+