KAFKAPYTHON消费者组会话定时出现

问题描述 投票:0回答:1

I使用Confluent-kafka v1.3.0,我在消费者组会话超时方面存在以下问题。 我的配置看起来像:

c['KAFKA'] = {
    'bootstrap.servers': 'host.docker.internal:9104',
    'consumer': {
        'group.id': 'consumer',
        'enable.auto.commit': True,
        'default.topic.config': {
            'auto.offset.reset': 'earliest
        },
        'heartbeat.interval.ms': 100000,
        'max.poll.interval.ms': 300000,
        'session.timeout.ms': 100000
    },
}

代码中的逻辑如下:

consumer.subscribe('database_changes')

with ThreadPoolExecutor(max_workers=500) as executor:
    while True:
        msg = consumer.poll(100)
        if msg is not None:
            executor.submit(process_message, msg)

函数过程中的编码等待几个ms,因为它确实很简单。一切都很好,但是每时每刻都会收到这个错误:

{"asctime":"2020-04-27 08:42:25,759","levelname":"WARNING","name":"services.kafka","message":"SESSTMOUT [rdkafka#consumer-2] [thrd:main]: Consumer group session timed out (in join-state started) after 30131 ms without a successful response from the group coordinator (broker 0, last error was Success): revoking assignment and rejoining group"}
这些重新分割极大地妨碍了整个过程。

没有人知道可能错误地设定什么?我怀疑不工作的心跳,但我不知道它是如何验证或更好的解决方案。 thanks

contruent
python apache-kafka kafka-consumer-api producer-consumer confluent-platform
1个回答
2
投票
,heartbeat.interval.ms的设置应设置不高于session.timeout.ms的1/3。由于在sessions.timeout.ms值之后,假定消费者已死亡,因此建议等待至少三个心跳来假设。

© www.soinside.com 2019 - 2025. All rights reserved.