Kafka - org.apache.kafka.common.errors.NetworkException

问题描述 投票:5回答:1

我有一个kafka客户端代码连接到Kafka(服务器0.10.1和客户端是0.10.2)代理。代码中有2个主题和2个不同的消费者组,还有一个生产者。偶尔从生产者代码中获取NetworkException(一次在2天内,一次在5天内,......)。我们看到消费者组(Re)在日志中加入了消费者组的信息,然后是生产者future.get()调用中的NetworkException。不知道为什么我们会收到此错误。

代码: -

final Future<RecordMetadata> futureResponse = 
producer.send(new ProducerRecord<>("ping_topic", "ping"));  
futureResponse.get();

例外: -

org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:70)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:57)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)

NetworkException的Kafka API定义,

“在发出请求时发生了与网络相关的错误IOException。这可能是因为客户端的元数据已经过时,它正在向一个现在已经死亡的节点发出请求。”

谢谢

apache-kafka kafka-producer-api
1个回答
1
投票

我在测试Kafka Consumer时遇到了同样的错误。我正在使用发件人模板。在消费者配置中,我另外设置了以下属性:

 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);

发送消息后我添加了一个线程睡眠:

   ListenableFuture<SendResult<String, String>> future =
    senderTemplate.send(MyConsumer.TOPIC_NAME, jsonPayload);

  Thread.Sleep(10000). 

有必要使测试工作,但可能不适合您的情况。

© www.soinside.com 2019 - 2024. All rights reserved.