Spark RDD 连接后拆包嵌套元组

问题描述 投票:0回答:1

这方面的资源很稀缺,我不确定这个问题是否有解决方案。

假设您有 3 个简单的

RDD
。或者更具体地说,是 3 个
PairRDD

val rdd1: RDD[(Int, Int)] = sc.parallelize(Seq((1,0),(2,0),(3,0),(4,0),(5,0)))
val rdd2: RDD[(Int, Int)] = sc.parallelize(Seq((1,1),(2,1),(3,1),(4,1),(5,1)))
val rdd3: RDD[(Int, Int)] = sc.parallelize(Seq((1,2),(2,2),(3,2),(4,2),(5,2)))

rdd1              rdd2              rdd3
+------+------+   +------+------+   +------+------+
| K    | V    |   | K    | V    |   | K    | V    |
+------+------+   +------+------+   +------+------+
|    1 |    0 |   |    1 |    1 |   |    1 |    2 |
|    2 |    0 |   |    2 |    1 |   |    2 |    2 |
|    3 |    0 |   |    3 |    1 |   |    3 |    2 |
|    4 |    0 |   |    4 |    1 |   |    4 |    2 |
|    5 |    0 |   |    5 |    1 |   |    5 |    2 |
+------+------+   +------+------+   +------+------+

当一个人试图加入所有 3 个

RDD
时,问题就出现了。当我们加入它们时,我们得到以下结构

val joinedRdd: RDD[(Int,((Int, Int), Int))] = rdd1.join(rdd2).join(rdd3)

joinedRdd
+------+----------------+
| K    | V              |
+------+----------------+  
|    1 |   ((0, 1), 2)  |
|    2 |   ((0, 1), 2)  |
|    3 |   ((0, 1), 2)  |
|    4 |   ((0, 1), 2)  |
|    5 |   ((0, 1), 2)  |
+------+----------------+

问题如下:假设你有n个

RDD
想要连接,连接的结果将是n-1个嵌套元组。您将如何解决解包嵌套元组的问题。事实证明,解除这些值的嵌套非常困难。模式匹配要求您知道要取消嵌套的元组的结构,我没有那么奢侈。

额外问题:在 Spark 的更高级别 API(数据帧/数据集 API)中,这个问题是否在代码生成中处理?例如,代码生成是否考虑了代码生成过程中的预期输出结构?

scala apache-spark rdd
1个回答
0
投票

如果我们的任务压平随机计数元组或整数,也许

productIterator
在这种情况下会很有用,所有元组都实现了接口Product,然后我们可以通过productIterator方法迭代元组。

def flat(el: Product): List[Int] = {
    el.productIterator.foldLeft(List[Int]()) {
      (acc, v) => {
       v match {
         case v: Int => acc ++ List(v)
         case v: Product => acc ++ flat(v)
       }
      }
    }
}

joinedRdd.collect().foreach(println)
joinedRdd.map(flat).collect().foreach(println)
(1,((0,1),2))
(2,((0,1),2))
(3,((0,1),2))
(4,((0,1),2))
(5,((0,1),2))

List(1, 0, 1, 2)
List(2, 0, 1, 2)
List(3, 0, 1, 2)
List(4, 0, 1, 2)
List(5, 0, 1, 2)
© www.soinside.com 2019 - 2024. All rights reserved.