使用reduceByKey(numPartitions)或重新分区标准化SPARK RDD分区

问题描述 投票:0回答:1

使用Spark 2.4.0。我的生产数据非常不对称,因此其中一项任务花费的时间比其他所有任务都要长7倍。我尝试了不同的策略对数据进行规范化,以便所有执行程序均能正常工作-

  1. spark.default.parallelism
  2. reduceByKey(numPartitions)
  3. repartition(numPartitions)

我期望他们三个都应该均匀划分,但是在Spark Local / Standalone上使用一些虚拟的非生产数据表明,选项1,2的标准化要好于3。

数据如下:(我正在尝试简单地减少每个帐户+ CCY组合的余额

account}date}ccy}amount
A1}2020/01/20}USD}100.12
A2}2010/01/20}SGD}200.24
A2}2010/01/20}USD}300.36
A1}2020/01/20}USD}400.12

预期结果应为[A1-USD,500.24], [A2-SGD,200.24], [A2-USD,300.36],理想情况下,应将它们划分为3个不同的分区。

javaRDDWithoutHeader
.mapToPair((PairFunction<Balance, String, Integer>) balance -> new Tuple2<>(balance.getAccount() + balance.getCcy(), 1))        
    .mapToPair(new MyPairFunction())
   .reduceByKey(new ReductionFunction())

检查分区的代码

     System.out.println("b4 = " +pairRDD.getNumPartitions());
     System.out.println(pairRDD.glom().collect());
     JavaPairRDD<DummyString, BigDecimal> newPairRDD = pairRDD.repartition(3);
     System.out.println("Number of partitions = " +newPairRDD.getNumPartitions());
     System.out.println(newPairRDD.glom().collect());
  1. 选项1:不执行任何操作
  2. 选项2:将spark.default.parallelism设置为3
  3. 选项3:numPartitions = 3的reduceByKey
  4. 选项4:repartition(3)

    对于选项1分区数= 2[[(DummyString {account ='A2',ccy ='SGD'},200.24),(DummyString {account ='A2',ccy ='USD'},300.36)],[(DummyString {account ='A1',ccy ='USD'},500.24)]]

    对于选项2

    分区数= 3[[(DummyString {account ='A1',ccy ='USD'},500.24)],[(DummyString {account ='A2',ccy ='USD'},300.36)],[(DummyString {account ='A2',ccy ='SGD'},200.24)]]

    对于选项3分区数= 3[[(DummyString {account ='A1',ccy ='USD'},500.24)],[(DummyString {account ='A2',ccy ='USD'},300.36)],[(DummyString {account ='A2',ccy ='SGD'},200.24)]]

    对于选项4分区数= 3[[],[(DummyString {account ='A2',ccy ='SGD'},200.24)],[(DummyString {account ='A2',ccy ='USD'},300.36),(DummyString {account ='A1',ccy ='USD'},500.24)]]

Conclusion:选项2(spark.default.parallelism)和3(reduceByKey(numPartitions)的规格化比选项4(分区)好得多确定性的结果,从未见过option4归一化为3个分区。

问题:

  1. reduceByKey(numPartitions)比重新分区或]好多了>
  2. 这仅仅是因为样本数据集如此之小?或
  3. 当我们通过YARN群集提交时,这种行为是否会有所不同

使用Spark 2.4.0。我的生产数据非常不对称,因此其中一项任务花费的时间比其他所有任务都要长7倍。我尝试了不同的策略来规范化数据,以便所有执行程序...

performance apache-spark concurrency reducers
1个回答
0
投票

我认为这个问题涉及一些问题,因此很难回答。

© www.soinside.com 2019 - 2024. All rights reserved.