我是Spark-Java
的初学者,我想从Java 8
的List中获取一个subList。然后我将其转换为RDD。我在下面的代码中做到了:
List<CSVRecord> inputRecords = readInputLayer(actorSystem, inputCatalog, inputCatalogVersion);
LOGGER.info("Number of partition " +inputRecords.size());
List<CSVRecord> inputRecordsTmp = inputRecords.stream().limit(100).collect(Collectors.toList());
JavaRDD<CSVRecord> inputRecordsJavaRDD = JavaSparkContext.emptyRDD();
for (List<CSVRecord> partition: inputRecordsTmp ){
JavaRDD<CSVRecord> inputRecordsTmpRDD = (JavaRDD<CSVRecord>) JavaSparkContext.parallelize(partition);
inputRecordsJavaRDD = JavaSparkContext.union(inputRecordsJavaRDD,inputRecordsTmpRDD);
}
LOGGER.info("Number of lines to insert JAVA RDD =" +inputRecordsJavaRDD.count());
但是我在循环中遇到错误,它不接受List<CSVRecord> partition:
Incompatible types:
Required: org.apache.commons.csvRecord
Found: java.util.list <org.apache.commons.csvRecord>
我该如何纠正?谢谢
编辑:刚看了一下文档。 parallelize
实际上接受了一个List,但这只意味着你根本不需要foreach。你应该能够将你的inputRecordsTmp
直接传递给并行化并从中获取你的RDD。
尽管如此,由于foreach的配方不正确,我将在此处留下以获取更多信息:
首先,你的for-each-loop对我来说看起来并不正确。在Java中编写for-each-loop时,冒号前面的部分应该是你要迭代的集合中元素的类型。
在您的情况下,您有一个包含CSVRecord对象的List。在for-each中,你基本上是说“对于这个CSVRecord对象列表中的每个CSVRecord ......”。但是,您已经为“CSVRecord对象列表中的每个CSVRecord对象列表编写了......”,这没有多大意义。
Java已经知道inputRecordsTmp
是一个CSVRecords列表,所以不需要在任何地方再指定它。您想要告诉它的是,对于列表中的每个对象,您希望将该对象提取到变量(在您的情况下为partition
),以便您可以在循环中使用该提取的对象。
所以不要写作
for (List<CSVRecord> partition: inputRecordsTmp ){
你必须写
for (CSVRecord record: inputRecordsTmp ){
这可能会解决您的问题。如果没有,你能否提供一下究竟哪条线路失败的信息?