无法获取 CsvReaderFormat 以供 Flink 读取 CSV 文件

问题描述 投票:0回答:1

我正在尝试使用文档中提到的 CsvReaderFormat 读取 flink 执行类中的 CSV 文件的简单任务。我有一个名为subscriberADSR的pojo(我知道以小写字母开头是一个不好的做法,但稍后会更正)

按照简单的说明创建 CSVReaderFormat 时

CsvReaderFormat<subscriberADSR> csvFormat = CsvReaderFormat<subscriberADSR>.

之后我什么也没得到。在代码帮助上。如果我使用以下内容,我会收到以下编译错误

CsvReaderFormat<subscriberADSR> csvFormat = CsvReaderFormat.forPojo(subscriberADSR.class);

The type org.apache.flink.connector.file.src.reader.SimpleStreamFormat cannot be resolved. It is indirectly referenced from required type org.apache.flink.formats.csv.CsvReaderFormat

以下是flink和flink csv的pom依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.19.1</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.19.1</version>
</dependency>

请帮助我理解我做错了什么

如果有人报告类似问题,我已经浏览了大部分论坛问题,但找不到任何内容

csv apache-flink flink-batch
1个回答
0
投票

你尝试过这个例子吗?

//Has to match the exact order of columns in the CSV file
@JsonPropertyOrder({"city","lat","lng","country","iso2",
                    "adminName","capital","population"})
    public static class CityPojo {
    public String city;
    public BigDecimal lat;
    public BigDecimal lng;
    public String country;
    public String iso2;
    public String adminName;
    public String capital;
    public long population;
}

Function<CsvMapper, CsvSchema> schemaGenerator = mapper ->
        mapper.schemaFor(CityPojo.class).withoutQuoteChar().withColumnSeparator('|');

CsvReaderFormat<CityPojo> csvFormat =
        CsvReaderFormat.forSchema(() -> new CsvMapper(), schemaGenerator, TypeInformation.of(CityPojo.class));

FileSource<CityPojo> source =
        FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();

https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/formats/csv/#advanced-configuration

© www.soinside.com 2019 - 2024. All rights reserved.