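I need to set a custom value serializer on Spring's KafkaTemplate. The value serializer looks like this (for example, to apply PropertyNamingStrategy.SNAKE_CASE):
JsonSerializer<JourneyMailExchange> serializer = new JsonSerializer<>(customObjectMapper);
With a plain KafkaProducer this is easy, because its constructor accepts serializer instances: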
new KafkaProducer<>(properties, Serdes.String().serializer(), serializer);
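But I have not found an equivalent option for KafkaTemplate. The only thing I see is the ProducerFactory property props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); which is not what I am looking for: it takes a class, so there is no way to supply a specific, pre-configured instance.
We create the KafkaTemplate as follows: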
@Bean
public ProducerFactory<String, Object> producerFactory(KafkaProperties kafkaProperties) {
    return new DefaultKafkaProducerFactory<>(producerConfigs(kafkaProperties.getDefaultSettings()));
}

private static Map<String, Object> producerConfigs(Map<String, String> defaultSettings) {
    Map<String, Object> props = new HashMap<>(defaultSettings);
    props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return props;
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
    KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory);
    kafkaTemplate.setDefaultTopic("topicName");
    return kafkaTemplate;
}
I also found the following approach:
kafkaTemplate.setMessageConverter(new StringJsonMessageConverter(customObjectMapper));
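However, the conversion is applied only when we send a Spring Message instance. If we call a send method with a key and a value, the configured converter is ignored.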
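The difference between the two send paths can be sketched as follows. This is a minimal illustration, not code from the project: the class JourneyMailSender and its method names are hypothetical, and KafkaHeaders.MESSAGE_KEY is the header name used by the 2.x line of spring-kafka. It also assumes that, because StringJsonMessageConverter already turns the payload into a JSON string, the template used on the converted path would be backed by a StringSerializer for values.

```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

// Hypothetical helper class, used only to contrast the two send paths.
public class JourneyMailSender {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public JourneyMailSender(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Goes through the message converter set with setMessageConverter(...),
    // because the template receives a Spring Message.
    public void sendThroughConverter(String key, Object payload) {
        Message<Object> message = MessageBuilder.withPayload(payload)
                .setHeader(KafkaHeaders.TOPIC, "topicName")
                .setHeader(KafkaHeaders.MESSAGE_KEY, key)
                .build();
        kafkaTemplate.send(message);
    }

    // Bypasses the message converter: key and value go straight to the
    // serializers configured on the ProducerFactory.
    public void sendDirect(String key, Object payload) {
        kafkaTemplate.send("topicName", key, payload);
    }
}
```

Only the first method is affected by the custom ObjectMapper passed to the converter, which is why the converter alone does not help for callers that use send(topic, key, value).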
spring-kafka version: 2.1.0.RELEASE
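In the end I solved this by configuring several DefaultKafkaProducerFactory instances and passing the value serializer to each one through its constructor. This constructor makes that possible: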
public DefaultKafkaProducerFactory(Map<String, Object> configs, Serializer<K> keySerializer,
Serializer<V> valueSerializer)
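The final version for one specific value serializer looks like this (there are as many configured ProducerFactory beans as there are distinct value serializers needed):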
@Bean
public ProducerFactory<String, Object> producerFactoryWithSnakeCaseValueSerializer(KafkaProperties kafkaProperties) {
    Map<String, Object> props = producerConfigsWithSnakeCaseValueSerializer(kafkaProperties.getDefaultSettings());
    // The serializer instance carries the custom ObjectMapper, so no serializer class is set in props.
    JsonSerializer<Object> valueSerializer = new JsonSerializer<>(createObjectMapper(SNAKE_CASE));
    valueSerializer.setAddTypeInfo(false);
    return new DefaultKafkaProducerFactory<>(props, Serdes.String().serializer(), valueSerializer);
}

private static Map<String, Object> producerConfigsWithSnakeCaseValueSerializer(Map<String, String> defaultSettings) {
    Map<String, Object> props = new HashMap<>(defaultSettings);
    ...
    return props;
}

// The parameter name matches the factory bean name, so Spring can pick the
// right ProducerFactory when several are defined.
@Bean
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactoryWithSnakeCaseValueSerializer) {
    KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactoryWithSnakeCaseValueSerializer);
    kafkaTemplate.setDefaultTopic("topicName");
    return kafkaTemplate;
}
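To show how the "one ProducerFactory per value serializer" idea might scale to more than one serializer, here is a rough sketch with two factories and two templates. Everything named here is an assumption made for illustration: the class MultiSerializerKafkaConfig, the bean names, the localhost:9092 bootstrap address, the second naming strategy (KEBAB_CASE), and the body of createObjectMapper, which only stands in for the helper that the code above calls without showing. It uses the PropertyNamingStrategy constants from Jackson 2.x; newer Jackson versions offer PropertyNamingStrategies instead.

```java
import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

// Hypothetical configuration class; all names are illustrative.
@Configuration
public class MultiSerializerKafkaConfig {

    // Stand-in for the createObjectMapper helper referenced above (assumed shape).
    private static ObjectMapper createObjectMapper(PropertyNamingStrategy strategy) {
        ObjectMapper mapper = new ObjectMapper();
        mapper.setPropertyNamingStrategy(strategy);
        return mapper;
    }

    // Shared producer settings; the broker address is a placeholder.
    private static Map<String, Object> baseProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return props;
    }

    // Builds a factory whose value serializer is a pre-configured instance.
    private static ProducerFactory<String, Object> factoryFor(PropertyNamingStrategy strategy) {
        JsonSerializer<Object> valueSerializer = new JsonSerializer<>(createObjectMapper(strategy));
        valueSerializer.setAddTypeInfo(false);
        return new DefaultKafkaProducerFactory<>(baseProps(), new StringSerializer(), valueSerializer);
    }

    @Bean
    public ProducerFactory<String, Object> snakeCaseProducerFactory() {
        return factoryFor(PropertyNamingStrategy.SNAKE_CASE);
    }

    @Bean
    public ProducerFactory<String, Object> kebabCaseProducerFactory() {
        return factoryFor(PropertyNamingStrategy.KEBAB_CASE);
    }

    // @Qualifier selects the factory explicitly, since two beans share the same type.
    @Bean
    public KafkaTemplate<String, Object> snakeCaseKafkaTemplate(
            @Qualifier("snakeCaseProducerFactory") ProducerFactory<String, Object> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    @Bean
    public KafkaTemplate<String, Object> kebabCaseKafkaTemplate(
            @Qualifier("kebabCaseProducerFactory") ProducerFactory<String, Object> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
```

Callers would then inject the template they need by bean name. Using an explicit @Qualifier rather than relying on parameter names is a design choice here; it keeps the wiring unambiguous once more than one ProducerFactory<String, Object> exists in the context.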