函数内的 Spark 缓存

问题描述 投票:0回答:1

我有这个玩具示例函数返回缓存的 Spark DataFrame (DF):

def foo(df):
    try:
        base = complicated_query(df)
        base.cache()                                    # lazy cache
        base.count()                                    # trigger cache - wrong design???
        num1 = base.withColumn('number', f.lit('1'))
        num2 = base.withColumn('number', f.lit('2'))
        return num1.union(num2)
    finally:
        None
        # base.unpersist()

foo
的目的是简单地封装我不想在外部作用域中使用的临时变量(DataFrame)。
base
是一些复杂的DF,使用了两次,因此我
cache
它。我的问题是围绕明确的
count
调用(由 ChatGPT 建议)。这是为了触发缓存,但我觉得这是一个错误的设计。

  • 为什么此时我们需要
    count
    (一个动作)?收获是什么?如果我没有调用
    foo
    ,实际的缓存无论如何都会发生在对
    count
    返回值的第一个操作上。
  • 我注意到,使用相同的输入
    foo
    调用
    df
    两次具有不同的执行时间:第一次很慢,这是预期的,因为调用了
    count
    。但第二次几乎是瞬间的。这是为什么?当然
    base
    已经被缓存,然后
    count
    是微不足道的,但第二次运行中的
    base
    引用与第一次运行中的引用不同。 Spark 如何知道它可以重用第一次运行时缓存的 DF? (顺便说一句,我猜想我泄露了他的记忆,因为在
    foo
    退出后我无法
    unpersist
    它。)
  • foo
    退出时,我们真的有内存泄漏吗?我怎样才能
    unpersist
    基地?我必须完全放弃吗?我知道有
    spark.catalog.clearCache()
    可以清除所有缓存的 DF,但我想明确地为
    base
    执行此操作。这就是为什么函数中有一个
    finally
    子句,以防止泄漏,但这是一次失败的尝试,因为在这种情况下,我在使用缓存之前就释放了它......

您能帮忙解决这些问题吗?

python apache-spark pyspark azure-databricks
1个回答
0
投票
  1. 缓存是惰性发生的,仅当执行操作时数据帧才会被缓存。 这里使用
    count()
    是为了从缓存的
    base
  2. 中受益于后续操作
num1 = base.withColumn('number', f.lit('1'))
num2 = base.withColumn('number', f.lit('2'))

上面,操作查找缓存的

base
并使用该数据帧来减少延迟。

  1. 当第一次执行
    foo
    时,缓存完成,并且需要一些时间,但是当它使用不同的对象第二次执行时,它会检查逻辑计划。可能的原因是当逻辑计划匹配时它使用相同的缓存结果。

这是我得到的结果。

2 个不同的 Dataframe 对象

print(id(df1))
print(id(df2))

输出:

140492486613904 
140492486794896

执行时间



import time

start_time = time.time()
d_foo = foo(df1)
end_time = time.time()

elapsed_time = end_time - start_time
print(f"Time taken df1: {elapsed_time:.6f} seconds")


start_time = time.time()
t_foo = foo(df2)
end_time = time.time()

elapsed_time = end_time - start_time
print(f"Time taken df2: {elapsed_time:.6f} seconds")

输出

Time taken df1: 1.444211 seconds
Time taken df2: 0.730322 seconds

在这里,第二个执行得更快。

逻辑计划比较。

logical_plan_df1 = d_foo._jdf.queryExecution().logical()
logical_plan_df2 = t_foo._jdf.queryExecution().logical()

are_logical_plans_equal = logical_plan_df1.canEqual(logical_plan_df2)
print(f"\nAre the logical plans identical? {are_logical_plans_equal}")

输出:

Are the logical plans identical? True

这里,计划是相同的,所以它采用缓存的结果。

此外,每次调用

foo
数据帧都会被缓存,并且 Spark 将其保存在内存中。所以你需要
unpersist
他们。

这里是下面的代码。

def foo(df):
    try:
        base = df
        base.cache()                                    
        base.count()                                    
        num1 = base.withColumn('number', lit('1'))
        num2 = base.withColumn('number', lit('2'))
        num1.explain()
        return num1.union(num2)
    finally:
        if base is not None:
            base.unpersist()
            num1.explain()

在这里,您可以使用

explain()
检查计划,如果它由
InMemoryTableScan
组成,那么它正在使用缓存的数据帧,在
unpersist
之后它应该使用现有的数据帧。

© www.soinside.com 2019 - 2024. All rights reserved.