Pyspark,如何使用udf计算泊松分布?

问题描述 投票:0回答:2

我有一个数据框,如下所示:

df_schema = StructType([StructField("date", StringType(), True),\
                              StructField("col1", FloatType(), True),\
                             StructField("col2", FloatType(), True)])
df_data = [('2020-08-01',0.09,0.8),\
                 ('2020-08-02',0.0483,0.8)]
rdd = sc.parallelize(df_data)
df = sqlContext.createDataFrame(df_data, df_schema)
df = df.withColumn("date",to_date("date", 'yyyy-MM-dd'))
df.show() 

+----------+------+----+
|      date|  col1|col2|
+----------+------+----+
|2020-08-01|  0.09| 0.8|
|2020-08-02|0.0483| 0.8|
+----------+------+----+

我想使用 col1 和 col2 计算泊松 CDF。

我们可以轻松地在 pandas dataframe 中使用 from scipy.stats import poisson 但我不知道如何处理 pyspark。

prob = poisson.cdf(x, mu) 其中 x= col1 ,在我们的例子中 mu = col2 。

尝试1:

from scipy.stats import poisson
from pyspark.sql.functions import udf,col
def poisson_calc(a,b):
    return poisson.cdf(a,b,axis=1)

poisson_calc = udf(poisson_calc, FloatType())

df_new = df.select(
  poisson_calc(col('col1'),col('col2')).alias("want") )

df_new.show()

给我一个错误:TypeError:_parse_args()有一个意外的关键字参数“axis”

pyspark apache-spark-sql user-defined-functions
2个回答
2
投票

我发现您的尝试存在一些问题。

  • 您将
    udf
    命名为与底层函数相同的名称。令人惊讶的是,这实际上并不是一个问题,但我会避免它。
  • axis
     没有 
    scipy.stats.poisson.cdf
  • 关键字参数
  • 您必须将输出显式转换为
    float
    ,否则您将遇到 此错误

解决所有问题,以下内容应该有效:

from scipy.stats import poisson
from pyspark.sql.functions import udf,col

def poisson_calc(a,b):
    return float(poisson.cdf(a,b))

poisson_calc_udf = udf(poisson_calc, FloatType())

df_new = df.select(
  poisson_calc_udf(col('col1'),col('col2')).alias("want") 
)

df_new.show()
#+----------+
#|      want|
#+----------+
#|0.44932896|
#|0.44932896|
#+----------+

0
投票

使用 pandas udf 更好,因为 scipy 和 numpy 是可广播的

from scipy.stats import poisson
import pandas as pd

# Define a pandas UDF
@F.pandas_udf(DoubleType())
def calculate_p_value_vectorized(observed: pd.Series, expected: pd.Series) -> pd.Series:
    # Vectorized computation
    return pd.Series(poisson.cdf(observed, expected))

# Apply the pandas UDF
df_new = df_new.withColumn(
    "p_value", calculate_p_value_vectorized(F.col("count"), F.col("expected_mean"))
)
© www.soinside.com 2019 - 2024. All rights reserved.