我使用的是spring-kafka,我的生产者是
org.apache.kafka:kafka-clients:2.3.12
,消费者是org.springframework.kafka:spring-kafka:2.1.7.RELEASE
我在消费者方面收到以下错误
org.springframework.kafka.KafkaListenerEndpointContainer#41-0-C-1 DefaultKafkaHeaderMapper Could not decode json type: abc-123 for key: myKey1 com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'abc': was expecting ('true', 'false' or 'null')
at [Source: (byte[])"abc-123"; line: 1, column: ]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:703)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3532)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2627)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:832)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:729)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4141)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4000)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091)
at org.springframework.kafka.support.DefaultKafkaHeaderMapper.lambda$toHeaders$1(DefaultKafkaHeaderMapper.java:233)
at java.lang.Iterable.forEach(Iterable.java:75)
at org.springframework.kafka.support.DefaultKafkaHeaderMapper.toHeaders(DefaultKafkaHeaderMapper.java:216)
at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:106)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:229)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:75)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1071)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1051)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:998)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:866)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:724)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
我的消费者代码:
@KafkaListener(containerFactory = "ListenerContainerFactory", topics = "${kafka.topic}")
public void receive(ConsumerRecord<String, String> consumerRecord, @Headers Map headers ) (
}
配置与此类似ListenerConfig 包括方法
@Bean
public DefaultKafkaHeaderMapper headerMapper(){
return new DefaultKafkaHeaderMapper();
}
使用 DefaultKafkaHeaderMapper.setEncodeStrings(true) 添加 bean
像这样
@Configuration
public class KafkaConfig {
@Bean
public RecordMessageConverter recordMessageConverter() {
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
headerMapper.setEncodeStrings(true);
MessagingMessageConverter converter = new MessagingMessageConverter();
converter.setHeaderMapper(headerMapper);
return converter;
}
}
DefaultKafkaHeaderMapper 中的注释
/**
* Set to true to encode String-valued headers as JSON ("..."), by default just the
* raw String value is converted to a byte array using the configured charset. Set to
* true if a consumer of the outbound record is using Spring for Apache Kafka version
* less than 2.3
* @param encodeStrings true to encode (default false).
* @since 2.3
*/
public void setEncodeStrings(boolean encodeStrings) {
this.encodeStrings = encodeStrings;
}