这方面的资源很稀缺,我不确定这个问题是否有解决方案。
假设您有 3 个简单的
RDD
。或者更具体地说,是 3 个PairRDD
。
val rdd1: RDD[(Int, Int)] = sc.parallelize(Seq((1,0),(2,0),(3,0),(4,0),(5,0)))
val rdd2: RDD[(Int, Int)] = sc.parallelize(Seq((1,1),(2,1),(3,1),(4,1),(5,1)))
val rdd3: RDD[(Int, Int)] = sc.parallelize(Seq((1,2),(2,2),(3,2),(4,2),(5,2)))
rdd1 rdd2 rdd3
+------+------+ +------+------+ +------+------+
| K | V | | K | V | | K | V |
+------+------+ +------+------+ +------+------+
| 1 | 0 | | 1 | 1 | | 1 | 2 |
| 2 | 0 | | 2 | 1 | | 2 | 2 |
| 3 | 0 | | 3 | 1 | | 3 | 2 |
| 4 | 0 | | 4 | 1 | | 4 | 2 |
| 5 | 0 | | 5 | 1 | | 5 | 2 |
+------+------+ +------+------+ +------+------+
当一个人试图加入所有 3 个
RDD
时,问题就出现了。当我们加入它们时,我们得到以下结构
val joinedRdd: RDD[(Int,((Int, Int), Int))] = rdd1.join(rdd2).join(rdd3)
joinedRdd
+------+----------------+
| K | V |
+------+----------------+
| 1 | ((0, 1), 2) |
| 2 | ((0, 1), 2) |
| 3 | ((0, 1), 2) |
| 4 | ((0, 1), 2) |
| 5 | ((0, 1), 2) |
+------+----------------+
问题如下:假设你有n个
RDD
想要连接,连接的结果将是n-1个嵌套元组。您将如何解决解包嵌套元组的问题。事实证明,解除这些值的嵌套非常困难。模式匹配要求您知道要取消嵌套的元组的结构,我没有那么奢侈。
额外问题:在 Spark 的更高级别 API(数据帧/数据集 API)中,这个问题是否在代码生成中处理?例如,代码生成是否考虑了代码生成过程中的预期输出结构?
如果我们的任务压平随机计数元组或整数,也许
productIterator
在这种情况下会很有用,所有元组都实现了接口Product,然后我们可以通过productIterator方法迭代元组。
def flat(el: Product): List[Int] = {
el.productIterator.foldLeft(List[Int]()) {
(acc, v) => {
v match {
case v: Int => acc ++ List(v)
case v: Product => acc ++ flat(v)
}
}
}
}
joinedRdd.collect().foreach(println)
joinedRdd.map(flat).collect().foreach(println)
(1,((0,1),2))
(2,((0,1),2))
(3,((0,1),2))
(4,((0,1),2))
(5,((0,1),2))
List(1, 0, 1, 2)
List(2, 0, 1, 2)
List(3, 0, 1, 2)
List(4, 0, 1, 2)
List(5, 0, 1, 2)