Python的卡夫卡消费者消费习惯生产的消息

问题描述 投票:1回答:1

所以我是相当新的卡夫卡。我试图运行一个简单的卡夫卡消费者和生产者。当我运行我的消费者之前在for循环你好正确打印。但迟迟没有打印在for循环,导致我相信它不会进入在首位环和消费者不消耗来自生产者的消息。我是一个Linux系统上运行此。

谁能给上什么可能是错误的生产者或消费者的意见?我有我的显示生产者和消费者的代码都是唯一的几行代码。

这是我的制片人:

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:2181',api_version=(1,0,1))
producer.send('MyFirstTopic1', 'Hello, World!')

这是我的消费者:

from kafka import KafkaConsumer,KafkaProducer,TopicPartition,OffsetAndMetadata
consumer = KafkaConsumer(
 bootstrap_servers=['localhost:2181'],api_version=(1,0,1),
 group_id=None,
 enable_auto_commit=False,
 auto_offset_reset='smallest'
)
consumer.subscribe('MyFirstTopic1',0)
print("hello")
for message in consumer:
 print(message)

所以运行我的制片人,当它最终给出了一个error.Anyone知道这意味着什么,如果这是可能的什么是错的。

File "producer.py", line 3, in <module>
    producer.send('MyFirstTopic1', 'Hello, World!')
  File "/usr/local/lib/python3.5/site-packages/kafka/producer/kafka.py", line 543, in send
    self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
  File "/usr/local/lib/python3.5/site-packages/kafka/producer/kafka.py", line 664, in _wait_on_metadata
    "Failed to update metadata after %.1f secs." % max_wait)
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
python apache-kafka kafka-consumer-api kafka-producer-api
1个回答
1
投票

它看起来像你正在使用的客户机配置错误的主机。 localhost:2181通常是动物园管理员服务器。

为您的客户的工作,你需要设置bootstrap_servers到卡夫卡代理主机名和端口。这是默认localhost:9092

https://kafka-python.readthedocs.io/en/latest/apidoc/KafkaProducer.html

© www.soinside.com 2019 - 2024. All rights reserved.