我想在Spark中转换这个基本的SQL查询
select Grade, count(*) * 100.0 / sum(count(*)) over()
from StudentGrades
group by Grade
我试过在这样的火花中使用窗口函数
val windowSpec = Window.rangeBetween(Window.unboundedPreceding,Window.unboundedFollowing)
df1.select(
$"Arrest"
).groupBy($"Arrest").agg(sum(count("*")) over windowSpec,count("*")).show()
+------+--------------------------------------------------------------------
----------+--------+
|Arrest|sum(count(1)) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
FOLLOWING)|count(1)|
+------+--------------------------------------------------------------------
----------+--------+
| true|
665517| 184964|
| false|
665517| 480553|
+------+------------------------------------------------------------------------------+--------+
但是,当我尝试除以计数(*)时,通过错误
df1.select(
$"Arrest"
).groupBy($"Arrest").agg(count("*")/sum(count("*")) over
windowSpec,count("*")).show()
不允许在另一个聚合函数的参数中使用聚合函数。请在子查询中使用内部聚合函数。
我的问题是,当我在第一个查询中使用sum()内的count()时,我没有收到在另一个聚合函数中使用聚合函数的任何错误,但为什么在第二个聚合函数中得到错误?
一个例子:
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
val df = sc.parallelize(Seq(
("A", "X", 2, 100), ("A", "X", 7, 100), ("B", "X", 10, 100),
("C", "X", 1, 100), ("D", "X", 50, 100), ("E", "X", 30, 100)
)).toDF("c1", "c2", "Val1", "Val2")
val df2 = df
.groupBy("c1")
.agg(sum("Val1").alias("sum"))
.withColumn("fraction", col("sum") / sum("sum").over())
df2.show
你需要根据自己的情况定制。例如。计算而不是总和。如下:
val df2 = df
.groupBy("c1")
.agg(count("*"))
.withColumn("fraction", col("count(1)") / sum("count(1)").over())
返回:
+---+--------+-------------------+
| c1|count(1)| fraction|
+---+--------+-------------------+
| E| 1|0.16666666666666666|
| B| 1|0.16666666666666666|
| D| 1|0.16666666666666666|
| C| 1|0.16666666666666666|
| A| 2| 0.3333333333333333|
+---+--------+-------------------+
你可以做x 100.我注意别名似乎不是按照总和工作,所以解决了这个问题,并在上面进行了比较。同样,您需要根据自己的具体情况进行定制,这是我研究的一般模块的一部分。