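We are trying to implement Kafka Acknowledgment in a Java Spring project. Without the acknowledgment we can receive and read messages successfully, but as soon as we add an Acknowledgment parameter to the listener method we get this error: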
org.springframework.kafka.listener.ListenerExecutionFailedException: invokeHandler Failed; nested exception is java.lang.IllegalStateException:
No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.;
nested exception is java.lang.IllegalStateException:
No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from
[proto.DeviceModelOuterClass$DevModel] to
[org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload=sender: "12345678-1234-1234-1234-123456789102"
We implemented the acknowledgment the way it is described in the Kafka API documentation:
@KafkaListener(
        id = Constants.TOPIC_LISTENER,
        topics = "${info.dev.name}",
        autoStartup = "false",
        properties = {
                "value.deserializer=com.cit.iomt.core.DevModelDeserializer",
                "key.deserializer=org.apache.kafka.common.serialization.UUIDDeserializer"
        })
public void listenToUpdateTopic(@Payload DevModel message, Acknowledgment a) throws Exception {
    LOG.info(Constants.READ_KAFKA_TOPIC, message);
    a.acknowledge();
}
In the properties file we have this:
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
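Do you by any chance define a custom KafkaListenerContainerFactory bean? That was my situation, and in that case the spring.kafka.listener.ack-mode=manual_immediate setting had no effect. I had to set the ack mode programmatically on the kafkaListenerContainerFactory bean. My container factory bean definition (in Kotlin) ended up looking like this: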
@Bean
fun kafkaListenerContainerFactory(consumerFactory: ConsumerFactory<Any, Any>): ConcurrentKafkaListenerContainerFactory<Any, Any> {
    val containerFactory = ConcurrentKafkaListenerContainerFactory<Any, Any>()
    containerFactory.consumerFactory = consumerFactory
    // The property file setting is not applied to a hand-built factory, so set the ack mode here
    containerFactory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
    // Returning the concrete type avoids the unchecked cast to KafkaListenerContainerFactory<KafkaMessageListenerContainer<...>>
    return containerFactory
}
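If the project uses Spring Boot's Kafka auto-configuration (the spring.kafka.* properties in the question suggest it does, but that is an assumption), another option is to let Boot copy the spring.kafka.listener.* settings onto the custom factory instead of hard-coding the ack mode. A minimal sketch in Java; the class name KafkaListenerConfig is hypothetical, and the injected ConsumerFactory is assumed to be the auto-configured one:

```java
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class KafkaListenerConfig { // hypothetical class name

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // Applies spring.kafka.listener.* (including ack-mode) from the properties file to this factory
        configurer.configure(factory, consumerFactory);
        return factory;
    }
}
```

With this in place, spring.kafka.listener.ack-mode=manual_immediate should stay the single source of truth, and further customisation can be done on the factory after the configure call.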
The error:
Caused by: java.lang.IllegalStateException: No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
This means the listener container is not configured with an ack mode that supports manual acknowledgment.
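To make the cause concrete, here is a deliberately simplified model of that rule. It is not Spring's source code: the Acknowledgment interface, the class name, and both methods are stand-ins written for illustration. Only the AckMode constant names mirror ContainerProperties.AckMode. The point it shows is that an Acknowledgment is only supplied to the listener for MANUAL and MANUAL_IMMEDIATE.

```java
import java.util.Optional;

// Simplified illustration only; not the actual Spring Kafka implementation.
class AckArgumentSketch {

    // Same constant names as ContainerProperties.AckMode
    enum AckMode { RECORD, BATCH, TIME, COUNT, COUNT_TIME, MANUAL, MANUAL_IMMEDIATE }

    // Stand-in for org.springframework.kafka.support.Acknowledgment
    interface Acknowledgment {
        void acknowledge();
    }

    // The container only creates an Acknowledgment for the two manual modes
    static Optional<Acknowledgment> acknowledgmentFor(AckMode mode) {
        if (mode == AckMode.MANUAL || mode == AckMode.MANUAL_IMMEDIATE) {
            Acknowledgment ack = () -> System.out.println("offset acknowledged");
            return Optional.of(ack);
        }
        return Optional.empty();
    }

    // Models invoking a listener whose signature asks for an Acknowledgment
    static String invokeListener(AckMode mode) {
        Optional<Acknowledgment> ack = acknowledgmentFor(mode);
        if (!ack.isPresent()) {
            return "IllegalStateException: No Acknowledgment available as an argument";
        }
        ack.get().acknowledge();
        return "listener invoked";
    }

    public static void main(String[] args) {
        System.out.println(invokeListener(AckMode.BATCH));
        System.out.println(invokeListener(AckMode.MANUAL_IMMEDIATE));
    }
}
```

In this model a listener on a BATCH container (the default mode) fails the same way as in the question, while a MANUAL_IMMEDIATE container invokes it normally.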
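Ensure the correct AckMode
If your listener method takes an Acknowledgment parameter, the listener container factory must be configured to use a manual ack mode: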
package com.rakuten.restock.eventsprocessor.config;

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
// Spring Kafka 2.2+ location; older versions use org.springframework.kafka.listener.config.ContainerProperties
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
@RequiredArgsConstructor
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.client-id}")
    private String clientId;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Offsets are committed through the Acknowledgment, not by the Kafka client
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaManualAckListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // MANUAL (or MANUAL_IMMEDIATE) is required for the Acknowledgment argument to be populated
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}
The Kafka listener method should take both the payload and the Acknowledgment as parameters:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class MessageProcessor {

    @KafkaListener(topics = "${app.kafka.consumer-topic}", containerFactory = "kafkaManualAckListenerContainerFactory")
    public void processMessage(@Payload String payload, Acknowledgment ack) {
        try {
            // Process the message payload
            // Deserialize payload, process data, etc.
        } finally {
            // Acknowledge the message manually
            ack.acknowledge();
        }
    }
}
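One design point worth weighing: the listener above acknowledges in a finally block, so the offset is acknowledged even when processing throws. Whether that is wanted depends on the application. The sketch below contrasts the two placements using plain Java stand-ins; the Acknowledgment interface, CountingAck, and both method names are hypothetical and only model the control flow, not Spring Kafka itself.

```java
import java.util.function.Consumer;

// Control-flow illustration only; the types here are stand-ins, not Spring Kafka classes.
class AckPlacementSketch {

    // Stand-in for org.springframework.kafka.support.Acknowledgment
    interface Acknowledgment {
        void acknowledge();
    }

    // Counts how many times acknowledge() was called
    static class CountingAck implements Acknowledgment {
        int count;

        @Override
        public void acknowledge() {
            count++;
        }
    }

    // Same shape as the listener above: acknowledges whether or not processing succeeds
    static void ackInFinally(String payload, Consumer<String> processor, Acknowledgment ack) {
        try {
            processor.accept(payload);
        } finally {
            ack.acknowledge();
        }
    }

    // Alternative: acknowledges only after processing returned normally
    static void ackOnSuccess(String payload, Consumer<String> processor, Acknowledgment ack) {
        processor.accept(payload);
        ack.acknowledge();
    }

    public static void main(String[] args) {
        Consumer<String> failing = p -> { throw new IllegalStateException("processing failed"); };

        CountingAck first = new CountingAck();
        try { ackInFinally("msg", failing, first); } catch (IllegalStateException ignored) { }

        CountingAck second = new CountingAck();
        try { ackOnSuccess("msg", failing, second); } catch (IllegalStateException ignored) { }

        System.out.println("finally: " + first.count + ", on success: " + second.count);
    }
}
```

With the finally variant a failed record is acknowledged and will not be read again by the same consumer group; with the on-success variant the exception propagates unacknowledged, leaving the decision to whatever error handling the container is configured with.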