我正在尝试使用文档中提到的 CsvReaderFormat 读取 flink 执行类中的 CSV 文件的简单任务。我有一个名为subscriberADSR的pojo(我知道以小写字母开头是一个不好的做法,但稍后会更正)
按照简单的说明创建 CSVReaderFormat 时
CsvReaderFormat<subscriberADSR> csvFormat = CsvReaderFormat<subscriberADSR>.
之后我什么也没得到。在代码帮助上。如果我使用以下内容,我会收到以下编译错误
CsvReaderFormat<subscriberADSR> csvFormat = CsvReaderFormat.forPojo(subscriberADSR.class);
The type org.apache.flink.connector.file.src.reader.SimpleStreamFormat cannot be resolved. It is indirectly referenced from required type org.apache.flink.formats.csv.CsvReaderFormat
以下是flink和flink csv的pom依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.19.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.19.1</version>
</dependency>
请帮助我理解我做错了什么
如果有人报告类似问题,我已经浏览了大部分论坛问题,但找不到任何内容
你尝试过这个例子吗?
//Has to match the exact order of columns in the CSV file
@JsonPropertyOrder({"city","lat","lng","country","iso2",
"adminName","capital","population"})
public static class CityPojo {
public String city;
public BigDecimal lat;
public BigDecimal lng;
public String country;
public String iso2;
public String adminName;
public String capital;
public long population;
}
Function<CsvMapper, CsvSchema> schemaGenerator = mapper ->
mapper.schemaFor(CityPojo.class).withoutQuoteChar().withColumnSeparator('|');
CsvReaderFormat<CityPojo> csvFormat =
CsvReaderFormat.forSchema(() -> new CsvMapper(), schemaGenerator, TypeInformation.of(CityPojo.class));
FileSource<CityPojo> source =
FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();