我有一个目录,其中包含许多日志文件。我必须根据文件名解析日志文件。因此,我们完成的第一种方法是创建一个WholeTextFileRDD来并行化我的数据,如下所示
现在,当我将文件内容传递给单独的解析器以解析日志文件时,由于出现错误[
Spark Deploy模式是Cluster驱动程序内存20 GB执行器内存16 GB
val fileRDD = spark.sparkContext.wholeTextFiles(logBundle.path.trim)
fileRDD.map(tupleOfFileAndContent =>parseLog(tupleOfFileAndContent._2))
def parseLog(logfilecontent: String): List[Map[String, String]] = {
val txt = new Scanner(logfilecontent)
var linNum = 1
val logEntries = new ListBuffer[Map[String, String]]()
while (txt.hasNextLine) {
val line = txt.nextLine()
var logEntry = Map[String, String]()
if (line.startsWith(" EVENT SENDING:")){
logEntry += ("line_number" -> linNum.toString)
logEntry += ("event_sending" -> ( line.splitAt(18)._2.trim))
logEntries += logEntry
}
linNum += 1
}
logEntries.toList
}
希望我能有所帮助,欢呼。