卡夫卡生产者不接新分区

问题描述 投票:3回答:1

我是新来的卡夫卡,并正尝试建立一个服务来服务于IT消息platofrm。这里是我的设置:

卡夫卡0.9.0.1 动物园管理员3.4.8 kafka-python 1.3.3

我的应用程序创建从我发送消息流于单一话题6个分区的KafkaProducer。我还创建7个KafkaConsumers(单个group_id下,其中6获得分配给6个分区和一个被留在空闲状态(预期)。虽然生产者是流,我增加分区计数到7,用期望流不会在7个分区分布,并会醒来的空闲的消费者。但是,这似乎是生产者不拿起新添加的分区,直到我通过重新启动应用程序重新初始化它。我扩展分区通过运行这个数:

kafka-topics --alter --zookeeper localhost:2181 --topic test --partitions 7

有没有办法为生产者拿起无需重新初始化它的分区计数的变化?

我这里还有相关的代码片段:

制片人

class Producer(threading.Thread):
daemon = True

def __init__(self, name, manager):
    super(Producer, self).__init__()
    self.producer = KafkaProducer(bootstrap_servers='localhost:9092')

def run(self):
    while not self.killed:
        if not self.q.empty():
            self._busy()
            self.producer.send('test', value=self.q.get())
        else:
            self._free()

消费者

class Consumer(threading.Thread):
    daemon = True

    def __init__(self, name, manager):
        super(Consumer, self).__init__()
        self.consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 group_id='test_group',
                                 client_id="Consumer " + self.name)
        self.consumer.subscribe(['test'])

    def run(self):
        while not self.killed:
            messages = self.consumer.poll()

            for topic, records in messages.iteritems():
                print self.consumer.config['client_id'] + ": " + str(records)
python apache-kafka kafka-consumer-api kafka-producer-api kafka-python
1个回答
0
投票

我遇到了一个可能类似的问题,并能找到解决方案。我在这里写的:How does librdkafka producer learn about new topic partitions in Kafka

如果您的测试时间太短,这可能就是生产者未了解新分区的原因。参数topic.metadata.refresh.interval.ms为300000(毫秒)默认情况下,这样的经纪人将在每5分钟刷新生产者的元数据。如果您的测试添加分区后,花了5分钟以上,那么这是不是原因。

© www.soinside.com 2019 - 2024. All rights reserved.