我正在使用一个需要在循环中不断更新某些RDD的spark程序:
var totalRandomPath: RDD[String] = null
for (iter <- 0 until config.numWalks) {
var randomPath: RDD[String] = examples.map { case (nodeId, clickNode) =>
clickNode.path.mkString("\t")
}
for (walkCount <- 0 until config.walkLength) {
randomPath = edge2attr.join(randomPath.mapPartitions { iter =>
iter.map { pathBuffer =>
val paths: Array[String] = pathBuffer.split("\t")
(paths.slice(paths.size - 2, paths.size).mkString(""), pathBuffer)
}
}).mapPartitions { iter =>
iter.map { case (edge, (attr, pathBuffer)) =>
try {
if (pathBuffer != null && pathBuffer.nonEmpty && attr.dstNeighbors != null && attr.dstNeighbors.nonEmpty) {
val nextNodeIndex: PartitionID = GraphOps.drawAlias(attr.J, attr.q)
val nextNodeId: VertexId = attr.dstNeighbors(nextNodeIndex)
s"$pathBuffer\t$nextNodeId"
} else {
pathBuffer //add
}
} catch {
case e: Exception => throw new RuntimeException(e.getMessage)
}
}.filter(_ != null)
}
}
if (totalRandomPath != null) {
totalRandomPath = totalRandomPath.union(randomPath)
} else {
totalRandomPath = randomPath
}
}
在这个程序中,RDD totalRandomPath
和randomPath
不断更新,进行了大量的转换操作:join
和mapPartitions
。该计划将以collect
行动结束。
那么我需要坚持不断更新的RDD(totalRandomPath,randomPath)来加速我的火花程序吗? 我注意到这个程序在单节点机器上运行速度很快,但在三节点集群中运行时速度慢,为什么会发生这种情况呢?
是的,您需要保持更新的RDD并且还要使用旧的RDD
var totalRandomPath:RDD[String] = spark.sparkContext.parallelize(List.empty[String]).cache()
for (iter <- 0 until config.numWalks){
// existing logic
val tempRDD = totalRandomPath.union(randomPath).cache()
tempRDD foreach { _ => } //this will trigger cache operation for tempRDD immediately
totalRandomPath.unpersist() //unpersist old RDD which is no longer needed
totalRandomPath = tempRDD // point totalRandomPath to updated RDD
}