如何使用Spark RDD进行批量提交?

问题描述 投票:-2回答:1

我有很多项目的RDD,只是简化它喜欢:

[0,1,2,3,4,5,6,7,8,9]

并将这些项目提交给批处理API(API.post(a[]))。但API限制最大批次(exp.3)。因此,为了获得最佳性能,我需要将RDD迭代器转换为极限数组:

[[0,1,2], [3,4,5], [6,7,8], [9]]

我使用Spark Java将数据推送到API。

rdd.foreach(a -> { API.post(a)}

我的问题是如何改造它?

apache-spark rdd
1个回答
0
投票

要清楚,没有RDD迭代器,而是每个分区的迭代器。要访问它们,可以使用foreachPartition,然后使用普通的旧Java迭代器操作来完成对迭代器的批处理。以下是使用Spark Java API http://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/api/java/JavaRDD.html#foreachPartition-org.apache.spark.api.java.function.VoidFunction-和Guava的解决方案:

rdd.foreachPartition(it -> 
  Iterators.partition(it, batchSize)
           .forEachRemaining(API::post));
© www.soinside.com 2019 - 2024. All rights reserved.