我有一个包含密集向量的 spark 数据框,作为 Col_W_DensV1 和 Col_w_DenseV2 列,现在我想计算它们之间的余弦相似度,因此需要点积。我目前正在使用 UDF 并进行行操作,它非常慢并且只使用 1 个核心进行操作。有人可以提出更好的方法来实现这一目标吗?
Col1 | Col2 | Col_W_DensV1 | Col_w_DenseV2
a | b | [0.1 0.1 0.2..]| [0.3 0.5 0.8..]
需要
x.Dot(y)
在列级而不是行级和并行化
@udf("double")def cosim(x, y):
import numpy as np
return float(x.dot(y) / np.sqrt(x.dot(x)) /np.sqrt(y.dot(y)))
cs_table1 = cs_table.withColumn("similarity",cosim(cs_table.p_result,cs_table.result))
cs_table1.show()
为此我使用了 pandas udf:
def bulk_dot(
iterator: Iterator[Tuple[pd.Series, pd.Series]]
) -> Iterator[pd.Series]:
for input_series1, input_series2 in iterator:
m1 = np.stack(input_series1)
m2 = np.stack(input_series2)
dots = np.einsum("ij, ij -> i", m1, m2)
yield pd.Series(dots)
bulk_dot_udf = pandas_udf(bulk_dot, FloatType())
spark.udf.register("bulk_dot", bulk_dot_udf)