Spark window function combined with GROUP BY

Problem description

I want to populate an aggregate over a window whose granularity is different from the GROUP BY used in the select. Using Scala SQL.

SELECT c1, c2, c3, max(c4), max(c5),
       max(c4) OVER (PARTITION BY c1, c2, c3),
       avg(c5) OVER (PARTITION BY c1, c2, c3)
FROM temp_view
GROUP BY c1, c2, c3

I get an error saying:

c4 and c5 not being part of Group by or use first().
apache-spark apache-spark-sql apache-spark-dataset
1 Answer

As I said in the comments, GROUP BY and PARTITION BY serve a similar purpose: in both cases the aggregation is computed over the specified columns. The main difference is that GROUP BY reduces the number of records, and the select list may only contain the grouping columns (plus aggregates), whereas PARTITION BY does not reduce the number of records; it adds the aggregate as an extra column, and the select list can contain any number of columns.
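As a minimal DataFrame-API sketch of the same difference (assuming a DataFrame named df with columns c1..c5; the names are illustrative only):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, col, max}

// groupBy: one output row per (c1, c2, c3) group; only grouping columns
// and aggregates may appear in the result
val grouped = df.groupBy("c1", "c2", "c3")
  .agg(max("c4").as("max_c4"), avg("c5").as("avg_c5"))

// Window partitionBy: every input row is kept, and the aggregate is added
// as an extra column next to all existing columns
val w = Window.partitionBy("c1", "c2", "c3")
val windowed = df
  .withColumn("max_c4", max(col("c4")).over(w))
  .withColumn("avg_c5", avg(col("c5")).over(w))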

In your query you are grouping by the columns c1, c2, c3 while also using max(c4) and avg(c5) with PARTITION BY, which is why it gives you the error. For your use case you can use one of the following queries:

SELECT c1, c2, c3, max(c4), max(c5)
FROM temp_view
GROUP BY c1, c2, c3

OR

SELECT c1, c2, c3,
       max(c4) OVER (PARTITION BY c1, c2, c3),
       avg(c5) OVER (PARTITION BY c1, c2, c3)
FROM temp_view
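
If you really do need both the grouped aggregates and a window at a different granularity in one result, one option is to group first in a subquery and then apply the window over the already-grouped rows. The sketch below is only an illustration and assumes the window should be per (c1, c2); adjust the PARTITION BY to your actual granularity:

// A sketch only: group first, then window over the grouped rows.
// The PARTITION BY (c1, c2) granularity is an assumption for illustration.
spark.sql("""
  SELECT c1, c2, c3, max_c4, max_c5,
         max(max_c4) OVER (PARTITION BY c1, c2) AS max_c4_per_c1_c2,
         avg(max_c5) OVER (PARTITION BY c1, c2) AS avg_max_c5_per_c1_c2
  FROM (
    SELECT c1, c2, c3, max(c4) AS max_c4, max(c5) AS max_c5
    FROM temp_view
    GROUP BY c1, c2, c3
  ) grouped
""").show()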

The example below should give you a clearer picture:

scala> spark.sql("""SELECT * from table""").show()
+---+----------------+-------+------+
| ID|            NAME|COMPANY|SALARY|
+---+----------------+-------+------+
|  1|    Gannon Chang|    ABC|440993|
|  2|   Hashim Morris|    XYZ| 49140|
|  3|       Samson Le|    ABC|413890|
|  4|   Brandon Doyle|    XYZ|384118|
|  5|    Jacob Coffey|    BCD|504819|
|  6|   Dillon Holder|    ABC|734086|
|  7|Salvador Vazquez|    NGO|895082|
|  8|    Paki Simpson|    BCD|305046|
|  9|   Laith Stewart|    ABC|943750|
| 10|  Simon Whitaker|    NGO|561896|
| 11|   Denton Torres|    BCD| 10442|
| 12|Garrison Sellers|    ABC| 53024|
| 13| Theodore Bolton|    TTT|881521|
| 14|   Kamal Roberts|    TTT|817422|
+---+----------------+-------+------+

// With GROUP BY you can only select the grouping columns (plus aggregates)
scala> spark.sql("""SELECT COMPANY, max(SALARY) from table group by COMPANY""").show()
+-------+-----------+
|COMPANY|max(SALARY)|
+-------+-----------+
|    NGO|     895082|
|    BCD|     504819|
|    XYZ|     384118|
|    TTT|     881521|
|    ABC|     943750|
+-------+-----------+

// It will give an error if you select all columns, or any column not in the GROUP BY

scala> spark.sql("""SELECT *, max(SALARY) from table group by COMPANY""").show()
org.apache.spark.sql.AnalysisException: expression 'table.`ID`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
Aggregate [COMPANY#94], [ID#92, NAME#93, COMPANY#94, SALARY#95L, max(SALARY#95L) AS max(SALARY)#213L]
+- SubqueryAlias table
   +- Relation[ID#92,NAME#93,COMPANY#94,SALARY#95L] parquet

  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:92)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.org$apache$spark$sql$catalyst$analysis$CheckAnalysis$class$$anonfun$$checkValidAggregateExpression$1(CheckAnalysis.scala:187)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$9.apply(CheckAnalysis.scala:220)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$9.apply(CheckAnalysis.scala:220)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:220)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:641)
  ... 49 elided


// But with PARTITION BY you can select all columns
scala> spark.sql("""SELECT *, Max(SALARY) over (PARTITION BY COMPANY) as Max_Salary from table""").show()
+---+----------------+-------+------+----------+
| ID|            NAME|COMPANY|SALARY|Max_Salary|
+---+----------------+-------+------+----------+
|  7|Salvador Vazquez|    NGO|895082|    895082|
| 10|  Simon Whitaker|    NGO|561896|    895082|
|  5|    Jacob Coffey|    BCD|504819|    504819|
|  8|    Paki Simpson|    BCD|305046|    504819|
| 11|   Denton Torres|    BCD| 10442|    504819|
|  2|   Hashim Morris|    XYZ| 49140|    384118|
|  4|   Brandon Doyle|    XYZ|384118|    384118|
| 13| Theodore Bolton|    TTT|881521|    881521|
| 14|   Kamal Roberts|    TTT|817422|    881521|
|  1|    Gannon Chang|    ABC|440993|    943750|
|  3|       Samson Le|    ABC|413890|    943750|
|  6|   Dillon Holder|    ABC|734086|    943750|
|  9|   Laith Stewart|    ABC|943750|    943750|
| 12|Garrison Sellers|    ABC| 53024|    943750|
+---+----------------+-------+------+----------+
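
The error message also points at first() (or first_value) as an escape hatch: wrapping a non-grouped column in first() keeps it in the select list of a GROUP BY query when you don't care which value within the group you get. A small sketch against the same table:

// first() picks an arbitrary value per group, so NAME can appear in the
// select list even though it is not part of the GROUP BY
spark.sql("""SELECT COMPANY, first(NAME) AS some_name, max(SALARY) AS max_salary FROM table GROUP BY COMPANY""").show()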