我的 Apache Kafka 生产者 (0.9.0.1) 间歇性地抛出一个
org.apache.kafka.common.errors.NotLeaderForPartitionException
我执行 Kafka 发送的代码类似于此
final Future<RecordMetadata> futureRecordMetadata = KAFKA_PRODUCER.send(new ProducerRecord<String, String>(kafkaTopic, UUID.randomUUID().toString(), jsonMessage));
try {
futureRecordMetadata.get();
} catch (final InterruptedException interruptedException) {
interruptedException.printStackTrace();
throw new RuntimeException("sendKafkaMessage(): Failed due to InterruptedException(): " + sourceTableName + " " + interruptedException.getMessage());
} catch (final ExecutionException executionException) {
executionException.printStackTrace();
throw new RuntimeException("sendKafkaMessage(): Failed due to ExecutionException(): " + sourceTableName + " " + executionException.getMessage());
}
我在
NotLeaderForPartitionException
块内抓住了 catch (final ExecutionException executionException) {}
。
可以忽略这个特定的异常吗?
我的Kafka消息发送成功了吗?
如果您收到
NotLeaderForPartitionException
,则您的数据未写入成功。
每个主题分区由一个或多个 Broker 存储(其中一个领导者;其余代理称为追随者),具体取决于您的复制因子。生产者需要向领导者 Broker 发送新消息(到追随者的数据复制发生在内部)。
您的生产者客户端未连接到正确的 Broker,即连接到跟随者而不是领导者(或者连接到不再是跟随者的代理),并且该代理拒绝您的发送请求。如果领导者发生了变化,但生产者仍然拥有关于哪个代理是分区领导者的过时的缓存元数据,就会发生这种情况。
我在 Kubernetes 中尝试使用 Kafka 集群时遇到了同样的问题。 Kubernetes 集群托管在云实例上。我尝试从我的本地机器运行Kafka生产者代码。在第一次运行期间,发生了
UnknownHostException
。我通过将 Kafka Kubernetes 服务地址添加到我的 /etc/hosts
文件中解决了这个问题。
示例:
10.160.160.60 kafka-controller-0.kafka-controller-headless.default.svc.cluster.local:9092,kafka-controller-1.kafka-controller-headless.default.svc.cluster.local:9092,kafka-controller-2.kafka-controller-headless.default.svc.cluster.local:9092
然而,我随后遇到了
NotLeaderOrFollowerException
。尽管尝试了多种解决方案,但似乎没有一个能正常工作。有时,重新启动本地应用程序可以暂时解决问题。但后续重启后问题又会出现。
最终,我将本地应用程序移至 Kafka 集群中,从而解决了问题。出现此问题的原因是我的
/etc/hosts
文件中的所有 Kafka 代理地址都指向同一 IP 地址。当生产者尝试连接 Kafka 主题领导者时,Kubernetes Proxy 将请求路由到随机 Kafka 节点,从而导致此问题。
我开始在本地 Docker 上使用单节点 Kafka。避免尝试连接到远程服务器上的 Kafka 集群。如有必要,您应该为每个 Kafka 节点开放对不同 IP:Port 组合的公共访问,并在本地应用程序配置中指定所有代理的列表。