saveAsObjectFile和持久保存在Apache Spark中有什么区别?

问题描述 投票:0回答:1

我正在尝试比较Java和Kryo​​序列化,并将rdd保存在磁盘上,使用saveAsObjectFile时,它的大小相同,但是在持久性上,它在spark ui中显示不同。 Kryo比Java更小,但具有讽刺意味的是,Java的处理时间比Kryo的要少,这是Spark UI所没有的?

val conf = new SparkConf()
    .setAppName("kyroExample")
    .setMaster("local[*]")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .registerKryoClasses(
      Array(classOf[Person],classOf[Array[Person]])
    )

  val sparkContext = new SparkContext(conf)
  val personList: Array[Person] = (1 to 100000).map(value => Person(value + "", value)).toArray

  val rddPerson: RDD[Person] = sparkContext.parallelize(personList)
  val evenAgePerson: RDD[Person] = rddPerson.filter(_.age % 2 == 0)

  case class Person(name: String, age: Int)

  evenAgePerson.saveAsObjectFile("src/main/resources/objectFile")
  evenAgePerson.persist(StorageLevel.MEMORY_ONLY_SER)
  evenAgePerson.count()
scala apache-spark serialization bigdata
1个回答
0
投票

火花持续,并且saveAsObjectFile完全不同。

persist-将您的RDD DAG保留到请求的StorageLevel,这意味着从现在开始,对该RDD进行的任何转换都将仅从保留的DAG中进行计算。

saveAsObjectFile-只需将RDD保存到序列化对象的SequenceFile中。

saveAsObjectFile根本不使用“ spark.serializer”配置。如您所见,以下代码:

  /**
   * Save this RDD as a SequenceFile of serialized objects.
   */
  def saveAsObjectFile(path: String): Unit = withScope {
    this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
      .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
      .saveAsSequenceFile(path)
  }

当序列化方法定义为:时,saveAsObjectFile使用Utils.serialize序列化您的对象:

  /** Serialize an object using Java serialization */
  def serialize[T](o: T): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(o)
    oos.close()
    bos.toByteArray
  }

saveAsObjectFile始终使用Java序列化。

另一方面,persist将使用您配置的配置的spark.serializer。


0
投票

[persistsaveAsObjectFile解决了不同的需求。

persist的名称具有误导性。不应将其用于永久保留rdd结果。 Persist用于在火花工作流期间临时保存rdd的计算结果。用户无法控制持久数据帧的位置。 Persist只是使用不同的缓存策略进行缓存-内存,磁盘或两者。实际上,cache只会使用默认的缓存策略来调用persist

例如

val errors = df.filter(col("line").like("%ERROR%"))
// Counts all the errors
errors.count()
// Counts errors mentioning MySQL
// Runs again on the full dataframe of all the lines , repeats the above operation 
errors.filter(col("line").like("%MySQL%")).count()

vs

val errors = df.filter(col("line").like("%ERROR%"))
errors.persist()
// Counts all the errors
errors.count()
// Counts errors mentioning MySQL
// Runs only on the errors tmp result containing only the filtered error lines

errors.filter(col("line").like("%MySQL%")).count()

saveAsObjectFile用于永久保留。它用于将Spark作业的最终结果序列化到一个持久的,通常是分布式的文件系统上,例如hdfsamazon s3

© www.soinside.com 2019 - 2024. All rights reserved.