使用 MLLIB 的 pyspark 数据帧中的点积

问题描述 投票:0回答:5

我在 pyspark 中有一个非常简单的数据框,如下所示:

from pyspark.sql import Row
from pyspark.mllib.linalg import DenseVector

row = Row("a", "b")
df = spark.sparkContext.parallelize([
    offer_row(DenseVector([1, 1, 1]), DenseVector([1, 0, 0])),
]).toDF()

我想计算这些向量的点积而不诉诸 UDF 调用。

spark MLLIB 文档引用了

dot
上的
DenseVectors
方法,但如果我尝试按如下方式应用此方法:

df_offers = df_offers.withColumn("c", col("a").dot(col("b")))

我收到如下错误:

TypeError: 'Column' object is not callable

有谁知道这些 mllib 方法是否能够在 DataFrame 对象上调用?

python apache-spark pyspark apache-spark-mllib
5个回答
1
投票

在这里,您在列上应用

dot
方法,而不是在
DenseVector
上,这确实不起作用 :

df_offers = df_offers.withColumn("c", col("a").dot(col("b")))

你必须使用 udf :

from pyspark.sql.functions import udf, array
from pyspark.sql.types import DoubleType

def dot_fun(array):
    return array[0].dot(array[1])

dot_udf = udf(dot_fun, DoubleType())

df_offers = df_offers.withColumn("c", dot_udf(array('a', 'b')))

0
投票

没有。你必须使用 udf:

from pyspark.sql.functions import udf

@udf("double")
def dot(x, y):
    if x is not None and y is not None:
        return float(x.dot(y))

0
投票

您可以在不使用 UDF 的情况下将两列相乘,方法是先将它们转换为 BlockMatrix,然后将它们相乘,如下例所示


from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

ac = offer_row.select('a')
bc = offer_row.select('a')
mata = IndexedRowMatrix(ac.rdd.map(lambda row: IndexedRow(*row)))
matb = IndexedRowMatrix(bc.rdd.map(lambda row: IndexedRow(*row)))

ma = mata.toBlockMatrix(100,100)
mb = matb.toBlockMatrix(100,100)

ans = ma.multiply(mb.transpose())


0
投票

这是一个 hack,但可能比 Python udf 性能更高。您可以将点积转换为 SQL:

import pandas as pd
from pyspark.sql.functions import expr

coefs = pd.Series({'a': 1.0, 'b': 2.0})
dot_sql = ' + '.join(
    '{} * {}'.format(coef, colname)
    for colname, coef
    in coefs.items()
)
dot_expr = expr(dot_sql)

df.withColumn('dot_product', dot_expr)

0
投票

作为对第一个答案的评论,我现在得到的只是:

AttributeError: 'list' object has no attribute 'dot'

即使我调用

np.dot(a, b)
,类型总是有错误。比如:

Job aborted due to stage failure: Task 0 in stage 225.0 failed 4 times, most recent failure: ... : net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)

我必须将返回值更改为 item(),以便它不再在 numpy 中。工作解决方案:

@F.udf(returnType=T.FloatType())
def dot_udf(arr1, arr2):
    if arr1 is not None and arr2 is not None:
        return np.dot(arr1, arr2).astype(np.float32).item()

df_ = df.withColumn("c", dot_udf(F.col('a'), F.col('b')))
© www.soinside.com 2019 - 2024. All rights reserved.