我正在尝试使用 Spark 的 GraphX 库实现 拓扑排序。
这是我到目前为止编写的代码:
MyObject.scala
import java.util.ArrayList
import scala.collection.mutable.Queue
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Edge
import org.apache.spark.graphx.EdgeDirection
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.Graph.graphToGraphOps
import org.apache.spark.graphx.VertexId
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
object MyObject {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Spark-App").setMaster("local[2]")
val sc = new SparkContext(conf)
val resources: RDD[Resource] = makeResources(sc)
val relations: RDD[Relation] = makeRelations(sc)
println("Building graph ...")
var graph = buildGraph(resources, relations, sc)
println("Graph built!!")
println("Testing topo sort ...")
val topoSortResult = topoSort(graph, sc);
println("topoSortResult = " + topoSortResult)
println("Testing topo sort done!")
}
def buildGraph(resources: RDD[Resource], relations: RDD[Relation], sc: SparkContext): Graph[Resource, Relation] =
{
val vertices: RDD[(Long, Resource)] = resources.map(resource => (resource.id, resource))
val edges: RDD[Edge[Relation]] = relations.map(relation => Edge(relation.srcId, relation.dstId, relation))
var graph = Graph[Resource, Relation](vertices, edges)
graph
}
def makeResources(sc: SparkContext): RDD[Resource] =
{
var list: List[Resource] = List()
list = list :+ new Resource(1L)
list = list :+ new Resource(2L)
list = list :+ new Resource(3L)
list = list :+ new Resource(4L)
list = list :+ new Resource(5L)
sc.parallelize(list)
}
def makeRelations(sc: SparkContext): RDD[Relation] =
{
var list: List[Relation] = List()
list = list :+ new Relation(1L, "depends_on", 2L)
list = list :+ new Relation(3L, "depends_on", 2L)
list = list :+ new Relation(4L, "depends_on", 2L)
list = list :+ new Relation(5L, "depends_on", 2L)
sc.parallelize(list)
}
def topoSort(graph: Graph[Resource, Relation], sc: SparkContext): java.util.List[(VertexId, Resource)] =
{
// Will contain the result
val sortedResources: java.util.List[(VertexId, Resource)] = new ArrayList()
// Contains all the vertices
val vertices = graph.vertices
// Contains all the vertices whose in-degree > 0
val inDegrees = graph.inDegrees;
val inDegreesKeys_array = inDegrees.keys.collect();
// Contains all the vertices whose in-degree == 0
val inDegreeZeroList = vertices.filter(vertex => !inDegreesKeys_array.contains(vertex._1))
// A map of vertexID vs its in-degree
val inDegreeMapRDD = inDegreeZeroList.map(vertex => (vertex._1, 0)).union(inDegrees);
// Insert all the resources whose in-degree == 0 into a queue
val queue = new Queue[(VertexId, Resource)]
for (vertex <- inDegreeZeroList.toLocalIterator) { queue.enqueue(vertex) }
// Get an RDD containing the outgoing edges of every vertex
val neighbours = graph.collectNeighbors(EdgeDirection.Out)
// Initiate the algorithm
while (!queue.isEmpty) {
val vertex_top = queue.dequeue()
// Add the topmost element of the queue to the result
sortedResources.add(vertex_top)
// Get the neigbours (from outgoing edges) of this vertex
// This will be an RDD containing just 1 element which will be an array of neighbour vertices
val vertex_neighbours = neighbours.filter(vertex => vertex._1.equals(vertex_top._1))
// For each vertex, decrease its in-degree by 1
vertex_neighbours.foreach(arr => {
val neighbour_array = arr._2
neighbour_array.foreach(vertex => {
val oldInDegree = inDegreeMapRDD.filter(vertex_iter => (vertex_iter._1 == vertex._1)).first()._2
val newInDegree = oldInDegree - 1
// Reflect the new in-degree in the in-degree map RDD
inDegreeMapRDD.map(vertex_iter => {
if (vertex_iter._1 == vertex._1) {
(vertex._1, newInDegree)
}
else{
vertex_iter
}
});
// Add this vertex to the result if its in-degree has become zero
if (newInDegree == 0) {
queue.enqueue(vertex)
}
})
})
}
return sortedResources
}
}
资源.scala
class Resource(val id: Long) extends Serializable {
override def toString(): String = {
"id = " + id
}
}
Relation.scala
class Relation(val srcId: Long, val name: String, val dstId: Long) extends Serializable {
override def toString(): String = {
srcId + " " + name + " " + dstId
}
}
我收到错误:
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
线路
val oldInDegree = inDegreeMapRDD.filter(vertex_iter => (vertex_iter._1 == vertex._1)).first()._2
。
我猜这是因为在其他 RDD 的 for-each 循环内修改 RDD 是非法的。
另外,我担心
queue.enqueue(vertex)
不起作用,因为不可能在for-each
循环内修改本地集合。
如何正确实现这个拓扑排序算法?
异常的完整堆栈跟踪已上传此处(必须从外部上传以防止超出StackOverflow的主体大小限制)。
vertex_neighbours.foreach(arr => {
val neighbour_array = arr._2
neighbour_array.foreach(vertex => {
. . .
外部的 foreach 可以用 for 循环代替。
val vertex_neighbours = neighbours.filter(vertex => vertex._1.equals(vertex_top._1)).collect()
在对 RDD 进行 for 循环之前,您需要获取 RDD。
使用spark graphx实现的拓扑排序算法。消息传递从入度为零的节点开始,然后减小具有从 u(入度 == 0)到 v(入度非零)的连接路径的节点的入度值。
import scala.reflect.ClassTag
import org.apache.spark.graphx._
object TopologicalSort {
def run[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED],
maxIterations: Int = 500
): Graph[(Int, Int), ED] = {
val degreesMap = graph.inDegrees.collect().toMap
// initialize the graph vertices with in-degree value & topological order
// equal to zero (in-degree, order)
val tsGraph = graph.mapVertices((vid, _) => (degreesMap.getOrElse(vid, 0), 0))
def sendMessage(edge: EdgeTriplet[(Int, Int), ED]): Iterator[(VertexId, (Int, Int))] =
if (edge.srcAttr._1 == 0 && edge.dstAttr._1 > 0)
Iterator((edge.dstId, (1, edge.srcAttr._2 + 1)))
else Iterator.empty
def mergeMessage(a: (Int, Int), b: (Int, Int)): (Int, Int) = (a._1 + b._1, Math.min(a._2, b._2))
def vertexProgram(vid: VertexId, attr: (Int, Int), message: (Int, Int)): (Int, Int) =
if (attr._1 == 0) (0, message._2) else (attr._1 - message._1, message._2)
val initialMsg = (0, 0)
Pregel(tsGraph, initialMsg, maxIterations, EdgeDirection.Out)(
(vid, attr, msg) => vertexProgram(vid, attr, msg),
sendMessage,
mergeMessage
)}}