I'm trying to implement LSH in PySpark. To do so, I create a min-hash signature for each document in the collection and then split it into bands (here I post a simplified example with only 2 bands and signatures made of 5 hashes).
I used this code:
signatures = signatures.groupByKey() \
    .map(lambda x: (x[0], list(x[1]))) \
    .groupByKey() \
    .map(lambda x: [x[0][1], x[0][0], list(x[1])[0]]) \
    .cache()
which returns this output:
[1, 1, [31891011288540205849559551829790241508456516432, 28971434183002082500813759681605076406295898007, 84354247191629359011614612371642003229438145118, 14879564999779411946535520329978444194295073263, 28999405396879353085885150485918753398187917441]]
[2, 2, [6236085341917560680351285350168314740288121088, 28971434183002082500813759681605076406295898007, 47263781832612219468430472591505267902435456768, 48215701840864104930367382664962486536872207556, 28999405396879353085885150485918753398187917441]]
[1, 3, [274378016236983705444587880288109426115402687, 120052627645426913871540455290804229381930764767, 113440107283022891200151098422815365240954899060, 95554518001487118601391311753326782629149232562, 84646902172764559093309166129305123869359546269]]
[2, 4, [6236085341917560680351285350168314740288121088, 28971434183002082500813759681605076406295898007, 47263781832612219468430472591505267902435456768, 48215701840864104930367382664962486536872207556, 28999405396879353085885150485918753398187917441]]
[1, 5, [6236085341917560680351285350168314740288121088, 28971434183002082500813759681605076406295898007, 47263781832612219468430472591505267902435456768, 48215701840864104930367382664962486536872207556, 28999405396879353085885150485918753398187917441]]
following this scheme: [band, doc, list of min-hashes]
Now my question is: how can I simulate nested for loops in PySpark so that, for each pair of signatures (DOCi, DOCj) with i != j, I count how many elements the two signatures have in common and return a set of tuples of the form:
(DOCi, DOCj, number of elements the two signatures have in common)
Moreover, I should only compare elements that have the same band number. How can I do that, and is this the right way to implement LSH in PySpark?
IIUC, there is no need for a nested for loop; just use itertools.combinations in a list comprehension:
from itertools import combinations
# group rows by band id (x[0]), then within each band compare every pair of
# documents and count how many elements their signature chunks have in common
signatures.groupBy(lambda x: x[0]) \
    .mapValues(lambda x: set([(y1[1], y2[1], len(set(y1[2]).intersection(set(y2[2])))) for y1, y2 in combinations(x, 2)])) \
    .collect()
#[(2, {(2, 4, 5)}), (1, {(1, 3, 0), (1, 5, 2), (3, 5, 0)})]
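If you then want to turn these per-band counts into LSH candidate pairs, a minimal sketch under the same data layout could look like the following; the threshold t is a hypothetical parameter (t = 5 means all 5 rows of a band must agree, the classic LSH banding condition), and the "common elements" count reuses the metric from the snippet above:

from itertools import combinations

# Hypothetical threshold: how many elements of a band must agree for a pair
# of documents to become a candidate (5 = the whole band must match).
t = 5

candidates = signatures.groupBy(lambda x: x[0]) \
    .flatMap(lambda kv: [(min(y1[1], y2[1]), max(y1[1], y2[1]))
                         for y1, y2 in combinations(kv[1], 2)
                         if len(set(y1[2]).intersection(set(y2[2]))) >= t]) \
    .distinct() \
    .collect()
# with the sample data and t = 5 this keeps only (2, 4), the pair whose band-2 rows are identical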
Note: since Spark 2.0 the RDD-based pyspark.mllib has been in maintenance mode, so if you can build your application on top of DataFrames, check out pyspark.ml.feature.MinHashLSH, which can simplify this task.
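For reference, a minimal sketch of that DataFrame-based route; the column names, the toy vectors, the number of hash tables and the 0.6 distance threshold are placeholders you would adapt to your own shingled documents:

from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# Each document is a sparse 0/1 vector over the shingle vocabulary
# (toy data; in practice these vectors come from your shingling step).
df = spark.createDataFrame([
    (1, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0])),
    (2, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0])),
    (3, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0])),
], ["id", "features"])

mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(df)

# Self-join: pairs of documents whose approximate Jaccard distance is below 0.6
model.approxSimilarityJoin(df, df, 0.6, distCol="JaccardDistance") \
    .filter("datasetA.id < datasetB.id") \
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("JaccardDistance")) \
    .show()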