如何从Spark中的数据帧创建EdgeRDD

问题描述 投票:0回答:1

我在Spark中有一个数据框。每行代表一个人,我想检索其中的可能联系。拥有链接的规则是,对于每个可能的对,如果它们具有相同的prop1:String,并且prop2:Int的绝对差小于5,则该链接存在。我正在尝试了解使用数据框完成此任务的最佳方法。

我正在尝试检索索引的RDD:

val idusers = people.select("ID")
                     .rdd
                     .map(r => r(0).asInstanceOf[Int])
                     .zipWithIndex
val prop1users = people.select("ID")
                        .rdd
                        .map(r => (r(0).asInstanceOf[Int], r(1).asInstanceOf[String]))
val prop2users = people.select("ID")
                        .rdd
                        .map(r => (r(0).asInstanceOf[Int], r(2).asInstanceOf[Int]))

然后开始删除重复项,例如:

var links = idusers
            .join(idusers)
            .filter{ case (v1, v2) => v2._1 != v2._2 }

但是后来我还是被困在检查prop1 ...,是否有办法仅使用数据帧来完成所有这些步骤?

scala apache-spark spark-dataframe
1个回答
2
投票

假设您有类似的东西:

val sqlc : SQLContext = ???

case class Person(id: Long, country: String, age: Int)

val testPeople = Seq(
  Person(1, "Romania"    , 15),
  Person(2, "New Zealand", 30),
  Person(3, "Romania"    , 17),
  Person(4, "Iceland"    , 20),
  Person(5, "Romania"    , 40),
  Person(6, "Romania"    , 44),
  Person(7, "Romania"    , 45),
  Person(8, "Iceland"    , 21),
  Person(9, "Iceland"    , 22)
)

val people = sqlc.createDataFrame(testPeople)

您可以使用重命名的列来创建第一个自我奇迹,以避免列在自我联接中发生冲突:

val peopleR = people
  .withColumnRenamed("id"     , "idR")
  .withColumnRenamed("country", "countryR")
  .withColumnRenamed("age"    , "ageR")

现在您可以将数据框与自身连接,删除掉交换对和环边:

import org.apache.spark.sql.functions._

val relations = people.join(peopleR,
      (people("id") < peopleR("idR")) &&
        (people("country") === peopleR("countryR")) &&
        (abs(people("age") - peopleR("ageR")) < 5))

最后,您可以构建所需的EdgeRDD

import org.apache.spark.graphx._

val edges = EdgeRDD.fromEdges(relations.map(row => Edge(
      row.getAs[Long]("id"), row.getAs[Long]("idR"), ())))

relations.show()现在将输出:

+---+-------+---+---+--------+----+
| id|country|age|idR|countryR|ageR|
+---+-------+---+---+--------+----+
|  1|Romania| 15|  3| Romania|  17|
|  4|Iceland| 20|  8| Iceland|  21|
|  4|Iceland| 20|  9| Iceland|  22|
|  5|Romania| 40|  6| Romania|  44|
|  6|Romania| 44|  7| Romania|  45|
|  8|Iceland| 21|  9| Iceland|  22|
+---+-------+---+---+--------+----+

[edges.toLocalIterator.foreach(println)将输出:

Edge(1,3,())
Edge(4,8,())
Edge(4,9,())
Edge(5,6,())
Edge(6,7,())
Edge(8,9,())

0
投票

这将引发错误:

val边缘= EdgeRDD.fromEdges(relations.map(row => Edge(row.getAsLong,row.getAsLong,())))

:59:错误:缺少参数类型val edge = EdgeRDD.fromEdges(relations.map(row => Edge(row.getAsLong,row.getAsLong,())))^

这将起作用,首先需要将DF转换为rdd

val边缘= EdgeRDD.fromEdges(relations.rdd.map(行=> Edge(row.getAsLong,row.getAsLong,())))

边缘:org.apache.spark.graphx.impl.EdgeRDDImpl [Unit,Nothing] = EdgeRDDImpl [1247]在RDD处EdgeRDD.scala:41

© www.soinside.com 2019 - 2024. All rights reserved.