如何正确使用mapPartitions函数

问题描述 投票:0回答:1

我正在做一个包含大数据的程序,这就是我使用Spark和Scala的原因。我需要对数据库进行分区,为此我使用

var data0 = conf.dataBase.repartition (8) .persist (StorageLevel.MEMORY_AND_DISK_SER)

但后来我需要在分区中做一些事情,然后继续使用与该分区对应的数据库并为此使用

var tester = data0.mapPartitions {x =>
   configFuzzyPredProblem ()
   Strategy.getStrategy.executeStrategy (conf.iterByRun, 5, GeneratorType.HillClimbing)
 } .persist (StorageLevel.MEMORY_AND_DISK_SER)

在方法executeStrategy()中我使用数据库,但我不知道它是全局的还是对应于该分区的数据库。我怎么知道我正在使用哪一个然后只用该分区的数据库执行分区处理?

scala apache-spark intellij-idea-2016
1个回答
2
投票

下面是一个使用mapPartitionsWithIndex的简单示例,它遵循mapPartitions的相同规则 - 不包括索引方面。

你可以看到在mapPartitions中你需要处理一个interable,在这个例子中是一个Interator Int。在这种情况下,在您的情况8中处理3个分区,其中包含一些条目或可能为零条目。

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)
def myfunc(index: Int, iter: Iterator[Int]) : Iterator[String] = {
    iter.map(x => index + "," + x)
}
val rdd2 = rdd1.mapPartitionsWithIndex(myfunc)

我看不到你的功能,但我认为它没问题,它将处理一个分区 - 数据库的一部分。

© www.soinside.com 2019 - 2024. All rights reserved.