Spark:GroupBy之后的TOPN

问题描述 投票:1回答:5

我有一个RDD P映射到类:

case class MyRating(userId:Int, itemId:Int, rating:Double)

我有兴趣为每个用户(即GroupBy userId)找到TopN条目,并且在每个形成的组内,根据最高评级筛选出TopN(例如10个)条目。

我做了以下事情:

val A : RDD[((Int), Iterable[MyRating])] = P.keyBy(r => (r.userId)).groupByKey
val B : RDD[((Int), List[MyRating])] = key.mapValues(iter => iter.toList.sortBy(_.rating, false))
val C = values.groupByKey.take(10)

在groupByKey给我留下10个键(用户)之后,明确地应用.take(10)并且不会过滤掉每个用户的前10个评级。

我们如何在groupBy之后应用.take(N),以便它对值的某些部分而不是键本身起作用?

scala apache-spark rdd
5个回答
3
投票

如果我理解正确,你需要做的是:按用户ID分组RDD,然后为每个(id,list)元组返回id并将列表排序并修剪为10个元素

P
  .groupBy(_.userId)  
  .map{ case (key, it) => 
    (key, it.toList.sortBy(mr => -mr.rating).take(10)) 
  }

3
投票

一种天真的方法是采用n值:

B.mapValues(_.take(n))

但是如果你只需要一小部分值,那么最好使用例如aggregateByKey并在运行中删除过时的记录而不是分组所有内容。你可能想要在实践中想要更高效的东西(你可以查看top / takeOrdered的Spark实现),但你可以从这样的东西开始:

import scala.math.Ordering
import scala.collection.mutable.PriorityQueue

implicit val ord = Ordering.by[MyRating, Double](_.rating)

val pairs = rdd.keyBy(_.userId)
pairs.aggregateByKey(new scala.collection.mutable.PriorityQueue[MyRating]())(
  (acc, x) => {
    acc.enqueue(x)
    acc.take(n)
  },
  (acc1, acc2) => (acc1 ++ acc2).take(n)
)

请注意,由于SI-7568,上面的代码段需要Scala 2.11+。


1
投票

你非常接近,但你需要在A到B的映射中取得前N个条目。例如,如果你想从List中获取前2个MyRating项,下面的代码就可以了。 B将是一个RDD,其中包含每个userId的前2个MyRating的列表。 (另外,sortBy函数只需将评级设为负数即可)。

case class MyRating(userId:Int, itemId:Int, rating:Double)

val plist:List[MyRating] = List(MyRating(1,0,1),MyRating(1,1,5),MyRating(1,2,7),MyRating(1,3,9),MyRating(1,4,10),MyRating(2,5,1),MyRating(2,6,5),MyRating(2,6,7))
val P: org.apache.spark.rdd.RDD[MyRating] = sc.parallelize(plist)

val A : RDD[((Int), Iterable[MyRating])] = P.keyBy(r => (r.userId)).groupByKey
val TOPCOUNT = 2
val B : RDD[((Int), List[MyRating])] = A.mapValues(iter => iter.toList.sortBy(- _.rating).take(TOPCOUNT))

1
投票

以下是使用aggregateByKey的示例,如zero323所示:

val A : RDD[(Int, MyRating)] = P.keyBy(r => r.userId)
val B = A.aggregateByKey(List[MyRating]())(
  (l, r) => (l :+ r).sortBy(-_.rating).take(10),
  (l1, l2) => (l1 ++ l2).sortBy(-_.rating).take(10))

使用此方法的好处是您不可能在执行程序之间混洗大量数据。如果单个用户的评级分布在多个节点上,groupBy需要将用户的所有评级发送给同一个执行者,而aggregateByKey首先在每个执行者上建立一个前N个列表,然后只对这些列表进行洗牌并结合起来。

这对您是否有益取决于数据的分布。如果你没有获得比最终顶级更多的评分,那么你就没有获得太多的收益(尤其是我的天真实施,对每个单独的评级进行排序)。但是,如果每个遗嘱执行人的评级数量级更大,您可以赢得很多。


0
投票

我找到了相关的帖子:Spark: Get top N by key

复制@ jbochi的推荐:

从版本1.4开始,有一种使用MLLib执行此操作的内置方法:https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala

val scores = sc.parallelize(Array(
      ("a", 1),  
      ("a", 2), 
      ("a", 3), 
      ("b", 3), 
      ("b", 1), 
      ("a", 4),  
      ("b", 4), 
      ("b", 2)
))
import org.apache.spark.mllib.rdd.MLPairRDDFunctions.fromPairRDD
scores.topByKey(2) // Where the keys are a and b
© www.soinside.com 2019 - 2024. All rights reserved.