pyspark——对 Array(Integer()) 类型的列中的值求和的最佳方法

问题描述 投票:0回答:6

可以说这是我的数据框......

name | scores
Dan  |  [10,5,2,12]
Ann  |  [ 12,3,5]
Jon  |  [ ] 

所需的输出类似于

name | scores         | Total
Dan  |  [10,5,2,12]   | 29
Ann  |   [ 12,3,5]    | 20
Jon  |  [ ]           | 0

我按照...制作了一个 UDF

sum_cols = udf(lambda arr: if arr == [] then 0 else __builtins__.sum(arr),IntegerType())

df.withColumn('Total', sum_cols(col('scores'))).show()

但是,我了解到 UDF 与纯 pySpark 函数相比相对较慢。

有没有办法在没有 UDF 的情况下在 pySpark 中执行上面的代码?

apache-spark pyspark apache-spark-sql
6个回答
15
投票

您可以使用高阶 SQL 函数 AGGREGATE(函数式编程中的 reduce),如下所示:

import pyspark.sql.functions as F
df = df.select(
  'name',
  F.expr('AGGREGATE(scores, 0, (acc, x) -> acc + x)').alias('Total')
)

第一个参数是数组列,第二个参数是初始值(应该与您求和的值具有相同的类型,因此如果您的输入不是整数,您可能需要使用“0.0”或“DOUBLE(0)”等),第三个参数是argument 是一个 lambda 函数,它将数组的每个元素添加到累加器变量中(一开始这将被设置为初始值 0)。

转换将在单个投影运算符中运行,因此非常高效。此外,您不需要提前知道数组的大小,并且数组每行可以有不同的长度。


12
投票

对于 Spark 3.1+,您可以简单地调用

pyspark.sql.functions.aggregate
:

import pyspark.sql.functions as F
df = df.withColumn(
    "Total",
    F.aggregate("scores", F.lit(0), lambda acc, x: acc + x)
)

请注意,如果列类型不是整数,则应使用

F.lit(0.0)


感谢@Simon30的评论,我们也可以使用

df = df.withColumn(
    "Total",
    F.aggregate("scores", F.lit(0), operator.add)
)

3
投票

如果您不知道数组的长度(如您的示例中所示):

import pyspark.sql.functions as F
psaudo_counts = df.select('name').distinct().withColumn('score', F.lit(0))
df = df.select('name', F.explode('scores').alias('score')).unionByName(psaudo_counts)
df = df.groupby('name').agg(F.sum('name').alias('Total'))

如果您确实知道数组的长度:

import pyspark.sql.functions as F
length_of_array = 3
df = df.select('name', sum([F.col('scores').getItem(i) for i in range(length_of_array)]).alias('Total'))

感谢 cricket_007 的提示和这封旧邮件的固定长度想法


3
投票
import pyspark.sql.functions as F
df = df.select(
  'name',
  F.expr('AGGREGATE(scores,cast(0 as float), (acc, x) -> acc + x)').alias('Total')
)

只有在聚合函数中使用cast(0 as 'datatype')时才不会出现错误。


3
投票
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
data = spark.createDataFrame(
    [
        ('Dan', [10, 5, 2, 12]), 
        ('Ann', [12, 3, 5]),
        ('Jon', [])
    ],
    ['name', 'scores'])

data.show()

+----+--------------+
|name|        scores|
+----+--------------+
| Dan|[10, 5, 2, 12]|
| Ann|    [12, 3, 5]|
| Jon|            []|
+----+--------------+
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

udf_function = udf(lambda x: int(np.array(x, dtype=int).sum()), IntegerType())
data = data.withColumn('sum', udf_function('scores'))
data.show()

+----+--------------+---+
|name|        scores|sum|
+----+--------------+---+
| Dan|[10, 5, 2, 12]| 29|
| Ann|    [12, 3, 5]| 20|
| Jon|            []|  0|
+----+--------------+---+


2
投票

我建议使用 withColumn 方法进行更简单的制定/实现:

import pyspark.sql.functions as F
df = df.withColumn('Total',F.expr('AGGREGATE(scores, 0, (acc, x) -> acc + x)'))
© www.soinside.com 2019 - 2024. All rights reserved.