Apache Spark中的转换过程

问题描述 投票:1回答:2

转换基于现有RDD创建新的RDD。基本上,RDD是不可变的,Spark中的所有转换都是惰性的。在执行操作但未处理数据之前,RDD中的数据不会被处理,新的RDD如何创建?例如,在filter操作中,如何在不将RDD实际加载到内存并处理它的情况下创建新RDD?

apache-spark rdd
2个回答
4
投票

问题:例如,在过滤操作中,如何在不将RDD实际加载到内存并处理它的情况下创建新RDD?

Apache Spark中的转换过程:

enter image description here

例如:

firstRDD=spark.textFile("hdfs://...")

secondRDD=firstRDD.filter(someFunction);

thirdRDD = secondRDD.map(someFunction);

result = thirdRDD.count()

由于RDD是通过一组转换创建的,因此它会记录这些转换,而不是实际数据(这就像我们使用这个特定的预定义进行过滤时需要完成的行动计划)。这些转换的图形生成一个RDD被称为像下面的Lineage图。

Spark RDD Lineage Graph in this above example would be :

enter image description here

请参阅RDD.scala只有在遇到使用filter的谓词时才会创建新的RDD ..这就像行动计划一样。只有当您调用count之类的操作时,才会执行此计划。

/*** Return a new RDD containing only the elements that satisfy a predicate.
       */
      def filter(f: T => Boolean): RDD[T] = withScope {
        val cleanF = sc.clean(f)
        new MapPartitionsRDD[T, T](
          this,
          (context, pid, iter) => iter.filter(cleanF),
          preservesPartitioning = true)
      }
  • 延迟评估意味着当我们在RDD上调用转换时(例如,调用map()),操作不会立即执行。
  • 相反,Spark内部记录元数据以指示已请求此操作。不要将RDD视为包含特定数据,而是最好将每个RDD视为如何计算我们通过转换构建的数据的指令。
  • 将数据加载到RDD中的方式与Transormations相同。因此,当我们调用sc.textFile()时,在有必要之前不会加载数据。与转换一样,操作(在这种情况下,读取数据)可以多次发生。

懒惰的评估:(纠正你的引用“Spark中的所有转换都是懒惰的”到“Spark中的所有转换都被懒惰地评估”)

Spark首次在动作中使用它时会懒惰地计算RDD,以便它可以进行管道转换。因此,在上面的示例中,仅在调用count()操作时才会评估RDD。

希望有帮助......


0
投票

Spark变换在运行中是懒惰的。这些操作不会立即计算,它只记得在RDD上应用的转换并返回指向操作输出的指针。只有在对其应用操作时才会计算转换操作。应用操作后,spark会将操作分解为任务并将它们分发到节点上以供执行。

© www.soinside.com 2019 - 2024. All rights reserved.