我有一个高吞吐量kafka生产者的用例,我想每秒钟推送数千条json消息。
我有一个3个节点的kafka集群,我使用最新的kafka-python库,我有以下方法来生成消息。
def publish_to_kafka(topic):
data = get_data(topic)
producer = KafkaProducer(bootstrap_servers=['b1', 'b2', 'b3'],
value_serializer=lambda x: dumps(x).encode('utf-8'), compression_type='gzip')
try:
for obj in data:
producer.send(topic, value=obj)
except Exception as e:
logger.error(e)
finally:
producer.close()
我的主题有3个分区。
方法有时能正常工作,有时会出现错误 "KafkaTimeoutError: Failed to update metadata after 60.0 secs."。
我需要更改哪些设置才能让它顺利工作?
如果一个topic不存在,而你试图对该topic进行生产,并且自动创建topic被设置为false,那么就会出现这种情况。
可能的解决方法。 在broker配置中(server.properties) auto.create.topics.enable=true
(注意,这在Confluent Kafka中是默认的)
另一种情况可能是网络拥堵或速度问题,如果与Kafka代理更新元数据的时间超过60秒。
可能的解决方法。 生产者配置。max.block.ms = 1200000
(120秒,例如)
检查你的经纪人是否因为某些原因(例如,太多负载)而停止工作,以及为什么他们不能响应元数据请求。你可以在server.log文件中看到它们,通常。