执行器失败后 Spark 无法在 HDFS 中找到检查点数据

问题描述 投票:0回答:1

我正在从 Kafka 传输数据,如下所示:

final JavaPairDStream<String, Row> transformedMessages = 


    rtStream
                    .mapToPair(record -> new Tuple2<String, GenericDataModel>(record.key(), record.value()))                
                    .mapWithState(StateSpec.function(updateDataFunc).numPartitions(32)).stateSnapshots()                        
                    .foreachRDD(rdd -> {
                    --logic goes here
                    }); 

我有四个工作线程,以及该应用程序的多个执行程序,我正在尝试检查 Spark 的容错能力。

由于我们使用的是mapWithState,spark会将数据检查点发送到HDFS,因此如果任何执行程序/工作线程发生故障,我们应该能够恢复丢失的数据(死掉的执行程序中丢失的数据),并继续使用剩余的执行程序/工作线程。

因此我杀死了其中一个工作节点以查看应用程序是否仍然顺利运行,但我在 HDFS 中得到了 FileNotFound 的异常,如下所示:

这有点奇怪,因为 Spark 有时会在 HDFS 中检查数据,为什么它找不到它。显然HDFS没有删除任何数据,所以为什么会出现这个异常。

或者我在这里遗漏了什么?

[ERROR] 2018-08-21 13:07:24,067 org.apache.spark.streaming.scheduler.JobScheduler logError - Error running job streaming job 1534871220000 ms.2
                org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/user1/sparkCheckpointData/2db59817-d954-41a7-9b9d-4ec874bc86de/rdd-1005/part-00000
                java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/user1/sparkCheckpointData/2db59817-d954-41a7-9b9d-4ec874bc86de/rdd-1005/part-00000
                        at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1122)
                at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
                at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
                at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
                at org.apache.spark.rdd.ReliableCheckpointRDD.getPreferredLocations(ReliableCheckpointRDD.scala:89)
                at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:273)
                at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:273)
                at scala.Option.map(Option.scala:146)
                at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:273)
                at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1615)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1626)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1625)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1625)
                at scala.collection.immutable.List.foreach(List.scala:381)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1625)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1623)

进一步更新: 我发现 Spark 试图在 HDFS 中查找的 RDD 已被“ReliableRDDCheckpointData”进程删除,并且它为检查点数据创建了一个新的 RDD。 DAG 以某种方式指向这个旧的 RDD。如果有任何对此数据的引用,则不应将其删除。

apache-spark spark-streaming spark-checkpoint
1个回答
2
投票

考虑 Spark 流上的转换管道:

rtStream
                    .mapToPair(record -> new Tuple2<String, GenericDataModel>(record.key(), record.value()))                
                    .mapWithState(StateSpec.function(updateDataFunc).numPartitions(32)).stateSnapshots()                        
                    .foreachRDD(rdd -> {
                      if(counter ==1){
                       --convert RDD to Dataset, and register it as a SQL table names "InitialDataTable"
                      } else
                       --convert RDD to Dataset, and register it as a SQL table names "ActualDataTable"

                     
                    }); 

mapWithState
与每个批次后状态数据的自动检查点相关联,因此上述
rdd
块中的每个
forEachRdd
都会被设置检查点。在检查点时,它会覆盖之前的检查点(因为显然最新的状态需要保留在检查点中)。

但是假设用户仍在使用

rdd
数字 1,就像在我的例子中,我将第一个
rdd
注册为不同的表,并将每个其他
rdd
注册为不同的表,那么它不应该被覆盖。 (在java中也是一样的:如果某个东西引用了一个对象引用,那么该对象将不符合垃圾回收的条件)。

现在,当我尝试访问表

InitialDataTable
时,显然用于创建该表的
rdd
已不在内存中,因此它将前往HDFS从检查点恢复它,并且在那里找不到它也因为它被下一个
rdd
覆盖,并且 Spark 应用程序停止引用原因。

org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: java.io.FileNotFoundException: File does not exist: hdfs://mycluster/user/user1/sparkCheckpointData/2db59817-d954-41a7-9b9d-4ec874bc86de/rdd-1005/part-00000

因此,为了解决这个问题,我必须明确地检查第一个

rdd

© www.soinside.com 2019 - 2024. All rights reserved.