DefaultKafkaHeaderMapper 无法解码 json 类型

问题描述 投票:0回答:1

我使用的是spring-kafka,我的生产者是

org.apache.kafka:kafka-clients:2.3.12
,消费者是
org.springframework.kafka:spring-kafka:2.1.7.RELEASE
我在消费者方面收到以下错误

org.springframework.kafka.KafkaListenerEndpointContainer#41-0-C-1 DefaultKafkaHeaderMapper Could not decode json type: abc-123 for key: myKey1 com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'abc': was expecting ('true', 'false' or 'null')
at [Source: (byte[])"abc-123"; line: 1, column: ]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:703)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3532)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2627)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:832)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:729)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4141)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4000)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091)
at org.springframework.kafka.support.DefaultKafkaHeaderMapper.lambda$toHeaders$1(DefaultKafkaHeaderMapper.java:233)
at java.lang.Iterable.forEach(Iterable.java:75)
at org.springframework.kafka.support.DefaultKafkaHeaderMapper.toHeaders(DefaultKafkaHeaderMapper.java:216)
at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:106)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:229)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:75)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1071)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1051)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:998)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:866)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:724)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)

我的消费者代码:

@KafkaListener(containerFactory = "ListenerContainerFactory", topics = "${kafka.topic}")
public void receive(ConsumerRecord<String, String> consumerRecord, @Headers Map headers ) (

}

配置与此类似ListenerConfig 包括方法

@Bean
public DefaultKafkaHeaderMapper headerMapper(){
    return new DefaultKafkaHeaderMapper();
}
spring-boot apache-kafka spring-kafka
1个回答
0
投票

使用 DefaultKafkaHeaderMapper.setEncodeStrings(true) 添加 bean

像这样

@Configuration
public class KafkaConfig {

  @Bean
  public RecordMessageConverter recordMessageConverter() {
    DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
    headerMapper.setEncodeStrings(true);

    MessagingMessageConverter converter = new MessagingMessageConverter();
    converter.setHeaderMapper(headerMapper);

    return converter;
  }

}

DefaultKafkaHeaderMapper 中的注释

/**
 * Set to true to encode String-valued headers as JSON ("..."), by default just the
 * raw String value is converted to a byte array using the configured charset. Set to
 * true if a consumer of the outbound record is using Spring for Apache Kafka version
 * less than 2.3
 * @param encodeStrings true to encode (default false).
 * @since 2.3
 */
public void setEncodeStrings(boolean encodeStrings) {
    this.encodeStrings = encodeStrings;
}
© www.soinside.com 2019 - 2024. All rights reserved.