我有很多项目的RDD,只是简化它喜欢:
[0,1,2,3,4,5,6,7,8,9]
并将这些项目提交给批处理API(API.post(a[])
)。但API限制最大批次(exp.3)。因此,为了获得最佳性能,我需要将RDD迭代器转换为极限数组:
[[0,1,2], [3,4,5], [6,7,8], [9]]
我使用Spark Java将数据推送到API。
rdd.foreach(a -> { API.post(a)}
我的问题是如何改造它?
要清楚,没有RDD迭代器,而是每个分区的迭代器。要访问它们,可以使用foreachPartition
,然后使用普通的旧Java迭代器操作来完成对迭代器的批处理。以下是使用Spark Java API http://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/api/java/JavaRDD.html#foreachPartition-org.apache.spark.api.java.function.VoidFunction-和Guava的解决方案:
rdd.foreachPartition(it ->
Iterators.partition(it, batchSize)
.forEachRemaining(API::post));