假设我有一个名为mention_rdd
的RDD的Spark Scala程序,如下所示:
(name, (filename, sum))
...
(Maria, (file0, 3))
(John, (file0, 1))
(Maria, (file1, 6))
(Maria, (file2, 1))
(John, (file2, 3))
...
我们拥有文件名和每个名称的出现次数。
我想为每个名称减少并查找出现次数最多的文件名。例如:
(name, (filename, max(sum))
...
(Maria, (file1, 6))
(John, (file2, 3))
...
[我试图自己访问RDD的(filename,sum)
元组,所以我可以从那里减少name
(由于错误,我说我无法从mention_rdd
遍历,因为(String,Int)
是不是TraversableOnce
类型):
val output = mention_rdd.flatMap(file_counts => file_counts._2.map(file_counts._2._1, file_counts._2._2))
.reduceByKey((a, b) => if (a > b) a else b)
但是我得到一个错误,说值映射不是(String,Int)的成员
这是否可以在Spark中完成?如果是这样,怎么办?我的方法从一开始就存在缺陷吗?
为什么不只是:
val output = mention_rdd.reduceByKey {
case ((file1, sum1), (file2, sum2)) =>
if (sum2 >= sum1) (file2, sum2)
else (file1, sum1)
}