下面的 Spark Scala 代码片段重现了我试图理解的行为。在较高层次上,我们构造两个元组,每个元组包含一个 DF 和相应 DF 的 id 列的布隆过滤器。然后我们过滤
b
,删除 a
中的行中包含 ID 的任何行,并将此过滤结果与 a
的并集存储为 c.
val a_df = Seq(("b", List("4", "16"))).toDF("id", "data")
val a_bloom_filter = a_df.stat.bloomFilter(col("id"), 2, 0.000001)
val a = (a_df, a_bloom_filter)
println("a")
a_df.show(20, false)
val b_df = Seq(("b", List("4")), ("c", List("4"))).toDF("id", "data")
val b_bloom_filter = b_df.stat.bloomFilter(col("id"), 2, 0.000001)
val b = (b_df, b_bloom_filter)
println("b")
b_df.show(20, false)
val a_bloom_filter_udf = udf((s: String) => !a._2.mightContain(s))
val filtered_b_df = b._1.filter(a_bloom_filter_udf(col("id")))
val c = a._1.union(filtered_b_df)
println("c")
c.show(20, false)
val merged_bloom_filter = a._2.mergeInPlace(b._2)
println("c")
c.show(20, false)
在 Spark REPL 中运行它会产生我不明白的输出:
a
+---+-------+
|id |data |
+---+-------+
|b |[4, 16]|
+---+-------+
b
+---+-------+
|id |data |
+---+-------+
|b |[4, 16]|
+---+-------+
c
+---+-------+
|id |data |
+---+-------+
|b |[4, 16]|
|c |[4] |
+---+-------+
c
+---+-------+
|id |data |
+---+-------+
|b |[4, 16]|
+---+-------+
具体来说,为什么当我们执行
c
操作时,mergeInPlace
似乎发生了变化?我的期望是 c
在调用 show
之间不会改变。
阅读 mergeInPlace
的
文档,我看到“突变发生在 this 实例上。”,在本例中是
a._2
,或 a
的布隆过滤器。
我目前的假设是,在
mergeInPlace
调用之后, a._2
发生了变异,因此 a._2 := a._2 OR b._2
,或者在本例中实际上是 (b, c)
。然后对于第二个show()
,似乎一切都被重新评估,即a._2 = (b, c)
,所以
a._2 = (b, c)
filtered_b_df
被重新评估为空 DF,因为 b._1
的两个 ID 都在“新”布隆过滤器中找到 a._2
c
被重新评估为 a._1
和 filtered_b_df
(空 DF)之间的并集,因此实际上只是 a._1
,这正是我们从第二个 show()
这种信念似乎被以下事实所强化:
b._2.mergeInPlace(a._2)
而不是a._2.mergeInPlace(b._2)
产生了预期的行为(即c不变),因为a._2
没有被破坏,从而触发了某种重新评估。
是的。你自己回答了。
show
是一个动作。 发生重新评估...并不完全正确。只是 mergeInPlace 的相反情况实际上并没有在重新执行的代码中使用。