我正在对集群进行计算,最后当我用df.describe()询问我的Spark数据帧的汇总统计时.show()我得到一个错误:
序列化任务15:0是137500581字节,超过了最大允许值:spark.rpc.message.maxSize(134217728字节)。考虑增加spark.rpc.message.maxSize或使用广播变量来获取较大的值
在我的Spark配置中,我已经尝试增加上述参数:
spark = (SparkSession
.builder
.appName("TV segmentation - dataprep for scoring")
.config("spark.executor.memory", "25G")
.config("spark.driver.memory", "40G")
.config("spark.dynamicAllocation.enabled", "true")
.config("spark.dynamicAllocation.maxExecutors", "12")
.config("spark.driver.maxResultSize", "3g")
.config("spark.kryoserializer.buffer.max.mb", "2047mb")
.config("spark.rpc.message.maxSize", "1000mb")
.getOrCreate())
我还尝试使用以下方法重新分区我的数据帧:
dfscoring = dfscoring.repartition(100)
但我仍然继续得到同样的错误。
我的环境:Python 3.5,Anaconda 5.0,Spark 2
我怎样才能避免这个错误?
我遇到了同样的问题,浪费了我生命中的一天,我永远不会回来。我不确定为什么会这样,但这就是我如何让它为我工作。
第1步:确保正确设置了PYSPARK_PYTHON和PYSPARK_DRIVER_PYTHON。原来,worker(2.6)中的python与驱动程序(3.6)中的版本不同。您应该检查是否正确设置了环境变量PYSPARK_PYTHON和PYSPARK_DRIVER_PYTHON。
我通过简单地将我的内核从Python 3 Spark 2.2.0切换到Jupyter中的Python Spark 2.3.1来修复它。您可能需要手动设置它。以下是如何确保您的PySpark正确设置https://mortada.net/3-easy-steps-to-set-up-pyspark.html
第2步:如果这不起作用,请尝试解决它:这个内核开关适用于我没有添加任何列的DF:spark_df - > panda_df - > back_to_spark_df ....但是它没有用于在我添加了5个额外列的DF。所以我尝试了它并且它起作用如下:
# 1. Select only the new columns:
df_write = df[['hotel_id','neg_prob','prob','ipw','auc','brier_score']]
# 2. Convert this DF into Spark DF:
df_to_spark = spark.createDataFrame(df_write)
df_to_spark = df_to_spark.repartition(100)
df_to_spark.registerTempTable('df_to_spark')
# 3. Join it to the rest of your data:
final = df_to_spark.join(data,'hotel_id')
# 4. Then write the final DF.
final.write.saveAsTable('schema_name.table_name',mode='overwrite')
希望有所帮助!