我有一个有3列的RDD(road_idx,snodeidx,enodeidx)。它看起来像这样:
(roadidx_995, 1138, 1145)
(roadidx_996, 1138, 1139)
(roadidx_997, 2740, 1020)
(roadidx_998, 2762, 2740)
(roadidx_999, 3251, 3240)
.........
如何将具有一个共同的snodeidx或enodeidx的road_idx组合在一起?从1开始为每组提供一个数字。
预期产量:
(1,[roadidx_995,roadidx_996])
(2,[roadidx_997,roadidx_998])
(3,[roadidx_999])
如上所示,
roadidx_995和roadidx_996具有相同的snodeidx 1138。
roadidx_997的snodeidx与roadidx_998的enodeidx相同,即2740。
roadidx_999独自在一个组中。
Scala代码或Python代码都可以。只要您能告诉我使用RDD API获取预期输出的逻辑。
非常感激!
可以实现为:
|------------------|----------------|--------------|----------------|
| start join start | start join end | end join end | end join start |
|------------------|----------------|--------------|----------------|
在Scala上可以实现:
val data = List(
("roadidx_995", 1138, 1145),
("roadidx_996", 1138, 1139),
("roadidx_997", 2740, 1020),
("roadidx_998", 2762, 2740),
("roadidx_999", 3251, 3240)
)
val original = sparkContext.parallelize(data)
val groupedByStart = original.map(v => (v._1, v._2)).groupBy(_._2).mapValues(_.map(_._1))
val groupedByEnd = original.map(v => (v._1, v._3)).groupBy(_._2).mapValues(_.map(_._1))
val indexesOnly = original.map(allRow => (allRow._2, allRow._3))
// join by start value
val startJoinsStart = indexesOnly.keyBy(_._1).join(groupedByStart)
val startJoinsEnd = startJoinsStart.leftOuterJoin(groupedByEnd)
// join by end value
val endKeys = startJoinsEnd.values.keyBy(_._1._1._2)
val endJoinsEnd = endKeys.join(groupedByEnd)
val endJoinsStart = endJoinsEnd.leftOuterJoin(groupedByStart)
// flatten to output format
val result = endJoinsStart
.values
.map(v => (v._1._1._1._2, v._1._1._2, v._1._2, v._2))
.map(v => v._1 ++ v._2.getOrElse(Seq()) ++ v._3 ++ v._4.getOrElse(Seq()))
.map(_.toSet)
.distinct()
result.foreach(println)
输出是:
Set(roadidx_995, roadidx_996)
Set(roadidx_998, roadidx_997)
Set(roadidx_999)