Pyspark。 Spark.SparkException:作业因阶段失败而中止:阶段 15.0 中的任务 0 失败 1 次,java.net.SocketException:连接重置

问题描述 投票:0回答:5

我是 pyspark 的新手,我正在尝试使用 pyspark 在 Prophet 中运行多个时间序列(作为分布式计算,因为我有 100 个时间序列需要预测),但我有如下错误。

import time 
start_time = time.time()
sdf = spark.createDataFrame(data)
print('%0.2f min: Lags' % ((time.time() - start_time) / 60))
sdf.createOrReplaceTempView('Quantity')
spark.sql("select Reseller_City, Business_Unit, count(*) from Quantity group by Reseller_City, Business_Unit order by Reseller_City, Business_Unit").show()
query = 'SELECT Reseller_City, Business_Unit, conditions, black_week, promos, Sales_Date as ds, sum(Rslr_Sales_Quantity) as y FROM Quantity GROUP BY Reseller_City, Business_Unit, conditions, black_week, promos, ds ORDER BY Reseller_City, Business_Unit, ds'
spark.sql(query).show()
sdf.rdd.getNumPartitions()
store_part = (spark.sql(query).repartition(spark.sparkContext.defaultParallelism['Reseller_City','Business_Unit'])).cache()

store_part.explain()

from pyspark.sql.types import *

result_schema =StructType([
  StructField('ds',TimestampType()),
  StructField('Reseller_City',StringType()),
  StructField('Business_Unit',StringType()),
  StructField('y',DoubleType()),
  StructField('yhat',DoubleType()),
  StructField('yhat_upper',DoubleType()),
  StructField('yhat_lower',DoubleType())
  ])
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf( result_schema, PandasUDFType.GROUPED_MAP )
def forecast_sales( store_pd ):
    
    model = Prophet(interval_width=0.95, holidays = lock_down)
    model.add_country_holidays(country_name='DE')
    model.add_regressor('conditions')
    model.add_regressor('black_week')
    model.add_regressor('promos')
    
    train = store_pd[store_pd['ds']<'2021-10-01 00:00:00']
    future_pd = store_pd[store_pd['ds']>='2021-10-01 00:00:00']
    model.fit(train[['ds', 'y', 'conditions', 'black_week', 'promos']])


    forecast_pd = model.predict(future_pd[['ds', 'conditions', 'black_week', 'promos']])  

    f_pd = forecast_pd[ ['ds','yhat', 'yhat_upper', 'yhat_lower'] ].set_index('ds')

    #store_pd = store_pd.filter(store_pd['ds']<'2021-10-01 00:00:00')

    st_pd = future_pd[['ds','Reseller_City','Business_Unit','y']].set_index('ds')

    results_pd = f_pd.join( st_pd, how='left' )
    results_pd.reset_index(level=0, inplace=True)

    results_pd[['Reseller_City','Business_Unit']] = future_pd[['Reseller_City','Business_Unit']].iloc[0]

    return results_pd[ ['ds', 'Reseller_City','Business_Unit','y', 'yhat', 'yhat_upper', 'yhat_lower'] ]
results = (store_part.groupBy(['Reseller_City','Business_Unit']).apply(forecast_sales).withColumn('training date', current_date() ))
results.cache()
results.show()

所有行都执行完美,但错误来自 results.show() 行我不明白我哪里做错了,如果有人帮助我,非常感谢

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-46-8c647e8bf4d9> in <module>
----> 1 results.show()

C:\spark-3.0.3-bin-hadoop2.7\python\pyspark\sql\dataframe.py in show(self, n, truncate, vertical)
    438         """
    439         if isinstance(truncate, bool) and truncate:
--> 440             print(self._jdf.showString(n, 20, vertical))
    441         else:
    442             print(self._jdf.showString(n, int(truncate), vertical))

C:\spark-3.0.3-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

C:\spark-3.0.3-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
    126     def deco(*a, **kw):
    127         try:
--> 128             return f(*a, **kw)
    129         except py4j.protocol.Py4JJavaError as e:
    130             converted = convert_exception(e.java_exception)

C:\spark-3.0.3-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o128.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 1243, Grogu.profiflitzer.local, executor driver): java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(Unknown Source)
    at java.net.SocketInputStream.read(Unknown Source)
    at java.io.BufferedInputStream.fill(Unknown Source)
    at java.io.BufferedInputStream.read(Unknown Source)
    at java.io.DataInputStream.readInt(Unknown Source)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:86)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anon$1.hasNext(InMemoryRelation.scala:132)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1371)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1298)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1362)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1186)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:360)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:311)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(Unknown Source)
    at java.net.SocketInputStream.read(Unknown Source)
    at java.io.BufferedInputStream.fill(Unknown Source)
    at java.io.BufferedInputStream.read(Unknown Source)
    at java.io.DataInputStream.readInt(Unknown Source)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:86)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anon$1.hasNext(InMemoryRelation.scala:132)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1371)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1298)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1362)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1186)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:360)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:311)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:463)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    ... 1 more

python apache-spark pyspark apache-spark-sql
5个回答
10
投票

您还可以按照以下步骤设置操作系统环境变量, 在 SparkSession/SparkContext 之前运行它

import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

它对我有用


2
投票

确保您已设置spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

在此处了解更多信息:https://spark.apache.org/docs/3.0.1/sql-pyspark-pandas-with-arrow.html

如果这不起作用,请尝试增加驱动程序和工作线程内存大小。


0
投票

我建议你降级你的Python版本,然后重新安装pyspark,这个错误是因为你使用的是最新版本的Python。

使用python==3.7,以后可能需要调整。


-1
投票

如果您在 Jupyter Notebook 或 Anaconda 上运行 pyspark,那么由于 Python 版本不同或最新,因此会出现此问题。所以我建议您在 Anaconda 中创建一个虚拟环境,然后使用以下命令在虚拟环境中打开 Jupyter Notebook。

  1. pip 安装虚拟环境
  2. virtualenv venv --python=python3.7
  3. 。u000benv\脚本激活

-1
投票
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable

os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

上面的解决方案也对我有用,我被困了将近三天

© www.soinside.com 2019 - 2024. All rights reserved.