NotEnoughReplicasException 与 min.insync.replicas 的意外行为

问题描述 投票:0回答:1

这是我之前的问题

的延续

我正在探索卡夫卡的

min.insync.replicas
,总结如下:

  1. 在本地设置3个经纪人,使用
    insync
    创建主题
    min.insync.replicas=2
  2. 消息由 kafka-console- Producer 使用
    acks=all
    生成,并由 kafka-console-consumer
  3. 读取
  4. 买下了 2 家经纪商,只留下 1 家
    insync.replicas
    ,并预计生产者会有例外,如 此处 此处

但它从未发生过,生产者正在生成消息,消费者从控制台读取消息,没有任何错误。(更多详细信息请参阅上一个问题

然后,我没有从 console- Producer 生成消息,而是编写了一个与控制台生产者配置相同的 java 生产者,最后得到了以下异常。

错误 [Broker 0 上的副本管理器]:处理分区 insync-0 上的追加操作时出错 (kafka.server.ReplicaManager) org.apache.kafka.common.errors.NotEnoughReplicasException:分区 [insync,0] 的同步副本数量为 1 ,低于所需的最小值 2

虽然我期望它来自生产者(java代码),但它出现在kafka代理中

控制台生产者命令

 ./kafka-console-producer.sh --broker-list localhost:9092 --topic insync --producer.config ../config/producer.properties

kafka-console-生产者属性:

bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
compression.type=none
batch.size=20
acks=all

Java 生产者代码:

public static void main(String[] args) {
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(producerConfigs());

        try {

            int count = 0;
            while (true) {
                ProducerRecord<String, String> record = new ProducerRecord<String, String>("insync",
                        "test message: " + count);
                kafkaProducer.send(record);

                Thread.sleep(3000);
                count++;
            }
        } catch (Exception e) {

            e.printStackTrace();
        } finally {
            kafkaProducer.close();
        }
    }

    private static Properties producerConfigs() {
        Properties properties = new Properties();

        properties.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        properties.put("acks", "all");

        return properties;
    }

这给我带来了更多问题。

  1. 为什么在运行java Producer时会发生这种情况,而不是在控制台生产者中发生。?
  2. 为什么异常发生在broker而不是生产者(java代码)? min.insync.replicas
    文档

如果无法满足此最小值,则生产者将引发异常(NotEnoughReplicas 或 NotEnoughReplicasAfterAppend)

这种情况下kafka如何保证可靠性?

apache-kafka kafka-consumer-api kafka-producer-api
1个回答
2
投票

当使用

acks=all
生产同步副本数少于
min.insync.replicas
的主题时,生产者应该获得
NotEnoughReplicas

您没有看到此行为的原因是控制台生产者命令和 Java 代码都存在问题。

1。控制台制作人

要在

acks=all
中启用
kafka-console-producer.sh
,您需要指定
--request-required-acks
标志:

./kafka-console-producer.sh --broker-list localhost:9092 \
    --topic insync --request-required-acks all

这是因为

--request-required-acks
标志优先于通过
--producer.config
指定的值,并且默认为
1

2。 Java代码

您粘贴的代码应该无法发送任何消息,但按照当前的逻辑,您应该只会收到

WARN
日志消息,例如:

在主题分区上生成相关 ID 为 15 的响应时出现错误,正在重试...

要在代码中收到通知,您需要检查

send()
的结果,方法是检查它返回的
Future
或传递
Callback
作为第二个参数。也不是说
NotEnoughReplicasException
是一个可重试的异常,因此对于最新的客户端,默认情况下,它将永远重试,而不是通知调用代码。

例如:

Properties configs = new Properties();
configs.put("bootstrap.servers", "localhost:9092");
configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configs.put("retries", "5");
configs.put("acks", "all");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(configs)) {
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "value");
    producer.send(record, (RecordMetadata metadata, Exception exc) -> {
        if (exc != null) {
            System.err.println(exc);
        }
    });
}

当主题低于最小 ISR 时,生产者将重试 5 次才记录失败。然后它会调用带有异常的 lambda,所以你会得到:

org.apache.kafka.common.errors.NotEnoughReplicasException:消息被拒绝,因为同步副本数量少于所需数量。


所以总而言之,

min.insync.replicas
已正确处理,但您需要小心地将正确的参数传递给工具,并在 Java 逻辑中正确处理异常。

© www.soinside.com 2019 - 2024. All rights reserved.