使用 Reactor Kafka 定期轮询消费者指标

问题描述 投票:0回答:1

我们有一个使用 Reactor Kafka 的 Spring Boot 项目和用于消费的

KafkaReceiver
,我们希望收集并发出底层消费者指标。看起来我们可以利用
KafkaReceiver.doOnConsumer()
来实现这样的效果:

receiver.doOnConsumer(Consumer::metrics)
                .flatMapIterable(Map::entrySet)
                .map(m -> Tuples.of(m.getKey(), m.getValue()))

如果这是最好的方法,我不确定定期执行此操作的最佳方法是什么。

我注意到还有一个版本的

KafkaReceiver.create()
工厂方法需要自定义
ConsumerFactory
,也许有某种方法可以使用它在创建时向 Micrometer 注册底层 Kafka 消费者?我对 Spring Boot 很陌生,对 Kafka Reactor 也比较陌生,所以我不太确定。

这是迄今为止我的代码片段,以了解更多上下文:

KafkaReceiver.create(receiverOptions(Collections.singleton(topic)).commitInterval(Duration.ZERO))
    .receive()
    .groupBy(m -> m.receiverOffset().topicPartition())
    .flatMap(partitionFlux -> partitionFlux.publishOn(this.scheduler)
        .map(r -> processEvent(partitionFlux.key(), r))
        .concatMap(this::commit))
    .doOnCancel(this::close)
    .doOnError(error -> LOG.error("An error was encountered", error))
    .blockLast();

如果采用

doOnConsumer()
方法有意义,我们可能会加入
doOnNext()
但随后我们会收集并发出每个事件的指标,这太多了,如果我们可以交错和批量处理会更好。

任何建议或提示表示赞赏,谢谢。

spring-boot apache-kafka kafka-consumer-api metrics reactor
1个回答
0
投票

使用

receive
处理 Kafka 消息时,您可以单独处理每个
ConsumerRecord
。如果您喜欢批量处理消息,您有两种选择:使用
receiveAutoAck
自动确认记录,或 receiveBatch 手动确认或提交记录。这样,您可以在批次级别和每个单独的记录上执行操作。

使用

receiveAutoAck
的示例:

KafkaReceiver.create(receiverOptions(Collections.singleton(topic)).commitInterval(Duration.ZERO))
        .receiveAutoAck()
        .flatMap(this::processBatchActions)
        .concatMap(this::processMessageActions)
        .doOnCancel(this::close)
        .doOnError(error -> LOG.error("An error was encountered", error))
        .subscribe();

private Flux<ConsumerRecord<String, String>> processBatchActions(Flux<ConsumerRecord<String, String>> recordFlux) {
    // Do your things for each batch
}

  private Mono<Void> processMessageActions(ConsumerRecord<String, String> record) {
    // Do your things for each message
}

使用

receiveBatch
的示例:

KafkaReceiver.create(receiverOptions(Collections.singleton(topic)).commitInterval(Duration.ZERO))
        .receiveBatch()
        .flatMap(this::processBatchActions)
        .concatMap(this::processMessageActions)
        .doOnCancel(this::close)
        .doOnError(error -> LOG.error("An error was encountered", error))
        .subscribe();

private Flux<ReceiverRecord<String, String>> processBatchActions(Flux<ReceiverRecord<String, String>> recordFlux) {
    // Do your things for each batch
    return recordFlux;
}

  private Mono<Void> processMessageActions(ReceiverRecord<String, String> record) {
    // Do your things for each message
    record.receiverOffset().acknowledge();
    return record.receiverOffset().commit(); //acknowledge records or commit offsets
}

使用receiveAutoAck时,偏移量提交会在批处理完成后自动发生,或者根据配置的commitInterval和commitBatchSize发生。相反,使用 receiveBatch 方法,您可以通过手动确认每条记录来控制偏移量提交,并且 commitInterval 和 commitBatchSize 设置仍然适用,或者您可以根据需要手动提交偏移量。

更多信息请参考官方文档: https://projectreactor.io/docs/kafka/snapshot/reference/

© www.soinside.com 2019 - 2024. All rights reserved.