Spark save functions using MapReduce

Problem description

I am trying to figure out why saveAsTextFile, and Spark's save functions more generally, appear to use MapReduce. Here is the source code:

RDD.scala

  def saveAsTextFile(path: String): Unit = withScope {
    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
    val textClassTag = implicitly[ClassTag[Text]]
    val r = this.mapPartitions { iter =>
      val text = new Text()
      iter.map { x =>
        text.set(x.toString)
        (NullWritable.get(), text)
      }
    }
    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
  }

PairRDDFunctions.scala

So essentially the given RDD is converted to a PairRDD in order to call the saveAsHadoopFile function:

def saveAsHadoopFile(
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: OutputFormat[_, _]],
      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
      codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {

    val hadoopConf = conf
    hadoopConf.setOutputKeyClass(keyClass)
    hadoopConf.setOutputValueClass(valueClass)
    conf.setOutputFormat(outputFormatClass)
    for (c <- codec) {
      hadoopConf.setCompressMapOutput(true)
      hadoopConf.set("mapred.output.compress", "true")
      hadoopConf.setMapOutputCompressorClass(c)
      hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
      hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
    }

    if (conf.getOutputCommitter == null) {
      hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
    }

    val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
    val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
      val warningMessage =
        s"$outputCommitterClass may be an output committer that writes data directly to " +
          "the final location. Because speculation is enabled, this output committer may " +
          "cause data loss (see the case in SPARK-10063). If possible, please use a output " +
          "committer that does not have this behavior (e.g. FileOutputCommitter)."
      logWarning(warningMessage)
    }

    FileOutputFormat.setOutputPath(hadoopConf,
      SparkHadoopWriter.createPathFromString(path, hadoopConf))
    saveAsHadoopDataset(hadoopConf)
  }

From my understanding, it is definitely trying to configure a MapReduce job here, setting the output key, output value, and so on.
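
For context, the code path above is reached by an ordinary RDD save call. A minimal driver (the app name and output path below are made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}

object SaveExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("save-example").setMaster("local[*]"))
    val rdd = sc.parallelize(Seq("a", "b", "c"))
    // Internally this is turned into (NullWritable, Text) pairs and routed through
    // saveAsHadoopFile with TextOutputFormat, as in the source quoted above.
    rdd.saveAsTextFile("/tmp/save-example-output")
    sc.stop()
  }
}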

Can someone explain to me:

  • How a Spark save operation actually happens
  • What the main differences are between a Spark save and a MapReduce save
apache-spark mapreduce rdd
1 Answer

"It is definitely trying to configure a MapReduce job here, setting the output key, output value, and so on."

Not exactly. It is setting up a Hadoop configuration, but that does not mean it is setting up a MapReduce job. Hadoop as such consists of many different components, and a large number of them are not tightly bound to MapReduce. Many of them (such as the HDFS interfaces or the security components) are used by many different projects.
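
As a small illustration of that point (a standalone sketch; the output path is made up), the Hadoop FileSystem API, which is the same layer Spark's writers ultimately go through, can be used without a MapReduce job ever being created or submitted:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HadoopFsWithoutMapReduce {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()   // a Hadoop Configuration object, not a MapReduce job
    val fs   = FileSystem.get(conf)  // local filesystem unless fs.defaultFS points at HDFS
    val out  = fs.create(new Path("/tmp/no-mr-example.txt"))
    out.writeBytes("written through the Hadoop FileSystem API; no MR job submitted\n")
    out.close()
    fs.close()
  }
}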

"What are the main differences between a Spark save and a MapReduce save?"

There are none. In general, when Spark interacts with a file system it uses the relevant Hadoop components, but those are unrelated to the MapReduce component and should not be confused with Hadoop MR jobs.
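
For contrast, actually running a MapReduce save means building a Job and submitting it through the MR runtime. A hypothetical minimal driver (identity map and reduce, input and output paths taken from the command line) is sketched below; note the explicit waitForCompletion, which Spark's save functions never call:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

object IdentityMrJob {
  def main(args: Array[String]): Unit = {
    val job = Job.getInstance(new Configuration(), "identity-copy")
    job.setJarByClass(IdentityMrJob.getClass)
    // Default (identity) Mapper and Reducer; the point is only the explicit job submission.
    job.setOutputKeyClass(classOf[LongWritable])
    job.setOutputValueClass(classOf[Text])
    FileInputFormat.addInputPath(job, new Path(args(0)))
    FileOutputFormat.setOutputPath(job, new Path(args(1)))
    System.exit(if (job.waitForCompletion(true)) 0 else 1)
  }
}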
