我是一个 Spark 应用程序,有几个点我想保留当前状态。这通常是在一个大步骤之后,或者缓存我想多次使用的状态之后。看来,当我第二次在数据帧上调用缓存时,新副本会缓存到内存中。在我的应用程序中,这会导致扩展时出现内存问题。尽管在我当前的测试中给定的数据帧最大约为 100 MB,但中间结果的累积大小会超出执行器上分配的内存。请参阅下面的一个小示例来显示此行为。
cache_test.py:
from pyspark import SparkContext, HiveContext
spark_context = SparkContext(appName='cache_test')
hive_context = HiveContext(spark_context)
df = (
hive_context
.read
.format('com.databricks.spark.csv')
.load('simple_data.csv')
)
df.cache()
df.show()
df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()
df.show()
spark_context.stop()
简单_数据.csv:
1,2,3
4,5,6
7,8,9
查看应用程序 UI,除了带有新列的数据框之外,还有原始数据框的副本。我可以通过在 withColumn 行之前调用
df.unpersist()
来删除原始副本。这是删除缓存的中间结果的推荐方法吗(即在每个 cache()
之前调用 unpersist)。
另外,是否可以清除所有缓存的对象。在我的应用程序中,存在自然断点,我可以在其中简单地清除所有内存,然后继续处理下一个文件。我想在不为每个输入文件创建新的 Spark 应用程序的情况下执行此操作。
提前谢谢您!
Spark 2.x
您可以使用
Catalog.clearCache
:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate
...
spark.catalog.clearCache()
Spark 1.x
SQLContext.clearCache
方法,其中
从内存缓存中删除所有缓存的表。
from pyspark.sql import SQLContext
from pyspark import SparkContext
sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())
...
sqlContext.clearCache()
我们经常使用这个
for (id, rdd) in sc._jsc.getPersistentRDDs().items():
rdd.unpersist()
print("Unpersisted {} rdd".format(id))
其中
sc
是一个sparkContext变量。
当您在数据帧上使用缓存时,它是一种转换,并且当您对其执行任何操作(如 count()、show() 等)时,它会被延迟评估。
在您的情况下,在执行第一次缓存后,您正在调用 show(),这就是数据帧缓存在内存中的原因。现在,您再次对数据帧执行转换以添加附加列,并再次缓存新数据帧,然后再次调用操作命令 show ,这将在内存中缓存第二个数据帧。如果数据帧的大小足以容纳一个数据帧,那么当您缓存第二个数据帧时,它将从内存中删除第一个数据帧,因为它没有足够的空间来容纳第二个数据帧。
要记住的事情:除非在多个操作中使用数据帧,否则不应缓存数据帧,否则会导致性能过载,因为缓存本身是成本更高的操作。