我无法理解persist(StorageLevel.Memory_and_disk)的工作原理。我下面的代码段运行正常。
val df1= Spark.read.from.hive.table()
// Perform a high complex calculation & Aggregation.
.toDF()
df1.write.toAnotherHiveTable.
这大约需要1个小时才能获得140gb的数据。阶段/任务有
1.CalcStage = 750个任务(耗时50-55分钟) 2.插入到Hive阶段= 100个任务(3-4分钟)
我对修改如下内容有新要求。
val df1= Spark.read.from.hive.table()
// Perform a high complex calculation & Aggregation.
.toDF()
val df2=df1.filter($"exchange" === "commodities")
val finalDF = df1.join(df2)
finalDF .write.toAnotherHiveTable.
对于相同数量的数据,这大约需要1小时40分钟。而且阶段/任务有
1.CalcStage = 750个任务(耗时1小时30分钟) 2.CalcStage = 750tasks(耗时1小时30分钟)//前两个阶段开始并行运行。两个阶段的日志 从Hive表条目中读取 3.插入到Hive阶段= 100个任务(8-10分钟)
我假设由于df1和df2依赖于df1 calc逻辑,所以它会进行calc&聚合。并且我添加了df1的persist,如下所示。
val df1= Spark.read.from.hive.table()
// Perform a high complex calculation & Aggregation.
.toDF().persist(StorageLevel.Memory_and_disk)
val df2=df1.filter($"exchange" === "commodities")
val finalDF = df1.join(df2)
finalDF .write.toAnotherHiveTable
我以为添加persistent将有助于减少运行完整的calc / Aggregation的第二个df。但是我错了。 DAG计划和阶段日志与以前的运行相同。没有观察到变化。
我在这里想念什么吗??请帮助我理解为什么persist方法未更改任何内容。
您需要调用一个动作n,然后保存您的第一个数据帧,然后才将计算结果保存在您的内存中>]