我们有一个非常复杂的 pyspark 作业,有一个巨大的执行计划。以前生成计划大约需要 20-30 分钟。就计划时间而言,缓存并没有真正改善太多。
我发现这篇文章表明缓存实际上并没有消除计划的复杂性(只有在解决后执行它),并且要将计划分割成更小的部分,您可以将数据帧转换为RDD并返回。添加这个为我们节省了大量时间,但是我们现在必须处理 java <-> python 序列化。
据我了解,pyspark数据框是java数据框的包装器,而java数据框是java RDD的包装器。然而,pyspark RDD 是一个 python 对象,要在 pyspark dataframe 和 python RDD 之间进行转换,需要在 python 和 JVM 之间发送数据。
我们可以像这样访问底层的java数据帧和rdd(并缓存它)
java_df = df._jdf
java_rdd = java_df.toJavaRDD()
cached_java_rdd = java_rdd.cache()
是否可以将生成的 java RDD 转换回 pyspark 数据帧?或者这只是 pyspark 的一个限制,是我们对不学习 scala 的惩罚?
我们尝试了以下方法:
df2 = spark.createDataFrame(cached_java_rdd, df.schema)
(不起作用,因为 java RDD 在 python 中不可迭代)
df2 = spark.createDataFrame([], df.schema)
jdf2 = cached_java_rdd.toDF()
df2._jdf = jdf2
(显然当我们在python中访问java rdd时缺少toDF()方法)
我找到了同一篇文章,可能也有类似的问题。我遇到了同样的问题,一旦我们有了 javaRDD,如何返回 python 数据框?我正在尝试这个,它在本地有效,但我还没有在我们实际的巨大数据集问题上尝试过。
>>> from pyspark import RDD
>>> spark.createDataFrame(RDD(sdf._jdf.toJavaRDD(), sc), sdf.schema)
DataFrame[date: timestamp, col1: string]
>>> spark.createDataFrame(RDD(sdf._jdf.toJavaRDD(), sc), sdf.schema).explain()
== Physical Plan ==
*(1) Scan ExistingRDD[date#13,col1#14]
我们使用 pyspark RDD 中的 jrdd 参数初始化 RDD。请参阅此文档https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html。
如果100%解决我的问题,稍后会更新