Kafka 生产者超时异常:1 条记录即将过期

问题描述 投票:0回答:5

我将 Kafka 与 Spring-boot 一起使用:

Kafka 生产者类

@Service
public class MyKafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private static Logger LOGGER = LoggerFactory.getLogger(NotificationDispatcherSender.class);

    // Send Message
    public void sendMessage(String topicName, String message) throws Exception {
        LOGGER.debug("========topic Name===== " + topicName + "=========message=======" + message);
        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topicName, message);
        result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOGGER.debug("sent message='{}' with offset={}", message, result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                LOGGER.error(Constants.PRODUCER_MESSAGE_EXCEPTION.getValue() + " : " + ex.getMessage());
            }
        });
    }
}

Kafka 配置:

spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=100000
spring.kafka.producer.request.timeout.ms=30000
spring.kafka.producer.linger.ms=10
spring.kafka.producer.acks=0
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.max.block.ms=5000
spring.kafka.bootstrap-servers=192.168.1.161:9092,192.168.1.162:9093

假设我已在主题

my-test-topic
中发送了 10 次 1000 条消息。

十分之八我成功地获取了消费者中的所有消息,但有时我会收到以下信息错误

2017-10-05 07:24:11, [ERROR] [my-service - LoggingProducerListener - onError:76] Exception thrown when sending a message with key='null' and payload='{"deviceType":"X","deviceKeys":[{"apiKey":"X-X-o"}],"devices...' to topic my-test-topic

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for my-test-topic-4 due to 30024 ms has passed since batch creation plus linger time

apache-kafka kafka-consumer-api kafka-producer-api spring-kafka
5个回答
20
投票

有3种可能:

  1. 增加
    request.timeout.ms
    - 这是 Kafka 等待整个批次在缓冲区中准备就绪的时间。因此,在您的情况下,如果缓冲区中的消息少于 100 000 条,则会发生超时。更多信息在这里:https://stackoverflow.com/a/34794261/2707179
  2. 减少
    batch-size
    - 与上一点相关,它将更频繁地发送批次,但它们将包含更少的消息。
  3. 根据消息大小,您的网络可能无法满足高负载?检查您的吞吐量是否不是瓶颈。

2
投票

就我而言,我有

replication factor < min.insync.replicas
。写入永远无法获得足够的确认,因此超时。 使用
replication factor > min.insync.replicas
重新创建主题修复了该问题。


1
投票
  1. 错误中的第一个线索是

    30024 ms has passed
    - 配置
    spring.kafka.producer.request.timeout.ms=30000
    是相关的。这 30 秒的等待是为了填满 Producer 端的缓冲区。

  2. 当消息发布时,它会在生产者端进行缓冲,并等待 30 秒(参见上面的配置)来填满。

    spring.kafka.producer.batch-size=100000
    表示 100KB,因此如果消息摄取负载较低,并且缓冲区在 30 秒内没有被更多消息填满至 100KB,您会期望收到此消息。

  3. spring.kafka.producer.linger.ms=10
    用于摄取负载较高并且生产者希望限制对 Kafka 代理的
    send()
    调用。这是在批次准备好后(即在缓冲区填充到批次大小为 100KB 后)生产者在向代理发送消息之前等待的持续时间。

解决方案:

  • 增加
    linger.ms
    可在批次准备就绪后将消息保留更长时间。如果需要更多时间来填充批次,请增加
    request.timeout.ms
  • 另一种方法:减少
    batch-size
    ,或增加
    request.timeout.ms
    ,或两者兼而有之。

0
投票

一个可能的原因是Kafka服务器达到了最大堆大小限制(-Xmx)。在高负载下,当垃圾收集器运行时,可能会导致服务器需要更长的时间来处理请求。 就我而言,在增加 Kafka 的最大内存限制后,错误得到解决。


-1
投票

我通过使用 DNS 正确寻址主机 spring.kafka.bootstrap-servers 解决了这个问题。即使网络解析了IP地址,似乎也需要DNS。

© www.soinside.com 2019 - 2024. All rights reserved.