org.apache.spark.SparkException: Python worker failed to connect back

Problem description

I am trying to create a DataFrame with the createDataFrame method, but the following code gives me an error:

from pyspark.sql import SparkSession
# Create a Spark session
spark = SparkSession.builder\
        .appName("MyApp") \
        .getOrCreate()
person = spark.createDataFrame([
    (0, "AA", 0),
    (1, "BB", 1),
    (2, "CC", 1)
], schema=["id", "name", "graduate"])
person.take(6)

This is the error I get:

Py4JJavaError: An error occurred while calling o43.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (QUASAR executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
    at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)

But when I import data from a CSV (or any other file), everything works fine. Here is a working example:

flightData2015 = spark.read.option("inferSchema", "true").option("header","true").csv("flight-data\\csv\\2015-summary.csv")
flightData2015.take(5)

I don't know why I get this error when I try to print a DataFrame created with the createDataFrame method.

apache-spark pyspark apache-spark-sql
1 Answer

The code above works in Databricks but fails in standard PySpark because non-Databricks environments do not support passing a list of column names as the schema.

Fix:

The schema argument of createDataFrame must be either:

  1. A StructType object that defines the schema explicitly (a sketch follows the example below), or

  2. Omitted entirely, so that PySpark infers the schema from the data.

    person = spark.createDataFrame(
        [(0, "AA", 0),
         (1, "BB", 1),
         (2, "CC", 1)],
        ["id", "name", "graduate"])
    person.take(6)
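
For option 1, here is a minimal sketch of an explicit StructType, assuming the same column names and integer/string types as in the question (the existing spark session is reused):

    from pyspark.sql.types import StructType, StructField, IntegerType, StringType

    # Explicit schema for the three columns used in the question
    person_schema = StructType([
        StructField("id", IntegerType(), nullable=False),
        StructField("name", StringType(), nullable=True),
        StructField("graduate", IntegerType(), nullable=True),
    ])

    person = spark.createDataFrame(
        [(0, "AA", 0), (1, "BB", 1), (2, "CC", 1)],
        schema=person_schema)
    person.show()

With an explicit StructType, PySpark validates the rows against the given schema instead of running its type-inference pass over the Python data.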
    