转换基于现有RDD创建新的RDD。基本上,RDD是不可变的,Spark中的所有转换都是惰性的。在执行操作但未处理数据之前,RDD中的数据不会被处理,新的RDD如何创建?例如,在filter
操作中,如何在不将RDD实际加载到内存并处理它的情况下创建新RDD?
问题:例如,在过滤操作中,如何在不将RDD实际加载到内存并处理它的情况下创建新RDD?
例如:
firstRDD=spark.textFile("hdfs://...")
secondRDD=firstRDD.filter(someFunction);
thirdRDD = secondRDD.map(someFunction);
result = thirdRDD.count()
由于RDD是通过一组转换创建的,因此它会记录这些转换,而不是实际数据(这就像我们使用这个特定的预定义进行过滤时需要完成的行动计划)。这些转换的图形生成一个RDD被称为像下面的Lineage图。
请参阅RDD.scala只有在遇到使用filter
的谓词时才会创建新的RDD ..这就像行动计划一样。只有当您调用count
之类的操作时,才会执行此计划。
/*** Return a new RDD containing only the elements that satisfy a predicate.
*/
def filter(f: T => Boolean): RDD[T] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[T, T](
this,
(context, pid, iter) => iter.filter(cleanF),
preservesPartitioning = true)
}
懒惰的评估:(纠正你的引用“Spark中的所有转换都是懒惰的”到“Spark中的所有转换都被懒惰地评估”)
Spark首次在动作中使用它时会懒惰地计算RDD,以便它可以进行管道转换。因此,在上面的示例中,仅在调用
count()
操作时才会评估RDD。
希望有帮助......
Spark变换在运行中是懒惰的。这些操作不会立即计算,它只记得在RDD上应用的转换并返回指向操作输出的指针。只有在对其应用操作时才会计算转换操作。应用操作后,spark会将操作分解为任务并将它们分发到节点上以供执行。