Unable to use schemaRegistry in reactor kafka


I am setting up a Kafka consumer using reactor-kafka. The producer is integrated with the Kafka schema registry.



    @Value("${spring.kafka.schemaRegistryUrls}")
    private String schemaRegistryEnvVarValue;


    @Bean
    public ReceiverOptions<String, MyProto> kafkaReceiverOptionsFloor(
            KafkaProperties kafkaProperties) {

        final Map<String, Object> kafkaConsumerProperties =
                kafkaProperties.buildConsumerProperties();
        for (Map.Entry<String, KafkaProperties.Consumer> entry :
                kafkaConsumerPropertiesConfig.getConsumer().entrySet()) {
            if (kafkaTopics.contains(entry.getKey())) {
                kafkaConsumerProperties.putAll(entry.getValue().buildProperties());
            }
        }
        kafkaConsumerProperties.put("schema.registry.url", schemaRegistryEnvVarValue);
        final ReceiverOptions<String, MyProto> basicReceiverOptions =
                ReceiverOptions.<String, MyProto>create(
                                kafkaConsumerProperties)
                        .withValueDeserializer(new MyProtoDeserializer())
                        // disabling auto commit, since we commit manually once each record is processed
                        .commitInterval(Duration.ZERO)
                        .commitBatchSize(0);

        kafkaConsumerProperties.forEach((k, v) -> log.debug("k2 {} v2 {}", k, v));

        return basicReceiverOptions
                .subscription(kafkaTopicsFloor)
                .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
                .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));
    }


    @Bean
    public ReactiveKafkaConsumerTemplate<String, MyProto>
            reactiveKafkaConsumerTemplate(
                    ReceiverOptions<String, MyProto>
                            kafkaReceiverOptions) {
        return new ReactiveKafkaConsumerTemplate<>(kafkaReceiverOptions);
    }
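
For reference, the template is consumed along these lines (MyProtoConsumer and process here are simplified placeholders, not the actual service), so the manual-commit settings above take effect: each offset is committed only after its record has been processed.

    import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public class MyProtoConsumer {

        private final ReactiveKafkaConsumerTemplate<String, MyProto> template;

        public MyProtoConsumer(ReactiveKafkaConsumerTemplate<String, MyProto> template) {
            this.template = template;
        }

        public Flux<Void> consume() {
            return template
                    .receive()
                    // process(...) is a placeholder handler; commit only after it succeeds
                    .concatMap(record ->
                            process(record.value())
                                    .then(record.receiverOffset().commit()));
        }

        private Mono<Void> process(MyProto message) {
            return Mono.empty(); // placeholder for real business logic
        }
    }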

I am getting an exception: "Protocol message contained an invalid tag (zero)". The same payload parses fine in my unit tests (without the schema registry).

It looks like the schema registry is not being used. What am I doing wrong here?

The deserializer looks like this:

import org.apache.kafka.common.serialization.Deserializer;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MyProtoDeserializer implements Deserializer<MyProto> {
    public MyProtoDeserializer() {}

    /**
     * Deserializes the payload bytes into a MyProto message.
     *
     * @param topic the topic the record was received from
     * @param data the serialized payload, possibly null
     * @return the parsed message, or the default instance if parsing fails
     */
    @Override
    public MyProto deserialize(final String topic, final byte[] data) {
        if (data == null) {
            return null;
        }
        // TODO: Use schemaregistry and kpow
        try {
            return MyProto.getDefaultInstance()
                    .getParserForType()
                    .parseFrom(data);
        } catch (Exception ex) {
            log.debug("Exception in MyProto parse {}", ex.getMessage());
            return MyProto.getDefaultInstance();
        }
    }
}
java apache-kafka project-reactor confluent-schema-registry reactor-kafka
1 Answer

Reactor is not the problem.

schema.registry.url is just a property of the Confluent deserializer classes. You did not implement the configure method in your deserializer, so that property is silently ignored. Likewise, calling parseFrom directly does not use any HTTP client to talk to the registry. Note that the Confluent serializer on the producer side prefixes each payload with a magic byte (0) and a 4-byte schema id, which is exactly why a raw parseFrom fails with "invalid tag (zero)" while the registry-free payloads in your unit tests parse fine.
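
For illustration, a minimal sketch of a deserializer that actually honors the property: implement configure and delegate to the Confluent class (this assumes the artifact linked below is on the classpath, and that specific.protobuf.value.type is set to MyProto in the consumer properties so the delegate returns MyProto rather than a DynamicMessage):

    import java.util.Map;

    import org.apache.kafka.common.serialization.Deserializer;

    import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;

    public class MyProtoDeserializer implements Deserializer<MyProto> {

        private final KafkaProtobufDeserializer<MyProto> delegate =
                new KafkaProtobufDeserializer<>();

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // configure() is where schema.registry.url actually arrives;
            // without this override the property is silently dropped
            delegate.configure(configs, isKey);
        }

        @Override
        public MyProto deserialize(String topic, byte[] data) {
            // the delegate strips the magic byte + schema id and resolves the schema
            return delegate.deserialize(topic, data);
        }

        @Override
        public void close() {
            delegate.close();
        }
    }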

Better still, import the library rather than writing your own:

https://mvnrepository.com/artifact/io.confluent/kafka-protobuf-serializer/7.4.0
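
A sketch of the corresponding Maven coordinates; note this artifact is published to Confluent's own repository, not Maven Central:

    <repositories>
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-protobuf-serializer</artifactId>
        <version>7.4.0</version>
    </dependency>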

Also, this is how to configure that property through Spring Boot auto-configuration:

spring:
  kafka:
    consumer:
      value-deserializer: io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
      properties:
        "[schema.registry.url]": http://...

Reference: https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka.additional-properties
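
Tying this back to the reactor setup in the question, a hedged sketch of the same ReceiverOptions bean configured entirely through properties, so the Kafka consumer itself instantiates the Confluent deserializer and calls configure with schema.registry.url (schemaRegistryEnvVarValue and kafkaTopicsFloor are the fields from the question):

    import java.time.Duration;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
    import org.springframework.context.annotation.Bean;

    import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
    import reactor.kafka.receiver.ReceiverOptions;

    @Bean
    public ReceiverOptions<String, MyProto> kafkaReceiverOptionsFloor(KafkaProperties kafkaProperties) {
        final Map<String, Object> props = kafkaProperties.buildConsumerProperties();
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);
        props.put("schema.registry.url", schemaRegistryEnvVarValue);
        // without this the deserializer yields DynamicMessage instead of MyProto
        props.put("specific.protobuf.value.type", MyProto.class);

        // no withValueDeserializer(...) here: a class configured via properties
        // is instantiated and configure()d by the consumer itself
        return ReceiverOptions.<String, MyProto>create(props)
                .commitInterval(Duration.ZERO)
                .commitBatchSize(0)
                .subscription(kafkaTopicsFloor);
    }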
