DataFrame with nested aggregation

Question (1 vote, 2 answers)

I have a JSON file that looks like this:

{"name":"john", "food":"tomato", "weight": 1}
{"name":"john", "food":"carrot", "weight": 4}
{"name":"bill", "food":"apple", "weight": 1}
{"name":"john", "food":"tomato", "weight": 2}
{"name":"bill", "food":"taco", "weight": 2}
{"name":"bill", "food":"taco", "weight": 4}

I need to create a new JSON like this:

   [
     {"name": "john",
      "buy": [{"tomato": 3}, {"carrot": 4}]
     },
     {"name": "bill",
      "buy": [{"apple": 1}, {"taco": 6}]
     }
   ]

Here is my DataFrame:

val df = Seq(
  ("john", "tomato", 1),
  ("john", "carrot", 4),
  ("bill", "apple", 1),
  ("john", "tomato", 2),
  ("bill", "taco", 2),
  ("bill", "taco", 4)            
).toDF("name", "food", "weight")

How can I get a DataFrame with the final structure? groupBy and agg give me the wrong structure:

import org.apache.spark.sql.functions._
df.groupBy("name", "food").agg(sum("weight").as("weight"))
  .groupBy("name").agg(collect_list(struct("food", "weight")).as("acc"))

+----+------------------------+
|name|acc                     |
+----+------------------------+
|john|[[carrot,4], [tomato,3]]|
|bill|[[taco,6], [apple,1]]   |
+----+------------------------+

{"name":"john","acc":[{"food":"carrot","weight":4},{"food":"tomato","weight":3}]}
{"name":"bill","acc":[{"food":"taco","weight":6},{"food":"apple","weight":1}]}

Please point me in the right direction to solve this.

scala apache-spark
2 Answers
1
vote

You can convert the values manually: iterate over the Rows, assemble the food-weight pairs, and convert them to a Map:

val step1 = df.groupBy("name", "food").agg(sum("weight").as("weight")).
    groupBy("name").agg(collect_list(struct("food", "weight")).as("buy"))
// Turn each nested (food, weight) Row into a Map entry
val result = step1.map(row =>
    (row.getAs[String]("name"), row.getAs[Seq[Row]]("buy").map(pair =>
        pair.getAs[String]("food") -> pair.getAs[Long]("weight")).toMap)
    ).toDF("name", "buy")
result.toJSON.show(false)

+---------------------------------------------+
|value                                        |
+---------------------------------------------+
|{"name":"john","buy":{"carrot":4,"tomato":3}}|
|{"name":"bill","buy":{"taco":6,"apple":1}}   |
+---------------------------------------------+
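The two groupBy/agg steps mirror ordinary Scala collection operations. As a minimal plain-Scala sketch (no Spark needed), assuming the same data as the question, the per-name Map the typed conversion produces can be built like this:

```scala
// Sum weights per (name, food), then collect each name's pairs into a Map --
// the same shape the typed map conversion above produces per row.
val rows = Seq(
  ("john", "tomato", 1L), ("john", "carrot", 4L),
  ("bill", "apple", 1L), ("john", "tomato", 2L),
  ("bill", "taco", 2L), ("bill", "taco", 4L)
)
val buys: Map[String, Map[String, Long]] =
  rows.groupBy(_._1).map { case (name, group) =>
    name -> group.groupBy(_._2).map { case (food, g) =>
      food -> g.map(_._3).sum
    }
  }
// buys("john") == Map("tomato" -> 3L, "carrot" -> 4L)
```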

0
votes

You can use a string-replacement technique to achieve the desired JSON format.

The udf way

udf functions work on primitive data types, so a replace function can be used to strip the "food" and "weight" keys from the JSON strings of the final dataframe:

import org.apache.spark.sql.functions._
// Strip the "food": and "weight": keys from each JSON string
def replaceUdf = udf((json: String) => json.replace("\"food\":", "").replace("\"weight\":", ""))

val temp = df.groupBy("name", "food").agg(sum("weight").as("weight"))
  .groupBy("name").agg(collect_list(struct(col("food"), col("weight"))).as("buy"))
  .toJSON.withColumn("value", replaceUdf(col("value")))

You should get the output dataframe as:

+-------------------------------------------------+
|value                                            |
+-------------------------------------------------+
|{"name":"john","buy":[{"carrot",4},{"tomato",3}]}|
|{"name":"bill","buy":[{"taco",6},{"apple",1}]}   |
+-------------------------------------------------+
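The replacement itself is plain string manipulation; as a minimal sketch, the udf's body applied to one aggregated JSON line:

```scala
// One line of the aggregated toJSON output, as in the question's data
val line = """{"name":"john","buy":[{"food":"carrot","weight":4},{"food":"tomato","weight":3}]}"""
// Remove the "food": and "weight": keys, leaving bare value pairs
val stripped = line.replace("\"food\":", "").replace("\"weight\":", "")
// stripped == """{"name":"john","buy":[{"carrot",4},{"tomato",3}]}"""
```

Note that the resulting `{"carrot",4}` objects are not strictly valid JSON, so a downstream JSON parser may reject this output.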

The regexp_replace function

The regexp_replace built-in function can also be used to get the desired output:

val temp = df.groupBy("name", "food").agg(sum("weight").as("weight"))
  .groupBy("name").agg(collect_list(struct(col("food"), col("weight"))).as("buy"))
  .toJSON.withColumn("value", regexp_replace(regexp_replace(col("value"), "\"food\":", ""), "\"weight\":", ""))