bootstrap.servers
和 group.id
外,均为默认设置。首先,我的一位消费者遇到了以下异常。
Confluent.Kafka.KafkaException: Broker: Specified group generation id is not valid
at Confluent.Kafka.Impl.SafeKafkaHandle.Commit(IEnumerable`1 offsets)
at Confluent.Kafka.Consumer`2.Commit(IEnumerable`1 offsets)
异常被捕获,消费者应该重试,但应用程序却处于空闲状态。 容器仍在运行,但不再消耗任何消息。
更奇怪的是,代理从不重新分配该消费者的分区,因此这些分区上的消费者滞后开始增长。 看起来消费者既是活的(因为代理没有重新分配其分区)又是死的(因为它无法提交其偏移量或消耗更多消息)。 如果我们干预并手动重新启动消费者,那么分区就会重新分配,情况就会恢复正常。
我不完全确定如何看待上述例外情况。 谷歌没有提供太多。 我拥有的最相关的线索是 GitHub 中的这个问题,其中涉及代理重新启动。 据我所知,在我的情况下并没有发生这种情况。 任何帮助将不胜感激。
当消费者没有及时响应心跳请求时,它会被踢出组并应该重新加入。如果消息处理在消费者运行的同一线程上花费很长时间,则可能会发生这种情况。
当自动提交关闭并且消费者仅在处理消息(在同一线程上)后才确认时,消费者可能已经被踢出组并尝试使用以前的(已经错误的)组ID进行提交。
这就是我的情况。因此,通过使用自动提交,问题就可以自行解决。但是,如果逻辑需要仅在处理消息时才确认消息,则此解决方案是不可接受的。就我而言,我通过在不同的线程上处理并使用类似信号量的机制来通知消费者在处理完成时使用下一条消息来解决问题。
并设置了EnableAutoCommit = false
。
EnableAutoCommit = true
。
之后就成功了。