不兼容的类型:列出CSVRecords java

问题描述 投票:0回答:1

我是Spark-Java的初学者,我想从Java 8的List中获取一个subList。然后我将其转换为RDD。我在下面的代码中做到了:

            List<CSVRecord> inputRecords = readInputLayer(actorSystem, inputCatalog, inputCatalogVersion);
            LOGGER.info("Number of partition " +inputRecords.size());

            List<CSVRecord> inputRecordsTmp = inputRecords.stream().limit(100).collect(Collectors.toList());


            JavaRDD<CSVRecord> inputRecordsJavaRDD = JavaSparkContext.emptyRDD();
            for (List<CSVRecord> partition: inputRecordsTmp ){
                JavaRDD<CSVRecord> inputRecordsTmpRDD = (JavaRDD<CSVRecord>) JavaSparkContext.parallelize(partition);
                inputRecordsJavaRDD = JavaSparkContext.union(inputRecordsJavaRDD,inputRecordsTmpRDD);

            }

        LOGGER.info("Number of lines to insert JAVA RDD =" +inputRecordsJavaRDD.count());

但是我在循环中遇到错误,它不接受List<CSVRecord> partition:

Incompatible types:
Required: org.apache.commons.csvRecord
Found: java.util.list  <org.apache.commons.csvRecord>

我该如何纠正?谢谢

java list apache-spark rdd
1个回答
0
投票

编辑:刚看了一下文档。 parallelize实际上接受了一个List,但这只意味着你根本不需要foreach。你应该能够将你的inputRecordsTmp直接传递给并行化并从中获取你的RDD。

尽管如此,由于foreach的配方不正确,我将在此处留下以获取更多信息:

首先,你的for-each-loop对我来说看起来并不正确。在Java中编写for-each-loop时,冒号前面的部分应该是你要迭代的集合中元素的类型。

在您的情况下,您有一个包含CSVRecord对象的List。在for-each中,你基本上是说“对于这个CSVRecord对象列表中的每个CSVRecord ......”。但是,您已经为“CSVRecord对象列表中的每个CSVRecord对象列表编写了......”,这没有多大意义。

Java已经知道inputRecordsTmp是一个CSVRecords列表,所以不需要在任何地方再指定它。您想要告诉它的是,对于列表中的每个对象,您希望将该对象提取到变量(在您的情况下为partition),以便您可以在循环中使用该提取的对象。

所以不要写作

for (List<CSVRecord> partition: inputRecordsTmp ){

你必须写

for (CSVRecord record: inputRecordsTmp ){

这可能会解决您的问题。如果没有,你能否提供一下究竟哪条线路失败的信息?

© www.soinside.com 2019 - 2024. All rights reserved.