我是 Kafka 新手,我正在尝试读取一个文本文件并创建一个要发送给消费者的字符串列表。我正在使用 Java 21 和 Spring Boot 3.2.0(快照)。
这是 Kafka 生产者配置:
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
public Map<String, Object> producerConfig(){
Map<String,Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ListSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ListSerializer.class.getName());
return properties;
}
@Bean
public ProducerFactory<String, List<String>> producerFactory(){
return new DefaultKafkaProducerFactory<>(producerConfig());
}
@Bean
public KafkaTemplate<String, List<String>> kafkaTemplate(ProducerFactory<String,List<String>> producerFactory){
return new KafkaTemplate<>(producerFactory);
}
}
这是主题配置:
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic alexTopic(){
return TopicBuilder.name("securityTopic")
.build();
}
}
这是“application.properties”:
spring.servlet.multipart.enabled=true
spring.servlet.multipart.max-file-size=10MB
spring.servlet.multipart.max-request-size=10MB
spring.kafka.bootstrap-servers=192.168.241.133:9092
这就是我接收文件的方式:
public Optional uploadFile(MultipartFile file){
if(file.isEmpty()){
return Optional.of(new EmptyFileException("File is empty please double check"));
}
return Optional.of(this.upload.uploadFile(file));
}
这就是我将“List
public UploadFileService(KafkaTemplate<String, List<String>> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public String uploadFile(MultipartFile file) {
try(BufferedReader reader = new BufferedReader(
new InputStreamReader(file.getInputStream()))){
String line;
int i=0;
List<String> list = new ArrayList<>();
while ((line=reader.readLine())!=null) {
if(i==10){
this.kafkaTemplate.send("securityTopic",list);
如果我发送纯字符串并且我正在使用:
StringSerializer.class
而不是:
ListSerializer.class.getName()
在 Kafka 生产者配置中,还将其他泛型更改为 String 而不是 List
给我的错误如标题所示:“无法构建kafka生产者”。
我也尝试过使用“ListSerializer.class”,但不起作用。
我也尝试使用“JsonSerialize.class”,但仍然是同样的错误。
感谢您花时间阅读本文!
导入错误。正确的导入是:
“导入org.springframework.kafka.support.serializer.JsonSerializer;”