I have the following code in PySpark that produces a table showing the distinct values of a column and their counts. I want another column showing what percentage of the total count each row represents. How do I do that?
difrgns = (df1
           .groupBy("column_name")
           .count()
           .sort(desc("count"))
           .show())
Thanks in advance!
As an alternative, if you are not comfortable with windowing, as the comments allude to, this is the better way to go:
# Running in Databricks; sc and sqlContext are provided by the notebook environment, so not everything here is required
from pyspark.sql import Row
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
#from pyspark.sql.functions import col
data = [("A", "X", 2, 100), ("A", "X", 7, 100), ("B", "X", 10, 100),
("C", "X", 1, 100), ("D", "X", 50, 100), ("E", "X", 30, 100)]
rdd = sc.parallelize(data)
someschema = rdd.map(lambda x: Row(c1=x[0], c2=x[1], val1=int(x[2]), val2=int(x[3])))
df = sqlContext.createDataFrame(someschema)
tot = df.count()
df.groupBy("c1") \
.count() \
.withColumnRenamed('count', 'cnt_per_group') \
.withColumn('perc_of_count_total', (F.col('cnt_per_group') / tot) * 100 ) \
.show()
Returns:
+---+-------------+-------------------+
| c1|cnt_per_group|perc_of_count_total|
+---+-------------+-------------------+
| E| 1| 16.666666666666664|
| B| 1| 16.666666666666664|
| D| 1| 16.666666666666664|
| C| 1| 16.666666666666664|
| A| 2| 33.33333333333333|
+---+-------------+-------------------+
I focus on Scala, and it all seems easier there. That said, the solution suggested via the comments uses Window, which is what I would do in Scala with over().
When df itself is a more complex chain of transformations and running it twice (first to compute the total count, then to group and compute the percentages) is too costly, a window function can be leveraged to achieve a similar result. Here is more generic code (extending bluephantom's answer) that can be used with a number of group-by dimensions:
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
data = [("A", "X", 2, 100), ("A", "X", 7, 100), ("B", "X", 10, 100),
("C", "X", 1, 100), ("D", "X", 50, 100), ("E", "X", 30, 100)]
rdd = sc.parallelize(data)
someschema = rdd.map(lambda x: Row(c1=x[0], c2=x[1], val1=int(x[2]), val2=int(x[3])))
df = (sqlContext.createDataFrame(someschema)
      .withColumn('total_count', count('*').over(Window.partitionBy(<your N-1 dimensions here>)))
      .groupBy(<your N dimensions here>)
      .agg((count('*')/first(col('total_count'))).alias('percent_total'))
     )
df.show()
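For instance, with a single grouping dimension such as c1 from the data above, the placeholders could be filled in as in this minimal sketch (assuming a SparkSession named spark; with one grouping dimension there are zero partitioning columns, so the total_count window spans the whole DataFrame):

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()
data = [("A", "X", 2, 100), ("A", "X", 7, 100), ("B", "X", 10, 100),
        ("C", "X", 1, 100), ("D", "X", 50, 100), ("E", "X", 30, 100)]
df = spark.createDataFrame(data, ["c1", "c2", "val1", "val2"])

df_pct = (df
          # empty partitionBy(): the window covers every row, so total_count is the full row count
          .withColumn("total_count", F.count("*").over(Window.partitionBy()))
          .groupBy("c1")
          .agg((F.count("*") / F.first("total_count")).alias("percent_total")))
df_pct.show()

This avoids the separate count() pass, at the cost of an unpartitioned window, which Spark warns about because it moves all rows to a single partition for that step.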
You can groupby and aggregate with agg. For example, for the following DataFrame:
+--------+-----+
|category|value|
+--------+-----+
| a| 1|
| b| 2|
| a| 3|
+--------+-----+
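For reference, a DataFrame like this can be built as follows (a minimal sketch, assuming a SparkSession named spark):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# same rows as the table above
df = spark.createDataFrame([("a", 1), ("b", 2), ("a", 3)], ["category", "value"])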
To add the count and percentage columns, you can use:
import pyspark.sql.functions as F
(
    df
    .groupby("category")
    .agg(
        F.count("value").alias("count"),
        (F.count("value") / df.count()).alias("percentage")
    )
    .show()
)
Output:
+--------+-----+------------------+
|category|count| percentage|
+--------+-----+------------------+
| b| 1|0.3333333333333333|
| a| 2|0.6666666666666666|
+--------+-----+------------------+
Alternatively, you can use SQL:
df.createOrReplaceTempView("df")
(
    spark
    .sql(
        """
        SELECT category,
               COUNT(*) AS count,
               COUNT(*) / (SELECT COUNT(*) FROM df) AS ratio
        FROM df
        GROUP BY category
        """
    )
    .show()
)
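Note that the scalar subquery means df is scanned a second time. As a variation (a sketch along the same lines, not part of the original answer), the same ratio can also be computed with a window over the grouped result, which Spark SQL supports:

(
    spark
    .sql(
        """
        SELECT category,
               COUNT(*) AS count,
               COUNT(*) / SUM(COUNT(*)) OVER () AS ratio
        FROM df
        GROUP BY category
        """
    )
    .show()
)

Here SUM(COUNT(*)) OVER () sums the per-group counts across all groups, i.e. the total row count, within the same aggregation step.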
A more polished output, rounding off the extra decimals and sorting by count:
import pyspark.sql.functions as func
count_cl = data_fr.count()
data_fr \
    .groupBy('col_name') \
    .count() \
    .withColumn('%', func.round((func.col('count')/count_cl)*100, 2)) \
    .orderBy('count', ascending=False) \
    .show(4, False)
+--------------+-----+----+
|      col_name|count|   %|
+--------------+-----+----+
|       C.LQQQQ|30957|8.91|
|       C.LQQQQ|29688|8.54|
|       C-LQQQQ|29625|8.52|
|        CLQQQQ|29342|8.44|
+--------------+-----+----+
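Tying this back to the question, a minimal sketch combining the above (assuming df1 and column_name from the question) would be:

import pyspark.sql.functions as F

total = df1.count()
(df1
 .groupBy("column_name")
 .count()
 # percentage of total rows, rounded to two decimals
 .withColumn("percentage", F.round(F.col("count") / total * 100, 2))
 .sort(F.desc("count"))
 .show())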