我想为 Kafka 中消费者(事件侦听器)的当前滞后发布一个指标“量表”。我想监控消息的消费速度是否很快,或者是否太慢。这些指标最终会与CloudWatch连接。我正在使用 Springboot、Micrometer 库、CloudWatch。
在事件处理程序或侦听器(消费者)中,我执行了以下操作 -
@KafkaListener(topics = "your-topic",filter ="my-filter")
public void listenerExample(CloudEvent event, @Header(KafkaHeaders.RECEIVED_TOPIC) String topicName,
Consumer<?, ?> consumer) {
String currentLag = consumer.metrics().values().stream().filter(m -> "records-lag- max".equals(m.metricName().name()))
.map(Metric::metricValue).map(Object::toString).distinct()
.collect(Collectors.joining(";", "[Kafka current consumer lag]", " records"));
LOGGER.info(lag);
//The currentLag is a String here. We need to convert it to an Integer but the String is pretty big due to //the prefix and the suffix. Do we need to substring the number from the string?
meterRegistry.gauge("consumer.lag",Integer.parseInt(currentLag));
}
借助 CloudWatchConfig()、CloudWatchMeterRegistry() 完成与 CloudWatch 的连接,并使用此注册表进一步发布指标。
MeterRegistry registry = new SimpleMeterRegistry();
registry = new CloudWatchMeterRegistry(new CloudWatchConfig() {
private final Map<String, String> configuration =
Map.of("cloudwatch.namespace", "Name/" + namespace, "cloudwatch.step", Duration.ofMinutes(5).toString());
但是,我在 CloudWatch 中没有看到任何指标“consumer.lag”。我做得正确吗?我还是不知道应该如何实施
meterRegistry.gauge("consumer.lag",Integer.parseInt(currentLag));
KafkaMetrics
,它应该为您提供此信息(以及更多信息)。
另外,请小心自己这样做:
Gauge
默认只维护对正在观察的对象的弱引用,以免阻止对象的垃圾收集。当GC发生并且对象被收集时,它将不再可供Gauge使用,您需要维护强引用,请参阅文档:Gauges。consumer.metrics()
在Kafka中非常棘手。 Kafka 客户端有时会删除对所有这些指标的引用,并在 Micrometer 或您尝试使用它们时重新创建它们。如果您查看KafkaMetrics
,它会尝试通过重新注册仪表来解决这个问题(破解这个问题)。Kafka 客户端问题的真正解决方案是 Kafka 使用 Micrometer 来检测自身,如果您认为您也可以从中受益,请对此问题发表评论,以便 Kafka 开发人员可以添加此/接受 PR:https ://issues.apache.org/jira/browse/KAFKA-15191