我们有一个使用 Reactor Kafka 的 Spring Boot 项目和用于消费的
KafkaReceiver
,我们希望收集并发出底层消费者指标。看起来我们可以利用 KafkaReceiver.doOnConsumer()
来实现这样的效果:
receiver.doOnConsumer(Consumer::metrics)
.flatMapIterable(Map::entrySet)
.map(m -> Tuples.of(m.getKey(), m.getValue()))
如果这是最好的方法,我不确定定期执行此操作的最佳方法是什么。
我注意到还有一个版本的
KafkaReceiver.create()
工厂方法需要自定义 ConsumerFactory
,也许有某种方法可以使用它在创建时向 Micrometer 注册底层 Kafka 消费者?我对 Spring Boot 很陌生,对 Kafka Reactor 也比较陌生,所以我不太确定。
这是迄今为止我的代码片段,以了解更多上下文:
KafkaReceiver.create(receiverOptions(Collections.singleton(topic)).commitInterval(Duration.ZERO))
.receive()
.groupBy(m -> m.receiverOffset().topicPartition())
.flatMap(partitionFlux -> partitionFlux.publishOn(this.scheduler)
.map(r -> processEvent(partitionFlux.key(), r))
.concatMap(this::commit))
.doOnCancel(this::close)
.doOnError(error -> LOG.error("An error was encountered", error))
.blockLast();
如果采用
doOnConsumer()
方法有意义,我们可能会加入 doOnNext()
但随后我们会收集并发出每个事件的指标,这太多了,如果我们可以交错和批量处理会更好。
任何建议或提示表示赞赏,谢谢。
使用
receive
处理 Kafka 消息时,您可以单独处理每个 ConsumerRecord
。如果您喜欢批量处理消息,您有两种选择:使用 receiveAutoAck
自动确认记录,或 receiveBatch 手动确认或提交记录。这样,您可以在批次级别和每个单独的记录上执行操作。
使用
receiveAutoAck
的示例:
KafkaReceiver.create(receiverOptions(Collections.singleton(topic)).commitInterval(Duration.ZERO))
.receiveAutoAck()
.flatMap(this::processBatchActions)
.concatMap(this::processMessageActions)
.doOnCancel(this::close)
.doOnError(error -> LOG.error("An error was encountered", error))
.subscribe();
private Flux<ConsumerRecord<String, String>> processBatchActions(Flux<ConsumerRecord<String, String>> recordFlux) {
// Do your things for each batch
}
private Mono<Void> processMessageActions(ConsumerRecord<String, String> record) {
// Do your things for each message
}
使用
receiveBatch
的示例:
KafkaReceiver.create(receiverOptions(Collections.singleton(topic)).commitInterval(Duration.ZERO))
.receiveBatch()
.flatMap(this::processBatchActions)
.concatMap(this::processMessageActions)
.doOnCancel(this::close)
.doOnError(error -> LOG.error("An error was encountered", error))
.subscribe();
private Flux<ReceiverRecord<String, String>> processBatchActions(Flux<ReceiverRecord<String, String>> recordFlux) {
// Do your things for each batch
return recordFlux;
}
private Mono<Void> processMessageActions(ReceiverRecord<String, String> record) {
// Do your things for each message
record.receiverOffset().acknowledge();
return record.receiverOffset().commit(); //acknowledge records or commit offsets
}
使用receiveAutoAck时,偏移量提交会在批处理完成后自动发生,或者根据配置的commitInterval和commitBatchSize发生。相反,使用 receiveBatch 方法,您可以通过手动确认每条记录来控制偏移量提交,并且 commitInterval 和 commitBatchSize 设置仍然适用,或者您可以根据需要手动提交偏移量。
更多信息请参考官方文档: https://projectreactor.io/docs/kafka/snapshot/reference/