我有一个kafka客户端代码连接到Kafka(服务器0.10.1和客户端是0.10.2)代理。代码中有2个主题和2个不同的消费者组,还有一个生产者。偶尔从生产者代码中获取NetworkException(一次在2天内,一次在5天内,......)。我们看到消费者组(Re)在日志中加入了消费者组的信息,然后是生产者future.get()调用中的NetworkException。不知道为什么我们会收到此错误。
代码: -
final Future<RecordMetadata> futureResponse =
producer.send(new ProducerRecord<>("ping_topic", "ping"));
futureResponse.get();
例外: -
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:70)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:57)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
NetworkException的Kafka API定义,
“在发出请求时发生了与网络相关的错误IOException。这可能是因为客户端的元数据已经过时,它正在向一个现在已经死亡的节点发出请求。”
谢谢
我在测试Kafka Consumer时遇到了同样的错误。我正在使用发件人模板。在消费者配置中,我另外设置了以下属性:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
发送消息后我添加了一个线程睡眠:
ListenableFuture<SendResult<String, String>> future =
senderTemplate.send(MyConsumer.TOPIC_NAME, jsonPayload);
Thread.Sleep(10000).
有必要使测试工作,但可能不适合您的情况。