我如何根据下面的代码片段找到匹配字符串的出现,我能够将过滤后的字符串作为输出,但不是出现的
import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object WordCount {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("wordCount")
val sc = new SparkContext(conf)
// Load our input data.
val input = sc.textFile("file:///tmp/ganesh/*")
val matched_pattern = input.filter(line => line.contains("Title"))
// Split it up into words.
val words = matched_pattern.flatMap(line => line.split(" "))
// Transform into pairs and count.
val counts = words.map(word => (word, 1)).reduceByKey{case (x, y) => x + y}
// Save the word count back out to a text file, causing evaluation.
counts.saveAsTextFile("file:///tmp/sparkout")
}
}
这是一个例子 - 使用广播变量。 stopWords实际上包括单词。
val dfsFilename = "/FileStore/tables/7dxa9btd1477497663691/Text_File_01-880f5.txt"
val readFileRDD = spark.sparkContext.textFile(dfsFilename)
// res4: Array[String] = Array(The the is Is a A to To OK ok I) //stopWords
val stopWordsInput = spark.sparkContext.textFile("/FileStore/tables/filter_words.txt")
val stopWords = stopWordsInput.flatMap(x => x.split(" ")).map(_.trim).collect.toSet
val broadcasted = sc.broadcast(stopWords)
val wcounts1 = readFileRDD.map(x => (x.replaceAll("[^A-Za-z0-9]", " ")
.trim.toLowerCase))
.flatMap(line=>line.split(" "))
.filter(broadcasted.value.contains(_))
.map(word=>(word, 1))
.reduceByKey(_ + _)
wcounts1.collect
收益:
res2: Array[(String, Int)] = Array((The,1), (I,3), (to,1), (the,1))
你可以在stopWords上播放广播 - 这就是我所做的。
我看到你的XML输入和replaceAll。你可以根据自己的喜好来摆弄它。我还添加了一个条款,将其全部用于小写。