我有这个玩具示例函数返回缓存的 Spark DataFrame (DF):
def foo(df):
try:
base = complicated_query(df)
base.cache() # lazy cache
base.count() # trigger cache - wrong design???
num1 = base.withColumn('number', f.lit('1'))
num2 = base.withColumn('number', f.lit('2'))
return num1.union(num2)
finally:
None
# base.unpersist()
foo
的目的是简单地封装我不想在外部作用域中使用的临时变量(DataFrame)。 base
是一些复杂的DF,使用了两次,因此我cache
它。我的问题是围绕明确的 count
调用(由 ChatGPT 建议)。这是为了触发缓存,但我觉得这是一个错误的设计。
count
(一个动作)?收获是什么?如果我没有调用 foo
,实际的缓存无论如何都会发生在对 count
返回值的第一个操作上。foo
调用 df
两次具有不同的执行时间:第一次很慢,这是预期的,因为调用了 count
。但第二次几乎是瞬间的。这是为什么?当然 base
已经被缓存,然后 count
是微不足道的,但第二次运行中的 base
引用与第一次运行中的引用不同。 Spark 如何知道它可以重用第一次运行时缓存的 DF? (顺便说一句,我猜想我泄露了他的记忆,因为在 foo
退出后我无法 unpersist
它。)foo
退出时,我们真的有内存泄漏吗?我怎样才能unpersist
基地?我必须完全放弃吗?我知道有 spark.catalog.clearCache()
可以清除所有缓存的 DF,但我想明确地为 base
执行此操作。这就是为什么函数中有一个 finally
子句,以防止泄漏,但这是一次失败的尝试,因为在这种情况下,我在使用缓存之前就释放了它......您能帮忙解决这些问题吗?
count()
是为了从缓存的 base
num1 = base.withColumn('number', f.lit('1'))
num2 = base.withColumn('number', f.lit('2'))
上面,操作查找缓存的
base
并使用该数据帧来减少延迟。
foo
时,缓存完成,并且需要一些时间,但是当它使用不同的对象第二次执行时,它会检查逻辑计划。可能的原因是当逻辑计划匹配时它使用相同的缓存结果。这是我得到的结果。
2 个不同的 Dataframe 对象
print(id(df1))
print(id(df2))
输出:
140492486613904
140492486794896
执行时间
import time
start_time = time.time()
d_foo = foo(df1)
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken df1: {elapsed_time:.6f} seconds")
start_time = time.time()
t_foo = foo(df2)
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken df2: {elapsed_time:.6f} seconds")
输出
Time taken df1: 1.444211 seconds
Time taken df2: 0.730322 seconds
在这里,第二个执行得更快。
逻辑计划比较。
logical_plan_df1 = d_foo._jdf.queryExecution().logical()
logical_plan_df2 = t_foo._jdf.queryExecution().logical()
are_logical_plans_equal = logical_plan_df1.canEqual(logical_plan_df2)
print(f"\nAre the logical plans identical? {are_logical_plans_equal}")
输出:
Are the logical plans identical? True
这里,计划是相同的,所以它采用缓存的结果。
此外,每次调用
foo
数据帧都会被缓存,并且 Spark 将其保存在内存中。所以你需要unpersist
他们。
这里是下面的代码。
def foo(df):
try:
base = df
base.cache()
base.count()
num1 = base.withColumn('number', lit('1'))
num2 = base.withColumn('number', lit('2'))
num1.explain()
return num1.union(num2)
finally:
if base is not None:
base.unpersist()
num1.explain()
在这里,您可以使用
explain()
检查计划,如果它由 InMemoryTableScan
组成,那么它正在使用缓存的数据帧,在 unpersist
之后它应该使用现有的数据帧。