取消保留 (py)spark 中的所有数据帧

问题描述 投票:0回答:3

我是一个 Spark 应用程序,有几个点我想保留当前状态。这通常是在一个大步骤之后,或者缓存我想多次使用的状态之后。看来,当我第二次在数据帧上调用缓存时,新副本会缓存到内存中。在我的应用程序中,这会导致扩展时出现内存问题。尽管在我当前的测试中给定的数据帧最大约为 100 MB,但中间结果的累积大小会超出执行器上分配的内存。请参阅下面的一个小示例来显示此行为。

cache_test.py:

from pyspark import SparkContext, HiveContext

spark_context = SparkContext(appName='cache_test')
hive_context = HiveContext(spark_context)

df = (
    hive_context
    .read
    .format('com.databricks.spark.csv')
    .load('simple_data.csv')
)
df.cache()
df.show()

df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()
df.show()

spark_context.stop()

简单_数据.csv:

1,2,3
4,5,6
7,8,9

查看应用程序 UI,除了带有新列的数据框之外,还有原始数据框的副本。我可以通过在 withColumn 行之前调用

df.unpersist()
来删除原始副本。这是删除缓存的中间结果的推荐方法吗(即在每个
cache()
之前调用 unpersist)。

另外,是否可以清除所有缓存的对象。在我的应用程序中,存在自然断点,我可以在其中简单地清除所有内存,然后继续处理下一个文件。我想在不为每个输入文件创建新的 Spark 应用程序的情况下执行此操作。

提前谢谢您!

python caching apache-spark pyspark apache-spark-sql
3个回答
99
投票

Spark 2.x

您可以使用

Catalog.clearCache

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate
...
spark.catalog.clearCache()

Spark 1.x

您可以使用

SQLContext.clearCache
方法,其中

从内存缓存中删除所有缓存的表。

from pyspark.sql import SQLContext
from pyspark import SparkContext

sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())
...
sqlContext.clearCache()

30
投票

我们经常使用这个

for (id, rdd) in sc._jsc.getPersistentRDDs().items():
    rdd.unpersist()
    print("Unpersisted {} rdd".format(id))

其中

sc
是一个sparkContext变量。


1
投票

当您在数据帧上使用缓存时,它是一种转换,并且当您对其执行任何操作(如 count()、show() 等)时,它会被延迟评估。

在您的情况下,在执行第一次缓存后,您正在调用 show(),这就是数据帧缓存在内存中的原因。现在,您再次对数据帧执行转换以添加附加列,并再次缓存新数据帧,然后再次调用操作命令 show ,这将在内存中缓存第二个数据帧。如果数据帧的大小足以容纳一个数据帧,那么当您缓存第二个数据帧时,它将从内存中删除第一个数据帧,因为它没有足够的空间来容纳第二个数据帧。

要记住的事情:除非在多个操作中使用数据帧,否则不应缓存数据帧,否则会导致性能过载,因为缓存本身是成本更高的操作。

© www.soinside.com 2019 - 2024. All rights reserved.