我正在从事 Google 数据流工作,我正在使用 apache beam
ReadFromKafka
来消费主题消息。我正在消耗 4 个主题。管道曾经工作正常,在我们向 kafka 集群添加一个新代理并触发重新平衡后,消费者开始在一个特定主题上失败,但成功地继续为该主题工作
其他 3 个主题。
The error is: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition topic_name-0 could be determined
这是我的代码:
(
pcoll
| f"Read From Kafka {self.transf_label}"
>> ReadFromKafka(
consumer_config=self.kafka_consumer_config,
topics=_topic_list,
with_metadata=True,
)
| f"Extract KVT {self.transf_label}" >> Map(extract_key_value_topic)
| f"Decode messages {self.transf_label}" >> ParDo(DecodeKafkaMessage())
)
在日志中我可以看到
[Consumer clientId=consumer-group-dummy-4, groupId=consumer-group-dummy] Subscribed to partition(s): topic_name-0
但几秒钟后,它因超时错误而失败,同时不断拉取其他主题的消息
这是配置:
{
"bootstrap.servers": BROKERS,
"security.protocol": SECURITY_PROTOCOL,
"sasl.mechanism": SASL_MECHANISM,
"group.id": "consumer-group-dummy",
"session.timeout.ms": "60000",
"sasl.jaas.config": f'org.apache.kafka.common.security.scram.ScramLoginModule required username="{SASL_USERNAME}" password="{SASL_PASSWORD}";',
"auto.offset.reset": "latest",
}
需要考虑的一件事是我们的其他 Kafka 消费者应用程序没有抛出任何问题,这种行为只有在 apache beam pipeline 中才会被注意到。此外,当我尝试使用 Kafka 汇合库和简单的 python 脚本在本地触发具有相同配置和相同消费者组的消费者时,它似乎工作正常并拉取消息。
我在 Apache flink 中发现了这个 bug,在 Java 的 Beam Kafka 连接器中是否也一样?
我的问题是新添加的代理没有在内部路由,因此应用程序很难到达持有导致超时的分区的代理。
请注意,此问题可能是由其他几个因素引起的,以下是我在搜索时遇到的一些因素:
其他因素:
过时的元数据:
控制节点负责更新主题元数据。如果主题分区的领导层发生了移动,如果消费者不知道哪个代理是该分区的当前领导层,则消费者将无法读取它。过时的元数据意味着控制器节点没有完成其工作,应该被替换。 zkCli.sh 删除 /controller 以强制选举新控制器
集群缩放/除垢:
详情这里