当 kafka 节点从集群中关闭时,面临连接此 kafka 集群的 kafka 流问题

问题描述 投票:0回答:1

我使用3个节点的kafka集群,并且有多个kafka流和生产者连接。 当其中一个节点从该 kafka 集群中断开时,我遇到问题。

我面临的问题就像多个 kafka 流处于连接状态,但当其中一个 kafka 节点出现故障时,它们不会消耗消息。即使流处于连接状态,所有消息都在 kafka 主题队列中。

详情:-

  1. 每个主题的replication_factor配置为3。
  2. 每个主题有 6 个或 6 个以上分区。
  3. 所有 3 个 kafka 节点均配置为 BrokerController
  4. kafka 流线程数量为 3 个及以上。
  5. Kafka集群部署在kubernetes环境中。
  6. 在引导服务器中也提到了所有kafka节点的地址。

我在 kafka 流中添加了以下属性,并通过删除 kafka 节点 pod 进行了测试。

特性:-

  1. (StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG,30000)。
  2. (StreamsConfig.RETRY_BACKOFF_MS_CONFIG,3000)。
  3. (StreamsConfig.TASK_TIMEOUT_MS_CONFIG,300000)。

但仍然面临同样的问题,卡夫卡流已连接但未消费消息,并且消息存储在卡夫卡主题队列中。

要开始消费消息,需要重新启动 kafka 流。

有人可以帮助进行哪些更改,以便即使其中一个 kafka 节点出现故障,kafka 流仍会继续消费消息。

apache-kafka kafka-consumer-api apache-kafka-streams
1个回答
0
投票

我建议检查这两件事,因为它们可能是添加/删除 pod(不一定是 Kafka 节点)或 Kafka 尝试重新平衡主题分区到 pod 时无流处理的根本原因:

检查您的应用程序/服务,当 Kafka 进入重新平衡时,您的流不会停止处理,因为并非每个状态更改都会导致流停止并退出。例如,如果您使用闩锁机制,请检查闩锁上的状况,并“针对”重新平衡状态强化它,如下所示:

kafkaStreams.setStateListener((state, previousState) -> { 
  // example; more conditions may be required here 
  if (KafkaStreams.State.RUNNING == previousState && state != KafkaStreams.State.REBALANCING) { 
                latch.countDown(); // allows the Stream thread to close and exit.  
            }
});

还要检查您使用的每个主题的消费者组 ID:验证每个主题都有自己的消费者组 ID,因为对多个主题使用相同的组 ID 可能会导致重新平衡循环。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.