我有一个RDD P映射到类:
case class MyRating(userId:Int, itemId:Int, rating:Double)
我有兴趣为每个用户(即GroupBy userId)找到TopN条目,并且在每个形成的组内,根据最高评级筛选出TopN(例如10个)条目。
我做了以下事情:
val A : RDD[((Int), Iterable[MyRating])] = P.keyBy(r => (r.userId)).groupByKey
val B : RDD[((Int), List[MyRating])] = key.mapValues(iter => iter.toList.sortBy(_.rating, false))
val C = values.groupByKey.take(10)
在groupByKey给我留下10个键(用户)之后,明确地应用.take(10)并且不会过滤掉每个用户的前10个评级。
我们如何在groupBy之后应用.take(N),以便它对值的某些部分而不是键本身起作用?
如果我理解正确,你需要做的是:按用户ID分组RDD,然后为每个(id,list)元组返回id并将列表排序并修剪为10个元素
P
.groupBy(_.userId)
.map{ case (key, it) =>
(key, it.toList.sortBy(mr => -mr.rating).take(10))
}
一种天真的方法是采用n值:
B.mapValues(_.take(n))
但是如果你只需要一小部分值,那么最好使用例如aggregateByKey
并在运行中删除过时的记录而不是分组所有内容。你可能想要在实践中想要更高效的东西(你可以查看top
/ takeOrdered
的Spark实现),但你可以从这样的东西开始:
import scala.math.Ordering
import scala.collection.mutable.PriorityQueue
implicit val ord = Ordering.by[MyRating, Double](_.rating)
val pairs = rdd.keyBy(_.userId)
pairs.aggregateByKey(new scala.collection.mutable.PriorityQueue[MyRating]())(
(acc, x) => {
acc.enqueue(x)
acc.take(n)
},
(acc1, acc2) => (acc1 ++ acc2).take(n)
)
请注意,由于SI-7568,上面的代码段需要Scala 2.11+。
你非常接近,但你需要在A到B的映射中取得前N个条目。例如,如果你想从List中获取前2个MyRating项,下面的代码就可以了。 B将是一个RDD,其中包含每个userId的前2个MyRating的列表。 (另外,sortBy函数只需将评级设为负数即可)。
case class MyRating(userId:Int, itemId:Int, rating:Double)
val plist:List[MyRating] = List(MyRating(1,0,1),MyRating(1,1,5),MyRating(1,2,7),MyRating(1,3,9),MyRating(1,4,10),MyRating(2,5,1),MyRating(2,6,5),MyRating(2,6,7))
val P: org.apache.spark.rdd.RDD[MyRating] = sc.parallelize(plist)
val A : RDD[((Int), Iterable[MyRating])] = P.keyBy(r => (r.userId)).groupByKey
val TOPCOUNT = 2
val B : RDD[((Int), List[MyRating])] = A.mapValues(iter => iter.toList.sortBy(- _.rating).take(TOPCOUNT))
以下是使用aggregateByKey
的示例,如zero323所示:
val A : RDD[(Int, MyRating)] = P.keyBy(r => r.userId)
val B = A.aggregateByKey(List[MyRating]())(
(l, r) => (l :+ r).sortBy(-_.rating).take(10),
(l1, l2) => (l1 ++ l2).sortBy(-_.rating).take(10))
使用此方法的好处是您不可能在执行程序之间混洗大量数据。如果单个用户的评级分布在多个节点上,groupBy
需要将用户的所有评级发送给同一个执行者,而aggregateByKey
首先在每个执行者上建立一个前N个列表,然后只对这些列表进行洗牌并结合起来。
这对您是否有益取决于数据的分布。如果你没有获得比最终顶级更多的评分,那么你就没有获得太多的收益(尤其是我的天真实施,对每个单独的评级进行排序)。但是,如果每个遗嘱执行人的评级数量级更大,您可以赢得很多。
我找到了相关的帖子:Spark: Get top N by key
复制@ jbochi的推荐:
从版本1.4开始,有一种使用MLLib执行此操作的内置方法:https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala
val scores = sc.parallelize(Array(
("a", 1),
("a", 2),
("a", 3),
("b", 3),
("b", 1),
("a", 4),
("b", 4),
("b", 2)
))
import org.apache.spark.mllib.rdd.MLPairRDDFunctions.fromPairRDD
scores.topByKey(2) // Where the keys are a and b