如何提高Spark的性能?

问题描述 投票:0回答:1

我有一个Java程序,工作在大型数据集上。该数据集存储在hdfs(csv)中。

程序工作正常,但速度很慢。

程序要做什么?

  1. 加载csv文件
  2. 分行到String[]
  3. 过滤器 字符串数组
  4. 映射到MyObject
  5. 保存MyObject到Cassandra

这是我的主要方法。

public static void main(String[] args) {

        // configure spark
        SparkConf sparkConf = new SparkConf().setAppName("Write to cassandra app")
                .setMaster("local[*]")
                .set("spark.executor.memory", "4g");

        if (args.length > 1)
            sparkConf.set("spark.cassandra.connection.host", args[1]);

        // start a spark context
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // read text file to RDD
        JavaRDD<String> lines = sc.textFile(args[0]);

        JavaRDD<MyObject> myObjectJavaRDD = lines
                .map(line -> line.split(","))
                .filter(someFilter)
                .map(MyObject::new);

        javaFunctions(myObjectJavaRDD).writerBuilder("ks", "table", mapToRow(MyObject.class)).saveToCassandra();
    }

如何才能提高敷衍了事的能力?

谢谢你的回答。

java apache-spark cassandra hdfs
1个回答
3
投票

你的代码不存在shuffle的问题(除非你要写出来到HDFS),而且默认的分区是由输入格式定义的,在Hadoop上是按HDFS核心分割的,filter或map不会改变分区。如果能先过滤,可以看到一些改进的地方

        JavaRDD<MyObject> myObjectJavaRDD = lines
                .filter(someFilter)
                .map(line -> line.split(","))
                .map(MyObject::new);

Spark只能为RDD的每个分区运行1个并发任务,最多是集群中的核数。因此,如果你的集群有50个核,你希望你的RDDs至少有50个分区。至于选择一个 "好 "的分区数量,一般来说,您希望至少有与并行执行器数量相同的分区。您可以通过调用

sc.defaultParallelism

或通过以下方式查看RDD分区号

someRDD.partitions.size

当使用下列方法读取文件创建RDD时

rdd = SparkContext().textFile("hdfs://…/file.txt") 

分区的数量可能会更少。理想情况下,你会得到与 HDFS 中相同的块数,但如果你的文件中的行太长(长于块的大小),分区的数量就会减少。

设置 RDD 分区数量的首选方法是直接将其作为调用中的第二个输入参数,如

rdd = sc.textFile("hdfs://… /file.txt", 400) 

其中 400 是分区的数量。在这种情况下,分区使得400个分割将由Hadoop的TextInputFormat ,而不是Spark来完成,它的工作速度会快很多。It'salso,代码会产生400个并发任务,尝试将file.txt直接加载到400partition中。

重新分区:增加分区,过滤器增加paralellism后重新平衡分区。

        repartition(numPartitions: Int)

Coalesce: 在输出到HDFSexternal之前,减少分区,不进行洗牌合并。

    coalesce(numPartitions: Int, suffle: Boolean = false)

最后,也是同样重要的,你可以做一些试验,用不同的值和基准,看看有多少时间是采取的过程。

  val start = System.nanoTime()

  // my process

  val end = System.nanoTime()

  val time = end - start
  println(s"My App takes: $time")

我希望,它能帮助你


0
投票
  1. 使用本地模式将在您的本地机器上启动任务。
    def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
    // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
    val threadCount = if (threads == "*") localCpuCount else threads.toInt 

查看 集群模式

  1. hdfs文件的并行性取决于它的块大小,请根据你的集群核心数尝试增加或减少。

  2. 如果可能的话,在读取文件后立即移动过滤器。

© www.soinside.com 2019 - 2024. All rights reserved.