加入两个RDD,其中一个只有键,没有值

问题描述 投票:0回答:2

给出两个大的RDD,a具有一组(key, value)对,而b仅具有keys,那么将它们联接起来的最佳方式是什么,以便a仅保留那些与键匹配的行的b

更具体地说,这是我想做的:

val a: RDD[(Int, Double)] = ...
val b: RDD[Int] = ...
val c: RDD[(Int, Double)] = a.myFilterJoin(b)

其中c仅包含与a中的键匹配的b行,并且我们可以假定a仅包含唯一键。是否有类似myFilterJoin的内容?

[请注意,如果b足够小,我可以简单地将其作为一组广播,然后将其用作b上的过滤器。但让我们假设b足够大,以致于此广播的成本过高。

[我通常要做的是向b添加一个虚拟变量,以使b获得(key, dummy)的形式以便能够进行联接,然后在映射中删除该虚拟变量。但这似乎很hacky,我想知道是否有更好的解决方案。

scala apache-spark bigdata rdd
2个回答
1
投票

听起来您应该使用内部联接:

  import spark.implicits._

  val a: DataFrame = spark.sparkContext.parallelize(Seq((1, 2.5), (2, 4.4), (3, 2.1))).toDF("keyA", "value")
  val b: DataFrame = spark.sparkContext.parallelize(Seq(3, 5, 1)).toDF("keyB")

  val c = a.join(b, $"keyA" === $"keyB", "inner").drop("keyB")

  c.show()

+----+-----+
|keyA|value|
+----+-----+
|   1|  2.5|
|   3|  2.1|
+----+-----+

并且如果您想返回c为RDD [(Int,Double)],则可以使用:

  val d = c.rdd.map(row => (row.get(0).asInstanceOf[Int], row.get(1).asInstanceOf[Double]))

0
投票

类似于ShemTov的答案,但是通过使用Datasets而不是DataFrames来保持类型安全。(PS:我建议您只使用Datasets而不是RDDs]

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sc = spark.sparkContext
import spark.implicits._

val a = sc.parallelize(List((1 -> 0.0), (2 -> 3.3), (3 -> 5.5), (5 -> 10.11)))
val b = sc.parallelize(List(2, 3, 4, 5))

val c = b.toDS.joinWith(a.toDS, $"value" === $"_1", "inner").map {
  case (_, (key, value)) => key -> value
}.rdd
© www.soinside.com 2019 - 2024. All rights reserved.