我有以下类型的纯Scala代码:
import breeze.numerics.log
import spire.random.Dist
import org.apache.commons.math3.distribution.NormalDistribution
import scala.collection.mutable.Buffer
def foo1(zs: Buffer[Double])={
val S = zs.zip(zs.reverse)
.map { case (x, y) =>log(x) * log(1 - y) }.sum
S
}
val x = Dist.uniform(0.0, 1.0).sample[Buffer](10)
val y = x.sortWith(_<_)
val cdf=new NormalDistribution(0, 1)
val z = y.map(x_ => cdf.cumulativeProbability(x_))
foo1(z)
z
排序因为cdf
正在增加
我想为Spark重写它,但对于RDD数据类型,没有反向方法。我如何为Spark编写此代码?
def foo2(z_rdd: RDD[Double])={
var S = z_rdd.zip(z_rdd.???)
.map { case (x, y) =>log(x) * log(1 - y) }.sum
S
}
其中???
function反转z_rdd
。
如果你试图用自己的反转副本压缩RDD,你应该记住,Spark zip需要两个RDD同等分区:
假设两个RDD在每个分区中具有相同数量的分区和相同数量的元素(例如,一个是通过另一个映射制作的)。
因此,完成rdd zip rdd.reversed
的方法是:
zipWithIndex
应用于RDDreduceByKey
或groupByKey
来自步骤1和2的RDD的联合,以索引为关键我不确定这个食谱是否可以改进。
您可以使用zipWithIndex
将索引添加到RDD的值,然后按索引反向排序:
z_rdd.zip(
z_rdd.zipWithIndex()
.sortBy(_._2, ascending = false)
).map({ case (doubleA, (doubleB, _)) =>
…
})