I使用Confluent-kafka v1.3.0,我在消费者组会话超时方面存在以下问题。 我的配置看起来像:
c['KAFKA'] = {
'bootstrap.servers': 'host.docker.internal:9104',
'consumer': {
'group.id': 'consumer',
'enable.auto.commit': True,
'default.topic.config': {
'auto.offset.reset': 'earliest
},
'heartbeat.interval.ms': 100000,
'max.poll.interval.ms': 300000,
'session.timeout.ms': 100000
},
}
代码中的逻辑如下:
consumer.subscribe('database_changes')
with ThreadPoolExecutor(max_workers=500) as executor:
while True:
msg = consumer.poll(100)
if msg is not None:
executor.submit(process_message, msg)
函数过程中的编码等待几个ms,因为它确实很简单。一切都很好,但是每时每刻都会收到这个错误:
{"asctime":"2020-04-27 08:42:25,759","levelname":"WARNING","name":"services.kafka","message":"SESSTMOUT [rdkafka#consumer-2] [thrd:main]: Consumer group session timed out (in join-state started) after 30131 ms without a successful response from the group coordinator (broker 0, last error was Success): revoking assignment and rejoining group"}
这些重新分割极大地妨碍了整个过程。 没有人知道可能错误地设定什么?我怀疑不工作的心跳,但我不知道它是如何验证或更好的解决方案。 thanks
contruent