SPARK:尽管缓存,仍生成相同的阶段

问题描述 投票:0回答:2

我想知道如何才能拥有两个完全相同的阶段,尽管我在 Spark 中的每个操作之前缓存了数据。 你能看一下下面的截图吗,我觉得这很奇怪。这是否意味着我要执行两次阶段中的任务?

不幸的是,很难给出代码示例,但我会尝试解释我所做的。

  1. 从 CSV 读取数据
  2. 对特定列进行一些转换。
  3. 应用架构 ->
    spark.Session.createDataFrame(df.rdd,schema)
  4. 在第 3 点创建的 DF 上使用不同的过滤器创建 7 个新数据帧
  5. 从第 4 点获取两个数据帧,并将它们传递给另一个方法以进行比较。

我在第 2) 点和第 4) 点之后仅保留要比较的两个数据帧。比较后不再坚持。比较是一个漫长而复杂的过程。

enter image description here

scala apache-spark apache-spark-sql
2个回答
0
投票

您可以使用解释运算符查看 DataFrame 是否缓存在您的物理计划中(其中 InMemoryRelation 实体反映缓存的数据集及其存储级别):

== Physical Plan ==
*Project [id#0L, id#0L AS newId#16L]
+- InMemoryTableScan [id#0L]
      +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
            +- *Range (0, 1, step=1, splits=Some(8))

缓存(或持久化)DataFrame 后,第一个查询可能会变慢,但后续查询将会得到回报。

您可以使用以下代码检查数据集是否已缓存:

scala> :type q2
org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]

val cache = spark.sharedState.cacheManager
scala> cache.lookupCachedData(q2.queryExecution.logical).isDefined
res0: Boolean = false

Spark SQL 中的缓存有一个令人惊讶的地方。缓存是惰性的,这就是为什么您需要支付额外的费用来缓存第一个操作的行,但这仅发生在 DataFrame API 中。在 SQL 中,缓存是急切的,这会对查询性能产生巨大的影响,因为您不需要调用操作来触发缓存。


0
投票

缓存不会截断查询计划,只有

checkpoint()
会截断。

cache()
可以被驱逐,所以必须保留原来的计划。在缓存的阶段中有一个绿点而不是黑点。您还可以测量性能并查看缓存的执行速度更快。

另一方面,

checkpoint()
将Dataframe写入持久存储(磁盘,以及其他执行器中的副本),因此计划可以被截断

© www.soinside.com 2019 - 2024. All rights reserved.