我有一个Java程序,工作在大型数据集上。该数据集存储在hdfs(csv)中。
程序工作正常,但速度很慢。
程序要做什么?
这是我的主要方法。
public static void main(String[] args) {
// configure spark
SparkConf sparkConf = new SparkConf().setAppName("Write to cassandra app")
.setMaster("local[*]")
.set("spark.executor.memory", "4g");
if (args.length > 1)
sparkConf.set("spark.cassandra.connection.host", args[1]);
// start a spark context
JavaSparkContext sc = new JavaSparkContext(sparkConf);
// read text file to RDD
JavaRDD<String> lines = sc.textFile(args[0]);
JavaRDD<MyObject> myObjectJavaRDD = lines
.map(line -> line.split(","))
.filter(someFilter)
.map(MyObject::new);
javaFunctions(myObjectJavaRDD).writerBuilder("ks", "table", mapToRow(MyObject.class)).saveToCassandra();
}
如何才能提高敷衍了事的能力?
谢谢你的回答。
你的代码不存在shuffle的问题(除非你要写出来到HDFS),而且默认的分区是由输入格式定义的,在Hadoop上是按HDFS核心分割的,filter或map不会改变分区。如果能先过滤,可以看到一些改进的地方
JavaRDD<MyObject> myObjectJavaRDD = lines
.filter(someFilter)
.map(line -> line.split(","))
.map(MyObject::new);
Spark只能为RDD的每个分区运行1个并发任务,最多是集群中的核数。因此,如果你的集群有50个核,你希望你的RDDs至少有50个分区。至于选择一个 "好 "的分区数量,一般来说,您希望至少有与并行执行器数量相同的分区。您可以通过调用
sc.defaultParallelism
或通过以下方式查看RDD分区号
someRDD.partitions.size
当使用下列方法读取文件创建RDD时
rdd = SparkContext().textFile("hdfs://…/file.txt")
分区的数量可能会更少。理想情况下,你会得到与 HDFS 中相同的块数,但如果你的文件中的行太长(长于块的大小),分区的数量就会减少。
设置 RDD 分区数量的首选方法是直接将其作为调用中的第二个输入参数,如
rdd = sc.textFile("hdfs://… /file.txt", 400)
其中 400 是分区的数量。在这种情况下,分区使得400个分割将由Hadoop的TextInputFormat ,而不是Spark来完成,它的工作速度会快很多。It'salso,代码会产生400个并发任务,尝试将file.txt直接加载到400partition中。
重新分区:增加分区,过滤器增加paralellism后重新平衡分区。
repartition(numPartitions: Int)
Coalesce: 在输出到HDFSexternal之前,减少分区,不进行洗牌合并。
coalesce(numPartitions: Int, suffle: Boolean = false)
最后,也是同样重要的,你可以做一些试验,用不同的值和基准,看看有多少时间是采取的过程。
val start = System.nanoTime()
// my process
val end = System.nanoTime()
val time = end - start
println(s"My App takes: $time")
我希望,它能帮助你
def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
// local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
val threadCount = if (threads == "*") localCpuCount else threads.toInt
查看 集群模式
hdfs文件的并行性取决于它的块大小,请根据你的集群核心数尝试增加或减少。
如果可能的话,在读取文件后立即移动过滤器。