It throws an ExecutorLostFailure every time I try to run it on this folder
Hi, I am a beginner with Spark. I am trying to run a job on Spark 1.4.1 with 8 slave nodes, each with 11.7 GB of memory and 3.2 GB of disk. I am running the Spark task from one of the 8 slave nodes (so, with a storage fraction of 0.7, only about 4.8 GB is available on each node), and I am using Mesos as the cluster manager. I am using this configuration:
spark.master mesos://uc1f-bioinfocloud-vamp-m-1:5050
spark.eventLog.enabled true
spark.driver.memory 6g
spark.storage.memoryFraction 0.7
spark.core.connection.ack.wait.timeout 800
spark.akka.frameSize 50
spark.rdd.compress true
I am trying to run the Spark MLlib Naive Bayes algorithm on a folder of roughly 14 GB of data. (There is no problem when I run the job on a 6 GB folder.) I read this folder from Google Storage as an RDD and pass 32 as the partition parameter (I have also tried increasing the partitions). I then create feature vectors using TF and predict on top of them. But every time I try to run it on this folder it throws an ExecutorLostFailure. I have tried different configurations, but nothing helps. I am probably missing something very basic that I just cannot figure out. Any help or suggestion would be very valuable. The pipeline and the log are shown below.
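Roughly, the pipeline looks like the following simplified sketch (assuming pyspark.mllib's HashingTF and NaiveBayes; the label extraction in the real script, /opt/work/V2ProcessRecords.py, is omitted here, so the labels and names are only illustrative):

from pyspark import SparkContext
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import NaiveBayes

sc = SparkContext(appName="V2ProcessRecords")

# Read the ~14 GB folder from Google Storage as (path, content) pairs with 32 partitions.
docs = sc.wholeTextFiles("gs://uc1f-bioinfocloud-vamp-m/literature/xml/P*/*.nxml", 32)

# Build TF feature vectors from the (naively tokenized) document text.
tf = HashingTF()
features = docs.map(lambda kv: tf.transform(kv[1].split()))

# Train Naive Bayes and predict; real labels come from the application,
# the 0.0 below is only a placeholder.
labeled = features.map(lambda v: LabeledPoint(0.0, v))
model = NaiveBayes.train(labeled)
predictions = model.predict(features)

The log is: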
15/07/21 01:18:20 ERROR TaskSetManager: Task 3 in stage 2.0 failed 4 times; aborting job
15/07/21 01:18:20 INFO TaskSchedulerImpl: Cancelling stage 2
15/07/21 01:18:20 INFO TaskSchedulerImpl: Stage 2 was cancelled
15/07/21 01:18:20 INFO DAGScheduler: ResultStage 2 (collect at /opt/work/V2ProcessRecords.py:213) failed in 28.966 s
15/07/21 01:18:20 INFO DAGScheduler: Executor lost: 20150526-135628-3255597322-5050-1304-S8 (epoch 3)
15/07/21 01:18:20 INFO BlockManagerMasterEndpoint: Trying to remove executor 20150526-135628-3255597322-5050-1304-S8 from BlockManagerMaster.
15/07/21 01:18:20 INFO DAGScheduler: Job 2 failed: collect at /opt/work/V2ProcessRecords.py:213, took 29.013646 s
Traceback (most recent call last):
File "/opt/work/V2ProcessRecords.py", line 213, in <module>
secondPassRDD = firstPassRDD.map(lambda ( name, title, idval, pmcId, pubDate, article, tags , author, ifSigmaCust, wclass): ( str(name), title, idval, pmcId, pubDate, article, tags , author, ifSigmaCust , "Yes" if ("PMC" + pmcId) in rddNIHGrant else ("No") , wclass)).collect()
File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 745, in collect
File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 2.0 failed 4 times, most recent failure: Lost task 3.3 in stage 2.0 (TID 12, vamp-m-2.c.quantum-854.internal): ExecutorLostFailure (executor 20150526-135628-3255597322-5050-1304-S8 lost)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/07/21 01:18:20 INFO BlockManagerMaster: Removed 20150526-135628-3255597322-5050-1304-S8 successfully in removeExecutor
15/07/21 01:18:20 INFO DAGScheduler: Host added was in lost list earlier:vamp-m-2.c.quantum-854.internal
Jul 21, 2015 1:01:15 AM INFO: parquet.hadoop.ParquetFileReader: Initiating action with parallelism: 5
15/07/21 01:18:20 INFO SparkContext: Invoking stop() from shutdown hook
{
"Event": "SparkListenerTaskStart",
"Stage ID": 2,
"Stage Attempt ID": 0,
"Task Info": {
"Task ID": 11,
"Index": 6,
"Attempt": 2,
"Launch Time": 1437616381852,
"Executor ID": "20150526-135628-3255597322-5050-1304-S8",
"Host": "uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal",
"Locality": "PROCESS_LOCAL",
"Speculative": false,
"Getting Result Time": 0,
"Finish Time": 0,
"Failed": false,
"Accumulables": []
}
}
{
"Event": "SparkListenerExecutorRemoved",
"Timestamp": 1437616389696,
"Executor ID": "20150526-135628-3255597322-5050-1304-S8",
"Removed Reason": "Lost executor"
}
{
"Event": "SparkListenerTaskEnd",
"Stage ID": 2,
"Stage Attempt ID": 0,
"Task Type": "ResultTask",
"Task End Reason": {
"Reason": "ExecutorLostFailure",
"Executor ID": "20150526-135628-3255597322-5050-1304-S8"
},
"Task Info": {
"Task ID": 11,
"Index": 6,
"Attempt": 2,
"Launch Time": 1437616381852,
"Executor ID": "20150526-135628-3255597322-5050-1304-S8",
"Host": "uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal",
"Locality": "PROCESS_LOCAL",
"Speculative": false,
"Getting Result Time": 0,
"Finish Time": 1437616389697,
"Failed": true,
"Accumulables": []
}
}
{
"Event": "SparkListenerExecutorAdded",
"Timestamp": 1437616389707,
"Executor ID": "20150526-135628-3255597322-5050-1304-S8",
"Executor Info": {
"Host": "uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal",
"Total Cores": 1,
"Log Urls": {}
}
}
{
"Event": "SparkListenerTaskStart",
"Stage ID": 2,
"Stage Attempt ID": 0,
"Task Info": {
"Task ID": 12,
"Index": 6,
"Attempt": 3,
"Launch Time": 1437616389702,
"Executor ID": "20150526-135628-3255597322-5050-1304-S8",
"Host": "uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal",
"Locality": "PROCESS_LOCAL",
"Speculative": false,
"Getting Result Time": 0,
"Finish Time": 0,
"Failed": false,
"Accumulables": []
}
}
{
"Event": "SparkListenerExecutorRemoved",
"Timestamp": 1437616397743,
"Executor ID": "20150526-135628-3255597322-5050-1304-S8",
"Removed Reason": "Lost executor"
}
{
"Event": "SparkListenerTaskEnd",
"Stage ID": 2,
"Stage Attempt ID": 0,
"Task Type": "ResultTask",
"Task End Reason": {
"Reason": "ExecutorLostFailure",
"Executor ID": "20150526-135628-3255597322-5050-1304-S8"
},
"Task Info": {
"Task ID": 12,
"Index": 6,
"Attempt": 3,
"Launch Time": 1437616389702,
"Executor ID": "20150526-135628-3255597322-5050-1304-S8",
"Host": "uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal",
"Locality": "PROCESS_LOCAL",
"Speculative": false,
"Getting Result Time": 0,
"Finish Time": 1437616397743,
"Failed": true,
"Accumulables": []
}
}
{
"Event": "SparkListenerStageCompleted",
"Stage Info": {
"Stage ID": 2,
"Stage Attempt ID": 0,
"Stage Name": "collect at /opt/work/V2ProcessRecords.py:215",
"Number of Tasks": 72,
"RDD Info": [
{
"RDD ID": 6,
"Name": "PythonRDD",
"Parent IDs": [
0
],
"Storage Level": {
"Use Disk": false,
"Use Memory": false,
"Use ExternalBlockStore": false,
"Deserialized": false,
"Replication": 1
},
"Number of Partitions": 72,
"Number of Cached Partitions": 0,
"Memory Size": 0,
"ExternalBlockStore Size": 0,
"Disk Size": 0
},
{
"RDD ID": 0,
"Name": "gs://uc1f-bioinfocloud-vamp-m/literature/xml/P*/*.nxml",
"Scope": "{\"id\":\"0\",\"name\":\"wholeTextFiles\"}",
"Parent IDs": [],
"Storage Level": {
"Use Disk": false,
"Use Memory": false,
"Use ExternalBlockStore": false,
"Deserialized": false,
"Replication": 1
},
"Number of Partitions": 72,
"Number of Cached Partitions": 0,
"Memory Size": 0,
"ExternalBlockStore Size": 0,
"Disk Size": 0
}
],
"Parent IDs": [],
"Details": "",
"Submission Time": 1437616365566,
"Completion Time": 1437616397753,
"Failure Reason": "Job aborted due to stage failure: Task 6 in stage 2.0 failed 4 times, most recent failure: Lost task 6.3 in stage 2.0 (TID 12, uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal): ExecutorLostFailure (executor 20150526-135628-3255597322-5050-1304-S8 lost)\nDriver stacktrace:",
"Accumulables": []
}
}
{
"Event": "SparkListenerJobEnd",
"Job ID": 2,
"Completion Time": 1437616397755,
"Job Result": {
"Result": "JobFailed",
"Exception": {
"Message": "Job aborted due to stage failure: Task 6 in stage 2.0 failed 4 times, most recent failure: Lost task 6.3 in stage 2.0 (TID 12, uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal): ExecutorLostFailure (executor 20150526-135628-3255597322-5050-1304-S8 lost)\nDriver stacktrace:",
"Stack Trace": [
{
"Declaring Class": "org.apache.spark.scheduler.DAGScheduler",
"Method Name": "org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages",
"File Name": "DAGScheduler.scala",
"Line Number": 1266
},
{
"Declaring Class": "org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1",
"Method Name": "apply",
"File Name": "DAGScheduler.scala",
"Line Number": 1257
},
{
"Declaring Class": "org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1",
"Method Name": "apply",
"File Name": "DAGScheduler.scala",
"Line Number": 1256
},
{
"Declaring Class": "scala.collection.mutable.ResizableArray$class",
"Method Name": "foreach",
"File Name": "ResizableArray.scala",
"Line Number": 59
},
{
"Declaring Class": "scala.collection.mutable.ArrayBuffer",
"Method Name": "foreach",
"File Name": "ArrayBuffer.scala",
"Line Number": 47
},
{
"Declaring Class": "org.apache.spark.scheduler.DAGScheduler",
"Method Name": "abortStage",
"File Name": "DAGScheduler.scala",
"Line Number": 1256
},
{
"Declaring Class": "org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1",
"Method Name": "apply",
"File Name": "DAGScheduler.scala",
"Line Number": 730
},
{
"Declaring Class": "org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1",
"Method Name": "apply",
"File Name": "DAGScheduler.scala",
"Line Number": 730
},
{
"Declaring Class": "scala.Option",
"Method Name": "foreach",
"File Name": "Option.scala",
"Line Number": 236
},
{
"Declaring Class": "org.apache.spark.scheduler.DAGScheduler",
"Method Name": "handleTaskSetFailed",
"File Name": "DAGScheduler.scala",
"Line Number": 730
},
{
"Declaring Class": "org.apache.spark.scheduler.DAGSchedulerEventProcessLoop",
"Method Name": "onReceive",
"File Name": "DAGScheduler.scala",
"Line Number": 1450
},
{
"Declaring Class": "org.apache.spark.scheduler.DAGSchedulerEventProcessLoop",
"Method Name": "onReceive",
"File Name": "DAGScheduler.scala",
"Line Number": 1411
},
{
"Declaring Class": "org.apache.spark.util.EventLoop$$anon$1",
"Method Name": "run",
"File Name": "EventLoop.scala",
"Line Number": 48
}
]
}
}
}
--conf "spark.default.parallelism=100"
Set the parallelism value to 2 to 3 times the number of cores available on your cluster. If that does not work, try increasing the parallelism exponentially: if your current parallelism does not work, multiply it by two, and so on. I have also observed that it helps if your level of parallelism is a prime number, especially if you are using groupByKey.
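The same setting can also go into spark-defaults.conf or be set programmatically. A small sketch, assuming roughly 32 cores in total across the cluster (an assumption for illustration only, so 2-3x gives something around 96):

from pyspark import SparkConf, SparkContext

# Assuming ~32 cores across the cluster; 2-3x that suggests 64-96 partitions.
conf = (SparkConf()
        .setAppName("V2ProcessRecords")
        .set("spark.default.parallelism", "96"))
sc = SparkContext(conf=conf)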
To solve an OOM problem you need to figure out what is actually causing it. Simply increasing the default parallelism or increasing the executor memory is not a strategic solution.
If you look at what increasing parallelism does, it creates more partitions so that each task processes less and less data. But if your data is skewed, so that the key on which the data is partitioned (for parallelism) carries most of the data, merely increasing parallelism will have no effect.
Similarly, just increasing executor memory is a very inefficient way of handling the situation: if only one executor is failing with ExecutorLostFailure, requesting more memory for all executors will make your application require far more memory than it actually needs.
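As a quick way to check whether skew is what keeps killing one particular executor, here is a small sketch, assuming a (key, value) pair RDD named pairs (the name is illustrative, not from the original job), that prints the heaviest keys:

# Count how many records each key has and show the 20 heaviest keys.
# One key carrying a large share of the data is the kind of skew that
# extra parallelism alone will not fix.
top_keys = (pairs.map(lambda kv: (kv[0], 1))
                 .reduceByKey(lambda a, b: a + b)
                 .takeOrdered(20, key=lambda kv: -kv[1]))
for k, count in top_keys:
    print("%s\t%d" % (k, count))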
In my case, one key in the reduceByKey task had a very high occurrence rate. This (I think) caused a huge list to be collected on one of the executors, which then threw the OOM error. The solution for me was to filter out the heavily populated keys before doing the reduceByKey, but I understand that this may or may not be possible depending on your application. In any case, I did not need all of my data.
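A minimal sketch of that filtering idea, again assuming a pair RDD named pairs; both the name and the threshold are illustrative and depend on the application:

# Count records per key to find the hot keys.
key_counts = pairs.map(lambda kv: (kv[0], 1)).reduceByKey(lambda a, b: a + b)

# Collect only the keys whose counts exceed an application-specific threshold.
HOT_KEY_THRESHOLD = 1000000  # illustrative value
hot_keys = set(key_counts.filter(lambda kv: kv[1] > HOT_KEY_THRESHOLD)
                         .keys()
                         .collect())

# Drop the hot keys before the expensive reduceByKey so that no single
# executor has to hold an enormous amount of data for one key.
reduced = (pairs.filter(lambda kv: kv[0] not in hot_keys)
                .reduceByKey(lambda a, b: a + b))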