我试图找出为什么saveAsText
和更多一般的Spark保存功能似乎使用MapReduce。这是源代码:
RDD.scala
def saveAsTextFile(path: String): Unit = withScope {
val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
val textClassTag = implicitly[ClassTag[Text]]
val r = this.mapPartitions { iter =>
val text = new Text()
iter.map { x =>
text.set(x.toString)
(NullWritable.get(), text)
}
}
RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}
PairRDDFunctions.scala
所以基本上在PairRDD中转换给定的RDD以调用saveAsHadoopFile
函数:
def saveAsHadoopFile(
path: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: OutputFormat[_, _]],
conf: JobConf = new JobConf(self.context.hadoopConfiguration),
codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
val hadoopConf = conf
hadoopConf.setOutputKeyClass(keyClass)
hadoopConf.setOutputValueClass(valueClass)
conf.setOutputFormat(outputFormatClass)
for (c <- codec) {
hadoopConf.setCompressMapOutput(true)
hadoopConf.set("mapred.output.compress", "true")
hadoopConf.setMapOutputCompressorClass(c)
hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
}
if (conf.getOutputCommitter == null) {
hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
}
val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
if (speculationEnabled && outputCommitterClass.contains("Direct")) {
val warningMessage =
s"$outputCommitterClass may be an output committer that writes data directly to " +
"the final location. Because speculation is enabled, this output committer may " +
"cause data loss (see the case in SPARK-10063). If possible, please use a output " +
"committer that does not have this behavior (e.g. FileOutputCommitter)."
logWarning(warningMessage)
}
FileOutputFormat.setOutputPath(hadoopConf,
SparkHadoopWriter.createPathFromString(path, hadoopConf))
saveAsHadoopDataset(hadoopConf)
}
根据我的理解,它肯定会尝试配置MapReduce作业,设置outputKey,outputValue等等。
有人可以向我解释一下:
它肯定试图配置MapReduce作业,设置outputKey,outputValue等。
不完全是。它正在设置Hadoop配置,但这并不意味着它正在设置MapReduce作业。这样的Hadoop包含多个不同的组件,而且大量的这些组件并没有与MapReduce紧密绑定。其中许多(如HDFS接口或安全组件)用于许多不同的项目。
Spark save和MapReduce save之间的主要区别是什么?
没有。通常,当Spark与文件系统交互时,它使用相关的Hadoop组件。但是,它们与MapReduce组件无关,不应与Hadoop MR作业混淆。