BloomFilter mergeInPlace() 产生意外行为

问题描述 投票:0回答:1

下面的 Spark Scala 代码片段重现了我试图理解的行为。在较高层次上,我们构造两个元组,每个元组包含一个 DF 和相应 DF 的 id 列的布隆过滤器。然后我们过滤

b
,删除
a
中的行中包含 ID 的任何行,并将此过滤结果与
a
的并集存储为
c.

val a_df = Seq(("b", List("4", "16"))).toDF("id", "data")
val a_bloom_filter = a_df.stat.bloomFilter(col("id"), 2, 0.000001)
val a = (a_df, a_bloom_filter)

println("a")
a_df.show(20, false)

val b_df = Seq(("b", List("4")), ("c", List("4"))).toDF("id", "data")
val b_bloom_filter = b_df.stat.bloomFilter(col("id"), 2, 0.000001)
val b = (b_df, b_bloom_filter)

println("b")
b_df.show(20, false)

val a_bloom_filter_udf = udf((s: String) => !a._2.mightContain(s))
val filtered_b_df = b._1.filter(a_bloom_filter_udf(col("id")))

val c = a._1.union(filtered_b_df)

println("c")
c.show(20, false)

val merged_bloom_filter = a._2.mergeInPlace(b._2)

println("c")
c.show(20, false)

在 Spark REPL 中运行它会产生我不明白的输出:

a
+---+-------+
|id |data   |
+---+-------+
|b  |[4, 16]|
+---+-------+

b
+---+-------+
|id |data   |
+---+-------+
|b  |[4, 16]|
+---+-------+

c
+---+-------+
|id |data   |
+---+-------+
|b  |[4, 16]|
|c  |[4]    |
+---+-------+

c
+---+-------+
|id |data   |
+---+-------+
|b  |[4, 16]|
+---+-------+

具体来说,为什么当我们执行

c
操作时,
mergeInPlace
似乎发生了变化?我的期望是
c
在调用
show
之间不会改变。

阅读 mergeInPlace

文档
,我看到“突变发生在 this 实例上。”,在本例中是
a._2
,或
a
的布隆过滤器。

我目前的假设是,在

mergeInPlace
调用之后,
a._2
发生了变异,因此
a._2 := a._2 OR b._2
,或者在本例中实际上是
(b, c)
。然后对于第二个
show()
,似乎一切都被重新评估,即
a._2 = (b, c)
,所以

  • UDF 重新评估
    a._2 = (b, c)
  • filtered_b_df
    被重新评估为空 DF,因为
    b._1
    的两个 ID 都在“新”布隆过滤器中找到
    a._2
  • c
    被重新评估为
    a._1
    filtered_b_df
    (空 DF)之间的并集,因此实际上只是
    a._1
    ,这正是我们从第二个
    show()
  • 中看到的

这种信念似乎被以下事实所强化:

b._2.mergeInPlace(a._2)
而不是
a._2.mergeInPlace(b._2)
产生了预期的行为(即c不变),因为
a._2
没有被破坏,从而触发了某种重新评估。

apache-spark lazy-evaluation bloom-filter
1个回答
0
投票

是的。你自己回答了。

show
是一个动作。 发生重新评估...并不完全正确。只是 mergeInPlace 的相反情况实际上并没有在重新执行的代码中使用。

© www.soinside.com 2019 - 2024. All rights reserved.