I have an array of strings, `tokens`, consisting of words and numbers, and I'm trying to count individual words, word-word pairs, and number-word pairs in Apache Spark at the same time. I have functions `isWord(token)` and `isNumber(token)` that tell me whether a token is a word or a number.
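For reference, the two predicates could look something like this. This is a minimal sketch under my own assumptions (the post only states that the functions exist): here a "word" is all letters and a "number" is all digits.

```scala
// Hypothetical implementations of the predicates mentioned above.
// Assumption: a "number" is all digits, a "word" is all letters.
def isWord(token: String): Boolean = token.nonEmpty && token.forall(_.isLetter)
def isNumber(token: String): Boolean = token.nonEmpty && token.forall(_.isDigit)
```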
Here is an example of what the result should look like:
(states,1)
(united,10)
(alameda,3)
((united,states), 1)
((states,located), 3)
((located,alameda), 11)
((19,century),1)
((100,inhabitants),1)
((2020,music),1)
I came up with the following code to solve the problem:
// sliding on an RDD requires: import org.apache.spark.mllib.rdd.RDDFunctions._
val tokens = textFile.flatMap(line => findTokens(line))
// Find the word counts
val wordCounts = tokens.filter(token => isWord(token))
  .map(token => (token, 1))
  .reduceByKey(_ + _)
// Find the counts of all adjacent pairs
val pairCounts = tokens.sliding(2)
  .map(pair => (pair(0), pair(1)))
  .map(pair => (pair, 1))
  .reduceByKey(_ + _)
// Keep only the word-word counts
val wordWordCounts = pairCounts.filter(pair => isWord(pair._1._1) && isWord(pair._1._2))
// Keep only the number-word counts
val numberWordCounts = pairCounts.filter(pair => isNumber(pair._1._1) && isWord(pair._1._2))
The code works fine, but I'm not sure it's the most efficient and elegant way to do this in Spark. Would it make sense to do it in a single pass?
I'm new to Spark and Scala, but here is what I was thinking:
val (wordCounts, wordWordCounts, numberWordCounts) = tokens
  .sliding(2)
  .map { case Array(prevToken, currToken) =>
    val wordCount = if (isWord(prevToken)) Seq((prevToken, 1)) else Seq.empty
    val wordWordCount = if (isWord(prevToken) && isWord(currToken)) Seq(((prevToken, currToken), 1)) else Seq.empty
    val numberWordCount = if (isNumber(prevToken) && isWord(currToken)) Seq(((prevToken, currToken), 1)) else Seq.empty
    (wordCount, wordWordCount, numberWordCount)
  }
val wordCountsRDD = sc.parallelize(wordCounts.reduceByKey(_ + _))
val wordWordCountsRDD = sc.parallelize(wordWordCounts.reduceByKey(_ + _))
val numberWordCountsRDD = sc.parallelize(numberWordCounts.reduceByKey(_ + _))
This code doesn't work yet; I've been struggling to get it running, and I'm not sure whether creating multiple sequences inside the map function is good practice in the first place. Does it make sense to attempt something like this?
If I need to produce multiple RDDs, should I instead apply transformations to the original RDD and compute the counts for each key type separately? Is there a better way? Thanks!
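For what it's worth, one shape of the single-pass idea I'm describing is to flatMap each adjacent pair into *tagged* key-value records, reduce once, and split by tag afterwards. Below is a sketch of that logic on plain Scala collections so it's self-contained (the same structure would use `flatMap` + `reduceByKey` on an RDD); the tag types and the token predicates are my own assumptions, not anything from the original code:

```scala
object SinglePassSketch {
  // Assumed token predicates (the post only states that such functions exist).
  def isWord(t: String): Boolean = t.nonEmpty && t.forall(_.isLetter)
  def isNumber(t: String): Boolean = t.nonEmpty && t.forall(_.isDigit)

  // A tag telling which of the three counts a record belongs to.
  sealed trait Key
  case class Word(w: String) extends Key
  case class WordWord(a: String, b: String) extends Key
  case class NumberWord(n: String, w: String) extends Key

  // Single pass over the sliding pairs: each pair emits zero or more tagged
  // records, which are then summed per key in one reduction.
  // On an RDD this would be tokens.sliding(2).flatMap{...}.reduceByKey(_ + _).
  def countAll(tokens: Seq[String]): Map[Key, Int] =
    tokens.sliding(2).flatMap {
      case Seq(prev, curr) =>
        val word = if (isWord(prev)) Seq(Word(prev) -> 1) else Seq.empty
        val ww   = if (isWord(prev) && isWord(curr)) Seq(WordWord(prev, curr) -> 1) else Seq.empty
        val nw   = if (isNumber(prev) && isWord(curr)) Seq(NumberWord(prev, curr) -> 1) else Seq.empty
        word ++ ww ++ nw
      case _ => Seq.empty  // fewer than two tokens
    }.toSeq.groupMapReduce(_._1)(_._2)(_ + _)
}
```

Note one caveat that also applies to my attempt above: a token is only counted as a word when it appears as the *first* element of a pair, so the final token of the stream is never counted on its own; the word counts would likely still need to come from `tokens` directly.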