给出两个大的RDD,a
具有一组(key, value)
对,而b
仅具有keys
,那么将它们联接起来的最佳方式是什么,以便a
仅保留那些与键匹配的行的b
?
更具体地说,这是我想做的:
val a: RDD[(Int, Double)] = ...
val b: RDD[Int] = ...
val c: RDD[(Int, Double)] = a.myFilterJoin(b)
其中c
仅包含与a
中的键匹配的b
行,并且我们可以假定a
仅包含唯一键。是否有类似myFilterJoin
的内容?
[请注意,如果b
足够小,我可以简单地将其作为一组广播,然后将其用作b
上的过滤器。但让我们假设b
足够大,以致于此广播的成本过高。
[我通常要做的是向b
添加一个虚拟变量,以使b
获得(key, dummy)
的形式以便能够进行联接,然后在映射中删除该虚拟变量。但这似乎很hacky,我想知道是否有更好的解决方案。
听起来您应该使用内部联接:
import spark.implicits._
val a: DataFrame = spark.sparkContext.parallelize(Seq((1, 2.5), (2, 4.4), (3, 2.1))).toDF("keyA", "value")
val b: DataFrame = spark.sparkContext.parallelize(Seq(3, 5, 1)).toDF("keyB")
val c = a.join(b, $"keyA" === $"keyB", "inner").drop("keyB")
c.show()
+----+-----+
|keyA|value|
+----+-----+
| 1| 2.5|
| 3| 2.1|
+----+-----+
并且如果您想返回c为RDD [(Int,Double)],则可以使用:
val d = c.rdd.map(row => (row.get(0).asInstanceOf[Int], row.get(1).asInstanceOf[Double]))
类似于ShemTov的答案,但是通过使用Datasets而不是DataFrames来保持类型安全。(PS:我建议您只使用Datasets而不是RDDs]
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sc = spark.sparkContext
import spark.implicits._
val a = sc.parallelize(List((1 -> 0.0), (2 -> 3.3), (3 -> 5.5), (5 -> 10.11)))
val b = sc.parallelize(List(2, 3, 4, 5))
val c = b.toDS.joinWith(a.toDS, $"value" === $"_1", "inner").map {
case (_, (key, value)) => key -> value
}.rdd