Kafka's fetch.max.bytes and max.partition.fetch.bytes have no effect


According to the documentation, fetch.max.bytes and max.partition.fetch.bytes should limit the maximum size of the messages a Kafka client consumes.

I set max.poll.records to 1 because the documentation states that the size of the first batch is not limited. I set both fetch.max.bytes and max.partition.fetch.bytes to 1, and I am using default broker and topic properties.

When large messages are produced, they are still consumed successfully despite what the documentation says. I assumed that if a message exceeded the size limit, the broker would throw an exception or the message would simply be ignored. Instead, the consumer receives and processes the large messages.

I tried running the following code:

package io.conduktor.demos.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemo {
    private static final Logger log = LoggerFactory.getLogger(ConsumerDemo.class);

    public static void main(String[] args) {
        log.info("I am a Kafka Consumer");

        String bootstrapServers = "127.0.0.1:9092";
        String groupId = "my-fifth-application";
        String topic = "demo_java";

        // create consumer configs
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
        properties.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1"); 
        properties.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "1");

        // create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        try {
            // subscribe consumer to our topic(s)
            consumer.subscribe(Arrays.asList(topic));

            // poll for new data
            while (true) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    log.info("Key: " + record.key() + ", Value: " + record.value());
                    log.info("Partition: " + record.partition() + ", Offset:" + record.offset());
                }
            }

        } catch (WakeupException e) {
            log.info("Wake up exception!");
            // we ignore this as this is an expected exception when closing a consumer
        } catch (Exception e) {
            log.error("Unexpected exception", e);
        } finally {
            consumer.close(); // this will also commit the offsets if need be.
            log.info("The consumer is now gracefully closed.");
        }

    }
}

and produced large messages with kafka-console-producer.
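
For reference, a comparable large message could also be produced from Java. This is only a minimal sketch, assuming the same local broker at 127.0.0.1:9092 and the demo_java topic, with an arbitrary ~100 kB value; it is not part of the original question:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Arrays;
import java.util.Properties;

public class LargeMessageProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            // Build a ~100 kB value, far larger than the 1-byte fetch limits set on the consumer.
            char[] payload = new char[100_000];
            Arrays.fill(payload, 'x');
            producer.send(new ProducerRecord<>("demo_java", "big-key", new String(payload)));
            producer.flush();
        }
    }
}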

java apache-kafka kafka-consumer-api
1 Answer

This is the expected behavior.

The documentation for both fetch.max.bytes and max.partition.fetch.bytes states (emphasis mine):

Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress.
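
In other words, these settings are soft limits on the size of a fetch response, not per-record caps, so a single record larger than the limit is still delivered. The answer itself contains no code, but as a rough sketch of how oversized records could instead be skipped on the consumer side, one option is to check the serialized value size inside the poll loop; the class name and the threshold below are hypothetical:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class OversizedRecordFilter {
    // Hypothetical application-level limit in bytes; choose whatever the application actually needs.
    private static final int MAX_VALUE_BYTES = 1_000_000;

    static void handle(ConsumerRecords<String, String> records) {
        for (ConsumerRecord<String, String> record : records) {
            // serializedValueSize() returns the size of the serialized value in bytes (-1 for null values).
            if (record.serializedValueSize() > MAX_VALUE_BYTES) {
                continue; // skip records larger than the application-level limit
            }
            // ... process the record as usual ...
        }
    }
}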
