可以说这是我的数据框......
name | scores
Dan | [10,5,2,12]
Ann | [ 12,3,5]
Jon | [ ]
所需的输出类似于
name | scores | Total
Dan | [10,5,2,12] | 29
Ann | [ 12,3,5] | 20
Jon | [ ] | 0
我按照...制作了一个 UDF
sum_cols = udf(lambda arr: if arr == [] then 0 else __builtins__.sum(arr),IntegerType())
df.withColumn('Total', sum_cols(col('scores'))).show()
但是,我了解到 UDF 与纯 pySpark 函数相比相对较慢。
有没有办法在没有 UDF 的情况下在 pySpark 中执行上面的代码?
您可以使用高阶 SQL 函数 AGGREGATE(函数式编程中的 reduce),如下所示:
import pyspark.sql.functions as F
df = df.select(
'name',
F.expr('AGGREGATE(scores, 0, (acc, x) -> acc + x)').alias('Total')
)
第一个参数是数组列,第二个参数是初始值(应该与您求和的值具有相同的类型,因此如果您的输入不是整数,您可能需要使用“0.0”或“DOUBLE(0)”等),第三个参数是argument 是一个 lambda 函数,它将数组的每个元素添加到累加器变量中(一开始这将被设置为初始值 0)。
转换将在单个投影运算符中运行,因此非常高效。此外,您不需要提前知道数组的大小,并且数组每行可以有不同的长度。
pyspark.sql.functions.aggregate
:
import pyspark.sql.functions as F
df = df.withColumn(
"Total",
F.aggregate("scores", F.lit(0), lambda acc, x: acc + x)
)
请注意,如果列类型不是整数,则应使用
F.lit(0.0)
。
感谢@Simon30的评论,我们也可以使用
df = df.withColumn(
"Total",
F.aggregate("scores", F.lit(0), operator.add)
)
如果您不知道数组的长度(如您的示例中所示):
import pyspark.sql.functions as F
psaudo_counts = df.select('name').distinct().withColumn('score', F.lit(0))
df = df.select('name', F.explode('scores').alias('score')).unionByName(psaudo_counts)
df = df.groupby('name').agg(F.sum('name').alias('Total'))
如果您确实知道数组的长度:
import pyspark.sql.functions as F
length_of_array = 3
df = df.select('name', sum([F.col('scores').getItem(i) for i in range(length_of_array)]).alias('Total'))
感谢 cricket_007 的提示和这封旧邮件的固定长度想法
import pyspark.sql.functions as F
df = df.select(
'name',
F.expr('AGGREGATE(scores,cast(0 as float), (acc, x) -> acc + x)').alias('Total')
)
只有在聚合函数中使用cast(0 as 'datatype')时才不会出现错误。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = spark.createDataFrame(
[
('Dan', [10, 5, 2, 12]),
('Ann', [12, 3, 5]),
('Jon', [])
],
['name', 'scores'])
data.show()
+----+--------------+
|name| scores|
+----+--------------+
| Dan|[10, 5, 2, 12]|
| Ann| [12, 3, 5]|
| Jon| []|
+----+--------------+
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
udf_function = udf(lambda x: int(np.array(x, dtype=int).sum()), IntegerType())
data = data.withColumn('sum', udf_function('scores'))
data.show()
+----+--------------+---+
|name| scores|sum|
+----+--------------+---+
| Dan|[10, 5, 2, 12]| 29|
| Ann| [12, 3, 5]| 20|
| Jon| []| 0|
+----+--------------+---+
我建议使用 withColumn 方法进行更简单的制定/实现:
import pyspark.sql.functions as F
df = df.withColumn('Total',F.expr('AGGREGATE(scores, 0, (acc, x) -> acc + x)'))