我正在尝试比较Java和Kryo序列化,并将rdd保存在磁盘上,使用saveAsObjectFile时,它的大小相同,但是在持久性上,它在spark ui中显示不同。 Kryo比Java更小,但具有讽刺意味的是,Java的处理时间比Kryo的要少,这是Spark UI所没有的?
val conf = new SparkConf()
.setAppName("kyroExample")
.setMaster("local[*]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(
Array(classOf[Person],classOf[Array[Person]])
)
val sparkContext = new SparkContext(conf)
val personList: Array[Person] = (1 to 100000).map(value => Person(value + "", value)).toArray
val rddPerson: RDD[Person] = sparkContext.parallelize(personList)
val evenAgePerson: RDD[Person] = rddPerson.filter(_.age % 2 == 0)
case class Person(name: String, age: Int)
evenAgePerson.saveAsObjectFile("src/main/resources/objectFile")
evenAgePerson.persist(StorageLevel.MEMORY_ONLY_SER)
evenAgePerson.count()
火花持续,并且saveAsObjectFile完全不同。
persist-将您的RDD DAG保留到请求的StorageLevel,这意味着从现在开始,对该RDD进行的任何转换都将仅从保留的DAG中进行计算。
saveAsObjectFile-只需将RDD保存到序列化对象的SequenceFile中。
saveAsObjectFile根本不使用“ spark.serializer”配置。如您所见,以下代码:
/**
* Save this RDD as a SequenceFile of serialized objects.
*/
def saveAsObjectFile(path: String): Unit = withScope {
this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
.map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
.saveAsSequenceFile(path)
}
当序列化方法定义为:时,saveAsObjectFile使用Utils.serialize序列化您的对象:
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(o)
oos.close()
bos.toByteArray
}
saveAsObjectFile始终使用Java序列化。
另一方面,persist将使用您配置的配置的spark.serializer。
[persist
和saveAsObjectFile
解决了不同的需求。
persist
的名称具有误导性。不应将其用于永久保留rdd结果。 Persist用于在火花工作流期间临时保存rdd的计算结果。用户无法控制持久数据帧的位置。 Persist只是使用不同的缓存策略进行缓存-内存,磁盘或两者。实际上,cache
只会使用默认的缓存策略来调用persist
。
例如
val errors = df.filter(col("line").like("%ERROR%"))
// Counts all the errors
errors.count()
// Counts errors mentioning MySQL
// Runs again on the full dataframe of all the lines , repeats the above operation
errors.filter(col("line").like("%MySQL%")).count()
vs
val errors = df.filter(col("line").like("%ERROR%"))
errors.persist()
// Counts all the errors
errors.count()
// Counts errors mentioning MySQL
// Runs only on the errors tmp result containing only the filtered error lines
errors.filter(col("line").like("%MySQL%")).count()
saveAsObjectFile
用于永久保留。它用于将Spark作业的最终结果序列化到一个持久的,通常是分布式的文件系统上,例如hdfs
或amazon s3