Spring Boot:Kafka 健康指标

问题描述 投票:0回答:3

我有类似下面的东西,效果很好,但我更喜欢在不发送任何消息的情况下检查运行状况(不仅仅是检查套接字连接)。 我知道 Kafka 有像 KafkaHealthIndicator 这样开箱即用的东西,有人有使用它的经验或例子吗?

   public class KafkaHealthIndicator implements HealthIndicator {
   private final Logger log = LoggerFactory.getLogger(KafkaHealthIndicator.class);

   private KafkaTemplate<String, String> kafka;

   public KafkaHealthIndicator(KafkaTemplate<String, String> kafka) {
   this.kafka = kafka;
   }

  @Override
  public Health health() {
  try {
     kafka.send("kafka-health-indicator", "❥").get(100, TimeUnit.MILLISECONDS);
  } catch (InterruptedException | ExecutionException | TimeoutException e) {
      return Health.down(e).build();
  }
  return Health.up().build();
 }
}
java spring spring-boot apache-kafka spring-kafka
3个回答
11
投票

为了触发运行状况指示器,请从未来对象之一检索数据,否则即使 Kafka 关闭,指示器也是

UP
!!!

当 Kafka 未连接时,future.get() 会抛出异常,从而设置此指示器

down

@Configuration
public class KafkaConfig {

    @Autowired
    private KafkaAdmin kafkaAdmin;

    @Bean
    public AdminClient kafkaAdminClient() {
        return AdminClient.create(kafkaAdmin.getConfigurationProperties());
    }

    @Bean
    public HealthIndicator kafkaHealthIndicator(AdminClient kafkaAdminClient) {
        final DescribeClusterOptions options = new DescribeClusterOptions()
            .timeoutMs(1000);

        return new AbstractHealthIndicator() {
            @Override
            protected void doHealthCheck(Health.Builder builder) throws Exception {
                DescribeClusterResult clusterDescription = kafkaAdminClient.describeCluster(options);

                // In order to trip health indicator DOWN retrieve data from one of
                // future objects otherwise indicator is UP even when Kafka is down!!!
                // When Kafka is not connected future.get() throws an exception which 
                // in turn sets the indicator DOWN.
                clusterDescription.clusterId().get();
                // or clusterDescription.nodes().get().size()
                // or clusterDescription.controller().get();

                builder.up().build();

                // Alternatively directly use data from future in health detail.
                builder.up()
                        .withDetail("clusterId", clusterDescription.clusterId().get())
                        .withDetail("nodeCount", clusterDescription.nodes().get().size())
                        .build();
            }
        };
    }

}

1
投票

使用 AdminClient API 通过描述集群和/或您将与之交互的主题来检查集群的运行状况,并验证这些主题是否具有所需数量的同步副本,例如

Kafka 具有开箱即用的 KafkaHealthIndicator 功能

事实并非如此。 Spring 的 Kafka 集成 可能


0
投票

为此类应用程序构建运行状况检查时需要考虑一些注意事项。

首先考虑何时以及为什么应该检查 Kafka 代理的运行状况。如果您的应用程序可以在没有 Kafka 的情况下继续运行,那么您不希望您的应用程序在 Kafka 关闭时被视为关闭。在这种情况下,您可能仍然需要进行健康检查,但要小心如何使用它。

其次,生产 Kafka 集群将拥有多个代理,并且将以提供高可用性的方式进行设置。如果其中一位经纪人出现故障,您的消费者和生产者很可能会继续正常运行。可能会出现暂时的情况,您可能会看到一些失败的请求,但在大多数情况下,我观察到生产系统在单个代理关闭的情况下运行得很好。通常需要发生一些重大灾难才会导致整个 Kafka 集群宕机。

第三,管理客户端本身可能会超时并面临其他问题,这些问题可能导致其任何方法抛出异常。如果发生这种情况,集群并未关闭,因此您可能需要考虑暂时性故障。

这里有一些简单的伪代码,您可以将其用于健康检查功能:

    N = 3
    failureCounter = 0

    try {
        adminClient = AdminClient.create()
        adminClient.describeCluster()
        failureCounter = 0
        // health up
    } catch (KafkaException e) {
        // Keep track of how many times KafkaException is thrown successively
        failureCounter++
        if (failureCounter >= N) { // N successive failures might indicate a problem
            // health down
        }
    }

describeCluster()
方法足以作为检查。但此外,您可能还想使用
describeAcls()
方法来检查是否还设置了特定的 ACL,因为任何 ACL 问题都可能阻止您的生产者或消费者正常运行。

我在这里回答了类似的问题:https://stackoverflow.com/a/79349717/5468867

© www.soinside.com 2019 - 2024. All rights reserved.