Kafka 没有可用的确认作为参数

问题描述 投票:0回答:2

我们正在尝试在 java spring 项目中实现 Kafka Acknowledgment。如果没有确认,我们可以成功接收并读取消息,但是当我们在方法中添加确认时,我们会收到此错误:

org.springframework.kafka.listener.ListenerExecutionFailedException: invokeHandler Failed; nested exception is java.lang.IllegalStateException: 
No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.;
 nested exception is java.lang.IllegalStateException:
 No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
 Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from 
 [proto.DeviceModelOuterClass$DevModel] to
 [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload=sender: "12345678-1234-1234-1234-123456789102"

我们实现Acknowledge的方式就像Kafka API中描述的那样:

  @KafkaListener(
      id = Constants.TOPIC_LISTENER,
      topics = "${info.dev.name}",
      autoStartup = "false",
     properties = {
        "value.deserializer=com.cit.iomt.core.DevModelDeserializer",
        "key.deserializer=org.apache.kafka.common.serialization.UUIDDeserializer"
      })
  public void listenToUpdateTopic(@Payload DevModel message, Acknowledgment a) throws Exception {
    LOG.info(Constants.READ_KAFKA_TOPIC, message);
 a.acknowledge();}

在属性文件中我们有这个:

spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
java spring spring-boot apache-kafka spring-kafka
2个回答
3
投票

您是否有机会定义自定义

KafkaListenerContainerFactory
bean?就我而言,我是这样的,在这种情况下,
spring.kafka.listener.ack-mode=manual_immediate
配置没有效果。我必须以编程方式将 ack 模式设置到
kafkaListenerContainerFactory
bean 中。我的容器工厂 bean 定义(在 Kotlin 中)最终如下所示:

    @Bean
    fun kafkaListenerContainerFactory(consumerFactory: ConsumerFactory<Any, Any>): KafkaListenerContainerFactory<KafkaMessageListenerContainer<Any, Any>> {
        var containerFactory = ConcurrentKafkaListenerContainerFactory<Any, Any>()
        containerFactory.consumerFactory = consumerFactory
        containerFactory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE

        return containerFactory as KafkaListenerContainerFactory<KafkaMessageListenerContainer<Any, Any>>
    }

0
投票

错误:

Caused by: java.lang.IllegalStateException: No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.

这意味着侦听器容器未配置正确的确认模式来支持手动确认。

确保正确的AckMode 如果您的侦听器方法需要确认参数,则必须将侦听器容器工厂配置为使用手动确认模式:

package com.rakuten.restock.eventsprocessor.config;

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.config.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
@RequiredArgsConstructor
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.client-id}")
    private String clientId;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);

        DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
        return consumerFactory;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaManualAckListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}

Kafka 监听器方法应该同时具有有效负载和确认参数:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class MessageProcessor {

    @KafkaListener(topics = "${app.kafka.consumer-topic}", containerFactory = "kafkaManualAckListenerContainerFactory")
    public void processMessage(@Payload String payload, Acknowledgment ack) {
        try {
            // Process the message payload
            // Deserialize payload, process data, etc.
        } finally {
            // Acknowledge the message manually
            ack.acknowledge();
        }
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.