我想知道如何才能拥有两个完全相同的阶段,尽管我在 Spark 中的每个操作之前缓存了数据。 你能看一下下面的截图吗,我觉得这很奇怪。这是否意味着我要执行两次阶段中的任务?
不幸的是,很难给出代码示例,但我会尝试解释我所做的。
spark.Session.createDataFrame(df.rdd,schema)
我在第 2) 点和第 4) 点之后仅保留要比较的两个数据帧。比较后不再坚持。比较是一个漫长而复杂的过程。
您可以使用解释运算符查看 DataFrame 是否缓存在您的物理计划中(其中 InMemoryRelation 实体反映缓存的数据集及其存储级别):
== Physical Plan ==
*Project [id#0L, id#0L AS newId#16L]
+- InMemoryTableScan [id#0L]
+- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Range (0, 1, step=1, splits=Some(8))
缓存(或持久化)DataFrame 后,第一个查询可能会变慢,但后续查询将会得到回报。
您可以使用以下代码检查数据集是否已缓存:
scala> :type q2
org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
val cache = spark.sharedState.cacheManager
scala> cache.lookupCachedData(q2.queryExecution.logical).isDefined
res0: Boolean = false
Spark SQL 中的缓存有一个令人惊讶的地方。缓存是惰性的,这就是为什么您需要支付额外的费用来缓存第一个操作的行,但这仅发生在 DataFrame API 中。在 SQL 中,缓存是急切的,这会对查询性能产生巨大的影响,因为您不需要调用操作来触发缓存。
缓存不会截断查询计划,只有
checkpoint()
会截断。
cache()
可以被驱逐,所以必须保留原来的计划。在缓存的阶段中有一个绿点而不是黑点。您还可以测量性能并查看缓存的执行速度更快。
另一方面,
checkpoint()
将Dataframe写入持久存储(磁盘,以及其他执行器中的副本),因此计划可以被截断