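I am currently developing a Spring Boot application with a microservice architecture. I have an auth service that produces messages and a user service that consumes them. I am having trouble deserializing the messages in the consumer.
I currently produce messages on 2 events: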
public record UserDTO(
        UUID id,
        String username,
        String email,
        Role role
) {
}
// Note: I use the same names for the records across services
public record EmailChangeDTO(
        String username,
        @NonNull String email
) {
}
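I use String serialization for the key and JSON serialization for the value. But when I try to deserialize the incoming message in the user service, I get the following error: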
Caused by: java.lang.IllegalArgumentException: The class 'com.example.authservice.Dto.EmailChangeDTO' is not in the trusted packages: [java.util, java.lang, com.example.userservice.DTO]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
The same happens for user registration.
My consumer looks like this:
package com.example.userservice.Config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "user-service-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.userservice.userservice.DTO");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
I even tried adding the package from the auth service to the trusted packages, but then I got the following error:
Caused by: java.lang.ClassNotFoundException: com.example.authservice.Dto.UserDTO
But when I deserialize the message as a String and then map it to the DTO in the user service with ObjectMapper, it works perfectly.
@KafkaListener(topics = "customer-registration", groupId = "user-service-group")
public void consumeUserRegistration(String message) {
    try {
        log.info("Received user registration event: {}", message);
        UserDTO userDTO = objectMapper.readValue(message, UserDTO.class);
        userService.createUser(userDTO);
    } catch (JsonProcessingException e) {
        log.error("Error processing user registration event", e);
    }
}
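How should I configure Kafka's JsonDeserializer to properly trust DTO classes from multiple packages?
Is manual deserialization with ObjectMapper a good approach in this case, or is there a more efficient way?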
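So, you send com.example.authservice.Dto.UserDTO, but expect com.userservice.userservice.DTO.UserDTO. That is definitely not about trusted packages: the producer puts its own class name into the type information it sends, and that class does not exist in the consumer, so what you are really trying to do is ignore that type info. For that you need to look into type mapping: https://docs.spring.io/spring-kafka/reference/kafka/serdes.html#serdes-mapping-types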
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS,
"com.example.authservice.Dto.UserDTO:com.userservice.userservice.DTO.UserDTO");
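Since the question involves two events, the same property can carry several mappings as a comma-separated list of token:class pairs. The following is only a sketch of how that value might be assembled. It writes the property key as the literal "spring.json.type.mapping" (the string behind JsonDeserializer.TYPE_MAPPINGS) so that it compiles without Spring Kafka on the classpath. The class name TypeMappingSketch, its helper methods, and the consumer-side class com.userservice.userservice.DTO.EmailChangeDTO are assumptions for illustration and do not come from the question.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Spring Kafka.
class TypeMappingSketch {

    // String value of JsonDeserializer.TYPE_MAPPINGS, written out literally
    // so this sketch has no dependency on the library.
    static final String TYPE_MAPPINGS_KEY = "spring.json.type.mapping";

    // Joins producerType -> consumerType pairs into "token:class,token:class".
    static String buildTypeMappings(Map<String, String> producerToConsumer) {
        return producerToConsumer.entrySet().stream()
                .map(e -> e.getKey() + ":" + e.getValue())
                .collect(Collectors.joining(","));
    }

    // Returns only the extra consumer property needed for type mapping.
    static Map<String, Object> typeMappingProps() {
        // LinkedHashMap keeps the entries in insertion order.
        Map<String, String> mappings = new LinkedHashMap<>();
        mappings.put("com.example.authservice.Dto.UserDTO",
                "com.userservice.userservice.DTO.UserDTO");
        // Assumption: the consumer-side EmailChangeDTO lives in the same package as UserDTO.
        mappings.put("com.example.authservice.Dto.EmailChangeDTO",
                "com.userservice.userservice.DTO.EmailChangeDTO");

        Map<String, Object> props = new HashMap<>();
        props.put(TYPE_MAPPINGS_KEY, buildTypeMappings(mappings));
        return props;
    }

    public static void main(String[] args) {
        // Prints the combined mapping string that would go into the consumer properties.
        System.out.println(typeMappingProps().get(TYPE_MAPPINGS_KEY));
    }
}
```

In the consumerFactory() from the question, this could be merged with props.putAll(TypeMappingSketch.typeMappingProps()) before the DefaultKafkaConsumerFactory is created. It is probably safest to keep the consumer-side DTO package in the trusted packages as well.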