I am setting up a Kafka consumer using Reactor Kafka. The producer is integrated with the Kafka Schema Registry.
@Value("${spring.kafka.schemaRegistryUrls}")
private String schemaRegistryEnvVarValue;

@Bean
public ReceiverOptions<String, MyProto> kafkaReceiverOptionsFloor(
        KafkaProperties kafkaProperties) {
    final Map<String, Object> kafkaConsumerProperties =
            kafkaProperties.buildConsumerProperties();
    for (Map.Entry<String, KafkaProperties.Consumer> entry :
            kafkaConsumerPropertiesConfig.getConsumer().entrySet()) {
        if (kafkaTopics.contains(entry.getKey())) {
            kafkaConsumerProperties.putAll(entry.getValue().buildProperties());
        }
    }
    kafkaConsumerProperties.put("schema.registry.url", schemaRegistryEnvVarValue);
    final ReceiverOptions<String, MyProto> basicReceiverOptions =
            ReceiverOptions.<String, MyProto>create(kafkaConsumerProperties)
                    .withValueDeserializer(new MyProtoDeserializer())
                    // disabling auto commit, since we are managing committing once
                    // the record is processed
                    .commitInterval(Duration.ZERO)
                    .commitBatchSize(0);
    kafkaConsumerProperties.forEach((k, v) -> log.debug("k2 {} v2 {}", k, v));
    return basicReceiverOptions
            .subscription(kafkaTopicsFloor)
            .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
            .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));
}

@Bean
public ReactiveKafkaConsumerTemplate<String, MyProto> reactiveKafkaConsumerTemplate(
        ReceiverOptions<String, MyProto> kafkaReceiverOptions) {
    return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
}
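I am getting the exception "Protocol message contained an invalid tag (zero)". The same message parses fine in my unit test (which does not use the Schema Registry).
It looks like the Schema Registry is not being used. What am I doing wrong here?
The deserializer looks like this: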
@Slf4j
public class MyProtoDeserializer implements Deserializer<MyProto> {

    public MyProtoDeserializer() {}

    /**
     * Deserializes the data to my_proto from byte array.
     *
     * @param topic
     * @param data
     * @return
     */
    @Override
    public MyProto deserialize(final String topic, final byte[] data) {
        if (data == null) {
            return null;
        }
        // TODO: Use schemaregistry and kpow
        try {
            return MyProto.getDefaultInstance()
                    .getParserForType()
                    .parseFrom(data);
        } catch (Exception ex) {
            log.debug("Exception in MyProto parse {}", ex.getMessage());
            return MyProto.getDefaultInstance();
        }
    }
}
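Reactor is not the problem.
schema.registry.url is only a property of the Confluent deserializer classes. You have not implemented the configure function in your deserializer, so the property is ignored. Likewise, calling parseFrom directly does not use any HTTP client to talk to the registry.
Records written by the Confluent serializer also start with a magic byte (0) and a 4-byte schema ID, followed by message indexes, before the actual Protobuf payload. Passing those bytes straight to parseFrom is why you see the invalid tag (zero) error.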
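To make the framing concrete, here is a minimal sketch of how the prefix could be inspected using only the JDK. The class name ConfluentWireFormat and its methods are hypothetical helpers written for illustration, not part of any Confluent API, and the sample bytes are made up.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper for inspecting the Confluent wire-format prefix (illustration only). */
final class ConfluentWireFormat {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

    /** True when the payload starts with the magic byte and is long enough to hold a schema ID. */
    static boolean isFramed(byte[] data) {
        return data != null && data.length >= HEADER_LENGTH && data[0] == MAGIC_BYTE;
    }

    /** Reads the big-endian schema ID that follows the magic byte. */
    static int schemaId(byte[] data) {
        if (!isFramed(data)) {
            throw new IllegalArgumentException("payload is not Confluent-framed");
        }
        return ByteBuffer.wrap(data, 1, 4).getInt();
    }

    public static void main(String[] args) {
        // Made-up record: magic byte, schema ID 42, a single 0 byte for the
        // message indexes, then a tiny Protobuf payload (field 1 = "a").
        byte[] framed = {0, 0, 0, 0, 42, 0, 0x0A, 0x01, 0x61};
        System.out.println("framed=" + isFramed(framed) + ", schemaId=" + schemaId(framed));
        // A plain Protobuf parser reads framed[0] as a field tag, and a tag of 0 is invalid.
    }
}
```

This is only meant for debugging what is on the topic. It does not replace the Confluent deserializer, which also resolves the schema through the registry.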
Import the library rather than writing your own:
https://mvnrepository.com/artifact/io.confluent/kafka-protobuf-serializer/7.4.0
Also, this is how to auto-configure Spring Boot with that property:
spring:
  kafka:
    consumer:
      value-deserializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
      properties:
        "[schema.registry.url]": http://...
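If you build the consumer properties in Java, as the ReceiverOptions bean in the question does, the same settings can be put into the map instead. The following is a rough sketch under some assumptions: ProtobufConsumerProps is a hypothetical helper, and the registry URL and class name passed to it are placeholders. The specific.protobuf.value.type key asks the Confluent deserializer to return your generated class rather than a DynamicMessage.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that adds the Confluent Protobuf settings to a copy of the consumer properties. */
final class ProtobufConsumerProps {

    static Map<String, Object> withSchemaRegistry(
            Map<String, Object> base, String registryUrl, String protoClassName) {
        Map<String, Object> props = new HashMap<>(base); // leave the caller's map untouched
        // Given by class name, so the Kafka client instantiates the deserializer
        // and calls configure() with these properties.
        props.put("value.deserializer",
                "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer");
        props.put("schema.registry.url", registryUrl);
        // Fully qualified name of the generated Protobuf class (placeholder supplied by the caller).
        props.put("specific.protobuf.value.type", protoClassName);
        return props;
    }
}
```

The resulting map would then go to ReceiverOptions.create(...) without the withValueDeserializer(new MyProtoDeserializer()) call, since a deserializer instance passed in that way is not configured from the properties.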