I'm new to Spark, so please bear with me.
Here's what I'm trying to do:
I read records from a CSV file and check whether each record already exists in the database; if it doesn't, I insert it. I don't want to use the rdd.write.jdbc
option, because I believe it would write the entire DataFrame.
I'm using mapPartitions
and trying to initialize a Postgres connection with the psycopg2
library, like this:
def save_to_db(records):
    import psycopg2
    from psycopg2.extensions import AsIs

    url = 'postgres://postgres:@127.0.0.1:5432/spark_learn'
    conn = psycopg2.connect(url)
    conn.autocommit = True
    cursor = conn.cursor()
    for record in records:
        columns = record.keys()
        values = [record[key] for key in columns]
        cursor.execute("INSERT INTO heroes (%s) VALUES %s", (AsIs(','.join(columns)), tuple(values)))
    return records
But I'm getting an error like the following:
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:486)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:475)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:593)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:578)
... 26 more
Please help me.
This error can be caused by a variety of problems, so you first have to find the root cause.
First, make sure you have a top-level try/except block in your UDF (the function you pass to mapPartitions) and log the exception inside it, as in the sketch below.
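For example, here is a minimal sketch of such a wrapper around the save_to_db function from the question (the logging setup is just an assumption; use whatever mechanism actually reaches your executor logs):

import logging
import traceback

def save_to_db(records):
    try:
        import psycopg2
        from psycopg2.extensions import AsIs

        conn = psycopg2.connect('postgres://postgres:@127.0.0.1:5432/spark_learn')
        conn.autocommit = True
        cursor = conn.cursor()
        for record in records:
            columns = record.keys()
            values = [record[key] for key in columns]
            cursor.execute("INSERT INTO heroes (%s) VALUES %s", (AsIs(','.join(columns)), tuple(values)))
        return records
    except Exception:
        # Log the full traceback so it ends up in the executor logs
        # instead of the Python worker dying silently.
        logging.error(traceback.format_exc())
        raise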
Second, register a handler such as import faulthandler; faulthandler.enable()
as early as possible in both the driver and the worker code (the UDF). If the cause is a segmentation fault, the handler will print a stack trace that you can find in the logs; see the sketch below.
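A minimal sketch of where those calls could go, assuming the mapPartitions setup from the question (the df.rdd.mapPartitions(...) call at the end is a hypothetical illustration, not code from the question):

import faulthandler

# Driver side: enable the handler as early as possible.
faulthandler.enable()

def save_to_db(records):
    # Worker side: executors run separate Python processes, so enable the
    # handler here as well; on a segfault it prints a C-level stack trace
    # to stderr, which shows up in the executor logs.
    import faulthandler
    faulthandler.enable()

    for record in records:
        pass  # ... the psycopg2 insert logic from the question goes here ...
    return records

# Hypothetical usage, assuming df is the DataFrame read from the CSV:
# df.rdd.mapPartitions(save_to_db).collect()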
Both approaches should help you track down the underlying problem.