为什么Kafka JsonSerializer无法序列化kafka ProducerRecord?

问题描述 投票:0回答:1

我正在尝试使用 Spring Kafka

JsonSerializer
从生产者发送 JSON 对象,但在发送与
ProducerRecord
序列化相关的消息时遇到异常。我期望
JsonSerializer
处理我的消息的序列化,但在尝试序列化
ProducerRecord
时失败。这是我的设置:

生产者配置:

import com.app.integration.impl.dto.TransitECS;
import com.app.integration.impl.util.KafkaConstants;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        return Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class
        );
    }

    @Bean
    public ProducerFactory<String, TransitECS> transitECSProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, TransitECS> transitECSKafkaTemplate() {
        KafkaTemplate<String, TransitECS> kafkaTemplate = new KafkaTemplate<>(transitECSProducerFactory());
        kafkaTemplate.setDefaultTopic(KafkaConstants.TRANSIT_ECS_TOPIC);
        return kafkaTemplate;
    }
}

发送方式:

@RestController
@RequestMapping("/api/v1/topic")
@RequiredArgsConstructor
public class TopicController {
    private final KafkaTemplate<String, TransitECS> transitECSKafkaTemplate;

    /**
     * This method is used to send the transit ECS to the topic.
     *
     * @param transitECS The transit ECS to be sent.
     */
    @PostMapping("/send/transitECS")
    public CompletableFuture<SendResult<String, TransitECS>> sendTransit(@Valid @RequestBody TransitECS transitECS) {
        return this.transitECSKafkaTemplate.sendDefault(transitECS);
    }
}

异常消息:

com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class org.apache.kafka.clients.producer.ProducerRecord and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: org.springframework.kafka.support.SendResult["producerRecord"])

导致异常的代码:

package com.fasterxml.jackson.databind.ser.impl;

public class UnknownSerializer
    extends ToEmptyObjectSerializer // since 2.13
{
   ...

    @Override
    public void serialize(Object value, JsonGenerator gen, SerializerProvider ctxt) throws IOException
    {
        // 27-Nov-2009, tatu: As per [JACKSON-201] may or may not fail...
        if (ctxt.isEnabled(SerializationFeature.FAIL_ON_EMPTY_BEANS)) {
            failForEmpty(ctxt, value); // <------ IS ENTERING HERE
        }
        super.serialize(value, gen, ctxt);
    }

    protected void failForEmpty(SerializerProvider prov, Object value)
            throws JsonMappingException {
        Class<?> cl = value.getClass();
        if (NativeImageUtil.needsReflectionConfiguration(cl)) {
            prov.reportBadDefinition(handledType(), String.format(
                    "No serializer found for class %s and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS). This appears to be a native image, in which case you may need to configure reflection for the class that is to be serialized",
                    cl.getName()));
        } else {
            prov.reportBadDefinition(handledType(), String.format(
                    "No serializer found for class %s and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS)",
                    cl.getName()));
        }
    }
}

异常表明 Jackson 正在尝试序列化

ProducerRecord
,其中包含我的有效负载对象和其他 kafka 内容(标头、密钥等)。 重点是,
org.springframework.kafka.support.serializer.JsonSerializer
怎么可能无法序列化
org.apache.kafka.clients.producer.ProducerRecord
类呢?


Spring Boot 版本:

3.0.1

如有任何建议或见解,我们将不胜感激!


我尝试过的:

  • 禁用
    SerializationFeature.FAIL_ON_EMPTY_BEANS
    但没有成功。
    @Bean
    public ProducerFactory<String, TransitECS> transitECSProducerFactory() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
        mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
        return new DefaultKafkaProducerFactory<>(
                producerConfigs(),
                new StringSerializer(),
                new JsonSerializer<>(mapper)
        );
    }
spring-boot jackson spring-kafka
1个回答
0
投票

[已解决]

我已经解决了创建一个 ObjectMapper 类型的 Bean,而不是在 DefaultKafkaProducerFactory Bean 定义期间将其传递给 JsonSerializer 构造函数。

@Configuration
public class ObjectMapperConfig {
    @Bean
    @Primary
    public ObjectMapper objectMapper() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
        mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
        return mapper;
    }
}

© www.soinside.com 2019 - 2024. All rights reserved.