我有类似下面的东西,效果很好,但我更喜欢在不发送任何消息的情况下检查运行状况(不仅仅是检查套接字连接)。 我知道 Kafka 有像 KafkaHealthIndicator 这样开箱即用的东西,有人有使用它的经验或例子吗?
public class KafkaHealthIndicator implements HealthIndicator {
private final Logger log = LoggerFactory.getLogger(KafkaHealthIndicator.class);
private KafkaTemplate<String, String> kafka;
public KafkaHealthIndicator(KafkaTemplate<String, String> kafka) {
this.kafka = kafka;
}
@Override
public Health health() {
try {
kafka.send("kafka-health-indicator", "❥").get(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
return Health.down(e).build();
}
return Health.up().build();
}
}
为了触发运行状况指示器,请从未来对象之一检索数据,否则即使 Kafka 关闭,指示器也是
UP
!!!
当 Kafka 未连接时,future.get() 会抛出异常,从而设置此指示器
down
。
@Configuration
public class KafkaConfig {
@Autowired
private KafkaAdmin kafkaAdmin;
@Bean
public AdminClient kafkaAdminClient() {
return AdminClient.create(kafkaAdmin.getConfigurationProperties());
}
@Bean
public HealthIndicator kafkaHealthIndicator(AdminClient kafkaAdminClient) {
final DescribeClusterOptions options = new DescribeClusterOptions()
.timeoutMs(1000);
return new AbstractHealthIndicator() {
@Override
protected void doHealthCheck(Health.Builder builder) throws Exception {
DescribeClusterResult clusterDescription = kafkaAdminClient.describeCluster(options);
// In order to trip health indicator DOWN retrieve data from one of
// future objects otherwise indicator is UP even when Kafka is down!!!
// When Kafka is not connected future.get() throws an exception which
// in turn sets the indicator DOWN.
clusterDescription.clusterId().get();
// or clusterDescription.nodes().get().size()
// or clusterDescription.controller().get();
builder.up().build();
// Alternatively directly use data from future in health detail.
builder.up()
.withDetail("clusterId", clusterDescription.clusterId().get())
.withDetail("nodeCount", clusterDescription.nodes().get().size())
.build();
}
};
}
}
使用 AdminClient API 通过描述集群和/或您将与之交互的主题来检查集群的运行状况,并验证这些主题是否具有所需数量的同步副本,例如
Kafka 具有开箱即用的 KafkaHealthIndicator 功能
事实并非如此。 Spring 的 Kafka 集成 可能
为此类应用程序构建运行状况检查时需要考虑一些注意事项。
首先考虑何时以及为什么应该检查 Kafka 代理的运行状况。如果您的应用程序可以在没有 Kafka 的情况下继续运行,那么您不希望您的应用程序在 Kafka 关闭时被视为关闭。在这种情况下,您可能仍然需要进行健康检查,但要小心如何使用它。
其次,生产 Kafka 集群将拥有多个代理,并且将以提供高可用性的方式进行设置。如果其中一位经纪人出现故障,您的消费者和生产者很可能会继续正常运行。可能会出现暂时的情况,您可能会看到一些失败的请求,但在大多数情况下,我观察到生产系统在单个代理关闭的情况下运行得很好。通常需要发生一些重大灾难才会导致整个 Kafka 集群宕机。
第三,管理客户端本身可能会超时并面临其他问题,这些问题可能导致其任何方法抛出异常。如果发生这种情况,集群并未关闭,因此您可能需要考虑暂时性故障。
这里有一些简单的伪代码,您可以将其用于健康检查功能:
N = 3
failureCounter = 0
try {
adminClient = AdminClient.create()
adminClient.describeCluster()
failureCounter = 0
// health up
} catch (KafkaException e) {
// Keep track of how many times KafkaException is thrown successively
failureCounter++
if (failureCounter >= N) { // N successive failures might indicate a problem
// health down
}
}
describeCluster()
方法足以作为检查。但此外,您可能还想使用 describeAcls()
方法来检查是否还设置了特定的 ACL,因为任何 ACL 问题都可能阻止您的生产者或消费者正常运行。
我在这里回答了类似的问题:https://stackoverflow.com/a/79349717/5468867