Confluence.Kafka.KafkaException:代理:指定的组生成 ID 无效

问题描述 投票:0回答:2

环境

  • 3节点Kafka集群
    • 亚马逊MSK
    • v2.3
  • 1 个主题
    • 6个分区
  • 1 个消费者组有 2 个消费者
    • 在 Kubernetes 中运行
    • Confluence .NET SDK 1.2.2
    • bootstrap.servers
      group.id
      外,均为默认设置。

问题

首先,我的一位消费者遇到了以下异常。

Confluent.Kafka.KafkaException: Broker: Specified group generation id is not valid
   at Confluent.Kafka.Impl.SafeKafkaHandle.Commit(IEnumerable`1 offsets)
   at Confluent.Kafka.Consumer`2.Commit(IEnumerable`1 offsets)

异常被捕获,消费者应该重试,但应用程序却处于空闲状态。 容器仍在运行,但不再消耗任何消息。

更奇怪的是,代理从不重新分配该消费者的分区,因此这些分区上的消费者滞后开始增长。 看起来消费者既是活的(因为代理没有重新分配其分区)又是死的(因为它无法提交其偏移量或消耗更多消息)。 如果我们干预并手动重新启动消费者,那么分区就会重新分配,情况就会恢复正常。

我不完全确定如何看待上述例外情况。 谷歌没有提供太多。 我拥有的最相关的线索是 GitHub 中的这个问题,其中涉及代理重新启动。 据我所知,在我的情况下并没有发生这种情况。 任何帮助将不胜感激。

apache-kafka
2个回答
0
投票

当消费者没有及时响应心跳请求时,它会被踢出组并应该重新加入。如果消息处理在消费者运行的同一线程上花费很长时间,则可能会发生这种情况。

当自动提交关闭并且消费者仅在处理消息(在同一线程上)后才确认时,消费者可能已经被踢出组并尝试使用以前的(已经错误的)组ID进行提交。

这就是我的情况。因此,通过使用自动提交,问题就可以自行解决。但是,如果逻辑需要仅在处理消息时才确认消息,则此解决方案是不可接受的。就我而言,我通过在不同的线程上处理并使用类似信号量的机制来通知消费者在处理完成时使用下一条消息来解决问题。


-2
投票
手动提交

并设置了EnableAutoCommit = false

不知何故,对于一个偏移量,提交可能被执行两次。我删除了消费者的手动提交并设置了 

EnableAutoCommit = true

之后就成功了。

© www.soinside.com 2019 - 2024. All rights reserved.