发送字符串列表时构建kafka生产者失败

问题描述 投票:0回答:1

我是 Kafka 新手,我正在尝试读取一个文本文件并创建一个要发送给消费者的字符串列表。我正在使用 Java 21 和 Spring Boot 3.2.0(快照)。

这是 Kafka 生产者配置:

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    public Map<String, Object> producerConfig(){
        Map<String,Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ListSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ListSerializer.class.getName());

        return properties;
    }

    @Bean
    public ProducerFactory<String, List<String>> producerFactory(){
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, List<String>> kafkaTemplate(ProducerFactory<String,List<String>> producerFactory){
        return new KafkaTemplate<>(producerFactory);
    }
}

这是主题配置:

@Configuration
public class KafkaTopicConfig {
    
    @Bean
    public NewTopic alexTopic(){
        return TopicBuilder.name("securityTopic")
                .build();
    }
}

这是“application.properties”:

spring.servlet.multipart.enabled=true
spring.servlet.multipart.max-file-size=10MB
spring.servlet.multipart.max-request-size=10MB
spring.kafka.bootstrap-servers=192.168.241.133:9092

这就是我接收文件的方式:

public Optional uploadFile(MultipartFile file){
    if(file.isEmpty()){
        return Optional.of(new EmptyFileException("File is empty please double check"));
    }
    
    return Optional.of(this.upload.uploadFile(file));
}

这就是我将“List”发送到kafka的方式:

public UploadFileService(KafkaTemplate<String, List<String>> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
}

@Override
public String uploadFile(MultipartFile file) {
    try(BufferedReader reader = new BufferedReader(
                                    new InputStreamReader(file.getInputStream()))){

        String line;
        int i=0;
        List<String> list = new ArrayList<>();
        while ((line=reader.readLine())!=null) {
            if(i==10){
                this.kafkaTemplate.send("securityTopic",list);

如果我发送纯字符串并且我正在使用:

StringSerializer.class

而不是:

ListSerializer.class.getName()

在 Kafka 生产者配置中,还将其他泛型更改为 String 而不是 List 是有效的。但是我怎样才能发送List呢?我再次使用 Kafka 2 或 3 天,我不知道是否必须创建自己的序列化器或使用内置的东西?

给我的错误如标题所示:“无法构建kafka生产者”。

我也尝试过使用“ListSerializer.class”,但不起作用。

我也尝试使用“JsonSerialize.class”,但仍然是同样的错误。

感谢您花时间阅读本文!

spring-boot apache-kafka spring-kafka kafka-producer-api java-21
1个回答
0
投票

导入错误。正确的导入是:

“导入org.springframework.kafka.support.serializer.JsonSerializer;”

© www.soinside.com 2019 - 2024. All rights reserved.