I'm trying to create a DataFrame with the createDataFrame method, but I get an error with the following code:
from pyspark.sql import SparkSession
# Create a Spark session
spark = SparkSession.builder \
    .appName("MyApp") \
    .getOrCreate()

person = spark.createDataFrame([
    (0, "AA", 0),
    (1, "BB", 1),
    (2, "CC", 1)
], schema=["id", "name", "graduate"])
person.take(6)
This is the error I get:
Py4JJavaError: An error occurred while calling o43.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (QUASAR executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
However, when I import data from a CSV or any other file, everything works fine. Here is a working example:
flightData2015 = spark.read.option("inferSchema", "true").option("header","true").csv("flight-data\\csv\\2015-summary.csv")
flightData2015.take(5)
I don't understand why I get an error when I try to print a DataFrame created with the createDataFrame method.
The code above works in Databricks but fails in standard PySpark because, outside Databricks, passing a plain list of column names as the schema is not supported.
Fix:
The schema argument of createDataFrame must be either:
a StructType object that defines the schema explicitly, or
omitted entirely if PySpark is expected to infer the schema from the data.
person = spark.createDataFrame(
    [(0, "AA", 0),
     (1, "BB", 1),
     (2, "CC", 1)],
    ["id", "name", "graduate"])
person.take(6)
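If you prefer to define the schema explicitly rather than rely on inference, the sketch below shows the StructType variant described above. The column types (integer ids, string names) are my assumption based on the sample rows, not something stated in the original post.

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Explicit schema for the sample rows; types are assumed from the example data
person_schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), True),
    StructField("graduate", IntegerType(), True),
])

person = spark.createDataFrame(
    [(0, "AA", 0),
     (1, "BB", 1),
     (2, "CC", 1)],
    schema=person_schema)
person.take(3)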