ExecutorLostFailure error when running a job in Spark

Question description (votes: 0, answers: 4)

Every time I try to run it on this folder, it throws ExecutorLostFailure.

Hi, I am a beginner with Spark. I am trying to run a job on Spark 1.4.1 with 8 slave nodes, each with 11.7 GB of memory and 3.2 GB of disk. I am running the Spark job from one of the slave nodes (out of the 8), so with a storage fraction of 0.7 only about 4.8 GB is available on each node, and I am using Mesos as the cluster manager. I am using this configuration:

spark.master mesos://uc1f-bioinfocloud-vamp-m-1:5050
spark.eventLog.enabled true
spark.driver.memory 6g
spark.storage.memoryFraction 0.7
spark.core.connection.ack.wait.timeout 800
spark.akka.frameSize 50
spark.rdd.compress true

I am trying to run the Spark MLlib Naive Bayes algorithm on a folder of about 14 GB of data (there is no problem when I run the job on a 6 GB folder). I read this folder from Google Storage as an RDD and pass 32 as the partition parameter (I have also tried increasing the number of partitions). I then create TF feature vectors and run the prediction on top of them. But every time I try to run the job on this folder it throws "ExecutorLostFailure". I have tried different configurations but nothing helps. I am probably missing something very basic but cannot figure it out. Any help or suggestion would be really valuable. A simplified sketch of the script is below, followed by the log.
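This is only a minimal sketch reconstructed from the description above; the actual /opt/work/V2ProcessRecords.py also builds the labels and runs the Naive Bayes prediction, and the whitespace tokenization here is just illustrative:

from pyspark import SparkContext
from pyspark.mllib.feature import HashingTF

sc = SparkContext(appName="V2ProcessRecords")

# Read the folder from Google Storage as (path, content) pairs with 32 partitions.
docs = sc.wholeTextFiles(
    "gs://uc1f-bioinfocloud-vamp-m/literature/xml/P*/*.nxml", 32)

# Turn each document into a term-frequency feature vector.
tf = HashingTF()
features = docs.map(lambda path_text: tf.transform(path_text[1].split()))

# The prediction stage then collect()s results built on top of these vectors;
# that collect() is what fails with ExecutorLostFailure on the 14 GB folder.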

15/07/21 01:18:20 ERROR TaskSetManager: Task 3 in stage 2.0 failed 4 times; aborting job
15/07/21 01:18:20 INFO TaskSchedulerImpl: Cancelling stage 2
15/07/21 01:18:20 INFO TaskSchedulerImpl: Stage 2 was cancelled
15/07/21 01:18:20 INFO DAGScheduler: ResultStage 2 (collect at /opt/work/V2ProcessRecords.py:213) failed in 28.966 s
15/07/21 01:18:20 INFO DAGScheduler: Executor lost: 20150526-135628-3255597322-5050-1304-S8 (epoch 3)
15/07/21 01:18:20 INFO BlockManagerMasterEndpoint: Trying to remove executor 20150526-135628-3255597322-5050-1304-S8 from BlockManagerMaster.
15/07/21 01:18:20 INFO DAGScheduler: Job 2 failed: collect at /opt/work/V2ProcessRecords.py:213, took 29.013646 s
Traceback (most recent call last):
  File "/opt/work/V2ProcessRecords.py", line 213, in <module>
    secondPassRDD = firstPassRDD.map(lambda (name, title, idval, pmcId, pubDate, article, tags, author, ifSigmaCust, wclass): (str(name), title, idval, pmcId, pubDate, article, tags, author, ifSigmaCust, "Yes" if ("PMC" + pmcId) in rddNIHGrant else ("No"), wclass)).collect()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 745, in collect
  File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/usr/local/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 2.0 failed 4 times, most recent failure: Lost task 3.3 in stage 2.0 (TID 12, vamp-m-2.c.quantum-854.internal): ExecutorLostFailure (executor 20150526-135628-3255597322-5050-1304-S8 lost)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/07/21 01:18:20 INFO BlockManagerMaster: Removed 20150526-135628-3255597322-5050-1304-S8 successfully in removeExecutor
15/07/21 01:18:20 INFO DAGScheduler: Host added was in lost list earlier: vamp-m-2.c.quantum-854.internal
Jul 21, 2015 1:01:15 AM INFO: parquet.hadoop.ParquetFileReader: Initiating action with parallelism: 5
15/07/21 01:18:20 INFO SparkContext: Invoking stop() from shutdown hook

{
  "Event": "SparkListenerTaskStart",
  "Stage ID": 2,
  "Stage Attempt ID": 0,
  "Task Info": {
    "Task ID": 11,
    "Index": 6,
    "Attempt": 2,
    "Launch Time": 1437616381852,
    "Executor ID": "20150526-135628-3255597322-5050-1304-S8",
    "Host": "uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal",
    "Locality": "PROCESS_LOCAL",
    "Speculative": false,
    "Getting Result Time": 0,
    "Finish Time": 0,
    "Failed": false,
    "Accumulables": []
  }
}
{
  "Event": "SparkListenerExecutorRemoved",
  "Timestamp": 1437616389696,
  "Executor ID": "20150526-135628-3255597322-5050-1304-S8",
  "Removed Reason": "Lost executor"
}
{
  "Event": "SparkListenerTaskEnd",
  "Stage ID": 2,
  "Stage Attempt ID": 0,
  "Task Type": "ResultTask",
  "Task End Reason": {
    "Reason": "ExecutorLostFailure",
    "Executor ID": "20150526-135628-3255597322-5050-1304-S8"
  },
  "Task Info": {
    "Task ID": 11,
    "Index": 6,
    "Attempt": 2,
    "Launch Time": 1437616381852,
    "Executor ID": "20150526-135628-3255597322-5050-1304-S8",
    "Host": "uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal",
    "Locality": "PROCESS_LOCAL",
    "Speculative": false,
    "Getting Result Time": 0,
    "Finish Time": 1437616389697,
    "Failed": true,
    "Accumulables": []
  }
}
{
  "Event": "SparkListenerExecutorAdded",
  "Timestamp": 1437616389707,
  "Executor ID": "20150526-135628-3255597322-5050-1304-S8",
  "Executor Info": {
    "Host": "uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal",
    "Total Cores": 1,
    "Log Urls": {}
  }
}
{
  "Event": "SparkListenerTaskStart",
  "Stage ID": 2,
  "Stage Attempt ID": 0,
  "Task Info": {
    "Task ID": 12,
    "Index": 6,
    "Attempt": 3,
    "Launch Time": 1437616389702,
    "Executor ID": "20150526-135628-3255597322-5050-1304-S8",
    "Host": "uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal",
    "Locality": "PROCESS_LOCAL",
    "Speculative": false,
    "Getting Result Time": 0,
    "Finish Time": 0,
    "Failed": false,
    "Accumulables": []
  }
}
{
  "Event": "SparkListenerExecutorRemoved",
  "Timestamp": 1437616397743,
  "Executor ID": "20150526-135628-3255597322-5050-1304-S8",
  "Removed Reason": "Lost executor"
}
{
  "Event": "SparkListenerTaskEnd",
  "Stage ID": 2,
  "Stage Attempt ID": 0,
  "Task Type": "ResultTask",
  "Task End Reason": {
    "Reason": "ExecutorLostFailure",
    "Executor ID": "20150526-135628-3255597322-5050-1304-S8"
  },
  "Task Info": {
    "Task ID": 12,
    "Index": 6,
    "Attempt": 3,
    "Launch Time": 1437616389702,
    "Executor ID": "20150526-135628-3255597322-5050-1304-S8",
    "Host": "uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal",
    "Locality": "PROCESS_LOCAL",
    "Speculative": false,
    "Getting Result Time": 0,
    "Finish Time": 1437616397743,
    "Failed": true,
    "Accumulables": []
  }
}
{
  "Event": "SparkListenerStageCompleted",
  "Stage Info": {
    "Stage ID": 2,
    "Stage Attempt ID": 0,
    "Stage Name": "collect at /opt/work/V2ProcessRecords.py:215",
    "Number of Tasks": 72,
    "RDD Info": [
      {
        "RDD ID": 6,
        "Name": "PythonRDD",
        "Parent IDs": [
          0
        ],
        "Storage Level": {
          "Use Disk": false,
          "Use Memory": false,
          "Use ExternalBlockStore": false,
          "Deserialized": false,
          "Replication": 1
        },
        "Number of Partitions": 72,
        "Number of Cached Partitions": 0,
        "Memory Size": 0,
        "ExternalBlockStore Size": 0,
        "Disk Size": 0
      },
      {
        "RDD ID": 0,
        "Name": "gs://uc1f-bioinfocloud-vamp-m/literature/xml/P*/*.nxml",
        "Scope": "{\"id\":\"0\",\"name\":\"wholeTextFiles\"}",
        "Parent IDs": [],
        "Storage Level": {
          "Use Disk": false,
          "Use Memory": false,
          "Use ExternalBlockStore": false,
          "Deserialized": false,
          "Replication": 1
        },
        "Number of Partitions": 72,
        "Number of Cached Partitions": 0,
        "Memory Size": 0,
        "ExternalBlockStore Size": 0,
        "Disk Size": 0
      }
    ],
    "Parent IDs": [],
    "Details": "",
    "Submission Time": 1437616365566,
    "Completion Time": 1437616397753,
    "Failure Reason": "Job aborted due to stage failure: Task 6 in stage 2.0 failed 4 times, most recent failure: Lost task 6.3 in stage 2.0 (TID 12, uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal): ExecutorLostFailure (executor 20150526-135628-3255597322-5050-1304-S8 lost)\nDriver stacktrace:",
    "Accumulables": []
  }
}
{
  "Event": "SparkListenerJobEnd",
  "Job ID": 2,
  "Completion Time": 1437616397755,
  "Job Result": {
    "Result": "JobFailed",
    "Exception": {
      "Message": "Job aborted due to stage failure: Task 6 in stage 2.0 failed 4 times, most recent failure: Lost task 6.3 in stage 2.0 (TID 12, uc1f-bioinfocloud-vamp-m-2.c.quantum-device-854.internal): ExecutorLostFailure (executor 20150526-135628-3255597322-5050-1304-S8 lost)\nDriver stacktrace:",
      "Stack Trace": [
        {
          "Declaring Class": "org.apache.spark.scheduler.DAGScheduler",
          "Method Name": "org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages",
          "File Name": "DAGScheduler.scala",
          "Line Number": 1266
        },
        {
          "Declaring Class": "org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1",
          "Method Name": "apply",
          "File Name": "DAGScheduler.scala",
          "Line Number": 1257
        },
        {
          "Declaring Class": "org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1",
          "Method Name": "apply",
          "File Name": "DAGScheduler.scala",
          "Line Number": 1256
        },
        {
          "Declaring Class": "scala.collection.mutable.ResizableArray$class",
          "Method Name": "foreach",
          "File Name": "ResizableArray.scala",
          "Line Number": 59
        },
        {
          "Declaring Class": "scala.collection.mutable.ArrayBuffer",
          "Method Name": "foreach",
          "File Name": "ArrayBuffer.scala",
          "Line Number": 47
        },
        {
          "Declaring Class": "org.apache.spark.scheduler.DAGScheduler",
          "Method Name": "abortStage",
          "File Name": "DAGScheduler.scala",
          "Line Number": 1256
        },
        {
          "Declaring Class": "org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1",
          "Method Name": "apply",
          "File Name": "DAGScheduler.scala",
          "Line Number": 730
        },
        {
          "Declaring Class": "org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1",
          "Method Name": "apply",
          "File Name": "DAGScheduler.scala",
          "Line Number": 730
        },
        {
          "Declaring Class": "scala.Option",
          "Method Name": "foreach",
          "File Name": "Option.scala",
          "Line Number": 236
        },
        {
          "Declaring Class": "org.apache.spark.scheduler.DAGScheduler",
          "Method Name": "handleTaskSetFailed",
          "File Name": "DAGScheduler.scala",
          "Line Number": 730
        },
        {
          "Declaring Class": "org.apache.spark.scheduler.DAGSchedulerEventProcessLoop",
          "Method Name": "onReceive",
          "File Name": "DAGScheduler.scala",
          "Line Number": 1450
        },
        {
          "Declaring Class": "org.apache.spark.scheduler.DAGSchedulerEventProcessLoop",
          "Method Name": "onReceive",
          "File Name": "DAGScheduler.scala",
          "Line Number": 1411
        },
        {
          "Declaring Class": "org.apache.spark.util.EventLoop$$anon$1",
          "Method Name": "run",
          "File Name": "EventLoop.scala",
          "Line Number": 48
        }
      ]
    }
  }
}

apache-spark pyspark apache-spark-mllib collect
4 Answers
8 votes

--conf "spark.default.parallelism=100"

Set the parallelism value to 2 to 3 times the number of cores available in your cluster. If that does not work, try increasing the parallelism exponentially: if your current parallelism does not work, multiply it by two, and so on. I have also observed that it helps if your level of parallelism is a prime number, especially if you are using groupByKey.
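For example, a minimal way to apply this (the 8 nodes x 4 cores figure is only an assumed example; substitute your cluster's actual core count) is to set the value either on the submit command line as above or directly in the SparkConf:

from pyspark import SparkConf, SparkContext

# Assumed example: 8 worker nodes with 4 cores each; use your cluster's real totals.
total_cores = 8 * 4

conf = (SparkConf()
        .setAppName("V2ProcessRecords")
        .set("spark.default.parallelism", str(total_cores * 3)))  # 2-3x the core count

sc = SparkContext(conf=conf)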


8 votes

To fix an OOM issue you need to figure out what is actually causing it. Simply increasing the default parallelism or increasing the executor memory is not a strategic solution.

If you look at what increasing parallelism does, it tries to create more executors so that each executor works on less and less data. But if your data is skewed such that the key on which the data is partitioned (for parallelism) carries more data, simply increasing parallelism will have no effect.

Similarly, just increasing executor memory is a very inefficient way of handling such a scenario: if only one executor is failing with ExecutorLostFailure, requesting more memory for all executors will make your application require much more memory than it actually needs.
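One quick way to check for that kind of skew before touching any memory settings is to look at the per-key record counts. This is only a sketch and assumes a pair RDD named pairs of (key, value) records; the name and the cutoff of 10 keys are illustrative:

# Count records per key and print the heaviest keys; a handful of keys dominating
# the total is a sign that more parallelism alone will not help.
key_counts = (pairs
              .map(lambda kv: (kv[0], 1))
              .reduceByKey(lambda a, b: a + b))

for key, count in key_counts.takeOrdered(10, key=lambda kc: -kc[1]):
    print("%s\t%d" % (key, count))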


2 votes


2 votes

My reduceByKey task had one key with a very high occurrence rate. This (I think) caused a huge list to be collected on one of the executors, which then threw OOM errors.

For me the solution was to filter out the heavily populated keys before performing the reduceByKey, but I realise that this may or may not be possible depending on your application. In any case, I did not need all of my data.
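In case it is useful, here is a rough sketch of that approach. It assumes a pair RDD named pairs of (key, value) records and an arbitrary HOT_KEY_THRESHOLD; both names are illustrative only, and the sketch simply drops the skewed keys:

# Find keys whose record counts exceed a threshold, then drop them before the reduce.
HOT_KEY_THRESHOLD = 1000000

key_counts = pairs.map(lambda kv: (kv[0], 1)).reduceByKey(lambda a, b: a + b)
hot_keys = set(key_counts
               .filter(lambda kc: kc[1] > HOT_KEY_THRESHOLD)
               .keys()
               .collect())

# Without the skewed keys, no single executor has to hold a huge list of values
# for one key during the reduceByKey.
reduced = (pairs
           .filter(lambda kv: kv[0] not in hot_keys)
           .reduceByKey(lambda a, b: a + b))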

    
