"Python worker exited unexpectedly" error when importing psycopg2 inside mapPartitions

Problem description

I'm new to Spark, so bear with me.

Here is what I'm trying to do:

I read records from a CSV file and, for each record, check whether it already exists in the database; if it doesn't, I insert it. I don't want to use the rdd.write.jdbc option because, as I understand it, that would write the entire DataFrame.

I'm using mapPartitions and trying to open a Postgres connection with the psycopg2 library, like this:

def save_to_db(records):
    import psycopg2
    from psycopg2.extensions import AsIs

    url = 'postgres://postgres:@127.0.0.1:5432/spark_learn'
    conn = psycopg2.connect(url)
    conn.autocommit = True
    cursor = conn.cursor()
    for record in records:
        columns = record.keys()
        values = [record[key] for key in columns]
        cursor.execute("INSERT INTO heroes (%s) VALUES %s",
                       (AsIs(','.join(columns)), tuple(values)))
    return records

But I get an error like this:

Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:486)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:475)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:593)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:578)
... 26 more

Please help me.

apache-spark pyspark psycopg2
1 Answer

This error can be caused by all kinds of underlying problems. You first have to find the root cause.

First, make sure you have a top-level try-except block in your udf (here, the function passed to mapPartitions) and log the exception inside it.
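
For instance, a minimal sketch of the question's save_to_db wrapped this way (the logging/traceback handling and the re-raise are additions for illustration, not part of the original code):

def save_to_db(records):
    import logging
    import traceback
    import psycopg2
    from psycopg2.extensions import AsIs

    try:
        url = 'postgres://postgres:@127.0.0.1:5432/spark_learn'
        conn = psycopg2.connect(url)
        conn.autocommit = True
        cursor = conn.cursor()
        for record in records:
            columns = record.keys()
            values = [record[key] for key in columns]
            cursor.execute("INSERT INTO heroes (%s) VALUES %s",
                           (AsIs(','.join(columns)), tuple(values)))
        return records
    except Exception:
        # Log the full traceback so it ends up in the executor's stderr/log,
        # then re-raise so the task failure is still reported to Spark.
        logging.error(traceback.format_exc())
        raise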

Second, register a handler such as import faulthandler; faulthandler.enable() as early as possible in both the driver and the worker code (the udf). If the cause is a segmentation fault, the handler will print a stack trace that you can find in the logs.
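
Roughly like this (where exactly you call enable() in your own script is up to you):

# In the driver script, as early as possible:
import faulthandler
faulthandler.enable()

# And inside the worker-side function, so the handler is also installed
# in the Python worker process:
def save_to_db(records):
    import faulthandler
    faulthandler.enable()  # dumps a low-level stack trace to stderr on a segfault
    # ... rest of the insert logic as before ...
    return records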

Both of these will help you understand the underlying problem.
