我在 pyspark 中有一个非常简单的数据框,如下所示:
from pyspark.sql import Row
from pyspark.mllib.linalg import DenseVector
row = Row("a", "b")
df = spark.sparkContext.parallelize([
offer_row(DenseVector([1, 1, 1]), DenseVector([1, 0, 0])),
]).toDF()
我想计算这些向量的点积而不诉诸 UDF 调用。
spark MLLIB 文档引用了
dot
上的 DenseVectors
方法,但如果我尝试按如下方式应用此方法:
df_offers = df_offers.withColumn("c", col("a").dot(col("b")))
我收到如下错误:
TypeError: 'Column' object is not callable
有谁知道这些 mllib 方法是否能够在 DataFrame 对象上调用?
在这里,您在列上应用
dot
方法,而不是在 DenseVector
上,这确实不起作用 :
df_offers = df_offers.withColumn("c", col("a").dot(col("b")))
你必须使用 udf :
from pyspark.sql.functions import udf, array
from pyspark.sql.types import DoubleType
def dot_fun(array):
return array[0].dot(array[1])
dot_udf = udf(dot_fun, DoubleType())
df_offers = df_offers.withColumn("c", dot_udf(array('a', 'b')))
没有。你必须使用 udf:
from pyspark.sql.functions import udf
@udf("double")
def dot(x, y):
if x is not None and y is not None:
return float(x.dot(y))
您可以在不使用 UDF 的情况下将两列相乘,方法是先将它们转换为 BlockMatrix,然后将它们相乘,如下例所示
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
ac = offer_row.select('a')
bc = offer_row.select('a')
mata = IndexedRowMatrix(ac.rdd.map(lambda row: IndexedRow(*row)))
matb = IndexedRowMatrix(bc.rdd.map(lambda row: IndexedRow(*row)))
ma = mata.toBlockMatrix(100,100)
mb = matb.toBlockMatrix(100,100)
ans = ma.multiply(mb.transpose())
这是一个 hack,但可能比 Python udf 性能更高。您可以将点积转换为 SQL:
import pandas as pd
from pyspark.sql.functions import expr
coefs = pd.Series({'a': 1.0, 'b': 2.0})
dot_sql = ' + '.join(
'{} * {}'.format(coef, colname)
for colname, coef
in coefs.items()
)
dot_expr = expr(dot_sql)
df.withColumn('dot_product', dot_expr)
作为对第一个答案的评论,我现在得到的只是:
AttributeError: 'list' object has no attribute 'dot'
。
即使我调用
np.dot(a, b)
,类型总是有错误。比如:
Job aborted due to stage failure: Task 0 in stage 225.0 failed 4 times, most recent failure: ... : net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)
我必须将返回值更改为 item(),以便它不再在 numpy 中。工作解决方案:
@F.udf(returnType=T.FloatType())
def dot_udf(arr1, arr2):
if arr1 is not None and arr2 is not None:
return np.dot(arr1, arr2).astype(np.float32).item()
df_ = df.withColumn("c", dot_udf(F.col('a'), F.col('b')))