所以我是相当新的卡夫卡。我试图运行一个简单的卡夫卡消费者和生产者。当我运行我的消费者之前在for循环你好正确打印。但迟迟没有打印在for循环,导致我相信它不会进入在首位环和消费者不消耗来自生产者的消息。我是一个Linux系统上运行此。
谁能给上什么可能是错误的生产者或消费者的意见?我有我的显示生产者和消费者的代码都是唯一的几行代码。
这是我的制片人:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:2181',api_version=(1,0,1))
producer.send('MyFirstTopic1', 'Hello, World!')
这是我的消费者:
from kafka import KafkaConsumer,KafkaProducer,TopicPartition,OffsetAndMetadata
consumer = KafkaConsumer(
bootstrap_servers=['localhost:2181'],api_version=(1,0,1),
group_id=None,
enable_auto_commit=False,
auto_offset_reset='smallest'
)
consumer.subscribe('MyFirstTopic1',0)
print("hello")
for message in consumer:
print(message)
所以运行我的制片人,当它最终给出了一个error.Anyone知道这意味着什么,如果这是可能的什么是错的。
File "producer.py", line 3, in <module>
producer.send('MyFirstTopic1', 'Hello, World!')
File "/usr/local/lib/python3.5/site-packages/kafka/producer/kafka.py", line 543, in send
self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
File "/usr/local/lib/python3.5/site-packages/kafka/producer/kafka.py", line 664, in _wait_on_metadata
"Failed to update metadata after %.1f secs." % max_wait)
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
它看起来像你正在使用的客户机配置错误的主机。 localhost:2181
通常是动物园管理员服务器。
为您的客户的工作,你需要设置bootstrap_servers
到卡夫卡代理主机名和端口。这是默认localhost:9092
。
见https://kafka-python.readthedocs.io/en/latest/apidoc/KafkaProducer.html