我一直在 Spark 中(特别是在 Databricks 中)努力理解的事情是,Spark 在第一次读取后是否会持久保存数据(以某种不是缓存的存储格式)?
更具体地说,当我在单元格 A 中运行以下代码时
val rdd1 = spark.sql("Select STH from STH").rdd //reading from unity catalogue (azure datalake blob storage)
rdd1.count
紧接着,我在单元格 B 中再次运行计数操作
rdd1.count
我收到以下错误
此错误仅在从数据帧转换为 RDD 时发生。如果我只使用数据帧(就像我不转换为 RDD),由于某种原因,一切都会正常工作。
经过长时间的反复试验,我发现如果我将
RDD
保留在磁盘缓存(或 RAM)中,那么我就不会再收到错误了。因此,如果我执行以下操作:
例如,当我执行以下操作时,问题就消失了:
val rdd1 = spark.sql("Select STH from STH").rdd.persist(DISK_ONLY) // or MEMORY_ONLY
rdd1.count
这一切让我更倾向于认为数据确实被持久化,即使我没有明确要求持久化。否则,当我没有明确持久化 RDD 时,只有第一个单元格中的
count
起作用的原因是什么? (换句话说,为什么计数只在第一次实际扫描表时起作用)。
Spark 在计算过程中管理延迟计算和数据存储,特别关注 RDD 及其与底层存储层的交互。 当您执行
spark.sql("Select STH from STH").rdd
时,它与惰性相关,意味着在调用操作(如计数)之前不会发生计算。
当您对 RDD 执行计数等操作时,Spark 会从源 (ADLS) 读取数据并进行处理。 如果不持久化RDD,Spark会在action完成后丢弃数据。 当您再次运行计数操作时,Spark 会尝试再次从源读取数据。
以下是我尝试过的方法作为示例:
val rdd2 = dilip_sampleData.rdd.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
println(s"Count: ${rdd2.count}")
println(s"Count from second action:: ${rdd2.count}")
结果:
Count: 4
Count from second action:: 4
rdd2: dilip_sampleData.rdd.type = MapPartitionsRDD[55] at rdd at command-3745231482926884:2