KafkaTimeoutError: 60.0秒后更新元数据失败。

问题描述 投票:0回答:1

我有一个高吞吐量kafka生产者的用例,我想每秒钟推送数千条json消息。

我有一个3个节点的kafka集群,我使用最新的kafka-python库,我有以下方法来生成消息。

def publish_to_kafka(topic):
    data = get_data(topic)
    producer = KafkaProducer(bootstrap_servers=['b1', 'b2', 'b3'],
                             value_serializer=lambda x: dumps(x).encode('utf-8'), compression_type='gzip')
    try:
        for obj in data:
           producer.send(topic, value=obj)
    except Exception as e:
            logger.error(e)
    finally:
        producer.close()

我的主题有3个分区。

方法有时能正常工作,有时会出现错误 "KafkaTimeoutError: Failed to update metadata after 60.0 secs."。

我需要更改哪些设置才能让它顺利工作?

python apache-kafka kafka-producer-api kafka-python
1个回答
0
投票
  1. 如果一个topic不存在,而你试图对该topic进行生产,并且自动创建topic被设置为false,那么就会出现这种情况。

    可能的解决方法。 在broker配置中(server.properties) auto.create.topics.enable=true (注意,这在Confluent Kafka中是默认的)

  2. 另一种情况可能是网络拥堵或速度问题,如果与Kafka代理更新元数据的时间超过60秒。

    可能的解决方法。 生产者配置。max.block.ms = 1200000 (120秒,例如)

  3. 检查你的经纪人是否因为某些原因(例如,太多负载)而停止工作,以及为什么他们不能响应元数据请求。你可以在server.log文件中看到它们,通常。

© www.soinside.com 2019 - 2024. All rights reserved.