Spring Kafka Consumer 在 LeaveGroup 请求后无法重新加入

问题描述 投票:0回答:1

我们在高负载的生产环境中使用Spring Kafka。 我们使用了 @KafkaListener 注释并创建了这些侦听器作为 Spring Boot 服务的一部分。

这些消费者经常向协调器发送 LeaveGroup 请求,然后消费者无限期地挂起/卡住,没有任何日志或错误。在这种情况下,我们剩下的唯一选择是重新部署该特定实例。

这是我们看到的一系列日志:

Attempt to heartbeat failed since group is rebalancing
Attempt to heartbeat failed since group is rebalancing
This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
Member consumer-1-0d0b333d-9e5b-4038-ac3f-e5d59c4e9d19 sending LeaveGroup request to coordinator 172.25.128.233:9092 (id: 2147483560 rack: null)

附加信息: 我们正在使用以下 Kafka 配置:

  • 批量大小:10
  • 最大轮询间隔:12 分钟
  • 心跳:8秒
  • 最大分区大小:10 MB
  • 会话超时:25秒
  • 请求超时:20分钟

基本上,我们想知道消费者一旦离开群组,为什么不再发送加入请求?

apache-kafka spring-kafka
1个回答
0
投票

您应该了解重试会挂起消费者线程(如果使用 BackOffPolicy)。重试期间不会调用 Consumer.poll()。 Kafka 有两个属性来确定消费者的健康状况。 session.timeout.ms 用于确定消费者是否处于活动状态。从 kafka-clients 版本 0.10.1.0 开始,心跳在后台线程上发送,因此缓慢的消费者不再影响这一点。 max.poll.interval.ms(默认值:五分钟)用于确定消费者是否出现挂起(处理上次轮询的记录花费太长时间)。如果 poll() 调用之间的时间超过此时间,代理将撤销分配的分区并执行重新平衡。对于冗长的重试序列,如果有后退,这种情况很容易发生。

从版本 2.1.3 开始,您可以通过将状态重试与 SeekToCurrentErrorHandler 结合使用来避免此问题。在这种情况下,每次传递尝试都会将异常抛出回容器,错误处理程序重新寻找未处理的偏移量,并且下一次 poll() 会重新传递相同的消息。这避免了超过 max.poll.interval.ms 属性的问题(只要尝试之间的单个延迟不超过它)。因此,当您使用 ExponentialBackOffPolicy 时,必须确保 maxInterval 小于 max.poll.interval.ms 属性。要启用有状态重试,您可以使用采用有状态布尔参数(将其设置为 true)的 RetryingMessageListenerAdapter 构造函数。当您配置侦听器容器工厂(对于 @KafkaListener)时,请将工厂的 statefulRetry 属性设置为 true。

https://docs.spring.io/spring-kafka/reference/html/#stateful-retry

© www.soinside.com 2019 - 2024. All rights reserved.