我有一个用 Scala 编写的 Spark 作业,偶尔会失败并显示以下消息:
org.apache.spark.SparkException: Job aborted due to stage failure:
A shuffle map stage with indeterminate output was failed and retried.
However, Spark cannot rollback the ResultStage 20 to re-process the input data,
and has to fail this job. Please eliminate the indeterminacy by checkpointing
the RDD before repartition and try again.
代码如下所示:
val rdd = getRDD(...)
val output = rdd.repartition(NumPartitions).mapPartitions { ... }.collect
我不太熟悉检查点,但在公司代码库的其他地方找到了一些实例,并将其复制到我自己的工作中:
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")
val rdd = getRDD(...)
rdd.checkpoint()
val output = rdd.repartition(NumPartitions).mapPartitions { ... }.collect
但是,作业仍然间歇性失败,并显示完全相同的消息。 堆栈跟踪指向其中包含
repartition
的行,并且在任何情况下,作业都不包含其他 repartition
调用。
有人知道即使添加检查点后我如何也会遇到相同的错误吗?
使用解释计划检查您的转换,以确保它们是确定性的。避免依赖随机数生成或系统时间的操作,因为这些操作可能会带来不确定性。